3063d3c44f
Tabla network_scans (migración 005, schema main, lleva note_path) que otras herramientas pueblan vía HTTP con escaneos de reconocimiento (whois/rdap/dns/ nmap/traceroute/ping). Endpoint POST /api/scan: id determinista <target_slug>:<scan_type>:<YYYYMMDD-HHMM> derivado de scan_ts, idempotente por id (duckdb_upsert ON CONFLICT DO UPDATE) bajo el lock single-writer del service. summary (dict) se serializa a JSON. network_scans no se deriva de notas: ni ingest_vault ni ingest_dav la tocan, así que un re-ingest del vault no la trunca (test lo verifica). Tests: inserción + id derivado, idempotencia mismo-minuto, validación de campos requeridos (422), y no-truncado por ingest del vault. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1635 lines
59 KiB
Python
1635 lines
59 KiB
Python
"""Escritura estructurada del service osint_db (DB como fuente de verdad).
|
||
|
||
Estos helpers implementan los endpoints de escritura de /api/person,
|
||
/api/contact, /api/event, /api/addressbook, /api/calendar y /api/push/dav. El
|
||
patrón común:
|
||
|
||
1. Se escribe en la DB DuckDB bajo el lock single-writer del service.
|
||
2. El push a Xandikos (CardDAV/CalDAV) y el render DB->nota se hacen DESPUÉS
|
||
de cerrar la transacción, para no bloquear la DB con la latencia de red.
|
||
|
||
persons es dueña de sus campos estructurados (multi-valor): los singulares
|
||
telefono/email/direccion se rellenan con el primer elemento de cada lista al
|
||
materializar la ficha, y la nota Markdown se reescribe SIN tocar su prosa
|
||
(update_obsidian_note con set_frontmatter hace merge del frontmatter y conserva
|
||
el body).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
import tempfile
|
||
import time
|
||
from datetime import datetime, timezone
|
||
|
||
from .config import SENTINEL_MARKER, Config
|
||
from .registry_bridge import (
|
||
build_vcard,
|
||
caldav_put_event,
|
||
carddav_put_vcard,
|
||
contact_import_key,
|
||
create_obsidian_note,
|
||
dav_delete_resource,
|
||
dav_get_resource,
|
||
dav_list_resources,
|
||
dav_make_addressbook,
|
||
dav_make_calendar,
|
||
duckdb_execute,
|
||
duckdb_query_readonly,
|
||
duckdb_upsert,
|
||
pass_get_secret,
|
||
read_obsidian_note,
|
||
render_markdown_table,
|
||
update_obsidian_note,
|
||
upsert_sentinel_block,
|
||
)
|
||
|
||
# Columnas de persons gobernadas por la API estructurada (sin slug, que es la
|
||
# clave, ni note_path/extra_fm que gestiona el ingest del vault).
|
||
_PERSON_API_COLS = (
|
||
"nombre",
|
||
"aliases",
|
||
"sexo",
|
||
"fecha_nacimiento",
|
||
"dni",
|
||
"pais",
|
||
"contexto",
|
||
"telefonos",
|
||
"emails",
|
||
"direcciones",
|
||
"tags",
|
||
)
|
||
|
||
|
||
def _now():
|
||
return datetime.now(tz=timezone.utc)
|
||
|
||
|
||
# Clave (slug de persona, uid de contacto/evento) admisible: empieza por
|
||
# alfanumérico y solo contiene alfanuméricos y `._-`. Rechaza explícitamente
|
||
# separadores de ruta (`/`, `\`), saltos de línea y secuencias `..`, que de otro
|
||
# modo entrarían como filas con clave indeleble por la API REST (los `/` rompen
|
||
# el routing de DELETE /api/<x>/{key}) y se acercan a paths fuera del vault.
|
||
_VALID_KEY_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*$")
|
||
|
||
|
||
def _valid_key(value: str) -> bool:
|
||
"""True si ``value`` es una clave admisible (sin `/`, `\\`, `..`, controles)."""
|
||
return bool(_VALID_KEY_RE.match(value or "")) and ".." not in value
|
||
|
||
|
||
def _as_list(value) -> list:
|
||
"""Normaliza a lista de strings no vacíos (string suelto -> [string])."""
|
||
if value is None:
|
||
return []
|
||
seq = value if isinstance(value, list) else [value]
|
||
out = []
|
||
for v in seq:
|
||
s = str(v).strip()
|
||
if s:
|
||
out.append(s)
|
||
return out
|
||
|
||
|
||
def _json(value) -> str:
|
||
return json.dumps(value, ensure_ascii=False, default=str)
|
||
|
||
|
||
def _read_person(db_path: str, slug: str) -> dict | None:
|
||
"""Lee una ficha de persons como dict (o None si no existe)."""
|
||
res = duckdb_query_readonly(
|
||
db_path,
|
||
"SELECT slug, note_path, nombre, aliases, sexo, fecha_nacimiento, dni, "
|
||
"telefono, email, direccion, pais, contexto, fuente, dav_uid, tags, "
|
||
"telefonos, emails, direcciones, extra_fm FROM persons WHERE slug = ?",
|
||
[slug],
|
||
1,
|
||
)
|
||
if res.get("status") != "ok" or not res.get("rows"):
|
||
return None
|
||
return res["rows"][0]
|
||
|
||
|
||
def _decode_json_field(value) -> list:
|
||
"""Decodifica un campo JSON de la DB a lista (tolera None/str/list)."""
|
||
if value is None:
|
||
return []
|
||
if isinstance(value, list):
|
||
return value
|
||
try:
|
||
parsed = json.loads(value)
|
||
except (TypeError, ValueError):
|
||
return []
|
||
return parsed if isinstance(parsed, list) else [parsed]
|
||
|
||
|
||
def _decode_extra_fm(value) -> dict:
|
||
"""Decodifica extra_fm (objeto JSON de la DB) a dict (o {} si no aplica)."""
|
||
if value is None:
|
||
return {}
|
||
if isinstance(value, dict):
|
||
return value
|
||
try:
|
||
parsed = json.loads(value)
|
||
except (TypeError, ValueError):
|
||
return {}
|
||
return parsed if isinstance(parsed, dict) else {}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# persons
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def upsert_person(cfg: Config, slug: str, fields: dict, *, render: bool = True) -> dict:
|
||
"""Crea/actualiza una persona multi-valor y materializa su nota.
|
||
|
||
Escribe los campos estructurados en la DB (la DB es dueña), rellena los
|
||
singulares con el primer elemento de cada lista, y tras cerrar la escritura
|
||
materializa la ficha DB->nota (frontmatter OWNED + merge de extra_fm) sin
|
||
tocar la prosa de la nota.
|
||
"""
|
||
slug = (slug or "").strip()
|
||
if not slug:
|
||
return {"status": "error", "error": "slug vacío"}
|
||
if not _valid_key(slug):
|
||
return {"status": "error", "error": f"slug inválido: {slug!r}"}
|
||
|
||
telefonos = _as_list(fields.get("telefonos"))
|
||
emails = _as_list(fields.get("emails"))
|
||
direcciones = _as_list(fields.get("direcciones"))
|
||
aliases = _as_list(fields.get("aliases"))
|
||
tags = _as_list(fields.get("tags"))
|
||
|
||
existing = _read_person(cfg.db_path, slug)
|
||
note_path = (existing.get("note_path") if existing else None) or os.path.join(
|
||
"personas", f"{slug}.md"
|
||
)
|
||
|
||
row = {
|
||
"slug": slug,
|
||
"note_path": note_path,
|
||
"nombre": (fields.get("nombre") or slug),
|
||
"aliases": _json(aliases),
|
||
"sexo": fields.get("sexo"),
|
||
"fecha_nacimiento": fields.get("fecha_nacimiento"),
|
||
"dni": fields.get("dni"),
|
||
"telefono": telefonos[0] if telefonos else None,
|
||
"email": emails[0] if emails else None,
|
||
"direccion": direcciones[0] if direcciones else None,
|
||
"pais": fields.get("pais"),
|
||
"contexto": fields.get("contexto"),
|
||
"tags": _json(tags),
|
||
"telefonos": _json(telefonos),
|
||
"emails": _json(emails),
|
||
"direcciones": _json(direcciones),
|
||
"updated_at": _now(),
|
||
}
|
||
# update_cols = todo lo que la API gobierna (no pisa fuente/dav_uid/extra_fm
|
||
# que pertenecen al ingest del vault).
|
||
update_cols = [c for c in row if c not in ("slug",)]
|
||
res = duckdb_upsert(
|
||
cfg.db_path, "persons", [row], key_cols=["slug"], update_cols=update_cols
|
||
)
|
||
if res.get("status") != "ok":
|
||
return {"status": "error", "error": res.get("error")}
|
||
|
||
materialized = False
|
||
if render:
|
||
r = render_person(cfg, slug)
|
||
materialized = r.get("status") == "ok"
|
||
|
||
return {
|
||
"status": "ok",
|
||
"slug": slug,
|
||
"inserted": res.get("inserted", 0),
|
||
"updated": res.get("updated", 0),
|
||
"note_path": note_path,
|
||
"materialized": materialized,
|
||
}
|
||
|
||
|
||
def delete_person(cfg: Config, slug: str) -> dict:
|
||
"""Borra una ficha de persons de la DB (no borra la nota del vault)."""
|
||
slug = (slug or "").strip()
|
||
if not slug:
|
||
return {"status": "error", "error": "slug vacío"}
|
||
res = duckdb_execute(cfg.db_path, "DELETE FROM persons WHERE slug = ?", [slug])
|
||
if res.get("status") != "ok":
|
||
return {"status": "error", "error": res.get("error")}
|
||
return {"status": "ok", "slug": slug, "deleted": res.get("rowcount", 0)}
|
||
|
||
|
||
def render_person(cfg: Config, slug: str) -> dict:
|
||
"""Materializa una ficha DB->nota: frontmatter OWNED + extra_fm, sin prosa.
|
||
|
||
Lee la fila de persons, compone el frontmatter (campos OWNED como listas +
|
||
merge de extra_fm) y lo escribe en la nota con update_obsidian_note (que
|
||
conserva el body). Si la nota no existe la crea con un body mínimo.
|
||
"""
|
||
slug = (slug or "").strip()
|
||
person = _read_person(cfg.db_path, slug)
|
||
if person is None:
|
||
return {"status": "error", "error": f"persona desconocida: {slug!r}"}
|
||
|
||
rel = person.get("note_path") or os.path.join("personas", f"{slug}.md")
|
||
if not rel.endswith(".md"):
|
||
rel = rel + ".md"
|
||
abs_path = os.path.abspath(os.path.join(cfg.vault_dir, rel))
|
||
vault_real = os.path.realpath(cfg.vault_dir)
|
||
if not os.path.realpath(abs_path).startswith(vault_real + os.sep):
|
||
return {"status": "error", "error": f"note_path fuera del vault: {rel!r}"}
|
||
|
||
telefonos = _decode_json_field(person.get("telefonos"))
|
||
emails = _decode_json_field(person.get("emails"))
|
||
direcciones = _decode_json_field(person.get("direcciones"))
|
||
aliases = _decode_json_field(person.get("aliases"))
|
||
tags = _decode_json_field(person.get("tags"))
|
||
|
||
frontmatter = {
|
||
"tipo": "persona",
|
||
"slug": slug,
|
||
"nombre": person.get("nombre") or slug,
|
||
"aliases": aliases,
|
||
"sexo": person.get("sexo"),
|
||
"fecha_nacimiento": person.get("fecha_nacimiento"),
|
||
"dni": person.get("dni"),
|
||
"telefonos": telefonos,
|
||
"emails": emails,
|
||
"direcciones": direcciones,
|
||
# singulares por compatibilidad con consumidores que aún los leen.
|
||
"telefono": telefonos[0] if telefonos else None,
|
||
"email": emails[0] if emails else None,
|
||
"direccion": direcciones[0] if direcciones else None,
|
||
"pais": person.get("pais"),
|
||
"contexto": person.get("contexto"),
|
||
"fuente": person.get("fuente"),
|
||
"tags": tags,
|
||
}
|
||
# Merge del frontmatter no-owned capturado del vault (no pisa las claves
|
||
# OWNED de arriba). extra_fm es un objeto JSON (dict) en la DB.
|
||
extra = _decode_extra_fm(person.get("extra_fm"))
|
||
if extra:
|
||
merged = dict(extra)
|
||
merged.update(frontmatter)
|
||
frontmatter = merged
|
||
|
||
try:
|
||
if os.path.exists(abs_path):
|
||
# set_frontmatter hace merge y NO toca el body (prosa preservada).
|
||
update_obsidian_note(abs_path, set_frontmatter=frontmatter)
|
||
else:
|
||
create_obsidian_note(
|
||
cfg.vault_dir,
|
||
rel,
|
||
body="## Notas\n",
|
||
frontmatter=frontmatter,
|
||
)
|
||
except Exception as e: # noqa: BLE001
|
||
return {"status": "error", "error": str(e)}
|
||
|
||
return {"status": "ok", "slug": slug, "note_path": rel}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# contacts (DB -> Xandikos)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _resolve_password(cfg: Config) -> tuple:
|
||
"""Resuelve la password de Xandikos desde pass. Devuelve (pwd|None, error|None)."""
|
||
secret = pass_get_secret(cfg.pass_secret)
|
||
if secret.get("status") != "ok":
|
||
return None, (
|
||
f"pass no devolvió el secreto {cfg.pass_secret!r}: {secret.get('error')}"
|
||
)
|
||
return secret["value"], None
|
||
|
||
|
||
def _default_collection(cfg: Config) -> str:
|
||
return cfg.dav_contacts_collection
|
||
|
||
|
||
def _person_agenda_extras(db_path: str, note_path) -> tuple:
|
||
"""Devuelve (direcciones, aliases) de la persons enlazada por note_path.
|
||
|
||
El móvil solo recibe datos de agenda: si el contacto está enlazado a una
|
||
ficha de persons, sus direcciones y aliases se incluyen en el vCard (como
|
||
ADR y NICKNAME). Los campos OSINT (dni/sexo/fecha_nacimiento/pais/contexto)
|
||
NUNCA salen de la DB: no se leen aquí. Devuelve ([], []) si no hay enlace.
|
||
"""
|
||
if not note_path:
|
||
return [], []
|
||
res = duckdb_query_readonly(
|
||
db_path,
|
||
"SELECT direcciones, aliases FROM persons WHERE note_path = ?",
|
||
[note_path],
|
||
1,
|
||
)
|
||
if res.get("status") != "ok" or not res.get("rows"):
|
||
return [], []
|
||
row = res["rows"][0]
|
||
return (
|
||
_decode_json_field(row.get("direcciones")),
|
||
_decode_json_field(row.get("aliases")),
|
||
)
|
||
|
||
|
||
def _compose_agenda_vcard(cfg: Config, uid: str, fields: dict) -> str:
|
||
"""Compone el vCard de AGENDA de un contacto, SIN ningún campo OSINT.
|
||
|
||
Incluye FN, TEL×N, EMAIL×N y, si el contacto está enlazado a una ficha de
|
||
persons (por note_path), las direcciones (ADR×N) y aliases (NICKNAME) de esa
|
||
persona. NUNCA pasa el dict ``osint`` a build_vcard, así que el vCard jamás
|
||
lleva líneas X-OSINT-* (DNI/sexo/fecha-nac/país/contexto quedan solo en
|
||
DuckDB + Obsidian).
|
||
|
||
Args:
|
||
cfg: configuración del service (db_path para resolver la persona).
|
||
uid: UID del contacto.
|
||
fields: dict con fn/nombre, tels/telefonos, emails/correos,
|
||
direcciones/adrs y, opcionalmente, note_path (enlace a persons).
|
||
|
||
Returns:
|
||
Texto vCard 3.0 listo para PUT a Xandikos.
|
||
"""
|
||
# Contacto-empresa (uid 'org-<slug>'): el vCard lleva los teléfonos de las
|
||
# personas de contacto de la organización, cada uno etiquetado con el nombre
|
||
# de la persona (item.X-ABLabel). Lo gestiona _org_contact_vcard.
|
||
if str(uid).startswith("org-"):
|
||
return _org_contact_vcard(cfg, uid, fields.get("fn") or fields.get("nombre"))
|
||
|
||
tels = _as_list(fields.get("tels") or fields.get("telefonos"))
|
||
emails = _as_list(fields.get("emails") or fields.get("correos"))
|
||
fn = fields.get("fn") or fields.get("nombre")
|
||
|
||
# Direcciones y aliases del contacto explícitos en fields...
|
||
adrs = _as_list(fields.get("direcciones") or fields.get("adrs"))
|
||
aliases = _as_list(fields.get("aliases"))
|
||
# ...más los de la persons enlazada (si la hay), deduplicando.
|
||
note_path = fields.get("note_path")
|
||
person_adrs, person_aliases = _person_agenda_extras(cfg.db_path, note_path)
|
||
adrs = _dedup_keep_order(adrs + person_adrs)
|
||
aliases = _dedup_keep_order(aliases + person_aliases)
|
||
|
||
contact = {
|
||
"uid": uid,
|
||
"fn": fn,
|
||
"tels": tels,
|
||
"emails": emails,
|
||
"adrs": adrs,
|
||
"aliases": aliases,
|
||
}
|
||
# CLAVE DE PRIVACIDAD: no se pasa 'osint' -> no se emite ninguna X-OSINT-*.
|
||
return build_vcard(contact)
|
||
|
||
|
||
def _vc_escape(value) -> str:
|
||
"""Escapa un valor de texto para una línea vCard (sin \\r; \\n,;, escapados)."""
|
||
return (
|
||
str(value)
|
||
.replace("\\", "\\\\")
|
||
.replace("\r", "")
|
||
.replace("\n", "\\n")
|
||
.replace(",", "\\,")
|
||
.replace(";", "\\;")
|
||
)
|
||
|
||
|
||
def _org_contact_vcard(cfg: Config, uid: str, fn) -> str:
|
||
"""vCard de una EMPRESA con los teléfonos de sus personas de contacto.
|
||
|
||
Cada teléfono va en una propiedad agrupada ``item<N>.TEL`` etiquetada con
|
||
``item<N>.X-ABLabel`` = "<persona> (<rol>)", leídos de
|
||
``derived.org_contacts``. Así, al abrir la empresa en el móvil, se ven todos
|
||
los teléfonos identificados por la persona de contacto. Sin ningún campo OSINT
|
||
(mismo criterio de privacidad que _compose_agenda_vcard).
|
||
"""
|
||
slug = uid[len("org-"):] if str(uid).startswith("org-") else str(uid)
|
||
name = str(fn).strip() if fn else slug
|
||
res = duckdb_query_readonly(
|
||
cfg.db_path,
|
||
"SELECT persona, rol, telefono FROM derived.org_contacts "
|
||
"WHERE org_slug = ? ORDER BY persona, telefono",
|
||
[slug],
|
||
1000,
|
||
)
|
||
rows = res.get("rows") or []
|
||
lines = [
|
||
"BEGIN:VCARD",
|
||
"VERSION:3.0",
|
||
"UID:%s" % _vc_escape(uid),
|
||
"FN:%s" % _vc_escape(name),
|
||
"ORG:%s" % _vc_escape(name),
|
||
]
|
||
seen = set()
|
||
i = 0
|
||
for r in rows:
|
||
tel = str(r.get("telefono") or "").strip()
|
||
if not tel or tel in seen:
|
||
continue
|
||
seen.add(tel)
|
||
i += 1
|
||
persona = (r.get("persona") or "contacto").strip()
|
||
rol = (r.get("rol") or "").strip()
|
||
label = "%s (%s)" % (persona, rol) if rol and rol != "contacto" else persona
|
||
lines.append("item%d.TEL;TYPE=CELL:%s" % (i, _vc_escape(tel)))
|
||
lines.append("item%d.X-ABLabel:%s" % (i, _vc_escape(label)))
|
||
lines.append("END:VCARD")
|
||
return "\r\n".join(lines) + "\r\n"
|
||
|
||
|
||
def sync_org_contact_cards(cfg: Config) -> dict:
|
||
"""Crea/actualiza un contacto de agenda por organización con contactos.
|
||
|
||
Por cada organización con filas en ``derived.org_contacts`` inserta (o
|
||
refresca) una fila en ``contacts`` con uid ``org-<slug>`` y el nombre de la
|
||
empresa. El vCard real (con los teléfonos etiquetados por persona) lo compone
|
||
el push via ``_org_contact_vcard``. Idempotente y no destructivo. Devuelve el
|
||
número de tarjetas-empresa sincronizadas.
|
||
"""
|
||
orgs = duckdb_query_readonly(
|
||
cfg.db_path,
|
||
"SELECT DISTINCT d.org_slug, d.org_nombre FROM derived.org_contacts d "
|
||
"ORDER BY d.org_slug",
|
||
[],
|
||
100000,
|
||
)
|
||
rows = orgs.get("rows") or []
|
||
upserted = 0
|
||
for o in rows:
|
||
slug = o.get("org_slug")
|
||
nombre = o.get("org_nombre") or slug
|
||
uid = "org-%s" % slug
|
||
row = {
|
||
"uid": uid,
|
||
"collection": cfg.dav_contacts_collection,
|
||
"etag": None,
|
||
"fn": nombre,
|
||
"tels": "[]",
|
||
"emails": "[]",
|
||
"raw": "",
|
||
"note_path": None,
|
||
"updated_at": _now(),
|
||
"import_key": contact_import_key(nombre, [], []),
|
||
}
|
||
res = duckdb_upsert(
|
||
cfg.db_path,
|
||
"contacts",
|
||
[row],
|
||
key_cols=["uid"],
|
||
update_cols=["collection", "fn", "updated_at", "import_key"],
|
||
)
|
||
if res.get("status") == "ok":
|
||
upserted += 1
|
||
return {"status": "ok", "org_cards": upserted}
|
||
|
||
|
||
def _dedup_keep_order(items: list) -> list:
|
||
"""Deduplica una lista de strings preservando orden (case-insensitive)."""
|
||
seen, out = set(), []
|
||
for it in items:
|
||
s = str(it).strip()
|
||
key = s.lower()
|
||
if s and key not in seen:
|
||
seen.add(key)
|
||
out.append(s)
|
||
return out
|
||
|
||
|
||
def _resource_href_tail(uid: str) -> str:
|
||
"""Nombre del recurso .vcf que carddav_put_vcard deriva del UID."""
|
||
return _safe_resource(uid) + ".vcf"
|
||
|
||
|
||
def _read_etag_after_push(cfg: Config, pwd: str, collection: str, uid: str):
|
||
"""Lee el etag NUEVO del recurso .vcf de un contacto tras su PUT.
|
||
|
||
Lista la colección (PROPFIND Depth:1) y busca el recurso cuyo href termina
|
||
en el nombre .vcf del uid. Devuelve el etag o None si no se encuentra/falla.
|
||
Capturar el etag del push propio evita que el sync inverso lo confunda con
|
||
una edición hecha desde el móvil.
|
||
"""
|
||
listing = dav_list_resources(cfg.dav_base, cfg.dav_user, pwd, collection)
|
||
if listing.get("status") != "ok":
|
||
return None
|
||
tail = _resource_href_tail(uid)
|
||
for res in listing.get("resources", []):
|
||
href = res.get("href") or ""
|
||
if href.rstrip("/").rsplit("/", 1)[-1] == tail:
|
||
return res.get("etag")
|
||
return None
|
||
|
||
|
||
def upsert_contact(cfg: Config, uid: str, fields: dict) -> dict:
|
||
"""Crea/actualiza un contacto en la DB y lo empuja a Xandikos (PUT vCard).
|
||
|
||
La escritura DB se hace bajo el lock; el push DAV ocurre después.
|
||
"""
|
||
uid = (uid or "").strip()
|
||
if not uid:
|
||
return {"status": "error", "error": "uid vacío"}
|
||
if not _valid_key(uid):
|
||
return {"status": "error", "error": f"uid inválido: {uid!r}"}
|
||
|
||
tels = _as_list(fields.get("tels") or fields.get("telefonos"))
|
||
emails = _as_list(fields.get("emails") or fields.get("correos"))
|
||
fn = fields.get("fn") or fields.get("nombre")
|
||
collection = fields.get("collection") or _default_collection(cfg)
|
||
|
||
# Si el contacto ya existe y está enlazado a una ficha, hereda su note_path
|
||
# para que el vCard de agenda incluya las direcciones/aliases de la persona.
|
||
note_path = fields.get("note_path")
|
||
if note_path is None:
|
||
existing = duckdb_query_readonly(
|
||
cfg.db_path, "SELECT note_path FROM contacts WHERE uid = ?", [uid], 1
|
||
)
|
||
if existing.get("status") == "ok" and existing.get("rows"):
|
||
note_path = existing["rows"][0].get("note_path")
|
||
|
||
# vCard de AGENDA: nunca lleva X-OSINT-* (privacidad del móvil).
|
||
vcard = _compose_agenda_vcard(cfg, uid, {**fields, "note_path": note_path})
|
||
|
||
row = {
|
||
"uid": uid,
|
||
"collection": collection,
|
||
"etag": None,
|
||
"fn": fn,
|
||
"tels": _json(tels),
|
||
"emails": _json(emails),
|
||
"raw": vcard,
|
||
"note_path": note_path,
|
||
"updated_at": _now(),
|
||
}
|
||
res = duckdb_upsert(
|
||
cfg.db_path,
|
||
"contacts",
|
||
[row],
|
||
key_cols=["uid"],
|
||
update_cols=["collection", "fn", "tels", "emails", "raw", "note_path", "updated_at"],
|
||
)
|
||
if res.get("status") != "ok":
|
||
return {"status": "error", "error": res.get("error")}
|
||
|
||
# Push DB -> Xandikos fuera de cualquier transacción de la DB.
|
||
pwd, err = _resolve_password(cfg)
|
||
pushed = None
|
||
etag = None
|
||
if err is None:
|
||
push = carddav_put_vcard(
|
||
cfg.dav_base, cfg.dav_user, pwd, collection, uid, vcard
|
||
)
|
||
pushed = push.get("status") == "ok"
|
||
if pushed:
|
||
# Captura el etag NUEVO del recurso para que el sync inverso no
|
||
# confunda este push propio con una edición del móvil.
|
||
etag = _read_etag_after_push(cfg, pwd, collection, uid)
|
||
if etag:
|
||
duckdb_execute(
|
||
cfg.db_path,
|
||
"UPDATE contacts SET etag = ? WHERE uid = ?",
|
||
[etag, uid],
|
||
)
|
||
return {
|
||
"status": "ok",
|
||
"uid": uid,
|
||
"inserted": res.get("inserted", 0),
|
||
"updated": res.get("updated", 0),
|
||
"pushed": pushed,
|
||
"etag": etag,
|
||
"push_error": err,
|
||
}
|
||
|
||
|
||
def delete_contact(cfg: Config, uid: str) -> dict:
|
||
"""Borra un contacto de la DB y del servidor Xandikos (DELETE del recurso)."""
|
||
uid = (uid or "").strip()
|
||
if not uid:
|
||
return {"status": "error", "error": "uid vacío"}
|
||
|
||
person = duckdb_query_readonly(
|
||
cfg.db_path, "SELECT collection FROM contacts WHERE uid = ?", [uid], 1
|
||
)
|
||
collection = _default_collection(cfg)
|
||
if person.get("status") == "ok" and person.get("rows"):
|
||
collection = person["rows"][0].get("collection") or collection
|
||
|
||
res = duckdb_execute(cfg.db_path, "DELETE FROM contacts WHERE uid = ?", [uid])
|
||
if res.get("status") != "ok":
|
||
return {"status": "error", "error": res.get("error")}
|
||
|
||
# Borrado remoto del recurso .vcf (DESTRUCTIVO, explícito por el endpoint).
|
||
pwd, err = _resolve_password(cfg)
|
||
deleted_remote = None
|
||
if err is None:
|
||
resource = collection.rstrip("/") + "/" + _safe_resource(uid) + ".vcf"
|
||
rm = dav_delete_resource(cfg.dav_base, cfg.dav_user, pwd, resource)
|
||
deleted_remote = rm.get("status") == "ok"
|
||
return {
|
||
"status": "ok",
|
||
"uid": uid,
|
||
"deleted": res.get("rowcount", 0),
|
||
"deleted_remote": deleted_remote,
|
||
"push_error": err,
|
||
}
|
||
|
||
|
||
def _safe_resource(uid: str) -> str:
|
||
"""Sanea un UID al mismo nombre de recurso que carddav_put_vcard/caldav_put_event."""
|
||
import re
|
||
|
||
return re.sub(r"[^A-Za-z0-9_.-]", "_", uid)[:120]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# events (DB -> Xandikos)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _ical_escape(value) -> str:
|
||
"""Escapa un valor de texto para una propiedad iCalendar (RFC 5545).
|
||
|
||
Evita inyección de propiedades/componentes: un summary/location con saltos de
|
||
línea o `;`/`,` no puede cerrar el VEVENT ni abrir otro. El `\\r` se elimina
|
||
(el folding lo aporta el `\\r\\n` de la serialización).
|
||
"""
|
||
return (
|
||
str(value)
|
||
.replace("\\", "\\\\")
|
||
.replace("\r", "")
|
||
.replace("\n", "\\n")
|
||
.replace(",", "\\,")
|
||
.replace(";", "\\;")
|
||
)
|
||
|
||
|
||
def _ical_sanitize(value) -> str:
|
||
"""Quita saltos de línea de un valor estructurado (UID, RRULE) para evitar
|
||
que se inyecten propiedades nuevas. No escapa `;`/`,` porque son separadores
|
||
legítimos en RRULE."""
|
||
return str(value).replace("\r", "").replace("\n", "")
|
||
|
||
|
||
def _build_vcalendar(uid: str, fields: dict) -> str:
|
||
"""Compone un VCALENDAR mínimo con un VEVENT desde los campos del evento."""
|
||
dtstart = (fields.get("dtstart") or "").replace("-", "").replace(":", "")
|
||
dtend = (fields.get("dtend") or "").replace("-", "").replace(":", "")
|
||
lines = [
|
||
"BEGIN:VCALENDAR",
|
||
"VERSION:2.0",
|
||
"PRODID:-//osint_db//events//EN",
|
||
"BEGIN:VEVENT",
|
||
f"UID:{_ical_sanitize(uid)}",
|
||
f"SUMMARY:{_ical_escape(fields.get('summary') or '')}",
|
||
]
|
||
if dtstart:
|
||
lines.append(f"DTSTART:{_ical_sanitize(dtstart)}")
|
||
if dtend:
|
||
lines.append(f"DTEND:{_ical_sanitize(dtend)}")
|
||
if fields.get("location"):
|
||
lines.append(f"LOCATION:{_ical_escape(fields['location'])}")
|
||
if fields.get("rrule"):
|
||
lines.append(f"RRULE:{_ical_sanitize(fields['rrule'])}")
|
||
lines += ["END:VEVENT", "END:VCALENDAR"]
|
||
return "\r\n".join(lines) + "\r\n"
|
||
|
||
|
||
def upsert_event(cfg: Config, uid: str, fields: dict) -> dict:
|
||
"""Crea/actualiza un evento en la DB y lo empuja a Xandikos (PUT VCALENDAR)."""
|
||
uid = (uid or "").strip()
|
||
if not uid:
|
||
return {"status": "error", "error": "uid vacío"}
|
||
if not _valid_key(uid):
|
||
return {"status": "error", "error": f"uid inválido: {uid!r}"}
|
||
|
||
calendar = fields.get("calendar") or "default"
|
||
raw = _build_vcalendar(uid, fields)
|
||
row = {
|
||
"uid": uid,
|
||
"calendar": calendar,
|
||
"etag": None,
|
||
"dtstart": fields.get("dtstart"),
|
||
"dtend": fields.get("dtend"),
|
||
"all_day": bool(fields.get("all_day")),
|
||
"summary": fields.get("summary"),
|
||
"location": fields.get("location"),
|
||
"rrule": fields.get("rrule"),
|
||
"raw": raw,
|
||
"updated_at": _now(),
|
||
}
|
||
res = duckdb_upsert(
|
||
cfg.db_path,
|
||
"events",
|
||
[row],
|
||
key_cols=["uid"],
|
||
update_cols=[
|
||
"calendar",
|
||
"dtstart",
|
||
"dtend",
|
||
"all_day",
|
||
"summary",
|
||
"location",
|
||
"rrule",
|
||
"raw",
|
||
"updated_at",
|
||
],
|
||
)
|
||
if res.get("status") != "ok":
|
||
return {"status": "error", "error": res.get("error")}
|
||
|
||
# El calendario CalDAV destino se resuelve por su path; usamos el calendar
|
||
# home + slug del calendario. Push fuera de transacción.
|
||
pwd, err = _resolve_password(cfg)
|
||
pushed = None
|
||
if err is None:
|
||
collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/"
|
||
push = caldav_put_event(
|
||
cfg.dav_base, cfg.dav_user, pwd, collection, uid, raw
|
||
)
|
||
pushed = push.get("status") == "ok"
|
||
return {
|
||
"status": "ok",
|
||
"uid": uid,
|
||
"inserted": res.get("inserted", 0),
|
||
"updated": res.get("updated", 0),
|
||
"pushed": pushed,
|
||
"push_error": err,
|
||
}
|
||
|
||
|
||
def delete_event(cfg: Config, uid: str) -> dict:
|
||
"""Borra un evento de la DB y del servidor Xandikos."""
|
||
uid = (uid or "").strip()
|
||
if not uid:
|
||
return {"status": "error", "error": "uid vacío"}
|
||
|
||
row = duckdb_query_readonly(
|
||
cfg.db_path, "SELECT calendar FROM events WHERE uid = ?", [uid], 1
|
||
)
|
||
calendar = "default"
|
||
if row.get("status") == "ok" and row.get("rows"):
|
||
calendar = row["rows"][0].get("calendar") or calendar
|
||
|
||
res = duckdb_execute(cfg.db_path, "DELETE FROM events WHERE uid = ?", [uid])
|
||
if res.get("status") != "ok":
|
||
return {"status": "error", "error": res.get("error")}
|
||
|
||
pwd, err = _resolve_password(cfg)
|
||
deleted_remote = None
|
||
if err is None:
|
||
collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/"
|
||
resource = collection + _safe_resource(uid) + ".ics"
|
||
rm = dav_delete_resource(cfg.dav_base, cfg.dav_user, pwd, resource)
|
||
deleted_remote = rm.get("status") == "ok"
|
||
return {
|
||
"status": "ok",
|
||
"uid": uid,
|
||
"deleted": res.get("rowcount", 0),
|
||
"deleted_remote": deleted_remote,
|
||
"push_error": err,
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# addressbooks / calendars
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def make_addressbook(cfg: Config, fields: dict) -> dict:
|
||
"""Crea una libreta CardDAV en Xandikos y la registra en la tabla addressbooks."""
|
||
slug = (fields.get("slug") or "").strip()
|
||
if not slug:
|
||
return {"status": "error", "error": "slug vacío"}
|
||
if not _valid_key(slug):
|
||
return {"status": "error", "error": f"slug inválido: {slug!r}"}
|
||
display_name = fields.get("display_name") or ""
|
||
description = fields.get("description") or ""
|
||
color = fields.get("color") or ""
|
||
|
||
pwd, err = _resolve_password(cfg)
|
||
if err is not None:
|
||
return {"status": "error", "error": err}
|
||
|
||
# contacts_home = el directorio padre de la colección por defecto
|
||
# (/enmanuel/contacts/addressbook/ -> /enmanuel/contacts/).
|
||
contacts_home = "/" + "/".join(
|
||
cfg.dav_contacts_collection.strip("/").split("/")[:-1]
|
||
)
|
||
if not contacts_home.endswith("/"):
|
||
contacts_home += "/"
|
||
|
||
mk = dav_make_addressbook(
|
||
cfg.dav_base,
|
||
cfg.dav_user,
|
||
pwd,
|
||
contacts_home,
|
||
slug,
|
||
display_name,
|
||
description,
|
||
)
|
||
if mk.get("status") != "ok":
|
||
return {"status": "error", "error": mk.get("error"), "http_status": mk.get("http_status")}
|
||
|
||
collection_path = mk.get("href") or (contacts_home + slug + "/")
|
||
row = {
|
||
"slug": slug,
|
||
"display_name": display_name or slug,
|
||
"collection_path": collection_path,
|
||
"description": description or None,
|
||
"color": color or None,
|
||
"created_at": _now(),
|
||
}
|
||
res = duckdb_upsert(
|
||
cfg.db_path,
|
||
"addressbooks",
|
||
[row],
|
||
key_cols=["slug"],
|
||
update_cols=["display_name", "collection_path", "description", "color"],
|
||
)
|
||
if res.get("status") != "ok":
|
||
return {"status": "error", "error": res.get("error")}
|
||
return {
|
||
"status": "ok",
|
||
"slug": slug,
|
||
"collection_path": collection_path,
|
||
"existed": mk.get("existed", False),
|
||
}
|
||
|
||
|
||
def make_calendar(cfg: Config, fields: dict) -> dict:
|
||
"""Crea un calendario CalDAV en Xandikos (paridad con make_addressbook)."""
|
||
slug = (fields.get("slug") or "").strip()
|
||
if not slug:
|
||
return {"status": "error", "error": "slug vacío"}
|
||
if not _valid_key(slug):
|
||
return {"status": "error", "error": f"slug inválido: {slug!r}"}
|
||
display_name = fields.get("display_name") or ""
|
||
color = fields.get("color") or ""
|
||
|
||
pwd, err = _resolve_password(cfg)
|
||
if err is not None:
|
||
return {"status": "error", "error": err}
|
||
|
||
mk = dav_make_calendar(
|
||
cfg.dav_base,
|
||
cfg.dav_user,
|
||
pwd,
|
||
cfg.dav_calendar_home,
|
||
slug,
|
||
display_name,
|
||
color,
|
||
)
|
||
if mk.get("status") != "ok":
|
||
return {"status": "error", "error": mk.get("error"), "http_status": mk.get("http_status")}
|
||
return {
|
||
"status": "ok",
|
||
"slug": slug,
|
||
"href": mk.get("href"),
|
||
"existed": mk.get("existed", False),
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# push masivo DB -> Xandikos
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def push_all_dav(cfg: Config) -> dict:
|
||
"""Reconcilia en bloque: empuja todos los contacts y events de la DB a Xandikos.
|
||
|
||
Útil tras una migración para volcar lo que vive solo en la DB. Devuelve los
|
||
conteos de éxito/fallo por tipo. NO borra nada en remoto (solo PUT).
|
||
"""
|
||
pwd, err = _resolve_password(cfg)
|
||
if err is not None:
|
||
return {"status": "error", "error": err}
|
||
|
||
contacts = duckdb_query_readonly(
|
||
cfg.db_path,
|
||
"SELECT uid, collection, fn, tels, emails, note_path FROM contacts",
|
||
[],
|
||
1000000,
|
||
)
|
||
c_ok = c_fail = 0
|
||
if contacts.get("status") == "ok":
|
||
for row in contacts["rows"]:
|
||
uid = row["uid"]
|
||
collection = row.get("collection") or _default_collection(cfg)
|
||
# vCard de AGENDA: compone con la persona enlazada (direcciones +
|
||
# aliases) pero SIN ningún campo OSINT (privacidad del móvil).
|
||
vcard = _compose_agenda_vcard(
|
||
cfg,
|
||
uid,
|
||
{
|
||
"fn": row.get("fn"),
|
||
"tels": _decode_json_field(row.get("tels")),
|
||
"emails": _decode_json_field(row.get("emails")),
|
||
"note_path": row.get("note_path"),
|
||
},
|
||
)
|
||
push = carddav_put_vcard(
|
||
cfg.dav_base, cfg.dav_user, pwd, collection, uid, vcard
|
||
)
|
||
if push.get("status") == "ok":
|
||
c_ok += 1
|
||
# Captura el etag nuevo del push (sync inverso fiable).
|
||
etag = _read_etag_after_push(cfg, pwd, collection, uid)
|
||
if etag:
|
||
duckdb_execute(
|
||
cfg.db_path,
|
||
"UPDATE contacts SET etag = ? WHERE uid = ?",
|
||
[etag, uid],
|
||
)
|
||
else:
|
||
c_fail += 1
|
||
|
||
events = duckdb_query_readonly(
|
||
cfg.db_path, "SELECT uid, calendar, raw FROM events", [], 1000000
|
||
)
|
||
e_ok = e_fail = 0
|
||
if events.get("status") == "ok":
|
||
for row in events["rows"]:
|
||
uid = row["uid"]
|
||
calendar = row.get("calendar") or "default"
|
||
collection = cfg.dav_calendar_home.rstrip("/") + "/" + calendar + "/"
|
||
raw = _ensure_vcalendar(row.get("raw")) or _build_vcalendar(uid, {})
|
||
push = caldav_put_event(
|
||
cfg.dav_base, cfg.dav_user, pwd, collection, uid, raw
|
||
)
|
||
if push.get("status") == "ok":
|
||
e_ok += 1
|
||
else:
|
||
e_fail += 1
|
||
|
||
return {
|
||
"status": "ok",
|
||
"contacts_pushed": c_ok,
|
||
"contacts_failed": c_fail,
|
||
"events_pushed": e_ok,
|
||
"events_failed": e_fail,
|
||
}
|
||
|
||
|
||
def _ensure_vcalendar(raw) -> str:
|
||
"""Garantiza que un recurso de evento tenga el envoltorio VCALENDAR.
|
||
|
||
El ``raw`` de un evento a veces guarda SOLO el bloque ``BEGIN:VEVENT ...
|
||
END:VEVENT`` (así lo extrae el parser del ingest DAV). Subir eso a CalDAV
|
||
produce un recurso ``.ics`` inválido: Xandikos falla al pedir la propiedad
|
||
``schedule-tag`` (``assert isinstance(cal, Calendar)``) y devuelve 500 para
|
||
todo el calendario. Esta función envuelve el VEVENT en un VCALENDAR mínimo
|
||
cuando falta, normalizando a CRLF; si el raw ya es un VCALENDAR lo deja igual.
|
||
Devuelve cadena vacía si no hay contenido (el llamador cae a _build_vcalendar).
|
||
"""
|
||
text = (raw or "").strip()
|
||
if not text:
|
||
return ""
|
||
if "BEGIN:VCALENDAR" in text.upper():
|
||
return raw if raw.endswith("\r\n") else raw + "\r\n"
|
||
text = text.replace("\r\n", "\n").replace("\r", "\n")
|
||
body = "\r\n".join(text.split("\n"))
|
||
return (
|
||
"BEGIN:VCALENDAR\r\nVERSION:2.0\r\nPRODID:-//osint_db//events//EN\r\n"
|
||
+ body
|
||
+ "\r\nEND:VCALENDAR\r\n"
|
||
)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# push masivo POR DISCO (vía rápida: 1 rsync + 1 commit + 1 PROPFIND)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _write_agenda_vcards_to_dir(cfg: Config, rows: list, out_dir: str) -> dict:
|
||
"""Genera el .vcf de agenda (SIN OSINT) de cada contacto en ``out_dir``.
|
||
|
||
Para cada fila de ``contacts`` (uid, fn, tels, emails, note_path) compone el
|
||
vCard con ``_compose_agenda_vcard`` y lo escribe a
|
||
``out_dir/<_safe_resource(uid)>.vcf`` — EXACTAMENTE el mismo nombre de
|
||
recurso que usa el push HTTP (``carddav_put_vcard``), para que rsync no cree
|
||
duplicados ni huérfanos respecto a la colección remota.
|
||
|
||
Es la parte testeable sin red/SSH del push masivo por disco: genera los
|
||
ficheros locales y devuelve el mapa nombre_recurso -> uid (necesario luego
|
||
para casar los etags del PROPFIND con sus uids).
|
||
|
||
Args:
|
||
cfg: configuración del service (db_path para resolver la persona enlazada).
|
||
rows: filas de contacts como dicts con uid, fn, tels, emails, note_path.
|
||
out_dir: directorio temporal local donde escribir los .vcf.
|
||
|
||
Returns:
|
||
{"written": N, "by_resource": {"<safe>.vcf": uid, ...}}.
|
||
"""
|
||
by_resource: dict = {}
|
||
written = 0
|
||
for row in rows:
|
||
uid = row["uid"]
|
||
vcard = _compose_agenda_vcard(
|
||
cfg,
|
||
uid,
|
||
{
|
||
"fn": row.get("fn"),
|
||
"tels": _decode_json_field(row.get("tels")),
|
||
"emails": _decode_json_field(row.get("emails")),
|
||
"note_path": row.get("note_path"),
|
||
},
|
||
)
|
||
resource = _resource_href_tail(uid) # _safe_resource(uid) + ".vcf"
|
||
# En el caso (raro) de que dos uids saneen al mismo recurso, gana el
|
||
# último, igual que haría el push HTTP secuencial.
|
||
by_resource[resource] = uid
|
||
with open(os.path.join(out_dir, resource), "w", encoding="utf-8") as fh:
|
||
fh.write(vcard)
|
||
written += 1
|
||
return {"written": written, "by_resource": by_resource}
|
||
|
||
|
||
def _rsync_vcards(local_dir: str, ssh_host: str, remote_dir: str) -> dict:
|
||
"""rsync de los .vcf del directorio local al working tree remoto de Xandikos.
|
||
|
||
Sincroniza SOLO los ``*.vcf`` (``--include='*.vcf' --exclude='*'``), de modo
|
||
que ningún otro fichero del working tree remoto se toca: ni ``.git/`` (el
|
||
historial), ni ``.xandikos`` (metadata del tipo de colección), ni
|
||
``push-subscriptions.json`` (suscripciones WebDAV-Push), ni un ``.gitignore``.
|
||
``--delete`` borra del remoto los .vcf que ya no están en local — como local
|
||
contiene TODOS los contactos de la DB, esto deja la colección de .vcf
|
||
EXACTAMENTE igual a la DB (limpia recursos .vcf huérfanos) sin afectar a los
|
||
ficheros no-.vcf, que quedan protegidos por estar excluidos.
|
||
|
||
Returns:
|
||
{"status":"ok", "stdout":..., "stderr":...} o {"status":"error", "error":...}.
|
||
"""
|
||
src = local_dir.rstrip("/") + "/"
|
||
dst = f"{ssh_host}:{remote_dir}"
|
||
cmd = [
|
||
"rsync",
|
||
"-az",
|
||
"--delete",
|
||
"--include=*.vcf",
|
||
"--exclude=*",
|
||
src,
|
||
dst,
|
||
]
|
||
try:
|
||
proc = subprocess.run(
|
||
cmd, capture_output=True, text=True, timeout=300, check=False
|
||
)
|
||
except (OSError, subprocess.TimeoutExpired) as e:
|
||
return {"status": "error", "error": f"rsync falló: {e}"}
|
||
if proc.returncode != 0:
|
||
return {
|
||
"status": "error",
|
||
"error": f"rsync rc={proc.returncode}: {proc.stderr.strip()}",
|
||
}
|
||
return {"status": "ok", "stdout": proc.stdout, "stderr": proc.stderr}
|
||
|
||
|
||
def _git_commit_remote(ssh_host: str, remote_dir: str) -> dict:
|
||
"""UN solo commit en el working tree remoto (lo que Xandikos sirve).
|
||
|
||
Hace ``git add -A`` (recoge altas, bajas y modificaciones de .vcf que dejó el
|
||
rsync) y un único commit con identidad fija ``osint_db``. El ``|| true``
|
||
evita fallar cuando no hay cambios (commit vacío). Captura el HEAD antes y
|
||
después para confirmar que SOLO se añadió un commit (o ninguno).
|
||
|
||
Returns:
|
||
{"status":"ok", "head_before":..., "head_after":..., "committed":bool} o
|
||
{"status":"error", "error":...}.
|
||
"""
|
||
# rev-parse HEAD antes.
|
||
head_before = _ssh_capture(ssh_host, f"cd {remote_dir} && git rev-parse HEAD")
|
||
if head_before.get("status") != "ok":
|
||
return {"status": "error", "error": f"rev-parse(before): {head_before.get('error')}"}
|
||
|
||
script = (
|
||
f"cd {remote_dir} && git add -A && "
|
||
"git -c user.email=osint_db -c user.name=osint_db "
|
||
"commit -m 'bulk push from DuckDB' || true"
|
||
)
|
||
commit = _ssh_capture(ssh_host, script)
|
||
if commit.get("status") != "ok":
|
||
return {"status": "error", "error": f"git commit: {commit.get('error')}"}
|
||
|
||
head_after = _ssh_capture(ssh_host, f"cd {remote_dir} && git rev-parse HEAD")
|
||
if head_after.get("status") != "ok":
|
||
return {"status": "error", "error": f"rev-parse(after): {head_after.get('error')}"}
|
||
|
||
before = (head_before.get("stdout") or "").strip()
|
||
after = (head_after.get("stdout") or "").strip()
|
||
return {
|
||
"status": "ok",
|
||
"head_before": before,
|
||
"head_after": after,
|
||
"committed": before != after,
|
||
}
|
||
|
||
|
||
def _ssh_capture(ssh_host: str, remote_cmd: str) -> dict:
|
||
"""Ejecuta un comando en el host remoto vía ssh y captura stdout/stderr.
|
||
|
||
Usa ``BatchMode=yes`` para fallar rápido si no hay auth por clave (nunca pide
|
||
contraseña interactiva, que colgaría el service). NO interpola secretos en el
|
||
comando — solo paths y verbos git fijos.
|
||
"""
|
||
cmd = [
|
||
"ssh",
|
||
"-o",
|
||
"BatchMode=yes",
|
||
"-o",
|
||
"ConnectTimeout=10",
|
||
ssh_host,
|
||
remote_cmd,
|
||
]
|
||
try:
|
||
proc = subprocess.run(
|
||
cmd, capture_output=True, text=True, timeout=60, check=False
|
||
)
|
||
except (OSError, subprocess.TimeoutExpired) as e:
|
||
return {"status": "error", "error": f"ssh falló: {e}"}
|
||
if proc.returncode != 0:
|
||
return {
|
||
"status": "error",
|
||
"error": f"ssh rc={proc.returncode}: {proc.stderr.strip()}",
|
||
}
|
||
return {"status": "ok", "stdout": proc.stdout, "stderr": proc.stderr}
|
||
|
||
|
||
def _capture_etags_after_bulk(
|
||
cfg: Config, pwd: str, collection: str, by_resource: dict
|
||
) -> dict:
|
||
"""Lee en UN PROPFIND los etags de la colección y los persiste por uid.
|
||
|
||
Tras el commit remoto, lista la colección entera (PROPFIND Depth:1 -> [{href,
|
||
etag}]) y casa cada recurso con su uid usando ``by_resource`` (nombre .vcf ->
|
||
uid construido al generar los ficheros). Hace un UPDATE de ``contacts.etag``
|
||
por uid encontrado, dejando los etags guardados == los del servidor, para que
|
||
el próximo ``/api/sync/dav-pull`` no lo confunda con una edición del móvil.
|
||
|
||
Returns:
|
||
{"status":"ok", "updated":N} o {"status":"error", "error":...}.
|
||
"""
|
||
listing = dav_list_resources(cfg.dav_base, cfg.dav_user, pwd, collection)
|
||
if listing.get("status") != "ok":
|
||
return {
|
||
"status": "error",
|
||
"error": f"PROPFIND {collection}: {listing.get('error')} "
|
||
f"(http {listing.get('http_status')})",
|
||
}
|
||
updated = 0
|
||
for res in listing.get("resources", []):
|
||
href = res.get("href") or ""
|
||
etag = res.get("etag")
|
||
tail = href.rstrip("/").rsplit("/", 1)[-1]
|
||
uid = by_resource.get(tail)
|
||
if uid is None or not etag:
|
||
continue
|
||
up = duckdb_execute(
|
||
cfg.db_path, "UPDATE contacts SET etag = ? WHERE uid = ?", [etag, uid]
|
||
)
|
||
if up.get("status") == "ok":
|
||
updated += up.get("rowcount", 0) or 0
|
||
return {"status": "ok", "updated": updated}
|
||
|
||
|
||
def push_all_dav_bulk(cfg: Config) -> dict:
|
||
"""Push masivo DB -> Xandikos por DISCO: 1 rsync + 1 commit + 1 PROPFIND.
|
||
|
||
Vía RÁPIDA equivalente a ``push_all_dav`` para los CONTACTOS, pensada para
|
||
reconciliar las ~1000 fichas de la DB en segundos en vez de minutos. Evita
|
||
los tres cuellos del push HTTP secuencial:
|
||
|
||
- 1 PUT HTTP por contacto -> 1 rsync de todos los .vcf de golpe.
|
||
- 1 commit git por PUT -> 1 solo commit en el working tree remoto.
|
||
- 1 PROPFIND por contacto -> 1 PROPFIND de toda la colección al final.
|
||
|
||
Flujo:
|
||
1. Lee TODOS los contacts de la DB (uid, collection, fn, tels, emails,
|
||
note_path) de la colección CardDAV por defecto.
|
||
2. Genera el vCard de AGENDA (SIN OSINT) de cada uno en un tmpdir local,
|
||
con el nombre de recurso EXACTO del push HTTP (``_safe_resource``).
|
||
3. rsync ``--delete`` ese tmpdir al working tree remoto, sincronizando SOLO
|
||
los .vcf (protege .git/.xandikos/push-subscriptions.json).
|
||
4. UN solo commit en el remoto -> Xandikos recalcula el ctag (DAVx5 detecta).
|
||
5. UN PROPFIND de la colección -> {uid: etag} -> UPDATE de ``contacts.etag``.
|
||
|
||
Solo cubre la colección CardDAV por defecto (``cfg.dav_contacts_collection``),
|
||
donde viven todos los contactos del ecosistema OSINT hoy. Los eventos CalDAV
|
||
siguen yendo por el push HTTP (``push_all_dav``).
|
||
|
||
Requisitos: SSH por clave al host de Xandikos (``cfg.dav_bulk_ssh_host``) con
|
||
acceso de escritura al working tree (``cfg.dav_bulk_remote_dir``), y ``rsync``
|
||
instalado en ambos lados. Si no hay SSH, usar ``push_all_dav`` (HTTP) como
|
||
fallback.
|
||
|
||
Returns:
|
||
{"status":"ok", "written":N, "rsynced":bool, "committed":bool,
|
||
"etags_updated":N, "head_before":..., "head_after":..., "elapsed_s":F} o
|
||
{"status":"error", "error":...}.
|
||
"""
|
||
started = time.monotonic()
|
||
collection = _default_collection(cfg)
|
||
|
||
contacts = duckdb_query_readonly(
|
||
cfg.db_path,
|
||
"SELECT uid, collection, fn, tels, emails, note_path FROM contacts "
|
||
"WHERE collection = ?",
|
||
[collection],
|
||
1000000,
|
||
)
|
||
if contacts.get("status") != "ok":
|
||
return {"status": "error", "error": f"lectura contacts: {contacts.get('error')}"}
|
||
rows = contacts.get("rows", [])
|
||
|
||
# 1+2. Genera los .vcf de agenda en un tmpdir local (parte sin red).
|
||
tmp_dir = tempfile.mkdtemp(prefix="osint_db_bulk_")
|
||
try:
|
||
gen = _write_agenda_vcards_to_dir(cfg, rows, tmp_dir)
|
||
by_resource = gen["by_resource"]
|
||
written = gen["written"]
|
||
|
||
# 3. rsync de todos los .vcf al working tree remoto (solo *.vcf).
|
||
rs = _rsync_vcards(tmp_dir, cfg.dav_bulk_ssh_host, cfg.dav_bulk_remote_dir)
|
||
if rs.get("status") != "ok":
|
||
return {"status": "error", "error": rs.get("error"), "written": written}
|
||
|
||
# 4. UN solo commit en el remoto (Xandikos recalcula el ctag).
|
||
commit = _git_commit_remote(cfg.dav_bulk_ssh_host, cfg.dav_bulk_remote_dir)
|
||
if commit.get("status") != "ok":
|
||
return {"status": "error", "error": commit.get("error"), "written": written}
|
||
finally:
|
||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||
|
||
# 5. UN PROPFIND -> {uid: etag} -> UPDATE contacts.etag (sync inverso fiable).
|
||
pwd, err = _resolve_password(cfg)
|
||
etags_updated = 0
|
||
etag_error = None
|
||
if err is None:
|
||
cap = _capture_etags_after_bulk(cfg, pwd, collection, by_resource)
|
||
if cap.get("status") == "ok":
|
||
etags_updated = cap.get("updated", 0)
|
||
else:
|
||
etag_error = cap.get("error")
|
||
else:
|
||
etag_error = err
|
||
|
||
return {
|
||
"status": "ok",
|
||
"written": written,
|
||
"rsynced": True,
|
||
"committed": commit.get("committed", False),
|
||
"etags_updated": etags_updated,
|
||
"etag_error": etag_error,
|
||
"head_before": commit.get("head_before"),
|
||
"head_after": commit.get("head_after"),
|
||
"elapsed_s": round(time.monotonic() - started, 3),
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# sync inverso Xandikos -> DB (pull incremental por etag)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def pull_dav(cfg: Config) -> dict:
|
||
"""Trae a la DB las ediciones del móvil/DAVx5, last-write-wins por etag.
|
||
|
||
A diferencia de ``ingest_dav`` (que hace DELETE + INSERT ciego de TODAS las
|
||
colecciones), este pull es INCREMENTAL y respeta la verdad de la DB salvo
|
||
donde el etag prueba un cambio externo:
|
||
|
||
1. Para cada colección registrada en ``addressbooks`` lista los recursos
|
||
(PROPFIND Depth:1 -> [{href, etag}]).
|
||
2. Por recurso: si su etag difiere del ``contacts.etag`` guardado (o el uid
|
||
no existe en la DB) -> GET + parse + upsert con el etag nuevo. Si el
|
||
etag coincide -> no se toca (la DB ya está al día).
|
||
3. Los uids que la DB tenía en esa colección y que YA no aparecen en el
|
||
PROPFIND se borran (el móvil los eliminó).
|
||
4. Tras el pull se re-enlazan los contactos con sus fichas y se
|
||
reconstruyen las derivadas (bajo el lock single-writer).
|
||
|
||
Devuelve {status:'ok', pulled, updated, deleted, unchanged} o
|
||
{status:'error', error}.
|
||
"""
|
||
# Late imports: evitan ciclo writes<->ingest a nivel de módulo y reusan la
|
||
# lógica de colecciones + enlace + derivadas ya existente (registry-first).
|
||
from . import davparse
|
||
from .db import write_conn
|
||
from .derived import rebuild_derived
|
||
from .ingest import _addressbook_collections, _link_contacts
|
||
|
||
pwd, err = _resolve_password(cfg)
|
||
if err is not None:
|
||
return {"status": "error", "error": err}
|
||
|
||
collections = _addressbook_collections(cfg)
|
||
|
||
pulled = updated = deleted = unchanged = 0
|
||
for collection in collections:
|
||
listing = dav_list_resources(cfg.dav_base, cfg.dav_user, pwd, collection)
|
||
if listing.get("status") != "ok":
|
||
return {
|
||
"status": "error",
|
||
"error": f"PROPFIND {collection}: {listing.get('error')} "
|
||
f"(http {listing.get('http_status')})",
|
||
}
|
||
|
||
# Estado actual de la DB para esta colección: uid -> etag.
|
||
db_state = duckdb_query_readonly(
|
||
cfg.db_path,
|
||
"SELECT uid, etag FROM contacts WHERE collection = ?",
|
||
[collection],
|
||
1000000,
|
||
)
|
||
db_etags: dict = {}
|
||
if db_state.get("status") == "ok":
|
||
db_etags = {r["uid"]: r.get("etag") for r in db_state["rows"]}
|
||
|
||
seen_uids: set = set()
|
||
for res in listing.get("resources", []):
|
||
href = res.get("href") or ""
|
||
remote_etag = res.get("etag")
|
||
# GET solo cuando el etag cambió o el recurso es nuevo en la DB.
|
||
# uid provisional desde el nombre del recurso para el cruce rápido;
|
||
# el uid real se toma del vCard tras el GET.
|
||
res_name = href.rstrip("/").rsplit("/", 1)[-1]
|
||
guess_uid = os.path.splitext(res_name)[0]
|
||
|
||
# Si ya conocemos este uid (por nombre) y el etag coincide -> skip.
|
||
if guess_uid in db_etags and db_etags[guess_uid] == remote_etag and remote_etag:
|
||
seen_uids.add(guess_uid)
|
||
unchanged += 1
|
||
continue
|
||
|
||
got = dav_get_resource(cfg.dav_base, cfg.dav_user, pwd, href)
|
||
if got.get("status") != "ok":
|
||
# Un recurso ilegible no aborta el pull entero.
|
||
continue
|
||
parsed = davparse.parse_vcard(got.get("text", ""))
|
||
uid = parsed["uid"] or guess_uid
|
||
seen_uids.add(uid)
|
||
|
||
existed = uid in db_etags
|
||
# Confirmación con el uid real: si el etag ya casa, no es cambio.
|
||
if existed and db_etags[uid] == remote_etag and remote_etag:
|
||
unchanged += 1
|
||
continue
|
||
|
||
row = {
|
||
"uid": uid,
|
||
"collection": collection,
|
||
"etag": remote_etag,
|
||
"fn": parsed["fn"] or None,
|
||
"tels": _json(parsed["tels"]),
|
||
"emails": _json(parsed["emails"]),
|
||
"raw": got.get("text", ""),
|
||
# note_path se re-enlaza después; preserva el existente si lo había.
|
||
"note_path": None,
|
||
"updated_at": _now(),
|
||
}
|
||
up = duckdb_upsert(
|
||
cfg.db_path,
|
||
"contacts",
|
||
[row],
|
||
key_cols=["uid"],
|
||
update_cols=["collection", "etag", "fn", "tels", "emails", "raw", "updated_at"],
|
||
)
|
||
if up.get("status") != "ok":
|
||
return {"status": "error", "error": up.get("error")}
|
||
pulled += 1
|
||
if existed:
|
||
updated += 1
|
||
|
||
# Borra los uids que la DB tenía en esta colección y ya no están remotos.
|
||
for uid in db_etags:
|
||
if uid not in seen_uids:
|
||
rm = duckdb_execute(
|
||
cfg.db_path, "DELETE FROM contacts WHERE uid = ?", [uid]
|
||
)
|
||
if rm.get("status") == "ok":
|
||
deleted += rm.get("rowcount", 0)
|
||
|
||
# Re-enlace de contactos + reconstrucción de derivadas, bajo el lock.
|
||
with write_conn(cfg.db_path) as conn:
|
||
conn.execute("BEGIN")
|
||
try:
|
||
_link_contacts(conn)
|
||
conn.execute("COMMIT")
|
||
except Exception:
|
||
conn.execute("ROLLBACK")
|
||
raise
|
||
rebuild_derived(conn)
|
||
|
||
return {
|
||
"status": "ok",
|
||
"pulled": pulled,
|
||
"updated": updated,
|
||
"deleted": deleted,
|
||
"unchanged": unchanged,
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Materialización de los contactos de una organización en su ficha .md
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# network_scans (escaneos de red registrados por otras herramientas)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _scan_ts_compact(scan_ts: str) -> tuple:
|
||
"""Deriva (datetime, 'YYYYMMDD-HHMM') de un timestamp ISO del escaneo.
|
||
|
||
Acepta ISO 8601 con o sin 'Z'/offset. Devuelve (dt, compact) o lanza
|
||
ValueError si el string no es parseable (el caller lo convierte en error).
|
||
"""
|
||
raw = (scan_ts or "").strip()
|
||
if not raw:
|
||
raise ValueError("scan_ts vacío")
|
||
# datetime.fromisoformat no acepta el sufijo 'Z' (Python < 3.11), lo
|
||
# normalizamos a +00:00 para que parsee el caso UTC más común.
|
||
iso = raw[:-1] + "+00:00" if raw.endswith("Z") else raw
|
||
dt = datetime.fromisoformat(iso)
|
||
return dt, dt.strftime("%Y%m%d-%H%M")
|
||
|
||
|
||
def record_scan(cfg: Config, fields: dict) -> dict:
|
||
"""Registra un escaneo de red en network_scans (idempotente por id).
|
||
|
||
El id se deriva de ``<target_slug>:<scan_type>:<ts_compact>`` donde
|
||
ts_compact = YYYYMMDD-HHMM de ``scan_ts``. Dos escaneos del mismo target,
|
||
tipo y minuto colapsan al mismo id: el upsert (ON CONFLICT DO UPDATE) los
|
||
hace idempotentes en vez de duplicar. La escritura va bajo el lock
|
||
single-writer del service vía ``duckdb_upsert`` (misma conexión que el resto
|
||
de endpoints de escritura; DuckDB comparte la instancia de la base dentro del
|
||
proceso, así que no rompe el lock).
|
||
|
||
``summary`` es un dict que se serializa a JSON (la columna es de tipo JSON).
|
||
No se pushea nada a la red: a diferencia de contacts/events, los escaneos no
|
||
salen del service.
|
||
|
||
Args:
|
||
cfg: configuración del service (db_path).
|
||
fields: dict con target, target_slug, scan_type, note_path (requeridos),
|
||
tool, summary, scan_ts (opcionales/derivables).
|
||
|
||
Returns:
|
||
{"status":"ok","id":<id>, "inserted":N, "updated":N} o
|
||
{"status":"error","error":...}.
|
||
"""
|
||
target = (fields.get("target") or "").strip()
|
||
target_slug = (fields.get("target_slug") or "").strip()
|
||
scan_type = (fields.get("scan_type") or "").strip()
|
||
note_path = (fields.get("note_path") or "").strip()
|
||
if not target:
|
||
return {"status": "error", "error": "falta 'target'"}
|
||
if not target_slug:
|
||
return {"status": "error", "error": "falta 'target_slug'"}
|
||
if not scan_type:
|
||
return {"status": "error", "error": "falta 'scan_type'"}
|
||
if not note_path:
|
||
return {"status": "error", "error": "falta 'note_path'"}
|
||
|
||
try:
|
||
scan_dt, ts_compact = _scan_ts_compact(fields.get("scan_ts"))
|
||
except ValueError as e:
|
||
return {"status": "error", "error": f"scan_ts inválido: {e}"}
|
||
|
||
scan_id = f"{target_slug}:{scan_type}:{ts_compact}"
|
||
summary = fields.get("summary")
|
||
row = {
|
||
"id": scan_id,
|
||
"target": target,
|
||
"target_slug": target_slug,
|
||
"scan_type": scan_type,
|
||
"tool": fields.get("tool"),
|
||
"scan_ts": scan_dt,
|
||
"note_path": note_path,
|
||
"summary": _json(summary) if summary is not None else None,
|
||
"created_at": _now(),
|
||
}
|
||
res = duckdb_upsert(
|
||
cfg.db_path,
|
||
"network_scans",
|
||
[row],
|
||
key_cols=["id"],
|
||
update_cols=[
|
||
"target",
|
||
"target_slug",
|
||
"scan_type",
|
||
"tool",
|
||
"scan_ts",
|
||
"note_path",
|
||
"summary",
|
||
],
|
||
)
|
||
if res.get("status") != "ok":
|
||
return {"status": "error", "error": res.get("error")}
|
||
return {
|
||
"status": "ok",
|
||
"id": scan_id,
|
||
"inserted": res.get("inserted", 0),
|
||
"updated": res.get("updated", 0),
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Materialización de los contactos de una organización en su ficha .md
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def render_org_contacts(cfg: Config, slug: str) -> dict:
|
||
"""Escribe en la ficha de una organización la tabla de sus contactos.
|
||
|
||
Lee de ``derived.org_contacts`` las personas relacionadas con la organización
|
||
(con su rol y teléfono) y vuelca una tabla Markdown dentro de un bloque
|
||
sentinel ``org-contacts`` en el cuerpo de la nota, sin tocar el resto de la
|
||
prosa. Idempotente: re-renderizar reescribe solo ese bloque.
|
||
|
||
Devuelve {status:'ok', slug, contactos:N, note_path} o {status:'skip'|'error'}.
|
||
"""
|
||
slug = (slug or "").strip()
|
||
if not slug:
|
||
return {"status": "error", "error": "slug vacío"}
|
||
|
||
org = duckdb_query_readonly(
|
||
cfg.db_path, "SELECT note_path FROM organizations WHERE slug = ?", [slug], 1
|
||
)
|
||
if org.get("status") != "ok" or not org.get("rows"):
|
||
return {"status": "error", "error": f"organización desconocida: {slug!r}"}
|
||
note_path = org["rows"][0].get("note_path")
|
||
if not note_path:
|
||
return {"status": "skip", "slug": slug, "reason": "sin note_path"}
|
||
|
||
contacts = duckdb_query_readonly(
|
||
cfg.db_path,
|
||
"SELECT persona, rol, telefono FROM derived.org_contacts "
|
||
"WHERE org_slug = ? ORDER BY persona, telefono",
|
||
[slug],
|
||
500,
|
||
)
|
||
rows = contacts.get("rows") or []
|
||
if not rows:
|
||
return {"status": "skip", "slug": slug, "reason": "sin contactos"}
|
||
|
||
table_md = render_markdown_table(
|
||
rows, columns=["persona", "rol", "telefono"], max_rows=500
|
||
)
|
||
content = "### Contactos\n\n" + (table_md or "_(sin contactos)_")
|
||
|
||
rel = note_path if note_path.endswith(".md") else note_path + ".md"
|
||
abs_path = os.path.abspath(os.path.join(cfg.vault_dir, rel))
|
||
vault_real = os.path.realpath(cfg.vault_dir)
|
||
if not os.path.realpath(abs_path).startswith(vault_real + os.sep):
|
||
return {"status": "error", "error": f"note_path fuera del vault: {rel!r}"}
|
||
if not os.path.exists(abs_path):
|
||
return {"status": "skip", "slug": slug, "reason": "nota inexistente"}
|
||
|
||
note = read_obsidian_note(abs_path)
|
||
new_body = upsert_sentinel_block(
|
||
note.get("body", "") or "", "org-contacts", content, marker=SENTINEL_MARKER
|
||
)
|
||
update_obsidian_note(abs_path, body=new_body)
|
||
return {
|
||
"status": "ok",
|
||
"slug": slug,
|
||
"contactos": len(rows),
|
||
"note_path": rel,
|
||
}
|
||
|
||
|
||
def render_all_org_contacts(cfg: Config) -> dict:
|
||
"""Materializa la tabla de contactos en TODAS las organizaciones que tengan.
|
||
|
||
Recorre las organizaciones con al menos una fila en ``derived.org_contacts``
|
||
y llama a ``render_org_contacts`` por cada una. Devuelve conteos agregados.
|
||
"""
|
||
orgs = duckdb_query_readonly(
|
||
cfg.db_path,
|
||
"SELECT DISTINCT org_slug FROM derived.org_contacts ORDER BY org_slug",
|
||
[],
|
||
100000,
|
||
)
|
||
rendered = skipped = errors = 0
|
||
for row in orgs.get("rows") or []:
|
||
res = render_org_contacts(cfg, row["org_slug"])
|
||
status = res.get("status")
|
||
if status == "ok":
|
||
rendered += 1
|
||
elif status == "skip":
|
||
skipped += 1
|
||
else:
|
||
errors += 1
|
||
return {
|
||
"status": "ok",
|
||
"rendered": rendered,
|
||
"skipped": skipped,
|
||
"errors": errors,
|
||
}
|