Files
fn_registry/python/functions/ml/comfyui_fetch_output_video.py
T
egutierrez e8a66f0dad feat(ml): comfyui_run_foreign_workflow_oneshot + helper fetch_output_video
Pipeline one-shot para ejecutar workflows ComfyUI ajenos end-to-end
(import desde cualquier fuente -> resolve deps -> validate -> submit ->
wait -> fetch del output imagen/video/malla) componiendo 9 funciones
existentes del grupo comfyui. Gate de seguridad: si faltan nodos/modelos
NO encola y los reporta en `missing`; nunca descarga modelos a ciegas y
solo instala nodos custom confiables opt-in (install_nodes + node_repos).

Helper comfyui_fetch_output_video: hermana de fetch_output_image y
fetch_output_mesh para los nodos de video/animacion (SaveAnimatedWEBP,
SaveVideo nativo, VHS_VideoCombine). Localiza el output bajo images/gifs/
videos en /history y lo baja via /view a disco; acepta outputs= de
wait_result para evitar re-consultar /history.

Cierra la pieza marcada por el completeness critic (report 0107) del
roadmap 0064/0087. 13 tests unitarios de las partes puras en verde;
validacion de integracion contra server vivo sin generacion pesada
(report 0110).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-24 12:53:40 +02:00

172 lines
7.4 KiB
Python

"""Localiza y descarga el output de video/animacion de un workflow ComfyUI a disco.
Hermana de comfyui_fetch_output_image y comfyui_fetch_output_mesh, pero para los
nodos de video/animacion (SaveAnimatedWEBP, SaveVideo nativo, VHS_VideoCombine).
Esos nodos exponen su salida en GET /history/{prompt_id} bajo una lista de items
{filename, subfolder, type}; segun el nodo la clave puede ser "images" (SVD y
SaveVideo nativo lo ponen ahi), "gifs" (VHS_VideoCombine) o "videos". Esta funcion
localiza el primer archivo con extension de video/animacion (.mp4/.webm/.webp/.gif/
.mkv/.mov/.avif), lo baja via GET /view a disco y, opcionalmente, lo escribe en `dest`.
Impura: red (HTTP GET a /history y /view) + escritura en disco. Solo stdlib.
"""
import json
import os
import urllib.error
import urllib.parse
import urllib.request
# Extensiones de video/animacion que producen los nodos de ComfyUI.
_VIDEO_EXTS = (".mp4", ".webm", ".webp", ".gif", ".mkv", ".mov", ".avif")
# Claves de output preferentes para video/animacion (se inspeccionan primero).
_VIDEO_KEYS = ("gifs", "videos", "animated")
def _is_video_item(item: dict) -> bool:
"""True si el item de output apunta a un archivo de video/animacion.
Decide por la extension del filename; como fallback, por un campo "format"
que contenga "video" (algunos nodos VHS/SaveVideo lo anotan).
"""
fn = (item.get("filename") or "").lower()
if fn.endswith(_VIDEO_EXTS):
return True
fmt = (item.get("format") or "").lower()
return "video" in fmt
def _find_video_output(outputs: dict) -> dict | None:
"""Busca en los outputs de /history el primer archivo de video/animacion.
Hace dos pasadas: primero en las claves preferentes de video (gifs/videos/
animated), luego en cualquier clave (cubre SVD/SaveVideo nativo, que exponen
el archivo bajo "images"). Devuelve {filename, subfolder, type} o None.
"""
for prefer in (True, False):
for node_out in outputs.values():
if not isinstance(node_out, dict):
continue
for key, items in node_out.items():
if prefer and key not in _VIDEO_KEYS:
continue
if not isinstance(items, list):
continue
for item in items:
if isinstance(item, dict) and _is_video_item(item):
return {
"filename": item.get("filename", ""),
"subfolder": item.get("subfolder", ""),
"type": item.get("type", "output"),
}
return None
def _resolve_dest(dest: str | None, filename: str) -> str:
"""Resuelve la ruta local destino a partir de `dest` y el basename remoto."""
base = os.path.basename(filename)
if dest is None:
return os.path.join(os.getcwd(), base)
expanded = os.path.expanduser(dest)
if os.path.isdir(expanded) or expanded.endswith(os.sep):
return os.path.join(expanded, base)
return expanded
def comfyui_fetch_output_video(
prompt_id: str,
*,
server: str = "127.0.0.1:8188",
dest: str | None = None,
outputs: dict | None = None,
timeout: float = 120.0,
) -> dict:
"""Descarga el video/animacion de un prompt ComfyUI ya ejecutado a disco local.
Args:
prompt_id: id devuelto por comfyui_submit_workflow, de un workflow cuyo
nodo de video (SaveAnimatedWEBP/SaveVideo/VHS_VideoCombine) ya
termino (usa comfyui_wait_result antes si dudas). Se ignora si se
pasa `outputs`.
server: host:port del servidor ComfyUI (sin esquema). keyword-only.
dest: ruta destino. Si None, escribe el basename del video en el cwd.
Si es un directorio (o termina en separador), escribe el basename
dentro. Si es una ruta de archivo, escribe ahi. keyword-only.
outputs: dict de outputs ya obtenido (el que devuelve comfyui_wait_result).
Si se pasa, se busca el video ahi y NO se consulta /history (evita una
peticion de red extra justo despues de esperar). keyword-only.
timeout: timeout de cada peticion HTTP en segundos. keyword-only.
Returns:
dict {ok, path, format, bytes, error}. path = ruta local del archivo de
video guardado; format = extension sin punto (ej. "mp4" o "webp"); bytes =
tamano descargado. Si falla, ok=False y error explica (sin video en los
outputs, HTTP, conexion o escritura).
"""
# 1. Obtener los outputs: del parametro (sin red) o consultando /history.
if outputs is None:
hist_url = f"http://{server}/history/{prompt_id}"
try:
with urllib.request.urlopen(hist_url, timeout=timeout) as resp:
hist = json.loads(resp.read())
except urllib.error.HTTPError as exc:
body = exc.read().decode(errors="replace")[:200]
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"HTTP {exc.code} en {hist_url}: {body}"}
except urllib.error.URLError as exc:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"no se pudo conectar a {hist_url}: {exc.reason}"}
except json.JSONDecodeError as exc:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"respuesta no es JSON valido desde {hist_url}: {exc}"}
entry = hist.get(prompt_id)
if not entry:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"prompt_id {prompt_id} no esta en /history (¿no termino o se purgo?)"}
outputs = entry.get("outputs", {})
video = _find_video_output(outputs or {})
if video is None:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"sin archivo de video/animacion en los outputs de {prompt_id}"}
# 2. Descargar el archivo via GET /view.
qs = urllib.parse.urlencode({
"filename": video["filename"],
"subfolder": video["subfolder"],
"type": video["type"],
})
view_url = f"http://{server}/view?{qs}"
try:
with urllib.request.urlopen(view_url, timeout=timeout) as resp:
blob = resp.read()
except urllib.error.HTTPError as exc:
body = exc.read().decode(errors="replace")[:200]
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"HTTP {exc.code} en {view_url}: {body}"}
except urllib.error.URLError as exc:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"no se pudo conectar a {view_url}: {exc.reason}"}
# 3. Escribir a disco.
out_path = _resolve_dest(dest, video["filename"])
try:
parent = os.path.dirname(out_path)
if parent:
os.makedirs(parent, exist_ok=True)
with open(out_path, "wb") as f:
f.write(blob)
except OSError as exc:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"no se pudo escribir en {out_path!r}: {exc}"}
fmt = os.path.splitext(video["filename"])[1].lstrip(".").lower()
return {"ok": True, "path": out_path, "format": fmt, "bytes": len(blob), "error": ""}
if __name__ == "__main__":
import sys
pid = sys.argv[1] if len(sys.argv) > 1 else "00000000-0000-0000-0000-000000000000"
res = comfyui_fetch_output_video(pid, dest="/tmp/comfy_video")
print(json.dumps(res, indent=2))