Files
egutierrez d53d7a9a7e fix(events): envolver VEVENT en VCALENDAR al push (Xandikos 500) + INSERT explicito en contacts (columna import_key)
El raw de un evento guardaba solo BEGIN:VEVENT...END:VEVENT; subirlo a CalDAV
genera un .ics invalido que rompe Xandikos (assert isinstance(cal, Calendar) ->
500 en todo el calendario). _ensure_vcalendar lo envuelve en el push. Ademas, la
columna import_key (migracion 004) rompia los INSERT posicionales de contacts:
ahora son explicitos por columna y el ingest puebla import_key con la funcion del
registry. Tests actualizados (4 derivadas, INSERT explicito).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 12:15:27 +02:00

637 lines
23 KiB
Python

"""Ingests del service osint_db: vault Obsidian y servidor DAV (Xandikos).
Inversión "DuckDB como fuente de verdad": las tablas de espejo puro (notes, y
las entidades que solo guardan frontmatter JSON: organizations/domains/cases/
places) se reconstruyen por reemplazo completo (DELETE + INSERT). Pero persons
ya NO es un espejo del frontmatter: la DB es la dueña de sus campos estructurados
(teléfonos, emails, direcciones, dni, ...), que se editan por la API y se
materializan a la nota. Por eso el ingest del vault es SELECTIVO para persons:
- slug que YA existe en la DB -> solo refresca note_path + extra_fm (el
frontmatter no-owned); los campos OWNED de la DB se conservan.
- slug NUEVO -> INSERT completo (bootstrap): adopta el frontmatter como valor
inicial, poblando las listas multi-valor desde los singulares o las listas
del frontmatter.
contacts y events siguen siendo espejo puro de Xandikos (DELETE + INSERT). Tras
cada ingest se re-enlazan los contactos con sus fichas y se reconstruyen las
derivadas. Todo bajo el lock single-writer del service.
"""
from __future__ import annotations
import json
import os
import re
from datetime import datetime, timezone
from . import davparse
from .config import Config
from .db import write_conn
from .derived import rebuild_derived
from .registry_bridge import (
contact_import_key,
dav_get_collection,
dav_list_addressbooks,
dav_list_calendars,
duckdb_upsert,
list_obsidian_notes,
pass_get_secret,
read_obsidian_note,
)
# Campos de persons de los que la DB es dueña: un re-ingest del vault NO los
# pisa cuando la ficha ya existe. Incluye los singulares (derivados de las
# listas) y fuente (origen de la ficha). dav_uid se deriva de fuente.
PERSON_OWNED = (
"nombre",
"aliases",
"sexo",
"fecha_nacimiento",
"dni",
"telefono",
"email",
"direccion",
"telefonos",
"emails",
"direcciones",
"pais",
"contexto",
"fuente",
"dav_uid",
"tags",
)
# Claves de control del frontmatter que no van a extra_fm (ni son OWNED): las
# gestiona el propio ingest o son metadata de la nota.
PERSON_CONTROL = ("slug", "tipo", "note_path")
# Orden de columnas de la tabla persons (debe casar con migrations 001 + 002).
PERSON_COLUMNS = (
"slug",
"note_path",
"nombre",
"aliases",
"sexo",
"fecha_nacimiento",
"dni",
"telefono",
"email",
"direccion",
"pais",
"contexto",
"fuente",
"dav_uid",
"tags",
"updated_at",
"telefonos",
"emails",
"direcciones",
"extra_fm",
)
def _norm(value):
"""Normaliza 'null'/''/None del frontmatter a None real."""
if value is None:
return None
if isinstance(value, str) and value.strip().lower() in ("null", "none", ""):
return None
return value
def _as_str(value):
"""Convierte un valor de frontmatter a str (o None), sin perder números."""
v = _norm(value)
return None if v is None else str(v)
def _as_list(value) -> list:
"""Convierte un valor de frontmatter a lista (los escalares se envuelven)."""
v = _norm(value)
if v is None:
return []
return v if isinstance(v, list) else [v]
def _multi(fm: dict, plural: str, singular: str) -> list:
"""Lista multi-valor desde el frontmatter: prioriza la clave plural.
Si el frontmatter trae la clave plural (telefonos/emails/direcciones) la
usa; si no, envuelve el singular (telefono/email/direccion) en lista. Cada
valor se normaliza a str y se descartan los vacíos.
"""
raw = _as_list(fm.get(plural)) or _as_list(fm.get(singular))
out = []
for item in raw:
s = _as_str(item)
if s:
out.append(s)
return out
def _json(value) -> str:
"""Serializa un valor a JSON compacto (sin escapar acentos)."""
return json.dumps(value, ensure_ascii=False, default=str)
def _dav_uid_from_fuente(fuente) -> str | None:
"""Extrae el UID de Xandikos cuando fuente es 'Xandikos UID <uid>'."""
if not fuente:
return None
m = re.search(r"Xandikos UID\s+(\S+)", str(fuente))
return m.group(1) if m else None
def _extra_fm(fm: dict) -> str:
"""JSON con las claves del frontmatter que NO son OWNED ni de control.
Permite que un re-ingest refresque el frontmatter no estructurado de la
ficha (campos libres que la DB no posee) sin pisar lo que la DB sí posee.
Las claves plurales multi-valor también se excluyen (son OWNED).
"""
skip = set(PERSON_OWNED) | set(PERSON_CONTROL)
extra = {k: v for k, v in fm.items() if k not in skip}
return _json(extra)
def _person_row_from_fm(slug: str, rel_path: str, fm: dict, mtime, base: str) -> dict:
"""Construye la fila completa de persons desde el frontmatter (bootstrap).
Se usa SOLO para fichas cuyo slug aún no existe en la DB: adopta el
frontmatter como valor inicial de los campos OWNED, derivando las listas
multi-valor desde el plural o el singular del frontmatter y rellenando los
singulares con el primer elemento de cada lista.
"""
telefonos = _multi(fm, "telefonos", "telefono")
emails = _multi(fm, "emails", "email")
direcciones = _multi(fm, "direcciones", "direccion")
return {
"slug": slug,
"note_path": rel_path,
"nombre": _as_str(fm.get("nombre")) or base,
"aliases": _json(_as_list(fm.get("aliases"))),
"sexo": _as_str(fm.get("sexo")),
"fecha_nacimiento": _as_str(fm.get("fecha_nacimiento")),
"dni": _as_str(fm.get("dni")),
"telefono": telefonos[0] if telefonos else None,
"email": emails[0] if emails else None,
"direccion": direcciones[0] if direcciones else None,
"pais": _as_str(fm.get("pais")),
"contexto": _as_str(fm.get("contexto")),
"fuente": _as_str(fm.get("fuente")),
"dav_uid": _dav_uid_from_fuente(fm.get("fuente")),
"tags": _json(_as_list(fm.get("tags"))),
"updated_at": mtime,
"telefonos": _json(telefonos),
"emails": _json(emails),
"direcciones": _json(direcciones),
"extra_fm": _extra_fm(fm),
}
def ingest_vault(cfg: Config) -> dict:
"""Escanea el vault completo y reconstruye notes + tablas de entidades.
notes y las entidades de espejo puro (organizations/domains/cases/places)
se reemplazan por completo. persons se ingesta de forma SELECTIVA: las
fichas existentes solo refrescan note_path + extra_fm (conservando los
campos OWNED de la DB), las nuevas se insertan completas (bootstrap).
Devuelve {status:'ok', notes:N, persons:N, persons_inserted:N,
persons_updated:N, organizations:N, domains:N, cases:N, places:N,
skipped_unreadable:N, derived_rebuilt:[...]}.
"""
if not os.path.isdir(cfg.vault_dir):
return {"status": "error", "error": f"vault no encontrado: {cfg.vault_dir}"}
note_rows: list = []
# Filas de espejo puro (todas las entidades menos persons).
mirror_rows: dict = {
table: [] for _, _, table in cfg.entity_folders if table != "persons"
}
# Fichas de persona, deduplicadas por slug, como dicts {slug, fm, ...}.
person_fichas: dict = {}
folder_to_table = {folder: table for folder, _, table in cfg.entity_folders}
skipped = 0
for abs_path in list_obsidian_notes(cfg.vault_dir):
rel_path = os.path.relpath(abs_path, cfg.vault_dir)
base = os.path.splitext(os.path.basename(abs_path))[0]
try:
note = read_obsidian_note(abs_path)
except Exception: # noqa: BLE001 — una nota corrupta no aborta el ingest
skipped += 1
continue
fm = note.get("frontmatter") or {}
mtime = datetime.fromtimestamp(os.path.getmtime(abs_path), tz=timezone.utc)
slug = _as_str(fm.get("slug")) or base
note_rows.append(
[
rel_path,
slug,
_as_str(fm.get("tipo")),
_as_str(fm.get("nombre")) or base,
mtime,
_json(fm),
]
)
# Entidad estructurada: ficha de nivel-1 dentro de una carpeta de
# entidades (personas/<slug>.md, no personas/<slug>/<doc>.md) y que
# no sea una nota de soporte (prefijo _).
top_folder = rel_path.split(os.sep)[0]
is_level1 = os.path.basename(os.path.dirname(abs_path)) == top_folder
if top_folder in folder_to_table and is_level1 and not base.startswith("_"):
table = folder_to_table[top_folder]
if table == "persons":
# Gana la primera ficha vista con cada slug (respeta la PK).
if slug not in person_fichas:
person_fichas[slug] = {
"slug": slug,
"rel_path": rel_path,
"fm": fm,
"mtime": mtime,
"base": base,
}
else:
mirror_rows[table].append(
[
slug,
rel_path,
_as_str(fm.get("nombre")) or base,
_json(_as_list(fm.get("tags"))),
_json(fm),
mtime,
]
)
with write_conn(cfg.db_path) as conn:
conn.execute("BEGIN")
try:
conn.execute("DELETE FROM notes")
if note_rows:
conn.executemany(
"INSERT INTO notes VALUES (?, ?, ?, ?, ?, ?)", note_rows
)
for table in ("organizations", "domains", "cases", "places"):
conn.execute(f"DELETE FROM {table}")
if mirror_rows[table]:
conn.executemany(
f"INSERT INTO {table} VALUES (?, ?, ?, ?, ?, ?)",
_dedup_by_slug(mirror_rows[table]),
)
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
# persons: ingest selectivo via duckdb_upsert (ownership de campos OWNED).
# Se hace fuera de la transacción de espejo puro pero bajo el mismo lock de
# proceso (write_conn ya lo liberó): single-writer respetado.
p_inserted, p_updated = _ingest_persons_selective(cfg.db_path, person_fichas)
# Re-enlace de contactos + derivadas, de nuevo bajo el lock de escritura.
with write_conn(cfg.db_path) as conn:
conn.execute("BEGIN")
try:
_link_contacts(conn)
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
derived_rebuilt = rebuild_derived(conn)
return {
"status": "ok",
"notes": len(note_rows),
"persons": len(person_fichas),
"persons_inserted": p_inserted,
"persons_updated": p_updated,
"organizations": len(_dedup_by_slug(mirror_rows["organizations"])),
"domains": len(_dedup_by_slug(mirror_rows["domains"])),
"cases": len(_dedup_by_slug(mirror_rows["cases"])),
"places": len(_dedup_by_slug(mirror_rows["places"])),
"skipped_unreadable": skipped,
"derived_rebuilt": derived_rebuilt,
}
def _ingest_persons_selective(db_path: str, person_fichas: dict) -> tuple:
"""Upsert selectivo de persons: existentes solo note_path+extra_fm, nuevas full.
Lee qué slugs existen ya en la DB y reparte las fichas en dos lotes:
- existentes -> duckdb_upsert con update_cols=['note_path','extra_fm'],
de modo que los campos OWNED de la DB no se pisan.
- nuevas -> duckdb_upsert con update_cols=None (full insert de bootstrap).
Devuelve (inserted, updated). Bajo el lock single-writer del proceso:
duckdb_upsert abre su propia conexión, pero DuckDB comparte la instancia
de la base dentro del proceso, así que no hay conflicto de lock.
Raises:
RuntimeError: si algún upsert de persons devuelve status 'error'.
"""
if not person_fichas:
return 0, 0
# Slugs ya presentes en la DB (lectura read-only, sin tocar el writer).
import duckdb
conn = duckdb.connect(db_path, read_only=True)
try:
existing = {row[0] for row in conn.execute("SELECT slug FROM persons").fetchall()}
finally:
conn.close()
existing_rows: list = []
new_rows: list = []
for slug, ficha in person_fichas.items():
full = _person_row_from_fm(
ficha["slug"], ficha["rel_path"], ficha["fm"], ficha["mtime"], ficha["base"]
)
if slug in existing:
# Solo los campos que el vault sigue gobernando para fichas vivas:
# dónde está la nota y el frontmatter no-owned.
existing_rows.append(
{
"slug": full["slug"],
"note_path": full["note_path"],
"extra_fm": full["extra_fm"],
}
)
else:
new_rows.append(full)
inserted = updated = 0
if existing_rows:
res = duckdb_upsert(
db_path,
"persons",
existing_rows,
key_cols=["slug"],
update_cols=["note_path", "extra_fm"],
)
if res.get("status") != "ok":
raise RuntimeError(f"persons upsert (existentes): {res.get('error')}")
updated += res.get("updated", 0)
# Filas "existentes" que en realidad ya no estaban (carrera) se insertan.
inserted += res.get("inserted", 0)
if new_rows:
res = duckdb_upsert(
db_path, "persons", new_rows, key_cols=["slug"], update_cols=None
)
if res.get("status") != "ok":
raise RuntimeError(f"persons upsert (nuevas): {res.get('error')}")
inserted += res.get("inserted", 0)
updated += res.get("updated", 0)
return inserted, updated
def _dedup_by_slug(rows: list) -> list:
"""Quita filas con slug repetido (gana la primera) para respetar la PK."""
seen, out = set(), []
for row in rows:
if row[0] in seen:
continue
seen.add(row[0])
out.append(row)
return out
def ingest_dav(cfg: Config) -> dict:
"""Baja las colecciones de Xandikos y reconstruye contacts + events.
Itera TODAS las libretas CardDAV registradas en la tabla addressbooks (no
solo la colección fija): cada contacto guarda su collection real. Si la
tabla está vacía cae a la colección por defecto de la config. Devuelve
{status:'ok', contacts:N, events:N, addressbooks:[...], calendars:[...],
contacts_linked:N, derived_rebuilt:[...]} o {status:'error', error}.
"""
secret = pass_get_secret(cfg.pass_secret)
if secret.get("status") != "ok":
return {
"status": "error",
"error": f"pass no devolvió el secreto {cfg.pass_secret!r}: "
f"{secret.get('error')}",
}
pwd = secret["value"] # sensible: nunca logear
collections = _addressbook_collections(cfg)
now = datetime.now(tz=timezone.utc)
contact_rows: list = []
seen_uids: set = set()
used_addressbooks: list = []
for collection in collections:
coll = dav_get_collection(
cfg.dav_base, cfg.dav_user, pwd, collection, "vcard"
)
if coll.get("status") != "ok":
return {
"status": "error",
"error": f"CardDAV {collection}: {coll.get('error')} "
f"(http {coll.get('http_status')})",
}
used_addressbooks.append(collection)
for res in coll.get("resources", []):
parsed = davparse.parse_vcard(res.get("data", ""))
uid = parsed["uid"] or os.path.splitext(os.path.basename(res["href"]))[0]
if uid in seen_uids:
continue
seen_uids.add(uid)
contact_rows.append(
[
uid,
collection,
res.get("etag"),
parsed["fn"] or None,
_json(parsed["tels"]),
_json(parsed["emails"]),
res.get("data", ""),
None, # note_path se rellena en el enlace posterior
now,
# Clave de importación determinística: nace con el contacto
# para que los re-imports lo localicen sin match frágil.
contact_import_key(parsed["fn"] or "", parsed["tels"], parsed["emails"]),
]
)
cals = dav_list_calendars(cfg.dav_base, cfg.dav_user, pwd, cfg.dav_calendar_home)
if cals.get("status") != "ok":
return {
"status": "error",
"error": f"CalDAV: {cals.get('error')} (http {cals.get('http_status')})",
}
event_rows: list = []
seen_event_uids: set = set()
calendar_names: list = []
for cal in cals.get("calendars", []):
cal_name = cal.get("name") or cal.get("href", "").strip("/").rsplit("/", 1)[-1]
calendar_names.append(cal_name)
cal_coll = dav_get_collection(
cfg.dav_base, cfg.dav_user, pwd, cal["href"], "ical"
)
if cal_coll.get("status") != "ok":
return {
"status": "error",
"error": f"CalDAV {cal_name}: {cal_coll.get('error')} "
f"(http {cal_coll.get('http_status')})",
}
for res in cal_coll.get("resources", []):
for ev in davparse.parse_ical_events(res.get("data", "")):
uid = ev["uid"] or os.path.splitext(os.path.basename(res["href"]))[0]
if uid in seen_event_uids:
continue
seen_event_uids.add(uid)
event_rows.append(
[
uid,
cal_name,
res.get("etag"),
ev["dtstart"] or None,
ev["dtend"] or None,
ev["all_day"],
ev["summary"] or None,
ev["location"],
ev["rrule"],
ev["raw"],
now,
]
)
with write_conn(cfg.db_path) as conn:
conn.execute("BEGIN")
try:
conn.execute("DELETE FROM contacts")
if contact_rows:
conn.executemany(
"INSERT INTO contacts (uid, collection, etag, fn, tels, "
"emails, raw, note_path, updated_at, import_key) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
contact_rows,
)
conn.execute("DELETE FROM events")
if event_rows:
conn.executemany(
"INSERT INTO events VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
event_rows,
)
linked = _link_contacts(conn)
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
derived_rebuilt = rebuild_derived(conn)
return {
"status": "ok",
"contacts": len(contact_rows),
"events": len(event_rows),
"addressbooks": used_addressbooks,
"calendars": calendar_names,
"contacts_linked": linked,
"derived_rebuilt": derived_rebuilt,
}
def _addressbook_collections(cfg: Config) -> list:
"""Colecciones CardDAV a recorrer en el ingest DAV.
Fuente de verdad: la tabla addressbooks de la DB. Si está vacía (o ilegible)
cae a la colección por defecto de la config para no romper el ingest.
"""
try:
import duckdb
conn = duckdb.connect(cfg.db_path, read_only=True)
try:
rows = conn.execute(
"SELECT collection_path FROM addressbooks "
"WHERE collection_path IS NOT NULL ORDER BY slug"
).fetchall()
finally:
conn.close()
collections = [r[0] for r in rows if r[0]]
except Exception: # noqa: BLE001 — tabla ausente/ilegible -> default
collections = []
if not collections:
collections = [cfg.dav_contacts_collection]
return collections
def _link_contacts(conn) -> int:
"""Enlaza contacts.note_path contra las fichas de persons.
Orden de matching por fiabilidad: UID estilo osint-<slug> (creado por el
push del vault), dav_uid registrado en la ficha, teléfono normalizado
(singular o cualquiera de la lista telefonos[]) y por último email (singular
o cualquiera de emails[]). Devuelve el número de contactos enlazados.
"""
persons = conn.execute(
"SELECT slug, note_path, telefono, email, dav_uid, telefonos, emails "
"FROM persons"
).fetchall()
by_slug, by_dav_uid, by_phone, by_email = {}, {}, {}, {}
for slug, note_path, telefono, email, dav_uid, telefonos, emails in persons:
by_slug[slug] = note_path
if dav_uid:
by_dav_uid.setdefault(dav_uid, note_path)
# Teléfonos: singular + todos los de la lista multi-valor.
for tel in _coalesce_values(telefono, telefonos):
key = davparse.norm_phone(tel)
if key:
by_phone.setdefault(key, note_path)
# Emails: singular + todos los de la lista multi-valor.
for em in _coalesce_values(email, emails):
by_email.setdefault(str(em).strip().lower(), note_path)
contacts = conn.execute("SELECT uid, tels, emails FROM contacts").fetchall()
linked = 0
for uid, tels_json, emails_json in contacts:
note_path = None
if uid.startswith("osint-") and uid[len("osint-"):] in by_slug:
note_path = by_slug[uid[len("osint-"):]]
if note_path is None and uid in by_dav_uid:
note_path = by_dav_uid[uid]
if note_path is None:
for tel in json.loads(tels_json or "[]"):
hit = by_phone.get(davparse.norm_phone(tel))
if hit:
note_path = hit
break
if note_path is None:
for em in json.loads(emails_json or "[]"):
hit = by_email.get(str(em).strip().lower())
if hit:
note_path = hit
break
if note_path is not None:
conn.execute(
"UPDATE contacts SET note_path = ? WHERE uid = ?", [note_path, uid]
)
linked += 1
return linked
def _coalesce_values(singular, list_json) -> list:
"""Une el valor singular con la lista JSON multi-valor (sin vacíos ni dups)."""
out, seen = [], set()
candidates = []
if singular:
candidates.append(singular)
try:
parsed = json.loads(list_json) if list_json else []
except (TypeError, ValueError):
parsed = []
if isinstance(parsed, list):
candidates.extend(parsed)
for v in candidates:
s = str(v).strip()
key = s.lower()
if s and key not in seen:
seen.add(key)
out.append(s)
return out