89e0fb58c6
Evita bloquear el event loop asyncio ejecutando execute_cell (operación síncrona con websocket) en un thread executor. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
288 lines
10 KiB
Python
288 lines
10 KiB
Python
"""Ejecuta codigo en kernels de Jupyter via WebSocket.
|
|
|
|
Tres modos de ejecucion:
|
|
- append: añade una celda al final del notebook y la ejecuta
|
|
- cell: ejecuta una celda existente por indice
|
|
- kernel: ejecuta codigo directamente en el kernel sin modificar ningun notebook
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
from functools import partial
|
|
from typing import Any
|
|
from urllib.error import URLError
|
|
from urllib.request import Request, urlopen
|
|
|
|
from jupyter_kernel_client import KernelClient
|
|
from jupyter_nbmodel_client import NbModelClient, get_jupyter_notebook_websocket_url
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _api_get(url: str, token: str = "") -> dict | list | None:
|
|
"""GET a Jupyter REST API endpoint."""
|
|
headers = {"Accept": "application/json"}
|
|
if token:
|
|
headers["Authorization"] = f"token {token}"
|
|
try:
|
|
req = Request(url, headers=headers)
|
|
with urlopen(req, timeout=5) as resp:
|
|
return json.loads(resp.read())
|
|
except (URLError, OSError, json.JSONDecodeError):
|
|
return None
|
|
|
|
|
|
def _resolve_kernel_id(server_url: str, token: str, notebook_path: str) -> str | None:
    """Look up the kernel id of the session attached to a notebook.

    Queries ``{server_url}/api/sessions`` and returns the ``kernel.id`` of the
    first session whose path equals ``notebook_path``. The path is read from
    ``session["notebook"]["path"]`` when ``notebook`` is a dict, otherwise
    from the string form of ``notebook`` / top-level ``path``.

    Args:
        server_url: Base URL of the Jupyter server.
        token: Jupyter auth token (may be empty).
        notebook_path: Path to match against the session entries.

    Returns:
        The kernel id of the matching session, or ``None`` when the request
        fails, no session matches, or the matching session has no kernel id.
    """
    listing = _api_get(f"{server_url}/api/sessions", token)
    if not listing:
        return None

    for entry in listing:
        ref = entry.get("notebook", entry.get("path", {}))
        if isinstance(ref, dict):
            candidate = ref.get("path", ref)
        else:
            candidate = str(ref)
        if candidate != notebook_path:
            continue
        # First matching session wins.
        return entry.get("kernel", {}).get("id")

    return None
|
|
|
|
|
|
def _resolve_collab_username(server_url: str, token: str) -> str:
    """Resolve the display name of the active user in Jupyter collaboration.

    Queries ``{server_url}/api/me`` and picks the first non-empty value among
    the identity fields ``display_name``, ``username`` and ``name``.

    Args:
        server_url: Base URL of the Jupyter server.
        token: Jupyter auth token (may be empty).

    Returns:
        The resolved user name, or ``'Anonymous'`` when the request fails or
        none of the identity fields carries a non-empty value.
    """
    me = _api_get(f"{server_url}/api/me", token)
    if not isinstance(me, dict):
        return "Anonymous"

    # ``identity`` may be absent or null; treat both as "no identity".
    identity = me.get("identity") or {}
    for field in ("display_name", "username", "name"):
        value = identity.get(field)
        if value:
            return value
    # Every candidate was missing or empty (including an empty ``name``).
    return "Anonymous"
|
|
|
|
|
|
def _extract_outputs(raw_outputs: list[dict]) -> list[str]:
|
|
"""Convierte outputs de nbformat a lista de strings legibles."""
|
|
result: list[str] = []
|
|
for output in raw_outputs:
|
|
output_type = output.get("output_type", "")
|
|
if output_type == "stream":
|
|
text = output.get("text", "")
|
|
if isinstance(text, list):
|
|
text = "".join(text)
|
|
result.append(text.rstrip("\n"))
|
|
elif output_type in ("display_data", "execute_result"):
|
|
data = output.get("data", {})
|
|
text = data.get("text/plain", "")
|
|
if isinstance(text, list):
|
|
text = "".join(text)
|
|
result.append(text.rstrip("\n"))
|
|
elif output_type == "error":
|
|
traceback = output.get("traceback", [])
|
|
result.append("\n".join(traceback))
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Append mode (internal async implementation)
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def _async_append_execute(
    notebook_path: str,
    code: str,
    server_url: str,
    token: str,
) -> dict[str, Any]:
    """Append a code cell to a notebook through the collaboration websocket and run it.

    Args:
        notebook_path: Notebook path used both to build the websocket URL and
            to look up the notebook's session/kernel.
        code: Source of the code cell to append.
        server_url: Base URL of the Jupyter server.
        token: Jupyter auth token; an empty string is passed on as ``None``
            when building the websocket URL.

    Returns:
        A dict with ``cell_index`` (index returned by ``add_code_cell``) and
        ``outputs`` (list of strings extracted from the execution result).
    """
    ws_url = get_jupyter_notebook_websocket_url(
        server_url,
        notebook_path,
        token or None,
    )

    # NOTE(review): kernel_id is None when no session matches the notebook;
    # presumably KernelClient then starts its own kernel -- confirm.
    kernel_id = _resolve_kernel_id(server_url, token, notebook_path)
    username = _resolve_collab_username(server_url, token)

    async with NbModelClient(ws_url, username=username) as nb:
        await nb.wait_until_synced()

        with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
            cell_index = nb.add_code_cell(code)
            # execute_cell is synchronous; run it in the default thread
            # executor so the event loop is not blocked meanwhile.
            # get_running_loop() is the supported way to obtain the loop from
            # inside a coroutine.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None, partial(nb.execute_cell, cell_index, kernel),
            )

        # Let Y.js propagate changes to other clients (browser)
        await asyncio.sleep(2)

    outputs = _extract_outputs(result.get("outputs", []))
    return {"cell_index": cell_index, "outputs": outputs}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Cell mode (internal async implementation)
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def _async_execute_cell(
    notebook_path: str,
    cell_index: int,
    server_url: str,
    token: str,
) -> dict[str, Any]:
    """Run an existing notebook cell through the collaboration websocket.

    Args:
        notebook_path: Notebook path used both to build the websocket URL and
            to look up the notebook's session/kernel.
        cell_index: Index of the cell to execute, passed unchanged to
            ``execute_cell``.
        server_url: Base URL of the Jupyter server.
        token: Jupyter auth token; an empty string is passed on as ``None``
            when building the websocket URL.

    Returns:
        A dict with ``cell_index`` (the index that was requested) and
        ``outputs`` (list of strings extracted from the execution result).
    """
    ws_url = get_jupyter_notebook_websocket_url(
        server_url,
        notebook_path,
        token or None,
    )
    # NOTE(review): kernel_id is None when no session matches the notebook;
    # presumably KernelClient then starts its own kernel -- confirm.
    kernel_id = _resolve_kernel_id(server_url, token, notebook_path)
    username = _resolve_collab_username(server_url, token)

    async with NbModelClient(ws_url, username=username) as nb:
        await nb.wait_until_synced()

        with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
            # execute_cell is synchronous; run it in the default thread
            # executor so the event loop is not blocked meanwhile.
            # get_running_loop() is the supported way to obtain the loop from
            # inside a coroutine.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None, partial(nb.execute_cell, cell_index, kernel),
            )

        # Give the collaborative document time to propagate to other clients.
        await asyncio.sleep(2)

    outputs = _extract_outputs(result.get("outputs", []))
    return {"cell_index": cell_index, "outputs": outputs}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def jupyter_append_execute(
    notebook_path: str,
    code: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Append a code cell at the end of the notebook and execute it.

    Synchronous entry point: drives the internal coroutine to completion
    with ``asyncio.run``. The cell is written through Jupyter's collaborative
    protocol, so it is meant to show up, together with its output, for
    other clients of the same notebook.

    Args:
        notebook_path: Notebook path relative to the Jupyter server root.
        code: Python code to insert and execute.
        server_url: URL of the Jupyter server; defaults to
            http://localhost:8888.
        token: Authentication token of the Jupyter server.

    Returns:
        dict with 'cell_index' (index of the new cell) and 'outputs'
        (list of strings).

    Raises:
        Exception: if the server or the kernel cannot be reached.
    """
    pending = _async_append_execute(notebook_path, code, server_url, token)
    return asyncio.run(pending)
|
|
|
|
|
|
def jupyter_execute_cell(
    notebook_path: str,
    cell_index: int,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Execute an existing notebook cell selected by index.

    Synchronous entry point: drives the internal coroutine to completion
    with ``asyncio.run``.

    Args:
        notebook_path: Notebook path relative to the Jupyter server root.
        cell_index: Index of the cell to execute (0-based).
        server_url: URL of the Jupyter server; defaults to
            http://localhost:8888.
        token: Authentication token of the Jupyter server.

    Returns:
        dict with 'cell_index' and 'outputs' (list of strings).

    Raises:
        IndexError: if cell_index is out of range (not checked here;
            presumably raised by the notebook client -- confirm).
        Exception: if the server or the kernel cannot be reached.
    """
    pending = _async_execute_cell(notebook_path, cell_index, server_url, token)
    return asyncio.run(pending)
|
|
|
|
|
|
def jupyter_kernel_execute(
    code: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Execute code directly in a kernel without modifying any notebook.

    Useful for quick queries, variable inspection and state checks.

    Args:
        code: Python code to execute in the kernel.
        server_url: URL of the Jupyter server; defaults to
            http://localhost:8888.
        token: Authentication token of the Jupyter server.

    Returns:
        dict with 'outputs' (list of strings) and 'status' (taken from the
        execution reply; 'unknown' when the reply carries no status).

    Raises:
        Exception: if the server or the kernel cannot be reached.
    """
    # NOTE(review): no kernel_id is passed, so which kernel runs the code is
    # decided by KernelClient -- confirm it matches the intended one.
    with KernelClient(server_url=server_url, token=token) as client:
        reply = client.execute(code)

    return {
        "outputs": _extract_outputs(reply.get("outputs", [])),
        "status": reply.get("status", "unknown"),
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    # Command-line front end: one subcommand per execution mode. The result
    # is printed as JSON on stdout; on failure a JSON object with an "error"
    # key goes to stderr and the process exits with status 1.
    import argparse
    import sys

    # Connection options shared by every subcommand, declared once and
    # inherited through ``parents=`` instead of being repeated per parser.
    common = argparse.ArgumentParser(add_help=False)
    common.add_argument("--server", default="http://localhost:8888")
    common.add_argument("--token", default="")

    parser = argparse.ArgumentParser(
        description="Ejecuta codigo en kernels de Jupyter",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    # append
    p_append = sub.add_parser(
        "append", parents=[common], help="Añade celda al notebook y la ejecuta",
    )
    p_append.add_argument("notebook", help="Ruta al notebook relativa al servidor")
    p_append.add_argument("code", help="Codigo a insertar y ejecutar")

    # cell
    p_cell = sub.add_parser(
        "cell", parents=[common], help="Ejecuta celda existente por indice",
    )
    p_cell.add_argument("notebook", help="Ruta al notebook relativa al servidor")
    p_cell.add_argument("index", type=int, help="Indice de la celda (0-based)")

    # kernel
    p_kernel = sub.add_parser(
        "kernel", parents=[common], help="Ejecuta codigo en el kernel sin tocar notebook",
    )
    p_kernel.add_argument("code", help="Codigo a ejecutar")

    args = parser.parse_args()

    # The subcommand is required and validated by argparse, so args.command
    # is always one of these keys; no fallback branch is needed.
    handlers = {
        "append": lambda a: jupyter_append_execute(a.notebook, a.code, a.server, a.token),
        "cell": lambda a: jupyter_execute_cell(a.notebook, a.index, a.server, a.token),
        "kernel": lambda a: jupyter_kernel_execute(a.code, a.server, a.token),
    }

    try:
        result = handlers[args.command](args)
        print(json.dumps(result, ensure_ascii=False, indent=2))
    except Exception as exc:
        # Top-level boundary: report any failure as JSON and exit non-zero.
        print(json.dumps({"error": str(exc)}, ensure_ascii=False), file=sys.stderr)
        sys.exit(1)
|