import time import json import requests from typing import Optional, Dict, Any class LokiLogger: """ Logger compatible con Grafana Loki / Alloy. Envía logs en formato JSON a través del endpoint HTTP de Loki. """ ALLOWED_LEVELS = ( "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL", "CRITICAL", "UNKNOWN", ) def __init__( self, endpoint: str = "http://127.0.0.1:3101/loki/api/v1/push", default_labels: Optional[Dict[str, str]] = None, timeout: float = 5.0, min_level: str = "DEBUG", service_name: Optional[str] = None, ): """ :param endpoint: URL completa del endpoint de Loki / Alloy. :param default_labels: etiquetas estáticas comunes (ej: {"env": "dev"}) :param timeout: timeout en segundos para la petición HTTP :param min_level: nivel mínimo para enviar logs :param service_name: nombre del servicio (se usará también como 'job') """ self.endpoint = endpoint self.timeout = timeout self.service_name = service_name or "unknown-service" min_level = min_level.upper() if min_level not in self.ALLOWED_LEVELS: raise ValueError(f"min_level debe estar en {self.ALLOWED_LEVELS}") self.min_level = min_level # Base labels: incluye el job automáticamente self.default_labels = dict(default_labels or {}) self.default_labels["job"] = self.service_name self._level_order = { "TRACE": 0, "DEBUG": 1, "INFO": 2, "WARN": 3, "ERROR": 4, "FATAL": 5, "CRITICAL": 6, "UNKNOWN": 7, } def _current_ns(self) -> str: """Devuelve timestamp actual en nanosegundos como string.""" return str(int(time.time() * 1e9)) def _should_log(self, level: str) -> bool: """Comprueba si el nivel cumple con el mínimo configurado.""" return self._level_order[level] >= self._level_order[self.min_level] def log( self, level: str, message: str, labels: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Any]] = None, ) -> None: """Envía un log a Loki con los campos mínimos + metadata opcional.""" level = level.upper() if level == "WARNING": level = "WARN" if level not in self.ALLOWED_LEVELS: raise ValueError(f"Nivel no válido: {level}. Debe estar en {self.ALLOWED_LEVELS}") if not self._should_log(level): return # Combinar etiquetas final_labels = dict(self.default_labels) if labels: final_labels.update(labels) payload_metadata = { "service": self.service_name, "level": level, } if metadata: payload_metadata.update(metadata) log_line = json.dumps({ "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "message": message, **payload_metadata, }) body = { "streams": [ {"stream": final_labels, "values": [[self._current_ns(), log_line]]} ] } try: resp = requests.post(self.endpoint, json=body, timeout=self.timeout) resp.raise_for_status() except Exception as e: print(f"Failed to send log to Loki: {e}", flush=True) # Métodos estándar por nivel def trace(self, message, labels=None, metadata=None): self.log("TRACE", message, labels, metadata) def debug(self, message, labels=None, metadata=None): self.log("DEBUG", message, labels, metadata) def info(self, message, labels=None, metadata=None): self.log("INFO", message, labels, metadata) def warn(self, message, labels=None, metadata=None): self.log("WARN", message, labels, metadata) def warning(self, message, labels=None, metadata=None): self.log("WARN", message, labels, metadata) def error(self, message, labels=None, metadata=None): self.log("ERROR", message, labels, metadata) def fatal(self, message, labels=None, metadata=None): self.log("FATAL", message, labels, metadata) def critical(self, message, labels=None, metadata=None): self.log("CRITICAL", message, labels, metadata) def unknown(self, message, labels=None, metadata=None): self.log("UNKNOWN", message, labels, metadata)