"""Escritura estructurada del service osint_db (DB como fuente de verdad). Estos helpers implementan los endpoints de escritura de /api/person, /api/contact, /api/event, /api/addressbook, /api/calendar y /api/push/dav. El patrón común: 1. Se escribe en la DB DuckDB bajo el lock single-writer del service. 2. El push a Xandikos (CardDAV/CalDAV) y el render DB->nota se hacen DESPUÉS de cerrar la transacción, para no bloquear la DB con la latencia de red. persons es dueña de sus campos estructurados (multi-valor): los singulares telefono/email/direccion se rellenan con el primer elemento de cada lista al materializar la ficha, y la nota Markdown se reescribe SIN tocar su prosa (update_obsidian_note con set_frontmatter hace merge del frontmatter y conserva el body). """ from __future__ import annotations import json import os import re import shutil import subprocess import tempfile import time from datetime import datetime, timezone from .config import SENTINEL_MARKER, Config from .registry_bridge import ( build_vcard, caldav_put_event, carddav_put_vcard, contact_import_key, create_obsidian_note, dav_delete_resource, dav_get_resource, dav_list_resources, dav_make_addressbook, dav_make_calendar, duckdb_execute, duckdb_query_readonly, duckdb_upsert, pass_get_secret, read_obsidian_note, render_markdown_table, update_obsidian_note, upsert_sentinel_block, ) # Columnas de persons gobernadas por la API estructurada (sin slug, que es la # clave, ni note_path/extra_fm que gestiona el ingest del vault). _PERSON_API_COLS = ( "nombre", "aliases", "sexo", "fecha_nacimiento", "dni", "pais", "contexto", "telefonos", "emails", "direcciones", "tags", ) def _now(): return datetime.now(tz=timezone.utc) # Clave (slug de persona, uid de contacto/evento) admisible: empieza por # alfanumérico y solo contiene alfanuméricos y `._-`. Rechaza explícitamente # separadores de ruta (`/`, `\`), saltos de línea y secuencias `..`, que de otro # modo entrarían como filas con clave indeleble por la API REST (los `/` rompen # el routing de DELETE /api//{key}) y se acercan a paths fuera del vault. _VALID_KEY_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$") def _valid_key(value: str) -> bool: """True si ``value`` es una clave admisible (sin `/`, `\\`, `..`, controles).""" return bool(_VALID_KEY_RE.match(value or "")) and ".." not in value def _as_list(value) -> list: """Normaliza a lista de strings no vacíos (string suelto -> [string]).""" if value is None: return [] seq = value if isinstance(value, list) else [value] out = [] for v in seq: s = str(v).strip() if s: out.append(s) return out def _json(value) -> str: return json.dumps(value, ensure_ascii=False, default=str) def _read_person(db_path: str, slug: str) -> dict | None: """Lee una ficha de persons como dict (o None si no existe).""" res = duckdb_query_readonly( db_path, "SELECT slug, note_path, nombre, aliases, sexo, fecha_nacimiento, dni, " "telefono, email, direccion, pais, contexto, fuente, dav_uid, tags, " "telefonos, emails, direcciones, extra_fm FROM persons WHERE slug = ?", [slug], 1, ) if res.get("status") != "ok" or not res.get("rows"): return None return res["rows"][0] def _decode_json_field(value) -> list: """Decodifica un campo JSON de la DB a lista (tolera None/str/list).""" if value is None: return [] if isinstance(value, list): return value try: parsed = json.loads(value) except (TypeError, ValueError): return [] return parsed if isinstance(parsed, list) else [parsed] def _decode_extra_fm(value) -> dict: """Decodifica extra_fm (objeto JSON de la DB) a dict (o {} si no aplica).""" if value is None: return {} if isinstance(value, dict): return value try: parsed = json.loads(value) except (TypeError, ValueError): return {} return parsed if isinstance(parsed, dict) else {} # --------------------------------------------------------------------------- # persons # --------------------------------------------------------------------------- def upsert_person(cfg: Config, slug: str, fields: dict, *, render: bool = True) -> dict: """Crea/actualiza una persona multi-valor y materializa su nota. Escribe los campos estructurados en la DB (la DB es dueña), rellena los singulares con el primer elemento de cada lista, y tras cerrar la escritura materializa la ficha DB->nota (frontmatter OWNED + merge de extra_fm) sin tocar la prosa de la nota. """ slug = (slug or "").strip() if not slug: return {"status": "error", "error": "slug vacío"} if not _valid_key(slug): return {"status": "error", "error": f"slug inválido: {slug!r}"} telefonos = _as_list(fields.get("telefonos")) emails = _as_list(fields.get("emails")) direcciones = _as_list(fields.get("direcciones")) aliases = _as_list(fields.get("aliases")) tags = _as_list(fields.get("tags")) existing = _read_person(cfg.db_path, slug) note_path = (existing.get("note_path") if existing else None) or os.path.join( "personas", f"{slug}.md" ) row = { "slug": slug, "note_path": note_path, "nombre": (fields.get("nombre") or slug), "aliases": _json(aliases), "sexo": fields.get("sexo"), "fecha_nacimiento": fields.get("fecha_nacimiento"), "dni": fields.get("dni"), "telefono": telefonos[0] if telefonos else None, "email": emails[0] if emails else None, "direccion": direcciones[0] if direcciones else None, "pais": fields.get("pais"), "contexto": fields.get("contexto"), "tags": _json(tags), "telefonos": _json(telefonos), "emails": _json(emails), "direcciones": _json(direcciones), "updated_at": _now(), } # update_cols = todo lo que la API gobierna (no pisa fuente/dav_uid/extra_fm # que pertenecen al ingest del vault). update_cols = [c for c in row if c not in ("slug",)] res = duckdb_upsert( cfg.db_path, "persons", [row], key_cols=["slug"], update_cols=update_cols ) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} materialized = False if render: r = render_person(cfg, slug) materialized = r.get("status") == "ok" return { "status": "ok", "slug": slug, "inserted": res.get("inserted", 0), "updated": res.get("updated", 0), "note_path": note_path, "materialized": materialized, } def delete_person(cfg: Config, slug: str) -> dict: """Borra una ficha de persons de la DB (no borra la nota del vault).""" slug = (slug or "").strip() if not slug: return {"status": "error", "error": "slug vacío"} res = duckdb_execute(cfg.db_path, "DELETE FROM persons WHERE slug = ?", [slug]) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} return {"status": "ok", "slug": slug, "deleted": res.get("rowcount", 0)} def render_person(cfg: Config, slug: str) -> dict: """Materializa una ficha DB->nota: frontmatter OWNED + extra_fm, sin prosa. Lee la fila de persons, compone el frontmatter (campos OWNED como listas + merge de extra_fm) y lo escribe en la nota con update_obsidian_note (que conserva el body). Si la nota no existe la crea con un body mínimo. """ slug = (slug or "").strip() person = _read_person(cfg.db_path, slug) if person is None: return {"status": "error", "error": f"persona desconocida: {slug!r}"} rel = person.get("note_path") or os.path.join("personas", f"{slug}.md") if not rel.endswith(".md"): rel = rel + ".md" abs_path = os.path.abspath(os.path.join(cfg.vault_dir, rel)) vault_real = os.path.realpath(cfg.vault_dir) if not os.path.realpath(abs_path).startswith(vault_real + os.sep): return {"status": "error", "error": f"note_path fuera del vault: {rel!r}"} telefonos = _decode_json_field(person.get("telefonos")) emails = _decode_json_field(person.get("emails")) direcciones = _decode_json_field(person.get("direcciones")) aliases = _decode_json_field(person.get("aliases")) tags = _decode_json_field(person.get("tags")) frontmatter = { "tipo": "persona", "slug": slug, "nombre": person.get("nombre") or slug, "aliases": aliases, "sexo": person.get("sexo"), "fecha_nacimiento": person.get("fecha_nacimiento"), "dni": person.get("dni"), "telefonos": telefonos, "emails": emails, "direcciones": direcciones, # singulares por compatibilidad con consumidores que aún los leen. "telefono": telefonos[0] if telefonos else None, "email": emails[0] if emails else None, "direccion": direcciones[0] if direcciones else None, "pais": person.get("pais"), "contexto": person.get("contexto"), "fuente": person.get("fuente"), "tags": tags, } # Merge del frontmatter no-owned capturado del vault (no pisa las claves # OWNED de arriba). extra_fm es un objeto JSON (dict) en la DB. extra = _decode_extra_fm(person.get("extra_fm")) if extra: merged = dict(extra) merged.update(frontmatter) frontmatter = merged try: if os.path.exists(abs_path): # set_frontmatter hace merge y NO toca el body (prosa preservada). update_obsidian_note(abs_path, set_frontmatter=frontmatter) else: create_obsidian_note( cfg.vault_dir, rel, body="## Notas\n", frontmatter=frontmatter, ) except Exception as e: # noqa: BLE001 return {"status": "error", "error": str(e)} return {"status": "ok", "slug": slug, "note_path": rel} # --------------------------------------------------------------------------- # contacts (DB -> Xandikos) # --------------------------------------------------------------------------- def _resolve_password(cfg: Config) -> tuple: """Resuelve la password de Xandikos desde pass. Devuelve (pwd|None, error|None).""" secret = pass_get_secret(cfg.pass_secret) if secret.get("status") != "ok": return None, ( f"pass no devolvió el secreto {cfg.pass_secret!r}: {secret.get('error')}" ) return secret["value"], None def _default_collection(cfg: Config) -> str: return cfg.dav_contacts_collection def _person_agenda_extras(db_path: str, note_path) -> tuple: """Devuelve (direcciones, aliases) de la persons enlazada por note_path. El móvil solo recibe datos de agenda: si el contacto está enlazado a una ficha de persons, sus direcciones y aliases se incluyen en el vCard (como ADR y NICKNAME). Los campos OSINT (dni/sexo/fecha_nacimiento/pais/contexto) NUNCA salen de la DB: no se leen aquí. Devuelve ([], []) si no hay enlace. """ if not note_path: return [], [] res = duckdb_query_readonly( db_path, "SELECT direcciones, aliases FROM persons WHERE note_path = ?", [note_path], 1, ) if res.get("status") != "ok" or not res.get("rows"): return [], [] row = res["rows"][0] return ( _decode_json_field(row.get("direcciones")), _decode_json_field(row.get("aliases")), ) def _compose_agenda_vcard(cfg: Config, uid: str, fields: dict) -> str: """Compone el vCard de AGENDA de un contacto, SIN ningún campo OSINT. Incluye FN, TEL×N, EMAIL×N y, si el contacto está enlazado a una ficha de persons (por note_path), las direcciones (ADR×N) y aliases (NICKNAME) de esa persona. NUNCA pasa el dict ``osint`` a build_vcard, así que el vCard jamás lleva líneas X-OSINT-* (DNI/sexo/fecha-nac/país/contexto quedan solo en DuckDB + Obsidian). Args: cfg: configuración del service (db_path para resolver la persona). uid: UID del contacto. fields: dict con fn/nombre, tels/telefonos, emails/correos, direcciones/adrs y, opcionalmente, note_path (enlace a persons). Returns: Texto vCard 3.0 listo para PUT a Xandikos. """ # Contacto-empresa (uid 'org-'): el vCard lleva los teléfonos de las # personas de contacto de la organización, cada uno etiquetado con el nombre # de la persona (item.X-ABLabel). Lo gestiona _org_contact_vcard. if str(uid).startswith("org-"): return _org_contact_vcard(cfg, uid, fields.get("fn") or fields.get("nombre")) tels = _as_list(fields.get("tels") or fields.get("telefonos")) emails = _as_list(fields.get("emails") or fields.get("correos")) fn = fields.get("fn") or fields.get("nombre") # Direcciones y aliases del contacto explícitos en fields... adrs = _as_list(fields.get("direcciones") or fields.get("adrs")) aliases = _as_list(fields.get("aliases")) # ...más los de la persons enlazada (si la hay), deduplicando. note_path = fields.get("note_path") person_adrs, person_aliases = _person_agenda_extras(cfg.db_path, note_path) adrs = _dedup_keep_order(adrs + person_adrs) aliases = _dedup_keep_order(aliases + person_aliases) contact = { "uid": uid, "fn": fn, "tels": tels, "emails": emails, "adrs": adrs, "aliases": aliases, } # CLAVE DE PRIVACIDAD: no se pasa 'osint' -> no se emite ninguna X-OSINT-*. return build_vcard(contact) def _vc_escape(value) -> str: """Escapa un valor de texto para una línea vCard (sin \\r; \\n,;, escapados).""" return ( str(value) .replace("\\", "\\\\") .replace("\r", "") .replace("\n", "\\n") .replace(",", "\\,") .replace(";", "\\;") ) def _org_contact_vcard(cfg: Config, uid: str, fn) -> str: """vCard de una EMPRESA con los teléfonos de sus personas de contacto. Cada teléfono va en una propiedad agrupada ``item.TEL`` etiquetada con ``item.X-ABLabel`` = " ()", leídos de ``derived.org_contacts``. Así, al abrir la empresa en el móvil, se ven todos los teléfonos identificados por la persona de contacto. Sin ningún campo OSINT (mismo criterio de privacidad que _compose_agenda_vcard). """ slug = uid[len("org-"):] if str(uid).startswith("org-") else str(uid) name = str(fn).strip() if fn else slug res = duckdb_query_readonly( cfg.db_path, "SELECT persona, rol, telefono FROM derived.org_contacts " "WHERE org_slug = ? ORDER BY persona, telefono", [slug], 1000, ) rows = res.get("rows") or [] lines = [ "BEGIN:VCARD", "VERSION:3.0", "UID:%s" % _vc_escape(uid), "FN:%s" % _vc_escape(name), "ORG:%s" % _vc_escape(name), ] seen = set() i = 0 for r in rows: tel = str(r.get("telefono") or "").strip() if not tel or tel in seen: continue seen.add(tel) i += 1 persona = (r.get("persona") or "contacto").strip() rol = (r.get("rol") or "").strip() label = "%s (%s)" % (persona, rol) if rol and rol != "contacto" else persona lines.append("item%d.TEL;TYPE=CELL:%s" % (i, _vc_escape(tel))) lines.append("item%d.X-ABLabel:%s" % (i, _vc_escape(label))) lines.append("END:VCARD") return "\r\n".join(lines) + "\r\n" def sync_org_contact_cards(cfg: Config) -> dict: """Crea/actualiza un contacto de agenda por organización con contactos. Por cada organización con filas en ``derived.org_contacts`` inserta (o refresca) una fila en ``contacts`` con uid ``org-`` y el nombre de la empresa. El vCard real (con los teléfonos etiquetados por persona) lo compone el push via ``_org_contact_vcard``. Idempotente y no destructivo. Devuelve el número de tarjetas-empresa sincronizadas. """ orgs = duckdb_query_readonly( cfg.db_path, "SELECT DISTINCT d.org_slug, d.org_nombre FROM derived.org_contacts d " "ORDER BY d.org_slug", [], 100000, ) rows = orgs.get("rows") or [] upserted = 0 for o in rows: slug = o.get("org_slug") nombre = o.get("org_nombre") or slug uid = "org-%s" % slug row = { "uid": uid, "collection": cfg.dav_contacts_collection, "etag": None, "fn": nombre, "tels": "[]", "emails": "[]", "raw": "", "note_path": None, "updated_at": _now(), "import_key": contact_import_key(nombre, [], []), } res = duckdb_upsert( cfg.db_path, "contacts", [row], key_cols=["uid"], update_cols=["collection", "fn", "updated_at", "import_key"], ) if res.get("status") == "ok": upserted += 1 return {"status": "ok", "org_cards": upserted} def _dedup_keep_order(items: list) -> list: """Deduplica una lista de strings preservando orden (case-insensitive).""" seen, out = set(), [] for it in items: s = str(it).strip() key = s.lower() if s and key not in seen: seen.add(key) out.append(s) return out def _resource_href_tail(uid: str) -> str: """Nombre del recurso .vcf que carddav_put_vcard deriva del UID.""" return _safe_resource(uid) + ".vcf" def _read_etag_after_push(cfg: Config, pwd: str, collection: str, uid: str): """Lee el etag NUEVO del recurso .vcf de un contacto tras su PUT. Lista la colección (PROPFIND Depth:1) y busca el recurso cuyo href termina en el nombre .vcf del uid. Devuelve el etag o None si no se encuentra/falla. Capturar el etag del push propio evita que el sync inverso lo confunda con una edición hecha desde el móvil. """ listing = dav_list_resources(cfg.dav_base, cfg.dav_user, pwd, collection) if listing.get("status") != "ok": return None tail = _resource_href_tail(uid) for res in listing.get("resources", []): href = res.get("href") or "" if href.rstrip("/").rsplit("/", 1)[-1] == tail: return res.get("etag") return None def upsert_contact(cfg: Config, uid: str, fields: dict) -> dict: """Crea/actualiza un contacto en la DB y lo empuja a Xandikos (PUT vCard). La escritura DB se hace bajo el lock; el push DAV ocurre después. """ uid = (uid or "").strip() if not uid: return {"status": "error", "error": "uid vacío"} if not _valid_key(uid): return {"status": "error", "error": f"uid inválido: {uid!r}"} tels = _as_list(fields.get("tels") or fields.get("telefonos")) emails = _as_list(fields.get("emails") or fields.get("correos")) fn = fields.get("fn") or fields.get("nombre") collection = fields.get("collection") or _default_collection(cfg) # Si el contacto ya existe y está enlazado a una ficha, hereda su note_path # para que el vCard de agenda incluya las direcciones/aliases de la persona. note_path = fields.get("note_path") if note_path is None: existing = duckdb_query_readonly( cfg.db_path, "SELECT note_path FROM contacts WHERE uid = ?", [uid], 1 ) if existing.get("status") == "ok" and existing.get("rows"): note_path = existing["rows"][0].get("note_path") # vCard de AGENDA: nunca lleva X-OSINT-* (privacidad del móvil). vcard = _compose_agenda_vcard(cfg, uid, {**fields, "note_path": note_path}) row = { "uid": uid, "collection": collection, "etag": None, "fn": fn, "tels": _json(tels), "emails": _json(emails), "raw": vcard, "note_path": note_path, "updated_at": _now(), } res = duckdb_upsert( cfg.db_path, "contacts", [row], key_cols=["uid"], update_cols=["collection", "fn", "tels", "emails", "raw", "note_path", "updated_at"], ) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} # Push DB -> Xandikos fuera de cualquier transacción de la DB. pwd, err = _resolve_password(cfg) pushed = None etag = None if err is None: push = carddav_put_vcard( cfg.dav_base, cfg.dav_user, pwd, collection, uid, vcard ) pushed = push.get("status") == "ok" if pushed: # Captura el etag NUEVO del recurso para que el sync inverso no # confunda este push propio con una edición del móvil. etag = _read_etag_after_push(cfg, pwd, collection, uid) if etag: duckdb_execute( cfg.db_path, "UPDATE contacts SET etag = ? WHERE uid = ?", [etag, uid], ) return { "status": "ok", "uid": uid, "inserted": res.get("inserted", 0), "updated": res.get("updated", 0), "pushed": pushed, "etag": etag, "push_error": err, } def delete_contact(cfg: Config, uid: str) -> dict: """Borra un contacto de la DB y del servidor Xandikos (DELETE del recurso).""" uid = (uid or "").strip() if not uid: return {"status": "error", "error": "uid vacío"} person = duckdb_query_readonly( cfg.db_path, "SELECT collection FROM contacts WHERE uid = ?", [uid], 1 ) collection = _default_collection(cfg) if person.get("status") == "ok" and person.get("rows"): collection = person["rows"][0].get("collection") or collection res = duckdb_execute(cfg.db_path, "DELETE FROM contacts WHERE uid = ?", [uid]) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} # Borrado remoto del recurso .vcf (DESTRUCTIVO, explícito por el endpoint). pwd, err = _resolve_password(cfg) deleted_remote = None if err is None: resource = collection.rstrip("/") + "/" + _safe_resource(uid) + ".vcf" rm = dav_delete_resource(cfg.dav_base, cfg.dav_user, pwd, resource) deleted_remote = rm.get("status") == "ok" return { "status": "ok", "uid": uid, "deleted": res.get("rowcount", 0), "deleted_remote": deleted_remote, "push_error": err, } def _safe_resource(uid: str) -> str: """Sanea un UID al mismo nombre de recurso que carddav_put_vcard/caldav_put_event.""" import re return re.sub(r"[^A-Za-z0-9_.-]", "_", uid)[:120] # --------------------------------------------------------------------------- # events (DB -> Xandikos) # --------------------------------------------------------------------------- def _ical_escape(value) -> str: """Escapa un valor de texto para una propiedad iCalendar (RFC 5545). Evita inyección de propiedades/componentes: un summary/location con saltos de línea o `;`/`,` no puede cerrar el VEVENT ni abrir otro. El `\\r` se elimina (el folding lo aporta el `\\r\\n` de la serialización). """ return ( str(value) .replace("\\", "\\\\") .replace("\r", "") .replace("\n", "\\n") .replace(",", "\\,") .replace(";", "\\;") ) def _ical_sanitize(value) -> str: """Quita saltos de línea de un valor estructurado (UID, RRULE) para evitar que se inyecten propiedades nuevas. No escapa `;`/`,` porque son separadores legítimos en RRULE.""" return str(value).replace("\r", "").replace("\n", "") def _build_vcalendar(uid: str, fields: dict) -> str: """Compone un VCALENDAR mínimo con un VEVENT desde los campos del evento.""" dtstart = (fields.get("dtstart") or "").replace("-", "").replace(":", "") dtend = (fields.get("dtend") or "").replace("-", "").replace(":", "") lines = [ "BEGIN:VCALENDAR", "VERSION:2.0", "PRODID:-//osint_db//events//EN", "BEGIN:VEVENT", f"UID:{_ical_sanitize(uid)}", f"SUMMARY:{_ical_escape(fields.get('summary') or '')}", ] if dtstart: lines.append(f"DTSTART:{_ical_sanitize(dtstart)}") if dtend: lines.append(f"DTEND:{_ical_sanitize(dtend)}") if fields.get("location"): lines.append(f"LOCATION:{_ical_escape(fields['location'])}") if fields.get("rrule"): lines.append(f"RRULE:{_ical_sanitize(fields['rrule'])}") lines += ["END:VEVENT", "END:VCALENDAR"] return "\r\n".join(lines) + "\r\n" def upsert_event(cfg: Config, uid: str, fields: dict) -> dict: """Crea/actualiza un evento en la DB y lo empuja a Xandikos (PUT VCALENDAR).""" uid = (uid or "").strip() if not uid: return {"status": "error", "error": "uid vacío"} if not _valid_key(uid): return {"status": "error", "error": f"uid inválido: {uid!r}"} calendar = fields.get("calendar") or "default" raw = _build_vcalendar(uid, fields) row = { "uid": uid, "calendar": calendar, "etag": None, "dtstart": fields.get("dtstart"), "dtend": fields.get("dtend"), "all_day": bool(fields.get("all_day")), "summary": fields.get("summary"), "location": fields.get("location"), "rrule": fields.get("rrule"), "raw": raw, "updated_at": _now(), } res = duckdb_upsert( cfg.db_path, "events", [row], key_cols=["uid"], update_cols=[ "calendar", "dtstart", "dtend", "all_day", "summary", "location", "rrule", "raw", "updated_at", ], ) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} # El calendario CalDAV destino se resuelve por su path; usamos el calendar # home + slug del calendario. Push fuera de transacción. pwd, err = _resolve_password(cfg) pushed = None if err is None: collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/" push = caldav_put_event( cfg.dav_base, cfg.dav_user, pwd, collection, uid, raw ) pushed = push.get("status") == "ok" return { "status": "ok", "uid": uid, "inserted": res.get("inserted", 0), "updated": res.get("updated", 0), "pushed": pushed, "push_error": err, } def delete_event(cfg: Config, uid: str) -> dict: """Borra un evento de la DB y del servidor Xandikos.""" uid = (uid or "").strip() if not uid: return {"status": "error", "error": "uid vacío"} row = duckdb_query_readonly( cfg.db_path, "SELECT calendar FROM events WHERE uid = ?", [uid], 1 ) calendar = "default" if row.get("status") == "ok" and row.get("rows"): calendar = row["rows"][0].get("calendar") or calendar res = duckdb_execute(cfg.db_path, "DELETE FROM events WHERE uid = ?", [uid]) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} pwd, err = _resolve_password(cfg) deleted_remote = None if err is None: collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/" resource = collection + _safe_resource(uid) + ".ics" rm = dav_delete_resource(cfg.dav_base, cfg.dav_user, pwd, resource) deleted_remote = rm.get("status") == "ok" return { "status": "ok", "uid": uid, "deleted": res.get("rowcount", 0), "deleted_remote": deleted_remote, "push_error": err, } # --------------------------------------------------------------------------- # addressbooks / calendars # --------------------------------------------------------------------------- def make_addressbook(cfg: Config, fields: dict) -> dict: """Crea una libreta CardDAV en Xandikos y la registra en la tabla addressbooks.""" slug = (fields.get("slug") or "").strip() if not slug: return {"status": "error", "error": "slug vacío"} if not _valid_key(slug): return {"status": "error", "error": f"slug inválido: {slug!r}"} display_name = fields.get("display_name") or "" description = fields.get("description") or "" color = fields.get("color") or "" pwd, err = _resolve_password(cfg) if err is not None: return {"status": "error", "error": err} # contacts_home = el directorio padre de la colección por defecto # (/enmanuel/contacts/addressbook/ -> /enmanuel/contacts/). contacts_home = "/" + "/".join( cfg.dav_contacts_collection.strip("/").split("/")[:-1] ) if not contacts_home.endswith("/"): contacts_home += "/" mk = dav_make_addressbook( cfg.dav_base, cfg.dav_user, pwd, contacts_home, slug, display_name, description, ) if mk.get("status") != "ok": return {"status": "error", "error": mk.get("error"), "http_status": mk.get("http_status")} collection_path = mk.get("href") or (contacts_home + slug + "/") row = { "slug": slug, "display_name": display_name or slug, "collection_path": collection_path, "description": description or None, "color": color or None, "created_at": _now(), } res = duckdb_upsert( cfg.db_path, "addressbooks", [row], key_cols=["slug"], update_cols=["display_name", "collection_path", "description", "color"], ) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} return { "status": "ok", "slug": slug, "collection_path": collection_path, "existed": mk.get("existed", False), } def make_calendar(cfg: Config, fields: dict) -> dict: """Crea un calendario CalDAV en Xandikos (paridad con make_addressbook).""" slug = (fields.get("slug") or "").strip() if not slug: return {"status": "error", "error": "slug vacío"} if not _valid_key(slug): return {"status": "error", "error": f"slug inválido: {slug!r}"} display_name = fields.get("display_name") or "" color = fields.get("color") or "" pwd, err = _resolve_password(cfg) if err is not None: return {"status": "error", "error": err} mk = dav_make_calendar( cfg.dav_base, cfg.dav_user, pwd, cfg.dav_calendar_home, slug, display_name, color, ) if mk.get("status") != "ok": return {"status": "error", "error": mk.get("error"), "http_status": mk.get("http_status")} return { "status": "ok", "slug": slug, "href": mk.get("href"), "existed": mk.get("existed", False), } # --------------------------------------------------------------------------- # push masivo DB -> Xandikos # --------------------------------------------------------------------------- def push_all_dav(cfg: Config) -> dict: """Reconcilia en bloque: empuja todos los contacts y events de la DB a Xandikos. Útil tras una migración para volcar lo que vive solo en la DB. Devuelve los conteos de éxito/fallo por tipo. NO borra nada en remoto (solo PUT). """ pwd, err = _resolve_password(cfg) if err is not None: return {"status": "error", "error": err} contacts = duckdb_query_readonly( cfg.db_path, "SELECT uid, collection, fn, tels, emails, note_path FROM contacts", [], 1000000, ) c_ok = c_fail = 0 if contacts.get("status") == "ok": for row in contacts["rows"]: uid = row["uid"] collection = row.get("collection") or _default_collection(cfg) # vCard de AGENDA: compone con la persona enlazada (direcciones + # aliases) pero SIN ningún campo OSINT (privacidad del móvil). vcard = _compose_agenda_vcard( cfg, uid, { "fn": row.get("fn"), "tels": _decode_json_field(row.get("tels")), "emails": _decode_json_field(row.get("emails")), "note_path": row.get("note_path"), }, ) push = carddav_put_vcard( cfg.dav_base, cfg.dav_user, pwd, collection, uid, vcard ) if push.get("status") == "ok": c_ok += 1 # Captura el etag nuevo del push (sync inverso fiable). etag = _read_etag_after_push(cfg, pwd, collection, uid) if etag: duckdb_execute( cfg.db_path, "UPDATE contacts SET etag = ? WHERE uid = ?", [etag, uid], ) else: c_fail += 1 events = duckdb_query_readonly( cfg.db_path, "SELECT uid, calendar, raw FROM events", [], 1000000 ) e_ok = e_fail = 0 if events.get("status") == "ok": for row in events["rows"]: uid = row["uid"] calendar = row.get("calendar") or "default" collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/" raw = _ensure_vcalendar(row.get("raw")) or _build_vcalendar(uid, {}) push = caldav_put_event( cfg.dav_base, cfg.dav_user, pwd, collection, uid, raw ) if push.get("status") == "ok": e_ok += 1 else: e_fail += 1 return { "status": "ok", "contacts_pushed": c_ok, "contacts_failed": c_fail, "events_pushed": e_ok, "events_failed": e_fail, } def _ensure_vcalendar(raw) -> str: """Garantiza que un recurso de evento tenga el envoltorio VCALENDAR. El ``raw`` de un evento a veces guarda SOLO el bloque ``BEGIN:VEVENT ... END:VEVENT`` (así lo extrae el parser del ingest DAV). Subir eso a CalDAV produce un recurso ``.ics`` inválido: Xandikos falla al pedir la propiedad ``schedule-tag`` (``assert isinstance(cal, Calendar)``) y devuelve 500 para todo el calendario. Esta función envuelve el VEVENT en un VCALENDAR mínimo cuando falta, normalizando a CRLF; si el raw ya es un VCALENDAR lo deja igual. Devuelve cadena vacía si no hay contenido (el llamador cae a _build_vcalendar). """ text = (raw or "").strip() if not text: return "" if "BEGIN:VCALENDAR" in text.upper(): return raw if raw.endswith("\r\n") else raw + "\r\n" text = text.replace("\r\n", "\n").replace("\r", "\n") body = "\r\n".join(text.split("\n")) return ( "BEGIN:VCALENDAR\r\nVERSION:2.0\r\nPRODID:-//osint_db//events//EN\r\n" + body + "\r\nEND:VCALENDAR\r\n" ) # --------------------------------------------------------------------------- # push masivo POR DISCO (vía rápida: 1 rsync + 1 commit + 1 PROPFIND) # --------------------------------------------------------------------------- def _write_agenda_vcards_to_dir(cfg: Config, rows: list, out_dir: str) -> dict: """Genera el .vcf de agenda (SIN OSINT) de cada contacto en ``out_dir``. Para cada fila de ``contacts`` (uid, fn, tels, emails, note_path) compone el vCard con ``_compose_agenda_vcard`` y lo escribe a ``out_dir/<_safe_resource(uid)>.vcf`` — EXACTAMENTE el mismo nombre de recurso que usa el push HTTP (``carddav_put_vcard``), para que rsync no cree duplicados ni huérfanos respecto a la colección remota. Es la parte testeable sin red/SSH del push masivo por disco: genera los ficheros locales y devuelve el mapa nombre_recurso -> uid (necesario luego para casar los etags del PROPFIND con sus uids). Args: cfg: configuración del service (db_path para resolver la persona enlazada). rows: filas de contacts como dicts con uid, fn, tels, emails, note_path. out_dir: directorio temporal local donde escribir los .vcf. Returns: {"written": N, "by_resource": {".vcf": uid, ...}}. """ by_resource: dict = {} written = 0 for row in rows: uid = row["uid"] vcard = _compose_agenda_vcard( cfg, uid, { "fn": row.get("fn"), "tels": _decode_json_field(row.get("tels")), "emails": _decode_json_field(row.get("emails")), "note_path": row.get("note_path"), }, ) resource = _resource_href_tail(uid) # _safe_resource(uid) + ".vcf" # En el caso (raro) de que dos uids saneen al mismo recurso, gana el # último, igual que haría el push HTTP secuencial. by_resource[resource] = uid with open(os.path.join(out_dir, resource), "w", encoding="utf-8") as fh: fh.write(vcard) written += 1 return {"written": written, "by_resource": by_resource} def _rsync_vcards(local_dir: str, ssh_host: str, remote_dir: str) -> dict: """rsync de los .vcf del directorio local al working tree remoto de Xandikos. Sincroniza SOLO los ``*.vcf`` (``--include='*.vcf' --exclude='*'``), de modo que ningún otro fichero del working tree remoto se toca: ni ``.git/`` (el historial), ni ``.xandikos`` (metadata del tipo de colección), ni ``push-subscriptions.json`` (suscripciones WebDAV-Push), ni un ``.gitignore``. ``--delete`` borra del remoto los .vcf que ya no están en local — como local contiene TODOS los contactos de la DB, esto deja la colección de .vcf EXACTAMENTE igual a la DB (limpia recursos .vcf huérfanos) sin afectar a los ficheros no-.vcf, que quedan protegidos por estar excluidos. Returns: {"status":"ok", "stdout":..., "stderr":...} o {"status":"error", "error":...}. """ src = local_dir.rstrip("/") + "/" dst = f"{ssh_host}:{remote_dir}" cmd = [ "rsync", "-az", "--delete", "--include=*.vcf", "--exclude=*", src, dst, ] try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=300, check=False ) except (OSError, subprocess.TimeoutExpired) as e: return {"status": "error", "error": f"rsync falló: {e}"} if proc.returncode != 0: return { "status": "error", "error": f"rsync rc={proc.returncode}: {proc.stderr.strip()}", } return {"status": "ok", "stdout": proc.stdout, "stderr": proc.stderr} def _git_commit_remote(ssh_host: str, remote_dir: str) -> dict: """UN solo commit en el working tree remoto (lo que Xandikos sirve). Hace ``git add -A`` (recoge altas, bajas y modificaciones de .vcf que dejó el rsync) y un único commit con identidad fija ``osint_db``. El ``|| true`` evita fallar cuando no hay cambios (commit vacío). Captura el HEAD antes y después para confirmar que SOLO se añadió un commit (o ninguno). Returns: {"status":"ok", "head_before":..., "head_after":..., "committed":bool} o {"status":"error", "error":...}. """ # rev-parse HEAD antes. head_before = _ssh_capture(ssh_host, f"cd {remote_dir} && git rev-parse HEAD") if head_before.get("status") != "ok": return {"status": "error", "error": f"rev-parse(before): {head_before.get('error')}"} script = ( f"cd {remote_dir} && git add -A && " "git -c user.email=osint_db -c user.name=osint_db " "commit -m 'bulk push from DuckDB' || true" ) commit = _ssh_capture(ssh_host, script) if commit.get("status") != "ok": return {"status": "error", "error": f"git commit: {commit.get('error')}"} head_after = _ssh_capture(ssh_host, f"cd {remote_dir} && git rev-parse HEAD") if head_after.get("status") != "ok": return {"status": "error", "error": f"rev-parse(after): {head_after.get('error')}"} before = (head_before.get("stdout") or "").strip() after = (head_after.get("stdout") or "").strip() return { "status": "ok", "head_before": before, "head_after": after, "committed": before != after, } def _ssh_capture(ssh_host: str, remote_cmd: str) -> dict: """Ejecuta un comando en el host remoto vía ssh y captura stdout/stderr. Usa ``BatchMode=yes`` para fallar rápido si no hay auth por clave (nunca pide contraseña interactiva, que colgaría el service). NO interpola secretos en el comando — solo paths y verbos git fijos. """ cmd = [ "ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=10", ssh_host, remote_cmd, ] try: proc = subprocess.run( cmd, capture_output=True, text=True, timeout=60, check=False ) except (OSError, subprocess.TimeoutExpired) as e: return {"status": "error", "error": f"ssh falló: {e}"} if proc.returncode != 0: return { "status": "error", "error": f"ssh rc={proc.returncode}: {proc.stderr.strip()}", } return {"status": "ok", "stdout": proc.stdout, "stderr": proc.stderr} def _capture_etags_after_bulk( cfg: Config, pwd: str, collection: str, by_resource: dict ) -> dict: """Lee en UN PROPFIND los etags de la colección y los persiste por uid. Tras el commit remoto, lista la colección entera (PROPFIND Depth:1 -> [{href, etag}]) y casa cada recurso con su uid usando ``by_resource`` (nombre .vcf -> uid construido al generar los ficheros). Hace un UPDATE de ``contacts.etag`` por uid encontrado, dejando los etags guardados == los del servidor, para que el próximo ``/api/sync/dav-pull`` no lo confunda con una edición del móvil. Returns: {"status":"ok", "updated":N} o {"status":"error", "error":...}. """ listing = dav_list_resources(cfg.dav_base, cfg.dav_user, pwd, collection) if listing.get("status") != "ok": return { "status": "error", "error": f"PROPFIND {collection}: {listing.get('error')} " f"(http {listing.get('http_status')})", } updated = 0 for res in listing.get("resources", []): href = res.get("href") or "" etag = res.get("etag") tail = href.rstrip("/").rsplit("/", 1)[-1] uid = by_resource.get(tail) if uid is None or not etag: continue up = duckdb_execute( cfg.db_path, "UPDATE contacts SET etag = ? WHERE uid = ?", [etag, uid] ) if up.get("status") == "ok": updated += up.get("rowcount", 0) or 0 return {"status": "ok", "updated": updated} def push_all_dav_bulk(cfg: Config) -> dict: """Push masivo DB -> Xandikos por DISCO: 1 rsync + 1 commit + 1 PROPFIND. Vía RÁPIDA equivalente a ``push_all_dav`` para los CONTACTOS, pensada para reconciliar las ~1000 fichas de la DB en segundos en vez de minutos. Evita los tres cuellos del push HTTP secuencial: - 1 PUT HTTP por contacto -> 1 rsync de todos los .vcf de golpe. - 1 commit git por PUT -> 1 solo commit en el working tree remoto. - 1 PROPFIND por contacto -> 1 PROPFIND de toda la colección al final. Flujo: 1. Lee TODOS los contacts de la DB (uid, collection, fn, tels, emails, note_path) de la colección CardDAV por defecto. 2. Genera el vCard de AGENDA (SIN OSINT) de cada uno en un tmpdir local, con el nombre de recurso EXACTO del push HTTP (``_safe_resource``). 3. rsync ``--delete`` ese tmpdir al working tree remoto, sincronizando SOLO los .vcf (protege .git/.xandikos/push-subscriptions.json). 4. UN solo commit en el remoto -> Xandikos recalcula el ctag (DAVx5 detecta). 5. UN PROPFIND de la colección -> {uid: etag} -> UPDATE de ``contacts.etag``. Solo cubre la colección CardDAV por defecto (``cfg.dav_contacts_collection``), donde viven todos los contactos del ecosistema OSINT hoy. Los eventos CalDAV siguen yendo por el push HTTP (``push_all_dav``). Requisitos: SSH por clave al host de Xandikos (``cfg.dav_bulk_ssh_host``) con acceso de escritura al working tree (``cfg.dav_bulk_remote_dir``), y ``rsync`` instalado en ambos lados. Si no hay SSH, usar ``push_all_dav`` (HTTP) como fallback. Returns: {"status":"ok", "written":N, "rsynced":bool, "committed":bool, "etags_updated":N, "head_before":..., "head_after":..., "elapsed_s":F} o {"status":"error", "error":...}. """ started = time.monotonic() collection = _default_collection(cfg) contacts = duckdb_query_readonly( cfg.db_path, "SELECT uid, collection, fn, tels, emails, note_path FROM contacts " "WHERE collection = ?", [collection], 1000000, ) if contacts.get("status") != "ok": return {"status": "error", "error": f"lectura contacts: {contacts.get('error')}"} rows = contacts.get("rows", []) # 1+2. Genera los .vcf de agenda en un tmpdir local (parte sin red). tmp_dir = tempfile.mkdtemp(prefix="osint_db_bulk_") try: gen = _write_agenda_vcards_to_dir(cfg, rows, tmp_dir) by_resource = gen["by_resource"] written = gen["written"] # 3. rsync de todos los .vcf al working tree remoto (solo *.vcf). rs = _rsync_vcards(tmp_dir, cfg.dav_bulk_ssh_host, cfg.dav_bulk_remote_dir) if rs.get("status") != "ok": return {"status": "error", "error": rs.get("error"), "written": written} # 4. UN solo commit en el remoto (Xandikos recalcula el ctag). commit = _git_commit_remote(cfg.dav_bulk_ssh_host, cfg.dav_bulk_remote_dir) if commit.get("status") != "ok": return {"status": "error", "error": commit.get("error"), "written": written} finally: shutil.rmtree(tmp_dir, ignore_errors=True) # 5. UN PROPFIND -> {uid: etag} -> UPDATE contacts.etag (sync inverso fiable). pwd, err = _resolve_password(cfg) etags_updated = 0 etag_error = None if err is None: cap = _capture_etags_after_bulk(cfg, pwd, collection, by_resource) if cap.get("status") == "ok": etags_updated = cap.get("updated", 0) else: etag_error = cap.get("error") else: etag_error = err return { "status": "ok", "written": written, "rsynced": True, "committed": commit.get("committed", False), "etags_updated": etags_updated, "etag_error": etag_error, "head_before": commit.get("head_before"), "head_after": commit.get("head_after"), "elapsed_s": round(time.monotonic() - started, 3), } # --------------------------------------------------------------------------- # sync inverso Xandikos -> DB (pull incremental por etag) # --------------------------------------------------------------------------- def pull_dav(cfg: Config) -> dict: """Trae a la DB las ediciones del móvil/DAVx5, last-write-wins por etag. A diferencia de ``ingest_dav`` (que hace DELETE + INSERT ciego de TODAS las colecciones), este pull es INCREMENTAL y respeta la verdad de la DB salvo donde el etag prueba un cambio externo: 1. Para cada colección registrada en ``addressbooks`` lista los recursos (PROPFIND Depth:1 -> [{href, etag}]). 2. Por recurso: si su etag difiere del ``contacts.etag`` guardado (o el uid no existe en la DB) -> GET + parse + upsert con el etag nuevo. Si el etag coincide -> no se toca (la DB ya está al día). 3. Los uids que la DB tenía en esa colección y que YA no aparecen en el PROPFIND se borran (el móvil los eliminó). 4. Tras el pull se re-enlazan los contactos con sus fichas y se reconstruyen las derivadas (bajo el lock single-writer). Devuelve {status:'ok', pulled, updated, deleted, unchanged} o {status:'error', error}. """ # Late imports: evitan ciclo writes<->ingest a nivel de módulo y reusan la # lógica de colecciones + enlace + derivadas ya existente (registry-first). from . import davparse from .db import write_conn from .derived import rebuild_derived from .ingest import _addressbook_collections, _link_contacts pwd, err = _resolve_password(cfg) if err is not None: return {"status": "error", "error": err} collections = _addressbook_collections(cfg) pulled = updated = deleted = unchanged = 0 for collection in collections: listing = dav_list_resources(cfg.dav_base, cfg.dav_user, pwd, collection) if listing.get("status") != "ok": return { "status": "error", "error": f"PROPFIND {collection}: {listing.get('error')} " f"(http {listing.get('http_status')})", } # Estado actual de la DB para esta colección: uid -> etag. db_state = duckdb_query_readonly( cfg.db_path, "SELECT uid, etag FROM contacts WHERE collection = ?", [collection], 1000000, ) db_etags: dict = {} if db_state.get("status") == "ok": db_etags = {r["uid"]: r.get("etag") for r in db_state["rows"]} seen_uids: set = set() for res in listing.get("resources", []): href = res.get("href") or "" remote_etag = res.get("etag") # GET solo cuando el etag cambió o el recurso es nuevo en la DB. # uid provisional desde el nombre del recurso para el cruce rápido; # el uid real se toma del vCard tras el GET. res_name = href.rstrip("/").rsplit("/", 1)[-1] guess_uid = os.path.splitext(res_name)[0] # Si ya conocemos este uid (por nombre) y el etag coincide -> skip. if guess_uid in db_etags and db_etags[guess_uid] == remote_etag and remote_etag: seen_uids.add(guess_uid) unchanged += 1 continue got = dav_get_resource(cfg.dav_base, cfg.dav_user, pwd, href) if got.get("status") != "ok": # Un recurso ilegible no aborta el pull entero. continue parsed = davparse.parse_vcard(got.get("text", "")) uid = parsed["uid"] or guess_uid seen_uids.add(uid) existed = uid in db_etags # Confirmación con el uid real: si el etag ya casa, no es cambio. if existed and db_etags[uid] == remote_etag and remote_etag: unchanged += 1 continue row = { "uid": uid, "collection": collection, "etag": remote_etag, "fn": parsed["fn"] or None, "tels": _json(parsed["tels"]), "emails": _json(parsed["emails"]), "raw": got.get("text", ""), # note_path se re-enlaza después; preserva el existente si lo había. "note_path": None, "updated_at": _now(), } up = duckdb_upsert( cfg.db_path, "contacts", [row], key_cols=["uid"], update_cols=["collection", "etag", "fn", "tels", "emails", "raw", "updated_at"], ) if up.get("status") != "ok": return {"status": "error", "error": up.get("error")} pulled += 1 if existed: updated += 1 # Borra los uids que la DB tenía en esta colección y ya no están remotos. for uid in db_etags: if uid not in seen_uids: rm = duckdb_execute( cfg.db_path, "DELETE FROM contacts WHERE uid = ?", [uid] ) if rm.get("status") == "ok": deleted += rm.get("rowcount", 0) # Re-enlace de contactos + reconstrucción de derivadas, bajo el lock. with write_conn(cfg.db_path) as conn: conn.execute("BEGIN") try: _link_contacts(conn) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise rebuild_derived(conn) return { "status": "ok", "pulled": pulled, "updated": updated, "deleted": deleted, "unchanged": unchanged, } # --------------------------------------------------------------------------- # Materialización de los contactos de una organización en su ficha .md # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # network_scans (escaneos de red registrados por otras herramientas) # --------------------------------------------------------------------------- def _scan_ts_compact(scan_ts: str) -> tuple: """Deriva (datetime, 'YYYYMMDD-HHMM') de un timestamp ISO del escaneo. Acepta ISO 8601 con o sin 'Z'/offset. Devuelve (dt, compact) o lanza ValueError si el string no es parseable (el caller lo convierte en error). """ raw = (scan_ts or "").strip() if not raw: raise ValueError("scan_ts vacío") # datetime.fromisoformat no acepta el sufijo 'Z' (Python < 3.11), lo # normalizamos a +00:00 para que parsee el caso UTC más común. iso = raw[:-1] + "+00:00" if raw.endswith("Z") else raw dt = datetime.fromisoformat(iso) return dt, dt.strftime("%Y%m%d-%H%M") def record_scan(cfg: Config, fields: dict) -> dict: """Registra un escaneo de red en network_scans (idempotente por id). El id se deriva de ``::`` donde ts_compact = YYYYMMDD-HHMM de ``scan_ts``. Dos escaneos del mismo target, tipo y minuto colapsan al mismo id: el upsert (ON CONFLICT DO UPDATE) los hace idempotentes en vez de duplicar. La escritura va bajo el lock single-writer del service vía ``duckdb_upsert`` (misma conexión que el resto de endpoints de escritura; DuckDB comparte la instancia de la base dentro del proceso, así que no rompe el lock). ``summary`` es un dict que se serializa a JSON (la columna es de tipo JSON). No se pushea nada a la red: a diferencia de contacts/events, los escaneos no salen del service. Args: cfg: configuración del service (db_path). fields: dict con target, target_slug, scan_type, note_path (requeridos), tool, summary, scan_ts (opcionales/derivables). Returns: {"status":"ok","id":, "inserted":N, "updated":N} o {"status":"error","error":...}. """ target = (fields.get("target") or "").strip() target_slug = (fields.get("target_slug") or "").strip() scan_type = (fields.get("scan_type") or "").strip() note_path = (fields.get("note_path") or "").strip() if not target: return {"status": "error", "error": "falta 'target'"} if not target_slug: return {"status": "error", "error": "falta 'target_slug'"} if not scan_type: return {"status": "error", "error": "falta 'scan_type'"} if not note_path: return {"status": "error", "error": "falta 'note_path'"} try: scan_dt, ts_compact = _scan_ts_compact(fields.get("scan_ts")) except ValueError as e: return {"status": "error", "error": f"scan_ts inválido: {e}"} scan_id = f"{target_slug}:{scan_type}:{ts_compact}" summary = fields.get("summary") row = { "id": scan_id, "target": target, "target_slug": target_slug, "scan_type": scan_type, "tool": fields.get("tool"), "scan_ts": scan_dt, "note_path": note_path, "summary": _json(summary) if summary is not None else None, "created_at": _now(), } res = duckdb_upsert( cfg.db_path, "network_scans", [row], key_cols=["id"], update_cols=[ "target", "target_slug", "scan_type", "tool", "scan_ts", "note_path", "summary", ], ) if res.get("status") != "ok": return {"status": "error", "error": res.get("error")} return { "status": "ok", "id": scan_id, "inserted": res.get("inserted", 0), "updated": res.get("updated", 0), } # --------------------------------------------------------------------------- # Materialización de los contactos de una organización en su ficha .md # --------------------------------------------------------------------------- def render_org_contacts(cfg: Config, slug: str) -> dict: """Escribe en la ficha de una organización la tabla de sus contactos. Lee de ``derived.org_contacts`` las personas relacionadas con la organización (con su rol y teléfono) y vuelca una tabla Markdown dentro de un bloque sentinel ``org-contacts`` en el cuerpo de la nota, sin tocar el resto de la prosa. Idempotente: re-renderizar reescribe solo ese bloque. Devuelve {status:'ok', slug, contactos:N, note_path} o {status:'skip'|'error'}. """ slug = (slug or "").strip() if not slug: return {"status": "error", "error": "slug vacío"} org = duckdb_query_readonly( cfg.db_path, "SELECT note_path FROM organizations WHERE slug = ?", [slug], 1 ) if org.get("status") != "ok" or not org.get("rows"): return {"status": "error", "error": f"organización desconocida: {slug!r}"} note_path = org["rows"][0].get("note_path") if not note_path: return {"status": "skip", "slug": slug, "reason": "sin note_path"} contacts = duckdb_query_readonly( cfg.db_path, "SELECT persona, rol, telefono FROM derived.org_contacts " "WHERE org_slug = ? ORDER BY persona, telefono", [slug], 500, ) rows = contacts.get("rows") or [] if not rows: return {"status": "skip", "slug": slug, "reason": "sin contactos"} table_md = render_markdown_table( rows, columns=["persona", "rol", "telefono"], max_rows=500 ) content = "### Contactos\n\n" + (table_md or "_(sin contactos)_") rel = note_path if note_path.endswith(".md") else note_path + ".md" abs_path = os.path.abspath(os.path.join(cfg.vault_dir, rel)) vault_real = os.path.realpath(cfg.vault_dir) if not os.path.realpath(abs_path).startswith(vault_real + os.sep): return {"status": "error", "error": f"note_path fuera del vault: {rel!r}"} if not os.path.exists(abs_path): return {"status": "skip", "slug": slug, "reason": "nota inexistente"} note = read_obsidian_note(abs_path) new_body = upsert_sentinel_block( note.get("body", "") or "", "org-contacts", content, marker=SENTINEL_MARKER ) update_obsidian_note(abs_path, body=new_body) return { "status": "ok", "slug": slug, "contactos": len(rows), "note_path": rel, } def render_all_org_contacts(cfg: Config) -> dict: """Materializa la tabla de contactos en TODAS las organizaciones que tengan. Recorre las organizaciones con al menos una fila en ``derived.org_contacts`` y llama a ``render_org_contacts`` por cada una. Devuelve conteos agregados. """ orgs = duckdb_query_readonly( cfg.db_path, "SELECT DISTINCT org_slug FROM derived.org_contacts ORDER BY org_slug", [], 100000, ) rendered = skipped = errors = 0 for row in orgs.get("rows") or []: res = render_org_contacts(cfg, row["org_slug"]) status = res.get("status") if status == "ok": rendered += 1 elif status == "skip": skipped += 1 else: errors += 1 return { "status": "ok", "rendered": rendered, "skipped": skipped, "errors": errors, }