763e06c127
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
197 lines
8.0 KiB
Python
197 lines
8.0 KiB
Python
"""Pipeline de ingesta diaria de Google Search Console (Search Analytics).
|
|
|
|
Orquesta el snapshot diario de Search Console de una propiedad: autentica con
|
|
una service account, extrae las filas de Search Analytics, las acumula de forma
|
|
idempotente en una tabla DuckDB (la fuente de verdad histórica) y espeja la
|
|
tabla completa a PostgreSQL para que herramientas BI como Metabase la lean.
|
|
|
|
Es un pipeline (kind: pipeline -> siempre impuro): compone funciones del
|
|
registry sin reescribir su lógica. Devuelve un dict sin lanzar excepciones,
|
|
siguiendo el estilo del grupo duckdb/etl del registry:
|
|
{status:'ok', ...} en éxito y {status:'error', error:str} en fallo.
|
|
"""
|
|
|
|
import os
|
|
from datetime import date, timedelta
|
|
|
|
from infra import gsc_auth, duckdb_execute
|
|
from infra.duckdb_upsert import duckdb_upsert
|
|
from datascience import pull_gsc_search_analytics
|
|
from pipelines.duckdb_to_postgres import duckdb_to_postgres
|
|
|
|
# DDL de la tabla acumulada. La restricción UNIQUE es exactamente la clave que
|
|
# duckdb_upsert necesita para que el re-pull de los últimos días actualice en
|
|
# lugar de duplicar (GSC corrige datos hasta ~3 días atrás).
|
|
_TABLE_DDL = """
|
|
CREATE TABLE IF NOT EXISTS gsc_search_analytics (
|
|
snapshot_date DATE, data_date DATE, site_url TEXT, query TEXT, page TEXT,
|
|
country TEXT, device TEXT, search_type TEXT,
|
|
clicks INTEGER, impressions INTEGER, ctr DOUBLE, position DOUBLE,
|
|
UNIQUE (site_url, data_date, query, page, country, device, search_type)
|
|
);
|
|
"""
|
|
|
|
# Columnas que forman la clave única; también las que usa el upsert.
|
|
_KEY_COLS = ["site_url", "data_date", "query", "page", "country", "device", "search_type"]
|
|
|
|
# Lag de la API de GSC: los datos no están consolidados hasta ~3 días después.
|
|
_GSC_LAG_DAYS = 3
|
|
|
|
|
|
def ingest_gsc_search_analytics(
|
|
site_url: str = "",
|
|
duckdb_path: str = "",
|
|
pg_dsn: str = "",
|
|
start_date: str = "",
|
|
end_date: str = "",
|
|
lookback_days: int = 5,
|
|
credentials_path: str = "",
|
|
) -> dict:
|
|
"""Ingesta diaria de Google Search Console: GSC -> DuckDB -> PostgreSQL.
|
|
|
|
Pasos en orden: (1) resuelve defaults desde env; (2) resuelve fechas teniendo
|
|
en cuenta el lag de ~3 días de la API; (3) crea la tabla DuckDB si no existe;
|
|
(4) autentica con la service account; (5) extrae Search Analytics por las
|
|
dimensiones date/query/page; (6) transforma cada fila a la forma de la tabla
|
|
(renombrando ``date`` -> ``data_date`` y rellenando defaults estables para
|
|
las dimensiones no pedidas); (7) hace upsert idempotente en DuckDB; (8) espeja
|
|
la tabla completa a PostgreSQL en modo ``replace``.
|
|
|
|
DuckDB es la verdad acumulada (histórico append idempotente). PostgreSQL es
|
|
un espejo regenerado por completo en cada corrida (mode='replace') para que
|
|
Metabase tenga siempre el snapshot íntegro sin acumular duplicados.
|
|
|
|
Args:
|
|
site_url: propiedad de Search Console (``sc-domain:ejemplo.com`` o la
|
|
URL de prefijo ``https://ejemplo.com/``). Si está vacío, se lee de la
|
|
env var ``GSC_SITE_URL``. Obligatorio (ValueError si falta).
|
|
duckdb_path: ruta al archivo DuckDB de la fuente de verdad. Si está vacío,
|
|
se lee de la env var ``SEO_DUCKDB`` y, en su defecto, se usa
|
|
``~/.fn_seo/seo.duckdb``. El directorio padre se crea si no existe.
|
|
pg_dsn: cadena de conexión PostgreSQL del espejo BI. Si está vacío, se lee
|
|
de la env var ``SEO_DSN``. Obligatorio (ValueError si falta).
|
|
start_date: fecha inicial inclusiva ``YYYY-MM-DD``. Si está vacía, se
|
|
calcula como hoy - (3 + lookback_days).
|
|
end_date: fecha final inclusiva ``YYYY-MM-DD``. Si está vacía, se calcula
|
|
como hoy - 3 (lag de la API).
|
|
lookback_days: nº de días extra hacia atrás que se re-pullean para que el
|
|
upsert corrija los datos que GSC ajusta a posteriori. Default 5.
|
|
credentials_path: ruta al JSON de la service account. Se pasa tal cual a
|
|
``gsc_auth``, que ya hace su propio fallback a la env var
|
|
``GSC_SA_JSON``.
|
|
|
|
Returns:
|
|
dict. En éxito: ``{"status": "ok", "site_url", "start_date", "end_date",
|
|
"rows_pulled", "duckdb", "postgres"}`` donde ``duckdb`` es el resultado
|
|
del upsert y ``postgres`` el del espejo. En error (sin lanzar):
|
|
``{"status": "error", "error": str}``.
|
|
"""
|
|
try:
|
|
# (1) Defaults desde env.
|
|
site_url = site_url or os.environ.get("GSC_SITE_URL", "")
|
|
pg_dsn = pg_dsn or os.environ.get("SEO_DSN", "")
|
|
duckdb_path = (
|
|
duckdb_path
|
|
or os.environ.get("SEO_DUCKDB", "")
|
|
or os.path.expanduser("~/.fn_seo/seo.duckdb")
|
|
)
|
|
|
|
if not site_url:
|
|
raise ValueError(
|
|
"ingest_gsc_search_analytics: falta site_url. Pásalo o define la "
|
|
"env var GSC_SITE_URL con la propiedad de Search Console."
|
|
)
|
|
if not pg_dsn:
|
|
raise ValueError(
|
|
"ingest_gsc_search_analytics: falta pg_dsn. Pásalo o define la "
|
|
"env var SEO_DSN con la cadena de conexión PostgreSQL del espejo."
|
|
)
|
|
|
|
# (2) Fechas: la API de GSC tiene ~3 días de lag.
|
|
today = date.today()
|
|
if not end_date:
|
|
end_date = (today - timedelta(days=_GSC_LAG_DAYS)).isoformat()
|
|
if not start_date:
|
|
start_date = (
|
|
today - timedelta(days=_GSC_LAG_DAYS + int(lookback_days))
|
|
).isoformat()
|
|
|
|
# (3) Crear la tabla DuckDB si no existe (y su directorio padre).
|
|
parent = os.path.dirname(duckdb_path)
|
|
if parent:
|
|
os.makedirs(parent, exist_ok=True)
|
|
ddl_res = duckdb_execute(duckdb_path, _TABLE_DDL)
|
|
if ddl_res.get("status") != "ok":
|
|
return {"status": "error", "error": f"create table: {ddl_res.get('error')}"}
|
|
|
|
# (4) Autenticar.
|
|
service = gsc_auth(credentials_path)
|
|
|
|
# (5) Extraer. Con dimensions=["date","query","page"] cada fila trae las
|
|
# claves "date", "query", "page" más las métricas.
|
|
raw = pull_gsc_search_analytics(
|
|
service,
|
|
site_url,
|
|
start_date,
|
|
end_date,
|
|
dimensions=["date", "query", "page"],
|
|
)
|
|
|
|
# (6) Transformar a la forma de la tabla. La columna se llama data_date,
|
|
# no date -> renombrar. country/device se dejan vacíos y search_type="web"
|
|
# como defaults estables para que la tupla UNIQUE sea consistente.
|
|
snapshot_date = today.isoformat()
|
|
rows = [
|
|
{
|
|
"snapshot_date": snapshot_date,
|
|
"data_date": row["date"],
|
|
"site_url": site_url,
|
|
"query": row.get("query", ""),
|
|
"page": row.get("page", ""),
|
|
"country": "",
|
|
"device": "",
|
|
"search_type": "web",
|
|
"clicks": row.get("clicks"),
|
|
"impressions": row.get("impressions"),
|
|
"ctr": row.get("ctr"),
|
|
"position": row.get("position"),
|
|
}
|
|
for row in raw
|
|
]
|
|
|
|
# (7) Upsert idempotente en DuckDB (la verdad acumulada).
|
|
duckdb_res = duckdb_upsert(
|
|
duckdb_path,
|
|
"gsc_search_analytics",
|
|
rows,
|
|
key_cols=_KEY_COLS,
|
|
)
|
|
|
|
# (8) Espejo completo a PostgreSQL (regenerado cada vez).
|
|
pg_res = duckdb_to_postgres(
|
|
duckdb_path,
|
|
"gsc_search_analytics",
|
|
pg_dsn,
|
|
pg_table="gsc_search_analytics",
|
|
mode="replace",
|
|
)
|
|
|
|
return {
|
|
"status": "ok",
|
|
"site_url": site_url,
|
|
"start_date": start_date,
|
|
"end_date": end_date,
|
|
"rows_pulled": len(raw),
|
|
"duckdb": duckdb_res,
|
|
"postgres": pg_res,
|
|
}
|
|
except Exception as e: # noqa: BLE001
|
|
# Pipeline impuro de borde: nunca propagamos el crash, lo reportamos.
|
|
return {"status": "error", "error": str(e)}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import json
|
|
|
|
print(json.dumps(ingest_gsc_search_analytics(), indent=2, default=str))
|