Files
fn_registry/python/functions/ml/comfyui_wait_result.py
T
egutierrez 898502a321 fix(ml): comfyui_wait_result no sale prematuro en jobs de video/3D
Exige outputs no vacios (no solo status terminal) para dar por completado
un prompt: en jobs pesados ComfyUI marca la entry de /history como
terminada antes de poblar outputs, lo que devolvia un dict vacio mientras
el job seguia en GPU. Ahora sigue sondeando hasta que los outputs aparecen
o hasta agotar el timeout. Timeout default 180s -> 600s (cubre video/3D) y
timeout HTTP por-request acotado a 30s. Firma y contrato de retorno intactos.

Tests nuevos (mock urllib CI-safe + live opcional contra /history real):
golden, regresion del bug, edge imagen corta, timeout y error. v1.0.0 -> 1.1.0.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-24 12:39:58 +02:00

122 lines
5.2 KiB
Python

"""Sondea GET /history/{prompt_id} hasta que un workflow ComfyUI termina.
Funcion impura: hace red (HTTP GET en bucle) y duerme entre sondeos. Solo
stdlib (urllib, json, time).
Usa polling de /history como mecanismo principal (no WebSocket): es mas robusto
porque no depende de websocket-client, que no esta garantizado en el venv. Para
saber si el resultado esta listo (no streaming de progreso paso a paso) el
polling de /history es suficiente y portable.
"""
import json
import time
import urllib.error
import urllib.request
def comfyui_wait_result(
prompt_id: str,
server: str = "127.0.0.1:8188",
timeout: float = 600.0,
poll_interval: float = 1.0,
) -> dict:
"""Espera a que ComfyUI termine de ejecutar un prompt y devuelve sus outputs.
Sondea GET /history/{prompt_id} cada poll_interval segundos hasta que el
prompt aparece en /history con un estado terminal de exito (status.completed
True o status.status_str "success") **y** outputs ya poblados, o hasta que
falla (status_str "error"), o hasta agotar el timeout.
El requisito de outputs no vacios es deliberado: en jobs pesados (video, 3D)
ComfyUI puede exponer la entry de /history con el estado marcado como
terminado antes de haber poblado los outputs. Devolver en ese instante daba
un dict vacio mientras el job seguia en GPU (salida prematura). Por eso, si
el estado dice "terminado" pero los outputs aun no estan, se sigue sondeando
hasta que aparezcan o hasta agotar el timeout, que actua como red de
seguridad.
Args:
prompt_id: id devuelto por comfyui_submit_workflow.
server: host:port del servidor ComfyUI (sin esquema).
timeout: maximo de segundos a esperar antes de fallar. El default amplio
(600s) cubre workflows largos de video/3D; para imagenes cortas el
retorno sigue siendo inmediato en cuanto los outputs estan listos.
poll_interval: segundos entre sondeos.
Returns:
dict de outputs {node_id: {"images": [{filename, subfolder, type}, ...]}}
tal como ComfyUI los expone en history[prompt_id]["outputs"]. Puede
contener otros tipos de output (gifs, texto, video bajo "images" con
"animated") segun los nodos del workflow. Siempre no vacio en caso de
exito (un workflow sin nodo de guardado agotaria el timeout).
Raises:
TimeoutError: si se agota el timeout sin que el prompt complete con
outputs (incluye el caso de un prompt_id que nunca aparece en
/history porque sigue en cola, no existe, o el workflow no produce
outputs).
RuntimeError: si la ejecucion termina con status_str "error", si no se
puede conectar, o si la respuesta no es JSON valido.
"""
url = f"http://{server}/history/{prompt_id}"
# Timeout por-request acotado: una conexion colgada no debe bloquear todo el
# presupuesto global (que ahora puede ser de varios minutos).
req_timeout = min(timeout, 30.0) if timeout > 0 else 30.0
deadline = time.time() + timeout
while time.time() < deadline:
try:
with urllib.request.urlopen(url, timeout=req_timeout) as resp:
hist = json.loads(resp.read())
except urllib.error.URLError as exc:
raise RuntimeError(
f"comfyui_wait_result: no se pudo conectar a {url}: {exc.reason}"
) from exc
except json.JSONDecodeError as exc:
raise RuntimeError(
f"comfyui_wait_result: respuesta no es JSON valido desde {url}: {exc}"
) from exc
entry = hist.get(prompt_id)
if entry:
status = entry.get("status", {})
status_str = status.get("status_str")
if status_str == "error":
raise RuntimeError(
f"comfyui_wait_result: ejecucion fallo para {prompt_id}: "
f"{json.dumps(status)[:500]}"
)
done = bool(status.get("completed")) or status_str == "success"
outputs = entry.get("outputs") or {}
# Solo se considera terminado cuando hay estado de exito Y outputs.
# "done" con outputs vacios = entry creada pero aun sin resultados:
# se sigue esperando (no salir prematuro).
if done and outputs:
return outputs
time.sleep(poll_interval)
raise TimeoutError(
f"comfyui_wait_result: timeout de {timeout}s esperando {prompt_id}"
)
if __name__ == "__main__":
import sys
from comfyui_build_txt2img_workflow import comfyui_build_txt2img_workflow
from comfyui_submit_workflow import comfyui_submit_workflow
wf = comfyui_build_txt2img_workflow(
ckpt_name="v1-5-pruned-emaonly-fp16.safetensors",
positive="a red apple on a wooden table, sharp focus",
negative="blurry, low quality",
steps=20,
seed=42,
)
resp = comfyui_submit_workflow(wf)
pid = resp["prompt_id"]
print(f"esperando prompt_id={pid} ...", file=sys.stderr)
outputs = comfyui_wait_result(pid)
for node_id, out in outputs.items():
for img in out.get("images", []):
print(f"OUTPUT node={node_id} filename={img['filename']}")