feat(comfyui): comfyui_interrupt_queue v1.1.0 — clear_pending + cleared/queue_remaining + tests
Alinea la funcion al contrato de control de cola (punto 3 del roadmap ComfyUI):
- firma keyword-only: clear_pending (vacia pendientes con POST /queue {clear:true}) + timeout
- output {ok, interrupted, cleared, queue_remaining, error}; GET /queue al final
- no lanza en fallo de red: degrada a {ok:False, error}
- test con mock HTTP local (golden + clear + cola vacia + error path), 4/4 verde
- .md autosuficiente con gotchas + capability growth log
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -3,10 +3,10 @@ name: comfyui_interrupt_queue
|
||||
kind: function
|
||||
lang: py
|
||||
domain: ml
|
||||
version: "1.0.0"
|
||||
version: "1.1.0"
|
||||
purity: impure
|
||||
signature: "def comfyui_interrupt_queue(server: str = \"127.0.0.1:8188\") -> dict"
|
||||
description: "Corta la generacion en curso de ComfyUI (POST /interrupt) y devuelve el estado de la cola (GET /queue). Devuelve {ok, interrupted, queue_running, queue_pending, error}. NO lanza excepcion en fallo de red: degrada a {ok: False, error}. /interrupt corta solo el prompt en ejecucion, no vacia los pendientes. Impura: HTTP POST + GET, solo stdlib (urllib, json)."
|
||||
signature: "def comfyui_interrupt_queue(*, clear_pending: bool = False, server: str = \"127.0.0.1:8188\", timeout: float = 10.0) -> dict"
|
||||
description: "Corta la generacion en curso de ComfyUI (POST /interrupt) y, si clear_pending=True, vacia ademas la cola de pendientes (POST /queue {\"clear\":true}). Consulta GET /queue al final para reportar queue_remaining. Devuelve {ok, interrupted, cleared, queue_remaining, error}. NO lanza excepcion en fallo de red: degrada a {ok: False, error}. /interrupt corta solo el prompt en ejecucion, no vacia los pendientes salvo clear_pending. Impura: HTTP POST + GET, solo stdlib (urllib, json)."
|
||||
tags: [comfyui, ml, queue, interrupt, control, http]
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
@@ -15,12 +15,16 @@ returns_optional: false
|
||||
error_type: "error_go_core"
|
||||
imports: []
|
||||
params:
|
||||
- name: clear_pending
|
||||
desc: "keyword-only. Si True, ademas de cortar el prompt en ejecucion vacia la cola de pendientes con POST /queue {\"clear\":true}. Default False."
|
||||
- name: server
|
||||
desc: "host:port del servidor ComfyUI sin esquema (default '127.0.0.1:8188')."
|
||||
output: "dict con ok (bool, True si interrupt + lectura de cola OK), interrupted (bool, True si POST /interrupt respondio), queue_running (int, prompts ejecutandose), queue_pending (int, prompts encolados), error (str, vacio si todo OK)."
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
desc: "keyword-only. host:port del servidor ComfyUI sin esquema (default '127.0.0.1:8188')."
|
||||
- name: timeout
|
||||
desc: "keyword-only. Timeout de cada peticion HTTP en segundos (default 10.0)."
|
||||
output: "dict con ok (bool, True si interrupt + clear (si se pidio) + lectura de cola OK), interrupted (bool, True si POST /interrupt respondio), cleared (bool, True si clear_pending y POST /queue {clear:true} respondio; False si no se pidio o fallo), queue_remaining (int, queue_running + queue_pending tras la operacion), error (str, vacio si todo OK)."
|
||||
tested: true
|
||||
tests: ["test_interrumpe_sin_vaciar", "test_clear_pending_vacia_cola", "test_clear_pending_cola_vacia_no_rompe", "test_servidor_caido_no_lanza"]
|
||||
test_file_path: "python/functions/ml/tests/test_comfyui_interrupt_queue.py"
|
||||
file_path: "python/functions/ml/comfyui_interrupt_queue.py"
|
||||
---
|
||||
|
||||
@@ -31,30 +35,47 @@ import sys, os
|
||||
sys.path.insert(0, os.path.join(os.environ["HOME"], "fn_registry", "python", "functions"))
|
||||
from ml.comfyui_interrupt_queue import comfyui_interrupt_queue
|
||||
|
||||
# Solo cortar el prompt en ejecucion (los pendientes siguen):
|
||||
res = comfyui_interrupt_queue()
|
||||
# {'ok': True, 'interrupted': True, 'queue_running': 0, 'queue_pending': 0, 'error': ''}
|
||||
if res["ok"] and res["interrupted"]:
|
||||
print(f"cortado; pendientes en cola: {res['queue_pending']}")
|
||||
# {'ok': True, 'interrupted': True, 'cleared': False, 'queue_remaining': 3, 'error': ''}
|
||||
|
||||
# Cortar el actual Y vaciar los pendientes de golpe:
|
||||
res = comfyui_interrupt_queue(clear_pending=True)
|
||||
# {'ok': True, 'interrupted': True, 'cleared': True, 'queue_remaining': 0, 'error': ''}
|
||||
if res["ok"]:
|
||||
print(f"cortado; quedan {res['queue_remaining']} en cola")
|
||||
```
|
||||
|
||||
O lanzable directo con: `./fn run comfyui_interrupt_queue`.
|
||||
O lanzable directo: `./fn run comfyui_interrupt_queue` · `./fn run comfyui_interrupt_queue --clear`.
|
||||
|
||||
## Cuando usarla
|
||||
|
||||
Para abortar una generacion que se esta tomando demasiado, que tira de mas VRAM de
|
||||
la prevista, o tras encolar por error un workflow pesado. Tambien para inspeccionar
|
||||
de un vistazo cuanto queda en cola (`queue_running` / `queue_pending`) sin parsear
|
||||
el JSON de /queue a mano. Es el freno de mano del round-trip build -> submit -> wait.
|
||||
la prevista, o tras encolar por error un workflow pesado. Con `clear_pending=True`
|
||||
es el freno de mano completo: corta el actual y borra todo lo encolado en una sola
|
||||
llamada (sin tener que encadenar `comfyui_queue_manage("clear")` despues). Tras la
|
||||
operacion `queue_remaining` dice de un vistazo cuanto queda en cola.
|
||||
|
||||
## Gotchas
|
||||
|
||||
- `/interrupt` corta SOLO el prompt en ejecucion; los pendientes (`queue_pending`)
|
||||
siguen y el siguiente arranca de inmediato. Para vaciar la cola entera hay que
|
||||
llamar `POST /queue` con `{"clear": true}` (no lo hace esta funcion — solo corta
|
||||
+ lee).
|
||||
- `/interrupt` corta SOLO el prompt en ejecucion; sin `clear_pending` los pendientes
|
||||
(`queue_pending`) siguen y el siguiente arranca de inmediato. Pasa
|
||||
`clear_pending=True` para vaciar tambien la cola (POST /queue {"clear": true}).
|
||||
- No es idempotente en el sentido de "sin efecto": si hay algo ejecutandose, lo
|
||||
mata. Si la cola esta vacia, el interrupt es inocuo (interrupted=True igual).
|
||||
mata. Si la cola esta vacia, tanto el interrupt como el clear son inocuos
|
||||
(`interrupted=True`/`cleared=True` igual, `queue_remaining=0`).
|
||||
- `queue_remaining` se lee al FINAL (GET /queue tras interrupt+clear): es
|
||||
`queue_running + queue_pending`. Justo tras un interrupt sin clear puede ser >0
|
||||
porque el siguiente pendiente ya arranco.
|
||||
- En fallo de red NO lanza: devuelve `ok=False` con el mensaje en `error`. Comprueba
|
||||
`ok` antes de fiarte de los conteos.
|
||||
`ok` antes de fiarte de `queue_remaining`.
|
||||
- Tras el interrupt conviene liberar VRAM con `POST /free` si vas a encolar otro
|
||||
trabajo pesado (esta funcion no lo hace).
|
||||
trabajo pesado (esta funcion no lo hace; ver el round-trip build -> submit -> wait).
|
||||
- Para operaciones de cola mas finas (borrar UN prompt por id, contar el historial)
|
||||
usa `comfyui_queue_manage`; esta funcion se centra en el interrupt + clear masivo.
|
||||
|
||||
## Capability growth log
|
||||
|
||||
- v1.1.0 (2026-06-28) — anade flag `clear_pending` (vacia la cola en la misma
|
||||
llamada) + param `timeout`; el output pasa a {ok, interrupted, cleared,
|
||||
queue_remaining, error} y se anaden tests (mock HTTP local).
|
||||
|
||||
@@ -1,38 +1,53 @@
|
||||
"""Interrumpe la generacion en curso de ComfyUI y devuelve el estado de la cola.
|
||||
"""Interrumpe la generacion en curso de ComfyUI y, opcionalmente, vacia la cola.
|
||||
|
||||
Funcion impura: hace red (HTTP POST /interrupt + GET /queue). Solo stdlib.
|
||||
Funcion impura: hace red (HTTP POST /interrupt, POST /queue, GET /queue). Solo
|
||||
stdlib (urllib, json).
|
||||
|
||||
POST /interrupt corta el prompt que ComfyUI esta ejecutando ahora mismo (no vacia
|
||||
la cola: los prompts pendientes siguen). GET /queue devuelve queue_running (lo que
|
||||
se ejecuta) y queue_pending (lo encolado). Esta funcion combina ambos en un dict
|
||||
honesto que NO lanza excepcion en fallo de red: devuelve {ok: False, error}.
|
||||
POST /interrupt corta el prompt que ComfyUI esta ejecutando ahora mismo: NO vacia
|
||||
los pendientes, solo aborta el actual y el siguiente arranca de inmediato. Para
|
||||
vaciar de golpe los pendientes hay que ademas hacer POST /queue con {"clear": true}
|
||||
(lo que activa el flag clear_pending). GET /queue se consulta al final para reportar
|
||||
cuantos trabajos quedan en cola tras la operacion (queue_remaining).
|
||||
|
||||
NO lanza excepcion en fallo de red: devuelve un dict de estado {ok: False, error}.
|
||||
"""
|
||||
import json
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
|
||||
|
||||
def comfyui_interrupt_queue(server: str = "127.0.0.1:8188") -> dict:
|
||||
"""Interrumpe la generacion en curso y devuelve el estado de la cola.
|
||||
def comfyui_interrupt_queue(
|
||||
*,
|
||||
clear_pending: bool = False,
|
||||
server: str = "127.0.0.1:8188",
|
||||
timeout: float = 10.0,
|
||||
) -> dict:
|
||||
"""Corta la generacion en curso de ComfyUI y devuelve el estado de la cola.
|
||||
|
||||
Args:
|
||||
clear_pending: si True, ademas de cortar el prompt en ejecucion vacia la
|
||||
cola de pendientes con POST /queue {"clear": true}. keyword-only.
|
||||
server: host:port del servidor ComfyUI sin esquema (default
|
||||
"127.0.0.1:8188").
|
||||
"127.0.0.1:8188"). keyword-only.
|
||||
timeout: timeout de cada peticion HTTP en segundos (default 10.0).
|
||||
keyword-only.
|
||||
|
||||
Returns:
|
||||
dict con:
|
||||
- ok (bool): True si tanto el interrupt como la lectura de la cola
|
||||
tuvieron exito.
|
||||
- ok (bool): True si el interrupt, la lectura de la cola y (si se pidio)
|
||||
el clear tuvieron exito.
|
||||
- interrupted (bool): True si el POST /interrupt respondio sin error.
|
||||
- queue_running (int): numero de prompts ejecutandose ahora mismo.
|
||||
- queue_pending (int): numero de prompts encolados pendientes.
|
||||
- cleared (bool): True si clear_pending era True y el POST /queue
|
||||
{"clear": true} respondio sin error; False si no se pidio o fallo.
|
||||
- queue_remaining (int): trabajos que quedan en cola tras la operacion
|
||||
(queue_running + queue_pending segun GET /queue al final).
|
||||
- error (str): mensaje de error si algo fallo; cadena vacia si todo OK.
|
||||
"""
|
||||
out = {
|
||||
"ok": False,
|
||||
"interrupted": False,
|
||||
"queue_running": 0,
|
||||
"queue_pending": 0,
|
||||
"cleared": False,
|
||||
"queue_remaining": 0,
|
||||
"error": "",
|
||||
}
|
||||
base = f"http://{server}"
|
||||
@@ -40,19 +55,37 @@ def comfyui_interrupt_queue(server: str = "127.0.0.1:8188") -> dict:
|
||||
# 1. POST /interrupt (cuerpo vacio): corta el prompt en ejecucion.
|
||||
try:
|
||||
req = urllib.request.Request(f"{base}/interrupt", data=b"", method="POST")
|
||||
with urllib.request.urlopen(req, timeout=10.0):
|
||||
with urllib.request.urlopen(req, timeout=timeout):
|
||||
out["interrupted"] = True
|
||||
except urllib.error.URLError as exc:
|
||||
reason = getattr(exc, "reason", exc)
|
||||
out["error"] = f"interrupt fallo: no se pudo conectar a {base}/interrupt: {reason}"
|
||||
return out
|
||||
|
||||
# 2. GET /queue: estado actual de la cola tras el interrupt.
|
||||
# 2. Opcional: POST /queue {"clear": true} para vaciar los pendientes.
|
||||
if clear_pending:
|
||||
try:
|
||||
payload = json.dumps({"clear": True}).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{base}/queue",
|
||||
data=payload,
|
||||
method="POST",
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=timeout):
|
||||
out["cleared"] = True
|
||||
except urllib.error.URLError as exc:
|
||||
reason = getattr(exc, "reason", exc)
|
||||
out["error"] = f"clear fallo: no se pudo conectar a {base}/queue: {reason}"
|
||||
return out
|
||||
|
||||
# 3. GET /queue: cuantos trabajos quedan en cola tras la operacion.
|
||||
try:
|
||||
with urllib.request.urlopen(f"{base}/queue", timeout=10.0) as resp:
|
||||
with urllib.request.urlopen(f"{base}/queue", timeout=timeout) as resp:
|
||||
data = json.loads(resp.read())
|
||||
out["queue_running"] = len(data.get("queue_running", []))
|
||||
out["queue_pending"] = len(data.get("queue_pending", []))
|
||||
running = len(data.get("queue_running", []))
|
||||
pending = len(data.get("queue_pending", []))
|
||||
out["queue_remaining"] = running + pending
|
||||
out["ok"] = True
|
||||
except urllib.error.URLError as exc:
|
||||
reason = getattr(exc, "reason", exc)
|
||||
@@ -63,9 +96,12 @@ def comfyui_interrupt_queue(server: str = "127.0.0.1:8188") -> dict:
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
res = comfyui_interrupt_queue()
|
||||
import sys
|
||||
|
||||
clear = "--clear" in sys.argv[1:]
|
||||
res = comfyui_interrupt_queue(clear_pending=clear)
|
||||
print(
|
||||
f"ok={res['ok']} interrupted={res['interrupted']} "
|
||||
f"running={res['queue_running']} pending={res['queue_pending']} "
|
||||
f"cleared={res['cleared']} queue_remaining={res['queue_remaining']} "
|
||||
f"error={res['error']!r}"
|
||||
)
|
||||
|
||||
@@ -0,0 +1,149 @@
|
||||
"""Tests de comfyui_interrupt_queue contra un servidor ComfyUI simulado.
|
||||
|
||||
La funcion es pura I/O (HTTP), asi que levantamos un http.server local que imita
|
||||
los endpoints relevantes de ComfyUI (/interrupt, /queue) y verificamos:
|
||||
|
||||
- Golden: interrupt sin clear corta el actual pero NO vacia los pendientes.
|
||||
- Edge: clear_pending=True vacia la cola (queue_remaining=0).
|
||||
- Edge: clear_pending=True con la cola ya vacia no rompe.
|
||||
- Error: si el servidor no responde, devuelve {ok:False, error} sin lanzar.
|
||||
"""
|
||||
|
||||
import http.server
|
||||
import json
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
import threading
|
||||
|
||||
sys.path.insert(0, os.path.dirname(__file__))
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
|
||||
|
||||
from ml.comfyui_interrupt_queue import comfyui_interrupt_queue
|
||||
|
||||
|
||||
class _FakeComfyHandler(http.server.BaseHTTPRequestHandler):
|
||||
"""Imita ComfyUI: estado de cola mutable compartido via la clase del server."""
|
||||
|
||||
def log_message(self, *args): # silenciar el log del servidor en los tests
|
||||
pass
|
||||
|
||||
def _send_json(self, obj, code=200):
|
||||
body = json.dumps(obj).encode()
|
||||
self.send_response(code)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
|
||||
def do_POST(self):
|
||||
st = self.server.state
|
||||
if self.path == "/interrupt":
|
||||
st["running"] = [] # interrupt corta el prompt en ejecucion
|
||||
self._send_json({})
|
||||
return
|
||||
if self.path == "/queue":
|
||||
length = int(self.headers.get("Content-Length", 0))
|
||||
raw = self.rfile.read(length) if length else b"{}"
|
||||
body = json.loads(raw or b"{}")
|
||||
if body.get("clear"):
|
||||
st["pending"] = [] # clear vacia los pendientes
|
||||
elif "delete" in body:
|
||||
st["pending"] = [
|
||||
p for p in st["pending"] if p not in body["delete"]
|
||||
]
|
||||
self._send_json({})
|
||||
return
|
||||
self._send_json({"error": "not found"}, code=404)
|
||||
|
||||
def do_GET(self):
|
||||
st = self.server.state
|
||||
if self.path == "/queue":
|
||||
self._send_json(
|
||||
{
|
||||
"queue_running": st["running"],
|
||||
"queue_pending": st["pending"],
|
||||
}
|
||||
)
|
||||
return
|
||||
self._send_json({"error": "not found"}, code=404)
|
||||
|
||||
|
||||
def _start_fake_server(running, pending):
|
||||
"""Levanta el servidor fake en un puerto efimero. Devuelve (server, addr, thread)."""
|
||||
server = http.server.HTTPServer(("127.0.0.1", 0), _FakeComfyHandler)
|
||||
server.state = {"running": list(running), "pending": list(pending)}
|
||||
thread = threading.Thread(target=server.serve_forever, daemon=True)
|
||||
thread.start()
|
||||
host, port = server.server_address
|
||||
return server, f"{host}:{port}", thread
|
||||
|
||||
|
||||
def _free_port():
|
||||
"""Reserva y libera un puerto para garantizar que NADA escucha ahi (error path)."""
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.bind(("127.0.0.1", 0))
|
||||
port = s.getsockname()[1]
|
||||
s.close()
|
||||
return port
|
||||
|
||||
|
||||
def test_interrumpe_sin_vaciar():
|
||||
# Golden: 1 ejecutandose + 2 pendientes; interrupt corta el actual, pendientes siguen.
|
||||
server, addr, _ = _start_fake_server(running=["r1"], pending=["p1", "p2"])
|
||||
try:
|
||||
res = comfyui_interrupt_queue(server=addr)
|
||||
finally:
|
||||
server.shutdown()
|
||||
assert res["ok"] is True
|
||||
assert res["interrupted"] is True
|
||||
assert res["cleared"] is False
|
||||
# running cortado (0) + 2 pendientes que siguen = 2 restantes.
|
||||
assert res["queue_remaining"] == 2
|
||||
assert res["error"] == ""
|
||||
|
||||
|
||||
def test_clear_pending_vacia_cola():
|
||||
# Edge: clear_pending vacia los pendientes -> queue_remaining 0.
|
||||
server, addr, _ = _start_fake_server(running=["r1"], pending=["p1", "p2", "p3"])
|
||||
try:
|
||||
res = comfyui_interrupt_queue(clear_pending=True, server=addr)
|
||||
finally:
|
||||
server.shutdown()
|
||||
assert res["ok"] is True
|
||||
assert res["interrupted"] is True
|
||||
assert res["cleared"] is True
|
||||
assert res["queue_remaining"] == 0
|
||||
assert res["error"] == ""
|
||||
|
||||
|
||||
def test_clear_pending_cola_vacia_no_rompe():
|
||||
# Edge: clear_pending con la cola ya vacia es inocuo, no rompe.
|
||||
server, addr, _ = _start_fake_server(running=[], pending=[])
|
||||
try:
|
||||
res = comfyui_interrupt_queue(clear_pending=True, server=addr)
|
||||
finally:
|
||||
server.shutdown()
|
||||
assert res["ok"] is True
|
||||
assert res["interrupted"] is True
|
||||
assert res["cleared"] is True
|
||||
assert res["queue_remaining"] == 0
|
||||
assert res["error"] == ""
|
||||
|
||||
|
||||
def test_servidor_caido_no_lanza():
|
||||
# Error: nada escucha en el puerto -> {ok:False, error} sin excepcion cruda.
|
||||
dead = f"127.0.0.1:{_free_port()}"
|
||||
res = comfyui_interrupt_queue(server=dead, timeout=1.0)
|
||||
assert res["ok"] is False
|
||||
assert res["interrupted"] is False
|
||||
assert res["error"] != ""
|
||||
assert "interrupt fallo" in res["error"]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_interrumpe_sin_vaciar()
|
||||
test_clear_pending_vacia_cola()
|
||||
test_clear_pending_cola_vacia_no_rompe()
|
||||
test_servidor_caido_no_lanza()
|
||||
print("OK: 4 tests passed")
|
||||
Reference in New Issue
Block a user