#!/usr/bin/env python3 """Backend FastAPI de la app osint_web. Sirve, en JSON (salvo el endpoint de attachments, que sirve binarios), tres fuentes de datos para un frontend web local: 1. El vault de Obsidian ``osint`` (grafo de nodos + aristas, tablas por tipo, fichas con galería de attachments y búsqueda global). 2. La agenda CardDAV del servidor Xandikos (contactos). 3. El calendario CalDAV del servidor Xandikos (eventos). Registry-first: este servidor NO reimplementa parseo de Markdown, resolución de embeds ni protocolo DAV. Orquesta funciones del registry de los grupos ``obsidian`` (parseo del vault) y ``dav`` (CardDAV/CalDAV) + ``pass_get_secret`` para la credencial de Xandikos. La única lógica propia de la app es: cacheo en memoria, allowlist de path traversal, aplanado del frontmatter para las tablas y el parseo ligero de vCard/iCalendar a JSON. Seguridad: el vault contiene datos personales sensibles (DNIs, fotos), así que el servidor escucha SOLO en ``127.0.0.1`` y nunca expone a la red. El endpoint ``/api/attachment`` valida con ``os.path.realpath`` que el archivo solicitado cae dentro del vault; cualquier intento de path traversal devuelve 403. Uso: python3 server/main.py --vault /home/enmanuel/Obsidian/osint --port 8470 Endpoints (JSON salvo /api/attachment): GET /api/health estado + tamaño del grafo cacheado GET /api/graph grafo completo {nodes, edges} para sigma.js GET /api/nodes?tipo=persona filas de la tabla de ese tipo GET /api/node/ ficha: frontmatter + body + attachments GET /api/attachment?path=.. binario del attachment (path relativo al vault) GET /api/search?q=... nodos cuyo contenido matchea la query GET /api/contacts contactos del addressbook Xandikos (CardDAV) GET /api/contact/ un vCard concreto a JSON GET /api/calendar?from=&to= eventos del calendario Xandikos (CalDAV) POST /api/refresh re-escanea el vault y reconstruye la caché """ from __future__ import annotations import argparse import importlib.util import json import os import re import sys import threading import time from typing import Optional def _registry_functions_dir() -> str: """Localiza ``python/functions`` del fn_registry sin paths hardcodeados. Prueba primero las variables de entorno ``FN_REGISTRY_FUNCTIONS`` y ``FN_REGISTRY_ROOT``, después sube por los directorios padre de este archivo hasta encontrar una raíz que contenga ``python/functions/obsidian``, y por último cae al layout estándar del PC (``/home/enmanuel/fn_registry``). Así el backend funciona en cualquier PC con el layout estándar del registry (la app vive en ``/projects/osint/apps/osint_web/server/``) sin hardcodear el home de un usuario concreto (memoria ``hardcoded-lucas-paths``). """ env_functions = os.environ.get("FN_REGISTRY_FUNCTIONS") if env_functions and os.path.isdir(os.path.join(env_functions, "obsidian")): return env_functions candidates: list[str] = [] env_root = os.environ.get("FN_REGISTRY_ROOT") if env_root: candidates.append(env_root) current = os.path.dirname(os.path.abspath(__file__)) while True: candidates.append(current) parent = os.path.dirname(current) if parent == current: break current = parent candidates.append("/home/enmanuel/fn_registry") for root in candidates: functions_dir = os.path.join(root, "python", "functions") if os.path.isdir(os.path.join(functions_dir, "obsidian")): return functions_dir raise RuntimeError( "no se encontró python/functions/obsidian subiendo desde " f"{os.path.abspath(__file__)}; define FN_REGISTRY_ROOT con la raíz " "del fn_registry" ) _FUNCTIONS_DIR = _registry_functions_dir() sys.path.insert(0, _FUNCTIONS_DIR) from fastapi import FastAPI, HTTPException, Query # noqa: E402 from fastapi.responses import FileResponse, JSONResponse # noqa: E402 # --- Grupo de capacidad obsidian (parseo del vault) --- # El paquete obsidian tiene un __init__ ligero (sin dependencias pesadas), así # que se importa directamente. from obsidian import ( # noqa: E402 (sys.path debe resolverse antes) build_obsidian_graph, extract_obsidian_embeds, list_obsidian_notes, read_obsidian_note, resolve_obsidian_embed, search_obsidian_notes, slugify_obsidian_name, ) def _load_infra_fn(module_name: str, attr: str): """Carga una función del paquete ``infra`` por archivo, sin tocar su __init__. El ``infra/__init__.py`` del registry importa de forma eager ``generate_app_icon`` (que necesita Pillow/PIL) y otros módulos pesados que esta app no usa. Importar ``from infra.dav_list_resources import ...`` arrastraría ese __init__ y exigiría PIL como dependencia. Para evitarlo, se carga cada módulo concreto del grupo ``dav`` directamente por su ruta de archivo con ``importlib``, sin ejecutar el __init__ del paquete. Sigue siendo registry-first: se usa la función del registry sin reimplementarla, solo se importa de forma quirúrgica. """ file_path = os.path.join(_FUNCTIONS_DIR, "infra", module_name + ".py") spec = importlib.util.spec_from_file_location("infra_%s" % module_name, file_path) if spec is None or spec.loader is None: # pragma: no cover - defensivo raise ImportError("no se pudo cargar %s" % file_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) return getattr(module, attr) # --- Grupo de capacidad dav (CardDAV / CalDAV contra Xandikos) + pass --- # dav_get_collection trae TODOS los recursos (vCards / VCALENDARs) de una # colección en UNA petición REPORT con el contenido inline; dav_collection_ctag # lee el ctag de la colección (PROPFIND Depth:0 barato) para validar la caché en # disco sin descargar nada cuando nada cambió. dav_get_collection = _load_infra_fn("dav_get_collection", "dav_get_collection") dav_collection_ctag = _load_infra_fn("dav_collection_ctag", "dav_collection_ctag") split_vcards = _load_infra_fn("split_vcards", "split_vcards") pass_get_secret = _load_infra_fn("pass_get_secret", "pass_get_secret") # --------------------------------------------------------------------------- # Configuración Xandikos (CardDAV / CalDAV) # --------------------------------------------------------------------------- XANDIKOS_BASE_URL = "https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com" XANDIKOS_USERNAME = "enmanuel" XANDIKOS_PASS_ENTRY = "dav/xandikos-enmanuel" XANDIKOS_CONTACTS_COLLECTION = "/enmanuel/contacts/addressbook/" XANDIKOS_CALENDAR_COLLECTION = "/enmanuel/calendars/calendar/" # Caché en disco de los datos DAV ya parseados, indexada por el ctag de la # colección. Al arrancar el server lee el ctag (PROPFIND barato ~0.1s) y, si # coincide con el de la caché en disco, sirve los contactos/eventos ya parseados # sin descargar ni reparsear nada (arranque instantáneo). El directorio vive # junto al server y está gitignored (datos personales sensibles + regenerable). _CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".cache") _CONTACTS_CACHE_FILE = os.path.join(_CACHE_DIR, "contacts.json") _CALENDAR_CACHE_FILE = os.path.join(_CACHE_DIR, "calendar.json") # Extensiones de imagen que el frontend muestra en la galería con lightbox. _IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"} class DavUnavailable(Exception): """Xandikos no responde (sin red, timeout, auth caída). Los endpoints DAV la capturan y devuelven un 503 JSON claro, para que un fallo de la agenda/calendario NUNCA tumbe el server ni afecte a los endpoints del vault, que deben seguir funcionando offline. """ def _attachment_kind(name: str) -> str: """Clasifica un attachment por extensión: ``image`` | ``pdf`` | ``other``.""" ext = os.path.splitext(name)[1].lower() if ext in _IMAGE_EXTS: return "image" if ext == ".pdf": return "pdf" return "other" def _read_pass_secret(entry: str) -> str: """Lee la contraseña (primera línea) de una entrada de ``pass``. Wrapper fino sobre la función del registry ``pass_get_secret_py_infra`` (grupo ``infra``/``flow-replay``), que ejecuta ``pass show `` sin shell y nunca logea el valor. Convierte su resultado ``{status, value|error}`` en un ``str`` o un ``RuntimeError`` con mensaje claro, para que los endpoints DAV degraden con un error explicable en vez de un 500 silencioso. """ res = pass_get_secret(entry) if res.get("status") != "ok": raise RuntimeError( "no se pudo leer la entrada '%s' de pass: %s" % (entry, res.get("error", "error desconocido")) ) value = res.get("value", "") if not value.strip(): raise RuntimeError("la entrada '%s' de pass está vacía" % entry) return value.strip() # --------------------------------------------------------------------------- # Caché en disco de los datos DAV ya parseados (indexada por ctag) # --------------------------------------------------------------------------- def _read_disk_cache(path: str) -> Optional[dict]: """Lee una caché DAV del disco: ``{"ctag": str, "items": list}`` o None. Devuelve None (recargar de la red) ante cualquier problema: archivo inexistente, JSON corrupto, o estructura inesperada. Nunca lanza: la caché es un acelerador, no una fuente de verdad — si falla, se cae a la descarga. """ try: with open(path, "r", encoding="utf-8") as fh: data = json.load(fh) except (OSError, ValueError): return None if not isinstance(data, dict) or not isinstance(data.get("items"), list): return None return data def _write_disk_cache(path: str, ctag: str, items: list) -> None: """Escribe la caché DAV al disco de forma atómica (tmp + rename). Persiste ``{"ctag": ctag, "items": items, "saved_at": epoch}``. Errores de escritura se ignoran (no deben tumbar el endpoint): la caché en memoria sigue sirviendo y el disco se reintentará en el siguiente refresco. """ try: os.makedirs(os.path.dirname(path), exist_ok=True) tmp = path + ".tmp" with open(tmp, "w", encoding="utf-8") as fh: json.dump( {"ctag": ctag, "items": items, "saved_at": time.time()}, fh, ensure_ascii=False, ) os.replace(tmp, path) except OSError: pass # --------------------------------------------------------------------------- # Estado del servidor: caché del vault + password Xandikos # --------------------------------------------------------------------------- class VaultState: """Caché en memoria del vault: grafo agregado + índice slug → nota. Se construye al arrancar y se reconstruye bajo demanda con ``refresh()`` (botón "refrescar" del frontend → ``POST /api/refresh``). Thread-safe mediante un lock sobre la reconstrucción. La password de Xandikos se lee de ``pass`` perezosamente y se cachea en memoria. Raises: FileNotFoundError: si ``vault_dir`` no existe (error claro al arrancar, nunca un 500 silencioso). NotADirectoryError: si ``vault_dir`` no es un directorio. """ def __init__(self, vault_dir: str): if not os.path.exists(vault_dir): raise FileNotFoundError(f"el vault no existe: {vault_dir}") if not os.path.isdir(vault_dir): raise NotADirectoryError(f"el vault no es un directorio: {vault_dir}") self.vault_dir = os.path.abspath(vault_dir) self._vault_real = os.path.realpath(self.vault_dir) self._lock = threading.Lock() self.graph: dict = {"nodes": [], "edges": []} self.note_index: dict = {} # slug -> {"path", "tipo", "label"} self._xandikos_password: Optional[str] = None # Cachés DAV en memoria (igual que el grafo): se llenan perezosamente al # primer acceso y se invalidan en POST /api/refresh. None = sin cargar. # Cada caché lleva su ctag para servir del disco sin red cuando la # colección no cambió. _force_reload (set por /api/refresh) salta la # validación de ctag en el siguiente acceso. self._dav_lock = threading.Lock() self._contacts_cache: Optional[list] = None self._calendar_cache: Optional[list] = None self._contacts_ctag: Optional[str] = None self._calendar_ctag: Optional[str] = None self._force_reload = False self.refresh() # --- vault -------------------------------------------------------------- def refresh(self) -> dict: """Re-escanea el vault: reconstruye grafo + índice de notas. Devuelve un resumen ``{"nodes": N, "edges": M}`` para el frontend. """ with self._lock: graph = build_obsidian_graph(self.vault_dir, include_dangling=True) nodes_by_id = {n["id"]: n for n in graph["nodes"]} note_index: dict = {} for path in list_obsidian_notes(self.vault_dir): slug = os.path.splitext(os.path.basename(path))[0] if not slug or slug in note_index: continue node = nodes_by_id.get(slug, {}) note_index[slug] = { "path": path, "tipo": node.get("tipo", "nota"), "label": node.get("label", slug), } self.graph = graph self.note_index = note_index return {"nodes": len(graph["nodes"]), "edges": len(graph["edges"])} def graph_payload(self) -> dict: """Grafo + conteos por tipo para la leyenda de sigma.js.""" counts: dict[str, int] = {} for node in self.graph["nodes"]: counts[node["tipo"]] = counts.get(node["tipo"], 0) + 1 return { "nodes": self.graph["nodes"], "edges": self.graph["edges"], "counts": counts, "total_nodes": len(self.graph["nodes"]), "total_edges": len(self.graph["edges"]), } def rows_by_tipo(self, tipo: str) -> list: """Filas de la tabla de un tipo: nodos reales (no fantasma) filtrados. Cada fila lleva ``id``, ``label``, ``tipo`` y el ``frontmatter`` completo — el frontend aplana las columnas que le interesen. Sin ``tipo`` devuelve todos los nodos reales. """ rows = [] for node in self.graph["nodes"]: if node.get("dangling"): continue if tipo and node["tipo"] != tipo: continue rows.append( { "id": node["id"], "label": node["label"], "tipo": node["tipo"], "frontmatter": node["frontmatter"], } ) return rows def _resolve_embed(self, embed_name: str) -> str: """Resuelve un embed ``![[...]]`` a un path absoluto dentro del vault. El vault osint usa dos formas de embed: por nombre de archivo (``![[foto.jpg]]``) y por path relativo al vault (``![[attachments/personas//foto.png]]``). La función del registry ``resolve_obsidian_embed`` resuelve solo por basename, así que primero se intenta el embed como path literal relativo al vault (cubre la forma con ruta), y si no existe se cae al resolutor por basename del registry. Devuelve cadena vacía si ninguna forma resuelve. """ # Forma 1: el embed ya es un path relativo al vault. literal = os.path.realpath(os.path.join(self._vault_real, embed_name)) if self._is_within_vault(literal) and os.path.isfile(literal): return literal # Forma 2: resolución por basename via función del registry. return resolve_obsidian_embed(self.vault_dir, embed_name) def node_detail(self, slug: str): """Ficha completa de un nodo: frontmatter + body + attachments. Los attachments salen de los embeds ``![[...]]`` del cuerpo, resueltos a paths reales con ``_resolve_embed`` (que compone ``resolve_obsidian_embed``) y devueltos como paths **relativos al vault** (lo que consume ``/api/attachment``). Un embed que no resuelve se reporta con ``kind: "missing"`` y path vacío. Devuelve ``None`` si el slug no corresponde a ninguna nota del vault. """ info = self.note_index.get(slug) if info is None: # Tolerancia: aceptar también nombres sin slugificar. info = self.note_index.get(slugify_obsidian_name(slug)) if info is None: return None note = read_obsidian_note(info["path"]) attachments = [] for name in extract_obsidian_embeds(note["body"]): abs_path = self._resolve_embed(name) if not abs_path: attachments.append({"name": name, "path": "", "kind": "missing"}) continue real = os.path.realpath(abs_path) # Defensa en profundidad: solo attachments dentro del vault. if not self._is_within_vault(real): continue rel = os.path.relpath(real, self._vault_real) attachments.append( {"name": name, "path": rel, "kind": _attachment_kind(abs_path)} ) return { "id": os.path.splitext(os.path.basename(info["path"]))[0], "tipo": info["tipo"], "label": info["label"], "frontmatter": note["frontmatter"], "body": note["body"], "tags": note["tags"], "wikilinks": note["wikilinks"], "attachments": attachments, } def _is_within_vault(self, candidate_real_path: str) -> bool: """True si ``candidate_real_path`` (ya realpath) está dentro del vault. Añade el separador final al vault para que ``/vault-evil`` no cuele como prefijo de ``/vault``. """ return ( candidate_real_path == self._vault_real or candidate_real_path.startswith(self._vault_real + os.sep) ) def resolve_attachment_path(self, rel_path: str): """Resuelve un path relativo de attachment a absoluto, SOLO dentro del vault. Bloquea path traversal: normaliza con ``realpath`` (colapsa ``..`` y sigue symlinks) y exige que el resultado quede estrictamente bajo la raíz real del vault. Devuelve ``None`` (→ 403/404) ante cualquier intento de salir del vault, paths absolutos, o archivos inexistentes. """ if not rel_path: return None candidate = os.path.realpath(os.path.join(self._vault_real, rel_path)) if candidate == self._vault_real: return None if not candidate.startswith(self._vault_real + os.sep): return None if not os.path.isfile(candidate): return None return candidate def search(self, query: str) -> list: """Búsqueda global: nodos cuyas notas matchean la query (substring). Compone ``search_obsidian_notes`` y mapea cada hit a su nodo (slug, label, tipo) + las líneas que matchean. """ results = [] for hit in search_obsidian_notes(self.vault_dir, query): slug = os.path.splitext(os.path.basename(hit["path"]))[0] info = self.note_index.get(slug, {}) results.append( { "id": slug, "label": info.get("label", slug), "tipo": info.get("tipo", "nota"), "matches": hit.get("matches", []), } ) return results # --- Xandikos ----------------------------------------------------------- def xandikos_password(self) -> str: """Password de Xandikos desde ``pass``, cacheada en memoria.""" with self._lock: if self._xandikos_password is None: self._xandikos_password = _read_pass_secret(XANDIKOS_PASS_ENTRY) return self._xandikos_password def _collection_ctag(self, collection_path: str, password: str) -> Optional[str]: """Lee el ctag de una colección (PROPFIND barato), o None si no se puede. El ctag es el token de versión de la colección: si no cambió, la caché en disco sigue vigente. Devolver None significa "no pude validar" → se recarga de la red por seguridad (nunca se sirve caché potencialmente obsoleta sin confirmación). No lanza. """ res = dav_collection_ctag( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, collection_path ) return res.get("ctag") if res.get("status") == "ok" else None def _load_collection( self, collection_path: str, content_type: str, cache_file: str, parse_items, ) -> tuple: """Carga una colección DAV con caché en disco validada por ctag. Flujo (1 o 2 peticiones, nunca N): 1. Lee el ctag de la colección (PROPFIND Depth:0, ~0.1s). 2. Si el ctag coincide con el de la caché en disco (y no hay refresh forzado), parsea los items del disco y devuelve sin descargar. 3. Si no, hace UN REPORT ``dav_get_collection`` que trae todos los recursos con su contenido inline, los parsea con ``parse_items`` y reescribe la caché en disco con el nuevo ctag. ``parse_items(resources) -> list`` transforma la lista ``[{href, etag, data}]`` del registry en la lista de objetos JSON que sirve el endpoint (contactos o eventos). Returns: tuple ``(items, ctag)``. Raises: DavUnavailable: si Xandikos no responde al REPORT cuando hay que descargar (sin red, timeout, auth). """ password = self.xandikos_password() ctag = self._collection_ctag(collection_path, password) # Caché en disco vigente: mismo ctag y sin refresh forzado → sin red. if ctag is not None and not self._force_reload: disk = _read_disk_cache(cache_file) if disk is not None and disk.get("ctag") == ctag: return parse_items(disk["items"]), ctag # Hay que (re)descargar: UN REPORT trae todo con el contenido inline. got = dav_get_collection( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, collection_path, content_type, ) if got.get("status") != "ok": raise DavUnavailable("Xandikos no responde: %s" % got.get("error")) resources = got.get("resources", []) items = parse_items(resources) # Persistir en disco para el arranque instantáneo de la próxima vez. Si # no obtuvimos ctag, guardamos cadena vacía: nunca matcheará un ctag real, # así que la próxima vez se revalidará (cae con elegancia a "siempre # recargar" sin romper). _write_disk_cache(cache_file, ctag or "", items) return items, (ctag or "") @staticmethod def _parse_contacts(items: list) -> list: """Parsea ``[{href, etag, data}]`` (o items cacheados) a contactos JSON. Acepta dos formas de ``items``: la lista de recursos del registry (cada uno con ``data`` = texto vCard, posible multi-tarjeta) que hay que parsear, o la lista de contactos ya parseados (caché en disco), que se devuelve tal cual. Se distinguen por la presencia de la clave ``data``. """ if items and "data" in items[0]: contacts: list = [] for res in items: href = res.get("href") for card_text in split_vcards(res.get("data", "")): card = _vcard_to_json(card_text) card["etag"] = res.get("etag") card["href"] = href contacts.append(card) contacts.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower()) return contacts return list(items) @staticmethod def _parse_events(items: list) -> list: """Parsea ``[{href, etag, data}]`` (o items cacheados) a eventos JSON. Igual criterio que ``_parse_contacts``: si los items llevan ``data`` son recursos del registry (texto VCALENDAR) y se parsean; si no, ya son eventos cacheados y se devuelven tal cual. """ if items and "data" in items[0]: events: list = [] for res in items: href = res.get("href") for event in _vcalendar_to_events(res.get("data", "")): event["etag"] = res.get("etag") event["href"] = href events.append(event) events.sort(key=lambda e: e.get("dtstart") or "") return events return list(items) def contacts(self) -> list: """Contactos del addressbook Xandikos, parseados y cacheados. Caché en dos niveles: memoria (mientras vive el proceso) y disco (``.cache/contacts.json``, validada por ctag para arranque instantáneo). Al primer acceso descarga TODO en UNA petición REPORT (``dav_get_collection``) en vez de un GET por ``.vcf``. Raises: RuntimeError: si no se puede leer la password de ``pass``. DavUnavailable: si Xandikos no responde (sin red, timeout, auth). """ with self._dav_lock: if self._contacts_cache is not None and not self._force_reload: return self._contacts_cache contacts, ctag = self._load_collection( XANDIKOS_CONTACTS_COLLECTION, "vcard", _CONTACTS_CACHE_FILE, self._parse_contacts, ) self._contacts_cache = contacts self._contacts_ctag = ctag self._maybe_clear_force_reload() return contacts def calendar(self, dt_from: str = "", dt_to: str = "") -> list: """Eventos del calendario Xandikos, cacheados; filtrados por rango. Misma caché en dos niveles que ``contacts``. La descarga + parseo completos se cachean (UNA petición REPORT); el filtro por ``[from, to]`` se aplica sobre la caché en cada llamada (barato). Sin ``from``/``to`` devuelve todos. Raises: RuntimeError: si no se puede leer la password de ``pass``. DavUnavailable: si Xandikos no responde (sin red, timeout, auth). """ with self._dav_lock: if self._calendar_cache is None or self._force_reload: events, ctag = self._load_collection( XANDIKOS_CALENDAR_COLLECTION, "ical", _CALENDAR_CACHE_FILE, self._parse_events, ) self._calendar_cache = events self._calendar_ctag = ctag self._maybe_clear_force_reload() all_events = self._calendar_cache if not dt_from and not dt_to: return list(all_events) return [e for e in all_events if _event_in_range(e, dt_from, dt_to)] def _maybe_clear_force_reload(self) -> None: """Apaga el flag de refresh forzado una vez consumido por una recarga. Llamado bajo ``_dav_lock`` tras recargar una colección. El flag lo activa ``invalidate_dav`` (POST /api/refresh) para forzar UNA recarga que ignore el ctag; tras ella vuelve a la validación normal por ctag. """ self._force_reload = False def invalidate_dav(self) -> None: """Vacía las cachés de contactos y calendario y fuerza una recarga. Limpia las cachés en memoria y marca ``_force_reload`` para que el siguiente acceso a cada colección ignore el ctag cacheado y vuelva a descargar del servidor (el botón "refrescar" debe traer cambios aunque el ctag no se haya actualizado todavía). No borra la password. """ with self._dav_lock: self._contacts_cache = None self._calendar_cache = None self._force_reload = True # --------------------------------------------------------------------------- # Helpers DAV: parseo ligero de vCard / iCalendar a JSON # --------------------------------------------------------------------------- def _unescape_ical(value: str) -> str: """Des-escapa los caracteres de un valor iCalendar/vCard (RFC 5545/6350).""" return ( value.replace("\\n", "\n") .replace("\\N", "\n") .replace("\\,", ",") .replace("\\;", ";") .replace("\\\\", "\\") ) def _unfold_lines(text: str) -> list: """Des-pliega las líneas continuadas (folding) de un vCard/iCalendar. RFC 5545/6350: una línea que empieza por espacio o tab es continuación de la anterior. Esta función las une para parsearlas como propiedades completas. """ raw_lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n") unfolded: list = [] for line in raw_lines: if line[:1] in (" ", "\t") and unfolded: unfolded[-1] += line[1:] else: unfolded.append(line) return unfolded def _parse_property(line: str) -> Optional[tuple]: """Parsea una línea de propiedad vCard/iCal a ``(nombre, params, valor)``. Formato: ``[itemN.]NAME;PARAM=val;PARAM2=val:value``. Devuelve ``None`` si la línea no es una propiedad (sin ``:``). El nombre se devuelve en mayúsculas y SIN el prefijo de grupo ``itemN.`` / ``GRUPO.`` que añaden Apple/Google a las propiedades agrupadas (``item1.TEL``, ``item2.EMAIL``); los params como dict con claves en mayúsculas. """ if ":" not in line: return None head, value = line.split(":", 1) parts = head.split(";") name = parts[0].strip() # Quitar el prefijo de grupo "itemN." / "GRUPO." (vCard property grouping). if "." in name: name = name.rsplit(".", 1)[-1] name = name.upper() params: dict = {} for part in parts[1:]: if "=" in part: k, v = part.split("=", 1) params[k.strip().upper()] = v.strip() return name, params, value def _vcard_to_json(vcard_text: str) -> dict: """Convierte un VCARD a un dict JSON con los campos de interés. Extrae: uid, nombre completo (FN o N reordenado), alias (NICKNAME), teléfonos (TEL), emails (EMAIL), organización (ORG), nota (NOTE) y el bloque ``osint`` con todas las propiedades ``X-OSINT-*`` (la clave es el sufijo en minúsculas: ``X-OSINT-DNI`` → ``osint.dni``, ``X-OSINT-PAIS`` → ``osint.pais``). Parseo ligero a mano (sin dependencia de vobject); el vCard ya viene troceado por ``split_vcards``. Expone tanto las claves en español que consume el frontend del task (``nombre``, ``alias``, ``nota``, ``telefonos``) como las formas tipadas con tipo (``phones``, ``emails`` como objetos ``{value, type}``), para no atar el frontend a un único shape. """ out: dict = { "uid": None, "fn": None, "nickname": None, "org": None, "note": None, "phones": [], "emails": [], "osint": {}, } for line in _unfold_lines(vcard_text): parsed = _parse_property(line) if not parsed: continue name, params, value = parsed value = _unescape_ical(value.strip()) if name == "UID": out["uid"] = value elif name == "FN": out["fn"] = value elif name == "NICKNAME": out["nickname"] = value elif name == "ORG": out["org"] = value.replace(";", " ").strip() elif name == "NOTE": out["note"] = value elif name == "TEL": out["phones"].append({"value": value, "type": params.get("TYPE", "")}) elif name == "EMAIL": out["emails"].append({"value": value, "type": params.get("TYPE", "")}) elif name.startswith("X-OSINT-"): key = name[len("X-OSINT-") :].lower().replace("-", "_") if key: out["osint"][key] = value elif name == "N" and not out["fn"]: # Nombre estructurado Apellido;Nombre;... -> "Nombre Apellido". comps = [c for c in value.split(";") if c] if len(comps) >= 2: out["fn"] = ("%s %s" % (comps[1], comps[0])).strip() elif comps: out["fn"] = comps[0] # Alias en español que consume el frontend del task (mismo dato, otra clave). out["nombre"] = out["fn"] out["alias"] = out["nickname"] out["nota"] = out["note"] out["telefonos"] = [p["value"] for p in out["phones"]] out["correos"] = [e["value"] for e in out["emails"]] return out _VEVENT_RE = re.compile(r"BEGIN:VEVENT(.*?)END:VEVENT", re.DOTALL | re.IGNORECASE) def _vevent_to_json(vevent_block: str) -> dict: """Convierte un bloque VEVENT a un dict JSON con los campos de interés. Extrae: uid, summary, dtstart, dtend, location, description. Las fechas se devuelven tal cual vienen del servidor (formato iCal, ej. ``20260611T090000Z`` o ``20260611``); el frontend las formatea a europeo. Parseo ligero a mano. """ out: dict = { "uid": None, "summary": None, "dtstart": None, "dtend": None, "location": None, "description": None, } for line in _unfold_lines(vevent_block): parsed = _parse_property(line) if not parsed: continue name, _params, value = parsed value = value.strip() if name == "UID": out["uid"] = value elif name == "SUMMARY": out["summary"] = _unescape_ical(value) elif name == "DTSTART": out["dtstart"] = value elif name == "DTEND": out["dtend"] = value elif name == "LOCATION": out["location"] = _unescape_ical(value) elif name == "DESCRIPTION": out["description"] = _unescape_ical(value) return out def _vcalendar_to_events(vcalendar_text: str) -> list: """Extrae todos los VEVENT de un VCALENDAR y los convierte a JSON.""" events = [] for block in _VEVENT_RE.findall(vcalendar_text): events.append(_vevent_to_json("BEGIN:VEVENT" + block + "END:VEVENT")) return events def _event_in_range(event: dict, dt_from: str, dt_to: str) -> bool: """True si el evento cae (por DTSTART) dentro de ``[dt_from, dt_to]``. Comparación lexicográfica sobre el prefijo ``YYYYMMDD`` que comparten todos los formatos iCal (date y date-time). Los límites se normalizan quitando los guiones, así acepta tanto el formato documentado del endpoint (``2026-06-11``) como el iCal crudo (``20260611``). ``dt_from``/``dt_to`` vacíos desactivan ese extremo del filtro. """ dtstart = (event.get("dtstart") or "").replace("-", "")[:8] if not dtstart: return True if dt_from and dtstart < dt_from.replace("-", "")[:8]: return False if dt_to and dtstart > dt_to.replace("-", "")[:8]: return False return True # --------------------------------------------------------------------------- # Construcción de la app FastAPI # --------------------------------------------------------------------------- def create_app(vault_dir: str) -> FastAPI: """Crea la app FastAPI ligada a un vault concreto. Valida que el vault existe al construir el ``VaultState`` (no un 500 silencioso en el primer request). Registra todos los endpoints sobre un estado compartido en memoria. """ state = VaultState(vault_dir) app = FastAPI(title="osint_web", version="0.1.0") app.state.vault = state # -- Vault -- @app.get("/api/health") def health() -> dict: """Health check: confirma que el servidor está vivo y el vault cargado.""" return { "status": "ok", "vault": state.vault_dir, "nodes": len(state.graph["nodes"]), "edges": len(state.graph["edges"]), } @app.get("/api/graph") def api_graph() -> dict: """Grafo completo del vault (nodos + aristas + conteos) para sigma.js. Cacheado en memoria; usar ``/api/refresh`` para recargar tras editar notas. """ return state.graph_payload() @app.get("/api/nodes") def api_nodes(tipo: str = Query("", description="tipo de nodo a filtrar")) -> dict: """Filas de la tabla de un tipo concreto (frontmatter aplanado). Devuelve solo los nodos reales (no fantasma). Sin ``tipo`` devuelve todos. """ rows = state.rows_by_tipo(tipo) return {"tipo": tipo, "count": len(rows), "rows": rows} @app.get("/api/node/{slug}") def api_node(slug: str) -> dict: """Ficha de un nodo: frontmatter + body + lista de attachments.""" detail = state.node_detail(slug) if detail is None: raise HTTPException(status_code=404, detail="nodo '%s' no encontrado" % slug) return detail @app.get("/api/attachment") def api_attachment( path: str = Query(..., description="path relativo al vault"), ) -> FileResponse: """Sirve el binario de un attachment, con allowlist ESTRICTA al vault. El ``path`` es relativo al vault. Se resuelve a su realpath y se verifica que cae dentro del vault: cualquier intento de salir (``../../etc/passwd``, symlink fuera del vault) devuelve 403. Si el archivo no existe (pero está dentro del vault), 404. """ abs_path = state.resolve_attachment_path(path) if abs_path is None: # Distinguimos traversal (403) de inexistente-dentro-del-vault (404). candidate = os.path.realpath(os.path.join(state._vault_real, path or "")) if state._is_within_vault(candidate) and candidate != state._vault_real: raise HTTPException(status_code=404, detail="attachment no encontrado") raise HTTPException(status_code=403, detail="path fuera del vault") return FileResponse(abs_path) @app.get("/api/search") def api_search(q: str = Query(..., min_length=1)) -> dict: """Nodos del grafo cuyas notas matchean la query (substring).""" results = state.search(q) return {"query": q, "count": len(results), "results": results} # -- Xandikos: contactos (CardDAV) -- @app.get("/api/contacts") def api_contacts() -> JSONResponse: """Contactos del addressbook Xandikos, parseados a JSON (cacheados). Cada contacto: ``{uid, nombre, alias, nota, org, telefonos[], emails[], osint{dni,pais,sexo,...}}`` (+ las formas tipadas ``phones``/``emails``). La lista se cachea en memoria al primer acceso (``POST /api/refresh`` la invalida). Si Xandikos no responde o falta la password → 503 con un JSON de error claro, nunca un crash. """ try: contacts = state.contacts() except (RuntimeError, DavUnavailable) as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse( content={"status": "ok", "count": len(contacts), "contacts": contacts} ) @app.get("/api/contact/{uid}") def api_contact(uid: str) -> JSONResponse: """Un contacto concreto (por UID) parseado a JSON, desde la caché. Resuelve sobre la lista cacheada de ``/api/contacts`` (mismo parseo completo, todos los campos). 404 si el UID no existe; 503 si Xandikos no responde o falta la password. """ try: contacts = state.contacts() except (RuntimeError, DavUnavailable) as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) match = next((c for c in contacts if c.get("uid") == uid), None) if match is None: # Tolerancia: aceptar también el segmento final del href (nombre del # recurso .vcf) cuando el UID no coincide literalmente. match = next( ( c for c in contacts if uid in (c.get("href") or "").rsplit("/", 1)[-1] ), None, ) if match is None: raise HTTPException( status_code=404, detail="contacto '%s' no encontrado" % uid ) return JSONResponse(content={"status": "ok", "contact": match}) # -- Xandikos: calendario (CalDAV) -- @app.get("/api/calendar") def api_calendar( from_: str = Query("", alias="from", description="fecha inicio YYYY-MM-DD"), to: str = Query("", description="fecha fin YYYY-MM-DD"), ) -> JSONResponse: """Eventos del calendario Xandikos en ``[from, to]`` (cacheados). Cada evento: ``{uid, summary, dtstart, dtend, location, description}``. La descarga + parseo completos se cachean (``POST /api/refresh`` los invalida); el filtro por rango se aplica sobre la caché. Sin ``from``/ ``to`` devuelve todos. Si Xandikos no responde o falta la password → 503 con JSON de error claro, nunca un crash. """ try: events = state.calendar(from_, to) except (RuntimeError, DavUnavailable) as exc: return JSONResponse( status_code=503, content={"status": "error", "error": str(exc)} ) return JSONResponse( content={"status": "ok", "count": len(events), "events": events} ) # -- Refresco de cachés -- @app.post("/api/refresh") def api_refresh() -> dict: """Reconstruye la caché del grafo del vault e invalida las cachés DAV. Re-escanea el vault (grafo + tablas) y vacía las cachés de contactos y calendario, que se recargarán perezosamente en el siguiente acceso. Devuelve el conteo del grafo recién reconstruido. """ summary = state.refresh() state.invalidate_dav() return {"status": "refreshed", **summary} return app # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def _parse_args(argv: Optional[list] = None) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Backend osint_web: sirve el vault osint + agenda/calendario Xandikos." ) parser.add_argument( "--vault", default=os.path.expanduser("~/Obsidian/osint"), help="ruta al vault de Obsidian osint (default: ~/Obsidian/osint)", ) parser.add_argument( "--host", default="127.0.0.1", help="host de escucha (default: 127.0.0.1 — NO cambiar: datos sensibles)", ) parser.add_argument( "--port", type=int, default=8470, help="puerto local (default: 8470)" ) return parser.parse_args(argv) def main(argv: Optional[list] = None) -> int: args = _parse_args(argv) if args.host != "127.0.0.1": # Seguridad: el vault tiene PII (DNIs, fotos). Nunca exponer a red. print( "ADVERTENCIA: --host distinto de 127.0.0.1. El vault contiene datos " "personales sensibles; exponerlo a la red es un riesgo.", file=sys.stderr, ) try: app = create_app(args.vault) except (FileNotFoundError, NotADirectoryError) as exc: print(f"error: {exc}", file=sys.stderr) return 2 import uvicorn state = app.state.vault print( f"osint_web backend en http://{args.host}:{args.port} — vault: " f"{state.vault_dir} ({len(state.graph['nodes'])} nodos, " f"{len(state.graph['edges'])} aristas)" ) uvicorn.run(app, host=args.host, port=args.port, log_level="info") return 0 if __name__ == "__main__": sys.exit(main())