--- name: duckdb_upsert kind: function lang: py domain: infra version: "1.0.0" purity: impure signature: "def duckdb_upsert(db_path: str, table: str, rows: list[dict], key_cols: list[str], update_cols: list[str] | None = None) -> dict" description: "UPSERT idempotente de filas en una tabla DuckDB con ownership selectivo de columnas. Construye INSERT INTO (cols) VALUES (?,...) ON CONFLICT (key_cols) DO UPDATE SET col=excluded.col, ... (o DO NOTHING) y lo ejecuta fila por fila para contar inserts vs updates. La clave del patron es update_cols: en un conflicto solo se actualizan esas columnas, de modo que las columnas excluidas conservan su valor previo (la DB es duena de ellas y un re-ingest no las pisa). update_cols=None actualiza todas menos key_cols; update_cols=[] hace DO NOTHING. Abre duckdb.connect(db_path) en lectura-escritura, commit y close en try/finally. Valida que tabla y columnas casen [A-Za-z_][A-Za-z0-9_]* antes de interpolarlas; los valores van por placeholders '?'. Devuelve dict sin lanzar: {status:'ok', inserted, updated} o {status:'error', error}. key_cols deben tener PRIMARY KEY o UNIQUE en la tabla. Depende del paquete duckdb (1.5.2 en python/.venv)." tags: [duckdb, sql, upsert, idempotent, infra] uses_functions: [] uses_types: [] returns: [] returns_optional: false error_type: "error_py_core" imports: [re, duckdb] params: - name: db_path desc: "ruta al archivo DuckDB. Se abre en lectura-escritura (duckdb.connect), por lo que se crea si no existe; pero la tabla destino debe existir y tener PRIMARY KEY o UNIQUE en key_cols para que ON CONFLICT funcione." - name: table desc: "nombre de la tabla destino. Validado como identificador SQL [A-Za-z_][A-Za-z0-9_]*; un nombre raro devuelve {status:'error'} (no se interpola sin validar)." - name: rows desc: "lista de dicts, un dict por fila (clave=nombre de columna). El esquema de insercion lo fija el conjunto de claves de la PRIMERA fila; todas las filas deben tener exactamente las mismas claves o se devuelve error. Lista vacia -> {status:'ok', inserted:0, updated:0}." - name: key_cols desc: "columnas de la clave de conflicto. Deben existir como PRIMARY KEY o UNIQUE en la tabla y estar presentes en las claves de cada fila. No puede estar vacia." - name: update_cols desc: "columnas a actualizar en caso de conflicto. None (default) = todas las columnas de la fila menos key_cols. Lista vacia [] = DO NOTHING (inserta nuevas, no toca existentes). Lista con columnas = DO UPDATE SET solo esas; las no listadas conservan su valor previo (ownership selectivo)." output: "dict. En exito: {status:'ok', inserted:int, updated:int} donde inserted cuenta las claves que no existian y updated las que ya existian (con update_cols=[] / DO NOTHING, updated cuenta los conflictos vistos pero la fila no cambia). En error (sin lanzar): {status:'error', error:str}." tested: true tests: - "test_upsert_fila_nueva_inserta" - "test_update_cols_selectivo_no_pisa_columnas_excluidas" - "test_update_cols_vacio_do_nothing_no_cambia_existente" - "test_varias_filas_a_la_vez_mezcla_insert_y_update" - "test_rows_vacio_devuelve_cero" - "test_columnas_inconsistentes_devuelve_error" - "test_identificador_invalido_devuelve_error" test_file_path: "python/functions/infra/duckdb_upsert_test.py" file_path: "python/functions/infra/duckdb_upsert.py" --- ## Ejemplo ```python import sys sys.path.insert(0, "python/functions") import duckdb from infra.duckdb_upsert import duckdb_upsert db = "/tmp/leads.duckdb" con = duckdb.connect(db) con.execute("CREATE TABLE leads (email VARCHAR PRIMARY KEY, name VARCHAR, score INTEGER)") con.close() # Re-ingest 1: inserta el lead. print(duckdb_upsert( db, "leads", [{"email": "ana@x.com", "name": "Ana", "score": 0}], key_cols=["email"], )) # {'status': 'ok', 'inserted': 1, 'updated': 0} # Mientras tanto, un proceso de scoring escribio score=87 en la DB (fuente de verdad). con = duckdb.connect(db) con.execute("UPDATE leads SET score = 87 WHERE email = 'ana@x.com'") con.close() # Re-ingest 2: el feed trae name actualizado y score=0 (valor por defecto del feed), # pero solo autorizamos actualizar 'name'. 'score' lo posee la DB y NO se pisa. print(duckdb_upsert( db, "leads", [{"email": "ana@x.com", "name": "Ana Lopez", "score": 0}], key_cols=["email"], update_cols=["name"], )) # {'status': 'ok', 'inserted': 0, 'updated': 1} con = duckdb.connect(db, read_only=True) print(con.execute("SELECT name, score FROM leads WHERE email = 'ana@x.com'").fetchone()) # ('Ana Lopez', 87) <- name actualizado, score conservado con.close() ``` ## Cuando usarla Cuando la DB es la fuente de verdad y un re-ingest no debe pisar campos que ya posee la DB: pasa `update_cols` SIN esos campos. Tipico en pipelines de ingesta idempotente donde una fila se reinserta periodicamente (catalogo, leads, entidades OSINT, snapshots) pero ciertas columnas se enriquecieron despues (score calculado, anotacion manual, flag derivado) y deben sobrevivir al refresco. Usa `update_cols=None` para un upsert "todo" clasico, `update_cols=[]` para insertar solo filas nuevas sin tocar las existentes, y una lista explicita para ownership selectivo. Util como paso de escritura en una composicion: el dict de salida es serializable y reporta cuantas filas se insertaron vs actualizaron. ## Gotchas - Escritura real en disco (impura). `ON CONFLICT (key_cols)` solo funciona si esas columnas tienen **PRIMARY KEY o UNIQUE** en la tabla; sin esa restriccion DuckDB no detecta el conflicto y devolveria `{status:'error', ...}` o duplicaria. La tabla debe existir de antemano (la funcion no la crea). - **Single-writer**: la cuenta inserted/updated consulta la existencia de cada clave en la misma conexion/transaccion justo antes de insertarla. Si otro proceso escribe concurrentemente la misma base, las cuentas pueden desviarse y DuckDB puede rechazar abrir el archivo por lock. DiseƱada para un unico escritor. - **Identificadores validados**: `table` y los nombres de columna deben casar `[A-Za-z_][A-Za-z0-9_]*` (DuckDB no permite parametrizar identificadores, asi que se interpolan tras validar). Un nombre con espacios, comillas, puntos o vacio devuelve `{status:'error'}`. Los valores de las filas siempre van por `?`. - **Esquema fijo por la primera fila**: el conjunto de columnas de insercion lo determina `rows[0]`. Todas las filas deben tener exactamente las mismas claves; si una fila difiere, se devuelve error (no se hace insercion parcial). - `update_cols=[]` genera `DO NOTHING`: la fila existente queda intacta, pero el contador `updated` sigue reflejando los conflictos vistos (no son inserts nuevos). - Nunca lanza: todo fallo (path bloqueado, tabla inexistente, tipo invalido) vuelve como `{status:'error', error:str}`.