#!/usr/bin/env python3 """Backend FastAPI de la app osint_web. Sirve, en JSON (salvo el endpoint de attachments, que sirve binarios), tres fuentes de datos para un frontend web local: 1. El vault de Obsidian ``osint`` (grafo de nodos + aristas, tablas por tipo, fichas con galería de attachments y búsqueda global). 2. La agenda CardDAV del servidor Xandikos (contactos). 3. El calendario CalDAV del servidor Xandikos (eventos). Registry-first: este servidor NO reimplementa parseo de Markdown, resolución de embeds ni protocolo DAV. Orquesta funciones del registry de los grupos ``obsidian`` (parseo del vault) y ``dav`` (CardDAV/CalDAV) + ``pass_get_secret`` para la credencial de Xandikos. La única lógica propia de la app es: cacheo en memoria, allowlist de path traversal, aplanado del frontmatter para las tablas y el parseo ligero de vCard/iCalendar a JSON. Seguridad: el vault contiene datos personales sensibles (DNIs, fotos), así que el servidor escucha SOLO en ``127.0.0.1`` y nunca expone a la red. El endpoint ``/api/attachment`` valida con ``os.path.realpath`` que el archivo solicitado cae dentro del vault; cualquier intento de path traversal devuelve 403. Uso: python3 server/main.py --vault /home/enmanuel/Obsidian/osint --port 8470 Endpoints (JSON salvo /api/attachment): GET /api/health estado + tamaño del grafo cacheado GET /api/graph grafo completo {nodes, edges} para sigma.js GET /api/nodes?tipo=persona filas de la tabla de ese tipo GET /api/node/ ficha: frontmatter + body + attachments GET /api/attachment?path=.. binario del attachment (path relativo al vault) GET /api/search?q=... nodos cuyo contenido matchea la query GET /api/contacts contactos (Xandikos por defecto; osint_db si flag) GET /api/contact/ un vCard concreto a JSON GET /api/addressbooks libretas de contactos (selector del frontend) POST /api/addressbooks crea una libreta nueva (requiere OSINT_DB_BACKEND) GET /api/calendars colecciones de calendario bajo /enmanuel/calendars/ GET /api/calendar?cal=&from=&to= eventos de una colección del calendario (CalDAV) POST /api/event crea un VEVENT en una colección de calendario PUT /api/event/ edita un VEVENT existente DELETE /api/event/ borra un VEVENT POST /api/refresh re-escanea el vault y reconstruye la caché """ from __future__ import annotations import argparse import importlib.util import json import os import re import sys import threading import time import uuid from datetime import datetime, timedelta, timezone from typing import Optional try: from zoneinfo import ZoneInfo, ZoneInfoNotFoundError except ImportError: # pragma: no cover - Python < 3.9 sin tzdata ZoneInfo = None # type: ignore[assignment] class ZoneInfoNotFoundError(Exception): # type: ignore[no-redef] pass def _registry_functions_dir() -> str: """Localiza ``python/functions`` del fn_registry sin paths hardcodeados. Prueba primero las variables de entorno ``FN_REGISTRY_FUNCTIONS`` y ``FN_REGISTRY_ROOT``, después sube por los directorios padre de este archivo hasta encontrar una raíz que contenga ``python/functions/obsidian``, y por último cae al layout estándar del PC (``/home/enmanuel/fn_registry``). Así el backend funciona en cualquier PC con el layout estándar del registry (la app vive en ``/projects/osint/apps/osint_web/server/``) sin hardcodear el home de un usuario concreto (memoria ``hardcoded-lucas-paths``). """ env_functions = os.environ.get("FN_REGISTRY_FUNCTIONS") if env_functions and os.path.isdir(os.path.join(env_functions, "obsidian")): return env_functions candidates: list[str] = [] env_root = os.environ.get("FN_REGISTRY_ROOT") if env_root: candidates.append(env_root) current = os.path.dirname(os.path.abspath(__file__)) while True: candidates.append(current) parent = os.path.dirname(current) if parent == current: break current = parent candidates.append("/home/enmanuel/fn_registry") for root in candidates: functions_dir = os.path.join(root, "python", "functions") if os.path.isdir(os.path.join(functions_dir, "obsidian")): return functions_dir raise RuntimeError( "no se encontró python/functions/obsidian subiendo desde " f"{os.path.abspath(__file__)}; define FN_REGISTRY_ROOT con la raíz " "del fn_registry" ) _FUNCTIONS_DIR = _registry_functions_dir() sys.path.insert(0, _FUNCTIONS_DIR) from fastapi import Body, FastAPI, HTTPException, Query # noqa: E402 from fastapi.responses import FileResponse, JSONResponse # noqa: E402 from pydantic import BaseModel, Field # noqa: E402 # --- Grupo de capacidad obsidian (parseo del vault) --- # El paquete obsidian tiene un __init__ ligero (sin dependencias pesadas), así # que se importa directamente. from obsidian import ( # noqa: E402 (sys.path debe resolverse antes) build_obsidian_graph, create_obsidian_note, delete_obsidian_note, extract_obsidian_embeds, list_obsidian_notes, read_obsidian_note, resolve_obsidian_embed, search_obsidian_notes, slugify_obsidian_name, update_obsidian_note, ) def _load_infra_fn(module_name: str, attr: str): """Carga una función del paquete ``infra`` por archivo, sin tocar su __init__. El ``infra/__init__.py`` del registry importa de forma eager ``generate_app_icon`` (que necesita Pillow/PIL) y otros módulos pesados que esta app no usa. Importar ``from infra.dav_list_resources import ...`` arrastraría ese __init__ y exigiría PIL como dependencia. Para evitarlo, se carga cada módulo concreto del grupo ``dav`` directamente por su ruta de archivo con ``importlib``, sin ejecutar el __init__ del paquete. Sigue siendo registry-first: se usa la función del registry sin reimplementarla, solo se importa de forma quirúrgica. """ file_path = os.path.join(_FUNCTIONS_DIR, "infra", module_name + ".py") spec = importlib.util.spec_from_file_location("infra_%s" % module_name, file_path) if spec is None or spec.loader is None: # pragma: no cover - defensivo raise ImportError("no se pudo cargar %s" % file_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) return getattr(module, attr) # --- Grupo de capacidad dav (CardDAV / CalDAV contra Xandikos) + pass --- # dav_get_collection trae TODOS los recursos (vCards / VCALENDARs) de una # colección en UNA petición REPORT con el contenido inline; dav_collection_ctag # lee el ctag de la colección (PROPFIND Depth:0 barato) para validar la caché en # disco sin descargar nada cuando nada cambió. dav_get_collection = _load_infra_fn("dav_get_collection", "dav_get_collection") dav_collection_ctag = _load_infra_fn("dav_collection_ctag", "dav_collection_ctag") split_vcards = _load_infra_fn("split_vcards", "split_vcards") pass_get_secret = _load_infra_fn("pass_get_secret", "pass_get_secret") # Escritura CardDAV: PUT (crear/editar) y DELETE (borrar) un vCard. El cambio en # el vault .md es la fuente de verdad; estas reflejan el cambio en Xandikos de # inmediato para que la app y el móvil lo vean ya, sin esperar al sync periódico. carddav_put_vcard = _load_infra_fn("carddav_put_vcard", "carddav_put_vcard") dav_delete_resource = _load_infra_fn("dav_delete_resource", "dav_delete_resource") # Calendario (CalDAV): crear/editar eventos (PUT de un VCALENDAR por UID) y listar # las colecciones de calendario del usuario con su nombre y color. caldav_put_event = _load_infra_fn("caldav_put_event", "caldav_put_event") dav_list_calendars = _load_infra_fn("dav_list_calendars", "dav_list_calendars") # Crear una colección de calendario nueva (MKCALENDAR + PROPPATCH nombre/color). dav_make_calendar = _load_infra_fn("dav_make_calendar", "dav_make_calendar") # Expandir una RRULE a las fechas de cada ocurrencia dentro de un rango (pura). expand_rrule = _load_infra_fn("expand_rrule", "expand_rrule") # Cliente del service osint_db (DuckDB), usado SOLO cuando el feature flag # OSINT_DB_BACKEND está ON. El módulo vive junto a este archivo en server/, que # está en sys.path tanto al ejecutar `python server/main.py` como al importarlo # desde los tests. Se importa siempre (es barato: solo stdlib) pero no se usa a # menos que el flag esté activo. import osintdb_client # noqa: E402 # --------------------------------------------------------------------------- # Configuración Xandikos (CardDAV / CalDAV) # --------------------------------------------------------------------------- XANDIKOS_BASE_URL = "https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com" XANDIKOS_USERNAME = "enmanuel" XANDIKOS_PASS_ENTRY = "dav/xandikos-enmanuel" XANDIKOS_CONTACTS_COLLECTION = "/enmanuel/contacts/addressbook/" # Libreta (addressbook) por defecto. Cuando el flag OSINT_DB_BACKEND está OFF, # todos los contactos viven en esta única libreta; el frontend la muestra como # opción por defecto del selector. DEFAULT_ADDRESSBOOK_SLUG = "addressbook" DEFAULT_ADDRESSBOOK_NAME = "Contactos" # Calendar-home del usuario: bajo él cuelgan las colecciones de calendario. El # selector de calendario las descubre con dav_list_calendars (PROPFIND Depth:1). XANDIKOS_CALENDAR_HOME = "/enmanuel/calendars/" # Colección de calendario por defecto (la única hoy). Sigue siendo el destino # cuando el cliente no especifica `cal`. XANDIKOS_CALENDAR_COLLECTION = "/enmanuel/calendars/calendar/" # Caché en disco de los datos DAV ya parseados, indexada por el ctag de la # colección. Al arrancar el server lee el ctag (PROPFIND barato ~0.1s) y, si # coincide con el de la caché en disco, sirve los contactos/eventos ya parseados # sin descargar ni reparsear nada (arranque instantáneo). El directorio vive # junto al server y está gitignored (datos personales sensibles + regenerable). _CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".cache") _CONTACTS_CACHE_FILE = os.path.join(_CACHE_DIR, "contacts.json") # Caché del calendario por defecto. Para otras colecciones la ruta se deriva del # nombre de la colección (_calendar_cache_file), así cada calendario tiene su # propia caché en disco. _CALENDAR_CACHE_FILE = os.path.join(_CACHE_DIR, "calendar.json") def _calendar_cache_file(collection_path: str) -> str: """Ruta de la caché en disco de una colección de calendario concreta. La colección por defecto usa ``_CALENDAR_CACHE_FILE`` (compatibilidad con la caché previa); cualquier otra deriva su archivo del último segmento del path, saneado, para que cada calendario tenga su propia caché aislada. """ if collection_path.strip("/") == XANDIKOS_CALENDAR_COLLECTION.strip("/"): return _CALENDAR_CACHE_FILE tail = collection_path.strip("/").rsplit("/", 1)[-1] or "calendar" safe = re.sub(r"[^A-Za-z0-9_.-]", "_", tail) return os.path.join(_CACHE_DIR, "calendar_%s.json" % safe) # Extensiones de imagen que el frontend muestra en la galería con lightbox. _IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"} class DavUnavailable(Exception): """Xandikos no responde (sin red, timeout, auth caída). Los endpoints DAV la capturan y devuelven un 503 JSON claro, para que un fallo de la agenda/calendario NUNCA tumbe el server ni afecte a los endpoints del vault, que deben seguir funcionando offline. """ def _attachment_kind(name: str) -> str: """Clasifica un attachment por extensión: ``image`` | ``pdf`` | ``other``.""" ext = os.path.splitext(name)[1].lower() if ext in _IMAGE_EXTS: return "image" if ext == ".pdf": return "pdf" return "other" def _read_pass_secret(entry: str) -> str: """Lee la contraseña (primera línea) de una entrada de ``pass``. Wrapper fino sobre la función del registry ``pass_get_secret_py_infra`` (grupo ``infra``/``flow-replay``), que ejecuta ``pass show `` sin shell y nunca logea el valor. Convierte su resultado ``{status, value|error}`` en un ``str`` o un ``RuntimeError`` con mensaje claro, para que los endpoints DAV degraden con un error explicable en vez de un 500 silencioso. """ res = pass_get_secret(entry) if res.get("status") != "ok": raise RuntimeError( "no se pudo leer la entrada '%s' de pass: %s" % (entry, res.get("error", "error desconocido")) ) value = res.get("value", "") if not value.strip(): raise RuntimeError("la entrada '%s' de pass está vacía" % entry) return value.strip() # --------------------------------------------------------------------------- # Caché en disco de los datos DAV ya parseados (indexada por ctag) # --------------------------------------------------------------------------- def _read_disk_cache(path: str) -> Optional[dict]: """Lee una caché DAV del disco: ``{"ctag": str, "items": list}`` o None. Devuelve None (recargar de la red) ante cualquier problema: archivo inexistente, JSON corrupto, o estructura inesperada. Nunca lanza: la caché es un acelerador, no una fuente de verdad — si falla, se cae a la descarga. """ try: with open(path, "r", encoding="utf-8") as fh: data = json.load(fh) except (OSError, ValueError): return None if not isinstance(data, dict) or not isinstance(data.get("items"), list): return None return data def _write_disk_cache(path: str, ctag: str, items: list) -> None: """Escribe la caché DAV al disco de forma atómica (tmp + rename). Persiste ``{"ctag": ctag, "items": items, "saved_at": epoch}``. Errores de escritura se ignoran (no deben tumbar el endpoint): la caché en memoria sigue sirviendo y el disco se reintentará en el siguiente refresco. """ try: os.makedirs(os.path.dirname(path), exist_ok=True) tmp = path + ".tmp" with open(tmp, "w", encoding="utf-8") as fh: json.dump( {"ctag": ctag, "items": items, "saved_at": time.time()}, fh, ensure_ascii=False, ) os.replace(tmp, path) except OSError: pass # --------------------------------------------------------------------------- # Feature flag OSINT_DB_BACKEND: fuente de verdad de los contactos # --------------------------------------------------------------------------- # # OFF (default): los contactos se escriben como ficha .md en el vault + reflejo # del vCard en Xandikos (comportamiento histórico de la app). # ON: la app lee/escribe contra el service osint_db (DuckDB, 127.0.0.1:8771), # que pasa a ser la fuente de verdad y empuja él mismo el cambio a Xandikos. # # El flag vive en dev/feature_flags.json (raíz de la app), patrón TBD del # registry (.claude/rules/feature_flags.md). Se lee en cada acceso (no se cachea) # para que cambiarlo no requiera reiniciar el server. _FLAGS_FILE = os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "dev", "feature_flags.json", ) # Service osint_db (DuckDB) — fuente de verdad cuando el flag está ON. OSINT_DB_BASE_URL = "http://127.0.0.1:8771" def _osint_db_backend_enabled() -> bool: """True si el flag ``OSINT_DB_BACKEND`` está activo en dev/feature_flags.json. Lee el archivo en cada llamada (sin caché) para que el flip se note sin reiniciar. Tolerante a fallos: archivo ausente, JSON corrupto o clave faltante → False (comportamiento histórico vault+Xandikos), nunca lanza. """ try: with open(_FLAGS_FILE, "r", encoding="utf-8") as fh: data = json.load(fh) except (OSError, ValueError): return False flag = (data.get("flags") or {}).get("OSINT_DB_BACKEND") or {} return bool(flag.get("enabled")) def _contacts_from_osint_db() -> list: """Lee los contactos del osint_db y los adapta al shape JSON del frontend. El osint_db devuelve filas ``{uid, collection, fn, tels, emails, note_path}`` (``tels``/``emails`` como JSON array). Esta función las mapea al mismo dict que produce ``_vcard_to_json`` (``uid, nombre, telefonos, correos, phones, emails, osint, ...``), para que ``/api/contacts`` y la vista no distingan la fuente. ``collection`` se expone para poder filtrar por libreta. Raises: osintdb_client.OsintDbUnavailable: si el service no responde. """ rows = osintdb_client.list_contacts() out: list = [] for row in rows: tels = osintdb_client._parse_json_array(row.get("tels")) mails = osintdb_client._parse_json_array(row.get("emails")) fn = row.get("fn") out.append( { "uid": row.get("uid"), "fn": fn, "nombre": fn, "nickname": None, "alias": None, "org": None, "note": None, "nota": None, "collection": row.get("collection"), "phones": [{"value": t, "type": ""} for t in tels], "emails": [{"value": e, "type": ""} for e in mails], "telefonos": tels, "correos": mails, "direcciones": [], "osint": {}, "note_path": row.get("note_path"), } ) out.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower()) return out def _osint_db_contact_payload(data: "ContactIn", uid: Optional[str] = None) -> dict: """Construye el cuerpo JSON de un contacto para el service osint_db. Mapea el ``ContactIn`` (ya reconciliado multi-valor) al contrato del osint_db: ``{uid?, collection, fn, telefonos, emails, direcciones, nombre, aliases, dni, pais, contexto, notas}``. ``collection`` se deriva del campo ``contexto`` si apunta a una libreta; por defecto la libreta canónica. ``uid`` solo se incluye al crear (el PUT lo lleva en la ruta, no en el cuerpo). """ nombre = data.nombre.strip() payload: dict = { "collection": _norm_str(data.collection) or DEFAULT_ADDRESSBOOK_SLUG, "fn": nombre, "nombre": nombre, "telefonos": _norm_list(data.telefonos), "emails": _norm_list(data.emails), "direcciones": _norm_list(data.direcciones), "aliases": _norm_list(data.aliases), "dni": _norm_str(data.dni), "pais": _norm_str(data.pais), "contexto": _norm_str(data.contexto), "notas": _norm_str(data.notas), } if uid: payload["uid"] = uid return payload # --------------------------------------------------------------------------- # Estado del servidor: caché del vault + password Xandikos # --------------------------------------------------------------------------- class VaultState: """Caché en memoria del vault: grafo agregado + índice slug → nota. Se construye al arrancar y se reconstruye bajo demanda con ``refresh()`` (botón "refrescar" del frontend → ``POST /api/refresh``). Thread-safe mediante un lock sobre la reconstrucción. La password de Xandikos se lee de ``pass`` perezosamente y se cachea en memoria. Raises: FileNotFoundError: si ``vault_dir`` no existe (error claro al arrancar, nunca un 500 silencioso). NotADirectoryError: si ``vault_dir`` no es un directorio. """ def __init__(self, vault_dir: str): if not os.path.exists(vault_dir): raise FileNotFoundError(f"el vault no existe: {vault_dir}") if not os.path.isdir(vault_dir): raise NotADirectoryError(f"el vault no es un directorio: {vault_dir}") self.vault_dir = os.path.abspath(vault_dir) self._vault_real = os.path.realpath(self.vault_dir) self._lock = threading.Lock() self.graph: dict = {"nodes": [], "edges": []} self.note_index: dict = {} # slug -> {"path", "tipo", "label"} self._xandikos_password: Optional[str] = None # Cachés DAV en memoria (igual que el grafo): se llenan perezosamente al # primer acceso y se invalidan en POST /api/refresh. None = sin cargar. # Cada caché lleva su ctag para servir del disco sin red cuando la # colección no cambió. _force_reload (set por /api/refresh) salta la # validación de ctag en el siguiente acceso. self._dav_lock = threading.Lock() self._contacts_cache: Optional[list] = None # Caché de eventos POR colección de calendario: collection_path → list. # Permite varios calendarios sin pisarse; cada uno con su ctag. self._calendar_cache: dict[str, list] = {} self._calendar_ctag: dict[str, str] = {} self._calendars_cache: Optional[list] = None # lista de colecciones self._contacts_ctag: Optional[str] = None self._force_reload = False self.refresh() # --- vault -------------------------------------------------------------- def refresh(self) -> dict: """Re-escanea el vault: reconstruye grafo + índice de notas. Devuelve un resumen ``{"nodes": N, "edges": M}`` para el frontend. """ with self._lock: graph = build_obsidian_graph(self.vault_dir, include_dangling=True) nodes_by_id = {n["id"]: n for n in graph["nodes"]} note_index: dict = {} for path in list_obsidian_notes(self.vault_dir): slug = os.path.splitext(os.path.basename(path))[0] if not slug or slug in note_index: continue node = nodes_by_id.get(slug, {}) note_index[slug] = { "path": path, "tipo": node.get("tipo", "nota"), "label": node.get("label", slug), } self.graph = graph self.note_index = note_index return {"nodes": len(graph["nodes"]), "edges": len(graph["edges"])} def graph_payload(self) -> dict: """Grafo + conteos por tipo para la leyenda de sigma.js.""" counts: dict[str, int] = {} for node in self.graph["nodes"]: counts[node["tipo"]] = counts.get(node["tipo"], 0) + 1 return { "nodes": self.graph["nodes"], "edges": self.graph["edges"], "counts": counts, "total_nodes": len(self.graph["nodes"]), "total_edges": len(self.graph["edges"]), } def rows_by_tipo(self, tipo: str) -> list: """Filas de la tabla de un tipo: nodos reales (no fantasma) filtrados. Cada fila lleva ``id``, ``label``, ``tipo`` y el ``frontmatter`` completo — el frontend aplana las columnas que le interesen. Sin ``tipo`` devuelve todos los nodos reales. """ rows = [] for node in self.graph["nodes"]: if node.get("dangling"): continue if tipo and node["tipo"] != tipo: continue rows.append( { "id": node["id"], "label": node["label"], "tipo": node["tipo"], "frontmatter": node["frontmatter"], } ) return rows def _resolve_embed(self, embed_name: str) -> str: """Resuelve un embed ``![[...]]`` a un path absoluto dentro del vault. El vault osint usa dos formas de embed: por nombre de archivo (``![[foto.jpg]]``) y por path relativo al vault (``![[attachments/personas//foto.png]]``). La función del registry ``resolve_obsidian_embed`` resuelve solo por basename, así que primero se intenta el embed como path literal relativo al vault (cubre la forma con ruta), y si no existe se cae al resolutor por basename del registry. Devuelve cadena vacía si ninguna forma resuelve. """ # Forma 1: el embed ya es un path relativo al vault. literal = os.path.realpath(os.path.join(self._vault_real, embed_name)) if self._is_within_vault(literal) and os.path.isfile(literal): return literal # Forma 2: resolución por basename via función del registry. return resolve_obsidian_embed(self.vault_dir, embed_name) def node_detail(self, slug: str): """Ficha completa de un nodo: frontmatter + body + attachments. Los attachments salen de los embeds ``![[...]]`` del cuerpo, resueltos a paths reales con ``_resolve_embed`` (que compone ``resolve_obsidian_embed``) y devueltos como paths **relativos al vault** (lo que consume ``/api/attachment``). Un embed que no resuelve se reporta con ``kind: "missing"`` y path vacío. Devuelve ``None`` si el slug no corresponde a ninguna nota del vault. """ info = self.note_index.get(slug) if info is None: # Tolerancia: aceptar también nombres sin slugificar. info = self.note_index.get(slugify_obsidian_name(slug)) if info is None: return None note = read_obsidian_note(info["path"]) attachments = [] for name in extract_obsidian_embeds(note["body"]): abs_path = self._resolve_embed(name) if not abs_path: attachments.append({"name": name, "path": "", "kind": "missing"}) continue real = os.path.realpath(abs_path) # Defensa en profundidad: solo attachments dentro del vault. if not self._is_within_vault(real): continue rel = os.path.relpath(real, self._vault_real) attachments.append( {"name": name, "path": rel, "kind": _attachment_kind(abs_path)} ) return { "id": os.path.splitext(os.path.basename(info["path"]))[0], "tipo": info["tipo"], "label": info["label"], "frontmatter": note["frontmatter"], "body": note["body"], "tags": note["tags"], "wikilinks": note["wikilinks"], "attachments": attachments, } def _is_within_vault(self, candidate_real_path: str) -> bool: """True si ``candidate_real_path`` (ya realpath) está dentro del vault. Añade el separador final al vault para que ``/vault-evil`` no cuele como prefijo de ``/vault``. """ return ( candidate_real_path == self._vault_real or candidate_real_path.startswith(self._vault_real + os.sep) ) def resolve_attachment_path(self, rel_path: str): """Resuelve un path relativo de attachment a absoluto, SOLO dentro del vault. Bloquea path traversal: normaliza con ``realpath`` (colapsa ``..`` y sigue symlinks) y exige que el resultado quede estrictamente bajo la raíz real del vault. Devuelve ``None`` (→ 403/404) ante cualquier intento de salir del vault, paths absolutos, o archivos inexistentes. """ if not rel_path: return None candidate = os.path.realpath(os.path.join(self._vault_real, rel_path)) if candidate == self._vault_real: return None if not candidate.startswith(self._vault_real + os.sep): return None if not os.path.isfile(candidate): return None return candidate def search(self, query: str) -> list: """Búsqueda global: nodos cuyas notas matchean la query (substring). Compone ``search_obsidian_notes`` y mapea cada hit a su nodo (slug, label, tipo) + las líneas que matchean. """ results = [] for hit in search_obsidian_notes(self.vault_dir, query): slug = os.path.splitext(os.path.basename(hit["path"]))[0] info = self.note_index.get(slug, {}) results.append( { "id": slug, "label": info.get("label", slug), "tipo": info.get("tipo", "nota"), "matches": hit.get("matches", []), } ) return results # --- Xandikos ----------------------------------------------------------- def xandikos_password(self) -> str: """Password de Xandikos desde ``pass``, cacheada en memoria.""" with self._lock: if self._xandikos_password is None: self._xandikos_password = _read_pass_secret(XANDIKOS_PASS_ENTRY) return self._xandikos_password def _collection_ctag(self, collection_path: str, password: str) -> Optional[str]: """Lee el ctag de una colección (PROPFIND barato), o None si no se puede. El ctag es el token de versión de la colección: si no cambió, la caché en disco sigue vigente. Devolver None significa "no pude validar" → se recarga de la red por seguridad (nunca se sirve caché potencialmente obsoleta sin confirmación). No lanza. """ res = dav_collection_ctag( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, collection_path ) return res.get("ctag") if res.get("status") == "ok" else None def _load_collection( self, collection_path: str, content_type: str, cache_file: str, parse_items, ) -> tuple: """Carga una colección DAV con caché en disco validada por ctag. Flujo (1 o 2 peticiones, nunca N): 1. Lee el ctag de la colección (PROPFIND Depth:0, ~0.1s). 2. Si el ctag coincide con el de la caché en disco (y no hay refresh forzado), parsea los items del disco y devuelve sin descargar. 3. Si no, hace UN REPORT ``dav_get_collection`` que trae todos los recursos con su contenido inline, los parsea con ``parse_items`` y reescribe la caché en disco con el nuevo ctag. ``parse_items(resources) -> list`` transforma la lista ``[{href, etag, data}]`` del registry en la lista de objetos JSON que sirve el endpoint (contactos o eventos). Returns: tuple ``(items, ctag)``. Raises: DavUnavailable: si Xandikos no responde al REPORT cuando hay que descargar (sin red, timeout, auth). """ password = self.xandikos_password() ctag = self._collection_ctag(collection_path, password) # Caché en disco vigente: mismo ctag y sin refresh forzado → sin red. if ctag is not None and not self._force_reload: disk = _read_disk_cache(cache_file) if disk is not None and disk.get("ctag") == ctag: return parse_items(disk["items"]), ctag # Hay que (re)descargar: UN REPORT trae todo con el contenido inline. got = dav_get_collection( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, collection_path, content_type, ) if got.get("status") != "ok": raise DavUnavailable("Xandikos no responde: %s" % got.get("error")) resources = got.get("resources", []) items = parse_items(resources) # Persistir en disco para el arranque instantáneo de la próxima vez. Si # no obtuvimos ctag, guardamos cadena vacía: nunca matcheará un ctag real, # así que la próxima vez se revalidará (cae con elegancia a "siempre # recargar" sin romper). _write_disk_cache(cache_file, ctag or "", items) return items, (ctag or "") @staticmethod def _parse_contacts(items: list) -> list: """Parsea ``[{href, etag, data}]`` (o items cacheados) a contactos JSON. Acepta dos formas de ``items``: la lista de recursos del registry (cada uno con ``data`` = texto vCard, posible multi-tarjeta) que hay que parsear, o la lista de contactos ya parseados (caché en disco), que se devuelve tal cual. Se distinguen por la presencia de la clave ``data``. """ if items and "data" in items[0]: contacts: list = [] for res in items: href = res.get("href") for card_text in split_vcards(res.get("data", "")): card = _vcard_to_json(card_text) card["etag"] = res.get("etag") card["href"] = href contacts.append(card) contacts.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower()) return contacts return list(items) @staticmethod def _parse_events(items: list) -> list: """Parsea ``[{href, etag, data}]`` (o items cacheados) a eventos JSON. Igual criterio que ``_parse_contacts``: si los items llevan ``data`` son recursos del registry (texto VCALENDAR) y se parsean; si no, ya son eventos cacheados y se devuelven tal cual. """ if items and "data" in items[0]: events: list = [] for res in items: href = res.get("href") for event in _vcalendar_to_events(res.get("data", "")): event["etag"] = res.get("etag") event["href"] = href events.append(event) events.sort(key=lambda e: e.get("dtstart") or "") return events return list(items) def contacts(self) -> list: """Contactos, desde Xandikos (flag OFF) o desde osint_db (flag ON). Con el flag ``OSINT_DB_BACKEND`` activo, la fuente de verdad es el service osint_db (DuckDB): se consultan sus contactos y se devuelven con el mismo shape JSON que produce el parseo del vCard, para que el frontend no note la diferencia. Con el flag OFF (default), camino histórico: addressbook Xandikos parseado y cacheado. Caché en dos niveles para el camino DAV: memoria (mientras vive el proceso) y disco (``.cache/contacts.json``, validada por ctag para arranque instantáneo). Al primer acceso descarga TODO en UNA petición REPORT (``dav_get_collection``) en vez de un GET por ``.vcf``. Raises: RuntimeError: si no se puede leer la password de ``pass``. DavUnavailable: si Xandikos no responde (sin red, timeout, auth). osintdb_client.OsintDbUnavailable: si el osint_db no responde (flag ON). """ if _osint_db_backend_enabled(): return _contacts_from_osint_db() with self._dav_lock: if self._contacts_cache is not None and not self._force_reload: return self._contacts_cache contacts, ctag = self._load_collection( XANDIKOS_CONTACTS_COLLECTION, "vcard", _CONTACTS_CACHE_FILE, self._parse_contacts, ) self._contacts_cache = contacts self._contacts_ctag = ctag self._maybe_clear_force_reload() return contacts def list_addressbooks(self) -> list: """Libretas (addressbooks) disponibles para los contactos. Con el flag ``OSINT_DB_BACKEND`` ON, consulta las libretas del osint_db (``{slug, display_name, collection_path, color}``). Con el flag OFF, hoy solo existe la libreta por defecto en el vault; se devuelve esa única entrada para que el selector del frontend tenga algo que mostrar. Raises: osintdb_client.OsintDbUnavailable: si el osint_db no responde (flag ON). """ if _osint_db_backend_enabled(): return osintdb_client.list_addressbooks() return [ { "slug": DEFAULT_ADDRESSBOOK_SLUG, "display_name": DEFAULT_ADDRESSBOOK_NAME, "collection_path": XANDIKOS_CONTACTS_COLLECTION, "color": None, } ] def create_addressbook(self, data: "AddressbookIn") -> dict: """Crea una libreta de contactos nueva. Solo soportado con el flag ``OSINT_DB_BACKEND`` ON: el osint_db crea la colección CardDAV en Xandikos y la registra en la DuckDB. Con el flag OFF no hay forma de crear libretas todavía (no existe ``dav_make_addressbook`` en el registry) → 501 claro indicando que requiere el flag. Returns: dict ``{status, slug, ...}`` del osint_db. Raises: HTTPException(400): si el slug/nombre queda vacío. HTTPException(501): si el flag está OFF. osintdb_client.OsintDbUnavailable: si el osint_db no responde. """ slug = (data.slug or data.name or "").strip() if not slug: raise HTTPException( status_code=400, detail="el nombre de la libreta es obligatorio" ) if not _osint_db_backend_enabled(): raise HTTPException( status_code=501, detail=( "crear libretas requiere el backend OSINT_DB_BACKEND activo " "(hoy solo existe la libreta por defecto en el vault)" ), ) res = osintdb_client.create_addressbook( slug, data.name or slug, data.color or None ) self.invalidate_dav() return res def _resolve_calendar(self, cal: str = "") -> str: """Normaliza el parámetro ``cal`` a una ruta de colección de calendario. Acepta una ruta absoluta (``/enmanuel/calendars/calendar/``), el nombre corto de la colección (``calendar``), o vacío (→ colección por defecto). Garantiza barras inicial/final. NO valida contra el servidor (eso lo hace el propio Xandikos al fallar la petición); solo da forma canónica. """ cal = (cal or "").strip() if not cal: return XANDIKOS_CALENDAR_COLLECTION if cal.startswith("/"): path = cal else: # Nombre corto → lo colgamos del calendar-home. path = XANDIKOS_CALENDAR_HOME.rstrip("/") + "/" + cal.strip("/") if not path.endswith("/"): path += "/" return path def list_calendars(self) -> list: """Colecciones de calendario bajo el calendar-home, con nombre y color. Cacheada en memoria (``POST /api/refresh`` la invalida). Compone la función del registry ``dav_list_calendars`` (PROPFIND Depth:1). Devuelve ``[{href, name, color}, ...]`` ordenadas por nombre. Raises: RuntimeError: si no se puede leer la password de ``pass``. DavUnavailable: si Xandikos no responde. """ with self._dav_lock: if self._calendars_cache is not None and not self._force_reload: return self._calendars_cache password = self.xandikos_password() res = dav_list_calendars( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, XANDIKOS_CALENDAR_HOME ) if res.get("status") != "ok": raise DavUnavailable( "Xandikos no responde: %s" % res.get("error") ) calendars = res.get("calendars", []) self._calendars_cache = calendars self._maybe_clear_force_reload() return calendars def create_calendar(self, data: "CalendarIn") -> dict: """Crea una colección de calendario nueva bajo el calendar-home (MKCALENDAR). Compone la función del registry ``dav_make_calendar`` (MKCALENDAR + PROPPATCH de nombre/color). Invalida la caché de colecciones para que el calendario nuevo aparezca en el selector al recargar. Returns: dict ``{status, href, existed?}`` de la función del registry. Raises: HTTPException(400): si el slug/nombre queda vacío tras sanear. DavUnavailable: si Xandikos rechaza la creación. """ slug = (data.slug or data.name or "").strip() if not slug: raise HTTPException(status_code=400, detail="el nombre del calendario es obligatorio") password = self.xandikos_password() res = dav_make_calendar( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, XANDIKOS_CALENDAR_HOME, slug, data.name or slug, data.color or "", data.description or "", ) if res.get("status") != "ok": raise DavUnavailable( "Xandikos no pudo crear el calendario: %s" % res.get("error") ) with self._dav_lock: self._calendars_cache = None return res def calendar(self, cal: str = "", dt_from: str = "", dt_to: str = "") -> list: """Eventos de una colección de calendario Xandikos, cacheados y filtrados. Caché por colección (memoria + disco, validada por ctag). La descarga + parseo completos se cachean (UNA petición REPORT); el filtro por ``[from, to]`` se aplica sobre la caché. Sin ``cal`` usa la colección por defecto; sin ``from``/``to`` devuelve todos los eventos. Raises: RuntimeError: si no se puede leer la password de ``pass``. DavUnavailable: si Xandikos no responde (sin red, timeout, auth). """ collection = self._resolve_calendar(cal) with self._dav_lock: cached = self._calendar_cache.get(collection) if cached is None or self._force_reload: events, ctag = self._load_collection( collection, "ical", _calendar_cache_file(collection), self._parse_events, ) self._calendar_cache[collection] = events self._calendar_ctag[collection] = ctag self._maybe_clear_force_reload() cached = events all_events = list(cached) # Sin rango: devolvemos los eventos maestros tal cual (no expandimos # series infinitas). Con rango: cada serie recurrente se expande a sus # ocurrencias dentro de [from, to]; los puntuales se filtran por fecha. if not dt_from and not dt_to: return all_events out: list = [] for ev in all_events: if ev.get("rrule"): out.extend(_expand_event_occurrences(ev, dt_from, dt_to)) elif _event_in_range(ev, dt_from, dt_to): out.append(ev) return out # --- Escritura de eventos del calendario (CalDAV) ----------------------- def create_event(self, data: "EventIn") -> dict: """Crea un VEVENT en una colección de calendario (PUT de un VCALENDAR). Genera un UID nuevo, construye el VCALENDAR/VEVENT (respetando tz/all_day, ver ``_build_vcalendar``) y lo sube con ``caldav_put_event``. Invalida la caché de esa colección para que el evento aparezca ya. Returns: dict ``{uid, cal, dav}``. Raises: HTTPException(400): si la fecha es inválida o falta el summary. DavUnavailable: si Xandikos rechaza el PUT. """ if not data.summary or not data.summary.strip(): raise HTTPException(status_code=400, detail="el summary es obligatorio") collection = self._resolve_calendar(data.cal or "") uid = str(uuid.uuid4()) try: vcal = _build_vcalendar(data, uid) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc dav = self._put_event(collection, uid, vcal) if dav.get("status") != "ok": raise DavUnavailable("Xandikos rechazó el evento: %s" % dav.get("error")) self._invalidate_calendar(collection) return {"uid": uid, "cal": collection, "dav": dav} def update_event(self, uid: str, data: "EventIn") -> dict: """Edita un VEVENT existente: reescribe el recurso ``.ics`` (PUT). Reutiliza el UID (idempotente). Construye el VCALENDAR de nuevo a partir del cuerpo recibido y lo sube. Invalida la caché de la colección. Returns: dict ``{uid, cal, dav}``. Raises: HTTPException(400): si la fecha es inválida o falta el summary. DavUnavailable: si Xandikos rechaza el PUT. """ if not data.summary or not data.summary.strip(): raise HTTPException(status_code=400, detail="el summary es obligatorio") collection = self._resolve_calendar(data.cal or "") try: vcal = _build_vcalendar(data, uid) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc dav = self._put_event(collection, uid, vcal) if dav.get("status") != "ok": raise DavUnavailable("Xandikos rechazó el evento: %s" % dav.get("error")) self._invalidate_calendar(collection) return {"uid": uid, "cal": collection, "dav": dav} def delete_event(self, uid: str, cal: str = "") -> dict: """Borra un VEVENT: elimina el recurso ``.ics`` de la colección. Trata 404 como idempotente (ya no existía). Invalida la caché de la colección. Returns: dict ``{uid, deleted, dav}``. Raises: DavUnavailable: si Xandikos falla con un error distinto de 404. """ collection = self._resolve_calendar(cal) password = self.xandikos_password() resource_path = collection + _safe_event_resource(uid) dav = dav_delete_resource( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, resource_path ) if dav.get("status") != "ok" and dav.get("http_status") == 404: dav = {"status": "ok", "http_status": 404, "idempotent": True} if dav.get("status") != "ok": raise DavUnavailable("Xandikos no pudo borrar: %s" % dav.get("error")) self._invalidate_calendar(collection) return {"uid": uid, "deleted": True, "dav": dav} def _put_event(self, collection: str, uid: str, vcalendar_text: str) -> dict: """Sube (PUT) un VCALENDAR a una colección CalDAV. No lanza por sí sola. Compone la función del registry ``caldav_put_event`` (deriva el nombre del recurso de ``safe(uid).ics``). Devuelve su dict ``{status, http_status|error}``. """ password = self.xandikos_password() return caldav_put_event( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, collection, uid, vcalendar_text, ) def _invalidate_calendar(self, collection: str) -> None: """Vacía la caché en memoria de una colección de calendario concreta.""" with self._dav_lock: self._calendar_cache.pop(collection, None) self._calendar_ctag.pop(collection, None) def _maybe_clear_force_reload(self) -> None: """Apaga el flag de refresh forzado una vez consumido por una recarga. Llamado bajo ``_dav_lock`` tras recargar una colección. El flag lo activa ``invalidate_dav`` (POST /api/refresh) para forzar UNA recarga que ignore el ctag; tras ella vuelve a la validación normal por ctag. """ self._force_reload = False def invalidate_dav(self) -> None: """Vacía las cachés de contactos y calendario y fuerza una recarga. Limpia las cachés en memoria y marca ``_force_reload`` para que el siguiente acceso a cada colección ignore el ctag cacheado y vuelva a descargar del servidor (el botón "refrescar" debe traer cambios aunque el ctag no se haya actualizado todavía). No borra la password. """ with self._dav_lock: self._contacts_cache = None self._calendar_cache = {} self._calendar_ctag = {} self._calendars_cache = None self._force_reload = True # --- Escritura de contactos: ficha .md (verdad) + reflejo en Xandikos ---- def _contact_note_path(self, tipo: str, slug: str) -> str: """Path absoluto de la ficha ``.md`` de un contacto en el vault. ``personas/.md`` o ``organizaciones/.md`` según el tipo. """ folder = _TIPO_FOLDER.get(tipo, "personas") return os.path.join(self._vault_real, folder, slug + ".md") def _put_vcard(self, slug: str, vcard_text: str) -> dict: """Sube (PUT) un vCard a Xandikos por su UID=slug. No lanza por sí sola. Reflejo inmediato del cambio en la ficha del vault, para que el contacto se vea ya en la app y en el móvil sin esperar al sync periódico. Devuelve el dict ``{status, http_status|error}`` de ``carddav_put_vcard``. """ password = self.xandikos_password() return carddav_put_vcard( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, XANDIKOS_CONTACTS_COLLECTION, slug, vcard_text, ) def _delete_vcard(self, slug: str) -> dict: """Borra (DELETE) el vCard ``.vcf`` de Xandikos. No lanza. Compone ``dav_delete_resource`` con el href del recurso (mismo nombre que usó el PUT: ``.vcf``). Trata 404 como idempotente (ya no existía → objetivo cumplido), igual que un borrado repetido. """ password = self.xandikos_password() resource_path = XANDIKOS_CONTACTS_COLLECTION + slug + ".vcf" res = dav_delete_resource( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, resource_path ) if res.get("status") != "ok" and res.get("http_status") == 404: return {"status": "ok", "http_status": 404, "idempotent": True} return res def create_contact(self, data: "ContactIn") -> dict: """Crea un contacto: ficha ``.md`` (verdad) + reflejo del vCard. 1. Genera el slug del nombre. 409 si ya existe la ficha. 2. Escribe la ficha ``.md`` con el frontmatter canónico (acción primaria). 3. Hace PUT del vCard a Xandikos (reflejo inmediato; un fallo NO revierte la ficha — el sync periódico reconciliará). 4. Invalida las cachés DAV para que el contacto aparezca ya en la app. Returns: dict ``{slug, uid, path, dav}``. Raises: HTTPException(409): si ya existe una ficha con ese slug. HTTPException(400): si el tipo no es 'persona'|'organizacion' o el nombre está vacío. """ if not data.nombre.strip(): raise HTTPException(status_code=400, detail="el nombre es obligatorio") if _osint_db_backend_enabled(): # Flag ON: el osint_db es la fuente de verdad. Genera el slug igual que # el camino vault (mismo UID), envía el payload y deja que el service # escriba la DuckDB + empuje a Xandikos. slug = slugify_obsidian_name(data.nombre) if not slug: raise HTTPException( status_code=400, detail="el nombre no produce un slug válido" ) res = osintdb_client.create_contact(_osint_db_contact_payload(data, slug)) self.invalidate_dav() uid = res.get("uid") or slug return {"slug": uid, "uid": uid, "path": None, "osint_db": res} tipo = (data.tipo or "persona").strip() if tipo not in _TIPO_FOLDER: raise HTTPException( status_code=400, detail="tipo inválido '%s' (persona|organizacion)" % tipo, ) if not data.nombre.strip(): raise HTTPException(status_code=400, detail="el nombre es obligatorio") slug = slugify_obsidian_name(data.nombre) if not slug: raise HTTPException( status_code=400, detail="el nombre no produce un slug válido" ) note_path = self._contact_note_path(tipo, slug) if os.path.exists(note_path): raise HTTPException( status_code=409, detail="ya existe un contacto con slug '%s'" % slug ) frontmatter = _contact_frontmatter(data, slug) body = _contact_body(data.notas) folder = _TIPO_FOLDER[tipo] # Acción primaria: la ficha del vault es la fuente de verdad. create_obsidian_note( self.vault_dir, os.path.join(folder, slug), body=body, frontmatter=frontmatter, ) # Reflejo inmediato en Xandikos (no rompe el alta si Xandikos cae). vcard_fm = dict(frontmatter) vcard_fm["_notas"] = _norm_str(data.notas) dav = self._put_vcard(slug, _build_vcard(vcard_fm, slug)) self.refresh() self.invalidate_dav() return {"slug": slug, "uid": slug, "path": note_path, "dav": dav} def update_contact(self, slug: str, data: "ContactIn") -> dict: """Edita un contacto existente: merge del frontmatter + re-PUT del vCard. Localiza la ficha por slug (en personas/ u organizaciones/ según el tipo de la ficha actual). 404 si no existe. Hace merge de los campos editables sobre el frontmatter actual (preserva campos heredados no tocados como ``sexo``, ``fecha_nacimiento``, ``horoscopo``), reescribe el body ``## Notas`` y re-sube el vCard. Invalida las cachés DAV. Returns: dict ``{slug, uid, path, dav}``. Raises: HTTPException(404): si no existe la ficha del contacto. """ if _osint_db_backend_enabled(): # Flag ON: delega la edición en el osint_db (PUT por UID). res = osintdb_client.update_contact( slug, _osint_db_contact_payload(data) ) self.invalidate_dav() return {"slug": slug, "uid": slug, "path": None, "osint_db": res} path = self._find_contact_note(slug) if path is None: raise HTTPException( status_code=404, detail="contacto '%s' no encontrado" % slug ) note = read_obsidian_note(path) current = dict(note.get("frontmatter") or {}) # Listas multi-valor (ya reconciladas con los singulares en ContactIn). telefonos = _norm_list(data.telefonos) emails = _norm_list(data.emails) direcciones = _norm_list(data.direcciones) # Merge de los campos editables (preserva los heredados no tocados). El # singular se conserva = primer elemento para los lectores viejos. merged = { "nombre": data.nombre.strip() or current.get("nombre") or slug, "aliases": _norm_list(data.aliases), "telefono": telefonos[0] if telefonos else None, "telefonos": telefonos, "email": emails[0] if emails else None, "emails": emails, "direccion": direcciones[0] if direcciones else None, "direcciones": direcciones, "pais": _norm_str(data.pais), "relaciones": _norm_list(data.relaciones), "contexto": _norm_str(data.contexto), } if current.get("tipo") != "organizacion": merged["dni"] = _norm_str(data.dni) current.update(merged) update_obsidian_note( path, body=_contact_body(data.notas), set_frontmatter=current ) vcard_fm = dict(current) vcard_fm["_notas"] = _norm_str(data.notas) dav = self._put_vcard(slug, _build_vcard(vcard_fm, slug)) self.refresh() self.invalidate_dav() return {"slug": slug, "uid": slug, "path": path, "dav": dav} def delete_contact(self, slug: str) -> dict: """Borra un contacto: elimina la ficha ``.md`` + el vCard en Xandikos. 404 si la ficha no existe. Borra el archivo ``.md`` (acción primaria) y el recurso ``.vcf`` de Xandikos (reflejo). Invalida las cachés DAV. Returns: dict ``{slug, deleted: True, dav}``. Raises: HTTPException(404): si no existe la ficha del contacto. """ if _osint_db_backend_enabled(): # Flag ON: delega el borrado en el osint_db (DELETE por UID). res = osintdb_client.delete_contact(slug) self.invalidate_dav() return {"slug": slug, "deleted": True, "osint_db": res} path = self._find_contact_note(slug) if path is None: raise HTTPException( status_code=404, detail="contacto '%s' no encontrado" % slug ) delete_obsidian_note(path) dav = self._delete_vcard(slug) self.refresh() self.invalidate_dav() return {"slug": slug, "deleted": True, "dav": dav} def _find_contact_note(self, slug: str): """Localiza la ficha ``.md`` de un contacto por slug, o None. Busca ``personas/.md`` y ``organizaciones/.md`` (los dos tipos de contacto). Devuelve el primer path existente o None. """ for folder in ("personas", "organizaciones"): candidate = os.path.join(self._vault_real, folder, slug + ".md") if os.path.isfile(candidate): return candidate return None # --------------------------------------------------------------------------- # Helpers DAV: parseo ligero de vCard / iCalendar a JSON # --------------------------------------------------------------------------- def _unescape_ical(value: str) -> str: """Des-escapa los caracteres de un valor iCalendar/vCard (RFC 5545/6350).""" return ( value.replace("\\n", "\n") .replace("\\N", "\n") .replace("\\,", ",") .replace("\\;", ";") .replace("\\\\", "\\") ) def _unfold_lines(text: str) -> list: """Des-pliega las líneas continuadas (folding) de un vCard/iCalendar. RFC 5545/6350: una línea que empieza por espacio o tab es continuación de la anterior. Esta función las une para parsearlas como propiedades completas. """ raw_lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n") unfolded: list = [] for line in raw_lines: if line[:1] in (" ", "\t") and unfolded: unfolded[-1] += line[1:] else: unfolded.append(line) return unfolded def _parse_property(line: str) -> Optional[tuple]: """Parsea una línea de propiedad vCard/iCal a ``(nombre, params, valor)``. Formato: ``[itemN.]NAME;PARAM=val;PARAM2=val:value``. Devuelve ``None`` si la línea no es una propiedad (sin ``:``). El nombre se devuelve en mayúsculas y SIN el prefijo de grupo ``itemN.`` / ``GRUPO.`` que añaden Apple/Google a las propiedades agrupadas (``item1.TEL``, ``item2.EMAIL``); los params como dict con claves en mayúsculas. """ if ":" not in line: return None head, value = line.split(":", 1) parts = head.split(";") name = parts[0].strip() # Quitar el prefijo de grupo "itemN." / "GRUPO." (vCard property grouping). if "." in name: name = name.rsplit(".", 1)[-1] name = name.upper() params: dict = {} for part in parts[1:]: if "=" in part: k, v = part.split("=", 1) params[k.strip().upper()] = v.strip() return name, params, value def _vcard_to_json(vcard_text: str) -> dict: """Convierte un VCARD a un dict JSON con los campos de interés. Extrae: uid, nombre completo (FN o N reordenado), alias (NICKNAME), teléfonos (TEL), emails (EMAIL), organización (ORG), nota (NOTE) y el bloque ``osint`` con todas las propiedades ``X-OSINT-*`` (la clave es el sufijo en minúsculas: ``X-OSINT-DNI`` → ``osint.dni``, ``X-OSINT-PAIS`` → ``osint.pais``). Parseo ligero a mano (sin dependencia de vobject); el vCard ya viene troceado por ``split_vcards``. Expone tanto las claves en español que consume el frontend del task (``nombre``, ``alias``, ``nota``, ``telefonos``) como las formas tipadas con tipo (``phones``, ``emails`` como objetos ``{value, type}``), para no atar el frontend a un único shape. """ out: dict = { "uid": None, "fn": None, "nickname": None, "org": None, "note": None, "phones": [], "emails": [], "direcciones": [], "osint": {}, } for line in _unfold_lines(vcard_text): parsed = _parse_property(line) if not parsed: continue name, params, value = parsed # ADR es estructurado (7 componentes separados por ';'): NO se des-escapa # antes de partir, para no confundir separadores con contenido escapado. if name == "ADR": adr = _parse_adr_value(value) if adr: out["direcciones"].append(adr) continue value = _unescape_ical(value.strip()) if name == "UID": out["uid"] = value elif name == "FN": out["fn"] = value elif name == "NICKNAME": out["nickname"] = value elif name == "ORG": out["org"] = value.replace(";", " ").strip() elif name == "NOTE": out["note"] = value elif name == "TEL": out["phones"].append({"value": value, "type": params.get("TYPE", "")}) elif name == "EMAIL": out["emails"].append({"value": value, "type": params.get("TYPE", "")}) elif name.startswith("X-OSINT-"): key = name[len("X-OSINT-") :].lower().replace("-", "_") if key: out["osint"][key] = value elif name == "N" and not out["fn"]: # Nombre estructurado Apellido;Nombre;... -> "Nombre Apellido". comps = [c for c in value.split(";") if c] if len(comps) >= 2: out["fn"] = ("%s %s" % (comps[1], comps[0])).strip() elif comps: out["fn"] = comps[0] # Compat con vCards antiguos: la dirección iba en X-OSINT-DIRECCION (un solo # valor) en vez de ADR. Si vino por ahí y no hay ADR, súbela a direcciones[] # para que el frontend la vea como multi-valor; deja también osint.direccion # por si algún lector viejo lo consulta. legacy_dir = out["osint"].get("direccion") if legacy_dir and legacy_dir not in out["direcciones"]: out["direcciones"].append(legacy_dir) # Alias en español que consume el frontend del task (mismo dato, otra clave). out["nombre"] = out["fn"] out["alias"] = out["nickname"] out["nota"] = out["note"] out["telefonos"] = [p["value"] for p in out["phones"]] out["correos"] = [e["value"] for e in out["emails"]] return out def _parse_adr_value(raw: str) -> Optional[str]: """Extrae la dirección legible de un valor ADR estructurado (RFC 6350). El ADR tiene 7 componentes separados por ``;``: ``po-box;extended;street;locality;region;postal-code;country``. Esta función une los componentes no vacíos (des-escapados) en una sola línea legible, con preferencia por ``street``; si solo hay un campo, lo devuelve. Devuelve ``None`` si el ADR queda vacío. """ parts = raw.split(";") # Des-escapa cada componente por separado (el ';' ya se usó para partir). comps = [_unescape_ical(p.strip()) for p in parts] nonempty = [c for c in comps if c] if not nonempty: return None # street es el 3er componente (índice 2). Si está, suele bastar; si hay más # (locality, region, etc.) se concatenan con coma para una línea legible. if len(comps) >= 3 and comps[2]: tail = [c for c in comps[3:] if c] return ", ".join([comps[2]] + tail) if tail else comps[2] return ", ".join(nonempty) _VEVENT_RE = re.compile(r"BEGIN:VEVENT(.*?)END:VEVENT", re.DOTALL | re.IGNORECASE) _ICAL_DT_RE = re.compile( r"^(\d{4})(\d{2})(\d{2})(?:T(\d{2})(\d{2})(\d{2})?)?(Z)?$" ) def _zoneinfo(tzid: str): """``ZoneInfo(tzid)`` o ``None`` si el tz no existe / falta tzdata. Nunca lanza: un TZID desconocido (o un sistema sin base de zonas) degrada a None y el llamador trata la hora como naive/local, sin tumbar el parseo. """ if ZoneInfo is None or not tzid: return None try: return ZoneInfo(tzid) except (ZoneInfoNotFoundError, ValueError, KeyError): return None def _parse_ical_datetime(value: str, params: dict) -> Optional[dict]: """Parsea un valor DTSTART/DTEND iCal a una representación normalizada. Maneja las tres formas del calendario: - UTC: ``20260611T090000Z`` (sufijo Z). - con zona: ``DTSTART;TZID=Europe/Madrid:20260611T090000`` (param TZID). - solo fecha (todo el día): ``DTSTART;VALUE=DATE:20260611`` o sin hora. Returns: dict ``{iso, tz, all_day, ical}`` o ``None`` si no parsea. ``iso`` es ISO 8601 con offset cuando hay zona/UTC (``2026-06-11T11:00:00+02:00``) o ``YYYY-MM-DD`` para todo el día; ``tz`` es el TZID original (``Europe/Madrid``, ``UTC``, o None si naive/all-day); ``all_day`` True si es solo fecha; ``ical`` el prefijo ``YYYYMMDD`` para el filtro de rango. """ value = (value or "").strip() m = _ICAL_DT_RE.match(value) if not m: return None year, month, day = int(m.group(1)), int(m.group(2)), int(m.group(3)) has_time = m.group(4) is not None is_utc = m.group(7) == "Z" is_date_value = (params.get("VALUE", "").upper() == "DATE") or not has_time ical_prefix = "%04d%02d%02d" % (year, month, day) if is_date_value: return { "iso": "%04d-%02d-%02d" % (year, month, day), "tz": None, "all_day": True, "ical": ical_prefix, } hour = int(m.group(4)) minute = int(m.group(5)) second = int(m.group(6)) if m.group(6) else 0 tzid = params.get("TZID", "") if is_utc: dt = datetime(year, month, day, hour, minute, second, tzinfo=timezone.utc) tz_name = "UTC" elif tzid and _zoneinfo(tzid) is not None: dt = datetime(year, month, day, hour, minute, second, tzinfo=_zoneinfo(tzid)) tz_name = tzid else: # Hora "flotante" (sin Z ni TZID, o TZID desconocido): se interpreta como # local del visor. La servimos sin offset; el frontend la sitúa en su TZ. return { "iso": "%04d-%02d-%02dT%02d:%02d:%02d" % (year, month, day, hour, minute, second), "tz": tzid or None, "all_day": False, "ical": ical_prefix, } return {"iso": dt.isoformat(), "tz": tz_name, "all_day": False, "ical": ical_prefix} def _vevent_to_json(vevent_block: str) -> dict: """Convierte un bloque VEVENT a un dict JSON con los campos de interés. Extrae: uid, summary, dtstart/dtend (ISO con offset cuando hay zona/UTC, o ``YYYY-MM-DD`` para todo el día), la TZ original (``tz``), ``all_day``, location, description y color (propiedad ``COLOR`` RFC 7986 o ``X-APPLE-CALENDAR-COLOR`` si el evento la trae). ``dtstart_ical`` / ``dtend_ical`` conservan el prefijo ``YYYYMMDD`` crudo para el filtro de rango. Parseo ligero a mano (sin dependencia externa). """ out: dict = { "uid": None, "summary": None, "dtstart": None, "dtend": None, "dtstart_ical": None, "dtend_ical": None, "tz": None, "all_day": False, "location": None, "description": None, "color": None, "rrule": None, "recurring": False, "occurrence": False, } for line in _unfold_lines(vevent_block): parsed = _parse_property(line) if not parsed: continue name, params, value = parsed value = value.strip() if name == "UID": out["uid"] = value elif name == "SUMMARY": out["summary"] = _unescape_ical(value) elif name == "DTSTART": dt = _parse_ical_datetime(value, params) if dt: out["dtstart"] = dt["iso"] out["dtstart_ical"] = dt["ical"] out["tz"] = dt["tz"] out["all_day"] = dt["all_day"] else: out["dtstart"] = value elif name == "DTEND": dt = _parse_ical_datetime(value, params) if dt: out["dtend"] = dt["iso"] out["dtend_ical"] = dt["ical"] else: out["dtend"] = value elif name == "LOCATION": out["location"] = _unescape_ical(value) elif name == "DESCRIPTION": out["description"] = _unescape_ical(value) elif name in ("COLOR", "X-APPLE-CALENDAR-COLOR"): out["color"] = value elif name == "RRULE": out["rrule"] = value out["recurring"] = True return out def _vcalendar_to_events(vcalendar_text: str) -> list: """Extrae todos los VEVENT de un VCALENDAR y los convierte a JSON.""" events = [] for block in _VEVENT_RE.findall(vcalendar_text): events.append(_vevent_to_json("BEGIN:VEVENT" + block + "END:VEVENT")) return events def _event_in_range(event: dict, dt_from: str, dt_to: str) -> bool: """True si el evento cae (por DTSTART) dentro de ``[dt_from, dt_to]``. Comparación lexicográfica sobre el prefijo ``YYYYMMDD`` (``dtstart_ical``); si falta, se deriva del ISO de ``dtstart``. Los límites se normalizan quitando los guiones, así acepta tanto ``2026-06-11`` como ``20260611``. ``dt_from``/``dt_to`` vacíos desactivan ese extremo del filtro. """ dtstart = event.get("dtstart_ical") or "" if not dtstart: dtstart = (event.get("dtstart") or "").replace("-", "")[:8] dtstart = dtstart.replace("-", "")[:8] if not dtstart: return True if dt_from and dtstart < dt_from.replace("-", "")[:8]: return False if dt_to and dtstart > dt_to.replace("-", "")[:8]: return False return True def _default_expand_start() -> str: """Límite inferior por defecto al expandir una serie sin rango explícito.""" return (datetime.now(timezone.utc) - timedelta(days=366)).strftime("%Y%m%d") def _default_expand_end() -> str: """Límite superior por defecto al expandir una serie sin rango explícito.""" return (datetime.now(timezone.utc) + timedelta(days=731)).strftime("%Y%m%d") def _shift_iso_days(value: str, days: int) -> str: """Desplaza la parte de fecha de un ISO (``YYYY-MM-DD`` o con ``T...``). Conserva intacta la parte horaria/offset (``T10:00:00+02:00``) y solo mueve la fecha ``days`` días. Para una fecha pura mueve la fecha sola. Si el valor no parsea, lo devuelve sin tocar (defensivo). """ if not value: return value date_part = value[:10] rest = value[10:] try: base = datetime.strptime(date_part, "%Y-%m-%d") except ValueError: return value shifted = (base + timedelta(days=days)).strftime("%Y-%m-%d") return shifted + rest def _occurrence_clone(event: dict, occ_ymd: str) -> dict: """Clona un evento maestro recurrente reubicado en la fecha ``occ_ymd``. Mantiene la hora local / offset del maestro (solo cambia la fecha) y aplica el mismo desplazamiento de días al ``dtend`` para preservar la duración. Marca ``occurrence=True`` cuando la fecha difiere de la del maestro (la primera ocurrencia coincide con el maestro y queda ``occurrence=False``). """ master_ymd = ( event.get("dtstart_ical") or (event.get("dtstart") or "").replace("-", "") )[:8] new_date_iso = "%s-%s-%s" % (occ_ymd[:4], occ_ymd[4:6], occ_ymd[6:8]) clone = dict(event) ds = event.get("dtstart") or "" if event.get("all_day") or len(ds) == 10: clone["dtstart"] = new_date_iso else: clone["dtstart"] = new_date_iso + ds[10:] clone["dtstart_ical"] = occ_ymd try: delta = ( datetime.strptime(occ_ymd, "%Y%m%d") - datetime.strptime(master_ymd, "%Y%m%d") ).days except ValueError: delta = 0 de = event.get("dtend") if de: clone["dtend"] = _shift_iso_days(de, delta) de_ical = event.get("dtend_ical") if de_ical: clone["dtend_ical"] = (clone["dtend"] or "").replace("-", "")[:8] clone["occurrence"] = occ_ymd != master_ymd return clone def _expand_event_occurrences(event: dict, dt_from: str, dt_to: str) -> list: """Expande un evento recurrente a sus ocurrencias dentro de ``[from, to]``. Compone la función pura del registry ``expand_rrule`` (solo necesita las FECHAS de cada ocurrencia; la hora local se preserva clonando el maestro). Si el evento no tiene ``rrule``, o algo no parsea, devuelve ``[event]`` sin tocar — nunca pierde el evento original. """ rrule = event.get("rrule") if not rrule: return [event] master_ymd = ( event.get("dtstart_ical") or (event.get("dtstart") or "").replace("-", "") )[:8] if len(master_ymd) < 8: return [event] rs = (dt_from or "").replace("-", "")[:8] or _default_expand_start() re_ = (dt_to or "").replace("-", "")[:8] or _default_expand_end() try: occ_dates = expand_rrule(master_ymd, rrule, rs, re_, all_day=True) except Exception: return [event] if not occ_dates: return [] return [_occurrence_clone(event, d) for d in occ_dates] # --------------------------------------------------------------------------- # Escritura de contactos: ficha .md del vault (fuente de verdad) + vCard # --------------------------------------------------------------------------- # Subcarpeta del vault por tipo de contacto. La fuente de verdad de un contacto # es su ficha en el vault (CONVENTIONS.md §3b para persona, §6 para organización); # Xandikos es solo el retransmisor al móvil. _TIPO_FOLDER = {"persona": "personas", "organizacion": "organizaciones"} # Tags por defecto de cada tipo (CONVENTIONS.md). Se preservan si la ficha ya # trae otros tags al editar. _TIPO_TAGS = { "persona": ["persona", "osint"], "organizacion": ["organizacion", "osint"], } class ContactIn(BaseModel): """Cuerpo de POST/PUT de un contacto (persona u organización). Refleja el esquema canónico del frontmatter (CONVENTIONS.md §3b/§6). Los campos vacíos se normalizan a ``null``/``[]`` al escribir la ficha, nunca se omiten, para que el score de completitud sea consistente. Multi-valor: un contacto puede tener VARIOS teléfonos, emails y direcciones (``telefonos``/``emails``/``direcciones``). Los campos singulares ``telefono``/``email``/``direccion`` se conservan por compatibilidad con clientes y lectores viejos: el validador ``model_post_init`` los reconcilia con las listas (singular → ``[valor]`` si la lista está vacía; y el singular se rellena con ``lista[0]`` para que los lectores que solo miran el singular sigan funcionando). """ tipo: str = Field(default="persona") nombre: str aliases: list[str] = Field(default_factory=list) # Singulares (compat) — el primer elemento de cada lista multi-valor. telefono: Optional[str] = None email: Optional[str] = None direccion: Optional[str] = None # Multi-valor: listas completas de teléfonos, emails y direcciones. telefonos: list[str] = Field(default_factory=list) emails: list[str] = Field(default_factory=list) direcciones: list[str] = Field(default_factory=list) dni: Optional[str] = None pais: Optional[str] = None contexto: Optional[str] = None relaciones: list[str] = Field(default_factory=list) notas: Optional[str] = None # Libreta (addressbook) destino. Solo se consume con el flag OSINT_DB_BACKEND # ON (el osint_db enruta el contacto a esa colección). Con el flag OFF se # ignora: hoy solo existe la libreta por defecto en el vault. None → libreta # por defecto. collection: Optional[str] = None def model_post_init(self, __context: object) -> None: """Reconcilia los campos singulares con las listas multi-valor. Para cada par (singular, lista): si la lista llega vacía pero el singular trae valor, la lista se siembra con ``[singular]`` (cliente viejo que solo envía el campo singular); y siempre se rellena el singular con el primer elemento normalizado de la lista, para que los lectores que solo miran el singular (frontmatter compat, vCard heredado) sigan funcionando. """ for singular, plural in ( ("telefono", "telefonos"), ("email", "emails"), ("direccion", "direcciones"), ): lista = _norm_list(getattr(self, plural)) if not lista: single = _norm_str(getattr(self, singular)) if single: lista = [single] object.__setattr__(self, plural, lista) object.__setattr__(self, singular, lista[0] if lista else None) def _norm_str(value: Optional[str]) -> Optional[str]: """Normaliza un string opcional: trim; cadena vacía → None.""" if value is None: return None value = value.strip() return value or None def _norm_list(values: Optional[list]) -> list: """Normaliza una lista de strings: trim cada item y descarta los vacíos.""" if not values: return [] out = [] for v in values: s = (v or "").strip() if s: out.append(s) return out def _contact_frontmatter(data: "ContactIn", slug: str) -> dict: """Construye el frontmatter canónico de la ficha de un contacto. Para ``tipo: persona`` sigue el esquema completo de CONVENTIONS.md §3b (todos los campos presentes, ``null``/``[]`` si vacíos). Para ``tipo: organizacion`` usa el subconjunto de §6. El orden de claves se preserva (``yaml.safe_dump(sort_keys=False)`` en ``format_obsidian_note``). """ nombre = data.nombre.strip() aliases = _norm_list(data.aliases) relaciones = _norm_list(data.relaciones) # Listas multi-valor ya reconciladas en ContactIn.model_post_init; el campo # singular = primer elemento (o None) para los lectores viejos. telefonos = _norm_list(data.telefonos) emails = _norm_list(data.emails) direcciones = _norm_list(data.direcciones) if data.tipo == "organizacion": return { "tipo": "organizacion", "nombre": nombre, "slug": slug, "aliases": aliases, "telefono": telefonos[0] if telefonos else None, "telefonos": telefonos, "email": emails[0] if emails else None, "emails": emails, "direccion": direcciones[0] if direcciones else None, "direcciones": direcciones, "pais": _norm_str(data.pais), "relaciones": relaciones, "contexto": _norm_str(data.contexto), "fuente": "osint_web (alta manual)", "tags": list(_TIPO_TAGS["organizacion"]), } # Persona: esquema canónico §3b. return { "tipo": "persona", "nombre": nombre, "slug": slug, "aliases": aliases, "sexo": None, "fecha_nacimiento": None, "dni": _norm_str(data.dni), "telefono": telefonos[0] if telefonos else None, "telefonos": telefonos, "email": emails[0] if emails else None, "emails": emails, "direccion": direcciones[0] if direcciones else None, "direcciones": direcciones, "pais": _norm_str(data.pais), "relaciones": relaciones, "contexto": _norm_str(data.contexto), "fuente": "osint_web (alta manual)", "tags": list(_TIPO_TAGS["persona"]), } def _contact_body(notas: Optional[str]) -> str: """Cuerpo Markdown de la ficha: sección ``## Notas`` con el texto libre.""" notas = _norm_str(notas) if notas: return "## Notas\n\n%s\n" % notas return "## Notas\n" def _vcard_escape(value: str) -> str: """Escapa un valor de texto para una línea vCard (RFC 6350).""" return ( value.replace("\\", "\\\\") .replace("\n", "\\n") .replace(",", "\\,") .replace(";", "\\;") ) def _vcard_value_list(frontmatter: dict, plural: str, singular: str) -> list: """Lista de valores de un campo multi-valor del frontmatter de contacto. Prefiere la clave plural (``telefonos``/``emails``/``direcciones``); si está vacía cae al singular (``telefono``/...) por compatibilidad con fichas antiguas. Normaliza (trim + descarta vacíos) y devuelve una lista de strings. """ values = frontmatter.get(plural) if not values: single = frontmatter.get(singular) values = [single] if single else [] return _norm_list([str(v) for v in values]) def _build_vcard(frontmatter: dict, slug: str) -> str: """Serializa un frontmatter de contacto a un VCARD 3.0 con el UID = slug. Soporta multi-valor: emite una línea ``TEL`` por teléfono, una ``EMAIL`` por email y una ``ADR`` por dirección (campos ``telefonos``/``emails``/ ``direcciones`` del frontmatter; cae a los singulares ``telefono``/... por compat). Mapea además: nombre→FN, aliases→NICKNAME, notas→NOTE, organización→ORG; y los campos OSINT (dni, pais, contexto, sexo, fecha_nacimiento) a propiedades ``X-OSINT-*`` que el parser ``_vcard_to_json`` ya entiende. El UID es el slug → idempotente: re-subir el mismo slug sobrescribe el recurso ``.vcf``. """ nombre = (frontmatter.get("nombre") or slug).strip() lines = [ "BEGIN:VCARD", "VERSION:3.0", "UID:%s" % slug, "FN:%s" % _vcard_escape(nombre), ] aliases = frontmatter.get("aliases") or [] if aliases: lines.append("NICKNAME:%s" % _vcard_escape(",".join(str(a) for a in aliases))) if frontmatter.get("tipo") == "organizacion": lines.append("ORG:%s" % _vcard_escape(nombre)) # Multi-valor: una línea TEL/EMAIL por elemento. for tel in _vcard_value_list(frontmatter, "telefonos", "telefono"): lines.append("TEL;TYPE=CELL:%s" % _vcard_escape(tel)) for email in _vcard_value_list(frontmatter, "emails", "email"): lines.append("EMAIL;TYPE=INTERNET:%s" % _vcard_escape(email)) # Direcciones → ADR estructurado (la dirección va en el componente street; # los separadores ';' del ADR NO se escapan, solo el contenido). Una línea # ADR por dirección. El parser _vcard_to_json reconstruye la lista desde ADR. for adr in _vcard_value_list(frontmatter, "direcciones", "direccion"): lines.append("ADR;TYPE=HOME:;;%s;;;;" % _vcard_escape(adr)) # Campos OSINT → X-OSINT-* (los recoge _vcard_to_json en el bloque osint). for fm_key, x_name in ( ("dni", "X-OSINT-DNI"), ("pais", "X-OSINT-PAIS"), ("contexto", "X-OSINT-CONTEXTO"), ("sexo", "X-OSINT-SEXO"), ("fecha_nacimiento", "X-OSINT-FECHA-NACIMIENTO"), ): val = frontmatter.get(fm_key) if val: lines.append("%s:%s" % (x_name, _vcard_escape(str(val)))) notas = frontmatter.get("_notas") if notas: lines.append("NOTE:%s" % _vcard_escape(str(notas))) lines.append("END:VCARD") return "\r\n".join(lines) + "\r\n" # --------------------------------------------------------------------------- # Escritura de eventos del calendario: construcción de VEVENT / VCALENDAR # --------------------------------------------------------------------------- _ISO_DT_RE = re.compile( r"^(\d{4})-(\d{2})-(\d{2})(?:[T ](\d{2}):(\d{2})(?::(\d{2}))?)?" r"(Z|[+-]\d{2}:?\d{2})?$" ) # Saneado del nombre del recurso .ics: DEBE coincidir con el que aplica # caldav_put_event internamente (mismo patrón), para que el DELETE apunte al # recurso que el PUT creó. _UNSAFE_RESOURCE_RE = re.compile(r"[^A-Za-z0-9_.-]") def _safe_event_resource(uid: str) -> str: """Nombre del recurso ``.ics`` de un UID (igual que caldav_put_event).""" return _UNSAFE_RESOURCE_RE.sub("_", uid)[:120] + ".ics" class EventIn(BaseModel): """Cuerpo de POST/PUT de un evento del calendario (VEVENT). Las fechas se aceptan en ISO local sin offset (``2026-06-15T10:00``) + un ``tz`` (TZID, p.ej. ``Europe/Madrid``); o con offset/``Z`` ya incluido. El servidor las normaliza al construir el VEVENT (``DTSTART;TZID=...`` o ``...Z``). Para eventos de todo el día basta ``dtstart`` = ``2026-06-15`` con ``all_day=True``. """ cal: Optional[str] = None summary: str dtstart: str dtend: Optional[str] = None tz: Optional[str] = Field(default="Europe/Madrid") all_day: bool = False location: Optional[str] = None description: Optional[str] = None color: Optional[str] = None # Regla de recurrencia iCalendar SIN el prefijo "RRULE:" (p.ej. # "FREQ=WEEKLY;INTERVAL=1;COUNT=10"). None / "" → evento puntual. Editar un # evento recurrente reescribe toda la serie (no se soporta editar una sola # ocurrencia). rrule: Optional[str] = None class CalendarIn(BaseModel): """Cuerpo de POST /api/calendars: crea una colección de calendario nueva. El ``slug`` es el segmento de URL de la colección (lo sanea la función del registry ``dav_make_calendar`` a ``[a-z0-9_-]``). ``name`` es el nombre visible; ``color`` un hex ``#rrggbb`` opcional. """ slug: str name: Optional[str] = "" color: Optional[str] = None description: Optional[str] = None class AddressbookIn(BaseModel): """Cuerpo de POST /api/addressbooks: crea una libreta de contactos nueva. ``slug`` es el segmento de URL de la colección CardDAV; ``name`` el nombre visible; ``color`` un hex ``#rrggbb`` opcional. Solo se procesa con el flag ``OSINT_DB_BACKEND`` activo (el osint_db crea la colección en Xandikos). """ slug: str name: Optional[str] = "" color: Optional[str] = None def _parse_iso_input(value: str) -> Optional[dict]: """Parsea una fecha de entrada ISO a ``{year,month,day,hour,minute,second, offset,date_only}`` o ``None``. Acepta ``2026-06-15``, ``2026-06-15T10:00``, ``2026-06-15T10:00:00`` y variantes con ``Z`` o ``±HH:MM`` al final. Es tolerante a la separación con espacio en vez de ``T``. """ value = (value or "").strip() m = _ISO_DT_RE.match(value) if not m: return None has_time = m.group(4) is not None return { "year": int(m.group(1)), "month": int(m.group(2)), "day": int(m.group(3)), "hour": int(m.group(4)) if has_time else 0, "minute": int(m.group(5)) if has_time else 0, "second": int(m.group(6)) if (has_time and m.group(6)) else 0, "offset": m.group(7), "date_only": not has_time, } def _ical_dt_property(prop: str, value: str, tz: Optional[str], all_day: bool) -> str: """Construye una línea DTSTART/DTEND iCal a partir de una fecha de entrada. - all_day → ``DTSTART;VALUE=DATE:YYYYMMDD``. - con offset/``Z`` en la entrada → convierte a UTC: ``DTSTART:...Z``. - con ``tz`` válido → ``DTSTART;TZID=:YYYYMMDDTHHMMSS`` (hora local del tz, el VTIMEZONE lo aporta el VCALENDAR). - sin tz ni offset → hora flotante ``DTSTART:YYYYMMDDTHHMMSS``. Raises: ValueError: si ``value`` no es una fecha ISO reconocible. """ p = _parse_iso_input(value) if p is None: raise ValueError("fecha inválida: %r (usa ISO YYYY-MM-DD[THH:MM])" % value) ymd = "%04d%02d%02d" % (p["year"], p["month"], p["day"]) if all_day or p["date_only"]: return "%s;VALUE=DATE:%s" % (prop, ymd) hms = "%02d%02d%02d" % (p["hour"], p["minute"], p["second"]) if p["offset"]: # La entrada ya trae offset/Z: la pasamos a UTC absoluto. dt = datetime.fromisoformat( "%04d-%02d-%02dT%02d:%02d:%02d%s" % ( p["year"], p["month"], p["day"], p["hour"], p["minute"], p["second"], _normalize_offset(p["offset"]), ) ) dt_utc = dt.astimezone(timezone.utc) return "%s:%s" % (prop, dt_utc.strftime("%Y%m%dT%H%M%SZ")) if tz and _zoneinfo(tz) is not None: return "%s;TZID=%s:%sT%s" % (prop, tz, ymd, hms) # Hora flotante (sin tz reconocible): la escribimos sin Z. return "%s:%sT%s" % (prop, ymd, hms) def _normalize_offset(offset: str) -> str: """Normaliza un offset ISO a la forma ``±HH:MM`` que entiende fromisoformat.""" if offset == "Z": return "+00:00" if len(offset) == 5 and ":" not in offset: # ±HHMM return offset[:3] + ":" + offset[3:] return offset def _vtimezone_block(tz: str) -> str: """Bloque VTIMEZONE mínimo para un TZID, con el offset estándar y de verano. Calcula los offsets reales del tz para enero (estándar) y julio (verano) del año actual con ``zoneinfo`` y emite un VTIMEZONE con ambas observancias. Es una aproximación suficiente para que el cliente (y Xandikos) resuelvan la hora local; no reproduce las reglas RRULE exactas. Devuelve cadena vacía si el tz es UTC, desconocido, o no hace falta (el evento se sirve igual sin él). """ zone = _zoneinfo(tz) if zone is None or tz.upper() == "UTC": return "" year = datetime.now().year jan = datetime(year, 1, 15, 12, tzinfo=zone) jul = datetime(year, 7, 15, 12, tzinfo=zone) std_off = jan.utcoffset() or timedelta(0) dst_off = jul.utcoffset() or timedelta(0) def _fmt(off: timedelta) -> str: total = int(off.total_seconds()) sign = "+" if total >= 0 else "-" total = abs(total) return "%s%02d%02d" % (sign, total // 3600, (total % 3600) // 60) lines = ["BEGIN:VTIMEZONE", "TZID:%s" % tz] if dst_off != std_off: # Tiene horario de verano: dos observancias (estándar + verano). lines += [ "BEGIN:STANDARD", "DTSTART:19701025T030000", "TZOFFSETFROM:%s" % _fmt(dst_off), "TZOFFSETTO:%s" % _fmt(std_off), "END:STANDARD", "BEGIN:DAYLIGHT", "DTSTART:19700329T020000", "TZOFFSETFROM:%s" % _fmt(std_off), "TZOFFSETTO:%s" % _fmt(dst_off), "END:DAYLIGHT", ] else: lines += [ "BEGIN:STANDARD", "DTSTART:19700101T000000", "TZOFFSETFROM:%s" % _fmt(std_off), "TZOFFSETTO:%s" % _fmt(std_off), "END:STANDARD", ] lines.append("END:VTIMEZONE") return "\r\n".join(lines) def _build_vcalendar(data: "EventIn", uid: str) -> str: """Serializa un ``EventIn`` a un VCALENDAR 2.0 con un VEVENT y su UID. Construye DTSTART/DTEND respetando ``tz``/``all_day``/offset (ver ``_ical_dt_property``), añade un VTIMEZONE si el evento usa un TZID con horario de verano, y mapea summary/location/description/color. El UID se reutiliza al editar → idempotente (el recurso ``.ics`` se sobrescribe). Raises: ValueError: si ``dtstart`` no es una fecha ISO reconocible. """ dtstamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") tz = data.tz or "" body = [ "BEGIN:VCALENDAR", "VERSION:2.0", "PRODID:-//osint_web//calendar//ES", "CALSCALE:GREGORIAN", ] if not data.all_day and tz and _zoneinfo(tz) is not None: vtz = _vtimezone_block(tz) if vtz: body.append(vtz) vevent = [ "BEGIN:VEVENT", # Sanitizamos el UID (quitamos saltos de línea) para que no pueda inyectar # propiedades/componentes iCal nuevos en el VEVENT. "UID:%s" % str(uid).replace("\r", "").replace("\n", ""), "DTSTAMP:%s" % dtstamp, _ical_dt_property("DTSTART", data.dtstart, tz, data.all_day), ] if data.dtend: vevent.append(_ical_dt_property("DTEND", data.dtend, tz, data.all_day)) vevent.append("SUMMARY:%s" % _vcard_escape(data.summary.strip())) if data.location and data.location.strip(): vevent.append("LOCATION:%s" % _vcard_escape(data.location.strip())) if data.description and data.description.strip(): vevent.append("DESCRIPTION:%s" % _vcard_escape(data.description.strip())) if data.color and data.color.strip(): # COLOR (RFC 7986) — nombre CSS3 o, para clientes Apple, el hex va aparte. vevent.append("X-APPLE-CALENDAR-COLOR:%s" % data.color.strip()) rrule = (data.rrule or "").strip() if rrule: # Acepta tanto "FREQ=..." como "RRULE:FREQ=..."; normaliza a la línea # canónica "RRULE:" que entienden Xandikos y los clientes (DAVx5). if rrule.upper().startswith("RRULE:"): rrule = rrule[len("RRULE:"):].strip() # Sanitizar: quitar saltos de línea para que el valor de la RRULE no # inyecte propiedades/componentes nuevos (los `;`/`,` son separadores # legítimos de la regla, así que no se escapan). vevent.append("RRULE:%s" % rrule.replace("\r", "").replace("\n", "")) vevent.append("END:VEVENT") body.append("\r\n".join(vevent)) body.append("END:VCALENDAR") return "\r\n".join(body) + "\r\n" # --------------------------------------------------------------------------- # Construcción de la app FastAPI # --------------------------------------------------------------------------- def create_app(vault_dir: str) -> FastAPI: """Crea la app FastAPI ligada a un vault concreto. Valida que el vault existe al construir el ``VaultState`` (no un 500 silencioso en el primer request). Registra todos los endpoints sobre un estado compartido en memoria. """ state = VaultState(vault_dir) app = FastAPI(title="osint_web", version="0.1.0") # Anti DNS-rebinding: solo acepta requests cuyo Host sea localhost. Cierra el # vector por el que una web maliciosa rebindea su dominio a 127.0.0.1 y, desde # el navegador del usuario, alcanza este service local (sin auth) o el de DuckDB. from starlette.middleware.trustedhost import TrustedHostMiddleware app.add_middleware( TrustedHostMiddleware, allowed_hosts=["127.0.0.1", "localhost", "testserver"], ) app.state.vault = state # -- Vault -- @app.get("/api/health") def health() -> dict: """Health check: confirma que el servidor está vivo y el vault cargado.""" return { "status": "ok", "vault": state.vault_dir, "nodes": len(state.graph["nodes"]), "edges": len(state.graph["edges"]), } @app.get("/api/graph") def api_graph() -> dict: """Grafo completo del vault (nodos + aristas + conteos) para sigma.js. Cacheado en memoria; usar ``/api/refresh`` para recargar tras editar notas. """ return state.graph_payload() @app.get("/api/nodes") def api_nodes(tipo: str = Query("", description="tipo de nodo a filtrar")) -> dict: """Filas de la tabla de un tipo concreto (frontmatter aplanado). Devuelve solo los nodos reales (no fantasma). Sin ``tipo`` devuelve todos. """ rows = state.rows_by_tipo(tipo) return {"tipo": tipo, "count": len(rows), "rows": rows} @app.get("/api/node/{slug}") def api_node(slug: str) -> dict: """Ficha de un nodo: frontmatter + body + lista de attachments.""" detail = state.node_detail(slug) if detail is None: raise HTTPException(status_code=404, detail="nodo '%s' no encontrado" % slug) return detail @app.get("/api/attachment") def api_attachment( path: str = Query(..., description="path relativo al vault"), ) -> FileResponse: """Sirve el binario de un attachment, con allowlist ESTRICTA al vault. El ``path`` es relativo al vault. Se resuelve a su realpath y se verifica que cae dentro del vault: cualquier intento de salir (``../../etc/passwd``, symlink fuera del vault) devuelve 403. Si el archivo no existe (pero está dentro del vault), 404. """ abs_path = state.resolve_attachment_path(path) if abs_path is None: # Distinguimos traversal (403) de inexistente-dentro-del-vault (404). candidate = os.path.realpath(os.path.join(state._vault_real, path or "")) if state._is_within_vault(candidate) and candidate != state._vault_real: raise HTTPException(status_code=404, detail="attachment no encontrado") raise HTTPException(status_code=403, detail="path fuera del vault") return FileResponse(abs_path) @app.get("/api/search") def api_search(q: str = Query(..., min_length=1)) -> dict: """Nodos del grafo cuyas notas matchean la query (substring).""" results = state.search(q) return {"query": q, "count": len(results), "results": results} # -- Xandikos: contactos (CardDAV) -- @app.get("/api/contacts") def api_contacts() -> JSONResponse: """Contactos del addressbook Xandikos, parseados a JSON (cacheados). Cada contacto: ``{uid, nombre, alias, nota, org, telefonos[], emails[], osint{dni,pais,sexo,...}}`` (+ las formas tipadas ``phones``/``emails``). La lista se cachea en memoria al primer acceso (``POST /api/refresh`` la invalida). Si Xandikos / el osint_db no responde o falta la password → 503 con un JSON de error claro, nunca un crash. """ try: contacts = state.contacts() except (RuntimeError, DavUnavailable, osintdb_client.OsintDbUnavailable) as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse( content={"status": "ok", "count": len(contacts), "contacts": contacts} ) @app.get("/api/contact/{uid}") def api_contact(uid: str) -> JSONResponse: """Un contacto concreto (por UID) parseado a JSON, desde la caché. Resuelve sobre la lista cacheada de ``/api/contacts`` (mismo parseo completo, todos los campos). 404 si el UID no existe; 503 si Xandikos no responde o falta la password. """ try: contacts = state.contacts() except (RuntimeError, DavUnavailable, osintdb_client.OsintDbUnavailable) as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) match = next((c for c in contacts if c.get("uid") == uid), None) if match is None: # Tolerancia: aceptar también el segmento final del href (nombre del # recurso .vcf) cuando el UID no coincide literalmente. match = next( ( c for c in contacts if uid in (c.get("href") or "").rsplit("/", 1)[-1] ), None, ) if match is None: raise HTTPException( status_code=404, detail="contacto '%s' no encontrado" % uid ) return JSONResponse(content={"status": "ok", "contact": match}) # -- Contactos: CRUD (ficha .md del vault = verdad, vCard = reflejo) -- @app.post("/api/contact") def api_create_contact(data: ContactIn = Body(...)) -> JSONResponse: """Crea un contacto: escribe la ficha ``.md`` del vault + el vCard. La ficha del vault es la fuente de verdad (CONVENTIONS.md §3b/§6); Xandikos se actualiza de inmediato para que el contacto se vea ya en la app y en el móvil. 409 si el slug ya existe. Devuelve ``{slug, uid}``. Con el flag ``OSINT_DB_BACKEND`` ON el alta va al osint_db; 503 si no responde. """ try: result = state.create_contact(data) except osintdb_client.OsintDbUnavailable as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse(status_code=201, content={"status": "ok", **result}) @app.put("/api/contact/{slug}") def api_update_contact(slug: str, data: ContactIn = Body(...)) -> JSONResponse: """Edita la ficha de un contacto (merge frontmatter) + re-sube el vCard. 404 si no existe la ficha. Preserva campos heredados no editables (``sexo``, ``fecha_nacimiento``, ...). Devuelve ``{slug, uid}``. Con el flag ``OSINT_DB_BACKEND`` ON la edición va al osint_db; 503 si no responde. """ try: result = state.update_contact(slug, data) except osintdb_client.OsintDbUnavailable as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse(content={"status": "ok", **result}) @app.delete("/api/contact/{slug}") def api_delete_contact(slug: str) -> JSONResponse: """Borra un contacto: elimina la ficha ``.md`` + el vCard en Xandikos. 404 si no existe la ficha. Devuelve confirmación ``{slug, deleted}``. Con el flag ``OSINT_DB_BACKEND`` ON el borrado va al osint_db; 503 si no responde. """ try: result = state.delete_contact(slug) except osintdb_client.OsintDbUnavailable as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse(content={"status": "ok", **result}) # -- Libretas (addressbooks) de contactos -- @app.get("/api/addressbooks") def api_addressbooks() -> JSONResponse: """Libretas de contactos disponibles para el selector del frontend. Cada una: ``{slug, display_name, collection_path, color}``. Con el flag ``OSINT_DB_BACKEND`` ON vienen del osint_db; con el flag OFF se devuelve solo la libreta por defecto del vault. 503 si el osint_db no responde. """ try: books = state.list_addressbooks() except osintdb_client.OsintDbUnavailable as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse( content={ "status": "ok", "count": len(books), "addressbooks": books, "default": DEFAULT_ADDRESSBOOK_SLUG, } ) @app.post("/api/addressbooks") def api_create_addressbook(data: AddressbookIn = Body(...)) -> JSONResponse: """Crea una libreta de contactos nueva. Body: ``{slug, name?, color?}``. Requiere el flag ``OSINT_DB_BACKEND`` activo (el osint_db crea la colección CardDAV en Xandikos); con el flag OFF devuelve 501 claro. 503 si el osint_db no responde. """ try: res = state.create_addressbook(data) except osintdb_client.OsintDbUnavailable as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse(status_code=201, content={"status": "ok", **res}) # -- Xandikos: calendario (CalDAV) -- @app.get("/api/calendars") def api_calendars() -> JSONResponse: """Colecciones de calendario bajo el calendar-home, con nombre y color. Cada una: ``{href, name, color}``. Alimenta el selector de calendario del frontend. 503 con JSON de error si Xandikos no responde. """ try: calendars = state.list_calendars() except (RuntimeError, DavUnavailable) as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse( content={ "status": "ok", "count": len(calendars), "calendars": calendars, "default": XANDIKOS_CALENDAR_COLLECTION, } ) @app.post("/api/calendars") def api_create_calendar(data: CalendarIn = Body(...)) -> JSONResponse: """Crea una colección de calendario nueva (MKCALENDAR + nombre/color). Body: ``{slug, name?, color?, description?}``. Idempotente si ya existe. Devuelve ``{status, href, existed?}``. 400 si falta el nombre; 503 si Xandikos no responde. """ try: res = state.create_calendar(data) except (RuntimeError, DavUnavailable) as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse(status_code=201, content={"status": "ok", **res}) @app.get("/api/calendar") def api_calendar( cal: str = Query("", description="colección de calendario (ruta o nombre)"), from_: str = Query("", alias="from", description="fecha inicio YYYY-MM-DD"), to: str = Query("", description="fecha fin YYYY-MM-DD"), ) -> JSONResponse: """Eventos de una colección del calendario Xandikos en ``[from, to]``. Cada evento: ``{uid, summary, dtstart, dtend, tz, all_day, location, description, color}`` (dtstart/dtend en ISO con offset). ``cal`` elige la colección (default la actual). La descarga + parseo se cachean (``POST /api/refresh`` invalida); el filtro por rango va sobre la caché. Si Xandikos no responde → 503 con JSON de error claro, nunca un crash. """ try: events = state.calendar(cal, from_, to) except (RuntimeError, DavUnavailable) as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse( content={"status": "ok", "count": len(events), "events": events} ) # -- Calendario: CRUD de eventos (VEVENT) -- @app.post("/api/event") def api_create_event(data: EventIn = Body(...)) -> JSONResponse: """Crea un VEVENT en una colección de calendario (PUT de un VCALENDAR). Body: ``{cal?, summary, dtstart, dtend?, tz?, all_day?, location?, description?, color?}``. Genera el UID. 400 si la fecha es inválida; 503 si Xandikos rechaza el evento. Devuelve ``{uid, cal}``. """ try: result = state.create_event(data) except DavUnavailable as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse(status_code=201, content={"status": "ok", **result}) @app.put("/api/event/{uid}") def api_update_event(uid: str, data: EventIn = Body(...)) -> JSONResponse: """Edita un VEVENT existente (reescribe ``.ics``). Reutiliza el UID. 400 si la fecha es inválida; 503 si Xandikos rechaza. Devuelve ``{uid, cal}``. """ try: result = state.update_event(uid, data) except DavUnavailable as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse(content={"status": "ok", **result}) @app.delete("/api/event/{uid}") def api_delete_event( uid: str, cal: str = Query("", description="colección de calendario (ruta o nombre)"), ) -> JSONResponse: """Borra un VEVENT (``.ics``) de una colección de calendario. 404 de Xandikos se trata como idempotente. 503 si falla por otra causa. Devuelve ``{uid, deleted}``. """ try: result = state.delete_event(uid, cal) except DavUnavailable as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse(content={"status": "ok", **result}) # -- Refresco de cachés -- @app.post("/api/refresh") def api_refresh() -> dict: """Reconstruye la caché del grafo del vault e invalida las cachés DAV. Re-escanea el vault (grafo + tablas) y vacía las cachés de contactos y calendario, que se recargarán perezosamente en el siguiente acceso. Devuelve el conteo del grafo recién reconstruido. """ summary = state.refresh() state.invalidate_dav() return {"status": "refreshed", **summary} return app # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def _parse_args(argv: Optional[list] = None) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Backend osint_web: sirve el vault osint + agenda/calendario Xandikos." ) parser.add_argument( "--vault", default=os.path.expanduser("~/Obsidian/osint"), help="ruta al vault de Obsidian osint (default: ~/Obsidian/osint)", ) parser.add_argument( "--host", default="127.0.0.1", help="host de escucha (default: 127.0.0.1 — NO cambiar: datos sensibles)", ) parser.add_argument( "--port", type=int, default=8470, help="puerto local (default: 8470)" ) return parser.parse_args(argv) def main(argv: Optional[list] = None) -> int: args = _parse_args(argv) if args.host != "127.0.0.1": # Seguridad: el vault tiene PII (DNIs, fotos). Nunca exponer a red. print( "ADVERTENCIA: --host distinto de 127.0.0.1. El vault contiene datos " "personales sensibles; exponerlo a la red es un riesgo.", file=sys.stderr, ) try: app = create_app(args.vault) except (FileNotFoundError, NotADirectoryError) as exc: print(f"error: {exc}", file=sys.stderr) return 2 import uvicorn state = app.state.vault print( f"osint_web backend en http://{args.host}:{args.port} — vault: " f"{state.vault_dir} ({len(state.graph['nodes'])} nodos, " f"{len(state.graph['edges'])} aristas)" ) uvicorn.run(app, host=args.host, port=args.port, log_level="info") return 0 if __name__ == "__main__": sys.exit(main())