#!/usr/bin/env python3 """Execute the analysis queries against BigQuery via Metabase (db=6). Saves results as CSV + JSON in data/results/ and prints summary numbers. ADC-free: uses Metabase service account credentials. """ import os import sys import json import csv from pathlib import Path sys.path.insert(0, os.path.join(os.environ.get("FN_REGISTRY_ROOT", "/home/egutierrez/fn_registry"), "python", "functions")) import subprocess import httpx API_KEY = subprocess.check_output(["pass", "show", "metabase/aurgi-api-key"], text=True).strip().splitlines()[0] BASE = "https://reports.autingo.es" DB_ID = 6 HERE = Path(__file__).parent OUT = HERE / "data" / "results" OUT.mkdir(parents=True, exist_ok=True) PROJECT = "autingo-159109" DATASET = "psql_dcpublic" WINDOW_DAYS = 90 REGEN_WINDOW_DAYS = 60 client = httpx.Client( base_url=BASE, headers={"x-api-key": API_KEY, "Content-Type": "application/json"}, timeout=300.0, ) def run_sql(sql: str) -> tuple[list[str], list[list]]: payload = { "type": "native", "database": DB_ID, "native": {"query": sql}, } r = client.post("/api/dataset", json=payload) r.raise_for_status() data = r.json()["data"] cols = [c["display_name"] for c in data["cols"]] rows = data["rows"] return cols, rows def save(name: str, cols: list[str], rows: list[list]) -> None: csv_path = OUT / f"{name}.csv" with csv_path.open("w", newline="") as f: w = csv.writer(f) w.writerow(cols) w.writerows(rows) json_path = OUT / f"{name}.json" with json_path.open("w") as f: json.dump({"cols": cols, "rows": rows}, f, indent=2, default=str) print(f" -> {csv_path.name} ({len(rows)} rows)") # ===================================================================== # QUERY 0 — Sanity: usuarios call_center # ===================================================================== print("\n[Q0] usuarios call_center") cols, rows = run_sql(f""" SELECT COUNT(DISTINCT u.id) AS users_totales, SUM(CASE WHEN u.is_active THEN 1 ELSE 0 END) AS users_activos FROM `{PROJECT}.{DATASET}.tpv_authorization_tpvuser` u JOIN `{PROJECT}.{DATASET}.tpv_authorization_tpvuser_centers` uc ON u.id = uc.tpvuser_id WHERE uc.dccenter_id IN (159, 162) """) save("00_users_callcenter", cols, rows) print(f" {dict(zip(cols, rows[0]))}") # ===================================================================== # QUERY 1 — Conversion rate global por origen # ===================================================================== print(f"\n[Q1] conversion global ({WINDOW_DAYS}d)") cols, rows = run_sql(f""" WITH cc_users AS ( SELECT DISTINCT tpvuser_id AS user_id FROM `{PROJECT}.{DATASET}.tpv_authorization_tpvuser_centers` WHERE dccenter_id IN (159, 162) ) SELECT CASE WHEN cc.user_id IS NOT NULL THEN 'call_center' ELSE 'otro' END AS origen, COUNT(*) AS quotes, SUM(CASE WHEN i.id IS NOT NULL THEN 1 ELSE 0 END) AS convertidos, ROUND(SAFE_DIVIDE(SUM(CASE WHEN i.id IS NOT NULL THEN 1 ELSE 0 END), COUNT(*)), 4) AS conv_rate FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q LEFT JOIN cc_users cc ON q.created_by_id = cc.user_id LEFT JOIN `{PROJECT}.{DATASET}.tpv_orders_invoice` i ON q.order_id = i.order_id WHERE q.created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {WINDOW_DAYS} DAY) AND q.deleted_at IS NULL GROUP BY 1 ORDER BY 1 """) save("01_conversion_origen", cols, rows) for r in rows: print(f" {dict(zip(cols, r))}") # ===================================================================== # QUERY 2 — 3 KPI por centro (A, B, C) # ===================================================================== print(f"\n[Q2] KPI A/B/C por centro ({WINDOW_DAYS}d)") cols, rows = run_sql(f""" DECLARE t_start TIMESTAMP DEFAULT TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {WINDOW_DAYS} DAY); WITH cc_users AS ( SELECT DISTINCT tpvuser_id AS user_id FROM `{PROJECT}.{DATASET}.tpv_authorization_tpvuser_centers` WHERE dccenter_id IN (159, 162) ), cc_converted AS ( SELECT q.id AS quote_id, q.order_id, o.customer_id, o.vehicle_id, o.terminal_id, t.center_id, o.total_cost FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q JOIN cc_users cc ON q.created_by_id = cc.user_id JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON q.order_id = o.id JOIN `{PROJECT}.{DATASET}.tpv_orders_invoice` i ON i.order_id = o.id LEFT JOIN `{PROJECT}.{DATASET}.tpv_terminals` t ON o.terminal_id = t.id WHERE q.created_at >= t_start AND q.deleted_at IS NULL ), cc_clients AS ( SELECT DISTINCT customer_id, vehicle_id FROM cc_converted WHERE customer_id IS NOT NULL ), all_invoices AS ( SELECT i.id AS invoice_id, i.order_id, o.customer_id, o.vehicle_id, o.terminal_id, t.center_id, o.total_cost FROM `{PROJECT}.{DATASET}.tpv_orders_invoice` i JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON i.order_id = o.id LEFT JOIN `{PROJECT}.{DATASET}.tpv_terminals` t ON o.terminal_id = t.id WHERE i.created_at >= t_start ), client_invoices AS ( SELECT ai.* FROM all_invoices ai JOIN cc_clients cc ON ai.customer_id = cc.customer_id AND ai.vehicle_id = cc.vehicle_id WHERE ai.center_id NOT IN (159, 162) ), kpi_a AS ( SELECT center_id, COUNT(DISTINCT quote_id) AS quotes_cc_facturados, ROUND(SUM(total_cost), 2) AS A_eur FROM cc_converted WHERE center_id IS NOT NULL AND center_id NOT IN (159,162) GROUP BY center_id ), kpi_b AS ( SELECT center_id, COUNT(DISTINCT invoice_id) AS invoices_b, ROUND(SUM(total_cost), 2) AS B_eur FROM client_invoices GROUP BY center_id ), kpi_c AS ( SELECT center_id, COUNT(DISTINCT invoice_id) AS invoices_c, ROUND(SUM(total_cost), 2) AS C_eur FROM all_invoices WHERE center_id IS NOT NULL AND center_id NOT IN (159,162) GROUP BY center_id ) SELECT c.id AS center_id, c.name AS center_name, COALESCE(a.quotes_cc_facturados, 0) AS quotes_cc_facturados, COALESCE(a.A_eur, 0) AS A_quote_cc_eur, COALESCE(b.B_eur, 0) AS B_mismo_cliente_eur, COALESCE(c2.C_eur, 0) AS C_total_centro_eur, ROUND(SAFE_DIVIDE(COALESCE(a.A_eur, 0), c2.C_eur), 4) AS A_sobre_C, ROUND(SAFE_DIVIDE(COALESCE(b.B_eur, 0), c2.C_eur), 4) AS B_sobre_C, ROUND(SAFE_DIVIDE(COALESCE(b.B_eur, 0), NULLIF(a.A_eur, 0)), 2) AS lift_B_vs_A FROM `{PROJECT}.{DATASET}.centers` c LEFT JOIN kpi_a a ON c.id = a.center_id LEFT JOIN kpi_b b ON c.id = b.center_id LEFT JOIN kpi_c c2 ON c.id = c2.center_id WHERE COALESCE(c2.C_eur, 0) > 0 ORDER BY C_total_centro_eur DESC """) save("02_kpi_3_por_centro", cols, rows) print(f" centros activos: {len(rows)}") print(f" top5:") for r in rows[:5]: print(f" {r[1]:30} A={r[3]:>12,.0f} B={r[4]:>12,.0f} C={r[5]:>12,.0f}") # Totales globales A_total = sum(r[3] for r in rows) B_total = sum(r[4] for r in rows) C_total = sum(r[5] for r in rows) print(f"\n TOTALES ({WINDOW_DAYS}d, centros sin call_center 159/162):") print(f" A (€ quote cc facturados): {A_total:>15,.2f}") print(f" B (€ mismo cliente centro): {B_total:>15,.2f}") print(f" C (€ total centros): {C_total:>15,.2f}") print(f" A/C = {A_total/C_total:.4f} B/C = {B_total/C_total:.4f} lift B/A = {B_total/A_total:.2f}x") with (OUT / "totales_globales.json").open("w") as f: json.dump({ "window_days": WINDOW_DAYS, "A_quote_cc_eur": round(A_total, 2), "B_mismo_cliente_eur": round(B_total, 2), "C_total_centros_eur": round(C_total, 2), "A_sobre_C": round(A_total / C_total, 4), "B_sobre_C": round(B_total / C_total, 4), "lift_B_vs_A": round(B_total / A_total, 2), "centros_activos": len(rows), }, f, indent=2) # ===================================================================== # QUERY 3 — Regeneración por centro # ===================================================================== print(f"\n[Q3] regeneración por centro ({WINDOW_DAYS}d Q0, {REGEN_WINDOW_DAYS}d window)") cols, rows = run_sql(f""" DECLARE t_start TIMESTAMP DEFAULT TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {WINDOW_DAYS} DAY); WITH cc_users AS ( SELECT DISTINCT tpvuser_id AS user_id FROM `{PROJECT}.{DATASET}.tpv_authorization_tpvuser_centers` WHERE dccenter_id IN (159, 162) ), q0 AS ( SELECT q.id AS q0_id, q.order_id AS q0_order, q.created_at AS q0_ts, o.customer_id, o.vehicle_id FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q JOIN cc_users cc ON q.created_by_id = cc.user_id JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON q.order_id = o.id WHERE q.created_at >= t_start AND q.deleted_at IS NULL AND o.customer_id IS NOT NULL AND o.vehicle_id IS NOT NULL ), qN AS ( SELECT q.id AS qn_id, q.order_id AS qn_order, q.created_at AS qn_ts, o.customer_id, o.vehicle_id, t.center_id FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON q.order_id = o.id LEFT JOIN `{PROJECT}.{DATASET}.tpv_terminals` t ON o.terminal_id = t.id WHERE q.deleted_at IS NULL AND t.center_id IS NOT NULL AND t.center_id NOT IN (159,162) ), regen AS ( SELECT q0.q0_id, q0.q0_order, q0.customer_id, q0.vehicle_id, qN.qn_id, qN.qn_order, qN.center_id AS regen_center, TIMESTAMP_DIFF(qN.qn_ts, q0.q0_ts, HOUR) / 24 AS dias_entre FROM q0 JOIN qN ON q0.customer_id = qN.customer_id AND q0.vehicle_id = qN.vehicle_id AND qN.qn_ts > q0.q0_ts AND qN.qn_ts <= TIMESTAMP_ADD(q0.q0_ts, INTERVAL {REGEN_WINDOW_DAYS} DAY) AND qN.qn_order != q0.q0_order ) SELECT c.id AS center_id, c.name AS center_name, COUNT(DISTINCT r.q0_id) AS q0_regenerados_aqui, COUNT(*) AS regen_events, ROUND(AVG(r.dias_entre), 1) AS dias_avg_regen FROM regen r JOIN `{PROJECT}.{DATASET}.centers` c ON r.regen_center = c.id GROUP BY c.id, c.name ORDER BY q0_regenerados_aqui DESC LIMIT 30 """) save("03_regen_por_centro", cols, rows) print(f" centros con regeneración: {len(rows)}") print(f" top5:") for r in rows[:5]: print(f" {r[1]:30} q0={r[2]:>5} events={r[3]:>5} dias_avg={r[4]}") # ===================================================================== # QUERY 4 — Totales Q0 con / sin regeneración # ===================================================================== print(f"\n[Q4] Q0 con/sin regeneración") cols, rows = run_sql(f""" DECLARE t_start TIMESTAMP DEFAULT TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {WINDOW_DAYS} DAY); WITH cc_users AS ( SELECT DISTINCT tpvuser_id AS user_id FROM `{PROJECT}.{DATASET}.tpv_authorization_tpvuser_centers` WHERE dccenter_id IN (159, 162) ), q0 AS ( SELECT q.id AS q0_id, q.order_id AS q0_order, q.created_at AS q0_ts, o.customer_id, o.vehicle_id FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q JOIN cc_users cc ON q.created_by_id = cc.user_id JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON q.order_id = o.id WHERE q.created_at >= t_start AND q.deleted_at IS NULL AND o.customer_id IS NOT NULL AND o.vehicle_id IS NOT NULL ), qN AS ( SELECT q.order_id AS qn_order, q.created_at AS qn_ts, o.customer_id, o.vehicle_id, t.center_id FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON q.order_id = o.id LEFT JOIN `{PROJECT}.{DATASET}.tpv_terminals` t ON o.terminal_id = t.id WHERE q.deleted_at IS NULL AND t.center_id IS NOT NULL AND t.center_id NOT IN (159,162) ), regen AS ( SELECT DISTINCT q0.q0_id FROM q0 JOIN qN ON q0.customer_id = qN.customer_id AND q0.vehicle_id = qN.vehicle_id AND qN.qn_ts > q0.q0_ts AND qN.qn_ts <= TIMESTAMP_ADD(q0.q0_ts, INTERVAL {REGEN_WINDOW_DAYS} DAY) AND qN.qn_order != q0.q0_order ), q0_inv AS ( SELECT q0.q0_id, CASE WHEN i.id IS NOT NULL THEN 1 ELSE 0 END AS q0_factura FROM q0 LEFT JOIN `{PROJECT}.{DATASET}.tpv_orders_invoice` i ON i.order_id = q0.q0_order ) SELECT CASE WHEN r.q0_id IS NOT NULL THEN 'regenerado' ELSE 'no_regenerado' END AS bucket, COUNT(*) AS q0_total, SUM(qi.q0_factura) AS q0_facturado_propio, ROUND(SAFE_DIVIDE(SUM(qi.q0_factura), COUNT(*)), 4) AS conv_q0_propio FROM q0_inv qi LEFT JOIN regen r USING (q0_id) GROUP BY bucket ORDER BY bucket """) save("04_regen_vs_conversion", cols, rows) for r in rows: print(f" {dict(zip(cols, r))}") print("\nDONE — resultados en data/results/") client.close()