Files
healthchecks/monitor_hosts.py
2025-11-04 12:56:53 +00:00

123 lines
3.3 KiB
Python

#!/usr/bin/env python3
import os
import csv
import subprocess
import platform
from pathlib import Path
from telegram_utils import send_message
ENV_FILE = Path("/home/ubuntu/proyects/healthchecks/.env")
HOSTS_FILE = Path("/home/ubuntu/proyects/healthchecks/hosts.csv")
def load_env_file(path: Path) -> None:
if not path.exists():
raise FileNotFoundError(
f"No se encontró el archivo de entorno: {path}"
)
with path.open(encoding="utf-8") as env_file:
for raw_line in env_file:
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and key not in os.environ:
os.environ[key] = value
def ping_host(host: str) -> bool:
"""
Hace ping a un host y retorna True si está activo, False si no responde.
"""
# Determinar el comando ping según el sistema operativo
if platform.system().lower() == "windows":
command = ["ping", "-n", "1", "-w", "3000", host]
else:
command = ["ping", "-c", "1", "-W", "3", host]
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=10
)
return result.returncode == 0
except (subprocess.TimeoutExpired, subprocess.SubprocessError):
return False
def load_hosts(csv_file: Path) -> list[dict]:
"""
Carga los hosts desde el archivo CSV.
"""
if not csv_file.exists():
raise FileNotFoundError(f"No se encontró el archivo: {csv_file}")
hosts = []
with csv_file.open(encoding="utf-8") as file:
reader = csv.DictReader(file)
for row in reader:
hosts.append({
"nombre": row["Nombre"].strip(),
"host": row["Host"].strip()
})
return hosts
def send_alert(api_key: str, chat_id: str, name: str, host: str) -> None:
"""
Envía una alerta por Telegram cuando un host está caído.
"""
message = f"Ey, {name} con la direccion {host} está caido, ten cuidado"
try:
send_message(api_key, chat_id, message)
print(f"Alerta enviada para {name} ({host})")
except Exception as e:
print(f"Error enviando alerta para {name} ({host}): {e}")
def main() -> None:
# Cargar variables de entorno
load_env_file(ENV_FILE)
api_key = os.environ.get("TELEGRAM_API_KEY")
chat_id = os.environ.get("TELEGRAM_CHAT_ID")
if not api_key or not chat_id:
raise RuntimeError(
"Asegúrate de definir TELEGRAM_API_KEY y TELEGRAM_CHAT_ID en tu .env."
)
# Cargar hosts desde CSV
try:
hosts = load_hosts(HOSTS_FILE)
except FileNotFoundError as e:
print(f"Error: {e}")
return
print(f"Monitoreando {len(hosts)} hosts...")
# Verificar cada host
for host_info in hosts:
name = host_info["nombre"]
host = host_info["host"]
print(f"Verificando {name} ({host})...", end=" ")
if ping_host(host):
print("✓ Activo")
else:
print("✗ Caído")
send_alert(api_key, chat_id, name, host)
print("Monitoreo completado.")
if __name__ == "__main__":
main()