Files
Egutierrez 52999ecb86 feat: media_analytics — ETL PC+VPS → ClickHouse + Grafana
2 ETLs cada 5min suben snapshots (Jellyfin, *arr, Prowlarr, gnula,
popelis users/mylist/events) a ClickHouse en el VPS, visualizado en
Grafana (grafana.datardos.com). Ingesta PC via tunel SSH; popelis
via ETL local en el VPS. Usa clickhouse_insert_rows_py_infra.
2026-05-30 14:55:48 +02:00

297 lines
12 KiB
Python

#!/usr/bin/env python3
"""ETL PC → ClickHouse (cada 5min). Extrae las fuentes que viven en este PC:
Jellyfin (catalogo, usuarios, reproduccion, sesiones), Radarr/Sonarr (history+queue),
Prowlarr (indexers), y el catalogo gnula (SQLite). Empuja snapshots con snapshot_ts a
ClickHouse del VPS a traves de un tunel SSH (CH HTTP escucha solo en 127.0.0.1 del VPS).
Reusa funciones del registry: clickhouse_insert_rows_py_infra.
Secrets en ~/.config/popelis/analytics.env (chmod600; el timer no puede usar pass/GPG).
Uso: python etl_pc.py [--once] [--dry]
"""
import json
import os
import re
import sqlite3
import subprocess
import sys
import time
import urllib.request
import urllib.parse
import urllib.error
from datetime import datetime, timezone
# --- registry ---
# Make the shared function registry (../../python/functions, relative to this
# file) importable so `infra.clickhouse_insert_rows` can be resolved.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "python", "functions"))
try:
    # First try the name as an attribute of the `infra` package.
    from infra import clickhouse_insert_rows
    # The same name may instead resolve to the submodule
    # `infra.clickhouse_insert_rows` (a module object, not callable); treat that
    # as "not found" and fall back below.
    if not callable(clickhouse_insert_rows):
        raise ImportError
except ImportError:
    # Fallback: import the function explicitly from its submodule.
    from infra.clickhouse_insert_rows import clickhouse_insert_rows  # noqa: E402
# Secrets/config file parsed by load_env() when CFG is built at import time.
ENV_PATH = os.path.expanduser("~/.config/popelis/analytics.env")
def load_env(path):
    """Parse a simple ``KEY=VALUE`` env file into a dict of strings.

    Blank lines, lines starting with ``#`` and lines without ``=`` are skipped.
    Only the first ``=`` separates key from value, so values may contain ``=``.
    Whitespace around key and value is stripped, a leading ``export `` on the
    key is dropped, and a value wrapped in a matching pair of single or double
    quotes is unquoted (``KEY="a b"`` -> ``a b``), following common .env /
    systemd ``EnvironmentFile`` conventions. Lines with an empty key are ignored.

    :param path: path of the env file, read as UTF-8.
    :return: dict mapping key -> value (both ``str``).
    :raises OSError: if the file cannot be opened.
    """
    cfg = {}
    with open(path, encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = (part.strip() for part in line.split("=", 1))
            if key.startswith("export "):
                key = key[len("export "):].strip()
            if not key:
                continue
            # Unquote only a matching pair; a lone quote character is kept verbatim.
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            cfg[key] = value
    return cfg
# Module-level configuration, evaluated once at import time.
CFG = load_env(ENV_PATH)  # URLs, credentials and API keys from the env file
DRY = sys.argv.count("--dry") > 0  # --dry: extract and print counts, skip inserts
LOCAL_PORT = 18123  # local end of the SSH tunnel opened in main()
SNAP = f"{datetime.now(timezone.utc):%Y-%m-%d %H:%M:%S}"  # UTC timestamp stamped on every row of this run
def http_json(url, headers=None, data=None, method="GET", timeout=20):
    """Send an HTTP request and return the response body decoded as JSON.

    :param url: absolute URL to request.
    :param headers: optional dict of request headers (``None`` -> no extra headers).
    :param data: optional request body as bytes; ``None`` sends no body.
    :param method: HTTP verb, ``"GET"`` by default.
    :param timeout: socket timeout in seconds.
    :return: the parsed JSON value.
    :raises urllib.error.URLError: on connection/HTTP errors (HTTPError is a subclass).
    :raises json.JSONDecodeError: if the body is not valid JSON.
    """
    request = urllib.request.Request(
        url,
        data=data,
        headers=headers or {},
        method=method,
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        payload = response.read()
    return json.loads(payload)
def ticks_to_min(t):
    """Convert .NET ticks (100 ns units) to minutes, rounded to 2 decimals.

    Anything ``float()`` cannot convert (``None``, junk strings, ...) yields
    ``0.0`` instead of raising.
    """
    ticks_per_minute = 60 * 10_000_000  # 10**7 ticks per second
    try:
        minutes = float(t) / ticks_per_minute
    except Exception:
        return 0.0
    return round(minutes, 2)
# ISO-8601-ish timestamp: date, 'T' or space, time, optional fraction, optional
# 'Z' or +HH:MM / -HHMM offset.
_ISO_DT_RE = re.compile(
    r"(\d{4}-\d{2}-\d{2})[T ](\d{2}:\d{2}:\d{2})(?:\.\d+)?\s*(Z|[+-]\d{2}:?\d{2})?")


def fmt_dt(s):
    """Normalize a Jellyfin/*arr ISO timestamp to ``'YYYY-MM-DD HH:MM:SS'`` (UTC).

    Fractional seconds are dropped. Values ending in ``Z`` or without an offset
    are returned as-is (date and time only). Values with an explicit offset
    (``+02:00``) are converted to UTC, so they line up with ``snapshot_ts``.

    :param s: timestamp string (or any object whose ``str()`` is one); may be empty/None.
    :return: the normalized string, or ``None`` if *s* is empty or not a
        recognizable timestamp (callers substitute a default).
    """
    if not s:
        return None
    m = _ISO_DT_RE.fullmatch(str(s).strip())
    if not m:
        return None
    day, hms, offset = m.groups()
    if not offset or offset == "Z":
        return f"{day} {hms}"
    try:
        aware = datetime.strptime(f"{day} {hms}{offset}", "%Y-%m-%d %H:%M:%S%z")
        return aware.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    except (ValueError, OverflowError):
        # Out-of-range field values, or a UTC conversion that leaves datetime's range.
        return None
# ---------- TUNEL SSH ----------
class Tunnel:
    """Context manager that keeps an ``ssh -N -L`` port forward open.

    Forwards ``127.0.0.1:<lport>`` on this machine to ``<rhost>:<rport>`` as seen
    from ``host`` (default 127.0.0.1:8123 on the remote side). ``__enter__``
    starts ssh and polls ``http://127.0.0.1:<lport>/ping`` until it answers;
    ``__exit__`` stops ssh and reaps the child process.

    :param host: ssh destination passed verbatim to ``ssh``.
    :param lport: local port to listen on.
    :param rhost: target host, resolved on the remote side.
    :param rport: target port on *rhost*.
    """

    def __init__(self, host, lport, rhost="127.0.0.1", rport=8123):
        self.host, self.lport, self.rhost, self.rport = host, lport, rhost, rport
        self.proc = None

    def __enter__(self):
        """Start ssh and wait (up to 30 polls) for the forwarded port to answer.

        :return: self.
        :raises RuntimeError: if ssh exits before the port answers, or on timeout.
            The ssh child is stopped first in both cases, because ``__exit__``
            is not called when ``__enter__`` raises.
        """
        self.proc = subprocess.Popen(
            ["ssh", "-N", "-o", "ExitOnForwardFailure=yes", "-o", "ConnectTimeout=10",
             "-L", f"{self.lport}:{self.rhost}:{self.rport}", self.host],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # Wait for the local port to answer.
        for _ in range(30):
            try:
                with urllib.request.urlopen(f"http://127.0.0.1:{self.lport}/ping", timeout=2) as r:
                    r.read()
                return self
            except Exception:
                pass
            rc = self.proc.poll()
            if rc is not None:
                # ssh has already exited, so the port will never open: fail fast.
                self._stop()
                raise RuntimeError(f"tunel SSH termino antes de abrir el puerto local (rc={rc})")
            time.sleep(0.3)
        self._stop()
        raise RuntimeError("tunel SSH no abrio puerto local a tiempo")

    def _stop(self):
        """Terminate the ssh child if it is still running and reap it (kill() after 5 s)."""
        proc = self.proc
        if proc is None or proc.poll() is not None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()

    def __exit__(self, *a):
        # Returns None, so exceptions from the with-body propagate.
        self._stop()
# ---------- EXTRACTORES ----------
def jf_auth():
    """Log in to Jellyfin as JF_USER/JF_PASS and return the session access token.

    Uses ``POST /Users/AuthenticateByName``. The ``X-Emby-Authorization`` header
    identifies this script as client "etl" on device "pc".
    """
    credentials = {"Username": CFG["JF_USER"], "Pw": CFG["JF_PASS"]}
    auth_header = ('MediaBrowser Client="etl", Device="pc", '
                   'DeviceId="etl-pc", Version="1.0"')
    headers = {
        "Content-Type": "application/json",
        "X-Emby-Authorization": auth_header,
    }
    url = f'{CFG["JF_URL"]}/Users/AuthenticateByName'
    response = http_json(url, headers, json.dumps(credentials).encode(), "POST")
    return response["AccessToken"]
def jf_get(token, path, params=""):
    """GET a Jellyfin API path with *token* and return the decoded JSON.

    :param token: access token from jf_auth(), sent as ``X-Emby-Token``.
    :param path: API path starting with ``/``; it may already hold a query string.
    :param params: query string appended verbatim (not URL-encoded here),
        joined with ``&`` if *path* already has ``?``, else with ``?``.
    """
    url = f'{CFG["JF_URL"]}{path}'
    if params:
        joiner = "&" if "?" in path else "?"
        url = f"{url}{joiner}{params}"
    return http_json(url, {"X-Emby-Token": token}, timeout=40)
def _jf_num(value):
    """Convert a raw Jellyfin numeric field to float; None or junk becomes 0.0."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def _jf_item_row(it):
    """Map one ``/Items`` entry to a ``jellyfin_items`` row."""
    return {
        "snapshot_ts": SNAP, "item_id": it.get("Id", ""), "type": it.get("Type", ""),
        "name": it.get("Name", ""), "production_year": it.get("ProductionYear") or 0,
        "runtime_min": ticks_to_min(it.get("RunTimeTicks", 0)),
        "genres": it.get("Genres", []) or [],
        "community_rating": it.get("CommunityRating") or 0.0,
        "official_rating": it.get("OfficialRating", "") or "",
        "series_name": it.get("SeriesName", "") or "",
        "library": "",  # not populated by this extractor
        "path": it.get("Path", "") or "",
        "date_created": fmt_dt(it.get("DateCreated")) or "1970-01-01 00:00:00",
    }


def _jf_user_row(u):
    """Map one ``/Users`` entry to a ``jellyfin_users`` row."""
    # `or {}` also covers a Policy that is present but null, which would otherwise
    # raise AttributeError and abort the whole Jellyfin extraction.
    policy = u.get("Policy") or {}
    return {
        "snapshot_ts": SNAP, "user_id": u.get("Id", ""), "name": u.get("Name", ""),
        "last_login": fmt_dt(u.get("LastLoginDate")) or "1970-01-01 00:00:00",
        "last_activity": fmt_dt(u.get("LastActivityDate")) or "1970-01-01 00:00:00",
        "is_admin": 1 if policy.get("IsAdministrator") else 0,
    }


def _jf_user_item_row(u, it):
    """Map one played item of user *u* to a ``jellyfin_user_items`` row."""
    ud = it.get("UserData", {}) or {}
    return {
        "snapshot_ts": SNAP, "user_id": u.get("Id", ""), "user_name": u.get("Name", ""),
        "item_id": it.get("Id", ""), "item_name": it.get("Name", ""),
        "type": it.get("Type", ""), "played": 1 if ud.get("Played") else 0,
        "play_count": ud.get("PlayCount", 0) or 0,
        "playback_pct": round(ud.get("PlayedPercentage", 0.0) or 0.0, 2),
        "last_played": fmt_dt(ud.get("LastPlayedDate")) or "1970-01-01 00:00:00",
    }


def _jf_session_row(s):
    """Map one ``/Sessions`` entry to a ``jellyfin_sessions`` row; None if nothing is playing."""
    now_playing = s.get("NowPlayingItem")
    if not now_playing:
        return None
    ps = s.get("PlayState", {}) or {}
    # Percentage from raw ticks (no pre-rounding to minutes). A missing or zero
    # runtime yields 0.0 rather than an unbounded value; overshoot is clamped to 100.
    position = _jf_num(ps.get("PositionTicks"))
    runtime = _jf_num(now_playing.get("RunTimeTicks"))
    pct = min(100.0, max(0.0, 100.0 * position / runtime)) if runtime > 0 else 0.0
    return {
        "snapshot_ts": SNAP, "user_name": s.get("UserName", ""),
        "item_name": now_playing.get("Name", ""), "item_type": now_playing.get("Type", ""),
        "client": s.get("Client", ""), "device": s.get("DeviceName", ""),
        "play_method": ps.get("PlayMethod", ""), "is_paused": 1 if ps.get("IsPaused") else 0,
        "position_pct": round(pct, 2),
    }


def extract_jellyfin():
    """Extract Jellyfin catalogue, users, per-user played items and active sessions.

    Authenticates once via jf_auth() and returns a dict mapping table name to a
    list of row dicts for ``jellyfin_items``, ``jellyfin_users``,
    ``jellyfin_user_items`` and ``jellyfin_sessions``; every row carries the
    run's ``snapshot_ts``. Failures while fetching one user's played items, or
    the session list, are logged to stderr and skipped. Failures of the
    authentication, ``/Items`` or ``/Users`` calls propagate to the caller.
    """
    out = {"jellyfin_items": [], "jellyfin_users": [], "jellyfin_user_items": [],
           "jellyfin_sessions": []}
    token = jf_auth()
    # Catalogue: movies, series and episodes, with the extra fields the rows need.
    fields = "Genres,Path,RunTimeTicks,ProductionYear,CommunityRating,OfficialRating,DateCreated,SeriesName"
    d = jf_get(token, "/Items", f"Recursive=true&IncludeItemTypes=Movie,Series,Episode&Fields={fields}")
    out["jellyfin_items"] = [_jf_item_row(it) for it in d.get("Items", [])]
    # Users, plus each user's played movies/episodes.
    for u in jf_get(token, "/Users"):
        out["jellyfin_users"].append(_jf_user_row(u))
        try:
            pi = jf_get(token, f'/Users/{u["Id"]}/Items',
                        "Recursive=true&IncludeItemTypes=Movie,Episode&IsPlayed=true&Fields=UserData")
            for it in pi.get("Items", []):
                out["jellyfin_user_items"].append(_jf_user_item_row(u, it))
        except Exception as e:
            print(f"[jf] user_items {u.get('Name')}: {e}", file=sys.stderr)
    # Sessions that are currently playing something.
    try:
        for s in jf_get(token, "/Sessions"):
            row = _jf_session_row(s)
            if row is not None:
                out["jellyfin_sessions"].append(row)
    except Exception as e:
        print(f"[jf] sessions: {e}", file=sys.stderr)
    return out
def arr_get(base, key, path, ver="v3"):
    """GET ``{base}/api/{ver}/{path}`` on a Radarr/Sonarr/Prowlarr instance and return the JSON.

    The API key is sent in the ``X-Api-Key`` header rather than an ``apikey``
    query parameter, so the secret never appears in request URLs (proxy or
    access logs, tracebacks).

    :param base: instance base URL; a trailing ``/`` is tolerated.
    :param key: API key of the instance.
    :param path: endpoint path, optionally carrying its own query string.
    :param ver: API version segment (callers use ``v3`` for Radarr/Sonarr, ``v1`` for Prowlarr).
    """
    url = f"{base.rstrip('/')}/api/{ver}/{path}"
    return http_json(url, {"X-Api-Key": key}, timeout=30)
def _arr_history_row(app, r):
    """Map one Radarr/Sonarr history record to an ``arr_history`` row."""
    # `or {}` guards against keys that are present but null.
    data = r.get("data") or {}
    media = r.get("movie") or r.get("series") or {}
    quality = (r.get("quality") or {}).get("quality") or {}
    return {
        "snapshot_ts": SNAP, "app": app, "history_id": r.get("id", 0),
        "event_type": r.get("eventType", ""),
        "title": media.get("title", "") or "",
        "source_title": r.get("sourceTitle", "") or "",
        "indexer": data.get("indexer", "") or "",
        "download_client": data.get("downloadClient", "") or "",
        "quality": quality.get("name", "") or "",
        "languages": [(lang or {}).get("name", "") for lang in (r.get("languages") or [])],
        "event_date": fmt_dt(r.get("date")) or "1970-01-01 00:00:00",
    }


def _arr_queue_row(app, r):
    """Map one Radarr/Sonarr queue record to an ``arr_queue`` row."""
    return {
        "snapshot_ts": SNAP, "app": app, "title": r.get("title", "") or "",
        "status": r.get("status", "") or "",
        "tracked_status": r.get("trackedDownloadState", "") or "",
        "size_bytes": int(r.get("size", 0) or 0),
        "sizeleft_bytes": int(r.get("sizeleft", 0) or 0),
        "timeleft": r.get("timeleft", "") or "",
        "indexer": r.get("indexer", "") or "",
        "download_client": r.get("downloadClient", "") or "",
    }


def extract_arr():
    """Snapshot the latest history (200 records) and queue (100 records) of Radarr and Sonarr.

    Returns ``{"arr_history": [...], "arr_queue": [...]}``. History and queue are
    fetched independently per app; a failure in one is logged to stderr and the
    rest continue. Missing ``*_URL``/``*_KEY`` config raises KeyError to the caller.
    """
    out = {"arr_history": [], "arr_queue": []}
    # The include flag asks each app to embed the movie/series object in every
    # history record; without it `title` cannot be resolved and stays empty.
    apps = [
        ("radarr", CFG["RADARR_URL"], CFG["RADARR_KEY"], "includeMovie=true"),
        ("sonarr", CFG["SONARR_URL"], CFG["SONARR_KEY"], "includeSeries=true"),
    ]
    for app, base, key, include in apps:
        try:
            h = arr_get(base, key,
                        "history?page=1&pageSize=200&sortKey=date&sortDirection=descending&" + include)
            for r in h.get("records", []):
                out["arr_history"].append(_arr_history_row(app, r))
        except Exception as e:
            print(f"[arr] {app} history: {e}", file=sys.stderr)
        try:
            q = arr_get(base, key, "queue?page=1&pageSize=100")
            for r in q.get("records", []):
                out["arr_queue"].append(_arr_queue_row(app, r))
        except Exception as e:
            print(f"[arr] {app} queue: {e}", file=sys.stderr)
    return out
def extract_prowlarr():
    """Snapshot Prowlarr indexers with their grab/query counters (``prowlarr_indexers``).

    Indexer definitions come from ``/api/v1/indexer`` and counters from
    ``/api/v1/indexerstats``. If only the stats call fails, the indexers are
    still exported with zero counters and the reason is logged. Any other
    failure is logged to stderr and the rows built so far are returned.
    """
    out = {"prowlarr_indexers": []}
    try:
        idx = arr_get(CFG["PROWLARR_URL"], CFG["PROWLARR_KEY"], "indexer", ver="v1")
        stats = {}
        try:
            st = arr_get(CFG["PROWLARR_URL"], CFG["PROWLARR_KEY"], "indexerstats", ver="v1")
            stats = {s.get("indexerId"): s for s in (st.get("indexers") or [])}
        except Exception as e:
            # Stats are optional, but log why counters will read as zero.
            print(f"[prowlarr] indexerstats: {e}", file=sys.stderr)
        for i in idx:
            s = stats.get(i.get("id")) or {}
            out["prowlarr_indexers"].append({
                "snapshot_ts": SNAP, "indexer_id": i.get("id", 0), "name": i.get("name", "") or "",
                "enable": 1 if i.get("enable") else 0, "protocol": i.get("protocol", "") or "",
                "privacy": i.get("privacy", "") or "",
                "num_grabs": s.get("numberOfGrabs", 0) or 0,
                "num_queries": s.get("numberOfQueries", 0) or 0,
                "num_grab_fail": s.get("numberOfFailedGrabs", 0) or 0,
                "num_query_fail": s.get("numberOfFailedQueries", 0) or 0,
            })
    except Exception as e:
        print(f"[prowlarr] {e}", file=sys.stderr)
    return out
def extract_gnula():
    """Snapshot the gnula catalogue (SQLite table ``movies``) as ``gnula_movies`` rows.

    The database path is read from ``GNULA_DB``. If that is unset or the file
    does not exist, an empty table is returned. NULL columns become ``""`` or ``0``.
    Query errors (e.g. missing table/column) propagate to the caller.
    """
    out = {"gnula_movies": []}
    db = CFG.get("GNULA_DB", "")
    if not (db and os.path.exists(db)):
        return out
    conn = sqlite3.connect(db)
    try:
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            "SELECT href,title,year,flags,lang_es,status,in_library,detected_at,downloaded_at FROM movies")
        for r in rows:
            out["gnula_movies"].append({
                "snapshot_ts": SNAP, "href": r["href"] or "", "title": r["title"] or "",
                "year": r["year"] or 0, "flags": r["flags"] or "", "lang_es": r["lang_es"] or 0,
                "status": r["status"] or "", "in_library": r["in_library"] or 0,
                "detected_at": r["detected_at"] or "", "downloaded_at": r["downloaded_at"] or "",
            })
    finally:
        # Release the handle even if the query fails; `with conn:` would not close it.
        conn.close()
    return out
def main():
    """Run every extractor, then insert the non-empty tables into ClickHouse.

    A failing extractor is logged and the remaining sources still run. With
    ``--dry`` only the per-table row counts are printed. Otherwise an SSH tunnel
    to ``SSH_HOST`` is opened on LOCAL_PORT and each table is inserted as
    ``<CH_DB>.<table>``; a failed insert is logged and the others proceed.
    Errors that an extractor handles internally (logged per source) do not
    affect the result.

    :return: 0 if every extractor call and every insert succeeded, else 1, so the
        caller (e.g. the scheduling timer) can detect failed runs.
    :raises RuntimeError: if the SSH tunnel cannot be opened.
    """
    data = {}
    failed = False
    for fn in (extract_jellyfin, extract_arr, extract_prowlarr, extract_gnula):
        try:
            data.update(fn())
        except Exception as e:
            failed = True
            print(f"[etl] {fn.__name__} FALLO: {e}", file=sys.stderr)
    counts = {t: len(rows) for t, rows in data.items()}
    print(f"[etl] snapshot {SNAP} extraido: {json.dumps(counts)}")
    if DRY:
        print("[etl] --dry: no se inserta")
        return 1 if failed else 0
    base = f"http://127.0.0.1:{LOCAL_PORT}"
    total = 0
    with Tunnel(CFG["SSH_HOST"], LOCAL_PORT):
        for table, rows in data.items():
            if not rows:
                continue
            try:
                total += clickhouse_insert_rows(base, f'{CFG["CH_DB"]}.{table}', rows,
                                                user=CFG["CH_USER"], password=CFG["CH_PASSWORD"],
                                                database=CFG["CH_DB"])
            except Exception as e:
                failed = True
                print(f"[etl] insert {table} FALLO: {e}", file=sys.stderr)
    print(json.dumps({"snapshot_ts": SNAP, "inserted": total, "tables": counts}))
    return 1 if failed else 0
if __name__ == "__main__":
    # Propagate main()'s status as the process exit code (a None result exits 0).
    sys.exit(main())