52999ecb86
2 ETLs cada 5min suben snapshots (Jellyfin, *arr, Prowlarr, gnula, popelis users/mylist/events) a ClickHouse en el VPS, visualizado en Grafana (grafana.datardos.com). Ingesta PC via tunel SSH; popelis via ETL local en el VPS. Usa clickhouse_insert_rows_py_infra.
140 lines
5.1 KiB
Python
140 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
"""ETL VPS → ClickHouse (cada 5min). Corre EN el VPS datardos (no en el PC) porque
|
|
popelis-db (Postgres) solo es alcanzable en la red coolify. Lee popelis-db via
|
|
`docker exec popelis-db psql` y empuja a ClickHouse por su HTTP local (127.0.0.1:8123).
|
|
|
|
Standalone a proposito: el VPS no tiene el registry fn_registry checkouteado, asi que
|
|
no importa clickhouse_insert_rows_py_infra — replica el POST JSONEachRow minimal.
|
|
|
|
- users, mylist: snapshot completo cada run (snapshot_ts).
|
|
- events: incremental por id (> max(event_id) ya en CH; ReplacingMergeTree dedup).
|
|
|
|
Lee creds de /opt/analytics/.env (CH_PASSWORD). Pensado para systemd timer en el VPS.
|
|
Uso: sudo python3 etl_vps.py
|
|
"""
|
|
import json
|
|
import subprocess
|
|
import sys
|
|
import urllib.request
|
|
import urllib.parse
|
|
import urllib.error
|
|
from datetime import datetime, timezone
|
|
|
|
# File holding the ClickHouse credentials (read by ch_password()).
ENV = "/opt/analytics/.env"

# ClickHouse HTTP endpoint and the account/database used for every request.
CH_URL = "http://127.0.0.1:8123"
CH_USER = "analytics"
CH_DB = "analytics"

# Postgres is reached through `docker exec <container> psql`.
PG_CONTAINER = "popelis-db"
PG_USER = "popelis"
PG_DB = "popelis"

# One UTC timestamp per process, shared by every snapshot row of this run.
SNAP = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def ch_password():
    """Return the value of the first ``CH_PASSWORD=`` line found in ENV.

    The line is stripped of surrounding whitespace and everything after the
    first ``=`` is returned verbatim (no quote handling).

    Raises:
        RuntimeError: if no line in the file starts with ``CH_PASSWORD=``.
    """
    prefix = "CH_PASSWORD="
    with open(ENV) as env_file:
        for raw in env_file:
            if not raw.startswith(prefix):
                continue
            _key, _sep, value = raw.strip().partition("=")
            return value
    raise RuntimeError("CH_PASSWORD no en " + ENV)
|
|
|
|
|
|
# Resolved once at import time: reads ENV and raises RuntimeError if the key is missing.
CH_PASS = ch_password()
|
|
|
|
|
|
def pg_json(sql):
    """Run ``sql`` inside the Postgres container and return the rows as list[dict].

    The statement is wrapped in ``json_agg`` so psql prints a single JSON
    array, which is then parsed. ``sql`` is embedded as a subquery, so it must
    be a bare SELECT.

    Raises:
        subprocess.CalledProcessError: if the ``docker exec ... psql`` command
            exits with a non-zero status.
    """
    query = f"SELECT COALESCE(json_agg(t), '[]') FROM ({sql}) t"
    command = [
        "docker", "exec", PG_CONTAINER,
        "psql", "-U", PG_USER, "-d", PG_DB,
        "-tAc", query,
    ]
    raw = subprocess.check_output(command, text=True).strip()
    if not raw:
        # psql printed nothing: treat it as an empty result set.
        return []
    return json.loads(raw)
|
|
|
|
|
|
def ch_query(sql):
    """Send ``sql`` to ClickHouse over HTTP and return the rows as list[dict].

    The request asks for ``default_format=JSONEachRow``; each non-blank line
    of the response body is parsed as one JSON object.
    """
    params = urllib.parse.urlencode(
        {"database": CH_DB, "default_format": "JSONEachRow"}
    )
    request = urllib.request.Request(
        f"{CH_URL}/?{params}",
        data=sql.encode(),
        headers={"X-ClickHouse-User": CH_USER, "X-ClickHouse-Key": CH_PASS},
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        text = response.read().decode()

    rows = []
    for line in text.splitlines():
        if line.strip():
            rows.append(json.loads(line))
    return rows
|
|
|
|
|
|
def ch_insert(table, rows):
    """Insert ``rows`` into ``CH_DB.table`` using ClickHouse's JSONEachRow format.

    Args:
        table: target table name (interpolated into the INSERT statement, so
            it must be a trusted identifier).
        rows: sequence of JSON-serialisable dicts, one per row.

    Returns:
        The number of rows sent, or 0 when ``rows`` is empty/None (in which
        case no request is made).

    Raises:
        ValueError: if ClickHouse answers with an HTTP error status; the
            message carries the status code and the first 300 bytes of the
            response body, and the original HTTPError is chained as the cause.
    """
    if not rows:
        return 0

    # One compact JSON document per line is what JSONEachRow expects.
    payload = "\n".join(
        json.dumps(row, separators=(",", ":")) for row in rows
    ).encode()
    insert_sql = f"INSERT INTO {CH_DB}.{table} FORMAT JSONEachRow"
    params = urllib.parse.urlencode({"database": CH_DB, "query": insert_sql})
    request = urllib.request.Request(
        f"{CH_URL}/?{params}",
        data=payload,
        headers={
            "X-ClickHouse-User": CH_USER,
            "X-ClickHouse-Key": CH_PASS,
            "Content-Type": "text/plain",
        },
    )
    try:
        # Context manager so the connection is released even if read() fails.
        with urllib.request.urlopen(request, timeout=30) as response:
            response.read()
    except urllib.error.HTTPError as e:
        # Decode the body so the message is readable text rather than b'...'.
        detail = e.read()[:300].decode("utf-8", errors="replace")
        raise ValueError(f"CH {e.code}: {detail}") from e
    return len(rows)
|
|
|
|
|
|
def fmt_ts(s):
    """Normalise a timestamp-like value to at most ``YYYY-MM-DD HH:MM:SS``.

    Falsy input yields the epoch placeholder. Otherwise the value is
    stringified, every ``T`` becomes a space, anything from the first ``.``
    or ``+`` onwards is dropped, and the result is cut to 19 characters.
    """
    if not s:
        return "1970-01-01 00:00:00"
    text = str(s).replace("T", " ")
    text, _, _fraction = text.partition(".")
    text, _, _offset = text.partition("+")
    return text[:19]
|
|
|
|
|
|
def _stamp_snapshot(rows, ts_field):
    """Tag every row with this run's snapshot_ts and normalise ``ts_field`` in place."""
    for row in rows:
        row["snapshot_ts"] = SNAP
        # Rows fetched without the timestamp column get the epoch placeholder.
        row.setdefault(ts_field, "1970-01-01 00:00:00")
        row[ts_field] = fmt_ts(row[ts_field])
    return rows


def _last_event_id():
    """Return the highest event_id already stored in ClickHouse, or 0 if unknown."""
    try:
        result = ch_query("SELECT max(event_id) AS m FROM popelis_events")
        if result and result[0].get("m") not in (None, ""):
            return int(result[0]["m"])
        return 0
    except Exception as exc:
        # Best effort: fall back to a full re-read of events, but say so on
        # stderr instead of hiding the failure.
        print(f"WARN: max(event_id) lookup failed ({exc!r}); using 0",
              file=sys.stderr)
        return 0


def main():
    """Run one ETL pass: users and mylist snapshots, then new events.

    users and mylist are re-sent in full on every run, tagged with SNAP.
    events are sent incrementally: only rows whose id is greater than the
    maximum event_id already present in ClickHouse. A one-line JSON summary
    is printed to stdout at the end.
    """
    total = 0

    # users (snapshot)
    if has_col("users", "created_at"):
        users = pg_json("SELECT id AS user_id, username, jf_user_id, "
                        "to_char(COALESCE(created_at, now()),'YYYY-MM-DD HH24:MI:SS') AS created_at "
                        "FROM users")
    else:
        users = pg_json("SELECT id AS user_id, username, jf_user_id FROM users")
    total += ch_insert("popelis_users", _stamp_snapshot(users, "created_at"))

    # mylist (snapshot)
    if has_col("mylist", "added_at"):
        ml = pg_json("SELECT user_id, item_id, "
                     "to_char(COALESCE(added_at, now()),'YYYY-MM-DD HH24:MI:SS') AS added_at FROM mylist")
    else:
        ml = pg_json("SELECT user_id, item_id FROM mylist")
    total += ch_insert("popelis_mylist", _stamp_snapshot(ml, "added_at"))

    # events (incremental by id); `last` is always an int, so the
    # interpolation below cannot inject SQL.
    last = _last_event_id()
    ev = pg_json(f"SELECT id AS event_id, "
                 f"to_char(ts,'YYYY-MM-DD HH24:MI:SS') AS event_ts, "
                 f"COALESCE(user_id,0) AS user_id, username, event_type, item_id, meta "
                 f"FROM events WHERE id > {last} ORDER BY id")
    for event in ev:
        event["event_ts"] = fmt_ts(event["event_ts"])
    total += ch_insert("popelis_events", ev)

    print(json.dumps({"snapshot_ts": SNAP, "users": len(users), "mylist": len(ml),
                      "events_new": len(ev), "inserted": total}))
|
|
|
|
|
|
def has_col(table, col):
    """Return True if ``information_schema.columns`` lists ``col`` for ``table``.

    Args:
        table: table name; interpolated unescaped into the SQL, so it must be
            a trusted identifier.
        col: column name; same caveat as ``table``.

    Returns:
        True when psql prints at least one ``1`` row; False when the column is
        not found or the ``docker exec ... psql`` call could not be run.
    """
    sql = (f"SELECT 1 FROM information_schema.columns "
           f"WHERE table_name='{table}' AND column_name='{col}'")
    try:
        out = subprocess.check_output(
            ["docker", "exec", PG_CONTAINER, "psql", "-U", PG_USER, "-d", PG_DB,
             "-tAc", sql],
            text=True)
    except (OSError, subprocess.SubprocessError):
        # Best effort: if the probe cannot run, report the column as absent.
        return False
    # The query has no schema filter, so a table of the same name in several
    # schemas yields several "1" lines; any match counts.
    return any(line.strip() == "1" for line in out.splitlines())
|
|
|
|
|
|
# Script entry point: run one ETL pass when executed directly.
if __name__ == "__main__":
    main()
|