Files
egutierrez fd5787c55f chore: auto-commit (43 archivos)
- .mcp.json
- bash/functions/infra/write_mcp_jupyter_config.md
- bash/functions/infra/write_mcp_jupyter_config.sh
- cpp/CMakeLists.txt
- cpp/apps/chart_demo
- cpp/apps/shaders_lab
- cpp/functions/gfx/gl_framebuffer.cpp
- cpp/functions/gfx/gl_framebuffer.h
- cpp/functions/gfx/gl_framebuffer.md
- cpp/functions/gfx/mesh_gpu.md
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-30 17:28:47 +02:00

316 lines
12 KiB
Python

"""Ejecuta todas las celdas de codigo de un notebook en orden (Run All).
Equivalente al boton "Run All" del UI de Jupyter Lab, pero invocable desde
CLI/MCP/agente. Opcionalmente reinicia el kernel antes de empezar para
garantizar un estado limpio.
Depende de jupyter_run_cells cuando esta disponible; si no, ejecuta la logica
de batch internamente reutilizando los helpers de jupyter_exec.
"""
from __future__ import annotations
import json
import time
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from urllib.request import Request, urlopen

from jupyter_kernel_client import KernelClient
# ---------------------------------------------------------------------------
# Helpers REST (minimos, alineados con jupyter_exec)
# ---------------------------------------------------------------------------
def _auth_headers(token: str, content_type: bool = False) -> dict[str, str]:
headers = {"Accept": "application/json"}
if content_type:
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"token {token}"
return headers
def _api_get(url: str, token: str = "") -> dict | list | None:
    """Issue a best-effort GET and return the decoded JSON body.

    Args:
        url: Absolute URL to request.
        token: Authentication token forwarded to ``_auth_headers``.

    Returns:
        The parsed JSON payload, or None when the request fails with
        URLError/OSError or the body is not valid JSON.
    """
    payload = None
    try:
        request = Request(url, headers=_auth_headers(token))
        with urlopen(request, timeout=10) as response:
            body = response.read()
        payload = json.loads(body)
    except (URLError, OSError, json.JSONDecodeError):
        # Best-effort helper: callers treat None as "no data available".
        payload = None
    return payload
def _api_post(url: str, token: str = "", body: dict | None = None) -> dict | None:
    """Issue a best-effort JSON POST and return the decoded reply.

    Args:
        url: Absolute URL to post to.
        token: Authentication token forwarded to ``_auth_headers``.
        body: JSON-serialisable payload; a missing or empty value is sent as ``{}``.

    Returns:
        The parsed JSON reply, ``{}`` when the reply body is empty, or None
        when the request fails with URLError/OSError or the reply is not
        valid JSON.
    """
    encoded = json.dumps(body if body else {}).encode("utf-8")
    request = Request(
        url,
        data=encoded,
        headers=_auth_headers(token, content_type=True),
        method="POST",
    )
    try:
        with urlopen(request, timeout=30) as response:
            reply = response.read()
        if not reply:
            return {}
        return json.loads(reply)
    except (URLError, OSError, json.JSONDecodeError):
        return None
def _resolve_kernel_id(server_url: str, token: str, notebook_path: str) -> str | None:
    """Look up the kernel id of the session attached to a notebook.

    Queries ``/api/sessions`` and returns the kernel id of the first session
    whose path equals ``notebook_path`` and that actually has a kernel.

    Args:
        server_url: Base URL of the Jupyter server.
        token: Authentication token (empty when the server needs none).
        notebook_path: Notebook path exactly as the server reports it in the
            session model.

    Returns:
        The kernel id, or None when the request failed, the payload is not a
        list of sessions, or no matching session has a kernel.
    """
    sessions = _api_get(f"{server_url}/api/sessions", token)
    # A failed request yields None and an error payload may be a dict; neither
    # can be iterated as a list of session models.
    if not isinstance(sessions, list):
        return None
    for session in sessions:
        if not isinstance(session, dict):
            continue
        # The path is read from session["notebook"]["path"] when "notebook" is
        # a dict, otherwise from the plain value of "notebook" or "path".
        nb = session.get("notebook", session.get("path", {}))
        nb_path = nb.get("path", nb) if isinstance(nb, dict) else str(nb)
        if nb_path != notebook_path:
            continue
        # "kernel" may be missing or null; skip such sessions and keep looking
        # for another session on the same notebook.
        kernel = session.get("kernel")
        kernel_id = kernel.get("id") if isinstance(kernel, dict) else None
        if kernel_id:
            return kernel_id
    return None
def _get_notebook_content(notebook_path: str, server_url: str, token: str) -> dict:
    """Read the full notebook model through ``GET /api/contents``.

    Args:
        notebook_path: Notebook path relative to the server root, unencoded.
        server_url: Base URL of the Jupyter server.
        token: Authentication token (empty when the server needs none).

    Returns:
        The decoded JSON model sent by the server; ``jupyter_run_all`` reads
        the notebook document from its ``"content"`` key.

    Raises:
        HTTPError: If the server answers with an HTTP error status.
        URLError: If the server cannot be reached.
    """
    # Percent-encode the path (keeping "/") so names with spaces or non-ASCII
    # characters form a valid URL instead of failing inside urlopen.
    url = f"{server_url}/api/contents/{quote(notebook_path)}?content=1&type=notebook"
    req = Request(url, headers=_auth_headers(token))
    with urlopen(req, timeout=15) as resp:
        return json.loads(resp.read())
def _put_notebook_content(notebook_path: str, server_url: str, token: str, content: dict) -> None:
    """Persist a notebook document through ``PUT /api/contents``.

    Args:
        notebook_path: Notebook path relative to the server root, unencoded.
        server_url: Base URL of the Jupyter server.
        token: Authentication token (empty when the server needs none).
        content: Notebook document to store as the model's ``content``.

    Raises:
        HTTPError: If the server answers with an HTTP error status.
        URLError: If the server cannot be reached.
    """
    body = json.dumps({"type": "notebook", "format": "json", "content": content}).encode("utf-8")
    # Percent-encode the path (keeping "/") so names with spaces or non-ASCII
    # characters form a valid URL instead of failing inside urlopen.
    url = f"{server_url}/api/contents/{quote(notebook_path)}"
    req = Request(url, data=body, headers=_auth_headers(token, content_type=True), method="PUT")
    with urlopen(req, timeout=15) as resp:
        # Drain the reply; the returned model is not used.
        resp.read()
def _extract_outputs(raw_outputs: list[dict]) -> list[str]:
"""Convierte outputs nbformat a strings legibles."""
result: list[str] = []
for output in raw_outputs:
output_type = output.get("output_type", "")
if output_type == "stream":
text = output.get("text", "")
if isinstance(text, list):
text = "".join(text)
result.append(text.rstrip("\n"))
elif output_type in ("display_data", "execute_result"):
data = output.get("data", {})
text = data.get("text/plain", "")
if isinstance(text, list):
text = "".join(text)
result.append(text.rstrip("\n"))
elif output_type == "error":
traceback = output.get("traceback", [])
result.append("\n".join(traceback))
return result
def _kernel_outputs_to_nbformat(outputs: list[dict]) -> list[dict]:
return [dict(o) for o in outputs]
def _restart_kernel_and_wait(server_url: str, token: str, kernel_id: str, poll_timeout_s: int = 30) -> None:
    """Restart a kernel and poll until the server reports it as idle.

    The result of the restart request is not inspected, and the function
    returns silently once ``poll_timeout_s`` elapses even if the kernel was
    never seen in the ``idle`` state.

    Args:
        server_url: Base URL of the Jupyter server.
        token: Authentication token (empty when the server needs none).
        kernel_id: Id of the kernel to restart.
        poll_timeout_s: Maximum number of seconds to keep polling.
    """
    _api_post(f"{server_url}/api/kernels/{kernel_id}/restart", token)

    give_up_at = time.monotonic() + poll_timeout_s
    while time.monotonic() < give_up_at:
        listing = _api_get(f"{server_url}/api/kernels", token) or []
        # Only the first entry carrying our kernel id is considered.
        found = next((entry for entry in listing if entry.get("id") == kernel_id), None)
        if found is not None and found.get("execution_state", "") == "idle":
            return
        time.sleep(0.5)
# ---------------------------------------------------------------------------
# API publica
# ---------------------------------------------------------------------------
def jupyter_run_all(
    notebook_path: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
    restart_kernel: bool = True,
    stop_on_error: bool = True,
    timeout_per_cell_s: int = 600,
) -> dict[str, Any]:
    """Run every code cell of a notebook in order (Run All).

    Intended as the CLI/agent counterpart of the "Run All" button in Jupyter
    Lab. When ``restart_kernel`` is True the notebook's kernel is restarted
    before the first cell runs, so no state from earlier executions remains.
    After the run, the notebook is written back with the new outputs and
    execution counts of the cells that were executed.

    Args:
        notebook_path: Notebook path relative to the server root
            (e.g. "notebooks/analisis.ipynb").
        server_url: Base URL of the Jupyter server.
        token: Authentication token; empty when the server needs none.
        restart_kernel: Restart the kernel before running. Default True.
        stop_on_error: Stop at the first cell that produces an output with
            ``output_type == "error"``. Default True.
        timeout_per_cell_s: Timeout in seconds handed to the kernel client for
            each cell execution. Default 600 (10 min).

    Returns:
        {
            "notebook": str,              # notebook path
            "code_cell_indices": [int],   # indices of all code cells found
            "executed": [                 # one entry per cell processed
                {
                    "cell_index": int,
                    "execution_count": int | None,
                    "outputs": [str],
                    "error": str | None,  # "<ename>: <evalue>" or None
                    "duration_s": float,
                }
            ],
            "stopped_at": int | None,     # cell index where the run stopped
            "kernel_id": str,
            "total_duration_s": float,
        }

    Raises:
        RuntimeError: If the notebook has no active session (no running
            kernel). Use jupyter_exec to create one.
        HTTPError: If the server returns an HTTP error while the notebook is
            read or written.
        URLError: If the server cannot be reached while the notebook is read
            or written.
    """
    t0 = time.monotonic()

    # 1. Resolve the kernel attached to the notebook's session.
    kernel_id = _resolve_kernel_id(server_url, token, notebook_path)
    if not kernel_id:
        raise RuntimeError(
            f"No hay sesion activa para '{notebook_path}'. "
            "Abre el notebook en Jupyter Lab o usa jupyter_exec para crear sesion."
        )

    # 2. Optionally restart the kernel for a clean state.
    if restart_kernel:
        _restart_kernel_and_wait(server_url, token, kernel_id)

    # 3. Read the notebook and collect the indices of its code cells.
    file_node = _get_notebook_content(notebook_path, server_url, token)
    nb = file_node["content"]
    cells = nb.get("cells", [])
    code_cell_indices = [
        i for i, cell in enumerate(cells)
        if cell.get("cell_type") == "code"
    ]

    # 4. Execute each code cell in order.
    executed: list[dict] = []
    stopped_at: int | None = None
    with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
        for cell_index in code_cell_indices:
            cell = cells[cell_index]
            source = cell.get("source", "")
            if isinstance(source, list):
                source = "".join(source)

            # Blank cells are reported but never sent to the kernel.
            if not source.strip():
                executed.append({
                    "cell_index": cell_index,
                    "execution_count": None,
                    "outputs": [],
                    "error": None,
                    "duration_s": 0.0,
                })
                continue

            cell_t0 = time.monotonic()
            # Forward the per-cell timeout; it was previously accepted but
            # never applied to the execution.
            # NOTE(review): assumes KernelClient.execute accepts `timeout=` —
            # confirm against the installed jupyter_kernel_client version.
            result = kernel.execute(source, timeout=timeout_per_cell_s)
            cell_duration = time.monotonic() - cell_t0

            # Store the fresh outputs on the in-memory notebook so step 5
            # persists them.
            raw_outputs = result.get("outputs", [])
            cell["outputs"] = _kernel_outputs_to_nbformat(raw_outputs)
            cell["execution_count"] = result.get("execution_count")

            # The first error output, if any, defines the cell's error message.
            error_msg: str | None = None
            for out in raw_outputs:
                if out.get("output_type") == "error":
                    ename = out.get("ename", "Error")
                    evalue = out.get("evalue", "")
                    error_msg = f"{ename}: {evalue}"
                    break

            executed.append({
                "cell_index": cell_index,
                "execution_count": result.get("execution_count"),
                "outputs": _extract_outputs(raw_outputs),
                "error": error_msg,
                "duration_s": round(cell_duration, 3),
            })

            if error_msg and stop_on_error:
                stopped_at = cell_index
                break

    # 5. Persist the notebook with the updated outputs.
    _put_notebook_content(notebook_path, server_url, token, nb)

    total_duration = time.monotonic() - t0
    return {
        "notebook": notebook_path,
        "code_cell_indices": code_cell_indices,
        "executed": executed,
        "stopped_at": stopped_at,
        "kernel_id": kernel_id,
        "total_duration_s": round(total_duration, 3),
    }
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Command-line entry point: run the notebook and print the JSON report on
    # stdout, or a JSON error object on stderr with exit status 1.
    import argparse
    import sys

    cli = argparse.ArgumentParser(
        description="Ejecuta todas las celdas de codigo de un notebook en orden (Run All)"
    )
    cli.add_argument("notebook", help="Ruta del notebook relativa al servidor Jupyter")
    cli.add_argument("--server", default="http://localhost:8888", help="URL del servidor Jupyter")
    cli.add_argument("--token", default="", help="Token de autenticacion")
    cli.add_argument(
        "--no-restart",
        action="store_true",
        help="No reiniciar el kernel antes de ejecutar",
    )
    cli.add_argument(
        "--continue-on-error",
        action="store_true",
        help="Continuar ejecucion aunque una celda produzca error",
    )
    cli.add_argument(
        "--timeout",
        type=int,
        default=600,
        help="Timeout en segundos por celda (default: 600)",
    )
    opts = cli.parse_args()

    # The negative CLI flags are inverted into the positive API switches.
    call_kwargs = {
        "notebook_path": opts.notebook,
        "server_url": opts.server,
        "token": opts.token,
        "restart_kernel": not opts.no_restart,
        "stop_on_error": not opts.continue_on_error,
        "timeout_per_cell_s": opts.timeout,
    }
    try:
        report = jupyter_run_all(**call_kwargs)
        print(json.dumps(report, ensure_ascii=False, indent=2))
    except Exception as exc:
        print(json.dumps({"error": str(exc)}, ensure_ascii=False), file=sys.stderr)
        sys.exit(1)