Files
suite_logs/Logger/LokiLogger.py
T
2025-10-31 18:09:43 +01:00

186 lines
5.8 KiB
Python

import time
import json
import requests
import traceback
from typing import Optional, Dict, Any, Callable
from functools import wraps
class LokiLogger:
"""
Logger compatible con Grafana Loki / Alloy.
Envía logs en formato JSON a través del endpoint HTTP de Loki.
"""
ALLOWED_LEVELS = (
"TRACE",
"DEBUG",
"INFO",
"WARN",
"ERROR",
"FATAL",
"CRITICAL",
"UNKNOWN",
)
def __init__(
self,
endpoint: str = "http://127.0.0.1:3101/loki/api/v1/push",
default_labels: Optional[Dict[str, str]] = None,
timeout: float = 5.0,
min_level: str = "DEBUG",
service_name: Optional[str] = None,
):
"""
:param endpoint: URL completa del endpoint de Loki / Alloy.
:param default_labels: etiquetas estáticas comunes (ej: {"env": "dev"})
:param timeout: timeout en segundos para la petición HTTP
:param min_level: nivel mínimo para enviar logs
:param service_name: nombre del servicio (se usará también como 'job')
"""
self.endpoint = endpoint
self.timeout = timeout
self.service_name = service_name or "unknown-service"
min_level = min_level.upper()
if min_level not in self.ALLOWED_LEVELS:
raise ValueError(f"min_level debe estar en {self.ALLOWED_LEVELS}")
self.min_level = min_level
# Base labels: incluye el job automáticamente
self.default_labels = dict(default_labels or {})
self.default_labels["service"] = self.service_name
self._level_order = {
"TRACE": 0,
"DEBUG": 1,
"INFO": 2,
"WARN": 3,
"ERROR": 4,
"FATAL": 5,
"CRITICAL": 6,
"UNKNOWN": 7,
}
def _current_ns(self) -> str:
"""Devuelve timestamp actual en nanosegundos como string."""
return str(int(time.time() * 1e9))
def _should_log(self, level: str) -> bool:
"""Comprueba si el nivel cumple con el mínimo configurado."""
return self._level_order[level] >= self._level_order[self.min_level]
def log(
self,
level: str,
message: str,
labels: Optional[Dict[str, str]] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Envía un log a Loki con los campos mínimos + metadata opcional."""
level = level.upper()
if level == "WARNING":
level = "WARN"
if level not in self.ALLOWED_LEVELS:
raise ValueError(f"Nivel no válido: {level}. Debe estar en {self.ALLOWED_LEVELS}")
if not self._should_log(level):
return
# Combinar etiquetas
final_labels = dict(self.default_labels)
if labels:
final_labels.update(labels)
payload_metadata = {
"service": self.service_name,
"level": level,
}
if metadata:
payload_metadata.update(metadata)
log_line = json.dumps({
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"message": message,
**payload_metadata,
})
body = {
"streams": [
{"stream": final_labels, "values": [[self._current_ns(), log_line]]}
]
}
try:
resp = requests.post(self.endpoint, json=body, timeout=self.timeout)
resp.raise_for_status()
except Exception as e:
print(f"Failed to send log to Loki: {e}", flush=True)
# Métodos estándar por nivel
def trace(self, message, labels=None, metadata=None):
self.log("TRACE", message, labels, metadata)
def debug(self, message, labels=None, metadata=None):
self.log("DEBUG", message, labels, metadata)
def info(self, message, labels=None, metadata=None):
self.log("INFO", message, labels, metadata)
def warn(self, message, labels=None, metadata=None):
self.log("WARN", message, labels, metadata)
def warning(self, message, labels=None, metadata=None):
self.log("WARN", message, labels, metadata)
def error(self, message, labels=None, metadata=None):
self.log("ERROR", message, labels, metadata)
def exception(self, exc: Exception, labels=None, metadata=None):
"""
Registra una excepción con traceback incluido.
Uso típico dentro de un bloque try/except.
"""
tb = traceback.format_exc()
message = str(exc)
metadata = metadata or {}
metadata["traceback"] = tb
self.log("ERROR", message, labels=labels, metadata=metadata)
def fatal(self, message, labels=None, metadata=None):
self.log("FATAL", message, labels, metadata)
def critical(self, message, labels=None, metadata=None):
self.log("CRITICAL", message, labels, metadata)
def unknown(self, message, labels=None, metadata=None):
self.log("UNKNOWN", message, labels, metadata)
# 🧩 Nuevo decorador
def catch_exceptions(self, reraise: bool = False):
"""
Decorador que captura cualquier excepción dentro de la función,
la loguea con traceback, y opcionalmente la relanza.
Ejemplo:
@logger.catch_exceptions()
def run_job():
...
"""
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
tb = traceback.format_exc()
self.error(
f"Exception en función '{func.__name__}': {e}",
metadata={"traceback": tb},
)
if reraise:
raise
return wrapper
return decorator