8c8b58c3c3
Funciones Python para interactuar con Jupyter Lab programáticamente: descubrir instancias, leer/escribir celdas, ejecutar código y gestionar kernels. Reemplazan MCP jupyter con API REST + WebSocket directa. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
294 lines
10 KiB
Python
294 lines
10 KiB
Python
"""Lee celdas de un notebook Jupyter via protocolo de colaboracion en tiempo real (CRDT/Y.js)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import sys
|
|
from typing import Any
|
|
from urllib.error import URLError
|
|
from urllib.request import Request, urlopen
|
|
|
|
from jupyter_nbmodel_client import NbModelClient, get_jupyter_notebook_websocket_url
|
|
|
|
|
|
def _resolve_collab_username(server_url: str, token: str) -> str:
    """Resolve the display name of the active user in Jupyter collaboration.

    Requests ``{server_url}/api/me`` and returns the first non-empty value
    among the ``display_name``, ``username`` and ``name`` fields of the
    ``identity`` object in the JSON answer.  The lookup is best-effort: any
    failure to build the request, reach the server, or interpret the answer
    yields ``"Anonymous"`` instead of raising.

    Args:
        server_url: Base URL of the Jupyter server.  A trailing slash is
            tolerated.
        token: Jupyter authentication token.  When empty, no
            ``Authorization`` header is sent.

    Returns:
        The resolved user name, or ``"Anonymous"`` when none is available.
    """
    fallback = "Anonymous"
    headers = {"Accept": "application/json"}
    if token:
        headers["Authorization"] = f"token {token}"

    try:
        # Request() raises ValueError for a URL without a scheme, and invalid
        # or undecodable JSON raises ValueError subclasses; none of those may
        # escape a best-effort lookup.
        req = Request(f"{server_url.rstrip('/')}/api/me", headers=headers)
        with urlopen(req, timeout=5) as resp:
            me = json.loads(resp.read())
    except (URLError, OSError, ValueError):
        return fallback

    # The payload is external data: it may be any JSON value, and
    # "identity" may be missing or null.
    identity = me.get("identity") if isinstance(me, dict) else None
    if not isinstance(identity, dict):
        return fallback

    for key in ("display_name", "username", "name"):
        value = identity.get(key)
        if value:
            return str(value)
    # Every candidate was missing or empty (including an empty "name").
    return fallback
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Internal helpers (output/cell normalization and async notebook readers)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _extract_outputs(outputs: list[dict]) -> list[str]:
    """Convert raw cell outputs into human-readable strings.

    One string is produced per recognized output:

    * ``stream``: the text (list fragments are concatenated), right-stripped.
    * ``display_data`` / ``execute_result``: the ``text/plain`` payload when
      present; otherwise ``[HTML Output]``, ``[Image Output (PNG)]`` or
      ``[Output: [<mime types>]]`` as a placeholder.
    * ``error``: the traceback lines joined by newlines with ANSI color
      codes removed.  When the traceback is missing or empty, falls back to
      ``"<ename>: <evalue>"`` built from whichever of the two is present.

    Outputs with any other ``output_type`` are skipped.

    Args:
        outputs: Output dicts of a code cell.

    Returns:
        The readable representations, in the same order as ``outputs``.
    """
    # Function-scoped import, kept local as in the original code, but
    # hoisted out of the loop so the pattern is compiled once per call.
    import re

    ansi_escape = re.compile(r"\x1b\[[0-9;]*m")

    result: list[str] = []
    for out in outputs:
        out_type = out.get("output_type", "")
        if out_type == "stream":
            text = out.get("text", "")
            if isinstance(text, list):
                text = "".join(text)
            result.append(text.rstrip())
        elif out_type in ("display_data", "execute_result"):
            data = out.get("data", {})
            if "text/plain" in data:
                plain = data["text/plain"]
                if isinstance(plain, list):
                    plain = "".join(plain)
                result.append(plain.rstrip())
            elif "text/html" in data:
                result.append("[HTML Output]")
            elif "image/png" in data:
                result.append("[Image Output (PNG)]")
            else:
                result.append(f"[Output: {list(data.keys())}]")
        elif out_type == "error":
            tb_lines = out.get("traceback") or []
            if tb_lines:
                result.append(
                    "\n".join(ansi_escape.sub("", line) for line in tb_lines)
                )
            else:
                # No traceback to show: keep the error visible through its
                # name/value instead of emitting an empty string.
                parts = (out.get("ename", ""), out.get("evalue", ""))
                result.append(": ".join(part for part in parts if part))
    return result
|
|
|
|
|
|
def _cell_to_dict(index: int, cell: Any) -> dict:
    """Normalize a notebook cell into a plain dict.

    Args:
        index: Position of the cell in the notebook; copied into the result.
        cell: Cell object exposing a dict-like ``get``.  A missing
            ``cell_type`` is treated as ``"code"`` and a missing ``source``
            as an empty string.

    Returns:
        A dict with ``index``, ``type`` and ``source`` (list sources are
        concatenated into one string).  Code cells additionally carry
        ``outputs``, the readable form of the cell's outputs.
    """
    kind = cell.get("cell_type", "code")
    raw_source = cell.get("source", "")
    text = "".join(raw_source) if isinstance(raw_source, list) else raw_source

    normalized: dict = {"index": index, "type": kind, "source": text}
    if kind != "code":
        # Only code cells have outputs worth reporting.
        return normalized

    normalized["outputs"] = _extract_outputs(cell.get("outputs", []))
    return normalized
|
|
|
|
|
|
async def _read_cells_async(
    notebook_path: str,
    server_url: str,
    token: str,
    cell_index: int | None,
) -> list[dict]:
    """Connect to the Jupyter server and read cells from the notebook.

    Opens an ``NbModelClient`` on the notebook's websocket URL, waits until
    the client reports it is synced, and converts the requested cells with
    ``_cell_to_dict``.

    Args:
        notebook_path: Notebook path handed to
            ``get_jupyter_notebook_websocket_url``.
        server_url: Base URL of the Jupyter server.
        token: Jupyter authentication token.
        cell_index: 0-based index of the single cell to read, or ``None`` to
            read every cell.

    Returns:
        A list of normalized cell dicts: one element when ``cell_index`` is
        given, otherwise one per cell in notebook order.

    Raises:
        IndexError: If ``cell_index`` is negative or not smaller than the
            number of cells.
    """
    ws_url = get_jupyter_notebook_websocket_url(
        server_url,
        notebook_path,
        token,
    )
    username = _resolve_collab_username(server_url, token)
    async with NbModelClient(ws_url, username=username) as client:
        await client.wait_until_synced()
        total = len(client)

        if cell_index is None:
            return [_cell_to_dict(i, client[i]) for i in range(total)]

        if not 0 <= cell_index < total:
            if total == 0:
                # Avoid the nonsensical "(0--1)" range for an empty notebook.
                raise IndexError(
                    f"cell_index {cell_index} fuera de rango "
                    "(el notebook no tiene celdas)"
                )
            raise IndexError(
                f"cell_index {cell_index} fuera de rango (0-{total - 1})"
            )
        return [_cell_to_dict(cell_index, client[cell_index])]
|
|
|
|
|
|
async def _notebook_info_async(
    notebook_path: str,
    server_url: str,
    token: str,
) -> dict:
    """Connect to the Jupyter server and collect notebook metadata.

    Opens an ``NbModelClient`` on the notebook's websocket URL, waits until
    the client reports it is synced, and tallies the cells by their
    ``cell_type`` (a missing type counts as ``"code"``).

    Args:
        notebook_path: Notebook path handed to
            ``get_jupyter_notebook_websocket_url``.
        server_url: Base URL of the Jupyter server.
        token: Jupyter authentication token.

    Returns:
        A dict with ``notebook_path``, ``server_url``, ``total_cells`` and
        ``cell_counts`` (cell type -> number of cells).
    """
    ws_url = get_jupyter_notebook_websocket_url(server_url, notebook_path, token)
    username = _resolve_collab_username(server_url, token)

    async with NbModelClient(ws_url, username=username) as client:
        await client.wait_until_synced()

        total = len(client)
        tally: dict[str, int] = {}
        for position in range(total):
            kind = client[position].get("cell_type", "code")
            tally.setdefault(kind, 0)
            tally[kind] += 1

        info = {
            "notebook_path": notebook_path,
            "server_url": server_url,
            "total_cells": total,
            "cell_counts": tally,
        }
        return info
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public API (synchronous)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def jupyter_read_cells(
    notebook_path: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
    cell_index: int | None = None,
) -> list[dict]:
    """Read every cell of a Jupyter notebook, or one specific cell.

    Connects through the real-time collaboration protocol (CRDT/Y.js) and
    returns the notebook's current state, including unsaved changes.

    This is a synchronous wrapper that drives the async reader with
    ``asyncio.run``.

    Args:
        notebook_path: Notebook path relative to the server root
            (e.g. "notebooks/analysis.ipynb").
        server_url: Base URL of the Jupyter server
            (default http://localhost:8888).
        token: Jupyter server authentication token.
        cell_index: When given, only that cell (0-based) is returned.  When
            ``None``, all cells are returned.

    Returns:
        A list of dicts with the fields:
            - index (int): position of the cell in the notebook
            - type (str): "code", "markdown" or "raw"
            - source (str): cell content
            - outputs (list[str]): code cells only; readable representation
              of each output (stream, plain text, [HTML Output],
              [Image Output (PNG)], error traceback).

    Raises:
        IndexError: If ``cell_index`` is outside the notebook's range.
        Exception: If the Jupyter server or the notebook cannot be reached.
    """
    reader = _read_cells_async(notebook_path, server_url, token, cell_index)
    return asyncio.run(reader)
|
|
|
|
|
|
def jupyter_notebook_info(
    notebook_path: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict:
    """Return metadata of an open Jupyter notebook.

    This is a synchronous wrapper that drives the async metadata reader
    with ``asyncio.run``.

    Args:
        notebook_path: Notebook path relative to the server root.
        server_url: Base URL of the Jupyter server.
        token: Authentication token.

    Returns:
        A dict with:
            - notebook_path (str): path of the notebook
            - server_url (str): URL of the server
            - total_cells (int): total number of cells
            - cell_counts (dict): count per type {"code": N, "markdown": M, ...}

    Raises:
        Exception: If the Jupyter server or the notebook cannot be reached.
    """
    info_request = _notebook_info_async(notebook_path, server_url, token)
    return asyncio.run(info_request)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _format_readable(
    cells: list[dict],
    *,
    max_source_lines: int = 8,
    max_output_lines: int = 4,
) -> str:
    """Render cells as readable text with truncated previews.

    Each cell becomes a ``[index] TYPE`` header, an underline of dashes, the
    first ``max_source_lines`` lines of its source (followed by a
    "... (N lineas mas)" marker when lines were dropped) and, when the cell
    has non-empty ``outputs``, the first ``max_output_lines`` lines of each
    output (followed by " ..." when that output was truncated).  A blank
    line closes every cell.

    Args:
        cells: Normalized cell dicts with ``index``, ``type`` and ``source``
            keys and an optional ``outputs`` list of strings.
        max_source_lines: Source lines shown per cell.  Negative values are
            treated as 0.
        max_output_lines: Lines shown per output.  Negative values are
            treated as 0.

    Returns:
        The formatted text; an empty string when ``cells`` is empty.
    """
    # Clamp so that negative limits cannot turn into from-the-end slices.
    max_source_lines = max(max_source_lines, 0)
    max_output_lines = max(max_output_lines, 0)

    lines: list[str] = []
    for cell in cells:
        header = f"[{cell['index']}] {cell['type'].upper()}"
        lines.append(header)
        lines.append("-" * len(header))

        source_lines = cell["source"].splitlines()
        lines.extend(source_lines[:max_source_lines])
        hidden = len(source_lines) - max_source_lines
        if hidden > 0:
            lines.append(f"... ({hidden} lineas mas)")

        outputs = cell.get("outputs")
        if outputs:
            lines.append(" -> outputs:")
            for out in outputs:
                # Split once and reuse for both the preview and the
                # truncation check.
                out_lines = out.splitlines()
                lines.extend(f" {ol}" for ol in out_lines[:max_output_lines])
                if len(out_lines) > max_output_lines:
                    lines.append(" ...")

        lines.append("")
    return "\n".join(lines)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> None:
    """Command-line entry point: print notebook cells or notebook metadata.

    With ``--info`` the notebook metadata is printed; otherwise the cells
    (all of them, or the one selected with ``--cell``) are printed.
    ``--json`` switches both modes to JSON output.  Any exception raised
    while talking to the server is reported on stderr as ``Error: <message>``
    and the process exits with status 1.

    Args:
        argv: Arguments to parse, without the program name.  ``None`` (the
            default) lets argparse read ``sys.argv[1:]``, which keeps the
            zero-argument call working; passing a list allows the CLI to be
            driven programmatically.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Lee celdas de un notebook Jupyter via CRDT/Y.js"
    )
    parser.add_argument("notebook", help="Ruta del notebook relativa al servidor")
    parser.add_argument(
        "--server",
        default="http://localhost:8888",
        help="URL del servidor Jupyter (default: http://localhost:8888)",
    )
    parser.add_argument("--token", default="", help="Token de autenticacion")
    parser.add_argument(
        "--cell",
        type=int,
        default=None,
        metavar="INDEX",
        help="Indice de celda especifica (0-based)",
    )
    parser.add_argument(
        "--info",
        action="store_true",
        help="Mostrar solo metadata del notebook",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        dest="as_json",
        help="Salida en formato JSON",
    )
    args = parser.parse_args(argv)

    # Top-level boundary of the CLI: every failure becomes a one-line
    # message on stderr plus a non-zero exit status.
    try:
        if args.info:
            result = jupyter_notebook_info(args.notebook, args.server, args.token)
            if args.as_json:
                print(json.dumps(result, ensure_ascii=False, indent=2))
            else:
                print(f"Notebook: {result['notebook_path']}")
                print(f"Servidor: {result['server_url']}")
                print(f"Total celdas: {result['total_cells']}")
                for ct, count in result["cell_counts"].items():
                    print(f" {ct}: {count}")
        else:
            cells = jupyter_read_cells(
                args.notebook, args.server, args.token, args.cell
            )
            if args.as_json:
                print(json.dumps(cells, ensure_ascii=False, indent=2))
            else:
                print(_format_readable(cells))
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|