---
name: duckdb_upsert
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def duckdb_upsert(db_path: str, table: str, rows: list[dict], key_cols: list[str], update_cols: list[str] | None = None) -> dict"
description: "UPSERT idempotente de filas en una tabla DuckDB con ownership selectivo de columnas. Construye INSERT INTO
(cols) VALUES (?,...) ON CONFLICT (key_cols) DO UPDATE SET col=excluded.col, ... (o DO NOTHING) y lo ejecuta fila por fila para contar inserts vs updates. La clave del patron es update_cols: en un conflicto solo se actualizan esas columnas, de modo que las columnas excluidas conservan su valor previo (la DB es duena de ellas y un re-ingest no las pisa). update_cols=None actualiza todas menos key_cols; update_cols=[] hace DO NOTHING. Abre duckdb.connect(db_path) en lectura-escritura, commit y close en try/finally. Valida que tabla y columnas casen [A-Za-z_][A-Za-z0-9_]* antes de interpolarlas; los valores van por placeholders '?'. Devuelve dict sin lanzar: {status:'ok', inserted, updated} o {status:'error', error}. key_cols deben tener PRIMARY KEY o UNIQUE en la tabla. Depende del paquete duckdb (1.5.2 en python/.venv)."
tags: [duckdb, sql, upsert, idempotent, infra]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_py_core"
imports: [re, duckdb]
params:
- name: db_path
desc: "ruta al archivo DuckDB. Se abre en lectura-escritura (duckdb.connect), por lo que se crea si no existe; pero la tabla destino debe existir y tener PRIMARY KEY o UNIQUE en key_cols para que ON CONFLICT funcione."
- name: table
desc: "nombre de la tabla destino. Validado como identificador SQL [A-Za-z_][A-Za-z0-9_]*; un nombre raro devuelve {status:'error'} (no se interpola sin validar)."
- name: rows
desc: "lista de dicts, un dict por fila (clave=nombre de columna). El esquema de insercion lo fija el conjunto de claves de la PRIMERA fila; todas las filas deben tener exactamente las mismas claves o se devuelve error. Lista vacia -> {status:'ok', inserted:0, updated:0}."
- name: key_cols
desc: "columnas de la clave de conflicto. Deben existir como PRIMARY KEY o UNIQUE en la tabla y estar presentes en las claves de cada fila. No puede estar vacia."
- name: update_cols
desc: "columnas a actualizar en caso de conflicto. None (default) = todas las columnas de la fila menos key_cols. Lista vacia [] = DO NOTHING (inserta nuevas, no toca existentes). Lista con columnas = DO UPDATE SET solo esas; las no listadas conservan su valor previo (ownership selectivo)."
output: "dict. En exito: {status:'ok', inserted:int, updated:int} donde inserted cuenta las claves que no existian y updated las que ya existian (con update_cols=[] / DO NOTHING, updated cuenta los conflictos vistos pero la fila no cambia). En error (sin lanzar): {status:'error', error:str}."
tested: true
tests:
- "test_upsert_fila_nueva_inserta"
- "test_update_cols_selectivo_no_pisa_columnas_excluidas"
- "test_update_cols_vacio_do_nothing_no_cambia_existente"
- "test_varias_filas_a_la_vez_mezcla_insert_y_update"
- "test_rows_vacio_devuelve_cero"
- "test_columnas_inconsistentes_devuelve_error"
- "test_identificador_invalido_devuelve_error"
test_file_path: "python/functions/infra/duckdb_upsert_test.py"
file_path: "python/functions/infra/duckdb_upsert.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions")
import duckdb
from infra.duckdb_upsert import duckdb_upsert
db = "/tmp/leads.duckdb"
con = duckdb.connect(db)
con.execute("CREATE TABLE leads (email VARCHAR PRIMARY KEY, name VARCHAR, score INTEGER)")
con.close()
# Re-ingest 1: inserta el lead.
print(duckdb_upsert(
db, "leads",
[{"email": "ana@x.com", "name": "Ana", "score": 0}],
key_cols=["email"],
))
# {'status': 'ok', 'inserted': 1, 'updated': 0}
# Mientras tanto, un proceso de scoring escribio score=87 en la DB (fuente de verdad).
con = duckdb.connect(db)
con.execute("UPDATE leads SET score = 87 WHERE email = 'ana@x.com'")
con.close()
# Re-ingest 2: el feed trae name actualizado y score=0 (valor por defecto del feed),
# pero solo autorizamos actualizar 'name'. 'score' lo posee la DB y NO se pisa.
print(duckdb_upsert(
db, "leads",
[{"email": "ana@x.com", "name": "Ana Lopez", "score": 0}],
key_cols=["email"],
update_cols=["name"],
))
# {'status': 'ok', 'inserted': 0, 'updated': 1}
con = duckdb.connect(db, read_only=True)
print(con.execute("SELECT name, score FROM leads WHERE email = 'ana@x.com'").fetchone())
# ('Ana Lopez', 87) <- name actualizado, score conservado
con.close()
```
## Cuando usarla
Cuando la DB es la fuente de verdad y un re-ingest no debe pisar campos que ya
posee la DB: pasa `update_cols` SIN esos campos. Tipico en pipelines de ingesta
idempotente donde una fila se reinserta periodicamente (catalogo, leads, entidades
OSINT, snapshots) pero ciertas columnas se enriquecieron despues (score calculado,
anotacion manual, flag derivado) y deben sobrevivir al refresco. Usa
`update_cols=None` para un upsert "todo" clasico, `update_cols=[]` para insertar
solo filas nuevas sin tocar las existentes, y una lista explicita para ownership
selectivo. Util como paso de escritura en una composicion: el dict de salida es
serializable y reporta cuantas filas se insertaron vs actualizaron.
## Gotchas
- Escritura real en disco (impura). `ON CONFLICT (key_cols)` solo funciona si esas
columnas tienen **PRIMARY KEY o UNIQUE** en la tabla; sin esa restriccion DuckDB
no detecta el conflicto y devolveria `{status:'error', ...}` o duplicaria. La
tabla debe existir de antemano (la funcion no la crea).
- **Single-writer**: la cuenta inserted/updated consulta la existencia de cada
clave en la misma conexion/transaccion justo antes de insertarla. Si otro
proceso escribe concurrentemente la misma base, las cuentas pueden desviarse y
DuckDB puede rechazar abrir el archivo por lock. DiseƱada para un unico escritor.
- **Identificadores validados**: `table` y los nombres de columna deben casar
`[A-Za-z_][A-Za-z0-9_]*` (DuckDB no permite parametrizar identificadores, asi que
se interpolan tras validar). Un nombre con espacios, comillas, puntos o vacio
devuelve `{status:'error'}`. Los valores de las filas siempre van por `?`.
- **Esquema fijo por la primera fila**: el conjunto de columnas de insercion lo
determina `rows[0]`. Todas las filas deben tener exactamente las mismas claves; si
una fila difiere, se devuelve error (no se hace insercion parcial).
- `update_cols=[]` genera `DO NOTHING`: la fila existente queda intacta, pero el
contador `updated` sigue reflejando los conflictos vistos (no son inserts nuevos).
- Nunca lanza: todo fallo (path bloqueado, tabla inexistente, tipo invalido) vuelve
como `{status:'error', error:str}`.