From e8a66f0dadac78510f445d36433a290d2d587d7d Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Wed, 24 Jun 2026 12:53:40 +0200 Subject: [PATCH] feat(ml): comfyui_run_foreign_workflow_oneshot + helper fetch_output_video Pipeline one-shot para ejecutar workflows ComfyUI ajenos end-to-end (import desde cualquier fuente -> resolve deps -> validate -> submit -> wait -> fetch del output imagen/video/malla) componiendo 9 funciones existentes del grupo comfyui. Gate de seguridad: si faltan nodos/modelos NO encola y los reporta en `missing`; nunca descarga modelos a ciegas y solo instala nodos custom confiables opt-in (install_nodes + node_repos). Helper comfyui_fetch_output_video: hermana de fetch_output_image y fetch_output_mesh para los nodos de video/animacion (SaveAnimatedWEBP, SaveVideo nativo, VHS_VideoCombine). Localiza el output bajo images/gifs/ videos en /history y lo baja via /view a disco; acepta outputs= de wait_result para evitar re-consultar /history. Cierra la pieza marcada por el completeness critic (report 0107) del roadmap 0064/0087. 13 tests unitarios de las partes puras en verde; validacion de integracion contra server vivo sin generacion pesada (report 0110). Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/capabilities/comfyui.md | 2 + .../ml/comfyui_fetch_output_video.md | 91 ++++++ .../ml/comfyui_fetch_output_video.py | 171 +++++++++++ .../ml/comfyui_fetch_output_video_test.py | 68 +++++ .../comfyui_run_foreign_workflow_oneshot.md | 124 ++++++++ .../comfyui_run_foreign_workflow_oneshot.py | 275 ++++++++++++++++++ ...mfyui_run_foreign_workflow_oneshot_test.py | 63 ++++ 7 files changed, 794 insertions(+) create mode 100644 python/functions/ml/comfyui_fetch_output_video.md create mode 100644 python/functions/ml/comfyui_fetch_output_video.py create mode 100644 python/functions/ml/comfyui_fetch_output_video_test.py create mode 100644 python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.md create mode 100644 python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.py create mode 100644 python/functions/pipelines/comfyui_run_foreign_workflow_oneshot_test.py diff --git a/docs/capabilities/comfyui.md b/docs/capabilities/comfyui.md index c5c13e6f..ec0489dd 100644 --- a/docs/capabilities/comfyui.md +++ b/docs/capabilities/comfyui.md @@ -67,6 +67,7 @@ El **API format** (dict de nodos numerados que produce `build_txt2img_workflow` | [comfyui_download_workflow_py_ml](../../python/functions/ml/comfyui_download_workflow.md) | `download_workflow(source, dest=None, *, server, civitai_token, hf_token, timeout) -> dict` | **Dispatcher**: descarga un workflow de CUALQUIER fuente (Google Drive, GitHub, Civitai, HuggingFace, URL directa o path local) y lo normaliza a API format. Detecta el tipo por la URL y delega; tras bajar compone `import_workflow_json`/`import_workflow_png`. Catálogo de fuentes: `reports/0080`. Impura. | | [comfyui_read_png_metadata_py_ml](../../python/functions/ml/comfyui_read_png_metadata.md) | `read_png_metadata(png_path) -> dict` | Lee los parámetros de generación (modelo, seed, steps, cfg, sampler, prompts) de un PNG generado por ComfyUI. Impura (I/O disco). | | [comfyui_fetch_output_image_py_ml](../../python/functions/ml/comfyui_fetch_output_image.md) | `fetch_output_image(filename, *, subfolder='', type_='output', server, dest_dir='.', timeout) -> dict` | Descarga el PNG generado vía GET `/view` a disco local (`wait_result` solo da metadata). Impura. | +| [comfyui_fetch_output_video_py_ml](../../python/functions/ml/comfyui_fetch_output_video.md) | `fetch_output_video(prompt_id, *, server, dest=None, outputs=None, timeout) -> dict` | Localiza y descarga el output de **vídeo/animación** (`.mp4`/`.webp`/`.webm`/`.gif`) de `/history` vía GET `/view`. Cubre SaveAnimatedWEBP/SaveVideo (bajo `"images"`) y VHS_VideoCombine (bajo `"gifs"`). Hermana de `fetch_output_image`/`fetch_output_mesh`. Acepta `outputs=` de `wait_result` para evitar re-consultar `/history`. Impura. | ### Potencia y assets de internet — dominio `ml` (P1, issue 0064) @@ -78,6 +79,7 @@ El **API format** (dict de nodos numerados que produce `build_txt2img_workflow` | [comfyui_search_civitai_models_py_ml](../../python/functions/ml/comfyui_search_civitai_models.md) | `search_civitai_models(query, *, types='Checkpoint', base_model=None, sort, limit=20, token=None) -> dict` | Busca modelos/LoRAs en la API pública de Civitai → `{ok, items:[{name, type, base_model, version_id, download_url, nsfw}], count, error}`. Sin token funciona. Impura. | | [comfyui_install_custom_node_py_ml](../../python/functions/ml/comfyui_install_custom_node.md) | `install_custom_node(repo_url, *, comfyui_dir, pip_install=True, restart=False) -> dict` | git clone en `custom_nodes/` + pip/uv install de requirements en el venv de ComfyUI. NO reinicia el server (restart=False). Impura. | | [comfyui_resolve_workflow_deps_py_ml](../../python/functions/ml/comfyui_resolve_workflow_deps.md) | `resolve_workflow_deps(workflow, server='127.0.0.1:8188') -> dict` | Para un workflow ajeno: valida y traduce lo que falta en acciones (`{missing_nodes, missing_models, suggestions}`). Compone `validate_workflow`. Impura. | +| [comfyui_run_foreign_workflow_oneshot_py_pipelines](../../python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.md) | `run_foreign_workflow_oneshot(source, *, server, dest=None, output_kind='auto', install_nodes=False, node_repos=None, wait_timeout, civitai_token, hf_token) -> dict` | **Pipeline** para ejecutar un workflow ComfyUI **ajeno** end-to-end en una llamada: import (cualquier fuente) → resolve deps → (instala solo nodos confiables opt-in) → validate → submit → wait → fetch (imagen/vídeo/malla). **Gate de seguridad**: si faltan deps NO encola y las reporta en `missing`; nunca descarga modelos a ciegas. Compone `download_workflow` + `resolve_workflow_deps` + `install_custom_node` + `submit`/`wait` + `fetch_output_image/video/mesh`. Promoción del roadmap 0064/0087. Impuro. | | [comfyui_list_installed_models_py_ml](../../python/functions/ml/comfyui_list_installed_models.md) | `list_installed_models(folder=None, comfyui_dir='~/ComfyUI') -> dict` | Lista modelos por carpeta resolviendo la ruta real de `extra_model_paths.yaml` (`/mnt/2tb/comfyui_models/`) + la nativa. Escaneo de FS, no depende del server. Impura. | ### Retoque pro y oneshot — dominio `ml` + `pipelines` (P0, lote report 0093) diff --git a/python/functions/ml/comfyui_fetch_output_video.md b/python/functions/ml/comfyui_fetch_output_video.md new file mode 100644 index 00000000..a2b4ae1b --- /dev/null +++ b/python/functions/ml/comfyui_fetch_output_video.md @@ -0,0 +1,91 @@ +--- +name: comfyui_fetch_output_video +kind: function +lang: py +domain: ml +version: "1.0.0" +purity: impure +signature: "def comfyui_fetch_output_video(prompt_id: str, *, server: str = \"127.0.0.1:8188\", dest: str | None = None, outputs: dict | None = None, timeout: float = 120.0) -> dict" +description: "Localiza y descarga el output de video/animacion de un workflow ComfyUI a disco local. Hermana de comfyui_fetch_output_image y comfyui_fetch_output_mesh pero para los nodos de video (SaveAnimatedWEBP, SaveVideo nativo, VHS_VideoCombine): esos exponen su salida en GET /history bajo 'images', 'gifs' o 'videos' con items {filename, subfolder, type}. Localiza el primer .mp4/.webm/.webp/.gif/.mkv/.mov/.avif, lo baja via GET /view y opcionalmente lo escribe en dest. Acepta outputs= ya obtenido de comfyui_wait_result para evitar re-consultar /history. Impura: HTTP GET + escritura en disco, solo stdlib." +tags: [comfyui, video, fetch, animation, ml, download, workflow] +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: error_go_core +imports: [] +params: + - name: prompt_id + desc: "id devuelto por comfyui_submit_workflow, de un workflow cuyo nodo de video (SaveAnimatedWEBP/SaveVideo/VHS_VideoCombine) ya termino (usa comfyui_wait_result antes si dudas). Se ignora si se pasa outputs." + - name: server + desc: "host:port del servidor ComfyUI sin esquema. keyword-only." + - name: dest + desc: "Ruta destino. Si None, escribe el basename del video en el cwd. Si es un directorio existente (o termina en separador), escribe el basename dentro. Si es una ruta de archivo, escribe ahi. keyword-only." + - name: outputs + desc: "dict de outputs ya obtenido (el que devuelve comfyui_wait_result). Si se pasa, se busca el video ahi y NO se consulta /history (evita una peticion de red extra). keyword-only." + - name: timeout + desc: "Timeout de cada peticion HTTP en segundos. keyword-only." +output: "dict {ok, path, format, bytes, error}. path = ruta local del archivo de video guardado, format = extension sin punto (ej. 'mp4' o 'webp'), bytes = bytes descargados. Si falla, ok=False y error explica (sin video en los outputs, HTTP, conexion o escritura)." +tested: true +tests: + - "test_is_video_item_por_extension" + - "test_is_video_item_fallback_format" + - "test_find_saveanimatedwebp_bajo_images" + - "test_find_savevideo_mp4_bajo_images" + - "test_find_vhs_bajo_gifs" + - "test_find_prioriza_clave_video_sobre_images" + - "test_find_sin_video_devuelve_none" +test_file_path: "python/functions/ml/comfyui_fetch_output_video_test.py" +file_path: "python/functions/ml/comfyui_fetch_output_video.py" +--- + +## Ejemplo + +```python +import sys, os +sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions")) +from ml.comfyui_fetch_output_video import comfyui_fetch_output_video + +# Tras comfyui_submit_workflow + comfyui_wait_result de un workflow de video +# (SVD img2vid, LTX, AnimateDiff...), baja el .mp4/.webp al disco. +res = comfyui_fetch_output_video("8a278988-8a94-4225-add3-88a406f7101c", dest="/tmp/videos") +# res == {"ok": True, "path": "/tmp/videos/video_00003_.mp4", +# "format": "mp4", "bytes": 199413, "error": ""} + +# Si ya tienes los outputs de comfyui_wait_result, pasalos y evita re-consultar /history: +outputs = {"79": {"images": [{"filename": "video_00003_.mp4", "subfolder": "", "type": "output"}]}} +res2 = comfyui_fetch_output_video("ignored", dest="/tmp/videos", outputs=outputs) +``` + +Lánzalo con el python del venv (import de arriba o heredoc). Nota: `./fn run` directo no aplica porque la firma usa `*` (keyword-only), no soportado por el generador de runner de `fn run`. + +## Cuando usarla + +Después de generar un vídeo o animación con ComfyUI (img2vid SVD, LTX, AnimateDiff, +Wan, cualquier workflow con SaveAnimatedWEBP / SaveVideo / VHS_VideoCombine), cuando +necesites el archivo `.mp4`/`.webp`/`.webm`/`.gif` real en disco (no solo su nombre): +para reproducirlo, subirlo a un vault, o post-procesarlo. Es la hermana de +`comfyui_fetch_output_image` (imágenes estáticas) y `comfyui_fetch_output_mesh` +(mallas 3D). Para el flujo completo de un workflow ajeno usa el pipeline +`comfyui_run_foreign_workflow_oneshot`, que ya elige este fetch según el tipo de output. + +## Gotchas + +- Impura: hace HTTP GET a /history y /view y escribe en disco. Requiere el server + vivo y que el prompt YA haya terminado (usa `comfyui_wait_result` antes, o pásale + `outputs=`). +- Distintos nodos exponen el vídeo bajo distintas claves: SVD y SaveVideo nativo lo + ponen bajo `"images"`, VHS_VideoCombine bajo `"gifs"`, otros bajo `"videos"`. La + función inspecciona primero las claves preferentes (gifs/videos/animated) y luego + cualquier clave, así que cubre los tres casos. +- Un `.webp` se trata como animación/vídeo (los nodos de animación de ComfyUI lo + usan). Si tu workflow guarda una imagen `.webp` ESTÁTICA, esta función la bajaría + igual; para garantizar imagen estática usa `comfyui_fetch_output_image` con el + filename concreto. +- Toma el PRIMER archivo de vídeo que encuentra. Si un workflow exporta varios, + baja solo uno; para los demás llama otra vez o usa GET /view con el filename concreto. +- El history se purga al reiniciar el server: si el prompt ya no está, devuelve + `ok=False`. Pasar `outputs=` evita esa consulta y el problema. +- `dest` se interpreta: None -> cwd; directorio EXISTENTE -> dentro; ruta de archivo + -> esa ruta. Un directorio que aún no existe se trata como ruta de archivo: créalo + antes (o termina la ruta en separador). diff --git a/python/functions/ml/comfyui_fetch_output_video.py b/python/functions/ml/comfyui_fetch_output_video.py new file mode 100644 index 00000000..ee08120d --- /dev/null +++ b/python/functions/ml/comfyui_fetch_output_video.py @@ -0,0 +1,171 @@ +"""Localiza y descarga el output de video/animacion de un workflow ComfyUI a disco. + +Hermana de comfyui_fetch_output_image y comfyui_fetch_output_mesh, pero para los +nodos de video/animacion (SaveAnimatedWEBP, SaveVideo nativo, VHS_VideoCombine). +Esos nodos exponen su salida en GET /history/{prompt_id} bajo una lista de items +{filename, subfolder, type}; segun el nodo la clave puede ser "images" (SVD y +SaveVideo nativo lo ponen ahi), "gifs" (VHS_VideoCombine) o "videos". Esta funcion +localiza el primer archivo con extension de video/animacion (.mp4/.webm/.webp/.gif/ +.mkv/.mov/.avif), lo baja via GET /view a disco y, opcionalmente, lo escribe en `dest`. + +Impura: red (HTTP GET a /history y /view) + escritura en disco. Solo stdlib. +""" +import json +import os +import urllib.error +import urllib.parse +import urllib.request + +# Extensiones de video/animacion que producen los nodos de ComfyUI. +_VIDEO_EXTS = (".mp4", ".webm", ".webp", ".gif", ".mkv", ".mov", ".avif") +# Claves de output preferentes para video/animacion (se inspeccionan primero). +_VIDEO_KEYS = ("gifs", "videos", "animated") + + +def _is_video_item(item: dict) -> bool: + """True si el item de output apunta a un archivo de video/animacion. + + Decide por la extension del filename; como fallback, por un campo "format" + que contenga "video" (algunos nodos VHS/SaveVideo lo anotan). + """ + fn = (item.get("filename") or "").lower() + if fn.endswith(_VIDEO_EXTS): + return True + fmt = (item.get("format") or "").lower() + return "video" in fmt + + +def _find_video_output(outputs: dict) -> dict | None: + """Busca en los outputs de /history el primer archivo de video/animacion. + + Hace dos pasadas: primero en las claves preferentes de video (gifs/videos/ + animated), luego en cualquier clave (cubre SVD/SaveVideo nativo, que exponen + el archivo bajo "images"). Devuelve {filename, subfolder, type} o None. + """ + for prefer in (True, False): + for node_out in outputs.values(): + if not isinstance(node_out, dict): + continue + for key, items in node_out.items(): + if prefer and key not in _VIDEO_KEYS: + continue + if not isinstance(items, list): + continue + for item in items: + if isinstance(item, dict) and _is_video_item(item): + return { + "filename": item.get("filename", ""), + "subfolder": item.get("subfolder", ""), + "type": item.get("type", "output"), + } + return None + + +def _resolve_dest(dest: str | None, filename: str) -> str: + """Resuelve la ruta local destino a partir de `dest` y el basename remoto.""" + base = os.path.basename(filename) + if dest is None: + return os.path.join(os.getcwd(), base) + expanded = os.path.expanduser(dest) + if os.path.isdir(expanded) or expanded.endswith(os.sep): + return os.path.join(expanded, base) + return expanded + + +def comfyui_fetch_output_video( + prompt_id: str, + *, + server: str = "127.0.0.1:8188", + dest: str | None = None, + outputs: dict | None = None, + timeout: float = 120.0, +) -> dict: + """Descarga el video/animacion de un prompt ComfyUI ya ejecutado a disco local. + + Args: + prompt_id: id devuelto por comfyui_submit_workflow, de un workflow cuyo + nodo de video (SaveAnimatedWEBP/SaveVideo/VHS_VideoCombine) ya + termino (usa comfyui_wait_result antes si dudas). Se ignora si se + pasa `outputs`. + server: host:port del servidor ComfyUI (sin esquema). keyword-only. + dest: ruta destino. Si None, escribe el basename del video en el cwd. + Si es un directorio (o termina en separador), escribe el basename + dentro. Si es una ruta de archivo, escribe ahi. keyword-only. + outputs: dict de outputs ya obtenido (el que devuelve comfyui_wait_result). + Si se pasa, se busca el video ahi y NO se consulta /history (evita una + peticion de red extra justo despues de esperar). keyword-only. + timeout: timeout de cada peticion HTTP en segundos. keyword-only. + + Returns: + dict {ok, path, format, bytes, error}. path = ruta local del archivo de + video guardado; format = extension sin punto (ej. "mp4" o "webp"); bytes = + tamano descargado. Si falla, ok=False y error explica (sin video en los + outputs, HTTP, conexion o escritura). + """ + # 1. Obtener los outputs: del parametro (sin red) o consultando /history. + if outputs is None: + hist_url = f"http://{server}/history/{prompt_id}" + try: + with urllib.request.urlopen(hist_url, timeout=timeout) as resp: + hist = json.loads(resp.read()) + except urllib.error.HTTPError as exc: + body = exc.read().decode(errors="replace")[:200] + return {"ok": False, "path": "", "format": "", "bytes": 0, + "error": f"HTTP {exc.code} en {hist_url}: {body}"} + except urllib.error.URLError as exc: + return {"ok": False, "path": "", "format": "", "bytes": 0, + "error": f"no se pudo conectar a {hist_url}: {exc.reason}"} + except json.JSONDecodeError as exc: + return {"ok": False, "path": "", "format": "", "bytes": 0, + "error": f"respuesta no es JSON valido desde {hist_url}: {exc}"} + entry = hist.get(prompt_id) + if not entry: + return {"ok": False, "path": "", "format": "", "bytes": 0, + "error": f"prompt_id {prompt_id} no esta en /history (¿no termino o se purgo?)"} + outputs = entry.get("outputs", {}) + + video = _find_video_output(outputs or {}) + if video is None: + return {"ok": False, "path": "", "format": "", "bytes": 0, + "error": f"sin archivo de video/animacion en los outputs de {prompt_id}"} + + # 2. Descargar el archivo via GET /view. + qs = urllib.parse.urlencode({ + "filename": video["filename"], + "subfolder": video["subfolder"], + "type": video["type"], + }) + view_url = f"http://{server}/view?{qs}" + try: + with urllib.request.urlopen(view_url, timeout=timeout) as resp: + blob = resp.read() + except urllib.error.HTTPError as exc: + body = exc.read().decode(errors="replace")[:200] + return {"ok": False, "path": "", "format": "", "bytes": 0, + "error": f"HTTP {exc.code} en {view_url}: {body}"} + except urllib.error.URLError as exc: + return {"ok": False, "path": "", "format": "", "bytes": 0, + "error": f"no se pudo conectar a {view_url}: {exc.reason}"} + + # 3. Escribir a disco. + out_path = _resolve_dest(dest, video["filename"]) + try: + parent = os.path.dirname(out_path) + if parent: + os.makedirs(parent, exist_ok=True) + with open(out_path, "wb") as f: + f.write(blob) + except OSError as exc: + return {"ok": False, "path": "", "format": "", "bytes": 0, + "error": f"no se pudo escribir en {out_path!r}: {exc}"} + + fmt = os.path.splitext(video["filename"])[1].lstrip(".").lower() + return {"ok": True, "path": out_path, "format": fmt, "bytes": len(blob), "error": ""} + + +if __name__ == "__main__": + import sys + + pid = sys.argv[1] if len(sys.argv) > 1 else "00000000-0000-0000-0000-000000000000" + res = comfyui_fetch_output_video(pid, dest="/tmp/comfy_video") + print(json.dumps(res, indent=2)) diff --git a/python/functions/ml/comfyui_fetch_output_video_test.py b/python/functions/ml/comfyui_fetch_output_video_test.py new file mode 100644 index 00000000..8bd31c9b --- /dev/null +++ b/python/functions/ml/comfyui_fetch_output_video_test.py @@ -0,0 +1,68 @@ +"""Tests de las partes puras de comfyui_fetch_output_video. + +Cubren la localizacion del output de video/animacion (la logica con sustancia) +sin tocar red ni GPU: clasificacion por extension, fallback por campo "format", +prioridad de claves de video, y ausencia de video. La descarga real via /view se +valida por integracion contra el server (ver report 0110). +""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from comfyui_fetch_output_video import _find_video_output, _is_video_item + + +def test_is_video_item_por_extension(): + assert _is_video_item({"filename": "a.mp4"}) + assert _is_video_item({"filename": "a.webp"}) + assert _is_video_item({"filename": "a.webm"}) + assert _is_video_item({"filename": "a.gif"}) + assert not _is_video_item({"filename": "a.png"}) + assert not _is_video_item({"filename": "a.glb"}) + + +def test_is_video_item_fallback_format(): + # VHS_VideoCombine / SaveVideo a veces no dan extension clara pero anotan format. + assert _is_video_item({"filename": "clip", "format": "video/h264-mp4"}) + assert not _is_video_item({"filename": "clip", "format": "image/png"}) + + +def test_find_saveanimatedwebp_bajo_images(): + # SVD / SaveAnimatedWEBP exponen el .webp bajo la clave "images". + outs = {"30": {"images": [{"filename": "svd_00001_.webp", "subfolder": "", "type": "output"}]}} + got = _find_video_output(outs) + assert got is not None and got["filename"] == "svd_00001_.webp" + + +def test_find_savevideo_mp4_bajo_images(): + outs = {"79": {"images": [{"filename": "video_00003_.mp4", "subfolder": "", "type": "output"}]}} + assert _find_video_output(outs)["filename"] == "video_00003_.mp4" + + +def test_find_vhs_bajo_gifs(): + # VHS_VideoCombine expone el resultado bajo "gifs". + outs = {"9": {"gifs": [{"filename": "out.webm", "subfolder": "sub", "type": "output"}]}} + got = _find_video_output(outs) + assert got["filename"] == "out.webm" and got["subfolder"] == "sub" + + +def test_find_prioriza_clave_video_sobre_images(): + # Si hay una clave preferente de video y tambien una imagen suelta, gana el video. + outs = { + "1": {"images": [{"filename": "preview.png", "subfolder": "", "type": "output"}]}, + "2": {"videos": [{"filename": "final.mp4", "subfolder": "", "type": "output"}]}, + } + assert _find_video_output(outs)["filename"] == "final.mp4" + + +def test_find_sin_video_devuelve_none(): + outs = {"1": {"images": [{"filename": "foo.png", "subfolder": "", "type": "output"}]}} + assert _find_video_output(outs) is None + assert _find_video_output({}) is None + + +if __name__ == "__main__": + import pytest + + raise SystemExit(pytest.main([__file__, "-v"])) diff --git a/python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.md b/python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.md new file mode 100644 index 00000000..e364cb10 --- /dev/null +++ b/python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.md @@ -0,0 +1,124 @@ +--- +name: comfyui_run_foreign_workflow_oneshot +kind: pipeline +lang: py +domain: pipelines +version: "1.0.0" +purity: impure +signature: "def comfyui_run_foreign_workflow_oneshot(source, *, server: str = \"127.0.0.1:8188\", dest: str | None = None, output_kind: str = \"auto\", install_nodes: bool = False, node_repos: dict | None = None, wait_timeout: float = 600.0, civitai_token: str | None = None, hf_token: str | None = None) -> dict" +description: "Ejecuta un workflow ComfyUI AJENO end-to-end en una sola llamada: importa desde cualquier fuente (URL Drive/GitHub/Civitai/HuggingFace, PNG/JSON local, o dict en API format) -> resuelve dependencias (nodos/modelos faltantes) -> valida -> encola -> espera -> descarga los outputs (imagen/video/malla). SEGURIDAD: nunca instala modelos a ciegas (los reporta en 'missing') y solo instala nodos custom si install_nodes=True y el caller pasa su URL de repo confiable en node_repos. Si faltan deps, NO encola. Compone comfyui_download_workflow + resolve_workflow_deps + install_custom_node + validate_workflow + submit_workflow + wait_result + fetch_output_image/video/mesh. Pipeline impuro: HTTP + disco." +tags: [comfyui, pipeline, workflow, foreign, oneshot, ml, video, image, mesh] +uses_functions: + - comfyui_download_workflow_py_ml + - comfyui_resolve_workflow_deps_py_ml + - comfyui_install_custom_node_py_ml + - comfyui_validate_workflow_py_ml + - comfyui_submit_workflow_py_ml + - comfyui_wait_result_py_ml + - comfyui_fetch_output_image_py_ml + - comfyui_fetch_output_video_py_ml + - comfyui_fetch_output_mesh_py_ml +uses_types: [] +returns: [] +returns_optional: false +error_type: error_go_core +imports: [] +params: + - name: source + desc: "Workflow de entrada: URL (Google Drive, GitHub, Civitai, HuggingFace o directa a .json/.png/.webp), ruta local (.json/.png), o dict ya en API format ({node_id: {class_type, inputs}})." + - name: server + desc: "host:port del servidor ComfyUI sin esquema. keyword-only." + - name: dest + desc: "Directorio destino de los outputs descargados. None = cwd. Se crea si no existe. keyword-only." + - name: output_kind + desc: "Que outputs descargar: 'auto' (todos: imagenes + primer video + primera malla), 'image', 'video' o 'mesh'. keyword-only." + - name: install_nodes + desc: "Si True, instala los nodos custom faltantes cuyo class_type este mapeado a una URL de repo confiable en node_repos. Tras instalar hay que reiniciar ComfyUI para cargarlos, asi que el pipeline NO completa el submit en esa llamada. Por defecto False. keyword-only." + - name: node_repos + desc: "Mapa {class_type: repo_url} con las URLs de repo confiables de los nodos custom que se permite instalar. Solo se usa si install_nodes=True. Los modelos faltantes NUNCA se instalan. keyword-only." + - name: wait_timeout + desc: "Segundos maximos esperando a que el server termine. keyword-only." + - name: civitai_token + desc: "Token de Civitai (Bearer) para fuentes gated. keyword-only." + - name: hf_token + desc: "Token de HuggingFace (Bearer) para datasets privados. keyword-only." +output: "dict {ok, prompt_id, outputs, missing, source_type, error}. outputs = lista de rutas locales descargadas. missing = lista de dependencias faltantes (cada una {kind: 'node'|'model', name, action, hint, ...}); no vacia cuando el workflow pide algo que no tenemos, y entonces ok=False y NO se encola. source_type = de donde vino ('github', 'drive', 'local', 'dict', ...). Si falla, ok=False y error explica." +tested: true +tests: + - "test_classify_outputs_reparte_por_extension" + - "test_resolve_workflow_dict_api_format" + - "test_resolve_workflow_dict_mal_formado" + - "test_resolve_workflow_tipo_invalido" + - "test_pipeline_output_kind_invalido_no_toca_server" + - "test_pipeline_dict_mal_formado_no_toca_server" +test_file_path: "python/functions/pipelines/comfyui_run_foreign_workflow_oneshot_test.py" +file_path: "python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.py" +--- + +## Ejemplo + +```python +import sys, os +sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions")) +from pipelines.comfyui_run_foreign_workflow_oneshot import comfyui_run_foreign_workflow_oneshot + +# Workflow ajeno desde GitHub raw -> se importa, valida y ejecuta en nuestro server. +res = comfyui_run_foreign_workflow_oneshot( + "https://raw.githubusercontent.com/cubiq/ComfyUI_Workflows/main/ComfyUI_Simple/SDXL_simple.json", + dest="/tmp/comfy_foreign", +) +# Si faltan deps NO encola: +# res == {"ok": False, "prompt_id": "", "outputs": [], "source_type": "github", +# "missing": [{"kind": "model", "name": "sd_xl_base_1.0.safetensors", ...}], +# "error": "dependencias faltantes; no se encola"} + +# Con todo presente: +# res == {"ok": True, "prompt_id": "...", "outputs": ["/tmp/comfy_foreign/out_00001_.png"], +# "missing": [], "source_type": "github", "error": ""} + +# Tambien acepta un workflow ya validado de la libreria local o un dict en API format. +res2 = comfyui_run_foreign_workflow_oneshot( + os.path.expanduser("~/ComfyUI/workflows_library/txt2img_flux_schnell.api.json"), + dest="/tmp/comfy_foreign", output_kind="image", +) +``` + +Lánzalo con el python del venv (import de arriba o heredoc). `./fn run` directo no aplica porque la firma usa `*` (keyword-only). + +## Cuando usarla + +Cuando te pasan un workflow de ComfyUI de internet (un `.json`/`.png` de Drive, +GitHub, Civitai o HuggingFace, o un dict en API format) y quieres ejecutarlo en +nuestro servidor sin montar el flujo a mano. Hace en una llamada lo que antes eran +6-9 pasos: importar a API format, detectar qué nodos/modelos faltan, (opcionalmente) +instalar nodos custom confiables, validar, encolar, esperar y bajar los outputs. +Úsalo como puerta de entrada segura para workflows de terceros: te dice exactamente +qué falta antes de tocar la GPU. Para builders propios del registry (txt2img, img2vid, +img-to-3d) usa sus pipelines dedicados; este es para workflows que NO construimos nosotros. + +## Gotchas + +- Pipeline impuro: HTTP (import/validate/submit/poll/fetch) + escritura en disco + (+ subprocess git/pip si instala nodos). Requiere el server ComfyUI vivo. +- **Seguridad — código de terceros**: un workflow ajeno se ejecuta como grafo en + NUESTRO servidor. Por eso el pipeline SIEMPRE resuelve dependencias antes del + submit y NUNCA instala nada a ciegas. Los **modelos** faltantes se REPORTAN en + `missing`, jamás se descargan automáticamente. Los **nodos custom** solo se instalan + si `install_nodes=True` y el caller pasa la URL del repo en `node_repos` (fuente + confiable explícita). +- Tras instalar un nodo custom, ComfyUI debe reiniciarse para cargarlo (no en + caliente: cortaría generaciones en curso). Por eso, si instala nodos, el pipeline + devuelve `ok=False` con la instrucción de reiniciar y reintentar — no completa el + submit en esa misma llamada. +- Si tras resolver quedan dependencias faltantes (modelos, o nodos sin cargar), NO + encola: `ok=False` con `missing` poblado. Revisa `missing` para saber qué bajar/instalar. +- `output_kind="auto"` baja todas las imágenes + el primer vídeo + la primera malla. + Un `.webp` se clasifica como vídeo/animación (no imagen estática). +- `wait_timeout` por defecto 600s cubre vídeo/3D; súbelo para workflows muy largos. +- `dest` se trata siempre como directorio y se crea si no existe. + +## Capability growth log + +- v1.0.0 (2026-06-24) — creación. Promueve la secuencia del roadmap 0064/0087 + (import → resolve deps → validate → submit → wait → fetch) a un pipeline one-shot + para workflows ComfyUI ajenos, con gate de dependencias y sin instalación a ciegas. diff --git a/python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.py b/python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.py new file mode 100644 index 00000000..cde317d7 --- /dev/null +++ b/python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.py @@ -0,0 +1,275 @@ +"""comfyui_run_foreign_workflow_oneshot — ejecuta un workflow ComfyUI ajeno end-to-end. + +Promocion de la secuencia del roadmap (issue 0064/0087): traer un workflow +foraneo (URL de Drive/GitHub/Civitai/HuggingFace, PNG/JSON local, o dict en API +format) y ejecutarlo en NUESTRO servidor ComfyUI en una sola llamada, resolviendo +antes sus dependencias para no fallar a ciegas. Compone funciones del registry del +grupo `comfyui`: + + comfyui_download_workflow_py_ml (cualquier fuente -> API format) + comfyui_resolve_workflow_deps_py_ml (detecta nodos/modelos faltantes) + comfyui_install_custom_node_py_ml (instala nodos custom, opt-in + confiable) + comfyui_validate_workflow_py_ml (revalida tras instalar) + comfyui_submit_workflow_py_ml (POST /prompt) + comfyui_wait_result_py_ml (poll /history) + comfyui_fetch_output_image_py_ml (GET /view -> disco, imagenes) + comfyui_fetch_output_video_py_ml (GET /view -> disco, video/animacion) + comfyui_fetch_output_mesh_py_ml (GET /view -> disco, mallas 3D) + +SEGURIDAD: un workflow foraneo es codigo de terceros que se ejecuta como grafo en +nuestro servidor. Por eso el pipeline SIEMPRE resuelve dependencias antes del +submit y NUNCA instala nada a ciegas: + - Modelos faltantes -> se REPORTAN en `missing`, jamas se descargan + automaticamente (no hay forma de saber si la fuente es confiable). + - Nodos custom faltantes -> solo se instalan si install_nodes=True Y el caller + pasa su URL de repo en `node_repos` (fuente confiable explicita). Tras + instalar, ComfyUI debe reiniciarse para cargarlos, asi que el pipeline NO + completa el submit en esa misma llamada: devuelve ok=False con la instruccion + de reiniciar y reintentar. +Si tras resolver quedan dependencias faltantes, el pipeline NO encola: devuelve +ok=False con `missing` poblado. + +Pipeline impuro: red (HTTP) + escritura en disco (+ subprocess git/pip si se +instalan nodos custom). +""" + +from __future__ import annotations + +import json +import os +import sys + +# Importa las funciones del registry (mismo arbol python/functions). +_FUNCTIONS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if _FUNCTIONS_ROOT not in sys.path: + sys.path.insert(0, _FUNCTIONS_ROOT) + +from ml.comfyui_download_workflow import comfyui_download_workflow +from ml.comfyui_fetch_output_image import comfyui_fetch_output_image +from ml.comfyui_fetch_output_mesh import comfyui_fetch_output_mesh +from ml.comfyui_fetch_output_video import comfyui_fetch_output_video +from ml.comfyui_install_custom_node import comfyui_install_custom_node +from ml.comfyui_resolve_workflow_deps import comfyui_resolve_workflow_deps +from ml.comfyui_submit_workflow import comfyui_submit_workflow +from ml.comfyui_validate_workflow import comfyui_validate_workflow +from ml.comfyui_wait_result import comfyui_wait_result + +# Clasificacion de los outputs por extension del filename. +_IMAGE_EXTS = (".png", ".jpg", ".jpeg", ".bmp", ".tiff") +_VIDEO_EXTS = (".mp4", ".webm", ".webp", ".gif", ".mkv", ".mov", ".avif") +_MESH_EXTS = (".glb", ".gltf", ".obj", ".ply", ".fbx", ".stl", ".usdz", ".splat") + + +def _classify_outputs(outputs: dict) -> dict: + """Reparte los items de los outputs de /history en image/video/mesh por extension. + + Devuelve {"image": [items], "video": [items], "mesh": [items]} donde cada + item es {filename, subfolder, type}. Un .webp se considera video/animacion + (los nodos de animacion de ComfyUI lo usan); las imagenes estaticas usan png/jpg. + """ + buckets: dict[str, list] = {"image": [], "video": [], "mesh": []} + for node_out in outputs.values(): + if not isinstance(node_out, dict): + continue + for items in node_out.values(): + if not isinstance(items, list): + continue + for item in items: + if not isinstance(item, dict): + continue + fn = (item.get("filename") or "").lower() + rec = { + "filename": item.get("filename", ""), + "subfolder": item.get("subfolder", ""), + "type": item.get("type", "output"), + } + if fn.endswith(_MESH_EXTS): + buckets["mesh"].append(rec) + elif fn.endswith(_VIDEO_EXTS): + buckets["video"].append(rec) + elif fn.endswith(_IMAGE_EXTS): + buckets["image"].append(rec) + return buckets + + +def _resolve_workflow(source, server, civitai_token, hf_token): + """Resuelve el `source` (dict | str) a un workflow en API format. + + Devuelve (workflow, source_type, error). Si source ya es un dict en API + format se usa tal cual; si es str se delega a comfyui_download_workflow. + """ + if isinstance(source, dict): + if source and all( + isinstance(v, dict) and "class_type" in v for v in source.values() + ): + return source, "dict", "" + return {}, "dict", "el dict pasado no esta en API format ({node_id: {class_type, inputs}})" + if not isinstance(source, str): + return {}, "", f"source debe ser str (URL/path) o dict (API format), no {type(source).__name__}" + dl = comfyui_download_workflow( + source, server=server, civitai_token=civitai_token, hf_token=hf_token + ) + if not dl.get("ok"): + return {}, dl.get("source_type", ""), dl.get("error", "no se pudo importar el workflow") + return dl.get("workflow", {}), dl.get("source_type", ""), "" + + +def comfyui_run_foreign_workflow_oneshot( + source, + *, + server: str = "127.0.0.1:8188", + dest: str | None = None, + output_kind: str = "auto", + install_nodes: bool = False, + node_repos: dict | None = None, + wait_timeout: float = 600.0, + civitai_token: str | None = None, + hf_token: str | None = None, +) -> dict: + """Importa, valida, ejecuta y descarga los outputs de un workflow ComfyUI ajeno. + + Args: + source: workflow de entrada. Puede ser una URL (Google Drive, GitHub, + Civitai, HuggingFace o directa a .json/.png/.webp), una ruta local + (.json/.png), o un dict ya en API format ({node_id: {class_type, + inputs}}). + server: host:port del servidor ComfyUI (sin esquema). keyword-only. + dest: directorio destino de los outputs descargados. None = cwd. + keyword-only. + output_kind: que outputs descargar: "auto" (todos: imagenes + el primer + video + la primera malla), "image", "video" o "mesh". keyword-only. + install_nodes: si True, instala los nodos custom faltantes cuyo class_type + este mapeado a una URL de repo confiable en `node_repos`. Tras instalar + hay que reiniciar ComfyUI para cargarlos, asi que el pipeline NO + completa el submit en esa llamada. Por defecto False. keyword-only. + node_repos: mapa {class_type: repo_url} con las URLs de repo confiables de + los nodos custom que se permite instalar. Solo se usa si + install_nodes=True. Los modelos faltantes NUNCA se instalan. keyword-only. + wait_timeout: segundos maximos esperando a que el server termine. + keyword-only. + civitai_token / hf_token: tokens opcionales para fuentes gated. keyword-only. + + Returns: + dict {ok, prompt_id, outputs, missing, source_type, error}: + - outputs: lista de rutas locales de los archivos descargados. + - missing: lista de dependencias faltantes (suggestions de + comfyui_resolve_workflow_deps): cada una {kind: "node"|"model", name, + action, hint, ...}. No vacia cuando el workflow pide algo que no + tenemos; en ese caso ok=False y NO se encola. + - source_type: de donde vino el workflow ("github", "drive", "local", + "dict", ...). + Si falla en cualquier paso, ok=False y error explica el motivo. Nunca + instala modelos ni nodos de fuentes no confiables a ciegas. + """ + if output_kind not in ("auto", "image", "video", "mesh"): + return {"ok": False, "prompt_id": "", "outputs": [], "missing": [], + "source_type": "", "error": f"output_kind {output_kind!r} no valido"} + + # 1. Importar el workflow a API format desde cualquier fuente. + workflow, source_type, err = _resolve_workflow(source, server, civitai_token, hf_token) + if err: + return {"ok": False, "prompt_id": "", "outputs": [], "missing": [], + "source_type": source_type, "error": f"import fallo: {err}"} + if not workflow: + return {"ok": False, "prompt_id": "", "outputs": [], "missing": [], + "source_type": source_type, "error": "el workflow importado esta vacio"} + + # 2. Resolver dependencias (nodos/modelos faltantes) ANTES de encolar. + deps = comfyui_resolve_workflow_deps(workflow, server=server) + if not deps.get("ok"): + return {"ok": False, "prompt_id": "", "outputs": [], "missing": [], + "source_type": source_type, + "error": f"no se pudieron resolver dependencias (¿server vivo?): {deps.get('error')}"} + + missing_nodes = list(deps.get("missing_nodes", [])) + missing_models = list(deps.get("missing_models", [])) + suggestions = list(deps.get("suggestions", [])) + + # 3. Instalar SOLO nodos custom confiables (opt-in + URL provista por el caller). + install_notes = [] + repos = node_repos or {} + if install_nodes and missing_nodes and repos: + for ctype in list(missing_nodes): + repo = repos.get(ctype) + if not repo: + continue # sin URL confiable -> no se instala, queda en missing. + res = comfyui_install_custom_node(repo, restart=False) + if res.get("ok"): + install_notes.append(f"nodo '{ctype}' instalado desde {repo}") + else: + install_notes.append(f"fallo instalando '{ctype}': {res.get('error')}") + + # 4. Si quedan dependencias faltantes (modelos, o nodos sin cargar) -> NO encolar. + # Los nodos recien instalados requieren reiniciar ComfyUI; no se cargan ahora. + if missing_nodes or missing_models: + note = "dependencias faltantes; no se encola" + if install_notes: + note += ". " + "; ".join(install_notes) + note += ". Reinicia ComfyUI (cuando no haya generaciones en curso) y reintenta" + return {"ok": False, "prompt_id": "", "outputs": [], "missing": suggestions, + "source_type": source_type, "error": note} + + # 5. Encolar el workflow. + try: + sub = comfyui_submit_workflow(workflow, server=server) + prompt_id = sub["prompt_id"] + except (RuntimeError, KeyError) as exc: + return {"ok": False, "prompt_id": "", "outputs": [], "missing": [], + "source_type": source_type, "error": f"submit fallo: {exc}"} + + # 6. Esperar a que termine. + try: + outputs = comfyui_wait_result(prompt_id, server=server, timeout=wait_timeout) + except (TimeoutError, RuntimeError) as exc: + return {"ok": False, "prompt_id": prompt_id, "outputs": [], "missing": [], + "source_type": source_type, "error": f"wait fallo: {exc}"} + + # 7. Descargar los outputs segun output_kind. `dest` se trata SIEMPRE como + # directorio (el workflow puede producir varios archivos): se crea de + # antemano para que fetch_video/fetch_mesh escriban el basename dentro + # (si el dir no existe, esos helpers interpretarian la ruta como archivo). + out_dir = dest or "." + try: + os.makedirs(out_dir, exist_ok=True) + except OSError as exc: + return {"ok": False, "prompt_id": prompt_id, "outputs": [], "missing": [], + "source_type": source_type, "error": f"no se pudo crear dest {out_dir!r}: {exc}"} + + buckets = _classify_outputs(outputs) + paths: list[str] = [] + want_image = output_kind in ("auto", "image") + want_video = output_kind in ("auto", "video") + want_mesh = output_kind in ("auto", "mesh") + + if want_image: + for item in buckets["image"]: + r = comfyui_fetch_output_image( + item["filename"], subfolder=item["subfolder"], + type_=item["type"], server=server, dest_dir=out_dir, + ) + if r.get("ok"): + paths.append(r["path"]) + if want_video and buckets["video"]: + r = comfyui_fetch_output_video(prompt_id, server=server, dest=out_dir, outputs=outputs) + if r.get("ok"): + paths.append(r["path"]) + if want_mesh and buckets["mesh"]: + r = comfyui_fetch_output_mesh(prompt_id, server=server, dest=out_dir) + if r.get("ok"): + paths.append(r["path"]) + + if not paths: + return {"ok": False, "prompt_id": prompt_id, "outputs": [], "missing": [], + "source_type": source_type, + "error": f"el workflow termino pero no se descargo ningun output de tipo {output_kind!r}"} + + return {"ok": True, "prompt_id": prompt_id, "outputs": paths, "missing": [], + "source_type": source_type, "error": ""} + + +if __name__ == "__main__": + # Smoke: ejecuta un workflow ya validado de la libreria local (deps presentes). + src = sys.argv[1] if len(sys.argv) > 1 else os.path.expanduser( + "~/ComfyUI/workflows_library/txt2img_flux_schnell.api.json") + res = comfyui_run_foreign_workflow_oneshot(src, dest="/tmp/comfy_foreign") + print(json.dumps(res, indent=2)) diff --git a/python/functions/pipelines/comfyui_run_foreign_workflow_oneshot_test.py b/python/functions/pipelines/comfyui_run_foreign_workflow_oneshot_test.py new file mode 100644 index 00000000..9776f8d1 --- /dev/null +++ b/python/functions/pipelines/comfyui_run_foreign_workflow_oneshot_test.py @@ -0,0 +1,63 @@ +"""Tests de las partes puras de comfyui_run_foreign_workflow_oneshot. + +Cubren sin red ni GPU: la clasificacion de outputs por tipo, la resolucion de un +`source` que ya es dict en API format, y las ramas de error tempranas que retornan +antes de tocar el servidor (output_kind invalido, source de tipo invalido, dict mal +formado). El flujo completo import->resolve->submit->wait->fetch se valida por +integracion contra el server (ver report 0110). +""" +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from pipelines.comfyui_run_foreign_workflow_oneshot import ( + _classify_outputs, + _resolve_workflow, + comfyui_run_foreign_workflow_oneshot, +) + + +def test_classify_outputs_reparte_por_extension(): + outs = { + "1": {"images": [{"filename": "a.png", "subfolder": "", "type": "output"}]}, + "2": {"gifs": [{"filename": "b.webm", "subfolder": "", "type": "output"}]}, + "3": {"images": [{"filename": "c.webp", "subfolder": "", "type": "output"}]}, + "4": {"3d": [{"filename": "d.glb", "subfolder": "", "type": "output"}]}, + } + b = _classify_outputs(outs) + assert [i["filename"] for i in b["image"]] == ["a.png"] + assert sorted(i["filename"] for i in b["video"]) == ["b.webm", "c.webp"] + assert [i["filename"] for i in b["mesh"]] == ["d.glb"] + + +def test_resolve_workflow_dict_api_format(): + wf = {"1": {"class_type": "CheckpointLoaderSimple", "inputs": {}}} + got, st, err = _resolve_workflow(wf, "127.0.0.1:8188", None, None) + assert err == "" and st == "dict" and got is wf + + +def test_resolve_workflow_dict_mal_formado(): + got, st, err = _resolve_workflow({"1": {"no_class": 1}}, "127.0.0.1:8188", None, None) + assert got == {} and "API format" in err + + +def test_resolve_workflow_tipo_invalido(): + got, st, err = _resolve_workflow(123, "127.0.0.1:8188", None, None) + assert got == {} and "source debe ser" in err + + +def test_pipeline_output_kind_invalido_no_toca_server(): + r = comfyui_run_foreign_workflow_oneshot("cualquier", output_kind="bogus") + assert not r["ok"] and "output_kind" in r["error"] and r["prompt_id"] == "" + + +def test_pipeline_dict_mal_formado_no_toca_server(): + r = comfyui_run_foreign_workflow_oneshot({"1": {"no_class": 1}}, server="127.0.0.1:8188") + assert not r["ok"] and "import fallo" in r["error"] and r["prompt_id"] == "" + + +if __name__ == "__main__": + import pytest + + raise SystemExit(pytest.main([__file__, "-v"]))