From 898502a3215338c9cea0bc95782bd7f7ec88c94f Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Wed, 24 Jun 2026 12:39:58 +0200 Subject: [PATCH] fix(ml): comfyui_wait_result no sale prematuro en jobs de video/3D Exige outputs no vacios (no solo status terminal) para dar por completado un prompt: en jobs pesados ComfyUI marca la entry de /history como terminada antes de poblar outputs, lo que devolvia un dict vacio mientras el job seguia en GPU. Ahora sigue sondeando hasta que los outputs aparecen o hasta agotar el timeout. Timeout default 180s -> 600s (cubre video/3D) y timeout HTTP por-request acotado a 30s. Firma y contrato de retorno intactos. Tests nuevos (mock urllib CI-safe + live opcional contra /history real): golden, regresion del bug, edge imagen corta, timeout y error. v1.0.0 -> 1.1.0. Co-Authored-By: Claude Opus 4.8 (1M context) --- python/functions/ml/comfyui_wait_result.md | 40 ++-- python/functions/ml/comfyui_wait_result.py | 45 +++-- .../ml/tests/test_comfyui_wait_result.py | 179 ++++++++++++++++++ 3 files changed, 240 insertions(+), 24 deletions(-) create mode 100644 python/functions/ml/tests/test_comfyui_wait_result.py diff --git a/python/functions/ml/comfyui_wait_result.md b/python/functions/ml/comfyui_wait_result.md index bda642a9..40d0a717 100644 --- a/python/functions/ml/comfyui_wait_result.md +++ b/python/functions/ml/comfyui_wait_result.md @@ -3,10 +3,10 @@ name: comfyui_wait_result kind: function lang: py domain: ml -version: "1.0.0" +version: "1.1.0" purity: impure -signature: "def comfyui_wait_result(prompt_id: str, server: str = \"127.0.0.1:8188\", timeout: float = 180.0, poll_interval: float = 1.0) -> dict" -description: "Sondea GET /history/{prompt_id} hasta que un prompt ComfyUI completa (status.completed o status_str success/error) o se agota el timeout. Devuelve los outputs por nodo (node_id -> {images: [...]}). Polling como mecanismo principal (no WebSocket). Impura: HTTP GET en bucle + sleep, solo stdlib." +signature: "def comfyui_wait_result(prompt_id: str, server: str = \"127.0.0.1:8188\", timeout: float = 600.0, poll_interval: float = 1.0) -> dict" +description: "Sondea GET /history/{prompt_id} hasta que un prompt ComfyUI completa con estado de exito (status.completed o status_str success) Y outputs ya poblados, o falla (status_str error), o se agota el timeout. Exige outputs no vacios para no salir prematuro en jobs de video/3D donde la entry de /history aparece marcada como terminada antes de poblar resultados. Devuelve los outputs por nodo (node_id -> {images: [...]}). Polling como mecanismo principal (no WebSocket). Impura: HTTP GET en bucle + sleep, solo stdlib." tags: [comfyui, ml, image-generation, stable-diffusion, http, polling] uses_functions: [] uses_types: [] @@ -20,13 +20,13 @@ params: - name: server desc: "host:port del servidor ComfyUI sin esquema (default '127.0.0.1:8188')." - name: timeout - desc: "Maximo de segundos a esperar antes de lanzar TimeoutError." + desc: "Maximo de segundos a esperar antes de lanzar TimeoutError. Default amplio (600s) para cubrir video/3D; imagenes cortas retornan en cuanto los outputs estan listos." - name: poll_interval desc: "Segundos entre sondeos de /history." -output: "dict de outputs {node_id: {'images': [{'filename', 'subfolder', 'type'}, ...]}} tal como ComfyUI los expone en history[prompt_id]['outputs']. Para un txt2img, el nodo SaveImage ('9') trae el PNG. Puede contener otros tipos (gifs, texto) segun los nodos del workflow." -tested: false -tests: [] -test_file_path: "" +output: "dict de outputs {node_id: {'images': [{'filename', 'subfolder', 'type'}, ...]}} tal como ComfyUI los expone en history[prompt_id]['outputs']. Siempre no vacio en caso de exito. Para un txt2img, el nodo SaveImage ('9') trae el PNG. El video (SaveVideo) aparece bajo 'images' con 'animated':[True]. Puede contener otros tipos (gifs, texto) segun los nodos del workflow." +tested: true +tests: ["Golden: espera hasta que /history puebla outputs no vacios (entry tarda en aparecer)", "Regresion bug: 'done' con outputs vacios NO sale prematuro, espera a que aparezcan", "Edge: imagen corta ya completada en el primer sondeo retorna inmediato", "Edge: status.completed sin status_str success tambien vale si hay outputs", "Error: timeout genuino (prompt inexistente) lanza TimeoutError sin colgar", "Error: status_str 'error' lanza RuntimeError", "Live (skippable): job de video ya completado en /history real devuelve outputs"] +test_file_path: "python/functions/ml/tests/test_comfyui_wait_result.py" file_path: "python/functions/ml/comfyui_wait_result.py" --- @@ -63,11 +63,25 @@ a paso) — es portable porque solo depende de HTTP, no de websocket-client. - Bloquea el hilo (sondea + duerme). Para varias generaciones en paralelo, encola todas con submit y luego espera cada prompt_id, o usa hilos. -- El timeout por defecto (180s) puede quedarse corto en GPUs lentas o workflows - pesados (muchos steps, alta resolucion, upscalers). Sube `timeout` segun el - caso. Lanza TimeoutError si se agota. +- El timeout por defecto es ahora 600s (antes 180s) para cubrir video/3D. Sigue + pudiendo quedarse corto en workflows muy pesados o GPUs lentas: sube `timeout` + segun el caso. Lanza TimeoutError si se agota. +- Considera el job terminado solo cuando el estado es de exito **y** los outputs + estan poblados. Si ComfyUI marca la entry como terminada pero aun sin outputs + (ocurre en jobs de video/3D mientras la GPU sigue escribiendo), sigue + sondeando hasta que aparezcan o hasta agotar el timeout — no devuelve un dict + vacio prematuro. Corolario: un workflow sin nodo de guardado (no produce + outputs) agota el timeout en vez de devolver {}. - Lanza RuntimeError si la ejecucion termina con status_str "error" (el detalle del fallo va en el mensaje) o si no se puede conectar al servidor. -- Devuelve metadatos de los PNG (filename, subfolder, type), NO los bytes de la - imagen. Los archivos quedan en la carpeta output/ del servidor; para leerlos +- Devuelve metadatos de los PNG/MP4 (filename, subfolder, type), NO los bytes de + la imagen. Los archivos quedan en la carpeta output/ del servidor; para leerlos desde otra maquina usa GET /view?filename=...&subfolder=...&type=output. + +## Capability growth log + +- v1.1.0 (2026-06-24) — fix salida prematura en jobs de video/3D: ahora exige + outputs no vacios (no solo status terminal) para dar por completado, sigue + sondeando si la entry de /history aparece terminada pero sin outputs, sube el + timeout default 180s -> 600s y acota el timeout HTTP por-request a 30s. + Anadidos tests (mock urllib + live opcional). Documentado en report 0106. diff --git a/python/functions/ml/comfyui_wait_result.py b/python/functions/ml/comfyui_wait_result.py index ffee1852..08e962fe 100644 --- a/python/functions/ml/comfyui_wait_result.py +++ b/python/functions/ml/comfyui_wait_result.py @@ -17,37 +17,55 @@ import urllib.request def comfyui_wait_result( prompt_id: str, server: str = "127.0.0.1:8188", - timeout: float = 180.0, + timeout: float = 600.0, poll_interval: float = 1.0, ) -> dict: """Espera a que ComfyUI termine de ejecutar un prompt y devuelve sus outputs. - Sondea GET /history/{prompt_id} cada poll_interval segundos hasta que - status.completed es True o status.status_str es "success"/"error", o hasta - agotar el timeout. + Sondea GET /history/{prompt_id} cada poll_interval segundos hasta que el + prompt aparece en /history con un estado terminal de exito (status.completed + True o status.status_str "success") **y** outputs ya poblados, o hasta que + falla (status_str "error"), o hasta agotar el timeout. + + El requisito de outputs no vacios es deliberado: en jobs pesados (video, 3D) + ComfyUI puede exponer la entry de /history con el estado marcado como + terminado antes de haber poblado los outputs. Devolver en ese instante daba + un dict vacio mientras el job seguia en GPU (salida prematura). Por eso, si + el estado dice "terminado" pero los outputs aun no estan, se sigue sondeando + hasta que aparezcan o hasta agotar el timeout, que actua como red de + seguridad. Args: prompt_id: id devuelto por comfyui_submit_workflow. server: host:port del servidor ComfyUI (sin esquema). - timeout: maximo de segundos a esperar antes de fallar. + timeout: maximo de segundos a esperar antes de fallar. El default amplio + (600s) cubre workflows largos de video/3D; para imagenes cortas el + retorno sigue siendo inmediato en cuanto los outputs estan listos. poll_interval: segundos entre sondeos. Returns: dict de outputs {node_id: {"images": [{filename, subfolder, type}, ...]}} tal como ComfyUI los expone en history[prompt_id]["outputs"]. Puede - contener otros tipos de output (gifs, texto) segun los nodos del - workflow. + contener otros tipos de output (gifs, texto, video bajo "images" con + "animated") segun los nodos del workflow. Siempre no vacio en caso de + exito (un workflow sin nodo de guardado agotaria el timeout). Raises: - TimeoutError: si se agota el timeout sin que el prompt complete. + TimeoutError: si se agota el timeout sin que el prompt complete con + outputs (incluye el caso de un prompt_id que nunca aparece en + /history porque sigue en cola, no existe, o el workflow no produce + outputs). RuntimeError: si la ejecucion termina con status_str "error", si no se puede conectar, o si la respuesta no es JSON valido. """ url = f"http://{server}/history/{prompt_id}" + # Timeout por-request acotado: una conexion colgada no debe bloquear todo el + # presupuesto global (que ahora puede ser de varios minutos). + req_timeout = min(timeout, 30.0) if timeout > 0 else 30.0 deadline = time.time() + timeout while time.time() < deadline: try: - with urllib.request.urlopen(url, timeout=timeout) as resp: + with urllib.request.urlopen(url, timeout=req_timeout) as resp: hist = json.loads(resp.read()) except urllib.error.URLError as exc: raise RuntimeError( @@ -67,8 +85,13 @@ def comfyui_wait_result( f"comfyui_wait_result: ejecucion fallo para {prompt_id}: " f"{json.dumps(status)[:500]}" ) - if status.get("completed") or status_str == "success": - return entry.get("outputs", {}) + done = bool(status.get("completed")) or status_str == "success" + outputs = entry.get("outputs") or {} + # Solo se considera terminado cuando hay estado de exito Y outputs. + # "done" con outputs vacios = entry creada pero aun sin resultados: + # se sigue esperando (no salir prematuro). + if done and outputs: + return outputs time.sleep(poll_interval) raise TimeoutError( diff --git a/python/functions/ml/tests/test_comfyui_wait_result.py b/python/functions/ml/tests/test_comfyui_wait_result.py new file mode 100644 index 00000000..fac32e8c --- /dev/null +++ b/python/functions/ml/tests/test_comfyui_wait_result.py @@ -0,0 +1,179 @@ +"""Tests para comfyui_wait_result (funcion impura: polling de /history). + +La mayoria mockea urllib.request.urlopen para correr sin GPU ni red (CI-safe). +Un test "live" extra valida contra el /history real de un job de video ya +completado, y se salta limpiamente si el servidor ComfyUI no responde o el job +ya no esta en el history (p. ej. servidor reiniciado). +""" + +import json +import os +import sys +from unittest import mock + +import pytest + +sys.path.insert(0, os.path.dirname(__file__)) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) + +from ml.comfyui_wait_result import comfyui_wait_result + +PID = "test-pid-1" + + +class _FakeResp: + """Context manager minimo que imita la respuesta de urllib.request.urlopen.""" + + def __init__(self, payload): + self._payload = json.dumps(payload).encode() + + def read(self): + return self._payload + + def __enter__(self): + return self + + def __exit__(self, *_): + return False + + +def _seq_urlopen(payloads): + """Fake urlopen que devuelve cada payload en orden; repite el ultimo.""" + state = {"i": 0} + + def _open(url, timeout=None): + i = min(state["i"], len(payloads) - 1) + state["i"] += 1 + return _FakeResp(payloads[i]) + + return _open + + +def _entry(status_str="success", completed=True, outputs=None): + return { + PID: { + "status": {"status_str": status_str, "completed": completed, "messages": []}, + "outputs": outputs if outputs is not None else {}, + } + } + + +# --- Golden: espera a outputs no vacios en un job largo (entry tarda en aparecer) --- +def test_golden_espera_a_que_aparezcan_outputs(): + payloads = [ + {}, # aun en cola: entry ausente + {}, # sigue en cola + _entry( + outputs={ + "79": { + "images": [ + {"filename": "vid.mp4", "subfolder": "", "type": "output"} + ], + "animated": [True], + } + } + ), + ] + with mock.patch( + "ml.comfyui_wait_result.urllib.request.urlopen", _seq_urlopen(payloads) + ), mock.patch("ml.comfyui_wait_result.time.sleep", lambda *_: None): + out = comfyui_wait_result(PID, timeout=5, poll_interval=0.001) + assert out, "debe devolver outputs no vacios" + assert out["79"]["images"][0]["filename"] == "vid.mp4" + + +# --- Regresion del bug: 'done' con outputs vacios NO debe salir prematuro --- +def test_no_sale_prematuro_cuando_done_pero_outputs_vacios(): + payloads = [ + _entry(outputs={}), # estado terminal pero outputs aun vacios (caso del bug) + _entry(outputs={}), # sigue vacio mientras la GPU trabaja + _entry(outputs={"9": {"images": [{"filename": "img.png"}]}}), # ya poblado + ] + with mock.patch( + "ml.comfyui_wait_result.urllib.request.urlopen", _seq_urlopen(payloads) + ), mock.patch("ml.comfyui_wait_result.time.sleep", lambda *_: None): + out = comfyui_wait_result(PID, timeout=5, poll_interval=0.001) + assert out == {"9": {"images": [{"filename": "img.png"}]}} + + +# --- Edge: imagen corta ya completada en el primer sondeo -> retorno inmediato --- +def test_imagen_corta_devuelve_en_primer_poll(): + payloads = [_entry(outputs={"9": {"images": [{"filename": "apple.png"}]}})] + opener = _seq_urlopen(payloads) + with mock.patch( + "ml.comfyui_wait_result.urllib.request.urlopen", opener + ), mock.patch("ml.comfyui_wait_result.time.sleep", lambda *_: None): + out = comfyui_wait_result(PID, timeout=5, poll_interval=0.001) + assert out["9"]["images"][0]["filename"] == "apple.png" + + +# --- Edge: estado 'completed' sin status_str success tambien vale (si hay outputs) --- +def test_completed_sin_status_str_success(): + payloads = [_entry(status_str=None, completed=True, outputs={"9": {"images": [{"filename": "x.png"}]}})] + with mock.patch( + "ml.comfyui_wait_result.urllib.request.urlopen", _seq_urlopen(payloads) + ), mock.patch("ml.comfyui_wait_result.time.sleep", lambda *_: None): + out = comfyui_wait_result(PID, timeout=5, poll_interval=0.001) + assert out["9"]["images"][0]["filename"] == "x.png" + + +# --- Error: timeout genuino (prompt nunca aparece) -> TimeoutError, no cuelga --- +def test_timeout_prompt_inexistente_lanza_timeout(): + payloads = [{}] # entry siempre ausente + with mock.patch( + "ml.comfyui_wait_result.urllib.request.urlopen", _seq_urlopen(payloads) + ): + with pytest.raises(TimeoutError): + comfyui_wait_result(PID, timeout=0.05, poll_interval=0.02) + + +# --- Error: ejecucion fallida (status_str 'error') -> RuntimeError --- +def test_status_error_lanza_runtimeerror(): + payloads = [ + { + PID: { + "status": { + "status_str": "error", + "completed": False, + "messages": [["execution_error", {"exception_message": "OOM"}]], + }, + "outputs": {}, + } + } + ] + with mock.patch( + "ml.comfyui_wait_result.urllib.request.urlopen", _seq_urlopen(payloads) + ), mock.patch("ml.comfyui_wait_result.time.sleep", lambda *_: None): + with pytest.raises(RuntimeError): + comfyui_wait_result(PID, timeout=5, poll_interval=0.001) + + +# --- Live (opcional): job de video ya completado en el /history real --- +LIVE_SERVER = "127.0.0.1:8188" +LIVE_PID = "8a278988-8a94-4225-add3-88a406f7101c" # LTX T2V del report 0105 + + +def _server_up(server=LIVE_SERVER): + import urllib.request + + try: + urllib.request.urlopen(f"http://{server}/history/__probe__", timeout=2).read() + return True + except Exception: + return False + + +@pytest.mark.skipif(not _server_up(), reason="ComfyUI no responde en 127.0.0.1:8188") +def test_live_job_video_completado_devuelve_outputs(): + import urllib.request + + h = json.loads( + urllib.request.urlopen( + f"http://{LIVE_SERVER}/history/{LIVE_PID}", timeout=3 + ).read() + ) + if LIVE_PID not in h: + pytest.skip("job 8a278988 ya no esta en /history (servidor reiniciado)") + out = comfyui_wait_result(LIVE_PID, server=LIVE_SERVER, timeout=10) + assert out, "un job ya completado debe devolver outputs no vacios y rapido" + assert any(v.get("images") for v in out.values()), "debe traer la clave 'images'"