feat(ml): comfyui_run_foreign_workflow_oneshot + helper fetch_output_video

Pipeline one-shot para ejecutar workflows ComfyUI ajenos end-to-end
(import desde cualquier fuente -> resolve deps -> validate -> submit ->
wait -> fetch del output imagen/video/malla) componiendo 9 funciones
existentes del grupo comfyui. Gate de seguridad: si faltan nodos/modelos
NO encola y los reporta en `missing`; nunca descarga modelos a ciegas y
solo instala nodos custom confiables opt-in (install_nodes + node_repos).

Helper comfyui_fetch_output_video: hermana de fetch_output_image y
fetch_output_mesh para los nodos de video/animacion (SaveAnimatedWEBP,
SaveVideo nativo, VHS_VideoCombine). Localiza el output bajo images/gifs/
videos en /history y lo baja via /view a disco; acepta outputs= de
wait_result para evitar re-consultar /history.

Cierra la pieza marcada por el completeness critic (report 0107) del
roadmap 0064/0087. 13 tests unitarios de las partes puras en verde;
validacion de integracion contra server vivo sin generacion pesada
(report 0110).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-24 12:53:40 +02:00
parent 898502a321
commit e8a66f0dad
7 changed files with 794 additions and 0 deletions
+2
View File
@@ -67,6 +67,7 @@ El **API format** (dict de nodos numerados que produce `build_txt2img_workflow`
| [comfyui_download_workflow_py_ml](../../python/functions/ml/comfyui_download_workflow.md) | `download_workflow(source, dest=None, *, server, civitai_token, hf_token, timeout) -> dict` | **Dispatcher**: descarga un workflow de CUALQUIER fuente (Google Drive, GitHub, Civitai, HuggingFace, URL directa o path local) y lo normaliza a API format. Detecta el tipo por la URL y delega; tras bajar compone `import_workflow_json`/`import_workflow_png`. Catálogo de fuentes: `reports/0080`. Impura. | | [comfyui_download_workflow_py_ml](../../python/functions/ml/comfyui_download_workflow.md) | `download_workflow(source, dest=None, *, server, civitai_token, hf_token, timeout) -> dict` | **Dispatcher**: descarga un workflow de CUALQUIER fuente (Google Drive, GitHub, Civitai, HuggingFace, URL directa o path local) y lo normaliza a API format. Detecta el tipo por la URL y delega; tras bajar compone `import_workflow_json`/`import_workflow_png`. Catálogo de fuentes: `reports/0080`. Impura. |
| [comfyui_read_png_metadata_py_ml](../../python/functions/ml/comfyui_read_png_metadata.md) | `read_png_metadata(png_path) -> dict` | Lee los parámetros de generación (modelo, seed, steps, cfg, sampler, prompts) de un PNG generado por ComfyUI. Impura (I/O disco). | | [comfyui_read_png_metadata_py_ml](../../python/functions/ml/comfyui_read_png_metadata.md) | `read_png_metadata(png_path) -> dict` | Lee los parámetros de generación (modelo, seed, steps, cfg, sampler, prompts) de un PNG generado por ComfyUI. Impura (I/O disco). |
| [comfyui_fetch_output_image_py_ml](../../python/functions/ml/comfyui_fetch_output_image.md) | `fetch_output_image(filename, *, subfolder='', type_='output', server, dest_dir='.', timeout) -> dict` | Descarga el PNG generado vía GET `/view` a disco local (`wait_result` solo da metadata). Impura. | | [comfyui_fetch_output_image_py_ml](../../python/functions/ml/comfyui_fetch_output_image.md) | `fetch_output_image(filename, *, subfolder='', type_='output', server, dest_dir='.', timeout) -> dict` | Descarga el PNG generado vía GET `/view` a disco local (`wait_result` solo da metadata). Impura. |
| [comfyui_fetch_output_video_py_ml](../../python/functions/ml/comfyui_fetch_output_video.md) | `fetch_output_video(prompt_id, *, server, dest=None, outputs=None, timeout) -> dict` | Localiza y descarga el output de **vídeo/animación** (`.mp4`/`.webp`/`.webm`/`.gif`) de `/history` vía GET `/view`. Cubre SaveAnimatedWEBP/SaveVideo (bajo `"images"`) y VHS_VideoCombine (bajo `"gifs"`). Hermana de `fetch_output_image`/`fetch_output_mesh`. Acepta `outputs=` de `wait_result` para evitar re-consultar `/history`. Impura. |
### Potencia y assets de internet — dominio `ml` (P1, issue 0064) ### Potencia y assets de internet — dominio `ml` (P1, issue 0064)
@@ -78,6 +79,7 @@ El **API format** (dict de nodos numerados que produce `build_txt2img_workflow`
| [comfyui_search_civitai_models_py_ml](../../python/functions/ml/comfyui_search_civitai_models.md) | `search_civitai_models(query, *, types='Checkpoint', base_model=None, sort, limit=20, token=None) -> dict` | Busca modelos/LoRAs en la API pública de Civitai → `{ok, items:[{name, type, base_model, version_id, download_url, nsfw}], count, error}`. Sin token funciona. Impura. | | [comfyui_search_civitai_models_py_ml](../../python/functions/ml/comfyui_search_civitai_models.md) | `search_civitai_models(query, *, types='Checkpoint', base_model=None, sort, limit=20, token=None) -> dict` | Busca modelos/LoRAs en la API pública de Civitai → `{ok, items:[{name, type, base_model, version_id, download_url, nsfw}], count, error}`. Sin token funciona. Impura. |
| [comfyui_install_custom_node_py_ml](../../python/functions/ml/comfyui_install_custom_node.md) | `install_custom_node(repo_url, *, comfyui_dir, pip_install=True, restart=False) -> dict` | git clone en `custom_nodes/` + pip/uv install de requirements en el venv de ComfyUI. NO reinicia el server (restart=False). Impura. | | [comfyui_install_custom_node_py_ml](../../python/functions/ml/comfyui_install_custom_node.md) | `install_custom_node(repo_url, *, comfyui_dir, pip_install=True, restart=False) -> dict` | git clone en `custom_nodes/` + pip/uv install de requirements en el venv de ComfyUI. NO reinicia el server (restart=False). Impura. |
| [comfyui_resolve_workflow_deps_py_ml](../../python/functions/ml/comfyui_resolve_workflow_deps.md) | `resolve_workflow_deps(workflow, server='127.0.0.1:8188') -> dict` | Para un workflow ajeno: valida y traduce lo que falta en acciones (`{missing_nodes, missing_models, suggestions}`). Compone `validate_workflow`. Impura. | | [comfyui_resolve_workflow_deps_py_ml](../../python/functions/ml/comfyui_resolve_workflow_deps.md) | `resolve_workflow_deps(workflow, server='127.0.0.1:8188') -> dict` | Para un workflow ajeno: valida y traduce lo que falta en acciones (`{missing_nodes, missing_models, suggestions}`). Compone `validate_workflow`. Impura. |
| [comfyui_run_foreign_workflow_oneshot_py_pipelines](../../python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.md) | `run_foreign_workflow_oneshot(source, *, server, dest=None, output_kind='auto', install_nodes=False, node_repos=None, wait_timeout, civitai_token, hf_token) -> dict` | **Pipeline** para ejecutar un workflow ComfyUI **ajeno** end-to-end en una llamada: import (cualquier fuente) → resolve deps → (instala solo nodos confiables opt-in) → validate → submit → wait → fetch (imagen/vídeo/malla). **Gate de seguridad**: si faltan deps NO encola y las reporta en `missing`; nunca descarga modelos a ciegas. Compone `download_workflow` + `resolve_workflow_deps` + `install_custom_node` + `submit`/`wait` + `fetch_output_image/video/mesh`. Promoción del roadmap 0064/0087. Impuro. |
| [comfyui_list_installed_models_py_ml](../../python/functions/ml/comfyui_list_installed_models.md) | `list_installed_models(folder=None, comfyui_dir='~/ComfyUI') -> dict` | Lista modelos por carpeta resolviendo la ruta real de `extra_model_paths.yaml` (`/mnt/2tb/comfyui_models/`) + la nativa. Escaneo de FS, no depende del server. Impura. | | [comfyui_list_installed_models_py_ml](../../python/functions/ml/comfyui_list_installed_models.md) | `list_installed_models(folder=None, comfyui_dir='~/ComfyUI') -> dict` | Lista modelos por carpeta resolviendo la ruta real de `extra_model_paths.yaml` (`/mnt/2tb/comfyui_models/`) + la nativa. Escaneo de FS, no depende del server. Impura. |
### Retoque pro y oneshot — dominio `ml` + `pipelines` (P0, lote report 0093) ### Retoque pro y oneshot — dominio `ml` + `pipelines` (P0, lote report 0093)
@@ -0,0 +1,91 @@
---
name: comfyui_fetch_output_video
kind: function
lang: py
domain: ml
version: "1.0.0"
purity: impure
signature: "def comfyui_fetch_output_video(prompt_id: str, *, server: str = \"127.0.0.1:8188\", dest: str | None = None, outputs: dict | None = None, timeout: float = 120.0) -> dict"
description: "Localiza y descarga el output de video/animacion de un workflow ComfyUI a disco local. Hermana de comfyui_fetch_output_image y comfyui_fetch_output_mesh pero para los nodos de video (SaveAnimatedWEBP, SaveVideo nativo, VHS_VideoCombine): esos exponen su salida en GET /history bajo 'images', 'gifs' o 'videos' con items {filename, subfolder, type}. Localiza el primer .mp4/.webm/.webp/.gif/.mkv/.mov/.avif, lo baja via GET /view y opcionalmente lo escribe en dest. Acepta outputs= ya obtenido de comfyui_wait_result para evitar re-consultar /history. Impura: HTTP GET + escritura en disco, solo stdlib."
tags: [comfyui, video, fetch, animation, ml, download, workflow]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: error_go_core
imports: []
params:
- name: prompt_id
desc: "id devuelto por comfyui_submit_workflow, de un workflow cuyo nodo de video (SaveAnimatedWEBP/SaveVideo/VHS_VideoCombine) ya termino (usa comfyui_wait_result antes si dudas). Se ignora si se pasa outputs."
- name: server
desc: "host:port del servidor ComfyUI sin esquema. keyword-only."
- name: dest
desc: "Ruta destino. Si None, escribe el basename del video en el cwd. Si es un directorio existente (o termina en separador), escribe el basename dentro. Si es una ruta de archivo, escribe ahi. keyword-only."
- name: outputs
desc: "dict de outputs ya obtenido (el que devuelve comfyui_wait_result). Si se pasa, se busca el video ahi y NO se consulta /history (evita una peticion de red extra). keyword-only."
- name: timeout
desc: "Timeout de cada peticion HTTP en segundos. keyword-only."
output: "dict {ok, path, format, bytes, error}. path = ruta local del archivo de video guardado, format = extension sin punto (ej. 'mp4' o 'webp'), bytes = bytes descargados. Si falla, ok=False y error explica (sin video en los outputs, HTTP, conexion o escritura)."
tested: true
tests:
- "test_is_video_item_por_extension"
- "test_is_video_item_fallback_format"
- "test_find_saveanimatedwebp_bajo_images"
- "test_find_savevideo_mp4_bajo_images"
- "test_find_vhs_bajo_gifs"
- "test_find_prioriza_clave_video_sobre_images"
- "test_find_sin_video_devuelve_none"
test_file_path: "python/functions/ml/comfyui_fetch_output_video_test.py"
file_path: "python/functions/ml/comfyui_fetch_output_video.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions"))
from ml.comfyui_fetch_output_video import comfyui_fetch_output_video
# Tras comfyui_submit_workflow + comfyui_wait_result de un workflow de video
# (SVD img2vid, LTX, AnimateDiff...), baja el .mp4/.webp al disco.
res = comfyui_fetch_output_video("8a278988-8a94-4225-add3-88a406f7101c", dest="/tmp/videos")
# res == {"ok": True, "path": "/tmp/videos/video_00003_.mp4",
# "format": "mp4", "bytes": 199413, "error": ""}
# Si ya tienes los outputs de comfyui_wait_result, pasalos y evita re-consultar /history:
outputs = {"79": {"images": [{"filename": "video_00003_.mp4", "subfolder": "", "type": "output"}]}}
res2 = comfyui_fetch_output_video("ignored", dest="/tmp/videos", outputs=outputs)
```
Lánzalo con el python del venv (import de arriba o heredoc). Nota: `./fn run` directo no aplica porque la firma usa `*` (keyword-only), no soportado por el generador de runner de `fn run`.
## Cuando usarla
Después de generar un vídeo o animación con ComfyUI (img2vid SVD, LTX, AnimateDiff,
Wan, cualquier workflow con SaveAnimatedWEBP / SaveVideo / VHS_VideoCombine), cuando
necesites el archivo `.mp4`/`.webp`/`.webm`/`.gif` real en disco (no solo su nombre):
para reproducirlo, subirlo a un vault, o post-procesarlo. Es la hermana de
`comfyui_fetch_output_image` (imágenes estáticas) y `comfyui_fetch_output_mesh`
(mallas 3D). Para el flujo completo de un workflow ajeno usa el pipeline
`comfyui_run_foreign_workflow_oneshot`, que ya elige este fetch según el tipo de output.
## Gotchas
- Impura: hace HTTP GET a /history y /view y escribe en disco. Requiere el server
vivo y que el prompt YA haya terminado (usa `comfyui_wait_result` antes, o pásale
`outputs=`).
- Distintos nodos exponen el vídeo bajo distintas claves: SVD y SaveVideo nativo lo
ponen bajo `"images"`, VHS_VideoCombine bajo `"gifs"`, otros bajo `"videos"`. La
función inspecciona primero las claves preferentes (gifs/videos/animated) y luego
cualquier clave, así que cubre los tres casos.
- Un `.webp` se trata como animación/vídeo (los nodos de animación de ComfyUI lo
usan). Si tu workflow guarda una imagen `.webp` ESTÁTICA, esta función la bajaría
igual; para garantizar imagen estática usa `comfyui_fetch_output_image` con el
filename concreto.
- Toma el PRIMER archivo de vídeo que encuentra. Si un workflow exporta varios,
baja solo uno; para los demás llama otra vez o usa GET /view con el filename concreto.
- El history se purga al reiniciar el server: si el prompt ya no está, devuelve
`ok=False`. Pasar `outputs=` evita esa consulta y el problema.
- `dest` se interpreta: None -> cwd; directorio EXISTENTE -> dentro; ruta de archivo
-> esa ruta. Un directorio que aún no existe se trata como ruta de archivo: créalo
antes (o termina la ruta en separador).
@@ -0,0 +1,171 @@
"""Localiza y descarga el output de video/animacion de un workflow ComfyUI a disco.
Hermana de comfyui_fetch_output_image y comfyui_fetch_output_mesh, pero para los
nodos de video/animacion (SaveAnimatedWEBP, SaveVideo nativo, VHS_VideoCombine).
Esos nodos exponen su salida en GET /history/{prompt_id} bajo una lista de items
{filename, subfolder, type}; segun el nodo la clave puede ser "images" (SVD y
SaveVideo nativo lo ponen ahi), "gifs" (VHS_VideoCombine) o "videos". Esta funcion
localiza el primer archivo con extension de video/animacion (.mp4/.webm/.webp/.gif/
.mkv/.mov/.avif), lo baja via GET /view a disco y, opcionalmente, lo escribe en `dest`.
Impura: red (HTTP GET a /history y /view) + escritura en disco. Solo stdlib.
"""
import json
import os
import urllib.error
import urllib.parse
import urllib.request
# Extensiones de video/animacion que producen los nodos de ComfyUI.
_VIDEO_EXTS = (".mp4", ".webm", ".webp", ".gif", ".mkv", ".mov", ".avif")
# Claves de output preferentes para video/animacion (se inspeccionan primero).
_VIDEO_KEYS = ("gifs", "videos", "animated")
def _is_video_item(item: dict) -> bool:
"""True si el item de output apunta a un archivo de video/animacion.
Decide por la extension del filename; como fallback, por un campo "format"
que contenga "video" (algunos nodos VHS/SaveVideo lo anotan).
"""
fn = (item.get("filename") or "").lower()
if fn.endswith(_VIDEO_EXTS):
return True
fmt = (item.get("format") or "").lower()
return "video" in fmt
def _find_video_output(outputs: dict) -> dict | None:
"""Busca en los outputs de /history el primer archivo de video/animacion.
Hace dos pasadas: primero en las claves preferentes de video (gifs/videos/
animated), luego en cualquier clave (cubre SVD/SaveVideo nativo, que exponen
el archivo bajo "images"). Devuelve {filename, subfolder, type} o None.
"""
for prefer in (True, False):
for node_out in outputs.values():
if not isinstance(node_out, dict):
continue
for key, items in node_out.items():
if prefer and key not in _VIDEO_KEYS:
continue
if not isinstance(items, list):
continue
for item in items:
if isinstance(item, dict) and _is_video_item(item):
return {
"filename": item.get("filename", ""),
"subfolder": item.get("subfolder", ""),
"type": item.get("type", "output"),
}
return None
def _resolve_dest(dest: str | None, filename: str) -> str:
"""Resuelve la ruta local destino a partir de `dest` y el basename remoto."""
base = os.path.basename(filename)
if dest is None:
return os.path.join(os.getcwd(), base)
expanded = os.path.expanduser(dest)
if os.path.isdir(expanded) or expanded.endswith(os.sep):
return os.path.join(expanded, base)
return expanded
def comfyui_fetch_output_video(
prompt_id: str,
*,
server: str = "127.0.0.1:8188",
dest: str | None = None,
outputs: dict | None = None,
timeout: float = 120.0,
) -> dict:
"""Descarga el video/animacion de un prompt ComfyUI ya ejecutado a disco local.
Args:
prompt_id: id devuelto por comfyui_submit_workflow, de un workflow cuyo
nodo de video (SaveAnimatedWEBP/SaveVideo/VHS_VideoCombine) ya
termino (usa comfyui_wait_result antes si dudas). Se ignora si se
pasa `outputs`.
server: host:port del servidor ComfyUI (sin esquema). keyword-only.
dest: ruta destino. Si None, escribe el basename del video en el cwd.
Si es un directorio (o termina en separador), escribe el basename
dentro. Si es una ruta de archivo, escribe ahi. keyword-only.
outputs: dict de outputs ya obtenido (el que devuelve comfyui_wait_result).
Si se pasa, se busca el video ahi y NO se consulta /history (evita una
peticion de red extra justo despues de esperar). keyword-only.
timeout: timeout de cada peticion HTTP en segundos. keyword-only.
Returns:
dict {ok, path, format, bytes, error}. path = ruta local del archivo de
video guardado; format = extension sin punto (ej. "mp4" o "webp"); bytes =
tamano descargado. Si falla, ok=False y error explica (sin video en los
outputs, HTTP, conexion o escritura).
"""
# 1. Obtener los outputs: del parametro (sin red) o consultando /history.
if outputs is None:
hist_url = f"http://{server}/history/{prompt_id}"
try:
with urllib.request.urlopen(hist_url, timeout=timeout) as resp:
hist = json.loads(resp.read())
except urllib.error.HTTPError as exc:
body = exc.read().decode(errors="replace")[:200]
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"HTTP {exc.code} en {hist_url}: {body}"}
except urllib.error.URLError as exc:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"no se pudo conectar a {hist_url}: {exc.reason}"}
except json.JSONDecodeError as exc:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"respuesta no es JSON valido desde {hist_url}: {exc}"}
entry = hist.get(prompt_id)
if not entry:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"prompt_id {prompt_id} no esta en /history (¿no termino o se purgo?)"}
outputs = entry.get("outputs", {})
video = _find_video_output(outputs or {})
if video is None:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"sin archivo de video/animacion en los outputs de {prompt_id}"}
# 2. Descargar el archivo via GET /view.
qs = urllib.parse.urlencode({
"filename": video["filename"],
"subfolder": video["subfolder"],
"type": video["type"],
})
view_url = f"http://{server}/view?{qs}"
try:
with urllib.request.urlopen(view_url, timeout=timeout) as resp:
blob = resp.read()
except urllib.error.HTTPError as exc:
body = exc.read().decode(errors="replace")[:200]
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"HTTP {exc.code} en {view_url}: {body}"}
except urllib.error.URLError as exc:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"no se pudo conectar a {view_url}: {exc.reason}"}
# 3. Escribir a disco.
out_path = _resolve_dest(dest, video["filename"])
try:
parent = os.path.dirname(out_path)
if parent:
os.makedirs(parent, exist_ok=True)
with open(out_path, "wb") as f:
f.write(blob)
except OSError as exc:
return {"ok": False, "path": "", "format": "", "bytes": 0,
"error": f"no se pudo escribir en {out_path!r}: {exc}"}
fmt = os.path.splitext(video["filename"])[1].lstrip(".").lower()
return {"ok": True, "path": out_path, "format": fmt, "bytes": len(blob), "error": ""}
if __name__ == "__main__":
import sys
pid = sys.argv[1] if len(sys.argv) > 1 else "00000000-0000-0000-0000-000000000000"
res = comfyui_fetch_output_video(pid, dest="/tmp/comfy_video")
print(json.dumps(res, indent=2))
@@ -0,0 +1,68 @@
"""Tests de las partes puras de comfyui_fetch_output_video.
Cubren la localizacion del output de video/animacion (la logica con sustancia)
sin tocar red ni GPU: clasificacion por extension, fallback por campo "format",
prioridad de claves de video, y ausencia de video. La descarga real via /view se
valida por integracion contra el server (ver report 0110).
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from comfyui_fetch_output_video import _find_video_output, _is_video_item
def test_is_video_item_por_extension():
assert _is_video_item({"filename": "a.mp4"})
assert _is_video_item({"filename": "a.webp"})
assert _is_video_item({"filename": "a.webm"})
assert _is_video_item({"filename": "a.gif"})
assert not _is_video_item({"filename": "a.png"})
assert not _is_video_item({"filename": "a.glb"})
def test_is_video_item_fallback_format():
# VHS_VideoCombine / SaveVideo a veces no dan extension clara pero anotan format.
assert _is_video_item({"filename": "clip", "format": "video/h264-mp4"})
assert not _is_video_item({"filename": "clip", "format": "image/png"})
def test_find_saveanimatedwebp_bajo_images():
# SVD / SaveAnimatedWEBP exponen el .webp bajo la clave "images".
outs = {"30": {"images": [{"filename": "svd_00001_.webp", "subfolder": "", "type": "output"}]}}
got = _find_video_output(outs)
assert got is not None and got["filename"] == "svd_00001_.webp"
def test_find_savevideo_mp4_bajo_images():
outs = {"79": {"images": [{"filename": "video_00003_.mp4", "subfolder": "", "type": "output"}]}}
assert _find_video_output(outs)["filename"] == "video_00003_.mp4"
def test_find_vhs_bajo_gifs():
# VHS_VideoCombine expone el resultado bajo "gifs".
outs = {"9": {"gifs": [{"filename": "out.webm", "subfolder": "sub", "type": "output"}]}}
got = _find_video_output(outs)
assert got["filename"] == "out.webm" and got["subfolder"] == "sub"
def test_find_prioriza_clave_video_sobre_images():
# Si hay una clave preferente de video y tambien una imagen suelta, gana el video.
outs = {
"1": {"images": [{"filename": "preview.png", "subfolder": "", "type": "output"}]},
"2": {"videos": [{"filename": "final.mp4", "subfolder": "", "type": "output"}]},
}
assert _find_video_output(outs)["filename"] == "final.mp4"
def test_find_sin_video_devuelve_none():
outs = {"1": {"images": [{"filename": "foo.png", "subfolder": "", "type": "output"}]}}
assert _find_video_output(outs) is None
assert _find_video_output({}) is None
if __name__ == "__main__":
import pytest
raise SystemExit(pytest.main([__file__, "-v"]))
@@ -0,0 +1,124 @@
---
name: comfyui_run_foreign_workflow_oneshot
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def comfyui_run_foreign_workflow_oneshot(source, *, server: str = \"127.0.0.1:8188\", dest: str | None = None, output_kind: str = \"auto\", install_nodes: bool = False, node_repos: dict | None = None, wait_timeout: float = 600.0, civitai_token: str | None = None, hf_token: str | None = None) -> dict"
description: "Ejecuta un workflow ComfyUI AJENO end-to-end en una sola llamada: importa desde cualquier fuente (URL Drive/GitHub/Civitai/HuggingFace, PNG/JSON local, o dict en API format) -> resuelve dependencias (nodos/modelos faltantes) -> valida -> encola -> espera -> descarga los outputs (imagen/video/malla). SEGURIDAD: nunca instala modelos a ciegas (los reporta en 'missing') y solo instala nodos custom si install_nodes=True y el caller pasa su URL de repo confiable en node_repos. Si faltan deps, NO encola. Compone comfyui_download_workflow + resolve_workflow_deps + install_custom_node + validate_workflow + submit_workflow + wait_result + fetch_output_image/video/mesh. Pipeline impuro: HTTP + disco."
tags: [comfyui, pipeline, workflow, foreign, oneshot, ml, video, image, mesh]
uses_functions:
- comfyui_download_workflow_py_ml
- comfyui_resolve_workflow_deps_py_ml
- comfyui_install_custom_node_py_ml
- comfyui_validate_workflow_py_ml
- comfyui_submit_workflow_py_ml
- comfyui_wait_result_py_ml
- comfyui_fetch_output_image_py_ml
- comfyui_fetch_output_video_py_ml
- comfyui_fetch_output_mesh_py_ml
uses_types: []
returns: []
returns_optional: false
error_type: error_go_core
imports: []
params:
- name: source
desc: "Workflow de entrada: URL (Google Drive, GitHub, Civitai, HuggingFace o directa a .json/.png/.webp), ruta local (.json/.png), o dict ya en API format ({node_id: {class_type, inputs}})."
- name: server
desc: "host:port del servidor ComfyUI sin esquema. keyword-only."
- name: dest
desc: "Directorio destino de los outputs descargados. None = cwd. Se crea si no existe. keyword-only."
- name: output_kind
desc: "Que outputs descargar: 'auto' (todos: imagenes + primer video + primera malla), 'image', 'video' o 'mesh'. keyword-only."
- name: install_nodes
desc: "Si True, instala los nodos custom faltantes cuyo class_type este mapeado a una URL de repo confiable en node_repos. Tras instalar hay que reiniciar ComfyUI para cargarlos, asi que el pipeline NO completa el submit en esa llamada. Por defecto False. keyword-only."
- name: node_repos
desc: "Mapa {class_type: repo_url} con las URLs de repo confiables de los nodos custom que se permite instalar. Solo se usa si install_nodes=True. Los modelos faltantes NUNCA se instalan. keyword-only."
- name: wait_timeout
desc: "Segundos maximos esperando a que el server termine. keyword-only."
- name: civitai_token
desc: "Token de Civitai (Bearer) para fuentes gated. keyword-only."
- name: hf_token
desc: "Token de HuggingFace (Bearer) para datasets privados. keyword-only."
output: "dict {ok, prompt_id, outputs, missing, source_type, error}. outputs = lista de rutas locales descargadas. missing = lista de dependencias faltantes (cada una {kind: 'node'|'model', name, action, hint, ...}); no vacia cuando el workflow pide algo que no tenemos, y entonces ok=False y NO se encola. source_type = de donde vino ('github', 'drive', 'local', 'dict', ...). Si falla, ok=False y error explica."
tested: true
tests:
- "test_classify_outputs_reparte_por_extension"
- "test_resolve_workflow_dict_api_format"
- "test_resolve_workflow_dict_mal_formado"
- "test_resolve_workflow_tipo_invalido"
- "test_pipeline_output_kind_invalido_no_toca_server"
- "test_pipeline_dict_mal_formado_no_toca_server"
test_file_path: "python/functions/pipelines/comfyui_run_foreign_workflow_oneshot_test.py"
file_path: "python/functions/pipelines/comfyui_run_foreign_workflow_oneshot.py"
---
## Ejemplo
```python
import sys, os
sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions"))
from pipelines.comfyui_run_foreign_workflow_oneshot import comfyui_run_foreign_workflow_oneshot
# Workflow ajeno desde GitHub raw -> se importa, valida y ejecuta en nuestro server.
res = comfyui_run_foreign_workflow_oneshot(
"https://raw.githubusercontent.com/cubiq/ComfyUI_Workflows/main/ComfyUI_Simple/SDXL_simple.json",
dest="/tmp/comfy_foreign",
)
# Si faltan deps NO encola:
# res == {"ok": False, "prompt_id": "", "outputs": [], "source_type": "github",
# "missing": [{"kind": "model", "name": "sd_xl_base_1.0.safetensors", ...}],
# "error": "dependencias faltantes; no se encola"}
# Con todo presente:
# res == {"ok": True, "prompt_id": "...", "outputs": ["/tmp/comfy_foreign/out_00001_.png"],
# "missing": [], "source_type": "github", "error": ""}
# Tambien acepta un workflow ya validado de la libreria local o un dict en API format.
res2 = comfyui_run_foreign_workflow_oneshot(
os.path.expanduser("~/ComfyUI/workflows_library/txt2img_flux_schnell.api.json"),
dest="/tmp/comfy_foreign", output_kind="image",
)
```
Lánzalo con el python del venv (import de arriba o heredoc). `./fn run` directo no aplica porque la firma usa `*` (keyword-only).
## Cuando usarla
Cuando te pasan un workflow de ComfyUI de internet (un `.json`/`.png` de Drive,
GitHub, Civitai o HuggingFace, o un dict en API format) y quieres ejecutarlo en
nuestro servidor sin montar el flujo a mano. Hace en una llamada lo que antes eran
6-9 pasos: importar a API format, detectar qué nodos/modelos faltan, (opcionalmente)
instalar nodos custom confiables, validar, encolar, esperar y bajar los outputs.
Úsalo como puerta de entrada segura para workflows de terceros: te dice exactamente
qué falta antes de tocar la GPU. Para builders propios del registry (txt2img, img2vid,
img-to-3d) usa sus pipelines dedicados; este es para workflows que NO construimos nosotros.
## Gotchas
- Pipeline impuro: HTTP (import/validate/submit/poll/fetch) + escritura en disco
(+ subprocess git/pip si instala nodos). Requiere el server ComfyUI vivo.
- **Seguridad — código de terceros**: un workflow ajeno se ejecuta como grafo en
NUESTRO servidor. Por eso el pipeline SIEMPRE resuelve dependencias antes del
submit y NUNCA instala nada a ciegas. Los **modelos** faltantes se REPORTAN en
`missing`, jamás se descargan automáticamente. Los **nodos custom** solo se instalan
si `install_nodes=True` y el caller pasa la URL del repo en `node_repos` (fuente
confiable explícita).
- Tras instalar un nodo custom, ComfyUI debe reiniciarse para cargarlo (no en
caliente: cortaría generaciones en curso). Por eso, si instala nodos, el pipeline
devuelve `ok=False` con la instrucción de reiniciar y reintentar — no completa el
submit en esa misma llamada.
- Si tras resolver quedan dependencias faltantes (modelos, o nodos sin cargar), NO
encola: `ok=False` con `missing` poblado. Revisa `missing` para saber qué bajar/instalar.
- `output_kind="auto"` baja todas las imágenes + el primer vídeo + la primera malla.
Un `.webp` se clasifica como vídeo/animación (no imagen estática).
- `wait_timeout` por defecto 600s cubre vídeo/3D; súbelo para workflows muy largos.
- `dest` se trata siempre como directorio y se crea si no existe.
## Capability growth log
- v1.0.0 (2026-06-24) — creación. Promueve la secuencia del roadmap 0064/0087
(import → resolve deps → validate → submit → wait → fetch) a un pipeline one-shot
para workflows ComfyUI ajenos, con gate de dependencias y sin instalación a ciegas.
@@ -0,0 +1,275 @@
"""comfyui_run_foreign_workflow_oneshot — ejecuta un workflow ComfyUI ajeno end-to-end.
Promocion de la secuencia del roadmap (issue 0064/0087): traer un workflow
foraneo (URL de Drive/GitHub/Civitai/HuggingFace, PNG/JSON local, o dict en API
format) y ejecutarlo en NUESTRO servidor ComfyUI en una sola llamada, resolviendo
antes sus dependencias para no fallar a ciegas. Compone funciones del registry del
grupo `comfyui`:
comfyui_download_workflow_py_ml (cualquier fuente -> API format)
comfyui_resolve_workflow_deps_py_ml (detecta nodos/modelos faltantes)
comfyui_install_custom_node_py_ml (instala nodos custom, opt-in + confiable)
comfyui_validate_workflow_py_ml (revalida tras instalar)
comfyui_submit_workflow_py_ml (POST /prompt)
comfyui_wait_result_py_ml (poll /history)
comfyui_fetch_output_image_py_ml (GET /view -> disco, imagenes)
comfyui_fetch_output_video_py_ml (GET /view -> disco, video/animacion)
comfyui_fetch_output_mesh_py_ml (GET /view -> disco, mallas 3D)
SEGURIDAD: un workflow foraneo es codigo de terceros que se ejecuta como grafo en
nuestro servidor. Por eso el pipeline SIEMPRE resuelve dependencias antes del
submit y NUNCA instala nada a ciegas:
- Modelos faltantes -> se REPORTAN en `missing`, jamas se descargan
automaticamente (no hay forma de saber si la fuente es confiable).
- Nodos custom faltantes -> solo se instalan si install_nodes=True Y el caller
pasa su URL de repo en `node_repos` (fuente confiable explicita). Tras
instalar, ComfyUI debe reiniciarse para cargarlos, asi que el pipeline NO
completa el submit en esa misma llamada: devuelve ok=False con la instruccion
de reiniciar y reintentar.
Si tras resolver quedan dependencias faltantes, el pipeline NO encola: devuelve
ok=False con `missing` poblado.
Pipeline impuro: red (HTTP) + escritura en disco (+ subprocess git/pip si se
instalan nodos custom).
"""
from __future__ import annotations
import json
import os
import sys
# Importa las funciones del registry (mismo arbol python/functions).
_FUNCTIONS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _FUNCTIONS_ROOT not in sys.path:
sys.path.insert(0, _FUNCTIONS_ROOT)
from ml.comfyui_download_workflow import comfyui_download_workflow
from ml.comfyui_fetch_output_image import comfyui_fetch_output_image
from ml.comfyui_fetch_output_mesh import comfyui_fetch_output_mesh
from ml.comfyui_fetch_output_video import comfyui_fetch_output_video
from ml.comfyui_install_custom_node import comfyui_install_custom_node
from ml.comfyui_resolve_workflow_deps import comfyui_resolve_workflow_deps
from ml.comfyui_submit_workflow import comfyui_submit_workflow
from ml.comfyui_validate_workflow import comfyui_validate_workflow
from ml.comfyui_wait_result import comfyui_wait_result
# Clasificacion de los outputs por extension del filename.
_IMAGE_EXTS = (".png", ".jpg", ".jpeg", ".bmp", ".tiff")
_VIDEO_EXTS = (".mp4", ".webm", ".webp", ".gif", ".mkv", ".mov", ".avif")
_MESH_EXTS = (".glb", ".gltf", ".obj", ".ply", ".fbx", ".stl", ".usdz", ".splat")
def _classify_outputs(outputs: dict) -> dict:
"""Reparte los items de los outputs de /history en image/video/mesh por extension.
Devuelve {"image": [items], "video": [items], "mesh": [items]} donde cada
item es {filename, subfolder, type}. Un .webp se considera video/animacion
(los nodos de animacion de ComfyUI lo usan); las imagenes estaticas usan png/jpg.
"""
buckets: dict[str, list] = {"image": [], "video": [], "mesh": []}
for node_out in outputs.values():
if not isinstance(node_out, dict):
continue
for items in node_out.values():
if not isinstance(items, list):
continue
for item in items:
if not isinstance(item, dict):
continue
fn = (item.get("filename") or "").lower()
rec = {
"filename": item.get("filename", ""),
"subfolder": item.get("subfolder", ""),
"type": item.get("type", "output"),
}
if fn.endswith(_MESH_EXTS):
buckets["mesh"].append(rec)
elif fn.endswith(_VIDEO_EXTS):
buckets["video"].append(rec)
elif fn.endswith(_IMAGE_EXTS):
buckets["image"].append(rec)
return buckets
def _resolve_workflow(source, server, civitai_token, hf_token):
"""Resuelve el `source` (dict | str) a un workflow en API format.
Devuelve (workflow, source_type, error). Si source ya es un dict en API
format se usa tal cual; si es str se delega a comfyui_download_workflow.
"""
if isinstance(source, dict):
if source and all(
isinstance(v, dict) and "class_type" in v for v in source.values()
):
return source, "dict", ""
return {}, "dict", "el dict pasado no esta en API format ({node_id: {class_type, inputs}})"
if not isinstance(source, str):
return {}, "", f"source debe ser str (URL/path) o dict (API format), no {type(source).__name__}"
dl = comfyui_download_workflow(
source, server=server, civitai_token=civitai_token, hf_token=hf_token
)
if not dl.get("ok"):
return {}, dl.get("source_type", ""), dl.get("error", "no se pudo importar el workflow")
return dl.get("workflow", {}), dl.get("source_type", ""), ""
def comfyui_run_foreign_workflow_oneshot(
source,
*,
server: str = "127.0.0.1:8188",
dest: str | None = None,
output_kind: str = "auto",
install_nodes: bool = False,
node_repos: dict | None = None,
wait_timeout: float = 600.0,
civitai_token: str | None = None,
hf_token: str | None = None,
) -> dict:
"""Importa, valida, ejecuta y descarga los outputs de un workflow ComfyUI ajeno.
Args:
source: workflow de entrada. Puede ser una URL (Google Drive, GitHub,
Civitai, HuggingFace o directa a .json/.png/.webp), una ruta local
(.json/.png), o un dict ya en API format ({node_id: {class_type,
inputs}}).
server: host:port del servidor ComfyUI (sin esquema). keyword-only.
dest: directorio destino de los outputs descargados. None = cwd.
keyword-only.
output_kind: que outputs descargar: "auto" (todos: imagenes + el primer
video + la primera malla), "image", "video" o "mesh". keyword-only.
install_nodes: si True, instala los nodos custom faltantes cuyo class_type
este mapeado a una URL de repo confiable en `node_repos`. Tras instalar
hay que reiniciar ComfyUI para cargarlos, asi que el pipeline NO
completa el submit en esa llamada. Por defecto False. keyword-only.
node_repos: mapa {class_type: repo_url} con las URLs de repo confiables de
los nodos custom que se permite instalar. Solo se usa si
install_nodes=True. Los modelos faltantes NUNCA se instalan. keyword-only.
wait_timeout: segundos maximos esperando a que el server termine.
keyword-only.
civitai_token / hf_token: tokens opcionales para fuentes gated. keyword-only.
Returns:
dict {ok, prompt_id, outputs, missing, source_type, error}:
- outputs: lista de rutas locales de los archivos descargados.
- missing: lista de dependencias faltantes (suggestions de
comfyui_resolve_workflow_deps): cada una {kind: "node"|"model", name,
action, hint, ...}. No vacia cuando el workflow pide algo que no
tenemos; en ese caso ok=False y NO se encola.
- source_type: de donde vino el workflow ("github", "drive", "local",
"dict", ...).
Si falla en cualquier paso, ok=False y error explica el motivo. Nunca
instala modelos ni nodos de fuentes no confiables a ciegas.
"""
if output_kind not in ("auto", "image", "video", "mesh"):
return {"ok": False, "prompt_id": "", "outputs": [], "missing": [],
"source_type": "", "error": f"output_kind {output_kind!r} no valido"}
# 1. Importar el workflow a API format desde cualquier fuente.
workflow, source_type, err = _resolve_workflow(source, server, civitai_token, hf_token)
if err:
return {"ok": False, "prompt_id": "", "outputs": [], "missing": [],
"source_type": source_type, "error": f"import fallo: {err}"}
if not workflow:
return {"ok": False, "prompt_id": "", "outputs": [], "missing": [],
"source_type": source_type, "error": "el workflow importado esta vacio"}
# 2. Resolver dependencias (nodos/modelos faltantes) ANTES de encolar.
deps = comfyui_resolve_workflow_deps(workflow, server=server)
if not deps.get("ok"):
return {"ok": False, "prompt_id": "", "outputs": [], "missing": [],
"source_type": source_type,
"error": f"no se pudieron resolver dependencias (¿server vivo?): {deps.get('error')}"}
missing_nodes = list(deps.get("missing_nodes", []))
missing_models = list(deps.get("missing_models", []))
suggestions = list(deps.get("suggestions", []))
# 3. Instalar SOLO nodos custom confiables (opt-in + URL provista por el caller).
install_notes = []
repos = node_repos or {}
if install_nodes and missing_nodes and repos:
for ctype in list(missing_nodes):
repo = repos.get(ctype)
if not repo:
continue # sin URL confiable -> no se instala, queda en missing.
res = comfyui_install_custom_node(repo, restart=False)
if res.get("ok"):
install_notes.append(f"nodo '{ctype}' instalado desde {repo}")
else:
install_notes.append(f"fallo instalando '{ctype}': {res.get('error')}")
# 4. Si quedan dependencias faltantes (modelos, o nodos sin cargar) -> NO encolar.
# Los nodos recien instalados requieren reiniciar ComfyUI; no se cargan ahora.
if missing_nodes or missing_models:
note = "dependencias faltantes; no se encola"
if install_notes:
note += ". " + "; ".join(install_notes)
note += ". Reinicia ComfyUI (cuando no haya generaciones en curso) y reintenta"
return {"ok": False, "prompt_id": "", "outputs": [], "missing": suggestions,
"source_type": source_type, "error": note}
# 5. Encolar el workflow.
try:
sub = comfyui_submit_workflow(workflow, server=server)
prompt_id = sub["prompt_id"]
except (RuntimeError, KeyError) as exc:
return {"ok": False, "prompt_id": "", "outputs": [], "missing": [],
"source_type": source_type, "error": f"submit fallo: {exc}"}
# 6. Esperar a que termine.
try:
outputs = comfyui_wait_result(prompt_id, server=server, timeout=wait_timeout)
except (TimeoutError, RuntimeError) as exc:
return {"ok": False, "prompt_id": prompt_id, "outputs": [], "missing": [],
"source_type": source_type, "error": f"wait fallo: {exc}"}
# 7. Descargar los outputs segun output_kind. `dest` se trata SIEMPRE como
# directorio (el workflow puede producir varios archivos): se crea de
# antemano para que fetch_video/fetch_mesh escriban el basename dentro
# (si el dir no existe, esos helpers interpretarian la ruta como archivo).
out_dir = dest or "."
try:
os.makedirs(out_dir, exist_ok=True)
except OSError as exc:
return {"ok": False, "prompt_id": prompt_id, "outputs": [], "missing": [],
"source_type": source_type, "error": f"no se pudo crear dest {out_dir!r}: {exc}"}
buckets = _classify_outputs(outputs)
paths: list[str] = []
want_image = output_kind in ("auto", "image")
want_video = output_kind in ("auto", "video")
want_mesh = output_kind in ("auto", "mesh")
if want_image:
for item in buckets["image"]:
r = comfyui_fetch_output_image(
item["filename"], subfolder=item["subfolder"],
type_=item["type"], server=server, dest_dir=out_dir,
)
if r.get("ok"):
paths.append(r["path"])
if want_video and buckets["video"]:
r = comfyui_fetch_output_video(prompt_id, server=server, dest=out_dir, outputs=outputs)
if r.get("ok"):
paths.append(r["path"])
if want_mesh and buckets["mesh"]:
r = comfyui_fetch_output_mesh(prompt_id, server=server, dest=out_dir)
if r.get("ok"):
paths.append(r["path"])
if not paths:
return {"ok": False, "prompt_id": prompt_id, "outputs": [], "missing": [],
"source_type": source_type,
"error": f"el workflow termino pero no se descargo ningun output de tipo {output_kind!r}"}
return {"ok": True, "prompt_id": prompt_id, "outputs": paths, "missing": [],
"source_type": source_type, "error": ""}
if __name__ == "__main__":
# Smoke: ejecuta un workflow ya validado de la libreria local (deps presentes).
src = sys.argv[1] if len(sys.argv) > 1 else os.path.expanduser(
"~/ComfyUI/workflows_library/txt2img_flux_schnell.api.json")
res = comfyui_run_foreign_workflow_oneshot(src, dest="/tmp/comfy_foreign")
print(json.dumps(res, indent=2))
@@ -0,0 +1,63 @@
"""Tests de las partes puras de comfyui_run_foreign_workflow_oneshot.
Cubren sin red ni GPU: la clasificacion de outputs por tipo, la resolucion de un
`source` que ya es dict en API format, y las ramas de error tempranas que retornan
antes de tocar el servidor (output_kind invalido, source de tipo invalido, dict mal
formado). El flujo completo import->resolve->submit->wait->fetch se valida por
integracion contra el server (ver report 0110).
"""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from pipelines.comfyui_run_foreign_workflow_oneshot import (
_classify_outputs,
_resolve_workflow,
comfyui_run_foreign_workflow_oneshot,
)
def test_classify_outputs_reparte_por_extension():
outs = {
"1": {"images": [{"filename": "a.png", "subfolder": "", "type": "output"}]},
"2": {"gifs": [{"filename": "b.webm", "subfolder": "", "type": "output"}]},
"3": {"images": [{"filename": "c.webp", "subfolder": "", "type": "output"}]},
"4": {"3d": [{"filename": "d.glb", "subfolder": "", "type": "output"}]},
}
b = _classify_outputs(outs)
assert [i["filename"] for i in b["image"]] == ["a.png"]
assert sorted(i["filename"] for i in b["video"]) == ["b.webm", "c.webp"]
assert [i["filename"] for i in b["mesh"]] == ["d.glb"]
def test_resolve_workflow_dict_api_format():
wf = {"1": {"class_type": "CheckpointLoaderSimple", "inputs": {}}}
got, st, err = _resolve_workflow(wf, "127.0.0.1:8188", None, None)
assert err == "" and st == "dict" and got is wf
def test_resolve_workflow_dict_mal_formado():
got, st, err = _resolve_workflow({"1": {"no_class": 1}}, "127.0.0.1:8188", None, None)
assert got == {} and "API format" in err
def test_resolve_workflow_tipo_invalido():
got, st, err = _resolve_workflow(123, "127.0.0.1:8188", None, None)
assert got == {} and "source debe ser" in err
def test_pipeline_output_kind_invalido_no_toca_server():
r = comfyui_run_foreign_workflow_oneshot("cualquier", output_kind="bogus")
assert not r["ok"] and "output_kind" in r["error"] and r["prompt_id"] == ""
def test_pipeline_dict_mal_formado_no_toca_server():
r = comfyui_run_foreign_workflow_oneshot({"1": {"no_class": 1}}, server="127.0.0.1:8188")
assert not r["ok"] and "import fallo" in r["error"] and r["prompt_id"] == ""
if __name__ == "__main__":
import pytest
raise SystemExit(pytest.main([__file__, "-v"]))