"""Drena la cola JSONL de eventos de la flota desde un cursor persistente. El watcher embebido en fleetview escribe transiciones de estado de la flota a una cola JSONL append-only. El orquestador-Claude se despierta periodicamente y consume los eventos NUEVOS desde la ultima vez (sin reprocesar los ya vistos) agrupados por clasificacion para decidir. """ import json import os def drain_fleet_events( events_path: str | None = None, cursor_path: str | None = None, advance: bool = True, ) -> dict: """Lee los eventos nuevos de la cola de la flota desde un cursor persistente. El cursor es el numero de lineas ya procesadas (entero en texto plano). Se leen las lineas desde la posicion del cursor hasta el final, se parsean como JSON (saltando lineas en blanco o invalidas), se agrupan por el campo `to` y se aisla la lista de eventos urgentes. Args: events_path: ruta a la cola JSONL. Default: ~/.claude/fleet/events.jsonl. cursor_path: ruta al archivo de cursor. Default: ~/.claude/fleet/cursor. advance: si True, escribe el nuevo cursor (= total de lineas) para no reprocesar. Si False, no mueve el cursor (peek/inspeccion). Returns: dict con: - total_new: numero de eventos nuevos validos parseados. - events: lista de los eventos nuevos parseados, en orden. - by_classification: dict que agrupa los eventos por su campo `to`. - urgent: lista de eventos nuevos con urgent == True. - cursor: nueva posicion del cursor (total de lineas de la cola). - reset: presente y True solo si el cursor previo era mayor que el numero de lineas de la cola (cola truncada/rotada): se reinicio desde 0. """ home = os.path.expanduser("~") if events_path is None: events_path = os.path.join(home, ".claude", "fleet", "events.jsonl") if cursor_path is None: cursor_path = os.path.join(home, ".claude", "fleet", "cursor") # Cola ausente -> drain vacio sin crash. if not os.path.exists(events_path): return { "total_new": 0, "events": [], "by_classification": {}, "urgent": [], "cursor": 0, } # Leer el cursor previo (0 si no existe o es ilegible). prev_cursor = 0 if os.path.exists(cursor_path): try: with open(cursor_path, "r", encoding="utf-8") as fh: prev_cursor = int(fh.read().strip() or "0") except (ValueError, OSError): prev_cursor = 0 if prev_cursor < 0: prev_cursor = 0 # Leer todas las lineas de la cola. with open(events_path, "r", encoding="utf-8") as fh: lines = fh.readlines() total_lines = len(lines) # Cursor mayor que el numero de lineas (cola truncada/rotada) -> reinicia. reset = False if prev_cursor > total_lines: prev_cursor = 0 reset = True new_lines = lines[prev_cursor:] events: list[dict] = [] by_classification: dict[str, list[dict]] = {} urgent: list[dict] = [] for raw in new_lines: stripped = raw.strip() if not stripped: # Linea en blanco: saltar sin romper el drain. continue try: event = json.loads(stripped) except (json.JSONDecodeError, ValueError): # JSON invalido: saltar sin romper el drain. continue if not isinstance(event, dict): # No es un objeto JSON: saltar. continue events.append(event) classification = event.get("to") if classification is not None: by_classification.setdefault(classification, []).append(event) if event.get("urgent") is True: urgent.append(event) # Avanzar el cursor al total de lineas si advance=True. if advance: try: os.makedirs(os.path.dirname(cursor_path) or ".", exist_ok=True) with open(cursor_path, "w", encoding="utf-8") as fh: fh.write(str(total_lines)) except OSError: # No se pudo persistir el cursor: el drain sigue siendo valido, # pero la proxima llamada reprocesara estos eventos. pass result = { "total_new": len(events), "events": events, "by_classification": by_classification, "urgent": urgent, "cursor": total_lines, } if reset: result["reset"] = True return result