diff --git a/.env b/.env new file mode 100644 index 0000000..633e31a --- /dev/null +++ b/.env @@ -0,0 +1,5 @@ +API_URL=http://api.weatherstack.com/current +API_KEY=d46d7419401ae4d339037fd5be77e5d7 +MONGO_URI=mongodb://admin:iO6RQO3xPoXvpm7BXUKZAjhijYLssQ@10.8.0.3:27017/ +DB_NAME=weather_data +COLLECTION_NAME=current_weather \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f5e96db --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +venv \ No newline at end of file diff --git a/__pycache__/weatherstack.cpython-312.pyc b/__pycache__/weatherstack.cpython-312.pyc new file mode 100644 index 0000000..09f226a Binary files /dev/null and b/__pycache__/weatherstack.cpython-312.pyc differ diff --git a/weatherstack.py b/weatherstack.py new file mode 100644 index 0000000..1013f4c --- /dev/null +++ b/weatherstack.py @@ -0,0 +1,97 @@ +import requests +from pymongo import MongoClient +from prefect import flow, task +from dotenv import load_dotenv +from prefect.logging import get_run_logger + +# Cargar variables de entorno desde un archivo .env +load_dotenv() +from pathlib import Path +import os + +# Configuración para la API y MongoDB +API_URL = os.getenv("API_URL", "http://api.weatherstack.com/current") +API_KEY = os.getenv("API_KEY", "tu_api_key_aqui") +MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/") +DB_NAME = os.getenv("DB_NAME", "weather_data") +COLLECTION_NAME = os.getenv("COLLECTION_NAME", "current_weather") + +@task +def obtener_datos_climaticos(ciudad): + """Consulta la API de Weatherstack para obtener datos climáticos de una ciudad.""" + logger = get_run_logger() + logger.info(f"Iniciando la consulta de datos climáticos para la ciudad: {ciudad}") + params = { + "access_key": API_KEY, + "query": ciudad + } + try: + response = requests.get(API_URL, params=params) + response.raise_for_status() # Levanta un error si la solicitud falla + logger.info(f"Datos climáticos recibidos correctamente para {ciudad}.") + return response.json() + except Exception as e: + logger.error(f"Error al obtener datos climáticos: {e}") + raise + +@task +def procesar_datos(datos): + """Procesa los datos recibidos de la API.""" + logger = get_run_logger() + logger.info("Iniciando el procesamiento de datos climáticos.") + try: + if "current" not in datos: + raise ValueError("Datos incompletos recibidos de la API.") + clima = { + "ciudad": datos["location"]["name"], + "pais": datos["location"]["country"], + "temperatura": datos["current"]["temperature"], + "clima_descripcion": datos["current"]["weather_descriptions"], + "fecha_consulta": datos["location"]["localtime"] + } + logger.info(f"Datos procesados correctamente: {clima}") + return clima + except Exception as e: + logger.error(f"Error al procesar los datos: {e}") + raise + +@task +def guardar_en_mongodb(datos): + """Guarda los datos procesados en MongoDB.""" + logger = get_run_logger() + logger.info("Guardando datos en MongoDB.") + try: + client = MongoClient(MONGO_URI) + db = client[DB_NAME] + collection = db[COLLECTION_NAME] + result = collection.insert_one(datos) + logger.info(f"Datos guardados en MongoDB con ID: {result.inserted_id}") + return result.inserted_id + except Exception as e: + logger.error(f"Error al guardar datos en MongoDB: {e}") + raise + +@flow +def pipeline_climatico(ciudad): + """Flujo principal que consume la API y almacena datos en MongoDB.""" + logger = get_run_logger() + logger.info(f"Iniciando el pipeline para la ciudad: {ciudad}") + try: + datos_api = obtener_datos_climaticos(ciudad) + datos_procesados = procesar_datos(datos_api) + id_insertado = guardar_en_mongodb(datos_procesados) + logger.info(f"Pipeline completado. Datos insertados con ID: {id_insertado}") + except Exception as e: + logger.error(f"Error en el pipeline: {e}") + raise + +# Configura y despliega el flujo +if __name__ == "__main__": + pipeline_climatico.from_source( + source=str(Path(__file__).parent), # Código almacenado en el directorio local + entrypoint="E:/Proyects/API_diverse_consumption/weatherstack.py:pipeline_climatico", + ).deploy( + name="Conseguir_datos_climaticos", + parameters={"ciudad": "Madrid"}, + work_pool_name="Workers_pc_torre_Lucas" + )