# Constantes ########################################################################## PROMPT = "Dame los datos de ventas de los ultimos 3 meses del centro de Alcobendas" # Librerias de Agno ################################################################ from agno.agent import Agent from agno.models.openai import OpenAIChat # Prefect imports ################################################################## from prefect import task, flow from prefect.logging import get_run_logger from prefect.filesystems import LocalFileSystem local_file_system_block = LocalFileSystem.load("localfile") # Cargar variables de entorno ###################################################### import os from dotenv import load_dotenv load_dotenv() openai_api_key = os.getenv("OPENAI_API_KEY") # Imports adicionales ######################################################### import traceback from uuid import uuid4 # Importaciones de Archivos ######################################################### from jinja2 import Template import yaml import json # Utils ################################################################################### from utils.conseguir_datos_bq import consultar_bigquery_paginado from utils.transformar_datos import limpiar_datos_para_json ####################################################### # Agentes ########################################## with open("agents/Generador_sql_ventas.yaml", "r", encoding="utf-8") as f: experto_sql_ventas = yaml.safe_load(f) with open("agents/Analizador_de_datos_de_ventas.yaml", "r", encoding="utf-8") as f: analizador_de_datos_de_ventas = yaml.safe_load(f) ######################################################### # Datos extras ########################## with open("schemas_bbdd/Objeto_ventas.json", "r", encoding="utf-8") as f: schema_json_ventas = json.load(f) with open("detalles_para_agentes/centros_disponibles.md", "r", encoding="utf-8") as f: detalles_centros = f.read() ############################################################# # Datos añadidos a los agentes ########################## schema_str_ventas = json.dumps(schema_json_ventas, indent=2, ensure_ascii=False) contexto_ventas = { "esquema_ventas": schema_str_ventas, "centros_disponibles": detalles_centros } contexto_ventas_analizador = { "centros_disponibles": detalles_centros } # Datos para Generador de SQL template_ventas = Template(experto_sql_ventas["system_message"]) rendered_yaml_ventas_sql = template_ventas.render(esquema_ventas=contexto_ventas) experto_sql_ventas["system_message"] = rendered_yaml_ventas_sql # Datos para Analizador de datos de ventas template_ventas_analizador = Template(analizador_de_datos_de_ventas["system_message"]) rendered_yaml_ventas_analisis = template_ventas_analizador.render(esquema_ventas=contexto_ventas_analizador) analizador_de_datos_de_ventas["system_message"] = rendered_yaml_ventas_analisis ############################################################### # Tareas ########################################################## # Generar SQL ventas ###################################### @task(name="Convierte_prompt_a_sql", log_prints=True) def Convierte_prompt_a_sql(prompt_de_usuario: str): prefect_logger = get_run_logger() prefect_logger.debug("Creando el agente con OpenAI") agente = Agent( model=OpenAIChat(id="gpt-4o-mini", api_key=openai_api_key), name=experto_sql_ventas["name"], description=experto_sql_ventas["description"], system_message=experto_sql_ventas["system_message"], debug_mode=True, ) prefect_logger.debug("Agente creado correctamente") prefect_logger.debug("Enviando el prompt al agente") try: resultado = agente.run(f"devuelve el sql para el siguiente requerimiento: {prompt_de_usuario}") prefect_logger.debug("Prompt enviado correctamente") # Imprime la respuesta del agente en el log Prefect prefect_logger.info("=== RESPUESTA DEL AGENTE ===") prefect_logger.info(str(resultado.content.strip())) # imprime texto generado prefect_logger.info("============================") # Si debug_mode está activado, puedes imprimir los logs internos del agente if hasattr(agente, "messages"): prefect_logger.debug("=== LOG INTERNO DEL AGENTE ===") for msg in agente.messages: prefect_logger.debug(f"{msg.role}: {msg.content}") prefect_logger.debug("===============================") prefect_logger.debug("Ejecución completada correctamente ✅") return resultado.content.strip() except Exception as e: prefect_logger.error("Error al enviar el prompt al agente") traceback_str = traceback.format_exc() prefect_logger.error(f"Traceback: {traceback_str}") raise e # Conseguir datos de ventas a partir del SQL generado ############################## @task(name="Consigue_datos_ventas", log_prints=True) def Consigue_datos_ventas(sql_generado_por_agente: str, num_pagina_deseada: int = 1): prefect_logger = get_run_logger() prefect_logger.info("Iniciando consulta de datos de ventas en BigQuery...") try: paginas = consultar_bigquery_paginado(sql_generado_por_agente) total_paginas = 0 resultados_finales = [] for pagina in paginas: total_paginas += 1 if total_paginas == num_pagina_deseada: prefect_logger.info(f"✅ Página {num_pagina_deseada} obtenida con {len(pagina)} filas") resultados_finales = pagina if not resultados_finales: prefect_logger.warning(f"⚠️ No se encontró la página {num_pagina_deseada}. Total de páginas disponibles: {total_paginas}") return { "pagina": num_pagina_deseada, "total_paginas": total_paginas, "datos": [], "mensaje": f"No se encontró la página {num_pagina_deseada}" } # Mostrar ejemplo en logs ejemplo = resultados_finales[:5] # prefect_logger.info("Ejemplo de datos obtenidos:") # for fila in ejemplo: # prefect_logger.info(str(fila)) prefect_logger.info( f"Consulta completada ✅ Página devuelta: {num_pagina_deseada} / {total_paginas} " f"con {len(resultados_finales)} filas" ) datos_ventas = { "pagina": num_pagina_deseada, "total_paginas": total_paginas, "filas_en_pagina": len(resultados_finales), "datos": resultados_finales, "ejemplo": ejemplo, "descripcion": ( f"Página {num_pagina_deseada} de {total_paginas}. " f"Contiene {len(resultados_finales)} filas. " "Los datos son los resultados de la consulta SQL proporcionada." ) } prefect_logger.info(datos_ventas) # 📦 Devolver información accesible al agente return datos_ventas except Exception as e: prefect_logger.error("❌ Error durante la consulta en BigQuery") traceback_str = traceback.format_exc() prefect_logger.error(f"Traceback: {traceback_str}") raise e # Analizador de resultados ########################################################## @task(name="Analizador_de_resultados", log_prints=True) def Analizador_de_resultados(datos_ventas: dict, prompt_de_usuario: str): prefect_logger = get_run_logger() prefect_logger.info("Iniciando análisis de resultados de ventas...") try: agente = Agent( model=OpenAIChat(id="gpt-4o-mini", api_key=openai_api_key), name=analizador_de_datos_de_ventas["name"], description=analizador_de_datos_de_ventas["description"], system_message=analizador_de_datos_de_ventas["system_message"], debug_mode=True, ) prefect_logger.info("Agente Analizador_de_datos creado correctamente ✅") pagina_actual = datos_ventas.get("pagina", 1) total_paginas = datos_ventas.get("total_paginas", 1) filas_en_pagina = datos_ventas.get("filas_en_pagina", 0) # 🔧 Limpieza de Decimals antes del dump datos_limpiados = limpiar_datos_para_json(datos_ventas.get("datos", [])) datos_json = json.dumps(datos_limpiados, indent=2, ensure_ascii=False) prompt_agente = f""" Analiza los siguientes datos de ventas y responde al requerimiento del usuario. 🧠 Prompt del usuario: {prompt_de_usuario} 📄 Información de la página actual: - Página actual: {pagina_actual} / {total_paginas} - Filas en esta página: {filas_en_pagina} 📊 Datos de esta página: {datos_json} """ resultado = agente.run(prompt_agente) prefect_logger.info("✅ Análisis completado correctamente") prefect_logger.info("=== RESULTADO DEL ANÁLISIS ===") prefect_logger.info(resultado.content.strip()) prefect_logger.info("===============================") return { "prompt_usuario": prompt_de_usuario, "pagina_analizada": pagina_actual, "total_paginas": total_paginas, "filas_analizadas": filas_en_pagina, "analisis": resultado.content.strip(), "datos_analizados": datos_limpiados, } except Exception as e: prefect_logger.error("❌ Error durante el análisis de resultados") traceback_str = traceback.format_exc() prefect_logger.error(f"Traceback: {traceback_str}") raise e # FLUJO PRINCIPAL ########################################################## @flow(name="Agente_ventas", result_storage=local_file_system_block, log_prints=True) # type: ignore def Agente_ventas(PROMPT, num_pagina: int = 1): prefect_logger = get_run_logger() prefect_logger.info("🚀 Iniciando flujo Agente_ventas...") # 🧠 1. Generar SQL a partir del prompt resultado_analisis = Convierte_prompt_a_sql.submit(PROMPT).result() # 🧮 2. Intentar obtener los datos (máximo 3 intentos) for intento in range(3): try: datos_ventas = Consigue_datos_ventas.submit( resultado_analisis, num_pagina_deseada=num_pagina ).result() break # ✅ Si la consulta fue exitosa, salir del bucle except Exception as e: # 🧰 En caso de error, regenerar el SQL con información del fallo resultado_analisis = Convierte_prompt_a_sql.submit(f""" El SQL generado previamente daba error. El SQL era: {resultado_analisis}. El error fue: {str(e)}. Corrige el SQL para que no dé error y vuelve a generarlo. El prompt del usuario era: {PROMPT} """).result() if intento == 2: raise e # ❌ Si falla 3 veces, detener el flujo # 🧾 3. Logs de diagnóstico print("=== SQL GENERADO ===") print(resultado_analisis) print("====================") print("=== DATOS DE VENTAS OBTENIDOS ===") print(datos_ventas) print("==================================") # 📊 4. Analizar los resultados con el agente analista prefect_logger.info(f"🔎 Analizando los resultados de la página {num_pagina}...") analisis = Analizador_de_resultados.submit( datos_ventas, PROMPT ).result() # 🧠 5. Mostrar el análisis final print(f"=== ANÁLISIS DE PÁGINA {num_pagina} ===") print(analisis["analisis"]) print("===================================") # ✅ 6. Devolver resultados combinados return { "sql_generado": resultado_analisis, "analisis": analisis, } # Ejecución directa del flujo if __name__ == "__main__": # num_pagina=1 → analiza solo una página Agente_ventas(PROMPT, num_pagina=1)