import requests from pymongo import MongoClient from prefect import flow, task from dotenv import load_dotenv from prefect.logging import get_run_logger # Cargar variables de entorno desde un archivo .env load_dotenv() from pathlib import Path import os # Configuración para la API y MongoDB API_URL = os.getenv("API_URL", "http://api.weatherstack.com/current") API_KEY = os.getenv("API_KEY", "tu_api_key_aqui") MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/") DB_NAME = os.getenv("DB_NAME", "weather_data") COLLECTION_NAME = os.getenv("COLLECTION_NAME", "current_weather") @task def obtener_datos_climaticos(ciudad): """Consulta la API de Weatherstack para obtener datos climáticos de una ciudad.""" logger = get_run_logger() logger.info(f"Iniciando la consulta de datos climáticos para la ciudad: {ciudad}") params = { "access_key": API_KEY, "query": ciudad } try: response = requests.get(API_URL, params=params) response.raise_for_status() # Levanta un error si la solicitud falla logger.info(f"Datos climáticos recibidos correctamente para {ciudad}.") return response.json() except Exception as e: logger.error(f"Error al obtener datos climáticos: {e}") raise @task def procesar_datos(datos): """Procesa los datos recibidos de la API.""" logger = get_run_logger() logger.info("Iniciando el procesamiento de datos climáticos.") try: if "current" not in datos: raise ValueError("Datos incompletos recibidos de la API.") clima = { "ciudad": datos["location"]["name"], "pais": datos["location"]["country"], "temperatura": datos["current"]["temperature"], "clima_descripcion": datos["current"]["weather_descriptions"], "fecha_consulta": datos["location"]["localtime"] } logger.info(f"Datos procesados correctamente: {clima}") return clima except Exception as e: logger.error(f"Error al procesar los datos: {e}") raise @task def guardar_en_mongodb(datos): """Guarda los datos procesados en MongoDB.""" logger = get_run_logger() logger.info("Guardando datos en MongoDB.") try: client = MongoClient(MONGO_URI) db = client[DB_NAME] collection = db[COLLECTION_NAME] result = collection.insert_one(datos) logger.info(f"Datos guardados en MongoDB con ID: {result.inserted_id}") return result.inserted_id except Exception as e: logger.error(f"Error al guardar datos en MongoDB: {e}") raise @flow def pipeline_climatico(ciudad): """Flujo principal que consume la API y almacena datos en MongoDB.""" logger = get_run_logger() logger.info(f"Iniciando el pipeline para la ciudad: {ciudad}") try: datos_api = obtener_datos_climaticos(ciudad) datos_procesados = procesar_datos(datos_api) id_insertado = guardar_en_mongodb(datos_procesados) logger.info(f"Pipeline completado. Datos insertados con ID: {id_insertado}") except Exception as e: logger.error(f"Error en el pipeline: {e}") raise # Configura y despliega el flujo if __name__ == "__main__": pipeline_climatico.from_source( source=str(Path(__file__).parent), # Código almacenado en el directorio local entrypoint="E:/Proyects/API_diverse_consumption/weatherstack.py:pipeline_climatico", ).deploy( name="Conseguir_datos_climaticos", parameters={"ciudad": "Madrid"}, work_pool_name="Workers_pc_torre_Lucas" )