Files
Egutierrez 52999ecb86 feat: media_analytics — ETL PC+VPS → ClickHouse + Grafana
2 ETLs cada 5min suben snapshots (Jellyfin, *arr, Prowlarr, gnula,
popelis users/mylist/events) a ClickHouse en el VPS, visualizado en
Grafana (grafana.datardos.com). Ingesta PC via tunel SSH; popelis
via ETL local en el VPS. Usa clickhouse_insert_rows_py_infra.
2026-05-30 14:55:48 +02:00

140 lines
5.1 KiB
Python

#!/usr/bin/env python3
"""VPS -> ClickHouse ETL (every 5 min). Runs ON the datardos VPS (not on the PC)
because popelis-db (Postgres) is only reachable inside the coolify network. Reads
popelis-db via `docker exec popelis-db psql` and pushes to ClickHouse through its
local HTTP interface (127.0.0.1:8123).
Standalone on purpose: the VPS has no checkout of the fn_registry, so this script
does not import clickhouse_insert_rows_py_infra; it replicates the minimal
JSONEachRow POST instead.
- users, mylist: full snapshot on every run (rows tagged with snapshot_ts).
- events: incremental by id (> max(event_id) already in CH; per the author,
  duplicates are deduplicated by a ReplacingMergeTree table).
Reads credentials from /opt/analytics/.env (CH_PASSWORD). Intended to be run by a
systemd timer on the VPS.
Usage: sudo python3 etl_vps.py
"""
import json
import subprocess
import sys
import urllib.request
import urllib.parse
import urllib.error
from datetime import datetime, timezone
# --- Locations and identities used by the ETL -------------------------------
ENV = "/opt/analytics/.env"        # env file that holds the CH_PASSWORD line
CH_URL = "http://127.0.0.1:8123"   # ClickHouse HTTP endpoint on the VPS
CH_USER = CH_DB = "analytics"      # ClickHouse user and target database (same string)
PG_CONTAINER = "popelis-db"        # Docker container that runs Postgres
PG_USER = PG_DB = "popelis"        # Postgres role and database (same string)

# One UTC timestamp computed at import; every snapshot row of this run shares it.
SNAP = f"{datetime.now(timezone.utc):%Y-%m-%d %H:%M:%S}"
def ch_password(path=None):
    """Return the ClickHouse password stored on the CH_PASSWORD line of an env file.

    Args:
        path: env file to read. Defaults to the module-level ``ENV`` path, so
            existing ``ch_password()`` calls behave as before.

    Returns:
        The text after ``CH_PASSWORD=``. A leading ``export `` is tolerated, and
        one matching pair of surrounding single/double quotes is removed, as
        .env files commonly quote values.

    Raises:
        RuntimeError: if the file has no CH_PASSWORD line.
        OSError: if the file cannot be opened.
    """
    if path is None:
        path = ENV
    with open(path, encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            # Accept shell-style `export CH_PASSWORD=...` lines too.
            if line.startswith("export "):
                line = line[len("export "):].lstrip()
            if not line.startswith("CH_PASSWORD="):
                continue
            value = line.split("=", 1)[1]
            # Strip exactly one pair of matching quotes, e.g. CH_PASSWORD="s3cret".
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            return value
    raise RuntimeError("CH_PASSWORD no en " + path)
# Read once at import time; ch_query and ch_insert send it as the X-ClickHouse-Key header.
CH_PASS: str = ch_password()
def pg_json(sql):
    """Run *sql* in the popelis-db container via ``docker exec ... psql``; return rows as list[dict].

    *sql* is wrapped as a subquery and aggregated with ``json_agg``, so psql
    prints the whole result as one JSON array (``-tA``: tuples only, unaligned).
    COALESCE turns an empty result into ``[]``.

    Raises:
        subprocess.CalledProcessError: if docker or psql exits non-zero.
        json.JSONDecodeError: if the printed output is not valid JSON.
    """
    wrapped = f"SELECT COALESCE(json_agg(t), '[]') FROM ({sql}) t"
    out = subprocess.check_output(
        ["docker", "exec", PG_CONTAINER, "psql",
         # -X: ignore any ~/.psqlrc in the container; settings such as \timing
         # would add extra lines to stdout and break the JSON parse below.
         "-X",
         "-U", PG_USER, "-d", PG_DB, "-tAc", wrapped],
        # Decode as UTF-8 explicitly instead of via the host locale, because
        # usernames/metadata may be non-ASCII. NOTE(review): assumes psql's
        # client encoding is UTF-8 — confirm if the container overrides it.
        encoding="utf-8")
    return json.loads(out.strip() or "[]")
def ch_query(sql):
    """POST a read query to ClickHouse over HTTP and return the rows as list[dict].

    The query is sent as the request body with ``default_format=JSONEachRow``,
    so each non-blank response line is parsed as one JSON object.

    Raises:
        ValueError: if ClickHouse answers with an HTTP error; the message holds
            the status code and the start of the server's error text, and the
            original HTTPError is chained as the cause (same style as ch_insert).
        urllib.error.URLError: if the server cannot be reached at all.
    """
    params = urllib.parse.urlencode({"database": CH_DB, "default_format": "JSONEachRow"})
    req = urllib.request.Request(
        f"{CH_URL}/?{params}",
        data=sql.encode(),
        headers={"X-ClickHouse-User": CH_USER, "X-ClickHouse-Key": CH_PASS})
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            body = resp.read().decode()
    except urllib.error.HTTPError as e:
        detail = e.read()[:300].decode("utf-8", errors="replace")
        raise ValueError(f"CH {e.code}: {detail}") from e
    return [json.loads(line) for line in body.splitlines() if line.strip()]
def ch_insert(table, rows):
    """Insert *rows* (list of dicts) into ``CH_DB.table`` using ClickHouse's HTTP JSONEachRow input.

    Rows are serialized one compact JSON object per line; the INSERT statement
    travels in the ``query`` URL parameter and the rows in the POST body.

    Returns:
        The number of rows sent. Returns 0 for empty input without making a request.

    Raises:
        ValueError: if ClickHouse answers with an HTTP error; the message holds
            the status code and the start of the server's error text, and the
            original HTTPError is chained as the cause.
        urllib.error.URLError: if the server cannot be reached at all.
    """
    if not rows:
        return 0
    body = "\n".join(json.dumps(r, separators=(",", ":")) for r in rows).encode()
    q = f"INSERT INTO {CH_DB}.{table} FORMAT JSONEachRow"
    url = f"{CH_URL}/?{urllib.parse.urlencode({'database': CH_DB, 'query': q})}"
    req = urllib.request.Request(url, data=body,
                                 headers={"X-ClickHouse-User": CH_USER,
                                          "X-ClickHouse-Key": CH_PASS,
                                          "Content-Type": "text/plain"})
    try:
        # `with` guarantees the HTTP response is closed once it has been drained.
        with urllib.request.urlopen(req, timeout=30) as resp:
            resp.read()
    except urllib.error.HTTPError as e:
        # Decode the error body so the message shows ClickHouse's text, not a bytes repr.
        detail = e.read()[:300].decode("utf-8", errors="replace")
        raise ValueError(f"CH {e.code}: {detail}") from e
    return len(rows)
def fmt_ts(s):
    """Coerce a timestamp-like value into ClickHouse's ``YYYY-MM-DD HH:MM:SS`` text.

    Any falsy value (None, "", 0) becomes the Unix-epoch string. Otherwise the
    value is stringified, every ``T`` becomes a space, everything from the first
    ``.`` and then from the first ``+`` is dropped, and the result is cut to 19
    characters. A UTC offset is discarded, not applied.
    """
    if s:
        text = str(s).replace("T", " ")
        text = text.partition(".")[0]   # drop fractional seconds and anything after
        text = text.partition("+")[0]   # drop a "+HH:MM"-style offset
        return text[:19]
    return "1970-01-01 00:00:00"
def main():
    """Run one ETL pass and print a one-line JSON summary to stdout.

    Steps, in order: users snapshot (read, then insert), mylist snapshot (read,
    then insert), then events newer than the highest event_id already in
    ClickHouse. Postgres/ClickHouse errors propagate, except where the helpers
    below say otherwise (has_col returns False; the last-event-id lookup falls
    back to 0).
    """
    users = _load_users()
    total = ch_insert("popelis_users", users)
    ml = _load_mylist()
    total += ch_insert("popelis_mylist", ml)
    ev = _load_new_events(_last_event_id())
    total += ch_insert("popelis_events", ev)
    print(json.dumps({"snapshot_ts": SNAP, "users": len(users), "mylist": len(ml),
                      "events_new": len(ev), "inserted": total}))


def _load_users():
    """Read all popelis users as snapshot rows tagged with SNAP.

    If the users table has no created_at column, created_at is the epoch string.
    If it has one, NULL values are replaced with the query time (now()).
    NOTE(review): that means a NULL created_at changes on every snapshot — confirm intent.
    """
    if has_col("users", "created_at"):
        rows = pg_json("SELECT id AS user_id, username, jf_user_id, "
                       "to_char(COALESCE(created_at, now()),'YYYY-MM-DD HH24:MI:SS') AS created_at "
                       "FROM users")
    else:
        rows = pg_json("SELECT id AS user_id, username, jf_user_id FROM users")
    for u in rows:
        u["snapshot_ts"] = SNAP
        u.setdefault("created_at", "1970-01-01 00:00:00")
        u["created_at"] = fmt_ts(u["created_at"])
    return rows


def _load_mylist():
    """Read the whole mylist table as snapshot rows tagged with SNAP (added_at handled like created_at)."""
    if has_col("mylist", "added_at"):
        rows = pg_json("SELECT user_id, item_id, "
                       "to_char(COALESCE(added_at, now()),'YYYY-MM-DD HH24:MI:SS') AS added_at FROM mylist")
    else:
        rows = pg_json("SELECT user_id, item_id FROM mylist")
    for m in rows:
        m["snapshot_ts"] = SNAP
        m.setdefault("added_at", "1970-01-01 00:00:00")
        m["added_at"] = fmt_ts(m["added_at"])
    return rows


def _last_event_id():
    """Return max(event_id) already in ClickHouse, or 0 if it cannot be determined.

    Falling back to 0 re-sends every event. The module docstring relies on the
    ClickHouse table deduplicating such re-sends, so this stays best-effort. The
    failure is now reported on stderr instead of being silently swallowed;
    stdout stays reserved for the JSON summary.
    """
    try:
        rows = ch_query("SELECT max(event_id) AS m FROM popelis_events")
        if rows and rows[0].get("m") not in (None, ""):
            # 64-bit ints come back as JSON strings by default, hence int().
            return int(rows[0]["m"])
        return 0
    except Exception as exc:  # deliberate best-effort boundary
        print(f"warning: could not read max(event_id) from ClickHouse ({exc!r}); "
              "falling back to 0 and re-sending all events", file=sys.stderr)
        return 0


def _load_new_events(last):
    """Read events with id > *last*, oldest first, with event_ts normalized by fmt_ts."""
    rows = pg_json("SELECT id AS event_id, "
                   "to_char(ts,'YYYY-MM-DD HH24:MI:SS') AS event_ts, "
                   "COALESCE(user_id,0) AS user_id, username, event_type, item_id, meta "
                   f"FROM events WHERE id > {int(last)} ORDER BY id")
    for e in rows:
        e["event_ts"] = fmt_ts(e["event_ts"])
    return rows
def has_col(table, col):
    """Return True if *col* is a live column of *table* as an unqualified query would see it.

    The table is resolved with ``to_regclass()``, which uses the same search_path
    lookup as ``FROM table``. Matching non-dropped attributes are then counted in
    ``pg_attribute``. The output is therefore always a single ``0`` or ``1``, even
    when another schema holds a same-named table. A missing table yields 0.

    Only trusted identifiers should be passed: *table* and *col* are interpolated
    into the SQL text (main passes string literals).

    Returns False when docker/psql cannot be started or exits with an error.
    """
    query = ("SELECT count(*) FROM pg_attribute "
             f"WHERE attrelid = to_regclass('{table}') "
             f"AND attname = '{col}' AND attnum > 0 AND NOT attisdropped")
    try:
        out = subprocess.check_output(
            ["docker", "exec", PG_CONTAINER, "psql", "-U", PG_USER, "-d", PG_DB, "-tAc",
             query],
            text=True).strip()
    except (subprocess.CalledProcessError, OSError):
        # Best-effort probe: treat an unreachable/failed psql as "column absent".
        return False
    return out == "1"
# Run one ETL pass when executed as a script (e.g. `sudo python3 etl_vps.py`).
if __name__ == "__main__":
    main()