"""Descubrimiento de instancias Jupyter Lab activas via API REST.""" import json import os import urllib.error import urllib.request from pathlib import Path _DEFAULT_PORTS = [8888, 8889, 8890, 8891, 8892] def _get(url: str, timeout: float = 2.0) -> dict | list | None: """Hace GET a url y retorna el JSON parseado, o None si falla.""" try: with urllib.request.urlopen(url, timeout=timeout) as resp: return json.loads(resp.read().decode()) except Exception: return None def _is_collaborative(config: dict | list | None) -> bool: """Detecta si el servidor tiene jupyter-collaboration/YDocExtension activo.""" if not isinstance(config, dict): return False # Jupyter Lab expone la config de extensiones bajo claves como # 'LabApp' o similares; la presencia de 'collaborative' o 'YDocExtension' # en cualquier valor de primer nivel indica modo colaborativo. raw = json.dumps(config).lower() return "ydocextension" in raw or "collaborative" in raw def _find_jupyter_pid_for_port(port: int) -> int | None: """Busca en /proc el PID del proceso jupyter que escucha en el puerto dado. Solo funciona en Linux (donde /proc existe). Retorna None si no encuentra el proceso o si /proc no esta disponible. """ proc_dir = Path("/proc") if not proc_dir.is_dir(): return None for pid_entry in proc_dir.iterdir(): if not pid_entry.name.isdigit(): continue cmdline_path = pid_entry / "cmdline" try: raw = cmdline_path.read_bytes().decode("utf-8", errors="replace") except OSError: continue if "jupyter" not in raw: continue # Los argumentos estan separados por \0 en /proc/pid/cmdline parts = raw.split("\0") port_found = False for part in parts: if part in (f"--port={port}", f"--ServerApp.port={port}"): port_found = True break # Para el puerto default 8888, el proceso puede no tener --port explícito. # En ese caso verificamos que sea un proceso jupyter y no tenga otro puerto. if not port_found and port == 8888: has_other_port = any( p.startswith("--port=") or p.startswith("--ServerApp.port=") for p in parts ) if not has_other_port and any("jupyter" in p for p in parts): port_found = True if port_found: try: return int(pid_entry.name) except ValueError: continue return None def _get_root_dir_from_proc(pid: int) -> str: """Extrae el root_dir del proceso Jupyter a partir de su cmdline en /proc. Busca --ServerApp.root_dir= o --notebook-dir= en los argumentos del proceso. Si no los encuentra, usa el cwd del proceso como fallback. Retorna cadena vacia si no puede leer /proc. """ try: cmdline_path = f"/proc/{pid}/cmdline" with open(cmdline_path, "rb") as f: parts = f.read().decode("utf-8", errors="replace").split("\0") for part in parts: if part.startswith("--ServerApp.root_dir="): return part.split("=", 1)[1].rstrip("/") if part.startswith("--notebook-dir="): return part.split("=", 1)[1].rstrip("/") # Fallback: cwd del proceso cwd = os.readlink(f"/proc/{pid}/cwd") return cwd.rstrip("/") except OSError: return "" def _extract_analysis_name(root_dir: str) -> str: """Extrae el nombre del analisis del root_dir. Si root_dir contiene 'analysis/{nombre}', retorna '{nombre}'. En caso contrario retorna el ultimo componente del path. """ if not root_dir: return "" parts = root_dir.replace("\\", "/").split("/") # Buscar el segmento 'analysis' y tomar el siguiente for i, part in enumerate(parts): if part == "analysis" and i + 1 < len(parts) and parts[i + 1]: return parts[i + 1] # Fallback: ultimo componente del path return parts[-1] if parts else "" def _query_instance(base_url: str, port: int) -> dict | None: """Consulta la API REST de una instancia Jupyter y retorna su estado. Detecta el root_dir real del proceso via /proc (Linux) para identificar correctamente el analisis que esta sirviendo. Retorna None si la instancia no responde o no es Jupyter. """ status = _get(f"{base_url}/api/status") if status is None: return None config = _get(f"{base_url}/api/config") kernels_raw = _get(f"{base_url}/api/kernels") or [] sessions_raw = _get(f"{base_url}/api/sessions") or [] # Detectar root_dir via /proc root_dir = "" pid = _find_jupyter_pid_for_port(port) if pid is not None: root_dir = _get_root_dir_from_proc(pid) kernels = [] if isinstance(kernels_raw, list): for k in kernels_raw: if isinstance(k, dict): kernels.append({ "id": k.get("id", ""), "name": k.get("name", ""), "execution_state": k.get("execution_state", ""), "last_activity": k.get("last_activity", ""), }) sessions = [] if isinstance(sessions_raw, list): for s in sessions_raw: if isinstance(s, dict): kernel = s.get("kernel") or {} path = s.get("path") or s.get("notebook", {}).get("path", "") sessions.append({ "notebook": path, "kernel_id": kernel.get("id", ""), "kernel_state": kernel.get("execution_state", ""), }) return { "root_dir": root_dir, "kernels": kernels, "sessions": sessions, "collaborative": _is_collaborative(config), } def _scan_analysis_ports(registry_root: str) -> list[tuple[int, str]]: """Escanea subdirectorios de analysis/ buscando archivos .jupyter-port. Retorna lista de (puerto, nombre_analisis). """ root = Path(registry_root) if registry_root else Path.cwd() analysis_dir = root / "analysis" results: list[tuple[int, str]] = [] if not analysis_dir.is_dir(): return results for entry in analysis_dir.iterdir(): if not entry.is_dir(): continue port_file = entry / ".jupyter-port" if port_file.is_file(): try: port = int(port_file.read_text().strip()) results.append((port, entry.name)) except (ValueError, OSError): pass return results def jupyter_discover( registry_root: str = "", ports: list[int] | None = None, ) -> list[dict]: """Descubre instancias de Jupyter Lab activas consultando su API REST. Escanea primero los archivos .jupyter-port en subdirectorios de analysis/ para encontrar puertos registrados, y luego aplica un fallback sobre puertos comunes (8888-8892). Para cada instancia que responde consulta /api/status, /api/config, /api/kernels y /api/sessions. En Linux detecta el root_dir real del proceso Jupyter via /proc/pid/cmdline, lo que permite identificar correctamente el analisis en escenarios multi-instancia donde varios Jupyter corren en puertos distintos. Args: registry_root: Raiz del fn_registry. Si vacio usa el directorio actual o la variable de entorno FN_REGISTRY_ROOT. ports: Lista de puertos a escanear. Si None, usa los puertos encontrados en .jupyter-port mas los defaults (8888-8892). Returns: Lista de dicts con: url, port, analysis, root_dir, collaborative, kernels, sessions. Cada sesion incluye: notebook, kernel_id, kernel_state. """ if not registry_root: registry_root = os.environ.get("FN_REGISTRY_ROOT", "") # Recopilar puertos a escanear port_analysis: dict[int, str] = {} if ports is not None: for p in ports: port_analysis[p] = "" else: # Primero los registrados en .jupyter-port for port, analysis_name in _scan_analysis_ports(registry_root): port_analysis[port] = analysis_name # Fallback: puertos comunes que no estén ya en la lista for p in _DEFAULT_PORTS: if p not in port_analysis: port_analysis[p] = "" results = [] for port, analysis_hint in port_analysis.items(): base_url = f"http://localhost:{port}" info = _query_instance(base_url, port) if info is None: continue # Determinar analysis name: preferir deteccion via /proc sobre .jupyter-port root_dir = info["root_dir"] if root_dir: analysis_name = _extract_analysis_name(root_dir) else: # Fallback al hint del .jupyter-port analysis_name = analysis_hint results.append({ "url": base_url, "port": port, "analysis": analysis_name, "root_dir": root_dir, "collaborative": info["collaborative"], "kernels": info["kernels"], "sessions": info["sessions"], }) return results # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- if __name__ == "__main__": import argparse import sys parser = argparse.ArgumentParser( description="Descubre instancias de Jupyter Lab activas." ) parser.add_argument( "--registry-root", default="", help="Raiz del fn_registry (default: FN_REGISTRY_ROOT env o cwd)", ) parser.add_argument( "--port", dest="ports", type=int, action="append", metavar="PORT", help="Puerto a escanear (puede repetirse). Default: .jupyter-port + 8888-8892", ) parser.add_argument( "--json", action="store_true", help="Emitir salida en JSON", ) args = parser.parse_args() instances = jupyter_discover( registry_root=args.registry_root, ports=args.ports, ) if args.json: print(json.dumps(instances, indent=2)) sys.exit(0) if not instances: print("No se encontraron instancias de Jupyter Lab activas.") sys.exit(0) for inst in instances: collab = "colaborativo" if inst["collaborative"] else "estandar" analysis = inst["analysis"] or "(desconocido)" root_dir = inst["root_dir"] or "(no detectado)" print(f"Puerto {inst['port']} [{collab}]") print(f" url: {inst['url']}") print(f" analysis: {analysis}") print(f" root_dir: {root_dir}") kernel_count = len(inst["kernels"]) if inst["kernels"]: print(f" kernels ({kernel_count}):") for k in inst["kernels"]: print(f" - {k['name']} estado={k['execution_state']} id={k['id'][:8]}...") else: print(" kernels: ninguno") if inst["sessions"]: print(f" sesiones ({len(inst['sessions'])}):") for s in inst["sessions"]: kid = s["kernel_id"][:8] + "..." if s["kernel_id"] else "(sin kernel)" print(f" - {s['notebook']} kernel={kid} estado={s['kernel_state']}") else: print(" sesiones: ninguna") print()