Files
fn_registry/python/functions/infra/drain_fleet_events.py
T
agent 9365def3dd feat: cerebro reactivo del meta-orquestador (flow 0012, fase 2)
Primitivas (python/functions/infra):
- drain_fleet_events: consume la cola del watcher (~/.claude/fleet/
  events.jsonl) desde un cursor, agrupa por clasificacion, marca
  urgentes. 7 tests.
- set_dod_contract: escribe el DoD-contrato fijo (dod_contract/dod_status)
  en el goal.json de un agente sin pisar el resto (escritura atomica).
  5 tests.

Skill /orquestador evolucionado (sin romper lo existente): vigila la
flota por su DoD (no por 'esta vivo'). Nueva seccion 'Consumo de la cola
de la flota': DoD-contrato obligatorio al lanzar, drenar la cola,
politicas por clasificacion (RECLAMA escala / DICE_TERMINADO verifica /
ESTANCADO nudge / MAL_LANZADO re-DoD), verificador independiente del
ejecutor (lee el report vs dod_contract), splitter con tope de fan-out,
y cadencia (drain al actuar + heartbeat).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-20 20:19:26 +02:00

130 lines
4.4 KiB
Python

"""Drena la cola JSONL de eventos de la flota desde un cursor persistente.
El watcher embebido en fleetview escribe transiciones de estado de la flota a
una cola JSONL append-only. El orquestador-Claude se despierta periodicamente y
consume los eventos NUEVOS desde la ultima vez (sin reprocesar los ya vistos)
agrupados por clasificacion para decidir.
"""
import json
import os
def drain_fleet_events(
events_path: str | None = None,
cursor_path: str | None = None,
advance: bool = True,
) -> dict:
"""Lee los eventos nuevos de la cola de la flota desde un cursor persistente.
El cursor es el numero de lineas ya procesadas (entero en texto plano).
Se leen las lineas desde la posicion del cursor hasta el final, se parsean
como JSON (saltando lineas en blanco o invalidas), se agrupan por el campo
`to` y se aisla la lista de eventos urgentes.
Args:
events_path: ruta a la cola JSONL. Default: ~/.claude/fleet/events.jsonl.
cursor_path: ruta al archivo de cursor. Default: ~/.claude/fleet/cursor.
advance: si True, escribe el nuevo cursor (= total de lineas) para no
reprocesar. Si False, no mueve el cursor (peek/inspeccion).
Returns:
dict con:
- total_new: numero de eventos nuevos validos parseados.
- events: lista de los eventos nuevos parseados, en orden.
- by_classification: dict que agrupa los eventos por su campo `to`.
- urgent: lista de eventos nuevos con urgent == True.
- cursor: nueva posicion del cursor (total de lineas de la cola).
- reset: presente y True solo si el cursor previo era mayor que el
numero de lineas de la cola (cola truncada/rotada): se reinicio
desde 0.
"""
home = os.path.expanduser("~")
if events_path is None:
events_path = os.path.join(home, ".claude", "fleet", "events.jsonl")
if cursor_path is None:
cursor_path = os.path.join(home, ".claude", "fleet", "cursor")
# Cola ausente -> drain vacio sin crash.
if not os.path.exists(events_path):
return {
"total_new": 0,
"events": [],
"by_classification": {},
"urgent": [],
"cursor": 0,
}
# Leer el cursor previo (0 si no existe o es ilegible).
prev_cursor = 0
if os.path.exists(cursor_path):
try:
with open(cursor_path, "r", encoding="utf-8") as fh:
prev_cursor = int(fh.read().strip() or "0")
except (ValueError, OSError):
prev_cursor = 0
if prev_cursor < 0:
prev_cursor = 0
# Leer todas las lineas de la cola.
with open(events_path, "r", encoding="utf-8") as fh:
lines = fh.readlines()
total_lines = len(lines)
# Cursor mayor que el numero de lineas (cola truncada/rotada) -> reinicia.
reset = False
if prev_cursor > total_lines:
prev_cursor = 0
reset = True
new_lines = lines[prev_cursor:]
events: list[dict] = []
by_classification: dict[str, list[dict]] = {}
urgent: list[dict] = []
for raw in new_lines:
stripped = raw.strip()
if not stripped:
# Linea en blanco: saltar sin romper el drain.
continue
try:
event = json.loads(stripped)
except (json.JSONDecodeError, ValueError):
# JSON invalido: saltar sin romper el drain.
continue
if not isinstance(event, dict):
# No es un objeto JSON: saltar.
continue
events.append(event)
classification = event.get("to")
if classification is not None:
by_classification.setdefault(classification, []).append(event)
if event.get("urgent") is True:
urgent.append(event)
# Avanzar el cursor al total de lineas si advance=True.
if advance:
try:
os.makedirs(os.path.dirname(cursor_path) or ".", exist_ok=True)
with open(cursor_path, "w", encoding="utf-8") as fh:
fh.write(str(total_lines))
except OSError:
# No se pudo persistir el cursor: el drain sigue siendo valido,
# pero la proxima llamada reprocesara estos eventos.
pass
result = {
"total_new": len(events),
"events": events,
"by_classification": by_classification,
"urgent": urgent,
"cursor": total_lines,
}
if reset:
result["reset"] = True
return result