Files
fn_registry/python/functions/notebook/jupyter_exec.py
T
egutierrez 5194de3c04 feat: cierra issues 0050 y 0052 + commands automáticos
- 0050: jupyter_exec reescrito sin Y.js (REST + KernelClient). Bug raíz adicional: HEAD /api/contents da 405 → cambiado a GET. 9 tests (5 unit + 4 e2e).
- 0052: footprint_aurgi cerrado. Bug fix en setup_geo_stack_docker_pipeline (verify aborta si compose up falla; nombre de contenedor incorrecto).
- Nueva primitiva docker_container_running_py_infra (7 tests).
- /full-git-push y /full-git-pull pasan a modo automático: auto-commit + push sin preguntar, aborta solo si detecta secrets.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 23:34:03 +02:00

316 lines
12 KiB
Python

"""Ejecuta codigo en kernels de Jupyter.
Tres modos:
- append: añade una celda al final del notebook y la ejecuta
- cell: ejecuta una celda existente por indice
- kernel: ejecuta codigo directamente en el kernel sin tocar notebook
Implementacion basada en REST `/api/contents` + `KernelClient` (websocket clasico
al kernel). NO usa `jupyter_nbmodel_client` ni el canal colaborativo Y.js, por lo
que es robusto frente a versiones nuevas de `jupyter-collaboration` (ver issue
0050). Trade-off: los cambios al notebook se persisten a disco; Jupyter Lab los
detecta via file watch (puede pedir 'Revert to disk' o 'Overwrite' segun version).
"""
import json
import uuid
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from urllib.request import Request, urlopen

from jupyter_kernel_client import KernelClient
# ---------------------------------------------------------------------------
# Helpers REST
# ---------------------------------------------------------------------------
def _auth_headers(token: str, content_type: bool = False) -> dict[str, str]:
headers = {"Accept": "application/json"}
if content_type:
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"token {token}"
return headers
def _notebook_exists(notebook_path: str, server_url: str, token: str) -> bool:
    """Check whether a notebook exists via GET /api/contents (with ``content=0``).

    Jupyter Server does not support HEAD on /api/contents (it answers 405), so a
    GET with ``content=0`` is used to avoid transferring the full body.

    Args:
        notebook_path: Notebook path relative to the server root (not URL-encoded).
        server_url: Base URL of the Jupyter server.
        token: API token; may be empty.

    Returns:
        True when the request succeeds, False when the server answers 404.

    Raises:
        HTTPError: For any HTTP error status other than 404.
    """
    # Percent-encode the path (slashes are kept). Without this, a notebook name
    # containing spaces or non-ASCII characters cannot be sent as a request
    # line, and '#' or '?' would be parsed as fragment/query delimiters.
    check_url = f"{server_url}/api/contents/{quote(notebook_path)}?content=0"
    req = Request(check_url, headers=_auth_headers(token), method="GET")
    try:
        with urlopen(req, timeout=5):
            return True
    except HTTPError as e:
        if e.code == 404:
            return False
        raise
def _create_notebook(notebook_path: str, server_url: str, token: str, kernel_name: str = "python3") -> None:
    """Create an empty notebook via PUT /api/contents unless it already exists.

    Args:
        notebook_path: Notebook path relative to the server root (not URL-encoded).
        server_url: Base URL of the Jupyter server.
        token: API token; may be empty.
        kernel_name: Kernelspec name written into the notebook metadata.

    Raises:
        HTTPError: If the existence check or the PUT request fails.
    """
    if _notebook_exists(notebook_path, server_url, token):
        return
    # Known kernelspec names get their usual display name; any other name is
    # used verbatim as its own display name.
    kernel_display = {"python3": "Python 3 (ipykernel)", "python": "Python 3"}.get(kernel_name, kernel_name)
    notebook_content = {
        "nbformat": 4,
        "nbformat_minor": 5,
        "metadata": {
            "kernelspec": {"name": kernel_name, "display_name": kernel_display, "language": "python"},
            "language_info": {"name": "python"},
        },
        "cells": [],
    }
    body = json.dumps({"type": "notebook", "content": notebook_content}).encode("utf-8")
    # Percent-encode the path (slashes are kept) so names with spaces,
    # non-ASCII characters, '#' or '?' produce a valid request URL.
    url = f"{server_url}/api/contents/{quote(notebook_path)}"
    req = Request(url, data=body, headers=_auth_headers(token, content_type=True), method="PUT")
    with urlopen(req, timeout=10) as resp:
        # Drain the response so the request completes before the connection closes.
        resp.read()
def _get_notebook_content(notebook_path: str, server_url: str, token: str) -> dict:
    """Read the full notebook model via GET /api/contents (with ``content=1``).

    Args:
        notebook_path: Notebook path relative to the server root (not URL-encoded).
        server_url: Base URL of the Jupyter server.
        token: API token; may be empty.

    Returns:
        The decoded JSON response; callers read the notebook from its
        ``"content"`` key.

    Raises:
        HTTPError: If the server answers with an error status.
    """
    # Percent-encode the path (slashes are kept) so names with spaces,
    # non-ASCII characters, '#' or '?' produce a valid request URL.
    url = f"{server_url}/api/contents/{quote(notebook_path)}?content=1&type=notebook"
    req = Request(url, headers=_auth_headers(token), method="GET")
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read())
def _put_notebook_content(notebook_path: str, server_url: str, token: str, content: dict) -> None:
    """Overwrite the notebook via PUT /api/contents.

    Args:
        notebook_path: Notebook path relative to the server root (not URL-encoded).
        server_url: Base URL of the Jupyter server.
        token: API token; may be empty.
        content: Notebook document (nbformat dict) to store.

    Raises:
        HTTPError: If the server answers with an error status.
    """
    body = json.dumps({"type": "notebook", "format": "json", "content": content}).encode("utf-8")
    # Percent-encode the path (slashes are kept) so names with spaces,
    # non-ASCII characters, '#' or '?' produce a valid request URL.
    url = f"{server_url}/api/contents/{quote(notebook_path)}"
    req = Request(url, data=body, headers=_auth_headers(token, content_type=True), method="PUT")
    with urlopen(req, timeout=10) as resp:
        # Drain the response so the request completes before the connection closes.
        resp.read()
def _ensure_session(server_url: str, token: str, notebook_path: str, kernel_name: str = "python3") -> str:
    """Make sure the notebook has a session and return its kernel id.

    An existing session bound to the notebook is reused. Otherwise a new
    session (and kernel) is created via POST /api/sessions.

    Args:
        server_url: Base URL of the Jupyter server.
        token: API token; may be empty.
        notebook_path: Notebook path relative to the server root.
        kernel_name: Kernelspec name used when a new session is created.

    Returns:
        The kernel id, or an empty string if the new session's response does
        not carry one.
    """
    existing = _resolve_kernel_id(server_url, token, notebook_path)
    if existing:
        return existing

    payload = {
        "path": notebook_path,
        "type": "notebook",
        "kernel": {"name": kernel_name},
    }
    request = Request(
        f"{server_url}/api/sessions",
        data=json.dumps(payload).encode("utf-8"),
        headers=_auth_headers(token, content_type=True),
        method="POST",
    )
    with urlopen(request, timeout=10) as response:
        created = json.loads(response.read())
    kernel_info = created.get("kernel", {})
    return kernel_info.get("id", "")
def _api_get(url: str, token: str = "") -> dict | list | None:
    """GET a JSON document from the server on a best-effort basis.

    Args:
        url: Full URL to request.
        token: API token; may be empty.

    Returns:
        The decoded JSON payload, or None when the request fails with a
        URL/OS-level error or the body is not valid JSON.
    """
    try:
        with urlopen(Request(url, headers=_auth_headers(token)), timeout=5) as response:
            payload = response.read()
        return json.loads(payload)
    except (URLError, OSError, json.JSONDecodeError):
        # Deliberately swallowed: callers treat None as "nothing available".
        return None
def _resolve_kernel_id(server_url: str, token: str, notebook_path: str) -> str | None:
    """Look up the kernel id of the notebook's session via /api/sessions.

    Args:
        server_url: Base URL of the Jupyter server.
        token: API token; may be empty.
        notebook_path: Notebook path to match against each session's path.

    Returns:
        The kernel id of the first session whose path equals ``notebook_path``
        (None if that session has no kernel id), or None when no session matches.
    """
    listing = _api_get(f"{server_url}/api/sessions", token) or []
    for entry in listing:
        # The path may live under "notebook" (as a dict or a plain value) or
        # directly under "path"; both shapes are accepted.
        target = entry.get("notebook", entry.get("path", {}))
        if isinstance(target, dict):
            candidate = target.get("path", target)
        else:
            candidate = str(target)
        if candidate != notebook_path:
            continue
        return entry.get("kernel", {}).get("id")
    return None
# ---------------------------------------------------------------------------
# Helpers nbformat
# ---------------------------------------------------------------------------
def _new_code_cell(source: str) -> dict:
"""Crea un dict de celda de codigo nbformat 4.5 con todos los campos."""
return {
"id": str(uuid.uuid4()),
"cell_type": "code",
"metadata": {},
"source": source,
"outputs": [],
"execution_count": None,
}
def _extract_outputs(raw_outputs: list[dict]) -> list[str]:
"""Convierte outputs de nbformat a lista de strings legibles."""
result: list[str] = []
for output in raw_outputs:
output_type = output.get("output_type", "")
if output_type == "stream":
text = output.get("text", "")
if isinstance(text, list):
text = "".join(text)
result.append(text.rstrip("\n"))
elif output_type in ("display_data", "execute_result"):
data = output.get("data", {})
text = data.get("text/plain", "")
if isinstance(text, list):
text = "".join(text)
result.append(text.rstrip("\n"))
elif output_type == "error":
traceback = output.get("traceback", [])
result.append("\n".join(traceback))
return result
def _kernel_outputs_to_nbformat(outputs: list[dict]) -> list[dict]:
"""Normaliza outputs de KernelClient al esquema nbformat 4.
KernelClient ya devuelve dicts con `output_type`, pero algunos casos (errores,
streams) pueden venir con campos sueltos. Esta funcion los pasa tal cual: el
cliente actual cumple el esquema; existe como punto de extension futuro.
"""
return [dict(o) for o in outputs]
# ---------------------------------------------------------------------------
# Modos
# ---------------------------------------------------------------------------
def jupyter_append_execute(
    notebook_path: str,
    code: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Append a code cell to the end of the notebook and execute it.

    The notebook is created if missing. The new cell and its outputs are
    persisted through the REST ``/api/contents`` endpoint.

    Args:
        notebook_path: Notebook path relative to the server root.
        code: Source code for the new cell.
        server_url: Base URL of the Jupyter server.
        token: API token; may be empty.

    Returns:
        ``{"cell_index": <index of the new cell>, "outputs": <list of strings>}``.
    """
    _create_notebook(notebook_path, server_url, token)
    session_kernel = _ensure_session(server_url, token, notebook_path)

    # Load the stored notebook and stage the new cell at its end.
    notebook = _get_notebook_content(notebook_path, server_url, token)["content"]
    cells = notebook.setdefault("cells", [])
    position = len(cells)
    appended = _new_code_cell(code)
    cells.append(appended)

    # Run the code in the kernel attached to the notebook's session.
    with KernelClient(server_url=server_url, token=token, kernel_id=session_kernel) as client:
        reply = client.execute(code)

    produced = reply.get("outputs", [])
    appended.update(
        outputs=_kernel_outputs_to_nbformat(produced),
        execution_count=reply.get("execution_count"),
    )
    _put_notebook_content(notebook_path, server_url, token, notebook)
    return {"cell_index": position, "outputs": _extract_outputs(produced)}
def jupyter_execute_cell(
    notebook_path: str,
    cell_index: int,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Execute an existing cell by index and persist its outputs.

    Args:
        notebook_path: Notebook path relative to the server root.
        cell_index: Zero-based index of the cell to run.
        server_url: Base URL of the Jupyter server.
        token: API token; may be empty.

    Returns:
        ``{"cell_index": cell_index, "outputs": <list of strings>}``.

    Raises:
        IndexError: If ``cell_index`` is outside the notebook's cell range.
        ValueError: If the selected cell is not a code cell.
    """
    session_kernel = _ensure_session(server_url, token, notebook_path)
    notebook = _get_notebook_content(notebook_path, server_url, token)["content"]
    cells = notebook.get("cells", [])
    if not 0 <= cell_index < len(cells):
        raise IndexError(f"cell_index {cell_index} fuera de rango (notebook tiene {len(cells)} celdas)")

    cell = cells[cell_index]
    if cell.get("cell_type") != "code":
        raise ValueError(f"La celda {cell_index} no es de codigo (cell_type={cell.get('cell_type')!r})")

    # nbformat stores the source either as one string or as a list of chunks.
    stored = cell.get("source", "")
    program = "".join(stored) if isinstance(stored, list) else stored

    with KernelClient(server_url=server_url, token=token, kernel_id=session_kernel) as client:
        reply = client.execute(program)

    produced = reply.get("outputs", [])
    cell.update(
        outputs=_kernel_outputs_to_nbformat(produced),
        execution_count=reply.get("execution_count"),
    )
    _put_notebook_content(notebook_path, server_url, token, notebook)
    return {"cell_index": cell_index, "outputs": _extract_outputs(produced)}
def jupyter_kernel_execute(
    code: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Execute code directly in a kernel without touching any notebook.

    Args:
        code: Source code to run.
        server_url: Base URL of the Jupyter server.
        token: API token; may be empty.

    Returns:
        ``{"outputs": <list of strings>, "status": <status from the kernel
        reply, or "unknown" when the reply has none>}``.
    """
    with KernelClient(server_url=server_url, token=token) as client:
        reply = client.execute(code)
    return {
        "outputs": _extract_outputs(reply.get("outputs", [])),
        "status": reply.get("status", "unknown"),
    }
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import argparse
    import sys

    # Command-line front end: one sub-command per execution mode.
    cli = argparse.ArgumentParser(description="Ejecuta codigo en kernels de Jupyter")
    subcommands = cli.add_subparsers(dest="command", required=True)

    append_cmd = subcommands.add_parser("append", help="Añade celda al notebook y la ejecuta")
    append_cmd.add_argument("notebook", help="Ruta al notebook relativa al servidor")
    append_cmd.add_argument("code", help="Codigo a insertar y ejecutar")

    cell_cmd = subcommands.add_parser("cell", help="Ejecuta celda existente por indice")
    cell_cmd.add_argument("notebook", help="Ruta al notebook relativa al servidor")
    cell_cmd.add_argument("index", type=int, help="Indice de la celda (0-based)")

    kernel_cmd = subcommands.add_parser("kernel", help="Ejecuta codigo en el kernel sin tocar notebook")
    kernel_cmd.add_argument("code", help="Codigo a ejecutar")

    # Every sub-command accepts the same connection options.
    for command_parser in (append_cmd, cell_cmd, kernel_cmd):
        command_parser.add_argument("--server", default="http://localhost:8888")
        command_parser.add_argument("--token", default="")

    options = cli.parse_args()

    # Map each sub-command name to the call that serves it.
    handlers = {
        "append": lambda ns: jupyter_append_execute(ns.notebook, ns.code, ns.server, ns.token),
        "cell": lambda ns: jupyter_execute_cell(ns.notebook, ns.index, ns.server, ns.token),
        "kernel": lambda ns: jupyter_kernel_execute(ns.code, ns.server, ns.token),
    }

    try:
        handler = handlers.get(options.command)
        if handler is None:
            cli.print_help()
            sys.exit(1)
        outcome = handler(options)
        print(json.dumps(outcome, ensure_ascii=False, indent=2))
    except Exception as exc:
        # Report any failure as a JSON object on stderr and exit non-zero.
        print(json.dumps({"error": str(exc)}, ensure_ascii=False), file=sys.stderr)
        sys.exit(1)