From ca07b25297391bf60960bc594c17882650e6d81a Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Sun, 28 Jun 2026 04:54:14 +0200 Subject: [PATCH] =?UTF-8?q?feat(comfyui):=20comfyui=5Finterrupt=5Fqueue=20?= =?UTF-8?q?v1.1.0=20=E2=80=94=20clear=5Fpending=20+=20cleared/queue=5Frema?= =?UTF-8?q?ining=20+=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Alinea la funcion al contrato de control de cola (punto 3 del roadmap ComfyUI): - firma keyword-only: clear_pending (vacia pendientes con POST /queue {clear:true}) + timeout - output {ok, interrupted, cleared, queue_remaining, error}; GET /queue al final - no lanza en fallo de red: degrada a {ok:False, error} - test con mock HTTP local (golden + clear + cola vacia + error path), 4/4 verde - .md autosuficiente con gotchas + capability growth log Co-Authored-By: Claude Opus 4.8 (1M context) --- .../functions/ml/comfyui_interrupt_queue.md | 65 +++++--- .../functions/ml/comfyui_interrupt_queue.py | 80 +++++++--- .../ml/tests/test_comfyui_interrupt_queue.py | 149 ++++++++++++++++++ 3 files changed, 250 insertions(+), 44 deletions(-) create mode 100644 python/functions/ml/tests/test_comfyui_interrupt_queue.py diff --git a/python/functions/ml/comfyui_interrupt_queue.md b/python/functions/ml/comfyui_interrupt_queue.md index 834217c4..0b22877b 100644 --- a/python/functions/ml/comfyui_interrupt_queue.md +++ b/python/functions/ml/comfyui_interrupt_queue.md @@ -3,10 +3,10 @@ name: comfyui_interrupt_queue kind: function lang: py domain: ml -version: "1.0.0" +version: "1.1.0" purity: impure -signature: "def comfyui_interrupt_queue(server: str = \"127.0.0.1:8188\") -> dict" -description: "Corta la generacion en curso de ComfyUI (POST /interrupt) y devuelve el estado de la cola (GET /queue). Devuelve {ok, interrupted, queue_running, queue_pending, error}. NO lanza excepcion en fallo de red: degrada a {ok: False, error}. /interrupt corta solo el prompt en ejecucion, no vacia los pendientes. Impura: HTTP POST + GET, solo stdlib (urllib, json)." +signature: "def comfyui_interrupt_queue(*, clear_pending: bool = False, server: str = \"127.0.0.1:8188\", timeout: float = 10.0) -> dict" +description: "Corta la generacion en curso de ComfyUI (POST /interrupt) y, si clear_pending=True, vacia ademas la cola de pendientes (POST /queue {\"clear\":true}). Consulta GET /queue al final para reportar queue_remaining. Devuelve {ok, interrupted, cleared, queue_remaining, error}. NO lanza excepcion en fallo de red: degrada a {ok: False, error}. /interrupt corta solo el prompt en ejecucion, no vacia los pendientes salvo clear_pending. Impura: HTTP POST + GET, solo stdlib (urllib, json)." tags: [comfyui, ml, queue, interrupt, control, http] uses_functions: [] uses_types: [] @@ -15,12 +15,16 @@ returns_optional: false error_type: "error_go_core" imports: [] params: + - name: clear_pending + desc: "keyword-only. Si True, ademas de cortar el prompt en ejecucion vacia la cola de pendientes con POST /queue {\"clear\":true}. Default False." - name: server - desc: "host:port del servidor ComfyUI sin esquema (default '127.0.0.1:8188')." -output: "dict con ok (bool, True si interrupt + lectura de cola OK), interrupted (bool, True si POST /interrupt respondio), queue_running (int, prompts ejecutandose), queue_pending (int, prompts encolados), error (str, vacio si todo OK)." -tested: false -tests: [] -test_file_path: "" + desc: "keyword-only. host:port del servidor ComfyUI sin esquema (default '127.0.0.1:8188')." + - name: timeout + desc: "keyword-only. Timeout de cada peticion HTTP en segundos (default 10.0)." +output: "dict con ok (bool, True si interrupt + clear (si se pidio) + lectura de cola OK), interrupted (bool, True si POST /interrupt respondio), cleared (bool, True si clear_pending y POST /queue {clear:true} respondio; False si no se pidio o fallo), queue_remaining (int, queue_running + queue_pending tras la operacion), error (str, vacio si todo OK)." +tested: true +tests: ["test_interrumpe_sin_vaciar", "test_clear_pending_vacia_cola", "test_clear_pending_cola_vacia_no_rompe", "test_servidor_caido_no_lanza"] +test_file_path: "python/functions/ml/tests/test_comfyui_interrupt_queue.py" file_path: "python/functions/ml/comfyui_interrupt_queue.py" --- @@ -31,30 +35,47 @@ import sys, os sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions")) from ml.comfyui_interrupt_queue import comfyui_interrupt_queue +# Solo cortar el prompt en ejecucion (los pendientes siguen): res = comfyui_interrupt_queue() -# {'ok': True, 'interrupted': True, 'queue_running': 0, 'queue_pending': 0, 'error': ''} -if res["ok"] and res["interrupted"]: - print(f"cortado; pendientes en cola: {res['queue_pending']}") +# {'ok': True, 'interrupted': True, 'cleared': False, 'queue_remaining': 3, 'error': ''} + +# Cortar el actual Y vaciar los pendientes de golpe: +res = comfyui_interrupt_queue(clear_pending=True) +# {'ok': True, 'interrupted': True, 'cleared': True, 'queue_remaining': 0, 'error': ''} +if res["ok"]: + print(f"cortado; quedan {res['queue_remaining']} en cola") ``` -O lanzable directo con: `./fn run comfyui_interrupt_queue`. +O lanzable directo: `./fn run comfyui_interrupt_queue` · `./fn run comfyui_interrupt_queue --clear`. ## Cuando usarla Para abortar una generacion que se esta tomando demasiado, que tira de mas VRAM de -la prevista, o tras encolar por error un workflow pesado. Tambien para inspeccionar -de un vistazo cuanto queda en cola (`queue_running` / `queue_pending`) sin parsear -el JSON de /queue a mano. Es el freno de mano del round-trip build -> submit -> wait. +la prevista, o tras encolar por error un workflow pesado. Con `clear_pending=True` +es el freno de mano completo: corta el actual y borra todo lo encolado en una sola +llamada (sin tener que encadenar `comfyui_queue_manage("clear")` despues). Tras la +operacion `queue_remaining` dice de un vistazo cuanto queda en cola. ## Gotchas -- `/interrupt` corta SOLO el prompt en ejecucion; los pendientes (`queue_pending`) - siguen y el siguiente arranca de inmediato. Para vaciar la cola entera hay que - llamar `POST /queue` con `{"clear": true}` (no lo hace esta funcion — solo corta - + lee). +- `/interrupt` corta SOLO el prompt en ejecucion; sin `clear_pending` los pendientes + (`queue_pending`) siguen y el siguiente arranca de inmediato. Pasa + `clear_pending=True` para vaciar tambien la cola (POST /queue {"clear": true}). - No es idempotente en el sentido de "sin efecto": si hay algo ejecutandose, lo - mata. Si la cola esta vacia, el interrupt es inocuo (interrupted=True igual). + mata. Si la cola esta vacia, tanto el interrupt como el clear son inocuos + (`interrupted=True`/`cleared=True` igual, `queue_remaining=0`). +- `queue_remaining` se lee al FINAL (GET /queue tras interrupt+clear): es + `queue_running + queue_pending`. Justo tras un interrupt sin clear puede ser >0 + porque el siguiente pendiente ya arranco. - En fallo de red NO lanza: devuelve `ok=False` con el mensaje en `error`. Comprueba - `ok` antes de fiarte de los conteos. + `ok` antes de fiarte de `queue_remaining`. - Tras el interrupt conviene liberar VRAM con `POST /free` si vas a encolar otro - trabajo pesado (esta funcion no lo hace). + trabajo pesado (esta funcion no lo hace; ver el round-trip build -> submit -> wait). +- Para operaciones de cola mas finas (borrar UN prompt por id, contar el historial) + usa `comfyui_queue_manage`; esta funcion se centra en el interrupt + clear masivo. + +## Capability growth log + +- v1.1.0 (2026-06-28) — anade flag `clear_pending` (vacia la cola en la misma + llamada) + param `timeout`; el output pasa a {ok, interrupted, cleared, + queue_remaining, error} y se anaden tests (mock HTTP local). diff --git a/python/functions/ml/comfyui_interrupt_queue.py b/python/functions/ml/comfyui_interrupt_queue.py index d56c81e9..7d773a0c 100644 --- a/python/functions/ml/comfyui_interrupt_queue.py +++ b/python/functions/ml/comfyui_interrupt_queue.py @@ -1,38 +1,53 @@ -"""Interrumpe la generacion en curso de ComfyUI y devuelve el estado de la cola. +"""Interrumpe la generacion en curso de ComfyUI y, opcionalmente, vacia la cola. -Funcion impura: hace red (HTTP POST /interrupt + GET /queue). Solo stdlib. +Funcion impura: hace red (HTTP POST /interrupt, POST /queue, GET /queue). Solo +stdlib (urllib, json). -POST /interrupt corta el prompt que ComfyUI esta ejecutando ahora mismo (no vacia -la cola: los prompts pendientes siguen). GET /queue devuelve queue_running (lo que -se ejecuta) y queue_pending (lo encolado). Esta funcion combina ambos en un dict -honesto que NO lanza excepcion en fallo de red: devuelve {ok: False, error}. +POST /interrupt corta el prompt que ComfyUI esta ejecutando ahora mismo: NO vacia +los pendientes, solo aborta el actual y el siguiente arranca de inmediato. Para +vaciar de golpe los pendientes hay que ademas hacer POST /queue con {"clear": true} +(lo que activa el flag clear_pending). GET /queue se consulta al final para reportar +cuantos trabajos quedan en cola tras la operacion (queue_remaining). + +NO lanza excepcion en fallo de red: devuelve un dict de estado {ok: False, error}. """ import json import urllib.error import urllib.request -def comfyui_interrupt_queue(server: str = "127.0.0.1:8188") -> dict: - """Interrumpe la generacion en curso y devuelve el estado de la cola. +def comfyui_interrupt_queue( + *, + clear_pending: bool = False, + server: str = "127.0.0.1:8188", + timeout: float = 10.0, +) -> dict: + """Corta la generacion en curso de ComfyUI y devuelve el estado de la cola. Args: + clear_pending: si True, ademas de cortar el prompt en ejecucion vacia la + cola de pendientes con POST /queue {"clear": true}. keyword-only. server: host:port del servidor ComfyUI sin esquema (default - "127.0.0.1:8188"). + "127.0.0.1:8188"). keyword-only. + timeout: timeout de cada peticion HTTP en segundos (default 10.0). + keyword-only. Returns: dict con: - - ok (bool): True si tanto el interrupt como la lectura de la cola - tuvieron exito. + - ok (bool): True si el interrupt, la lectura de la cola y (si se pidio) + el clear tuvieron exito. - interrupted (bool): True si el POST /interrupt respondio sin error. - - queue_running (int): numero de prompts ejecutandose ahora mismo. - - queue_pending (int): numero de prompts encolados pendientes. + - cleared (bool): True si clear_pending era True y el POST /queue + {"clear": true} respondio sin error; False si no se pidio o fallo. + - queue_remaining (int): trabajos que quedan en cola tras la operacion + (queue_running + queue_pending segun GET /queue al final). - error (str): mensaje de error si algo fallo; cadena vacia si todo OK. """ out = { "ok": False, "interrupted": False, - "queue_running": 0, - "queue_pending": 0, + "cleared": False, + "queue_remaining": 0, "error": "", } base = f"http://{server}" @@ -40,19 +55,37 @@ def comfyui_interrupt_queue(server: str = "127.0.0.1:8188") -> dict: # 1. POST /interrupt (cuerpo vacio): corta el prompt en ejecucion. try: req = urllib.request.Request(f"{base}/interrupt", data=b"", method="POST") - with urllib.request.urlopen(req, timeout=10.0): + with urllib.request.urlopen(req, timeout=timeout): out["interrupted"] = True except urllib.error.URLError as exc: reason = getattr(exc, "reason", exc) out["error"] = f"interrupt fallo: no se pudo conectar a {base}/interrupt: {reason}" return out - # 2. GET /queue: estado actual de la cola tras el interrupt. + # 2. Opcional: POST /queue {"clear": true} para vaciar los pendientes. + if clear_pending: + try: + payload = json.dumps({"clear": True}).encode() + req = urllib.request.Request( + f"{base}/queue", + data=payload, + method="POST", + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=timeout): + out["cleared"] = True + except urllib.error.URLError as exc: + reason = getattr(exc, "reason", exc) + out["error"] = f"clear fallo: no se pudo conectar a {base}/queue: {reason}" + return out + + # 3. GET /queue: cuantos trabajos quedan en cola tras la operacion. try: - with urllib.request.urlopen(f"{base}/queue", timeout=10.0) as resp: + with urllib.request.urlopen(f"{base}/queue", timeout=timeout) as resp: data = json.loads(resp.read()) - out["queue_running"] = len(data.get("queue_running", [])) - out["queue_pending"] = len(data.get("queue_pending", [])) + running = len(data.get("queue_running", [])) + pending = len(data.get("queue_pending", [])) + out["queue_remaining"] = running + pending out["ok"] = True except urllib.error.URLError as exc: reason = getattr(exc, "reason", exc) @@ -63,9 +96,12 @@ def comfyui_interrupt_queue(server: str = "127.0.0.1:8188") -> dict: if __name__ == "__main__": - res = comfyui_interrupt_queue() + import sys + + clear = "--clear" in sys.argv[1:] + res = comfyui_interrupt_queue(clear_pending=clear) print( f"ok={res['ok']} interrupted={res['interrupted']} " - f"running={res['queue_running']} pending={res['queue_pending']} " + f"cleared={res['cleared']} queue_remaining={res['queue_remaining']} " f"error={res['error']!r}" ) diff --git a/python/functions/ml/tests/test_comfyui_interrupt_queue.py b/python/functions/ml/tests/test_comfyui_interrupt_queue.py new file mode 100644 index 00000000..ba7b4998 --- /dev/null +++ b/python/functions/ml/tests/test_comfyui_interrupt_queue.py @@ -0,0 +1,149 @@ +"""Tests de comfyui_interrupt_queue contra un servidor ComfyUI simulado. + +La funcion es pura I/O (HTTP), asi que levantamos un http.server local que imita +los endpoints relevantes de ComfyUI (/interrupt, /queue) y verificamos: + +- Golden: interrupt sin clear corta el actual pero NO vacia los pendientes. +- Edge: clear_pending=True vacia la cola (queue_remaining=0). +- Edge: clear_pending=True con la cola ya vacia no rompe. +- Error: si el servidor no responde, devuelve {ok:False, error} sin lanzar. +""" + +import http.server +import json +import os +import socket +import sys +import threading + +sys.path.insert(0, os.path.dirname(__file__)) +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) + +from ml.comfyui_interrupt_queue import comfyui_interrupt_queue + + +class _FakeComfyHandler(http.server.BaseHTTPRequestHandler): + """Imita ComfyUI: estado de cola mutable compartido via la clase del server.""" + + def log_message(self, *args): # silenciar el log del servidor en los tests + pass + + def _send_json(self, obj, code=200): + body = json.dumps(obj).encode() + self.send_response(code) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_POST(self): + st = self.server.state + if self.path == "/interrupt": + st["running"] = [] # interrupt corta el prompt en ejecucion + self._send_json({}) + return + if self.path == "/queue": + length = int(self.headers.get("Content-Length", 0)) + raw = self.rfile.read(length) if length else b"{}" + body = json.loads(raw or b"{}") + if body.get("clear"): + st["pending"] = [] # clear vacia los pendientes + elif "delete" in body: + st["pending"] = [ + p for p in st["pending"] if p not in body["delete"] + ] + self._send_json({}) + return + self._send_json({"error": "not found"}, code=404) + + def do_GET(self): + st = self.server.state + if self.path == "/queue": + self._send_json( + { + "queue_running": st["running"], + "queue_pending": st["pending"], + } + ) + return + self._send_json({"error": "not found"}, code=404) + + +def _start_fake_server(running, pending): + """Levanta el servidor fake en un puerto efimero. Devuelve (server, addr, thread).""" + server = http.server.HTTPServer(("127.0.0.1", 0), _FakeComfyHandler) + server.state = {"running": list(running), "pending": list(pending)} + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + host, port = server.server_address + return server, f"{host}:{port}", thread + + +def _free_port(): + """Reserva y libera un puerto para garantizar que NADA escucha ahi (error path).""" + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(("127.0.0.1", 0)) + port = s.getsockname()[1] + s.close() + return port + + +def test_interrumpe_sin_vaciar(): + # Golden: 1 ejecutandose + 2 pendientes; interrupt corta el actual, pendientes siguen. + server, addr, _ = _start_fake_server(running=["r1"], pending=["p1", "p2"]) + try: + res = comfyui_interrupt_queue(server=addr) + finally: + server.shutdown() + assert res["ok"] is True + assert res["interrupted"] is True + assert res["cleared"] is False + # running cortado (0) + 2 pendientes que siguen = 2 restantes. + assert res["queue_remaining"] == 2 + assert res["error"] == "" + + +def test_clear_pending_vacia_cola(): + # Edge: clear_pending vacia los pendientes -> queue_remaining 0. + server, addr, _ = _start_fake_server(running=["r1"], pending=["p1", "p2", "p3"]) + try: + res = comfyui_interrupt_queue(clear_pending=True, server=addr) + finally: + server.shutdown() + assert res["ok"] is True + assert res["interrupted"] is True + assert res["cleared"] is True + assert res["queue_remaining"] == 0 + assert res["error"] == "" + + +def test_clear_pending_cola_vacia_no_rompe(): + # Edge: clear_pending con la cola ya vacia es inocuo, no rompe. + server, addr, _ = _start_fake_server(running=[], pending=[]) + try: + res = comfyui_interrupt_queue(clear_pending=True, server=addr) + finally: + server.shutdown() + assert res["ok"] is True + assert res["interrupted"] is True + assert res["cleared"] is True + assert res["queue_remaining"] == 0 + assert res["error"] == "" + + +def test_servidor_caido_no_lanza(): + # Error: nada escucha en el puerto -> {ok:False, error} sin excepcion cruda. + dead = f"127.0.0.1:{_free_port()}" + res = comfyui_interrupt_queue(server=dead, timeout=1.0) + assert res["ok"] is False + assert res["interrupted"] is False + assert res["error"] != "" + assert "interrupt fallo" in res["error"] + + +if __name__ == "__main__": + test_interrumpe_sin_vaciar() + test_clear_pending_vacia_cola() + test_clear_pending_cola_vacia_no_rompe() + test_servidor_caido_no_lanza() + print("OK: 4 tests passed")