perf: descarga DAV concurrente + caché de contactos/calendario

Las colecciones Xandikos son grandes (1064 contactos, 98 eventos). Descargar
los .vcf/.ics secuencialmente tardaba ~2 min para los contactos (timeout). Se
añade _fetch_resources con un ThreadPoolExecutor acotado (16 workers): primera
carga de /api/contacts baja a ~9s, segunda (cacheada) a ~10ms. La descarga
sigue delegada a dav_get_resource del registry (stdlib, thread-safe); solo se
paraleliza la orquestación.

Incluye caché en memoria de contactos y calendario (invalidada por
/api/refresh), DavUnavailable para degradación clara sin red, y campos
aliaseados en español (nombre/alias/telefonos/correos/osint) para el frontend.

Verificado contra el vault real (1199 nodos) y Xandikos real (1064 contactos,
98 eventos). 19 tests verdes.
This commit is contained in:
agent
2026-06-11 22:54:54 +02:00
parent 5b51e3d035
commit 59558d43cb
2 changed files with 369 additions and 108 deletions
+196 -105
View File
@@ -45,8 +45,14 @@ import os
import re
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
# Nº de descargas DAV concurrentes al traer una colección completa (addressbook
# con ~1000 vCards). Secuencial son ~0.11s/recurso (~2 min para 1064); con un
# pool acotado baja a ~10s. Acotado para no saturar al servidor Xandikos.
_DAV_FETCH_WORKERS = 16
def _registry_functions_dir() -> str:
"""Localiza ``python/functions`` del fn_registry sin paths hardcodeados.
@@ -148,6 +154,15 @@ XANDIKOS_CALENDAR_COLLECTION = "/enmanuel/calendars/calendar/"
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"}
class DavUnavailable(Exception):
"""Xandikos no responde (sin red, timeout, auth caída).
Los endpoints DAV la capturan y devuelven un 503 JSON claro, para que un
fallo de la agenda/calendario NUNCA tumbe el server ni afecte a los
endpoints del vault, que deben seguir funcionando offline.
"""
def _attachment_kind(name: str) -> str:
"""Clasifica un attachment por extensión: ``image`` | ``pdf`` | ``other``."""
ext = os.path.splitext(name)[1].lower()
@@ -396,6 +411,124 @@ class VaultState:
self._xandikos_password = _read_pass_secret(XANDIKOS_PASS_ENTRY)
return self._xandikos_password
def contacts(self) -> list:
"""Contactos del addressbook Xandikos, parseados y cacheados en memoria.
Llena la caché al primer acceso (descarga + parseo de todos los
``.vcf``); accesos posteriores la reutilizan hasta ``invalidate_dav()``.
Raises:
RuntimeError: si no se puede leer la password de ``pass``.
DavUnavailable: si Xandikos no responde (sin red, timeout, auth).
"""
with self._dav_lock:
if self._contacts_cache is not None:
return self._contacts_cache
password = self.xandikos_password()
listing = dav_list_resources(
XANDIKOS_BASE_URL,
XANDIKOS_USERNAME,
password,
XANDIKOS_CONTACTS_COLLECTION,
)
if listing.get("status") != "ok":
raise DavUnavailable(
"Xandikos no responde: %s" % listing.get("error")
)
contacts: list = []
# Descarga concurrente de los .vcf: secuencial son ~0.11s/recurso
# (~2 min para 1064 contactos); con el pool acotado baja a ~10s.
for res, got in self._fetch_resources(
listing.get("resources", []), ".vcf", password
):
href = res.get("href")
for card_text in split_vcards(got.get("text", "")):
card = _vcard_to_json(card_text)
card["etag"] = res.get("etag")
card["href"] = href
contacts.append(card)
contacts.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower())
self._contacts_cache = contacts
return contacts
def calendar(self, dt_from: str = "", dt_to: str = "") -> list:
"""Eventos del calendario Xandikos, cacheados; filtrados por rango.
La descarga + parseo completos se cachean; el filtro por ``[from, to]``
se aplica sobre la caché en cada llamada (barato). Sin ``from``/``to``
devuelve todos.
Raises:
RuntimeError: si no se puede leer la password de ``pass``.
DavUnavailable: si Xandikos no responde (sin red, timeout, auth).
"""
with self._dav_lock:
if self._calendar_cache is None:
password = self.xandikos_password()
listing = dav_list_resources(
XANDIKOS_BASE_URL,
XANDIKOS_USERNAME,
password,
XANDIKOS_CALENDAR_COLLECTION,
)
if listing.get("status") != "ok":
raise DavUnavailable(
"Xandikos no responde: %s" % listing.get("error")
)
events: list = []
for res, got in self._fetch_resources(
listing.get("resources", []), ".ics", password
):
href = res.get("href")
for event in _vcalendar_to_events(got.get("text", "")):
event["etag"] = res.get("etag")
event["href"] = href
events.append(event)
events.sort(key=lambda e: e.get("dtstart") or "")
self._calendar_cache = events
all_events = self._calendar_cache
if not dt_from and not dt_to:
return list(all_events)
return [e for e in all_events if _event_in_range(e, dt_from, dt_to)]
def _fetch_resources(self, resources: list, suffix: str, password: str) -> list:
"""Descarga en paralelo los recursos DAV con la extensión ``suffix``.
Filtra los recursos por extensión (``.vcf`` / ``.ics``), los descarga con
``dav_get_resource`` (función del registry) usando un pool acotado de
hilos (``_DAV_FETCH_WORKERS``) y devuelve la lista de pares
``(res, got)`` de los que respondieron ``status == "ok"``, preservando el
orden del listing. La paralelización es solo de la orquestación: la
descarga sigue delegada a la función del registry, que es stdlib y
thread-safe (abre su propia conexión por request). Acotar el pool evita
saturar al servidor Xandikos.
"""
targets = [
res
for res in resources
if res.get("href") and res["href"].lower().endswith(suffix)
]
if not targets:
return []
def _get(res):
got = dav_get_resource(
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, res["href"]
)
return res, got
workers = min(_DAV_FETCH_WORKERS, len(targets))
with ThreadPoolExecutor(max_workers=workers) as pool:
# pool.map preserva el orden de entrada.
results = list(pool.map(_get, targets))
return [(res, got) for res, got in results if got.get("status") == "ok"]
def invalidate_dav(self) -> None:
"""Vacía las cachés de contactos y calendario (no la password)."""
with self._dav_lock:
self._contacts_cache = None
self._calendar_cache = None
# ---------------------------------------------------------------------------
# Helpers DAV: parseo ligero de vCard / iCalendar a JSON
@@ -570,30 +703,22 @@ def _vcalendar_to_events(vcalendar_text: str) -> list:
def _event_in_range(event: dict, dt_from: str, dt_to: str) -> bool:
"""True si el evento cae (por DTSTART) dentro de ``[dt_from, dt_to]``.
La comparación es lexicográfica sobre el prefijo de fecha ``YYYYMMDD`` que
comparten todos los formatos iCal (date y date-time). ``dt_from``/``dt_to``
Comparación lexicográfica sobre el prefijo ``YYYYMMDD`` que comparten todos
los formatos iCal (date y date-time). Los límites se normalizan quitando los
guiones, así acepta tanto el formato documentado del endpoint
(``2026-06-11``) como el iCal crudo (``20260611``). ``dt_from``/``dt_to``
vacíos desactivan ese extremo del filtro.
"""
dtstart = (event.get("dtstart") or "")[:8]
dtstart = (event.get("dtstart") or "").replace("-", "")[:8]
if not dtstart:
return True
if dt_from and dtstart < dt_from[:8]:
if dt_from and dtstart < dt_from.replace("-", "")[:8]:
return False
if dt_to and dtstart > dt_to[:8]:
if dt_to and dtstart > dt_to.replace("-", "")[:8]:
return False
return True
def _uid_to_href(uid: str, resources: list) -> Optional[str]:
"""Localiza el href de un recurso DAV cuyo último segmento contiene el uid."""
for res in resources:
href = res.get("href", "")
tail = href.rstrip("/").rsplit("/", 1)[-1]
if uid in tail or tail.startswith(uid):
return href
return None
# ---------------------------------------------------------------------------
# Construcción de la app FastAPI
# ---------------------------------------------------------------------------
@@ -679,127 +804,93 @@ def create_app(vault_dir: str) -> FastAPI:
@app.get("/api/contacts")
def api_contacts() -> JSONResponse:
"""Contactos del addressbook Xandikos, parseados a JSON.
"""Contactos del addressbook Xandikos, parseados a JSON (cacheados).
Lista los recursos de la colección CardDAV, descarga cada ``.vcf`` y lo
parsea con ``split_vcards`` + parseo ligero. Si Xandikos no responde
(sin red) devuelve un error claro (502), no un crash.
Cada contacto: ``{uid, nombre, alias, nota, org, telefonos[], emails[],
osint{dni,pais,sexo,...}}`` (+ las formas tipadas ``phones``/``emails``).
La lista se cachea en memoria al primer acceso (``POST /api/refresh`` la
invalida). Si Xandikos no responde o falta la password → 503 con un JSON
de error claro, nunca un crash.
"""
try:
password = state.xandikos_password()
except RuntimeError as exc:
return JSONResponse(status_code=503, content={"status": "error", "error": str(exc)})
listing = dav_list_resources(
XANDIKOS_BASE_URL,
XANDIKOS_USERNAME,
password,
XANDIKOS_CONTACTS_COLLECTION,
)
if listing.get("status") != "ok":
contacts = state.contacts()
except (RuntimeError, DavUnavailable) as exc:
return JSONResponse(
status_code=502,
content={"status": "error", "error": "Xandikos no responde: %s" % listing.get("error")},
status_code=503, content={"status": "error", "error": str(exc)}
)
contacts: list = []
for res in listing.get("resources", []):
href = res.get("href")
if not href or not href.lower().endswith(".vcf"):
continue
got = dav_get_resource(XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, href)
if got.get("status") != "ok":
continue
for card_text in split_vcards(got.get("text", "")):
card = _vcard_to_json(card_text)
card["etag"] = res.get("etag")
contacts.append(card)
contacts.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower())
return JSONResponse(content={"status": "ok", "count": len(contacts), "contacts": contacts})
return JSONResponse(
content={"status": "ok", "count": len(contacts), "contacts": contacts}
)
@app.get("/api/contact/{uid}")
def api_contact(uid: str) -> JSONResponse:
"""Un vCard concreto (por UID) a JSON."""
try:
password = state.xandikos_password()
except RuntimeError as exc:
return JSONResponse(status_code=503, content={"status": "error", "error": str(exc)})
"""Un contacto concreto (por UID) parseado a JSON, desde la caché.
listing = dav_list_resources(
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, XANDIKOS_CONTACTS_COLLECTION
)
if listing.get("status") != "ok":
Resuelve sobre la lista cacheada de ``/api/contacts`` (mismo parseo
completo, todos los campos). 404 si el UID no existe; 503 si Xandikos no
responde o falta la password.
"""
try:
contacts = state.contacts()
except (RuntimeError, DavUnavailable) as exc:
return JSONResponse(
status_code=502,
content={"status": "error", "error": "Xandikos no responde: %s" % listing.get("error")},
status_code=503, content={"status": "error", "error": str(exc)}
)
href = _uid_to_href(uid, listing.get("resources", []))
if not href:
raise HTTPException(status_code=404, detail="contacto '%s' no encontrado" % uid)
got = dav_get_resource(XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, href)
if got.get("status") != "ok":
return JSONResponse(
status_code=502,
content={"status": "error", "error": "no se pudo descargar el vCard"},
match = next((c for c in contacts if c.get("uid") == uid), None)
if match is None:
# Tolerancia: aceptar también el segmento final del href (nombre del
# recurso .vcf) cuando el UID no coincide literalmente.
match = next(
(
c
for c in contacts
if uid in (c.get("href") or "").rsplit("/", 1)[-1]
),
None,
)
cards = split_vcards(got.get("text", ""))
if not cards:
raise HTTPException(status_code=404, detail="vCard vacío")
return JSONResponse(content={"status": "ok", "contact": _vcard_to_json(cards[0])})
if match is None:
raise HTTPException(
status_code=404, detail="contacto '%s' no encontrado" % uid
)
return JSONResponse(content={"status": "ok", "contact": match})
# -- Xandikos: calendario (CalDAV) --
@app.get("/api/calendar")
def api_calendar(
from_: str = Query("", alias="from", description="fecha inicio YYYYMMDD"),
to: str = Query("", description="fecha fin YYYYMMDD"),
from_: str = Query("", alias="from", description="fecha inicio YYYY-MM-DD"),
to: str = Query("", description="fecha fin YYYY-MM-DD"),
) -> JSONResponse:
"""Eventos del calendario Xandikos en ``[from, to]``, parseados a JSON.
"""Eventos del calendario Xandikos en ``[from, to]`` (cacheados).
Lista los recursos de la colección CalDAV, descarga cada ``.ics``,
extrae sus VEVENT y los filtra por DTSTART dentro del rango. Sin red ->
error claro (502), no crash.
Cada evento: ``{uid, summary, dtstart, dtend, location, description}``.
La descarga + parseo completos se cachean (``POST /api/refresh`` los
invalida); el filtro por rango se aplica sobre la caché. Sin ``from``/
``to`` devuelve todos. Si Xandikos no responde o falta la password →
503 con JSON de error claro, nunca un crash.
"""
try:
password = state.xandikos_password()
except RuntimeError as exc:
return JSONResponse(status_code=503, content={"status": "error", "error": str(exc)})
listing = dav_list_resources(
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, XANDIKOS_CALENDAR_COLLECTION
)
if listing.get("status") != "ok":
events = state.calendar(from_, to)
except (RuntimeError, DavUnavailable) as exc:
return JSONResponse(
status_code=502,
content={"status": "error", "error": "Xandikos no responde: %s" % listing.get("error")},
status_code=503, content={"status": "error", "error": str(exc)}
)
events: list = []
for res in listing.get("resources", []):
href = res.get("href")
if not href or not href.lower().endswith(".ics"):
continue
got = dav_get_resource(XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, href)
if got.get("status") != "ok":
continue
for event in _vcalendar_to_events(got.get("text", "")):
if _event_in_range(event, from_, to):
events.append(event)
events.sort(key=lambda e: e.get("dtstart") or "")
return JSONResponse(content={"status": "ok", "count": len(events), "events": events})
return JSONResponse(
content={"status": "ok", "count": len(events), "events": events}
)
# -- Refresco de cachés --
@app.post("/api/refresh")
def api_refresh() -> dict:
"""Invalida y reconstruye la caché del grafo del vault.
"""Reconstruye la caché del grafo del vault e invalida las cachés DAV.
Los datos DAV no se cachean, así que esto solo afecta al grafo/tablas del
vault. Devuelve el conteo del grafo recién reconstruido.
Re-escanea el vault (grafo + tablas) y vacía las cachés de contactos y
calendario, que se recargarán perezosamente en el siguiente acceso.
Devuelve el conteo del grafo recién reconstruido.
"""
summary = state.refresh()
state.invalidate_dav()
return {"status": "refreshed", **summary}
return app