Files
osint_db/server/writes.py
T
egutierrez 9677903ca6 feat(org): contacto-empresa en la agenda con los telefonos de sus personas etiquetados
sync_org_contact_cards crea un contacto de agenda por organizacion (uid
org-<slug>); el push compone su vCard via _org_contact_vcard con un item.TEL +
item.X-ABLabel por persona de contacto (nombre + rol) desde derived.org_contacts.
Asi, al abrir la empresa en el movil, se ven todos los telefonos identificados por
persona. Sin campos OSINT (misma privacidad que el resto de la agenda). Nuevo
endpoint POST /api/org/sync-contact-cards.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-13 12:24:46 +02:00

1529 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Escritura estructurada del service osint_db (DB como fuente de verdad).
Estos helpers implementan los endpoints de escritura de /api/person,
/api/contact, /api/event, /api/addressbook, /api/calendar y /api/push/dav. El
patrón común:
1. Se escribe en la DB DuckDB bajo el lock single-writer del service.
2. El push a Xandikos (CardDAV/CalDAV) y el render DB->nota se hacen DESPUÉS
de cerrar la transacción, para no bloquear la DB con la latencia de red.
persons es dueña de sus campos estructurados (multi-valor): los singulares
telefono/email/direccion se rellenan con el primer elemento de cada lista al
materializar la ficha, y la nota Markdown se reescribe SIN tocar su prosa
(update_obsidian_note con set_frontmatter hace merge del frontmatter y conserva
el body).
"""
from __future__ import annotations
import json
import os
import re
import shutil
import subprocess
import tempfile
import time
from datetime import datetime, timezone
from .config import SENTINEL_MARKER, Config
from .registry_bridge import (
build_vcard,
caldav_put_event,
carddav_put_vcard,
contact_import_key,
create_obsidian_note,
dav_delete_resource,
dav_get_resource,
dav_list_resources,
dav_make_addressbook,
dav_make_calendar,
duckdb_execute,
duckdb_query_readonly,
duckdb_upsert,
pass_get_secret,
read_obsidian_note,
render_markdown_table,
update_obsidian_note,
upsert_sentinel_block,
)
# Columnas de persons gobernadas por la API estructurada (sin slug, que es la
# clave, ni note_path/extra_fm que gestiona el ingest del vault).
_PERSON_API_COLS = (
"nombre",
"aliases",
"sexo",
"fecha_nacimiento",
"dni",
"pais",
"contexto",
"telefonos",
"emails",
"direcciones",
"tags",
)
def _now():
return datetime.now(tz=timezone.utc)
# Clave (slug de persona, uid de contacto/evento) admisible: empieza por
# alfanumérico y solo contiene alfanuméricos y `._-`. Rechaza explícitamente
# separadores de ruta (`/`, `\`), saltos de línea y secuencias `..`, que de otro
# modo entrarían como filas con clave indeleble por la API REST (los `/` rompen
# el routing de DELETE /api/<x>/{key}) y se acercan a paths fuera del vault.
_VALID_KEY_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")
def _valid_key(value: str) -> bool:
"""True si ``value`` es una clave admisible (sin `/`, `\\`, `..`, controles)."""
return bool(_VALID_KEY_RE.match(value or "")) and ".." not in value
def _as_list(value) -> list:
"""Normaliza a lista de strings no vacíos (string suelto -> [string])."""
if value is None:
return []
seq = value if isinstance(value, list) else [value]
out = []
for v in seq:
s = str(v).strip()
if s:
out.append(s)
return out
def _json(value) -> str:
return json.dumps(value, ensure_ascii=False, default=str)
def _read_person(db_path: str, slug: str) -> dict | None:
"""Lee una ficha de persons como dict (o None si no existe)."""
res = duckdb_query_readonly(
db_path,
"SELECT slug, note_path, nombre, aliases, sexo, fecha_nacimiento, dni, "
"telefono, email, direccion, pais, contexto, fuente, dav_uid, tags, "
"telefonos, emails, direcciones, extra_fm FROM persons WHERE slug = ?",
[slug],
1,
)
if res.get("status") != "ok" or not res.get("rows"):
return None
return res["rows"][0]
def _decode_json_field(value) -> list:
"""Decodifica un campo JSON de la DB a lista (tolera None/str/list)."""
if value is None:
return []
if isinstance(value, list):
return value
try:
parsed = json.loads(value)
except (TypeError, ValueError):
return []
return parsed if isinstance(parsed, list) else [parsed]
def _decode_extra_fm(value) -> dict:
"""Decodifica extra_fm (objeto JSON de la DB) a dict (o {} si no aplica)."""
if value is None:
return {}
if isinstance(value, dict):
return value
try:
parsed = json.loads(value)
except (TypeError, ValueError):
return {}
return parsed if isinstance(parsed, dict) else {}
# ---------------------------------------------------------------------------
# persons
# ---------------------------------------------------------------------------
def upsert_person(cfg: Config, slug: str, fields: dict, *, render: bool = True) -> dict:
"""Crea/actualiza una persona multi-valor y materializa su nota.
Escribe los campos estructurados en la DB (la DB es dueña), rellena los
singulares con el primer elemento de cada lista, y tras cerrar la escritura
materializa la ficha DB->nota (frontmatter OWNED + merge de extra_fm) sin
tocar la prosa de la nota.
"""
slug = (slug or "").strip()
if not slug:
return {"status": "error", "error": "slug vacío"}
if not _valid_key(slug):
return {"status": "error", "error": f"slug inválido: {slug!r}"}
telefonos = _as_list(fields.get("telefonos"))
emails = _as_list(fields.get("emails"))
direcciones = _as_list(fields.get("direcciones"))
aliases = _as_list(fields.get("aliases"))
tags = _as_list(fields.get("tags"))
existing = _read_person(cfg.db_path, slug)
note_path = (existing.get("note_path") if existing else None) or os.path.join(
"personas", f"{slug}.md"
)
row = {
"slug": slug,
"note_path": note_path,
"nombre": (fields.get("nombre") or slug),
"aliases": _json(aliases),
"sexo": fields.get("sexo"),
"fecha_nacimiento": fields.get("fecha_nacimiento"),
"dni": fields.get("dni"),
"telefono": telefonos[0] if telefonos else None,
"email": emails[0] if emails else None,
"direccion": direcciones[0] if direcciones else None,
"pais": fields.get("pais"),
"contexto": fields.get("contexto"),
"tags": _json(tags),
"telefonos": _json(telefonos),
"emails": _json(emails),
"direcciones": _json(direcciones),
"updated_at": _now(),
}
# update_cols = todo lo que la API gobierna (no pisa fuente/dav_uid/extra_fm
# que pertenecen al ingest del vault).
update_cols = [c for c in row if c not in ("slug",)]
res = duckdb_upsert(
cfg.db_path, "persons", [row], key_cols=["slug"], update_cols=update_cols
)
if res.get("status") != "ok":
return {"status": "error", "error": res.get("error")}
materialized = False
if render:
r = render_person(cfg, slug)
materialized = r.get("status") == "ok"
return {
"status": "ok",
"slug": slug,
"inserted": res.get("inserted", 0),
"updated": res.get("updated", 0),
"note_path": note_path,
"materialized": materialized,
}
def delete_person(cfg: Config, slug: str) -> dict:
"""Borra una ficha de persons de la DB (no borra la nota del vault)."""
slug = (slug or "").strip()
if not slug:
return {"status": "error", "error": "slug vacío"}
res = duckdb_execute(cfg.db_path, "DELETE FROM persons WHERE slug = ?", [slug])
if res.get("status") != "ok":
return {"status": "error", "error": res.get("error")}
return {"status": "ok", "slug": slug, "deleted": res.get("rowcount", 0)}
def render_person(cfg: Config, slug: str) -> dict:
"""Materializa una ficha DB->nota: frontmatter OWNED + extra_fm, sin prosa.
Lee la fila de persons, compone el frontmatter (campos OWNED como listas +
merge de extra_fm) y lo escribe en la nota con update_obsidian_note (que
conserva el body). Si la nota no existe la crea con un body mínimo.
"""
slug = (slug or "").strip()
person = _read_person(cfg.db_path, slug)
if person is None:
return {"status": "error", "error": f"persona desconocida: {slug!r}"}
rel = person.get("note_path") or os.path.join("personas", f"{slug}.md")
if not rel.endswith(".md"):
rel = rel + ".md"
abs_path = os.path.abspath(os.path.join(cfg.vault_dir, rel))
vault_real = os.path.realpath(cfg.vault_dir)
if not os.path.realpath(abs_path).startswith(vault_real + os.sep):
return {"status": "error", "error": f"note_path fuera del vault: {rel!r}"}
telefonos = _decode_json_field(person.get("telefonos"))
emails = _decode_json_field(person.get("emails"))
direcciones = _decode_json_field(person.get("direcciones"))
aliases = _decode_json_field(person.get("aliases"))
tags = _decode_json_field(person.get("tags"))
frontmatter = {
"tipo": "persona",
"slug": slug,
"nombre": person.get("nombre") or slug,
"aliases": aliases,
"sexo": person.get("sexo"),
"fecha_nacimiento": person.get("fecha_nacimiento"),
"dni": person.get("dni"),
"telefonos": telefonos,
"emails": emails,
"direcciones": direcciones,
# singulares por compatibilidad con consumidores que aún los leen.
"telefono": telefonos[0] if telefonos else None,
"email": emails[0] if emails else None,
"direccion": direcciones[0] if direcciones else None,
"pais": person.get("pais"),
"contexto": person.get("contexto"),
"fuente": person.get("fuente"),
"tags": tags,
}
# Merge del frontmatter no-owned capturado del vault (no pisa las claves
# OWNED de arriba). extra_fm es un objeto JSON (dict) en la DB.
extra = _decode_extra_fm(person.get("extra_fm"))
if extra:
merged = dict(extra)
merged.update(frontmatter)
frontmatter = merged
try:
if os.path.exists(abs_path):
# set_frontmatter hace merge y NO toca el body (prosa preservada).
update_obsidian_note(abs_path, set_frontmatter=frontmatter)
else:
create_obsidian_note(
cfg.vault_dir,
rel,
body="## Notas\n",
frontmatter=frontmatter,
)
except Exception as e: # noqa: BLE001
return {"status": "error", "error": str(e)}
return {"status": "ok", "slug": slug, "note_path": rel}
# ---------------------------------------------------------------------------
# contacts (DB -> Xandikos)
# ---------------------------------------------------------------------------
def _resolve_password(cfg: Config) -> tuple:
"""Resuelve la password de Xandikos desde pass. Devuelve (pwd|None, error|None)."""
secret = pass_get_secret(cfg.pass_secret)
if secret.get("status") != "ok":
return None, (
f"pass no devolvió el secreto {cfg.pass_secret!r}: {secret.get('error')}"
)
return secret["value"], None
def _default_collection(cfg: Config) -> str:
return cfg.dav_contacts_collection
def _person_agenda_extras(db_path: str, note_path) -> tuple:
"""Devuelve (direcciones, aliases) de la persons enlazada por note_path.
El móvil solo recibe datos de agenda: si el contacto está enlazado a una
ficha de persons, sus direcciones y aliases se incluyen en el vCard (como
ADR y NICKNAME). Los campos OSINT (dni/sexo/fecha_nacimiento/pais/contexto)
NUNCA salen de la DB: no se leen aquí. Devuelve ([], []) si no hay enlace.
"""
if not note_path:
return [], []
res = duckdb_query_readonly(
db_path,
"SELECT direcciones, aliases FROM persons WHERE note_path = ?",
[note_path],
1,
)
if res.get("status") != "ok" or not res.get("rows"):
return [], []
row = res["rows"][0]
return (
_decode_json_field(row.get("direcciones")),
_decode_json_field(row.get("aliases")),
)
def _compose_agenda_vcard(cfg: Config, uid: str, fields: dict) -> str:
"""Compone el vCard de AGENDA de un contacto, SIN ningún campo OSINT.
Incluye FN, TEL×N, EMAIL×N y, si el contacto está enlazado a una ficha de
persons (por note_path), las direcciones (ADR×N) y aliases (NICKNAME) de esa
persona. NUNCA pasa el dict ``osint`` a build_vcard, así que el vCard jamás
lleva líneas X-OSINT-* (DNI/sexo/fecha-nac/país/contexto quedan solo en
DuckDB + Obsidian).
Args:
cfg: configuración del service (db_path para resolver la persona).
uid: UID del contacto.
fields: dict con fn/nombre, tels/telefonos, emails/correos,
direcciones/adrs y, opcionalmente, note_path (enlace a persons).
Returns:
Texto vCard 3.0 listo para PUT a Xandikos.
"""
# Contacto-empresa (uid 'org-<slug>'): el vCard lleva los teléfonos de las
# personas de contacto de la organización, cada uno etiquetado con el nombre
# de la persona (item.X-ABLabel). Lo gestiona _org_contact_vcard.
if str(uid).startswith("org-"):
return _org_contact_vcard(cfg, uid, fields.get("fn") or fields.get("nombre"))
tels = _as_list(fields.get("tels") or fields.get("telefonos"))
emails = _as_list(fields.get("emails") or fields.get("correos"))
fn = fields.get("fn") or fields.get("nombre")
# Direcciones y aliases del contacto explícitos en fields...
adrs = _as_list(fields.get("direcciones") or fields.get("adrs"))
aliases = _as_list(fields.get("aliases"))
# ...más los de la persons enlazada (si la hay), deduplicando.
note_path = fields.get("note_path")
person_adrs, person_aliases = _person_agenda_extras(cfg.db_path, note_path)
adrs = _dedup_keep_order(adrs + person_adrs)
aliases = _dedup_keep_order(aliases + person_aliases)
contact = {
"uid": uid,
"fn": fn,
"tels": tels,
"emails": emails,
"adrs": adrs,
"aliases": aliases,
}
# CLAVE DE PRIVACIDAD: no se pasa 'osint' -> no se emite ninguna X-OSINT-*.
return build_vcard(contact)
def _vc_escape(value) -> str:
"""Escapa un valor de texto para una línea vCard (sin \\r; \\n,;, escapados)."""
return (
str(value)
.replace("\\", "\\\\")
.replace("\r", "")
.replace("\n", "\\n")
.replace(",", "\\,")
.replace(";", "\\;")
)
def _org_contact_vcard(cfg: Config, uid: str, fn) -> str:
"""vCard de una EMPRESA con los teléfonos de sus personas de contacto.
Cada teléfono va en una propiedad agrupada ``item<N>.TEL`` etiquetada con
``item<N>.X-ABLabel`` = "<persona> (<rol>)", leídos de
``derived.org_contacts``. Así, al abrir la empresa en el móvil, se ven todos
los teléfonos identificados por la persona de contacto. Sin ningún campo OSINT
(mismo criterio de privacidad que _compose_agenda_vcard).
"""
slug = uid[len("org-"):] if str(uid).startswith("org-") else str(uid)
name = str(fn).strip() if fn else slug
res = duckdb_query_readonly(
cfg.db_path,
"SELECT persona, rol, telefono FROM derived.org_contacts "
"WHERE org_slug = ? ORDER BY persona, telefono",
[slug],
1000,
)
rows = res.get("rows") or []
lines = [
"BEGIN:VCARD",
"VERSION:3.0",
"UID:%s" % _vc_escape(uid),
"FN:%s" % _vc_escape(name),
"ORG:%s" % _vc_escape(name),
]
seen = set()
i = 0
for r in rows:
tel = str(r.get("telefono") or "").strip()
if not tel or tel in seen:
continue
seen.add(tel)
i += 1
persona = (r.get("persona") or "contacto").strip()
rol = (r.get("rol") or "").strip()
label = "%s (%s)" % (persona, rol) if rol and rol != "contacto" else persona
lines.append("item%d.TEL;TYPE=CELL:%s" % (i, _vc_escape(tel)))
lines.append("item%d.X-ABLabel:%s" % (i, _vc_escape(label)))
lines.append("END:VCARD")
return "\r\n".join(lines) + "\r\n"
def sync_org_contact_cards(cfg: Config) -> dict:
"""Crea/actualiza un contacto de agenda por organización con contactos.
Por cada organización con filas en ``derived.org_contacts`` inserta (o
refresca) una fila en ``contacts`` con uid ``org-<slug>`` y el nombre de la
empresa. El vCard real (con los teléfonos etiquetados por persona) lo compone
el push via ``_org_contact_vcard``. Idempotente y no destructivo. Devuelve el
número de tarjetas-empresa sincronizadas.
"""
orgs = duckdb_query_readonly(
cfg.db_path,
"SELECT DISTINCT d.org_slug, d.org_nombre FROM derived.org_contacts d "
"ORDER BY d.org_slug",
[],
100000,
)
rows = orgs.get("rows") or []
upserted = 0
for o in rows:
slug = o.get("org_slug")
nombre = o.get("org_nombre") or slug
uid = "org-%s" % slug
row = {
"uid": uid,
"collection": cfg.dav_contacts_collection,
"etag": None,
"fn": nombre,
"tels": "[]",
"emails": "[]",
"raw": "",
"note_path": None,
"updated_at": _now(),
"import_key": contact_import_key(nombre, [], []),
}
res = duckdb_upsert(
cfg.db_path,
"contacts",
[row],
key_cols=["uid"],
update_cols=["collection", "fn", "updated_at", "import_key"],
)
if res.get("status") == "ok":
upserted += 1
return {"status": "ok", "org_cards": upserted}
def _dedup_keep_order(items: list) -> list:
"""Deduplica una lista de strings preservando orden (case-insensitive)."""
seen, out = set(), []
for it in items:
s = str(it).strip()
key = s.lower()
if s and key not in seen:
seen.add(key)
out.append(s)
return out
def _resource_href_tail(uid: str) -> str:
"""Nombre del recurso .vcf que carddav_put_vcard deriva del UID."""
return _safe_resource(uid) + ".vcf"
def _read_etag_after_push(cfg: Config, pwd: str, collection: str, uid: str):
"""Lee el etag NUEVO del recurso .vcf de un contacto tras su PUT.
Lista la colección (PROPFIND Depth:1) y busca el recurso cuyo href termina
en el nombre .vcf del uid. Devuelve el etag o None si no se encuentra/falla.
Capturar el etag del push propio evita que el sync inverso lo confunda con
una edición hecha desde el móvil.
"""
listing = dav_list_resources(cfg.dav_base, cfg.dav_user, pwd, collection)
if listing.get("status") != "ok":
return None
tail = _resource_href_tail(uid)
for res in listing.get("resources", []):
href = res.get("href") or ""
if href.rstrip("/").rsplit("/", 1)[-1] == tail:
return res.get("etag")
return None
def upsert_contact(cfg: Config, uid: str, fields: dict) -> dict:
"""Crea/actualiza un contacto en la DB y lo empuja a Xandikos (PUT vCard).
La escritura DB se hace bajo el lock; el push DAV ocurre después.
"""
uid = (uid or "").strip()
if not uid:
return {"status": "error", "error": "uid vacío"}
if not _valid_key(uid):
return {"status": "error", "error": f"uid inválido: {uid!r}"}
tels = _as_list(fields.get("tels") or fields.get("telefonos"))
emails = _as_list(fields.get("emails") or fields.get("correos"))
fn = fields.get("fn") or fields.get("nombre")
collection = fields.get("collection") or _default_collection(cfg)
# Si el contacto ya existe y está enlazado a una ficha, hereda su note_path
# para que el vCard de agenda incluya las direcciones/aliases de la persona.
note_path = fields.get("note_path")
if note_path is None:
existing = duckdb_query_readonly(
cfg.db_path, "SELECT note_path FROM contacts WHERE uid = ?", [uid], 1
)
if existing.get("status") == "ok" and existing.get("rows"):
note_path = existing["rows"][0].get("note_path")
# vCard de AGENDA: nunca lleva X-OSINT-* (privacidad del móvil).
vcard = _compose_agenda_vcard(cfg, uid, {**fields, "note_path": note_path})
row = {
"uid": uid,
"collection": collection,
"etag": None,
"fn": fn,
"tels": _json(tels),
"emails": _json(emails),
"raw": vcard,
"note_path": note_path,
"updated_at": _now(),
}
res = duckdb_upsert(
cfg.db_path,
"contacts",
[row],
key_cols=["uid"],
update_cols=["collection", "fn", "tels", "emails", "raw", "note_path", "updated_at"],
)
if res.get("status") != "ok":
return {"status": "error", "error": res.get("error")}
# Push DB -> Xandikos fuera de cualquier transacción de la DB.
pwd, err = _resolve_password(cfg)
pushed = None
etag = None
if err is None:
push = carddav_put_vcard(
cfg.dav_base, cfg.dav_user, pwd, collection, uid, vcard
)
pushed = push.get("status") == "ok"
if pushed:
# Captura el etag NUEVO del recurso para que el sync inverso no
# confunda este push propio con una edición del móvil.
etag = _read_etag_after_push(cfg, pwd, collection, uid)
if etag:
duckdb_execute(
cfg.db_path,
"UPDATE contacts SET etag = ? WHERE uid = ?",
[etag, uid],
)
return {
"status": "ok",
"uid": uid,
"inserted": res.get("inserted", 0),
"updated": res.get("updated", 0),
"pushed": pushed,
"etag": etag,
"push_error": err,
}
def delete_contact(cfg: Config, uid: str) -> dict:
"""Borra un contacto de la DB y del servidor Xandikos (DELETE del recurso)."""
uid = (uid or "").strip()
if not uid:
return {"status": "error", "error": "uid vacío"}
person = duckdb_query_readonly(
cfg.db_path, "SELECT collection FROM contacts WHERE uid = ?", [uid], 1
)
collection = _default_collection(cfg)
if person.get("status") == "ok" and person.get("rows"):
collection = person["rows"][0].get("collection") or collection
res = duckdb_execute(cfg.db_path, "DELETE FROM contacts WHERE uid = ?", [uid])
if res.get("status") != "ok":
return {"status": "error", "error": res.get("error")}
# Borrado remoto del recurso .vcf (DESTRUCTIVO, explícito por el endpoint).
pwd, err = _resolve_password(cfg)
deleted_remote = None
if err is None:
resource = collection.rstrip("/") + "/" + _safe_resource(uid) + ".vcf"
rm = dav_delete_resource(cfg.dav_base, cfg.dav_user, pwd, resource)
deleted_remote = rm.get("status") == "ok"
return {
"status": "ok",
"uid": uid,
"deleted": res.get("rowcount", 0),
"deleted_remote": deleted_remote,
"push_error": err,
}
def _safe_resource(uid: str) -> str:
"""Sanea un UID al mismo nombre de recurso que carddav_put_vcard/caldav_put_event."""
import re
return re.sub(r"[^A-Za-z0-9_.-]", "_", uid)[:120]
# ---------------------------------------------------------------------------
# events (DB -> Xandikos)
# ---------------------------------------------------------------------------
def _ical_escape(value) -> str:
"""Escapa un valor de texto para una propiedad iCalendar (RFC 5545).
Evita inyección de propiedades/componentes: un summary/location con saltos de
línea o `;`/`,` no puede cerrar el VEVENT ni abrir otro. El `\\r` se elimina
(el folding lo aporta el `\\r\\n` de la serialización).
"""
return (
str(value)
.replace("\\", "\\\\")
.replace("\r", "")
.replace("\n", "\\n")
.replace(",", "\\,")
.replace(";", "\\;")
)
def _ical_sanitize(value) -> str:
"""Quita saltos de línea de un valor estructurado (UID, RRULE) para evitar
que se inyecten propiedades nuevas. No escapa `;`/`,` porque son separadores
legítimos en RRULE."""
return str(value).replace("\r", "").replace("\n", "")
def _build_vcalendar(uid: str, fields: dict) -> str:
"""Compone un VCALENDAR mínimo con un VEVENT desde los campos del evento."""
dtstart = (fields.get("dtstart") or "").replace("-", "").replace(":", "")
dtend = (fields.get("dtend") or "").replace("-", "").replace(":", "")
lines = [
"BEGIN:VCALENDAR",
"VERSION:2.0",
"PRODID:-//osint_db//events//EN",
"BEGIN:VEVENT",
f"UID:{_ical_sanitize(uid)}",
f"SUMMARY:{_ical_escape(fields.get('summary') or '')}",
]
if dtstart:
lines.append(f"DTSTART:{_ical_sanitize(dtstart)}")
if dtend:
lines.append(f"DTEND:{_ical_sanitize(dtend)}")
if fields.get("location"):
lines.append(f"LOCATION:{_ical_escape(fields['location'])}")
if fields.get("rrule"):
lines.append(f"RRULE:{_ical_sanitize(fields['rrule'])}")
lines += ["END:VEVENT", "END:VCALENDAR"]
return "\r\n".join(lines) + "\r\n"
def upsert_event(cfg: Config, uid: str, fields: dict) -> dict:
"""Crea/actualiza un evento en la DB y lo empuja a Xandikos (PUT VCALENDAR)."""
uid = (uid or "").strip()
if not uid:
return {"status": "error", "error": "uid vacío"}
if not _valid_key(uid):
return {"status": "error", "error": f"uid inválido: {uid!r}"}
calendar = fields.get("calendar") or "default"
raw = _build_vcalendar(uid, fields)
row = {
"uid": uid,
"calendar": calendar,
"etag": None,
"dtstart": fields.get("dtstart"),
"dtend": fields.get("dtend"),
"all_day": bool(fields.get("all_day")),
"summary": fields.get("summary"),
"location": fields.get("location"),
"rrule": fields.get("rrule"),
"raw": raw,
"updated_at": _now(),
}
res = duckdb_upsert(
cfg.db_path,
"events",
[row],
key_cols=["uid"],
update_cols=[
"calendar",
"dtstart",
"dtend",
"all_day",
"summary",
"location",
"rrule",
"raw",
"updated_at",
],
)
if res.get("status") != "ok":
return {"status": "error", "error": res.get("error")}
# El calendario CalDAV destino se resuelve por su path; usamos el calendar
# home + slug del calendario. Push fuera de transacción.
pwd, err = _resolve_password(cfg)
pushed = None
if err is None:
collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/"
push = caldav_put_event(
cfg.dav_base, cfg.dav_user, pwd, collection, uid, raw
)
pushed = push.get("status") == "ok"
return {
"status": "ok",
"uid": uid,
"inserted": res.get("inserted", 0),
"updated": res.get("updated", 0),
"pushed": pushed,
"push_error": err,
}
def delete_event(cfg: Config, uid: str) -> dict:
"""Borra un evento de la DB y del servidor Xandikos."""
uid = (uid or "").strip()
if not uid:
return {"status": "error", "error": "uid vacío"}
row = duckdb_query_readonly(
cfg.db_path, "SELECT calendar FROM events WHERE uid = ?", [uid], 1
)
calendar = "default"
if row.get("status") == "ok" and row.get("rows"):
calendar = row["rows"][0].get("calendar") or calendar
res = duckdb_execute(cfg.db_path, "DELETE FROM events WHERE uid = ?", [uid])
if res.get("status") != "ok":
return {"status": "error", "error": res.get("error")}
pwd, err = _resolve_password(cfg)
deleted_remote = None
if err is None:
collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/"
resource = collection + _safe_resource(uid) + ".ics"
rm = dav_delete_resource(cfg.dav_base, cfg.dav_user, pwd, resource)
deleted_remote = rm.get("status") == "ok"
return {
"status": "ok",
"uid": uid,
"deleted": res.get("rowcount", 0),
"deleted_remote": deleted_remote,
"push_error": err,
}
# ---------------------------------------------------------------------------
# addressbooks / calendars
# ---------------------------------------------------------------------------
def make_addressbook(cfg: Config, fields: dict) -> dict:
"""Crea una libreta CardDAV en Xandikos y la registra en la tabla addressbooks."""
slug = (fields.get("slug") or "").strip()
if not slug:
return {"status": "error", "error": "slug vacío"}
if not _valid_key(slug):
return {"status": "error", "error": f"slug inválido: {slug!r}"}
display_name = fields.get("display_name") or ""
description = fields.get("description") or ""
color = fields.get("color") or ""
pwd, err = _resolve_password(cfg)
if err is not None:
return {"status": "error", "error": err}
# contacts_home = el directorio padre de la colección por defecto
# (/enmanuel/contacts/addressbook/ -> /enmanuel/contacts/).
contacts_home = "/" + "/".join(
cfg.dav_contacts_collection.strip("/").split("/")[:-1]
)
if not contacts_home.endswith("/"):
contacts_home += "/"
mk = dav_make_addressbook(
cfg.dav_base,
cfg.dav_user,
pwd,
contacts_home,
slug,
display_name,
description,
)
if mk.get("status") != "ok":
return {"status": "error", "error": mk.get("error"), "http_status": mk.get("http_status")}
collection_path = mk.get("href") or (contacts_home + slug + "/")
row = {
"slug": slug,
"display_name": display_name or slug,
"collection_path": collection_path,
"description": description or None,
"color": color or None,
"created_at": _now(),
}
res = duckdb_upsert(
cfg.db_path,
"addressbooks",
[row],
key_cols=["slug"],
update_cols=["display_name", "collection_path", "description", "color"],
)
if res.get("status") != "ok":
return {"status": "error", "error": res.get("error")}
return {
"status": "ok",
"slug": slug,
"collection_path": collection_path,
"existed": mk.get("existed", False),
}
def make_calendar(cfg: Config, fields: dict) -> dict:
"""Crea un calendario CalDAV en Xandikos (paridad con make_addressbook)."""
slug = (fields.get("slug") or "").strip()
if not slug:
return {"status": "error", "error": "slug vacío"}
if not _valid_key(slug):
return {"status": "error", "error": f"slug inválido: {slug!r}"}
display_name = fields.get("display_name") or ""
color = fields.get("color") or ""
pwd, err = _resolve_password(cfg)
if err is not None:
return {"status": "error", "error": err}
mk = dav_make_calendar(
cfg.dav_base,
cfg.dav_user,
pwd,
cfg.dav_calendar_home,
slug,
display_name,
color,
)
if mk.get("status") != "ok":
return {"status": "error", "error": mk.get("error"), "http_status": mk.get("http_status")}
return {
"status": "ok",
"slug": slug,
"href": mk.get("href"),
"existed": mk.get("existed", False),
}
# ---------------------------------------------------------------------------
# push masivo DB -> Xandikos
# ---------------------------------------------------------------------------
def push_all_dav(cfg: Config) -> dict:
"""Reconcilia en bloque: empuja todos los contacts y events de la DB a Xandikos.
Útil tras una migración para volcar lo que vive solo en la DB. Devuelve los
conteos de éxito/fallo por tipo. NO borra nada en remoto (solo PUT).
"""
pwd, err = _resolve_password(cfg)
if err is not None:
return {"status": "error", "error": err}
contacts = duckdb_query_readonly(
cfg.db_path,
"SELECT uid, collection, fn, tels, emails, note_path FROM contacts",
[],
1000000,
)
c_ok = c_fail = 0
if contacts.get("status") == "ok":
for row in contacts["rows"]:
uid = row["uid"]
collection = row.get("collection") or _default_collection(cfg)
# vCard de AGENDA: compone con la persona enlazada (direcciones +
# aliases) pero SIN ningún campo OSINT (privacidad del móvil).
vcard = _compose_agenda_vcard(
cfg,
uid,
{
"fn": row.get("fn"),
"tels": _decode_json_field(row.get("tels")),
"emails": _decode_json_field(row.get("emails")),
"note_path": row.get("note_path"),
},
)
push = carddav_put_vcard(
cfg.dav_base, cfg.dav_user, pwd, collection, uid, vcard
)
if push.get("status") == "ok":
c_ok += 1
# Captura el etag nuevo del push (sync inverso fiable).
etag = _read_etag_after_push(cfg, pwd, collection, uid)
if etag:
duckdb_execute(
cfg.db_path,
"UPDATE contacts SET etag = ? WHERE uid = ?",
[etag, uid],
)
else:
c_fail += 1
events = duckdb_query_readonly(
cfg.db_path, "SELECT uid, calendar, raw FROM events", [], 1000000
)
e_ok = e_fail = 0
if events.get("status") == "ok":
for row in events["rows"]:
uid = row["uid"]
calendar = row.get("calendar") or "default"
collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/"
raw = _ensure_vcalendar(row.get("raw")) or _build_vcalendar(uid, {})
push = caldav_put_event(
cfg.dav_base, cfg.dav_user, pwd, collection, uid, raw
)
if push.get("status") == "ok":
e_ok += 1
else:
e_fail += 1
return {
"status": "ok",
"contacts_pushed": c_ok,
"contacts_failed": c_fail,
"events_pushed": e_ok,
"events_failed": e_fail,
}
def _ensure_vcalendar(raw) -> str:
"""Garantiza que un recurso de evento tenga el envoltorio VCALENDAR.
El ``raw`` de un evento a veces guarda SOLO el bloque ``BEGIN:VEVENT ...
END:VEVENT`` (así lo extrae el parser del ingest DAV). Subir eso a CalDAV
produce un recurso ``.ics`` inválido: Xandikos falla al pedir la propiedad
``schedule-tag`` (``assert isinstance(cal, Calendar)``) y devuelve 500 para
todo el calendario. Esta función envuelve el VEVENT en un VCALENDAR mínimo
cuando falta, normalizando a CRLF; si el raw ya es un VCALENDAR lo deja igual.
Devuelve cadena vacía si no hay contenido (el llamador cae a _build_vcalendar).
"""
text = (raw or "").strip()
if not text:
return ""
if "BEGIN:VCALENDAR" in text.upper():
return raw if raw.endswith("\r\n") else raw + "\r\n"
text = text.replace("\r\n", "\n").replace("\r", "\n")
body = "\r\n".join(text.split("\n"))
return (
"BEGIN:VCALENDAR\r\nVERSION:2.0\r\nPRODID:-//osint_db//events//EN\r\n"
+ body
+ "\r\nEND:VCALENDAR\r\n"
)
# ---------------------------------------------------------------------------
# push masivo POR DISCO (vía rápida: 1 rsync + 1 commit + 1 PROPFIND)
# ---------------------------------------------------------------------------
def _write_agenda_vcards_to_dir(cfg: Config, rows: list, out_dir: str) -> dict:
"""Genera el .vcf de agenda (SIN OSINT) de cada contacto en ``out_dir``.
Para cada fila de ``contacts`` (uid, fn, tels, emails, note_path) compone el
vCard con ``_compose_agenda_vcard`` y lo escribe a
``out_dir/<_safe_resource(uid)>.vcf`` — EXACTAMENTE el mismo nombre de
recurso que usa el push HTTP (``carddav_put_vcard``), para que rsync no cree
duplicados ni huérfanos respecto a la colección remota.
Es la parte testeable sin red/SSH del push masivo por disco: genera los
ficheros locales y devuelve el mapa nombre_recurso -> uid (necesario luego
para casar los etags del PROPFIND con sus uids).
Args:
cfg: configuración del service (db_path para resolver la persona enlazada).
rows: filas de contacts como dicts con uid, fn, tels, emails, note_path.
out_dir: directorio temporal local donde escribir los .vcf.
Returns:
{"written": N, "by_resource": {"<safe>.vcf": uid, ...}}.
"""
by_resource: dict = {}
written = 0
for row in rows:
uid = row["uid"]
vcard = _compose_agenda_vcard(
cfg,
uid,
{
"fn": row.get("fn"),
"tels": _decode_json_field(row.get("tels")),
"emails": _decode_json_field(row.get("emails")),
"note_path": row.get("note_path"),
},
)
resource = _resource_href_tail(uid) # _safe_resource(uid) + ".vcf"
# En el caso (raro) de que dos uids saneen al mismo recurso, gana el
# último, igual que haría el push HTTP secuencial.
by_resource[resource] = uid
with open(os.path.join(out_dir, resource), "w", encoding="utf-8") as fh:
fh.write(vcard)
written += 1
return {"written": written, "by_resource": by_resource}
def _rsync_vcards(local_dir: str, ssh_host: str, remote_dir: str) -> dict:
"""rsync de los .vcf del directorio local al working tree remoto de Xandikos.
Sincroniza SOLO los ``*.vcf`` (``--include='*.vcf' --exclude='*'``), de modo
que ningún otro fichero del working tree remoto se toca: ni ``.git/`` (el
historial), ni ``.xandikos`` (metadata del tipo de colección), ni
``push-subscriptions.json`` (suscripciones WebDAV-Push), ni un ``.gitignore``.
``--delete`` borra del remoto los .vcf que ya no están en local — como local
contiene TODOS los contactos de la DB, esto deja la colección de .vcf
EXACTAMENTE igual a la DB (limpia recursos .vcf huérfanos) sin afectar a los
ficheros no-.vcf, que quedan protegidos por estar excluidos.
Returns:
{"status":"ok", "stdout":..., "stderr":...} o {"status":"error", "error":...}.
"""
src = local_dir.rstrip("/") + "/"
dst = f"{ssh_host}:{remote_dir}"
cmd = [
"rsync",
"-az",
"--delete",
"--include=*.vcf",
"--exclude=*",
src,
dst,
]
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=300, check=False
)
except (OSError, subprocess.TimeoutExpired) as e:
return {"status": "error", "error": f"rsync falló: {e}"}
if proc.returncode != 0:
return {
"status": "error",
"error": f"rsync rc={proc.returncode}: {proc.stderr.strip()}",
}
return {"status": "ok", "stdout": proc.stdout, "stderr": proc.stderr}
def _git_commit_remote(ssh_host: str, remote_dir: str) -> dict:
"""UN solo commit en el working tree remoto (lo que Xandikos sirve).
Hace ``git add -A`` (recoge altas, bajas y modificaciones de .vcf que dejó el
rsync) y un único commit con identidad fija ``osint_db``. El ``|| true``
evita fallar cuando no hay cambios (commit vacío). Captura el HEAD antes y
después para confirmar que SOLO se añadió un commit (o ninguno).
Returns:
{"status":"ok", "head_before":..., "head_after":..., "committed":bool} o
{"status":"error", "error":...}.
"""
# rev-parse HEAD antes.
head_before = _ssh_capture(ssh_host, f"cd {remote_dir} && git rev-parse HEAD")
if head_before.get("status") != "ok":
return {"status": "error", "error": f"rev-parse(before): {head_before.get('error')}"}
script = (
f"cd {remote_dir} && git add -A && "
"git -c user.email=osint_db -c user.name=osint_db "
"commit -m 'bulk push from DuckDB' || true"
)
commit = _ssh_capture(ssh_host, script)
if commit.get("status") != "ok":
return {"status": "error", "error": f"git commit: {commit.get('error')}"}
head_after = _ssh_capture(ssh_host, f"cd {remote_dir} && git rev-parse HEAD")
if head_after.get("status") != "ok":
return {"status": "error", "error": f"rev-parse(after): {head_after.get('error')}"}
before = (head_before.get("stdout") or "").strip()
after = (head_after.get("stdout") or "").strip()
return {
"status": "ok",
"head_before": before,
"head_after": after,
"committed": before != after,
}
def _ssh_capture(ssh_host: str, remote_cmd: str) -> dict:
"""Ejecuta un comando en el host remoto vía ssh y captura stdout/stderr.
Usa ``BatchMode=yes`` para fallar rápido si no hay auth por clave (nunca pide
contraseña interactiva, que colgaría el service). NO interpola secretos en el
comando — solo paths y verbos git fijos.
"""
cmd = [
"ssh",
"-o",
"BatchMode=yes",
"-o",
"ConnectTimeout=10",
ssh_host,
remote_cmd,
]
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=60, check=False
)
except (OSError, subprocess.TimeoutExpired) as e:
return {"status": "error", "error": f"ssh falló: {e}"}
if proc.returncode != 0:
return {
"status": "error",
"error": f"ssh rc={proc.returncode}: {proc.stderr.strip()}",
}
return {"status": "ok", "stdout": proc.stdout, "stderr": proc.stderr}
def _capture_etags_after_bulk(
cfg: Config, pwd: str, collection: str, by_resource: dict
) -> dict:
"""Lee en UN PROPFIND los etags de la colección y los persiste por uid.
Tras el commit remoto, lista la colección entera (PROPFIND Depth:1 -> [{href,
etag}]) y casa cada recurso con su uid usando ``by_resource`` (nombre .vcf ->
uid construido al generar los ficheros). Hace un UPDATE de ``contacts.etag``
por uid encontrado, dejando los etags guardados == los del servidor, para que
el próximo ``/api/sync/dav-pull`` no lo confunda con una edición del móvil.
Returns:
{"status":"ok", "updated":N} o {"status":"error", "error":...}.
"""
listing = dav_list_resources(cfg.dav_base, cfg.dav_user, pwd, collection)
if listing.get("status") != "ok":
return {
"status": "error",
"error": f"PROPFIND {collection}: {listing.get('error')} "
f"(http {listing.get('http_status')})",
}
updated = 0
for res in listing.get("resources", []):
href = res.get("href") or ""
etag = res.get("etag")
tail = href.rstrip("/").rsplit("/", 1)[-1]
uid = by_resource.get(tail)
if uid is None or not etag:
continue
up = duckdb_execute(
cfg.db_path, "UPDATE contacts SET etag = ? WHERE uid = ?", [etag, uid]
)
if up.get("status") == "ok":
updated += up.get("rowcount", 0) or 0
return {"status": "ok", "updated": updated}
def push_all_dav_bulk(cfg: Config) -> dict:
"""Push masivo DB -> Xandikos por DISCO: 1 rsync + 1 commit + 1 PROPFIND.
Vía RÁPIDA equivalente a ``push_all_dav`` para los CONTACTOS, pensada para
reconciliar las ~1000 fichas de la DB en segundos en vez de minutos. Evita
los tres cuellos del push HTTP secuencial:
- 1 PUT HTTP por contacto -> 1 rsync de todos los .vcf de golpe.
- 1 commit git por PUT -> 1 solo commit en el working tree remoto.
- 1 PROPFIND por contacto -> 1 PROPFIND de toda la colección al final.
Flujo:
1. Lee TODOS los contacts de la DB (uid, collection, fn, tels, emails,
note_path) de la colección CardDAV por defecto.
2. Genera el vCard de AGENDA (SIN OSINT) de cada uno en un tmpdir local,
con el nombre de recurso EXACTO del push HTTP (``_safe_resource``).
3. rsync ``--delete`` ese tmpdir al working tree remoto, sincronizando SOLO
los .vcf (protege .git/.xandikos/push-subscriptions.json).
4. UN solo commit en el remoto -> Xandikos recalcula el ctag (DAVx5 detecta).
5. UN PROPFIND de la colección -> {uid: etag} -> UPDATE de ``contacts.etag``.
Solo cubre la colección CardDAV por defecto (``cfg.dav_contacts_collection``),
donde viven todos los contactos del ecosistema OSINT hoy. Los eventos CalDAV
siguen yendo por el push HTTP (``push_all_dav``).
Requisitos: SSH por clave al host de Xandikos (``cfg.dav_bulk_ssh_host``) con
acceso de escritura al working tree (``cfg.dav_bulk_remote_dir``), y ``rsync``
instalado en ambos lados. Si no hay SSH, usar ``push_all_dav`` (HTTP) como
fallback.
Returns:
{"status":"ok", "written":N, "rsynced":bool, "committed":bool,
"etags_updated":N, "head_before":..., "head_after":..., "elapsed_s":F} o
{"status":"error", "error":...}.
"""
started = time.monotonic()
collection = _default_collection(cfg)
contacts = duckdb_query_readonly(
cfg.db_path,
"SELECT uid, collection, fn, tels, emails, note_path FROM contacts "
"WHERE collection = ?",
[collection],
1000000,
)
if contacts.get("status") != "ok":
return {"status": "error", "error": f"lectura contacts: {contacts.get('error')}"}
rows = contacts.get("rows", [])
# 1+2. Genera los .vcf de agenda en un tmpdir local (parte sin red).
tmp_dir = tempfile.mkdtemp(prefix="osint_db_bulk_")
try:
gen = _write_agenda_vcards_to_dir(cfg, rows, tmp_dir)
by_resource = gen["by_resource"]
written = gen["written"]
# 3. rsync de todos los .vcf al working tree remoto (solo *.vcf).
rs = _rsync_vcards(tmp_dir, cfg.dav_bulk_ssh_host, cfg.dav_bulk_remote_dir)
if rs.get("status") != "ok":
return {"status": "error", "error": rs.get("error"), "written": written}
# 4. UN solo commit en el remoto (Xandikos recalcula el ctag).
commit = _git_commit_remote(cfg.dav_bulk_ssh_host, cfg.dav_bulk_remote_dir)
if commit.get("status") != "ok":
return {"status": "error", "error": commit.get("error"), "written": written}
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
# 5. UN PROPFIND -> {uid: etag} -> UPDATE contacts.etag (sync inverso fiable).
pwd, err = _resolve_password(cfg)
etags_updated = 0
etag_error = None
if err is None:
cap = _capture_etags_after_bulk(cfg, pwd, collection, by_resource)
if cap.get("status") == "ok":
etags_updated = cap.get("updated", 0)
else:
etag_error = cap.get("error")
else:
etag_error = err
return {
"status": "ok",
"written": written,
"rsynced": True,
"committed": commit.get("committed", False),
"etags_updated": etags_updated,
"etag_error": etag_error,
"head_before": commit.get("head_before"),
"head_after": commit.get("head_after"),
"elapsed_s": round(time.monotonic() - started, 3),
}
# ---------------------------------------------------------------------------
# sync inverso Xandikos -> DB (pull incremental por etag)
# ---------------------------------------------------------------------------
def pull_dav(cfg: Config) -> dict:
"""Trae a la DB las ediciones del móvil/DAVx5, last-write-wins por etag.
A diferencia de ``ingest_dav`` (que hace DELETE + INSERT ciego de TODAS las
colecciones), este pull es INCREMENTAL y respeta la verdad de la DB salvo
donde el etag prueba un cambio externo:
1. Para cada colección registrada en ``addressbooks`` lista los recursos
(PROPFIND Depth:1 -> [{href, etag}]).
2. Por recurso: si su etag difiere del ``contacts.etag`` guardado (o el uid
no existe en la DB) -> GET + parse + upsert con el etag nuevo. Si el
etag coincide -> no se toca (la DB ya está al día).
3. Los uids que la DB tenía en esa colección y que YA no aparecen en el
PROPFIND se borran (el móvil los eliminó).
4. Tras el pull se re-enlazan los contactos con sus fichas y se
reconstruyen las derivadas (bajo el lock single-writer).
Devuelve {status:'ok', pulled, updated, deleted, unchanged} o
{status:'error', error}.
"""
# Late imports: evitan ciclo writes<->ingest a nivel de módulo y reusan la
# lógica de colecciones + enlace + derivadas ya existente (registry-first).
from . import davparse
from .db import write_conn
from .derived import rebuild_derived
from .ingest import _addressbook_collections, _link_contacts
pwd, err = _resolve_password(cfg)
if err is not None:
return {"status": "error", "error": err}
collections = _addressbook_collections(cfg)
pulled = updated = deleted = unchanged = 0
for collection in collections:
listing = dav_list_resources(cfg.dav_base, cfg.dav_user, pwd, collection)
if listing.get("status") != "ok":
return {
"status": "error",
"error": f"PROPFIND {collection}: {listing.get('error')} "
f"(http {listing.get('http_status')})",
}
# Estado actual de la DB para esta colección: uid -> etag.
db_state = duckdb_query_readonly(
cfg.db_path,
"SELECT uid, etag FROM contacts WHERE collection = ?",
[collection],
1000000,
)
db_etags: dict = {}
if db_state.get("status") == "ok":
db_etags = {r["uid"]: r.get("etag") for r in db_state["rows"]}
seen_uids: set = set()
for res in listing.get("resources", []):
href = res.get("href") or ""
remote_etag = res.get("etag")
# GET solo cuando el etag cambió o el recurso es nuevo en la DB.
# uid provisional desde el nombre del recurso para el cruce rápido;
# el uid real se toma del vCard tras el GET.
res_name = href.rstrip("/").rsplit("/", 1)[-1]
guess_uid = os.path.splitext(res_name)[0]
# Si ya conocemos este uid (por nombre) y el etag coincide -> skip.
if guess_uid in db_etags and db_etags[guess_uid] == remote_etag and remote_etag:
seen_uids.add(guess_uid)
unchanged += 1
continue
got = dav_get_resource(cfg.dav_base, cfg.dav_user, pwd, href)
if got.get("status") != "ok":
# Un recurso ilegible no aborta el pull entero.
continue
parsed = davparse.parse_vcard(got.get("text", ""))
uid = parsed["uid"] or guess_uid
seen_uids.add(uid)
existed = uid in db_etags
# Confirmación con el uid real: si el etag ya casa, no es cambio.
if existed and db_etags[uid] == remote_etag and remote_etag:
unchanged += 1
continue
row = {
"uid": uid,
"collection": collection,
"etag": remote_etag,
"fn": parsed["fn"] or None,
"tels": _json(parsed["tels"]),
"emails": _json(parsed["emails"]),
"raw": got.get("text", ""),
# note_path se re-enlaza después; preserva el existente si lo había.
"note_path": None,
"updated_at": _now(),
}
up = duckdb_upsert(
cfg.db_path,
"contacts",
[row],
key_cols=["uid"],
update_cols=["collection", "etag", "fn", "tels", "emails", "raw", "updated_at"],
)
if up.get("status") != "ok":
return {"status": "error", "error": up.get("error")}
pulled += 1
if existed:
updated += 1
# Borra los uids que la DB tenía en esta colección y ya no están remotos.
for uid in db_etags:
if uid not in seen_uids:
rm = duckdb_execute(
cfg.db_path, "DELETE FROM contacts WHERE uid = ?", [uid]
)
if rm.get("status") == "ok":
deleted += rm.get("rowcount", 0)
# Re-enlace de contactos + reconstrucción de derivadas, bajo el lock.
with write_conn(cfg.db_path) as conn:
conn.execute("BEGIN")
try:
_link_contacts(conn)
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
rebuild_derived(conn)
return {
"status": "ok",
"pulled": pulled,
"updated": updated,
"deleted": deleted,
"unchanged": unchanged,
}
# ---------------------------------------------------------------------------
# Materialización de los contactos de una organización en su ficha .md
# ---------------------------------------------------------------------------
def render_org_contacts(cfg: Config, slug: str) -> dict:
"""Escribe en la ficha de una organización la tabla de sus contactos.
Lee de ``derived.org_contacts`` las personas relacionadas con la organización
(con su rol y teléfono) y vuelca una tabla Markdown dentro de un bloque
sentinel ``org-contacts`` en el cuerpo de la nota, sin tocar el resto de la
prosa. Idempotente: re-renderizar reescribe solo ese bloque.
Devuelve {status:'ok', slug, contactos:N, note_path} o {status:'skip'|'error'}.
"""
slug = (slug or "").strip()
if not slug:
return {"status": "error", "error": "slug vacío"}
org = duckdb_query_readonly(
cfg.db_path, "SELECT note_path FROM organizations WHERE slug = ?", [slug], 1
)
if org.get("status") != "ok" or not org.get("rows"):
return {"status": "error", "error": f"organización desconocida: {slug!r}"}
note_path = org["rows"][0].get("note_path")
if not note_path:
return {"status": "skip", "slug": slug, "reason": "sin note_path"}
contacts = duckdb_query_readonly(
cfg.db_path,
"SELECT persona, rol, telefono FROM derived.org_contacts "
"WHERE org_slug = ? ORDER BY persona, telefono",
[slug],
500,
)
rows = contacts.get("rows") or []
if not rows:
return {"status": "skip", "slug": slug, "reason": "sin contactos"}
table_md = render_markdown_table(
rows, columns=["persona", "rol", "telefono"], max_rows=500
)
content = "### Contactos\n\n" + (table_md or "_(sin contactos)_")
rel = note_path if note_path.endswith(".md") else note_path + ".md"
abs_path = os.path.abspath(os.path.join(cfg.vault_dir, rel))
vault_real = os.path.realpath(cfg.vault_dir)
if not os.path.realpath(abs_path).startswith(vault_real + os.sep):
return {"status": "error", "error": f"note_path fuera del vault: {rel!r}"}
if not os.path.exists(abs_path):
return {"status": "skip", "slug": slug, "reason": "nota inexistente"}
note = read_obsidian_note(abs_path)
new_body = upsert_sentinel_block(
note.get("body", "") or "", "org-contacts", content, marker=SENTINEL_MARKER
)
update_obsidian_note(abs_path, body=new_body)
return {
"status": "ok",
"slug": slug,
"contactos": len(rows),
"note_path": rel,
}
def render_all_org_contacts(cfg: Config) -> dict:
"""Materializa la tabla de contactos en TODAS las organizaciones que tengan.
Recorre las organizaciones con al menos una fila en ``derived.org_contacts``
y llama a ``render_org_contacts`` por cada una. Devuelve conteos agregados.
"""
orgs = duckdb_query_readonly(
cfg.db_path,
"SELECT DISTINCT org_slug FROM derived.org_contacts ORDER BY org_slug",
[],
100000,
)
rendered = skipped = errors = 0
for row in orgs.get("rows") or []:
res = render_org_contacts(cfg, row["org_slug"])
status = res.get("status")
if status == "ok":
rendered += 1
elif status == "skip":
skipped += 1
else:
errors += 1
return {
"status": "ok",
"rendered": rendered,
"skipped": skipped,
"errors": errors,
}