Files
egutierrez 47fac22230 chore: auto-commit (799 archivos)
- .claude/CLAUDE.md
- .claude/commands/subagentes.md
- .claude/rules/INDEX.md
- .mcp.json
- bash/functions/cybersecurity/analyze_dns.md
- bash/functions/cybersecurity/audit_http_headers.md
- bash/functions/cybersecurity/audit_ssh_config.md
- bash/functions/cybersecurity/check_firewall.md
- bash/functions/cybersecurity/detect_suspicious_users.md
- bash/functions/cybersecurity/encrypt_file.md
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 00:28:20 +02:00

218 lines
6.6 KiB
Python

"""Telemetry for registry invocations made from Python (issue 0085c).

Explicit activation: env var FN_TELEMETRY=1.

Mechanism: a sys.meta_path finder detects when a module under a registry
package (core, finance, metabase, ...) is loaded and wraps every public
function with a wrapper that measures duration and records the call in
projects/fn_monitoring/apps/call_monitor/operations.db.

Rules:
- Silent no-op if the DB does not exist or the INSERT fails. NEVER breaks
  client code.
- Only stores session_id, function_id, tool_used, an empty args_hash,
  duration_ms, success, error_class, a truncated error_snippet (the
  exception message) and a timestamp. NEVER stores the concrete call
  arguments.
- Idempotent: install() may be called N times without duplicating the finder.
- Bypass: if FN_TELEMETRY != "1", install() is a no-op. The function remains
  importable for manual use (wrap_namespace).
"""
from __future__ import annotations
import functools
import importlib.abc
import importlib.util
import os
import sqlite3
import sys
import time
from pathlib import Path
from typing import Iterable, Optional
# Telemetry is opt-in: enabled only when FN_TELEMETRY is exactly "1" at import time.
_ENABLED = os.environ.get("FN_TELEMETRY", "") == "1"
# Session identifier stored with every logged call; empty when CLAUDE_SESSION_ID is unset.
_SESSION_ID = os.environ.get("CLAUDE_SESSION_ID", "")
# Cache for the operations.db path, filled by _resolve_db() on its first success.
_DB_PATH: Optional[str] = None
# Cache for the registry root directory, filled by _resolve_root() on its first success.
_REGISTRY_ROOT: Optional[str] = None
def _resolve_root() -> Optional[str]:
    """Locate the registry root directory and cache it.

    A directory counts as the root when it contains a ``registry.db`` file.
    The ``FN_REGISTRY_ROOT`` environment variable is tried first; otherwise
    the ancestors of this file are searched from the nearest outwards.

    Returns:
        The root directory as a string, or ``None`` when no candidate holds
        a ``registry.db``. Only successful lookups are cached.
    """
    global _REGISTRY_ROOT
    if _REGISTRY_ROOT:
        return _REGISTRY_ROOT

    marker = "registry.db"

    # An explicit override wins, but only if it really points at a registry.
    override = os.environ.get("FN_REGISTRY_ROOT")
    if override and Path(override, marker).exists():
        _REGISTRY_ROOT = override
        return _REGISTRY_ROOT

    # Fall back to the closest ancestor directory that holds the marker.
    ancestors = Path(__file__).resolve().parents
    found = next((str(d) for d in ancestors if (d / marker).exists()), None)
    if found is not None:
        _REGISTRY_ROOT = found
    return found
def _resolve_db() -> Optional[str]:
    """Return the path of the call-monitor ``operations.db``, caching it.

    The database is expected at
    ``<root>/projects/fn_monitoring/apps/call_monitor/operations.db`` where
    ``<root>`` comes from ``_resolve_root()``.

    Returns:
        The database path as a string, or ``None`` when the registry root is
        unknown or the file does not exist. Only an existing file is cached.
    """
    global _DB_PATH
    if not _DB_PATH:
        root = _resolve_root()
        if root:
            db_file = Path(root).joinpath(
                "projects", "fn_monitoring", "apps", "call_monitor", "operations.db"
            )
            if db_file.exists():
                _DB_PATH = str(db_file)
    # A falsy cache (None or "") means "not found".
    return _DB_PATH or None
def _registry_packages() -> set[str]:
    """List the top-level package names that belong to the registry.

    Packages are the sub-directories of ``<root>/python/functions`` whose
    name starts with neither ``_`` nor ``.``.

    Returns:
        A set of directory names; empty when the registry root is unknown or
        the functions directory does not exist.
    """
    root = _resolve_root()
    if not root:
        return set()

    functions_dir = Path(root, "python", "functions")
    if not functions_dir.is_dir():
        return set()

    names = set()
    for entry in functions_dir.iterdir():
        if not entry.is_dir():
            continue
        # Private (``_x``) and hidden (``.x``) directories are not packages.
        if entry.name.startswith("_") or entry.name.startswith("."):
            continue
        names.add(entry.name)
    return names
def _log_call(
    function_id: str,
    duration_ms: float,
    success: bool,
    error_class: str = "",
    error_snippet: str = "",
) -> None:
    """Append one row describing a call to the ``calls`` table.

    This is strictly best-effort: any failure (database missing, lookup
    error, locked database, schema mismatch, ...) is swallowed so that
    telemetry can never break the instrumented code.

    Args:
        function_id: Identifier stored in the ``function_id`` column.
        duration_ms: Elapsed time in milliseconds; truncated to an integer.
        success: Whether the call returned normally (stored as 1/0).
        error_class: Exception class name, truncated to 64 characters.
        error_snippet: Exception message, truncated to 240 characters.
    """
    try:
        # The lookup touches the filesystem, so it lives inside the guarded
        # region: an OSError here must not escape into client code.
        db = _resolve_db()
        if not db:
            return
        # Short timeout so a locked database never stalls the caller;
        # isolation_level=None puts the connection in autocommit mode.
        conn = sqlite3.connect(db, timeout=0.1, isolation_level=None)
        try:
            conn.execute(
                "INSERT INTO calls "
                "(session_id, function_id, tool_used, args_hash, duration_ms, success, error_class, error_snippet, ts) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, CAST(strftime('%s','now') AS INTEGER))",
                (
                    _SESSION_ID,
                    function_id,
                    "python_wrapper",
                    "",  # args_hash is intentionally left empty
                    int(duration_ms),
                    1 if success else 0,
                    (error_class or "")[:64],
                    (error_snippet or "")[:240],
                ),
            )
        finally:
            # Release the handle even when the INSERT fails.
            conn.close()
    except Exception:
        pass  # never break user code
def _wrap_callable(fn, function_id: str):
    """Return a wrapper around ``fn`` that records each call via ``_log_call``.

    The wrapper measures wall-clock duration with ``time.perf_counter`` and
    logs success or failure. Exceptions raised by ``fn`` are always re-raised
    unchanged, and a failure inside the telemetry path never alters the
    result or the exception seen by the caller.

    Args:
        fn: The callable to instrument.
        function_id: Identifier passed to ``_log_call`` for every invocation.

    Returns:
        The wrapper, marked with ``__fn_telemetered__ = True`` so it is not
        wrapped a second time.
    """

    def _record(started: float, exc=None) -> None:
        # Telemetry is best-effort: nothing raised here (including a broken
        # ``__str__`` on the exception) may leak into the wrapped call.
        try:
            elapsed_ms = (time.perf_counter() - started) * 1000
            if exc is None:
                _log_call(function_id, elapsed_ms, True)
            else:
                # NOTE(review): the exception message is stored as the error
                # snippet and may embed argument values - confirm acceptable.
                _log_call(
                    function_id,
                    elapsed_ms,
                    False,
                    type(exc).__name__,
                    str(exc),
                )
        except Exception:
            pass

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        try:
            result = fn(*args, **kwargs)
        except Exception as e:
            _record(t0, e)
            raise
        _record(t0)
        return result

    wrapper.__fn_telemetered__ = True  # type: ignore[attr-defined]
    return wrapper
def wrap_namespace(ns_globals: dict, domain: str) -> None:
    """Wrap the public callables of a module namespace, in place.

    function_id pattern: {func_name}_py_{domain}

    An entry is wrapped when its name does not start with ``_``, it is
    callable, it is not a class, and it is not already instrumented. A
    callable is skipped when it comes from a different top-level package
    than the namespace's own module (a foreign re-export); callables from the
    same module or the same top-level package are wrapped.

    Does nothing unless telemetry is enabled (``_ENABLED``).

    Args:
        ns_globals: The module's globals dict (e.g. ``vars(module)``).
        domain: Domain suffix used to build each ``function_id``.
    """
    if not _ENABLED:
        return
    module_name = ns_globals.get("__name__", "")
    top_package = module_name.split(".")[0]
    # Iterate over a snapshot: entries are rebound while looping.
    for name, obj in list(ns_globals.items()):
        if name.startswith("_"):
            continue
        if getattr(obj, "__fn_telemetered__", False):
            continue
        if not callable(obj):
            continue
        # Skip classes / exceptions / non-functions
        if isinstance(obj, type):
            continue
        obj_module = getattr(obj, "__module__", None)
        if obj_module and module_name and obj_module != module_name:
            # Compare whole top-level package names; a plain prefix test
            # would treat e.g. "corelib" as part of "core".
            if obj_module.split(".")[0] != top_package:
                continue
        function_id = f"{name}_py_{domain}"
        ns_globals[name] = _wrap_callable(obj, function_id)
class _RegistryTelemetryLoader(importlib.abc.Loader):
    """Loader proxy that instruments a module right after it is executed.

    All real loading work is delegated to the wrapped loader. Once the module
    body has run, its public callables are wrapped with ``wrap_namespace``.
    Optional loader protocol methods that this class does not define (for
    example ``get_code``, ``get_source``, ``is_package``, ``get_filename`` or
    ``get_resource_reader``) are forwarded to the wrapped loader so tools that
    query the loader keep working.
    """

    def __init__(self, real_loader, domain: str):
        """Store the loader to delegate to and the telemetry domain."""
        self.real_loader = real_loader
        self.domain = domain

    def __getattr__(self, name):
        """Forward unknown, non-dunder attributes to the wrapped loader."""
        # Only called when normal lookup fails. Dunder names are not forwarded
        # so that copy/pickle style probing sees this object, not the real one.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)
        try:
            # Read via __dict__ to avoid recursing if real_loader is unset.
            real_loader = self.__dict__["real_loader"]
        except KeyError:
            raise AttributeError(name) from None
        return getattr(real_loader, name)

    def create_module(self, spec):
        """Delegate module creation; ``None`` requests the default module."""
        if hasattr(self.real_loader, "create_module"):
            return self.real_loader.create_module(spec)
        return None

    def exec_module(self, module):
        """Execute the module with the real loader, then instrument it."""
        self.real_loader.exec_module(module)
        try:
            wrap_namespace(vars(module), self.domain)
        except Exception:
            # Instrumentation is best-effort; the import itself has succeeded.
            pass
class _RegistryTelemetryFinder(importlib.abc.MetaPathFinder):
    """Meta-path finder that routes registry modules through the telemetry loader.

    It never locates modules itself: for a name whose top-level package is a
    registry package it asks the other ``sys.meta_path`` finders for the real
    spec and swaps that spec's loader for a ``_RegistryTelemetryLoader``.
    """

    def __init__(self, packages: Iterable[str]):
        """Remember the top-level package names to instrument."""
        self._packages = set(packages)

    def find_spec(self, fullname, path, target=None):
        """Return an instrumented spec for registry modules, else ``None``.

        ``None`` tells the import system to continue with the next finder,
        so non-registry modules (and this module itself) load normally.
        """
        domain = fullname.split(".")[0]
        if domain not in self._packages or fullname == __name__:
            return None

        # Delegate to every other finder; skipping ourselves avoids recursion.
        for candidate in sys.meta_path:
            if candidate is self:
                continue
            if not hasattr(candidate, "find_spec"):
                continue
            try:
                found = candidate.find_spec(fullname, path, target)
            except Exception:
                # A misbehaving finder is ignored; the next one gets a try.
                continue
            # Specs without a loader are left to the regular machinery.
            if found is not None and found.loader is not None:
                found.loader = _RegistryTelemetryLoader(found.loader, domain)
                return found
        return None
def install() -> bool:
    """Register the telemetry finder at the front of ``sys.meta_path``.

    Safe to call repeatedly: a finder is inserted at most once.

    Returns:
        ``True`` only when this call inserted the finder. ``False`` when
        telemetry is disabled, no registry packages were found, or a
        telemetry finder is already present.
    """
    if not _ENABLED:
        return False

    packages = _registry_packages()
    if not packages:
        return False

    already_present = any(
        isinstance(finder, _RegistryTelemetryFinder) for finder in sys.meta_path
    )
    if already_present:
        return False

    # Position 0 so registry modules reach this finder before the default ones.
    sys.meta_path.insert(0, _RegistryTelemetryFinder(packages))
    return True
# Auto-install on import when telemetry is enabled. install() checks _ENABLED
# as well, so this guard only makes the import-time side effect explicit.
if _ENABLED:
    install()