"""Localiza y descarga el output de audio de un workflow ComfyUI a disco. Hermana de comfyui_fetch_output_video / comfyui_fetch_output_image / _mesh, pero para los nodos de audio (SaveAudio, SaveAudioMP3, SaveAudioOpus, SaveAudioAdvanced). Esos nodos exponen su salida en GET /history/{prompt_id} bajo la clave "audio" como lista de items {filename, subfolder, type}. Esta funcion localiza el primer archivo con extension de audio (.flac/.wav/.mp3/.opus/.ogg/.m4a), lo baja via GET /view a disco y, opcionalmente, lo escribe en `dest`. Impura: red (HTTP GET a /history y /view) + escritura en disco. Solo stdlib. """ import json import os import urllib.error import urllib.parse import urllib.request # Extensiones de audio que producen los nodos SaveAudio* de ComfyUI. _AUDIO_EXTS = (".flac", ".wav", ".mp3", ".opus", ".ogg", ".m4a") # Claves de output preferentes para audio (se inspeccionan primero). _AUDIO_KEYS = ("audio", "audios") def _is_audio_item(item: dict) -> bool: """True si el item de output apunta a un archivo de audio (por extension).""" fn = (item.get("filename") or "").lower() return fn.endswith(_AUDIO_EXTS) def _find_audio_output(outputs: dict) -> dict | None: """Busca en los outputs de /history el primer archivo de audio. Hace dos pasadas: primero en la clave preferente "audio" (la que usan los nodos SaveAudio*), luego en cualquier clave por si un nodo lo expone bajo otro nombre. Devuelve {filename, subfolder, type} o None. """ for prefer in (True, False): for node_out in outputs.values(): if not isinstance(node_out, dict): continue for key, items in node_out.items(): if prefer and key not in _AUDIO_KEYS: continue if not isinstance(items, list): continue for item in items: if isinstance(item, dict) and _is_audio_item(item): return { "filename": item.get("filename", ""), "subfolder": item.get("subfolder", ""), "type": item.get("type", "output"), } return None def _resolve_dest(dest: str | None, filename: str) -> str: """Resuelve la ruta local destino a partir de `dest` y el basename remoto.""" base = os.path.basename(filename) if dest is None: return os.path.join(os.getcwd(), base) expanded = os.path.expanduser(dest) if os.path.isdir(expanded) or expanded.endswith(os.sep): return os.path.join(expanded, base) return expanded def comfyui_fetch_output_audio( prompt_id: str, *, server: str = "127.0.0.1:8188", dest: str | None = None, outputs: dict | None = None, timeout: float = 120.0, ) -> dict: """Descarga el audio de un prompt ComfyUI ya ejecutado a disco local. Args: prompt_id: id devuelto por comfyui_submit_workflow, de un workflow cuyo nodo de audio (SaveAudio/SaveAudioMP3/...) ya termino (usa comfyui_wait_result antes si dudas). Se ignora si se pasa `outputs`. server: host:port del servidor ComfyUI (sin esquema). keyword-only. dest: ruta destino. Si None, escribe el basename del audio en el cwd. Si es un directorio (o termina en separador), escribe el basename dentro. Si es una ruta de archivo, escribe ahi. keyword-only. outputs: dict de outputs ya obtenido (el que devuelve comfyui_wait_result). Si se pasa, se busca el audio ahi y NO se consulta /history (evita una peticion de red extra justo despues de esperar). keyword-only. timeout: timeout de cada peticion HTTP en segundos. keyword-only. Returns: dict {ok, path, format, bytes, error}. path = ruta local del archivo de audio guardado; format = extension sin punto (ej. "flac" o "mp3"); bytes = tamano descargado. Si falla, ok=False y error explica (sin audio en los outputs, HTTP, conexion o escritura). """ # 1. Obtener los outputs: del parametro (sin red) o consultando /history. if outputs is None: hist_url = f"http://{server}/history/{prompt_id}" try: with urllib.request.urlopen(hist_url, timeout=timeout) as resp: hist = json.loads(resp.read()) except urllib.error.HTTPError as exc: body = exc.read().decode(errors="replace")[:200] return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"HTTP {exc.code} en {hist_url}: {body}"} except urllib.error.URLError as exc: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"no se pudo conectar a {hist_url}: {exc.reason}"} except json.JSONDecodeError as exc: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"respuesta no es JSON valido desde {hist_url}: {exc}"} entry = hist.get(prompt_id) if not entry: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"prompt_id {prompt_id} no esta en /history (¿no termino o se purgo?)"} outputs = entry.get("outputs", {}) audio = _find_audio_output(outputs or {}) if audio is None: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"sin archivo de audio en los outputs de {prompt_id}"} # 2. Descargar el archivo via GET /view. qs = urllib.parse.urlencode({ "filename": audio["filename"], "subfolder": audio["subfolder"], "type": audio["type"], }) view_url = f"http://{server}/view?{qs}" try: with urllib.request.urlopen(view_url, timeout=timeout) as resp: blob = resp.read() except urllib.error.HTTPError as exc: body = exc.read().decode(errors="replace")[:200] return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"HTTP {exc.code} en {view_url}: {body}"} except urllib.error.URLError as exc: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"no se pudo conectar a {view_url}: {exc.reason}"} # 3. Escribir a disco. out_path = _resolve_dest(dest, audio["filename"]) try: parent = os.path.dirname(out_path) if parent: os.makedirs(parent, exist_ok=True) with open(out_path, "wb") as f: f.write(blob) except OSError as exc: return {"ok": False, "path": "", "format": "", "bytes": 0, "error": f"no se pudo escribir en {out_path!r}: {exc}"} fmt = os.path.splitext(audio["filename"])[1].lstrip(".").lower() return {"ok": True, "path": out_path, "format": fmt, "bytes": len(blob), "error": ""} if __name__ == "__main__": import sys pid = sys.argv[1] if len(sys.argv) > 1 else "00000000-0000-0000-0000-000000000000" res = comfyui_fetch_output_audio(pid, dest="/tmp/comfy_audio") print(json.dumps(res, indent=2))