52999ecb86
2 ETLs cada 5min suben snapshots (Jellyfin, *arr, Prowlarr, gnula, popelis users/mylist/events) a ClickHouse en el VPS, visualizado en Grafana (grafana.datardos.com). Ingesta PC via tunel SSH; popelis via ETL local en el VPS. Usa clickhouse_insert_rows_py_infra.
297 lines
12 KiB
Python
297 lines
12 KiB
Python
#!/usr/bin/env python3
"""PC -> ClickHouse ETL (runs every 5 min).

Extracts the sources that live on this PC: Jellyfin (catalog, users, playback,
sessions), Radarr/Sonarr (history + queue), Prowlarr (indexers) and the gnula
catalog (SQLite). Pushes snapshots tagged with snapshot_ts to the ClickHouse
instance on the VPS through an SSH tunnel (ClickHouse HTTP listens only on
127.0.0.1 of the VPS).

Reuses a function from the registry: clickhouse_insert_rows_py_infra.
Secrets live in ~/.config/popelis/analytics.env (chmod 600; the timer cannot
use pass/GPG).

Usage: python etl_pc.py [--once] [--dry]
"""
|
|
import json
import os
import re
import sqlite3
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from contextlib import closing
from datetime import datetime, timezone
|
|
|
|
# --- registry ---
# Put the shared function registry (../../python/functions relative to this
# script) at the front of the import path.
sys.path.insert(
    0,
    os.path.join(os.path.dirname(__file__), "..", "..", "python", "functions"),
)
try:
    from infra import clickhouse_insert_rows

    # NOTE(review): presumably `infra.clickhouse_insert_rows` can resolve to a
    # submodule instead of the function; a non-callable result is routed to the
    # same fallback as a failed import.
    if not callable(clickhouse_insert_rows):
        raise ImportError
except ImportError:
    from infra.clickhouse_insert_rows import clickhouse_insert_rows  # noqa: E402

# Env file holding the URLs, API keys and credentials read by this script.
ENV_PATH = os.path.expanduser("~/.config/popelis/analytics.env")
|
|
|
|
|
|
def load_env(path):
    """Parse a simple ``KEY=VALUE`` env file into a dict.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Each remaining line is split on the first ``=`` only, so values
    may themselves contain ``=``. Whitespace around the key and around the
    value is stripped, so ``KEY = value`` reads the same as ``KEY=value``.
    Lines with an empty key are ignored. Quotes are not interpreted.

    Args:
        path: Path of the env file to read.

    Returns:
        dict mapping key strings to value strings; a later duplicate key
        overwrites an earlier one.

    Raises:
        OSError: if the file cannot be opened.
    """
    cfg = {}
    # Explicit encoding: do not depend on the locale of the calling process.
    with open(path, encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#"):
                continue
            key, sep, value = line.partition("=")
            key = key.strip()
            if not sep or not key:
                continue
            cfg[key] = value.strip()
    return cfg
|
|
|
|
|
|
# Settings parsed from ENV_PATH once, at import time.
CFG = load_env(ENV_PATH)
# --dry: extract and print the counts, but skip the ClickHouse insert.
DRY = any(arg == "--dry" for arg in sys.argv)
# Local port used for the SSH port-forward to ClickHouse.
LOCAL_PORT = 18123
# One UTC timestamp shared by every row produced in this run.
SNAP = f"{datetime.now(timezone.utc):%Y-%m-%d %H:%M:%S}"
|
|
|
|
|
|
def http_json(url, headers=None, data=None, method="GET", timeout=20):
    """Send one HTTP request and return the response body parsed as JSON.

    Args:
        url: Full request URL.
        headers: Optional dict of request headers.
        data: Optional request body (bytes).
        method: HTTP method name.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The decoded JSON value.

    Raises:
        Whatever ``urllib.request.urlopen`` or ``json.loads`` raise.
    """
    request = urllib.request.Request(
        url,
        data=data,
        headers=headers if headers else {},
        method=method,
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        payload = response.read()
    return json.loads(payload)
|
|
|
|
|
|
def ticks_to_min(t):
    """Convert a tick count (1 tick = 100 ns) to minutes, rounded to 2 decimals.

    Returns 0.0 when ``t`` cannot be converted to a float (e.g. ``None``).
    """
    ticks_per_minute = 600_000_000.0  # 60 s * 10_000_000 ticks per second
    try:
        minutes = float(t) / ticks_per_minute
    except Exception:
        return 0.0
    return round(minutes, 2)
|
|
|
|
|
|
def fmt_dt(s):
    """Normalize a Jellyfin/ISO timestamp to ``'YYYY-MM-DD HH:MM:SS'``.

    Every ``Z`` is removed, everything from the first ``.`` on is dropped,
    ``T`` becomes a space, and the result is cut to 19 characters. No
    timezone conversion is performed.

    Returns:
        The normalized string, or ``None`` when the input is falsy or fewer
        than 19 characters remain after normalization.
    """
    if not s:
        return None
    text = str(s).replace("Z", "")
    text = text.split(".")[0]  # drop fractional seconds and anything after
    text = text.replace("T", " ")
    if len(text) < 19:
        return None
    return text[:19]
|
|
|
|
|
|
# ---------- TUNEL SSH ----------
|
|
class Tunnel:
    """Context manager that keeps an ``ssh -N -L`` local port-forward open.

    On enter it starts ``ssh`` forwarding ``lport`` on this machine to
    ``rhost:rport`` as seen from ``host``, then polls
    ``http://127.0.0.1:<lport>/ping`` until it answers. On exit the ssh
    process is terminated and reaped.

    Args:
        host: SSH destination handed to ``ssh``.
        lport: Local port to listen on.
        rhost: Forward target host on the remote side.
        rport: Forward target port on the remote side.

    Raises:
        RuntimeError: from ``__enter__`` when the forwarded port does not
            answer in time or ssh exits before it does. The ssh process is
            stopped before the error propagates.
    """

    def __init__(self, host, lport, rhost="127.0.0.1", rport=8123):
        self.host, self.lport, self.rhost, self.rport = host, lport, rhost, rport
        self.proc = None

    def __enter__(self):
        self.proc = subprocess.Popen(
            ["ssh", "-N", "-o", "ExitOnForwardFailure=yes", "-o", "ConnectTimeout=10",
             "-L", f"{self.lport}:{self.rhost}:{self.rport}", self.host],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        try:
            self._wait_ready()
        except BaseException:
            # __exit__ is not called when __enter__ raises, so clean up here
            # to avoid leaving an orphaned ssh process behind.
            self._stop()
            raise
        return self

    def _wait_ready(self, attempts=30, delay=0.3):
        """Poll the forwarded /ping endpoint until it answers or we give up."""
        url = f"http://127.0.0.1:{self.lport}/ping"
        for _ in range(attempts):
            try:
                with urllib.request.urlopen(url, timeout=2) as r:
                    r.read()
                return
            except Exception:
                # Ping is tried first so a port that already answers still
                # counts as ready; only stop early when ssh itself is gone.
                if self.proc.poll() is not None:
                    raise RuntimeError(
                        f"ssh termino (codigo {self.proc.returncode}) antes de abrir el tunel"
                    ) from None
                time.sleep(delay)
        raise RuntimeError("tunel SSH no abrio puerto local a tiempo")

    def _stop(self):
        """Terminate the ssh process (if still running) and reap it."""
        proc = self.proc
        if proc is None or proc.poll() is not None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()

    def __exit__(self, *a):
        self._stop()
|
|
|
|
|
|
# ---------- EXTRACTORES ----------
|
|
def jf_auth():
    """Log in to Jellyfin with the configured user and return its access token.

    Reads ``JF_USER``, ``JF_PASS`` and ``JF_URL`` from CFG and POSTs to
    ``/Users/AuthenticateByName``.

    Raises:
        KeyError: if a required CFG entry or ``AccessToken`` is missing.
        Whatever http_json() raises for the request itself.
    """
    credentials = {"Username": CFG["JF_USER"], "Pw": CFG["JF_PASS"]}
    headers = {
        "Content-Type": "application/json",
        "X-Emby-Authorization": 'MediaBrowser Client="etl", Device="pc", DeviceId="etl-pc", Version="1.0"',
    }
    endpoint = f'{CFG["JF_URL"]}/Users/AuthenticateByName'
    reply = http_json(endpoint, headers, json.dumps(credentials).encode(), "POST")
    return reply["AccessToken"]
|
|
|
|
|
|
def jf_get(token, path, params=""):
    """GET a Jellyfin endpoint with the given token and return the parsed JSON.

    Args:
        token: Access token sent in the ``X-Emby-Token`` header.
        path: Endpoint path appended to ``CFG["JF_URL"]``.
        params: Optional, already-encoded query string (without ``?``).
    """
    base = f'{CFG["JF_URL"]}{path}'
    if not params:
        url = base
    else:
        # Continue an existing query string if the path already carries one.
        joiner = "&" if "?" in path else "?"
        url = base + joiner + params
    return http_json(url, {"X-Emby-Token": token}, timeout=40)
|
|
|
|
|
|
def extract_jellyfin():
    """Snapshot the Jellyfin catalog, users, played items and active sessions.

    Authenticates with jf_auth(), then reads ``/Items``, ``/Users``, each
    user's played Movie/Episode items and ``/Sessions``. Every row carries
    the run-wide SNAP value as ``snapshot_ts``; dates that are missing or
    cannot be normalized fall back to ``1970-01-01 00:00:00``.

    Returns:
        dict with keys ``jellyfin_items``, ``jellyfin_users``,
        ``jellyfin_user_items`` and ``jellyfin_sessions``, each a list of
        row dicts.

    Raises:
        Whatever jf_auth()/jf_get() raise for the login, ``/Items`` or
        ``/Users`` requests. Failures while reading one user's played items
        or the session list are printed to stderr and skipped.
    """
    epoch = "1970-01-01 00:00:00"
    out = {"jellyfin_items": [], "jellyfin_users": [], "jellyfin_user_items": [],
           "jellyfin_sessions": []}
    token = jf_auth()

    # items
    fields = "Genres,Path,RunTimeTicks,ProductionYear,CommunityRating,OfficialRating,DateCreated,SeriesName"
    d = jf_get(token, "/Items", f"Recursive=true&IncludeItemTypes=Movie,Series,Episode&Fields={fields}")
    for it in d.get("Items", []) or []:
        out["jellyfin_items"].append({
            "snapshot_ts": SNAP, "item_id": it.get("Id", ""), "type": it.get("Type", ""),
            "name": it.get("Name", ""), "production_year": it.get("ProductionYear") or 0,
            "runtime_min": ticks_to_min(it.get("RunTimeTicks", 0)),
            "genres": it.get("Genres", []) or [],
            "community_rating": it.get("CommunityRating") or 0.0,
            "official_rating": it.get("OfficialRating", "") or "",
            "series_name": it.get("SeriesName", "") or "", "library": "",
            "path": it.get("Path", "") or "",
            "date_created": fmt_dt(it.get("DateCreated")) or epoch,
        })

    # users
    users = jf_get(token, "/Users")
    for u in users:
        # A null "Policy" must not abort the whole extract.
        policy = u.get("Policy") or {}
        out["jellyfin_users"].append({
            "snapshot_ts": SNAP, "user_id": u.get("Id", ""), "name": u.get("Name", ""),
            "last_login": fmt_dt(u.get("LastLoginDate")) or epoch,
            "last_activity": fmt_dt(u.get("LastActivityDate")) or epoch,
            "is_admin": 1 if policy.get("IsAdministrator") else 0,
        })
        # played items per user (watched Movie + Episode)
        try:
            pi = jf_get(token, f'/Users/{u["Id"]}/Items',
                        "Recursive=true&IncludeItemTypes=Movie,Episode&IsPlayed=true&Fields=UserData")
            for it in pi.get("Items", []) or []:
                ud = it.get("UserData", {}) or {}
                out["jellyfin_user_items"].append({
                    "snapshot_ts": SNAP, "user_id": u.get("Id", ""), "user_name": u.get("Name", ""),
                    "item_id": it.get("Id", ""), "item_name": it.get("Name", ""),
                    "type": it.get("Type", ""), "played": 1 if ud.get("Played") else 0,
                    "play_count": ud.get("PlayCount", 0) or 0,
                    "playback_pct": round(ud.get("PlayedPercentage", 0.0) or 0.0, 2),
                    "last_played": fmt_dt(ud.get("LastPlayedDate")) or epoch,
                })
        except Exception as e:
            print(f"[jf] user_items {u.get('Name')}: {e}", file=sys.stderr)

    # active sessions
    try:
        for s in jf_get(token, "/Sessions"):
            np = s.get("NowPlayingItem")
            if not np:
                # Sessions without a playing item are skipped.
                continue
            ps = s.get("PlayState", {}) or {}
            pos = ticks_to_min(ps.get("PositionTicks", 0))
            # Fall back to 1 so the percentage never divides by zero.
            dur = ticks_to_min(np.get("RunTimeTicks", 0)) or 1
            out["jellyfin_sessions"].append({
                "snapshot_ts": SNAP, "user_name": s.get("UserName", ""),
                "item_name": np.get("Name", ""), "item_type": np.get("Type", ""),
                "client": s.get("Client", ""), "device": s.get("DeviceName", ""),
                "play_method": ps.get("PlayMethod", ""), "is_paused": 1 if ps.get("IsPaused") else 0,
                "position_pct": round(100.0 * pos / dur, 2),
            })
    except Exception as e:
        print(f"[jf] sessions: {e}", file=sys.stderr)
    return out
|
|
|
|
|
|
def arr_get(base, key, path, ver="v3"):
    """GET ``{base}/api/{ver}/{path}`` with the API key as a query parameter.

    Args:
        base: Base URL of the application.
        key: API key, appended as ``apikey=``.
        path: Endpoint path, optionally already carrying a query string.
        ver: API version segment.

    Returns:
        The parsed JSON response from http_json().
    """
    joiner = "?" if "?" not in path else "&"
    url = f"{base}/api/{ver}/{path}{joiner}apikey={key}"
    return http_json(url, timeout=30)
|
|
|
|
|
|
def extract_arr():
    """Snapshot recent history and the current queue of Radarr and Sonarr.

    For each app, reads the 200 most recent history records and up to 100
    queue records. History and queue are fetched independently: a failure in
    either is printed to stderr and the other is still attempted.

    Returns:
        dict with keys ``arr_history`` and ``arr_queue``, each a list of row
        dicts tagged with SNAP as ``snapshot_ts`` and the app name.

    Raises:
        KeyError: if a required ``RADARR_*``/``SONARR_*`` CFG entry is missing.
    """
    out = {"arr_history": [], "arr_queue": []}
    # The last element asks the server to embed the movie/series object that
    # "title" is read from; without it the history records carry no title.
    # NOTE(review): flag names taken from the Radarr/Sonarr v3 API; confirm
    # against the deployed versions.
    apps = [("radarr", CFG["RADARR_URL"], CFG["RADARR_KEY"], "includeMovie=true"),
            ("sonarr", CFG["SONARR_URL"], CFG["SONARR_KEY"], "includeSeries=true")]
    for app, base, key, include in apps:
        try:
            h = arr_get(base, key,
                        "history?page=1&pageSize=200&sortKey=date&sortDirection=descending&" + include)
            for r in h.get("records", []) or []:
                data = r.get("data", {}) or {}
                # Both quality levels may be null; neither should drop the batch.
                quality = (r.get("quality") or {}).get("quality") or {}
                out["arr_history"].append({
                    "snapshot_ts": SNAP, "app": app, "history_id": r.get("id", 0),
                    "event_type": r.get("eventType", ""),
                    "title": (r.get("movie", {}) or r.get("series", {}) or {}).get("title", "") or "",
                    "source_title": r.get("sourceTitle", "") or "",
                    "indexer": data.get("indexer", "") or "",
                    "download_client": data.get("downloadClient", "") or "",
                    "quality": quality.get("name", "") or "",
                    "languages": [(lang or {}).get("name") or ""
                                  for lang in (r.get("languages", []) or [])],
                    "event_date": fmt_dt(r.get("date")) or "1970-01-01 00:00:00",
                })
        except Exception as e:
            print(f"[arr] {app} history: {e}", file=sys.stderr)
        try:
            q = arr_get(base, key, "queue?page=1&pageSize=100")
            for r in q.get("records", []) or []:
                out["arr_queue"].append({
                    "snapshot_ts": SNAP, "app": app, "title": r.get("title", "") or "",
                    "status": r.get("status", "") or "",
                    "tracked_status": r.get("trackedDownloadState", "") or "",
                    "size_bytes": int(r.get("size", 0) or 0),
                    "sizeleft_bytes": int(r.get("sizeleft", 0) or 0),
                    "timeleft": r.get("timeleft", "") or "",
                    "indexer": r.get("indexer", "") or "",
                    "download_client": r.get("downloadClient", "") or "",
                })
        except Exception as e:
            print(f"[arr] {app} queue: {e}", file=sys.stderr)
    return out
|
|
|
|
|
|
def extract_prowlarr():
    """Snapshot the Prowlarr indexers together with their grab/query counters.

    Reads the ``indexer`` list and, best-effort, ``indexerstats``. When the
    stats request fails the indexers are still emitted with zero counters
    and the failure is reported on stderr.

    Returns:
        dict with the single key ``prowlarr_indexers`` holding a list of row
        dicts. The list is empty when the indexer request itself fails; that
        failure is printed to stderr and not raised.
    """
    out = {"prowlarr_indexers": []}
    try:
        idx = arr_get(CFG["PROWLARR_URL"], CFG["PROWLARR_KEY"], "indexer", ver="v1")
        stats = {}
        try:
            st = arr_get(CFG["PROWLARR_URL"], CFG["PROWLARR_KEY"], "indexerstats", ver="v1")
            for s in st.get("indexers", []) or []:
                stats[s.get("indexerId")] = s
        except Exception as e:
            # Stats are optional, but a failure should be visible in the log
            # rather than silently turning every counter into 0.
            print(f"[prowlarr] indexerstats: {e}", file=sys.stderr)
        for i in idx:
            s = stats.get(i.get("id"), {})
            out["prowlarr_indexers"].append({
                "snapshot_ts": SNAP, "indexer_id": i.get("id", 0), "name": i.get("name", "") or "",
                "enable": 1 if i.get("enable") else 0, "protocol": i.get("protocol", "") or "",
                "privacy": i.get("privacy", "") or "",
                "num_grabs": s.get("numberOfGrabs", 0) or 0,
                "num_queries": s.get("numberOfQueries", 0) or 0,
                "num_grab_fail": s.get("numberOfFailedGrabs", 0) or 0,
                "num_query_fail": s.get("numberOfFailedQueries", 0) or 0,
            })
    except Exception as e:
        print(f"[prowlarr] {e}", file=sys.stderr)
    return out
|
|
|
|
|
|
def extract_gnula():
    """Snapshot the ``movies`` table of the gnula SQLite catalog.

    The database path comes from ``CFG["GNULA_DB"]``. When it is unset or the
    file does not exist, an empty result is returned.

    Returns:
        dict with the single key ``gnula_movies`` holding one row dict per
        record, tagged with SNAP as ``snapshot_ts``. NULL columns are
        replaced by ``""`` or ``0``.

    Raises:
        sqlite3.Error: if the database cannot be opened or queried.
    """
    out = {"gnula_movies": []}
    db = CFG.get("GNULA_DB", "")
    if not (db and os.path.exists(db)):
        return out
    query = ("SELECT href,title,year,flags,lang_es,status,in_library,"
             "detected_at,downloaded_at FROM movies")
    # closing() guarantees the connection is released even if the query fails.
    with closing(sqlite3.connect(db)) as c:
        c.row_factory = sqlite3.Row
        for r in c.execute(query):
            out["gnula_movies"].append({
                "snapshot_ts": SNAP, "href": r["href"] or "", "title": r["title"] or "",
                "year": r["year"] or 0, "flags": r["flags"] or "", "lang_es": r["lang_es"] or 0,
                "status": r["status"] or "", "in_library": r["in_library"] or 0,
                "detected_at": r["detected_at"] or "", "downloaded_at": r["downloaded_at"] or "",
            })
    return out
|
|
|
|
|
|
def main():
    """Run every extractor, print the row counts, then insert into ClickHouse.

    A failing extractor is reported on stderr and skipped. With ``--dry`` the
    run stops after printing the counts. Otherwise an SSH tunnel is opened
    and each non-empty table is inserted; a failing insert is reported on
    stderr and the remaining tables are still attempted. A JSON summary is
    printed at the end.
    """
    extractors = (extract_jellyfin, extract_arr, extract_prowlarr, extract_gnula)
    data = {}
    for extractor in extractors:
        try:
            data.update(extractor())
        except Exception as e:
            print(f"[etl] {extractor.__name__} FALLO: {e}", file=sys.stderr)

    counts = {table: len(rows) for table, rows in data.items()}
    print(f"[etl] snapshot {SNAP} extraido: {json.dumps(counts)}")
    if DRY:
        print("[etl] --dry: no se inserta")
        return

    base = f"http://127.0.0.1:{LOCAL_PORT}"
    total = 0
    pending = [(table, rows) for table, rows in data.items() if rows]
    with Tunnel(CFG["SSH_HOST"], LOCAL_PORT):
        for table, rows in pending:
            try:
                n = clickhouse_insert_rows(
                    base,
                    f'{CFG["CH_DB"]}.{table}',
                    rows,
                    user=CFG["CH_USER"],
                    password=CFG["CH_PASSWORD"],
                    database=CFG["CH_DB"],
                )
                total += n
            except Exception as e:
                print(f"[etl] insert {table} FALLO: {e}", file=sys.stderr)
    summary = {"snapshot_ts": SNAP, "inserted": total, "tables": counts}
    print(json.dumps(summary))


if __name__ == "__main__":
    main()
|