"""Telemetria de invocaciones del registry desde Python (issue 0085c). Activacion explicita: env var FN_TELEMETRY=1. Mecanismo: sys.meta_path importer que detecta cuando se carga un modulo bajo un paquete del registry (core, finance, metabase, ...) y envuelve cada funcion publica con un wrapper que mide duration y registra en projects/fn_monitoring/apps/call_monitor/operations.db. Reglas: - No-op silencioso si BD no existe o INSERT falla. NUNCA rompe codigo cliente. - Solo guarda function_id, tool_used, duration_ms, success, error_class. NUNCA guarda argumentos concretos. - Idempotente: install() puede llamarse N veces sin duplicar el finder. - Bypass: si FN_TELEMETRY != "1", install() es no-op. La funcion sigue siendo importable para usos manuales (wrap_namespace). """ from __future__ import annotations import functools import importlib.abc import importlib.util import os import sqlite3 import sys import time from pathlib import Path from typing import Iterable, Optional _ENABLED = os.environ.get("FN_TELEMETRY", "") == "1" _SESSION_ID = os.environ.get("CLAUDE_SESSION_ID", "") _DB_PATH: Optional[str] = None _REGISTRY_ROOT: Optional[str] = None def _resolve_root() -> Optional[str]: global _REGISTRY_ROOT if _REGISTRY_ROOT: return _REGISTRY_ROOT env = os.environ.get("FN_REGISTRY_ROOT") if env and (Path(env) / "registry.db").exists(): _REGISTRY_ROOT = env return env here = Path(__file__).resolve() for parent in here.parents: if (parent / "registry.db").exists(): _REGISTRY_ROOT = str(parent) return _REGISTRY_ROOT return None def _resolve_db() -> Optional[str]: global _DB_PATH if _DB_PATH: return _DB_PATH root = _resolve_root() if not root: return None candidate = Path(root) / "projects" / "fn_monitoring" / "apps" / "call_monitor" / "operations.db" if candidate.exists(): _DB_PATH = str(candidate) return _DB_PATH return None def _registry_packages() -> set[str]: root = _resolve_root() if not root: return set() fns_dir = Path(root) / "python" / "functions" if not fns_dir.is_dir(): return set() return {p.name for p in fns_dir.iterdir() if p.is_dir() and not p.name.startswith("_") and not p.name.startswith(".")} def _log_call( function_id: str, duration_ms: float, success: bool, error_class: str = "", error_snippet: str = "", ) -> None: db = _resolve_db() if not db: return try: conn = sqlite3.connect(db, timeout=0.1, isolation_level=None) conn.execute( "INSERT INTO calls " "(session_id, function_id, tool_used, args_hash, duration_ms, success, error_class, error_snippet, ts) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, CAST(strftime('%s','now') AS INTEGER))", ( _SESSION_ID, function_id, "python_wrapper", "", int(duration_ms), 1 if success else 0, (error_class or "")[:64], (error_snippet or "")[:240], ), ) conn.close() except Exception: pass # never break user code def _wrap_callable(fn, function_id: str): @functools.wraps(fn) def wrapper(*args, **kwargs): t0 = time.perf_counter() try: result = fn(*args, **kwargs) _log_call(function_id, (time.perf_counter() - t0) * 1000, True) return result except Exception as e: _log_call( function_id, (time.perf_counter() - t0) * 1000, False, type(e).__name__, str(e), ) raise wrapper.__fn_telemetered__ = True # type: ignore[attr-defined] return wrapper def wrap_namespace(ns_globals: dict, domain: str) -> None: """Wrap public callables defined in ns_globals. function_id pattern: {func_name}_py_{domain} Only wraps callables defined in the same module (skips re-exports). """ if not _ENABLED: return module_name = ns_globals.get("__name__", "") for name, obj in list(ns_globals.items()): if name.startswith("_"): continue if getattr(obj, "__fn_telemetered__", False): continue if not callable(obj): continue # Skip classes / exceptions / non-functions if isinstance(obj, type): continue obj_module = getattr(obj, "__module__", None) if obj_module and module_name and obj_module != module_name and not obj_module.startswith(module_name.split(".")[0]): continue function_id = f"{name}_py_{domain}" ns_globals[name] = _wrap_callable(obj, function_id) class _RegistryTelemetryLoader(importlib.abc.Loader): def __init__(self, real_loader, domain: str): self.real_loader = real_loader self.domain = domain def create_module(self, spec): if hasattr(self.real_loader, "create_module"): return self.real_loader.create_module(spec) return None def exec_module(self, module): self.real_loader.exec_module(module) try: wrap_namespace(vars(module), self.domain) except Exception: pass class _RegistryTelemetryFinder(importlib.abc.MetaPathFinder): def __init__(self, packages: Iterable[str]): self._packages = set(packages) def find_spec(self, fullname, path, target=None): top = fullname.split(".")[0] if top not in self._packages: return None if fullname == __name__: return None # ask the other finders for the real spec; do not recurse on self for finder in sys.meta_path: if finder is self: continue if not hasattr(finder, "find_spec"): continue try: spec = finder.find_spec(fullname, path, target) except Exception: continue if spec is None or spec.loader is None: continue spec.loader = _RegistryTelemetryLoader(spec.loader, top) return spec return None def install() -> bool: """Install meta_path finder. Idempotent. Returns True if installed.""" if not _ENABLED: return False packages = _registry_packages() if not packages: return False for f in sys.meta_path: if isinstance(f, _RegistryTelemetryFinder): return False sys.meta_path.insert(0, _RegistryTelemetryFinder(packages)) return True if _ENABLED: install()