merge(comfyui): comfyui_interrupt_queue — control de cola (interrupt + clear_pending)

This commit is contained in:
2026-06-28 04:54:46 +02:00
3 changed files with 250 additions and 44 deletions
+43 -22
View File
@@ -3,10 +3,10 @@ name: comfyui_interrupt_queue
kind: function
lang: py
domain: ml
version: "1.0.0"
version: "1.1.0"
purity: impure
signature: "def comfyui_interrupt_queue(server: str = \"127.0.0.1:8188\") -> dict"
description: "Corta la generacion en curso de ComfyUI (POST /interrupt) y devuelve el estado de la cola (GET /queue). Devuelve {ok, interrupted, queue_running, queue_pending, error}. NO lanza excepcion en fallo de red: degrada a {ok: False, error}. /interrupt corta solo el prompt en ejecucion, no vacia los pendientes. Impura: HTTP POST + GET, solo stdlib (urllib, json)."
signature: "def comfyui_interrupt_queue(*, clear_pending: bool = False, server: str = \"127.0.0.1:8188\", timeout: float = 10.0) -> dict"
description: "Corta la generacion en curso de ComfyUI (POST /interrupt) y, si clear_pending=True, vacia ademas la cola de pendientes (POST /queue {\"clear\":true}). Consulta GET /queue al final para reportar queue_remaining. Devuelve {ok, interrupted, cleared, queue_remaining, error}. NO lanza excepcion en fallo de red: degrada a {ok: False, error}. /interrupt corta solo el prompt en ejecucion, no vacia los pendientes salvo clear_pending. Impura: HTTP POST + GET, solo stdlib (urllib, json)."
tags: [comfyui, ml, queue, interrupt, control, http]
uses_functions: []
uses_types: []
@@ -15,12 +15,16 @@ returns_optional: false
error_type: "error_go_core"
imports: []
params:
- name: clear_pending
desc: "keyword-only. Si True, ademas de cortar el prompt en ejecucion vacia la cola de pendientes con POST /queue {\"clear\":true}. Default False."
- name: server
desc: "host:port del servidor ComfyUI sin esquema (default '127.0.0.1:8188')."
output: "dict con ok (bool, True si interrupt + lectura de cola OK), interrupted (bool, True si POST /interrupt respondio), queue_running (int, prompts ejecutandose), queue_pending (int, prompts encolados), error (str, vacio si todo OK)."
tested: false
tests: []
test_file_path: ""
desc: "keyword-only. host:port del servidor ComfyUI sin esquema (default '127.0.0.1:8188')."
- name: timeout
desc: "keyword-only. Timeout de cada peticion HTTP en segundos (default 10.0)."
output: "dict con ok (bool, True si interrupt + clear (si se pidio) + lectura de cola OK), interrupted (bool, True si POST /interrupt respondio), cleared (bool, True si clear_pending y POST /queue {clear:true} respondio; False si no se pidio o fallo), queue_remaining (int, queue_running + queue_pending tras la operacion), error (str, vacio si todo OK)."
tested: true
tests: ["test_interrumpe_sin_vaciar", "test_clear_pending_vacia_cola", "test_clear_pending_cola_vacia_no_rompe", "test_servidor_caido_no_lanza"]
test_file_path: "python/functions/ml/tests/test_comfyui_interrupt_queue.py"
file_path: "python/functions/ml/comfyui_interrupt_queue.py"
---
@@ -31,30 +35,47 @@ import sys, os
sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions"))
from ml.comfyui_interrupt_queue import comfyui_interrupt_queue
# Solo cortar el prompt en ejecucion (los pendientes siguen):
res = comfyui_interrupt_queue()
# {'ok': True, 'interrupted': True, 'queue_running': 0, 'queue_pending': 0, 'error': ''}
if res["ok"] and res["interrupted"]:
print(f"cortado; pendientes en cola: {res['queue_pending']}")
# {'ok': True, 'interrupted': True, 'cleared': False, 'queue_remaining': 3, 'error': ''}
# Cortar el actual Y vaciar los pendientes de golpe:
res = comfyui_interrupt_queue(clear_pending=True)
# {'ok': True, 'interrupted': True, 'cleared': True, 'queue_remaining': 0, 'error': ''}
if res["ok"]:
print(f"cortado; quedan {res['queue_remaining']} en cola")
```
O lanzable directo con: `./fn run comfyui_interrupt_queue`.
O lanzable directo: `./fn run comfyui_interrupt_queue` · `./fn run comfyui_interrupt_queue --clear`.
## Cuando usarla
Para abortar una generacion que se esta tomando demasiado, que tira de mas VRAM de
la prevista, o tras encolar por error un workflow pesado. Tambien para inspeccionar
de un vistazo cuanto queda en cola (`queue_running` / `queue_pending`) sin parsear
el JSON de /queue a mano. Es el freno de mano del round-trip build -> submit -> wait.
la prevista, o tras encolar por error un workflow pesado. Con `clear_pending=True`
es el freno de mano completo: corta el actual y borra todo lo encolado en una sola
llamada (sin tener que encadenar `comfyui_queue_manage("clear")` despues). Tras la
operacion `queue_remaining` dice de un vistazo cuanto queda en cola.
## Gotchas
- `/interrupt` corta SOLO el prompt en ejecucion; los pendientes (`queue_pending`)
siguen y el siguiente arranca de inmediato. Para vaciar la cola entera hay que
llamar `POST /queue` con `{"clear": true}` (no lo hace esta funcion — solo corta
+ lee).
- `/interrupt` corta SOLO el prompt en ejecucion; sin `clear_pending` los pendientes
(`queue_pending`) siguen y el siguiente arranca de inmediato. Pasa
`clear_pending=True` para vaciar tambien la cola (POST /queue {"clear": true}).
- No es idempotente en el sentido de "sin efecto": si hay algo ejecutandose, lo
mata. Si la cola esta vacia, el interrupt es inocuo (interrupted=True igual).
mata. Si la cola esta vacia, tanto el interrupt como el clear son inocuos
(`interrupted=True`/`cleared=True` igual, `queue_remaining=0`).
- `queue_remaining` se lee al FINAL (GET /queue tras interrupt+clear): es
`queue_running + queue_pending`. Justo tras un interrupt sin clear puede ser >0
porque el siguiente pendiente ya arranco.
- En fallo de red NO lanza: devuelve `ok=False` con el mensaje en `error`. Comprueba
`ok` antes de fiarte de los conteos.
`ok` antes de fiarte de `queue_remaining`.
- Tras el interrupt conviene liberar VRAM con `POST /free` si vas a encolar otro
trabajo pesado (esta funcion no lo hace).
trabajo pesado (esta funcion no lo hace; ver el round-trip build -> submit -> wait).
- Para operaciones de cola mas finas (borrar UN prompt por id, contar el historial)
usa `comfyui_queue_manage`; esta funcion se centra en el interrupt + clear masivo.
## Capability growth log
- v1.1.0 (2026-06-28) — anade flag `clear_pending` (vacia la cola en la misma
llamada) + param `timeout`; el output pasa a {ok, interrupted, cleared,
queue_remaining, error} y se anaden tests (mock HTTP local).
+58 -22
View File
@@ -1,38 +1,53 @@
"""Interrumpe la generacion en curso de ComfyUI y devuelve el estado de la cola.
"""Interrumpe la generacion en curso de ComfyUI y, opcionalmente, vacia la cola.
Funcion impura: hace red (HTTP POST /interrupt + GET /queue). Solo stdlib.
Funcion impura: hace red (HTTP POST /interrupt, POST /queue, GET /queue). Solo
stdlib (urllib, json).
POST /interrupt corta el prompt que ComfyUI esta ejecutando ahora mismo (no vacia
la cola: los prompts pendientes siguen). GET /queue devuelve queue_running (lo que
se ejecuta) y queue_pending (lo encolado). Esta funcion combina ambos en un dict
honesto que NO lanza excepcion en fallo de red: devuelve {ok: False, error}.
POST /interrupt corta el prompt que ComfyUI esta ejecutando ahora mismo: NO vacia
los pendientes, solo aborta el actual y el siguiente arranca de inmediato. Para
vaciar de golpe los pendientes hay que ademas hacer POST /queue con {"clear": true}
(lo que activa el flag clear_pending). GET /queue se consulta al final para reportar
cuantos trabajos quedan en cola tras la operacion (queue_remaining).
NO lanza excepcion en fallo de red: devuelve un dict de estado {ok: False, error}.
"""
import json
import urllib.error
import urllib.request
def comfyui_interrupt_queue(server: str = "127.0.0.1:8188") -> dict:
"""Interrumpe la generacion en curso y devuelve el estado de la cola.
def comfyui_interrupt_queue(
*,
clear_pending: bool = False,
server: str = "127.0.0.1:8188",
timeout: float = 10.0,
) -> dict:
"""Corta la generacion en curso de ComfyUI y devuelve el estado de la cola.
Args:
clear_pending: si True, ademas de cortar el prompt en ejecucion vacia la
cola de pendientes con POST /queue {"clear": true}. keyword-only.
server: host:port del servidor ComfyUI sin esquema (default
"127.0.0.1:8188").
"127.0.0.1:8188"). keyword-only.
timeout: timeout de cada peticion HTTP en segundos (default 10.0).
keyword-only.
Returns:
dict con:
- ok (bool): True si tanto el interrupt como la lectura de la cola
tuvieron exito.
- ok (bool): True si el interrupt, la lectura de la cola y (si se pidio)
el clear tuvieron exito.
- interrupted (bool): True si el POST /interrupt respondio sin error.
- queue_running (int): numero de prompts ejecutandose ahora mismo.
- queue_pending (int): numero de prompts encolados pendientes.
- cleared (bool): True si clear_pending era True y el POST /queue
{"clear": true} respondio sin error; False si no se pidio o fallo.
- queue_remaining (int): trabajos que quedan en cola tras la operacion
(queue_running + queue_pending segun GET /queue al final).
- error (str): mensaje de error si algo fallo; cadena vacia si todo OK.
"""
out = {
"ok": False,
"interrupted": False,
"queue_running": 0,
"queue_pending": 0,
"cleared": False,
"queue_remaining": 0,
"error": "",
}
base = f"http://{server}"
@@ -40,19 +55,37 @@ def comfyui_interrupt_queue(server: str = "127.0.0.1:8188") -> dict:
# 1. POST /interrupt (cuerpo vacio): corta el prompt en ejecucion.
try:
req = urllib.request.Request(f"{base}/interrupt", data=b"", method="POST")
with urllib.request.urlopen(req, timeout=10.0):
with urllib.request.urlopen(req, timeout=timeout):
out["interrupted"] = True
except urllib.error.URLError as exc:
reason = getattr(exc, "reason", exc)
out["error"] = f"interrupt fallo: no se pudo conectar a {base}/interrupt: {reason}"
return out
# 2. GET /queue: estado actual de la cola tras el interrupt.
# 2. Opcional: POST /queue {"clear": true} para vaciar los pendientes.
if clear_pending:
try:
payload = json.dumps({"clear": True}).encode()
req = urllib.request.Request(
f"{base}/queue",
data=payload,
method="POST",
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, timeout=timeout):
out["cleared"] = True
except urllib.error.URLError as exc:
reason = getattr(exc, "reason", exc)
out["error"] = f"clear fallo: no se pudo conectar a {base}/queue: {reason}"
return out
# 3. GET /queue: cuantos trabajos quedan en cola tras la operacion.
try:
with urllib.request.urlopen(f"{base}/queue", timeout=10.0) as resp:
with urllib.request.urlopen(f"{base}/queue", timeout=timeout) as resp:
data = json.loads(resp.read())
out["queue_running"] = len(data.get("queue_running", []))
out["queue_pending"] = len(data.get("queue_pending", []))
running = len(data.get("queue_running", []))
pending = len(data.get("queue_pending", []))
out["queue_remaining"] = running + pending
out["ok"] = True
except urllib.error.URLError as exc:
reason = getattr(exc, "reason", exc)
@@ -63,9 +96,12 @@ def comfyui_interrupt_queue(server: str = "127.0.0.1:8188") -> dict:
if __name__ == "__main__":
res = comfyui_interrupt_queue()
import sys
clear = "--clear" in sys.argv[1:]
res = comfyui_interrupt_queue(clear_pending=clear)
print(
f"ok={res['ok']} interrupted={res['interrupted']} "
f"running={res['queue_running']} pending={res['queue_pending']} "
f"cleared={res['cleared']} queue_remaining={res['queue_remaining']} "
f"error={res['error']!r}"
)
@@ -0,0 +1,149 @@
"""Tests de comfyui_interrupt_queue contra un servidor ComfyUI simulado.
La funcion es pura I/O (HTTP), asi que levantamos un http.server local que imita
los endpoints relevantes de ComfyUI (/interrupt, /queue) y verificamos:
- Golden: interrupt sin clear corta el actual pero NO vacia los pendientes.
- Edge: clear_pending=True vacia la cola (queue_remaining=0).
- Edge: clear_pending=True con la cola ya vacia no rompe.
- Error: si el servidor no responde, devuelve {ok:False, error} sin lanzar.
"""
import http.server
import json
import os
import socket
import sys
import threading
sys.path.insert(0, os.path.dirname(__file__))
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from ml.comfyui_interrupt_queue import comfyui_interrupt_queue
class _FakeComfyHandler(http.server.BaseHTTPRequestHandler):
"""Imita ComfyUI: estado de cola mutable compartido via la clase del server."""
def log_message(self, *args): # silenciar el log del servidor en los tests
pass
def _send_json(self, obj, code=200):
body = json.dumps(obj).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_POST(self):
st = self.server.state
if self.path == "/interrupt":
st["running"] = [] # interrupt corta el prompt en ejecucion
self._send_json({})
return
if self.path == "/queue":
length = int(self.headers.get("Content-Length", 0))
raw = self.rfile.read(length) if length else b"{}"
body = json.loads(raw or b"{}")
if body.get("clear"):
st["pending"] = [] # clear vacia los pendientes
elif "delete" in body:
st["pending"] = [
p for p in st["pending"] if p not in body["delete"]
]
self._send_json({})
return
self._send_json({"error": "not found"}, code=404)
def do_GET(self):
st = self.server.state
if self.path == "/queue":
self._send_json(
{
"queue_running": st["running"],
"queue_pending": st["pending"],
}
)
return
self._send_json({"error": "not found"}, code=404)
def _start_fake_server(running, pending):
"""Levanta el servidor fake en un puerto efimero. Devuelve (server, addr, thread)."""
server = http.server.HTTPServer(("127.0.0.1", 0), _FakeComfyHandler)
server.state = {"running": list(running), "pending": list(pending)}
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
host, port = server.server_address
return server, f"{host}:{port}", thread
def _free_port():
"""Reserva y libera un puerto para garantizar que NADA escucha ahi (error path)."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("127.0.0.1", 0))
port = s.getsockname()[1]
s.close()
return port
def test_interrumpe_sin_vaciar():
# Golden: 1 ejecutandose + 2 pendientes; interrupt corta el actual, pendientes siguen.
server, addr, _ = _start_fake_server(running=["r1"], pending=["p1", "p2"])
try:
res = comfyui_interrupt_queue(server=addr)
finally:
server.shutdown()
assert res["ok"] is True
assert res["interrupted"] is True
assert res["cleared"] is False
# running cortado (0) + 2 pendientes que siguen = 2 restantes.
assert res["queue_remaining"] == 2
assert res["error"] == ""
def test_clear_pending_vacia_cola():
# Edge: clear_pending vacia los pendientes -> queue_remaining 0.
server, addr, _ = _start_fake_server(running=["r1"], pending=["p1", "p2", "p3"])
try:
res = comfyui_interrupt_queue(clear_pending=True, server=addr)
finally:
server.shutdown()
assert res["ok"] is True
assert res["interrupted"] is True
assert res["cleared"] is True
assert res["queue_remaining"] == 0
assert res["error"] == ""
def test_clear_pending_cola_vacia_no_rompe():
# Edge: clear_pending con la cola ya vacia es inocuo, no rompe.
server, addr, _ = _start_fake_server(running=[], pending=[])
try:
res = comfyui_interrupt_queue(clear_pending=True, server=addr)
finally:
server.shutdown()
assert res["ok"] is True
assert res["interrupted"] is True
assert res["cleared"] is True
assert res["queue_remaining"] == 0
assert res["error"] == ""
def test_servidor_caido_no_lanza():
# Error: nada escucha en el puerto -> {ok:False, error} sin excepcion cruda.
dead = f"127.0.0.1:{_free_port()}"
res = comfyui_interrupt_queue(server=dead, timeout=1.0)
assert res["ok"] is False
assert res["interrupted"] is False
assert res["error"] != ""
assert "interrupt fallo" in res["error"]
if __name__ == "__main__":
test_interrumpe_sin_vaciar()
test_clear_pending_vacia_cola()
test_clear_pending_cola_vacia_no_rompe()
test_servidor_caido_no_lanza()
print("OK: 4 tests passed")