#!/usr/bin/env python3
"""Ping every host listed in a CSV file and send a Telegram alert for each one that does not answer."""

import csv
import os
import platform
import subprocess
from pathlib import Path

from telegram_utils import send_message

ENV_FILE = Path("/home/ubuntu/proyects/healthchecks/.env")
HOSTS_FILE = Path("/home/ubuntu/proyects/healthchecks/hosts.csv")


def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from *path* into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Surrounding whitespace and quote characters are stripped from
    the value. Variables that already exist in the environment are left
    untouched, so the real environment takes precedence over the file.

    Raises:
        FileNotFoundError: if *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(
            f"No se encontró el archivo de entorno: {path}"
        )

    with path.open(encoding="utf-8") as env_file:
        for raw_line in env_file:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue

            key, value = line.split("=", 1)
            key = key.strip()
            value = value.strip().strip('"').strip("'")

            # Never override a variable that is already set.
            if key and key not in os.environ:
                os.environ[key] = value


def ping_host(host: str) -> bool:
    """Send a single ping to *host* and report whether it answered.

    Returns True when the ``ping`` command exits with status 0, and False
    when it exits non-zero, exceeds the 10-second timeout, or when *host*
    is empty or starts with ``-``. An ``OSError`` raised while launching
    ``ping`` (for example, the executable is missing) is not caught here.
    """
    # A value starting with "-" would be parsed by ping as an option rather
    # than a destination; an empty value is not a destination at all.
    if not host or host.startswith("-"):
        return False

    # Flag names and timeout units differ between Windows and other systems.
    if platform.system().lower() == "windows":
        command = ["ping", "-n", "1", "-w", "3000", host]
    else:
        command = ["ping", "-c", "1", "-W", "3", host]

    # NOTE(review): Windows ping is reported to exit 0 for some
    # "Destination host unreachable" replies — confirm if Windows is a
    # real deployment target.
    try:
        # Only the exit status matters, so the output is discarded instead
        # of being captured and decoded.
        result = subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=10,
            check=False,
        )
    except subprocess.SubprocessError:
        # TimeoutExpired is a subclass of SubprocessError.
        return False
    return result.returncode == 0


def load_hosts(csv_file: Path) -> list[dict]:
    """Read the hosts to monitor from *csv_file*.

    The file must have a header row with the columns ``Nombre`` and
    ``Host``. Each data row becomes ``{"nombre": ..., "host": ...}`` with
    surrounding whitespace removed. Rows whose ``Host`` cell is missing or
    blank are skipped.

    Raises:
        FileNotFoundError: if *csv_file* does not exist.
        KeyError: if a required column is absent from the header.
    """
    if not csv_file.exists():
        raise FileNotFoundError(f"No se encontró el archivo: {csv_file}")

    hosts = []
    # "utf-8-sig" drops a leading BOM (otherwise it ends up inside the first
    # header name); newline="" is what the csv module asks for.
    with csv_file.open(encoding="utf-8-sig", newline="") as file:
        for row in csv.DictReader(file):
            # DictReader fills cells missing from short rows with None.
            name = (row["Nombre"] or "").strip()
            host = (row["Host"] or "").strip()
            if not host:
                continue
            hosts.append({"nombre": name, "host": host})

    return hosts


def send_alert(api_key: str, chat_id: str, name: str, host: str) -> None:
    """Send a Telegram alert saying that *host* (labelled *name*) is down.

    Best effort: any exception raised by ``send_message`` is printed and
    swallowed so that one failed alert does not abort the caller.
    """
    message = f"Ey, {name} con la direccion {host} está caido, ten cuidado"

    try:
        send_message(api_key, chat_id, message)
        print(f"Alerta enviada para {name} ({host})")
    except Exception as e:
        print(f"Error enviando alerta para {name} ({host}): {e}")


def main() -> None:
    """Load the configuration, ping every configured host and alert on failures.

    Raises:
        FileNotFoundError: if the env file is missing (raised by
            ``load_env_file``).
        RuntimeError: if ``TELEGRAM_API_KEY`` or ``TELEGRAM_CHAT_ID`` is
            unset or empty after loading the env file.
    """
    # Load environment variables.
    load_env_file(ENV_FILE)

    api_key = os.environ.get("TELEGRAM_API_KEY")
    chat_id = os.environ.get("TELEGRAM_CHAT_ID")

    if not api_key or not chat_id:
        raise RuntimeError(
            "Asegúrate de definir TELEGRAM_API_KEY y TELEGRAM_CHAT_ID en tu .env."
        )

    # Load the hosts from the CSV file.
    try:
        hosts = load_hosts(HOSTS_FILE)
    except FileNotFoundError as e:
        print(f"Error: {e}")
        return

    print(f"Monitoreando {len(hosts)} hosts...")

    # Check each host in turn.
    for host_info in hosts:
        name = host_info["nombre"]
        host = host_info["host"]

        # Flush so the progress text is visible while the ping is running.
        print(f"Verificando {name} ({host})...", end=" ", flush=True)

        if ping_host(host):
            print("✓ Activo")
        else:
            print("✗ Caído")
            send_alert(api_key, chat_id, name, host)

    print("Monitoreo completado.")


if __name__ == "__main__":
    main()