"""Ejecuta codigo en kernels de Jupyter via WebSocket. Tres modos de ejecucion: - append: añade una celda al final del notebook y la ejecuta - cell: ejecuta una celda existente por indice - kernel: ejecuta codigo directamente en el kernel sin modificar ningun notebook """ import asyncio import json from typing import Any from urllib.error import URLError from urllib.request import Request, urlopen from jupyter_kernel_client import KernelClient from jupyter_nbmodel_client import NbModelClient, get_jupyter_notebook_websocket_url # --------------------------------------------------------------------------- # Helpers internos # --------------------------------------------------------------------------- def _api_get(url: str, token: str = "") -> dict | list | None: """GET a Jupyter REST API endpoint.""" headers = {"Accept": "application/json"} if token: headers["Authorization"] = f"token {token}" try: req = Request(url, headers=headers) with urlopen(req, timeout=5) as resp: return json.loads(resp.read()) except (URLError, OSError, json.JSONDecodeError): return None def _resolve_kernel_id(server_url: str, token: str, notebook_path: str) -> str | None: """Find the kernel_id associated with a notebook via the sessions API.""" sessions = _api_get(f"{server_url}/api/sessions", token) or [] for session in sessions: nb = session.get("notebook", session.get("path", {})) nb_path = nb.get("path", nb) if isinstance(nb, dict) else str(nb) if nb_path == notebook_path: kernel = session.get("kernel", {}) return kernel.get("id") return None def _resolve_collab_username(server_url: str, token: str) -> str: """Resolve the display name of the active user in Jupyter collaboration. Queries /api/me to get the identity Jupyter assigned to the browser user. Falls back to 'Anonymous' if unavailable. """ me = _api_get(f"{server_url}/api/me", token) if me: identity = me.get("identity", {}) return identity.get("display_name", "") or identity.get("username", "") or identity.get("name", "Anonymous") return "Anonymous" def _extract_outputs(raw_outputs: list[dict]) -> list[str]: """Convierte outputs de nbformat a lista de strings legibles.""" result: list[str] = [] for output in raw_outputs: output_type = output.get("output_type", "") if output_type == "stream": text = output.get("text", "") if isinstance(text, list): text = "".join(text) result.append(text.rstrip("\n")) elif output_type in ("display_data", "execute_result"): data = output.get("data", {}) text = data.get("text/plain", "") if isinstance(text, list): text = "".join(text) result.append(text.rstrip("\n")) elif output_type == "error": traceback = output.get("traceback", []) result.append("\n".join(traceback)) return result # --------------------------------------------------------------------------- # Modo append (async interno) # --------------------------------------------------------------------------- async def _async_append_execute( notebook_path: str, code: str, server_url: str, token: str, ) -> dict[str, Any]: ws_url = get_jupyter_notebook_websocket_url( server_url, notebook_path, token or None, ) kernel_id = _resolve_kernel_id(server_url, token, notebook_path) username = _resolve_collab_username(server_url, token) async with NbModelClient(ws_url, username=username) as nb: await nb.wait_until_synced() with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel: cell_index = nb.add_code_cell(code) result = nb.execute_cell(cell_index, kernel) # Let Y.js propagate changes to other clients (browser) await asyncio.sleep(2) outputs = _extract_outputs(result.get("outputs", [])) return {"cell_index": cell_index, "outputs": outputs} # --------------------------------------------------------------------------- # Modo cell (async interno) # --------------------------------------------------------------------------- async def _async_execute_cell( notebook_path: str, cell_index: int, server_url: str, token: str, ) -> dict[str, Any]: ws_url = get_jupyter_notebook_websocket_url( server_url, notebook_path, token or None, ) kernel_id = _resolve_kernel_id(server_url, token, notebook_path) username = _resolve_collab_username(server_url, token) async with NbModelClient(ws_url, username=username) as nb: await nb.wait_until_synced() with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel: result = nb.execute_cell(cell_index, kernel) await asyncio.sleep(2) outputs = _extract_outputs(result.get("outputs", [])) return {"cell_index": cell_index, "outputs": outputs} # --------------------------------------------------------------------------- # API publica # --------------------------------------------------------------------------- def jupyter_append_execute( notebook_path: str, code: str, server_url: str = "http://localhost:8888", token: str = "", ) -> dict[str, Any]: """Añade una celda de codigo al final del notebook y la ejecuta. Tanto el agente como el usuario ven la celda y su output en tiempo real porque la escritura se realiza a traves del protocolo colaborativo de Jupyter. Args: notebook_path: Ruta al notebook relativa a la raiz del servidor Jupyter. code: Codigo Python a insertar y ejecutar. server_url: URL del servidor Jupyter; por defecto http://localhost:8888. token: Token de autenticacion del servidor Jupyter. Returns: dict con 'cell_index' (indice de la nueva celda) y 'outputs' (lista de strings). Raises: Exception: si no se puede conectar al servidor o al kernel. """ return asyncio.run(_async_append_execute(notebook_path, code, server_url, token)) def jupyter_execute_cell( notebook_path: str, cell_index: int, server_url: str = "http://localhost:8888", token: str = "", ) -> dict[str, Any]: """Ejecuta una celda existente del notebook por indice. Args: notebook_path: Ruta al notebook relativa a la raiz del servidor Jupyter. cell_index: Indice de la celda a ejecutar (0-based). server_url: URL del servidor Jupyter; por defecto http://localhost:8888. token: Token de autenticacion del servidor Jupyter. Returns: dict con 'cell_index' y 'outputs' (lista de strings). Raises: IndexError: si cell_index esta fuera de rango. Exception: si no se puede conectar al servidor o al kernel. """ return asyncio.run(_async_execute_cell(notebook_path, cell_index, server_url, token)) def jupyter_kernel_execute( code: str, server_url: str = "http://localhost:8888", token: str = "", ) -> dict[str, Any]: """Ejecuta codigo directamente en el kernel sin modificar ningun notebook. Util para consultas rapidas, inspeccion de variables, comprobaciones de estado. Args: code: Codigo Python a ejecutar en el kernel activo. server_url: URL del servidor Jupyter; por defecto http://localhost:8888. token: Token de autenticacion del servidor Jupyter. Returns: dict con 'outputs' (lista de strings) y 'status' ('ok' o 'error'). Raises: Exception: si no se puede conectar al servidor o al kernel. """ with KernelClient(server_url=server_url, token=token) as kernel: result = kernel.execute(code) outputs = _extract_outputs(result.get("outputs", [])) return {"outputs": outputs, "status": result.get("status", "unknown")} # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- if __name__ == "__main__": import argparse import sys parser = argparse.ArgumentParser( description="Ejecuta codigo en kernels de Jupyter", ) sub = parser.add_subparsers(dest="command", required=True) # append p_append = sub.add_parser("append", help="Añade celda al notebook y la ejecuta") p_append.add_argument("notebook", help="Ruta al notebook relativa al servidor") p_append.add_argument("code", help="Codigo a insertar y ejecutar") p_append.add_argument("--server", default="http://localhost:8888") p_append.add_argument("--token", default="") # cell p_cell = sub.add_parser("cell", help="Ejecuta celda existente por indice") p_cell.add_argument("notebook", help="Ruta al notebook relativa al servidor") p_cell.add_argument("index", type=int, help="Indice de la celda (0-based)") p_cell.add_argument("--server", default="http://localhost:8888") p_cell.add_argument("--token", default="") # kernel p_kernel = sub.add_parser("kernel", help="Ejecuta codigo en el kernel sin tocar notebook") p_kernel.add_argument("code", help="Codigo a ejecutar") p_kernel.add_argument("--server", default="http://localhost:8888") p_kernel.add_argument("--token", default="") args = parser.parse_args() try: if args.command == "append": result = jupyter_append_execute(args.notebook, args.code, args.server, args.token) elif args.command == "cell": result = jupyter_execute_cell(args.notebook, args.index, args.server, args.token) elif args.command == "kernel": result = jupyter_kernel_execute(args.code, args.server, args.token) else: parser.print_help() sys.exit(1) print(json.dumps(result, ensure_ascii=False, indent=2)) except Exception as exc: print(json.dumps({"error": str(exc)}, ensure_ascii=False), file=sys.stderr) sys.exit(1)