82 lines
2.9 KiB
Python
82 lines
2.9 KiB
Python
import requests
|
|
from pymongo import MongoClient
|
|
from prefect import flow, task
|
|
from dotenv import load_dotenv
|
|
from prefect.logging import get_run_logger
|
|
import os
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
|
|
# Cargar variables de entorno desde el archivo .env
|
|
load_dotenv()
|
|
|
|
# Configuración para la API de OpenWeatherMap y MongoDB
|
|
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
|
|
DB_NAME = os.getenv("DB_NAME", "weather_data")
|
|
COLLECTION_NAME = "tiempo_Lucero"
|
|
|
|
@task
|
|
def obtener_datos_climaticos(latitud, longitud):
|
|
"""Consulta la API de Open-Meteo para obtener datos climáticos de una ubicación."""
|
|
logger = get_run_logger()
|
|
logger.info(f"Consultando datos climáticos para la ubicación: lat={latitud}, lon={longitud}")
|
|
url = "https://api.open-meteo.com/v1/forecast"
|
|
params = {
|
|
"latitude": latitud,
|
|
"longitude": longitud,
|
|
"current_weather": True,
|
|
"timezone": "Europe/Madrid"
|
|
}
|
|
try:
|
|
response = requests.get(url, params=params)
|
|
response.raise_for_status()
|
|
logger.info(f"Datos climáticos recibidos correctamente para la ubicación: lat={latitud}, lon={longitud}.")
|
|
return response.json()
|
|
except Exception as e:
|
|
logger.error(f"Error al obtener datos climáticos: {e}")
|
|
raise
|
|
|
|
@task
|
|
def guardar_en_mongodb(datos):
|
|
"""Guarda los datos recibidos en MongoDB."""
|
|
logger = get_run_logger()
|
|
logger.info("Guardando datos en MongoDB.")
|
|
try:
|
|
client = MongoClient(MONGO_URI)
|
|
db = client[DB_NAME]
|
|
collection = db[COLLECTION_NAME]
|
|
# Añadir una marca de tiempo a los datos
|
|
datos["timestamp"] = datetime.utcnow()
|
|
result = collection.insert_one(datos)
|
|
logger.info(f"Datos guardados en MongoDB con ID: {result.inserted_id}")
|
|
return result.inserted_id
|
|
except Exception as e:
|
|
logger.error(f"Error al guardar datos en MongoDB: {e}")
|
|
raise
|
|
|
|
@flow
|
|
def open_meteo_auto(latitud, longitud):
|
|
"""Flujo principal que consume la API y almacena datos en MongoDB."""
|
|
logger = get_run_logger()
|
|
logger.info(f"Iniciando el pipeline para la ubicación: lat={latitud}, lon={longitud}")
|
|
try:
|
|
datos_api = obtener_datos_climaticos(latitud, longitud)
|
|
id_insertado = guardar_en_mongodb(datos_api)
|
|
logger.info(f"Pipeline completado. Datos insertados con ID: {id_insertado}")
|
|
except Exception as e:
|
|
logger.error(f"Error en el pipeline: {e}")
|
|
raise
|
|
|
|
|
|
# Configura y despliega el flujo
|
|
if __name__ == "__main__":
|
|
open_meteo_auto.from_source(
|
|
source=str(Path(__file__).parent), # Código almacenado en el directorio local
|
|
entrypoint="E:/Proyects/API_diverse_consumption/open_meteo_auto.py:open_meteo_auto",
|
|
).deploy(
|
|
name="OpenMeteo_Lucero",
|
|
parameters={"latitud": 40.391889, "longitud": -3.745}, # Coordenadas de Lucero, Madrid (decimales)
|
|
work_pool_name="Workers_pc_torre_Lucas"
|
|
)
|