añadido logger de prueba

This commit is contained in:
2025-10-22 13:08:17 +02:00
parent b294dbb80c
commit e6de5c7223
4 changed files with 171 additions and 4 deletions
+148
View File
@@ -0,0 +1,148 @@
import time
import json
import requests
from typing import Optional, Dict, Any
class LokiLogger:
"""
Logger compatible con Grafana Loki / Alloy.
Envía logs en formato JSON a través del endpoint HTTP de Loki.
"""
ALLOWED_LEVELS = (
"TRACE",
"DEBUG",
"INFO",
"WARN",
"ERROR",
"FATAL",
"CRITICAL",
"UNKNOWN",
)
def __init__(
self,
endpoint: str = "http://127.0.0.1:3101/loki/api/v1/push",
default_labels: Optional[Dict[str, str]] = None,
timeout: float = 5.0,
min_level: str = "DEBUG"
):
"""
:param endpoint: URL completa del endpoint de Loki / Alloy.
:param default_labels: etiquetas estáticas por defecto para todos los logs,
ej: {"job": "my-service", "env": "prod"}
:param timeout: timeout en segundos para la petición HTTP
:param min_level: nivel mínimo para enviar logs. Ej: "INFO" solo enviará INFO o más graves.
"""
self.endpoint = endpoint
self.default_labels = default_labels or {}
self.timeout = timeout
min_level = min_level.upper()
if min_level not in self.ALLOWED_LEVELS:
raise ValueError(f"min_level debe estar en {self.ALLOWED_LEVELS}")
self.min_level = min_level
# Asigna prioridad (menor valor = más detallado)
self._level_order = {
"TRACE": 0,
"DEBUG": 1,
"INFO": 2,
"WARN": 3,
"ERROR": 4,
"FATAL": 5,
"CRITICAL": 6,
"UNKNOWN": 7,
}
def _current_ns(self) -> str:
"""Devuelve timestamp actual en nanosegundos como string."""
return str(int(time.time() * 1e9))
def _should_log(self, level: str) -> bool:
"""Comprueba si el nivel cumple con el mínimo configurado."""
return self._level_order[level] >= self._level_order[self.min_level]
def log(
self,
level: str,
message: str,
service: str,
labels: Optional[Dict[str, str]] = None,
metadata: Optional[Dict[str, Any]] = None
) -> None:
"""
Envía un log a Loki con los campos mínimos + metadata opcional.
"""
level = level.upper()
# Soporte retroactivo
if level == "WARNING":
level = "WARN"
if level not in self.ALLOWED_LEVELS:
raise ValueError(f"Nivel no válido: {level}. Debe estar en {self.ALLOWED_LEVELS}")
if not self._should_log(level):
return # No enviar porque está por debajo del umbral
# Combinar etiquetas
final_labels = dict(self.default_labels)
if labels:
final_labels.update(labels)
# Payload base
payload_metadata = {
"service": service,
"level": level
}
if metadata:
payload_metadata.update(metadata)
# Construir línea JSON
log_line = json.dumps({
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"message": message,
**payload_metadata
})
body = {
"streams": [
{
"stream": final_labels,
"values": [
[self._current_ns(), log_line]
]
}
]
}
# Enviar al endpoint Loki
try:
resp = requests.post(self.endpoint, json=body, timeout=self.timeout)
resp.raise_for_status()
except Exception as e:
print(f"Failed to send log to Loki: {e}", flush=True)
# Métodos por nivel estándar
def trace(self, message: str, service: str, labels: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Any]] = None):
self.log("TRACE", message, service, labels, metadata)
def debug(self, message: str, service: str, labels: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Any]] = None):
self.log("DEBUG", message, service, labels, metadata)
def info(self, message: str, service: str, labels: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Any]] = None):
self.log("INFO", message, service, labels, metadata)
def warn(self, message: str, service: str, labels: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Any]] = None):
self.log("WARN", message, service, labels, metadata)
def error(self, message: str, service: str, labels: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Any]] = None):
self.log("ERROR", message, service, labels, metadata)
def fatal(self, message: str, service: str, labels: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Any]] = None):
self.log("FATAL", message, service, labels, metadata)
def critical(self, message: str, service: str, labels: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Any]] = None):
self.log("CRITICAL", message, service, labels, metadata)
def unknown(self, message: str, service: str, labels: Optional[Dict[str, str]] = None, metadata: Optional[Dict[str, Any]] = None):
self.log("UNKNOWN", message, service, labels, metadata)
View File
+1 -1
View File
@@ -93,7 +93,7 @@ services:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
- "3500:3000"
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=admin123
+22 -3
View File
@@ -1,6 +1,25 @@
def main():
print("Hello from suite-logs!")
from Logger.LokiLogger import LokiLogger
def prueba_log():
logger = LokiLogger(
default_labels={"job": "prueba_ejemplo",
# "env": "production"
},
min_level="DEBUG"
)
logger.trace("Inicio del proceso de ETL", service="etl")
logger.debug("Carga de datos completada", service="etl")
logger.info("Pipeline ejecutado correctamente", service="etl")
logger.warn("Latencia superior a lo esperado", service="etl", metadata={"latency_ms": 850})
logger.error("Error al conectar con base de datos", service="etl", metadata={"db_host": "postgres"})
logger.fatal("Fallo crítico en nodo principal", service="etl")
logger.critical("Memoria insuficiente para procesamiento", service="etl")
logger.unknown("Log sin nivel detectado", service="etl")
if __name__ == "__main__":
main()
prueba_log()