commit 52999ecb864de0abd9a13fec19fb594d87ad0f90 Author: Egutierrez Date: Sat May 30 14:55:48 2026 +0200 feat: media_analytics — ETL PC+VPS → ClickHouse + Grafana 2 ETLs cada 5min suben snapshots (Jellyfin, *arr, Prowlarr, gnula, popelis users/mylist/events) a ClickHouse en el VPS, visualizado en Grafana (grafana.datardos.com). Ingesta PC via tunel SSH; popelis via ETL local en el VPS. Usa clickhouse_insert_rows_py_infra. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..458a0d0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.env +*.log +__pycache__/ diff --git a/app.md b/app.md new file mode 100644 index 0000000..9119548 --- /dev/null +++ b/app.md @@ -0,0 +1,65 @@ +--- +name: media_analytics +lang: py +domain: infra +version: 0.1.0 +description: "Analitica del media stack: 2 ETLs cada 5min suben snapshots (Jellyfin, *arr, Prowlarr, gnula, popelis users/mylist/events) a ClickHouse en el VPS, visualizado en Grafana. Ingesta PC via tunel SSH; popelis via ETL local en VPS." +tags: [analytics, clickhouse, grafana, etl, media, popelis, jellyfin, service] +uses_functions: + - clickhouse_insert_rows_py_infra +uses_types: [] +framework: "" +entry_point: "etl_pc.py" +dir_path: "apps/media_analytics" +repo_url: "" +--- + +## Arquitectura + +``` +PC (Docker Desktop) VPS datardos (coolify net) +───────────────────── ────────────────────────── +Jellyfin :8096 ─┐ ClickHouse (interno + 127.0.0.1:8123) +Radarr/Sonarr ─┤ etl_pc.py (5min) ──ssh──► analytics.* (11 tablas snapshot) +Prowlarr ─┤ tunel SSH 18123→8123 ▲ +gnula_catalog ──┘ │ etl_vps.py (5min, root) + popelis-db (Postgres) ── users/mylist/events + │ + Grafana :3000 ──► grafana.datardos.com (Traefik+LE) +``` + +## Componentes + +| Pieza | Dónde | Qué hace | +|---|---|---| +| `etl_pc.py` | PC, systemd-user `media-analytics-etl.timer` (5min) | extrae Jellyfin (items/users/user_items/sessions), Radarr/Sonarr (history/queue), Prowlarr (indexers), gnula SQLite → push a CH via túnel SSH. Usa `clickhouse_insert_rows_py_infra`. | +| `etl_vps.py` | VPS, systemd `media-analytics-vps.timer` (5min, root) | lee popelis-db (users, mylist snapshot; events incremental por id) → CH HTTP local. Standalone (VPS sin registry). | +| `deploy/docker-compose.yml` | VPS `/opt/analytics` | ClickHouse (interno coolify + 127.0.0.1:8123) + Grafana (Traefik grafana.datardos.com). | +| `deploy/clickhouse/schema.sql` | VPS | 11 tablas: jellyfin_{items,users,user_items,sessions}, arr_{history,queue}, prowlarr_indexers, gnula_movies, popelis_{users,mylist,events}. | +| `deploy/grafana/provisioning/` | VPS | datasource ClickHouse (uid `clickhouse`) + dashboard `Media Stack Analytics` (12 paneles). | + +## Secretos +- `pass datardos-vps/clickhouse` (user analytics) · `pass datardos-vps/grafana` (admin). +- PC: `~/.config/popelis/analytics.env` (chmod600; CH pass + JF/arr keys — el timer no usa GPG). +- VPS: `/opt/analytics/.env` (chmod600; CH_PASSWORD, GF_PASSWORD). + +## Ejecutar manual +```bash +# PC ETL +/home/lucas/fn_registry/python/.venv/bin/python3 apps/media_analytics/etl_pc.py # real +/home/lucas/fn_registry/python/.venv/bin/python3 apps/media_analytics/etl_pc.py --dry # solo extrae +# VPS ETL +ssh datardos 'sudo python3 /opt/analytics/etl_vps.py' +# Redeploy infra VPS +rsync -az apps/media_analytics/deploy/ datardos:/opt/analytics/ && ssh datardos 'cd /opt/analytics && sudo docker compose up -d' +``` + +## Visualización +https://grafana.datardos.com (admin / `pass datardos-vps/grafana`). Dashboard "Media Stack Analytics". + +## Gotchas +- **Eventos "play" NO van por popelis** (la reproducción es directa a Jellyfin `/jf`): se capturan del lado Jellyfin (`jellyfin_sessions` + `jellyfin_user_items.play_count`). `popelis_events` cubre login/logout/mylist/user_created (instrumentado en popelis-api). +- ClickHouse HTTP escucha **solo 127.0.0.1 del VPS** (no público). El PC entra por túnel SSH efímero (`ssh -N -L 18123:127.0.0.1:8123`). Grafana usa el nativo :9000 por la red coolify. +- Snapshots son **append con snapshot_ts** → análisis temporal del estado. Eventos son hechos (event_ts) con dedup `ReplacingMergeTree(event_id)`. +- Int64 de ClickHouse vuelve como **string** en JSON (gotcha de `clickhouse_query`/Grafana). +- El timer del PC necesita `ssh datardos` sin passphrase (key sin passphrase o agente cargado). diff --git a/deploy/clickhouse/schema.sql b/deploy/clickhouse/schema.sql new file mode 100644 index 0000000..de683d6 --- /dev/null +++ b/deploy/clickhouse/schema.sql @@ -0,0 +1,173 @@ +-- Esquema analitico media stack. Todas las tablas de snapshot llevan snapshot_ts +-- (momento de la captura del ETL, cada 5min) → permite analisis temporal del estado. +-- Las tablas de eventos llevan event_ts (instante real del evento). +-- Engine MergeTree, particion mensual, orden por (snapshot_ts, clave). + +CREATE DATABASE IF NOT EXISTS analytics; + +-- ============ JELLYFIN ============ +-- Catalogo: peliculas/series/episodios visibles en la biblioteca. +CREATE TABLE IF NOT EXISTS analytics.jellyfin_items ( + snapshot_ts DateTime, + item_id String, + type LowCardinality(String), -- Movie | Series | Episode + name String, + production_year Int32, + runtime_min Float32, + genres Array(String), + community_rating Float32, + official_rating String, + series_name String, + library String, + path String, + date_created DateTime DEFAULT toDateTime(0) +) ENGINE = MergeTree +PARTITION BY toYYYYMM(snapshot_ts) +ORDER BY (snapshot_ts, type, item_id); + +-- Usuarios Jellyfin (espejos popelis). +CREATE TABLE IF NOT EXISTS analytics.jellyfin_users ( + snapshot_ts DateTime, + user_id String, + name String, + last_login DateTime DEFAULT toDateTime(0), + last_activity DateTime DEFAULT toDateTime(0), + is_admin UInt8 +) ENGINE = MergeTree +PARTITION BY toYYYYMM(snapshot_ts) +ORDER BY (snapshot_ts, user_id); + +-- Estado de reproduccion por usuario+item (playcount, visto, ultima vez). +CREATE TABLE IF NOT EXISTS analytics.jellyfin_user_items ( + snapshot_ts DateTime, + user_id String, + user_name String, + item_id String, + item_name String, + type LowCardinality(String), + played UInt8, + play_count Int32, + playback_pct Float32, + last_played DateTime DEFAULT toDateTime(0) +) ENGINE = MergeTree +PARTITION BY toYYYYMM(snapshot_ts) +ORDER BY (snapshot_ts, user_id, item_id); + +-- Sesiones activas (lo que se esta viendo en el momento del snapshot). +CREATE TABLE IF NOT EXISTS analytics.jellyfin_sessions ( + snapshot_ts DateTime, + user_name String, + item_name String, + item_type LowCardinality(String), + client String, + device String, + play_method String, + is_paused UInt8, + position_pct Float32 +) ENGINE = MergeTree +PARTITION BY toYYYYMM(snapshot_ts) +ORDER BY (snapshot_ts, user_name); + +-- ============ SCRAPERS TORRENTS (*arr) ============ +-- Historial Radarr/Sonarr: grabs, imports, fallos. +CREATE TABLE IF NOT EXISTS analytics.arr_history ( + snapshot_ts DateTime, + app LowCardinality(String), -- radarr | sonarr + history_id Int64, + event_type LowCardinality(String), -- grabbed | downloadFolderImported | ... + title String, + source_title String, + indexer String, + download_client String, + quality String, + languages Array(String), + event_date DateTime DEFAULT toDateTime(0) +) ENGINE = MergeTree +PARTITION BY toYYYYMM(snapshot_ts) +ORDER BY (snapshot_ts, app, history_id); + +-- Cola activa de descargas. +CREATE TABLE IF NOT EXISTS analytics.arr_queue ( + snapshot_ts DateTime, + app LowCardinality(String), + title String, + status String, + tracked_status String, + size_bytes Int64, + sizeleft_bytes Int64, + timeleft String, + indexer String, + download_client String +) ENGINE = MergeTree +PARTITION BY toYYYYMM(snapshot_ts) +ORDER BY (snapshot_ts, app, title); + +-- Indexers Prowlarr: estado + contadores grab/query. +CREATE TABLE IF NOT EXISTS analytics.prowlarr_indexers ( + snapshot_ts DateTime, + indexer_id Int32, + name String, + enable UInt8, + protocol String, + privacy String, + num_grabs Int64, + num_queries Int64, + num_grab_fail Int64, + num_query_fail Int64 +) ENGINE = MergeTree +PARTITION BY toYYYYMM(snapshot_ts) +ORDER BY (snapshot_ts, indexer_id); + +-- ============ GNULA SCRAPPER ============ +-- Catalogo de pelis en castellano detectadas (gnula_catalog.db). +CREATE TABLE IF NOT EXISTS analytics.gnula_movies ( + snapshot_ts DateTime, + href String, + title String, + year Int32, + flags String, + lang_es UInt8, + status LowCardinality(String), -- pending | downloaded | failed | have + in_library UInt8, + detected_at String, + downloaded_at String +) ENGINE = MergeTree +PARTITION BY toYYYYMM(snapshot_ts) +ORDER BY (snapshot_ts, href); + +-- ============ POPELIS ============ +-- Usuarios (estado). +CREATE TABLE IF NOT EXISTS analytics.popelis_users ( + snapshot_ts DateTime, + user_id Int64, + username String, + jf_user_id String, + created_at DateTime DEFAULT toDateTime(0) +) ENGINE = MergeTree +PARTITION BY toYYYYMM(snapshot_ts) +ORDER BY (snapshot_ts, user_id); + +-- Mi lista por usuario (estado). +CREATE TABLE IF NOT EXISTS analytics.popelis_mylist ( + snapshot_ts DateTime, + user_id Int64, + item_id String, + added_at DateTime DEFAULT toDateTime(0) +) ENGINE = MergeTree +PARTITION BY toYYYYMM(snapshot_ts) +ORDER BY (snapshot_ts, user_id, item_id); + +-- Eventos (logins, plays, mylist add/remove) — instrumentados en popelis-api. +-- Tabla de hechos: dedup por event_id con ReplacingMergeTree. +CREATE TABLE IF NOT EXISTS analytics.popelis_events ( + event_id Int64, + event_ts DateTime, + user_id Int64, + username String, + event_type LowCardinality(String), -- login | logout | play | mylist_add | mylist_remove + item_id String, + meta String, + ingested_at DateTime DEFAULT now() +) ENGINE = ReplacingMergeTree(ingested_at) +PARTITION BY toYYYYMM(event_ts) +ORDER BY (event_id); diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml new file mode 100644 index 0000000..12e8dd5 --- /dev/null +++ b/deploy/docker-compose.yml @@ -0,0 +1,60 @@ +services: + clickhouse: + image: clickhouse/clickhouse-server:24.8-alpine + container_name: clickhouse + restart: always + environment: + CLICKHOUSE_DB: analytics + CLICKHOUSE_USER: analytics + CLICKHOUSE_PASSWORD: ${CH_PASSWORD} + CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT: "1" + ulimits: + nofile: + soft: 262144 + hard: 262144 + volumes: + - clickhouse_data:/var/lib/clickhouse + - ./clickhouse/schema.sql:/docker-entrypoint-initdb.d/01_schema.sql:ro + networks: + - coolify + ports: + # HTTP solo en localhost del VPS (no publico). Ingesta del PC via tunel SSH. + # Grafana usa el nativo 9000 por la red coolify (no expuesto). + - "127.0.0.1:8123:8123" + deploy: + resources: + limits: + memory: 2g + + grafana: + image: grafana/grafana:11.2.0 + container_name: grafana + restart: always + environment: + GF_SECURITY_ADMIN_USER: admin + GF_SECURITY_ADMIN_PASSWORD: ${GF_PASSWORD} + GF_INSTALL_PLUGINS: grafana-clickhouse-datasource + GF_SERVER_ROOT_URL: https://grafana.datardos.com + GF_USERS_ALLOW_SIGN_UP: "false" + CH_PASSWORD: ${CH_PASSWORD} + volumes: + - grafana_data:/var/lib/grafana + - ./grafana/provisioning:/etc/grafana/provisioning:ro + networks: + - coolify + labels: + traefik.enable: "true" + traefik.docker.network: coolify + traefik.http.routers.grafana.entrypoints: https + traefik.http.routers.grafana.rule: Host(`grafana.datardos.com`) + traefik.http.routers.grafana.tls: "true" + traefik.http.routers.grafana.tls.certresolver: letsencrypt + traefik.http.services.grafana.loadbalancer.server.port: "3000" + +volumes: + clickhouse_data: + grafana_data: + +networks: + coolify: + external: true diff --git a/deploy/grafana/provisioning/dashboards/dashboards.yml b/deploy/grafana/provisioning/dashboards/dashboards.yml new file mode 100644 index 0000000..c2fc6a9 --- /dev/null +++ b/deploy/grafana/provisioning/dashboards/dashboards.yml @@ -0,0 +1,13 @@ +apiVersion: 1 + +providers: + - name: media-stack + orgId: 1 + folder: Media Stack + type: file + disableDeletion: false + updateIntervalSeconds: 30 + allowUiUpdates: true + options: + path: /etc/grafana/provisioning/dashboards + foldersFromFilesStructure: false diff --git a/deploy/grafana/provisioning/dashboards/media_stack.json b/deploy/grafana/provisioning/dashboards/media_stack.json new file mode 100644 index 0000000..1591a23 --- /dev/null +++ b/deploy/grafana/provisioning/dashboards/media_stack.json @@ -0,0 +1,88 @@ +{ + "uid": "media-stack", + "title": "Media Stack Analytics", + "tags": ["media", "popelis"], + "timezone": "browser", + "schemaVersion": 39, + "version": 1, + "refresh": "5m", + "time": { "from": "now-7d", "to": "now" }, + "templating": { "list": [] }, + "annotations": { "list": [] }, + "panels": [ + { + "id": 1, "type": "stat", "title": "Jellyfin · items (último)", + "gridPos": { "h": 4, "w": 4, "x": 0, "y": 0 }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "table", "format": 1, "rawSql": "SELECT count() AS items FROM analytics.jellyfin_items WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM analytics.jellyfin_items)" } ] + }, + { + "id": 2, "type": "stat", "title": "Popelis · usuarios", + "gridPos": { "h": 4, "w": 4, "x": 4, "y": 0 }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "table", "format": 1, "rawSql": "SELECT count() AS users FROM analytics.popelis_users WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM analytics.popelis_users)" } ] + }, + { + "id": 3, "type": "stat", "title": "gnula · pendientes", + "gridPos": { "h": 4, "w": 4, "x": 8, "y": 0 }, + "fieldConfig": { "defaults": { "color": { "mode": "fixed", "fixedColor": "orange" } }, "overrides": [] }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "table", "format": 1, "rawSql": "SELECT countIf(status='pending') AS pending FROM analytics.gnula_movies WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM analytics.gnula_movies)" } ] + }, + { + "id": 4, "type": "stat", "title": "gnula · descargadas", + "gridPos": { "h": 4, "w": 4, "x": 12, "y": 0 }, + "fieldConfig": { "defaults": { "color": { "mode": "fixed", "fixedColor": "green" } }, "overrides": [] }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "table", "format": 1, "rawSql": "SELECT countIf(status='downloaded') AS downloaded FROM analytics.gnula_movies WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM analytics.gnula_movies)" } ] + }, + { + "id": 5, "type": "stat", "title": "*arr · grabs (total)", + "gridPos": { "h": 4, "w": 4, "x": 16, "y": 0 }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "table", "format": 1, "rawSql": "SELECT countIf(event_type='grabbed') AS grabs FROM analytics.arr_history WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM analytics.arr_history)" } ] + }, + { + "id": 6, "type": "stat", "title": "Jellyfin · sesiones activas (último)", + "gridPos": { "h": 4, "w": 4, "x": 20, "y": 0 }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "table", "format": 1, "rawSql": "SELECT count() AS sesiones FROM analytics.jellyfin_sessions WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM analytics.jellyfin_sessions)" } ] + }, + { + "id": 10, "type": "timeseries", "title": "gnula · catálogo en el tiempo", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 4 }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "timeseries", "format": 0, "rawSql": "SELECT snapshot_ts AS time, countIf(status='pending') AS pendientes, countIf(status='downloaded') AS descargadas, countIf(in_library=1) AS en_biblioteca FROM analytics.gnula_movies GROUP BY time ORDER BY time" } ] + }, + { + "id": 11, "type": "timeseries", "title": "Jellyfin · tamaño biblioteca en el tiempo", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 4 }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "timeseries", "format": 0, "rawSql": "SELECT snapshot_ts AS time, countIf(type='Movie') AS peliculas, countIf(type='Series') AS series, countIf(type='Episode') AS episodios FROM analytics.jellyfin_items GROUP BY time ORDER BY time" } ] + }, + { + "id": 20, "type": "table", "title": "*arr · grabs recientes", + "gridPos": { "h": 9, "w": 12, "x": 0, "y": 12 }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "table", "format": 1, "rawSql": "SELECT event_date, app, title, indexer, quality, arrayStringConcat(languages, ',') AS idiomas FROM analytics.arr_history WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM analytics.arr_history) AND event_type='grabbed' ORDER BY event_date DESC LIMIT 30" } ] + }, + { + "id": 21, "type": "table", "title": "Prowlarr · indexers", + "gridPos": { "h": 9, "w": 12, "x": 12, "y": 12 }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "table", "format": 1, "rawSql": "SELECT name, enable, protocol, num_grabs, num_queries, num_grab_fail, num_query_fail FROM analytics.prowlarr_indexers WHERE snapshot_ts = (SELECT max(snapshot_ts) FROM analytics.prowlarr_indexers) ORDER BY num_grabs DESC" } ] + }, + { + "id": 30, "type": "table", "title": "Popelis · eventos recientes", + "gridPos": { "h": 9, "w": 12, "x": 0, "y": 21 }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "table", "format": 1, "rawSql": "SELECT event_ts, username, event_type, item_id FROM analytics.popelis_events ORDER BY event_ts DESC LIMIT 50" } ] + }, + { + "id": 31, "type": "timeseries", "title": "Popelis · eventos por tipo (por día)", + "gridPos": { "h": 9, "w": 12, "x": 12, "y": 21 }, + "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, + "targets": [ { "refId": "A", "datasource": { "type": "grafana-clickhouse-datasource", "uid": "clickhouse" }, "editorType": "sql", "queryType": "timeseries", "format": 0, "rawSql": "SELECT toStartOfDay(event_ts) AS time, countIf(event_type='login') AS logins, countIf(event_type='mylist_add') AS mylist_add, countIf(event_type='user_created') AS altas FROM analytics.popelis_events GROUP BY time ORDER BY time" } ] + } + ] +} diff --git a/deploy/grafana/provisioning/datasources/clickhouse.yml b/deploy/grafana/provisioning/datasources/clickhouse.yml new file mode 100644 index 0000000..e4a82cf --- /dev/null +++ b/deploy/grafana/provisioning/datasources/clickhouse.yml @@ -0,0 +1,16 @@ +apiVersion: 1 + +datasources: + - name: ClickHouse + uid: clickhouse + type: grafana-clickhouse-datasource + access: proxy + isDefault: true + jsonData: + host: clickhouse + port: 9000 + protocol: native + username: analytics + defaultDatabase: analytics + secureJsonData: + password: ${CH_PASSWORD} diff --git a/etl_pc.py b/etl_pc.py new file mode 100644 index 0000000..8c40285 --- /dev/null +++ b/etl_pc.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 +"""ETL PC → ClickHouse (cada 5min). Extrae las fuentes que viven en este PC: +Jellyfin (catalogo, usuarios, reproduccion, sesiones), Radarr/Sonarr (history+queue), +Prowlarr (indexers), y el catalogo gnula (SQLite). Empuja snapshots con snapshot_ts a +ClickHouse del VPS a traves de un tunel SSH (CH HTTP escucha solo en 127.0.0.1 del VPS). + +Reusa funciones del registry: clickhouse_insert_rows_py_infra. +Secrets en ~/.config/popelis/analytics.env (chmod600; el timer no puede usar pass/GPG). + +Uso: python etl_pc.py [--once] [--dry] +""" +import json +import os +import re +import sqlite3 +import subprocess +import sys +import time +import urllib.request +import urllib.parse +import urllib.error +from datetime import datetime, timezone + +# --- registry --- +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "python", "functions")) +try: + from infra import clickhouse_insert_rows + if not callable(clickhouse_insert_rows): + raise ImportError +except ImportError: + from infra.clickhouse_insert_rows import clickhouse_insert_rows # noqa: E402 + +ENV_PATH = os.path.expanduser("~/.config/popelis/analytics.env") + + +def load_env(path): + cfg = {} + with open(path) as f: + for line in f: + line = line.strip() + if line and not line.startswith("#") and "=" in line: + k, v = line.split("=", 1) + cfg[k] = v + return cfg + + +CFG = load_env(ENV_PATH) +DRY = "--dry" in sys.argv +LOCAL_PORT = 18123 +SNAP = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + + +def http_json(url, headers=None, data=None, method="GET", timeout=20): + req = urllib.request.Request(url, data=data, headers=headers or {}, method=method) + with urllib.request.urlopen(req, timeout=timeout) as r: + return json.loads(r.read()) + + +def ticks_to_min(t): + try: + return round(float(t) / 600_000_000.0, 2) # 1 tick = 100ns + except Exception: + return 0.0 + + +def fmt_dt(s): + """Jellyfin/ISO → 'YYYY-MM-DD HH:MM:SS' o '' .""" + if not s: + return None + s = str(s).replace("Z", "").split(".")[0].replace("T", " ") + return s[:19] if len(s) >= 19 else None + + +# ---------- TUNEL SSH ---------- +class Tunnel: + def __init__(self, host, lport, rhost="127.0.0.1", rport=8123): + self.host, self.lport, self.rhost, self.rport = host, lport, rhost, rport + self.proc = None + + def __enter__(self): + self.proc = subprocess.Popen( + ["ssh", "-N", "-o", "ExitOnForwardFailure=yes", "-o", "ConnectTimeout=10", + "-L", f"{self.lport}:{self.rhost}:{self.rport}", self.host], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + # esperar puerto + for _ in range(30): + try: + urllib.request.urlopen(f"http://127.0.0.1:{self.lport}/ping", timeout=2).read() + return self + except Exception: + time.sleep(0.3) + raise RuntimeError("tunel SSH no abrio puerto local a tiempo") + + def __exit__(self, *a): + if self.proc: + self.proc.terminate() + + +# ---------- EXTRACTORES ---------- +def jf_auth(): + body = json.dumps({"Username": CFG["JF_USER"], "Pw": CFG["JF_PASS"]}).encode() + hdr = {"Content-Type": "application/json", + "X-Emby-Authorization": 'MediaBrowser Client="etl", Device="pc", DeviceId="etl-pc", Version="1.0"'} + d = http_json(f'{CFG["JF_URL"]}/Users/AuthenticateByName', hdr, body, "POST") + return d["AccessToken"] + + +def jf_get(token, path, params=""): + url = f'{CFG["JF_URL"]}{path}' + if params: + url += ("&" if "?" in path else "?") + params + return http_json(url, {"X-Emby-Token": token}, timeout=40) + + +def extract_jellyfin(): + out = {"jellyfin_items": [], "jellyfin_users": [], "jellyfin_user_items": [], + "jellyfin_sessions": []} + token = jf_auth() + # items + fields = "Genres,Path,RunTimeTicks,ProductionYear,CommunityRating,OfficialRating,DateCreated,SeriesName" + d = jf_get(token, "/Items", f"Recursive=true&IncludeItemTypes=Movie,Series,Episode&Fields={fields}") + for it in d.get("Items", []): + out["jellyfin_items"].append({ + "snapshot_ts": SNAP, "item_id": it.get("Id", ""), "type": it.get("Type", ""), + "name": it.get("Name", ""), "production_year": it.get("ProductionYear") or 0, + "runtime_min": ticks_to_min(it.get("RunTimeTicks", 0)), + "genres": it.get("Genres", []) or [], + "community_rating": it.get("CommunityRating") or 0.0, + "official_rating": it.get("OfficialRating", "") or "", + "series_name": it.get("SeriesName", "") or "", "library": "", + "path": it.get("Path", "") or "", + "date_created": fmt_dt(it.get("DateCreated")) or "1970-01-01 00:00:00", + }) + # users + users = jf_get(token, "/Users") + for u in users: + out["jellyfin_users"].append({ + "snapshot_ts": SNAP, "user_id": u.get("Id", ""), "name": u.get("Name", ""), + "last_login": fmt_dt(u.get("LastLoginDate")) or "1970-01-01 00:00:00", + "last_activity": fmt_dt(u.get("LastActivityDate")) or "1970-01-01 00:00:00", + "is_admin": 1 if u.get("Policy", {}).get("IsAdministrator") else 0, + }) + # played items por usuario (Movie+Episode vistos) + try: + pi = jf_get(token, f'/Users/{u["Id"]}/Items', + "Recursive=true&IncludeItemTypes=Movie,Episode&IsPlayed=true&Fields=UserData") + for it in pi.get("Items", []): + ud = it.get("UserData", {}) or {} + out["jellyfin_user_items"].append({ + "snapshot_ts": SNAP, "user_id": u.get("Id", ""), "user_name": u.get("Name", ""), + "item_id": it.get("Id", ""), "item_name": it.get("Name", ""), + "type": it.get("Type", ""), "played": 1 if ud.get("Played") else 0, + "play_count": ud.get("PlayCount", 0) or 0, + "playback_pct": round(ud.get("PlayedPercentage", 0.0) or 0.0, 2), + "last_played": fmt_dt(ud.get("LastPlayedDate")) or "1970-01-01 00:00:00", + }) + except Exception as e: + print(f"[jf] user_items {u.get('Name')}: {e}", file=sys.stderr) + # sesiones activas + try: + for s in jf_get(token, "/Sessions"): + np = s.get("NowPlayingItem") + if not np: + continue + ps = s.get("PlayState", {}) or {} + pos = ticks_to_min(ps.get("PositionTicks", 0)) + dur = ticks_to_min(np.get("RunTimeTicks", 0)) or 1 + out["jellyfin_sessions"].append({ + "snapshot_ts": SNAP, "user_name": s.get("UserName", ""), + "item_name": np.get("Name", ""), "item_type": np.get("Type", ""), + "client": s.get("Client", ""), "device": s.get("DeviceName", ""), + "play_method": ps.get("PlayMethod", ""), "is_paused": 1 if ps.get("IsPaused") else 0, + "position_pct": round(100.0 * pos / dur, 2), + }) + except Exception as e: + print(f"[jf] sessions: {e}", file=sys.stderr) + return out + + +def arr_get(base, key, path, ver="v3"): + sep = "&" if "?" in path else "?" + return http_json(f"{base}/api/{ver}/{path}{sep}apikey={key}", timeout=30) + + +def extract_arr(): + out = {"arr_history": [], "arr_queue": []} + for app, base, key in [("radarr", CFG["RADARR_URL"], CFG["RADARR_KEY"]), + ("sonarr", CFG["SONARR_URL"], CFG["SONARR_KEY"])]: + try: + h = arr_get(base, key, "history?page=1&pageSize=200&sortKey=date&sortDirection=descending") + for r in h.get("records", []): + out["arr_history"].append({ + "snapshot_ts": SNAP, "app": app, "history_id": r.get("id", 0), + "event_type": r.get("eventType", ""), + "title": (r.get("movie", {}) or r.get("series", {}) or {}).get("title", "") or "", + "source_title": r.get("sourceTitle", "") or "", + "indexer": (r.get("data", {}) or {}).get("indexer", "") or "", + "download_client": (r.get("data", {}) or {}).get("downloadClient", "") or "", + "quality": (r.get("quality", {}) or {}).get("quality", {}).get("name", "") or "", + "languages": [l.get("name", "") for l in (r.get("languages", []) or [])], + "event_date": fmt_dt(r.get("date")) or "1970-01-01 00:00:00", + }) + except Exception as e: + print(f"[arr] {app} history: {e}", file=sys.stderr) + try: + q = arr_get(base, key, "queue?page=1&pageSize=100") + for r in q.get("records", []): + out["arr_queue"].append({ + "snapshot_ts": SNAP, "app": app, "title": r.get("title", "") or "", + "status": r.get("status", "") or "", + "tracked_status": r.get("trackedDownloadState", "") or "", + "size_bytes": int(r.get("size", 0) or 0), + "sizeleft_bytes": int(r.get("sizeleft", 0) or 0), + "timeleft": r.get("timeleft", "") or "", + "indexer": r.get("indexer", "") or "", + "download_client": r.get("downloadClient", "") or "", + }) + except Exception as e: + print(f"[arr] {app} queue: {e}", file=sys.stderr) + return out + + +def extract_prowlarr(): + out = {"prowlarr_indexers": []} + try: + idx = arr_get(CFG["PROWLARR_URL"], CFG["PROWLARR_KEY"], "indexer", ver="v1") + stats = {} + try: + st = arr_get(CFG["PROWLARR_URL"], CFG["PROWLARR_KEY"], "indexerstats", ver="v1") + for s in st.get("indexers", []): + stats[s.get("indexerId")] = s + except Exception: + pass + for i in idx: + s = stats.get(i.get("id"), {}) + out["prowlarr_indexers"].append({ + "snapshot_ts": SNAP, "indexer_id": i.get("id", 0), "name": i.get("name", "") or "", + "enable": 1 if i.get("enable") else 0, "protocol": i.get("protocol", "") or "", + "privacy": i.get("privacy", "") or "", + "num_grabs": s.get("numberOfGrabs", 0) or 0, + "num_queries": s.get("numberOfQueries", 0) or 0, + "num_grab_fail": s.get("numberOfFailedGrabs", 0) or 0, + "num_query_fail": s.get("numberOfFailedQueries", 0) or 0, + }) + except Exception as e: + print(f"[prowlarr] {e}", file=sys.stderr) + return out + + +def extract_gnula(): + out = {"gnula_movies": []} + db = CFG.get("GNULA_DB", "") + if not (db and os.path.exists(db)): + return out + c = sqlite3.connect(db) + c.row_factory = sqlite3.Row + for r in c.execute("SELECT href,title,year,flags,lang_es,status,in_library,detected_at,downloaded_at FROM movies"): + out["gnula_movies"].append({ + "snapshot_ts": SNAP, "href": r["href"] or "", "title": r["title"] or "", + "year": r["year"] or 0, "flags": r["flags"] or "", "lang_es": r["lang_es"] or 0, + "status": r["status"] or "", "in_library": r["in_library"] or 0, + "detected_at": r["detected_at"] or "", "downloaded_at": r["downloaded_at"] or "", + }) + c.close() + return out + + +def main(): + data = {} + for fn in (extract_jellyfin, extract_arr, extract_prowlarr, extract_gnula): + try: + data.update(fn()) + except Exception as e: + print(f"[etl] {fn.__name__} FALLO: {e}", file=sys.stderr) + counts = {t: len(rows) for t, rows in data.items()} + print(f"[etl] snapshot {SNAP} extraido: {json.dumps(counts)}") + if DRY: + print("[etl] --dry: no se inserta"); return + base = f"http://127.0.0.1:{LOCAL_PORT}" + total = 0 + with Tunnel(CFG["SSH_HOST"], LOCAL_PORT): + for table, rows in data.items(): + if not rows: + continue + try: + n = clickhouse_insert_rows(base, f'{CFG["CH_DB"]}.{table}', rows, + user=CFG["CH_USER"], password=CFG["CH_PASSWORD"], + database=CFG["CH_DB"]) + total += n + except Exception as e: + print(f"[etl] insert {table} FALLO: {e}", file=sys.stderr) + print(json.dumps({"snapshot_ts": SNAP, "inserted": total, "tables": counts})) + + +if __name__ == "__main__": + main() diff --git a/etl_vps.py b/etl_vps.py new file mode 100644 index 0000000..da50098 --- /dev/null +++ b/etl_vps.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +"""ETL VPS → ClickHouse (cada 5min). Corre EN el VPS datardos (no en el PC) porque +popelis-db (Postgres) solo es alcanzable en la red coolify. Lee popelis-db via +`docker exec popelis-db psql` y empuja a ClickHouse por su HTTP local (127.0.0.1:8123). + +Standalone a proposito: el VPS no tiene el registry fn_registry checkouteado, asi que +no importa clickhouse_insert_rows_py_infra — replica el POST JSONEachRow minimal. + +- users, mylist: snapshot completo cada run (snapshot_ts). +- events: incremental por id (> max(event_id) ya en CH; ReplacingMergeTree dedup). + +Lee creds de /opt/analytics/.env (CH_PASSWORD). Pensado para systemd timer en el VPS. +Uso: sudo python3 etl_vps.py +""" +import json +import subprocess +import sys +import urllib.request +import urllib.parse +import urllib.error +from datetime import datetime, timezone + +ENV = "/opt/analytics/.env" +CH_URL = "http://127.0.0.1:8123" +CH_USER = "analytics" +CH_DB = "analytics" +PG_CONTAINER = "popelis-db" +PG_USER = "popelis" +PG_DB = "popelis" +SNAP = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") + + +def ch_password(): + with open(ENV) as f: + for line in f: + if line.startswith("CH_PASSWORD="): + return line.strip().split("=", 1)[1] + raise RuntimeError("CH_PASSWORD no en " + ENV) + + +CH_PASS = ch_password() + + +def pg_json(sql): + """Ejecuta SQL en popelis-db y devuelve list[dict] (via json_agg).""" + wrapped = f"SELECT COALESCE(json_agg(t), '[]') FROM ({sql}) t" + out = subprocess.check_output( + ["docker", "exec", PG_CONTAINER, "psql", "-U", PG_USER, "-d", PG_DB, + "-tAc", wrapped], text=True) + return json.loads(out.strip() or "[]") + + +def ch_query(sql): + url = f"{CH_URL}/?{urllib.parse.urlencode({'database': CH_DB, 'default_format': 'JSONEachRow'})}" + req = urllib.request.Request(url, data=sql.encode(), + headers={"X-ClickHouse-User": CH_USER, "X-ClickHouse-Key": CH_PASS}) + with urllib.request.urlopen(req, timeout=30) as r: + body = r.read().decode() + return [json.loads(l) for l in body.splitlines() if l.strip()] + + +def ch_insert(table, rows): + if not rows: + return 0 + body = "\n".join(json.dumps(r, separators=(",", ":")) for r in rows).encode() + q = f"INSERT INTO {CH_DB}.{table} FORMAT JSONEachRow" + url = f"{CH_URL}/?{urllib.parse.urlencode({'database': CH_DB, 'query': q})}" + req = urllib.request.Request(url, data=body, + headers={"X-ClickHouse-User": CH_USER, "X-ClickHouse-Key": CH_PASS, + "Content-Type": "text/plain"}) + try: + urllib.request.urlopen(req, timeout=30).read() + except urllib.error.HTTPError as e: + raise ValueError(f"CH {e.code}: {e.read()[:300]}") + return len(rows) + + +def fmt_ts(s): + if not s: + return "1970-01-01 00:00:00" + return str(s).replace("T", " ").split(".")[0].split("+")[0][:19] + + +def main(): + total = 0 + # users (snapshot) + users = pg_json("SELECT id AS user_id, username, jf_user_id, " + "to_char(COALESCE(created_at, now()),'YYYY-MM-DD HH24:MI:SS') AS created_at " + "FROM users") if has_col("users", "created_at") else \ + pg_json("SELECT id AS user_id, username, jf_user_id FROM users") + for u in users: + u["snapshot_ts"] = SNAP + u.setdefault("created_at", "1970-01-01 00:00:00") + u["created_at"] = fmt_ts(u["created_at"]) + total += ch_insert("popelis_users", users) + + # mylist (snapshot) + ml = pg_json("SELECT user_id, item_id, " + "to_char(COALESCE(added_at, now()),'YYYY-MM-DD HH24:MI:SS') AS added_at FROM mylist") \ + if has_col("mylist", "added_at") else \ + pg_json("SELECT user_id, item_id FROM mylist") + for m in ml: + m["snapshot_ts"] = SNAP + m.setdefault("added_at", "1970-01-01 00:00:00") + m["added_at"] = fmt_ts(m["added_at"]) + total += ch_insert("popelis_mylist", ml) + + # events (incremental por id) + last = 0 + try: + r = ch_query("SELECT max(event_id) AS m FROM popelis_events") + last = int(r[0]["m"]) if r and r[0].get("m") not in (None, "") else 0 + except Exception: + last = 0 + ev = pg_json(f"SELECT id AS event_id, " + f"to_char(ts,'YYYY-MM-DD HH24:MI:SS') AS event_ts, " + f"COALESCE(user_id,0) AS user_id, username, event_type, item_id, meta " + f"FROM events WHERE id > {last} ORDER BY id") + for e in ev: + e["event_ts"] = fmt_ts(e["event_ts"]) + total += ch_insert("popelis_events", ev) + + print(json.dumps({"snapshot_ts": SNAP, "users": len(users), "mylist": len(ml), + "events_new": len(ev), "inserted": total})) + + +def has_col(table, col): + try: + out = subprocess.check_output( + ["docker", "exec", PG_CONTAINER, "psql", "-U", PG_USER, "-d", PG_DB, "-tAc", + f"SELECT 1 FROM information_schema.columns WHERE table_name='{table}' AND column_name='{col}'"], + text=True).strip() + return out == "1" + except Exception: + return False + + +if __name__ == "__main__": + main()