"""Ejecuta codigo en kernels de Jupyter. Tres modos: - append: añade una celda al final del notebook y la ejecuta - cell: ejecuta una celda existente por indice - kernel: ejecuta codigo directamente en el kernel sin tocar notebook Implementacion basada en REST `/api/contents` + `KernelClient` (websocket clasico al kernel). NO usa `jupyter_nbmodel_client` ni el canal colaborativo Y.js, por lo que es robusto frente a versiones nuevas de `jupyter-collaboration` (ver issue 0050). Trade-off: los cambios al notebook se persisten a disco; Jupyter Lab los detecta via file watch (puede pedir 'Revert to disk' o 'Overwrite' segun version). """ import json import uuid from typing import Any from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen from jupyter_kernel_client import KernelClient # --------------------------------------------------------------------------- # Helpers REST # --------------------------------------------------------------------------- def _auth_headers(token: str, content_type: bool = False) -> dict[str, str]: headers = {"Accept": "application/json"} if content_type: headers["Content-Type"] = "application/json" if token: headers["Authorization"] = f"token {token}" return headers def _notebook_exists(notebook_path: str, server_url: str, token: str) -> bool: """Comprueba si un notebook existe via GET /api/contents (con `content=0`). Nota: Jupyter Server no soporta HEAD en /api/contents (responde 405). Usamos GET con content=0 para evitar transferir el cuerpo completo. """ check_url = f"{server_url}/api/contents/{notebook_path}?content=0" req = Request(check_url, headers=_auth_headers(token), method="GET") try: with urlopen(req, timeout=5): return True except HTTPError as e: if e.code == 404: return False raise def _create_notebook(notebook_path: str, server_url: str, token: str, kernel_name: str = "python3") -> None: """Crea un notebook vacio via PUT /api/contents si no existe.""" if _notebook_exists(notebook_path, server_url, token): return kernel_display = {"python3": "Python 3 (ipykernel)", "python": "Python 3"}.get(kernel_name, kernel_name) notebook_content = { "nbformat": 4, "nbformat_minor": 5, "metadata": { "kernelspec": {"name": kernel_name, "display_name": kernel_display, "language": "python"}, "language_info": {"name": "python"}, }, "cells": [], } body = json.dumps({"type": "notebook", "content": notebook_content}).encode("utf-8") url = f"{server_url}/api/contents/{notebook_path}" req = Request(url, data=body, headers=_auth_headers(token, content_type=True), method="PUT") with urlopen(req, timeout=10) as resp: resp.read() def _get_notebook_content(notebook_path: str, server_url: str, token: str) -> dict: """Lee el notebook completo via GET /api/contents (con `content`).""" url = f"{server_url}/api/contents/{notebook_path}?content=1&type=notebook" req = Request(url, headers=_auth_headers(token), method="GET") with urlopen(req, timeout=10) as resp: return json.loads(resp.read()) def _put_notebook_content(notebook_path: str, server_url: str, token: str, content: dict) -> None: """Sobrescribe el notebook via PUT /api/contents.""" body = json.dumps({"type": "notebook", "format": "json", "content": content}).encode("utf-8") url = f"{server_url}/api/contents/{notebook_path}" req = Request(url, data=body, headers=_auth_headers(token, content_type=True), method="PUT") with urlopen(req, timeout=10) as resp: resp.read() def _ensure_session(server_url: str, token: str, notebook_path: str, kernel_name: str = "python3") -> str: """Garantiza una sesion para el notebook. Retorna kernel_id. Si existe una sesion vinculada al notebook, reusa su kernel. Si no, crea sesion+kernel via POST /api/sessions. """ kernel_id = _resolve_kernel_id(server_url, token, notebook_path) if kernel_id: return kernel_id body = json.dumps({ "path": notebook_path, "type": "notebook", "kernel": {"name": kernel_name}, }).encode("utf-8") url = f"{server_url}/api/sessions" req = Request(url, data=body, headers=_auth_headers(token, content_type=True), method="POST") with urlopen(req, timeout=10) as resp: session = json.loads(resp.read()) return session.get("kernel", {}).get("id", "") def _api_get(url: str, token: str = "") -> dict | list | None: try: req = Request(url, headers=_auth_headers(token)) with urlopen(req, timeout=5) as resp: return json.loads(resp.read()) except (URLError, OSError, json.JSONDecodeError): return None def _resolve_kernel_id(server_url: str, token: str, notebook_path: str) -> str | None: """Busca el kernel_id de la sesion del notebook via /api/sessions.""" sessions = _api_get(f"{server_url}/api/sessions", token) or [] for session in sessions: nb = session.get("notebook", session.get("path", {})) nb_path = nb.get("path", nb) if isinstance(nb, dict) else str(nb) if nb_path == notebook_path: kernel = session.get("kernel", {}) return kernel.get("id") return None # --------------------------------------------------------------------------- # Helpers nbformat # --------------------------------------------------------------------------- def _new_code_cell(source: str) -> dict: """Crea un dict de celda de codigo nbformat 4.5 con todos los campos.""" return { "id": str(uuid.uuid4()), "cell_type": "code", "metadata": {}, "source": source, "outputs": [], "execution_count": None, } def _extract_outputs(raw_outputs: list[dict]) -> list[str]: """Convierte outputs de nbformat a lista de strings legibles.""" result: list[str] = [] for output in raw_outputs: output_type = output.get("output_type", "") if output_type == "stream": text = output.get("text", "") if isinstance(text, list): text = "".join(text) result.append(text.rstrip("\n")) elif output_type in ("display_data", "execute_result"): data = output.get("data", {}) text = data.get("text/plain", "") if isinstance(text, list): text = "".join(text) result.append(text.rstrip("\n")) elif output_type == "error": traceback = output.get("traceback", []) result.append("\n".join(traceback)) return result def _kernel_outputs_to_nbformat(outputs: list[dict]) -> list[dict]: """Normaliza outputs de KernelClient al esquema nbformat 4. KernelClient ya devuelve dicts con `output_type`, pero algunos casos (errores, streams) pueden venir con campos sueltos. Esta funcion los pasa tal cual: el cliente actual cumple el esquema; existe como punto de extension futuro. """ return [dict(o) for o in outputs] # --------------------------------------------------------------------------- # Modos # --------------------------------------------------------------------------- def jupyter_append_execute( notebook_path: str, code: str, server_url: str = "http://localhost:8888", token: str = "", ) -> dict[str, Any]: """Añade una celda de codigo al final del notebook y la ejecuta. Persiste la celda + outputs a disco via REST `/api/contents`. Jupyter Lab detecta el cambio en el filesystem y lo refleja en el browser (puede pedir 'Revert to disk' segun version y conflictos). """ _create_notebook(notebook_path, server_url, token) kernel_id = _ensure_session(server_url, token, notebook_path) # Lee notebook, añade celda nueva file_node = _get_notebook_content(notebook_path, server_url, token) nb = file_node["content"] nb.setdefault("cells", []) new_cell = _new_code_cell(code) nb["cells"].append(new_cell) cell_index = len(nb["cells"]) - 1 # Ejecuta en el kernel del notebook with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel: result = kernel.execute(code) raw_outputs = result.get("outputs", []) new_cell["outputs"] = _kernel_outputs_to_nbformat(raw_outputs) new_cell["execution_count"] = result.get("execution_count") _put_notebook_content(notebook_path, server_url, token, nb) return {"cell_index": cell_index, "outputs": _extract_outputs(raw_outputs)} def jupyter_execute_cell( notebook_path: str, cell_index: int, server_url: str = "http://localhost:8888", token: str = "", ) -> dict[str, Any]: """Ejecuta una celda existente por indice y persiste sus outputs.""" kernel_id = _ensure_session(server_url, token, notebook_path) file_node = _get_notebook_content(notebook_path, server_url, token) nb = file_node["content"] cells = nb.get("cells", []) if cell_index < 0 or cell_index >= len(cells): raise IndexError(f"cell_index {cell_index} fuera de rango (notebook tiene {len(cells)} celdas)") cell = cells[cell_index] if cell.get("cell_type") != "code": raise ValueError(f"La celda {cell_index} no es de codigo (cell_type={cell.get('cell_type')!r})") source = cell.get("source", "") if isinstance(source, list): source = "".join(source) with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel: result = kernel.execute(source) raw_outputs = result.get("outputs", []) cell["outputs"] = _kernel_outputs_to_nbformat(raw_outputs) cell["execution_count"] = result.get("execution_count") _put_notebook_content(notebook_path, server_url, token, nb) return {"cell_index": cell_index, "outputs": _extract_outputs(raw_outputs)} def jupyter_kernel_execute( code: str, server_url: str = "http://localhost:8888", token: str = "", ) -> dict[str, Any]: """Ejecuta codigo directo en el kernel sin tocar ningun notebook.""" with KernelClient(server_url=server_url, token=token) as kernel: result = kernel.execute(code) outputs = _extract_outputs(result.get("outputs", [])) return {"outputs": outputs, "status": result.get("status", "unknown")} # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- if __name__ == "__main__": import argparse import sys parser = argparse.ArgumentParser(description="Ejecuta codigo en kernels de Jupyter") sub = parser.add_subparsers(dest="command", required=True) p_append = sub.add_parser("append", help="Añade celda al notebook y la ejecuta") p_append.add_argument("notebook", help="Ruta al notebook relativa al servidor") p_append.add_argument("code", help="Codigo a insertar y ejecutar") p_append.add_argument("--server", default="http://localhost:8888") p_append.add_argument("--token", default="") p_cell = sub.add_parser("cell", help="Ejecuta celda existente por indice") p_cell.add_argument("notebook", help="Ruta al notebook relativa al servidor") p_cell.add_argument("index", type=int, help="Indice de la celda (0-based)") p_cell.add_argument("--server", default="http://localhost:8888") p_cell.add_argument("--token", default="") p_kernel = sub.add_parser("kernel", help="Ejecuta codigo en el kernel sin tocar notebook") p_kernel.add_argument("code", help="Codigo a ejecutar") p_kernel.add_argument("--server", default="http://localhost:8888") p_kernel.add_argument("--token", default="") args = parser.parse_args() try: if args.command == "append": result = jupyter_append_execute(args.notebook, args.code, args.server, args.token) elif args.command == "cell": result = jupyter_execute_cell(args.notebook, args.index, args.server, args.token) elif args.command == "kernel": result = jupyter_kernel_execute(args.code, args.server, args.token) else: parser.print_help() sys.exit(1) print(json.dumps(result, ensure_ascii=False, indent=2)) except Exception as exc: print(json.dumps({"error": str(exc)}, ensure_ascii=False), file=sys.stderr) sys.exit(1)