Files
fn_registry/python/functions/notebook/jupyter_exec.py
T
egutierrez 974f704214 fix: jupyter_exec usa run_in_executor para execute_cell
Evita bloquear el event loop asyncio ejecutando execute_cell (operación
síncrona con websocket) en un thread executor.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 22:03:59 +02:00

288 lines
10 KiB
Python

"""Ejecuta codigo en kernels de Jupyter via WebSocket.
Tres modos de ejecucion:
- append: añade una celda al final del notebook y la ejecuta
- cell: ejecuta una celda existente por indice
- kernel: ejecuta codigo directamente en el kernel sin modificar ningun notebook
"""
import asyncio
import json
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from http.client import HTTPException
from typing import Any
from urllib.error import URLError
from urllib.request import Request, urlopen

from jupyter_kernel_client import KernelClient
from jupyter_nbmodel_client import NbModelClient, get_jupyter_notebook_websocket_url
# ---------------------------------------------------------------------------
# Helpers internos
# ---------------------------------------------------------------------------
def _api_get(url: str, token: str = "") -> dict | list | None:
"""GET a Jupyter REST API endpoint."""
headers = {"Accept": "application/json"}
if token:
headers["Authorization"] = f"token {token}"
try:
req = Request(url, headers=headers)
with urlopen(req, timeout=5) as resp:
return json.loads(resp.read())
except (URLError, OSError, json.JSONDecodeError):
return None
def _resolve_kernel_id(server_url: str, token: str, notebook_path: str) -> str | None:
    """Find the id of the kernel attached to a notebook via ``/api/sessions``.

    A session entry may expose the notebook path either as
    ``session["notebook"]["path"]`` or directly as ``session["path"]``; both
    forms are handled. Paths are compared without leading slashes, so
    ``"/work/a.ipynb"`` and ``"work/a.ipynb"`` select the same session.

    Args:
        server_url: Base URL of the Jupyter server.
        token: Jupyter server token ("" for none).
        notebook_path: Notebook path relative to the server root.

    Returns:
        The kernel id, or ``None`` when the API is unreachable, returns an
        unexpected payload, or no session matches ``notebook_path``.
    """
    sessions = _api_get(f"{server_url}/api/sessions", token)
    # The endpoint returns a list of session objects; anything else (None on
    # failure, or an unexpected payload) means "no kernel found".
    if not isinstance(sessions, list):
        return None
    wanted = notebook_path.lstrip("/")
    for session in sessions:
        if not isinstance(session, dict):
            continue
        nb = session.get("notebook", session.get("path", {}))
        nb_path = nb.get("path", nb) if isinstance(nb, dict) else str(nb)
        if isinstance(nb_path, str) and nb_path.lstrip("/") == wanted:
            # Guard against a null "kernel" field.
            kernel = session.get("kernel") or {}
            return kernel.get("id")
    return None
def _resolve_collab_username(server_url: str, token: str) -> str:
    """Resolve the display name of the active user in Jupyter collaboration.

    Queries ``/api/me`` to get the identity Jupyter assigned to the browser
    user and returns the first non-empty value among ``display_name``,
    ``username`` and ``name`` of its ``identity`` object.

    Args:
        server_url: Base URL of the Jupyter server.
        token: Jupyter server token ("" for none).

    Returns:
        The resolved name, or ``"Anonymous"`` when the endpoint is
        unreachable, the payload is malformed, or every name field is empty.
    """
    me = _api_get(f"{server_url}/api/me", token)
    identity = me.get("identity") if isinstance(me, dict) else None
    if not isinstance(identity, dict):
        return "Anonymous"
    for key in ("display_name", "username", "name"):
        value = identity.get(key)
        if value:
            return value
    # All fields missing or empty: honour the documented fallback instead of
    # returning an empty username.
    return "Anonymous"
def _extract_outputs(raw_outputs: list[dict]) -> list[str]:
"""Convierte outputs de nbformat a lista de strings legibles."""
result: list[str] = []
for output in raw_outputs:
output_type = output.get("output_type", "")
if output_type == "stream":
text = output.get("text", "")
if isinstance(text, list):
text = "".join(text)
result.append(text.rstrip("\n"))
elif output_type in ("display_data", "execute_result"):
data = output.get("data", {})
text = data.get("text/plain", "")
if isinstance(text, list):
text = "".join(text)
result.append(text.rstrip("\n"))
elif output_type == "error":
traceback = output.get("traceback", [])
result.append("\n".join(traceback))
return result
# ---------------------------------------------------------------------------
# Notebook execution drivers (append / cell modes, async internals)
# ---------------------------------------------------------------------------
# Seconds to wait after executing a cell so that Y.js can propagate the
# changes (new cell, outputs) to other collaborative clients such as the
# browser before this client disconnects.
_SYNC_PROPAGATION_DELAY_S = 2.0


async def _async_run_notebook_cell(
    notebook_path: str,
    server_url: str,
    token: str,
    select_cell: Callable[[Any], int],
) -> dict[str, Any]:
    """Open a notebook collaboratively, execute one cell in its kernel, return outputs.

    Shared driver for the ``append`` and ``cell`` modes.

    Args:
        notebook_path: Notebook path relative to the Jupyter server root.
        server_url: Base URL of the Jupyter server.
        token: Jupyter server token ("" for none).
        select_cell: Called with the synced ``NbModelClient`` once the kernel
            connection is open; returns the index of the cell to execute and
            may modify the notebook first (e.g. append a new cell).

    Returns:
        dict with ``cell_index`` and ``outputs`` (list of strings).
    """
    ws_url = get_jupyter_notebook_websocket_url(
        server_url,
        notebook_path,
        token or None,
    )
    # NOTE(review): kernel_id is None when no session matches the notebook;
    # KernelClient then presumably starts a new kernel rather than reusing the
    # notebook's one — confirm against jupyter_kernel_client.
    kernel_id = _resolve_kernel_id(server_url, token, notebook_path)
    username = _resolve_collab_username(server_url, token)
    async with NbModelClient(ws_url, username=username) as nb:
        await nb.wait_until_synced()
        with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
            cell_index = select_cell(nb)
            # execute_cell is a synchronous call that waits on the kernel
            # websocket; run it in a worker thread so this event loop keeps
            # servicing the collaborative websocket meanwhile.
            result = await asyncio.to_thread(nb.execute_cell, cell_index, kernel)
            # Let Y.js propagate changes to other clients (browser).
            await asyncio.sleep(_SYNC_PROPAGATION_DELAY_S)
    outputs = _extract_outputs(result.get("outputs", []))
    return {"cell_index": cell_index, "outputs": outputs}


async def _async_append_execute(
    notebook_path: str,
    code: str,
    server_url: str,
    token: str,
) -> dict[str, Any]:
    """Append ``code`` as a new code cell at the end of the notebook and run it.

    Returns:
        dict with ``cell_index`` (index of the new cell) and ``outputs``.
    """
    return await _async_run_notebook_cell(
        notebook_path,
        server_url,
        token,
        lambda nb: nb.add_code_cell(code),
    )


async def _async_execute_cell(
    notebook_path: str,
    cell_index: int,
    server_url: str,
    token: str,
) -> dict[str, Any]:
    """Execute the existing cell at ``cell_index`` in the notebook's kernel.

    Returns:
        dict with ``cell_index`` and ``outputs``.
    """
    return await _async_run_notebook_cell(
        notebook_path,
        server_url,
        token,
        lambda _nb: cell_index,
    )
# ---------------------------------------------------------------------------
# API publica
# ---------------------------------------------------------------------------
def _run_coroutine_blocking(make_coro: Callable[[], Any]) -> Any:
    """Run the coroutine built by ``make_coro`` to completion and return its result.

    ``asyncio.run`` refuses to start when an event loop is already running in
    the calling thread (e.g. inside a Jupyter/IPython cell or an async host).
    In that case the coroutine runs on a fresh event loop in a single worker
    thread while the caller blocks, keeping the public API synchronous.
    Exceptions raised by the coroutine propagate to the caller.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop running in this thread: the common, direct path.
        return asyncio.run(make_coro())
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The coroutine is created inside the worker so it is never left
        # un-awaited in this thread.
        return pool.submit(lambda: asyncio.run(make_coro())).result()


def jupyter_append_execute(
    notebook_path: str,
    code: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Append a code cell at the end of the notebook and execute it.

    The cell is written through Jupyter's collaborative protocol, so both the
    agent and the user see the cell and its output in real time. Can be
    called from code that is already inside a running asyncio event loop.

    Args:
        notebook_path: Notebook path relative to the Jupyter server root.
        code: Python code to insert and execute.
        server_url: Jupyter server URL; defaults to http://localhost:8888.
        token: Jupyter server authentication token.

    Returns:
        dict with 'cell_index' (index of the new cell) and 'outputs' (list of strings).

    Raises:
        Exception: if the server or the kernel cannot be reached.
    """
    return _run_coroutine_blocking(
        lambda: _async_append_execute(notebook_path, code, server_url, token)
    )


def jupyter_execute_cell(
    notebook_path: str,
    cell_index: int,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Execute an existing notebook cell by index.

    Can be called from code that is already inside a running asyncio event loop.

    Args:
        notebook_path: Notebook path relative to the Jupyter server root.
        cell_index: Index of the cell to execute (0-based).
        server_url: Jupyter server URL; defaults to http://localhost:8888.
        token: Jupyter server authentication token.

    Returns:
        dict with 'cell_index' and 'outputs' (list of strings).

    Raises:
        IndexError: if cell_index is out of range (presumably raised by the
            notebook client; the index is not validated here).
        Exception: if the server or the kernel cannot be reached.
    """
    return _run_coroutine_blocking(
        lambda: _async_execute_cell(notebook_path, cell_index, server_url, token)
    )
def jupyter_kernel_execute(
    code: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
    *,
    kernel_id: str | None = None,
    notebook_path: str | None = None,
) -> dict[str, Any]:
    """Execute code directly in a kernel without modifying any notebook.

    Useful for quick queries, variable inspection and state checks.

    Kernel selection:
      - ``kernel_id`` given: run in that kernel.
      - otherwise ``notebook_path`` given: run in the kernel of that
        notebook's session (resolved through ``/api/sessions``).
      - neither: ``KernelClient`` is created without a kernel id, which
        presumably launches a new kernel (NOTE(review): confirm against
        jupyter_kernel_client), so a running notebook's variables would not
        be visible. Pass one of the arguments above to inspect them.

    Args:
        code: Python code to execute.
        server_url: Jupyter server URL; defaults to http://localhost:8888.
        token: Jupyter server authentication token.
        kernel_id: Id of an existing kernel to use.
        notebook_path: Notebook (relative to the server root) whose kernel to use.

    Returns:
        dict with 'outputs' (list of strings) and 'status' ('ok' or 'error';
        'unknown' if the reply carries no status).

    Raises:
        RuntimeError: if ``notebook_path`` is given but no kernel is found for it.
        Exception: if the server or the kernel cannot be reached.
    """
    if kernel_id is None and notebook_path is not None:
        kernel_id = _resolve_kernel_id(server_url, token, notebook_path)
        if kernel_id is None:
            # The caller asked for a specific notebook's state; silently using
            # another kernel would return misleading results.
            raise RuntimeError(f"No running kernel found for notebook {notebook_path!r}")
    with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
        result = kernel.execute(code)
    outputs = _extract_outputs(result.get("outputs", []))
    return {"outputs": outputs, "status": result.get("status", "unknown")}
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _main(argv: list[str] | None = None) -> int:
    """Parse CLI arguments, run the requested mode and print the result as JSON.

    Subcommands mirror the public API: ``append``, ``cell`` and ``kernel``.
    For ``append`` and ``kernel`` the code argument may be ``-`` to read the
    code from stdin. This helps with multi-line snippets, which are awkward
    to pass as one shell argument. A lone ``-`` is not valid Python anyway.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 on success, 1 if the call raised. On failure a
        JSON object with ``error`` (message) and ``type`` (exception class
        name) is printed to stderr.
    """
    import argparse
    import sys

    # --server/--token are shared by every subcommand.
    common = argparse.ArgumentParser(add_help=False)
    common.add_argument("--server", default="http://localhost:8888")
    common.add_argument("--token", default="")

    parser = argparse.ArgumentParser(
        description="Ejecuta codigo en kernels de Jupyter",
    )
    sub = parser.add_subparsers(dest="command", required=True)

    p_append = sub.add_parser(
        "append", parents=[common], help="Añade celda al notebook y la ejecuta"
    )
    p_append.add_argument("notebook", help="Ruta al notebook relativa al servidor")
    p_append.add_argument("code", help="Codigo a insertar y ejecutar ('-' para leer de stdin)")

    p_cell = sub.add_parser(
        "cell", parents=[common], help="Ejecuta celda existente por indice"
    )
    p_cell.add_argument("notebook", help="Ruta al notebook relativa al servidor")
    p_cell.add_argument("index", type=int, help="Indice de la celda (0-based)")

    p_kernel = sub.add_parser(
        "kernel", parents=[common], help="Ejecuta codigo en el kernel sin tocar notebook"
    )
    p_kernel.add_argument("code", help="Codigo a ejecutar ('-' para leer de stdin)")

    args = parser.parse_args(argv)
    if getattr(args, "code", None) == "-":
        args.code = sys.stdin.read()

    # argparse restricts args.command to these three keys (required=True).
    runners = {
        "append": lambda: jupyter_append_execute(args.notebook, args.code, args.server, args.token),
        "cell": lambda: jupyter_execute_cell(args.notebook, args.index, args.server, args.token),
        "kernel": lambda: jupyter_kernel_execute(args.code, args.server, args.token),
    }
    try:
        result = runners[args.command]()
        print(json.dumps(result, ensure_ascii=False, indent=2))
    except Exception as exc:  # CLI boundary: report every failure as JSON.
        # Include the class name: str(exc) is empty for e.g. bare TimeoutError().
        error = {"error": str(exc), "type": type(exc).__name__}
        print(json.dumps(error, ensure_ascii=False), file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    import sys

    sys.exit(_main())