chore: auto-commit (43 archivos)

- .mcp.json
- bash/functions/infra/write_mcp_jupyter_config.md
- bash/functions/infra/write_mcp_jupyter_config.sh
- cpp/CMakeLists.txt
- cpp/apps/chart_demo
- cpp/apps/shaders_lab
- cpp/functions/gfx/gl_framebuffer.cpp
- cpp/functions/gfx/gl_framebuffer.h
- cpp/functions/gfx/gl_framebuffer.md
- cpp/functions/gfx/mesh_gpu.md
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-30 17:28:47 +02:00
parent fec8ebd4ec
commit fce88032ca
44 changed files with 3924 additions and 64 deletions
@@ -0,0 +1,315 @@
"""Ejecuta todas las celdas de codigo de un notebook en orden (Run All).
Equivalente al boton "Run All" del UI de Jupyter Lab, pero invocable desde
CLI/MCP/agente. Opcionalmente reinicia el kernel antes de empezar para
garantizar un estado limpio.
Depende de jupyter_run_cells cuando esta disponible; si no, ejecuta la logica
de batch internamente reutilizando los helpers de jupyter_exec.
"""
from __future__ import annotations
import json
import time
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from jupyter_kernel_client import KernelClient
# ---------------------------------------------------------------------------
# Helpers REST (minimos, alineados con jupyter_exec)
# ---------------------------------------------------------------------------
def _auth_headers(token: str, content_type: bool = False) -> dict[str, str]:
headers = {"Accept": "application/json"}
if content_type:
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"token {token}"
return headers
def _api_get(url: str, token: str = "") -> dict | list | None:
try:
req = Request(url, headers=_auth_headers(token))
with urlopen(req, timeout=10) as resp:
return json.loads(resp.read())
except (URLError, OSError, json.JSONDecodeError):
return None
def _api_post(url: str, token: str = "", body: dict | None = None) -> dict | None:
data = json.dumps(body or {}).encode("utf-8")
req = Request(url, data=data, headers=_auth_headers(token, content_type=True), method="POST")
try:
with urlopen(req, timeout=30) as resp:
raw = resp.read()
return json.loads(raw) if raw else {}
except (URLError, OSError, json.JSONDecodeError):
return None
def _resolve_kernel_id(server_url: str, token: str, notebook_path: str) -> str | None:
"""Busca el kernel_id activo para el notebook via /api/sessions."""
sessions = _api_get(f"{server_url}/api/sessions", token) or []
for session in sessions:
nb = session.get("notebook", session.get("path", {}))
nb_path = nb.get("path", nb) if isinstance(nb, dict) else str(nb)
if nb_path == notebook_path:
kernel = session.get("kernel", {})
return kernel.get("id")
return None
def _get_notebook_content(notebook_path: str, server_url: str, token: str) -> dict:
"""Lee el notebook completo via /api/contents."""
url = f"{server_url}/api/contents/{notebook_path}?content=1&type=notebook"
req = Request(url, headers=_auth_headers(token))
with urlopen(req, timeout=15) as resp:
return json.loads(resp.read())
def _put_notebook_content(notebook_path: str, server_url: str, token: str, content: dict) -> None:
"""Persiste el notebook via PUT /api/contents."""
body = json.dumps({"type": "notebook", "format": "json", "content": content}).encode("utf-8")
url = f"{server_url}/api/contents/{notebook_path}"
req = Request(url, data=body, headers=_auth_headers(token, content_type=True), method="PUT")
with urlopen(req, timeout=15) as resp:
resp.read()
def _extract_outputs(raw_outputs: list[dict]) -> list[str]:
"""Convierte outputs nbformat a strings legibles."""
result: list[str] = []
for output in raw_outputs:
output_type = output.get("output_type", "")
if output_type == "stream":
text = output.get("text", "")
if isinstance(text, list):
text = "".join(text)
result.append(text.rstrip("\n"))
elif output_type in ("display_data", "execute_result"):
data = output.get("data", {})
text = data.get("text/plain", "")
if isinstance(text, list):
text = "".join(text)
result.append(text.rstrip("\n"))
elif output_type == "error":
traceback = output.get("traceback", [])
result.append("\n".join(traceback))
return result
def _kernel_outputs_to_nbformat(outputs: list[dict]) -> list[dict]:
return [dict(o) for o in outputs]
def _restart_kernel_and_wait(server_url: str, token: str, kernel_id: str, poll_timeout_s: int = 30) -> None:
"""Reinicia el kernel y espera hasta que vuelva a estado idle."""
url = f"{server_url}/api/kernels/{kernel_id}/restart"
_api_post(url, token)
deadline = time.monotonic() + poll_timeout_s
while time.monotonic() < deadline:
kernels = _api_get(f"{server_url}/api/kernels", token) or []
for k in kernels:
if k.get("id") == kernel_id:
state = k.get("execution_state", "")
if state == "idle":
return
break
time.sleep(0.5)
# ---------------------------------------------------------------------------
# API publica
# ---------------------------------------------------------------------------
def jupyter_run_all(
notebook_path: str,
server_url: str = "http://localhost:8888",
token: str = "",
restart_kernel: bool = True,
stop_on_error: bool = True,
timeout_per_cell_s: int = 600,
) -> dict[str, Any]:
"""Ejecuta todas las celdas de codigo del notebook en orden.
Equivalente al boton "Run All" del UI de Jupyter Lab. Si restart_kernel
es True, reinicia el kernel del notebook ANTES de empezar para garantizar
un estado completamente limpio (sin variables residuales de ejecuciones
anteriores).
Args:
notebook_path: Ruta relativa al notebook desde la raiz del servidor
(ej: "notebooks/analisis.ipynb").
server_url: URL base del servidor Jupyter (default http://localhost:8888).
token: Token de autenticacion. Vacio si el servidor no requiere auth.
restart_kernel: Si True, reinicia el kernel antes de empezar.
Garantiza estado limpio. Default True.
stop_on_error: Si True, detiene la ejecucion cuando una celda produce
un error (output_type == "error"). Default True.
timeout_per_cell_s: Timeout en segundos por celda. Default 600 (10 min).
Returns:
{
"notebook": str, # ruta del notebook
"code_cell_indices": [int], # indices de celdas de codigo ejecutadas
"executed": [ # resultado de cada celda ejecutada
{
"cell_index": int,
"execution_count": int | None,
"outputs": [str | dict],
"error": str | None, # mensaje del error si hubo, sino None
"duration_s": float,
}
],
"stopped_at": int | None, # cell_index donde se detuvo por error
"kernel_id": str,
"total_duration_s": float,
}
Raises:
RuntimeError: Si no existe sesion activa para el notebook (el kernel
no esta corriendo). Usar jupyter_exec para crear sesion.
HTTPError: Si el servidor Jupyter devuelve un error HTTP.
URLError: Si no se puede conectar al servidor.
"""
t0 = time.monotonic()
# 1. Obtener kernel_id del notebook
kernel_id = _resolve_kernel_id(server_url, token, notebook_path)
if not kernel_id:
raise RuntimeError(
f"No hay sesion activa para '{notebook_path}'. "
"Abre el notebook en Jupyter Lab o usa jupyter_exec para crear sesion."
)
# 2. Reiniciar kernel si se solicita
if restart_kernel:
_restart_kernel_and_wait(server_url, token, kernel_id)
# 3. Leer notebook y filtrar indices de celdas de codigo
file_node = _get_notebook_content(notebook_path, server_url, token)
nb = file_node["content"]
cells = nb.get("cells", [])
code_cell_indices = [
i for i, cell in enumerate(cells)
if cell.get("cell_type") == "code"
]
# 4. Ejecutar cada celda en orden
executed: list[dict] = []
stopped_at: int | None = None
with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
for cell_index in code_cell_indices:
cell = cells[cell_index]
source = cell.get("source", "")
if isinstance(source, list):
source = "".join(source)
# Saltar celdas vacias
if not source.strip():
executed.append({
"cell_index": cell_index,
"execution_count": None,
"outputs": [],
"error": None,
"duration_s": 0.0,
})
continue
cell_t0 = time.monotonic()
result = kernel.execute(source)
cell_duration = time.monotonic() - cell_t0
raw_outputs = result.get("outputs", [])
cell["outputs"] = _kernel_outputs_to_nbformat(raw_outputs)
cell["execution_count"] = result.get("execution_count")
# Detectar error en outputs
error_msg: str | None = None
for out in raw_outputs:
if out.get("output_type") == "error":
ename = out.get("ename", "Error")
evalue = out.get("evalue", "")
error_msg = f"{ename}: {evalue}"
break
executed.append({
"cell_index": cell_index,
"execution_count": result.get("execution_count"),
"outputs": _extract_outputs(raw_outputs),
"error": error_msg,
"duration_s": round(cell_duration, 3),
})
if error_msg and stop_on_error:
stopped_at = cell_index
break
# 5. Persistir notebook con outputs actualizados
_put_notebook_content(notebook_path, server_url, token, nb)
total_duration = time.monotonic() - t0
return {
"notebook": notebook_path,
"code_cell_indices": code_cell_indices,
"executed": executed,
"stopped_at": stopped_at,
"kernel_id": kernel_id,
"total_duration_s": round(total_duration, 3),
}
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser(
description="Ejecuta todas las celdas de codigo de un notebook en orden (Run All)"
)
parser.add_argument("notebook", help="Ruta del notebook relativa al servidor Jupyter")
parser.add_argument("--server", default="http://localhost:8888", help="URL del servidor Jupyter")
parser.add_argument("--token", default="", help="Token de autenticacion")
parser.add_argument(
"--no-restart",
action="store_true",
help="No reiniciar el kernel antes de ejecutar",
)
parser.add_argument(
"--continue-on-error",
action="store_true",
help="Continuar ejecucion aunque una celda produzca error",
)
parser.add_argument(
"--timeout",
type=int,
default=600,
help="Timeout en segundos por celda (default: 600)",
)
args = parser.parse_args()
try:
result = jupyter_run_all(
notebook_path=args.notebook,
server_url=args.server,
token=args.token,
restart_kernel=not args.no_restart,
stop_on_error=not args.continue_on_error,
timeout_per_cell_s=args.timeout,
)
print(json.dumps(result, ensure_ascii=False, indent=2))
except Exception as exc:
print(json.dumps({"error": str(exc)}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)