"""Read cells of a Jupyter notebook via the real-time collaboration protocol (CRDT/Y.js)."""

from __future__ import annotations

import asyncio
import json
import re
import sys
from collections import Counter
from typing import Any
from urllib.error import URLError
from urllib.request import Request, urlopen

from jupyter_nbmodel_client import NbModelClient, get_jupyter_notebook_websocket_url

# Name used when the server identity cannot be resolved.
_DEFAULT_USERNAME = "Anonymous"

# ANSI colour/style escape sequences found in kernel tracebacks; compiled once
# at import time instead of once per error output.
_ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*m")


def _resolve_collab_username(server_url: str, token: str) -> str:
    """Resolve the display name of the active user in Jupyter collaboration.

    Queries ``{server_url}/api/me`` and returns the first non-empty value among
    ``display_name``, ``username`` and ``name`` of the ``identity`` object.
    This is a best-effort lookup: any network, decoding or payload-shape
    problem yields ``"Anonymous"`` instead of raising.

    Args:
        server_url: Base URL of the Jupyter server (a trailing slash is tolerated).
        token: Server auth token; when empty no ``Authorization`` header is sent.

    Returns:
        The resolved user name, or ``"Anonymous"`` when it cannot be determined.
    """
    headers = {"Accept": "application/json"}
    if token:
        headers["Authorization"] = f"token {token}"
    try:
        req = Request(f"{server_url.rstrip('/')}/api/me", headers=headers)
        with urlopen(req, timeout=5) as resp:
            me = json.loads(resp.read())
    except (URLError, OSError, ValueError):
        # ValueError covers json.JSONDecodeError, undecodable bytes and a
        # malformed URL rejected by Request().
        return _DEFAULT_USERNAME

    identity = me.get("identity") if isinstance(me, dict) else None
    if not isinstance(identity, dict):
        return _DEFAULT_USERNAME
    for key in ("display_name", "username", "name"):
        value = identity.get(key)
        if isinstance(value, str) and value:
            return value
    # Every candidate was missing or empty (the original returned "" when
    # "name" was present but empty).
    return _DEFAULT_USERNAME


# ---------------------------------------------------------------------------
# Internal helpers (async)
# ---------------------------------------------------------------------------


def _join_text(value: Any) -> Any:
    """Concatenate a multi-line value stored as a list of strings; pass through otherwise."""
    return "".join(value) if isinstance(value, list) else value


def _extract_outputs(outputs: list[dict]) -> list[str]:
    """Convert cell outputs into human-readable strings.

    Args:
        outputs: Output dicts of a code cell, each carrying an ``output_type``.

    Returns:
        One string per recognised output: stream text, the ``text/plain``
        representation, a placeholder for HTML/PNG/other MIME bundles, or the
        traceback with ANSI codes stripped. Outputs of an unrecognised type are
        skipped.
    """
    rendered: list[str] = []
    for out in outputs:
        out_type = out.get("output_type", "")
        if out_type == "stream":
            rendered.append(_join_text(out.get("text", "")).rstrip())
        elif out_type in ("display_data", "execute_result"):
            data = out.get("data", {})
            if "text/plain" in data:
                rendered.append(_join_text(data["text/plain"]).rstrip())
            elif "text/html" in data:
                rendered.append("[HTML Output]")
            elif "image/png" in data:
                rendered.append("[Image Output (PNG)]")
            else:
                rendered.append(f"[Output: {list(data.keys())}]")
        elif out_type == "error":
            traceback_lines = out.get("traceback", [])
            if traceback_lines:
                rendered.append(
                    "\n".join(_ANSI_ESCAPE.sub("", line) for line in traceback_lines)
                )
            else:
                # No traceback: fall back to "ename: evalue" so the error is
                # not rendered as an empty string when that info is present.
                parts = (out.get("ename", ""), out.get("evalue", ""))
                rendered.append(": ".join(part for part in parts if part))
    return rendered


def _cell_to_dict(index: int, cell: Any) -> dict:
    """Convert a notebook cell into a normalised dict.

    Args:
        index: Position of the cell in the notebook.
        cell: Mapping-like cell object exposing ``cell_type``, ``source`` and,
            for code cells, ``outputs``.

    Returns:
        Dict with ``index``, ``type`` and ``source``; code cells additionally
        get ``outputs`` (see ``_extract_outputs``).
    """
    cell_type = cell.get("cell_type", "code")
    entry: dict = {
        "index": index,
        "type": cell_type,
        "source": _join_text(cell.get("source", "")),
    }
    if cell_type == "code":
        entry["outputs"] = _extract_outputs(cell.get("outputs", []))
    return entry


def _build_client(notebook_path: str, server_url: str, token: str) -> NbModelClient:
    """Create (without connecting) the collaboration client for a notebook."""
    ws_url = get_jupyter_notebook_websocket_url(
        server_url,
        notebook_path,
        token,
    )
    username = _resolve_collab_username(server_url, token)
    return NbModelClient(ws_url, username=username)


async def _read_cells_async(
    notebook_path: str,
    server_url: str,
    token: str,
    cell_index: int | None,
) -> list[dict]:
    """Connect to the Jupyter server and read the notebook cells.

    Args:
        notebook_path: Notebook path relative to the server root.
        server_url: Base URL of the Jupyter server.
        token: Server auth token.
        cell_index: 0-based index of the single cell to return, or ``None``
            for all cells.

    Returns:
        List of normalised cell dicts (a single-element list when
        ``cell_index`` is given).

    Raises:
        IndexError: If ``cell_index`` is outside the notebook's cell range.
    """
    async with _build_client(notebook_path, server_url, token) as client:
        await client.wait_until_synced()
        total = len(client)

        if cell_index is None:
            return [_cell_to_dict(i, client[i]) for i in range(total)]

        if not 0 <= cell_index < total:
            if total == 0:
                # Avoid the nonsensical "(0--1)" range for an empty notebook.
                raise IndexError(
                    f"cell_index {cell_index} fuera de rango (el notebook no tiene celdas)"
                )
            raise IndexError(
                f"cell_index {cell_index} fuera de rango (0-{total - 1})"
            )
        return [_cell_to_dict(cell_index, client[cell_index])]


async def _notebook_info_async(
    notebook_path: str,
    server_url: str,
    token: str,
) -> dict:
    """Connect to the Jupyter server and return notebook metadata.

    Args:
        notebook_path: Notebook path relative to the server root.
        server_url: Base URL of the Jupyter server.
        token: Server auth token.

    Returns:
        Dict with ``notebook_path``, ``server_url``, ``total_cells`` and
        ``cell_counts`` (number of cells per cell type, in first-seen order).
    """
    async with _build_client(notebook_path, server_url, token) as client:
        await client.wait_until_synced()
        total = len(client)
        counts = Counter(
            client[i].get("cell_type", "code") for i in range(total)
        )
        return {
            "notebook_path": notebook_path,
            "server_url": server_url,
            "total_cells": total,
            "cell_counts": dict(counts),
        }
# ---------------------------------------------------------------------------
# Public API (synchronous)
# ---------------------------------------------------------------------------


def jupyter_read_cells(
    notebook_path: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
    cell_index: int | None = None,
) -> list[dict]:
    """Read every cell of a Jupyter notebook, or one specific cell.

    Connects through the real-time collaboration protocol (CRDT/Y.js) and
    returns the current notebook state, including unsaved changes.

    Args:
        notebook_path: Notebook path relative to the server root
            (e.g. "notebooks/analysis.ipynb").
        server_url: Base URL of the Jupyter server
            (default http://localhost:8888).
        token: Jupyter server auth token.
        cell_index: If given, only that cell (0-based) is returned; if None,
            all cells are returned.

    Returns:
        List of dicts with the fields:
        - index (int): position of the cell in the notebook
        - type (str): "code", "markdown" or "raw"
        - source (str): cell content
        - outputs (list[str]): code cells only; readable rendering of each
          output (stream, plain text, [HTML Output], [Image Output (PNG)],
          error traceback).

    Raises:
        IndexError: If cell_index is outside the notebook's range.
        Exception: If the Jupyter server or the notebook cannot be reached.
    """
    return asyncio.run(
        _read_cells_async(notebook_path, server_url, token, cell_index)
    )


def jupyter_notebook_info(
    notebook_path: str,
    server_url: str = "http://localhost:8888",
    token: str = "",
) -> dict:
    """Return metadata of an open Jupyter notebook.

    Args:
        notebook_path: Notebook path relative to the server root.
        server_url: Base URL of the Jupyter server.
        token: Auth token.

    Returns:
        Dict with:
        - notebook_path (str): notebook path
        - server_url (str): server URL
        - total_cells (int): total number of cells
        - cell_counts (dict): count per type {"code": N, "markdown": M, ...}

    Raises:
        Exception: If the Jupyter server or the notebook cannot be reached.
    """
    return asyncio.run(
        _notebook_info_async(notebook_path, server_url, token)
    )


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def _format_readable(
    cells: list[dict],
    *,
    max_source_lines: int = 8,
    max_output_lines: int = 4,
) -> str:
    """Format cells as readable text with truncated source/output previews.

    Args:
        cells: Cell dicts with "index", "type", "source" and optionally
            "outputs" (list of strings).
        max_source_lines: Maximum number of source lines shown per cell.
        max_output_lines: Maximum number of lines shown per output.

    Returns:
        The formatted text; each cell is followed by an empty line, and an
        empty cell list yields an empty string.
    """
    lines: list[str] = []
    for cell in cells:
        header = f"[{cell['index']}] {cell['type'].upper()}"
        lines.append(header)
        lines.append("-" * len(header))

        source_lines = cell["source"].splitlines()
        lines.extend(source_lines[:max_source_lines])
        hidden = len(source_lines) - max_source_lines
        if hidden > 0:
            lines.append(f"... ({hidden} lineas mas)")

        outputs = cell.get("outputs")
        if outputs:
            lines.append(" -> outputs:")
            for out in outputs:
                # Split once and reuse for both the preview and the
                # truncation check.
                out_lines = out.splitlines()
                lines.extend(f" {ol}" for ol in out_lines[:max_output_lines])
                if len(out_lines) > max_output_lines:
                    lines.append(" ...")
        lines.append("")
    return "\n".join(lines)


def _build_parser() -> "argparse.ArgumentParser":
    """Build the command-line argument parser used by ``main``."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Lee celdas de un notebook Jupyter via CRDT/Y.js"
    )
    parser.add_argument("notebook", help="Ruta del notebook relativa al servidor")
    parser.add_argument(
        "--server",
        default="http://localhost:8888",
        help="URL del servidor Jupyter (default: http://localhost:8888)",
    )
    parser.add_argument("--token", default="", help="Token de autenticacion")
    parser.add_argument(
        "--cell",
        type=int,
        default=None,
        metavar="INDEX",
        help="Indice de celda especifica (0-based)",
    )
    parser.add_argument(
        "--info",
        action="store_true",
        help="Mostrar solo metadata del notebook",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        dest="as_json",
        help="Salida en formato JSON",
    )
    return parser


def main(argv: list[str] | None = None) -> None:
    """Command-line entry point.

    Prints either the notebook metadata (``--info``) or its cells, as JSON
    (``--json``) or as readable text. On any error the message is written to
    stderr and the process exits with status 1.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.
    """
    args = _build_parser().parse_args(argv)

    try:
        if args.info:
            result = jupyter_notebook_info(args.notebook, args.server, args.token)
            if args.as_json:
                print(json.dumps(result, ensure_ascii=False, indent=2))
            else:
                print(f"Notebook: {result['notebook_path']}")
                print(f"Servidor: {result['server_url']}")
                print(f"Total celdas: {result['total_cells']}")
                for ct, count in result["cell_counts"].items():
                    print(f" {ct}: {count}")
        else:
            cells = jupyter_read_cells(
                args.notebook, args.server, args.token, args.cell
            )
            if args.as_json:
                print(json.dumps(cells, ensure_ascii=False, indent=2))
            else:
                print(_format_readable(cells))
    except Exception as exc:  # top-level CLI boundary: report and exit non-zero
        # Some exceptions (e.g. TimeoutError()) have an empty message; fall
        # back to the exception type so the user never sees a bare "Error: ".
        detail = str(exc) or type(exc).__name__
        print(f"Error: {detail}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()