Implementa un agente financiero que consulta información sobre la inflación en Argentina utilizando OpenAI y herramientas de Wikipedia y Yahoo Finance. Se define un flujo de trabajo en Prefect para gestionar la ejecución y el registro de mensajes enviados al modelo.
This commit is contained in:
@@ -0,0 +1,90 @@
|
||||
import psycopg2
|
||||
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
|
||||
|
||||
def recreate_logs_table(conn_info, retention_days=30):
|
||||
"""
|
||||
Elimina y recrea la tabla 'logs' en TimescaleDB con configuración optimizada.
|
||||
Incluye:
|
||||
- Hypertable por 'timestamp'
|
||||
- Índices por service, timestamp y level
|
||||
- Políticas de compresión y retención automáticas
|
||||
"""
|
||||
|
||||
conn = psycopg2.connect(**conn_info)
|
||||
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
|
||||
cur = conn.cursor()
|
||||
|
||||
print("⚠️ Eliminando tabla 'logs' si existe...")
|
||||
cur.execute("DROP TABLE IF EXISTS logs CASCADE;")
|
||||
|
||||
print("🔧 Creando tabla 'logs'...")
|
||||
cur.execute("""
|
||||
CREATE TABLE logs (
|
||||
id BIGSERIAL,
|
||||
service TEXT NOT NULL,
|
||||
level TEXT NOT NULL,
|
||||
message TEXT NOT NULL,
|
||||
timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
metadata JSONB
|
||||
);
|
||||
""")
|
||||
|
||||
print("🧱 Configurando hypertable 'logs'...")
|
||||
cur.execute("""
|
||||
SELECT create_hypertable('logs', 'timestamp', if_not_exists => TRUE);
|
||||
""")
|
||||
|
||||
print("⚡ Creando índices...")
|
||||
cur.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_logs_service_timestamp
|
||||
ON logs (service, timestamp DESC);
|
||||
""")
|
||||
cur.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_logs_level
|
||||
ON logs (level);
|
||||
""")
|
||||
cur.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_logs_timestamp
|
||||
ON logs (timestamp DESC);
|
||||
""")
|
||||
|
||||
print("📦 Activando compresión...")
|
||||
cur.execute("""
|
||||
ALTER TABLE logs SET (
|
||||
timescaledb.compress,
|
||||
timescaledb.compress_segmentby = 'service'
|
||||
);
|
||||
""")
|
||||
|
||||
print("🗓️ Añadiendo políticas automáticas...")
|
||||
cur.execute("""
|
||||
SELECT add_compression_policy('logs', INTERVAL '7 days');
|
||||
""")
|
||||
cur.execute(f"""
|
||||
SELECT add_retention_policy('logs', INTERVAL '{retention_days} days');
|
||||
""")
|
||||
|
||||
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
print("✅ Tabla 'logs' recreada y configurada correctamente.")
|
||||
|
||||
|
||||
|
||||
# Ejecutar la configuración al ejecutar el script directamente
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
|
||||
conn_info = {
|
||||
"dbname": "basededatos",
|
||||
"user": "postgres",
|
||||
"password": "mipassword",
|
||||
"host": "localhost",
|
||||
"port": 55432,
|
||||
}
|
||||
|
||||
recreate_logs_table(conn_info, retention_days=90)
|
||||
@@ -0,0 +1,76 @@
|
||||
from prefect import flow, task, get_run_logger
|
||||
from prefect.filesystems import LocalFileSystem
|
||||
|
||||
local_file_system_block = LocalFileSystem.load("localfile")
|
||||
|
||||
|
||||
import logging
|
||||
import psycopg2
|
||||
from datetime import datetime, timezone
|
||||
import json
|
||||
|
||||
class TimescaleHandler(logging.Handler):
|
||||
def __init__(self, conn_info):
|
||||
super().__init__()
|
||||
self.conn_info = conn_info
|
||||
|
||||
def emit(self, record: logging.LogRecord):
|
||||
try:
|
||||
conn = psycopg2.connect(**self.conn_info)
|
||||
cur = conn.cursor()
|
||||
cur.execute("""
|
||||
INSERT INTO logs (service, level, message, timestamp, metadata)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
""", (
|
||||
"prefect",
|
||||
record.levelname,
|
||||
record.getMessage(),
|
||||
datetime.now(timezone.utc),
|
||||
json.dumps({
|
||||
"filename": record.pathname,
|
||||
"lineno": record.lineno,
|
||||
"func": record.funcName,
|
||||
"module": record.module,
|
||||
}),
|
||||
))
|
||||
conn.commit()
|
||||
cur.close()
|
||||
conn.close()
|
||||
except Exception as e:
|
||||
print(f"[TimescaleHandler] Error al guardar log: {e}")
|
||||
|
||||
# 🧩 Tarea Prefect con logging
|
||||
@task(name="tarea_log", log_prints=True)
|
||||
def tarea_log():
|
||||
conn_info = {
|
||||
"dbname": "basededatos",
|
||||
"user": "postgres",
|
||||
"password": "mipassword",
|
||||
"host": "localhost",
|
||||
"port": 55432,
|
||||
}
|
||||
|
||||
# Prefect ya crea este logger con el contexto actual
|
||||
logger = get_run_logger()
|
||||
timescale_handler = TimescaleHandler(conn_info)
|
||||
|
||||
# 💡 Aseguramos que se guarden también los DEBUG
|
||||
logger.logger.setLevel(logging.DEBUG)
|
||||
timescale_handler.setLevel(logging.DEBUG)
|
||||
logger.logger.addHandler(timescale_handler)
|
||||
|
||||
# Ahora todo lo que se loguee aquí también se guarda en TimescaleDB
|
||||
logger.debug("Mensaje debug: variables inicializadas correctamente")
|
||||
logger.info("Iniciando tarea Prefect con logging en Timescale 🚀")
|
||||
logger.warning("Advertencia de ejemplo")
|
||||
logger.error("Error simulado para prueba")
|
||||
|
||||
# Puedes seguir usando print() si quieres, pero prefieren logger
|
||||
print("Esto solo se mostrará en stdout, no se guarda en Timescale")
|
||||
|
||||
@flow(name="comprobar_logs", result_storage=local_file_system_block, log_prints=True) # type: ignore
|
||||
def comprobar_logs():
|
||||
tarea_log()
|
||||
|
||||
if __name__ == "__main__":
|
||||
comprobar_logs()
|
||||
Reference in New Issue
Block a user