diff --git a/docs/capabilities/comfyui.md b/docs/capabilities/comfyui.md index 8eac6a9a..aa86dc7f 100644 --- a/docs/capabilities/comfyui.md +++ b/docs/capabilities/comfyui.md @@ -44,6 +44,8 @@ El **API format** (dict de nodos numerados que produce `build_txt2img_workflow` | [comfyui_download_model_py_ml](../../python/functions/ml/comfyui_download_model.md) | `download_model(url, dest_subdir='checkpoints', *, comfyui_dir, filename, token, overwrite, timeout_s) -> dict` | Descarga un checkpoint/LoRA/VAE a `models//`. Soporta Civitai (token) y HuggingFace. Valida que no sea HTML de error ni `.safetensors` corrupto. Impura. | | [comfyui_interrupt_queue_py_ml](../../python/functions/ml/comfyui_interrupt_queue.md) | `interrupt_queue(server='127.0.0.1:8188') -> dict` | Corta la generación en curso (POST `/interrupt`) y lee la cola (GET `/queue`) → `{ok, interrupted, queue_running, queue_pending, error}`. Freno de mano; degrada limpio en fallo de red. Impura. | | [comfyui_batch_generate_py_ml](../../python/functions/ml/comfyui_batch_generate.md) | `batch_generate(workflow, *, seeds=None, server='127.0.0.1:8188') -> dict` | Encola N variantes (una por seed), parcheando el campo de semilla de los nodos sampler sin mutar el original → `{ok, prompt_ids, count, error}`. Re-roll en una llamada. Compone `submit_workflow`. Impura. | +| [comfyui_queue_manage_py_ml](../../python/functions/ml/comfyui_queue_manage.md) | `queue_manage(action, *, server='127.0.0.1:8188', prompt_id=None) -> dict` | API de cola completa que complementa a `interrupt_queue`: `action='status'` (GET `/queue`), `'clear'` (vacía pendientes), `'delete'` (borra un prompt, requiere `prompt_id`), `'history'` (cuenta `/history`) → `{ok, action, queue_running, queue_pending, history_count, error}`. Degrada limpio en fallo de red. Impura. | +| [comfyui_stream_progress_py_ml](../../python/functions/ml/comfyui_stream_progress.md) | `stream_progress(prompt_id, *, server='127.0.0.1:8188', client_id=None, timeout=300) -> dict` | Progreso en vivo por WebSocket `/ws` (alternativa a `wait_result`): cuenta pasos del sampler (`steps_seen`), último nodo, y detecta el fin → `{ok, completed, steps_seen, last_node, method, error}`. Para ver progreso comparte el `client_id` con el submit. Cae a polling si falta `websocket-client`. Impura. | ### Builders, validación e import — dominio `ml` (P0, issue 0064) @@ -82,6 +84,7 @@ promoción del flujo txt2img a una sola llamada. Los class_types se verificaron | [comfyui_build_facedetailer_workflow_py_ml](../../python/functions/ml/comfyui_build_facedetailer_workflow.md) | `build_facedetailer_workflow(base_workflow_or_image, ckpt_name, positive, negative='', *, bbox_model='face_yolov8m.pt', denoise=0.5, ...) -> dict` | Builder **FaceDetailer** (Impact-Pack): detecta caras con `UltralyticsDetectorProvider` (YOLO bbox) y las regenera para recuperar detalle (el pain #1 de retratos). Acepta el nombre de una imagen en `input/` (str) o un workflow base (dict): toma la imagen del `VAEDecode` y reutiliza el `CheckpointLoaderSimple`. No usa SAM (no instalado). **Pura**. | | [comfyui_build_hires_fix_workflow_py_ml](../../python/functions/ml/comfyui_build_hires_fix_workflow.md) | `build_hires_fix_workflow(ckpt_name, positive, negative='', *, first_pass=(768,768), upscale_by=1.5, denoise=0.4, steps=20, ...) -> dict` | Builder **hires fix** de 2 pasadas: genera base (KSampler) y la amplía re-difundiéndola por tiles con `UltimateSDUpscale` + Remacri (`denoise<1` = añade detalle real). Distinto de `build_upscale_workflow` (ESRGAN puro, sin re-difusión). **Pura**. | | [comfyui_txt2img_oneshot_py_pipelines](../../python/functions/pipelines/comfyui_txt2img_oneshot.md) | `txt2img_oneshot(prompt, *, ckpt='dreamshaper_8.safetensors', negative='', server, dest=None, wait_timeout, **gen) -> dict` | **Pipeline** texto → PNG en disco en una llamada: build_txt2img + submit + wait + fetch_output_image → `{ok, image_path, prompt_id, error}`. Promoción de la secuencia (issue 0087). Impuro. | +| [comfyui_build_grid_py_ml](../../python/functions/ml/comfyui_build_grid.md) | `build_grid(image_paths, *, cols=None, cell=512, out_path=None, labels=None) -> dict` | Monta un **grid / contact-sheet** PIL de N imágenes para comparar de un vistazo (p.ej. el output de `batch_generate` con varios seeds). Celdas que conservan aspect ratio, rejilla casi cuadrada por defecto, rótulos opcionales → `{ok, out_path, rows, cols, error}`. Post-proceso local de imagen (no toca el server). Impura (I/O disco, PIL). | ### Vídeo (txt2video) — dominio `ml` (tag `video-generation`) diff --git a/python/functions/ml/comfyui_build_grid.md b/python/functions/ml/comfyui_build_grid.md new file mode 100644 index 00000000..f6e7ef78 --- /dev/null +++ b/python/functions/ml/comfyui_build_grid.md @@ -0,0 +1,73 @@ +--- +name: comfyui_build_grid +kind: function +lang: py +domain: ml +version: "1.0.0" +purity: impure +signature: "def comfyui_build_grid(image_paths: list, *, cols: int | None = None, cell: int = 512, out_path: str | None = None, labels: list | None = None) -> dict" +description: "Monta un grid / contact-sheet PIL de N imagenes para comparacion visual (p.ej. el output de comfyui_batch_generate con varios seeds). Cada celda conserva el aspect ratio (thumbnail centrado sobre fondo oscuro); rejilla casi cuadrada por defecto (cols=ceil(sqrt(N))). Rotulos opcionales por celda. Usa PIL (Pillow) del venv del registry. Devuelve {ok, out_path, rows, cols, error}. Impura: lee N imagenes y escribe un PNG." +tags: [comfyui, ml, grid, montage, pil, image] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [] +params: + - name: image_paths + desc: "lista de rutas a las imagenes a montar, en orden de lectura (izq->der, arriba->abajo)." + - name: cols + desc: "numero de columnas; si None usa ceil(sqrt(N)) para una rejilla casi cuadrada." + - name: cell + desc: "lado en pixeles de cada celda cuadrada; la imagen se reduce para caber conservando proporcion (default 512)." + - name: out_path + desc: "ruta del PNG de salida; si None escribe 'comfy_grid.png' en el dir de la primera imagen." + - name: labels + desc: "rotulos opcionales, uno por imagen (mismo orden); reservan una franja bajo cada celda." +output: "dict con ok (bool), out_path (str, ruta del PNG generado), rows (int, filas), cols (int, columnas), error (str, vacio si OK)." +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/ml/comfyui_build_grid.py" +--- + +## Ejemplo + +```python +import sys, os +sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions")) +from ml.comfyui_build_grid import comfyui_build_grid + +imgs = [ + os.path.expanduser("~/ComfyUI/output/comfy_00001_.png"), + os.path.expanduser("~/ComfyUI/output/comfy_00002_.png"), + os.path.expanduser("~/ComfyUI/output/comfy_00003_.png"), + os.path.expanduser("~/ComfyUI/output/comfy_00004_.png"), +] +res = comfyui_build_grid(imgs, cols=2, cell=512, out_path="/tmp/seeds_grid.png", + labels=["seed 1", "seed 2", "seed 3", "seed 4"]) +# {'ok': True, 'out_path': '/tmp/seeds_grid.png', 'rows': 2, 'cols': 2, 'error': ''} +``` + +## Cuando usarla + +Tras un barrido de seeds con `comfyui_batch_generate` + `comfyui_fetch_output_image`: +en vez de abrir N PNGs uno a uno, montas un unico contact-sheet para elegir de un +vistazo la mejor variante (o comparar steps/cfg/sampler distintos). Tambien sirve +para documentar un report con una rejilla de resultados. Es post-proceso local +puro de imagen: no toca el servidor ComfyUI. + +## Gotchas + +- Si alguna ruta de `image_paths` no existe, devuelve `ok=False` con la lista de + faltantes (estricto): no monta una rejilla parcial silenciosamente. Filtra las + rutas validas antes si quieres tolerar ausencias. +- Cada imagen se reduce a `cell` px conservando proporcion (thumbnail); imagenes de + distinto tamano quedan centradas en su celda con relleno, no estiradas. +- `labels` se dibuja con la fuente por defecto de PIL (pequeña, sin TTF externo); + para rotulos grandes habria que pasar una fuente — no soportado hoy (KISS). +- Escribe el PNG en disco: si `out_path` apunta a un directorio inexistente lo crea; + si no tiene permiso devuelve `ok=False` con el error. +- N grande con `cell` alto produce un canvas enorme (rows*cols*cell^2 px): para + decenas de imagenes baja `cell` (p.ej. 256) para no agotar memoria. diff --git a/python/functions/ml/comfyui_build_grid.py b/python/functions/ml/comfyui_build_grid.py new file mode 100644 index 00000000..217ab180 --- /dev/null +++ b/python/functions/ml/comfyui_build_grid.py @@ -0,0 +1,114 @@ +"""Monta un grid / contact-sheet PIL de N imagenes para comparacion visual. + +Funcion impura: lee N imagenes de disco y escribe un PNG de salida. Usa PIL +(Pillow), presente en el venv del registry. + +El compañero natural de comfyui_batch_generate: ese encola N variantes de un +workflow (una por seed) pero no junta los resultados. Esta funcion toma las N +imagenes ya descargadas (p.ej. con comfyui_fetch_output_image) y las dispone en +una rejilla regular para compararlas de un vistazo. Cada celda conserva el aspect +ratio (thumbnail centrado sobre fondo oscuro). Opcionalmente rotula cada celda. +""" +import math +import os + + +def comfyui_build_grid( + image_paths: list, + *, + cols: int | None = None, + cell: int = 512, + out_path: str | None = None, + labels: list | None = None, +) -> dict: + """Compone una rejilla de imagenes y la guarda como PNG. + + Args: + image_paths: lista de rutas a las imagenes (PNG/JPG/...) a montar, en + orden de lectura (izquierda->derecha, arriba->abajo). + cols: numero de columnas; si None se usa ceil(sqrt(N)) para una rejilla + casi cuadrada. keyword-only. + cell: lado en pixeles de cada celda cuadrada; cada imagen se reduce para + caber dentro conservando su proporcion. keyword-only. + out_path: ruta del PNG de salida; si None se escribe "comfy_grid.png" en + el directorio de la primera imagen. keyword-only. + labels: rotulos opcionales, uno por imagen (mismo orden); si se pasan, se + reserva una franja bajo cada celda y se dibuja el texto. keyword-only. + + Returns: + dict con: + - ok (bool): True si el grid se monto y guardo. + - out_path (str): ruta del PNG generado. + - rows (int): filas de la rejilla. + - cols (int): columnas de la rejilla. + - error (str): mensaje de error; cadena vacia si todo OK. + """ + out = {"ok": False, "out_path": "", "rows": 0, "cols": 0, "error": ""} + + if not image_paths: + out["error"] = "image_paths vacio: nada que montar" + return out + + try: + from PIL import Image, ImageDraw + except ImportError: + out["error"] = "PIL (Pillow) no esta instalado en este interprete" + return out + + missing = [p for p in image_paths if not os.path.isfile(p)] + if missing: + out["error"] = f"no existen {len(missing)} rutas: {missing[:5]}" + return out + + n = len(image_paths) + cols = int(cols) if cols and cols > 0 else max(1, math.ceil(math.sqrt(n))) + rows = math.ceil(n / cols) + cell = max(16, int(cell)) + label_h = 22 if labels else 0 + bg = (24, 24, 28) + fg = (232, 232, 236) + + canvas = Image.new("RGB", (cols * cell, rows * (cell + label_h)), bg) + draw = ImageDraw.Draw(canvas) if labels else None + + try: + for i, path in enumerate(image_paths): + with Image.open(path) as src: + im = src.convert("RGB") + im.thumbnail((cell, cell)) + r, c = divmod(i, cols) + x = c * cell + (cell - im.width) // 2 + y = r * (cell + label_h) + (cell - im.height) // 2 + canvas.paste(im, (x, y)) + if draw is not None and i < len(labels): + tx = c * cell + 4 + ty = r * (cell + label_h) + cell + 3 + draw.text((tx, ty), str(labels[i]), fill=fg) + except OSError as exc: + out["error"] = f"no se pudo leer/decodificar una imagen: {exc}" + return out + + if out_path is None: + out_path = os.path.join(os.path.dirname(os.path.abspath(image_paths[0])), + "comfy_grid.png") + try: + os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True) + canvas.save(out_path) + except OSError as exc: + out["error"] = f"no se pudo escribir {out_path!r}: {exc}" + return out + + out.update(ok=True, out_path=out_path, rows=rows, cols=cols) + return out + + +if __name__ == "__main__": + import json + import sys + + paths = sys.argv[1:] + if not paths: + print("uso: comfyui_build_grid.py ...", file=sys.stderr) + sys.exit(2) + res = comfyui_build_grid(paths, out_path="/tmp/comfy_grid.png") + print(json.dumps(res, indent=2)) diff --git a/python/functions/ml/comfyui_queue_manage.md b/python/functions/ml/comfyui_queue_manage.md new file mode 100644 index 00000000..1185ebe2 --- /dev/null +++ b/python/functions/ml/comfyui_queue_manage.md @@ -0,0 +1,77 @@ +--- +name: comfyui_queue_manage +kind: function +lang: py +domain: ml +version: "1.0.0" +purity: impure +signature: "def comfyui_queue_manage(action: str, *, server: str = \"127.0.0.1:8188\", prompt_id: str | None = None) -> dict" +description: "Gestiona la cola y el historial de ComfyUI via su API HTTP. action='status' (GET /queue -> queue_running/queue_pending), 'clear' (POST /queue {\"clear\":true} -> vacia pendientes), 'delete' (POST /queue {\"delete\":[prompt_id]} -> borra un prompt, requiere prompt_id), 'history' (GET /history -> history_count). Completa lo que comfyui_interrupt_queue no cubre. Devuelve {ok, action, queue_running, queue_pending, history_count, error}. NO lanza en fallo de red: degrada a {ok:False, error}. Impura: HTTP GET/POST, solo stdlib (urllib, json)." +tags: [comfyui, ml, queue, history, control, http] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [] +params: + - name: action + desc: "operacion: 'status' (estado de la cola), 'clear' (vaciar pendientes), 'delete' (borrar un prompt; requiere prompt_id), 'history' (contar historial)." + - name: server + desc: "host:port del servidor ComfyUI sin esquema (default '127.0.0.1:8188')." + - name: prompt_id + desc: "id del prompt a borrar; obligatorio solo para action='delete'." +output: "dict con ok (bool), action (str, eco), queue_running (int, prompts ejecutandose; status/clear/delete), queue_pending (int, prompts encolados; status/clear/delete), history_count (int, prompts en el historial; action='history'), error (str, vacio si OK)." +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/ml/comfyui_queue_manage.py" +--- + +## Ejemplo + +```python +import sys, os +sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions")) +from ml.comfyui_queue_manage import comfyui_queue_manage + +# Estado de la cola +st = comfyui_queue_manage("status") +# {'ok': True, 'action': 'status', 'queue_running': 1, 'queue_pending': 3, 'history_count': 0, 'error': ''} + +# Cuantos prompts recuerda el historial +h = comfyui_queue_manage("history") +print(h["history_count"]) + +# Vaciar los pendientes (no corta el que se ejecuta; para eso, comfyui_interrupt_queue) +comfyui_queue_manage("clear") + +# Borrar un prompt concreto de la cola de pendientes +comfyui_queue_manage("delete", prompt_id="abc123-...") +``` + +O lanzable directo: `./fn run comfyui_queue_manage status` · `./fn run comfyui_queue_manage history`. + +## Cuando usarla + +Cuando necesitas operar la cola mas alla de cortar el prompt en curso: ver de un +vistazo cuanto queda (`status`), limpiar de golpe un barrido de seeds que ya no +quieres (`clear`), quitar un prompt pesado encolado por error sin matar el que se +ejecuta (`delete`), o saber cuantas generaciones recuerda el servidor (`history`). +Es el complemento de `comfyui_interrupt_queue` (que solo corta + lee) para cubrir +las cuatro acciones restantes de `/queue` y `/history`. + +## Gotchas + +- `clear` vacia SOLO los pendientes; el prompt en ejecucion sigue. Para cortarlo + usa `comfyui_interrupt_queue` (POST /interrupt) antes del `clear`. +- `delete` requiere `prompt_id`; sin el devuelve `ok=False` con el error. El id es + el que devuelve `comfyui_submit_workflow`. Borrar un prompt que ya no esta en la + cola es inocuo (el servidor lo ignora). +- En `status`/`clear`/`delete` se rellenan `queue_running`/`queue_pending`; en + `history` se rellena `history_count` (los otros quedan en 0). Mira `action` para + saber que campos son significativos. +- En fallo de red NO lanza: devuelve `ok=False` con el mensaje en `error`. + Comprueba `ok` antes de fiarte de los conteos. +- `history_count` es el numero de entradas que el servidor mantiene en memoria, no + un acumulado historico persistente: se reinicia al reiniciar ComfyUI. diff --git a/python/functions/ml/comfyui_queue_manage.py b/python/functions/ml/comfyui_queue_manage.py new file mode 100644 index 00000000..773d5f1a --- /dev/null +++ b/python/functions/ml/comfyui_queue_manage.py @@ -0,0 +1,135 @@ +"""Gestiona la cola y el historial de un servidor ComfyUI via su API HTTP. + +Funcion impura: hace red (HTTP GET/POST). Solo stdlib (urllib, json). + +Completa lo que comfyui_interrupt_queue no cubre. interrupt_queue corta el prompt +en ejecucion; esta funcion expone las cuatro operaciones restantes de la cola: + +- "status": GET /queue -> cuantos prompts se ejecutan ahora (queue_running) y + cuantos estan encolados pendientes (queue_pending). +- "clear": POST /queue {"clear": true} -> vacia los pendientes de golpe. +- "delete": POST /queue {"delete": [prompt_id]} -> borra un prompt concreto de la + cola de pendientes (requiere prompt_id). +- "history": GET /history -> numero de prompts ya ejecutados que el servidor + recuerda (history_count). + +NO lanza excepcion en fallo de red: degrada a {ok: False, error}. +""" +import json +import urllib.error +import urllib.request + + +def comfyui_queue_manage( + action: str, + *, + server: str = "127.0.0.1:8188", + prompt_id: str | None = None, +) -> dict: + """Opera la cola/historial de ComfyUI: status, clear, delete o history. + + Args: + action: operacion a realizar. Una de: + - "status": lee el estado de la cola. + - "clear": vacia los prompts pendientes (POST /queue {"clear": true}). + - "delete": borra un prompt concreto (POST /queue {"delete": [id]}); + requiere prompt_id. + - "history": cuenta los prompts en el historial (GET /history). + server: host:port del servidor ComfyUI sin esquema (default + "127.0.0.1:8188"). keyword-only. + prompt_id: id del prompt a borrar; obligatorio solo para action="delete". + keyword-only. + + Returns: + dict con: + - ok (bool): True si la operacion se completo sin error. + - action (str): la accion solicitada (eco). + - queue_running (int): prompts ejecutandose ahora (status/clear/delete). + - queue_pending (int): prompts encolados pendientes (status/clear/delete). + - history_count (int): numero de prompts en el historial (action=history). + - error (str): mensaje de error; cadena vacia si todo OK. + """ + out = { + "ok": False, + "action": action, + "queue_running": 0, + "queue_pending": 0, + "history_count": 0, + "error": "", + } + base = f"http://{server}" + valid = {"status", "clear", "delete", "history"} + if action not in valid: + out["error"] = f"action desconocida: {action!r}; usa una de {sorted(valid)}" + return out + + def _read_queue() -> bool: + """Rellena queue_running/queue_pending desde GET /queue. True si OK.""" + try: + with urllib.request.urlopen(f"{base}/queue", timeout=10.0) as resp: + data = json.loads(resp.read()) + out["queue_running"] = len(data.get("queue_running", [])) + out["queue_pending"] = len(data.get("queue_pending", [])) + return True + except urllib.error.URLError as exc: + reason = getattr(exc, "reason", exc) + out["error"] = f"GET /queue fallo: no se pudo conectar a {base}/queue: {reason}" + except json.JSONDecodeError as exc: + out["error"] = f"GET /queue fallo: respuesta no es JSON valido: {exc}" + return False + + def _post_queue(body: dict) -> bool: + """POST /queue con cuerpo JSON. True si el servidor respondio sin error.""" + try: + payload = json.dumps(body).encode() + req = urllib.request.Request( + f"{base}/queue", + data=payload, + method="POST", + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=10.0): + return True + except urllib.error.URLError as exc: + reason = getattr(exc, "reason", exc) + out["error"] = f"POST /queue fallo: no se pudo conectar a {base}/queue: {reason}" + return False + + if action == "status": + out["ok"] = _read_queue() + return out + + if action == "clear": + if _post_queue({"clear": True}): + out["ok"] = _read_queue() + return out + + if action == "delete": + if not prompt_id: + out["error"] = "action='delete' requiere prompt_id" + return out + if _post_queue({"delete": [prompt_id]}): + out["ok"] = _read_queue() + return out + + # action == "history" + try: + with urllib.request.urlopen(f"{base}/history", timeout=15.0) as resp: + hist = json.loads(resp.read()) + out["history_count"] = len(hist) if isinstance(hist, dict) else 0 + out["ok"] = True + except urllib.error.URLError as exc: + reason = getattr(exc, "reason", exc) + out["error"] = f"GET /history fallo: no se pudo conectar a {base}/history: {reason}" + except json.JSONDecodeError as exc: + out["error"] = f"GET /history fallo: respuesta no es JSON valido: {exc}" + return out + + +if __name__ == "__main__": + import sys + + act = sys.argv[1] if len(sys.argv) > 1 else "status" + pid = sys.argv[2] if len(sys.argv) > 2 else None + res = comfyui_queue_manage(act, prompt_id=pid) + print(json.dumps(res, indent=2)) diff --git a/python/functions/ml/comfyui_stream_progress.md b/python/functions/ml/comfyui_stream_progress.md new file mode 100644 index 00000000..74216e30 --- /dev/null +++ b/python/functions/ml/comfyui_stream_progress.md @@ -0,0 +1,102 @@ +--- +name: comfyui_stream_progress +kind: function +lang: py +domain: ml +version: "1.1.0" +purity: impure +signature: "def comfyui_stream_progress(prompt_id: str, *, server: str = \"127.0.0.1:8188\", client_id: str | None = None, timeout: float = 300.0) -> dict" +description: "Sigue en vivo el progreso de un prompt ComfyUI por WebSocket ws:///ws?clientId= (eventos progress paso/total, executing por nodo, execution_success/execution_error) en vez de hacer polling. Alternativa en-vivo a comfyui_wait_result. Si websocket-client NO esta en el interprete que ejecuta (el venv del registry no lo trae; el de ComfyUI si), cae limpiamente a polling de /history reutilizando comfyui_wait_result y marca method='polling'. Devuelve {ok, completed, steps_seen, last_node, method, error}. Impura: WebSocket o HTTP." +tags: [comfyui, ml, progress, websocket, stream, http] +uses_functions: [comfyui_wait_result_py_ml] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [] +params: + - name: prompt_id + desc: "id devuelto por comfyui_submit_workflow, el prompt cuyo progreso seguir." + - name: server + desc: "host:port del servidor ComfyUI sin esquema (default '127.0.0.1:8188')." + - name: client_id + desc: "clientId para registrar el socket; si None se genera un uuid4 hex." + - name: timeout + desc: "maximo de segundos a esperar a que el prompt complete (default 300)." +output: "dict con ok (bool), completed (bool, True si el prompt termino), steps_seen (int, mensajes 'progress' vistos por WS; 0 en fallback de polling), last_node (str, ultimo nodo en ejecucion visto), method (str, 'websocket' o 'polling'), error (str, vacio si OK)." +tested: false +tests: [] +test_file_path: "" +file_path: "python/functions/ml/comfyui_stream_progress.py" +--- + +## Ejemplo + +```python +import sys, os, uuid +sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions")) +from ml.comfyui_build_txt2img_workflow import comfyui_build_txt2img_workflow +from ml.comfyui_submit_workflow import comfyui_submit_workflow +from ml.comfyui_stream_progress import comfyui_stream_progress + +cid = uuid.uuid4().hex # MISMO clientId en submit y en stream para ver progress en vivo +wf = comfyui_build_txt2img_workflow( + ckpt_name="dreamshaper_8.safetensors", + positive="an ornate brass clockwork dragon", steps=25, seed=424242) +pid = comfyui_submit_workflow(wf, client_id=cid)["prompt_id"] + +res = comfyui_stream_progress(pid, client_id=cid, timeout=300) +# {'ok': True, 'completed': True, 'steps_seen': 13, 'last_node': '9', +# 'method': 'websocket', 'error': ''} +``` + +Si NO compartes el `client_id`, el seguimiento sigue funcionando (detecta el fin por +`/history`) pero `steps_seen` sale 0: ComfyUI envia los eventos `progress` al socket +del `clientId` que encolo el prompt, no a otro. + +Para WebSocket real hay que ejecutarlo con un interprete que tenga `websocket-client` +(el venv de ComfyUI lo trae). Con el venv del registry (`./fn run`) cae a polling +automaticamente y devuelve `method='polling'`. + +## Cuando usarla + +Cuando quieres feedback en vivo de una generacion larga (hires-fix, vídeo, 3D +multi-vista) en lugar de esperar a ciegas con `comfyui_wait_result`: ver por que +nodo va el grafo (`last_node`) y cuantos pasos de sampler han pasado +(`steps_seen`). Util para barras de progreso o para detectar un cuelgue (si +`steps_seen` no avanza). Para el caso simple "solo dime cuando esta listo", +`comfyui_wait_result` (polling) basta y es mas portable. + +## Gotchas + +- `websocket-client` NO esta en el venv del registry, asi que `./fn run + comfyui_stream_progress ` cae a polling (`method='polling'`, `steps_seen=0`). + Para el WebSocket real, ejecutalo con el python de ComfyUI (que si lo trae). El + fallback es transparente: mismo dict de retorno. +- `steps_seen` cuenta mensajes `progress` del WS, no el "step N/total" exacto del + sampler; sirve como senal de avance, no como porcentaje preciso. En trabajos con + nodos cacheados (que completan en <1s) puede salir 0 con `completed=True`: no hubo + pasos que emitir, el fin se detecto por el chequeo de `/history`. +- **Carrera submit/WS (v1.1.0):** un prompt rapido o cacheado puede terminar antes de + que el WS reciba su `execution_success`. La funcion se defiende: comprueba `/history` + al entrar (si ya termino, retorna ya) y en cada ventana de recv sin eventos (detecta + el fin sin esperar al timeout). Por eso NO se cuelga 300s en trabajos veloces. +- **`client_id` debe coincidir con el del submit para ver `progress`.** ComfyUI + enruta los eventos `progress`/`executing` al socket cuyo `clientId` encolo el + prompt (`send_sync(..., sid=client_id)`). Si llamas a esta funcion con un + `client_id` distinto (o None, que genera uno nuevo) NO recibiras esos eventos de + ese prompt y `steps_seen` saldra 0 — aunque `completed` se detecta igual por el + chequeo de `/history`. Para barra de progreso real: genera un `cid`, pasalo a + `comfyui_submit_workflow(..., client_id=cid)` Y a esta funcion. +- En fallo de conexion del WS degrada al fallback de polling en vez de lanzar; si + tambien falla el polling, devuelve `ok=False` con el motivo en `error`. +- Los frames binarios del WS (previews de imagen en vivo) se ignoran; esta funcion + solo sigue el progreso, no descarga la imagen (para eso, `comfyui_fetch_output_image`). + +## Capability growth log + +- v1.1.0 (24/06/2026) — robustez ante la carrera submit/WS: pre-check de `/history` al + entrar + re-check de `/history` en cada ventana de recv sin eventos y al agotar el + timeout. Evita el cuelgue hasta timeout cuando un trabajo cacheado/rapido completa + antes de que el WS reciba `execution_success`. Smoke previo lo destapo (prompt + cacheado completaba en ~0.7s y la v1.0.0 esperaba 180s en vano). diff --git a/python/functions/ml/comfyui_stream_progress.py b/python/functions/ml/comfyui_stream_progress.py new file mode 100644 index 00000000..87b72cf0 --- /dev/null +++ b/python/functions/ml/comfyui_stream_progress.py @@ -0,0 +1,220 @@ +"""Sigue en vivo el progreso de un prompt ComfyUI por WebSocket (/ws). + +Funcion impura: red (WebSocket o, en fallback, HTTP GET en bucle). + +Alternativa en-vivo a comfyui_wait_result (que sondea /history). Aqui se escucha el +canal de eventos del servidor (ws:///ws?clientId=) y se siguen los +mensajes que ComfyUI emite mientras ejecuta: + +- type="progress" -> un paso del sampler (data: {value, max, node}). +- type="executing" -> el grafo entra en un nodo (data: {node, prompt_id}); + node=None con prompt_id propio marca el fin (senal legacy). +- type="execution_success" -> el prompt termino bien (data: {prompt_id}). +- type="execution_error" -> el prompt fallo (data: {prompt_id, ...}). + +Defensa contra carreras: un prompt con nodos cacheados completa en <1s, antes de que +el WS reciba su execution_success. Por eso esta funcion (a) comprueba /history al +entrar — si el prompt ya termino, no hay nada que seguir — y (b) revisa /history en +cada ventana de recv sin eventos, para detectar el fin aunque el WS pierda el evento, +sin esperar al timeout completo. + +Si websocket-client NO esta instalado en el interprete que ejecuta esta funcion +(el venv del registry no lo trae; el de ComfyUI si), cae limpiamente a polling de +/history reutilizando comfyui_wait_result y devuelve el mismo dict con +method="polling". +""" +import json +import time + + +def comfyui_stream_progress( + prompt_id: str, + *, + server: str = "127.0.0.1:8188", + client_id: str | None = None, + timeout: float = 300.0, +) -> dict: + """Sigue el progreso en vivo de un prompt por WebSocket; cae a polling si falta ws. + + Args: + prompt_id: id devuelto por comfyui_submit_workflow. + server: host:port del servidor ComfyUI sin esquema (default + "127.0.0.1:8188"). keyword-only. + client_id: clientId para registrar el socket en el servidor; si None se + genera un uuid4. keyword-only. + timeout: maximo de segundos a esperar a que el prompt complete. + keyword-only. + + Returns: + dict con: + - ok (bool): True si el seguimiento concluyo sin error (incluido el + fallback a polling cuando completa). + - completed (bool): True si el prompt termino (success o senal de fin). + - steps_seen (int): numero de mensajes "progress" observados por WS + (0 en el fallback de polling y en trabajos cacheados, que no emiten + pasos intermedios). + - last_node (str): id del ultimo nodo en ejecucion visto. + - method (str): "websocket" o "polling" segun la via usada. + - error (str): mensaje de error; cadena vacia si todo OK. + """ + out = { + "ok": False, + "completed": False, + "steps_seen": 0, + "last_node": "", + "method": "websocket", + "error": "", + } + + try: + from websocket import ( # type: ignore + WebSocketTimeoutException, + create_connection, + ) + except ImportError: + return _fallback_polling(out, prompt_id, server, timeout) + + import uuid + + cid = client_id or uuid.uuid4().hex + ws_url = f"ws://{server}/ws?clientId={cid}" + deadline = time.time() + timeout + + # Pre-check: el prompt pudo terminar antes de que conectemos (trabajos con nodos + # cacheados completan en <1s). Si ya esta en history, no hay nada que seguir. + done, last = _history_check(server, prompt_id) + if done: + out["completed"] = True + out["ok"] = True + if last: + out["last_node"] = last + return out + + ws = None + try: + ws = create_connection(ws_url, timeout=min(timeout, 30.0)) + except Exception as exc: # noqa: BLE001 — degradar a fallback, no romper + out["error"] = f"no se pudo abrir WS {ws_url}: {exc}" + return _fallback_polling(out, prompt_id, server, timeout) + + try: + while time.time() < deadline: + ws.settimeout(min(2.0, max(0.1, deadline - time.time()))) + try: + msg = ws.recv() + except WebSocketTimeoutException: + # Sin evento en la ventana: el fin pudo perderse (carrera con un + # trabajo rapido). Confirma por history antes de seguir esperando. + done, last = _history_check(server, prompt_id) + if done: + out["completed"] = True + out["ok"] = True + if last and not out["last_node"]: + out["last_node"] = last + break + continue + if isinstance(msg, (bytes, bytearray)): + continue # frames binarios = previews de imagen, se ignoran + try: + evt = json.loads(msg) + except (json.JSONDecodeError, TypeError): + continue + mtype = evt.get("type") + data = evt.get("data", {}) or {} + evt_pid = data.get("prompt_id") + + if mtype == "progress": + out["steps_seen"] += 1 + node = data.get("node") + if node is not None: + out["last_node"] = str(node) + elif mtype == "executing": + node = data.get("node") + if node is not None: + out["last_node"] = str(node) + elif evt_pid == prompt_id: + out["completed"] = True + out["ok"] = True + break + elif mtype == "execution_success" and evt_pid == prompt_id: + out["completed"] = True + out["ok"] = True + break + elif mtype == "execution_error" and evt_pid == prompt_id: + out["error"] = f"execution_error: {json.dumps(data)[:400]}" + break + else: + # deadline agotado sin fin por WS: ultimo check de history por si el + # evento de fin se perdio del todo. + done, last = _history_check(server, prompt_id) + if done: + out["completed"] = True + out["ok"] = True + if last and not out["last_node"]: + out["last_node"] = last + else: + out["error"] = f"timeout de {timeout}s sin fin para {prompt_id}" + finally: + try: + ws.close() + except Exception: # noqa: BLE001 + pass + return out + + +def _history_check(server: str, prompt_id: str) -> tuple: + """Consulta GET /history/{prompt_id} una vez (no bucle). + + Devuelve (done, last_node): done=True solo si el prompt completo con exito; + last_node = id del ultimo nodo de output si lo hay. Errores de red se tratan + como "aun no" (False), no lanzan. + """ + import urllib.error + import urllib.request + + url = f"http://{server}/history/{prompt_id}" + try: + with urllib.request.urlopen(url, timeout=10.0) as resp: + hist = json.loads(resp.read()) + except (urllib.error.URLError, json.JSONDecodeError, OSError, ValueError): + return (False, "") + entry = hist.get(prompt_id) if isinstance(hist, dict) else None + if not entry: + return (False, "") + status = entry.get("status", {}) + if status.get("completed") or status.get("status_str") == "success": + outs = entry.get("outputs", {}) or {} + last = str(list(outs)[-1]) if outs else "" + return (True, last) + return (False, "") + + +def _fallback_polling(out: dict, prompt_id: str, server: str, timeout: float) -> dict: + """Cae a polling de /history reutilizando comfyui_wait_result del registry.""" + out["method"] = "polling" + try: + from comfyui_wait_result import comfyui_wait_result # hermano en ml/ + except ImportError: + from ml.comfyui_wait_result import comfyui_wait_result # via python/functions + try: + outputs = comfyui_wait_result(prompt_id, server=server, timeout=timeout) + out["completed"] = True + out["ok"] = True + if outputs: + out["last_node"] = str(list(outputs)[-1]) + except TimeoutError as exc: + out["error"] = f"polling: {exc}" + except RuntimeError as exc: + out["error"] = f"polling: {exc}" + return out + + +if __name__ == "__main__": + import sys + + pid = sys.argv[1] if len(sys.argv) > 1 else "" + if not pid: + print("uso: comfyui_stream_progress.py ", file=sys.stderr) + sys.exit(2) + res = comfyui_stream_progress(pid) + print(json.dumps(res, indent=2))