#!/usr/bin/env python3
"""ETL VPS → ClickHouse (every 5 min).

Runs ON the datardos VPS (not on the PC) because popelis-db (Postgres) is only
reachable on the coolify network. Reads popelis-db via
`docker exec popelis-db psql` and pushes to ClickHouse over its local HTTP
interface (127.0.0.1:8123).

Standalone on purpose: the VPS does not have the fn_registry registry checked
out, so it does not import clickhouse_insert_rows_py_infra; it replicates the
minimal JSONEachRow POST instead.

- users, mylist: full snapshot on every run (snapshot_ts).
- events: incremental by id (> max(event_id) already in CH; ReplacingMergeTree
  dedup).

Reads credentials from /opt/analytics/.env (CH_PASSWORD). Intended for a
systemd timer on the VPS.

Usage: sudo python3 etl_vps.py
"""
import functools
import json
import subprocess
import sys
import urllib.request
import urllib.parse
import urllib.error
from datetime import datetime, timezone

ENV = "/opt/analytics/.env"
CH_URL = "http://127.0.0.1:8123"
CH_USER = "analytics"
CH_DB = "analytics"
PG_CONTAINER = "popelis-db"
PG_USER = "popelis"
PG_DB = "popelis"
# Computed once at import so every snapshot row written by this run shares it.
SNAP = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


def ch_password(path=ENV):
    """Return the CH_PASSWORD value from a KEY=VALUE env file.

    Args:
        path: env file to read (defaults to ENV).

    Raises:
        RuntimeError: if no CH_PASSWORD= line is present.
        OSError: if the file cannot be opened.
    """
    with open(path, encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if line.startswith("CH_PASSWORD="):
                value = line.split("=", 1)[1]
                # A quoted value (CH_PASSWORD="...") would otherwise keep its
                # quotes and fail authentication.
                if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                    value = value[1:-1]
                return value
    raise RuntimeError("CH_PASSWORD no en " + path)


@functools.lru_cache(maxsize=None)
def _ch_headers():
    """Return ClickHouse auth headers; the env file is read on first use only.

    Loading lazily (instead of at import time) keeps the module importable
    without the env file. Callers must not mutate the returned dict.
    """
    return {"X-ClickHouse-User": CH_USER, "X-ClickHouse-Key": ch_password()}


def _ch_post(params, body, content_type=None):
    """POST `body` to the ClickHouse HTTP endpoint and return the decoded response.

    Raises:
        ValueError: on an HTTP error status (code plus first 300 bytes of body).
        urllib.error.URLError / OSError: on connection problems or timeouts.
    """
    url = f"{CH_URL}/?{urllib.parse.urlencode(params)}"
    headers = dict(_ch_headers())
    if content_type:
        headers["Content-Type"] = content_type
    req = urllib.request.Request(url, data=body, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            return resp.read().decode()
    except urllib.error.HTTPError as e:
        raise ValueError(f"CH {e.code}: {e.read()[:300]}") from e


def ch_query(sql):
    """Run a read query in CH_DB and return the JSONEachRow result as list[dict]."""
    body = _ch_post({"database": CH_DB, "default_format": "JSONEachRow"}, sql.encode())
    return [json.loads(line) for line in body.splitlines() if line.strip()]


def ch_insert(table, rows):
    """Insert `rows` (list of dicts) into CH_DB.table as JSONEachRow.

    Returns:
        Number of rows sent (0 when `rows` is empty; no request is made then).

    Raises:
        ValueError: if ClickHouse answers with an HTTP error.
    """
    if not rows:
        return 0
    payload = "\n".join(json.dumps(r, separators=(",", ":")) for r in rows).encode()
    q = f"INSERT INTO {CH_DB}.{table} FORMAT JSONEachRow"
    _ch_post({"database": CH_DB, "query": q}, payload, content_type="text/plain")
    return len(rows)


def _psql(sql):
    """Run one SQL statement in popelis-db via `docker exec psql -tA`; return stdout.

    Raises:
        subprocess.CalledProcessError: if docker/psql exits non-zero.
    """
    return subprocess.check_output(
        ["docker", "exec", PG_CONTAINER, "psql", "-U", PG_USER, "-d", PG_DB,
         "-tAc", sql],
        encoding="utf-8")


def pg_json(sql):
    """Run `sql` in popelis-db and return its rows as list[dict] (via json_agg)."""
    wrapped = f"SELECT COALESCE(json_agg(t), '[]') FROM ({sql}) t"
    return json.loads(_psql(wrapped).strip() or "[]")


def has_col(table, col):
    """Return True if `table` (resolved via search_path) has a live column `col`.

    Best-effort: returns False (and logs to stderr) if psql cannot be run.
    `table`/`col` are interpolated into SQL, so pass trusted identifiers only
    (all callers in this file use literals).
    """
    # to_regclass resolves the name exactly like the later `FROM <table>` does;
    # information_schema would also match same-named tables in other schemas.
    sql = ("SELECT 1 FROM pg_attribute "
           f"WHERE attrelid = to_regclass('{table}') AND attname = '{col}' "
           "AND attnum > 0 AND NOT attisdropped")
    try:
        return _psql(sql).strip() == "1"
    except (subprocess.SubprocessError, OSError, ValueError) as e:
        print(f"has_col({table}.{col}) failed: {e}", file=sys.stderr)
        return False


def fmt_ts(s):
    """Normalise a timestamp to 'YYYY-MM-DD HH:MM:SS' for ClickHouse.

    Falsy input maps to the Unix epoch. Drops the 'T' separator, fractional
    seconds, and a '+HH:MM' suffix, then truncates to 19 characters.
    """
    if not s:
        return "1970-01-01 00:00:00"
    return str(s).replace("T", " ").split(".")[0].split("+")[0][:19]


def _snapshot_users():
    """Read all users (with created_at when that column exists) tagged with SNAP."""
    if has_col("users", "created_at"):
        users = pg_json("SELECT id AS user_id, username, jf_user_id, "
                        "to_char(COALESCE(created_at, now()),'YYYY-MM-DD HH24:MI:SS') AS created_at "
                        "FROM users")
    else:
        users = pg_json("SELECT id AS user_id, username, jf_user_id FROM users")
    for u in users:
        u["snapshot_ts"] = SNAP
        u["created_at"] = fmt_ts(u.get("created_at"))
    return users


def _snapshot_mylist():
    """Read all mylist rows (with added_at when that column exists) tagged with SNAP."""
    if has_col("mylist", "added_at"):
        ml = pg_json("SELECT user_id, item_id, "
                     "to_char(COALESCE(added_at, now()),'YYYY-MM-DD HH24:MI:SS') AS added_at FROM mylist")
    else:
        ml = pg_json("SELECT user_id, item_id FROM mylist")
    for m in ml:
        m["snapshot_ts"] = SNAP
        m["added_at"] = fmt_ts(m.get("added_at"))
    return ml


def _last_event_id():
    """Return max(event_id) already in ClickHouse, or 0 if it cannot be read.

    Falling back to 0 re-reads every event; per the module docstring the
    target table deduplicates (ReplacingMergeTree), so this is wasteful but
    not incorrect. The failure is logged instead of being silently swallowed.
    """
    try:
        rows = ch_query("SELECT max(event_id) AS m FROM popelis_events")
        m = rows[0].get("m") if rows else None
        return int(m) if m not in (None, "") else 0
    except (OSError, ValueError, TypeError) as e:
        print(f"events watermark unavailable, reading all events: {e}", file=sys.stderr)
        return 0


def _new_events(last):
    """Read events with id > `last` from Postgres, ordered by id.

    NOTE(review): an id watermark can skip rows whose transaction commits after
    a higher id was already ingested; consider re-reading a small id overlap
    (dedup makes that safe) if out-of-order commits are possible.
    """
    ev = pg_json("SELECT id AS event_id, "
                 "to_char(ts,'YYYY-MM-DD HH24:MI:SS') AS event_ts, "
                 "COALESCE(user_id,0) AS user_id, username, event_type, item_id, meta "
                 f"FROM events WHERE id > {int(last)} ORDER BY id")
    for e in ev:
        e["event_ts"] = fmt_ts(e["event_ts"])
    return ev


def main():
    """Run one ETL pass and print a JSON summary line to stdout."""
    users = _snapshot_users()
    total = ch_insert("popelis_users", users)

    ml = _snapshot_mylist()
    total += ch_insert("popelis_mylist", ml)

    ev = _new_events(_last_event_id())
    total += ch_insert("popelis_events", ev)

    print(json.dumps({"snapshot_ts": SNAP, "users": len(users), "mylist": len(ml),
                      "events_new": len(ev), "inserted": total}))


if __name__ == "__main__":
    main()