This repository has been archived on 2025-11-27. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files

82 lines
2.9 KiB
Python

import requests
from pymongo import MongoClient
from prefect import flow, task
from dotenv import load_dotenv
from prefect.logging import get_run_logger
import os
from pathlib import Path
from datetime import datetime
# Cargar variables de entorno desde el archivo .env
load_dotenv()
# Configuración para la API de OpenWeatherMap y MongoDB
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
DB_NAME = os.getenv("DB_NAME", "weather_data")
COLLECTION_NAME = "tiempo_Lucero"
@task
def obtener_datos_climaticos(latitud, longitud):
"""Consulta la API de Open-Meteo para obtener datos climáticos de una ubicación."""
logger = get_run_logger()
logger.info(f"Consultando datos climáticos para la ubicación: lat={latitud}, lon={longitud}")
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": latitud,
"longitude": longitud,
"current_weather": True,
"timezone": "Europe/Madrid"
}
try:
response = requests.get(url, params=params)
response.raise_for_status()
logger.info(f"Datos climáticos recibidos correctamente para la ubicación: lat={latitud}, lon={longitud}.")
return response.json()
except Exception as e:
logger.error(f"Error al obtener datos climáticos: {e}")
raise
@task
def guardar_en_mongodb(datos):
"""Guarda los datos recibidos en MongoDB."""
logger = get_run_logger()
logger.info("Guardando datos en MongoDB.")
try:
client = MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
# Añadir una marca de tiempo a los datos
datos["timestamp"] = datetime.utcnow()
result = collection.insert_one(datos)
logger.info(f"Datos guardados en MongoDB con ID: {result.inserted_id}")
return result.inserted_id
except Exception as e:
logger.error(f"Error al guardar datos en MongoDB: {e}")
raise
@flow
def open_meteo_auto(latitud, longitud):
"""Flujo principal que consume la API y almacena datos en MongoDB."""
logger = get_run_logger()
logger.info(f"Iniciando el pipeline para la ubicación: lat={latitud}, lon={longitud}")
try:
datos_api = obtener_datos_climaticos(latitud, longitud)
id_insertado = guardar_en_mongodb(datos_api)
logger.info(f"Pipeline completado. Datos insertados con ID: {id_insertado}")
except Exception as e:
logger.error(f"Error en el pipeline: {e}")
raise
# Configura y despliega el flujo
if __name__ == "__main__":
open_meteo_auto.from_source(
source=str(Path(__file__).parent), # Código almacenado en el directorio local
entrypoint="E:/Proyects/API_diverse_consumption/open_meteo_auto.py:open_meteo_auto",
).deploy(
name="OpenMeteo_Lucero",
parameters={"latitud": 40.391889, "longitud": -3.745}, # Coordenadas de Lucero, Madrid (decimales)
work_pool_name="Workers_pc_torre_Lucas"
)