import requests from pymongo import MongoClient from prefect import flow, task from dotenv import load_dotenv from prefect.logging import get_run_logger import os from pathlib import Path from datetime import datetime # Cargar variables de entorno desde el archivo .env load_dotenv() # Configuración para la API de OpenWeatherMap y MongoDB MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/") DB_NAME = os.getenv("DB_NAME", "weather_data") COLLECTION_NAME = "tiempo_Lucero" @task def obtener_datos_climaticos(latitud, longitud): """Consulta la API de Open-Meteo para obtener datos climáticos de una ubicación.""" logger = get_run_logger() logger.info(f"Consultando datos climáticos para la ubicación: lat={latitud}, lon={longitud}") url = "https://api.open-meteo.com/v1/forecast" params = { "latitude": latitud, "longitude": longitud, "current_weather": True, "timezone": "Europe/Madrid" } try: response = requests.get(url, params=params) response.raise_for_status() logger.info(f"Datos climáticos recibidos correctamente para la ubicación: lat={latitud}, lon={longitud}.") return response.json() except Exception as e: logger.error(f"Error al obtener datos climáticos: {e}") raise @task def guardar_en_mongodb(datos): """Guarda los datos recibidos en MongoDB.""" logger = get_run_logger() logger.info("Guardando datos en MongoDB.") try: client = MongoClient(MONGO_URI) db = client[DB_NAME] collection = db[COLLECTION_NAME] # Añadir una marca de tiempo a los datos datos["timestamp"] = datetime.utcnow() result = collection.insert_one(datos) logger.info(f"Datos guardados en MongoDB con ID: {result.inserted_id}") return result.inserted_id except Exception as e: logger.error(f"Error al guardar datos en MongoDB: {e}") raise @flow def open_meteo_auto(latitud, longitud): """Flujo principal que consume la API y almacena datos en MongoDB.""" logger = get_run_logger() logger.info(f"Iniciando el pipeline para la ubicación: lat={latitud}, lon={longitud}") try: datos_api = obtener_datos_climaticos(latitud, longitud) id_insertado = guardar_en_mongodb(datos_api) logger.info(f"Pipeline completado. Datos insertados con ID: {id_insertado}") except Exception as e: logger.error(f"Error en el pipeline: {e}") raise # Configura y despliega el flujo if __name__ == "__main__": open_meteo_auto.from_source( source=str(Path(__file__).parent), # Código almacenado en el directorio local entrypoint="E:/Proyects/API_diverse_consumption/open_meteo_auto.py:open_meteo_auto", ).deploy( name="OpenMeteo_Lucero", parameters={"latitud": 40.391889, "longitud": -3.745}, # Coordenadas de Lucero, Madrid (decimales) work_pool_name="Workers_pc_torre_Lucas" )