Files
fn_registry/python/functions/notebook/jupyter_exec.py
egutierrez fc8062bade feat: enhance jupyter notebook functions with auto-init and kernel management
Auto-create notebooks y sesiones en jupyter_exec (append y cell).
Auto-create en jupyter_write (append_code, append_markdown, batch).
Nuevos subcomandos cleanup y shutdown-all en jupyter_kernel.
README.md renombrado a README.txt para evitar error de parseo del indexer.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 00:10:23 +02:00

395 lines
14 KiB
Python

"""Ejecuta codigo en kernels de Jupyter via WebSocket.
Tres modos de ejecucion:
- append: añade una celda al final del notebook y la ejecuta
- cell: ejecuta una celda existente por indice
- kernel: ejecuta codigo directamente en el kernel sin modificar ningun notebook
"""
import asyncio
import json
from functools import partial
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
from jupyter_kernel_client import KernelClient
from jupyter_nbmodel_client import NbModelClient, get_jupyter_notebook_websocket_url
from nbformat import NotebookNode
# ---------------------------------------------------------------------------
# Helpers internos
# ---------------------------------------------------------------------------
def _notebook_exists(notebook_path: str, server_url: str, token: str) -> bool:
    """Check whether a notebook exists on the Jupyter server.

    Sends ``HEAD {server_url}/api/contents/{notebook_path}``.

    Args:
        notebook_path: Notebook path relative to the server root, given as a
            plain (not percent-encoded) path.
        server_url: Base URL of the Jupyter server.
        token: Authentication token; when empty no Authorization header is sent.

    Returns:
        True if the request succeeds, False if the server answers HTTP 404.

    Raises:
        HTTPError: for any HTTP error status other than 404.
        URLError: if the request cannot be performed (not caught here).
    """
    from urllib.parse import quote

    headers = {"Accept": "application/json"}
    if token:
        headers["Authorization"] = f"token {token}"
    # Percent-encode the path so that spaces, non-ASCII characters or a
    # literal '%' in the notebook name produce a valid URL that refers to the
    # intended file. '/' is kept as the path separator (quote's default).
    check_url = f"{server_url}/api/contents/{quote(notebook_path)}"
    req = Request(check_url, headers=headers, method="HEAD")
    try:
        with urlopen(req, timeout=5):
            return True
    except HTTPError as e:
        if e.code == 404:
            return False
        raise
def _create_notebook(notebook_path: str, server_url: str, token: str, kernel_name: str = "python3") -> None:
    """Create an empty notebook via ``PUT /api/contents`` if it does not exist.

    Args:
        notebook_path: Notebook path relative to the server root, given as a
            plain (not percent-encoded) path.
        server_url: Base URL of the Jupyter server.
        token: Authentication token; when empty no Authorization header is sent.
        kernel_name: Kernel spec name written into the notebook metadata.

    Raises:
        HTTPError / URLError: propagated from the existence check or the PUT
            request; nothing is caught here.
    """
    from urllib.parse import quote

    # Nothing to do when the server already has the notebook.
    if _notebook_exists(notebook_path, server_url, token):
        return
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    if token:
        headers["Authorization"] = f"token {token}"
    # Known kernel names get a friendlier display name; anything else is shown
    # under its own name.
    kernel_display = {"python3": "Python 3 (ipykernel)", "python": "Python 3"}.get(kernel_name, kernel_name)
    notebook_content = {
        "nbformat": 4,
        "nbformat_minor": 5,
        "metadata": {
            "kernelspec": {"name": kernel_name, "display_name": kernel_display, "language": "python"},
            "language_info": {"name": "python"},
        },
        "cells": [],
    }
    body = json.dumps({"type": "notebook", "content": notebook_content}).encode("utf-8")
    # Percent-encode the path so names containing spaces, non-ASCII characters
    # or '%' yield a valid URL; '/' stays as the path separator.
    url = f"{server_url}/api/contents/{quote(notebook_path)}"
    req = Request(url, data=body, headers=headers, method="PUT")
    with urlopen(req, timeout=10) as resp:
        # Drain the response body; its content is not used.
        resp.read()
def _ensure_session(server_url: str, token: str, notebook_path: str, kernel_name: str = "python3") -> str:
    """Return the kernel id of a session bound to the notebook, creating one if needed.

    An existing session (as reported by ``_resolve_kernel_id``) is reused.
    Otherwise a session is requested with ``POST /api/sessions`` and the
    kernel id found in the response is returned (empty string if the response
    carries none).
    """
    existing_id = _resolve_kernel_id(server_url, token, notebook_path)
    if existing_id:
        return existing_id

    request_headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    if token:
        request_headers["Authorization"] = f"token {token}"

    session_spec = {
        "path": notebook_path,
        "type": "notebook",
        "kernel": {"name": kernel_name},
    }
    request = Request(
        f"{server_url}/api/sessions",
        data=json.dumps(session_spec).encode("utf-8"),
        headers=request_headers,
        method="POST",
    )
    with urlopen(request, timeout=10) as response:
        created = json.loads(response.read())

    kernel_info = created.get("kernel", {})
    return kernel_info.get("id", "")
def _api_get(url: str, token: str = "") -> dict | list | None:
    """Fetch ``url`` with GET and return the decoded JSON body.

    Best-effort helper: URL/OS-level failures and invalid JSON are swallowed
    and reported as ``None``. An Authorization header is sent only when
    ``token`` is non-empty.
    """
    request_headers = {"Accept": "application/json"}
    if token:
        request_headers["Authorization"] = f"token {token}"
    try:
        with urlopen(Request(url, headers=request_headers), timeout=5) as response:
            raw_body = response.read()
        return json.loads(raw_body)
    except (URLError, OSError, json.JSONDecodeError):
        return None
def _resolve_kernel_id(server_url: str, token: str, notebook_path: str) -> str | None:
    """Look up the kernel id of the session whose path equals ``notebook_path``.

    Reads ``/api/sessions`` through ``_api_get``. Each session's path is taken
    from its ``notebook`` entry (a dict with ``path``, or a bare value) and
    falls back to the top-level ``path``. Returns ``None`` when no session
    matches or the listing is unavailable.
    """
    listing = _api_get(f"{server_url}/api/sessions", token) or []
    for entry in listing:
        target = entry.get("notebook", entry.get("path", {}))
        if isinstance(target, dict):
            candidate = target.get("path", target)
        else:
            candidate = str(target)
        if candidate != notebook_path:
            continue
        return entry.get("kernel", {}).get("id")
    return None
def _resolve_collab_username(server_url: str, token: str) -> str:
    """Resolve the display name to use in the Jupyter collaboration session.

    Queries ``/api/me`` and returns the first non-empty value among the
    identity's ``display_name``, ``username`` and ``name`` fields.

    Args:
        server_url: Base URL of the Jupyter server.
        token: Authentication token forwarded to ``_api_get``.

    Returns:
        The resolved name, or ``'Anonymous'`` when ``/api/me`` is unavailable,
        has no usable ``identity`` mapping, or every candidate field is
        missing or empty.
    """
    me = _api_get(f"{server_url}/api/me", token)
    # _api_get may return a list or None; only a dict can carry an identity.
    identity = me.get("identity") if isinstance(me, dict) else None
    if not isinstance(identity, dict):
        return "Anonymous"
    for field in ("display_name", "username", "name"):
        value = identity.get(field)
        if value:
            return value
    # Every candidate was missing or empty (including an empty ``name``).
    return "Anonymous"
def _normalize_code_cell(cell: NotebookNode) -> dict:
    """Build a code-cell dict that carries every field this module relies on.

    Cells created by hand (not through the Jupyter UI) may omit ``outputs`` or
    ``execution_count``. Missing fields are filled with empty defaults,
    ``cell_type`` is always set to ``"code"``, and existing values are carried
    over unchanged.
    """
    normalized: dict = {"id": cell.get("id", ""), "cell_type": "code"}
    # (field, default) pairs, in the key order of the resulting dict.
    remaining_fields = (
        ("metadata", {}),
        ("source", ""),
        ("outputs", []),
        ("execution_count", None),
    )
    for field, fallback in remaining_fields:
        normalized[field] = cell.get(field, fallback)
    return normalized
def _extract_outputs(raw_outputs: list[dict]) -> list[str]:
    """Turn nbformat output entries into a list of readable strings.

    - ``stream``: the text, with trailing newlines removed.
    - ``display_data`` / ``execute_result``: the ``text/plain`` payload (empty
      string if absent), with trailing newlines removed.
    - ``error``: the traceback lines joined with newlines.
    Entries of any other type are skipped.
    """

    def _joined(value):
        # Multi-line text may arrive as a list of string fragments.
        return "".join(value) if isinstance(value, list) else value

    rendered: list[str] = []
    for entry in raw_outputs:
        kind = entry.get("output_type", "")
        if kind == "error":
            rendered.append("\n".join(entry.get("traceback", [])))
            continue
        if kind == "stream":
            payload = entry.get("text", "")
        elif kind in ("display_data", "execute_result"):
            payload = entry.get("data", {}).get("text/plain", "")
        else:
            continue
        rendered.append(_joined(payload).rstrip("\n"))
    return rendered
# ---------------------------------------------------------------------------
# Modo append (async interno)
# ---------------------------------------------------------------------------
async def _async_append_execute(
    notebook_path: str,
    code: str,
    server_url: str,
    token: str,
) -> dict[str, Any]:
    """Append a code cell to the notebook, execute it and collect its outputs.

    The notebook is created when missing and a session/kernel is ensured
    before the notebook model is opened over its websocket URL.

    Args:
        notebook_path: Notebook path relative to the server root.
        code: Source of the new code cell.
        server_url: Base URL of the Jupyter server.
        token: Authentication token (an empty string is passed on as ``None``
            when building the websocket URL).

    Returns:
        dict with ``cell_index`` (index returned by ``add_code_cell``) and
        ``outputs`` (list of strings produced by ``_extract_outputs``).
    """
    _create_notebook(notebook_path, server_url, token)
    kernel_id = _ensure_session(server_url, token, notebook_path)
    ws_url = get_jupyter_notebook_websocket_url(
        server_url,
        notebook_path,
        token or None,
    )
    username = _resolve_collab_username(server_url, token)

    async with NbModelClient(ws_url, username=username) as nb:
        await nb.wait_until_synced()

        with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
            cell_index = nb.add_code_cell(code)
            # get_running_loop() is the documented way to obtain the loop from
            # inside a coroutine. execute_cell is handed to the default
            # executor, presumably because it blocks -- TODO confirm.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None, partial(nb.execute_cell, cell_index, kernel),
            )

        # Let Y.js propagate changes to other clients (browser)
        await asyncio.sleep(2)

    outputs = _extract_outputs(result.get("outputs", []))
    return {"cell_index": cell_index, "outputs": outputs}
# ---------------------------------------------------------------------------
# Modo cell (async interno)
# ---------------------------------------------------------------------------
async def _async_execute_cell(
    notebook_path: str,
    cell_index: int,
    server_url: str,
    token: str,
) -> dict[str, Any]:
    """Execute an existing notebook cell by index and collect its outputs.

    A session/kernel is ensured for the notebook before the notebook model is
    opened over its websocket URL.

    Args:
        notebook_path: Notebook path relative to the server root.
        cell_index: Index of the cell to execute.
        server_url: Base URL of the Jupyter server.
        token: Authentication token (an empty string is passed on as ``None``
            when building the websocket URL).

    Returns:
        dict with ``cell_index`` and ``outputs`` (list of strings produced by
        ``_extract_outputs``).
    """
    kernel_id = _ensure_session(server_url, token, notebook_path)
    ws_url = get_jupyter_notebook_websocket_url(
        server_url,
        notebook_path,
        token or None,
    )
    username = _resolve_collab_username(server_url, token)

    async with NbModelClient(ws_url, username=username) as nb:
        await nb.wait_until_synced()

        # Normalize the cell before executing it. Cells created manually
        # (without going through the Jupyter UI) may lack the 'outputs' or
        # 'execution_count' fields in the CRDT model, which causes a KeyError
        # inside execute_cell when it tries `del ycell["outputs"][:]`.
        # Replacing the cell via __setitem__ forces a full re-creation of the
        # CRDT map with every field required by nbformat.
        cell = nb[cell_index]
        if cell.get("cell_type") == "code" and (
            "outputs" not in cell or "execution_count" not in cell
        ):
            nb[cell_index] = _normalize_code_cell(cell)

        with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
            # get_running_loop() is the documented way to obtain the loop from
            # inside a coroutine. execute_cell is handed to the default
            # executor, presumably because it blocks -- TODO confirm.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None, partial(nb.execute_cell, cell_index, kernel),
            )

        # Pause before disconnecting; the append path does the same so that
        # Y.js can propagate changes to other clients.
        await asyncio.sleep(2)

    outputs = _extract_outputs(result.get("outputs", []))
    return {"cell_index": cell_index, "outputs": outputs}
# ---------------------------------------------------------------------------
# API publica
# ---------------------------------------------------------------------------
def jupyter_append_execute(
    notebook_path: str,
    code: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Append a code cell to the end of the notebook and execute it.

    Synchronous entry point: runs ``_async_append_execute`` to completion in
    a fresh event loop via ``asyncio.run``.

    Args:
        notebook_path: Notebook path relative to the Jupyter server root.
        code: Python code to insert and execute.
        server_url: Jupyter server URL; defaults to http://localhost:8888.
        token: Jupyter server authentication token.

    Returns:
        dict with 'cell_index' (index of the new cell) and 'outputs' (list of
        strings).

    Raises:
        Exception: if the server or the kernel cannot be reached.
    """
    pending = _async_append_execute(
        notebook_path=notebook_path,
        code=code,
        server_url=server_url,
        token=token,
    )
    return asyncio.run(pending)
def jupyter_execute_cell(
    notebook_path: str,
    cell_index: int,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Execute an existing notebook cell selected by index.

    Synchronous entry point: runs ``_async_execute_cell`` to completion in a
    fresh event loop via ``asyncio.run``.

    Args:
        notebook_path: Notebook path relative to the Jupyter server root.
        cell_index: Index of the cell to execute (0-based).
        server_url: Jupyter server URL; defaults to http://localhost:8888.
        token: Jupyter server authentication token.

    Returns:
        dict with 'cell_index' and 'outputs' (list of strings).

    Raises:
        IndexError: if cell_index is out of range.
        Exception: if the server or the kernel cannot be reached.
    """
    pending = _async_execute_cell(
        notebook_path=notebook_path,
        cell_index=cell_index,
        server_url=server_url,
        token=token,
    )
    return asyncio.run(pending)
def jupyter_kernel_execute(
    code: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Execute code directly on a kernel without touching any notebook.

    Handy for quick queries, variable inspection and state checks.
    NOTE(review): no kernel_id is passed to KernelClient, so which kernel the
    code runs on depends on KernelClient's default -- confirm.

    Args:
        code: Python code to execute on the kernel.
        server_url: Jupyter server URL; defaults to http://localhost:8888.
        token: Jupyter server authentication token.

    Returns:
        dict with 'outputs' (list of strings) and 'status' (the reply's
        status, or 'unknown' when the reply has none).

    Raises:
        Exception: if the server or the kernel cannot be reached.
    """
    with KernelClient(server_url=server_url, token=token) as client:
        reply = client.execute(code)
    return {
        "outputs": _extract_outputs(reply.get("outputs", [])),
        "status": reply.get("status", "unknown"),
    }
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import argparse
    import sys

    parser = argparse.ArgumentParser(
        description="Ejecuta codigo en kernels de Jupyter",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    # append: insert a new cell at the end of the notebook and run it
    p_append = sub.add_parser("append", help="Añade celda al notebook y la ejecuta")
    p_append.add_argument("notebook", help="Ruta al notebook relativa al servidor")
    p_append.add_argument("code", help="Codigo a insertar y ejecutar")

    # cell: run an already existing cell selected by index
    p_cell = sub.add_parser("cell", help="Ejecuta celda existente por indice")
    p_cell.add_argument("notebook", help="Ruta al notebook relativa al servidor")
    p_cell.add_argument("index", type=int, help="Indice de la celda (0-based)")

    # kernel: run code on the kernel, leaving notebooks untouched
    p_kernel = sub.add_parser("kernel", help="Ejecuta codigo en el kernel sin tocar notebook")
    p_kernel.add_argument("code", help="Codigo a ejecutar")

    # Connection options are identical for every subcommand.
    for subparser in (p_append, p_cell, p_kernel):
        subparser.add_argument("--server", default="http://localhost:8888")
        subparser.add_argument("--token", default="")

    args = parser.parse_args()

    # Subcommand name -> call producing the JSON-serializable result.
    runners = {
        "append": lambda: jupyter_append_execute(args.notebook, args.code, args.server, args.token),
        "cell": lambda: jupyter_execute_cell(args.notebook, args.index, args.server, args.token),
        "kernel": lambda: jupyter_kernel_execute(args.code, args.server, args.token),
    }

    try:
        runner = runners.get(args.command)
        if runner is None:
            parser.print_help()
            sys.exit(1)
        result = runner()
        print(json.dumps(result, ensure_ascii=False, indent=2))
    except Exception as exc:
        # Report any failure as a JSON object on stderr and exit non-zero.
        print(json.dumps({"error": str(exc)}, ensure_ascii=False), file=sys.stderr)
        sys.exit(1)