Add initial implementation for weather data retrieval and storage
- Create .env file for environment variables - Add .gitignore to exclude virtual environment - Implement weatherstack.py to fetch weather data from API and store it in MongoDB - Define tasks for data retrieval, processing, and storage using Prefect
This commit is contained in:
@@ -0,0 +1,5 @@
|
|||||||
|
API_URL=http://api.weatherstack.com/current
|
||||||
|
API_KEY=d46d7419401ae4d339037fd5be77e5d7
|
||||||
|
MONGO_URI=mongodb://admin:iO6RQO3xPoXvpm7BXUKZAjhijYLssQ@10.8.0.3:27017/
|
||||||
|
DB_NAME=weather_data
|
||||||
|
COLLECTION_NAME=current_weather
|
||||||
@@ -0,0 +1 @@
|
|||||||
|
venv
|
||||||
Binary file not shown.
@@ -0,0 +1,97 @@
|
|||||||
|
import requests
|
||||||
|
from pymongo import MongoClient
|
||||||
|
from prefect import flow, task
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from prefect.logging import get_run_logger
|
||||||
|
|
||||||
|
# Cargar variables de entorno desde un archivo .env
|
||||||
|
load_dotenv()
|
||||||
|
from pathlib import Path
|
||||||
|
import os
|
||||||
|
|
||||||
|
# Configuración para la API y MongoDB
|
||||||
|
API_URL = os.getenv("API_URL", "http://api.weatherstack.com/current")
|
||||||
|
API_KEY = os.getenv("API_KEY", "tu_api_key_aqui")
|
||||||
|
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
|
||||||
|
DB_NAME = os.getenv("DB_NAME", "weather_data")
|
||||||
|
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "current_weather")
|
||||||
|
|
||||||
|
@task
|
||||||
|
def obtener_datos_climaticos(ciudad):
|
||||||
|
"""Consulta la API de Weatherstack para obtener datos climáticos de una ciudad."""
|
||||||
|
logger = get_run_logger()
|
||||||
|
logger.info(f"Iniciando la consulta de datos climáticos para la ciudad: {ciudad}")
|
||||||
|
params = {
|
||||||
|
"access_key": API_KEY,
|
||||||
|
"query": ciudad
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
response = requests.get(API_URL, params=params)
|
||||||
|
response.raise_for_status() # Levanta un error si la solicitud falla
|
||||||
|
logger.info(f"Datos climáticos recibidos correctamente para {ciudad}.")
|
||||||
|
return response.json()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error al obtener datos climáticos: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
@task
|
||||||
|
def procesar_datos(datos):
|
||||||
|
"""Procesa los datos recibidos de la API."""
|
||||||
|
logger = get_run_logger()
|
||||||
|
logger.info("Iniciando el procesamiento de datos climáticos.")
|
||||||
|
try:
|
||||||
|
if "current" not in datos:
|
||||||
|
raise ValueError("Datos incompletos recibidos de la API.")
|
||||||
|
clima = {
|
||||||
|
"ciudad": datos["location"]["name"],
|
||||||
|
"pais": datos["location"]["country"],
|
||||||
|
"temperatura": datos["current"]["temperature"],
|
||||||
|
"clima_descripcion": datos["current"]["weather_descriptions"],
|
||||||
|
"fecha_consulta": datos["location"]["localtime"]
|
||||||
|
}
|
||||||
|
logger.info(f"Datos procesados correctamente: {clima}")
|
||||||
|
return clima
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error al procesar los datos: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
@task
|
||||||
|
def guardar_en_mongodb(datos):
|
||||||
|
"""Guarda los datos procesados en MongoDB."""
|
||||||
|
logger = get_run_logger()
|
||||||
|
logger.info("Guardando datos en MongoDB.")
|
||||||
|
try:
|
||||||
|
client = MongoClient(MONGO_URI)
|
||||||
|
db = client[DB_NAME]
|
||||||
|
collection = db[COLLECTION_NAME]
|
||||||
|
result = collection.insert_one(datos)
|
||||||
|
logger.info(f"Datos guardados en MongoDB con ID: {result.inserted_id}")
|
||||||
|
return result.inserted_id
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error al guardar datos en MongoDB: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
@flow
|
||||||
|
def pipeline_climatico(ciudad):
|
||||||
|
"""Flujo principal que consume la API y almacena datos en MongoDB."""
|
||||||
|
logger = get_run_logger()
|
||||||
|
logger.info(f"Iniciando el pipeline para la ciudad: {ciudad}")
|
||||||
|
try:
|
||||||
|
datos_api = obtener_datos_climaticos(ciudad)
|
||||||
|
datos_procesados = procesar_datos(datos_api)
|
||||||
|
id_insertado = guardar_en_mongodb(datos_procesados)
|
||||||
|
logger.info(f"Pipeline completado. Datos insertados con ID: {id_insertado}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error en el pipeline: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
|
# Configura y despliega el flujo
|
||||||
|
if __name__ == "__main__":
|
||||||
|
pipeline_climatico.from_source(
|
||||||
|
source=str(Path(__file__).parent), # Código almacenado en el directorio local
|
||||||
|
entrypoint="E:/Proyects/API_diverse_consumption/weatherstack.py:pipeline_climatico",
|
||||||
|
).deploy(
|
||||||
|
name="Conseguir_datos_climaticos",
|
||||||
|
parameters={"ciudad": "Madrid"},
|
||||||
|
work_pool_name="Workers_pc_torre_Lucas"
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user