feat(duckdb,dav): primitivas de escritura DuckDB + libretas CardDAV + vCard multi-valor

Cinco funciones nuevas para soportar DuckDB como fuente de verdad del project osint:

Grupo duckdb (escritura, complementan a duckdb_query_readonly):
- duckdb_execute_py_infra (impure): ejecuta INSERT/UPDATE/DELETE/DDL en read-write, commit, {status,rowcount}. 6 tests.
- duckdb_upsert_py_infra (impure): UPSERT ON CONFLICT actualizando solo update_cols → ownership selectivo (un re-upsert no pisa columnas excluidas). 7 tests.

Grupo dav (libretas de contactos + vCard multi-valor):
- dav_make_addressbook_py_infra (impure): crea una libreta CardDAV nueva via extended MKCOL (RFC 5689). Idempotente. 12 tests.
- dav_list_addressbooks_py_infra (impure): lista las libretas del contacts-home (PROPFIND Depth:1). 7 tests.
- build_vcard_py_core (pure): serializa un contacto a vCard 3.0 multi-valor (N TEL/EMAIL/ADR + X-OSINT-*). 5 tests.

Paginas de capacidad duckdb.md y dav.md actualizadas.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-13 00:33:12 +02:00
parent 1c8a86594f
commit 1c4a4b9259
17 changed files with 1773 additions and 0 deletions
@@ -0,0 +1,91 @@
---
name: dav_list_addressbooks
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def dav_list_addressbooks(base_url: str, username: str, password: str, contacts_home: str, *, timeout_s: float = 20.0, verify_tls: bool = True) -> dict"
description: "Lista las colecciones de libreta de contactos CardDAV bajo un contacts-home en UNA peticion PROPFIND Depth:1. Devuelve solo las colecciones hijas que son libretas CardDAV de verdad (resourcetype {urn:ietf:params:xml:ns:carddav}addressbook), cada una con su href, su displayname (DAV) y su descripcion (addressbook-description de CardDAV) si el servidor la expone. El propio contacts-home (coleccion plana sin el resourcetype addressbook) y cualquier coleccion no-addressbook se excluyen. Considera solo los propstat con estado 2xx para no leer props marcadas 404. Construye Authorization: Basic base64(user:pass) a mano y parsea el multistatus con regex (parser puro _parse_multistatus aislado para testeo sin red). verify_tls=True por defecto. Maneja errores sin lanzar. Solo stdlib (urllib, base64, re, ssl, html). Es la analoga CardDAV de dav_list_calendars. Probada contra Xandikos."
tags: [dav, carddav, addressbook, addressbooks, contacts, propfind, displayname, description, ingest, http, infra]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [base64, html, re, ssl, urllib.error, urllib.request]
params:
- name: base_url
desc: "URL base del servidor DAV (p.ej. 'https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com')."
- name: username
desc: "usuario para HTTP Basic auth (p.ej. 'enmanuel')."
- name: password
desc: "contrasena para HTTP Basic auth. Resolver desde pass con pass_get_secret, nunca hardcodear."
- name: contacts_home
desc: "ruta del contacts-home del usuario (p.ej. '/enmanuel/contacts/'). Las libretas de contactos cuelgan de el."
- name: timeout_s
desc: "timeout de la peticion HTTP en segundos. Default 20.0."
- name: verify_tls
desc: "si True (default) verifica el certificado TLS. No desactivar salvo entorno de prueba."
output: "dict. En exito: {status:'ok', http_status:int, addressbooks:[{href:str, name:str, description:str|None}, ...]} con un elemento por libreta de contactos, ordenadas por nombre. description es el addressbook-description de CardDAV o None. En error (sin lanzar): {status:'error', error:str, http_status:int|None}."
tested: true
tests:
- "test_devuelve_dos_libretas_descarta_home"
- "test_ordenadas_por_nombre"
- "test_extrae_nombre_y_descripcion"
- "test_descripcion_ausente_es_none"
- "test_ignora_props_404"
- "test_multistatus_vacio_devuelve_lista_vacia"
- "test_home_sin_libretas_solo_home"
test_file_path: "python/functions/infra/dav_list_addressbooks_test.py"
file_path: "python/functions/infra/dav_list_addressbooks.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions")
from infra.pass_get_secret import pass_get_secret
from infra.dav_list_addressbooks import dav_list_addressbooks
pw = pass_get_secret("dav/xandikos-enmanuel")["value"] # NO logear
res = dav_list_addressbooks(
base_url="https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com",
username="enmanuel",
password=pw,
contacts_home="/enmanuel/contacts/",
)
for a in res["addressbooks"]:
print(a["name"], a["href"], a["description"])
# Personal /enmanuel/contacts/personal/ Contactos personales & familia
# contacts /enmanuel/contacts/contacts/ None
```
## Cuando usarla
Cuando un ingest necesita recorrer **todas** las libretas de contactos del usuario,
no solo la principal: el contacts-home CardDAV puede tener varias colecciones
(personal, trabajo, familia, ...) y necesitas el href de cada una para luego
volcar sus vCards con `dav_list_resources` / `dav_get_collection`. Tambien sirve
para un selector de libreta en una UI (nombre + descripcion). Devuelve los hrefs
sin que el caller los conozca de antemano ni tenga que distinguir el contacts-home
de las libretas reales. Es la analoga CardDAV de `dav_list_calendars`.
## Gotchas
- **Impura: red + auth.** Lectura remota real sobre TLS; el password viene de
`pass` (`dav/xandikos-enmanuel`) y no se logea. `verify_tls=True` por defecto;
no desactivar salvo entorno de prueba.
- **Depende de que el servidor exponga `resourcetype` en el PROPFIND.** Filtra por
el resourcetype `carddav:addressbook`: el propio contacts-home es una coleccion
plana (sin ese resourcetype) y queda fuera, igual que carpetas intermedias. Si
un servidor no devuelve `resourcetype`, ninguna coleccion pasa el filtro y la
lista sale vacia. Si tu servidor anida libretas mas profundo que Depth:1, llama
con `contacts_home` apuntando al nivel correcto.
- La `description` es el `addressbook-description` de CardDAV
(`urn:ietf:params:xml:ns:carddav`), que Xandikos puede no tener seteado
(devuelve None).
- Solo lee los `<propstat>` con estado 2xx para no confundir un
`<addressbook-description/>` vacio de un propstat 404 con una descripcion real.
@@ -0,0 +1,202 @@
"""Lista las colecciones de libreta de contactos CardDAV bajo un contacts-home (PROPFIND).
Funcion impura: hace UNA peticion HTTP PROPFIND Depth:1 sobre el directorio
contacts-home de un usuario (p.ej. `/enmanuel/contacts/`) y devuelve solo las
colecciones hijas que son libretas CardDAV de verdad — las que declaran el
resourcetype `{urn:ietf:params:xml:ns:carddav}addressbook`. Por cada una extrae
su href, su `displayname` (DAV) y, si el servidor lo expone, su descripcion
(`addressbook-description` de CardDAV). El propio contacts-home (que es una
coleccion plana sin el resourcetype addressbook) se excluye.
Es la analoga CardDAV de `dav_list_calendars`: lo que necesita un ingest o un
selector de libreta cuando el usuario tiene varias colecciones de contactos bajo
su contacts-home y hay que recorrerlas todas (o elegir una), con su nombre y su
descripcion. `dav_list_resources` solo devuelve hrefs+etag de los recursos de UNA
coleccion (las vCards), no las colecciones hijas con su metadata.
Construye el header `Authorization: Basic base64(user:pass)` a mano con stdlib y
parsea el multistatus con regex simple (sin parser XML externo), considerando
solo los `<propstat>` con estado 2xx para no recoger props que el servidor marca
404. Maneja errores sin lanzar. Solo usa stdlib (urllib, base64, re, ssl, html).
Probado contra Xandikos.
"""
import base64
import html
import re
import ssl
import urllib.error
import urllib.request
_RESPONSE_RE = re.compile(
r"<(?:[A-Za-z0-9]+:)?response>(.*?)</(?:[A-Za-z0-9]+:)?response>",
re.DOTALL | re.IGNORECASE,
)
_HREF_RE = re.compile(
r"<(?:[A-Za-z0-9]+:)?href>\s*(.*?)\s*</(?:[A-Za-z0-9]+:)?href>",
re.DOTALL | re.IGNORECASE,
)
_PROPSTAT_RE = re.compile(
r"<(?:[A-Za-z0-9]+:)?propstat>(.*?)</(?:[A-Za-z0-9]+:)?propstat>",
re.DOTALL | re.IGNORECASE,
)
_STATUS_RE = re.compile(
r"<(?:[A-Za-z0-9]+:)?status>\s*(.*?)\s*</(?:[A-Za-z0-9]+:)?status>",
re.DOTALL | re.IGNORECASE,
)
_DISPLAYNAME_RE = re.compile(
r"<(?:[A-Za-z0-9]+:)?displayname>\s*(.*?)\s*</(?:[A-Za-z0-9]+:)?displayname>",
re.DOTALL | re.IGNORECASE,
)
# Descripcion de CardDAV: <ns:addressbook-description>texto</ns:addressbook-description>.
_DESCRIPTION_RE = re.compile(
r"<(?:[A-Za-z0-9]+:)?addressbook-description[^>]*>\s*(.*?)\s*</(?:[A-Za-z0-9]+:)?addressbook-description>",
re.DOTALL | re.IGNORECASE,
)
# Marca de libreta CardDAV en el resourcetype. El elemento `<C:addressbook/>`
# puede venir con o sin prefijo de namespace (`<ns2:addressbook/>`, `<addressbook/>`).
_ADDRESSBOOK_TYPE_RE = re.compile(
r"<(?:[A-Za-z0-9]+:)?addressbook(?:\s[^>]*)?/?>", re.IGNORECASE
)
# El PROPFIND pide nombre, tipo y descripcion. Declarar el namespace de CardDAV
# permite que el servidor responda `addressbook-description` cuando exista.
_PROPFIND_BODY = (
'<?xml version="1.0" encoding="utf-8" ?>'
'<D:propfind xmlns:D="DAV:" '
'xmlns:C="urn:ietf:params:xml:ns:carddav">'
"<D:prop>"
"<D:displayname/><D:resourcetype/><C:addressbook-description/>"
"</D:prop></D:propfind>"
)
def _basic_auth_header(username: str, password: str) -> str:
raw = ("%s:%s" % (username, password)).encode("utf-8")
return "Basic " + base64.b64encode(raw).decode("ascii")
def _join_url(base_url: str, collection_path: str) -> str:
return base_url.rstrip("/") + "/" + collection_path.strip("/") + "/"
def _ok_propstats(response_block: str) -> str:
"""Concatena solo los `<propstat>` con estado 2xx de un `<response>`.
El servidor agrupa las props por estado: las presentes en un propstat 200 y
las ausentes en un propstat 404. Tomar solo los 2xx evita leer un
`<addressbook-description/>` vacio del bloque 404 como si fuera el valor real.
"""
parts = []
for ps in _PROPSTAT_RE.findall(response_block):
status_m = _STATUS_RE.search(ps)
if status_m and " 2" in (" " + status_m.group(1)):
parts.append(ps)
# Si no hay propstat (servidor minimalista), usar el bloque entero.
return "".join(parts) if parts else response_block
def _parse_multistatus(xml_text: str, contacts_home: str) -> list:
"""Parsea un multistatus 207 y devuelve las libretas CardDAV hijas.
Helper puro (sin red): recorre los `<response>` del XML, descarta el propio
contacts-home y las colecciones que no declaran el resourcetype
`carddav:addressbook`, y por cada libreta extrae href, displayname y
addressbook-description (de los propstat 2xx). Devuelve la lista ordenada por
nombre. Aislado para poder testear el parseo sin tocar la red.
Args:
xml_text: cuerpo XML del multistatus (respuesta del PROPFIND Depth:1).
contacts_home: ruta del contacts-home (p.ej. '/enmanuel/contacts/'),
usada para excluir la entrada del propio home.
Returns:
list de dicts {href:str, name:str, description:str|None}, ordenada por
name (case-insensitive).
"""
home_tail = contacts_home.strip("/").rsplit("/", 1)[-1]
addressbooks = []
for block in _RESPONSE_RE.findall(xml_text):
href_m = _HREF_RE.search(block)
if not href_m:
continue
href = href_m.group(1).strip()
tail = href.rstrip("/").rsplit("/", 1)[-1]
# El propio contacts-home (o un href identico al home): se excluye.
if tail == home_tail:
continue
# Solo las colecciones marcadas como libreta CardDAV. El home plano no
# lleva el resourcetype `carddav:addressbook` y queda fuera.
if not _ADDRESSBOOK_TYPE_RE.search(block):
continue
ok = _ok_propstats(block)
name_m = _DISPLAYNAME_RE.search(ok)
name = html.unescape(name_m.group(1).strip()) if name_m else tail
if not name:
name = tail
desc_m = _DESCRIPTION_RE.search(ok)
description = html.unescape(desc_m.group(1).strip()) if desc_m else None
if description == "":
description = None
addressbooks.append({"href": href, "name": name, "description": description})
addressbooks.sort(key=lambda a: a["name"].lower())
return addressbooks
def dav_list_addressbooks(
base_url: str,
username: str,
password: str,
contacts_home: str,
*,
timeout_s: float = 20.0,
verify_tls: bool = True,
) -> dict:
"""Lista las colecciones de libreta de contactos CardDAV bajo un contacts-home.
Hace un PROPFIND Depth:1 sobre `contacts_home` y devuelve solo las colecciones
hijas marcadas como libreta CardDAV (resourcetype `carddav:addressbook`), con
su nombre y descripcion. El propio `contacts_home` y cualquier coleccion
no-addressbook se excluyen.
Args:
base_url: URL base del servidor DAV (p.ej. 'https://dav-x.example.com').
username: usuario para HTTP Basic auth.
password: contrasena para HTTP Basic auth. Resolver desde pass.
contacts_home: ruta del contacts-home del usuario (p.ej.
'/enmanuel/contacts/'). Las libretas de contactos cuelgan de el.
timeout_s: timeout de la peticion en segundos. Default 20.0.
verify_tls: si True (default) verifica el certificado TLS.
Returns:
dict. En exito: {status:'ok', http_status:int,
addressbooks:[{href:str, name:str, description:str|None}, ...]} con un
elemento por libreta de contactos (ordenadas por nombre). `description`
es el `addressbook-description` de CardDAV si el servidor lo expone, o
None. En error (sin lanzar): {status:'error', error:str,
http_status:int|None}.
"""
url = _join_url(base_url, contacts_home)
headers = {
"Authorization": _basic_auth_header(username, password),
"Content-Type": "application/xml; charset=utf-8",
"Depth": "1",
}
req = urllib.request.Request(
url, data=_PROPFIND_BODY.encode("utf-8"), method="PROPFIND", headers=headers
)
context = None if verify_tls else ssl._create_unverified_context()
try:
with urllib.request.urlopen(req, timeout=timeout_s, context=context) as resp:
status = resp.status
xml = resp.read().decode("utf-8", "replace")
except urllib.error.HTTPError as e:
return {"status": "error", "error": "http %s" % e.code, "http_status": e.code}
except urllib.error.URLError as e:
return {"status": "error", "error": str(e.reason), "http_status": None}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e), "http_status": None}
addressbooks = _parse_multistatus(xml, contacts_home)
return {"status": "ok", "http_status": status, "addressbooks": addressbooks}
@@ -0,0 +1,112 @@
"""Tests para dav_list_addressbooks.
Sin red: ejercitan el parser puro `_parse_multistatus` con un multistatus 207
realista (2 libretas CardDAV + el contacts-home). Verifican que descarta el home,
extrae nombre y descripcion correctos, normaliza descripcion ausente a None e
ignora props marcadas 404 por el servidor.
"""
from infra.dav_list_addressbooks import _parse_multistatus
# Multistatus 207 realista al estilo Xandikos: el contacts-home plano + dos
# libretas CardDAV. La segunda no expone addressbook-description en su propstat
# 2xx (Xandikos la devuelve vacia en un propstat 404).
_MULTISTATUS = """<?xml version="1.0" encoding="utf-8"?>
<D:multistatus xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:carddav">
<D:response>
<D:href>/enmanuel/contacts/</D:href>
<D:propstat>
<D:prop>
<D:resourcetype><D:collection/></D:resourcetype>
<D:displayname>contacts</D:displayname>
</D:prop>
<D:status>HTTP/1.1 200 OK</D:status>
</D:propstat>
<D:propstat>
<D:prop><C:addressbook-description/></D:prop>
<D:status>HTTP/1.1 404 Not Found</D:status>
</D:propstat>
</D:response>
<D:response>
<D:href>/enmanuel/contacts/personal/</D:href>
<D:propstat>
<D:prop>
<D:resourcetype><D:collection/><C:addressbook/></D:resourcetype>
<D:displayname>Personal</D:displayname>
<C:addressbook-description>Contactos personales &amp; familia</C:addressbook-description>
</D:prop>
<D:status>HTTP/1.1 200 OK</D:status>
</D:propstat>
</D:response>
<D:response>
<D:href>/enmanuel/contacts/work/</D:href>
<D:propstat>
<D:prop>
<D:resourcetype><D:collection/><C:addressbook/></D:resourcetype>
<D:displayname>Trabajo</D:displayname>
</D:prop>
<D:status>HTTP/1.1 200 OK</D:status>
</D:propstat>
<D:propstat>
<D:prop><C:addressbook-description/></D:prop>
<D:status>HTTP/1.1 404 Not Found</D:status>
</D:propstat>
</D:response>
</D:multistatus>
"""
def test_devuelve_dos_libretas_descarta_home():
libros = _parse_multistatus(_MULTISTATUS, "/enmanuel/contacts/")
assert len(libros) == 2
hrefs = [a["href"] for a in libros]
assert "/enmanuel/contacts/" not in hrefs
def test_ordenadas_por_nombre():
libros = _parse_multistatus(_MULTISTATUS, "/enmanuel/contacts/")
# Orden case-insensitive: "Personal" < "Trabajo".
assert [a["name"] for a in libros] == ["Personal", "Trabajo"]
def test_extrae_nombre_y_descripcion():
libros = _parse_multistatus(_MULTISTATUS, "/enmanuel/contacts/")
personal = next(a for a in libros if a["href"] == "/enmanuel/contacts/personal/")
assert personal["name"] == "Personal"
# html.unescape convierte &amp; en &.
assert personal["description"] == "Contactos personales & familia"
def test_descripcion_ausente_es_none():
libros = _parse_multistatus(_MULTISTATUS, "/enmanuel/contacts/")
trabajo = next(a for a in libros if a["href"] == "/enmanuel/contacts/work/")
assert trabajo["name"] == "Trabajo"
assert trabajo["description"] is None
def test_ignora_props_404():
# El home tiene un addressbook-description en un propstat 404; aunque no se
# descartara por resourcetype, no debe colarse como libreta ni su prop vacia
# mezclarse con otra entrada.
libros = _parse_multistatus(_MULTISTATUS, "/enmanuel/contacts/")
for a in libros:
# Ninguna descripcion debe ser cadena vacia (404 -> None, no "").
assert a["description"] != ""
def test_multistatus_vacio_devuelve_lista_vacia():
xml = '<D:multistatus xmlns:D="DAV:"></D:multistatus>'
assert _parse_multistatus(xml, "/enmanuel/contacts/") == []
def test_home_sin_libretas_solo_home():
xml = """<D:multistatus xmlns:D="DAV:">
<D:response>
<D:href>/enmanuel/contacts/</D:href>
<D:propstat>
<D:prop><D:resourcetype><D:collection/></D:resourcetype></D:prop>
<D:status>HTTP/1.1 200 OK</D:status>
</D:propstat>
</D:response>
</D:multistatus>"""
assert _parse_multistatus(xml, "/enmanuel/contacts/") == []
@@ -0,0 +1,110 @@
---
name: dav_make_addressbook
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def dav_make_addressbook(base_url: str, username: str, password: str, contacts_home: str, slug: str, display_name: str = \"\", description: str = \"\", *, timeout_s: float = 20.0, verify_tls: bool = True) -> dict"
description: "Crea una nueva coleccion de contactos CardDAV (una libreta/agenda de contactos nueva) bajo el contacts-home de un principal via MKCOL extendido (RFC 5689), declarando el resourcetype como addressbook y fijando el displayname y la descripcion (addressbook-description) en el propio cuerpo XML. La coleccion se crea en <contacts_home><slug>/. El slug se sanea a [a-z0-9_-] (minusculas, espacios->guion); si queda vacio devuelve error de validacion. Idempotente: 201 Created es exito; 405/301 (ya existe) devuelve {status:'ok', existed:True}. Escapa display_name/description para XML. Construye Authorization: Basic base64(user:pass) a mano. Maneja errores sin lanzar (salvo validacion de args). Solo stdlib (urllib, base64, re, ssl, xml.sax.saxutils). Probado contra Xandikos. Analoga de dav_make_calendar para CardDAV."
tags: [dav, carddav, addressbook, contacts, mkcol, create, collection, http, infra]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [base64, re, ssl, urllib.error, urllib.request, xml.sax.saxutils]
params:
- name: base_url
desc: "URL base del servidor DAV sin barra final (p.ej. 'https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com')."
- name: username
desc: "usuario para HTTP Basic auth (p.ej. 'enmanuel')."
- name: password
desc: "contrasena para HTTP Basic auth. Resolver desde pass con pass_get_secret, nunca hardcodear."
- name: contacts_home
desc: "ruta del contacts-home del principal con barra final (p.ej. '/enmanuel/contacts/'). La nueva coleccion cuelga de el."
- name: slug
desc: "segmento de path de la coleccion en la URL (p.ej. 'trabajo'); se sanea a [a-z0-9_-]. La coleccion se crea en <contacts_home><slug>/. Si queda vacio tras sanear, devuelve error de validacion."
- name: display_name
desc: "nombre visible de la coleccion (DAV:displayname). Si vacio, usa el slug saneado."
- name: description
desc: "descripcion de la coleccion (addressbook-description de CardDAV). Opcional; '' lo omite."
- name: timeout_s
desc: "timeout de cada peticion HTTP en segundos. Default 20.0."
- name: verify_tls
desc: "si True (default) verifica el certificado TLS. No desactivar salvo entorno de prueba."
output: "dict. En exito: {status:'ok', http_status:int, href:str} y, si la coleccion ya existia, ademas existed:True. En error (sin lanzar): {status:'error', http_status:int|None, href:str, error:str}. href es la ruta de la coleccion (contacts_home + slug saneado + '/')."
tested: true
tests:
- "test_sanitize_slug_minusculas"
- "test_sanitize_slug_espacios_a_guion"
- "test_sanitize_slug_elimina_caracteres_raros"
- "test_sanitize_slug_colapsa_guiones_y_recorta"
- "test_sanitize_slug_vacio"
- "test_join_url_compone_la_coleccion"
- "test_mkcol_xml_es_mkcol_extendido"
- "test_mkcol_xml_declara_resourcetype_addressbook"
- "test_mkcol_xml_incluye_displayname"
- "test_mkcol_xml_escapa_displayname"
- "test_mkcol_xml_incluye_y_escapa_descripcion"
- "test_mkcol_xml_omite_descripcion_vacia"
test_file_path: "python/functions/infra/dav_make_addressbook_test.py"
file_path: "python/functions/infra/dav_make_addressbook.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions")
from infra.pass_get_secret import pass_get_secret
from infra.dav_make_addressbook import dav_make_addressbook
pw = pass_get_secret("dav/xandikos-enmanuel")["value"] # NO logear
res = dav_make_addressbook(
base_url="https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com",
username="enmanuel",
password=pw,
contacts_home="/enmanuel/contacts/",
slug="trabajo",
display_name="Trabajo",
)
print(res)
# {'status': 'ok', 'http_status': 201, 'href': '/enmanuel/contacts/trabajo/'}
# Volver a llamar con el mismo slug:
# {'status': 'ok', 'http_status': 405, 'href': '/enmanuel/contacts/trabajo/', 'existed': True}
```
## Cuando usarla
Cuando el usuario quiere una libreta/agenda de contactos nueva ademas de la
principal: una coleccion CardDAV separada ("Trabajo", "Personal", "Familia") con
su propio nombre visible, bajo el contacts-home del principal. Es la analoga de
`dav_make_calendar` para CardDAV. El `href` devuelto es la ruta de la coleccion
que luego usas para escribir vCards (PUT de cada contacto) o para listarla en el
selector de libretas.
## Gotchas
- Impura: requiere red + Basic auth contra el servidor DAV. El password viene de
`pass`, no se logea ni se hardcodea.
- Idempotente: si la coleccion ya existe en ese path el servidor responde 405
(Method Not Allowed) o 301; ambos se traducen a `{status:'ok', existed:True}`
en vez de error, asi que es seguro reintentar.
- A diferencia de los calendarios (que tienen el metodo HTTP dedicado
MKCALENDAR), CardDAV NO define un "MKADDRESSBOOK". La creacion se hace con
**MKCOL extendido (RFC 5689)**: metodo HTTP `MKCOL` con un cuerpo XML que
declara el `resourcetype` como `D:collection` + `C:addressbook`. Probado contra
Xandikos, que lo soporta.
- Fallback para servidores sin MKCOL extendido: algunos servidores CardDAV viejos
no aceptan cuerpo en MKCOL y devuelven 415/400. En ese caso el patron es
`MKCOL` simple (sin cuerpo) para crear la coleccion + un `PROPPATCH` posterior
que fije el `resourcetype` addressbook, el `displayname` y la
`addressbook-description`. Esta funcion implementa solo el camino extendido (un
request); si te topas con un servidor que no lo soporta, anade el fallback
MKCOL+PROPPATCH antes de promoverlo.
- El `slug` se sanea a `[a-z0-9_-]` (minusculas, espacios->guion, resto fuera).
Un slug que queda vacio tras sanear (p.ej. solo simbolos) devuelve error de
validacion sin tocar la red. El `display_name` y la `description` se escapan
para XML, pero el `slug` que va en la URL ya esta restringido al charset seguro.
@@ -0,0 +1,171 @@
"""Crea una nueva coleccion de contactos CardDAV bajo un contacts-home.
Funcion impura: hace una peticion HTTP MKCOL extendido (RFC 5689) para crear una
"libreta/agenda de contactos" nueva bajo el contacts-home de un principal. El
cuerpo XML del MKCOL declara el resourcetype como addressbook
(`{urn:ietf:params:xml:ns:carddav}addressbook`) y fija de paso el nombre visible
(DAV:displayname) y la descripcion (CardDAV addressbook-description).
El slug (segmento de path de la coleccion) se sanea a `[a-z0-9_-]` (minusculas,
espacios -> '-'); si queda vacio se devuelve un error de validacion. La coleccion
se crea en `<contacts_home><slug>/`.
Idempotente: un 201 (Created) es exito; un 405 (Method Not Allowed) o un 301 (la
coleccion ya existe en ese path) se devuelven como {status:'ok', existed:True}.
El display_name y la description se escapan para XML. Construye
`Authorization: Basic base64(user:pass)` a mano con stdlib. Maneja errores sin
lanzar (salvo validacion de args). Solo usa stdlib (urllib, base64, re, ssl,
xml.sax.saxutils). Probado contra Xandikos.
"""
import base64
import re
import ssl
import urllib.error
import urllib.request
from xml.sax.saxutils import escape as _xml_escape
_UNSAFE_SLUG_RE = re.compile(r"[^a-z0-9_-]")
def _basic_auth_header(username: str, password: str) -> str:
raw = ("%s:%s" % (username, password)).encode("utf-8")
return "Basic " + base64.b64encode(raw).decode("ascii")
def _sanitize_slug(slug: str) -> str:
"""Sanea un slug a `[a-z0-9_-]`.
Pasa a minusculas, convierte espacios (y runs de espacios) en un guion, y
elimina cualquier otro caracter no permitido. Colapsa guiones repetidos y
recorta guiones de los extremos. Puede devolver "" si no queda nada usable;
el caller trata "" como error de validacion.
"""
s = slug.strip().lower()
s = re.sub(r"\s+", "-", s)
s = _UNSAFE_SLUG_RE.sub("", s)
s = re.sub(r"-{2,}", "-", s).strip("-")
return s
def _build_mkcol_xml(display_name: str, description: str = "") -> str:
"""Cuerpo XML del MKCOL extendido (RFC 5689) para crear un addressbook.
Declara el resourcetype como `D:collection` + `C:addressbook` (CardDAV) y
setea el displayname; si hay descripcion, anade `C:addressbook-description`.
Ambos valores se escapan para XML.
"""
name = _xml_escape(display_name)
props = [
"<D:resourcetype><D:collection/><C:addressbook/></D:resourcetype>",
"<D:displayname>%s</D:displayname>" % name,
]
if description:
props.append(
"<C:addressbook-description>%s</C:addressbook-description>"
% _xml_escape(description)
)
return (
'<?xml version="1.0" encoding="utf-8" ?>'
'<D:mkcol xmlns:D="DAV:" '
'xmlns:C="urn:ietf:params:xml:ns:carddav">'
"<D:set><D:prop>%s</D:prop></D:set>"
"</D:mkcol>"
) % "".join(props)
def _join_url(base_url: str, contacts_home: str, slug: str) -> str:
return base_url.rstrip("/") + "/" + contacts_home.strip("/") + "/" + slug + "/"
def dav_make_addressbook(
base_url: str,
username: str,
password: str,
contacts_home: str,
slug: str,
display_name: str = "",
description: str = "",
*,
timeout_s: float = 20.0,
verify_tls: bool = True,
) -> dict:
"""Crea una nueva coleccion de contactos CardDAV (MKCOL extendido RFC 5689).
Crea la coleccion en `<contacts_home><slug>/` via MKCOL extendido, declarando
el resourcetype como addressbook y fijando el displayname (y la descripcion si
se pasa) en el propio cuerpo. Idempotente: si la coleccion ya existe (405/301)
devuelve {status:'ok', existed:True}.
Args:
base_url: URL base del servidor DAV (sin barra final), p.ej.
'https://dav-x.organic-machine.com'.
username: usuario para HTTP Basic auth.
password: contrasena para HTTP Basic auth. Resolver desde pass.
contacts_home: ruta del contacts-home del principal (con barra final),
p.ej. '/enmanuel/contacts/'. La coleccion cuelga de el.
slug: segmento de path de la coleccion (p.ej. 'trabajo'); se sanea a
[a-z0-9_-]. Si queda vacio tras sanear, error de validacion.
display_name: nombre visible (DAV:displayname). Si vacio, usa el slug.
description: descripcion (CardDAV addressbook-description). Opcional.
timeout_s: timeout de cada peticion en segundos. Default 20.0.
verify_tls: si True (default) verifica el certificado TLS.
Returns:
dict. En exito: {status:'ok', http_status:int, href:str} (y existed:True
si ya existia). En error (sin lanzar): {status:'error', http_status:
int|None, href:str, error:str}.
"""
clean = _sanitize_slug(slug)
href = (contacts_home.rstrip("/") + "/" + clean + "/") if clean else ""
if not clean:
return {
"status": "error",
"http_status": None,
"href": href,
"error": "slug invalido: queda vacio tras sanear a [a-z0-9_-]",
}
name = display_name if display_name else clean
url = _join_url(base_url, contacts_home, clean)
context = None if verify_tls else ssl._create_unverified_context()
headers = {
"Authorization": _basic_auth_header(username, password),
"Content-Type": "application/xml; charset=utf-8",
}
# MKCOL extendido (RFC 5689) — crea la coleccion + resourcetype addressbook +
# displayname + (opcional) descripcion, todo en un solo request.
mk_body = _build_mkcol_xml(name, description).encode("utf-8")
req = urllib.request.Request(url, data=mk_body, method="MKCOL", headers=headers)
existed = False
http_status = None
try:
with urllib.request.urlopen(req, timeout=timeout_s, context=context) as resp:
http_status = resp.status
except urllib.error.HTTPError as e:
# 405/301: la coleccion ya existe en ese path -> idempotente.
if e.code in (301, 405):
existed = True
http_status = e.code
else:
return {
"status": "error",
"http_status": e.code,
"href": href,
"error": "http %s" % e.code,
}
except urllib.error.URLError as e:
return {
"status": "error",
"http_status": None,
"href": href,
"error": str(e.reason),
}
except Exception as e: # noqa: BLE001
return {"status": "error", "http_status": None, "href": href, "error": str(e)}
result = {"status": "ok", "http_status": http_status, "href": href}
if existed:
result["existed"] = True
return result
@@ -0,0 +1,78 @@
"""Tests para dav_make_addressbook.
La funcion publica es impura (hace HTTP), asi que no se prueba contra un servidor
real. Se ejercitan los helpers puros extraidos a nivel de modulo: la
sanitizacion del slug, la construccion de la URL de la coleccion y la generacion
del cuerpo XML del MKCOL extendido (resourcetype addressbook + displayname +
descripcion escapados). Sin red.
"""
from infra.dav_make_addressbook import (
_build_mkcol_xml,
_join_url,
_sanitize_slug,
)
def test_sanitize_slug_minusculas():
assert _sanitize_slug("Trabajo") == "trabajo"
def test_sanitize_slug_espacios_a_guion():
assert _sanitize_slug("agenda de trabajo") == "agenda-de-trabajo"
def test_sanitize_slug_elimina_caracteres_raros():
assert _sanitize_slug("Casa/Ocio!! 2026") == "casaocio-2026"
def test_sanitize_slug_colapsa_guiones_y_recorta():
assert _sanitize_slug(" --Foo Bar-- ") == "foo-bar"
def test_sanitize_slug_vacio():
assert _sanitize_slug(" !!! ") == ""
def test_join_url_compone_la_coleccion():
url = _join_url(
"https://dav-x.organic-machine.com",
"/enmanuel/contacts/",
"trabajo",
)
assert url == "https://dav-x.organic-machine.com/enmanuel/contacts/trabajo/"
def test_mkcol_xml_es_mkcol_extendido():
xml = _build_mkcol_xml("Trabajo")
assert "<D:mkcol" in xml
assert 'xmlns:C="urn:ietf:params:xml:ns:carddav"' in xml
def test_mkcol_xml_declara_resourcetype_addressbook():
xml = _build_mkcol_xml("Trabajo")
assert "<D:resourcetype><D:collection/><C:addressbook/></D:resourcetype>" in xml
def test_mkcol_xml_incluye_displayname():
xml = _build_mkcol_xml("Trabajo")
assert "<D:displayname>Trabajo</D:displayname>" in xml
def test_mkcol_xml_escapa_displayname():
xml = _build_mkcol_xml("Casa & <Ocio>")
assert "Casa &amp; &lt;Ocio&gt;" in xml
assert "<Ocio>" not in xml
def test_mkcol_xml_incluye_y_escapa_descripcion():
xml = _build_mkcol_xml("Trabajo", description="A & B <c>")
assert (
"<C:addressbook-description>A &amp; B &lt;c&gt;</C:addressbook-description>"
in xml
)
def test_mkcol_xml_omite_descripcion_vacia():
xml = _build_mkcol_xml("Trabajo")
assert "addressbook-description" not in xml
+94
View File
@@ -0,0 +1,94 @@
---
name: duckdb_execute
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def duckdb_execute(db_path: str, sql: str, params: list = None) -> dict"
description: "Ejecuta UNA sentencia de escritura (INSERT/UPDATE/DELETE/DDL) contra una base DuckDB abierta en conexion read-write (duckdb.connect(db_path)), hace commit y cierra siempre en try/finally. En modo escritura DuckDB crea el archivo si no existe. Es el primitivo de escritura del grupo duckdb; complementa a duckdb_query_readonly_py_infra (solo lectura). Usa parametros posicionales con el marcador '?'. Devuelve un dict sin lanzar (estilo del grupo): {status:'ok', rowcount} en exito y {status:'error', error} en fallo. rowcount es el numero de filas afectadas; DuckDB no expone un rowcount fiable (siempre -1) pero tras INSERT/UPDATE/DELETE el fetchall() del cursor devuelve [(n,)] de donde se extrae; para DDL u operaciones sin filas queda en -1 sin fallar. Depende del paquete duckdb (1.5.2 en python/.venv)."
tags: [duckdb, sql, execute, write, ddl, dml]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_py_core"
imports: [duckdb]
params:
- name: db_path
desc: "ruta al archivo DuckDB. En modo escritura DuckDB crea el archivo si no existe. Un directorio padre inexistente o un lock de otro proceso devuelve {status:'error'}."
- name: sql
desc: "sentencia SQL de escritura (INSERT/UPDATE/DELETE/DDL). Usa el marcador '?' para parametros posicionales."
- name: params
desc: "lista de parametros posicionales para el SQL en orden. None (default) significa sin parametros. Pasar valores aqui en vez de interpolarlos en el SQL evita inyeccion."
output: "dict. En exito: {status:'ok', rowcount:int} donde rowcount es el numero de filas afectadas (o -1 cuando la sentencia no reporta filas, p.ej. DDL). En error (sin lanzar): {status:'error', error:str}."
tested: true
tests:
- "test_insert_devuelve_status_ok_y_persiste"
- "test_update_afecta_filas_y_persiste"
- "test_delete_afecta_filas_y_persiste"
- "test_ddl_create_table_status_ok"
- "test_crea_la_base_si_no_existe"
- "test_sql_invalido_devuelve_status_error"
test_file_path: "python/functions/infra/duckdb_execute_test.py"
file_path: "python/functions/infra/duckdb_execute.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions")
from infra.duckdb_execute import duckdb_execute
db = "/tmp/eventos.duckdb"
# DDL: crear la tabla (la base se crea sola si no existia).
print(duckdb_execute(db, "CREATE TABLE eventos (id INTEGER, tipo VARCHAR)"))
# {'status': 'ok', 'rowcount': -1} (DDL no reporta filas)
# DML: insertar con parametros posicionales.
res = duckdb_execute(
db,
"INSERT INTO eventos VALUES (?, ?), (?, ?)",
params=[1, "login", 2, "logout"],
)
print(res)
# {'status': 'ok', 'rowcount': 2}
# UPDATE.
print(duckdb_execute(db, "UPDATE eventos SET tipo = ? WHERE id = ?", params=["signin", 1]))
# {'status': 'ok', 'rowcount': 1}
```
## Cuando usarla
Cuando un service single-writer necesita escribir DDL/DML en su DuckDB: crear o
migrar tablas, insertar registros nuevos, actualizar estado o borrar filas en un
archivo DuckDB que ese proceso posee. Es la mitad de escritura del grupo `duckdb`:
usa `duckdb_query_readonly_py_infra` para leer (sin riesgo de modificar la base) y
`duckdb_execute_py_infra` para escribir con commit. El dict de salida con
`rowcount` es directamente serializable a JSON para pasarlo al siguiente paso de
una composicion.
## Gotchas
- Escritura real de un archivo en disco (impura). Abre en modo read-write y hace
commit; cualquier fallo se devuelve como `{status:'error', ...}`, nunca se lanza.
- DuckDB es single-writer: solo un proceso puede tener la base abierta en
escritura a la vez. Si otro proceso ya la tiene abierta en write, `connect`
falla con un error de lock (`Could not set lock on file ...`) que se devuelve
como `{status:'error', ...}`. Diseña el acceso para que un unico proceso sea el
escritor; los lectores deben usar `duckdb_query_readonly` (read_only=True).
- `rowcount` no es fiable en todos los casos. DuckDB no expone un `cursor.rowcount`
util (siempre devuelve -1); esta funcion lee el conteo del `fetchall()` que
DuckDB emite tras INSERT/UPDATE/DELETE (`[(n,)]`). Para DDL (`CREATE`/`DROP`/
`ALTER`) y operaciones que no reportan filas, `rowcount` queda en `-1` a
proposito: NO trates `-1` como error.
- Ejecuta UNA sentencia por llamada (`con.execute(sql, params)`). No es para
scripts multi-statement separados por `;`; para eso encadena varias llamadas o
usa una funcion/pipeline dedicada.
- Los parametros van en `params` con el marcador `?`, nunca interpolados en el
string del SQL (previene inyeccion).
- A diferencia del modo read-only, este modo **crea** el archivo si no existe. Un
`db_path` con un directorio padre inexistente si falla y se reporta como error.
+82
View File
@@ -0,0 +1,82 @@
"""Ejecuta una sentencia de escritura (INSERT/UPDATE/DELETE/DDL) contra DuckDB.
Funcion impura: abre un archivo DuckDB con `duckdb.connect(db_path)` en modo
read-write (crea el archivo si no existe, cosa que el modo escritura de DuckDB
permite). Ejecuta UNA sentencia con parametros posicionales (DuckDB usa el
marcador `?`), hace commit y cierra la conexion siempre en un bloque try/finally.
Es el primitivo de escritura del grupo `duckdb` del registry; complementa a
`duckdb_query_readonly_py_infra`, que es solo lectura.
Devuelve un dict sin lanzar excepciones, siguiendo el estilo del grupo
(`{status:'ok', ...}` en exito, `{status:'error', error:str}` en fallo). En exito
incluye `rowcount`: el numero de filas afectadas por la sentencia. DuckDB no expone
un `rowcount` fiable en su cursor (siempre devuelve -1), pero tras un
INSERT/UPDATE/DELETE el `fetchall()` del cursor devuelve `[(n,)]` con el conteo;
de ahi se extrae. Para DDL u operaciones que no reportan filas, `rowcount` queda
en -1 y eso NUNCA hace fallar la funcion.
"""
def _affected_rowcount(cursor) -> int:
"""Extrae el numero de filas afectadas de un cursor DuckDB de escritura.
Estrategia robusta para DuckDB:
1. Si `cursor.rowcount` esta disponible y es >= 0, usarlo.
2. Si no, intentar `cursor.fetchall()`: tras INSERT/UPDATE/DELETE DuckDB
devuelve `[(n,)]` con el conteo. Se extrae el primer entero.
3. Si nada aplica (DDL, sin filas), devolver -1.
Nunca lanza: cualquier problema al leer el conteo cae a -1.
"""
try:
rc = getattr(cursor, "rowcount", -1)
if isinstance(rc, int) and rc >= 0:
return rc
except Exception: # noqa: BLE001
pass
try:
fetched = cursor.fetchall()
except Exception: # noqa: BLE001
return -1
if fetched and fetched[0]:
candidate = fetched[0][0]
if isinstance(candidate, int):
return candidate
return -1
def duckdb_execute(db_path: str, sql: str, params: list = None) -> dict:
"""Ejecuta una sentencia de escritura DuckDB en conexion read-write.
Args:
db_path: ruta al archivo DuckDB. En modo escritura DuckDB crea el archivo
si no existe. Un directorio inexistente o un lock de otro proceso
devuelve {status:'error', ...}.
sql: sentencia SQL de escritura (INSERT/UPDATE/DELETE/DDL). Usa el
marcador `?` para parametros posicionales.
params: lista de parametros posicionales para el SQL en orden. None
(default) significa sin parametros.
Returns:
dict. En exito: {status:'ok', rowcount:int} donde rowcount es el numero
de filas afectadas (o -1 cuando la sentencia no reporta filas, p.ej. DDL).
En error (sin lanzar): {status:'error', error:str}.
"""
conn = None
try:
conn = __import__("duckdb").connect(db_path)
cursor = conn.execute(sql, params if params is not None else [])
rowcount = _affected_rowcount(cursor)
# DuckDB autocommitea por defecto, pero llamar a commit es seguro e
# idempotente: garantiza la durabilidad de la escritura.
conn.commit()
return {"status": "ok", "rowcount": rowcount}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}
finally:
if conn is not None:
conn.close()
@@ -0,0 +1,85 @@
"""Tests para duckdb_execute."""
import duckdb
import pytest
from .duckdb_execute import duckdb_execute
@pytest.fixture
def db(tmp_path):
"""Crea una base DuckDB temporal con una tabla vacia y devuelve su path."""
path = str(tmp_path / "test.duckdb")
con = duckdb.connect(path)
con.execute("CREATE TABLE t (id INTEGER, name VARCHAR)")
con.close()
return path
def _read_rows(path: str) -> list:
"""Relee la tabla t en una conexion read_only y devuelve las filas."""
con = duckdb.connect(path, read_only=True)
try:
return con.execute("SELECT id, name FROM t ORDER BY id").fetchall()
finally:
con.close()
def test_insert_devuelve_status_ok_y_persiste(db):
res = duckdb_execute(
db,
"INSERT INTO t VALUES (?, ?), (?, ?), (?, ?)",
params=[1, "a", 2, "b", 3, "c"],
)
assert res["status"] == "ok"
assert res["rowcount"] == 3
# Releemos para confirmar el efecto en disco.
assert _read_rows(db) == [(1, "a"), (2, "b"), (3, "c")]
def test_update_afecta_filas_y_persiste(db):
duckdb_execute(db, "INSERT INTO t VALUES (1,'a'),(2,'b'),(3,'c')")
res = duckdb_execute(db, "UPDATE t SET name = ? WHERE id <= ?", params=["x", 2])
assert res["status"] == "ok"
assert res["rowcount"] == 2
assert _read_rows(db) == [(1, "x"), (2, "x"), (3, "c")]
def test_delete_afecta_filas_y_persiste(db):
duckdb_execute(db, "INSERT INTO t VALUES (1,'a'),(2,'b'),(3,'c')")
res = duckdb_execute(db, "DELETE FROM t WHERE id = ?", params=[3])
assert res["status"] == "ok"
assert res["rowcount"] == 1
assert _read_rows(db) == [(1, "a"), (2, "b")]
def test_ddl_create_table_status_ok(db):
res = duckdb_execute(db, "CREATE TABLE u (x INTEGER)")
assert res["status"] == "ok"
# DDL no reporta filas: rowcount queda en -1, no falla.
assert res["rowcount"] == -1
# Confirmamos que la tabla existe insertando en ella.
res2 = duckdb_execute(db, "INSERT INTO u VALUES (42)")
assert res2["status"] == "ok"
con = duckdb.connect(db, read_only=True)
try:
assert con.execute("SELECT x FROM u").fetchall() == [(42,)]
finally:
con.close()
def test_crea_la_base_si_no_existe(tmp_path):
path = str(tmp_path / "nueva.duckdb")
res = duckdb_execute(path, "CREATE TABLE nueva (a INTEGER)")
assert res["status"] == "ok"
res2 = duckdb_execute(path, "INSERT INTO nueva VALUES (7)")
assert res2["status"] == "ok"
assert res2["rowcount"] == 1
def test_sql_invalido_devuelve_status_error(db):
res = duckdb_execute(db, "INSERT INTO tabla_que_no_existe VALUES (1)")
assert res["status"] == "error"
assert "error" in res
assert isinstance(res["error"], str) and res["error"]
# La funcion no lanza: el flujo del test llega hasta aqui sin excepcion.
+116
View File
@@ -0,0 +1,116 @@
---
name: duckdb_upsert
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def duckdb_upsert(db_path: str, table: str, rows: list[dict], key_cols: list[str], update_cols: list[str] | None = None) -> dict"
description: "UPSERT idempotente de filas en una tabla DuckDB con ownership selectivo de columnas. Construye INSERT INTO <table> (cols) VALUES (?,...) ON CONFLICT (key_cols) DO UPDATE SET col=excluded.col, ... (o DO NOTHING) y lo ejecuta fila por fila para contar inserts vs updates. La clave del patron es update_cols: en un conflicto solo se actualizan esas columnas, de modo que las columnas excluidas conservan su valor previo (la DB es duena de ellas y un re-ingest no las pisa). update_cols=None actualiza todas menos key_cols; update_cols=[] hace DO NOTHING. Abre duckdb.connect(db_path) en lectura-escritura, commit y close en try/finally. Valida que tabla y columnas casen [A-Za-z_][A-Za-z0-9_]* antes de interpolarlas; los valores van por placeholders '?'. Devuelve dict sin lanzar: {status:'ok', inserted, updated} o {status:'error', error}. key_cols deben tener PRIMARY KEY o UNIQUE en la tabla. Depende del paquete duckdb (1.5.2 en python/.venv)."
tags: [duckdb, sql, upsert, idempotent, infra]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_py_core"
imports: [re, duckdb]
params:
- name: db_path
desc: "ruta al archivo DuckDB. Se abre en lectura-escritura (duckdb.connect), por lo que se crea si no existe; pero la tabla destino debe existir y tener PRIMARY KEY o UNIQUE en key_cols para que ON CONFLICT funcione."
- name: table
desc: "nombre de la tabla destino. Validado como identificador SQL [A-Za-z_][A-Za-z0-9_]*; un nombre raro devuelve {status:'error'} (no se interpola sin validar)."
- name: rows
desc: "lista de dicts, un dict por fila (clave=nombre de columna). El esquema de insercion lo fija el conjunto de claves de la PRIMERA fila; todas las filas deben tener exactamente las mismas claves o se devuelve error. Lista vacia -> {status:'ok', inserted:0, updated:0}."
- name: key_cols
desc: "columnas de la clave de conflicto. Deben existir como PRIMARY KEY o UNIQUE en la tabla y estar presentes en las claves de cada fila. No puede estar vacia."
- name: update_cols
desc: "columnas a actualizar en caso de conflicto. None (default) = todas las columnas de la fila menos key_cols. Lista vacia [] = DO NOTHING (inserta nuevas, no toca existentes). Lista con columnas = DO UPDATE SET solo esas; las no listadas conservan su valor previo (ownership selectivo)."
output: "dict. En exito: {status:'ok', inserted:int, updated:int} donde inserted cuenta las claves que no existian y updated las que ya existian (con update_cols=[] / DO NOTHING, updated cuenta los conflictos vistos pero la fila no cambia). En error (sin lanzar): {status:'error', error:str}."
tested: true
tests:
- "test_upsert_fila_nueva_inserta"
- "test_update_cols_selectivo_no_pisa_columnas_excluidas"
- "test_update_cols_vacio_do_nothing_no_cambia_existente"
- "test_varias_filas_a_la_vez_mezcla_insert_y_update"
- "test_rows_vacio_devuelve_cero"
- "test_columnas_inconsistentes_devuelve_error"
- "test_identificador_invalido_devuelve_error"
test_file_path: "python/functions/infra/duckdb_upsert_test.py"
file_path: "python/functions/infra/duckdb_upsert.py"
---
## Ejemplo
```python
import sys
sys.path.insert(0, "python/functions")
import duckdb
from infra.duckdb_upsert import duckdb_upsert
db = "/tmp/leads.duckdb"
con = duckdb.connect(db)
con.execute("CREATE TABLE leads (email VARCHAR PRIMARY KEY, name VARCHAR, score INTEGER)")
con.close()
# Re-ingest 1: inserta el lead.
print(duckdb_upsert(
db, "leads",
[{"email": "ana@x.com", "name": "Ana", "score": 0}],
key_cols=["email"],
))
# {'status': 'ok', 'inserted': 1, 'updated': 0}
# Mientras tanto, un proceso de scoring escribio score=87 en la DB (fuente de verdad).
con = duckdb.connect(db)
con.execute("UPDATE leads SET score = 87 WHERE email = 'ana@x.com'")
con.close()
# Re-ingest 2: el feed trae name actualizado y score=0 (valor por defecto del feed),
# pero solo autorizamos actualizar 'name'. 'score' lo posee la DB y NO se pisa.
print(duckdb_upsert(
db, "leads",
[{"email": "ana@x.com", "name": "Ana Lopez", "score": 0}],
key_cols=["email"],
update_cols=["name"],
))
# {'status': 'ok', 'inserted': 0, 'updated': 1}
con = duckdb.connect(db, read_only=True)
print(con.execute("SELECT name, score FROM leads WHERE email = 'ana@x.com'").fetchone())
# ('Ana Lopez', 87) <- name actualizado, score conservado
con.close()
```
## Cuando usarla
Cuando la DB es la fuente de verdad y un re-ingest no debe pisar campos que ya
posee la DB: pasa `update_cols` SIN esos campos. Tipico en pipelines de ingesta
idempotente donde una fila se reinserta periodicamente (catalogo, leads, entidades
OSINT, snapshots) pero ciertas columnas se enriquecieron despues (score calculado,
anotacion manual, flag derivado) y deben sobrevivir al refresco. Usa
`update_cols=None` para un upsert "todo" clasico, `update_cols=[]` para insertar
solo filas nuevas sin tocar las existentes, y una lista explicita para ownership
selectivo. Util como paso de escritura en una composicion: el dict de salida es
serializable y reporta cuantas filas se insertaron vs actualizaron.
## Gotchas
- Escritura real en disco (impura). `ON CONFLICT (key_cols)` solo funciona si esas
columnas tienen **PRIMARY KEY o UNIQUE** en la tabla; sin esa restriccion DuckDB
no detecta el conflicto y devolveria `{status:'error', ...}` o duplicaria. La
tabla debe existir de antemano (la funcion no la crea).
- **Single-writer**: la cuenta inserted/updated consulta la existencia de cada
clave en la misma conexion/transaccion justo antes de insertarla. Si otro
proceso escribe concurrentemente la misma base, las cuentas pueden desviarse y
DuckDB puede rechazar abrir el archivo por lock. Diseñada para un unico escritor.
- **Identificadores validados**: `table` y los nombres de columna deben casar
`[A-Za-z_][A-Za-z0-9_]*` (DuckDB no permite parametrizar identificadores, asi que
se interpolan tras validar). Un nombre con espacios, comillas, puntos o vacio
devuelve `{status:'error'}`. Los valores de las filas siempre van por `?`.
- **Esquema fijo por la primera fila**: el conjunto de columnas de insercion lo
determina `rows[0]`. Todas las filas deben tener exactamente las mismas claves; si
una fila difiere, se devuelve error (no se hace insercion parcial).
- `update_cols=[]` genera `DO NOTHING`: la fila existente queda intacta, pero el
contador `updated` sigue reflejando los conflictos vistos (no son inserts nuevos).
- Nunca lanza: todo fallo (path bloqueado, tabla inexistente, tipo invalido) vuelve
como `{status:'error', error:str}`.
+155
View File
@@ -0,0 +1,155 @@
"""UPSERT idempotente de filas en una tabla DuckDB con ownership selectivo de columnas.
Funcion impura: abre un archivo DuckDB con `duckdb.connect(db_path)` en modo
lectura-escritura, ejecuta un `INSERT ... ON CONFLICT (key_cols) DO UPDATE SET ...`
fila por fila, hace commit y cierra la conexion en un bloque try/finally. Devuelve
un dict sin lanzar excepciones, siguiendo el estilo del grupo duckdb del registry:
{status:'ok', ...} en exito y {status:'error', error:str} en fallo.
El valor de esta funcion es el "ownership selectivo": al actualizar solo las
columnas indicadas en `update_cols` en caso de conflicto, un re-upsert de la misma
clave NO pisa las columnas que se dejaron fuera de `update_cols`. Asi la DB puede
ser la fuente de verdad de ciertos campos (enriquecidos, anotados a mano, derivados)
mientras un proceso de re-ingest refresca solo los campos que aporta.
Identificadores (tabla y columnas) se validan contra `[A-Za-z_][A-Za-z0-9_]*` antes
de interpolarlos en el SQL (DuckDB no permite parametrizar identificadores); los
valores de las filas siempre van por placeholders `?`.
"""
import re
_IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
def _validate_ident(name: str) -> str:
"""Valida que `name` sea un identificador SQL seguro y lo devuelve.
Acepta solo nombres que casen `[A-Za-z_][A-Za-z0-9_]*`. Lanza ValueError
para cualquier otro (espacios, comillas, puntos, vacio), que el caller
convierte en {status:'error'}.
"""
if not isinstance(name, str) or not _IDENT_RE.match(name):
raise ValueError(f"identificador invalido: {name!r}")
return name
def duckdb_upsert(
db_path: str,
table: str,
rows: list,
key_cols: list,
update_cols: list = None,
) -> dict:
"""Hace UPSERT idempotente de `rows` en `table`, con ownership selectivo.
Construye `INSERT INTO <table> (cols) VALUES (?,...) ON CONFLICT (key_cols)
DO UPDATE SET col=excluded.col, ...` (o `DO NOTHING`) y lo ejecuta fila por
fila para poder contar inserts vs updates.
Args:
db_path: ruta al archivo DuckDB. Se abre en lectura-escritura
(`duckdb.connect`), por lo que se crea si no existe — pero la tabla
destino debe existir y tener PRIMARY KEY o UNIQUE en `key_cols`.
table: nombre de la tabla destino. Validado como identificador SQL.
rows: lista de dicts, un dict por fila (clave=nombre de columna). El
esquema de insercion lo fija el conjunto de claves de la PRIMERA fila;
todas las filas deben tener exactamente las mismas claves o se devuelve
{status:'error'}. Lista vacia -> {status:'ok', inserted:0, updated:0}.
key_cols: columnas de la clave de conflicto. Deben tener PRIMARY KEY o
UNIQUE en la tabla para que ON CONFLICT funcione. Deben estar presentes
en las claves de las filas.
update_cols: columnas a actualizar en caso de conflicto.
None (default) -> todas las columnas de la fila MENOS las key_cols.
Lista vacia [] -> DO NOTHING (inserta nuevas, no toca existentes).
Lista con columnas -> DO UPDATE SET solo esas (las no listadas
conservan su valor previo: ownership selectivo).
Returns:
dict. En exito: {status:'ok', inserted:int, updated:int} donde inserted
cuenta las claves que no existian y updated las que ya existian (para
update_cols=[] -> DO NOTHING, updated es 0). En error (sin lanzar):
{status:'error', error:str}.
"""
conn = None
try:
if not isinstance(rows, list):
raise ValueError("rows debe ser una lista de dicts")
if not rows:
return {"status": "ok", "inserted": 0, "updated": 0}
# Esquema de insercion = claves de la primera fila, en orden estable.
first_keys = list(rows[0].keys())
insert_cols = [_validate_ident(c) for c in first_keys]
insert_set = set(first_keys)
# Todas las filas deben tener exactamente las mismas claves.
for i, row in enumerate(rows):
if not isinstance(row, dict):
raise ValueError(f"rows[{i}] no es un dict")
if set(row.keys()) != insert_set:
raise ValueError(
f"rows[{i}] tiene columnas distintas a la primera fila: "
f"{sorted(row.keys())} vs {sorted(first_keys)}"
)
keys = [_validate_ident(c) for c in key_cols]
if not keys:
raise ValueError("key_cols no puede estar vacio")
for k in keys:
if k not in insert_set:
raise ValueError(f"key_col {k!r} no esta en las columnas de las filas")
# Resolver update_cols.
if update_cols is None:
updates = [c for c in insert_cols if c not in keys]
else:
updates = [_validate_ident(c) for c in update_cols]
for u in updates:
if u not in insert_set:
raise ValueError(
f"update_col {u!r} no esta en las columnas de las filas"
)
cols_sql = ", ".join(insert_cols)
placeholders = ", ".join(["?"] * len(insert_cols))
conflict_sql = ", ".join(keys)
if updates:
set_sql = ", ".join(f"{c} = excluded.{c}" for c in updates)
on_conflict = f"ON CONFLICT ({conflict_sql}) DO UPDATE SET {set_sql}"
else:
on_conflict = f"ON CONFLICT ({conflict_sql}) DO NOTHING"
sql = (
f"INSERT INTO {table} ({cols_sql}) VALUES ({placeholders}) {on_conflict}"
)
conn = __import__("duckdb").connect(db_path)
# Contamos insert vs update consultando la existencia de la clave antes
# de ejecutar cada fila. Misma conexion/transaccion, single-writer.
where_keys = " AND ".join(f"{k} = ?" for k in keys)
exists_sql = f"SELECT 1 FROM {table} WHERE {where_keys} LIMIT 1"
inserted = 0
updated = 0
for row in rows:
key_vals = [row[k] for k in keys]
existed = conn.execute(exists_sql, key_vals).fetchone() is not None
values = [row[c] for c in insert_cols]
conn.execute(sql, values)
if existed:
updated += 1
else:
inserted += 1
conn.commit()
return {"status": "ok", "inserted": inserted, "updated": updated}
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}
finally:
if conn is not None:
conn.close()
@@ -0,0 +1,133 @@
"""Tests para duckdb_upsert."""
import os
import sys
import tempfile
import duckdb
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from infra.duckdb_upsert import duckdb_upsert # noqa: E402
def _fresh_db():
"""Crea un .duckdb temporal con tabla t(k PK, a, b) y devuelve su path."""
fd, path = tempfile.mkstemp(suffix=".duckdb")
os.close(fd)
os.remove(path) # DuckDB crea el archivo limpio.
con = duckdb.connect(path)
con.execute("CREATE TABLE t (k INTEGER PRIMARY KEY, a VARCHAR, b VARCHAR)")
con.close()
return path
def _select_row(path, k):
con = duckdb.connect(path, read_only=True)
try:
return con.execute("SELECT k, a, b FROM t WHERE k = ?", [k]).fetchone()
finally:
con.close()
def test_upsert_fila_nueva_inserta():
path = _fresh_db()
try:
res = duckdb_upsert(
path, "t", [{"k": 1, "a": "a1", "b": "b1"}], key_cols=["k"]
)
assert res == {"status": "ok", "inserted": 1, "updated": 0}
assert _select_row(path, 1) == (1, "a1", "b1")
finally:
os.remove(path)
def test_update_cols_selectivo_no_pisa_columnas_excluidas():
path = _fresh_db()
try:
duckdb_upsert(path, "t", [{"k": 1, "a": "a1", "b": "b1"}], key_cols=["k"])
# Re-upsert de la misma k cambiando a y b en el dict, pero solo
# autorizando actualizar 'a'. 'b' debe conservar el valor viejo.
res = duckdb_upsert(
path,
"t",
[{"k": 1, "a": "a2", "b": "b2"}],
key_cols=["k"],
update_cols=["a"],
)
assert res == {"status": "ok", "inserted": 0, "updated": 1}
assert _select_row(path, 1) == (1, "a2", "b1") # a cambio, b NO
finally:
os.remove(path)
def test_update_cols_vacio_do_nothing_no_cambia_existente():
path = _fresh_db()
try:
duckdb_upsert(path, "t", [{"k": 1, "a": "a1", "b": "b1"}], key_cols=["k"])
res = duckdb_upsert(
path,
"t",
[{"k": 1, "a": "X", "b": "Y"}],
key_cols=["k"],
update_cols=[],
)
assert res == {"status": "ok", "inserted": 0, "updated": 1}
assert _select_row(path, 1) == (1, "a1", "b1") # intacta
finally:
os.remove(path)
def test_varias_filas_a_la_vez_mezcla_insert_y_update():
path = _fresh_db()
try:
duckdb_upsert(path, "t", [{"k": 1, "a": "a1", "b": "b1"}], key_cols=["k"])
res = duckdb_upsert(
path,
"t",
[
{"k": 1, "a": "a1b", "b": "b1b"}, # update
{"k": 2, "a": "a2", "b": "b2"}, # insert
{"k": 3, "a": "a3", "b": "b3"}, # insert
],
key_cols=["k"],
)
assert res == {"status": "ok", "inserted": 2, "updated": 1}
assert _select_row(path, 1) == (1, "a1b", "b1b")
assert _select_row(path, 2) == (2, "a2", "b2")
assert _select_row(path, 3) == (3, "a3", "b3")
finally:
os.remove(path)
def test_rows_vacio_devuelve_cero():
path = _fresh_db()
try:
res = duckdb_upsert(path, "t", [], key_cols=["k"])
assert res == {"status": "ok", "inserted": 0, "updated": 0}
finally:
os.remove(path)
def test_columnas_inconsistentes_devuelve_error():
path = _fresh_db()
try:
res = duckdb_upsert(
path,
"t",
[{"k": 1, "a": "a1", "b": "b1"}, {"k": 2, "a": "a2"}],
key_cols=["k"],
)
assert res["status"] == "error"
finally:
os.remove(path)
def test_identificador_invalido_devuelve_error():
path = _fresh_db()
try:
res = duckdb_upsert(
path, "t; DROP TABLE t", [{"k": 1, "a": "a1", "b": "b1"}], key_cols=["k"]
)
assert res["status"] == "error"
finally:
os.remove(path)