Files
presupuestos_callcenter/run_via_metabase.py
T
2026-05-21 18:26:30 +02:00

344 lines
12 KiB
Python

#!/usr/bin/env python3
"""Execute the analysis queries against BigQuery via Metabase (db=6).
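Saves each result set as CSV + JSON in data/results/ and prints summary numbers.
ADC-free: no local Google credentials are needed; the script authenticates to
Metabase with an API key and Metabase queries BigQuery with its own service
account credentials.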
"""
import os
import sys
import json
import csv
from pathlib import Path
sys.path.insert(0, os.path.join(os.environ.get("FN_REGISTRY_ROOT", "/home/egutierrez/fn_registry"), "python", "functions"))
import subprocess
import httpx
# --- Metabase connection ----------------------------------------------
# The API key is read from the `pass` password store; only the first line
# of the entry is kept.
API_KEY = (
    subprocess.check_output(["pass", "show", "metabase/aurgi-api-key"], text=True)
    .strip()
    .splitlines()[0]
)
BASE = "https://reports.autingo.es"
DB_ID = 6  # database id sent in every /api/dataset payload

# --- Output directory: <script dir>/data/results, created on import ----
HERE = Path(__file__).parent
OUT = HERE.joinpath("data", "results")
OUT.mkdir(parents=True, exist_ok=True)

# --- BigQuery namespace interpolated into the table names of each query -
PROJECT = "autingo-159109"
DATASET = "psql_dcpublic"

# --- Analysis windows (days) -------------------------------------------
WINDOW_DAYS = 90        # look-back window applied to quotes / invoices
REGEN_WINDOW_DAYS = 60  # max gap after a call-center quote for a later quote to count as regeneration

# One shared HTTP client for the whole run; the generous timeout leaves
# room for slow queries.
client = httpx.Client(
    timeout=300.0,
    headers={"x-api-key": API_KEY, "Content-Type": "application/json"},
    base_url=BASE,
)
def run_sql(sql: str) -> tuple[list[str], list[list]]:
    """Run a native SQL query through Metabase and return its result set.

    The query is posted to ``/api/dataset`` for database ``DB_ID`` using the
    module-level ``client``.

    Args:
        sql: Native query text, sent as-is.

    Returns:
        ``(cols, rows)``: the column display names and the list of result rows.

    Raises:
        httpx.HTTPStatusError: The response carries an HTTP error status.
        RuntimeError: Metabase answered but flagged the query as failed.
    """
    payload = {
        "type": "native",
        "database": DB_ID,
        "native": {"query": sql},
    }
    response = client.post("/api/dataset", json=payload)
    response.raise_for_status()
    body = response.json()
    # Metabase can report a failed query inside the response body (an
    # "error" message / status "failed") while still answering with a 2xx
    # status, so raise_for_status() alone would let an empty result through.
    if body.get("error") or body.get("status") == "failed":
        raise RuntimeError(f"Metabase query failed: {body.get('error')}")
    data = body["data"]
    cols = [col["display_name"] for col in data["cols"]]
    rows = data["rows"]
    return cols, rows
def save(name: str, cols: list[str], rows: list[list]) -> None:
    """Write a result set to ``OUT`` as ``<name>.csv`` and ``<name>.json``.

    Args:
        name: File stem shared by both output files.
        cols: Column names; written as the CSV header row.
        rows: Result rows, one list per row.

    The CSV holds the header followed by the rows; the JSON holds
    ``{"cols": ..., "rows": ...}``. A one-line summary is printed afterwards.
    """
    csv_path = OUT / f"{name}.csv"
    # newline="" is what the csv module expects; the explicit UTF-8 keeps
    # non-ASCII values from depending on the machine's locale encoding.
    with csv_path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(cols)
        writer.writerows(rows)
    json_path = OUT / f"{name}.json"
    with json_path.open("w", encoding="utf-8") as f:
        # default=str stringifies any value json cannot encode natively.
        json.dump({"cols": cols, "rows": rows}, f, indent=2, default=str)
    print(f" -> {csv_path.name} ({len(rows)} rows)")
# =====================================================================
# QUERY 0 — Sanity check: call-center users
# =====================================================================
# Counts the users linked to centers 159 / 162 and how many of them are
# active. The join produces one row per (user, center) link, so a user
# linked to both centers shows up twice: both aggregates count DISTINCT
# user ids so that users_activos can never exceed users_totales.
print("\n[Q0] usuarios call_center")
cols, rows = run_sql(f"""
SELECT
COUNT(DISTINCT u.id) AS users_totales,
COUNT(DISTINCT CASE WHEN u.is_active THEN u.id END) AS users_activos
FROM `{PROJECT}.{DATASET}.tpv_authorization_tpvuser` u
JOIN `{PROJECT}.{DATASET}.tpv_authorization_tpvuser_centers` uc
ON u.id = uc.tpvuser_id
WHERE uc.dccenter_id IN (159, 162)
""")
save("00_users_callcenter", cols, rows)
print(f" {dict(zip(cols, rows[0]))}")
# =====================================================================
# QUERY 1 — Overall conversion rate by origin
# =====================================================================
# Splits the non-deleted quotes of the last WINDOW_DAYS days into
# 'call_center' (creator linked to center 159 / 162) and 'otro', and
# reports how many have an invoice on the same order.
# The invoice LEFT JOIN would repeat a quote once per invoice row if an
# order ever carries more than one invoice, so quotes are counted by
# DISTINCT q.id; with at most one invoice per order the numbers are the
# same as a plain COUNT(*).
print(f"\n[Q1] conversion global ({WINDOW_DAYS}d)")
cols, rows = run_sql(f"""
WITH cc_users AS (
SELECT DISTINCT tpvuser_id AS user_id
FROM `{PROJECT}.{DATASET}.tpv_authorization_tpvuser_centers`
WHERE dccenter_id IN (159, 162)
)
SELECT
CASE WHEN cc.user_id IS NOT NULL THEN 'call_center' ELSE 'otro' END AS origen,
COUNT(DISTINCT q.id) AS quotes,
COUNT(DISTINCT CASE WHEN i.id IS NOT NULL THEN q.id END) AS convertidos,
ROUND(SAFE_DIVIDE(COUNT(DISTINCT CASE WHEN i.id IS NOT NULL THEN q.id END), COUNT(DISTINCT q.id)), 4) AS conv_rate
FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q
LEFT JOIN cc_users cc ON q.created_by_id = cc.user_id
LEFT JOIN `{PROJECT}.{DATASET}.tpv_orders_invoice` i ON q.order_id = i.order_id
WHERE q.created_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {WINDOW_DAYS} DAY)
AND q.deleted_at IS NULL
GROUP BY 1
ORDER BY 1
""")
save("01_conversion_origen", cols, rows)
for r in rows:
    print(f" {dict(zip(cols, r))}")
# =====================================================================
# QUERY 2 — The 3 KPIs per center (A, B, C)
# =====================================================================
# Per center (call-center centers 159 / 162 excluded), over the last
# WINDOW_DAYS days:
#   A = order total of invoiced quotes created by call-center users
#   B = order total of invoices for the same (customer, vehicle) pairs
#   C = order total of all invoices of the center
# Only centers with C > 0 are returned, ordered by C descending.
print(f"\n[Q2] KPI A/B/C por centro ({WINDOW_DAYS}d)")
cols, rows = run_sql(f"""
DECLARE t_start TIMESTAMP DEFAULT TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {WINDOW_DAYS} DAY);
WITH
cc_users AS (
SELECT DISTINCT tpvuser_id AS user_id
FROM `{PROJECT}.{DATASET}.tpv_authorization_tpvuser_centers`
WHERE dccenter_id IN (159, 162)
),
cc_converted AS (
SELECT
q.id AS quote_id, q.order_id, o.customer_id, o.vehicle_id,
o.terminal_id, t.center_id, o.total_cost
FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q
JOIN cc_users cc ON q.created_by_id = cc.user_id
JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON q.order_id = o.id
JOIN `{PROJECT}.{DATASET}.tpv_orders_invoice` i ON i.order_id = o.id
LEFT JOIN `{PROJECT}.{DATASET}.tpv_terminals` t ON o.terminal_id = t.id
WHERE q.created_at >= t_start AND q.deleted_at IS NULL
),
cc_clients AS (
SELECT DISTINCT customer_id, vehicle_id
FROM cc_converted
WHERE customer_id IS NOT NULL
),
all_invoices AS (
SELECT
i.id AS invoice_id, i.order_id,
o.customer_id, o.vehicle_id, o.terminal_id,
t.center_id, o.total_cost
FROM `{PROJECT}.{DATASET}.tpv_orders_invoice` i
JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON i.order_id = o.id
LEFT JOIN `{PROJECT}.{DATASET}.tpv_terminals` t ON o.terminal_id = t.id
WHERE i.created_at >= t_start
),
client_invoices AS (
SELECT ai.*
FROM all_invoices ai
JOIN cc_clients cc
ON ai.customer_id = cc.customer_id
AND ai.vehicle_id = cc.vehicle_id
WHERE ai.center_id NOT IN (159, 162)
),
kpi_a AS (
SELECT center_id,
COUNT(DISTINCT quote_id) AS quotes_cc_facturados,
ROUND(SUM(total_cost), 2) AS A_eur
FROM cc_converted
WHERE center_id IS NOT NULL AND center_id NOT IN (159,162)
GROUP BY center_id
),
kpi_b AS (
SELECT center_id,
COUNT(DISTINCT invoice_id) AS invoices_b,
ROUND(SUM(total_cost), 2) AS B_eur
FROM client_invoices
GROUP BY center_id
),
kpi_c AS (
SELECT center_id,
COUNT(DISTINCT invoice_id) AS invoices_c,
ROUND(SUM(total_cost), 2) AS C_eur
FROM all_invoices
WHERE center_id IS NOT NULL AND center_id NOT IN (159,162)
GROUP BY center_id
)
SELECT
c.id AS center_id,
c.name AS center_name,
COALESCE(a.quotes_cc_facturados, 0) AS quotes_cc_facturados,
COALESCE(a.A_eur, 0) AS A_quote_cc_eur,
COALESCE(b.B_eur, 0) AS B_mismo_cliente_eur,
COALESCE(c2.C_eur, 0) AS C_total_centro_eur,
ROUND(SAFE_DIVIDE(COALESCE(a.A_eur, 0), c2.C_eur), 4) AS A_sobre_C,
ROUND(SAFE_DIVIDE(COALESCE(b.B_eur, 0), c2.C_eur), 4) AS B_sobre_C,
ROUND(SAFE_DIVIDE(COALESCE(b.B_eur, 0), NULLIF(a.A_eur, 0)), 2) AS lift_B_vs_A
FROM `{PROJECT}.{DATASET}.centers` c
LEFT JOIN kpi_a a ON c.id = a.center_id
LEFT JOIN kpi_b b ON c.id = b.center_id
LEFT JOIN kpi_c c2 ON c.id = c2.center_id
WHERE COALESCE(c2.C_eur, 0) > 0
ORDER BY C_total_centro_eur DESC
""")
save("02_kpi_3_por_centro", cols, rows)
print(f" centros activos: {len(rows)}")
print(f" top5:")
for r in rows[:5]:
    # Row layout follows the SELECT: [1]=center_name, [3]=A, [4]=B, [5]=C.
    print(f" {r[1]:30} A={r[3]:>12,.0f} B={r[4]:>12,.0f} C={r[5]:>12,.0f}")


def _safe_ratio(num, den):
    """Return num / den, or None when den is zero (ratio undefined)."""
    return num / den if den else None


def _fmt_ratio(value, spec, suffix=""):
    """Format a ratio with ``spec`` (+ suffix), or 'n/a' when it is None."""
    return "n/a" if value is None else f"{value:{spec}}{suffix}"


# Global totals over the centers returned above.
A_total = sum(r[3] for r in rows)
B_total = sum(r[4] for r in rows)
C_total = sum(r[5] for r in rows)
# C_total is 0 when the query returns no centers, and A_total is 0 when no
# call-center quote was invoiced in the window; guard the divisions so the
# script still writes its totals instead of dying with ZeroDivisionError.
a_over_c = _safe_ratio(A_total, C_total)
b_over_c = _safe_ratio(B_total, C_total)
lift_b_vs_a = _safe_ratio(B_total, A_total)
print(f"\n TOTALES ({WINDOW_DAYS}d, centros sin call_center 159/162):")
print(f" A (€ quote cc facturados): {A_total:>15,.2f}")
print(f" B (€ mismo cliente centro): {B_total:>15,.2f}")
print(f" C (€ total centros): {C_total:>15,.2f}")
print(
    f" A/C = {_fmt_ratio(a_over_c, '.4f')} B/C = {_fmt_ratio(b_over_c, '.4f')}"
    f" lift B/A = {_fmt_ratio(lift_b_vs_a, '.2f', 'x')}"
)
with (OUT / "totales_globales.json").open("w") as f:
    json.dump({
        "window_days": WINDOW_DAYS,
        "A_quote_cc_eur": round(A_total, 2),
        "B_mismo_cliente_eur": round(B_total, 2),
        "C_total_centros_eur": round(C_total, 2),
        # Undefined ratios are stored as null.
        "A_sobre_C": None if a_over_c is None else round(a_over_c, 4),
        "B_sobre_C": None if b_over_c is None else round(b_over_c, 4),
        "lift_B_vs_A": None if lift_b_vs_a is None else round(lift_b_vs_a, 2),
        "centros_activos": len(rows),
    }, f, indent=2)
# ---------------------------------------------------------------------
# QUERY 3 — Regeneration per center
# ---------------------------------------------------------------------
# q0    : non-deleted quotes created by call-center users in the last
#         WINDOW_DAYS days, with both customer and vehicle known.
# qN    : non-deleted quotes whose terminal belongs to a center other
#         than 159 / 162.
# regen : (q0, qN) pairs for the same customer + vehicle on a different
#         order, where qN was created after q0 and no later than
#         REGEN_WINDOW_DAYS days after it.
# The final SELECT keeps only the 30 centers with most regenerated q0
# (LIMIT 30), so the count printed below is capped at 30.
print(f"\n[Q3] regeneración por centro ({WINDOW_DAYS}d Q0, {REGEN_WINDOW_DAYS}d window)")
regen_by_center_sql = f"""
DECLARE t_start TIMESTAMP DEFAULT TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {WINDOW_DAYS} DAY);
WITH
cc_users AS (
SELECT DISTINCT tpvuser_id AS user_id
FROM `{PROJECT}.{DATASET}.tpv_authorization_tpvuser_centers`
WHERE dccenter_id IN (159, 162)
),
q0 AS (
SELECT q.id AS q0_id, q.order_id AS q0_order, q.created_at AS q0_ts,
o.customer_id, o.vehicle_id
FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q
JOIN cc_users cc ON q.created_by_id = cc.user_id
JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON q.order_id = o.id
WHERE q.created_at >= t_start AND q.deleted_at IS NULL
AND o.customer_id IS NOT NULL AND o.vehicle_id IS NOT NULL
),
qN AS (
SELECT q.id AS qn_id, q.order_id AS qn_order, q.created_at AS qn_ts,
o.customer_id, o.vehicle_id, t.center_id
FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q
JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON q.order_id = o.id
LEFT JOIN `{PROJECT}.{DATASET}.tpv_terminals` t ON o.terminal_id = t.id
WHERE q.deleted_at IS NULL
AND t.center_id IS NOT NULL AND t.center_id NOT IN (159,162)
),
regen AS (
SELECT q0.q0_id, q0.q0_order, q0.customer_id, q0.vehicle_id,
qN.qn_id, qN.qn_order, qN.center_id AS regen_center,
TIMESTAMP_DIFF(qN.qn_ts, q0.q0_ts, HOUR) / 24 AS dias_entre
FROM q0
JOIN qN
ON q0.customer_id = qN.customer_id
AND q0.vehicle_id = qN.vehicle_id
AND qN.qn_ts > q0.q0_ts
AND qN.qn_ts <= TIMESTAMP_ADD(q0.q0_ts, INTERVAL {REGEN_WINDOW_DAYS} DAY)
AND qN.qn_order != q0.q0_order
)
SELECT
c.id AS center_id,
c.name AS center_name,
COUNT(DISTINCT r.q0_id) AS q0_regenerados_aqui,
COUNT(*) AS regen_events,
ROUND(AVG(r.dias_entre), 1) AS dias_avg_regen
FROM regen r
JOIN `{PROJECT}.{DATASET}.centers` c ON r.regen_center = c.id
GROUP BY c.id, c.name
ORDER BY q0_regenerados_aqui DESC
LIMIT 30
"""
cols, rows = run_sql(regen_by_center_sql)
save("03_regen_por_centro", cols, rows)
print(f" centros con regeneración: {len(rows)}")
print(f" top5:")
# Row layout follows the SELECT: [1]=center_name, [2]=q0_regenerados_aqui,
# [3]=regen_events, [4]=dias_avg_regen.
top5_lines = [
    f" {r[1]:30} q0={r[2]:>5} events={r[3]:>5} dias_avg={r[4]}"
    for r in rows[:5]
]
for line in top5_lines:
    print(line)
# =====================================================================
# QUERY 4 — Q0 totals with / without regeneration
# =====================================================================
# Buckets every call-center quote (q0) by whether a later quote for the
# same customer + vehicle appeared in another center within
# REGEN_WINDOW_DAYS days, and reports how many q0 were invoiced on their
# own order.
# q0_inv is collapsed to one row per q0 (MAX ... GROUP BY): the invoice
# LEFT JOIN would otherwise repeat a q0 once per invoice row if an order
# ever carries more than one invoice, inflating q0_total and
# q0_facturado_propio. With at most one invoice per order the numbers are
# unchanged.
print(f"\n[Q4] Q0 con/sin regeneración")
cols, rows = run_sql(f"""
DECLARE t_start TIMESTAMP DEFAULT TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {WINDOW_DAYS} DAY);
WITH
cc_users AS (
SELECT DISTINCT tpvuser_id AS user_id
FROM `{PROJECT}.{DATASET}.tpv_authorization_tpvuser_centers`
WHERE dccenter_id IN (159, 162)
),
q0 AS (
SELECT q.id AS q0_id, q.order_id AS q0_order, q.created_at AS q0_ts,
o.customer_id, o.vehicle_id
FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q
JOIN cc_users cc ON q.created_by_id = cc.user_id
JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON q.order_id = o.id
WHERE q.created_at >= t_start AND q.deleted_at IS NULL
AND o.customer_id IS NOT NULL AND o.vehicle_id IS NOT NULL
),
qN AS (
SELECT q.order_id AS qn_order, q.created_at AS qn_ts,
o.customer_id, o.vehicle_id, t.center_id
FROM `{PROJECT}.{DATASET}.tpv_orders_quote` q
JOIN `{PROJECT}.{DATASET}.tpv_orders_order` o ON q.order_id = o.id
LEFT JOIN `{PROJECT}.{DATASET}.tpv_terminals` t ON o.terminal_id = t.id
WHERE q.deleted_at IS NULL
AND t.center_id IS NOT NULL AND t.center_id NOT IN (159,162)
),
regen AS (
SELECT DISTINCT q0.q0_id
FROM q0
JOIN qN
ON q0.customer_id = qN.customer_id
AND q0.vehicle_id = qN.vehicle_id
AND qN.qn_ts > q0.q0_ts
AND qN.qn_ts <= TIMESTAMP_ADD(q0.q0_ts, INTERVAL {REGEN_WINDOW_DAYS} DAY)
AND qN.qn_order != q0.q0_order
),
q0_inv AS (
SELECT q0.q0_id,
MAX(CASE WHEN i.id IS NOT NULL THEN 1 ELSE 0 END) AS q0_factura
FROM q0
LEFT JOIN `{PROJECT}.{DATASET}.tpv_orders_invoice` i ON i.order_id = q0.q0_order
GROUP BY q0.q0_id
)
SELECT
CASE WHEN r.q0_id IS NOT NULL THEN 'regenerado' ELSE 'no_regenerado' END AS bucket,
COUNT(*) AS q0_total,
SUM(qi.q0_factura) AS q0_facturado_propio,
ROUND(SAFE_DIVIDE(SUM(qi.q0_factura), COUNT(*)), 4) AS conv_q0_propio
FROM q0_inv qi
LEFT JOIN regen r USING (q0_id)
GROUP BY bucket
ORDER BY bucket
""")
save("04_regen_vs_conversion", cols, rows)
for r in rows:
    print(f" {dict(zip(cols, r))}")
print("\nDONE — resultados en data/results/")
# Release the shared HTTP connection pool now that every query has run.
client.close()