"""Ingests del service osint_db: vault Obsidian y servidor DAV (Xandikos). Inversión "DuckDB como fuente de verdad": las tablas de espejo puro (notes, y las entidades que solo guardan frontmatter JSON: organizations/domains/cases/ places) se reconstruyen por reemplazo completo (DELETE + INSERT). Pero persons ya NO es un espejo del frontmatter: la DB es la dueña de sus campos estructurados (teléfonos, emails, direcciones, dni, ...), que se editan por la API y se materializan a la nota. Por eso el ingest del vault es SELECTIVO para persons: - slug que YA existe en la DB -> solo refresca note_path + extra_fm (el frontmatter no-owned); los campos OWNED de la DB se conservan. - slug NUEVO -> INSERT completo (bootstrap): adopta el frontmatter como valor inicial, poblando las listas multi-valor desde los singulares o las listas del frontmatter. contacts y events siguen siendo espejo puro de Xandikos (DELETE + INSERT). Tras cada ingest se re-enlazan los contactos con sus fichas y se reconstruyen las derivadas. Todo bajo el lock single-writer del service. """ from __future__ import annotations import json import os import re from datetime import datetime, timezone from . import davparse from .config import Config from .db import write_conn from .derived import rebuild_derived from .registry_bridge import ( dav_get_collection, dav_list_addressbooks, dav_list_calendars, duckdb_upsert, list_obsidian_notes, pass_get_secret, read_obsidian_note, ) # Campos de persons de los que la DB es dueña: un re-ingest del vault NO los # pisa cuando la ficha ya existe. Incluye los singulares (derivados de las # listas) y fuente (origen de la ficha). dav_uid se deriva de fuente. PERSON_OWNED = ( "nombre", "aliases", "sexo", "fecha_nacimiento", "dni", "telefono", "email", "direccion", "telefonos", "emails", "direcciones", "pais", "contexto", "fuente", "dav_uid", "tags", ) # Claves de control del frontmatter que no van a extra_fm (ni son OWNED): las # gestiona el propio ingest o son metadata de la nota. PERSON_CONTROL = ("slug", "tipo", "note_path") # Orden de columnas de la tabla persons (debe casar con migrations 001 + 002). PERSON_COLUMNS = ( "slug", "note_path", "nombre", "aliases", "sexo", "fecha_nacimiento", "dni", "telefono", "email", "direccion", "pais", "contexto", "fuente", "dav_uid", "tags", "updated_at", "telefonos", "emails", "direcciones", "extra_fm", ) def _norm(value): """Normaliza 'null'/''/None del frontmatter a None real.""" if value is None: return None if isinstance(value, str) and value.strip().lower() in ("null", "none", ""): return None return value def _as_str(value): """Convierte un valor de frontmatter a str (o None), sin perder números.""" v = _norm(value) return None if v is None else str(v) def _as_list(value) -> list: """Convierte un valor de frontmatter a lista (los escalares se envuelven).""" v = _norm(value) if v is None: return [] return v if isinstance(v, list) else [v] def _multi(fm: dict, plural: str, singular: str) -> list: """Lista multi-valor desde el frontmatter: prioriza la clave plural. Si el frontmatter trae la clave plural (telefonos/emails/direcciones) la usa; si no, envuelve el singular (telefono/email/direccion) en lista. Cada valor se normaliza a str y se descartan los vacíos. """ raw = _as_list(fm.get(plural)) or _as_list(fm.get(singular)) out = [] for item in raw: s = _as_str(item) if s: out.append(s) return out def _json(value) -> str: """Serializa un valor a JSON compacto (sin escapar acentos).""" return json.dumps(value, ensure_ascii=False, default=str) def _dav_uid_from_fuente(fuente) -> str | None: """Extrae el UID de Xandikos cuando fuente es 'Xandikos UID '.""" if not fuente: return None m = re.search(r"Xandikos UID\s+(\S+)", str(fuente)) return m.group(1) if m else None def _extra_fm(fm: dict) -> str: """JSON con las claves del frontmatter que NO son OWNED ni de control. Permite que un re-ingest refresque el frontmatter no estructurado de la ficha (campos libres que la DB no posee) sin pisar lo que la DB sí posee. Las claves plurales multi-valor también se excluyen (son OWNED). """ skip = set(PERSON_OWNED) | set(PERSON_CONTROL) extra = {k: v for k, v in fm.items() if k not in skip} return _json(extra) def _person_row_from_fm(slug: str, rel_path: str, fm: dict, mtime, base: str) -> dict: """Construye la fila completa de persons desde el frontmatter (bootstrap). Se usa SOLO para fichas cuyo slug aún no existe en la DB: adopta el frontmatter como valor inicial de los campos OWNED, derivando las listas multi-valor desde el plural o el singular del frontmatter y rellenando los singulares con el primer elemento de cada lista. """ telefonos = _multi(fm, "telefonos", "telefono") emails = _multi(fm, "emails", "email") direcciones = _multi(fm, "direcciones", "direccion") return { "slug": slug, "note_path": rel_path, "nombre": _as_str(fm.get("nombre")) or base, "aliases": _json(_as_list(fm.get("aliases"))), "sexo": _as_str(fm.get("sexo")), "fecha_nacimiento": _as_str(fm.get("fecha_nacimiento")), "dni": _as_str(fm.get("dni")), "telefono": telefonos[0] if telefonos else None, "email": emails[0] if emails else None, "direccion": direcciones[0] if direcciones else None, "pais": _as_str(fm.get("pais")), "contexto": _as_str(fm.get("contexto")), "fuente": _as_str(fm.get("fuente")), "dav_uid": _dav_uid_from_fuente(fm.get("fuente")), "tags": _json(_as_list(fm.get("tags"))), "updated_at": mtime, "telefonos": _json(telefonos), "emails": _json(emails), "direcciones": _json(direcciones), "extra_fm": _extra_fm(fm), } def ingest_vault(cfg: Config) -> dict: """Escanea el vault completo y reconstruye notes + tablas de entidades. notes y las entidades de espejo puro (organizations/domains/cases/places) se reemplazan por completo. persons se ingesta de forma SELECTIVA: las fichas existentes solo refrescan note_path + extra_fm (conservando los campos OWNED de la DB), las nuevas se insertan completas (bootstrap). Devuelve {status:'ok', notes:N, persons:N, persons_inserted:N, persons_updated:N, organizations:N, domains:N, cases:N, places:N, skipped_unreadable:N, derived_rebuilt:[...]}. """ if not os.path.isdir(cfg.vault_dir): return {"status": "error", "error": f"vault no encontrado: {cfg.vault_dir}"} note_rows: list = [] # Filas de espejo puro (todas las entidades menos persons). mirror_rows: dict = { table: [] for _, _, table in cfg.entity_folders if table != "persons" } # Fichas de persona, deduplicadas por slug, como dicts {slug, fm, ...}. person_fichas: dict = {} folder_to_table = {folder: table for folder, _, table in cfg.entity_folders} skipped = 0 for abs_path in list_obsidian_notes(cfg.vault_dir): rel_path = os.path.relpath(abs_path, cfg.vault_dir) base = os.path.splitext(os.path.basename(abs_path))[0] try: note = read_obsidian_note(abs_path) except Exception: # noqa: BLE001 — una nota corrupta no aborta el ingest skipped += 1 continue fm = note.get("frontmatter") or {} mtime = datetime.fromtimestamp(os.path.getmtime(abs_path), tz=timezone.utc) slug = _as_str(fm.get("slug")) or base note_rows.append( [ rel_path, slug, _as_str(fm.get("tipo")), _as_str(fm.get("nombre")) or base, mtime, _json(fm), ] ) # Entidad estructurada: ficha de nivel-1 dentro de una carpeta de # entidades (personas/.md, no personas//.md) y que # no sea una nota de soporte (prefijo _). top_folder = rel_path.split(os.sep)[0] is_level1 = os.path.basename(os.path.dirname(abs_path)) == top_folder if top_folder in folder_to_table and is_level1 and not base.startswith("_"): table = folder_to_table[top_folder] if table == "persons": # Gana la primera ficha vista con cada slug (respeta la PK). if slug not in person_fichas: person_fichas[slug] = { "slug": slug, "rel_path": rel_path, "fm": fm, "mtime": mtime, "base": base, } else: mirror_rows[table].append( [ slug, rel_path, _as_str(fm.get("nombre")) or base, _json(_as_list(fm.get("tags"))), _json(fm), mtime, ] ) with write_conn(cfg.db_path) as conn: conn.execute("BEGIN") try: conn.execute("DELETE FROM notes") if note_rows: conn.executemany( "INSERT INTO notes VALUES (?, ?, ?, ?, ?, ?)", note_rows ) for table in ("organizations", "domains", "cases", "places"): conn.execute(f"DELETE FROM {table}") if mirror_rows[table]: conn.executemany( f"INSERT INTO {table} VALUES (?, ?, ?, ?, ?, ?)", _dedup_by_slug(mirror_rows[table]), ) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise # persons: ingest selectivo via duckdb_upsert (ownership de campos OWNED). # Se hace fuera de la transacción de espejo puro pero bajo el mismo lock de # proceso (write_conn ya lo liberó): single-writer respetado. p_inserted, p_updated = _ingest_persons_selective(cfg.db_path, person_fichas) # Re-enlace de contactos + derivadas, de nuevo bajo el lock de escritura. with write_conn(cfg.db_path) as conn: conn.execute("BEGIN") try: _link_contacts(conn) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise derived_rebuilt = rebuild_derived(conn) return { "status": "ok", "notes": len(note_rows), "persons": len(person_fichas), "persons_inserted": p_inserted, "persons_updated": p_updated, "organizations": len(_dedup_by_slug(mirror_rows["organizations"])), "domains": len(_dedup_by_slug(mirror_rows["domains"])), "cases": len(_dedup_by_slug(mirror_rows["cases"])), "places": len(_dedup_by_slug(mirror_rows["places"])), "skipped_unreadable": skipped, "derived_rebuilt": derived_rebuilt, } def _ingest_persons_selective(db_path: str, person_fichas: dict) -> tuple: """Upsert selectivo de persons: existentes solo note_path+extra_fm, nuevas full. Lee qué slugs existen ya en la DB y reparte las fichas en dos lotes: - existentes -> duckdb_upsert con update_cols=['note_path','extra_fm'], de modo que los campos OWNED de la DB no se pisan. - nuevas -> duckdb_upsert con update_cols=None (full insert de bootstrap). Devuelve (inserted, updated). Bajo el lock single-writer del proceso: duckdb_upsert abre su propia conexión, pero DuckDB comparte la instancia de la base dentro del proceso, así que no hay conflicto de lock. Raises: RuntimeError: si algún upsert de persons devuelve status 'error'. """ if not person_fichas: return 0, 0 # Slugs ya presentes en la DB (lectura read-only, sin tocar el writer). import duckdb conn = duckdb.connect(db_path, read_only=True) try: existing = {row[0] for row in conn.execute("SELECT slug FROM persons").fetchall()} finally: conn.close() existing_rows: list = [] new_rows: list = [] for slug, ficha in person_fichas.items(): full = _person_row_from_fm( ficha["slug"], ficha["rel_path"], ficha["fm"], ficha["mtime"], ficha["base"] ) if slug in existing: # Solo los campos que el vault sigue gobernando para fichas vivas: # dónde está la nota y el frontmatter no-owned. existing_rows.append( { "slug": full["slug"], "note_path": full["note_path"], "extra_fm": full["extra_fm"], } ) else: new_rows.append(full) inserted = updated = 0 if existing_rows: res = duckdb_upsert( db_path, "persons", existing_rows, key_cols=["slug"], update_cols=["note_path", "extra_fm"], ) if res.get("status") != "ok": raise RuntimeError(f"persons upsert (existentes): {res.get('error')}") updated += res.get("updated", 0) # Filas "existentes" que en realidad ya no estaban (carrera) se insertan. inserted += res.get("inserted", 0) if new_rows: res = duckdb_upsert( db_path, "persons", new_rows, key_cols=["slug"], update_cols=None ) if res.get("status") != "ok": raise RuntimeError(f"persons upsert (nuevas): {res.get('error')}") inserted += res.get("inserted", 0) updated += res.get("updated", 0) return inserted, updated def _dedup_by_slug(rows: list) -> list: """Quita filas con slug repetido (gana la primera) para respetar la PK.""" seen, out = set(), [] for row in rows: if row[0] in seen: continue seen.add(row[0]) out.append(row) return out def ingest_dav(cfg: Config) -> dict: """Baja las colecciones de Xandikos y reconstruye contacts + events. Itera TODAS las libretas CardDAV registradas en la tabla addressbooks (no solo la colección fija): cada contacto guarda su collection real. Si la tabla está vacía cae a la colección por defecto de la config. Devuelve {status:'ok', contacts:N, events:N, addressbooks:[...], calendars:[...], contacts_linked:N, derived_rebuilt:[...]} o {status:'error', error}. """ secret = pass_get_secret(cfg.pass_secret) if secret.get("status") != "ok": return { "status": "error", "error": f"pass no devolvió el secreto {cfg.pass_secret!r}: " f"{secret.get('error')}", } pwd = secret["value"] # sensible: nunca logear collections = _addressbook_collections(cfg) now = datetime.now(tz=timezone.utc) contact_rows: list = [] seen_uids: set = set() used_addressbooks: list = [] for collection in collections: coll = dav_get_collection( cfg.dav_base, cfg.dav_user, pwd, collection, "vcard" ) if coll.get("status") != "ok": return { "status": "error", "error": f"CardDAV {collection}: {coll.get('error')} " f"(http {coll.get('http_status')})", } used_addressbooks.append(collection) for res in coll.get("resources", []): parsed = davparse.parse_vcard(res.get("data", "")) uid = parsed["uid"] or os.path.splitext(os.path.basename(res["href"]))[0] if uid in seen_uids: continue seen_uids.add(uid) contact_rows.append( [ uid, collection, res.get("etag"), parsed["fn"] or None, _json(parsed["tels"]), _json(parsed["emails"]), res.get("data", ""), None, # note_path se rellena en el enlace posterior now, ] ) cals = dav_list_calendars(cfg.dav_base, cfg.dav_user, pwd, cfg.dav_calendar_home) if cals.get("status") != "ok": return { "status": "error", "error": f"CalDAV: {cals.get('error')} (http {cals.get('http_status')})", } event_rows: list = [] seen_event_uids: set = set() calendar_names: list = [] for cal in cals.get("calendars", []): cal_name = cal.get("name") or cal.get("href", "").strip("/").rsplit("/", 1)[-1] calendar_names.append(cal_name) cal_coll = dav_get_collection( cfg.dav_base, cfg.dav_user, pwd, cal["href"], "ical" ) if cal_coll.get("status") != "ok": return { "status": "error", "error": f"CalDAV {cal_name}: {cal_coll.get('error')} " f"(http {cal_coll.get('http_status')})", } for res in cal_coll.get("resources", []): for ev in davparse.parse_ical_events(res.get("data", "")): uid = ev["uid"] or os.path.splitext(os.path.basename(res["href"]))[0] if uid in seen_event_uids: continue seen_event_uids.add(uid) event_rows.append( [ uid, cal_name, res.get("etag"), ev["dtstart"] or None, ev["dtend"] or None, ev["all_day"], ev["summary"] or None, ev["location"], ev["rrule"], ev["raw"], now, ] ) with write_conn(cfg.db_path) as conn: conn.execute("BEGIN") try: conn.execute("DELETE FROM contacts") if contact_rows: conn.executemany( "INSERT INTO contacts VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", contact_rows, ) conn.execute("DELETE FROM events") if event_rows: conn.executemany( "INSERT INTO events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", event_rows, ) linked = _link_contacts(conn) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise derived_rebuilt = rebuild_derived(conn) return { "status": "ok", "contacts": len(contact_rows), "events": len(event_rows), "addressbooks": used_addressbooks, "calendars": calendar_names, "contacts_linked": linked, "derived_rebuilt": derived_rebuilt, } def _addressbook_collections(cfg: Config) -> list: """Colecciones CardDAV a recorrer en el ingest DAV. Fuente de verdad: la tabla addressbooks de la DB. Si está vacía (o ilegible) cae a la colección por defecto de la config para no romper el ingest. """ try: import duckdb conn = duckdb.connect(cfg.db_path, read_only=True) try: rows = conn.execute( "SELECT collection_path FROM addressbooks " "WHERE collection_path IS NOT NULL ORDER BY slug" ).fetchall() finally: conn.close() collections = [r[0] for r in rows if r[0]] except Exception: # noqa: BLE001 — tabla ausente/ilegible -> default collections = [] if not collections: collections = [cfg.dav_contacts_collection] return collections def _link_contacts(conn) -> int: """Enlaza contacts.note_path contra las fichas de persons. Orden de matching por fiabilidad: UID estilo osint- (creado por el push del vault), dav_uid registrado en la ficha, teléfono normalizado (singular o cualquiera de la lista telefonos[]) y por último email (singular o cualquiera de emails[]). Devuelve el número de contactos enlazados. """ persons = conn.execute( "SELECT slug, note_path, telefono, email, dav_uid, telefonos, emails " "FROM persons" ).fetchall() by_slug, by_dav_uid, by_phone, by_email = {}, {}, {}, {} for slug, note_path, telefono, email, dav_uid, telefonos, emails in persons: by_slug[slug] = note_path if dav_uid: by_dav_uid.setdefault(dav_uid, note_path) # Teléfonos: singular + todos los de la lista multi-valor. for tel in _coalesce_values(telefono, telefonos): key = davparse.norm_phone(tel) if key: by_phone.setdefault(key, note_path) # Emails: singular + todos los de la lista multi-valor. for em in _coalesce_values(email, emails): by_email.setdefault(str(em).strip().lower(), note_path) contacts = conn.execute("SELECT uid, tels, emails FROM contacts").fetchall() linked = 0 for uid, tels_json, emails_json in contacts: note_path = None if uid.startswith("osint-") and uid[len("osint-"):] in by_slug: note_path = by_slug[uid[len("osint-"):]] if note_path is None and uid in by_dav_uid: note_path = by_dav_uid[uid] if note_path is None: for tel in json.loads(tels_json or "[]"): hit = by_phone.get(davparse.norm_phone(tel)) if hit: note_path = hit break if note_path is None: for em in json.loads(emails_json or "[]"): hit = by_email.get(str(em).strip().lower()) if hit: note_path = hit break if note_path is not None: conn.execute( "UPDATE contacts SET note_path = ? WHERE uid = ?", [note_path, uid] ) linked += 1 return linked def _coalesce_values(singular, list_json) -> list: """Une el valor singular con la lista JSON multi-valor (sin vacíos ni dups).""" out, seen = [], set() candidates = [] if singular: candidates.append(singular) try: parsed = json.loads(list_json) if list_json else [] except (TypeError, ValueError): parsed = [] if isinstance(parsed, list): candidates.extend(parsed) for v in candidates: s = str(v).strip() key = s.lower() if s and key not in seen: seen.add(key) out.append(s) return out