77 lines
2.4 KiB
Python
77 lines
2.4 KiB
Python
from prefect import flow, task, get_run_logger
|
|
from prefect.filesystems import LocalFileSystem
|
|
|
|
local_file_system_block = LocalFileSystem.load("localfile")
|
|
|
|
|
|
import logging
|
|
import psycopg2
|
|
from datetime import datetime, timezone
|
|
import json
|
|
|
|
class TimescaleHandler(logging.Handler):
|
|
def __init__(self, conn_info):
|
|
super().__init__()
|
|
self.conn_info = conn_info
|
|
|
|
def emit(self, record: logging.LogRecord):
|
|
try:
|
|
conn = psycopg2.connect(**self.conn_info)
|
|
cur = conn.cursor()
|
|
cur.execute("""
|
|
INSERT INTO logs (service, level, message, timestamp, metadata)
|
|
VALUES (%s, %s, %s, %s, %s)
|
|
""", (
|
|
"prefect",
|
|
record.levelname,
|
|
record.getMessage(),
|
|
datetime.now(timezone.utc),
|
|
json.dumps({
|
|
"filename": record.pathname,
|
|
"lineno": record.lineno,
|
|
"func": record.funcName,
|
|
"module": record.module,
|
|
}),
|
|
))
|
|
conn.commit()
|
|
cur.close()
|
|
conn.close()
|
|
except Exception as e:
|
|
print(f"[TimescaleHandler] Error al guardar log: {e}")
|
|
|
|
# 🧩 Tarea Prefect con logging
|
|
@task(name="tarea_log", log_prints=True)
|
|
def tarea_log():
|
|
conn_info = {
|
|
"dbname": "basededatos",
|
|
"user": "postgres",
|
|
"password": "mipassword",
|
|
"host": "localhost",
|
|
"port": 55432,
|
|
}
|
|
|
|
# Prefect ya crea este logger con el contexto actual
|
|
logger = get_run_logger()
|
|
timescale_handler = TimescaleHandler(conn_info)
|
|
|
|
# 💡 Aseguramos que se guarden también los DEBUG
|
|
logger.logger.setLevel(logging.DEBUG)
|
|
timescale_handler.setLevel(logging.DEBUG)
|
|
logger.logger.addHandler(timescale_handler)
|
|
|
|
# Ahora todo lo que se loguee aquí también se guarda en TimescaleDB
|
|
logger.debug("Mensaje debug: variables inicializadas correctamente")
|
|
logger.info("Iniciando tarea Prefect con logging en Timescale 🚀")
|
|
logger.warning("Advertencia de ejemplo")
|
|
logger.error("Error simulado para prueba")
|
|
|
|
# Puedes seguir usando print() si quieres, pero prefieren logger
|
|
print("Esto solo se mostrará en stdout, no se guarda en Timescale")
|
|
|
|
@flow(name="comprobar_logs", result_storage=local_file_system_block, log_prints=True) # type: ignore
|
|
def comprobar_logs():
|
|
tarea_log()
|
|
|
|
if __name__ == "__main__":
|
|
comprobar_logs()
|