This repository has been archived on 2025-11-27. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
conseguir_datos_con_agentes/Agente_de_datos_de_ventas.py
T
egutierrez 28e50b921c feat: Implement new sales data analysis agents and utilities
- Added Router_de_agentes.py to manage agent interactions for sales data analysis.
- Created Analizador_de_datos_de_ventas.yaml for generating structured text reports from sales data.
- Developed Generador_sql_ventas.yaml for generating SQL queries to analyze sales data.
- Established Router_de_agente.yaml as a routing mechanism for agent requests.
- Compiled centros_disponibles.md listing available sales centers.
- Introduced primera_ejecucion_de_un_agente.py as an example for executing agents.
- Added ver_los_prompts_de_un_agente.py to inspect prompts sent to OpenAI.
- Included service account key for BigQuery access in rag-datasets-reader-sa-key.json.
- Defined schema for sales data in Objeto_ventas.json.
- Implemented utility functions for querying BigQuery in conseguir_datos_bq.py.
- Created data transformation utilities in transformar_datos.py for handling decimal and date formats.
2025-10-06 18:48:19 +02:00

378 lines
12 KiB
Python

# Constantes ##########################################################################
PROMPT = "Dame los datos de ventas de los ultimos 3 meses del centro de Alcobendas"
# Librerias de Agno ################################################################
from agno.agent import Agent
from agno.models.openai import OpenAIChat
# Prefect imports ##################################################################
from prefect import task, flow
from prefect.logging import get_run_logger
from prefect.filesystems import LocalFileSystem
local_file_system_block = LocalFileSystem.load("localfile")
# Cargar variables de entorno ######################################################
import os
from dotenv import load_dotenv
load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY")
# Imports adicionales #########################################################
import traceback
from uuid import uuid4
# Importaciones de Archivos #########################################################
from jinja2 import Template
import yaml
import json
# Utils ###################################################################################
from utils.conseguir_datos_bq import consultar_bigquery_paginado
from utils.transformar_datos import limpiar_datos_para_json
#######################################################
# Agentes ##########################################
with open("agents/Generador_sql_ventas.yaml", "r", encoding="utf-8") as f:
experto_sql_ventas = yaml.safe_load(f)
with open("agents/Analizador_de_datos_de_ventas.yaml", "r", encoding="utf-8") as f:
analizador_de_datos_de_ventas = yaml.safe_load(f)
#########################################################
# Datos extras ##########################
with open("schemas_bbdd/Objeto_ventas.json", "r", encoding="utf-8") as f:
schema_json_ventas = json.load(f)
with open("detalles_para_agentes/centros_disponibles.md", "r", encoding="utf-8") as f:
detalles_centros = f.read()
#############################################################
# Datos añadidos a los agentes ##########################
schema_str_ventas = json.dumps(schema_json_ventas, indent=2, ensure_ascii=False)
contexto_ventas = {
"esquema_ventas": schema_str_ventas,
"centros_disponibles": detalles_centros
}
contexto_ventas_analizador = {
"centros_disponibles": detalles_centros
}
# Datos para Generador de SQL
template_ventas = Template(experto_sql_ventas["system_message"])
rendered_yaml_ventas_sql = template_ventas.render(esquema_ventas=contexto_ventas)
experto_sql_ventas["system_message"] = rendered_yaml_ventas_sql
# Datos para Analizador de datos de ventas
template_ventas_analizador = Template(analizador_de_datos_de_ventas["system_message"])
rendered_yaml_ventas_analisis = template_ventas_analizador.render(esquema_ventas=contexto_ventas_analizador)
analizador_de_datos_de_ventas["system_message"] = rendered_yaml_ventas_analisis
###############################################################
# Tareas ##########################################################
# Generar SQL ventas ######################################
@task(name="Convierte_prompt_a_sql", log_prints=True)
def Convierte_prompt_a_sql(prompt_de_usuario: str):
prefect_logger = get_run_logger()
prefect_logger.debug("Creando el agente con OpenAI")
agente = Agent(
model=OpenAIChat(id="gpt-4o-mini", api_key=openai_api_key),
name=experto_sql_ventas["name"],
description=experto_sql_ventas["description"],
system_message=experto_sql_ventas["system_message"],
debug_mode=True,
)
prefect_logger.debug("Agente creado correctamente")
prefect_logger.debug("Enviando el prompt al agente")
try:
resultado = agente.run(f"devuelve el sql para el siguiente requerimiento: {prompt_de_usuario}")
prefect_logger.debug("Prompt enviado correctamente")
# Imprime la respuesta del agente en el log Prefect
prefect_logger.info("=== RESPUESTA DEL AGENTE ===")
prefect_logger.info(str(resultado.content.strip())) # imprime texto generado
prefect_logger.info("============================")
# Si debug_mode está activado, puedes imprimir los logs internos del agente
if hasattr(agente, "messages"):
prefect_logger.debug("=== LOG INTERNO DEL AGENTE ===")
for msg in agente.messages:
prefect_logger.debug(f"{msg.role}: {msg.content}")
prefect_logger.debug("===============================")
prefect_logger.debug("Ejecución completada correctamente ✅")
return resultado.content.strip()
except Exception as e:
prefect_logger.error("Error al enviar el prompt al agente")
traceback_str = traceback.format_exc()
prefect_logger.error(f"Traceback: {traceback_str}")
raise e
# Conseguir datos de ventas a partir del SQL generado ##############################
@task(name="Consigue_datos_ventas", log_prints=True)
def Consigue_datos_ventas(sql_generado_por_agente: str, num_pagina_deseada: int = 1):
prefect_logger = get_run_logger()
prefect_logger.info("Iniciando consulta de datos de ventas en BigQuery...")
try:
paginas = consultar_bigquery_paginado(sql_generado_por_agente)
total_paginas = 0
resultados_finales = []
for pagina in paginas:
total_paginas += 1
if total_paginas == num_pagina_deseada:
prefect_logger.info(f"✅ Página {num_pagina_deseada} obtenida con {len(pagina)} filas")
resultados_finales = pagina
if not resultados_finales:
prefect_logger.warning(f"⚠️ No se encontró la página {num_pagina_deseada}. Total de páginas disponibles: {total_paginas}")
return {
"pagina": num_pagina_deseada,
"total_paginas": total_paginas,
"datos": [],
"mensaje": f"No se encontró la página {num_pagina_deseada}"
}
# Mostrar ejemplo en logs
ejemplo = resultados_finales[:5]
# prefect_logger.info("Ejemplo de datos obtenidos:")
# for fila in ejemplo:
# prefect_logger.info(str(fila))
prefect_logger.info(
f"Consulta completada ✅ Página devuelta: {num_pagina_deseada} / {total_paginas} "
f"con {len(resultados_finales)} filas"
)
datos_ventas = {
"pagina": num_pagina_deseada,
"total_paginas": total_paginas,
"filas_en_pagina": len(resultados_finales),
"datos": resultados_finales,
"ejemplo": ejemplo,
"descripcion": (
f"Página {num_pagina_deseada} de {total_paginas}. "
f"Contiene {len(resultados_finales)} filas. "
"Los datos son los resultados de la consulta SQL proporcionada."
)
}
prefect_logger.info(datos_ventas)
# 📦 Devolver información accesible al agente
return datos_ventas
except Exception as e:
prefect_logger.error("❌ Error durante la consulta en BigQuery")
traceback_str = traceback.format_exc()
prefect_logger.error(f"Traceback: {traceback_str}")
raise e
# Analizador de resultados ##########################################################
@task(name="Analizador_de_resultados", log_prints=True)
def Analizador_de_resultados(datos_ventas: dict, prompt_de_usuario: str):
prefect_logger = get_run_logger()
prefect_logger.info("Iniciando análisis de resultados de ventas...")
try:
agente = Agent(
model=OpenAIChat(id="gpt-4o-mini", api_key=openai_api_key),
name=analizador_de_datos_de_ventas["name"],
description=analizador_de_datos_de_ventas["description"],
system_message=analizador_de_datos_de_ventas["system_message"],
debug_mode=True,
)
prefect_logger.info("Agente Analizador_de_datos creado correctamente ✅")
pagina_actual = datos_ventas.get("pagina", 1)
total_paginas = datos_ventas.get("total_paginas", 1)
filas_en_pagina = datos_ventas.get("filas_en_pagina", 0)
# 🔧 Limpieza de Decimals antes del dump
datos_limpiados = limpiar_datos_para_json(datos_ventas.get("datos", []))
datos_json = json.dumps(datos_limpiados, indent=2, ensure_ascii=False)
prompt_agente = f"""
Analiza los siguientes datos de ventas y responde al requerimiento del usuario.
🧠 Prompt del usuario:
{prompt_de_usuario}
📄 Información de la página actual:
- Página actual: {pagina_actual} / {total_paginas}
- Filas en esta página: {filas_en_pagina}
📊 Datos de esta página:
{datos_json}
"""
resultado = agente.run(prompt_agente)
prefect_logger.info("✅ Análisis completado correctamente")
prefect_logger.info("=== RESULTADO DEL ANÁLISIS ===")
prefect_logger.info(resultado.content.strip())
prefect_logger.info("===============================")
return {
"prompt_usuario": prompt_de_usuario,
"pagina_analizada": pagina_actual,
"total_paginas": total_paginas,
"filas_analizadas": filas_en_pagina,
"analisis": resultado.content.strip(),
"datos_analizados": datos_limpiados,
}
except Exception as e:
prefect_logger.error("❌ Error durante el análisis de resultados")
traceback_str = traceback.format_exc()
prefect_logger.error(f"Traceback: {traceback_str}")
raise e
# FLUJO PRINCIPAL ##########################################################
@flow(name="Agente_ventas", result_storage=local_file_system_block, log_prints=True) # type: ignore
def Agente_ventas(PROMPT, num_pagina: int = 1):
prefect_logger = get_run_logger()
prefect_logger.info("🚀 Iniciando flujo Agente_ventas...")
# 🧠 1. Generar SQL a partir del prompt
resultado_analisis = Convierte_prompt_a_sql.submit(PROMPT).result()
# 🧮 2. Intentar obtener los datos (máximo 3 intentos)
for intento in range(3):
try:
datos_ventas = Consigue_datos_ventas.submit(
resultado_analisis, num_pagina_deseada=num_pagina
).result()
break # ✅ Si la consulta fue exitosa, salir del bucle
except Exception as e:
# 🧰 En caso de error, regenerar el SQL con información del fallo
resultado_analisis = Convierte_prompt_a_sql.submit(f"""
El SQL generado previamente daba error. El SQL era: {resultado_analisis}.
El error fue: {str(e)}.
Corrige el SQL para que no dé error y vuelve a generarlo.
El prompt del usuario era: {PROMPT}
""").result()
if intento == 2:
raise e # ❌ Si falla 3 veces, detener el flujo
# 🧾 3. Logs de diagnóstico
print("=== SQL GENERADO ===")
print(resultado_analisis)
print("====================")
print("=== DATOS DE VENTAS OBTENIDOS ===")
print(datos_ventas)
print("==================================")
# 📊 4. Analizar los resultados con el agente analista
prefect_logger.info(f"🔎 Analizando los resultados de la página {num_pagina}...")
analisis = Analizador_de_resultados.submit(
datos_ventas, PROMPT
).result()
# 🧠 5. Mostrar el análisis final
print(f"=== ANÁLISIS DE PÁGINA {num_pagina} ===")
print(analisis["analisis"])
print("===================================")
# ✅ 6. Devolver resultados combinados
return {
"sql_generado": resultado_analisis,
"analisis": analisis,
}
# Ejecución directa del flujo
if __name__ == "__main__":
# num_pagina=1 → analiza solo una página
Agente_ventas(PROMPT, num_pagina=1)