5194de3c04
- 0050: jupyter_exec reescrito sin Y.js (REST + KernelClient). Bug raíz adicional: HEAD /api/contents da 405 → cambiado a GET. 9 tests (5 unit + 4 e2e). - 0052: footprint_aurgi cerrado. Bug fix en setup_geo_stack_docker_pipeline (verify aborta si compose up falla; nombre de contenedor incorrecto). - Nueva primitiva docker_container_running_py_infra (7 tests). - /full-git-push y /full-git-pull pasan a modo automático: auto-commit + push sin preguntar, aborta solo si detecta secrets. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
316 lines
12 KiB
Python
316 lines
12 KiB
Python
"""Ejecuta codigo en kernels de Jupyter.
|
|
|
|
Tres modos:
|
|
- append: añade una celda al final del notebook y la ejecuta
|
|
- cell: ejecuta una celda existente por indice
|
|
- kernel: ejecuta codigo directamente en el kernel sin tocar notebook
|
|
|
|
Implementacion basada en REST `/api/contents` + `KernelClient` (websocket clasico
|
|
al kernel). NO usa `jupyter_nbmodel_client` ni el canal colaborativo Y.js, por lo
|
|
que es robusto frente a versiones nuevas de `jupyter-collaboration` (ver issue
|
|
0050). Trade-off: los cambios al notebook se persisten a disco; Jupyter Lab los
|
|
detecta via file watch (puede pedir 'Revert to disk' o 'Overwrite' segun version).
|
|
"""
|
|
|
|
import json
import uuid
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import quote
from urllib.request import Request, urlopen

from jupyter_kernel_client import KernelClient
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers REST
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _auth_headers(token: str, content_type: bool = False) -> dict[str, str]:
    """Build the HTTP headers for a JSON request to the Jupyter server.

    Args:
        token: Server token; when empty no ``Authorization`` header is added.
        content_type: When true, also declare a JSON ``Content-Type``.

    Returns:
        A new dict that always contains ``Accept`` and, depending on the
        arguments, ``Content-Type`` and ``Authorization`` (in that order).
    """
    json_mime = "application/json"
    # (header name, header value, include?) -- kept in insertion order.
    optional = (
        ("Content-Type", json_mime, content_type),
        ("Authorization", f"token {token}", token),
    )
    result = {"Accept": json_mime}
    result.update((name, value) for name, value, wanted in optional if wanted)
    return result
|
|
|
|
|
|
def _notebook_exists(notebook_path: str, server_url: str, token: str) -> bool:
    """Check whether a notebook exists via GET /api/contents (with ``content=0``).

    Jupyter Server does not support HEAD on /api/contents (it answers 405), so
    a GET with ``content=0`` is used to avoid transferring the full body.

    Args:
        notebook_path: Notebook path relative to the server root (not URL-encoded).
        server_url: Base URL of the Jupyter server.
        token: Server token, or an empty string for no authentication header.

    Returns:
        True when the request succeeds, False when the server answers 404.

    Raises:
        HTTPError: For any HTTP error status other than 404.
    """
    # Percent-encode the path ("/" is kept): raw spaces or non-ASCII characters
    # are not valid inside a request URL.
    check_url = f"{server_url}/api/contents/{quote(notebook_path)}?content=0"
    req = Request(check_url, headers=_auth_headers(token), method="GET")
    try:
        with urlopen(req, timeout=5):
            return True
    except HTTPError as e:
        if e.code == 404:
            return False
        raise
|
|
|
|
|
|
def _create_notebook(notebook_path: str, server_url: str, token: str, kernel_name: str = "python3") -> None:
    """Create an empty notebook via PUT /api/contents if it does not exist yet.

    Args:
        notebook_path: Notebook path relative to the server root (not URL-encoded).
        server_url: Base URL of the Jupyter server.
        token: Server token, or an empty string for no authentication header.
        kernel_name: Kernel name written into the notebook's ``kernelspec``.

    Raises:
        HTTPError: If the existence check or the PUT request fails.
    """
    if _notebook_exists(notebook_path, server_url, token):
        return
    # Known kernel names get a friendly display name; others are shown as-is.
    kernel_display = {"python3": "Python 3 (ipykernel)", "python": "Python 3"}.get(kernel_name, kernel_name)
    notebook_content = {
        "nbformat": 4,
        "nbformat_minor": 5,
        "metadata": {
            # NOTE: the language is recorded as "python" whatever kernel_name is.
            "kernelspec": {"name": kernel_name, "display_name": kernel_display, "language": "python"},
            "language_info": {"name": "python"},
        },
        "cells": [],
    }
    body = json.dumps({"type": "notebook", "content": notebook_content}).encode("utf-8")
    # Percent-encode the path ("/" is kept): raw spaces or non-ASCII characters
    # are not valid inside a request URL.
    url = f"{server_url}/api/contents/{quote(notebook_path)}"
    req = Request(url, data=body, headers=_auth_headers(token, content_type=True), method="PUT")
    with urlopen(req, timeout=10) as resp:
        # Drain the response; its content is not used.
        resp.read()
|
|
|
|
|
|
def _get_notebook_content(notebook_path: str, server_url: str, token: str) -> dict:
    """Read the full notebook via GET /api/contents (with ``content=1``).

    Args:
        notebook_path: Notebook path relative to the server root (not URL-encoded).
        server_url: Base URL of the Jupyter server.
        token: Server token, or an empty string for no authentication header.

    Returns:
        The decoded JSON response; callers in this module read the notebook
        itself from its ``"content"`` key.

    Raises:
        HTTPError: If the server answers with an error status.
    """
    # Percent-encode the path ("/" is kept): raw spaces or non-ASCII characters
    # are not valid inside a request URL.
    url = f"{server_url}/api/contents/{quote(notebook_path)}?content=1&type=notebook"
    req = Request(url, headers=_auth_headers(token), method="GET")
    with urlopen(req, timeout=10) as resp:
        return json.loads(resp.read())
|
|
|
|
|
|
def _put_notebook_content(notebook_path: str, server_url: str, token: str, content: dict) -> None:
    """Overwrite the notebook via PUT /api/contents.

    Args:
        notebook_path: Notebook path relative to the server root (not URL-encoded).
        server_url: Base URL of the Jupyter server.
        token: Server token, or an empty string for no authentication header.
        content: Notebook dict sent as the ``"content"`` of the request body.

    Raises:
        HTTPError: If the server answers with an error status.
    """
    body = json.dumps({"type": "notebook", "format": "json", "content": content}).encode("utf-8")
    # Percent-encode the path ("/" is kept): raw spaces or non-ASCII characters
    # are not valid inside a request URL.
    url = f"{server_url}/api/contents/{quote(notebook_path)}"
    req = Request(url, data=body, headers=_auth_headers(token, content_type=True), method="PUT")
    with urlopen(req, timeout=10) as resp:
        # Drain the response; its content is not used.
        resp.read()
|
|
|
|
|
|
def _ensure_session(server_url: str, token: str, notebook_path: str, kernel_name: str = "python3") -> str:
    """Make sure the notebook has a session and return its kernel id.

    When a session bound to the notebook already exists its kernel is reused;
    otherwise a session (and kernel) is created via POST /api/sessions.

    Returns:
        The kernel id, or an empty string when the newly created session's
        response carries no kernel id.
    """
    existing = _resolve_kernel_id(server_url, token, notebook_path)
    if existing:
        return existing

    payload = {
        "path": notebook_path,
        "type": "notebook",
        "kernel": {"name": kernel_name},
    }
    request = Request(
        f"{server_url}/api/sessions",
        data=json.dumps(payload).encode("utf-8"),
        headers=_auth_headers(token, content_type=True),
        method="POST",
    )
    with urlopen(request, timeout=10) as response:
        created = json.loads(response.read())
    kernel_info = created.get("kernel", {})
    return kernel_info.get("id", "")
|
|
|
|
|
|
def _api_get(url: str, token: str = "") -> dict | list | None:
    """GET *url* and return its decoded JSON body, or None on failure.

    URL/connection errors, OS errors and invalid JSON are all reported as
    ``None`` instead of raising.
    """
    recoverable = (URLError, OSError, json.JSONDecodeError)
    try:
        request = Request(url, headers=_auth_headers(token))
        with urlopen(request, timeout=5) as response:
            payload = response.read()
            return json.loads(payload)
    except recoverable:
        return None
|
|
|
|
|
|
def _resolve_kernel_id(server_url: str, token: str, notebook_path: str) -> str | None:
    """Look up the kernel id of the notebook's session via /api/sessions.

    Returns:
        The ``id`` of the kernel of the first session whose path equals
        *notebook_path*, or None when no session matches (or the session
        list could not be fetched).
    """
    listing = _api_get(f"{server_url}/api/sessions", token) or []
    for entry in listing:
        # The path is read from "notebook" (a dict with "path") when present,
        # otherwise from the top-level "path" value.
        target = entry.get("notebook", entry.get("path", {}))
        if isinstance(target, dict):
            candidate = target.get("path", target)
        else:
            candidate = str(target)
        if candidate != notebook_path:
            continue
        return entry.get("kernel", {}).get("id")
    return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers nbformat
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _new_code_cell(source: str) -> dict:
    """Return a fresh nbformat 4.5 code-cell dict with every field populated.

    The cell gets a random UUID4 ``id``, empty ``metadata`` and ``outputs``,
    and no ``execution_count``.
    """
    cell_id = str(uuid.uuid4())
    cell = {"id": cell_id}
    cell.update(
        cell_type="code",
        metadata={},
        source=source,
        outputs=[],
        execution_count=None,
    )
    return cell
|
|
|
|
|
|
def _extract_outputs(raw_outputs: list[dict]) -> list[str]:
    """Turn nbformat outputs into a list of readable strings.

    ``stream`` outputs contribute their text, ``display_data`` and
    ``execute_result`` their ``text/plain`` representation (an empty string
    when absent), and ``error`` outputs their traceback joined by newlines.
    Outputs of any other type are skipped.
    """

    def _as_text(value):
        # Text may arrive as a list of fragments; trailing newlines are dropped.
        joined = "".join(value) if isinstance(value, list) else value
        return joined.rstrip("\n")

    rendered: list[str] = []
    for item in raw_outputs:
        kind = item.get("output_type", "")
        if kind == "error":
            rendered.append("\n".join(item.get("traceback", [])))
        elif kind == "stream":
            rendered.append(_as_text(item.get("text", "")))
        elif kind in ("display_data", "execute_result"):
            rendered.append(_as_text(item.get("data", {}).get("text/plain", "")))
    return rendered
|
|
|
|
|
|
def _kernel_outputs_to_nbformat(outputs: list[dict]) -> list[dict]:
    """Normalise KernelClient outputs to the nbformat 4 schema.

    Currently a pass-through that returns a shallow copy of every output
    dict; it is kept as an extension point for future normalisation.
    """
    return list(map(dict, outputs))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Modos
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def jupyter_append_execute(
    notebook_path: str,
    code: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Append a code cell to the end of the notebook and execute it.

    The notebook is created if missing, the code runs in the kernel of the
    notebook's session, and the new cell plus its outputs are written back
    through REST ``/api/contents``.

    Returns:
        A dict with ``cell_index`` (position of the new cell) and ``outputs``
        (the outputs rendered as strings).
    """
    _create_notebook(notebook_path, server_url, token)
    kernel_id = _ensure_session(server_url, token, notebook_path)

    # Load the notebook and add the new cell at the end.
    notebook = _get_notebook_content(notebook_path, server_url, token)["content"]
    cells = notebook.setdefault("cells", [])
    appended = _new_code_cell(code)
    cells.append(appended)
    position = len(cells) - 1

    # Run the code in the notebook's own kernel.
    with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
        execution = kernel.execute(code)

    kernel_outputs = execution.get("outputs", [])
    appended.update(
        outputs=_kernel_outputs_to_nbformat(kernel_outputs),
        execution_count=execution.get("execution_count"),
    )

    _put_notebook_content(notebook_path, server_url, token, notebook)
    return {"cell_index": position, "outputs": _extract_outputs(kernel_outputs)}
|
|
|
|
|
|
def jupyter_execute_cell(
    notebook_path: str,
    cell_index: int,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Execute an existing cell by index and persist its outputs.

    Returns:
        A dict with ``cell_index`` and ``outputs`` (rendered as strings).

    Raises:
        IndexError: If *cell_index* is negative or past the last cell.
        ValueError: If the selected cell is not a code cell.
    """
    kernel_id = _ensure_session(server_url, token, notebook_path)

    notebook = _get_notebook_content(notebook_path, server_url, token)["content"]
    cells = notebook.get("cells", [])
    if not 0 <= cell_index < len(cells):
        raise IndexError(f"cell_index {cell_index} fuera de rango (notebook tiene {len(cells)} celdas)")

    target = cells[cell_index]
    kind = target.get("cell_type")
    if kind != "code":
        raise ValueError(f"La celda {cell_index} no es de codigo (cell_type={kind!r})")

    # The cell source may be stored as a list of lines or as a single string.
    raw_source = target.get("source", "")
    source = "".join(raw_source) if isinstance(raw_source, list) else raw_source

    with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
        execution = kernel.execute(source)

    kernel_outputs = execution.get("outputs", [])
    target["outputs"] = _kernel_outputs_to_nbformat(kernel_outputs)
    target["execution_count"] = execution.get("execution_count")

    _put_notebook_content(notebook_path, server_url, token, notebook)
    return {"cell_index": cell_index, "outputs": _extract_outputs(kernel_outputs)}
|
|
|
|
|
|
def jupyter_kernel_execute(
    code: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict[str, Any]:
    """Execute code directly in a kernel without touching any notebook.

    Returns:
        A dict with ``outputs`` (rendered as strings) and ``status``
        (``"unknown"`` when the execution result carries none).
    """
    with KernelClient(server_url=server_url, token=token) as kernel:
        execution = kernel.execute(code)
    return {
        "outputs": _extract_outputs(execution.get("outputs", [])),
        "status": execution.get("status", "unknown"),
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
if __name__ == "__main__":
    # Command-line entry point: one subcommand per execution mode. The result
    # is printed to stdout as JSON; any failure is printed to stderr as
    # {"error": ...} and the process exits with status 1.
    import argparse
    import sys

    # --server / --token are accepted by every subcommand, so declare them once.
    common = argparse.ArgumentParser(add_help=False)
    common.add_argument("--server", default="http://localhost:8888")
    common.add_argument("--token", default="")

    parser = argparse.ArgumentParser(description="Ejecuta codigo en kernels de Jupyter")
    sub = parser.add_subparsers(dest="command", required=True)

    p_append = sub.add_parser("append", parents=[common], help="Añade celda al notebook y la ejecuta")
    p_append.add_argument("notebook", help="Ruta al notebook relativa al servidor")
    p_append.add_argument("code", help="Codigo a insertar y ejecutar")

    p_cell = sub.add_parser("cell", parents=[common], help="Ejecuta celda existente por indice")
    p_cell.add_argument("notebook", help="Ruta al notebook relativa al servidor")
    p_cell.add_argument("index", type=int, help="Indice de la celda (0-based)")

    p_kernel = sub.add_parser("kernel", parents=[common], help="Ejecuta codigo en el kernel sin tocar notebook")
    p_kernel.add_argument("code", help="Codigo a ejecutar")

    args = parser.parse_args()

    # required=True makes argparse reject a missing or unknown command, so
    # args.command is always one of these keys and no fallback is needed.
    handlers = {
        "append": lambda: jupyter_append_execute(args.notebook, args.code, args.server, args.token),
        "cell": lambda: jupyter_execute_cell(args.notebook, args.index, args.server, args.token),
        "kernel": lambda: jupyter_kernel_execute(args.code, args.server, args.token),
    }

    try:
        result = handlers[args.command]()
        # Serialisation stays inside the try so a non-JSON-able result is
        # also reported through the error path.
        print(json.dumps(result, ensure_ascii=False, indent=2))
    except Exception as exc:
        print(json.dumps({"error": str(exc)}, ensure_ascii=False), file=sys.stderr)
        sys.exit(1)
|