merge: perf(dav) multiget + cache en disco por ctag

This commit is contained in:
2026-06-12 00:08:07 +02:00
3 changed files with 284 additions and 114 deletions
+3
View File
@@ -6,3 +6,6 @@ server.log
node_modules/
frontend/dist/
local_files/
# Caché en disco de los datos DAV (datos personales sensibles + regenerable)
server/.cache/
+213 -86
View File
@@ -41,18 +41,14 @@ from __future__ import annotations
import argparse
import importlib.util
import json
import os
import re
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
import time
from typing import Optional
# Nº de descargas DAV concurrentes al traer una colección completa (addressbook
# con ~1000 vCards). Secuencial son ~0.11s/recurso (~2 min para 1064); con un
# pool acotado baja a ~10s. Acotado para no saturar al servidor Xandikos.
_DAV_FETCH_WORKERS = 16
def _registry_functions_dir() -> str:
"""Localiza ``python/functions`` del fn_registry sin paths hardcodeados.
@@ -134,8 +130,12 @@ def _load_infra_fn(module_name: str, attr: str):
# --- Grupo de capacidad dav (CardDAV / CalDAV contra Xandikos) + pass ---
dav_list_resources = _load_infra_fn("dav_list_resources", "dav_list_resources")
dav_get_resource = _load_infra_fn("dav_get_resource", "dav_get_resource")
# dav_get_collection trae TODOS los recursos (vCards / VCALENDARs) de una
# colección en UNA petición REPORT con el contenido inline; dav_collection_ctag
# lee el ctag de la colección (PROPFIND Depth:0 barato) para validar la caché en
# disco sin descargar nada cuando nada cambió.
dav_get_collection = _load_infra_fn("dav_get_collection", "dav_get_collection")
dav_collection_ctag = _load_infra_fn("dav_collection_ctag", "dav_collection_ctag")
split_vcards = _load_infra_fn("split_vcards", "split_vcards")
pass_get_secret = _load_infra_fn("pass_get_secret", "pass_get_secret")
@@ -150,6 +150,15 @@ XANDIKOS_PASS_ENTRY = "dav/xandikos-enmanuel"
XANDIKOS_CONTACTS_COLLECTION = "/enmanuel/contacts/addressbook/"
XANDIKOS_CALENDAR_COLLECTION = "/enmanuel/calendars/calendar/"
# Caché en disco de los datos DAV ya parseados, indexada por el ctag de la
# colección. Al arrancar el server lee el ctag (PROPFIND barato ~0.1s) y, si
# coincide con el de la caché en disco, sirve los contactos/eventos ya parseados
# sin descargar ni reparsear nada (arranque instantáneo). El directorio vive
# junto al server y está gitignored (datos personales sensibles + regenerable).
_CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".cache")
_CONTACTS_CACHE_FILE = os.path.join(_CACHE_DIR, "contacts.json")
_CALENDAR_CACHE_FILE = os.path.join(_CACHE_DIR, "calendar.json")
# Extensiones de imagen que el frontend muestra en la galería con lightbox.
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"}
@@ -194,6 +203,49 @@ def _read_pass_secret(entry: str) -> str:
return value.strip()
# ---------------------------------------------------------------------------
# Caché en disco de los datos DAV ya parseados (indexada por ctag)
# ---------------------------------------------------------------------------
def _read_disk_cache(path: str) -> Optional[dict]:
"""Lee una caché DAV del disco: ``{"ctag": str, "items": list}`` o None.
Devuelve None (recargar de la red) ante cualquier problema: archivo
inexistente, JSON corrupto, o estructura inesperada. Nunca lanza: la caché
es un acelerador, no una fuente de verdad — si falla, se cae a la descarga.
"""
try:
with open(path, "r", encoding="utf-8") as fh:
data = json.load(fh)
except (OSError, ValueError):
return None
if not isinstance(data, dict) or not isinstance(data.get("items"), list):
return None
return data
def _write_disk_cache(path: str, ctag: str, items: list) -> None:
"""Escribe la caché DAV al disco de forma atómica (tmp + rename).
Persiste ``{"ctag": ctag, "items": items, "saved_at": epoch}``. Errores de
escritura se ignoran (no deben tumbar el endpoint): la caché en memoria sigue
sirviendo y el disco se reintentará en el siguiente refresco.
"""
try:
os.makedirs(os.path.dirname(path), exist_ok=True)
tmp = path + ".tmp"
with open(tmp, "w", encoding="utf-8") as fh:
json.dump(
{"ctag": ctag, "items": items, "saved_at": time.time()},
fh,
ensure_ascii=False,
)
os.replace(tmp, path)
except OSError:
pass
# ---------------------------------------------------------------------------
# Estado del servidor: caché del vault + password Xandikos
# ---------------------------------------------------------------------------
@@ -226,9 +278,15 @@ class VaultState:
self._xandikos_password: Optional[str] = None
# Cachés DAV en memoria (igual que el grafo): se llenan perezosamente al
# primer acceso y se invalidan en POST /api/refresh. None = sin cargar.
# Cada caché lleva su ctag para servir del disco sin red cuando la
# colección no cambió. _force_reload (set por /api/refresh) salta la
# validación de ctag en el siguiente acceso.
self._dav_lock = threading.Lock()
self._contacts_cache: Optional[list] = None
self._calendar_cache: Optional[list] = None
self._contacts_ctag: Optional[str] = None
self._calendar_ctag: Optional[str] = None
self._force_reload = False
self.refresh()
# --- vault --------------------------------------------------------------
@@ -411,50 +469,148 @@ class VaultState:
self._xandikos_password = _read_pass_secret(XANDIKOS_PASS_ENTRY)
return self._xandikos_password
def contacts(self) -> list:
"""Contactos del addressbook Xandikos, parseados y cacheados en memoria.
def _collection_ctag(self, collection_path: str, password: str) -> Optional[str]:
"""Lee el ctag de una colección (PROPFIND barato), o None si no se puede.
Llena la caché al primer acceso (descarga + parseo de todos los
``.vcf``); accesos posteriores la reutilizan hasta ``invalidate_dav()``.
El ctag es el token de versión de la colección: si no cambió, la caché en
disco sigue vigente. Devolver None significa "no pude validar" → se
recarga de la red por seguridad (nunca se sirve caché potencialmente
obsoleta sin confirmación). No lanza.
"""
res = dav_collection_ctag(
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, collection_path
)
return res.get("ctag") if res.get("status") == "ok" else None
def _load_collection(
self,
collection_path: str,
content_type: str,
cache_file: str,
parse_items,
) -> tuple:
"""Carga una colección DAV con caché en disco validada por ctag.
Flujo (1 o 2 peticiones, nunca N):
1. Lee el ctag de la colección (PROPFIND Depth:0, ~0.1s).
2. Si el ctag coincide con el de la caché en disco (y no hay refresh
forzado), parsea los items del disco y devuelve sin descargar.
3. Si no, hace UN REPORT ``dav_get_collection`` que trae todos los
recursos con su contenido inline, los parsea con ``parse_items`` y
reescribe la caché en disco con el nuevo ctag.
``parse_items(resources) -> list`` transforma la lista
``[{href, etag, data}]`` del registry en la lista de objetos JSON que
sirve el endpoint (contactos o eventos).
Returns:
tuple ``(items, ctag)``.
Raises:
DavUnavailable: si Xandikos no responde al REPORT cuando hay que
descargar (sin red, timeout, auth).
"""
password = self.xandikos_password()
ctag = self._collection_ctag(collection_path, password)
# Caché en disco vigente: mismo ctag y sin refresh forzado → sin red.
if ctag is not None and not self._force_reload:
disk = _read_disk_cache(cache_file)
if disk is not None and disk.get("ctag") == ctag:
return parse_items(disk["items"]), ctag
# Hay que (re)descargar: UN REPORT trae todo con el contenido inline.
got = dav_get_collection(
XANDIKOS_BASE_URL,
XANDIKOS_USERNAME,
password,
collection_path,
content_type,
)
if got.get("status") != "ok":
raise DavUnavailable("Xandikos no responde: %s" % got.get("error"))
resources = got.get("resources", [])
items = parse_items(resources)
# Persistir en disco para el arranque instantáneo de la próxima vez. Si
# no obtuvimos ctag, guardamos cadena vacía: nunca matcheará un ctag real,
# así que la próxima vez se revalidará (cae con elegancia a "siempre
# recargar" sin romper).
_write_disk_cache(cache_file, ctag or "", items)
return items, (ctag or "")
@staticmethod
def _parse_contacts(items: list) -> list:
"""Parsea ``[{href, etag, data}]`` (o items cacheados) a contactos JSON.
Acepta dos formas de ``items``: la lista de recursos del registry (cada
uno con ``data`` = texto vCard, posible multi-tarjeta) que hay que
parsear, o la lista de contactos ya parseados (caché en disco), que se
devuelve tal cual. Se distinguen por la presencia de la clave ``data``.
"""
if items and "data" in items[0]:
contacts: list = []
for res in items:
href = res.get("href")
for card_text in split_vcards(res.get("data", "")):
card = _vcard_to_json(card_text)
card["etag"] = res.get("etag")
card["href"] = href
contacts.append(card)
contacts.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower())
return contacts
return list(items)
@staticmethod
def _parse_events(items: list) -> list:
"""Parsea ``[{href, etag, data}]`` (o items cacheados) a eventos JSON.
Igual criterio que ``_parse_contacts``: si los items llevan ``data`` son
recursos del registry (texto VCALENDAR) y se parsean; si no, ya son
eventos cacheados y se devuelven tal cual.
"""
if items and "data" in items[0]:
events: list = []
for res in items:
href = res.get("href")
for event in _vcalendar_to_events(res.get("data", "")):
event["etag"] = res.get("etag")
event["href"] = href
events.append(event)
events.sort(key=lambda e: e.get("dtstart") or "")
return events
return list(items)
def contacts(self) -> list:
"""Contactos del addressbook Xandikos, parseados y cacheados.
Caché en dos niveles: memoria (mientras vive el proceso) y disco
(``.cache/contacts.json``, validada por ctag para arranque instantáneo).
Al primer acceso descarga TODO en UNA petición REPORT
(``dav_get_collection``) en vez de un GET por ``.vcf``.
Raises:
RuntimeError: si no se puede leer la password de ``pass``.
DavUnavailable: si Xandikos no responde (sin red, timeout, auth).
"""
with self._dav_lock:
if self._contacts_cache is not None:
if self._contacts_cache is not None and not self._force_reload:
return self._contacts_cache
password = self.xandikos_password()
listing = dav_list_resources(
XANDIKOS_BASE_URL,
XANDIKOS_USERNAME,
password,
contacts, ctag = self._load_collection(
XANDIKOS_CONTACTS_COLLECTION,
"vcard",
_CONTACTS_CACHE_FILE,
self._parse_contacts,
)
if listing.get("status") != "ok":
raise DavUnavailable(
"Xandikos no responde: %s" % listing.get("error")
)
contacts: list = []
# Descarga concurrente de los .vcf: secuencial son ~0.11s/recurso
# (~2 min para 1064 contactos); con el pool acotado baja a ~10s.
for res, got in self._fetch_resources(
listing.get("resources", []), ".vcf", password
):
href = res.get("href")
for card_text in split_vcards(got.get("text", "")):
card = _vcard_to_json(card_text)
card["etag"] = res.get("etag")
card["href"] = href
contacts.append(card)
contacts.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower())
self._contacts_cache = contacts
self._contacts_ctag = ctag
self._maybe_clear_force_reload()
return contacts
def calendar(self, dt_from: str = "", dt_to: str = "") -> list:
"""Eventos del calendario Xandikos, cacheados; filtrados por rango.
La descarga + parseo completos se cachean; el filtro por ``[from, to]``
Misma caché en dos niveles que ``contacts``. La descarga + parseo
completos se cachean (UNA petición REPORT); el filtro por ``[from, to]``
se aplica sobre la caché en cada llamada (barato). Sin ``from``/``to``
devuelve todos.
@@ -463,71 +619,42 @@ class VaultState:
DavUnavailable: si Xandikos no responde (sin red, timeout, auth).
"""
with self._dav_lock:
if self._calendar_cache is None:
password = self.xandikos_password()
listing = dav_list_resources(
XANDIKOS_BASE_URL,
XANDIKOS_USERNAME,
password,
if self._calendar_cache is None or self._force_reload:
events, ctag = self._load_collection(
XANDIKOS_CALENDAR_COLLECTION,
"ical",
_CALENDAR_CACHE_FILE,
self._parse_events,
)
if listing.get("status") != "ok":
raise DavUnavailable(
"Xandikos no responde: %s" % listing.get("error")
)
events: list = []
for res, got in self._fetch_resources(
listing.get("resources", []), ".ics", password
):
href = res.get("href")
for event in _vcalendar_to_events(got.get("text", "")):
event["etag"] = res.get("etag")
event["href"] = href
events.append(event)
events.sort(key=lambda e: e.get("dtstart") or "")
self._calendar_cache = events
self._calendar_ctag = ctag
self._maybe_clear_force_reload()
all_events = self._calendar_cache
if not dt_from and not dt_to:
return list(all_events)
return [e for e in all_events if _event_in_range(e, dt_from, dt_to)]
def _fetch_resources(self, resources: list, suffix: str, password: str) -> list:
"""Descarga en paralelo los recursos DAV con la extensión ``suffix``.
def _maybe_clear_force_reload(self) -> None:
"""Apaga el flag de refresh forzado una vez consumido por una recarga.
Filtra los recursos por extensión (``.vcf`` / ``.ics``), los descarga con
``dav_get_resource`` (función del registry) usando un pool acotado de
hilos (``_DAV_FETCH_WORKERS``) y devuelve la lista de pares
``(res, got)`` de los que respondieron ``status == "ok"``, preservando el
orden del listing. La paralelización es solo de la orquestación: la
descarga sigue delegada a la función del registry, que es stdlib y
thread-safe (abre su propia conexión por request). Acotar el pool evita
saturar al servidor Xandikos.
Llamado bajo ``_dav_lock`` tras recargar una colección. El flag lo activa
``invalidate_dav`` (POST /api/refresh) para forzar UNA recarga que ignore
el ctag; tras ella vuelve a la validación normal por ctag.
"""
targets = [
res
for res in resources
if res.get("href") and res["href"].lower().endswith(suffix)
]
if not targets:
return []
def _get(res):
got = dav_get_resource(
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, res["href"]
)
return res, got
workers = min(_DAV_FETCH_WORKERS, len(targets))
with ThreadPoolExecutor(max_workers=workers) as pool:
# pool.map preserva el orden de entrada.
results = list(pool.map(_get, targets))
return [(res, got) for res, got in results if got.get("status") == "ok"]
self._force_reload = False
def invalidate_dav(self) -> None:
"""Vacía las cachés de contactos y calendario (no la password)."""
"""Vacía las cachés de contactos y calendario y fuerza una recarga.
Limpia las cachés en memoria y marca ``_force_reload`` para que el
siguiente acceso a cada colección ignore el ctag cacheado y vuelva a
descargar del servidor (el botón "refrescar" debe traer cambios aunque el
ctag no se haya actualizado todavía). No borra la password.
"""
with self._dav_lock:
self._contacts_cache = None
self._calendar_cache = None
self._force_reload = True
# ---------------------------------------------------------------------------
+68 -28
View File
@@ -241,7 +241,12 @@ def test_dav_endpoints_degrade_without_network(client, monkeypatch):
Y los endpoints del vault siguen funcionando offline (no se ven afectados).
"""
monkeypatch.setattr(
srv, "dav_list_resources", lambda *a, **k: {"status": "error", "error": "sin red"}
srv,
"dav_get_collection",
lambda *a, **k: {"status": "error", "error": "sin red"},
)
monkeypatch.setattr(
srv, "dav_collection_ctag", lambda *a, **k: {"status": "error", "error": "sin red"}
)
# Evita leer pass en el test (cachea una password ficticia).
client.app.state.vault._xandikos_password = "x"
@@ -325,39 +330,41 @@ _ICS_BODY_2 = (
@pytest.fixture()
def fake_dav(monkeypatch):
def fake_dav(monkeypatch, tmp_path):
"""Parchea las funciones del registry DAV con fixtures en memoria (sin red).
Devuelve un dict ``{"calls": int}`` que cuenta los PROPFIND para verificar
el cacheo (segunda lectura no re-llama a Xandikos).
Mockea ``dav_get_collection`` (UN REPORT que trae todos los recursos con su
contenido inline) y ``dav_collection_ctag`` (token de versión para la caché
en disco). Redirige la caché en disco a un tmpdir para no escribir en
``server/.cache``. Devuelve un dict con ``{"reports": int, "ctag": str}``:
``reports`` cuenta las descargas reales (REPORT) para verificar el cacheo, y
``ctag`` es mutable para simular un cambio en la colección.
"""
state = {"calls": 0}
state = {"reports": 0, "ctag": "ctag-v1"}
contacts_res = [
{"href": "/enmanuel/contacts/addressbook/maria-001.vcf", "etag": '"a"'},
{"href": "/enmanuel/contacts/addressbook/juan-002.vcf", "etag": '"b"'},
{"href": "/enmanuel/contacts/addressbook/maria-001.vcf", "etag": '"a"', "data": _VCF_BODY},
{"href": "/enmanuel/contacts/addressbook/juan-002.vcf", "etag": '"b"', "data": _VCF_BODY_2},
]
calendar_res = [
{"href": "/enmanuel/calendars/calendar/evt-001.ics", "etag": '"c"'},
{"href": "/enmanuel/calendars/calendar/evt-002.ics", "etag": '"d"'},
{"href": "/enmanuel/calendars/calendar/evt-001.ics", "etag": '"c"', "data": _ICS_BODY},
{"href": "/enmanuel/calendars/calendar/evt-002.ics", "etag": '"d"', "data": _ICS_BODY_2},
]
bodies = {
"/enmanuel/contacts/addressbook/maria-001.vcf": _VCF_BODY,
"/enmanuel/contacts/addressbook/juan-002.vcf": _VCF_BODY_2,
"/enmanuel/calendars/calendar/evt-001.ics": _ICS_BODY,
"/enmanuel/calendars/calendar/evt-002.ics": _ICS_BODY_2,
}
def _list(base, user, pw, collection, **kw):
state["calls"] += 1
def _get_collection(base, user, pw, collection, content_type="vcard", **kw):
state["reports"] += 1
res = contacts_res if "contacts" in collection else calendar_res
return {"status": "ok", "http_status": 207, "resources": res}
def _get(base, user, pw, href, **kw):
return {"status": "ok", "http_status": 200, "text": bodies.get(href, "")}
def _ctag(base, user, pw, collection, **kw):
return {"status": "ok", "http_status": 207, "ctag": state["ctag"]}
monkeypatch.setattr(srv, "dav_list_resources", _list)
monkeypatch.setattr(srv, "dav_get_resource", _get)
monkeypatch.setattr(srv, "dav_get_collection", _get_collection)
monkeypatch.setattr(srv, "dav_collection_ctag", _ctag)
monkeypatch.setattr(srv, "pass_get_secret", lambda *a, **k: {"status": "ok", "value": "x"})
# Caché en disco aislada por test (no toca server/.cache).
cache_dir = tmp_path / "dav_cache"
monkeypatch.setattr(srv, "_CONTACTS_CACHE_FILE", str(cache_dir / "contacts.json"))
monkeypatch.setattr(srv, "_CALENDAR_CACHE_FILE", str(cache_dir / "calendar.json"))
return state
@@ -372,10 +379,10 @@ def test_contacts_endpoint_parsea_y_cachea(client, fake_dav):
assert maria["alias"] == "Mari"
assert maria["telefonos"] == ["+34600111222"]
assert maria["osint"] == {"dni": "12345678Z", "pais": "España"}
# Segunda llamada NO re-hace PROPFIND (sirve de la caché en memoria).
calls_after_first = fake_dav["calls"]
# Segunda llamada NO re-descarga (sirve de la caché en memoria).
reports_after_first = fake_dav["reports"]
client.get("/api/contacts")
assert fake_dav["calls"] == calls_after_first
assert fake_dav["reports"] == reports_after_first
def test_contact_by_uid_desde_cache(client, fake_dav):
@@ -396,10 +403,43 @@ def test_calendar_endpoint_rango_y_cache(client, fake_dav):
def test_refresh_invalida_cache_dav(client, fake_dav):
client.get("/api/contacts") # llena caché
calls_before = fake_dav["calls"]
client.post("/api/refresh") # invalida
client.get("/api/contacts") # vuelve a hacer PROPFIND
assert fake_dav["calls"] > calls_before
reports_before = fake_dav["reports"]
client.post("/api/refresh") # invalida + fuerza recarga
client.get("/api/contacts") # vuelve a descargar (REPORT)
assert fake_dav["reports"] > reports_before
def test_disk_cache_evita_descarga_en_proceso_nuevo(vault, fake_dav):
"""Un proceso nuevo con la caché en disco y el mismo ctag NO descarga.
Simula el reinicio del server: primer cliente descarga (1 REPORT) y escribe
la caché en disco; un segundo cliente (caché en memoria vacía) con el mismo
ctag sirve del disco sin un nuevo REPORT. Esto es el arranque instantáneo.
"""
c1 = TestClient(srv.create_app(vault))
assert c1.get("/api/contacts").json()["count"] == 2
reports_after_first = fake_dav["reports"]
assert reports_after_first >= 1 # hubo descarga al no haber disco aún
# Proceso "nuevo": estado en memoria vacío, pero la caché en disco existe y
# el ctag no cambió → debe servir del disco sin descargar.
c2 = TestClient(srv.create_app(vault))
data = c2.get("/api/contacts").json()
assert data["count"] == 2
assert {x["uid"] for x in data["contacts"]} == {"maria-001", "juan-002"}
assert fake_dav["reports"] == reports_after_first # CERO descargas nuevas
def test_disk_cache_recarga_si_cambia_ctag(vault, fake_dav):
"""Si el ctag de la colección cambia, el proceso nuevo SÍ vuelve a descargar."""
c1 = TestClient(srv.create_app(vault))
c1.get("/api/contacts")
reports_after_first = fake_dav["reports"]
fake_dav["ctag"] = "ctag-v2" # la colección cambió
c2 = TestClient(srv.create_app(vault))
c2.get("/api/contacts")
assert fake_dav["reports"] > reports_after_first # re-descargó
# ---------------------------------------------------------------------------