feat: media_analytics — ETL PC+VPS → ClickHouse + Grafana
2 ETLs cada 5min suben snapshots (Jellyfin, *arr, Prowlarr, gnula, popelis users/mylist/events) a ClickHouse en el VPS, visualizado en Grafana (grafana.datardos.com). Ingesta PC via tunel SSH; popelis via ETL local en el VPS. Usa clickhouse_insert_rows_py_infra.
This commit is contained in:
+139
@@ -0,0 +1,139 @@
|
||||
#!/usr/bin/env python3
|
||||
"""ETL VPS → ClickHouse (cada 5min). Corre EN el VPS datardos (no en el PC) porque
|
||||
popelis-db (Postgres) solo es alcanzable en la red coolify. Lee popelis-db via
|
||||
`docker exec popelis-db psql` y empuja a ClickHouse por su HTTP local (127.0.0.1:8123).
|
||||
|
||||
Standalone a proposito: el VPS no tiene el registry fn_registry checkouteado, asi que
|
||||
no importa clickhouse_insert_rows_py_infra — replica el POST JSONEachRow minimal.
|
||||
|
||||
- users, mylist: snapshot completo cada run (snapshot_ts).
|
||||
- events: incremental por id (> max(event_id) ya en CH; ReplacingMergeTree dedup).
|
||||
|
||||
Lee creds de /opt/analytics/.env (CH_PASSWORD). Pensado para systemd timer en el VPS.
|
||||
Uso: sudo python3 etl_vps.py
|
||||
"""
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
import urllib.request
|
||||
import urllib.parse
|
||||
import urllib.error
|
||||
from datetime import datetime, timezone
|
||||
|
||||
ENV = "/opt/analytics/.env"
|
||||
CH_URL = "http://127.0.0.1:8123"
|
||||
CH_USER = "analytics"
|
||||
CH_DB = "analytics"
|
||||
PG_CONTAINER = "popelis-db"
|
||||
PG_USER = "popelis"
|
||||
PG_DB = "popelis"
|
||||
SNAP = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
|
||||
def ch_password():
|
||||
with open(ENV) as f:
|
||||
for line in f:
|
||||
if line.startswith("CH_PASSWORD="):
|
||||
return line.strip().split("=", 1)[1]
|
||||
raise RuntimeError("CH_PASSWORD no en " + ENV)
|
||||
|
||||
|
||||
CH_PASS = ch_password()
|
||||
|
||||
|
||||
def pg_json(sql):
|
||||
"""Ejecuta SQL en popelis-db y devuelve list[dict] (via json_agg)."""
|
||||
wrapped = f"SELECT COALESCE(json_agg(t), '[]') FROM ({sql}) t"
|
||||
out = subprocess.check_output(
|
||||
["docker", "exec", PG_CONTAINER, "psql", "-U", PG_USER, "-d", PG_DB,
|
||||
"-tAc", wrapped], text=True)
|
||||
return json.loads(out.strip() or "[]")
|
||||
|
||||
|
||||
def ch_query(sql):
|
||||
url = f"{CH_URL}/?{urllib.parse.urlencode({'database': CH_DB, 'default_format': 'JSONEachRow'})}"
|
||||
req = urllib.request.Request(url, data=sql.encode(),
|
||||
headers={"X-ClickHouse-User": CH_USER, "X-ClickHouse-Key": CH_PASS})
|
||||
with urllib.request.urlopen(req, timeout=30) as r:
|
||||
body = r.read().decode()
|
||||
return [json.loads(l) for l in body.splitlines() if l.strip()]
|
||||
|
||||
|
||||
def ch_insert(table, rows):
|
||||
if not rows:
|
||||
return 0
|
||||
body = "\n".join(json.dumps(r, separators=(",", ":")) for r in rows).encode()
|
||||
q = f"INSERT INTO {CH_DB}.{table} FORMAT JSONEachRow"
|
||||
url = f"{CH_URL}/?{urllib.parse.urlencode({'database': CH_DB, 'query': q})}"
|
||||
req = urllib.request.Request(url, data=body,
|
||||
headers={"X-ClickHouse-User": CH_USER, "X-ClickHouse-Key": CH_PASS,
|
||||
"Content-Type": "text/plain"})
|
||||
try:
|
||||
urllib.request.urlopen(req, timeout=30).read()
|
||||
except urllib.error.HTTPError as e:
|
||||
raise ValueError(f"CH {e.code}: {e.read()[:300]}")
|
||||
return len(rows)
|
||||
|
||||
|
||||
def fmt_ts(s):
|
||||
if not s:
|
||||
return "1970-01-01 00:00:00"
|
||||
return str(s).replace("T", " ").split(".")[0].split("+")[0][:19]
|
||||
|
||||
|
||||
def main():
|
||||
total = 0
|
||||
# users (snapshot)
|
||||
users = pg_json("SELECT id AS user_id, username, jf_user_id, "
|
||||
"to_char(COALESCE(created_at, now()),'YYYY-MM-DD HH24:MI:SS') AS created_at "
|
||||
"FROM users") if has_col("users", "created_at") else \
|
||||
pg_json("SELECT id AS user_id, username, jf_user_id FROM users")
|
||||
for u in users:
|
||||
u["snapshot_ts"] = SNAP
|
||||
u.setdefault("created_at", "1970-01-01 00:00:00")
|
||||
u["created_at"] = fmt_ts(u["created_at"])
|
||||
total += ch_insert("popelis_users", users)
|
||||
|
||||
# mylist (snapshot)
|
||||
ml = pg_json("SELECT user_id, item_id, "
|
||||
"to_char(COALESCE(added_at, now()),'YYYY-MM-DD HH24:MI:SS') AS added_at FROM mylist") \
|
||||
if has_col("mylist", "added_at") else \
|
||||
pg_json("SELECT user_id, item_id FROM mylist")
|
||||
for m in ml:
|
||||
m["snapshot_ts"] = SNAP
|
||||
m.setdefault("added_at", "1970-01-01 00:00:00")
|
||||
m["added_at"] = fmt_ts(m["added_at"])
|
||||
total += ch_insert("popelis_mylist", ml)
|
||||
|
||||
# events (incremental por id)
|
||||
last = 0
|
||||
try:
|
||||
r = ch_query("SELECT max(event_id) AS m FROM popelis_events")
|
||||
last = int(r[0]["m"]) if r and r[0].get("m") not in (None, "") else 0
|
||||
except Exception:
|
||||
last = 0
|
||||
ev = pg_json(f"SELECT id AS event_id, "
|
||||
f"to_char(ts,'YYYY-MM-DD HH24:MI:SS') AS event_ts, "
|
||||
f"COALESCE(user_id,0) AS user_id, username, event_type, item_id, meta "
|
||||
f"FROM events WHERE id > {last} ORDER BY id")
|
||||
for e in ev:
|
||||
e["event_ts"] = fmt_ts(e["event_ts"])
|
||||
total += ch_insert("popelis_events", ev)
|
||||
|
||||
print(json.dumps({"snapshot_ts": SNAP, "users": len(users), "mylist": len(ml),
|
||||
"events_new": len(ev), "inserted": total}))
|
||||
|
||||
|
||||
def has_col(table, col):
|
||||
try:
|
||||
out = subprocess.check_output(
|
||||
["docker", "exec", PG_CONTAINER, "psql", "-U", PG_USER, "-d", PG_DB, "-tAc",
|
||||
f"SELECT 1 FROM information_schema.columns WHERE table_name='{table}' AND column_name='{col}'"],
|
||||
text=True).strip()
|
||||
return out == "1"
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user