Files
egutierrez fd5787c55f chore: auto-commit (43 archivos)
- .mcp.json
- bash/functions/infra/write_mcp_jupyter_config.md
- bash/functions/infra/write_mcp_jupyter_config.sh
- cpp/CMakeLists.txt
- cpp/apps/chart_demo
- cpp/apps/shaders_lab
- cpp/functions/gfx/gl_framebuffer.cpp
- cpp/functions/gfx/gl_framebuffer.h
- cpp/functions/gfx/gl_framebuffer.md
- cpp/functions/gfx/mesh_gpu.md
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-30 17:28:47 +02:00

193 lines
6.7 KiB
Python

"""Ejecuta un lote de celdas existentes en una sola conexion WebSocket.
A diferencia de `jupyter_execute_cell` (que abre/cierra un WS por celda),
esta funcion comparte una unica sesion WS para todas las celdas del lote.
Un solo GET /api/contents al inicio + un solo PUT al final.
Latencia total: ~3s fija (overhead) + tiempo de ejecucion real de las celdas,
en lugar de ~3s * N (una conexion por celda con `jupyter_execute_cell`).
"""
import json
import time
from typing import Any
from jupyter_kernel_client import KernelClient
from notebook.jupyter_exec import (
_ensure_session,
_extract_outputs,
_get_notebook_content,
_kernel_outputs_to_nbformat,
_put_notebook_content,
)
def jupyter_run_cells(
notebook_path: str,
cell_indices: list[int],
server_url: str = "http://localhost:8888",
token: str = "",
stop_on_error: bool = True,
timeout_per_cell_s: int = 600,
) -> dict[str, Any]:
"""Ejecuta una lista de celdas existentes (por indice) en un solo paso.
Abre UNA conexion WebSocket para todo el lote. Lee el notebook una vez al
inicio y lo persiste una vez al final. Si `stop_on_error` es True y una
celda produce un output de tipo 'error', el lote para en esa celda y el
PUT se hace con los outputs ejecutados hasta ese momento.
Args:
notebook_path: Ruta relativa al notebook (relativa a la raiz del
servidor Jupyter).
cell_indices: Lista de indices de celdas a ejecutar (0-based, en
orden). Deben ser celdas de tipo 'code'.
server_url: URL del servidor Jupyter (default 'http://localhost:8888').
token: Token de autenticacion (default vacio = sin auth).
stop_on_error: Si True, para al encontrar el primer error en outputs.
timeout_per_cell_s: Timeout en segundos por cada ejecucion individual
de celda (pasado a KernelClient.execute).
Returns:
{
"notebook": str, # notebook_path recibido
"executed": [ # celdas ejecutadas (en orden)
{
"cell_index": int,
"execution_count": int | None,
"outputs": list[str], # strings legibles (via _extract_outputs)
"error": str | None, # traceback si hubo error, else None
"duration_s": float,
},
...
],
"stopped_at": int | None, # indice donde paro si stop_on_error
"kernel_id": str,
"total_duration_s": float,
}
"""
t_total_start = time.monotonic()
kernel_id = _ensure_session(server_url, token, notebook_path)
file_node = _get_notebook_content(notebook_path, server_url, token)
nb = file_node["content"]
cells = nb.get("cells", [])
# Validacion anticipada: todos los indices deben estar en rango y ser code
for idx in cell_indices:
if idx < 0 or idx >= len(cells):
raise IndexError(
f"cell_index {idx} fuera de rango (notebook tiene {len(cells)} celdas)"
)
if cells[idx].get("cell_type") != "code":
raise ValueError(
f"La celda {idx} no es de codigo (cell_type={cells[idx].get('cell_type')!r})"
)
executed: list[dict[str, Any]] = []
stopped_at: int | None = None
with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
for idx in cell_indices:
cell = cells[idx]
source = cell.get("source", "")
if isinstance(source, list):
source = "".join(source)
t_cell_start = time.monotonic()
result = kernel.execute(source, timeout=timeout_per_cell_s)
duration_s = round(time.monotonic() - t_cell_start, 3)
raw_outputs = result.get("outputs", [])
cell["outputs"] = _kernel_outputs_to_nbformat(raw_outputs)
cell["execution_count"] = result.get("execution_count")
readable_outputs = _extract_outputs(raw_outputs)
# Detectar si hubo error
error_text: str | None = None
for out in raw_outputs:
if out.get("output_type") == "error":
tb = out.get("traceback", [])
error_text = "\n".join(tb) if isinstance(tb, list) else str(tb)
break
executed.append({
"cell_index": idx,
"execution_count": result.get("execution_count"),
"outputs": readable_outputs,
"error": error_text,
"duration_s": duration_s,
})
if stop_on_error and error_text is not None:
stopped_at = idx
break
# Persiste notebook con todos los outputs actualizados de una vez
_put_notebook_content(notebook_path, server_url, token, nb)
return {
"notebook": notebook_path,
"executed": executed,
"stopped_at": stopped_at,
"kernel_id": kernel_id,
"total_duration_s": round(time.monotonic() - t_total_start, 3),
}
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
import argparse
import sys
parser = argparse.ArgumentParser(
description="Ejecuta un lote de celdas en un solo paso WS"
)
parser.add_argument("notebook", help="Ruta al notebook relativa al servidor")
parser.add_argument(
"indices",
nargs="*",
type=int,
help="Indices de celdas a ejecutar (0-based). Si se omiten, lee JSON de stdin.",
)
parser.add_argument("--server", default="http://localhost:8888")
parser.add_argument("--token", default="")
parser.add_argument(
"--no-stop-on-error",
action="store_true",
help="Continuar aunque una celda emita error",
)
parser.add_argument("--timeout", type=int, default=600, help="Timeout por celda en segundos")
args = parser.parse_args()
if args.indices:
indices = args.indices
else:
raw = sys.stdin.read().strip()
indices = json.loads(raw) if raw else []
if not indices:
print(json.dumps({"error": "No se especificaron indices de celdas"}), file=sys.stderr)
sys.exit(1)
try:
result = jupyter_run_cells(
notebook_path=args.notebook,
cell_indices=indices,
server_url=args.server,
token=args.token,
stop_on_error=not args.no_stop_on_error,
timeout_per_cell_s=args.timeout,
)
print(json.dumps(result, ensure_ascii=False, indent=2))
except Exception as exc:
print(json.dumps({"error": str(exc)}, ensure_ascii=False), file=sys.stderr)
sys.exit(1)