f5d15a9f7b
- _vcard_escape elimina el retorno de carro crudo: cubre tanto el vCard como SUMMARY/LOCATION del VEVENT (que reusan este escape), cerrando la inyeccion de propiedades iCal/vCard via un \r sin \n. - Middleware que rechaza las peticiones mutantes marcadas cross-site por el navegador (Sec-Fetch-Site), cerrando el CSRF residual de los POST simples sin preflight (p.ej. /api/refresh) que el TrustedHost no filtra. - La cache DAV en disco (.cache/*.json, contiene PII) se crea con permisos 0600 via O_CREAT 0600 + os.chmod, sin depender del umask del proceso. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2687 lines
110 KiB
Python
2687 lines
110 KiB
Python
#!/usr/bin/env python3
|
|
"""Backend FastAPI de la app osint_web.
|
|
|
|
Sirve, en JSON (salvo el endpoint de attachments, que sirve binarios), tres
|
|
fuentes de datos para un frontend web local:
|
|
|
|
1. El vault de Obsidian ``osint`` (grafo de nodos + aristas, tablas por tipo,
|
|
fichas con galería de attachments y búsqueda global).
|
|
2. La agenda CardDAV del servidor Xandikos (contactos).
|
|
3. El calendario CalDAV del servidor Xandikos (eventos).
|
|
|
|
Registry-first: este servidor NO reimplementa parseo de Markdown, resolución de
|
|
embeds ni protocolo DAV. Orquesta funciones del registry de los grupos
|
|
``obsidian`` (parseo del vault) y ``dav`` (CardDAV/CalDAV) + ``pass_get_secret``
|
|
para la credencial de Xandikos. La única lógica propia de la app es: cacheo en
|
|
memoria, allowlist de path traversal, aplanado del frontmatter para las tablas y
|
|
el parseo ligero de vCard/iCalendar a JSON.
|
|
|
|
Seguridad: el vault contiene datos personales sensibles (DNIs, fotos), así que
|
|
el servidor escucha SOLO en ``127.0.0.1`` y nunca expone a la red. El endpoint
|
|
``/api/attachment`` valida con ``os.path.realpath`` que el archivo solicitado
|
|
cae dentro del vault; cualquier intento de path traversal devuelve 403.
|
|
|
|
Uso:
|
|
python3 server/main.py --vault /home/enmanuel/Obsidian/osint --port 8470
|
|
|
|
Endpoints (JSON salvo /api/attachment):
|
|
GET /api/health estado + tamaño del grafo cacheado
|
|
GET /api/graph grafo completo {nodes, edges} para sigma.js
|
|
GET /api/nodes?tipo=persona filas de la tabla de ese tipo
|
|
GET /api/node/<slug> ficha: frontmatter + body + attachments
|
|
GET /api/attachment?path=.. binario del attachment (path relativo al vault)
|
|
GET /api/search?q=... nodos cuyo contenido matchea la query
|
|
GET /api/contacts contactos (Xandikos por defecto; osint_db si flag)
|
|
GET /api/contact/<uid> un vCard concreto a JSON
|
|
GET /api/addressbooks libretas de contactos (selector del frontend)
|
|
POST /api/addressbooks crea una libreta nueva (requiere OSINT_DB_BACKEND)
|
|
GET /api/calendars colecciones de calendario bajo /enmanuel/calendars/
|
|
GET /api/calendar?cal=&from=&to= eventos de una colección del calendario (CalDAV)
|
|
POST /api/event crea un VEVENT en una colección de calendario
|
|
PUT /api/event/<uid> edita un VEVENT existente
|
|
DELETE /api/event/<uid> borra un VEVENT
|
|
POST /api/refresh re-escanea el vault y reconstruye la caché
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import importlib.util
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Optional
|
|
|
|
try:
|
|
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
|
|
except ImportError: # pragma: no cover - Python < 3.9 sin tzdata
|
|
ZoneInfo = None # type: ignore[assignment]
|
|
|
|
class ZoneInfoNotFoundError(Exception): # type: ignore[no-redef]
|
|
pass
|
|
|
|
|
|
def _registry_functions_dir() -> str:
|
|
"""Localiza ``python/functions`` del fn_registry sin paths hardcodeados.
|
|
|
|
Prueba primero las variables de entorno ``FN_REGISTRY_FUNCTIONS`` y
|
|
``FN_REGISTRY_ROOT``, después sube por los directorios padre de este archivo
|
|
hasta encontrar una raíz que contenga ``python/functions/obsidian``, y por
|
|
último cae al layout estándar del PC (``/home/enmanuel/fn_registry``). Así el
|
|
backend funciona en cualquier PC con el layout estándar del registry (la app
|
|
vive en ``<root>/projects/osint/apps/osint_web/server/``) sin hardcodear el
|
|
home de un usuario concreto (memoria ``hardcoded-lucas-paths``).
|
|
"""
|
|
env_functions = os.environ.get("FN_REGISTRY_FUNCTIONS")
|
|
if env_functions and os.path.isdir(os.path.join(env_functions, "obsidian")):
|
|
return env_functions
|
|
|
|
candidates: list[str] = []
|
|
env_root = os.environ.get("FN_REGISTRY_ROOT")
|
|
if env_root:
|
|
candidates.append(env_root)
|
|
current = os.path.dirname(os.path.abspath(__file__))
|
|
while True:
|
|
candidates.append(current)
|
|
parent = os.path.dirname(current)
|
|
if parent == current:
|
|
break
|
|
current = parent
|
|
candidates.append("/home/enmanuel/fn_registry")
|
|
for root in candidates:
|
|
functions_dir = os.path.join(root, "python", "functions")
|
|
if os.path.isdir(os.path.join(functions_dir, "obsidian")):
|
|
return functions_dir
|
|
raise RuntimeError(
|
|
"no se encontró python/functions/obsidian subiendo desde "
|
|
f"{os.path.abspath(__file__)}; define FN_REGISTRY_ROOT con la raíz "
|
|
"del fn_registry"
|
|
)
|
|
|
|
|
|
_FUNCTIONS_DIR = _registry_functions_dir()
|
|
sys.path.insert(0, _FUNCTIONS_DIR)
|
|
|
|
from fastapi import Body, FastAPI, HTTPException, Query # noqa: E402
|
|
from fastapi.responses import FileResponse, JSONResponse # noqa: E402
|
|
from pydantic import BaseModel, Field # noqa: E402
|
|
|
|
# --- Grupo de capacidad obsidian (parseo del vault) ---
|
|
# El paquete obsidian tiene un __init__ ligero (sin dependencias pesadas), así
|
|
# que se importa directamente.
|
|
from obsidian import ( # noqa: E402 (sys.path debe resolverse antes)
|
|
build_obsidian_graph,
|
|
create_obsidian_note,
|
|
delete_obsidian_note,
|
|
extract_obsidian_embeds,
|
|
list_obsidian_notes,
|
|
read_obsidian_note,
|
|
resolve_obsidian_embed,
|
|
search_obsidian_notes,
|
|
slugify_obsidian_name,
|
|
update_obsidian_note,
|
|
)
|
|
|
|
|
|
def _load_infra_fn(module_name: str, attr: str):
|
|
"""Carga una función del paquete ``infra`` por archivo, sin tocar su __init__.
|
|
|
|
El ``infra/__init__.py`` del registry importa de forma eager
|
|
``generate_app_icon`` (que necesita Pillow/PIL) y otros módulos pesados que
|
|
esta app no usa. Importar ``from infra.dav_list_resources import ...``
|
|
arrastraría ese __init__ y exigiría PIL como dependencia. Para evitarlo, se
|
|
carga cada módulo concreto del grupo ``dav`` directamente por su ruta de
|
|
archivo con ``importlib``, sin ejecutar el __init__ del paquete. Sigue siendo
|
|
registry-first: se usa la función del registry sin reimplementarla, solo se
|
|
importa de forma quirúrgica.
|
|
"""
|
|
file_path = os.path.join(_FUNCTIONS_DIR, "infra", module_name + ".py")
|
|
spec = importlib.util.spec_from_file_location("infra_%s" % module_name, file_path)
|
|
if spec is None or spec.loader is None: # pragma: no cover - defensivo
|
|
raise ImportError("no se pudo cargar %s" % file_path)
|
|
module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(module)
|
|
return getattr(module, attr)
|
|
|
|
|
|
# --- Grupo de capacidad dav (CardDAV / CalDAV contra Xandikos) + pass ---
|
|
# dav_get_collection trae TODOS los recursos (vCards / VCALENDARs) de una
|
|
# colección en UNA petición REPORT con el contenido inline; dav_collection_ctag
|
|
# lee el ctag de la colección (PROPFIND Depth:0 barato) para validar la caché en
|
|
# disco sin descargar nada cuando nada cambió.
|
|
dav_get_collection = _load_infra_fn("dav_get_collection", "dav_get_collection")
|
|
dav_collection_ctag = _load_infra_fn("dav_collection_ctag", "dav_collection_ctag")
|
|
split_vcards = _load_infra_fn("split_vcards", "split_vcards")
|
|
pass_get_secret = _load_infra_fn("pass_get_secret", "pass_get_secret")
|
|
# Escritura CardDAV: PUT (crear/editar) y DELETE (borrar) un vCard. El cambio en
|
|
# el vault .md es la fuente de verdad; estas reflejan el cambio en Xandikos de
|
|
# inmediato para que la app y el móvil lo vean ya, sin esperar al sync periódico.
|
|
carddav_put_vcard = _load_infra_fn("carddav_put_vcard", "carddav_put_vcard")
|
|
dav_delete_resource = _load_infra_fn("dav_delete_resource", "dav_delete_resource")
|
|
# Calendario (CalDAV): crear/editar eventos (PUT de un VCALENDAR por UID) y listar
|
|
# las colecciones de calendario del usuario con su nombre y color.
|
|
caldav_put_event = _load_infra_fn("caldav_put_event", "caldav_put_event")
|
|
dav_list_calendars = _load_infra_fn("dav_list_calendars", "dav_list_calendars")
|
|
# Crear una colección de calendario nueva (MKCALENDAR + PROPPATCH nombre/color).
|
|
dav_make_calendar = _load_infra_fn("dav_make_calendar", "dav_make_calendar")
|
|
# Expandir una RRULE a las fechas de cada ocurrencia dentro de un rango (pura).
|
|
expand_rrule = _load_infra_fn("expand_rrule", "expand_rrule")
|
|
|
|
# Cliente del service osint_db (DuckDB), usado SOLO cuando el feature flag
|
|
# OSINT_DB_BACKEND está ON. El módulo vive junto a este archivo en server/, que
|
|
# está en sys.path tanto al ejecutar `python server/main.py` como al importarlo
|
|
# desde los tests. Se importa siempre (es barato: solo stdlib) pero no se usa a
|
|
# menos que el flag esté activo.
|
|
import osintdb_client # noqa: E402
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuración Xandikos (CardDAV / CalDAV)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
XANDIKOS_BASE_URL = "https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com"
|
|
XANDIKOS_USERNAME = "enmanuel"
|
|
XANDIKOS_PASS_ENTRY = "dav/xandikos-enmanuel"
|
|
XANDIKOS_CONTACTS_COLLECTION = "/enmanuel/contacts/addressbook/"
|
|
# Libreta (addressbook) por defecto. Cuando el flag OSINT_DB_BACKEND está OFF,
|
|
# todos los contactos viven en esta única libreta; el frontend la muestra como
|
|
# opción por defecto del selector.
|
|
DEFAULT_ADDRESSBOOK_SLUG = "addressbook"
|
|
DEFAULT_ADDRESSBOOK_NAME = "Contactos"
|
|
# Calendar-home del usuario: bajo él cuelgan las colecciones de calendario. El
|
|
# selector de calendario las descubre con dav_list_calendars (PROPFIND Depth:1).
|
|
XANDIKOS_CALENDAR_HOME = "/enmanuel/calendars/"
|
|
# Colección de calendario por defecto (la única hoy). Sigue siendo el destino
|
|
# cuando el cliente no especifica `cal`.
|
|
XANDIKOS_CALENDAR_COLLECTION = "/enmanuel/calendars/calendar/"
|
|
|
|
# Caché en disco de los datos DAV ya parseados, indexada por el ctag de la
|
|
# colección. Al arrancar el server lee el ctag (PROPFIND barato ~0.1s) y, si
|
|
# coincide con el de la caché en disco, sirve los contactos/eventos ya parseados
|
|
# sin descargar ni reparsear nada (arranque instantáneo). El directorio vive
|
|
# junto al server y está gitignored (datos personales sensibles + regenerable).
|
|
_CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".cache")
|
|
_CONTACTS_CACHE_FILE = os.path.join(_CACHE_DIR, "contacts.json")
|
|
# Caché del calendario por defecto. Para otras colecciones la ruta se deriva del
|
|
# nombre de la colección (_calendar_cache_file), así cada calendario tiene su
|
|
# propia caché en disco.
|
|
_CALENDAR_CACHE_FILE = os.path.join(_CACHE_DIR, "calendar.json")
|
|
|
|
|
|
def _calendar_cache_file(collection_path: str) -> str:
|
|
"""Ruta de la caché en disco de una colección de calendario concreta.
|
|
|
|
La colección por defecto usa ``_CALENDAR_CACHE_FILE`` (compatibilidad con la
|
|
caché previa); cualquier otra deriva su archivo del último segmento del path,
|
|
saneado, para que cada calendario tenga su propia caché aislada.
|
|
"""
|
|
if collection_path.strip("/") == XANDIKOS_CALENDAR_COLLECTION.strip("/"):
|
|
return _CALENDAR_CACHE_FILE
|
|
tail = collection_path.strip("/").rsplit("/", 1)[-1] or "calendar"
|
|
safe = re.sub(r"[^A-Za-z0-9_.-]", "_", tail)
|
|
return os.path.join(_CACHE_DIR, "calendar_%s.json" % safe)
|
|
|
|
# Extensiones de imagen que el frontend muestra en la galería con lightbox.
|
|
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"}
|
|
|
|
|
|
class DavUnavailable(Exception):
|
|
"""Xandikos no responde (sin red, timeout, auth caída).
|
|
|
|
Los endpoints DAV la capturan y devuelven un 503 JSON claro, para que un
|
|
fallo de la agenda/calendario NUNCA tumbe el server ni afecte a los
|
|
endpoints del vault, que deben seguir funcionando offline.
|
|
"""
|
|
|
|
|
|
def _attachment_kind(name: str) -> str:
|
|
"""Clasifica un attachment por extensión: ``image`` | ``pdf`` | ``other``."""
|
|
ext = os.path.splitext(name)[1].lower()
|
|
if ext in _IMAGE_EXTS:
|
|
return "image"
|
|
if ext == ".pdf":
|
|
return "pdf"
|
|
return "other"
|
|
|
|
|
|
def _read_pass_secret(entry: str) -> str:
|
|
"""Lee la contraseña (primera línea) de una entrada de ``pass``.
|
|
|
|
Wrapper fino sobre la función del registry ``pass_get_secret_py_infra``
|
|
(grupo ``infra``/``flow-replay``), que ejecuta ``pass show <entry>`` sin
|
|
shell y nunca logea el valor. Convierte su resultado ``{status, value|error}``
|
|
en un ``str`` o un ``RuntimeError`` con mensaje claro, para que los endpoints
|
|
DAV degraden con un error explicable en vez de un 500 silencioso.
|
|
"""
|
|
res = pass_get_secret(entry)
|
|
if res.get("status") != "ok":
|
|
raise RuntimeError(
|
|
"no se pudo leer la entrada '%s' de pass: %s"
|
|
% (entry, res.get("error", "error desconocido"))
|
|
)
|
|
value = res.get("value", "")
|
|
if not value.strip():
|
|
raise RuntimeError("la entrada '%s' de pass está vacía" % entry)
|
|
return value.strip()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Caché en disco de los datos DAV ya parseados (indexada por ctag)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _read_disk_cache(path: str) -> Optional[dict]:
|
|
"""Lee una caché DAV del disco: ``{"ctag": str, "items": list}`` o None.
|
|
|
|
Devuelve None (recargar de la red) ante cualquier problema: archivo
|
|
inexistente, JSON corrupto, o estructura inesperada. Nunca lanza: la caché
|
|
es un acelerador, no una fuente de verdad — si falla, se cae a la descarga.
|
|
"""
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as fh:
|
|
data = json.load(fh)
|
|
except (OSError, ValueError):
|
|
return None
|
|
if not isinstance(data, dict) or not isinstance(data.get("items"), list):
|
|
return None
|
|
return data
|
|
|
|
|
|
def _write_disk_cache(path: str, ctag: str, items: list) -> None:
|
|
"""Escribe la caché DAV al disco de forma atómica (tmp + rename).
|
|
|
|
Persiste ``{"ctag": ctag, "items": items, "saved_at": epoch}``. Errores de
|
|
escritura se ignoran (no deben tumbar el endpoint): la caché en memoria sigue
|
|
sirviendo y el disco se reintentará en el siguiente refresco.
|
|
"""
|
|
try:
|
|
os.makedirs(os.path.dirname(path), mode=0o700, exist_ok=True)
|
|
tmp = path + ".tmp"
|
|
# La caché contiene PII (contactos, posibles DNIs en osint{}): se crea con
|
|
# permisos 0600 para que ningún otro usuario local pueda leerla, sin
|
|
# depender del umask del proceso.
|
|
fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
|
|
with os.fdopen(fd, "w", encoding="utf-8") as fh:
|
|
json.dump(
|
|
{"ctag": ctag, "items": items, "saved_at": time.time()},
|
|
fh,
|
|
ensure_ascii=False,
|
|
)
|
|
os.chmod(tmp, 0o600)
|
|
os.replace(tmp, path)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Feature flag OSINT_DB_BACKEND: fuente de verdad de los contactos
|
|
# ---------------------------------------------------------------------------
|
|
#
|
|
# OFF (default): los contactos se escriben como ficha .md en el vault + reflejo
|
|
# del vCard en Xandikos (comportamiento histórico de la app).
|
|
# ON: la app lee/escribe contra el service osint_db (DuckDB, 127.0.0.1:8771),
|
|
# que pasa a ser la fuente de verdad y empuja él mismo el cambio a Xandikos.
|
|
#
|
|
# El flag vive en dev/feature_flags.json (raíz de la app), patrón TBD del
|
|
# registry (.claude/rules/feature_flags.md). Se lee en cada acceso (no se cachea)
|
|
# para que cambiarlo no requiera reiniciar el server.
|
|
|
|
_FLAGS_FILE = os.path.join(
|
|
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
|
|
"dev",
|
|
"feature_flags.json",
|
|
)
|
|
|
|
# Service osint_db (DuckDB) — fuente de verdad cuando el flag está ON.
|
|
OSINT_DB_BASE_URL = "http://127.0.0.1:8771"
|
|
|
|
|
|
def _osint_db_backend_enabled() -> bool:
|
|
"""True si el flag ``OSINT_DB_BACKEND`` está activo en dev/feature_flags.json.
|
|
|
|
Lee el archivo en cada llamada (sin caché) para que el flip se note sin
|
|
reiniciar. Tolerante a fallos: archivo ausente, JSON corrupto o clave faltante
|
|
→ False (comportamiento histórico vault+Xandikos), nunca lanza.
|
|
"""
|
|
try:
|
|
with open(_FLAGS_FILE, "r", encoding="utf-8") as fh:
|
|
data = json.load(fh)
|
|
except (OSError, ValueError):
|
|
return False
|
|
flag = (data.get("flags") or {}).get("OSINT_DB_BACKEND") or {}
|
|
return bool(flag.get("enabled"))
|
|
|
|
|
|
def _contacts_from_osint_db() -> list:
|
|
"""Lee los contactos del osint_db y los adapta al shape JSON del frontend.
|
|
|
|
El osint_db devuelve filas ``{uid, collection, fn, tels, emails, note_path}``
|
|
(``tels``/``emails`` como JSON array). Esta función las mapea al mismo dict que
|
|
produce ``_vcard_to_json`` (``uid, nombre, telefonos, correos, phones,
|
|
emails, osint, ...``), para que ``/api/contacts`` y la vista no distingan la
|
|
fuente. ``collection`` se expone para poder filtrar por libreta.
|
|
|
|
Raises:
|
|
osintdb_client.OsintDbUnavailable: si el service no responde.
|
|
"""
|
|
rows = osintdb_client.list_contacts()
|
|
out: list = []
|
|
for row in rows:
|
|
tels = osintdb_client._parse_json_array(row.get("tels"))
|
|
mails = osintdb_client._parse_json_array(row.get("emails"))
|
|
fn = row.get("fn")
|
|
out.append(
|
|
{
|
|
"uid": row.get("uid"),
|
|
"fn": fn,
|
|
"nombre": fn,
|
|
"nickname": None,
|
|
"alias": None,
|
|
"org": None,
|
|
"note": None,
|
|
"nota": None,
|
|
"collection": row.get("collection"),
|
|
"phones": [{"value": t, "type": ""} for t in tels],
|
|
"emails": [{"value": e, "type": ""} for e in mails],
|
|
"telefonos": tels,
|
|
"correos": mails,
|
|
"direcciones": [],
|
|
"osint": {},
|
|
"note_path": row.get("note_path"),
|
|
}
|
|
)
|
|
out.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower())
|
|
return out
|
|
|
|
|
|
def _osint_db_contact_payload(data: "ContactIn", uid: Optional[str] = None) -> dict:
|
|
"""Construye el cuerpo JSON de un contacto para el service osint_db.
|
|
|
|
Mapea el ``ContactIn`` (ya reconciliado multi-valor) al contrato del osint_db:
|
|
``{uid?, collection, fn, telefonos, emails, direcciones, nombre, aliases, dni,
|
|
pais, contexto, notas}``. ``collection`` se deriva del campo ``contexto`` si
|
|
apunta a una libreta; por defecto la libreta canónica. ``uid`` solo se incluye
|
|
al crear (el PUT lo lleva en la ruta, no en el cuerpo).
|
|
"""
|
|
nombre = data.nombre.strip()
|
|
payload: dict = {
|
|
"collection": _norm_str(data.collection) or DEFAULT_ADDRESSBOOK_SLUG,
|
|
"fn": nombre,
|
|
"nombre": nombre,
|
|
"telefonos": _norm_list(data.telefonos),
|
|
"emails": _norm_list(data.emails),
|
|
"direcciones": _norm_list(data.direcciones),
|
|
"aliases": _norm_list(data.aliases),
|
|
"dni": _norm_str(data.dni),
|
|
"pais": _norm_str(data.pais),
|
|
"contexto": _norm_str(data.contexto),
|
|
"notas": _norm_str(data.notas),
|
|
}
|
|
if uid:
|
|
payload["uid"] = uid
|
|
return payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Estado del servidor: caché del vault + password Xandikos
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class VaultState:
|
|
"""Caché en memoria del vault: grafo agregado + índice slug → nota.
|
|
|
|
Se construye al arrancar y se reconstruye bajo demanda con ``refresh()``
|
|
(botón "refrescar" del frontend → ``POST /api/refresh``). Thread-safe
|
|
mediante un lock sobre la reconstrucción. La password de Xandikos se lee de
|
|
``pass`` perezosamente y se cachea en memoria.
|
|
|
|
Raises:
|
|
FileNotFoundError: si ``vault_dir`` no existe (error claro al arrancar,
|
|
nunca un 500 silencioso).
|
|
NotADirectoryError: si ``vault_dir`` no es un directorio.
|
|
"""
|
|
|
|
def __init__(self, vault_dir: str):
|
|
if not os.path.exists(vault_dir):
|
|
raise FileNotFoundError(f"el vault no existe: {vault_dir}")
|
|
if not os.path.isdir(vault_dir):
|
|
raise NotADirectoryError(f"el vault no es un directorio: {vault_dir}")
|
|
self.vault_dir = os.path.abspath(vault_dir)
|
|
self._vault_real = os.path.realpath(self.vault_dir)
|
|
self._lock = threading.Lock()
|
|
self.graph: dict = {"nodes": [], "edges": []}
|
|
self.note_index: dict = {} # slug -> {"path", "tipo", "label"}
|
|
self._xandikos_password: Optional[str] = None
|
|
# Cachés DAV en memoria (igual que el grafo): se llenan perezosamente al
|
|
# primer acceso y se invalidan en POST /api/refresh. None = sin cargar.
|
|
# Cada caché lleva su ctag para servir del disco sin red cuando la
|
|
# colección no cambió. _force_reload (set por /api/refresh) salta la
|
|
# validación de ctag en el siguiente acceso.
|
|
self._dav_lock = threading.Lock()
|
|
self._contacts_cache: Optional[list] = None
|
|
# Caché de eventos POR colección de calendario: collection_path → list.
|
|
# Permite varios calendarios sin pisarse; cada uno con su ctag.
|
|
self._calendar_cache: dict[str, list] = {}
|
|
self._calendar_ctag: dict[str, str] = {}
|
|
self._calendars_cache: Optional[list] = None # lista de colecciones
|
|
self._contacts_ctag: Optional[str] = None
|
|
self._force_reload = False
|
|
self.refresh()
|
|
|
|
# --- vault --------------------------------------------------------------
|
|
|
|
def refresh(self) -> dict:
|
|
"""Re-escanea el vault: reconstruye grafo + índice de notas.
|
|
|
|
Devuelve un resumen ``{"nodes": N, "edges": M}`` para el frontend.
|
|
"""
|
|
with self._lock:
|
|
graph = build_obsidian_graph(self.vault_dir, include_dangling=True)
|
|
nodes_by_id = {n["id"]: n for n in graph["nodes"]}
|
|
note_index: dict = {}
|
|
for path in list_obsidian_notes(self.vault_dir):
|
|
slug = os.path.splitext(os.path.basename(path))[0]
|
|
if not slug or slug in note_index:
|
|
continue
|
|
node = nodes_by_id.get(slug, {})
|
|
note_index[slug] = {
|
|
"path": path,
|
|
"tipo": node.get("tipo", "nota"),
|
|
"label": node.get("label", slug),
|
|
}
|
|
self.graph = graph
|
|
self.note_index = note_index
|
|
return {"nodes": len(graph["nodes"]), "edges": len(graph["edges"])}
|
|
|
|
def graph_payload(self) -> dict:
|
|
"""Grafo + conteos por tipo para la leyenda de sigma.js."""
|
|
counts: dict[str, int] = {}
|
|
for node in self.graph["nodes"]:
|
|
counts[node["tipo"]] = counts.get(node["tipo"], 0) + 1
|
|
return {
|
|
"nodes": self.graph["nodes"],
|
|
"edges": self.graph["edges"],
|
|
"counts": counts,
|
|
"total_nodes": len(self.graph["nodes"]),
|
|
"total_edges": len(self.graph["edges"]),
|
|
}
|
|
|
|
def rows_by_tipo(self, tipo: str) -> list:
|
|
"""Filas de la tabla de un tipo: nodos reales (no fantasma) filtrados.
|
|
|
|
Cada fila lleva ``id``, ``label``, ``tipo`` y el ``frontmatter``
|
|
completo — el frontend aplana las columnas que le interesen. Sin
|
|
``tipo`` devuelve todos los nodos reales.
|
|
"""
|
|
rows = []
|
|
for node in self.graph["nodes"]:
|
|
if node.get("dangling"):
|
|
continue
|
|
if tipo and node["tipo"] != tipo:
|
|
continue
|
|
rows.append(
|
|
{
|
|
"id": node["id"],
|
|
"label": node["label"],
|
|
"tipo": node["tipo"],
|
|
"frontmatter": node["frontmatter"],
|
|
}
|
|
)
|
|
return rows
|
|
|
|
def _resolve_embed(self, embed_name: str) -> str:
|
|
"""Resuelve un embed ``![[...]]`` a un path absoluto dentro del vault.
|
|
|
|
El vault osint usa dos formas de embed: por nombre de archivo
|
|
(``![[foto.jpg]]``) y por path relativo al vault
|
|
(``![[attachments/personas/<slug>/foto.png]]``). La función del registry
|
|
``resolve_obsidian_embed`` resuelve solo por basename, así que primero se
|
|
intenta el embed como path literal relativo al vault (cubre la forma con
|
|
ruta), y si no existe se cae al resolutor por basename del registry.
|
|
Devuelve cadena vacía si ninguna forma resuelve.
|
|
"""
|
|
# Forma 1: el embed ya es un path relativo al vault.
|
|
literal = os.path.realpath(os.path.join(self._vault_real, embed_name))
|
|
if self._is_within_vault(literal) and os.path.isfile(literal):
|
|
return literal
|
|
# Forma 2: resolución por basename via función del registry.
|
|
return resolve_obsidian_embed(self.vault_dir, embed_name)
|
|
|
|
def node_detail(self, slug: str):
|
|
"""Ficha completa de un nodo: frontmatter + body + attachments.
|
|
|
|
Los attachments salen de los embeds ``![[...]]`` del cuerpo, resueltos a
|
|
paths reales con ``_resolve_embed`` (que compone ``resolve_obsidian_embed``)
|
|
y devueltos como paths **relativos al vault** (lo que consume
|
|
``/api/attachment``). Un embed que no resuelve se reporta con
|
|
``kind: "missing"`` y path vacío. Devuelve ``None`` si el slug no
|
|
corresponde a ninguna nota del vault.
|
|
"""
|
|
info = self.note_index.get(slug)
|
|
if info is None:
|
|
# Tolerancia: aceptar también nombres sin slugificar.
|
|
info = self.note_index.get(slugify_obsidian_name(slug))
|
|
if info is None:
|
|
return None
|
|
note = read_obsidian_note(info["path"])
|
|
attachments = []
|
|
for name in extract_obsidian_embeds(note["body"]):
|
|
abs_path = self._resolve_embed(name)
|
|
if not abs_path:
|
|
attachments.append({"name": name, "path": "", "kind": "missing"})
|
|
continue
|
|
real = os.path.realpath(abs_path)
|
|
# Defensa en profundidad: solo attachments dentro del vault.
|
|
if not self._is_within_vault(real):
|
|
continue
|
|
rel = os.path.relpath(real, self._vault_real)
|
|
attachments.append(
|
|
{"name": name, "path": rel, "kind": _attachment_kind(abs_path)}
|
|
)
|
|
return {
|
|
"id": os.path.splitext(os.path.basename(info["path"]))[0],
|
|
"tipo": info["tipo"],
|
|
"label": info["label"],
|
|
"frontmatter": note["frontmatter"],
|
|
"body": note["body"],
|
|
"tags": note["tags"],
|
|
"wikilinks": note["wikilinks"],
|
|
"attachments": attachments,
|
|
}
|
|
|
|
def _is_within_vault(self, candidate_real_path: str) -> bool:
|
|
"""True si ``candidate_real_path`` (ya realpath) está dentro del vault.
|
|
|
|
Añade el separador final al vault para que ``/vault-evil`` no cuele como
|
|
prefijo de ``/vault``.
|
|
"""
|
|
return (
|
|
candidate_real_path == self._vault_real
|
|
or candidate_real_path.startswith(self._vault_real + os.sep)
|
|
)
|
|
|
|
def resolve_attachment_path(self, rel_path: str):
|
|
"""Resuelve un path relativo de attachment a absoluto, SOLO dentro del vault.
|
|
|
|
Bloquea path traversal: normaliza con ``realpath`` (colapsa ``..`` y
|
|
sigue symlinks) y exige que el resultado quede estrictamente bajo la
|
|
raíz real del vault. Devuelve ``None`` (→ 403/404) ante cualquier intento
|
|
de salir del vault, paths absolutos, o archivos inexistentes.
|
|
"""
|
|
if not rel_path:
|
|
return None
|
|
candidate = os.path.realpath(os.path.join(self._vault_real, rel_path))
|
|
if candidate == self._vault_real:
|
|
return None
|
|
if not candidate.startswith(self._vault_real + os.sep):
|
|
return None
|
|
if not os.path.isfile(candidate):
|
|
return None
|
|
return candidate
|
|
|
|
def search(self, query: str) -> list:
|
|
"""Búsqueda global: nodos cuyas notas matchean la query (substring).
|
|
|
|
Compone ``search_obsidian_notes`` y mapea cada hit a su nodo (slug,
|
|
label, tipo) + las líneas que matchean.
|
|
"""
|
|
results = []
|
|
for hit in search_obsidian_notes(self.vault_dir, query):
|
|
slug = os.path.splitext(os.path.basename(hit["path"]))[0]
|
|
info = self.note_index.get(slug, {})
|
|
results.append(
|
|
{
|
|
"id": slug,
|
|
"label": info.get("label", slug),
|
|
"tipo": info.get("tipo", "nota"),
|
|
"matches": hit.get("matches", []),
|
|
}
|
|
)
|
|
return results
|
|
|
|
# --- Xandikos -----------------------------------------------------------
|
|
|
|
def xandikos_password(self) -> str:
|
|
"""Password de Xandikos desde ``pass``, cacheada en memoria."""
|
|
with self._lock:
|
|
if self._xandikos_password is None:
|
|
self._xandikos_password = _read_pass_secret(XANDIKOS_PASS_ENTRY)
|
|
return self._xandikos_password
|
|
|
|
def _collection_ctag(self, collection_path: str, password: str) -> Optional[str]:
|
|
"""Lee el ctag de una colección (PROPFIND barato), o None si no se puede.
|
|
|
|
El ctag es el token de versión de la colección: si no cambió, la caché en
|
|
disco sigue vigente. Devolver None significa "no pude validar" → se
|
|
recarga de la red por seguridad (nunca se sirve caché potencialmente
|
|
obsoleta sin confirmación). No lanza.
|
|
"""
|
|
res = dav_collection_ctag(
|
|
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, collection_path
|
|
)
|
|
return res.get("ctag") if res.get("status") == "ok" else None
|
|
|
|
def _load_collection(
|
|
self,
|
|
collection_path: str,
|
|
content_type: str,
|
|
cache_file: str,
|
|
parse_items,
|
|
) -> tuple:
|
|
"""Carga una colección DAV con caché en disco validada por ctag.
|
|
|
|
Flujo (1 o 2 peticiones, nunca N):
|
|
1. Lee el ctag de la colección (PROPFIND Depth:0, ~0.1s).
|
|
2. Si el ctag coincide con el de la caché en disco (y no hay refresh
|
|
forzado), parsea los items del disco y devuelve sin descargar.
|
|
3. Si no, hace UN REPORT ``dav_get_collection`` que trae todos los
|
|
recursos con su contenido inline, los parsea con ``parse_items`` y
|
|
reescribe la caché en disco con el nuevo ctag.
|
|
|
|
``parse_items(resources) -> list`` transforma la lista
|
|
``[{href, etag, data}]`` del registry en la lista de objetos JSON que
|
|
sirve el endpoint (contactos o eventos).
|
|
|
|
Returns:
|
|
tuple ``(items, ctag)``.
|
|
|
|
Raises:
|
|
DavUnavailable: si Xandikos no responde al REPORT cuando hay que
|
|
descargar (sin red, timeout, auth).
|
|
"""
|
|
password = self.xandikos_password()
|
|
ctag = self._collection_ctag(collection_path, password)
|
|
|
|
# Caché en disco vigente: mismo ctag y sin refresh forzado → sin red.
|
|
if ctag is not None and not self._force_reload:
|
|
disk = _read_disk_cache(cache_file)
|
|
if disk is not None and disk.get("ctag") == ctag:
|
|
return parse_items(disk["items"]), ctag
|
|
|
|
# Hay que (re)descargar: UN REPORT trae todo con el contenido inline.
|
|
got = dav_get_collection(
|
|
XANDIKOS_BASE_URL,
|
|
XANDIKOS_USERNAME,
|
|
password,
|
|
collection_path,
|
|
content_type,
|
|
)
|
|
if got.get("status") != "ok":
|
|
raise DavUnavailable("Xandikos no responde: %s" % got.get("error"))
|
|
resources = got.get("resources", [])
|
|
items = parse_items(resources)
|
|
# Persistir en disco para el arranque instantáneo de la próxima vez. Si
|
|
# no obtuvimos ctag, guardamos cadena vacía: nunca matcheará un ctag real,
|
|
# así que la próxima vez se revalidará (cae con elegancia a "siempre
|
|
# recargar" sin romper).
|
|
_write_disk_cache(cache_file, ctag or "", items)
|
|
return items, (ctag or "")
|
|
|
|
@staticmethod
|
|
def _parse_contacts(items: list) -> list:
|
|
"""Parsea ``[{href, etag, data}]`` (o items cacheados) a contactos JSON.
|
|
|
|
Acepta dos formas de ``items``: la lista de recursos del registry (cada
|
|
uno con ``data`` = texto vCard, posible multi-tarjeta) que hay que
|
|
parsear, o la lista de contactos ya parseados (caché en disco), que se
|
|
devuelve tal cual. Se distinguen por la presencia de la clave ``data``.
|
|
"""
|
|
if items and "data" in items[0]:
|
|
contacts: list = []
|
|
for res in items:
|
|
href = res.get("href")
|
|
for card_text in split_vcards(res.get("data", "")):
|
|
card = _vcard_to_json(card_text)
|
|
card["etag"] = res.get("etag")
|
|
card["href"] = href
|
|
contacts.append(card)
|
|
contacts.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower())
|
|
return contacts
|
|
return list(items)
|
|
|
|
@staticmethod
|
|
def _parse_events(items: list) -> list:
|
|
"""Parsea ``[{href, etag, data}]`` (o items cacheados) a eventos JSON.
|
|
|
|
Igual criterio que ``_parse_contacts``: si los items llevan ``data`` son
|
|
recursos del registry (texto VCALENDAR) y se parsean; si no, ya son
|
|
eventos cacheados y se devuelven tal cual.
|
|
"""
|
|
if items and "data" in items[0]:
|
|
events: list = []
|
|
for res in items:
|
|
href = res.get("href")
|
|
for event in _vcalendar_to_events(res.get("data", "")):
|
|
event["etag"] = res.get("etag")
|
|
event["href"] = href
|
|
events.append(event)
|
|
events.sort(key=lambda e: e.get("dtstart") or "")
|
|
return events
|
|
return list(items)
|
|
|
|
def contacts(self) -> list:
|
|
"""Contactos, desde Xandikos (flag OFF) o desde osint_db (flag ON).
|
|
|
|
Con el flag ``OSINT_DB_BACKEND`` activo, la fuente de verdad es el service
|
|
osint_db (DuckDB): se consultan sus contactos y se devuelven con el mismo
|
|
shape JSON que produce el parseo del vCard, para que el frontend no note la
|
|
diferencia. Con el flag OFF (default), camino histórico: addressbook
|
|
Xandikos parseado y cacheado.
|
|
|
|
Caché en dos niveles para el camino DAV: memoria (mientras vive el
|
|
proceso) y disco (``.cache/contacts.json``, validada por ctag para
|
|
arranque instantáneo). Al primer acceso descarga TODO en UNA petición
|
|
REPORT (``dav_get_collection``) en vez de un GET por ``.vcf``.
|
|
|
|
Raises:
|
|
RuntimeError: si no se puede leer la password de ``pass``.
|
|
DavUnavailable: si Xandikos no responde (sin red, timeout, auth).
|
|
osintdb_client.OsintDbUnavailable: si el osint_db no responde (flag ON).
|
|
"""
|
|
if _osint_db_backend_enabled():
|
|
return _contacts_from_osint_db()
|
|
with self._dav_lock:
|
|
if self._contacts_cache is not None and not self._force_reload:
|
|
return self._contacts_cache
|
|
contacts, ctag = self._load_collection(
|
|
XANDIKOS_CONTACTS_COLLECTION,
|
|
"vcard",
|
|
_CONTACTS_CACHE_FILE,
|
|
self._parse_contacts,
|
|
)
|
|
self._contacts_cache = contacts
|
|
self._contacts_ctag = ctag
|
|
self._maybe_clear_force_reload()
|
|
return contacts
|
|
|
|
def list_addressbooks(self) -> list:
|
|
"""Libretas (addressbooks) disponibles para los contactos.
|
|
|
|
Con el flag ``OSINT_DB_BACKEND`` ON, consulta las libretas del osint_db
|
|
(``{slug, display_name, collection_path, color}``). Con el flag OFF, hoy
|
|
solo existe la libreta por defecto en el vault; se devuelve esa única
|
|
entrada para que el selector del frontend tenga algo que mostrar.
|
|
|
|
Raises:
|
|
osintdb_client.OsintDbUnavailable: si el osint_db no responde (flag ON).
|
|
"""
|
|
if _osint_db_backend_enabled():
|
|
return osintdb_client.list_addressbooks()
|
|
return [
|
|
{
|
|
"slug": DEFAULT_ADDRESSBOOK_SLUG,
|
|
"display_name": DEFAULT_ADDRESSBOOK_NAME,
|
|
"collection_path": XANDIKOS_CONTACTS_COLLECTION,
|
|
"color": None,
|
|
}
|
|
]
|
|
|
|
def create_addressbook(self, data: "AddressbookIn") -> dict:
|
|
"""Crea una libreta de contactos nueva.
|
|
|
|
Solo soportado con el flag ``OSINT_DB_BACKEND`` ON: el osint_db crea la
|
|
colección CardDAV en Xandikos y la registra en la DuckDB. Con el flag OFF
|
|
no hay forma de crear libretas todavía (no existe ``dav_make_addressbook``
|
|
en el registry) → 501 claro indicando que requiere el flag.
|
|
|
|
Returns:
|
|
dict ``{status, slug, ...}`` del osint_db.
|
|
|
|
Raises:
|
|
HTTPException(400): si el slug/nombre queda vacío.
|
|
HTTPException(501): si el flag está OFF.
|
|
osintdb_client.OsintDbUnavailable: si el osint_db no responde.
|
|
"""
|
|
slug = (data.slug or data.name or "").strip()
|
|
if not slug:
|
|
raise HTTPException(
|
|
status_code=400, detail="el nombre de la libreta es obligatorio"
|
|
)
|
|
if not _osint_db_backend_enabled():
|
|
raise HTTPException(
|
|
status_code=501,
|
|
detail=(
|
|
"crear libretas requiere el backend OSINT_DB_BACKEND activo "
|
|
"(hoy solo existe la libreta por defecto en el vault)"
|
|
),
|
|
)
|
|
res = osintdb_client.create_addressbook(
|
|
slug, data.name or slug, data.color or None
|
|
)
|
|
self.invalidate_dav()
|
|
return res
|
|
|
|
def _resolve_calendar(self, cal: str = "") -> str:
|
|
"""Normaliza el parámetro ``cal`` a una ruta de colección de calendario.
|
|
|
|
Acepta una ruta absoluta (``/enmanuel/calendars/calendar/``), el nombre
|
|
corto de la colección (``calendar``), o vacío (→ colección por defecto).
|
|
Garantiza barras inicial/final. NO valida contra el servidor (eso lo hace
|
|
el propio Xandikos al fallar la petición); solo da forma canónica.
|
|
"""
|
|
cal = (cal or "").strip()
|
|
if not cal:
|
|
return XANDIKOS_CALENDAR_COLLECTION
|
|
if cal.startswith("/"):
|
|
path = cal
|
|
else:
|
|
# Nombre corto → lo colgamos del calendar-home.
|
|
path = XANDIKOS_CALENDAR_HOME.rstrip("/") + "/" + cal.strip("/")
|
|
if not path.endswith("/"):
|
|
path += "/"
|
|
return path
|
|
|
|
def list_calendars(self) -> list:
|
|
"""Colecciones de calendario bajo el calendar-home, con nombre y color.
|
|
|
|
Cacheada en memoria (``POST /api/refresh`` la invalida). Compone la
|
|
función del registry ``dav_list_calendars`` (PROPFIND Depth:1). Devuelve
|
|
``[{href, name, color}, ...]`` ordenadas por nombre.
|
|
|
|
Raises:
|
|
RuntimeError: si no se puede leer la password de ``pass``.
|
|
DavUnavailable: si Xandikos no responde.
|
|
"""
|
|
with self._dav_lock:
|
|
if self._calendars_cache is not None and not self._force_reload:
|
|
return self._calendars_cache
|
|
password = self.xandikos_password()
|
|
res = dav_list_calendars(
|
|
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, XANDIKOS_CALENDAR_HOME
|
|
)
|
|
if res.get("status") != "ok":
|
|
raise DavUnavailable(
|
|
"Xandikos no responde: %s" % res.get("error")
|
|
)
|
|
calendars = res.get("calendars", [])
|
|
self._calendars_cache = calendars
|
|
self._maybe_clear_force_reload()
|
|
return calendars
|
|
|
|
def create_calendar(self, data: "CalendarIn") -> dict:
|
|
"""Crea una colección de calendario nueva bajo el calendar-home (MKCALENDAR).
|
|
|
|
Compone la función del registry ``dav_make_calendar`` (MKCALENDAR +
|
|
PROPPATCH de nombre/color). Invalida la caché de colecciones para que el
|
|
calendario nuevo aparezca en el selector al recargar.
|
|
|
|
Returns:
|
|
dict ``{status, href, existed?}`` de la función del registry.
|
|
|
|
Raises:
|
|
HTTPException(400): si el slug/nombre queda vacío tras sanear.
|
|
DavUnavailable: si Xandikos rechaza la creación.
|
|
"""
|
|
slug = (data.slug or data.name or "").strip()
|
|
if not slug:
|
|
raise HTTPException(status_code=400, detail="el nombre del calendario es obligatorio")
|
|
password = self.xandikos_password()
|
|
res = dav_make_calendar(
|
|
XANDIKOS_BASE_URL,
|
|
XANDIKOS_USERNAME,
|
|
password,
|
|
XANDIKOS_CALENDAR_HOME,
|
|
slug,
|
|
data.name or slug,
|
|
data.color or "",
|
|
data.description or "",
|
|
)
|
|
if res.get("status") != "ok":
|
|
raise DavUnavailable(
|
|
"Xandikos no pudo crear el calendario: %s" % res.get("error")
|
|
)
|
|
with self._dav_lock:
|
|
self._calendars_cache = None
|
|
return res
|
|
|
|
def calendar(self, cal: str = "", dt_from: str = "", dt_to: str = "") -> list:
|
|
"""Eventos de una colección de calendario Xandikos, cacheados y filtrados.
|
|
|
|
Caché por colección (memoria + disco, validada por ctag). La descarga +
|
|
parseo completos se cachean (UNA petición REPORT); el filtro por
|
|
``[from, to]`` se aplica sobre la caché. Sin ``cal`` usa la colección por
|
|
defecto; sin ``from``/``to`` devuelve todos los eventos.
|
|
|
|
Raises:
|
|
RuntimeError: si no se puede leer la password de ``pass``.
|
|
DavUnavailable: si Xandikos no responde (sin red, timeout, auth).
|
|
"""
|
|
collection = self._resolve_calendar(cal)
|
|
with self._dav_lock:
|
|
cached = self._calendar_cache.get(collection)
|
|
if cached is None or self._force_reload:
|
|
events, ctag = self._load_collection(
|
|
collection,
|
|
"ical",
|
|
_calendar_cache_file(collection),
|
|
self._parse_events,
|
|
)
|
|
self._calendar_cache[collection] = events
|
|
self._calendar_ctag[collection] = ctag
|
|
self._maybe_clear_force_reload()
|
|
cached = events
|
|
all_events = list(cached)
|
|
# Sin rango: devolvemos los eventos maestros tal cual (no expandimos
|
|
# series infinitas). Con rango: cada serie recurrente se expande a sus
|
|
# ocurrencias dentro de [from, to]; los puntuales se filtran por fecha.
|
|
if not dt_from and not dt_to:
|
|
return all_events
|
|
out: list = []
|
|
for ev in all_events:
|
|
if ev.get("rrule"):
|
|
out.extend(_expand_event_occurrences(ev, dt_from, dt_to))
|
|
elif _event_in_range(ev, dt_from, dt_to):
|
|
out.append(ev)
|
|
return out
|
|
|
|
# --- Escritura de eventos del calendario (CalDAV) -----------------------
|
|
|
|
def create_event(self, data: "EventIn") -> dict:
|
|
"""Crea un VEVENT en una colección de calendario (PUT de un VCALENDAR).
|
|
|
|
Genera un UID nuevo, construye el VCALENDAR/VEVENT (respetando tz/all_day,
|
|
ver ``_build_vcalendar``) y lo sube con ``caldav_put_event``. Invalida la
|
|
caché de esa colección para que el evento aparezca ya.
|
|
|
|
Returns:
|
|
dict ``{uid, cal, dav}``.
|
|
|
|
Raises:
|
|
HTTPException(400): si la fecha es inválida o falta el summary.
|
|
DavUnavailable: si Xandikos rechaza el PUT.
|
|
"""
|
|
if not data.summary or not data.summary.strip():
|
|
raise HTTPException(status_code=400, detail="el summary es obligatorio")
|
|
collection = self._resolve_calendar(data.cal or "")
|
|
uid = str(uuid.uuid4())
|
|
try:
|
|
vcal = _build_vcalendar(data, uid)
|
|
except ValueError as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc)) from exc
|
|
dav = self._put_event(collection, uid, vcal)
|
|
if dav.get("status") != "ok":
|
|
raise DavUnavailable("Xandikos rechazó el evento: %s" % dav.get("error"))
|
|
self._invalidate_calendar(collection)
|
|
return {"uid": uid, "cal": collection, "dav": dav}
|
|
|
|
def update_event(self, uid: str, data: "EventIn") -> dict:
|
|
"""Edita un VEVENT existente: reescribe el recurso ``<uid>.ics`` (PUT).
|
|
|
|
Reutiliza el UID (idempotente). Construye el VCALENDAR de nuevo a partir
|
|
del cuerpo recibido y lo sube. Invalida la caché de la colección.
|
|
|
|
Returns:
|
|
dict ``{uid, cal, dav}``.
|
|
|
|
Raises:
|
|
HTTPException(400): si la fecha es inválida o falta el summary.
|
|
DavUnavailable: si Xandikos rechaza el PUT.
|
|
"""
|
|
if not data.summary or not data.summary.strip():
|
|
raise HTTPException(status_code=400, detail="el summary es obligatorio")
|
|
collection = self._resolve_calendar(data.cal or "")
|
|
try:
|
|
vcal = _build_vcalendar(data, uid)
|
|
except ValueError as exc:
|
|
raise HTTPException(status_code=400, detail=str(exc)) from exc
|
|
dav = self._put_event(collection, uid, vcal)
|
|
if dav.get("status") != "ok":
|
|
raise DavUnavailable("Xandikos rechazó el evento: %s" % dav.get("error"))
|
|
self._invalidate_calendar(collection)
|
|
return {"uid": uid, "cal": collection, "dav": dav}
|
|
|
|
def delete_event(self, uid: str, cal: str = "") -> dict:
|
|
"""Borra un VEVENT: elimina el recurso ``<uid>.ics`` de la colección.
|
|
|
|
Trata 404 como idempotente (ya no existía). Invalida la caché de la
|
|
colección.
|
|
|
|
Returns:
|
|
dict ``{uid, deleted, dav}``.
|
|
|
|
Raises:
|
|
DavUnavailable: si Xandikos falla con un error distinto de 404.
|
|
"""
|
|
collection = self._resolve_calendar(cal)
|
|
password = self.xandikos_password()
|
|
resource_path = collection + _safe_event_resource(uid)
|
|
dav = dav_delete_resource(
|
|
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, resource_path
|
|
)
|
|
if dav.get("status") != "ok" and dav.get("http_status") == 404:
|
|
dav = {"status": "ok", "http_status": 404, "idempotent": True}
|
|
if dav.get("status") != "ok":
|
|
raise DavUnavailable("Xandikos no pudo borrar: %s" % dav.get("error"))
|
|
self._invalidate_calendar(collection)
|
|
return {"uid": uid, "deleted": True, "dav": dav}
|
|
|
|
def _put_event(self, collection: str, uid: str, vcalendar_text: str) -> dict:
|
|
"""Sube (PUT) un VCALENDAR a una colección CalDAV. No lanza por sí sola.
|
|
|
|
Compone la función del registry ``caldav_put_event`` (deriva el nombre
|
|
del recurso de ``safe(uid).ics``). Devuelve su dict
|
|
``{status, http_status|error}``.
|
|
"""
|
|
password = self.xandikos_password()
|
|
return caldav_put_event(
|
|
XANDIKOS_BASE_URL,
|
|
XANDIKOS_USERNAME,
|
|
password,
|
|
collection,
|
|
uid,
|
|
vcalendar_text,
|
|
)
|
|
|
|
def _invalidate_calendar(self, collection: str) -> None:
|
|
"""Vacía la caché en memoria de una colección de calendario concreta."""
|
|
with self._dav_lock:
|
|
self._calendar_cache.pop(collection, None)
|
|
self._calendar_ctag.pop(collection, None)
|
|
|
|
def _maybe_clear_force_reload(self) -> None:
|
|
"""Apaga el flag de refresh forzado una vez consumido por una recarga.
|
|
|
|
Llamado bajo ``_dav_lock`` tras recargar una colección. El flag lo activa
|
|
``invalidate_dav`` (POST /api/refresh) para forzar UNA recarga que ignore
|
|
el ctag; tras ella vuelve a la validación normal por ctag.
|
|
"""
|
|
self._force_reload = False
|
|
|
|
def invalidate_dav(self) -> None:
|
|
"""Vacía las cachés de contactos y calendario y fuerza una recarga.
|
|
|
|
Limpia las cachés en memoria y marca ``_force_reload`` para que el
|
|
siguiente acceso a cada colección ignore el ctag cacheado y vuelva a
|
|
descargar del servidor (el botón "refrescar" debe traer cambios aunque el
|
|
ctag no se haya actualizado todavía). No borra la password.
|
|
"""
|
|
with self._dav_lock:
|
|
self._contacts_cache = None
|
|
self._calendar_cache = {}
|
|
self._calendar_ctag = {}
|
|
self._calendars_cache = None
|
|
self._force_reload = True
|
|
|
|
# --- Escritura de contactos: ficha .md (verdad) + reflejo en Xandikos ----
|
|
|
|
def _contact_note_path(self, tipo: str, slug: str) -> str:
|
|
"""Path absoluto de la ficha ``.md`` de un contacto en el vault.
|
|
|
|
``personas/<slug>.md`` o ``organizaciones/<slug>.md`` según el tipo.
|
|
"""
|
|
folder = _TIPO_FOLDER.get(tipo, "personas")
|
|
return os.path.join(self._vault_real, folder, slug + ".md")
|
|
|
|
def _put_vcard(self, slug: str, vcard_text: str) -> dict:
|
|
"""Sube (PUT) un vCard a Xandikos por su UID=slug. No lanza por sí sola.
|
|
|
|
Reflejo inmediato del cambio en la ficha del vault, para que el contacto
|
|
se vea ya en la app y en el móvil sin esperar al sync periódico. Devuelve
|
|
el dict ``{status, http_status|error}`` de ``carddav_put_vcard``.
|
|
"""
|
|
password = self.xandikos_password()
|
|
return carddav_put_vcard(
|
|
XANDIKOS_BASE_URL,
|
|
XANDIKOS_USERNAME,
|
|
password,
|
|
XANDIKOS_CONTACTS_COLLECTION,
|
|
slug,
|
|
vcard_text,
|
|
)
|
|
|
|
def _delete_vcard(self, slug: str) -> dict:
|
|
"""Borra (DELETE) el vCard ``<slug>.vcf`` de Xandikos. No lanza.
|
|
|
|
Compone ``dav_delete_resource`` con el href del recurso (mismo nombre que
|
|
usó el PUT: ``<slug>.vcf``). Trata 404 como idempotente (ya no existía →
|
|
objetivo cumplido), igual que un borrado repetido.
|
|
"""
|
|
password = self.xandikos_password()
|
|
resource_path = XANDIKOS_CONTACTS_COLLECTION + slug + ".vcf"
|
|
res = dav_delete_resource(
|
|
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, resource_path
|
|
)
|
|
if res.get("status") != "ok" and res.get("http_status") == 404:
|
|
return {"status": "ok", "http_status": 404, "idempotent": True}
|
|
return res
|
|
|
|
def create_contact(self, data: "ContactIn") -> dict:
|
|
"""Crea un contacto: ficha ``.md`` (verdad) + reflejo del vCard.
|
|
|
|
1. Genera el slug del nombre. 409 si ya existe la ficha.
|
|
2. Escribe la ficha ``.md`` con el frontmatter canónico (acción primaria).
|
|
3. Hace PUT del vCard a Xandikos (reflejo inmediato; un fallo NO revierte
|
|
la ficha — el sync periódico reconciliará).
|
|
4. Invalida las cachés DAV para que el contacto aparezca ya en la app.
|
|
|
|
Returns:
|
|
dict ``{slug, uid, path, dav}``.
|
|
|
|
Raises:
|
|
HTTPException(409): si ya existe una ficha con ese slug.
|
|
HTTPException(400): si el tipo no es 'persona'|'organizacion' o el
|
|
nombre está vacío.
|
|
"""
|
|
if not data.nombre.strip():
|
|
raise HTTPException(status_code=400, detail="el nombre es obligatorio")
|
|
if _osint_db_backend_enabled():
|
|
# Flag ON: el osint_db es la fuente de verdad. Genera el slug igual que
|
|
# el camino vault (mismo UID), envía el payload y deja que el service
|
|
# escriba la DuckDB + empuje a Xandikos.
|
|
slug = slugify_obsidian_name(data.nombre)
|
|
if not slug:
|
|
raise HTTPException(
|
|
status_code=400, detail="el nombre no produce un slug válido"
|
|
)
|
|
res = osintdb_client.create_contact(_osint_db_contact_payload(data, slug))
|
|
self.invalidate_dav()
|
|
uid = res.get("uid") or slug
|
|
return {"slug": uid, "uid": uid, "path": None, "osint_db": res}
|
|
tipo = (data.tipo or "persona").strip()
|
|
if tipo not in _TIPO_FOLDER:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="tipo inválido '%s' (persona|organizacion)" % tipo,
|
|
)
|
|
if not data.nombre.strip():
|
|
raise HTTPException(status_code=400, detail="el nombre es obligatorio")
|
|
slug = slugify_obsidian_name(data.nombre)
|
|
if not slug:
|
|
raise HTTPException(
|
|
status_code=400, detail="el nombre no produce un slug válido"
|
|
)
|
|
note_path = self._contact_note_path(tipo, slug)
|
|
if os.path.exists(note_path):
|
|
raise HTTPException(
|
|
status_code=409, detail="ya existe un contacto con slug '%s'" % slug
|
|
)
|
|
frontmatter = _contact_frontmatter(data, slug)
|
|
body = _contact_body(data.notas)
|
|
folder = _TIPO_FOLDER[tipo]
|
|
# Acción primaria: la ficha del vault es la fuente de verdad.
|
|
create_obsidian_note(
|
|
self.vault_dir,
|
|
os.path.join(folder, slug),
|
|
body=body,
|
|
frontmatter=frontmatter,
|
|
)
|
|
# Reflejo inmediato en Xandikos (no rompe el alta si Xandikos cae).
|
|
vcard_fm = dict(frontmatter)
|
|
vcard_fm["_notas"] = _norm_str(data.notas)
|
|
dav = self._put_vcard(slug, _build_vcard(vcard_fm, slug))
|
|
self.refresh()
|
|
self.invalidate_dav()
|
|
return {"slug": slug, "uid": slug, "path": note_path, "dav": dav}
|
|
|
|
def update_contact(self, slug: str, data: "ContactIn") -> dict:
|
|
"""Edita un contacto existente: merge del frontmatter + re-PUT del vCard.
|
|
|
|
Localiza la ficha por slug (en personas/ u organizaciones/ según el tipo
|
|
de la ficha actual). 404 si no existe. Hace merge de los campos editables
|
|
sobre el frontmatter actual (preserva campos heredados no tocados como
|
|
``sexo``, ``fecha_nacimiento``, ``horoscopo``), reescribe el body
|
|
``## Notas`` y re-sube el vCard. Invalida las cachés DAV.
|
|
|
|
Returns:
|
|
dict ``{slug, uid, path, dav}``.
|
|
|
|
Raises:
|
|
HTTPException(404): si no existe la ficha del contacto.
|
|
"""
|
|
if _osint_db_backend_enabled():
|
|
# Flag ON: delega la edición en el osint_db (PUT por UID).
|
|
res = osintdb_client.update_contact(
|
|
slug, _osint_db_contact_payload(data)
|
|
)
|
|
self.invalidate_dav()
|
|
return {"slug": slug, "uid": slug, "path": None, "osint_db": res}
|
|
path = self._find_contact_note(slug)
|
|
if path is None:
|
|
raise HTTPException(
|
|
status_code=404, detail="contacto '%s' no encontrado" % slug
|
|
)
|
|
note = read_obsidian_note(path)
|
|
current = dict(note.get("frontmatter") or {})
|
|
# Listas multi-valor (ya reconciladas con los singulares en ContactIn).
|
|
telefonos = _norm_list(data.telefonos)
|
|
emails = _norm_list(data.emails)
|
|
direcciones = _norm_list(data.direcciones)
|
|
# Merge de los campos editables (preserva los heredados no tocados). El
|
|
# singular se conserva = primer elemento para los lectores viejos.
|
|
merged = {
|
|
"nombre": data.nombre.strip() or current.get("nombre") or slug,
|
|
"aliases": _norm_list(data.aliases),
|
|
"telefono": telefonos[0] if telefonos else None,
|
|
"telefonos": telefonos,
|
|
"email": emails[0] if emails else None,
|
|
"emails": emails,
|
|
"direccion": direcciones[0] if direcciones else None,
|
|
"direcciones": direcciones,
|
|
"pais": _norm_str(data.pais),
|
|
"relaciones": _norm_list(data.relaciones),
|
|
"contexto": _norm_str(data.contexto),
|
|
}
|
|
if current.get("tipo") != "organizacion":
|
|
merged["dni"] = _norm_str(data.dni)
|
|
current.update(merged)
|
|
update_obsidian_note(
|
|
path, body=_contact_body(data.notas), set_frontmatter=current
|
|
)
|
|
vcard_fm = dict(current)
|
|
vcard_fm["_notas"] = _norm_str(data.notas)
|
|
dav = self._put_vcard(slug, _build_vcard(vcard_fm, slug))
|
|
self.refresh()
|
|
self.invalidate_dav()
|
|
return {"slug": slug, "uid": slug, "path": path, "dav": dav}
|
|
|
|
def delete_contact(self, slug: str) -> dict:
|
|
"""Borra un contacto: elimina la ficha ``.md`` + el vCard en Xandikos.
|
|
|
|
404 si la ficha no existe. Borra el archivo ``.md`` (acción primaria) y
|
|
el recurso ``<slug>.vcf`` de Xandikos (reflejo). Invalida las cachés DAV.
|
|
|
|
Returns:
|
|
dict ``{slug, deleted: True, dav}``.
|
|
|
|
Raises:
|
|
HTTPException(404): si no existe la ficha del contacto.
|
|
"""
|
|
if _osint_db_backend_enabled():
|
|
# Flag ON: delega el borrado en el osint_db (DELETE por UID).
|
|
res = osintdb_client.delete_contact(slug)
|
|
self.invalidate_dav()
|
|
return {"slug": slug, "deleted": True, "osint_db": res}
|
|
path = self._find_contact_note(slug)
|
|
if path is None:
|
|
raise HTTPException(
|
|
status_code=404, detail="contacto '%s' no encontrado" % slug
|
|
)
|
|
delete_obsidian_note(path)
|
|
dav = self._delete_vcard(slug)
|
|
self.refresh()
|
|
self.invalidate_dav()
|
|
return {"slug": slug, "deleted": True, "dav": dav}
|
|
|
|
def _find_contact_note(self, slug: str):
|
|
"""Localiza la ficha ``.md`` de un contacto por slug, o None.
|
|
|
|
Busca ``personas/<slug>.md`` y ``organizaciones/<slug>.md`` (los dos
|
|
tipos de contacto). Devuelve el primer path existente o None.
|
|
"""
|
|
for folder in ("personas", "organizaciones"):
|
|
candidate = os.path.join(self._vault_real, folder, slug + ".md")
|
|
if os.path.isfile(candidate):
|
|
return candidate
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers DAV: parseo ligero de vCard / iCalendar a JSON
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _unescape_ical(value: str) -> str:
|
|
"""Des-escapa los caracteres de un valor iCalendar/vCard (RFC 5545/6350)."""
|
|
return (
|
|
value.replace("\\n", "\n")
|
|
.replace("\\N", "\n")
|
|
.replace("\\,", ",")
|
|
.replace("\\;", ";")
|
|
.replace("\\\\", "\\")
|
|
)
|
|
|
|
|
|
def _unfold_lines(text: str) -> list:
|
|
"""Des-pliega las líneas continuadas (folding) de un vCard/iCalendar.
|
|
|
|
RFC 5545/6350: una línea que empieza por espacio o tab es continuación de la
|
|
anterior. Esta función las une para parsearlas como propiedades completas.
|
|
"""
|
|
raw_lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
|
|
unfolded: list = []
|
|
for line in raw_lines:
|
|
if line[:1] in (" ", "\t") and unfolded:
|
|
unfolded[-1] += line[1:]
|
|
else:
|
|
unfolded.append(line)
|
|
return unfolded
|
|
|
|
|
|
def _parse_property(line: str) -> Optional[tuple]:
|
|
"""Parsea una línea de propiedad vCard/iCal a ``(nombre, params, valor)``.
|
|
|
|
Formato: ``[itemN.]NAME;PARAM=val;PARAM2=val:value``. Devuelve ``None`` si
|
|
la línea no es una propiedad (sin ``:``). El nombre se devuelve en
|
|
mayúsculas y SIN el prefijo de grupo ``itemN.`` / ``GRUPO.`` que añaden
|
|
Apple/Google a las propiedades agrupadas (``item1.TEL``, ``item2.EMAIL``);
|
|
los params como dict con claves en mayúsculas.
|
|
"""
|
|
if ":" not in line:
|
|
return None
|
|
head, value = line.split(":", 1)
|
|
parts = head.split(";")
|
|
name = parts[0].strip()
|
|
# Quitar el prefijo de grupo "itemN." / "GRUPO." (vCard property grouping).
|
|
if "." in name:
|
|
name = name.rsplit(".", 1)[-1]
|
|
name = name.upper()
|
|
params: dict = {}
|
|
for part in parts[1:]:
|
|
if "=" in part:
|
|
k, v = part.split("=", 1)
|
|
params[k.strip().upper()] = v.strip()
|
|
return name, params, value
|
|
|
|
|
|
def _vcard_to_json(vcard_text: str) -> dict:
|
|
"""Convierte un VCARD a un dict JSON con los campos de interés.
|
|
|
|
Extrae: uid, nombre completo (FN o N reordenado), alias (NICKNAME),
|
|
teléfonos (TEL), emails (EMAIL), organización (ORG), nota (NOTE) y el bloque
|
|
``osint`` con todas las propiedades ``X-OSINT-*`` (la clave es el sufijo en
|
|
minúsculas: ``X-OSINT-DNI`` → ``osint.dni``, ``X-OSINT-PAIS`` →
|
|
``osint.pais``). Parseo ligero a mano (sin dependencia de vobject); el vCard
|
|
ya viene troceado por ``split_vcards``.
|
|
|
|
Expone tanto las claves en español que consume el frontend del task
|
|
(``nombre``, ``alias``, ``nota``, ``telefonos``) como las formas tipadas con
|
|
tipo (``phones``, ``emails`` como objetos ``{value, type}``), para no atar el
|
|
frontend a un único shape.
|
|
"""
|
|
out: dict = {
|
|
"uid": None,
|
|
"fn": None,
|
|
"nickname": None,
|
|
"org": None,
|
|
"note": None,
|
|
"phones": [],
|
|
"emails": [],
|
|
"direcciones": [],
|
|
"osint": {},
|
|
}
|
|
for line in _unfold_lines(vcard_text):
|
|
parsed = _parse_property(line)
|
|
if not parsed:
|
|
continue
|
|
name, params, value = parsed
|
|
# ADR es estructurado (7 componentes separados por ';'): NO se des-escapa
|
|
# antes de partir, para no confundir separadores con contenido escapado.
|
|
if name == "ADR":
|
|
adr = _parse_adr_value(value)
|
|
if adr:
|
|
out["direcciones"].append(adr)
|
|
continue
|
|
value = _unescape_ical(value.strip())
|
|
if name == "UID":
|
|
out["uid"] = value
|
|
elif name == "FN":
|
|
out["fn"] = value
|
|
elif name == "NICKNAME":
|
|
out["nickname"] = value
|
|
elif name == "ORG":
|
|
out["org"] = value.replace(";", " ").strip()
|
|
elif name == "NOTE":
|
|
out["note"] = value
|
|
elif name == "TEL":
|
|
out["phones"].append({"value": value, "type": params.get("TYPE", "")})
|
|
elif name == "EMAIL":
|
|
out["emails"].append({"value": value, "type": params.get("TYPE", "")})
|
|
elif name.startswith("X-OSINT-"):
|
|
key = name[len("X-OSINT-") :].lower().replace("-", "_")
|
|
if key:
|
|
out["osint"][key] = value
|
|
elif name == "N" and not out["fn"]:
|
|
# Nombre estructurado Apellido;Nombre;... -> "Nombre Apellido".
|
|
comps = [c for c in value.split(";") if c]
|
|
if len(comps) >= 2:
|
|
out["fn"] = ("%s %s" % (comps[1], comps[0])).strip()
|
|
elif comps:
|
|
out["fn"] = comps[0]
|
|
# Compat con vCards antiguos: la dirección iba en X-OSINT-DIRECCION (un solo
|
|
# valor) en vez de ADR. Si vino por ahí y no hay ADR, súbela a direcciones[]
|
|
# para que el frontend la vea como multi-valor; deja también osint.direccion
|
|
# por si algún lector viejo lo consulta.
|
|
legacy_dir = out["osint"].get("direccion")
|
|
if legacy_dir and legacy_dir not in out["direcciones"]:
|
|
out["direcciones"].append(legacy_dir)
|
|
# Alias en español que consume el frontend del task (mismo dato, otra clave).
|
|
out["nombre"] = out["fn"]
|
|
out["alias"] = out["nickname"]
|
|
out["nota"] = out["note"]
|
|
out["telefonos"] = [p["value"] for p in out["phones"]]
|
|
out["correos"] = [e["value"] for e in out["emails"]]
|
|
return out
|
|
|
|
|
|
def _parse_adr_value(raw: str) -> Optional[str]:
|
|
"""Extrae la dirección legible de un valor ADR estructurado (RFC 6350).
|
|
|
|
El ADR tiene 7 componentes separados por ``;``:
|
|
``po-box;extended;street;locality;region;postal-code;country``. Esta función
|
|
une los componentes no vacíos (des-escapados) en una sola línea legible, con
|
|
preferencia por ``street``; si solo hay un campo, lo devuelve. Devuelve
|
|
``None`` si el ADR queda vacío.
|
|
"""
|
|
parts = raw.split(";")
|
|
# Des-escapa cada componente por separado (el ';' ya se usó para partir).
|
|
comps = [_unescape_ical(p.strip()) for p in parts]
|
|
nonempty = [c for c in comps if c]
|
|
if not nonempty:
|
|
return None
|
|
# street es el 3er componente (índice 2). Si está, suele bastar; si hay más
|
|
# (locality, region, etc.) se concatenan con coma para una línea legible.
|
|
if len(comps) >= 3 and comps[2]:
|
|
tail = [c for c in comps[3:] if c]
|
|
return ", ".join([comps[2]] + tail) if tail else comps[2]
|
|
return ", ".join(nonempty)
|
|
|
|
|
|
_VEVENT_RE = re.compile(r"BEGIN:VEVENT(.*?)END:VEVENT", re.DOTALL | re.IGNORECASE)
|
|
_ICAL_DT_RE = re.compile(
|
|
r"^(\d{4})(\d{2})(\d{2})(?:T(\d{2})(\d{2})(\d{2})?)?(Z)?$"
|
|
)
|
|
|
|
|
|
def _zoneinfo(tzid: str):
|
|
"""``ZoneInfo(tzid)`` o ``None`` si el tz no existe / falta tzdata.
|
|
|
|
Nunca lanza: un TZID desconocido (o un sistema sin base de zonas) degrada a
|
|
None y el llamador trata la hora como naive/local, sin tumbar el parseo.
|
|
"""
|
|
if ZoneInfo is None or not tzid:
|
|
return None
|
|
try:
|
|
return ZoneInfo(tzid)
|
|
except (ZoneInfoNotFoundError, ValueError, KeyError):
|
|
return None
|
|
|
|
|
|
def _parse_ical_datetime(value: str, params: dict) -> Optional[dict]:
|
|
"""Parsea un valor DTSTART/DTEND iCal a una representación normalizada.
|
|
|
|
Maneja las tres formas del calendario:
|
|
- UTC: ``20260611T090000Z`` (sufijo Z).
|
|
- con zona: ``DTSTART;TZID=Europe/Madrid:20260611T090000`` (param TZID).
|
|
- solo fecha (todo el día): ``DTSTART;VALUE=DATE:20260611`` o sin hora.
|
|
|
|
Returns:
|
|
dict ``{iso, tz, all_day, ical}`` o ``None`` si no parsea. ``iso`` es
|
|
ISO 8601 con offset cuando hay zona/UTC (``2026-06-11T11:00:00+02:00``)
|
|
o ``YYYY-MM-DD`` para todo el día; ``tz`` es el TZID original
|
|
(``Europe/Madrid``, ``UTC``, o None si naive/all-day); ``all_day`` True
|
|
si es solo fecha; ``ical`` el prefijo ``YYYYMMDD`` para el filtro de
|
|
rango.
|
|
"""
|
|
value = (value or "").strip()
|
|
m = _ICAL_DT_RE.match(value)
|
|
if not m:
|
|
return None
|
|
year, month, day = int(m.group(1)), int(m.group(2)), int(m.group(3))
|
|
has_time = m.group(4) is not None
|
|
is_utc = m.group(7) == "Z"
|
|
is_date_value = (params.get("VALUE", "").upper() == "DATE") or not has_time
|
|
ical_prefix = "%04d%02d%02d" % (year, month, day)
|
|
if is_date_value:
|
|
return {
|
|
"iso": "%04d-%02d-%02d" % (year, month, day),
|
|
"tz": None,
|
|
"all_day": True,
|
|
"ical": ical_prefix,
|
|
}
|
|
hour = int(m.group(4))
|
|
minute = int(m.group(5))
|
|
second = int(m.group(6)) if m.group(6) else 0
|
|
tzid = params.get("TZID", "")
|
|
if is_utc:
|
|
dt = datetime(year, month, day, hour, minute, second, tzinfo=timezone.utc)
|
|
tz_name = "UTC"
|
|
elif tzid and _zoneinfo(tzid) is not None:
|
|
dt = datetime(year, month, day, hour, minute, second, tzinfo=_zoneinfo(tzid))
|
|
tz_name = tzid
|
|
else:
|
|
# Hora "flotante" (sin Z ni TZID, o TZID desconocido): se interpreta como
|
|
# local del visor. La servimos sin offset; el frontend la sitúa en su TZ.
|
|
return {
|
|
"iso": "%04d-%02d-%02dT%02d:%02d:%02d"
|
|
% (year, month, day, hour, minute, second),
|
|
"tz": tzid or None,
|
|
"all_day": False,
|
|
"ical": ical_prefix,
|
|
}
|
|
return {"iso": dt.isoformat(), "tz": tz_name, "all_day": False, "ical": ical_prefix}
|
|
|
|
|
|
def _vevent_to_json(vevent_block: str) -> dict:
|
|
"""Convierte un bloque VEVENT a un dict JSON con los campos de interés.
|
|
|
|
Extrae: uid, summary, dtstart/dtend (ISO con offset cuando hay zona/UTC, o
|
|
``YYYY-MM-DD`` para todo el día), la TZ original (``tz``), ``all_day``,
|
|
location, description y color (propiedad ``COLOR`` RFC 7986 o
|
|
``X-APPLE-CALENDAR-COLOR`` si el evento la trae). ``dtstart_ical`` /
|
|
``dtend_ical`` conservan el prefijo ``YYYYMMDD`` crudo para el filtro de
|
|
rango. Parseo ligero a mano (sin dependencia externa).
|
|
"""
|
|
out: dict = {
|
|
"uid": None,
|
|
"summary": None,
|
|
"dtstart": None,
|
|
"dtend": None,
|
|
"dtstart_ical": None,
|
|
"dtend_ical": None,
|
|
"tz": None,
|
|
"all_day": False,
|
|
"location": None,
|
|
"description": None,
|
|
"color": None,
|
|
"rrule": None,
|
|
"recurring": False,
|
|
"occurrence": False,
|
|
}
|
|
for line in _unfold_lines(vevent_block):
|
|
parsed = _parse_property(line)
|
|
if not parsed:
|
|
continue
|
|
name, params, value = parsed
|
|
value = value.strip()
|
|
if name == "UID":
|
|
out["uid"] = value
|
|
elif name == "SUMMARY":
|
|
out["summary"] = _unescape_ical(value)
|
|
elif name == "DTSTART":
|
|
dt = _parse_ical_datetime(value, params)
|
|
if dt:
|
|
out["dtstart"] = dt["iso"]
|
|
out["dtstart_ical"] = dt["ical"]
|
|
out["tz"] = dt["tz"]
|
|
out["all_day"] = dt["all_day"]
|
|
else:
|
|
out["dtstart"] = value
|
|
elif name == "DTEND":
|
|
dt = _parse_ical_datetime(value, params)
|
|
if dt:
|
|
out["dtend"] = dt["iso"]
|
|
out["dtend_ical"] = dt["ical"]
|
|
else:
|
|
out["dtend"] = value
|
|
elif name == "LOCATION":
|
|
out["location"] = _unescape_ical(value)
|
|
elif name == "DESCRIPTION":
|
|
out["description"] = _unescape_ical(value)
|
|
elif name in ("COLOR", "X-APPLE-CALENDAR-COLOR"):
|
|
out["color"] = value
|
|
elif name == "RRULE":
|
|
out["rrule"] = value
|
|
out["recurring"] = True
|
|
return out
|
|
|
|
|
|
def _vcalendar_to_events(vcalendar_text: str) -> list:
|
|
"""Extrae todos los VEVENT de un VCALENDAR y los convierte a JSON."""
|
|
events = []
|
|
for block in _VEVENT_RE.findall(vcalendar_text):
|
|
events.append(_vevent_to_json("BEGIN:VEVENT" + block + "END:VEVENT"))
|
|
return events
|
|
|
|
|
|
def _event_in_range(event: dict, dt_from: str, dt_to: str) -> bool:
|
|
"""True si el evento cae (por DTSTART) dentro de ``[dt_from, dt_to]``.
|
|
|
|
Comparación lexicográfica sobre el prefijo ``YYYYMMDD`` (``dtstart_ical``);
|
|
si falta, se deriva del ISO de ``dtstart``. Los límites se normalizan
|
|
quitando los guiones, así acepta tanto ``2026-06-11`` como ``20260611``.
|
|
``dt_from``/``dt_to`` vacíos desactivan ese extremo del filtro.
|
|
"""
|
|
dtstart = event.get("dtstart_ical") or ""
|
|
if not dtstart:
|
|
dtstart = (event.get("dtstart") or "").replace("-", "")[:8]
|
|
dtstart = dtstart.replace("-", "")[:8]
|
|
if not dtstart:
|
|
return True
|
|
if dt_from and dtstart < dt_from.replace("-", "")[:8]:
|
|
return False
|
|
if dt_to and dtstart > dt_to.replace("-", "")[:8]:
|
|
return False
|
|
return True
|
|
|
|
|
|
def _default_expand_start() -> str:
|
|
"""Límite inferior por defecto al expandir una serie sin rango explícito."""
|
|
return (datetime.now(timezone.utc) - timedelta(days=366)).strftime("%Y%m%d")
|
|
|
|
|
|
def _default_expand_end() -> str:
|
|
"""Límite superior por defecto al expandir una serie sin rango explícito."""
|
|
return (datetime.now(timezone.utc) + timedelta(days=731)).strftime("%Y%m%d")
|
|
|
|
|
|
def _shift_iso_days(value: str, days: int) -> str:
|
|
"""Desplaza la parte de fecha de un ISO (``YYYY-MM-DD`` o con ``T...``).
|
|
|
|
Conserva intacta la parte horaria/offset (``T10:00:00+02:00``) y solo mueve
|
|
la fecha ``days`` días. Para una fecha pura mueve la fecha sola. Si el valor
|
|
no parsea, lo devuelve sin tocar (defensivo).
|
|
"""
|
|
if not value:
|
|
return value
|
|
date_part = value[:10]
|
|
rest = value[10:]
|
|
try:
|
|
base = datetime.strptime(date_part, "%Y-%m-%d")
|
|
except ValueError:
|
|
return value
|
|
shifted = (base + timedelta(days=days)).strftime("%Y-%m-%d")
|
|
return shifted + rest
|
|
|
|
|
|
def _occurrence_clone(event: dict, occ_ymd: str) -> dict:
|
|
"""Clona un evento maestro recurrente reubicado en la fecha ``occ_ymd``.
|
|
|
|
Mantiene la hora local / offset del maestro (solo cambia la fecha) y aplica el
|
|
mismo desplazamiento de días al ``dtend`` para preservar la duración. Marca
|
|
``occurrence=True`` cuando la fecha difiere de la del maestro (la primera
|
|
ocurrencia coincide con el maestro y queda ``occurrence=False``).
|
|
"""
|
|
master_ymd = (
|
|
event.get("dtstart_ical") or (event.get("dtstart") or "").replace("-", "")
|
|
)[:8]
|
|
new_date_iso = "%s-%s-%s" % (occ_ymd[:4], occ_ymd[4:6], occ_ymd[6:8])
|
|
clone = dict(event)
|
|
ds = event.get("dtstart") or ""
|
|
if event.get("all_day") or len(ds) == 10:
|
|
clone["dtstart"] = new_date_iso
|
|
else:
|
|
clone["dtstart"] = new_date_iso + ds[10:]
|
|
clone["dtstart_ical"] = occ_ymd
|
|
try:
|
|
delta = (
|
|
datetime.strptime(occ_ymd, "%Y%m%d")
|
|
- datetime.strptime(master_ymd, "%Y%m%d")
|
|
).days
|
|
except ValueError:
|
|
delta = 0
|
|
de = event.get("dtend")
|
|
if de:
|
|
clone["dtend"] = _shift_iso_days(de, delta)
|
|
de_ical = event.get("dtend_ical")
|
|
if de_ical:
|
|
clone["dtend_ical"] = (clone["dtend"] or "").replace("-", "")[:8]
|
|
clone["occurrence"] = occ_ymd != master_ymd
|
|
return clone
|
|
|
|
|
|
def _expand_event_occurrences(event: dict, dt_from: str, dt_to: str) -> list:
|
|
"""Expande un evento recurrente a sus ocurrencias dentro de ``[from, to]``.
|
|
|
|
Compone la función pura del registry ``expand_rrule`` (solo necesita las
|
|
FECHAS de cada ocurrencia; la hora local se preserva clonando el maestro). Si
|
|
el evento no tiene ``rrule``, o algo no parsea, devuelve ``[event]`` sin
|
|
tocar — nunca pierde el evento original.
|
|
"""
|
|
rrule = event.get("rrule")
|
|
if not rrule:
|
|
return [event]
|
|
master_ymd = (
|
|
event.get("dtstart_ical") or (event.get("dtstart") or "").replace("-", "")
|
|
)[:8]
|
|
if len(master_ymd) < 8:
|
|
return [event]
|
|
rs = (dt_from or "").replace("-", "")[:8] or _default_expand_start()
|
|
re_ = (dt_to or "").replace("-", "")[:8] or _default_expand_end()
|
|
try:
|
|
occ_dates = expand_rrule(master_ymd, rrule, rs, re_, all_day=True)
|
|
except Exception:
|
|
return [event]
|
|
if not occ_dates:
|
|
return []
|
|
return [_occurrence_clone(event, d) for d in occ_dates]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Escritura de contactos: ficha .md del vault (fuente de verdad) + vCard
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Subcarpeta del vault por tipo de contacto. La fuente de verdad de un contacto
|
|
# es su ficha en el vault (CONVENTIONS.md §3b para persona, §6 para organización);
|
|
# Xandikos es solo el retransmisor al móvil.
|
|
_TIPO_FOLDER = {"persona": "personas", "organizacion": "organizaciones"}
|
|
|
|
# Tags por defecto de cada tipo (CONVENTIONS.md). Se preservan si la ficha ya
|
|
# trae otros tags al editar.
|
|
_TIPO_TAGS = {
|
|
"persona": ["persona", "osint"],
|
|
"organizacion": ["organizacion", "osint"],
|
|
}
|
|
|
|
|
|
class ContactIn(BaseModel):
|
|
"""Cuerpo de POST/PUT de un contacto (persona u organización).
|
|
|
|
Refleja el esquema canónico del frontmatter (CONVENTIONS.md §3b/§6). Los
|
|
campos vacíos se normalizan a ``null``/``[]`` al escribir la ficha, nunca se
|
|
omiten, para que el score de completitud sea consistente.
|
|
|
|
Multi-valor: un contacto puede tener VARIOS teléfonos, emails y direcciones
|
|
(``telefonos``/``emails``/``direcciones``). Los campos singulares
|
|
``telefono``/``email``/``direccion`` se conservan por compatibilidad con
|
|
clientes y lectores viejos: el validador ``model_post_init`` los reconcilia
|
|
con las listas (singular → ``[valor]`` si la lista está vacía; y el singular
|
|
se rellena con ``lista[0]`` para que los lectores que solo miran el singular
|
|
sigan funcionando).
|
|
"""
|
|
|
|
tipo: str = Field(default="persona")
|
|
nombre: str
|
|
aliases: list[str] = Field(default_factory=list)
|
|
# Singulares (compat) — el primer elemento de cada lista multi-valor.
|
|
telefono: Optional[str] = None
|
|
email: Optional[str] = None
|
|
direccion: Optional[str] = None
|
|
# Multi-valor: listas completas de teléfonos, emails y direcciones.
|
|
telefonos: list[str] = Field(default_factory=list)
|
|
emails: list[str] = Field(default_factory=list)
|
|
direcciones: list[str] = Field(default_factory=list)
|
|
dni: Optional[str] = None
|
|
pais: Optional[str] = None
|
|
contexto: Optional[str] = None
|
|
relaciones: list[str] = Field(default_factory=list)
|
|
notas: Optional[str] = None
|
|
# Libreta (addressbook) destino. Solo se consume con el flag OSINT_DB_BACKEND
|
|
# ON (el osint_db enruta el contacto a esa colección). Con el flag OFF se
|
|
# ignora: hoy solo existe la libreta por defecto en el vault. None → libreta
|
|
# por defecto.
|
|
collection: Optional[str] = None
|
|
|
|
def model_post_init(self, __context: object) -> None:
|
|
"""Reconcilia los campos singulares con las listas multi-valor.
|
|
|
|
Para cada par (singular, lista): si la lista llega vacía pero el singular
|
|
trae valor, la lista se siembra con ``[singular]`` (cliente viejo que solo
|
|
envía el campo singular); y siempre se rellena el singular con el primer
|
|
elemento normalizado de la lista, para que los lectores que solo miran el
|
|
singular (frontmatter compat, vCard heredado) sigan funcionando.
|
|
"""
|
|
for singular, plural in (
|
|
("telefono", "telefonos"),
|
|
("email", "emails"),
|
|
("direccion", "direcciones"),
|
|
):
|
|
lista = _norm_list(getattr(self, plural))
|
|
if not lista:
|
|
single = _norm_str(getattr(self, singular))
|
|
if single:
|
|
lista = [single]
|
|
object.__setattr__(self, plural, lista)
|
|
object.__setattr__(self, singular, lista[0] if lista else None)
|
|
|
|
|
|
def _norm_str(value: Optional[str]) -> Optional[str]:
|
|
"""Normaliza un string opcional: trim; cadena vacía → None."""
|
|
if value is None:
|
|
return None
|
|
value = value.strip()
|
|
return value or None
|
|
|
|
|
|
def _norm_list(values: Optional[list]) -> list:
|
|
"""Normaliza una lista de strings: trim cada item y descarta los vacíos."""
|
|
if not values:
|
|
return []
|
|
out = []
|
|
for v in values:
|
|
s = (v or "").strip()
|
|
if s:
|
|
out.append(s)
|
|
return out
|
|
|
|
|
|
def _contact_frontmatter(data: "ContactIn", slug: str) -> dict:
|
|
"""Construye el frontmatter canónico de la ficha de un contacto.
|
|
|
|
Para ``tipo: persona`` sigue el esquema completo de CONVENTIONS.md §3b
|
|
(todos los campos presentes, ``null``/``[]`` si vacíos). Para
|
|
``tipo: organizacion`` usa el subconjunto de §6. El orden de claves se
|
|
preserva (``yaml.safe_dump(sort_keys=False)`` en ``format_obsidian_note``).
|
|
"""
|
|
nombre = data.nombre.strip()
|
|
aliases = _norm_list(data.aliases)
|
|
relaciones = _norm_list(data.relaciones)
|
|
# Listas multi-valor ya reconciladas en ContactIn.model_post_init; el campo
|
|
# singular = primer elemento (o None) para los lectores viejos.
|
|
telefonos = _norm_list(data.telefonos)
|
|
emails = _norm_list(data.emails)
|
|
direcciones = _norm_list(data.direcciones)
|
|
if data.tipo == "organizacion":
|
|
return {
|
|
"tipo": "organizacion",
|
|
"nombre": nombre,
|
|
"slug": slug,
|
|
"aliases": aliases,
|
|
"telefono": telefonos[0] if telefonos else None,
|
|
"telefonos": telefonos,
|
|
"email": emails[0] if emails else None,
|
|
"emails": emails,
|
|
"direccion": direcciones[0] if direcciones else None,
|
|
"direcciones": direcciones,
|
|
"pais": _norm_str(data.pais),
|
|
"relaciones": relaciones,
|
|
"contexto": _norm_str(data.contexto),
|
|
"fuente": "osint_web (alta manual)",
|
|
"tags": list(_TIPO_TAGS["organizacion"]),
|
|
}
|
|
# Persona: esquema canónico §3b.
|
|
return {
|
|
"tipo": "persona",
|
|
"nombre": nombre,
|
|
"slug": slug,
|
|
"aliases": aliases,
|
|
"sexo": None,
|
|
"fecha_nacimiento": None,
|
|
"dni": _norm_str(data.dni),
|
|
"telefono": telefonos[0] if telefonos else None,
|
|
"telefonos": telefonos,
|
|
"email": emails[0] if emails else None,
|
|
"emails": emails,
|
|
"direccion": direcciones[0] if direcciones else None,
|
|
"direcciones": direcciones,
|
|
"pais": _norm_str(data.pais),
|
|
"relaciones": relaciones,
|
|
"contexto": _norm_str(data.contexto),
|
|
"fuente": "osint_web (alta manual)",
|
|
"tags": list(_TIPO_TAGS["persona"]),
|
|
}
|
|
|
|
|
|
def _contact_body(notas: Optional[str]) -> str:
|
|
"""Cuerpo Markdown de la ficha: sección ``## Notas`` con el texto libre."""
|
|
notas = _norm_str(notas)
|
|
if notas:
|
|
return "## Notas\n\n%s\n" % notas
|
|
return "## Notas\n"
|
|
|
|
|
|
def _vcard_escape(value: str) -> str:
|
|
"""Escapa un valor de texto para una línea vCard (RFC 6350).
|
|
|
|
El retorno de carro ``\\r`` crudo se ELIMINA (no se escapa): solo, sin un
|
|
``\\n`` que lo siga, sobreviviría al escape de ``\\n`` y quedaría como carácter
|
|
de control. ``_unfold_lines`` normaliza ``\\r`` a ``\\n``, así que un ``\\r``
|
|
crudo en un valor permitiría inyectar propiedades nuevas (p. ej.
|
|
``X-OSINT-DNI``) en la tarjeta o, al reutilizarse esta función para SUMMARY/
|
|
LOCATION del VEVENT, en el VCALENDAR. Eliminarlo cierra ese vector.
|
|
"""
|
|
return (
|
|
value.replace("\\", "\\\\")
|
|
.replace("\r", "")
|
|
.replace("\n", "\\n")
|
|
.replace(",", "\\,")
|
|
.replace(";", "\\;")
|
|
)
|
|
|
|
|
|
def _vcard_value_list(frontmatter: dict, plural: str, singular: str) -> list:
|
|
"""Lista de valores de un campo multi-valor del frontmatter de contacto.
|
|
|
|
Prefiere la clave plural (``telefonos``/``emails``/``direcciones``); si está
|
|
vacía cae al singular (``telefono``/...) por compatibilidad con fichas
|
|
antiguas. Normaliza (trim + descarta vacíos) y devuelve una lista de strings.
|
|
"""
|
|
values = frontmatter.get(plural)
|
|
if not values:
|
|
single = frontmatter.get(singular)
|
|
values = [single] if single else []
|
|
return _norm_list([str(v) for v in values])
|
|
|
|
|
|
def _build_vcard(frontmatter: dict, slug: str) -> str:
|
|
"""Serializa un frontmatter de contacto a un VCARD 3.0 con el UID = slug.
|
|
|
|
Soporta multi-valor: emite una línea ``TEL`` por teléfono, una ``EMAIL`` por
|
|
email y una ``ADR`` por dirección (campos ``telefonos``/``emails``/
|
|
``direcciones`` del frontmatter; cae a los singulares ``telefono``/... por
|
|
compat). Mapea además: nombre→FN, aliases→NICKNAME, notas→NOTE,
|
|
organización→ORG; y los campos OSINT (dni, pais, contexto, sexo,
|
|
fecha_nacimiento) a propiedades ``X-OSINT-*`` que el parser ``_vcard_to_json``
|
|
ya entiende. El UID es el slug → idempotente: re-subir el mismo slug
|
|
sobrescribe el recurso ``<slug>.vcf``.
|
|
"""
|
|
nombre = (frontmatter.get("nombre") or slug).strip()
|
|
lines = [
|
|
"BEGIN:VCARD",
|
|
"VERSION:3.0",
|
|
"UID:%s" % slug,
|
|
"FN:%s" % _vcard_escape(nombre),
|
|
]
|
|
aliases = frontmatter.get("aliases") or []
|
|
if aliases:
|
|
lines.append("NICKNAME:%s" % _vcard_escape(",".join(str(a) for a in aliases)))
|
|
if frontmatter.get("tipo") == "organizacion":
|
|
lines.append("ORG:%s" % _vcard_escape(nombre))
|
|
# Multi-valor: una línea TEL/EMAIL por elemento.
|
|
for tel in _vcard_value_list(frontmatter, "telefonos", "telefono"):
|
|
lines.append("TEL;TYPE=CELL:%s" % _vcard_escape(tel))
|
|
for email in _vcard_value_list(frontmatter, "emails", "email"):
|
|
lines.append("EMAIL;TYPE=INTERNET:%s" % _vcard_escape(email))
|
|
# Direcciones → ADR estructurado (la dirección va en el componente street;
|
|
# los separadores ';' del ADR NO se escapan, solo el contenido). Una línea
|
|
# ADR por dirección. El parser _vcard_to_json reconstruye la lista desde ADR.
|
|
for adr in _vcard_value_list(frontmatter, "direcciones", "direccion"):
|
|
lines.append("ADR;TYPE=HOME:;;%s;;;;" % _vcard_escape(adr))
|
|
# Campos OSINT → X-OSINT-* (los recoge _vcard_to_json en el bloque osint).
|
|
for fm_key, x_name in (
|
|
("dni", "X-OSINT-DNI"),
|
|
("pais", "X-OSINT-PAIS"),
|
|
("contexto", "X-OSINT-CONTEXTO"),
|
|
("sexo", "X-OSINT-SEXO"),
|
|
("fecha_nacimiento", "X-OSINT-FECHA-NACIMIENTO"),
|
|
):
|
|
val = frontmatter.get(fm_key)
|
|
if val:
|
|
lines.append("%s:%s" % (x_name, _vcard_escape(str(val))))
|
|
notas = frontmatter.get("_notas")
|
|
if notas:
|
|
lines.append("NOTE:%s" % _vcard_escape(str(notas)))
|
|
lines.append("END:VCARD")
|
|
return "\r\n".join(lines) + "\r\n"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Escritura de eventos del calendario: construcción de VEVENT / VCALENDAR
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_ISO_DT_RE = re.compile(
|
|
r"^(\d{4})-(\d{2})-(\d{2})(?:[T ](\d{2}):(\d{2})(?::(\d{2}))?)?"
|
|
r"(Z|[+-]\d{2}:?\d{2})?$"
|
|
)
|
|
|
|
# Saneado del nombre del recurso .ics: DEBE coincidir con el que aplica
|
|
# caldav_put_event internamente (mismo patrón), para que el DELETE apunte al
|
|
# recurso que el PUT creó.
|
|
_UNSAFE_RESOURCE_RE = re.compile(r"[^A-Za-z0-9_.-]")
|
|
|
|
|
|
def _safe_event_resource(uid: str) -> str:
|
|
"""Nombre del recurso ``.ics`` de un UID (igual que caldav_put_event)."""
|
|
return _UNSAFE_RESOURCE_RE.sub("_", uid)[:120] + ".ics"
|
|
|
|
|
|
class EventIn(BaseModel):
|
|
"""Cuerpo de POST/PUT de un evento del calendario (VEVENT).
|
|
|
|
Las fechas se aceptan en ISO local sin offset (``2026-06-15T10:00``) + un
|
|
``tz`` (TZID, p.ej. ``Europe/Madrid``); o con offset/``Z`` ya incluido. El
|
|
servidor las normaliza al construir el VEVENT (``DTSTART;TZID=...`` o
|
|
``...Z``). Para eventos de todo el día basta ``dtstart`` = ``2026-06-15`` con
|
|
``all_day=True``.
|
|
"""
|
|
|
|
cal: Optional[str] = None
|
|
summary: str
|
|
dtstart: str
|
|
dtend: Optional[str] = None
|
|
tz: Optional[str] = Field(default="Europe/Madrid")
|
|
all_day: bool = False
|
|
location: Optional[str] = None
|
|
description: Optional[str] = None
|
|
color: Optional[str] = None
|
|
# Regla de recurrencia iCalendar SIN el prefijo "RRULE:" (p.ej.
|
|
# "FREQ=WEEKLY;INTERVAL=1;COUNT=10"). None / "" → evento puntual. Editar un
|
|
# evento recurrente reescribe toda la serie (no se soporta editar una sola
|
|
# ocurrencia).
|
|
rrule: Optional[str] = None
|
|
|
|
|
|
class CalendarIn(BaseModel):
|
|
"""Cuerpo de POST /api/calendars: crea una colección de calendario nueva.
|
|
|
|
El ``slug`` es el segmento de URL de la colección (lo sanea la función del
|
|
registry ``dav_make_calendar`` a ``[a-z0-9_-]``). ``name`` es el nombre
|
|
visible; ``color`` un hex ``#rrggbb`` opcional.
|
|
"""
|
|
|
|
slug: str
|
|
name: Optional[str] = ""
|
|
color: Optional[str] = None
|
|
description: Optional[str] = None
|
|
|
|
|
|
class AddressbookIn(BaseModel):
|
|
"""Cuerpo de POST /api/addressbooks: crea una libreta de contactos nueva.
|
|
|
|
``slug`` es el segmento de URL de la colección CardDAV; ``name`` el nombre
|
|
visible; ``color`` un hex ``#rrggbb`` opcional. Solo se procesa con el flag
|
|
``OSINT_DB_BACKEND`` activo (el osint_db crea la colección en Xandikos).
|
|
"""
|
|
|
|
slug: str
|
|
name: Optional[str] = ""
|
|
color: Optional[str] = None
|
|
|
|
|
|
def _parse_iso_input(value: str) -> Optional[dict]:
|
|
"""Parsea una fecha de entrada ISO a ``{year,month,day,hour,minute,second,
|
|
offset,date_only}`` o ``None``.
|
|
|
|
Acepta ``2026-06-15``, ``2026-06-15T10:00``, ``2026-06-15T10:00:00`` y
|
|
variantes con ``Z`` o ``±HH:MM`` al final. Es tolerante a la separación con
|
|
espacio en vez de ``T``.
|
|
"""
|
|
value = (value or "").strip()
|
|
m = _ISO_DT_RE.match(value)
|
|
if not m:
|
|
return None
|
|
has_time = m.group(4) is not None
|
|
return {
|
|
"year": int(m.group(1)),
|
|
"month": int(m.group(2)),
|
|
"day": int(m.group(3)),
|
|
"hour": int(m.group(4)) if has_time else 0,
|
|
"minute": int(m.group(5)) if has_time else 0,
|
|
"second": int(m.group(6)) if (has_time and m.group(6)) else 0,
|
|
"offset": m.group(7),
|
|
"date_only": not has_time,
|
|
}
|
|
|
|
|
|
def _ical_dt_property(prop: str, value: str, tz: Optional[str], all_day: bool) -> str:
|
|
"""Construye una línea DTSTART/DTEND iCal a partir de una fecha de entrada.
|
|
|
|
- all_day → ``DTSTART;VALUE=DATE:YYYYMMDD``.
|
|
- con offset/``Z`` en la entrada → convierte a UTC: ``DTSTART:...Z``.
|
|
- con ``tz`` válido → ``DTSTART;TZID=<tz>:YYYYMMDDTHHMMSS`` (hora local del
|
|
tz, el VTIMEZONE lo aporta el VCALENDAR).
|
|
- sin tz ni offset → hora flotante ``DTSTART:YYYYMMDDTHHMMSS``.
|
|
|
|
Raises:
|
|
ValueError: si ``value`` no es una fecha ISO reconocible.
|
|
"""
|
|
p = _parse_iso_input(value)
|
|
if p is None:
|
|
raise ValueError("fecha inválida: %r (usa ISO YYYY-MM-DD[THH:MM])" % value)
|
|
ymd = "%04d%02d%02d" % (p["year"], p["month"], p["day"])
|
|
if all_day or p["date_only"]:
|
|
return "%s;VALUE=DATE:%s" % (prop, ymd)
|
|
hms = "%02d%02d%02d" % (p["hour"], p["minute"], p["second"])
|
|
if p["offset"]:
|
|
# La entrada ya trae offset/Z: la pasamos a UTC absoluto.
|
|
dt = datetime.fromisoformat(
|
|
"%04d-%02d-%02dT%02d:%02d:%02d%s"
|
|
% (
|
|
p["year"],
|
|
p["month"],
|
|
p["day"],
|
|
p["hour"],
|
|
p["minute"],
|
|
p["second"],
|
|
_normalize_offset(p["offset"]),
|
|
)
|
|
)
|
|
dt_utc = dt.astimezone(timezone.utc)
|
|
return "%s:%s" % (prop, dt_utc.strftime("%Y%m%dT%H%M%SZ"))
|
|
if tz and _zoneinfo(tz) is not None:
|
|
return "%s;TZID=%s:%sT%s" % (prop, tz, ymd, hms)
|
|
# Hora flotante (sin tz reconocible): la escribimos sin Z.
|
|
return "%s:%sT%s" % (prop, ymd, hms)
|
|
|
|
|
|
def _normalize_offset(offset: str) -> str:
|
|
"""Normaliza un offset ISO a la forma ``±HH:MM`` que entiende fromisoformat."""
|
|
if offset == "Z":
|
|
return "+00:00"
|
|
if len(offset) == 5 and ":" not in offset: # ±HHMM
|
|
return offset[:3] + ":" + offset[3:]
|
|
return offset
|
|
|
|
|
|
def _vtimezone_block(tz: str) -> str:
|
|
"""Bloque VTIMEZONE mínimo para un TZID, con el offset estándar y de verano.
|
|
|
|
Calcula los offsets reales del tz para enero (estándar) y julio (verano) del
|
|
año actual con ``zoneinfo`` y emite un VTIMEZONE con ambas observancias. Es
|
|
una aproximación suficiente para que el cliente (y Xandikos) resuelvan la
|
|
hora local; no reproduce las reglas RRULE exactas. Devuelve cadena vacía si
|
|
el tz es UTC, desconocido, o no hace falta (el evento se sirve igual sin él).
|
|
"""
|
|
zone = _zoneinfo(tz)
|
|
if zone is None or tz.upper() == "UTC":
|
|
return ""
|
|
year = datetime.now().year
|
|
jan = datetime(year, 1, 15, 12, tzinfo=zone)
|
|
jul = datetime(year, 7, 15, 12, tzinfo=zone)
|
|
std_off = jan.utcoffset() or timedelta(0)
|
|
dst_off = jul.utcoffset() or timedelta(0)
|
|
|
|
def _fmt(off: timedelta) -> str:
|
|
total = int(off.total_seconds())
|
|
sign = "+" if total >= 0 else "-"
|
|
total = abs(total)
|
|
return "%s%02d%02d" % (sign, total // 3600, (total % 3600) // 60)
|
|
|
|
lines = ["BEGIN:VTIMEZONE", "TZID:%s" % tz]
|
|
if dst_off != std_off:
|
|
# Tiene horario de verano: dos observancias (estándar + verano).
|
|
lines += [
|
|
"BEGIN:STANDARD",
|
|
"DTSTART:19701025T030000",
|
|
"TZOFFSETFROM:%s" % _fmt(dst_off),
|
|
"TZOFFSETTO:%s" % _fmt(std_off),
|
|
"END:STANDARD",
|
|
"BEGIN:DAYLIGHT",
|
|
"DTSTART:19700329T020000",
|
|
"TZOFFSETFROM:%s" % _fmt(std_off),
|
|
"TZOFFSETTO:%s" % _fmt(dst_off),
|
|
"END:DAYLIGHT",
|
|
]
|
|
else:
|
|
lines += [
|
|
"BEGIN:STANDARD",
|
|
"DTSTART:19700101T000000",
|
|
"TZOFFSETFROM:%s" % _fmt(std_off),
|
|
"TZOFFSETTO:%s" % _fmt(std_off),
|
|
"END:STANDARD",
|
|
]
|
|
lines.append("END:VTIMEZONE")
|
|
return "\r\n".join(lines)
|
|
|
|
|
|
def _build_vcalendar(data: "EventIn", uid: str) -> str:
|
|
"""Serializa un ``EventIn`` a un VCALENDAR 2.0 con un VEVENT y su UID.
|
|
|
|
Construye DTSTART/DTEND respetando ``tz``/``all_day``/offset (ver
|
|
``_ical_dt_property``), añade un VTIMEZONE si el evento usa un TZID con
|
|
horario de verano, y mapea summary/location/description/color. El UID se
|
|
reutiliza al editar → idempotente (el recurso ``<uid>.ics`` se sobrescribe).
|
|
|
|
Raises:
|
|
ValueError: si ``dtstart`` no es una fecha ISO reconocible.
|
|
"""
|
|
dtstamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
|
tz = data.tz or ""
|
|
body = [
|
|
"BEGIN:VCALENDAR",
|
|
"VERSION:2.0",
|
|
"PRODID:-//osint_web//calendar//ES",
|
|
"CALSCALE:GREGORIAN",
|
|
]
|
|
if not data.all_day and tz and _zoneinfo(tz) is not None:
|
|
vtz = _vtimezone_block(tz)
|
|
if vtz:
|
|
body.append(vtz)
|
|
vevent = [
|
|
"BEGIN:VEVENT",
|
|
# Sanitizamos el UID (quitamos saltos de línea) para que no pueda inyectar
|
|
# propiedades/componentes iCal nuevos en el VEVENT.
|
|
"UID:%s" % str(uid).replace("\r", "").replace("\n", ""),
|
|
"DTSTAMP:%s" % dtstamp,
|
|
_ical_dt_property("DTSTART", data.dtstart, tz, data.all_day),
|
|
]
|
|
if data.dtend:
|
|
vevent.append(_ical_dt_property("DTEND", data.dtend, tz, data.all_day))
|
|
vevent.append("SUMMARY:%s" % _vcard_escape(data.summary.strip()))
|
|
if data.location and data.location.strip():
|
|
vevent.append("LOCATION:%s" % _vcard_escape(data.location.strip()))
|
|
if data.description and data.description.strip():
|
|
vevent.append("DESCRIPTION:%s" % _vcard_escape(data.description.strip()))
|
|
if data.color and data.color.strip():
|
|
# COLOR (RFC 7986) — nombre CSS3 o, para clientes Apple, el hex va aparte.
|
|
vevent.append("X-APPLE-CALENDAR-COLOR:%s" % data.color.strip())
|
|
rrule = (data.rrule or "").strip()
|
|
if rrule:
|
|
# Acepta tanto "FREQ=..." como "RRULE:FREQ=..."; normaliza a la línea
|
|
# canónica "RRULE:<cuerpo>" que entienden Xandikos y los clientes (DAVx5).
|
|
if rrule.upper().startswith("RRULE:"):
|
|
rrule = rrule[len("RRULE:"):].strip()
|
|
# Sanitizar: quitar saltos de línea para que el valor de la RRULE no
|
|
# inyecte propiedades/componentes nuevos (los `;`/`,` son separadores
|
|
# legítimos de la regla, así que no se escapan).
|
|
vevent.append("RRULE:%s" % rrule.replace("\r", "").replace("\n", ""))
|
|
vevent.append("END:VEVENT")
|
|
body.append("\r\n".join(vevent))
|
|
body.append("END:VCALENDAR")
|
|
return "\r\n".join(body) + "\r\n"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Construcción de la app FastAPI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def create_app(vault_dir: str) -> FastAPI:
|
|
"""Crea la app FastAPI ligada a un vault concreto.
|
|
|
|
Valida que el vault existe al construir el ``VaultState`` (no un 500
|
|
silencioso en el primer request). Registra todos los endpoints sobre un
|
|
estado compartido en memoria.
|
|
"""
|
|
state = VaultState(vault_dir)
|
|
app = FastAPI(title="osint_web", version="0.1.0")
|
|
# Anti DNS-rebinding: solo acepta requests cuyo Host sea localhost. Cierra el
|
|
# vector por el que una web maliciosa rebindea su dominio a 127.0.0.1 y, desde
|
|
# el navegador del usuario, alcanza este service local (sin auth) o el de DuckDB.
|
|
from starlette.middleware.trustedhost import TrustedHostMiddleware
|
|
|
|
app.add_middleware(
|
|
TrustedHostMiddleware,
|
|
allowed_hosts=["127.0.0.1", "localhost", "testserver"],
|
|
)
|
|
|
|
# Anti-CSRF de navegador: rechaza las peticiones mutantes que el navegador
|
|
# marca como cross-site (header Sec-Fetch-Site). Cierra el hueco de las
|
|
# peticiones "simples" (POST sin preflight CORS, p.ej. /api/refresh) que el
|
|
# TrustedHost no filtra porque su Host sigue siendo 127.0.0.1. El frontend
|
|
# mismo-origen y los clientes server-to-server no envían 'cross-site'.
|
|
@app.middleware("http")
|
|
async def _reject_cross_site(request, call_next):
|
|
if request.method in ("POST", "PUT", "PATCH", "DELETE"):
|
|
if request.headers.get("sec-fetch-site") == "cross-site":
|
|
return JSONResponse(
|
|
status_code=403,
|
|
content={"status": "error", "error": "petición cross-site rechazada"},
|
|
)
|
|
return await call_next(request)
|
|
|
|
app.state.vault = state
|
|
|
|
# -- Vault --
|
|
|
|
@app.get("/api/health")
|
|
def health() -> dict:
|
|
"""Health check: confirma que el servidor está vivo y el vault cargado."""
|
|
return {
|
|
"status": "ok",
|
|
"vault": state.vault_dir,
|
|
"nodes": len(state.graph["nodes"]),
|
|
"edges": len(state.graph["edges"]),
|
|
}
|
|
|
|
@app.get("/api/graph")
|
|
def api_graph() -> dict:
|
|
"""Grafo completo del vault (nodos + aristas + conteos) para sigma.js.
|
|
|
|
Cacheado en memoria; usar ``/api/refresh`` para recargar tras editar
|
|
notas.
|
|
"""
|
|
return state.graph_payload()
|
|
|
|
@app.get("/api/nodes")
|
|
def api_nodes(tipo: str = Query("", description="tipo de nodo a filtrar")) -> dict:
|
|
"""Filas de la tabla de un tipo concreto (frontmatter aplanado).
|
|
|
|
Devuelve solo los nodos reales (no fantasma). Sin ``tipo`` devuelve
|
|
todos.
|
|
"""
|
|
rows = state.rows_by_tipo(tipo)
|
|
return {"tipo": tipo, "count": len(rows), "rows": rows}
|
|
|
|
@app.get("/api/node/{slug}")
|
|
def api_node(slug: str) -> dict:
|
|
"""Ficha de un nodo: frontmatter + body + lista de attachments."""
|
|
detail = state.node_detail(slug)
|
|
if detail is None:
|
|
raise HTTPException(status_code=404, detail="nodo '%s' no encontrado" % slug)
|
|
return detail
|
|
|
|
@app.get("/api/attachment")
|
|
def api_attachment(
|
|
path: str = Query(..., description="path relativo al vault"),
|
|
) -> FileResponse:
|
|
"""Sirve el binario de un attachment, con allowlist ESTRICTA al vault.
|
|
|
|
El ``path`` es relativo al vault. Se resuelve a su realpath y se verifica
|
|
que cae dentro del vault: cualquier intento de salir
|
|
(``../../etc/passwd``, symlink fuera del vault) devuelve 403. Si el
|
|
archivo no existe (pero está dentro del vault), 404.
|
|
"""
|
|
abs_path = state.resolve_attachment_path(path)
|
|
if abs_path is None:
|
|
# Distinguimos traversal (403) de inexistente-dentro-del-vault (404).
|
|
candidate = os.path.realpath(os.path.join(state._vault_real, path or ""))
|
|
if state._is_within_vault(candidate) and candidate != state._vault_real:
|
|
raise HTTPException(status_code=404, detail="attachment no encontrado")
|
|
raise HTTPException(status_code=403, detail="path fuera del vault")
|
|
return FileResponse(abs_path)
|
|
|
|
@app.get("/api/search")
|
|
def api_search(q: str = Query(..., min_length=1)) -> dict:
|
|
"""Nodos del grafo cuyas notas matchean la query (substring)."""
|
|
results = state.search(q)
|
|
return {"query": q, "count": len(results), "results": results}
|
|
|
|
# -- Xandikos: contactos (CardDAV) --
|
|
|
|
@app.get("/api/contacts")
|
|
def api_contacts() -> JSONResponse:
|
|
"""Contactos del addressbook Xandikos, parseados a JSON (cacheados).
|
|
|
|
Cada contacto: ``{uid, nombre, alias, nota, org, telefonos[], emails[],
|
|
osint{dni,pais,sexo,...}}`` (+ las formas tipadas ``phones``/``emails``).
|
|
La lista se cachea en memoria al primer acceso (``POST /api/refresh`` la
|
|
invalida). Si Xandikos / el osint_db no responde o falta la password →
|
|
503 con un JSON de error claro, nunca un crash.
|
|
"""
|
|
try:
|
|
contacts = state.contacts()
|
|
except (RuntimeError, DavUnavailable, osintdb_client.OsintDbUnavailable) as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(
|
|
content={"status": "ok", "count": len(contacts), "contacts": contacts}
|
|
)
|
|
|
|
@app.get("/api/contact/{uid}")
|
|
def api_contact(uid: str) -> JSONResponse:
|
|
"""Un contacto concreto (por UID) parseado a JSON, desde la caché.
|
|
|
|
Resuelve sobre la lista cacheada de ``/api/contacts`` (mismo parseo
|
|
completo, todos los campos). 404 si el UID no existe; 503 si Xandikos no
|
|
responde o falta la password.
|
|
"""
|
|
try:
|
|
contacts = state.contacts()
|
|
except (RuntimeError, DavUnavailable, osintdb_client.OsintDbUnavailable) as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
match = next((c for c in contacts if c.get("uid") == uid), None)
|
|
if match is None:
|
|
# Tolerancia: aceptar también el segmento final del href (nombre del
|
|
# recurso .vcf) cuando el UID no coincide literalmente.
|
|
match = next(
|
|
(
|
|
c
|
|
for c in contacts
|
|
if uid in (c.get("href") or "").rsplit("/", 1)[-1]
|
|
),
|
|
None,
|
|
)
|
|
if match is None:
|
|
raise HTTPException(
|
|
status_code=404, detail="contacto '%s' no encontrado" % uid
|
|
)
|
|
return JSONResponse(content={"status": "ok", "contact": match})
|
|
|
|
# -- Contactos: CRUD (ficha .md del vault = verdad, vCard = reflejo) --
|
|
|
|
@app.post("/api/contact")
|
|
def api_create_contact(data: ContactIn = Body(...)) -> JSONResponse:
|
|
"""Crea un contacto: escribe la ficha ``.md`` del vault + el vCard.
|
|
|
|
La ficha del vault es la fuente de verdad (CONVENTIONS.md §3b/§6);
|
|
Xandikos se actualiza de inmediato para que el contacto se vea ya en la
|
|
app y en el móvil. 409 si el slug ya existe. Devuelve ``{slug, uid}``.
|
|
Con el flag ``OSINT_DB_BACKEND`` ON el alta va al osint_db; 503 si no
|
|
responde.
|
|
"""
|
|
try:
|
|
result = state.create_contact(data)
|
|
except osintdb_client.OsintDbUnavailable as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(status_code=201, content={"status": "ok", **result})
|
|
|
|
@app.put("/api/contact/{slug}")
|
|
def api_update_contact(slug: str, data: ContactIn = Body(...)) -> JSONResponse:
|
|
"""Edita la ficha de un contacto (merge frontmatter) + re-sube el vCard.
|
|
|
|
404 si no existe la ficha. Preserva campos heredados no editables
|
|
(``sexo``, ``fecha_nacimiento``, ...). Devuelve ``{slug, uid}``. Con el
|
|
flag ``OSINT_DB_BACKEND`` ON la edición va al osint_db; 503 si no responde.
|
|
"""
|
|
try:
|
|
result = state.update_contact(slug, data)
|
|
except osintdb_client.OsintDbUnavailable as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(content={"status": "ok", **result})
|
|
|
|
@app.delete("/api/contact/{slug}")
|
|
def api_delete_contact(slug: str) -> JSONResponse:
|
|
"""Borra un contacto: elimina la ficha ``.md`` + el vCard en Xandikos.
|
|
|
|
404 si no existe la ficha. Devuelve confirmación ``{slug, deleted}``. Con
|
|
el flag ``OSINT_DB_BACKEND`` ON el borrado va al osint_db; 503 si no
|
|
responde.
|
|
"""
|
|
try:
|
|
result = state.delete_contact(slug)
|
|
except osintdb_client.OsintDbUnavailable as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(content={"status": "ok", **result})
|
|
|
|
# -- Libretas (addressbooks) de contactos --
|
|
|
|
@app.get("/api/addressbooks")
|
|
def api_addressbooks() -> JSONResponse:
|
|
"""Libretas de contactos disponibles para el selector del frontend.
|
|
|
|
Cada una: ``{slug, display_name, collection_path, color}``. Con el flag
|
|
``OSINT_DB_BACKEND`` ON vienen del osint_db; con el flag OFF se devuelve
|
|
solo la libreta por defecto del vault. 503 si el osint_db no responde.
|
|
"""
|
|
try:
|
|
books = state.list_addressbooks()
|
|
except osintdb_client.OsintDbUnavailable as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(
|
|
content={
|
|
"status": "ok",
|
|
"count": len(books),
|
|
"addressbooks": books,
|
|
"default": DEFAULT_ADDRESSBOOK_SLUG,
|
|
}
|
|
)
|
|
|
|
@app.post("/api/addressbooks")
|
|
def api_create_addressbook(data: AddressbookIn = Body(...)) -> JSONResponse:
|
|
"""Crea una libreta de contactos nueva.
|
|
|
|
Body: ``{slug, name?, color?}``. Requiere el flag ``OSINT_DB_BACKEND``
|
|
activo (el osint_db crea la colección CardDAV en Xandikos); con el flag
|
|
OFF devuelve 501 claro. 503 si el osint_db no responde.
|
|
"""
|
|
try:
|
|
res = state.create_addressbook(data)
|
|
except osintdb_client.OsintDbUnavailable as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(status_code=201, content={"status": "ok", **res})
|
|
|
|
# -- Xandikos: calendario (CalDAV) --
|
|
|
|
@app.get("/api/calendars")
|
|
def api_calendars() -> JSONResponse:
|
|
"""Colecciones de calendario bajo el calendar-home, con nombre y color.
|
|
|
|
Cada una: ``{href, name, color}``. Alimenta el selector de calendario del
|
|
frontend. 503 con JSON de error si Xandikos no responde.
|
|
"""
|
|
try:
|
|
calendars = state.list_calendars()
|
|
except (RuntimeError, DavUnavailable) as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(
|
|
content={
|
|
"status": "ok",
|
|
"count": len(calendars),
|
|
"calendars": calendars,
|
|
"default": XANDIKOS_CALENDAR_COLLECTION,
|
|
}
|
|
)
|
|
|
|
@app.post("/api/calendars")
|
|
def api_create_calendar(data: CalendarIn = Body(...)) -> JSONResponse:
|
|
"""Crea una colección de calendario nueva (MKCALENDAR + nombre/color).
|
|
|
|
Body: ``{slug, name?, color?, description?}``. Idempotente si ya existe.
|
|
Devuelve ``{status, href, existed?}``. 400 si falta el nombre; 503 si
|
|
Xandikos no responde.
|
|
"""
|
|
try:
|
|
res = state.create_calendar(data)
|
|
except (RuntimeError, DavUnavailable) as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(status_code=201, content={"status": "ok", **res})
|
|
|
|
@app.get("/api/calendar")
|
|
def api_calendar(
|
|
cal: str = Query("", description="colección de calendario (ruta o nombre)"),
|
|
from_: str = Query("", alias="from", description="fecha inicio YYYY-MM-DD"),
|
|
to: str = Query("", description="fecha fin YYYY-MM-DD"),
|
|
) -> JSONResponse:
|
|
"""Eventos de una colección del calendario Xandikos en ``[from, to]``.
|
|
|
|
Cada evento: ``{uid, summary, dtstart, dtend, tz, all_day, location,
|
|
description, color}`` (dtstart/dtend en ISO con offset). ``cal`` elige la
|
|
colección (default la actual). La descarga + parseo se cachean
|
|
(``POST /api/refresh`` invalida); el filtro por rango va sobre la caché.
|
|
Si Xandikos no responde → 503 con JSON de error claro, nunca un crash.
|
|
"""
|
|
try:
|
|
events = state.calendar(cal, from_, to)
|
|
except (RuntimeError, DavUnavailable) as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(
|
|
content={"status": "ok", "count": len(events), "events": events}
|
|
)
|
|
|
|
# -- Calendario: CRUD de eventos (VEVENT) --
|
|
|
|
@app.post("/api/event")
|
|
def api_create_event(data: EventIn = Body(...)) -> JSONResponse:
|
|
"""Crea un VEVENT en una colección de calendario (PUT de un VCALENDAR).
|
|
|
|
Body: ``{cal?, summary, dtstart, dtend?, tz?, all_day?, location?,
|
|
description?, color?}``. Genera el UID. 400 si la fecha es inválida; 503
|
|
si Xandikos rechaza el evento. Devuelve ``{uid, cal}``.
|
|
"""
|
|
try:
|
|
result = state.create_event(data)
|
|
except DavUnavailable as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(status_code=201, content={"status": "ok", **result})
|
|
|
|
@app.put("/api/event/{uid}")
|
|
def api_update_event(uid: str, data: EventIn = Body(...)) -> JSONResponse:
|
|
"""Edita un VEVENT existente (reescribe ``<uid>.ics``).
|
|
|
|
Reutiliza el UID. 400 si la fecha es inválida; 503 si Xandikos rechaza.
|
|
Devuelve ``{uid, cal}``.
|
|
"""
|
|
try:
|
|
result = state.update_event(uid, data)
|
|
except DavUnavailable as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(content={"status": "ok", **result})
|
|
|
|
@app.delete("/api/event/{uid}")
|
|
def api_delete_event(
|
|
uid: str,
|
|
cal: str = Query("", description="colección de calendario (ruta o nombre)"),
|
|
) -> JSONResponse:
|
|
"""Borra un VEVENT (``<uid>.ics``) de una colección de calendario.
|
|
|
|
404 de Xandikos se trata como idempotente. 503 si falla por otra causa.
|
|
Devuelve ``{uid, deleted}``.
|
|
"""
|
|
try:
|
|
result = state.delete_event(uid, cal)
|
|
except DavUnavailable as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(content={"status": "ok", **result})
|
|
|
|
# -- Refresco de cachés --
|
|
|
|
@app.post("/api/refresh")
|
|
def api_refresh() -> dict:
|
|
"""Reconstruye la caché del grafo del vault e invalida las cachés DAV.
|
|
|
|
Re-escanea el vault (grafo + tablas) y vacía las cachés de contactos y
|
|
calendario, que se recargarán perezosamente en el siguiente acceso.
|
|
Devuelve el conteo del grafo recién reconstruido.
|
|
"""
|
|
summary = state.refresh()
|
|
state.invalidate_dav()
|
|
return {"status": "refreshed", **summary}
|
|
|
|
return app
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _parse_args(argv: Optional[list] = None) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="Backend osint_web: sirve el vault osint + agenda/calendario Xandikos."
|
|
)
|
|
parser.add_argument(
|
|
"--vault",
|
|
default=os.path.expanduser("~/Obsidian/osint"),
|
|
help="ruta al vault de Obsidian osint (default: ~/Obsidian/osint)",
|
|
)
|
|
parser.add_argument(
|
|
"--host",
|
|
default="127.0.0.1",
|
|
help="host de escucha (default: 127.0.0.1 — NO cambiar: datos sensibles)",
|
|
)
|
|
parser.add_argument(
|
|
"--port", type=int, default=8470, help="puerto local (default: 8470)"
|
|
)
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: Optional[list] = None) -> int:
|
|
args = _parse_args(argv)
|
|
if args.host != "127.0.0.1":
|
|
# Seguridad: el vault tiene PII (DNIs, fotos). Nunca exponer a red.
|
|
print(
|
|
"ADVERTENCIA: --host distinto de 127.0.0.1. El vault contiene datos "
|
|
"personales sensibles; exponerlo a la red es un riesgo.",
|
|
file=sys.stderr,
|
|
)
|
|
try:
|
|
app = create_app(args.vault)
|
|
except (FileNotFoundError, NotADirectoryError) as exc:
|
|
print(f"error: {exc}", file=sys.stderr)
|
|
return 2
|
|
|
|
import uvicorn
|
|
|
|
state = app.state.vault
|
|
print(
|
|
f"osint_web backend en http://{args.host}:{args.port} — vault: "
|
|
f"{state.vault_dir} ({len(state.graph['nodes'])} nodos, "
|
|
f"{len(state.graph['edges'])} aristas)"
|
|
)
|
|
uvicorn.run(app, host=args.host, port=args.port, log_level="info")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|