Compare commits

...

4 Commits

Author SHA1 Message Date
egutierrez ec9b70a72a feat(tools): import_contacts_vcf — backfill de import_key + enriquecimiento idempotente desde .vcf
Backfill de la clave de importacion (contact_import_key del registry) de los
contactos existentes + enriquecimiento aditivo desde un .vcf de Google
(telefonos/emails faltantes en contacts, direcciones en la persona enlazada).
Match por import_key con fallback por telefono. No destructivo: solo INSERT/UPDATE,
con assert de conteo intacto. Recupero los campos que el import original descarto.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 11:48:03 +02:00
egutierrez d98127115b test(davx5): script reproducible de verificación DAVx5 ↔ Xandikos
12 checks: auth requerida (401 sin/con credencial mala), TLS Let's Encrypt + no-cleartext,
recepción de los 1065 contactos via PROPFIND+REPORT, integridad TEL/EMAIL de un contacto
conocido. Lee la credencial de pass en runtime (sin secretos hardcodeados). Validado en
emulador Pixel_API34 con DAVx5 4.5.14: recibió los 1065 contactos.
2026-06-13 01:24:36 +02:00
egutierrez db05c58893 docs(duckdb): inversión completada — DuckDB como fuente de verdad
Documenta la inversión implementada el 13/06/2026: ingest selectivo anti-pisado,
multi-valor en persons (634 fichas migradas), libretas (addressbooks), endpoints de
escritura estructurada, consumo desde osint_web tras el flag OSINT_DB_BACKEND (ON), las 5
funciones nuevas del registry, el runbook anti doble-verdad y el runtime systemd
(osint-db.service, Restart=always).
2026-06-13 00:57:14 +02:00
egutierrez fe280ec8ac feat(tools): sync bidireccional vault OSINT <-> Xandikos CardDAV
- sync_dav_to_osint.py (NUEVO): reverse sync Xandikos->vault. Trae contactos
  nuevos del movil (contexto: movil, dedup por tel/email) y ediciones de agenda
  (nombre/tel/email/aliases) PRESERVANDO la capa OSINT (relaciones/dni/contexto/
  fuente/tags). Estado persistente .sync_state.json (UID->etag/vault_mtime).
  Reconciliacion por etag; --dry-run (default) / --apply.
- sync_osint_to_dav.py: anade --check (audit read-only vault<->Xandikos: fichas
  sin vCard, vCards huerfanos, agendas divergentes) y optimiza build_existing_index
  con dav_get_collection (1 REPORT, ~9s->~0.5s) en vez del patron N+1.

Usa las funciones del registry: dav_get_collection, dav_delete_resource,
carddav_put_vcard, obsidian CRUD, pass_get_secret.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-12 00:30:27 +02:00
6 changed files with 2220 additions and 0 deletions
+5
View File
@@ -3,3 +3,8 @@ analysis/*/
vaults/*
!vaults/.gitkeep
!vaults/vault.yaml
# Estado local del sync DAV (per-PC, no secretos pero efimero) y caches.
tools/.sync_state.json
tools/__pycache__/
**/__pycache__/
+250
View File
@@ -0,0 +1,250 @@
# Stack DuckDB: la base de datos como fuente de verdad del project osint
Documento de arquitectura y operación del stack DuckDB + Obsidian del project `osint`.
Construido y verificado end-to-end el 12/06/2026. Sustituye (amplía) la decisión "KISS sin
BD intermedia" del issue 0172: la base DuckDB pasa a ser la **fuente de verdad estructurada**
del ecosistema OSINT, y el vault de Obsidian queda como capa de prosa + vista.
## Visión general
```
┌─────────────────────────────────────────┐
│ osint_db (FastAPI, 127.0.0.1:8771) │
│ dueño ÚNICO de data/osint.duckdb │
Xandikos ───────▶│ /api/ingest/dav │
(CardDAV/CalDAV) │ /api/ingest/vault ◀────── vault osint │
│ /api/query · /api/query/named │
│ /api/render/note ────────▶ notas .md │
└───────────────┬─────────────────────────┘
│ HTTP (requestUrl)
┌───────────────▼─────────────────────────┐
│ Plugin Obsidian "osint-db" │
│ code blocks ```osintdb en notas │
└─────────────────────────────────────────┘
```
Tres piezas:
| Pieza | Dónde | Qué hace |
|---|---|---|
| Service `osint_db` | `projects/osint/apps/osint_db/` | FastAPI local (solo 127.0.0.1, puerto 8771). Dueño único en escritura de `data/osint.duckdb`. Ingesta vault + DAV, sirve queries read-only y renderiza tablas dentro de notas. |
| Plugin `osint-db` | `projects/osint/apps/osint_obsidian_plugin/` | Plugin de Obsidian FINO (TypeScript, sin BD embebida). Ejecuta queries contra el service via HTTP y pinta tablas dentro de las notas. |
| Render headless | Funciones del registry | Proyecta resultados de query como tablas Markdown congeladas dentro de notas (bloques sentinel). Legible en móvil y en cualquier editor, sin plugin. |
## Modelo de datos: tres categorías de tablas
La regla central del diseño, en orden:
### 1. Tablas maestras con referencia a notas (schema `main`)
Fuente de verdad de las entidades estructuradas del vault. Cada fila lleva `note_path`
(path relativo de su nota dentro del vault `~/Obsidian/osint`).
| Tabla | Origen | Clave |
|---|---|---|
| `notes` | índice completo del vault (path, slug, tipo, title, mtime, frontmatter JSON) | `note_path` |
| `persons` | `personas/*.md` (nombre, aliases, sexo, fecha_nacimiento, dni, telefono, email, direccion, pais, contexto, fuente, dav_uid, tags) | `slug` |
| `organizations` | `organizaciones/*.md` | `slug` |
| `domains` | `dominios/*.md` | `slug` |
| `cases` | `casos/*.md` | `slug` |
| `places` | `lugares/*.md` | `slug` |
### 2. Tablas maestras DAV (schema `main`)
Eventos y contactos importados del servidor Xandikos (CardDAV/CalDAV). También fuente de
verdad. `contacts.note_path` enlaza el contacto con su ficha del vault cuando hay match
(por `dav_uid` extraído del campo `fuente` de la ficha, por teléfono o por email); puede
ser NULL (contacto sin ficha — visibles con la named query `contactos_sin_nota`).
| Tabla | Origen | Clave |
|---|---|---|
| `contacts` | colecciones CardDAV (uid, collection, etag, fn, tels, emails, raw, note_path) | `uid` |
| `events` | calendarios CalDAV (uid, calendar, dtstart/dtend, all_day, summary, location, rrule, raw) | `uid` |
### 3. Tablas derivadas (schema `derived`)
Datos computados/extra para consultar desde Obsidian. **REGLA DURA: ninguna tabla de
`derived` lleva columna que referencie notas** (`note_path` prohibido). Se reconstruyen
(DROP + CREATE) en cada ingest. Hay un test que verifica la regla contra
`information_schema`.
| Tabla | Qué |
|---|---|
| `derived.person_stats` | agregados de personas por contexto, país y tag |
| `derived.event_monthly` | conteo de eventos por calendario y mes |
| `derived.contact_link_quality` | contactos enlazados a ficha vs sin enlazar (solo números) |
## Ownership por campo (regla anti two-way-sync)
| Dato | Dueño | Dirección |
|---|---|---|
| Campos estructurados (entidades, contactos, eventos) | DuckDB | DB → nota (render) |
| Prosa libre de cada nota (cuerpo, notas de investigación) | Markdown | nunca se toca desde la DB |
| Frontmatter editado a mano en Obsidian | Markdown | se re-ingesta con `/api/ingest/vault` |
Los bloques generados en notas van entre sentinels y no se editan a mano; el resto de la
nota es del humano.
## Inversión completada (13/06/2026)
La dirección de la verdad quedó invertida: DuckDB es ahora la **fuente de verdad** de los
campos estructurados (personas, contactos, eventos), no un espejo. Cambios implementados:
- **Ingest selectivo (anti-pisado)**: `ingest_vault` ya NO hace DELETE+INSERT ciego sobre
`persons`. Si el `slug` ya existe en la DB, solo actualiza `note_path` + `extra_fm` (vía
`duckdb_upsert` con `update_cols` restringido); los campos OWNED por la DB no se pisan. Una
ficha nueva creada a mano en Obsidian se bootstrapea (INSERT completo). Verificado: un
centinela DB-owned sobrevive a un `POST /api/ingest/vault` sobre las 697 fichas.
- **Multi-valor**: `persons` ganó `telefonos`/`emails`/`direcciones`/`extra_fm` (JSON) —
migración `002_multivalue.sql`, backfill desde los singulares (que se mantienen por compat,
= primer elemento). `contacts.tels`/`emails` ya eran arrays. Las 634 fichas con dato se
materializaron a multi-valor (DB→nota, preservando la prosa).
- **Libretas/agendas de contactos**: tabla `addressbooks` (`003_addressbooks.sql`).
`ingest_dav` itera todas las libretas registradas. Crear libreta CardDAV nueva vía
`dav_make_addressbook` (extended MKCOL).
- **Endpoints de escritura estructurada** (ver tabla API): CRUD de person/contact/event,
addressbook/calendar, `/api/person/{slug}/render` (DB→nota) y `/api/push/dav`
(reconcilia DB→Xandikos). La escritura DB va bajo el write lock; el push DAV y el render
ocurren fuera de la transacción (no bloquean la DB con latencia de red).
- **`osint_web` consume `osint_db`** tras el feature flag `OSINT_DB_BACKEND` (en
`apps/osint_web/dev/feature_flags.json`, hoy **ON**): la vista Contactos lee/escribe contra
el service (DuckDB), con contactos multi-valor y libretas en la UI. Con el flag OFF vuelve
al camino histórico (vault `.md` + vCard Xandikos directo).
- **Funciones del registry nuevas** (grupo `duckdb`/`dav`): `duckdb_execute`, `duckdb_upsert`,
`dav_make_addressbook`, `dav_list_addressbooks`, `build_vcard`.
**Runbook (evitar doble-verdad):** con `OSINT_DB_BACKEND` ON, editar contactos/personas
SOLO por la app (osint_web → osint_db) o por la API de osint_db. No editar el mismo campo a
mano en el `.md` y por la app a la vez; el `.md` es la vista materializada desde la DB.
**Runtime:** `osint_db` corre como systemd user service `osint-db.service`
(`Restart=always`), no como proceso manual. Health: `curl 127.0.0.1:8771/api/health`.
## API del service (contrato)
Todas las respuestas son HTTP 200 con campo `status` en el body (`ok` | `error`); los
clientes parsean el body, no el código HTTP.
| Endpoint | Qué |
|---|---|
| `GET /api/health` | `{"status":"ok","db_path":"...","tables":N}` |
| `GET /api/tables` | catálogo: schema, nombre, kind (master/derived), row_count, columnas |
| `POST /api/query` | `{"sql":"SELECT ...","params":[],"max_rows":500}``{status, columns, rows, row_count, truncated}`. Solo lectura (conexión `read_only=True`). |
| `GET /api/queries` | catálogo de named queries (`server/named_queries.py`) |
| `POST /api/query/named` | `{"name":"personas_por_contexto","max_rows":500}` → misma shape que `/api/query` |
| `POST /api/ingest/vault` | escanea el vault completo, upserta maestras y reconstruye derivadas |
| `POST /api/ingest/dav` | baja colecciones Xandikos, upserta contacts/events, enlaza contactos con fichas |
| `POST /api/render/note` | `{"note_path":"tableros/x.md","block_id":"y","sql":...\|"query":...,"title":...}` → renderiza tabla Markdown dentro del bloque sentinel de la nota (la crea si no existe) |
| `POST/PUT/DELETE /api/person[/{slug}]` | CRUD de `persons` (multi-valor) + materializa la ficha (DB→nota) |
| `POST /api/person/{slug}/render` | DB→nota: escribe el frontmatter OWNED (listas) preservando `extra_fm` y la prosa del cuerpo |
| `POST/PUT/DELETE /api/contact[/{uid}]` | CRUD de `contacts` + push DB→Xandikos (`build_vcard` + `carddav_put_vcard` / `dav_delete_resource`) |
| `POST/PUT/DELETE /api/event[/{uid}]` | CRUD de `events` + push DB→Xandikos (`caldav_put_event` / `dav_delete_resource`) |
| `POST /api/addressbook` | crea una libreta CardDAV (`dav_make_addressbook`) + registra en `addressbooks` |
| `POST /api/calendar` | crea un calendario (`dav_make_calendar`) |
| `POST /api/push/dav` | reconcilia DB→Xandikos en bloque (contacts/events de la DB → servidor) |
Named queries incluidas: `personas_por_contexto`, `personas_recientes`, `eventos_proximos`,
`contactos_sin_nota`, `stats_personas`, `calidad_enlace_contactos`, `eventos_por_mes`.
## Plugin de Obsidian (`osint-db`)
Dentro de cualquier nota del vault, un code block con lenguaje `osintdb`:
````markdown
```osintdb
query: contactos_sin_nota
max_rows: 25
```
````
o SQL crudo:
````markdown
```osintdb
SELECT nombre, telefono, contexto, note_path
FROM persons
ORDER BY updated_at DESC
LIMIT 15
```
````
Comportamiento:
- Directivas soportadas al inicio del bloque: `query:` (named), `max_rows:`, `title:`.
Sin directivas, el bloque entero es SQL crudo contra `/api/query`.
- Las celdas de una columna `note_path` cuyo valor termina en `.md` se pintan como enlace
interno de Obsidian: click → abre la ficha. Así las tablas maestras enlazan a sus notas.
- Botón "Refrescar" por bloque. Errores del service se muestran en un callout; si el
service no responde, mensaje con hint de arranque.
- Settings del plugin: base URL (default `http://127.0.0.1:8771`) y `max_rows` default.
- Comando de paleta: "OSINT DB: insertar bloque de query".
- HTTP siempre via `requestUrl` de Obsidian (evita CORS). Desktop only.
## Render headless (sin plugin, legible en móvil)
Para tablas congeladas que viajan con la nota (sync móvil incluido), el service compone
tres funciones del registry:
1. `duckdb_query_readonly_py_infra` — ejecuta la query con conexión read-only.
2. `render_markdown_table_py_core` — filas → tabla Markdown GFM.
3. `upsert_sentinel_block_py_core` — inserta/reemplaza el bloque entre
`<!-- osintdb:begin id=X -->` y `<!-- osintdb:end id=X -->` de forma idempotente.
Ejemplo real: `tableros/db-personas.md` combina una tabla congelada (sentinel) y bloques
vivos del plugin en la misma nota.
```bash
curl -s -X POST http://127.0.0.1:8771/api/render/note \
-H 'Content-Type: application/json' \
-d '{"note_path":"tableros/db-personas.md","block_id":"personas","query":"personas_por_contexto","title":"Personas por contexto"}'
```
## Operación
```bash
# Arrancar el service (runtime manual, sin systemd por ahora)
cd /home/enmanuel/fn_registry/projects/osint/apps/osint_db
.venv/bin/python server/main.py # flags: --vault --db --port --host
# Re-ingestar (tras editar fichas a mano o tras cambios en Xandikos)
curl -s -X POST http://127.0.0.1:8771/api/ingest/vault
curl -s -X POST http://127.0.0.1:8771/api/ingest/dav
# Desplegar el plugin tras un cambio
cd /home/enmanuel/fn_registry/projects/osint/apps/osint_obsidian_plugin
pnpm build && ./deploy.sh # copia a .obsidian/plugins/osint-db/
```
Activación del plugin: ya está activado headless (id `osint-db` en
`.obsidian/community-plugins.json` del vault). Si Obsidian arranca en Restricted mode la
primera vez, un toggle manual único en Settings → Community plugins.
Credenciales DAV: `pass dav/xandikos-enmanuel` (via `pass_get_secret_py_infra`). La config
de colecciones espeja `tools/sync_dav_to_osint.py`.
## Gotchas
- **Single-writer DuckDB**: solo el service escribe la base. Cualquier otro proceso lee con
`read_only=True` (`duckdb_query_readonly`) o pasa por la API HTTP. Una lectura durante un
ingest puede devolver `status:error` momentáneo por el lock exclusivo; reintentar.
- **Versión del motor**: no abrir el `.duckdb` con CLIs/WASM de versión distinta a la del
venv (1.5.x). El formato de archivo puede divergir entre versiones mayores.
- **`read_only=True` exige que el archivo exista** — no crea bases nuevas.
- Los bloques sentinel no se editan a mano: el siguiente render los pisa. La prosa fuera de
los sentinels nunca se toca.
- Migraciones del schema: archivos numerados `migrations/NNN_*.sql` con tabla `_migrations`
(regla `db_migrations` del registry). Nunca borrar la base para "arreglar" el schema.
## Cifras del primer ingest real (12/06/2026)
- Vault: 1175 notes, 697 persons, 367 organizations, 9 places.
- DAV: 1065 contacts (704 enlazados a ficha, 361 sin nota), 98 events.
## Referencias
- Apps: `apps/osint_db/app.md` (service, e2e_checks) y `apps/osint_obsidian_plugin/app.md`
(build, deploy, uso).
- Grupos de capacidad del registry: `docs/capabilities/duckdb.md` y
`docs/capabilities/obsidian.md` (en el repo `fn_registry`).
- Convenciones del vault: `CONVENTIONS.md` (este project).
- Sync DAV ↔ vault preexistente: `tools/sync_dav_to_osint.py` / `tools/sync_osint_to_dav.py`.
+312
View File
@@ -0,0 +1,312 @@
#!/usr/bin/env python3
"""Importación/enriquecimiento idempotente de contactos desde un .vcf.
Dos operaciones, ambas idempotentes y NO destructivas (solo INSERT/UPDATE
aditivo, nunca DELETE):
1. backfill — calcula la clave de importación determinística (import_key) de
los contactos ya presentes en la DB y la guarda. La clave la
genera la función del registry ``contact_import_key`` a partir
de la identidad estable del contacto (teléfonos normalizados >
emails > nombre normalizado).
2. enrich — lee un .vcf (p.ej. el export de Google) y, para cada tarjeta,
localiza el contacto existente por import_key (rápido) con
fallback por teléfono compartido, y le AÑADE lo que falte:
teléfonos y emails nuevos (en contacts) y direcciones (en la
persona enlazada por note_path, que es de donde el push de
agenda las propaga al móvil). Nunca pisa ni borra datos.
La DB osint.duckdb es single-writer (la posee el service osint_db). Este tool
abre una conexión de lectura para el plan y, solo con --apply, una conexión de
escritura breve mientras el service está inactivo. Hacer backup antes de --apply.
Uso:
python3 import_contacts_vcf.py backfill --dry-run
python3 import_contacts_vcf.py backfill --apply
python3 import_contacts_vcf.py enrich --vcf ~/Downloads/contacts.vcf --dry-run
python3 import_contacts_vcf.py enrich --vcf ~/Downloads/contacts.vcf --apply
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
import duckdb
# --- Acceso a la función del registry contact_import_key ---------------------
_THIS = os.path.dirname(os.path.abspath(__file__))
_FN_DIR = os.path.normpath(os.path.join(_THIS, "..", "..", "..", "python", "functions"))
if not os.path.isdir(os.path.join(_FN_DIR, "core")):
_FN_DIR = os.path.expanduser("~/fn_registry/python/functions")
sys.path.insert(0, _FN_DIR)
from core.contact_import_key import contact_import_key # noqa: E402
DEFAULT_DB = os.path.join(_THIS, "..", "apps", "osint_db", "data", "osint.duckdb")
# --- Normalización y parseo --------------------------------------------------
def norm_phone(p: str) -> str:
"""Últimos 9 dígitos de un teléfono (mismo criterio que la DB/registry)."""
d = re.sub(r"\D", "", str(p or ""))
return d[-9:] if len(d) >= 9 else d
def _unfold(text: str) -> str:
return re.sub(r"\r?\n[ \t]", "", text)
def _adr_to_text(raw: str) -> str:
"""Dirección legible desde un valor ADR estructurado (7 componentes ';')."""
parts = [p.strip() for p in raw.split(";")]
nonempty = [p for p in parts if p]
if len(parts) >= 3 and parts[2]:
tail = [p for p in parts[3:] if p]
return ", ".join([parts[2]] + tail) if tail else parts[2]
return ", ".join(nonempty)
def parse_vcf(path: str) -> list:
"""Parsea un .vcf a una lista de dicts con los campos de interés por tarjeta.
Cada dict: {fn, tels (valores originales), emails, adrs (texto legible),
bdays}. Los teléfonos/emails se devuelven en su forma original (no
normalizada) para preservar el formato legible al añadirlos.
"""
text = _unfold(open(path, encoding="utf-8", errors="replace").read())
cards = []
for block in re.split(r"(?=BEGIN:VCARD)", text):
if "BEGIN:VCARD" not in block:
continue
fn = re.search(r"^FN:(.+)$", block, re.M)
tels = [t.strip() for t in re.findall(r"^TEL[^:]*:(.+)$", block, re.M) if t.strip()]
emails = [e.strip() for e in re.findall(r"^EMAIL[^:]*:(.+)$", block, re.M) if "@" in e]
adrs = [_adr_to_text(a) for a in re.findall(r"^ADR[^:]*:(.+)$", block, re.M)]
adrs = [a for a in adrs if a]
bdays = [b.strip() for b in re.findall(r"^BDAY[^:]*:(.+)$", block, re.M) if b.strip()]
cards.append(
{
"fn": fn.group(1).strip() if fn else "",
"tels": tels,
"emails": emails,
"adrs": adrs,
"bdays": bdays,
}
)
return cards
# --- Carga de la DB ----------------------------------------------------------
def load_contacts(con) -> list:
"""Filas de contacts como dicts con tels/emails decodificados."""
rows = con.execute(
"SELECT uid, fn, tels, emails, note_path, import_key FROM contacts"
).fetchall()
out = []
for uid, fn, tels, emails, note_path, import_key in rows:
out.append(
{
"uid": uid,
"fn": fn,
"tels": json.loads(tels or "[]"),
"emails": json.loads(emails or "[]"),
"note_path": note_path,
"import_key": import_key,
}
)
return out
def key_of(contact: dict) -> str:
return contact_import_key(contact.get("fn") or "", contact.get("tels") or [], contact.get("emails") or [])
# --- backfill ----------------------------------------------------------------
def cmd_backfill(db_path: str, apply: bool) -> int:
con = duckdb.connect(db_path, read_only=not apply)
try:
contacts = load_contacts(con)
updates = []
collisions = {}
for c in contacts:
k = key_of(c)
collisions.setdefault(k, []).append(c["uid"])
if c["import_key"] != k:
updates.append((k, c["uid"]))
dup = {k: v for k, v in collisions.items() if len(v) > 1}
print(f"contactos: {len(contacts)}")
print(f"import_key a (re)calcular: {len(updates)}")
print(f"claves con colisión (>1 contacto): {len(dup)}")
for k, uids in list(dup.items())[:5]:
print(f" {k}: {uids}")
if apply:
for k, uid in updates:
con.execute("UPDATE contacts SET import_key = ? WHERE uid = ?", [k, uid])
print(f"APLICADO: {len(updates)} import_key escritas")
else:
print("(dry-run: nada escrito; usa --apply)")
return 0
finally:
con.close()
# --- enrich ------------------------------------------------------------------
def build_plan(contacts: list, cards: list) -> list:
"""Plan de enriquecimiento: por contacto existente, qué tel/email/adr añadir.
Match por import_key (rápido) con fallback por teléfono normalizado
compartido. Solo genera entradas con algún cambio real.
"""
by_key = {}
by_phone = {}
for c in contacts:
k = c["import_key"] or key_of(c)
by_key.setdefault(k, c)
for t in c["tels"]:
by_phone.setdefault(norm_phone(t), c)
plan = []
for card in cards:
ck = contact_import_key(card["fn"], card["tels"], card["emails"])
hit = by_key.get(ck)
how = "import_key"
if hit is None:
for t in card["tels"]:
hit = by_phone.get(norm_phone(t))
if hit:
how = "phone"
break
if hit is None:
continue
db_tel_norm = {norm_phone(t) for t in hit["tels"]}
db_em = {e.strip().lower() for e in hit["emails"]}
add_tel = [t for t in card["tels"] if norm_phone(t) and norm_phone(t) not in db_tel_norm]
add_em = [e for e in card["emails"] if e.strip().lower() not in db_em]
# dedup interno preservando orden
add_tel = list(dict.fromkeys(add_tel))
add_em = list(dict.fromkeys(add_em))
if add_tel or add_em or card["adrs"]:
plan.append(
{
"uid": hit["uid"],
"fn": hit["fn"],
"note_path": hit["note_path"],
"match": how,
"add_tel": add_tel,
"add_email": add_em,
"add_adr": card["adrs"],
"new_tels": hit["tels"] + add_tel,
"new_emails": hit["emails"] + add_em,
}
)
return plan
def _person_add_adrs(con, note_path: str, adrs: list) -> int:
"""Añade direcciones a la persona enlazada (sin duplicar). Devuelve cuántas."""
row = con.execute(
"SELECT direcciones, direccion FROM persons WHERE note_path = ?", [note_path]
).fetchone()
if row is None:
return 0
current = json.loads(row[0] or "[]") if row[0] else []
if not current and row[1]:
current = [row[1]]
existing_norm = {re.sub(r"\s+", " ", x).strip().lower() for x in current}
added = [a for a in adrs if re.sub(r"\s+", " ", a).strip().lower() not in existing_norm]
if not added:
return 0
merged = current + added
con.execute(
"UPDATE persons SET direcciones = ?, direccion = ? WHERE note_path = ?",
[json.dumps(merged, ensure_ascii=False), merged[0], note_path],
)
return len(added)
def cmd_enrich(db_path: str, vcf: str, apply: bool) -> int:
cards = parse_vcf(vcf)
con = duckdb.connect(db_path, read_only=not apply)
try:
before = con.execute("SELECT count(*) FROM contacts").fetchone()[0]
contacts = load_contacts(con)
plan = build_plan(contacts, cards)
n_tel = sum(len(p["add_tel"]) for p in plan)
n_em = sum(len(p["add_email"]) for p in plan)
n_adr_targets = sum(1 for p in plan if p["add_adr"] and p["note_path"])
print(f".vcf tarjetas: {len(cards)} contactos DB: {before}")
print(f"contactos a enriquecer: {len(plan)} (+{n_tel} tel, +{n_em} email, "
f"direcciones a {n_adr_targets} personas enlazadas)")
for p in plan[:30]:
ch = []
if p["add_tel"]:
ch.append(f"+{len(p['add_tel'])}tel")
if p["add_email"]:
ch.append(f"+{len(p['add_email'])}email")
if p["add_adr"]:
ch.append("+adr" if p["note_path"] else "+adr(SIN persona)")
print(f" [{p['match']:10}] {(p['fn'] or '?')[:34]:34} {' '.join(ch)}")
if not apply:
print("(dry-run: nada escrito; usa --apply)")
return 0
adr_added = 0
for p in plan:
con.execute(
"UPDATE contacts SET tels = ?, emails = ? WHERE uid = ?",
[
json.dumps(p["new_tels"], ensure_ascii=False),
json.dumps(p["new_emails"], ensure_ascii=False),
p["uid"],
],
)
if p["add_adr"] and p["note_path"]:
adr_added += _person_add_adrs(con, p["note_path"], p["add_adr"])
after = con.execute("SELECT count(*) FROM contacts").fetchone()[0]
assert after == before, f"PÉRDIDA: contactos {before} -> {after}"
print(f"APLICADO: {len(plan)} contactos enriquecidos, {adr_added} direcciones "
f"añadidas a personas. Conteo intacto: {before} == {after}")
return 0
finally:
con.close()
def main(argv=None) -> int:
ap = argparse.ArgumentParser(description=__doc__.split("\n")[0])
sub = ap.add_subparsers(dest="cmd", required=True)
pb = sub.add_parser("backfill", help="calcula y guarda import_key de los contactos")
pb.add_argument("--db", default=DEFAULT_DB)
pb.add_argument("--apply", action="store_true")
pe = sub.add_parser("enrich", help="enriquece contactos desde un .vcf")
pe.add_argument("--vcf", required=True)
pe.add_argument("--db", default=DEFAULT_DB)
pe.add_argument("--apply", action="store_true")
args = ap.parse_args(argv)
db = os.path.abspath(os.path.expanduser(args.db))
if not os.path.exists(db):
print(f"ERROR: DB no existe: {db}", file=sys.stderr)
return 2
if args.cmd == "backfill":
return cmd_backfill(db, args.apply)
if args.cmd == "enrich":
vcf = os.path.expanduser(args.vcf)
if not os.path.exists(vcf):
print(f"ERROR: .vcf no existe: {vcf}", file=sys.stderr)
return 2
return cmd_enrich(db, vcf, args.apply)
return 1
if __name__ == "__main__":
sys.exit(main())
+645
View File
@@ -0,0 +1,645 @@
#!/usr/bin/env python3
"""Sincroniza el servidor CardDAV (Xandikos) HACIA el vault OSINT (sentido
inverso de sync_osint_to_dav.py).
El movil (DAVx5) edita la libreta de contactos de Xandikos: anade contactos
nuevos, corrige telefonos/nombres. Este tool trae esos cambios de vuelta al
vault SIN pisar la capa OSINT (relaciones, dni, contexto, fuente, tags) que es
autoritativa en el lado del vault.
MODELO DE RECONCILIACION
========================
- El vault es la FUENTE DE VERDAD de los campos OSINT.
- Xandikos es la fuente de los cambios de AGENDA (nombre, telefono, email,
aliases) que llegan del movil.
Por cada vCard del servidor decidimos su accion comparando contra:
1. el estado persistente (.sync_state.json): UID -> {etag, vault_slug, ...}
2. el vault (match por telefono/email normalizado, o por slug del UID osint-)
Acciones:
- CREATE : UID nuevo (no en estado) y sin ficha que case por tel/email.
-> crea personas/<slug>.md con contexto: movil.
- UPDATE : el etag cambio respecto al estado (el movil lo edito).
-> actualiza SOLO los campos de agenda (nombre, telefono,
email, aliases), PRESERVANDO los campos OSINT.
- LINK : UID nuevo pero ya hay ficha que case por tel/email (no es un
contacto nuevo, es el mismo que ya esta en el vault).
-> registra el UID en el estado y, si el etag implica edicion,
aplica el UPDATE de agenda. No crea ficha duplicada.
- SKIP : etag igual al del estado -> sin cambios desde el ultimo sync.
last-write-wins por timestamp en campos de agenda: cuando el etag de Xandikos
cambia, el movil gano la ultima escritura de ese contacto -> se aplica al
vault. El push posterior (sync_osint_to_dav --apply, en el DAG) re-emite el
vault ya autoritativo en OSINT.
MODOS
=====
--dry-run (DEFAULT) reporta que crearia/actualizaria. NO escribe el vault ni
el estado.
--apply escribe el vault (create/update notes) y reescribe
.sync_state.json.
Tool de PROYECTO (vive en projects/osint/tools/). NO es funcion del registry,
NO se indexa.
"""
import sys
import os
import re
import json
import time
import argparse
sys.path.insert(0, "/home/enmanuel/fn_registry/python/functions")
from infra.pass_get_secret import pass_get_secret # noqa: E402
from infra.dav_get_collection import dav_get_collection # noqa: E402
from obsidian import ( # noqa: E402
slugify_obsidian_name,
list_obsidian_notes,
read_obsidian_note,
create_obsidian_note,
update_obsidian_note,
)
# --------------------------------------------------------------------------
# Configuracion (espejo de sync_osint_to_dav.py)
# --------------------------------------------------------------------------
OSINT = "/home/enmanuel/Obsidian/osint"
DAV_BASE = "https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com"
DAV_USER = "enmanuel"
DAV_COLLECTION = "/enmanuel/contacts/addressbook/"
PASS_SECRET = "dav/xandikos-enmanuel"
STATE_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)),
".sync_state.json")
# Carpetas del vault que representan contactos (mismo set que el push).
SUBFOLDERS = (("personas", "persona"), ("organizaciones", "organizacion"))
# Frontmatter canonico de persona (CONVENTIONS.md 3b), en orden. Las fichas que
# creamos aqui lo respetan campo a campo.
PERSON_CANON = [
"tipo", "nombre", "slug", "aliases", "sexo", "fecha_nacimiento", "dni",
"telefono", "email", "direccion", "pais", "relaciones", "contexto",
"fuente", "tags",
]
# Campos de AGENDA que el movil puede editar y que actualizamos en un UPDATE.
# El resto del frontmatter (campos OSINT) NUNCA se toca en un UPDATE.
AGENDA_FIELDS = ("nombre", "telefono", "email", "aliases")
# --------------------------------------------------------------------------
# Parseo de vCard (entrada)
# --------------------------------------------------------------------------
def _unfold(vcard_text: str) -> str:
"""Deshace el folding de lineas de vCard (continuacion con espacio/tab)."""
return re.sub(r"\r?\n[ \t]", "", vcard_text)
def _vcard_values(vcard_text: str, prop: str) -> list:
"""Devuelve todos los valores de una propiedad (TEL, EMAIL, UID, ...).
Acepta `PROP;PARAMS:valor` y `PROP:valor`. Decodifica los escapes simples de
vCard (\\, , \\; , \\n) en el valor.
"""
vals = []
for line in vcard_text.splitlines():
m = re.match(rf"^(?:item\d+\.)?{prop}(?:;[^:]*)?:(.*)$", line, re.IGNORECASE)
if m:
v = m.group(1).strip()
v = (v.replace("\\n", "\n").replace("\\,", ",")
.replace("\\;", ";").replace("\\\\", "\\"))
v = v.strip()
if v:
vals.append(v)
return vals
def parse_vcard(vcard_text: str) -> dict:
"""Extrae los campos de agenda + OSINT de un vCard.
Devuelve {uid, fn, tels[], emails[], nicknames[], sexo, dni, pais,
bday, categories[], note}. Solo lee; no escribe.
"""
txt = _unfold(vcard_text)
nick_raw = _vcard_values(txt, "NICKNAME")
nicknames = []
for nr in nick_raw:
nicknames.extend([a.strip() for a in nr.split(",") if a.strip()])
cat_raw = _vcard_values(txt, "CATEGORIES")
categories = []
for cr in cat_raw:
categories.extend([c.strip() for c in cr.split(",") if c.strip()])
return {
"uid": (_vcard_values(txt, "UID") or [""])[0],
"fn": (_vcard_values(txt, "FN") or [""])[0],
"tels": _dedup_keep_order(_vcard_values(txt, "TEL")),
"emails": _dedup_keep_order(_vcard_values(txt, "EMAIL")),
"nicknames": _dedup_keep_order(nicknames),
"sexo": (_vcard_values(txt, "X-OSINT-SEXO") or [None])[0],
"dni": (_vcard_values(txt, "X-OSINT-DNI") or [None])[0],
"pais": (_vcard_values(txt, "X-OSINT-PAIS") or [None])[0],
"bday": (_vcard_values(txt, "BDAY") or [None])[0],
"categories": categories,
"note": (_vcard_values(txt, "NOTE") or [""])[0],
}
def _dedup_keep_order(items: list) -> list:
seen, out = set(), []
for it in items:
key = str(it).strip().lower()
if key and key not in seen:
seen.add(key)
out.append(str(it).strip())
return out
def _norm_phone(p) -> str:
"""Normaliza un telefono a sus ultimos 9 digitos (numero nacional ES)."""
if not p:
return ""
d = re.sub(r"\D", "", str(p))
return d[-9:] if len(d) >= 9 else d
# --------------------------------------------------------------------------
# Indice del vault: telefono/email/slug -> ficha existente
# --------------------------------------------------------------------------
def _norm(v):
"""Normaliza 'null'/''/None del frontmatter a None real."""
if v is None:
return None
if isinstance(v, str) and v.strip().lower() in ("null", "none", ""):
return None
return v
def load_vault_index() -> dict:
"""Recorre las fichas del vault y construye los indices de match.
Devuelve {by_phone, by_email, by_slug, by_path, count} donde cada indice
mapea la clave -> dict de la ficha {slug, path, nombre, telefono, email,
aliases, mtime}. by_slug mapea slug -> ficha (para casar UID osint-<slug>).
"""
by_phone, by_email, by_slug, by_path = {}, {}, {}, {}
count = 0
for subfolder, _tipo in SUBFOLDERS:
folder = os.path.join(OSINT, subfolder)
if not os.path.isdir(folder):
continue
for p in list_obsidian_notes(OSINT, subfolder=subfolder):
# Solo fichas de nivel-1 (personas/<slug>.md), no las sub-notas.
if os.path.basename(os.path.dirname(p)) != subfolder:
continue
base = os.path.splitext(os.path.basename(p))[0]
if base.startswith("_"):
continue
try:
nd = read_obsidian_note(p)
except Exception: # noqa: BLE001
continue
fm = nd.get("frontmatter") or {}
slug = _norm(fm.get("slug")) or base
tel = _norm(fm.get("telefono"))
em = _norm(fm.get("email"))
aliases = fm.get("aliases") or []
if not isinstance(aliases, list):
aliases = [aliases]
ficha = {
"slug": slug,
"path": p,
"nombre": _norm(fm.get("nombre")) or base.replace("-", " "),
"telefono": tel,
"email": em,
"aliases": aliases,
"mtime": os.path.getmtime(p),
}
count += 1
by_slug[slug] = ficha
by_path[p] = ficha
if tel:
by_phone.setdefault(_norm_phone(tel), ficha)
if em:
by_email.setdefault(str(em).strip().lower(), ficha)
return {"by_phone": by_phone, "by_email": by_email,
"by_slug": by_slug, "by_path": by_path, "count": count}
def match_vault(parsed: dict, index: dict):
"""Devuelve la ficha del vault que casa con un vCard (o None).
Match por telefono primero (mas fiable), luego email, luego por el slug del
UID si es del estilo osint-<slug> (creado por el push). Devuelve (ficha,
source) con source in {phone, email, slug_uid} o (None, None).
"""
for tel in parsed["tels"]:
hit = index["by_phone"].get(_norm_phone(tel))
if hit:
return hit, "phone"
for em in parsed["emails"]:
hit = index["by_email"].get(str(em).strip().lower())
if hit:
return hit, "email"
uid = parsed.get("uid") or ""
if uid.startswith("osint-"):
hit = index["by_slug"].get(uid[len("osint-"):])
if hit:
return hit, "slug_uid"
return None, None
# --------------------------------------------------------------------------
# Estado persistente
# --------------------------------------------------------------------------
def load_state() -> dict:
"""Carga .sync_state.json. {} si no existe o esta corrupto."""
try:
with open(STATE_PATH, "r", encoding="utf-8") as fh:
data = json.load(fh)
return data if isinstance(data, dict) else {}
except (FileNotFoundError, json.JSONDecodeError):
return {}
def save_state(state: dict):
"""Reescribe .sync_state.json de forma atomica (tmp + rename)."""
tmp = STATE_PATH + ".tmp"
with open(tmp, "w", encoding="utf-8") as fh:
json.dump(state, fh, ensure_ascii=False, indent=2, sort_keys=True)
os.replace(tmp, STATE_PATH)
# --------------------------------------------------------------------------
# Construccion de fichas y de updates de agenda
# --------------------------------------------------------------------------
def _slug_for(parsed: dict) -> str:
"""Slug estable para una ficha nueva: del FN, o del UID si no hay FN."""
if parsed.get("fn"):
s = slugify_obsidian_name(parsed["fn"])
if s:
return s
return slugify_obsidian_name(parsed.get("uid") or "contacto-movil") or "contacto-movil"
def _unique_slug(slug: str, index: dict, planned_slugs: set) -> str:
"""Evita colision de slug: si ya existe, sufija -2, -3, ..."""
if slug not in index["by_slug"] and slug not in planned_slugs:
return slug
i = 2
while f"{slug}-{i}" in index["by_slug"] or f"{slug}-{i}" in planned_slugs:
i += 1
return f"{slug}-{i}"
def build_new_frontmatter(parsed: dict, slug: str) -> dict:
"""Frontmatter canonico (3b) para una ficha creada desde el movil.
contexto: movil (marca el origen). fuente: el UID de Xandikos. Los campos
OSINT que el vCard NO trae se dejan en null/[] como exige el esquema.
"""
bday = parsed.get("bday")
if bday and re.match(r"^\d{8}$", str(bday)):
bday = f"{bday[:4]}-{bday[4:6]}-{bday[6:8]}"
fm = {
"tipo": "persona",
"nombre": parsed.get("fn") or slug.replace("-", " "),
"slug": slug,
"aliases": parsed.get("nicknames") or [],
"sexo": parsed.get("sexo"),
"fecha_nacimiento": bday,
"dni": parsed.get("dni"),
"telefono": parsed["tels"][0] if parsed["tels"] else None,
"email": parsed["emails"][0] if parsed["emails"] else None,
"direccion": None,
"pais": parsed.get("pais"),
"relaciones": [],
"contexto": "movil",
"fuente": f"Xandikos UID {parsed.get('uid')}".strip(),
"tags": ["persona", "osint", "movil"],
}
# Reordenar segun el canon (las claves extra van al final).
ordered = {k: fm.get(k) for k in PERSON_CANON if k in fm}
for k, v in fm.items():
if k not in ordered:
ordered[k] = v
return ordered
def agenda_update_from_vcard(parsed: dict, ficha: dict) -> dict:
"""Calcula el diff de AGENDA (solo campos de agenda) entre vCard y ficha.
Devuelve {changes: {campo: (viejo, nuevo)}, set_frontmatter: {...}} con SOLO
los campos de agenda que difieren. NUNCA toca campos OSINT. Si no hay cambios
de agenda, changes queda vacio.
"""
changes = {}
new_fm = {}
new_nombre = parsed.get("fn")
if new_nombre and new_nombre != ficha.get("nombre"):
changes["nombre"] = (ficha.get("nombre"), new_nombre)
new_fm["nombre"] = new_nombre
new_tel = parsed["tels"][0] if parsed["tels"] else None
cur_tel = ficha.get("telefono")
if new_tel and _norm_phone(new_tel) != _norm_phone(cur_tel or ""):
changes["telefono"] = (cur_tel, new_tel)
new_fm["telefono"] = new_tel
new_email = parsed["emails"][0] if parsed["emails"] else None
cur_email = ficha.get("email")
if new_email and str(new_email).strip().lower() != str(cur_email or "").strip().lower():
changes["email"] = (cur_email, new_email)
new_fm["email"] = new_email
# aliases: union (el movil puede anadir un NICKNAME nuevo). No borramos los
# que ya hubiera en el vault.
new_nicks = parsed.get("nicknames") or []
cur_aliases = ficha.get("aliases") or []
merged = list(cur_aliases)
added = []
for nk in new_nicks:
if nk not in merged:
merged.append(nk)
added.append(nk)
if added:
changes["aliases"] = (cur_aliases, merged)
new_fm["aliases"] = merged
return {"changes": changes, "set_frontmatter": new_fm}
# --------------------------------------------------------------------------
# Plan: clasificar cada vCard en CREATE / UPDATE / LINK / SKIP
# --------------------------------------------------------------------------
def plan_pull(resources: list, index: dict, state: dict) -> dict:
"""Clasifica cada vCard del servidor. Devuelve {actions, counts}.
Cada accion es un dict con {kind, uid, etag, parsed, ...}. kind in
{create, update, link, skip, skip_empty}.
"""
actions = []
counts = {"total": len(resources), "create": 0, "update": 0,
"link": 0, "skip": 0, "skip_empty": 0}
planned_slugs = set()
for r in resources:
parsed = parse_vcard(r["data"])
uid = parsed.get("uid") or os.path.splitext(
os.path.basename(r["href"]))[0]
etag = r.get("etag")
# vCard sin nada util (ni nombre ni tel ni email) -> ignorar.
if not parsed.get("fn") and not parsed["tels"] and not parsed["emails"]:
counts["skip_empty"] += 1
actions.append({"kind": "skip_empty", "uid": uid, "etag": etag,
"href": r["href"]})
continue
st = state.get(uid)
ficha, match_src = match_vault(parsed, index)
if st is None:
# UID nuevo en el estado.
if ficha is None:
# No casa con nada del vault -> contacto nuevo del movil.
slug = _unique_slug(_slug_for(parsed), index, planned_slugs)
planned_slugs.add(slug)
counts["create"] += 1
actions.append({
"kind": "create", "uid": uid, "etag": etag,
"href": r["href"], "parsed": parsed, "slug": slug,
"frontmatter": build_new_frontmatter(parsed, slug),
})
else:
# Ya hay ficha que casa -> el push ya lo subio o es el mismo
# contacto. Es el LINK BASELINE: registramos el mapeo
# UID -> ficha en el estado SIN tocar el vault. NO aplicamos
# agenda aqui: un match por telefono/email es ambiguo (varios
# contactos del movil comparten numero, o el vault ya dedupeo)
# y el vault es autoritativo en su nombre curado. Los cambios de
# agenda solo fluyen cuando, ya con el UID en el estado, su etag
# CAMBIA en un sync posterior (rama UPDATE de abajo). diff vacio.
counts["link"] += 1
actions.append({
"kind": "link", "uid": uid, "etag": etag,
"href": r["href"], "parsed": parsed,
"ficha": ficha, "match_src": match_src,
"diff": {"changes": {}, "set_frontmatter": {}},
})
else:
# UID ya conocido. Comparar etag.
if st.get("etag") == etag and etag is not None:
counts["skip"] += 1
actions.append({"kind": "skip", "uid": uid, "etag": etag,
"href": r["href"]})
continue
# etag cambio (o no teniamos etag) -> el movil edito. Update agenda.
if ficha is None:
# Conociamos el UID pero la ficha ya no casa por tel/email
# (¿cambio el numero?). Intentar por slug guardado en estado.
saved_slug = st.get("vault_slug")
if saved_slug and saved_slug in index["by_slug"]:
ficha = index["by_slug"][saved_slug]
if ficha is None:
# No encontramos la ficha -> tratar como create (raro).
slug = _unique_slug(_slug_for(parsed), index, planned_slugs)
planned_slugs.add(slug)
counts["create"] += 1
actions.append({
"kind": "create", "uid": uid, "etag": etag,
"href": r["href"], "parsed": parsed, "slug": slug,
"frontmatter": build_new_frontmatter(parsed, slug),
})
else:
diff = agenda_update_from_vcard(parsed, ficha)
counts["update"] += 1
actions.append({
"kind": "update", "uid": uid, "etag": etag,
"href": r["href"], "parsed": parsed,
"ficha": ficha, "diff": diff,
})
return {"actions": actions, "counts": counts}
# --------------------------------------------------------------------------
# Aplicar (solo --apply)
# --------------------------------------------------------------------------
def apply_pull(plan: dict, state: dict, index: dict) -> dict:
"""Ejecuta el plan: crea/actualiza fichas y reescribe el estado."""
res = {"created": 0, "updated": 0, "linked": 0, "errors": []}
now = int(time.time())
for a in plan["actions"]:
kind = a["kind"]
uid = a["uid"]
try:
if kind == "create":
rel = f"personas/{a['slug']}.md"
path = create_obsidian_note(
OSINT, rel, body="\n## Notas\n",
frontmatter=a["frontmatter"], overwrite=False)
res["created"] += 1
state[uid] = {"etag": a["etag"], "vault_slug": a["slug"],
"vault_path": path,
"vault_mtime": os.path.getmtime(path),
"last_sync": now}
elif kind in ("update", "link"):
ficha = a["ficha"]
diff = a["diff"]
if diff["set_frontmatter"]:
update_obsidian_note(
ficha["path"], set_frontmatter=diff["set_frontmatter"])
if kind == "update":
res["updated"] += 1
else:
res["linked"] += 1
else:
if kind == "link":
res["linked"] += 1
mtime = (os.path.getmtime(ficha["path"])
if os.path.exists(ficha["path"]) else now)
state[uid] = {"etag": a["etag"], "vault_slug": ficha["slug"],
"vault_path": ficha["path"],
"vault_mtime": mtime, "last_sync": now}
elif kind in ("skip", "skip_empty"):
# Mantener/actualizar el etag conocido sin tocar el vault.
if kind == "skip":
prev = state.get(uid, {})
prev["etag"] = a["etag"]
prev["last_sync"] = now
state[uid] = prev
except Exception as e: # noqa: BLE001
res["errors"].append({"uid": uid, "kind": kind, "error": str(e)})
return res
# --------------------------------------------------------------------------
# Reporte
# --------------------------------------------------------------------------
def report(plan: dict, index: dict, sample_n: int = 8):
c = plan["counts"]
print("=" * 72)
print("DRY-RUN — sync_dav_to_osint.py (Xandikos CardDAV -> vault OSINT)")
print("=" * 72)
print(f"servidor ........................ {DAV_BASE}")
print(f"coleccion ....................... {DAV_COLLECTION}")
print(f"fichas en el vault .............. {index['count']}")
print("-" * 72)
print(f"vCards en Xandikos .............. {c['total']}")
print(f" CREATE (contacto nuevo movil) {c['create']}")
print(f" UPDATE (movil edito agenda) .. {c['update']}")
print(f" LINK (ya en vault, enlazar) {c['link']}")
print(f" SKIP (sin cambios) ......... {c['skip']}")
print(f" SKIP (vCard vacio) ......... {c['skip_empty']}")
print("=" * 72)
# Muestra de CREATE.
creates = [a for a in plan["actions"] if a["kind"] == "create"]
if creates:
print(f"NUEVAS fichas a crear (muestra {min(sample_n, len(creates))} "
f"de {len(creates)}):")
print("-" * 72)
for a in creates[:sample_n]:
fm = a["frontmatter"]
print(f" + personas/{a['slug']}.md | UID {a['uid']}")
print(f" nombre={fm.get('nombre')!r} tel={fm.get('telefono')!r} "
f"email={fm.get('email')!r} contexto={fm.get('contexto')!r}")
print("=" * 72)
# Muestra de UPDATE con changes reales.
updates = [a for a in plan["actions"]
if a["kind"] in ("update", "link") and a["diff"]["changes"]]
if updates:
print(f"UPDATES de agenda (movil edito) (muestra "
f"{min(sample_n, len(updates))} de {len(updates)}):")
print("-" * 72)
for a in updates[:sample_n]:
f = a["ficha"]
print(f" ~ {f['path'].split('/osint/')[-1]} | UID {a['uid']} "
f"[{a['kind']}]")
for campo, (old, new) in a["diff"]["changes"].items():
print(f" {campo}: {old!r} -> {new!r} (OSINT preservado)")
print("=" * 72)
elif c["update"] or c["link"]:
print("(UPDATE/LINK detectados pero sin cambios de agenda netos)")
print("=" * 72)
# --------------------------------------------------------------------------
# main
# --------------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser(
description="Sincroniza Xandikos CardDAV -> vault OSINT (reverse). "
"Trae contactos nuevos y ediciones de agenda del movil "
"preservando la capa OSINT del vault.")
ap.add_argument("--apply", action="store_true",
help="Escribe el vault y el estado. Por defecto: dry-run.")
ap.add_argument("--dry-run", action="store_true",
help="No escribe nada (es el comportamiento por defecto; "
"el flag existe para ser explicito).")
ap.add_argument("--sample", type=int, default=8,
help="Numero de fichas a mostrar por categoria en el dry-run.")
args = ap.parse_args()
secret = pass_get_secret(PASS_SECRET)
if secret.get("status") != "ok":
print(f"ERROR: no se pudo leer el secreto '{PASS_SECRET}' de pass: "
f"{secret.get('error')}", file=sys.stderr)
return 1
pwd = secret["value"] # sensible: NUNCA logear
print("Descargando coleccion de Xandikos (1 REPORT)...")
coll = dav_get_collection(DAV_BASE, DAV_USER, pwd, DAV_COLLECTION, "vcard")
if coll.get("status") != "ok":
print(f"ERROR: no se pudo leer la coleccion CardDAV: "
f"{coll.get('error')} (http {coll.get('http_status')})",
file=sys.stderr)
return 1
resources = coll.get("resources", [])
print(f" {len(resources)} vCards descargados.")
print("Indexando el vault OSINT...")
index = load_vault_index()
print(f" {index['count']} fichas indexadas.")
state = load_state()
print(f"Estado previo: {len(state)} UIDs conocidos "
f"({'existe' if os.path.exists(STATE_PATH) else 'nuevo'} .sync_state.json)")
plan = plan_pull(resources, index, state)
report(plan, index, sample_n=args.sample)
if args.apply:
print("\nAPLICANDO (escribiendo el vault + estado)...")
res = apply_pull(plan, state, index)
save_state(state)
print(f"APLICADO: created={res['created']} updated={res['updated']} "
f"linked={res['linked']} errors={len(res['errors'])}")
for e in res["errors"][:10]:
print(f" ERROR uid={e['uid']} [{e['kind']}]: {e['error']}")
print(f"Estado reescrito: {len(state)} UIDs en {STATE_PATH}")
else:
print("\n(dry-run: NO se toco el vault ni el estado. "
"Usa --apply para escribir.)")
return 0
if __name__ == "__main__":
sys.exit(main())
+843
View File
@@ -0,0 +1,843 @@
#!/usr/bin/env python3
"""Sincroniza las fichas del vault OSINT hacia el servidor CardDAV (Xandikos),
enriqueciendo cada vCard con la capa de informacion extra de Obsidian: alias,
relaciones, contexto, direccion, lugares, DNI, fecha de nacimiento, etc.
Los contactos ya estan en Xandikos (vinieron de Google con tel/email). Este
sync AÑADE la capa OSINT a esos vCards (o crea los que falten). Es idempotente
por UID: re-ejecutar no duplica.
Tool de PROYECTO (vive en projects/osint/tools/). NO es funcion del registry,
NO se indexa.
=============================================================================
COMO PERSONALIZAR QUE CAMPOS OSINT VIAJAN AL VCARD -> edita FIELD_MAP
=============================================================================
FIELD_MAP es la unica fuente de verdad del mapeo frontmatter -> vCard. Cada
entrada es una tupla:
(campo_frontmatter, sensible, generador)
- campo_frontmatter : str. Clave del frontmatter de la ficha (p.ej. "dni").
Hay claves SINTETICAS calculadas en build_record()
("lugares", "nombre_completo") que tambien se mapean
aqui: no salen tal cual del YAML pero se exponen al
generador como si fueran un campo mas.
- sensible : bool. True marca el campo como dato sensible que viaja
al movil (DNI, direccion, lugares). El dry-run los lista
EXPLICITAMENTE para que confirmes antes de --apply.
- generador : callable(valor) -> list[str]. Recibe el valor del campo
y devuelve 0..N lineas vCard ya formateadas (sin CRLF).
Devuelve [] para omitir el campo (valor vacio/null).
PARA ACTIVAR / DESACTIVAR UN CAMPO:
- Desactivar: comenta (o borra) su linea en FIELD_MAP. Esa propiedad dejara
de generarse y de viajar al servidor.
- Activar uno nuevo: añade una tupla. El generador recibe el valor crudo del
frontmatter; usa las propiedades estandar vCard 3.0 (FN, N, TEL, EMAIL,
ADR, BDAY, NICKNAME, NOTE, CATEGORIES) o una extension X- propia
(X-OSINT-*) para datos que no tienen propiedad estandar.
REGLAS DE FORMATO QUE EL GENERADOR DEBE RESPETAR:
- Una propiedad NOTE por vCard como maximo es lo limpio: por eso TODOS los
textos largos (relaciones, contexto, notas-body, lugares) se acumulan en
NOTE_PARTS (ver helper note()) y se emiten al final como UNA sola linea
NOTE con saltos escapados (\\n). No emitas varias lineas NOTE sueltas.
- Escapa SIEMPRE los valores con vcard_escape() (\\ , ; , : de mas, saltos de
linea). Los generadores de abajo ya lo hacen.
- El orden de FIELD_MAP es el orden en que aparecen las propiedades en el
vCard resultante (salvo NOTE, que siempre va al final).
EJEMPLO — añadir el pais como propiedad X-:
("pais", False, lambda v: [f"X-OSINT-PAIS:{vcard_escape(v)}"] if v else []),
EJEMPLO — desactivar el envio del DNI al movil:
# ("dni", True, lambda v: [f"X-OSINT-DNI:{vcard_escape(v)}"] if v else []),
=============================================================================
"""
import sys
import os
import re
import argparse
sys.path.insert(0, "/home/enmanuel/fn_registry/python/functions")
from infra.pass_get_secret import pass_get_secret # noqa: E402
from infra.dav_list_resources import dav_list_resources # noqa: E402
from infra.dav_get_resource import dav_get_resource # noqa: E402
from infra.dav_get_collection import dav_get_collection # noqa: E402
from infra.carddav_put_vcard import carddav_put_vcard # noqa: E402
from obsidian import ( # noqa: E402
list_obsidian_notes,
read_obsidian_note,
)
# --------------------------------------------------------------------------
# Configuracion
# --------------------------------------------------------------------------
OSINT = "/home/enmanuel/Obsidian/osint"
DAV_BASE = "https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com"
DAV_USER = "enmanuel"
DAV_COLLECTION = "/enmanuel/contacts/addressbook/"
PASS_SECRET = "dav/xandikos-enmanuel"
# Carpetas del vault a sincronizar y el tipo que representan.
SUBFOLDERS = (("personas", "persona"), ("organizaciones", "organizacion"))
# Prefijo del UID para fichas que NO casan con un contacto existente de Google.
OSINT_UID_PREFIX = "osint-"
# --------------------------------------------------------------------------
# Helpers de formato vCard
# --------------------------------------------------------------------------
def vcard_escape(value) -> str:
"""Escapa un valor para una propiedad vCard 3.0 (RFC 2426).
Escapa backslash, coma, punto y coma y normaliza los saltos de linea a la
secuencia escapada \\n (un NOTE multilinea valido va en UNA sola linea
logica con \\n literales).
"""
if value is None:
return ""
s = str(value)
s = s.replace("\\", "\\\\")
s = s.replace("\n", "\\n").replace("\r", "")
s = s.replace(",", "\\,").replace(";", "\\;")
return s
def _date_to_bday(value) -> str:
"""Convierte una fecha ISO (YYYY-MM-DD) o un date a BDAY vCard (YYYYMMDD).
Devuelve "" si no parsea. vCard 3.0 acepta tanto YYYY-MM-DD como YYYYMMDD;
usamos el formato basico sin guiones por compatibilidad amplia.
"""
if not value:
return ""
s = str(value).strip()
m = re.match(r"^(\d{4})-(\d{2})-(\d{2})$", s)
if m:
return f"{m.group(1)}{m.group(2)}{m.group(3)}"
m = re.match(r"^(\d{4})(\d{2})(\d{2})$", s)
if m:
return s
return ""
def _clean_relacion(rel: str) -> str:
"""Normaliza una entrada de `relaciones` para mostrarla legible en NOTE.
Las relaciones del vault son del estilo '[[isagri]] — contacto' o
'[[personas/x|Nombre]] — hermano'. Quitamos los corchetes wikilink y la
barra de alias, dejando texto plano legible en el movil.
"""
if not rel:
return ""
s = str(rel)
# [[target|alias]] -> alias ; [[target]] -> target
def _wl(m):
inner = m.group(1)
return inner.split("|", 1)[1] if "|" in inner else inner.split("/")[-1]
s = re.sub(r"\[\[([^\]]+)\]\]", _wl, s)
return s.strip()
# --------------------------------------------------------------------------
# NOTE acumulado: las propiedades de texto largo se juntan en UNA sola NOTE.
# Cada build_vcard() arranca con esta lista vacia (ver build_vcard()).
# --------------------------------------------------------------------------
_NOTE_PARTS: list = []
def note(text: str) -> list:
"""Acumula un fragmento en la NOTE global y devuelve [] (no emite linea).
Los generadores de FIELD_MAP que producen texto descriptivo (relaciones,
contexto, notas del body, lugares) llaman a note() en vez de emitir su
propia linea NOTE: asi el vCard final lleva una unica propiedad NOTE bien
formada con todos los fragmentos separados por \\n.
"""
if text and str(text).strip():
_NOTE_PARTS.append(str(text).strip())
return []
# --------------------------------------------------------------------------
# FIELD_MAP — EDITA AQUI para activar/desactivar campos (ver cabecera).
# Cada tupla: (campo_frontmatter, sensible, generador(valor) -> list[str])
# --------------------------------------------------------------------------
FIELD_MAP = [
# --- Identidad basica ---
# FN es obligatorio en vCard 3.0; el nombre completo de la ficha.
("nombre_completo", False, lambda v: [f"FN:{vcard_escape(v)}"] if v else []),
# N estructurado (apellidos;nombre;;;) — derivado del nombre completo.
("n_struct", False, lambda v: [f"N:{v}"] if v else []),
# Alias / otros nombres por los que aparece -> NICKNAME (CSV escapado).
("aliases", False, lambda v: [
"NICKNAME:" + ",".join(vcard_escape(a) for a in v)
] if v else []),
# --- Contacto (ya suelen estar en el vCard de Google; los reafirmamos) ---
("telefono", False, lambda v: [f"TEL;TYPE=CELL:{vcard_escape(v)}"] if v else []),
("email", False, lambda v: [f"EMAIL;TYPE=INTERNET:{vcard_escape(v)}"] if v else []),
# --- Localizacion ---
# ADR estructurado vCard: ;;<calle>;<ciudad>;;<cp>;<pais>. Metemos la
# direccion completa en el campo street (componente 3) por simplicidad.
("direccion", True, lambda v: [f"ADR;TYPE=HOME:;;{vcard_escape(v)};;;;"] if v else []),
("pais", False, lambda v: [f"X-OSINT-PAIS:{vcard_escape(v)}"] if v else []),
# --- Datos OSINT sin propiedad estandar -> extensiones X- ---
("dni", True, lambda v: [f"X-OSINT-DNI:{vcard_escape(v)}"] if v else []),
("fecha_nacimiento", False, lambda v: (
[f"BDAY:{_date_to_bday(v)}"] if _date_to_bday(v) else []
)),
("sexo", False, lambda v: [f"X-OSINT-SEXO:{vcard_escape(v)}"] if v else []),
("contexto", False, lambda v: note(f"Contexto: {v}") if v else []),
("fuente", False, lambda v: note(f"Fuente: {v}") if v else []),
# --- Texto descriptivo -> se acumula en la NOTE unica ---
("relaciones", False, lambda v: note(
"Relaciones: " + "; ".join(_clean_relacion(r) for r in v if _clean_relacion(r))
) if v else []),
# Lugares: sintetico (extraido de la seccion ## Lugares del body). Sensible.
("lugares", True, lambda v: note(
"Lugares: " + "; ".join(v)
) if v else []),
# Notas libres del body (seccion ## Notas), si las hay.
("notas_body", False, lambda v: note(v) if v else []),
# --- Categoria para agruparlos en la libreta del movil ---
("tags", False, lambda v: [
"CATEGORIES:" + ",".join(vcard_escape(t) for t in v)
] if v else []),
]
# Conjunto de campos marcados sensibles (para el reporte de privacidad).
SENSITIVE_FIELDS = sorted({campo for campo, sensible, _ in FIELD_MAP if sensible})
# --------------------------------------------------------------------------
# Extraccion de datos sinteticos del body de la ficha
# --------------------------------------------------------------------------
def _section(body: str, header: str) -> str:
"""Devuelve el texto crudo de una seccion `## <header>` del body (hasta la
siguiente cabecera `## ` o el final). "" si la seccion no existe o vacia.
"""
if not body:
return ""
pat = re.compile(rf"^##\s+{re.escape(header)}\s*$(.*?)(?=^##\s|\Z)",
re.MULTILINE | re.DOTALL)
m = pat.search(body)
return m.group(1).strip() if m else ""
def _extract_lugares(body: str) -> list:
"""Extrae los lugares de la seccion `## Lugares` como texto plano legible.
Las lineas son del estilo:
- [[lugares/calle-x|Calle X 1 Almachar Malaga]]
Devolvemos el alias legible (lo de despues del |) o el target slug.
"""
sec = _section(body, "Lugares")
out = []
for line in sec.splitlines():
line = line.strip().lstrip("-").strip()
if not line:
continue
m = re.search(r"\[\[([^\]]+)\]\]", line)
if m:
inner = m.group(1)
disp = inner.split("|", 1)[1] if "|" in inner else inner.split("/")[-1]
out.append(disp.strip())
elif line:
out.append(line)
return out
def _extract_notas(body: str) -> str:
"""Extrae el texto libre de la seccion `## Notas` (si tiene contenido)."""
sec = _section(body, "Notas")
# Limpiar wikilinks/embeds del texto de notas para que sea legible plano.
sec = re.sub(r"!?\[\[([^\]]+)\]\]",
lambda m: (m.group(1).split("|", 1)[1] if "|" in m.group(1)
else m.group(1).split("/")[-1]),
sec)
return sec.strip()
def _n_struct_from_nombre(nombre: str) -> str:
"""Construye el campo N (apellidos;nombre;;;) heuristico desde el nombre.
vCard N = Family;Given;Additional;Prefix;Suffix. Heuristica simple para
nombres espanoles: primer token = given, resto = family. No es perfecto
(apellidos compuestos) pero es suficiente para ordenar en el movil; el FN
sigue siendo el nombre completo canonico.
"""
if not nombre:
return ""
parts = [p for p in str(nombre).split() if p]
if not parts:
return ""
given = vcard_escape(parts[0])
family = vcard_escape(" ".join(parts[1:])) if len(parts) > 1 else ""
return f"{family};{given};;;"
# --------------------------------------------------------------------------
# Construccion del "record" enriquecido por ficha + del vCard
# --------------------------------------------------------------------------
def build_record(note_data: dict, tipo: str) -> dict:
"""Aplana una ficha (frontmatter + body) en un dict de campos para FIELD_MAP.
Incluye los campos del frontmatter mas los sinteticos:
- nombre_completo : el nombre canonico (FN).
- n_struct : el campo N estructurado.
- lugares : lista extraida de ## Lugares.
- notas_body : texto libre de ## Notas.
Normaliza los 'null' string/None a None y deja las listas vacias como [].
"""
fm = dict(note_data.get("frontmatter") or {})
body = note_data.get("body") or ""
def _norm(v):
if v is None:
return None
if isinstance(v, str) and v.strip().lower() in ("null", "none", ""):
return None
return v
nombre = _norm(fm.get("nombre")) or os.path.splitext(
os.path.basename(note_data["path"]))[0].replace("-", " ")
rec = {k: _norm(v) for k, v in fm.items()}
# Asegurar listas para los campos lista (evita None en los generadores).
for list_field in ("aliases", "relaciones", "tags"):
val = rec.get(list_field)
if val is None:
rec[list_field] = []
elif not isinstance(val, list):
rec[list_field] = [val]
rec["nombre_completo"] = nombre
rec["n_struct"] = _n_struct_from_nombre(nombre)
rec["lugares"] = _extract_lugares(body)
rec["notas_body"] = _extract_notas(body)
rec["_tipo"] = tipo
rec["_slug"] = _norm(fm.get("slug")) or os.path.splitext(
os.path.basename(note_data["path"]))[0]
rec["_path"] = note_data["path"]
return rec
def build_vcard(rec: dict, uid: str) -> dict:
"""Genera el texto vCard 3.0 de un record aplicando FIELD_MAP.
Devuelve {text, fields_emitted} donde fields_emitted es el conjunto de
campos de FIELD_MAP que produjeron alguna linea (incluye los sensibles que
viajaron). NOTE se emite UNA sola vez al final con todos los fragmentos.
"""
global _NOTE_PARTS
_NOTE_PARTS = [] # reset del acumulador de NOTE para este vCard
lines = ["BEGIN:VCARD", "VERSION:3.0"]
fields_emitted = set()
for campo, _sensible, gen in FIELD_MAP:
value = rec.get(campo)
try:
produced = gen(value)
except Exception: # noqa: BLE001 — un generador no debe tumbar el sync
produced = []
if produced:
lines.extend(produced)
fields_emitted.add(campo)
# Emitir la NOTE acumulada (relaciones + contexto + fuente + lugares + notas).
if _NOTE_PARTS:
joined = "\\n".join(vcard_escape(p) for p in _NOTE_PARTS)
lines.append(f"NOTE:{joined}")
fields_emitted.add("NOTE")
lines.append(f"UID:{uid}")
lines.append("END:VCARD")
text = "\r\n".join(lines) + "\r\n"
return {"text": text, "fields_emitted": fields_emitted}
# --------------------------------------------------------------------------
# Indice de dedup: telefono/email -> UID de un vCard ya existente en Xandikos
# --------------------------------------------------------------------------
def _norm_phone(p) -> str:
"""Normaliza un telefono a sus ultimos 9 digitos (numero nacional ES).
Quita espacios, prefijos +34/0034 y separadores. Dos formatos distintos del
mismo numero ('+34 680 43 88 89' y '680438889') colapsan a la misma clave.
"""
if not p:
return ""
d = re.sub(r"\D", "", str(p))
return d[-9:] if len(d) >= 9 else d
def _vcard_prop_values(text: str, prop: str) -> list:
"""Extrae los valores de una propiedad (TEL, EMAIL, UID...) de un vCard."""
out = []
for line in text.splitlines():
m = re.match(rf"^(?:item\d+\.)?{prop}(?:;[^:]*)?:(.*)$", line, re.IGNORECASE)
if m:
v = m.group(1).strip()
if v:
out.append(v)
return out
def build_existing_index(base, user, pwd, collection, *, verbose=True) -> dict:
"""Descarga los vCards existentes de Xandikos y construye los indices de dedup.
Devuelve {phone: {key->uid}, email: {key->uid}, total: int, fetched: int,
errors: int}. Las claves son telefono normalizado (9 digitos) y email en
minusculas. Cada clave apunta al UID del PRIMER vCard que la declara.
Usa dav_get_collection (1 sola peticion REPORT con el contenido inline) en
vez del patron N+1 (PROPFIND + un GET por recurso): para ~1064 contactos baja
de ~9s a ~1s, lo que mantiene el step PUSH del DAG dentro de su timeout.
"""
coll = dav_get_collection(base, user, pwd, collection, "vcard")
if coll.get("status") != "ok":
return {"phone": {}, "email": {}, "total": 0, "fetched": 0,
"errors": 1, "error": coll.get("error")}
resources = coll.get("resources", [])
phone_idx, email_idx = {}, {}
fetched = errors = 0
for r in resources:
text = r.get("data") or ""
if not text:
errors += 1
continue
fetched += 1
uid = (_vcard_prop_values(text, "UID") or [""])[0]
if not uid:
# UID derivado del nombre del recurso si el vCard no lo declara.
uid = os.path.splitext(os.path.basename(r["href"]))[0]
for tel in _vcard_prop_values(text, "TEL"):
k = _norm_phone(tel)
if k:
phone_idx.setdefault(k, uid)
for em in _vcard_prop_values(text, "EMAIL"):
k = em.strip().lower()
if k:
email_idx.setdefault(k, uid)
if verbose:
print(f" indice Xandikos: {fetched}/{len(resources)} vCards leidos, "
f"{len(phone_idx)} telefonos, {len(email_idx)} emails, "
f"{errors} errores de lectura")
return {"phone": phone_idx, "email": email_idx,
"total": len(resources), "fetched": fetched, "errors": errors}
def resolve_uid(rec: dict, index: dict) -> dict:
"""Resuelve el UID a usar para un record: reusa el existente o crea osint-<slug>.
Devuelve {uid, source} donde source es 'phone', 'email' o 'new'. Match por
telefono primero (mas fiable que email para estos contactos), luego email.
"""
tel = rec.get("telefono")
em = rec.get("email")
if tel:
hit = index["phone"].get(_norm_phone(tel))
if hit:
return {"uid": hit, "source": "phone"}
if em:
hit = index["email"].get(str(em).strip().lower())
if hit:
return {"uid": hit, "source": "email"}
return {"uid": OSINT_UID_PREFIX + rec["_slug"], "source": "new"}
# --------------------------------------------------------------------------
# Orquestacion: planificar todas las fichas
# --------------------------------------------------------------------------
def load_fichas() -> list:
"""Carga todas las fichas de las carpetas configuradas como (note_data, tipo)."""
out = []
for subfolder, tipo in SUBFOLDERS:
for p in list_obsidian_notes(OSINT, subfolder=subfolder):
# Solo fichas de nivel-1 (personas/<slug>.md). Excluye las sub-notas de
# documentos (personas/<slug>/dni.md, fotos.md, ...) que no son contactos.
if os.path.basename(os.path.dirname(p)) != subfolder:
continue
base = os.path.splitext(os.path.basename(p))[0]
if base.startswith("_"):
continue
try:
nd = read_obsidian_note(p)
except Exception: # noqa: BLE001
continue
out.append((nd, tipo))
return out
def plan_sync(index: dict) -> dict:
"""Construye el plan completo: un vCard enriquecido por ficha + su UID.
Devuelve {records: [...], counts: {...}} donde cada record del plan lleva
{rec, uid, uid_source, vcard, fields_emitted, sensitive_emitted}.
"""
fichas = load_fichas()
plan = []
counts = {
"fichas_total": len(fichas),
"persona": 0, "organizacion": 0,
"uid_reused_phone": 0, "uid_reused_email": 0, "uid_new": 0,
}
sensitive_carrying = {f: 0 for f in SENSITIVE_FIELDS}
for note_data, tipo in fichas:
rec = build_record(note_data, tipo)
counts[tipo] = counts.get(tipo, 0) + 1
resolved = resolve_uid(rec, index)
uid = resolved["uid"]
built = build_vcard(rec, uid)
emitted = built["fields_emitted"]
sens_here = sorted(f for f in SENSITIVE_FIELDS if f in emitted)
for f in sens_here:
sensitive_carrying[f] += 1
counts_key = {
"phone": "uid_reused_phone",
"email": "uid_reused_email",
"new": "uid_new",
}[resolved["source"]]
counts[counts_key] += 1
plan.append({
"rec": rec,
"uid": uid,
"uid_source": resolved["source"],
"vcard": built["text"],
"fields_emitted": emitted,
"sensitive_emitted": sens_here,
})
return {"records": plan, "counts": counts,
"sensitive_carrying": sensitive_carrying}
# --------------------------------------------------------------------------
# Aplicar (solo --apply)
# --------------------------------------------------------------------------
def apply_sync(plan, base, user, pwd, collection) -> dict:
"""Sube cada vCard del plan a Xandikos via carddav_put_vcard. Idempotente."""
ok = fail = 0
errors = []
for item in plan["records"]:
res = carddav_put_vcard(base, user, pwd, collection,
item["uid"], item["vcard"])
if res.get("status") == "ok":
ok += 1
else:
fail += 1
errors.append({"uid": item["uid"], "error": res.get("error"),
"http_status": res.get("http_status")})
return {"ok": ok, "fail": fail, "errors": errors}
# --------------------------------------------------------------------------
# Reporte dry-run
# --------------------------------------------------------------------------
def report(plan, index, sample_n=5):
c = plan["counts"]
print("=" * 70)
print("DRY-RUN — sync_osint_to_dav.py (vault OSINT -> Xandikos CardDAV)")
print("=" * 70)
print(f"servidor ........................ {DAV_BASE}")
print(f"coleccion ....................... {DAV_COLLECTION}")
print("-" * 70)
print(f"vCards ya en Xandikos ........... {index['total']} "
f"(leidos {index['fetched']}, errores {index['errors']})")
print(f"fichas OSINT a sincronizar ...... {c['fichas_total']}")
print(f" personas ...................... {c.get('persona', 0)}")
print(f" organizaciones ................ {c.get('organizacion', 0)}")
print("-" * 70)
print("Resolucion de UID (dedup contra Xandikos):")
print(f" reusa UID existente (telefono) {c['uid_reused_phone']}")
print(f" reusa UID existente (email) ... {c['uid_reused_email']}")
print(f" UID nuevo (osint-<slug>) ...... {c['uid_new']}")
reused = c['uid_reused_phone'] + c['uid_reused_email']
print(f" => {reused} fichas ENRIQUECEN un contacto existente, "
f"{c['uid_new']} CREAN uno nuevo")
print("=" * 70)
# ---- Privacidad: que campos sensibles viajarian al movil ----
print("PRIVACIDAD — campos SENSIBLES que viajarian al movil con --apply:")
print("-" * 70)
any_sensitive = False
for f in SENSITIVE_FIELDS:
n = plan["sensitive_carrying"].get(f, 0)
if n:
any_sensitive = True
print(f" [SENSIBLE] {f:<12} -> presente en {n} vCard(s)")
if not any_sensitive:
print(" (ningun campo sensible viajaria con el FIELD_MAP actual)")
else:
print("")
print(" Estos datos se escribirian en la libreta de contactos del movil")
print(" (que se sincroniza/respalda fuera de tu control). Revisa el")
print(" FIELD_MAP y comenta los campos que NO quieras enviar antes de")
print(" ejecutar --apply.")
print("=" * 70)
# ---- Campos OSINT no sensibles que tambien viajan ----
emitted_counter = {}
for item in plan["records"]:
for f in item["fields_emitted"]:
emitted_counter[f] = emitted_counter.get(f, 0) + 1
print("Cobertura de campos (cuantos vCards llevan cada propiedad):")
print("-" * 70)
for f in sorted(emitted_counter, key=lambda k: -emitted_counter[k]):
mark = " [SENSIBLE]" if f in SENSITIVE_FIELDS else ""
print(f" {f:<16} {emitted_counter[f]:>4}{mark}")
print("=" * 70)
# ---- Muestra de N vCards completos ----
print(f"MUESTRA de {sample_n} vCards completos (asi se veria el mapeo):")
print("=" * 70)
# Elegir muestras variadas: prioriza las que llevan mas campos OSINT.
enriched = sorted(plan["records"],
key=lambda it: -len(it["fields_emitted"]))
shown = 0
for item in enriched:
rec = item["rec"]
src = {"phone": "reusa UID (tel)", "email": "reusa UID (email)",
"new": "UID nuevo"}[item["uid_source"]]
print(f"--- {rec['nombre_completo']} [{rec['_tipo']}] | {src}: {item['uid']}")
print(item["vcard"].replace("\r\n", "\n").rstrip())
print("")
shown += 1
if shown >= sample_n:
break
print("=" * 70)
# --------------------------------------------------------------------------
# Audit de consistencia (--check) — read-only, vault <-> Xandikos
# --------------------------------------------------------------------------
def _norm_email(e) -> str:
return str(e).strip().lower() if e else ""
def audit_consistency(base, user, pwd, collection) -> dict:
"""Compara vault OSINT <-> Xandikos y devuelve el reporte de drift.
Read-only: descarga la coleccion en 1 REPORT (dav_get_collection) y la cruza
con las fichas del vault. Reporta:
- vault_sin_vcard : fichas del vault sin contraparte en Xandikos
(por telefono/email normalizado, ni por UID osint-).
- vcard_huerfano : vCards del servidor sin ficha en el vault (anadidos
en el movil que aun no se han traido / contactos no-OSINT).
- difiere : contactos que casan pero donde tel/email/alias difieren
entre vault y vCard.
Devuelve {vault_total, vcard_total, vault_sin_vcard:[...], vcard_huerfano:[...],
difiere:[...]}.
"""
coll = dav_get_collection(base, user, pwd, collection, "vcard")
if coll.get("status") != "ok":
return {"error": coll.get("error"), "http_status": coll.get("http_status")}
resources = coll.get("resources", [])
# Indice del servidor: telefono/email -> {uid, fn, tels, emails, nicks, href}
srv_by_phone, srv_by_email, srv_by_uid = {}, {}, {}
srv_records = []
for r in resources:
txt = re.sub(r"\r?\n[ \t]", "", r["data"]) # unfold
uid = (_vcard_prop_values(txt, "UID") or [""])[0] or \
os.path.splitext(os.path.basename(r["href"]))[0]
fn = (_vcard_prop_values(txt, "FN") or [""])[0]
tels = _vcard_prop_values(txt, "TEL")
emails = _vcard_prop_values(txt, "EMAIL")
nicks = []
for nk in _vcard_prop_values(txt, "NICKNAME"):
nicks.extend([a.strip() for a in nk.split(",") if a.strip()])
rec = {"uid": uid, "fn": fn, "tels": tels, "emails": emails,
"nicks": nicks, "href": r["href"], "matched": False}
srv_records.append(rec)
srv_by_uid[uid] = rec
for t in tels:
srv_by_phone.setdefault(_norm_phone(t), rec)
for e in emails:
srv_by_email.setdefault(_norm_email(e), rec)
# Recorrer el vault y cruzar.
vault_total = 0
vault_sin_vcard, difiere = [], []
for note_data, _tipo in load_fichas():
vault_total += 1
fm = note_data.get("frontmatter") or {}
slug = fm.get("slug") or os.path.splitext(
os.path.basename(note_data["path"]))[0]
nombre = fm.get("nombre") or slug.replace("-", " ")
tel = fm.get("telefono")
em = fm.get("email")
aliases = fm.get("aliases") or []
if not isinstance(aliases, list):
aliases = [aliases]
if isinstance(tel, str) and tel.strip().lower() in ("null", "none", ""):
tel = None
if isinstance(em, str) and em.strip().lower() in ("null", "none", ""):
em = None
rec = None
if tel and _norm_phone(tel) in srv_by_phone:
rec = srv_by_phone[_norm_phone(tel)]
elif em and _norm_email(em) in srv_by_email:
rec = srv_by_email[_norm_email(em)]
elif ("osint-" + slug) in srv_by_uid:
rec = srv_by_uid["osint-" + slug]
if rec is None:
vault_sin_vcard.append({"slug": slug, "nombre": nombre,
"telefono": tel, "email": em})
continue
rec["matched"] = True
# Comparar campos de agenda.
diffs = []
if tel and _norm_phone(tel) not in {_norm_phone(t) for t in rec["tels"]}:
diffs.append(("telefono", tel, ", ".join(rec["tels"]) or "-"))
if em and _norm_email(em) not in {_norm_email(e) for e in rec["emails"]}:
diffs.append(("email", em, ", ".join(rec["emails"]) or "-"))
srv_nick_set = {n.lower() for n in rec["nicks"]}
missing_alias = [a for a in aliases if a and a.lower() not in srv_nick_set]
if missing_alias and rec["nicks"]:
diffs.append(("aliases", "; ".join(aliases),
"; ".join(rec["nicks"]) or "-"))
if diffs:
difiere.append({"slug": slug, "nombre": nombre, "uid": rec["uid"],
"diffs": diffs})
vcard_huerfano = [
{"uid": r["uid"], "fn": r["fn"],
"telefono": ", ".join(r["tels"]) or None,
"email": ", ".join(r["emails"]) or None}
for r in srv_records if not r["matched"]
]
return {
"vault_total": vault_total,
"vcard_total": len(srv_records),
"vault_sin_vcard": vault_sin_vcard,
"vcard_huerfano": vcard_huerfano,
"difiere": difiere,
}
def report_audit(a: dict, sample_n: int = 15):
if a.get("error"):
print(f"ERROR audit: {a['error']} (http {a.get('http_status')})",
file=sys.stderr)
return
print("=" * 72)
print("AUDIT de consistencia — vault OSINT <-> Xandikos CardDAV (read-only)")
print("=" * 72)
print(f"fichas en el vault .............. {a['vault_total']}")
print(f"vCards en Xandikos .............. {a['vcard_total']}")
print("-" * 72)
print(f"fichas del vault SIN vCard ...... {len(a['vault_sin_vcard'])}")
print(f"vCards SIN ficha (huerfanos) .... {len(a['vcard_huerfano'])}")
print(f"contactos con AGENDA divergente . {len(a['difiere'])}")
print("=" * 72)
if a["vault_sin_vcard"]:
print(f"FICHAS DEL VAULT SIN vCard (muestra "
f"{min(sample_n, len(a['vault_sin_vcard']))}):")
print("-" * 72)
for f in a["vault_sin_vcard"][:sample_n]:
print(f" - {f['slug']:<36} tel={f['telefono']!r} email={f['email']!r}")
print("=" * 72)
if a["vcard_huerfano"]:
print(f"vCards HUERFANOS (en el movil, sin ficha) (muestra "
f"{min(sample_n, len(a['vcard_huerfano']))}):")
print("-" * 72)
for r in a["vcard_huerfano"][:sample_n]:
print(f" - {r['fn'][:34]:<34} | UID {r['uid']} tel={r['telefono']!r}")
print("=" * 72)
if a["difiere"]:
print(f"AGENDA DIVERGENTE vault vs movil (muestra "
f"{min(sample_n, len(a['difiere']))}):")
print("-" * 72)
for d in a["difiere"][:sample_n]:
print(f" ~ {d['slug']} (UID {d['uid']})")
for campo, vault_v, srv_v in d["diffs"]:
print(f" {campo}: vault={vault_v!r} movil={srv_v!r}")
print("=" * 72)
# --------------------------------------------------------------------------
# main
# --------------------------------------------------------------------------
def main():
ap = argparse.ArgumentParser(
description="Sincroniza fichas OSINT (Obsidian) -> Xandikos CardDAV, "
"enriqueciendo los vCards con la capa OSINT.")
ap.add_argument("--apply", action="store_true",
help="Sube los vCards al servidor. Por defecto: dry-run "
"(no toca el servidor).")
ap.add_argument("--check", action="store_true",
help="Audit de consistencia READ-ONLY: compara vault <-> "
"Xandikos y reporta fichas sin vCard, vCards huerfanos "
"y agendas divergentes. No sube nada.")
ap.add_argument("--sample", type=int, default=5,
help="Numero de vCards completos a mostrar en el dry-run "
"(o de filas por categoria en --check).")
args = ap.parse_args()
secret = pass_get_secret(PASS_SECRET)
if secret.get("status") != "ok":
print(f"ERROR: no se pudo leer el secreto '{PASS_SECRET}' de pass: "
f"{secret.get('error')}", file=sys.stderr)
return 1
pwd = secret["value"] # sensible: NUNCA logear
if args.check:
a = audit_consistency(DAV_BASE, DAV_USER, pwd, DAV_COLLECTION)
report_audit(a, sample_n=max(args.sample, 15))
return 1 if a.get("error") else 0
print("Construyendo indice de contactos ya existentes en Xandikos...")
index = build_existing_index(DAV_BASE, DAV_USER, pwd, DAV_COLLECTION)
if index.get("errors") and index["fetched"] == 0:
print(f"ERROR: no se pudo listar/leer la coleccion CardDAV: "
f"{index.get('error')}", file=sys.stderr)
return 1
plan = plan_sync(index)
report(plan, index, sample_n=args.sample)
if args.apply:
print("\nAPLICANDO (PUT a Xandikos)...")
res = apply_sync(plan, DAV_BASE, DAV_USER, pwd, DAV_COLLECTION)
print(f"APLICADO: ok={res['ok']} fail={res['fail']}")
for e in res["errors"][:10]:
print(f" FALLO uid={e['uid']}: {e['error']} (http {e['http_status']})")
else:
print("\n(dry-run: NO se toco el servidor. Usa --apply para subir los vCards.)")
return 0
if __name__ == "__main__":
sys.exit(main())
+165
View File
@@ -0,0 +1,165 @@
#!/usr/bin/env bash
#
# test_davx5_sync.sh — Protocol-level test of the Xandikos CardDAV server that
# DAVx5 (Android) syncs against. Exercises exactly what DAVx5 does under the
# hood: PROPFIND discovery, addressbook-query REPORT, plus auth and TLS hardening.
#
# This is the reproducible baseline that does NOT need the Android emulator: it
# validates the server contract DAVx5 depends on. The emulator-side verification
# (1065 raw_contacts in the device contacts provider) is documented in the task
# report but is inherently interactive (UI-driven account setup).
#
# Credentials are read from `pass` at runtime — NEVER hardcoded.
# pass entry: dav/xandikos-enmanuel (first line = password)
#
# Usage:
# ./test_davx5_sync.sh # run all checks
# ./test_davx5_sync.sh -v # verbose (show curl details)
#
# Exit code 0 = all checks passed; non-zero = at least one failure.
set -u
# ---- Config -----------------------------------------------------------------
BASE="https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com"
PRINCIPAL_PATH="/enmanuel/"
ADDRESSBOOK_PATH="/enmanuel/contacts/addressbook/"
USER="enmanuel"
PASS_ENTRY="dav/xandikos-enmanuel"
# Expected number of contacts (vCards) currently served. Allow a tolerance band
# so the test survives small day-to-day changes without going green on a
# catastrophic loss (e.g. empty collection).
EXPECTED_VCARDS=1065
TOLERANCE=50 # accept EXPECTED +/- TOLERANCE
# A known contact that must round-trip with its TEL and EMAIL intact.
KNOWN_NAME="Nieves"
KNOWN_TEL="676 95 90 40"
KNOWN_EMAIL="nieves@gomezdeseguraabogados.com"
VERBOSE=0
[ "${1:-}" = "-v" ] && VERBOSE=1
# ---- Helpers ----------------------------------------------------------------
PASS_COUNT=0
FAIL_COUNT=0
RED=$'\e[31m'; GREEN=$'\e[32m'; YELLOW=$'\e[33m'; RESET=$'\e[0m'
ok() { echo "${GREEN}PASS${RESET} $1"; PASS_COUNT=$((PASS_COUNT+1)); }
fail() { echo "${RED}FAIL${RESET} $1"; FAIL_COUNT=$((FAIL_COUNT+1)); }
info() { [ "$VERBOSE" = 1 ] && echo " $1"; return 0; }
# Read the DAV password from pass into a variable. Never echo it.
DAV_PASS="$(pass "$PASS_ENTRY" 2>/dev/null | head -1)"
if [ -z "$DAV_PASS" ]; then
echo "${RED}ABORT${RESET}: could not read password from 'pass $PASS_ENTRY'"
exit 2
fi
AB_URL="${BASE}${ADDRESSBOOK_PATH}"
HTTP_URL="http://${BASE#https://}${ADDRESSBOOK_PATH}"
PROPFIND_BODY='<?xml version="1.0"?><d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/><d:getetag/></d:prop></d:propfind>'
REPORT_BODY='<?xml version="1.0"?><c:addressbook-query xmlns:d="DAV:" xmlns:c="urn:ietf:params:xml:ns:carddav"><d:prop><d:getetag/><c:address-data/></d:prop></c:addressbook-query>'
echo "=== DAVx5 / Xandikos CardDAV protocol test ==="
echo "Server: $BASE"
echo "Collection: $ADDRESSBOOK_PATH"
echo
# ---- (a) Auth required -------------------------------------------------------
echo "--- (a) Authentication ---"
code=$(curl -s -o /dev/null -w '%{http_code}' -X PROPFIND -H "Depth: 0" "$AB_URL")
[ "$code" = "401" ] && ok "no auth -> 401 (got $code)" || fail "no auth -> expected 401, got $code"
code=$(curl -s -o /dev/null -w '%{http_code}' -u "${USER}:definitely-wrong-password" -X PROPFIND -H "Depth: 0" "$AB_URL")
[ "$code" = "401" ] && ok "bad password -> 401 (got $code)" || fail "bad password -> expected 401, got $code"
code=$(curl -s -o /dev/null -w '%{http_code}' -u "ghost:${DAV_PASS}" -X PROPFIND -H "Depth: 0" "$AB_URL")
# Xandikos returns 401 for an unknown user too.
{ [ "$code" = "401" ] || [ "$code" = "403" ] || [ "$code" = "404" ]; } \
&& ok "unknown user -> $code (rejected)" || fail "unknown user -> expected 401/403/404, got $code"
code=$(curl -s -o /dev/null -w '%{http_code}' -u "${USER}:${DAV_PASS}" -X PROPFIND -H "Depth: 0" "$AB_URL")
[ "$code" = "207" ] && ok "valid auth -> 207 Multi-Status (got $code)" || fail "valid auth -> expected 207, got $code"
echo
# ---- (b) TLS -----------------------------------------------------------------
echo "--- (b) TLS / transport hardening ---"
# Valid cert: curl WITHOUT -k must succeed.
if curl -sf -o /dev/null -u "${USER}:${DAV_PASS}" -X PROPFIND -H "Depth: 0" "$AB_URL"; then
ok "valid TLS chain (no -k needed)"
else
fail "TLS verification failed without -k"
fi
# Cert issuer should be a real CA (Let's Encrypt here), not self-signed.
issuer=$(echo | openssl s_client -connect "${BASE#https://}:443" -servername "${BASE#https://}" 2>/dev/null \
| openssl x509 -noout -issuer 2>/dev/null)
info "issuer: $issuer"
echo "$issuer" | grep -qi "Let's Encrypt" \
&& ok "cert issued by Let's Encrypt" || fail "unexpected cert issuer: $issuer"
# Plain HTTP must NOT serve data in cleartext: expect a redirect to HTTPS (3xx)
# or refusal — never a 200 with contact data.
code=$(curl -s -o /dev/null -w '%{http_code}' --max-time 10 -X PROPFIND -H "Depth: 0" "$HTTP_URL" 2>/dev/null)
case "$code" in
301|302|307|308) ok "plain HTTP -> $code redirect to HTTPS (no cleartext)";;
000) ok "plain HTTP refused/unreachable (no cleartext)";;
200) fail "plain HTTP served 200 in CLEARTEXT — server leaks data over http!";;
*) ok "plain HTTP -> $code (not 200, no cleartext data)";;
esac
echo
# ---- (c) Reception: N contacts via REPORT -----------------------------------
echo "--- (c) Contact reception (DAVx5's sync mechanism) ---"
# PROPFIND Depth:1 -> count .vcf hrefs
pf=$(curl -s -u "${USER}:${DAV_PASS}" -X PROPFIND -H "Depth: 1" \
-H "Content-Type: application/xml" --data "$PROPFIND_BODY" "$AB_URL")
hrefs=$(printf '%s' "$pf" | grep -oE '<[a-zA-Z0-9]+:href>[^<]+\.vcf</[a-zA-Z0-9]+:href>' | wc -l | tr -d ' ')
info "PROPFIND .vcf hrefs: $hrefs"
if [ "$hrefs" -ge $((EXPECTED_VCARDS - TOLERANCE)) ] && [ "$hrefs" -le $((EXPECTED_VCARDS + TOLERANCE)) ]; then
ok "PROPFIND Depth:1 lists $hrefs vCards (expected ~$EXPECTED_VCARDS)"
else
fail "PROPFIND Depth:1 listed $hrefs vCards (expected ~$EXPECTED_VCARDS +/-$TOLERANCE)"
fi
# REPORT addressbook-query -> count BEGIN:VCARD (actual data download)
rep=$(curl -s -u "${USER}:${DAV_PASS}" -X REPORT -H "Depth: 1" \
-H "Content-Type: application/xml" --data "$REPORT_BODY" "$AB_URL")
vcards=$(printf '%s' "$rep" | grep -c 'BEGIN:VCARD')
info "REPORT BEGIN:VCARD count: $vcards"
if [ "$vcards" -ge $((EXPECTED_VCARDS - TOLERANCE)) ] && [ "$vcards" -le $((EXPECTED_VCARDS + TOLERANCE)) ]; then
ok "REPORT addressbook-query returns $vcards vCards (expected ~$EXPECTED_VCARDS)"
else
fail "REPORT returned $vcards vCards (expected ~$EXPECTED_VCARDS +/-$TOLERANCE)"
fi
echo
# ---- (d) Known contact integrity --------------------------------------------
echo "--- (d) Known-contact integrity (TEL + EMAIL) ---"
# Extract the single vCard block that matches KNOWN_NAME and check fields.
block=$(printf '%s' "$rep" | awk -v name="$KNOWN_NAME" '
/BEGIN:VCARD/ {buf=""}
{buf=buf"\n"$0}
/END:VCARD/ { if (buf ~ ("FN:.*"name)) {print buf; exit} }')
if [ -z "$block" ]; then
fail "known contact matching '$KNOWN_NAME' not found in REPORT"
else
ok "known contact '$KNOWN_NAME' present in addressbook"
printf '%s' "$block" | grep -qF "$KNOWN_TEL" \
&& ok " -> TEL '$KNOWN_TEL' intact" || fail " -> TEL '$KNOWN_TEL' MISSING"
printf '%s' "$block" | grep -qiF "$KNOWN_EMAIL" \
&& ok " -> EMAIL '$KNOWN_EMAIL' intact" || fail " -> EMAIL '$KNOWN_EMAIL' MISSING"
fi
echo
# ---- Summary ----------------------------------------------------------------
echo "=== Summary: ${GREEN}${PASS_COUNT} passed${RESET}, ${RED}${FAIL_COUNT} failed${RESET} ==="
[ "$FAIL_COUNT" -eq 0 ] && exit 0 || exit 1