"""UPSERT idempotente de filas en una tabla PostgreSQL con ownership selectivo de columnas. Funcion impura: abre una conexion psycopg2 con el DSN dado, ejecuta un `INSERT INTO (cols) VALUES %s ON CONFLICT (key_cols) DO UPDATE SET col = EXCLUDED.col, ...` (o `DO NOTHING`) en lote con psycopg2.extras.execute_values, hace commit y cierra en try/finally. Devuelve un dict sin lanzar, siguiendo el estilo de duckdb_upsert del registry: {status:'ok', inserted, updated} en exito y {status:'error', error:str} en fallo. El valor de esta funcion es el "ownership selectivo": al actualizar solo las columnas indicadas en `update_cols` en caso de conflicto, un re-upsert de la misma clave NO pisa las columnas que se dejaron fuera. update_cols=None actualiza todas las columnas menos las key_cols; update_cols=[] hace DO NOTHING (inserta solo filas nuevas). El conteo insert vs update se obtiene del pseudo-columna del sistema `xmax`: en la fila devuelta por RETURNING, xmax = 0 indica un INSERT puro y xmax distinto de 0 indica un UPDATE por conflicto. Identificadores (tabla y columnas) se validan contra `[A-Za-z_][A-Za-z0-9_]*` antes de interpolarlos en el SQL (no se pueden parametrizar identificadores); los valores de las filas siempre van por placeholders de psycopg2. """ import re _IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") def _validate_ident(name: str) -> str: """Valida que `name` sea un identificador SQL seguro y lo devuelve. Acepta solo nombres que casen `[A-Za-z_][A-Za-z0-9_]*`. Lanza ValueError para cualquier otro (espacios, comillas, puntos, vacio), que el caller convierte en {status:'error'}. """ if not isinstance(name, str) or not _IDENT_RE.match(name): raise ValueError(f"identificador invalido: {name!r}") return name def pg_upsert( dsn: str, table: str, rows: list, key_cols: list, update_cols: list = None, ) -> dict: """Hace UPSERT idempotente de `rows` en `table`, con ownership selectivo. Construye `INSERT INTO
(cols) VALUES %s ON CONFLICT (key_cols) DO UPDATE SET col = EXCLUDED.col, ...` (o `DO NOTHING`) y lo ejecuta en lote con execute_values, distinguiendo inserts de updates via el pseudo-columna `xmax` en RETURNING. Args: dsn: cadena de conexion PostgreSQL, p.ej. "postgresql://user:pass@localhost:5433/trends". table: nombre de la tabla destino. Validado como identificador SQL [A-Za-z_][A-Za-z0-9_]*. La tabla debe existir y key_cols debe tener PRIMARY KEY o UNIQUE para que ON CONFLICT funcione. rows: lista de dicts, un dict por fila (clave = nombre de columna). El esquema de insercion lo fija el conjunto de claves de la PRIMERA fila; todas las filas deben tener exactamente las mismas claves o se devuelve {status:'error'}. Lista vacia -> {status:'ok', inserted:0, updated:0}. key_cols: columnas de la clave de conflicto. Deben existir como PRIMARY KEY o UNIQUE en la tabla y estar presentes en las claves de cada fila. No puede estar vacia. update_cols: columnas a actualizar en caso de conflicto. None (default) -> todas las columnas de la fila MENOS las key_cols. Lista vacia [] -> DO NOTHING (inserta nuevas, no toca existentes). Lista con columnas -> DO UPDATE SET solo esas (las no listadas conservan su valor previo: ownership selectivo). Returns: dict. En exito: {status:'ok', inserted:int, updated:int} donde inserted cuenta las filas nuevas (xmax = 0 en RETURNING) y updated las filas que ya existian y se actualizaron. Con update_cols=[] (DO NOTHING) las filas en conflicto NO se devuelven por RETURNING, asi que no cuentan ni como insert ni como update. En error (sin lanzar): {status:'error', error:str}. """ try: import psycopg2 from psycopg2 import extras as pg_extras except ImportError as exc: # pragma: no cover - exercised only without dep return { "status": "error", "error": ( "psycopg2 is required for pg_upsert; install psycopg2-binary " f"({exc})" ), } conn = None try: if not isinstance(rows, list): raise ValueError("rows debe ser una lista de dicts") if not rows: return {"status": "ok", "inserted": 0, "updated": 0} # Esquema de insercion = claves de la primera fila, en orden estable. first_keys = list(rows[0].keys()) insert_cols = [_validate_ident(c) for c in first_keys] insert_set = set(first_keys) # Todas las filas deben tener exactamente las mismas claves. for i, row in enumerate(rows): if not isinstance(row, dict): raise ValueError(f"rows[{i}] no es un dict") if set(row.keys()) != insert_set: raise ValueError( f"rows[{i}] tiene columnas distintas a la primera fila: " f"{sorted(row.keys())} vs {sorted(first_keys)}" ) keys = [_validate_ident(c) for c in key_cols] if not keys: raise ValueError("key_cols no puede estar vacio") for k in keys: if k not in insert_set: raise ValueError(f"key_col {k!r} no esta en las columnas de las filas") # Resolver update_cols. if update_cols is None: updates = [c for c in insert_cols if c not in keys] else: updates = [_validate_ident(c) for c in update_cols] for u in updates: if u not in insert_set: raise ValueError( f"update_col {u!r} no esta en las columnas de las filas" ) cols_sql = ", ".join(insert_cols) conflict_sql = ", ".join(keys) if updates: set_sql = ", ".join(f"{c} = EXCLUDED.{c}" for c in updates) on_conflict = f"ON CONFLICT ({conflict_sql}) DO UPDATE SET {set_sql}" else: on_conflict = f"ON CONFLICT ({conflict_sql}) DO NOTHING" # RETURNING (xmax = 0) AS inserted: True en INSERT puro, False en UPDATE. # En DO NOTHING las filas en conflicto NO se devuelven por RETURNING. sql = ( f"INSERT INTO {table} ({cols_sql}) VALUES %s {on_conflict} " f"RETURNING (xmax = 0) AS inserted" ) values = [tuple(row[c] for c in insert_cols) for row in rows] conn = psycopg2.connect(dsn) with conn.cursor() as cur: returned = pg_extras.execute_values(cur, sql, values, fetch=True) conn.commit() inserted = sum(1 for r in returned if r[0]) updated = sum(1 for r in returned if not r[0]) return {"status": "ok", "inserted": inserted, "updated": updated} except Exception as e: # noqa: BLE001 if conn is not None: conn.rollback() return {"status": "error", "error": str(e)} finally: if conn is not None: conn.close()