"""Escritura estructurada del service osint_db (DB como fuente de verdad). Estos helpers implementan los endpoints de escritura de /api/person, /api/contact, /api/event, /api/addressbook, /api/calendar y /api/push/dav. El patrón común: 1. Se escribe en la DB DuckDB bajo el lock single-writer del service. 2. El push a Xandikos (CardDAV/CalDAV) y el render DB->nota se hacen DESPUÉS de cerrar la transacción, para no bloquear la DB con la latencia de red. persons es dueña de sus campos estructurados (multi-valor): los singulares telefono/email/direccion se rellenan con el primer elemento de cada lista al materializar la ficha, y la nota Markdown se reescribe SIN tocar su prosa (update_obsidian_note con set_frontmatter hace merge del frontmatter y conserva el body). """ from __future__ import annotations import json import os from datetime import datetime, timezone from .config import Config from .registry_bridge import ( build_vcard, caldav_put_event, carddav_put_vcard, create_obsidian_note, dav_delete_resource, dav_get_resource, dav_list_resources, dav_make_addressbook, dav_make_calendar, duckdb_execute, duckdb_query_readonly, duckdb_upsert, pass_get_secret, update_obsidian_note, ) # Columnas de persons gobernadas por la API estructurada (sin slug, que es la # clave, ni note_path/extra_fm que gestiona el ingest del vault). _PERSON_API_COLS = ( "nombre", "aliases", "sexo", "fecha_nacimiento", "dni", "pais", "contexto", "telefonos", "emails", "direcciones", "tags", ) def _now(): return datetime.now(tz=timezone.utc) def _as_list(value) -> list: """Normaliza a lista de strings no vacíos (string suelto -> [string]).""" if value is None: return [] seq = value if isinstance(value, list) else [value] out = [] for v in seq: s = str(v).strip() if s: out.append(s) return out def _json(value) -> str: return json.dumps(value, ensure_ascii=False, default=str) def _read_person(db_path: str, slug: str) -> dict | None: """Lee una ficha de persons como dict (o None si no existe).""" res = duckdb_query_readonly( db_path, "SELECT slug, note_path, nombre, aliases, sexo, fecha_nacimiento, dni, " "telefono, email, direccion, pais, contexto, fuente, dav_uid, tags, " "telefonos, emails, direcciones, extra_fm FROM persons WHERE slug = ?", [slug], 1, ) if res.get("status") != "ok" or not res.get("rows"): return None return res["rows"][0] def _decode_json_field(value) -> list: """Decodifica un campo JSON de la DB a lista (tolera None/str/list).""" if value is None: return [] if isinstance(value, list): return value try: parsed = json.loads(value) except (TypeError, ValueError): return [] return parsed if isinstance(parsed, list) else [parsed] def _decode_extra_fm(value) -> dict: """Decodifica extra_fm (objeto JSON de la DB) a dict (o {} si no aplica).""" if value is None: return {} if isinstance(value, dict): return value try: parsed = json.loads(value) except (TypeError, ValueError): return {} return parsed if isinstance(parsed, dict) else {} # --------------------------------------------------------------------------- # persons # --------------------------------------------------------------------------- def upsert_person(cfg: Config, slug: str, fields: dict, *, render: bool = True) -> dict: """Crea/actualiza una persona multi-valor y materializa su nota. Escribe los campos estructurados en la DB (la DB es dueña), rellena los singulares con el primer elemento de cada lista, y tras cerrar la escritura materializa la ficha DB->nota (frontmatter OWNED + merge de extra_fm) sin tocar la prosa de la nota. """ slug = (slug or "").strip() if not slug: return {"status": "error", "error": "slug vacío"} telefonos = _as_list(fields.get("telefonos")) emails = _as_list(fields.get("emails")) direcciones = _as_list(fields.get("direcciones")) aliases = _as_list(fields.get("aliases")) tags = _as_list(fields.get("tags")) existing = _read_person(cfg.db_path, slug) note_path = (existing.get("note_path") if existing else None) or os.path.join( "personas", f"{slug}.md" ) row = { "slug": slug, "note_path": note_path, "nombre": (fields.get("nombre") or slug), "aliases": _json(aliases), "sexo": fields.get("sexo"), "fecha_nacimiento": fields.get("fecha_nacimiento"), "dni": fields.get("dni"), "telefono": telefonos[0] if telefonos else None, "email": emails[0] if emails else None, "direccion": direcciones[0] if direcciones else None, "pais": fields.get("pais"), "contexto": fields.get("contexto"), "tags": _json(tags), "telefonos": _json(telefonos), "emails": _json(emails), "direcciones": _json(direcciones), "updated_at": _now(), } # update_cols = todo lo que la API gobierna (no pisa fuente/dav_uid/extra_fm # que pertenecen al ingest del vault). update_cols = [c for c in row if c not in ("slug",)] res = duckdb_upsert( cfg.db_path, "persons", [row], key_cols=["slug"], update_cols=update_cols ) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} materialized = False if render: r = render_person(cfg, slug) materialized = r.get("status") == "ok" return { "status": "ok", "slug": slug, "inserted": res.get("inserted", 0), "updated": res.get("updated", 0), "note_path": note_path, "materialized": materialized, } def delete_person(cfg: Config, slug: str) -> dict: """Borra una ficha de persons de la DB (no borra la nota del vault).""" slug = (slug or "").strip() if not slug: return {"status": "error", "error": "slug vacío"} res = duckdb_execute(cfg.db_path, "DELETE FROM persons WHERE slug = ?", [slug]) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} return {"status": "ok", "slug": slug, "deleted": res.get("rowcount", 0)} def render_person(cfg: Config, slug: str) -> dict: """Materializa una ficha DB->nota: frontmatter OWNED + extra_fm, sin prosa. Lee la fila de persons, compone el frontmatter (campos OWNED como listas + merge de extra_fm) y lo escribe en la nota con update_obsidian_note (que conserva el body). Si la nota no existe la crea con un body mínimo. """ slug = (slug or "").strip() person = _read_person(cfg.db_path, slug) if person is None: return {"status": "error", "error": f"persona desconocida: {slug!r}"} rel = person.get("note_path") or os.path.join("personas", f"{slug}.md") if not rel.endswith(".md"): rel = rel + ".md" abs_path = os.path.abspath(os.path.join(cfg.vault_dir, rel)) vault_real = os.path.realpath(cfg.vault_dir) if not os.path.realpath(abs_path).startswith(vault_real + os.sep): return {"status": "error", "error": f"note_path fuera del vault: {rel!r}"} telefonos = _decode_json_field(person.get("telefonos")) emails = _decode_json_field(person.get("emails")) direcciones = _decode_json_field(person.get("direcciones")) aliases = _decode_json_field(person.get("aliases")) tags = _decode_json_field(person.get("tags")) frontmatter = { "tipo": "persona", "slug": slug, "nombre": person.get("nombre") or slug, "aliases": aliases, "sexo": person.get("sexo"), "fecha_nacimiento": person.get("fecha_nacimiento"), "dni": person.get("dni"), "telefonos": telefonos, "emails": emails, "direcciones": direcciones, # singulares por compatibilidad con consumidores que aún los leen. "telefono": telefonos[0] if telefonos else None, "email": emails[0] if emails else None, "direccion": direcciones[0] if direcciones else None, "pais": person.get("pais"), "contexto": person.get("contexto"), "fuente": person.get("fuente"), "tags": tags, } # Merge del frontmatter no-owned capturado del vault (no pisa las claves # OWNED de arriba). extra_fm es un objeto JSON (dict) en la DB. extra = _decode_extra_fm(person.get("extra_fm")) if extra: merged = dict(extra) merged.update(frontmatter) frontmatter = merged try: if os.path.exists(abs_path): # set_frontmatter hace merge y NO toca el body (prosa preservada). update_obsidian_note(abs_path, set_frontmatter=frontmatter) else: create_obsidian_note( cfg.vault_dir, rel, body="## Notas\n", frontmatter=frontmatter, ) except Exception as e: # noqa: BLE001 return {"status": "error", "error": str(e)} return {"status": "ok", "slug": slug, "note_path": rel} # --------------------------------------------------------------------------- # contacts (DB -> Xandikos) # --------------------------------------------------------------------------- def _resolve_password(cfg: Config) -> tuple: """Resuelve la password de Xandikos desde pass. Devuelve (pwd|None, error|None).""" secret = pass_get_secret(cfg.pass_secret) if secret.get("status") != "ok": return None, ( f"pass no devolvió el secreto {cfg.pass_secret!r}: {secret.get('error')}" ) return secret["value"], None def _default_collection(cfg: Config) -> str: return cfg.dav_contacts_collection def _person_agenda_extras(db_path: str, note_path) -> tuple: """Devuelve (direcciones, aliases) de la persons enlazada por note_path. El móvil solo recibe datos de agenda: si el contacto está enlazado a una ficha de persons, sus direcciones y aliases se incluyen en el vCard (como ADR y NICKNAME). Los campos OSINT (dni/sexo/fecha_nacimiento/pais/contexto) NUNCA salen de la DB: no se leen aquí. Devuelve ([], []) si no hay enlace. """ if not note_path: return [], [] res = duckdb_query_readonly( db_path, "SELECT direcciones, aliases FROM persons WHERE note_path = ?", [note_path], 1, ) if res.get("status") != "ok" or not res.get("rows"): return [], [] row = res["rows"][0] return ( _decode_json_field(row.get("direcciones")), _decode_json_field(row.get("aliases")), ) def _compose_agenda_vcard(cfg: Config, uid: str, fields: dict) -> str: """Compone el vCard de AGENDA de un contacto, SIN ningún campo OSINT. Incluye FN, TEL×N, EMAIL×N y, si el contacto está enlazado a una ficha de persons (por note_path), las direcciones (ADR×N) y aliases (NICKNAME) de esa persona. NUNCA pasa el dict ``osint`` a build_vcard, así que el vCard jamás lleva líneas X-OSINT-* (DNI/sexo/fecha-nac/país/contexto quedan solo en DuckDB + Obsidian). Args: cfg: configuración del service (db_path para resolver la persona). uid: UID del contacto. fields: dict con fn/nombre, tels/telefonos, emails/correos, direcciones/adrs y, opcionalmente, note_path (enlace a persons). Returns: Texto vCard 3.0 listo para PUT a Xandikos. """ tels = _as_list(fields.get("tels") or fields.get("telefonos")) emails = _as_list(fields.get("emails") or fields.get("correos")) fn = fields.get("fn") or fields.get("nombre") # Direcciones y aliases del contacto explícitos en fields... adrs = _as_list(fields.get("direcciones") or fields.get("adrs")) aliases = _as_list(fields.get("aliases")) # ...más los de la persons enlazada (si la hay), deduplicando. note_path = fields.get("note_path") person_adrs, person_aliases = _person_agenda_extras(cfg.db_path, note_path) adrs = _dedup_keep_order(adrs + person_adrs) aliases = _dedup_keep_order(aliases + person_aliases) contact = { "uid": uid, "fn": fn, "tels": tels, "emails": emails, "adrs": adrs, "aliases": aliases, } # CLAVE DE PRIVACIDAD: no se pasa 'osint' -> no se emite ninguna X-OSINT-*. return build_vcard(contact) def _dedup_keep_order(items: list) -> list: """Deduplica una lista de strings preservando orden (case-insensitive).""" seen, out = set(), [] for it in items: s = str(it).strip() key = s.lower() if s and key not in seen: seen.add(key) out.append(s) return out def _resource_href_tail(uid: str) -> str: """Nombre del recurso .vcf que carddav_put_vcard deriva del UID.""" return _safe_resource(uid) + ".vcf" def _read_etag_after_push(cfg: Config, pwd: str, collection: str, uid: str): """Lee el etag NUEVO del recurso .vcf de un contacto tras su PUT. Lista la colección (PROPFIND Depth:1) y busca el recurso cuyo href termina en el nombre .vcf del uid. Devuelve el etag o None si no se encuentra/falla. Capturar el etag del push propio evita que el sync inverso lo confunda con una edición hecha desde el móvil. """ listing = dav_list_resources(cfg.dav_base, cfg.dav_user, pwd, collection) if listing.get("status") != "ok": return None tail = _resource_href_tail(uid) for res in listing.get("resources", []): href = res.get("href") or "" if href.rstrip("/").rsplit("/", 1)[-1] == tail: return res.get("etag") return None def upsert_contact(cfg: Config, uid: str, fields: dict) -> dict: """Crea/actualiza un contacto en la DB y lo empuja a Xandikos (PUT vCard). La escritura DB se hace bajo el lock; el push DAV ocurre después. """ uid = (uid or "").strip() if not uid: return {"status": "error", "error": "uid vacío"} tels = _as_list(fields.get("tels") or fields.get("telefonos")) emails = _as_list(fields.get("emails") or fields.get("correos")) fn = fields.get("fn") or fields.get("nombre") collection = fields.get("collection") or _default_collection(cfg) # Si el contacto ya existe y está enlazado a una ficha, hereda su note_path # para que el vCard de agenda incluya las direcciones/aliases de la persona. note_path = fields.get("note_path") if note_path is None: existing = duckdb_query_readonly( cfg.db_path, "SELECT note_path FROM contacts WHERE uid = ?", [uid], 1 ) if existing.get("status") == "ok" and existing.get("rows"): note_path = existing["rows"][0].get("note_path") # vCard de AGENDA: nunca lleva X-OSINT-* (privacidad del móvil). vcard = _compose_agenda_vcard(cfg, uid, {**fields, "note_path": note_path}) row = { "uid": uid, "collection": collection, "etag": None, "fn": fn, "tels": _json(tels), "emails": _json(emails), "raw": vcard, "note_path": note_path, "updated_at": _now(), } res = duckdb_upsert( cfg.db_path, "contacts", [row], key_cols=["uid"], update_cols=["collection", "fn", "tels", "emails", "raw", "note_path", "updated_at"], ) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} # Push DB -> Xandikos fuera de cualquier transacción de la DB. pwd, err = _resolve_password(cfg) pushed = None etag = None if err is None: push = carddav_put_vcard( cfg.dav_base, cfg.dav_user, pwd, collection, uid, vcard ) pushed = push.get("status") == "ok" if pushed: # Captura el etag NUEVO del recurso para que el sync inverso no # confunda este push propio con una edición del móvil. etag = _read_etag_after_push(cfg, pwd, collection, uid) if etag: duckdb_execute( cfg.db_path, "UPDATE contacts SET etag = ? WHERE uid = ?", [etag, uid], ) return { "status": "ok", "uid": uid, "inserted": res.get("inserted", 0), "updated": res.get("updated", 0), "pushed": pushed, "etag": etag, "push_error": err, } def delete_contact(cfg: Config, uid: str) -> dict: """Borra un contacto de la DB y del servidor Xandikos (DELETE del recurso).""" uid = (uid or "").strip() if not uid: return {"status": "error", "error": "uid vacío"} person = duckdb_query_readonly( cfg.db_path, "SELECT collection FROM contacts WHERE uid = ?", [uid], 1 ) collection = _default_collection(cfg) if person.get("status") == "ok" and person.get("rows"): collection = person["rows"][0].get("collection") or collection res = duckdb_execute(cfg.db_path, "DELETE FROM contacts WHERE uid = ?", [uid]) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} # Borrado remoto del recurso .vcf (DESTRUCTIVO, explícito por el endpoint). pwd, err = _resolve_password(cfg) deleted_remote = None if err is None: resource = collection.rstrip("/") + "/" + _safe_resource(uid) + ".vcf" rm = dav_delete_resource(cfg.dav_base, cfg.dav_user, pwd, resource) deleted_remote = rm.get("status") == "ok" return { "status": "ok", "uid": uid, "deleted": res.get("rowcount", 0), "deleted_remote": deleted_remote, "push_error": err, } def _safe_resource(uid: str) -> str: """Sanea un UID al mismo nombre de recurso que carddav_put_vcard/caldav_put_event.""" import re return re.sub(r"[^A-Za-z0-9_.-]", "_", uid)[:120] # --------------------------------------------------------------------------- # events (DB -> Xandikos) # --------------------------------------------------------------------------- def _ical_escape(value) -> str: """Escapa un valor de texto para una propiedad iCalendar (RFC 5545). Evita inyección de propiedades/componentes: un summary/location con saltos de línea o `;`/`,` no puede cerrar el VEVENT ni abrir otro. El `\\r` se elimina (el folding lo aporta el `\\r\\n` de la serialización). """ return ( str(value) .replace("\\", "\\\\") .replace("\r", "") .replace("\n", "\\n") .replace(",", "\\,") .replace(";", "\\;") ) def _ical_sanitize(value) -> str: """Quita saltos de línea de un valor estructurado (UID, RRULE) para evitar que se inyecten propiedades nuevas. No escapa `;`/`,` porque son separadores legítimos en RRULE.""" return str(value).replace("\r", "").replace("\n", "") def _build_vcalendar(uid: str, fields: dict) -> str: """Compone un VCALENDAR mínimo con un VEVENT desde los campos del evento.""" dtstart = (fields.get("dtstart") or "").replace("-", "").replace(":", "") dtend = (fields.get("dtend") or "").replace("-", "").replace(":", "") lines = [ "BEGIN:VCALENDAR", "VERSION:2.0", "PRODID:-//osint_db//events//EN", "BEGIN:VEVENT", f"UID:{_ical_sanitize(uid)}", f"SUMMARY:{_ical_escape(fields.get('summary') or '')}", ] if dtstart: lines.append(f"DTSTART:{_ical_sanitize(dtstart)}") if dtend: lines.append(f"DTEND:{_ical_sanitize(dtend)}") if fields.get("location"): lines.append(f"LOCATION:{_ical_escape(fields['location'])}") if fields.get("rrule"): lines.append(f"RRULE:{_ical_sanitize(fields['rrule'])}") lines += ["END:VEVENT", "END:VCALENDAR"] return "\r\n".join(lines) + "\r\n" def upsert_event(cfg: Config, uid: str, fields: dict) -> dict: """Crea/actualiza un evento en la DB y lo empuja a Xandikos (PUT VCALENDAR).""" uid = (uid or "").strip() if not uid: return {"status": "error", "error": "uid vacío"} calendar = fields.get("calendar") or "default" raw = _build_vcalendar(uid, fields) row = { "uid": uid, "calendar": calendar, "etag": None, "dtstart": fields.get("dtstart"), "dtend": fields.get("dtend"), "all_day": bool(fields.get("all_day")), "summary": fields.get("summary"), "location": fields.get("location"), "rrule": fields.get("rrule"), "raw": raw, "updated_at": _now(), } res = duckdb_upsert( cfg.db_path, "events", [row], key_cols=["uid"], update_cols=[ "calendar", "dtstart", "dtend", "all_day", "summary", "location", "rrule", "raw", "updated_at", ], ) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} # El calendario CalDAV destino se resuelve por su path; usamos el calendar # home + slug del calendario. Push fuera de transacción. pwd, err = _resolve_password(cfg) pushed = None if err is None: collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/" push = caldav_put_event( cfg.dav_base, cfg.dav_user, pwd, collection, uid, raw ) pushed = push.get("status") == "ok" return { "status": "ok", "uid": uid, "inserted": res.get("inserted", 0), "updated": res.get("updated", 0), "pushed": pushed, "push_error": err, } def delete_event(cfg: Config, uid: str) -> dict: """Borra un evento de la DB y del servidor Xandikos.""" uid = (uid or "").strip() if not uid: return {"status": "error", "error": "uid vacío"} row = duckdb_query_readonly( cfg.db_path, "SELECT calendar FROM events WHERE uid = ?", [uid], 1 ) calendar = "default" if row.get("status") == "ok" and row.get("rows"): calendar = row["rows"][0].get("calendar") or calendar res = duckdb_execute(cfg.db_path, "DELETE FROM events WHERE uid = ?", [uid]) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} pwd, err = _resolve_password(cfg) deleted_remote = None if err is None: collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/" resource = collection + _safe_resource(uid) + ".ics" rm = dav_delete_resource(cfg.dav_base, cfg.dav_user, pwd, resource) deleted_remote = rm.get("status") == "ok" return { "status": "ok", "uid": uid, "deleted": res.get("rowcount", 0), "deleted_remote": deleted_remote, "push_error": err, } # --------------------------------------------------------------------------- # addressbooks / calendars # --------------------------------------------------------------------------- def make_addressbook(cfg: Config, fields: dict) -> dict: """Crea una libreta CardDAV en Xandikos y la registra en la tabla addressbooks.""" slug = (fields.get("slug") or "").strip() if not slug: return {"status": "error", "error": "slug vacío"} display_name = fields.get("display_name") or "" description = fields.get("description") or "" color = fields.get("color") or "" pwd, err = _resolve_password(cfg) if err is not None: return {"status": "error", "error": err} # contacts_home = el directorio padre de la colección por defecto # (/enmanuel/contacts/addressbook/ -> /enmanuel/contacts/). contacts_home = "/" + "/".join( cfg.dav_contacts_collection.strip("/").split("/")[:-1] ) if not contacts_home.endswith("/"): contacts_home += "/" mk = dav_make_addressbook( cfg.dav_base, cfg.dav_user, pwd, contacts_home, slug, display_name, description, ) if mk.get("status") != "ok": return {"status": "error", "error": mk.get("error"), "http_status": mk.get("http_status")} collection_path = mk.get("href") or (contacts_home + slug + "/") row = { "slug": slug, "display_name": display_name or slug, "collection_path": collection_path, "description": description or None, "color": color or None, "created_at": _now(), } res = duckdb_upsert( cfg.db_path, "addressbooks", [row], key_cols=["slug"], update_cols=["display_name", "collection_path", "description", "color"], ) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} return { "status": "ok", "slug": slug, "collection_path": collection_path, "existed": mk.get("existed", False), } def make_calendar(cfg: Config, fields: dict) -> dict: """Crea un calendario CalDAV en Xandikos (paridad con make_addressbook).""" slug = (fields.get("slug") or "").strip() if not slug: return {"status": "error", "error": "slug vacío"} display_name = fields.get("display_name") or "" color = fields.get("color") or "" pwd, err = _resolve_password(cfg) if err is not None: return {"status": "error", "error": err} mk = dav_make_calendar( cfg.dav_base, cfg.dav_user, pwd, cfg.dav_calendar_home, slug, display_name, color, ) if mk.get("status") != "ok": return {"status": "error", "error": mk.get("error"), "http_status": mk.get("http_status")} return { "status": "ok", "slug": slug, "href": mk.get("href"), "existed": mk.get("existed", False), } # --------------------------------------------------------------------------- # push masivo DB -> Xandikos # --------------------------------------------------------------------------- def push_all_dav(cfg: Config) -> dict: """Reconcilia en bloque: empuja todos los contacts y events de la DB a Xandikos. Útil tras una migración para volcar lo que vive solo en la DB. Devuelve los conteos de éxito/fallo por tipo. NO borra nada en remoto (solo PUT). """ pwd, err = _resolve_password(cfg) if err is not None: return {"status": "error", "error": err} contacts = duckdb_query_readonly( cfg.db_path, "SELECT uid, collection, fn, tels, emails, note_path FROM contacts", [], 1000000, ) c_ok = c_fail = 0 if contacts.get("status") == "ok": for row in contacts["rows"]: uid = row["uid"] collection = row.get("collection") or _default_collection(cfg) # vCard de AGENDA: compone con la persona enlazada (direcciones + # aliases) pero SIN ningún campo OSINT (privacidad del móvil). vcard = _compose_agenda_vcard( cfg, uid, { "fn": row.get("fn"), "tels": _decode_json_field(row.get("tels")), "emails": _decode_json_field(row.get("emails")), "note_path": row.get("note_path"), }, ) push = carddav_put_vcard( cfg.dav_base, cfg.dav_user, pwd, collection, uid, vcard ) if push.get("status") == "ok": c_ok += 1 # Captura el etag nuevo del push (sync inverso fiable). etag = _read_etag_after_push(cfg, pwd, collection, uid) if etag: duckdb_execute( cfg.db_path, "UPDATE contacts SET etag = ? WHERE uid = ?", [etag, uid], ) else: c_fail += 1 events = duckdb_query_readonly( cfg.db_path, "SELECT uid, calendar, raw FROM events", [], 1000000 ) e_ok = e_fail = 0 if events.get("status") == "ok": for row in events["rows"]: uid = row["uid"] calendar = row.get("calendar") or "default" collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/" raw = row.get("raw") or _build_vcalendar(uid, {}) push = caldav_put_event( cfg.dav_base, cfg.dav_user, pwd, collection, uid, raw ) if push.get("status") == "ok": e_ok += 1 else: e_fail += 1 return { "status": "ok", "contacts_pushed": c_ok, "contacts_failed": c_fail, "events_pushed": e_ok, "events_failed": e_fail, } # --------------------------------------------------------------------------- # sync inverso Xandikos -> DB (pull incremental por etag) # --------------------------------------------------------------------------- def pull_dav(cfg: Config) -> dict: """Trae a la DB las ediciones del móvil/DAVx5, last-write-wins por etag. A diferencia de ``ingest_dav`` (que hace DELETE + INSERT ciego de TODAS las colecciones), este pull es INCREMENTAL y respeta la verdad de la DB salvo donde el etag prueba un cambio externo: 1. Para cada colección registrada en ``addressbooks`` lista los recursos (PROPFIND Depth:1 -> [{href, etag}]). 2. Por recurso: si su etag difiere del ``contacts.etag`` guardado (o el uid no existe en la DB) -> GET + parse + upsert con el etag nuevo. Si el etag coincide -> no se toca (la DB ya está al día). 3. Los uids que la DB tenía en esa colección y que YA no aparecen en el PROPFIND se borran (el móvil los eliminó). 4. Tras el pull se re-enlazan los contactos con sus fichas y se reconstruyen las derivadas (bajo el lock single-writer). Devuelve {status:'ok', pulled, updated, deleted, unchanged} o {status:'error', error}. """ # Late imports: evitan ciclo writes<->ingest a nivel de módulo y reusan la # lógica de colecciones + enlace + derivadas ya existente (registry-first). from . import davparse from .db import write_conn from .derived import rebuild_derived from .ingest import _addressbook_collections, _link_contacts pwd, err = _resolve_password(cfg) if err is not None: return {"status": "error", "error": err} collections = _addressbook_collections(cfg) pulled = updated = deleted = unchanged = 0 for collection in collections: listing = dav_list_resources(cfg.dav_base, cfg.dav_user, pwd, collection) if listing.get("status") != "ok": return { "status": "error", "error": f"PROPFIND {collection}: {listing.get('error')} " f"(http {listing.get('http_status')})", } # Estado actual de la DB para esta colección: uid -> etag. db_state = duckdb_query_readonly( cfg.db_path, "SELECT uid, etag FROM contacts WHERE collection = ?", [collection], 1000000, ) db_etags: dict = {} if db_state.get("status") == "ok": db_etags = {r["uid"]: r.get("etag") for r in db_state["rows"]} seen_uids: set = set() for res in listing.get("resources", []): href = res.get("href") or "" remote_etag = res.get("etag") # GET solo cuando el etag cambió o el recurso es nuevo en la DB. # uid provisional desde el nombre del recurso para el cruce rápido; # el uid real se toma del vCard tras el GET. res_name = href.rstrip("/").rsplit("/", 1)[-1] guess_uid = os.path.splitext(res_name)[0] # Si ya conocemos este uid (por nombre) y el etag coincide -> skip. if guess_uid in db_etags and db_etags[guess_uid] == remote_etag and remote_etag: seen_uids.add(guess_uid) unchanged += 1 continue got = dav_get_resource(cfg.dav_base, cfg.dav_user, pwd, href) if got.get("status") != "ok": # Un recurso ilegible no aborta el pull entero. continue parsed = davparse.parse_vcard(got.get("text", "")) uid = parsed["uid"] or guess_uid seen_uids.add(uid) existed = uid in db_etags # Confirmación con el uid real: si el etag ya casa, no es cambio. if existed and db_etags[uid] == remote_etag and remote_etag: unchanged += 1 continue row = { "uid": uid, "collection": collection, "etag": remote_etag, "fn": parsed["fn"] or None, "tels": _json(parsed["tels"]), "emails": _json(parsed["emails"]), "raw": got.get("text", ""), # note_path se re-enlaza después; preserva el existente si lo había. "note_path": None, "updated_at": _now(), } up = duckdb_upsert( cfg.db_path, "contacts", [row], key_cols=["uid"], update_cols=["collection", "etag", "fn", "tels", "emails", "raw", "updated_at"], ) if up.get("status") != "ok": return {"status": "error", "error": up.get("error")} pulled += 1 if existed: updated += 1 # Borra los uids que la DB tenía en esta colección y ya no están remotos. for uid in db_etags: if uid not in seen_uids: rm = duckdb_execute( cfg.db_path, "DELETE FROM contacts WHERE uid = ?", [uid] ) if rm.get("status") == "ok": deleted += rm.get("rowcount", 0) # Re-enlace de contactos + reconstrucción de derivadas, bajo el lock. with write_conn(cfg.db_path) as conn: conn.execute("BEGIN") try: _link_contacts(conn) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise rebuild_derived(conn) return { "status": "ok", "pulled": pulled, "updated": updated, "deleted": deleted, "unchanged": unchanged, }