This repository was archived on 2025-11-27. You can view files and clone it, but you cannot push commits or open issues or pull requests.
Files

77 lines
2.4 KiB
Python

from prefect import flow, task, get_run_logger
from prefect.filesystems import LocalFileSystem
# Load the Prefect LocalFileSystem block registered under the name "localfile".
# This is a module-level statement, so it runs at import time; the loaded block
# is passed as `result_storage` to the `comprobar_logs` flow further down.
local_file_system_block = LocalFileSystem.load("localfile")
import logging
import psycopg2
from datetime import datetime, timezone
import json
class TimescaleHandler(logging.Handler):
    """Logging handler that writes every record as a row in a ``logs`` table.

    A fresh psycopg2 connection is opened for each record and is always closed
    before ``emit`` returns, so the handler holds no connection between calls.

    Args:
        conn_info: Keyword arguments forwarded unchanged to
            ``psycopg2.connect`` (dbname, user, password, host, port, ...).
    """

    def __init__(self, conn_info):
        super().__init__()
        self.conn_info = conn_info

    def emit(self, record: logging.LogRecord):
        """Insert ``record`` into the ``logs`` table.

        The row contains the fixed service name ``"prefect"``, the level name,
        the formatted message, the record's creation time in UTC, and a JSON
        document with the source location (path, line, function, module).

        This is best-effort: any failure (connection, insert, commit) is
        reported with ``print`` and never propagates to the code that logged.
        """
        try:
            conn = psycopg2.connect(**self.conn_info)
            try:
                # The cursor context manager closes the cursor even when the
                # insert fails.
                with conn.cursor() as cur:
                    cur.execute(
                        """
                        INSERT INTO logs (service, level, message, timestamp, metadata)
                        VALUES (%s, %s, %s, %s, %s)
                        """,
                        (
                            "prefect",
                            record.levelname,
                            record.getMessage(),
                            # Use the moment the record was created rather than
                            # the moment it is written, so connection latency
                            # does not skew the stored timestamp.
                            datetime.fromtimestamp(record.created, tz=timezone.utc),
                            json.dumps({
                                "filename": record.pathname,
                                "lineno": record.lineno,
                                "func": record.funcName,
                                "module": record.module,
                            }),
                        ),
                    )
                    conn.commit()
            finally:
                # Always release the connection; previously a failed insert or
                # commit skipped close() and leaked one connection per record.
                conn.close()
        except Exception as e:
            print(f"[TimescaleHandler] Error al guardar log: {e}")
# 🧩 Prefect task with logging
@task(name="tarea_log", log_prints=True)
def tarea_log():
    """Emit sample log messages and mirror them into TimescaleDB.

    A ``TimescaleHandler`` is attached to the logger underlying the Prefect
    run logger for the duration of the task only, and the logger's previous
    level is restored afterwards. Without that cleanup every run would add one
    more handler to the same underlying logger.
    """
    # NOTE(review): connection settings, including the password, are
    # hard-coded here; consider moving them to configuration or a secret store.
    conn_info = {
        "dbname": "basededatos",
        "user": "postgres",
        "password": "mipassword",
        "host": "localhost",
        "port": 55432,
    }

    # Prefect already creates this logger with the current run context.
    logger = get_run_logger()
    timescale_handler = TimescaleHandler(conn_info)
    timescale_handler.setLevel(logging.DEBUG)

    # `logger.logger` is the logger object the handler must be attached to.
    run_logger = logger.logger
    previous_level = run_logger.level

    # 💡 Make sure DEBUG records are stored as well.
    run_logger.setLevel(logging.DEBUG)
    run_logger.addHandler(timescale_handler)
    try:
        # Everything logged from here on is also written to TimescaleDB.
        logger.debug("Mensaje debug: variables inicializadas correctamente")
        logger.info("Iniciando tarea Prefect con logging en Timescale 🚀")
        logger.warning("Advertencia de ejemplo")
        logger.error("Error simulado para prueba")

        # print() still works, but the logger is preferred.
        # NOTE(review): with log_prints=True Prefect presumably forwards
        # print() output to the run logger, so this line is likely stored in
        # Timescale too, despite what the message says - confirm.
        print("Esto solo se mostrará en stdout, no se guarda en Timescale")
    finally:
        # Undo the logger changes so handlers do not pile up across runs.
        run_logger.removeHandler(timescale_handler)
        run_logger.setLevel(previous_level)
        timescale_handler.close()
# Flow entry point. It is registered as "comprobar_logs", configured with the
# module-level LocalFileSystem block as result storage, and has log_prints
# enabled.
@flow(name="comprobar_logs", result_storage=local_file_system_block, log_prints=True)  # type: ignore
def comprobar_logs():
    # The flow has a single step: run the logging task once.
    tarea_log()
# Run the flow only when this file is executed directly as a script.
if __name__ == "__main__":
    comprobar_logs()