import json
import time
import traceback
from functools import wraps
from typing import Any, Callable, Dict, Optional

import requests


class LokiLogger:
    """Logger that pushes JSON log lines to a Grafana Loki / Alloy endpoint.

    Every accepted record is sent immediately as one HTTP POST containing a
    single stream with a single entry. Delivery is best-effort: a failed
    request is reported on stdout and never raised to the caller.
    """

    ALLOWED_LEVELS = (
        "TRACE",
        "DEBUG",
        "INFO",
        "WARN",
        "ERROR",
        "FATAL",
        "CRITICAL",
        "UNKNOWN",
    )

    def __init__(
        self,
        endpoint: str = "http://127.0.0.1:3101/loki/api/v1/push",
        add_labels: Optional[Dict[str, str]] = None,
        timeout: float = 5.0,
        min_level: str = "DEBUG",
        service_name: Optional[str] = None,
    ):
        """Configure the logger.

        :param endpoint: full URL of the Loki / Alloy push endpoint.
        :param add_labels: extra labels attached to every record
            (e.g. ``{"env": "dev"}``). The mapping is copied.
        :param timeout: timeout in seconds for each HTTP request.
        :param min_level: minimum level that is actually sent. Matched
            case-insensitively; ``"WARNING"`` is accepted as ``"WARN"``.
        :param service_name: value of the ``service_name`` label; falls back
            to ``"unknown-service"`` when empty or ``None``.
        :raises ValueError: if ``min_level`` is not an allowed level.
        """
        self.endpoint = endpoint
        self.timeout = timeout
        self.service_name = service_name or "unknown-service"

        # Same normalisation as log(), so both entry points accept "WARNING".
        min_level = self._normalize_level(min_level)
        if min_level not in self.ALLOWED_LEVELS:
            raise ValueError(f"min_level debe estar en {self.ALLOWED_LEVELS}")
        self.min_level = min_level

        # Copy so later mutation of the caller's dict does not leak in.
        self.add_labels = dict(add_labels or {})

        # Severity rank used by _should_log(); higher means more severe.
        self._level_order = {
            "TRACE": 0,
            "DEBUG": 1,
            "INFO": 2,
            "WARN": 3,
            "ERROR": 4,
            "FATAL": 5,
            "CRITICAL": 6,
            "UNKNOWN": 7,
        }

    @staticmethod
    def _normalize_level(level: str) -> str:
        """Upper-case ``level`` and map the ``WARNING`` alias to ``WARN``."""
        level = level.upper()
        return "WARN" if level == "WARNING" else level

    def _current_ns(self) -> str:
        """Return the current Unix time in nanoseconds as a string."""
        return str(int(time.time() * 1e9))

    def _should_log(self, level: str) -> bool:
        """Return True if ``level`` ranks at or above the configured minimum."""
        return self._level_order[level] >= self._level_order[self.min_level]

    def log(
        self,
        level: str,
        message: str,
        add_labels: Optional[Dict[str, str]] = None,
        add_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Send one record to Loki.

        The log line is a JSON object holding ``timestamp`` (UTC),
        ``message`` and any ``add_fields`` entries; ``add_fields`` may
        override the two default keys.

        :param level: one of ``ALLOWED_LEVELS`` (case-insensitive;
            ``"WARNING"`` is accepted as ``"WARN"``).
        :param message: text stored under the ``message`` key.
        :param add_labels: extra stream labels for this record only; they
            override constructor labels and the default labels.
        :param add_fields: extra keys merged into the JSON log line.
        :raises ValueError: if ``level`` is not an allowed level.
        """
        level = self._normalize_level(level)
        if level not in self.ALLOWED_LEVELS:
            raise ValueError(f"Nivel no válido: {level}. Debe estar en {self.ALLOWED_LEVELS}")

        if not self._should_log(level):
            return

        # Two default labels, then constructor labels, then per-call labels;
        # later sources win on key collisions.
        final_labels = {
            "service_name": self.service_name,
            "detected_level": level,
        }
        if self.add_labels:
            final_labels.update(self.add_labels)
        if add_labels:
            final_labels.update(add_labels)

        log_line_data = {
            # The trailing "Z" denotes UTC, so the value must be built from
            # gmtime(); localtime() would mislabel local wall-clock time.
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "message": message,
        }
        if add_fields:
            log_line_data.update(add_fields)

        # default=str is deliberate: a value json cannot encode (datetime,
        # Path, exception, ...) is stringified instead of raising TypeError
        # into the code that was merely trying to log.
        log_line = json.dumps(log_line_data, ensure_ascii=False, default=str)

        body = {
            "streams": [
                {"stream": final_labels, "values": [[self._current_ns(), log_line]]}
            ]
        }

        # Best-effort delivery: logging must never break the caller, so any
        # failure is only reported on stdout.
        try:
            resp = requests.post(self.endpoint, json=body, timeout=self.timeout)
            resp.raise_for_status()
        except Exception as e:
            print(f"Failed to send log to Loki: {e}", flush=True)

    # Standard per-level convenience methods.
    def trace(
        self,
        message: str,
        add_labels: Optional[Dict[str, str]] = None,
        add_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``message`` at TRACE level."""
        self.log("TRACE", message, add_labels, add_fields)

    def debug(
        self,
        message: str,
        add_labels: Optional[Dict[str, str]] = None,
        add_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``message`` at DEBUG level."""
        self.log("DEBUG", message, add_labels, add_fields)

    def info(
        self,
        message: str,
        add_labels: Optional[Dict[str, str]] = None,
        add_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``message`` at INFO level."""
        self.log("INFO", message, add_labels, add_fields)

    def warn(
        self,
        message: str,
        add_labels: Optional[Dict[str, str]] = None,
        add_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``message`` at WARN level."""
        self.log("WARN", message, add_labels, add_fields)

    def warning(
        self,
        message: str,
        add_labels: Optional[Dict[str, str]] = None,
        add_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Alias of :meth:`warn`."""
        self.log("WARN", message, add_labels, add_fields)

    def error(
        self,
        message: str,
        add_labels: Optional[Dict[str, str]] = None,
        add_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``message`` at ERROR level."""
        self.log("ERROR", message, add_labels, add_fields)

    def exception(
        self,
        exc: Exception,
        add_labels: Optional[Dict[str, str]] = None,
        add_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``exc`` at ERROR level together with its traceback.

        The traceback is taken from ``exc`` itself, so the method also works
        when called outside the ``except`` block that caught the exception.
        """
        tb = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
        message = str(exc)
        self.log("ERROR", f"{message}\n{tb}", add_labels=add_labels, add_fields=add_fields)

    def fatal(
        self,
        message: str,
        add_labels: Optional[Dict[str, str]] = None,
        add_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``message`` at FATAL level."""
        self.log("FATAL", message, add_labels, add_fields)

    def critical(
        self,
        message: str,
        add_labels: Optional[Dict[str, str]] = None,
        add_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``message`` at CRITICAL level."""
        self.log("CRITICAL", message, add_labels, add_fields)

    def unknown(
        self,
        message: str,
        add_labels: Optional[Dict[str, str]] = None,
        add_fields: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Log ``message`` at UNKNOWN level."""
        self.log("UNKNOWN", message, add_labels, add_fields)

    def catch_exceptions(self, reraise: bool = False):
        """Return a decorator that logs exceptions raised by the function.

        Any ``Exception`` raised by the wrapped function is logged at ERROR
        level with its traceback.

        :param reraise: when True the exception is re-raised after logging;
            when False it is swallowed and the wrapper returns ``None``.
        """

        def decorator(func: Callable):
            @wraps(func)
            def wrapper(*args, **kwargs):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    tb = traceback.format_exc()
                    self.error(
                        f"Exception en función '{func.__name__}': {e}\n{tb}"
                    )
                    if reraise:
                        raise

            return wrapper

        return decorator