"""Localiza y descarga el output de video/animacion de un workflow ComfyUI a disco. Hermana de comfyui_fetch_output_image y comfyui_fetch_output_mesh, pero para los nodos de video/animacion (SaveAnimatedWEBP, SaveVideo nativo, VHS_VideoCombine). Esos nodos exponen su salida en GET /history/{prompt_id} bajo una lista de items {filename, subfolder, type}; segun el nodo la clave puede ser "images" (SVD y SaveVideo nativo lo ponen ahi), "gifs" (VHS_VideoCombine) o "videos". Esta funcion localiza el primer archivo con extension de video/animacion (.mp4/.webm/.webp/.gif/ .mkv/.mov/.avif), lo baja via GET /view a disco y, opcionalmente, lo escribe en `dest`. Impura: red (HTTP GET a /history y /view) + escritura en disco. Solo stdlib. """ import json import os import urllib.error import urllib.parse import urllib.request # Extensiones de video/animacion que producen los nodos de ComfyUI. _VIDEO_EXTS = (".mp4", ".webm", ".webp", ".gif", ".mkv", ".mov", ".avif") # Claves de output preferentes para video/animacion (se inspeccionan primero). _VIDEO_KEYS = ("gifs", "videos", "animated") def _is_video_item(item: dict) -> bool: """True si el item de output apunta a un archivo de video/animacion. Decide por la extension del filename; como fallback, por un campo "format" que contenga "video" (algunos nodos VHS/SaveVideo lo anotan). """ fn = (item.get("filename") or "").lower() if fn.endswith(_VIDEO_EXTS): return True fmt = (item.get("format") or "").lower() return "video" in fmt def _find_video_output(outputs: dict) -> dict | None: """Busca en los outputs de /history el primer archivo de video/animacion. Hace dos pasadas: primero en las claves preferentes de video (gifs/videos/ animated), luego en cualquier clave (cubre SVD/SaveVideo nativo, que exponen el archivo bajo "images"). Devuelve {filename, subfolder, type} o None. """ for prefer in (True, False): for node_out in outputs.values(): if not isinstance(node_out, dict): continue for key, items in node_out.items(): if prefer and key not in _VIDEO_KEYS: continue if not isinstance(items, list): continue for item in items: if isinstance(item, dict) and _is_video_item(item): return { "filename": item.get("filename", ""), "subfolder": item.get("subfolder", ""), "type": item.get("type", "output"), } return None def _resolve_dest(dest: str | None, filename: str) -> str: """Resuelve la ruta local destino a partir de `dest` y el basename remoto.""" base = os.path.basename(filename) if dest is None: return os.path.join(os.getcwd(), base) expanded = os.path.expanduser(dest) if os.path.isdir(expanded) or expanded.endswith(os.sep): return os.path.join(expanded, base) return expanded def comfyui_fetch_output_video( prompt_id: str, *, server: str = "127.0.0.1:8188", dest: str | None = None, outputs: dict | None = None, timeout: float = 120.0, ) -> dict: """Descarga el video/animacion de un prompt ComfyUI ya ejecutado a disco local. Args: prompt_id: id devuelto por comfyui_submit_workflow, de un workflow cuyo nodo de video (SaveAnimatedWEBP/SaveVideo/VHS_VideoCombine) ya termino (usa comfyui_wait_result antes si dudas). Se ignora si se pasa `outputs`. server: host:port del servidor ComfyUI (sin esquema). keyword-only. dest: ruta destino. Si None, escribe el basename del video en el cwd. Si es un directorio (o termina en separador), escribe el basename dentro. Si es una ruta de archivo, escribe ahi. keyword-only. outputs: dict de outputs ya obtenido (el que devuelve comfyui_wait_result). Si se pasa, se busca el video ahi y NO se consulta /history (evita una peticion de red extra justo despues de esperar). keyword-only. timeout: timeout de cada peticion HTTP en segundos. keyword-only. Returns: dict {ok, path, format, bytes, error}. path = ruta local del archivo de video guardado; format = extension sin punto (ej. "mp4" o "webp"); bytes = tamano descargado. Si falla, ok=False y error explica (sin video en los outputs, HTTP, conexion o escritura). """ # 1. Obtener los outputs: del parametro (sin red) o consultando /history. if outputs is None: hist_url = f"http://{server}/history/{prompt_id}" try: with urllib.request.urlopen(hist_url, timeout=timeout) as resp: hist = json.loads(resp.read()) except urllib.error.HTTPError as exc: body = exc.read().decode(errors="replace")[:200] return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"HTTP {exc.code} en {hist_url}: {body}"} except urllib.error.URLError as exc: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"no se pudo conectar a {hist_url}: {exc.reason}"} except json.JSONDecodeError as exc: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"respuesta no es JSON valido desde {hist_url}: {exc}"} entry = hist.get(prompt_id) if not entry: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"prompt_id {prompt_id} no esta en /history (¿no termino o se purgo?)"} outputs = entry.get("outputs", {}) video = _find_video_output(outputs or {}) if video is None: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"sin archivo de video/animacion en los outputs de {prompt_id}"} # 2. Descargar el archivo via GET /view. qs = urllib.parse.urlencode({ "filename": video["filename"], "subfolder": video["subfolder"], "type": video["type"], }) view_url = f"http://{server}/view?{qs}" try: with urllib.request.urlopen(view_url, timeout=timeout) as resp: blob = resp.read() except urllib.error.HTTPError as exc: body = exc.read().decode(errors="replace")[:200] return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"HTTP {exc.code} en {view_url}: {body}"} except urllib.error.URLError as exc: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"no se pudo conectar a {view_url}: {exc.reason}"} # 3. Escribir a disco. out_path = _resolve_dest(dest, video["filename"]) try: parent = os.path.dirname(out_path) if parent: os.makedirs(parent, exist_ok=True) with open(out_path, "wb") as f: f.write(blob) except OSError as exc: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"no se pudo escribir en {out_path!r}: {exc}"} fmt = os.path.splitext(video["filename"])[1].lstrip(".").lower() return {"ok": True, "path": out_path, "format": fmt, "bytes": len(blob), "error": ""} if __name__ == "__main__": import sys pid = sys.argv[1] if len(sys.argv) > 1 else "00000000-0000-0000-0000-000000000000" res = comfyui_fetch_output_video(pid, dest="/tmp/comfy_video") print(json.dumps(res, indent=2))