"""Test: execute a cell in a worker thread so Y.js can sync in real time."""

import asyncio
import json
import sys
from functools import partial
from typing import Any, Optional
from urllib.request import Request, urlopen

from jupyter_kernel_client import KernelClient
from jupyter_nbmodel_client import NbModelClient, get_jupyter_notebook_websocket_url


def _api_get(url: str, token: str = "", timeout: float = 5) -> Any:
    """Issue a GET request to *url* and return the JSON-decoded response body.

    Args:
        url: Full URL of the endpoint to query.
        token: Optional token; when non-empty it is sent as an
            ``Authorization: token <token>`` header.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        Whatever ``json.loads`` produces for the response body.

    Raises:
        urllib.error.URLError: Propagated from ``urlopen`` on request failure.
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    headers = {"Accept": "application/json"}
    if token:
        headers["Authorization"] = f"token {token}"
    req = Request(url, headers=headers)
    with urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())


def _session_notebook_path(session: dict) -> Optional[str]:
    """Return the notebook path recorded in one session entry, if any.

    The nested ``notebook.path`` value is preferred; the session-level
    ``path`` is used as a fallback when the nested value is unavailable.
    """
    nb = session.get("notebook")
    if isinstance(nb, dict):
        if "path" in nb:
            return nb["path"]
    elif nb is not None:
        # A non-dict "notebook" value is compared by its string form.
        return str(nb)
    path = session.get("path")
    return None if path is None else str(path)


def _resolve_kernel_id(server_url: str, token: str, notebook_path: str) -> Optional[str]:
    """Look up the kernel id of the session attached to *notebook_path*.

    Args:
        server_url: Base URL of the server; ``/api/sessions`` is appended.
        token: Token forwarded to ``_api_get``.
        notebook_path: Path to match against each session's notebook path.

    Returns:
        The ``id`` of the first matching session's kernel, or ``None`` when no
        session matches or the matching session has no kernel information.
    """
    sessions = _api_get(f"{server_url}/api/sessions", token) or []
    for session in sessions:
        if _session_notebook_path(session) == notebook_path:
            # "kernel" may be present but null; treat that like a missing key.
            return (session.get("kernel") or {}).get("id")
    return None


async def execute_realtime(
    notebook_path,
    cell_index,
    server_url="http://localhost:8889",
    token="",
    flush_delay=2,
):
    """Execute one notebook cell while keeping the event loop free.

    Args:
        notebook_path: Notebook path, used both for the websocket URL and to
            find the kernel of an existing session.
        cell_index: Index of the cell, passed to ``nb.execute_cell``.
        server_url: Base URL of the server.
        token: Token for the HTTP calls and the kernel client; an empty string
            is passed as ``None`` when building the websocket URL.
        flush_delay: Seconds to wait after execution before the notebook
            client is closed.

    Returns:
        The value returned by ``nb.execute_cell``.
    """
    ws_url = get_jupyter_notebook_websocket_url(server_url, notebook_path, token or None)
    kernel_id = _resolve_kernel_id(server_url, token, notebook_path)

    me = _api_get(f"{server_url}/api/me", token)
    if me:
        # "identity" may be present but null; treat that like a missing key.
        username = (me.get("identity") or {}).get("display_name", "Anonymous")
    else:
        username = "Anonymous"

    async with NbModelClient(ws_url, username=username) as nb:
        await nb.wait_until_synced()

        with KernelClient(server_url=server_url, token=token, kernel_id=kernel_id) as kernel:
            # KEY FIX: run the blocking execute_cell in a thread so the event
            # loop keeps running and Y.js can sync outputs to the browser in
            # real time.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None,
                partial(nb.execute_cell, cell_index, kernel),
            )

        # Give Y.js a moment to flush final state before the notebook client
        # is closed.
        await asyncio.sleep(flush_delay)

    return result


if __name__ == "__main__":
    notebook = sys.argv[1] if len(sys.argv) > 1 else "notebooks/01_test_sleep.ipynb"
    cell = int(sys.argv[2]) if len(sys.argv) > 2 else 3
    print(f"Executing cell {cell} of {notebook} (watch your browser!)...")
    result = asyncio.run(execute_realtime(notebook, cell))
    print(json.dumps(result, ensure_ascii=False, indent=2))