Files
osint_web/server/main.py
T
agent 59558d43cb perf: descarga DAV concurrente + caché de contactos/calendario
Las colecciones Xandikos son grandes (1064 contactos, 98 eventos). Descargar
los .vcf/.ics secuencialmente tardaba ~2 min para los contactos (timeout). Se
añade _fetch_resources con un ThreadPoolExecutor acotado (16 workers): primera
carga de /api/contacts baja a ~9s, segunda (cacheada) a ~10ms. La descarga
sigue delegada a dav_get_resource del registry (stdlib, thread-safe); solo se
paraleliza la orquestación.

Incluye caché en memoria de contactos y calendario (invalidada por
/api/refresh), DavUnavailable para degradación clara sin red, y campos
aliaseados en español (nombre/alias/telefonos/correos/osint) para el frontend.

Verificado contra el vault real (1199 nodos) y Xandikos real (1064 contactos,
98 eventos). 19 tests verdes.
2026-06-11 22:54:54 +02:00

953 lines
38 KiB
Python

#!/usr/bin/env python3
"""Backend FastAPI de la app osint_web.
Sirve, en JSON (salvo el endpoint de attachments, que sirve binarios), tres
fuentes de datos para un frontend web local:
1. El vault de Obsidian ``osint`` (grafo de nodos + aristas, tablas por tipo,
fichas con galería de attachments y búsqueda global).
2. La agenda CardDAV del servidor Xandikos (contactos).
3. El calendario CalDAV del servidor Xandikos (eventos).
Registry-first: este servidor NO reimplementa parseo de Markdown, resolución de
embeds ni protocolo DAV. Orquesta funciones del registry de los grupos
``obsidian`` (parseo del vault) y ``dav`` (CardDAV/CalDAV) + ``pass_get_secret``
para la credencial de Xandikos. La única lógica propia de la app es: cacheo en
memoria, allowlist de path traversal, aplanado del frontmatter para las tablas y
el parseo ligero de vCard/iCalendar a JSON.
Seguridad: el vault contiene datos personales sensibles (DNIs, fotos), así que
el servidor escucha SOLO en ``127.0.0.1`` y nunca expone a la red. El endpoint
``/api/attachment`` valida con ``os.path.realpath`` que el archivo solicitado
cae dentro del vault; cualquier intento de path traversal devuelve 403.
Uso:
python3 server/main.py --vault /home/enmanuel/Obsidian/osint --port 8470
Endpoints (JSON salvo /api/attachment):
GET /api/health estado + tamaño del grafo cacheado
GET /api/graph grafo completo {nodes, edges} para sigma.js
GET /api/nodes?tipo=persona filas de la tabla de ese tipo
GET /api/node/<slug> ficha: frontmatter + body + attachments
GET /api/attachment?path=.. binario del attachment (path relativo al vault)
GET /api/search?q=... nodos cuyo contenido matchea la query
GET /api/contacts contactos del addressbook Xandikos (CardDAV)
GET /api/contact/<uid> un vCard concreto a JSON
GET /api/calendar?from=&to= eventos del calendario Xandikos (CalDAV)
POST /api/refresh re-escanea el vault y reconstruye la caché
"""
from __future__ import annotations
import argparse
import importlib.util
import os
import re
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Optional
# Nº de descargas DAV concurrentes al traer una colección completa (addressbook
# con ~1000 vCards). Secuencial son ~0.11s/recurso (~2 min para 1064); con un
# pool acotado baja a ~10s. Acotado para no saturar al servidor Xandikos.
_DAV_FETCH_WORKERS = 16
def _registry_functions_dir() -> str:
"""Localiza ``python/functions`` del fn_registry sin paths hardcodeados.
Prueba primero las variables de entorno ``FN_REGISTRY_FUNCTIONS`` y
``FN_REGISTRY_ROOT``, después sube por los directorios padre de este archivo
hasta encontrar una raíz que contenga ``python/functions/obsidian``, y por
último cae al layout estándar del PC (``/home/enmanuel/fn_registry``). Así el
backend funciona en cualquier PC con el layout estándar del registry (la app
vive en ``<root>/projects/osint/apps/osint_web/server/``) sin hardcodear el
home de un usuario concreto (memoria ``hardcoded-lucas-paths``).
"""
env_functions = os.environ.get("FN_REGISTRY_FUNCTIONS")
if env_functions and os.path.isdir(os.path.join(env_functions, "obsidian")):
return env_functions
candidates: list[str] = []
env_root = os.environ.get("FN_REGISTRY_ROOT")
if env_root:
candidates.append(env_root)
current = os.path.dirname(os.path.abspath(__file__))
while True:
candidates.append(current)
parent = os.path.dirname(current)
if parent == current:
break
current = parent
candidates.append("/home/enmanuel/fn_registry")
for root in candidates:
functions_dir = os.path.join(root, "python", "functions")
if os.path.isdir(os.path.join(functions_dir, "obsidian")):
return functions_dir
raise RuntimeError(
"no se encontró python/functions/obsidian subiendo desde "
f"{os.path.abspath(__file__)}; define FN_REGISTRY_ROOT con la raíz "
"del fn_registry"
)
_FUNCTIONS_DIR = _registry_functions_dir()
sys.path.insert(0, _FUNCTIONS_DIR)
from fastapi import FastAPI, HTTPException, Query # noqa: E402
from fastapi.responses import FileResponse, JSONResponse # noqa: E402
# --- Grupo de capacidad obsidian (parseo del vault) ---
# El paquete obsidian tiene un __init__ ligero (sin dependencias pesadas), así
# que se importa directamente.
from obsidian import ( # noqa: E402 (sys.path debe resolverse antes)
build_obsidian_graph,
extract_obsidian_embeds,
list_obsidian_notes,
read_obsidian_note,
resolve_obsidian_embed,
search_obsidian_notes,
slugify_obsidian_name,
)
def _load_infra_fn(module_name: str, attr: str):
"""Carga una función del paquete ``infra`` por archivo, sin tocar su __init__.
El ``infra/__init__.py`` del registry importa de forma eager
``generate_app_icon`` (que necesita Pillow/PIL) y otros módulos pesados que
esta app no usa. Importar ``from infra.dav_list_resources import ...``
arrastraría ese __init__ y exigiría PIL como dependencia. Para evitarlo, se
carga cada módulo concreto del grupo ``dav`` directamente por su ruta de
archivo con ``importlib``, sin ejecutar el __init__ del paquete. Sigue siendo
registry-first: se usa la función del registry sin reimplementarla, solo se
importa de forma quirúrgica.
"""
file_path = os.path.join(_FUNCTIONS_DIR, "infra", module_name + ".py")
spec = importlib.util.spec_from_file_location("infra_%s" % module_name, file_path)
if spec is None or spec.loader is None: # pragma: no cover - defensivo
raise ImportError("no se pudo cargar %s" % file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return getattr(module, attr)
# --- Grupo de capacidad dav (CardDAV / CalDAV contra Xandikos) + pass ---
dav_list_resources = _load_infra_fn("dav_list_resources", "dav_list_resources")
dav_get_resource = _load_infra_fn("dav_get_resource", "dav_get_resource")
split_vcards = _load_infra_fn("split_vcards", "split_vcards")
pass_get_secret = _load_infra_fn("pass_get_secret", "pass_get_secret")
# ---------------------------------------------------------------------------
# Configuración Xandikos (CardDAV / CalDAV)
# ---------------------------------------------------------------------------
XANDIKOS_BASE_URL = "https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com"
XANDIKOS_USERNAME = "enmanuel"
XANDIKOS_PASS_ENTRY = "dav/xandikos-enmanuel"
XANDIKOS_CONTACTS_COLLECTION = "/enmanuel/contacts/addressbook/"
XANDIKOS_CALENDAR_COLLECTION = "/enmanuel/calendars/calendar/"
# Extensiones de imagen que el frontend muestra en la galería con lightbox.
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"}
class DavUnavailable(Exception):
"""Xandikos no responde (sin red, timeout, auth caída).
Los endpoints DAV la capturan y devuelven un 503 JSON claro, para que un
fallo de la agenda/calendario NUNCA tumbe el server ni afecte a los
endpoints del vault, que deben seguir funcionando offline.
"""
def _attachment_kind(name: str) -> str:
"""Clasifica un attachment por extensión: ``image`` | ``pdf`` | ``other``."""
ext = os.path.splitext(name)[1].lower()
if ext in _IMAGE_EXTS:
return "image"
if ext == ".pdf":
return "pdf"
return "other"
def _read_pass_secret(entry: str) -> str:
"""Lee la contraseña (primera línea) de una entrada de ``pass``.
Wrapper fino sobre la función del registry ``pass_get_secret_py_infra``
(grupo ``infra``/``flow-replay``), que ejecuta ``pass show <entry>`` sin
shell y nunca logea el valor. Convierte su resultado ``{status, value|error}``
en un ``str`` o un ``RuntimeError`` con mensaje claro, para que los endpoints
DAV degraden con un error explicable en vez de un 500 silencioso.
"""
res = pass_get_secret(entry)
if res.get("status") != "ok":
raise RuntimeError(
"no se pudo leer la entrada '%s' de pass: %s"
% (entry, res.get("error", "error desconocido"))
)
value = res.get("value", "")
if not value.strip():
raise RuntimeError("la entrada '%s' de pass está vacía" % entry)
return value.strip()
# ---------------------------------------------------------------------------
# Estado del servidor: caché del vault + password Xandikos
# ---------------------------------------------------------------------------
class VaultState:
"""Caché en memoria del vault: grafo agregado + índice slug → nota.
Se construye al arrancar y se reconstruye bajo demanda con ``refresh()``
(botón "refrescar" del frontend → ``POST /api/refresh``). Thread-safe
mediante un lock sobre la reconstrucción. La password de Xandikos se lee de
``pass`` perezosamente y se cachea en memoria.
Raises:
FileNotFoundError: si ``vault_dir`` no existe (error claro al arrancar,
nunca un 500 silencioso).
NotADirectoryError: si ``vault_dir`` no es un directorio.
"""
def __init__(self, vault_dir: str):
if not os.path.exists(vault_dir):
raise FileNotFoundError(f"el vault no existe: {vault_dir}")
if not os.path.isdir(vault_dir):
raise NotADirectoryError(f"el vault no es un directorio: {vault_dir}")
self.vault_dir = os.path.abspath(vault_dir)
self._vault_real = os.path.realpath(self.vault_dir)
self._lock = threading.Lock()
self.graph: dict = {"nodes": [], "edges": []}
self.note_index: dict = {} # slug -> {"path", "tipo", "label"}
self._xandikos_password: Optional[str] = None
# Cachés DAV en memoria (igual que el grafo): se llenan perezosamente al
# primer acceso y se invalidan en POST /api/refresh. None = sin cargar.
self._dav_lock = threading.Lock()
self._contacts_cache: Optional[list] = None
self._calendar_cache: Optional[list] = None
self.refresh()
# --- vault --------------------------------------------------------------
def refresh(self) -> dict:
"""Re-escanea el vault: reconstruye grafo + índice de notas.
Devuelve un resumen ``{"nodes": N, "edges": M}`` para el frontend.
"""
with self._lock:
graph = build_obsidian_graph(self.vault_dir, include_dangling=True)
nodes_by_id = {n["id"]: n for n in graph["nodes"]}
note_index: dict = {}
for path in list_obsidian_notes(self.vault_dir):
slug = os.path.splitext(os.path.basename(path))[0]
if not slug or slug in note_index:
continue
node = nodes_by_id.get(slug, {})
note_index[slug] = {
"path": path,
"tipo": node.get("tipo", "nota"),
"label": node.get("label", slug),
}
self.graph = graph
self.note_index = note_index
return {"nodes": len(graph["nodes"]), "edges": len(graph["edges"])}
def graph_payload(self) -> dict:
"""Grafo + conteos por tipo para la leyenda de sigma.js."""
counts: dict[str, int] = {}
for node in self.graph["nodes"]:
counts[node["tipo"]] = counts.get(node["tipo"], 0) + 1
return {
"nodes": self.graph["nodes"],
"edges": self.graph["edges"],
"counts": counts,
"total_nodes": len(self.graph["nodes"]),
"total_edges": len(self.graph["edges"]),
}
def rows_by_tipo(self, tipo: str) -> list:
"""Filas de la tabla de un tipo: nodos reales (no fantasma) filtrados.
Cada fila lleva ``id``, ``label``, ``tipo`` y el ``frontmatter``
completo — el frontend aplana las columnas que le interesen. Sin
``tipo`` devuelve todos los nodos reales.
"""
rows = []
for node in self.graph["nodes"]:
if node.get("dangling"):
continue
if tipo and node["tipo"] != tipo:
continue
rows.append(
{
"id": node["id"],
"label": node["label"],
"tipo": node["tipo"],
"frontmatter": node["frontmatter"],
}
)
return rows
def _resolve_embed(self, embed_name: str) -> str:
"""Resuelve un embed ``![[...]]`` a un path absoluto dentro del vault.
El vault osint usa dos formas de embed: por nombre de archivo
(``![[foto.jpg]]``) y por path relativo al vault
(``![[attachments/personas/<slug>/foto.png]]``). La función del registry
``resolve_obsidian_embed`` resuelve solo por basename, así que primero se
intenta el embed como path literal relativo al vault (cubre la forma con
ruta), y si no existe se cae al resolutor por basename del registry.
Devuelve cadena vacía si ninguna forma resuelve.
"""
# Forma 1: el embed ya es un path relativo al vault.
literal = os.path.realpath(os.path.join(self._vault_real, embed_name))
if self._is_within_vault(literal) and os.path.isfile(literal):
return literal
# Forma 2: resolución por basename via función del registry.
return resolve_obsidian_embed(self.vault_dir, embed_name)
def node_detail(self, slug: str):
"""Ficha completa de un nodo: frontmatter + body + attachments.
Los attachments salen de los embeds ``![[...]]`` del cuerpo, resueltos a
paths reales con ``_resolve_embed`` (que compone ``resolve_obsidian_embed``)
y devueltos como paths **relativos al vault** (lo que consume
``/api/attachment``). Un embed que no resuelve se reporta con
``kind: "missing"`` y path vacío. Devuelve ``None`` si el slug no
corresponde a ninguna nota del vault.
"""
info = self.note_index.get(slug)
if info is None:
# Tolerancia: aceptar también nombres sin slugificar.
info = self.note_index.get(slugify_obsidian_name(slug))
if info is None:
return None
note = read_obsidian_note(info["path"])
attachments = []
for name in extract_obsidian_embeds(note["body"]):
abs_path = self._resolve_embed(name)
if not abs_path:
attachments.append({"name": name, "path": "", "kind": "missing"})
continue
real = os.path.realpath(abs_path)
# Defensa en profundidad: solo attachments dentro del vault.
if not self._is_within_vault(real):
continue
rel = os.path.relpath(real, self._vault_real)
attachments.append(
{"name": name, "path": rel, "kind": _attachment_kind(abs_path)}
)
return {
"id": os.path.splitext(os.path.basename(info["path"]))[0],
"tipo": info["tipo"],
"label": info["label"],
"frontmatter": note["frontmatter"],
"body": note["body"],
"tags": note["tags"],
"wikilinks": note["wikilinks"],
"attachments": attachments,
}
def _is_within_vault(self, candidate_real_path: str) -> bool:
"""True si ``candidate_real_path`` (ya realpath) está dentro del vault.
Añade el separador final al vault para que ``/vault-evil`` no cuele como
prefijo de ``/vault``.
"""
return (
candidate_real_path == self._vault_real
or candidate_real_path.startswith(self._vault_real + os.sep)
)
def resolve_attachment_path(self, rel_path: str):
"""Resuelve un path relativo de attachment a absoluto, SOLO dentro del vault.
Bloquea path traversal: normaliza con ``realpath`` (colapsa ``..`` y
sigue symlinks) y exige que el resultado quede estrictamente bajo la
raíz real del vault. Devuelve ``None`` (→ 403/404) ante cualquier intento
de salir del vault, paths absolutos, o archivos inexistentes.
"""
if not rel_path:
return None
candidate = os.path.realpath(os.path.join(self._vault_real, rel_path))
if candidate == self._vault_real:
return None
if not candidate.startswith(self._vault_real + os.sep):
return None
if not os.path.isfile(candidate):
return None
return candidate
def search(self, query: str) -> list:
"""Búsqueda global: nodos cuyas notas matchean la query (substring).
Compone ``search_obsidian_notes`` y mapea cada hit a su nodo (slug,
label, tipo) + las líneas que matchean.
"""
results = []
for hit in search_obsidian_notes(self.vault_dir, query):
slug = os.path.splitext(os.path.basename(hit["path"]))[0]
info = self.note_index.get(slug, {})
results.append(
{
"id": slug,
"label": info.get("label", slug),
"tipo": info.get("tipo", "nota"),
"matches": hit.get("matches", []),
}
)
return results
# --- Xandikos -----------------------------------------------------------
def xandikos_password(self) -> str:
"""Password de Xandikos desde ``pass``, cacheada en memoria."""
with self._lock:
if self._xandikos_password is None:
self._xandikos_password = _read_pass_secret(XANDIKOS_PASS_ENTRY)
return self._xandikos_password
def contacts(self) -> list:
"""Contactos del addressbook Xandikos, parseados y cacheados en memoria.
Llena la caché al primer acceso (descarga + parseo de todos los
``.vcf``); accesos posteriores la reutilizan hasta ``invalidate_dav()``.
Raises:
RuntimeError: si no se puede leer la password de ``pass``.
DavUnavailable: si Xandikos no responde (sin red, timeout, auth).
"""
with self._dav_lock:
if self._contacts_cache is not None:
return self._contacts_cache
password = self.xandikos_password()
listing = dav_list_resources(
XANDIKOS_BASE_URL,
XANDIKOS_USERNAME,
password,
XANDIKOS_CONTACTS_COLLECTION,
)
if listing.get("status") != "ok":
raise DavUnavailable(
"Xandikos no responde: %s" % listing.get("error")
)
contacts: list = []
# Descarga concurrente de los .vcf: secuencial son ~0.11s/recurso
# (~2 min para 1064 contactos); con el pool acotado baja a ~10s.
for res, got in self._fetch_resources(
listing.get("resources", []), ".vcf", password
):
href = res.get("href")
for card_text in split_vcards(got.get("text", "")):
card = _vcard_to_json(card_text)
card["etag"] = res.get("etag")
card["href"] = href
contacts.append(card)
contacts.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower())
self._contacts_cache = contacts
return contacts
def calendar(self, dt_from: str = "", dt_to: str = "") -> list:
"""Eventos del calendario Xandikos, cacheados; filtrados por rango.
La descarga + parseo completos se cachean; el filtro por ``[from, to]``
se aplica sobre la caché en cada llamada (barato). Sin ``from``/``to``
devuelve todos.
Raises:
RuntimeError: si no se puede leer la password de ``pass``.
DavUnavailable: si Xandikos no responde (sin red, timeout, auth).
"""
with self._dav_lock:
if self._calendar_cache is None:
password = self.xandikos_password()
listing = dav_list_resources(
XANDIKOS_BASE_URL,
XANDIKOS_USERNAME,
password,
XANDIKOS_CALENDAR_COLLECTION,
)
if listing.get("status") != "ok":
raise DavUnavailable(
"Xandikos no responde: %s" % listing.get("error")
)
events: list = []
for res, got in self._fetch_resources(
listing.get("resources", []), ".ics", password
):
href = res.get("href")
for event in _vcalendar_to_events(got.get("text", "")):
event["etag"] = res.get("etag")
event["href"] = href
events.append(event)
events.sort(key=lambda e: e.get("dtstart") or "")
self._calendar_cache = events
all_events = self._calendar_cache
if not dt_from and not dt_to:
return list(all_events)
return [e for e in all_events if _event_in_range(e, dt_from, dt_to)]
def _fetch_resources(self, resources: list, suffix: str, password: str) -> list:
"""Descarga en paralelo los recursos DAV con la extensión ``suffix``.
Filtra los recursos por extensión (``.vcf`` / ``.ics``), los descarga con
``dav_get_resource`` (función del registry) usando un pool acotado de
hilos (``_DAV_FETCH_WORKERS``) y devuelve la lista de pares
``(res, got)`` de los que respondieron ``status == "ok"``, preservando el
orden del listing. La paralelización es solo de la orquestación: la
descarga sigue delegada a la función del registry, que es stdlib y
thread-safe (abre su propia conexión por request). Acotar el pool evita
saturar al servidor Xandikos.
"""
targets = [
res
for res in resources
if res.get("href") and res["href"].lower().endswith(suffix)
]
if not targets:
return []
def _get(res):
got = dav_get_resource(
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, res["href"]
)
return res, got
workers = min(_DAV_FETCH_WORKERS, len(targets))
with ThreadPoolExecutor(max_workers=workers) as pool:
# pool.map preserva el orden de entrada.
results = list(pool.map(_get, targets))
return [(res, got) for res, got in results if got.get("status") == "ok"]
def invalidate_dav(self) -> None:
"""Vacía las cachés de contactos y calendario (no la password)."""
with self._dav_lock:
self._contacts_cache = None
self._calendar_cache = None
# ---------------------------------------------------------------------------
# Helpers DAV: parseo ligero de vCard / iCalendar a JSON
# ---------------------------------------------------------------------------
def _unescape_ical(value: str) -> str:
"""Des-escapa los caracteres de un valor iCalendar/vCard (RFC 5545/6350)."""
return (
value.replace("\\n", "\n")
.replace("\\N", "\n")
.replace("\\,", ",")
.replace("\\;", ";")
.replace("\\\\", "\\")
)
def _unfold_lines(text: str) -> list:
"""Des-pliega las líneas continuadas (folding) de un vCard/iCalendar.
RFC 5545/6350: una línea que empieza por espacio o tab es continuación de la
anterior. Esta función las une para parsearlas como propiedades completas.
"""
raw_lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
unfolded: list = []
for line in raw_lines:
if line[:1] in (" ", "\t") and unfolded:
unfolded[-1] += line[1:]
else:
unfolded.append(line)
return unfolded
def _parse_property(line: str) -> Optional[tuple]:
"""Parsea una línea de propiedad vCard/iCal a ``(nombre, params, valor)``.
Formato: ``[itemN.]NAME;PARAM=val;PARAM2=val:value``. Devuelve ``None`` si
la línea no es una propiedad (sin ``:``). El nombre se devuelve en
mayúsculas y SIN el prefijo de grupo ``itemN.`` / ``GRUPO.`` que añaden
Apple/Google a las propiedades agrupadas (``item1.TEL``, ``item2.EMAIL``);
los params como dict con claves en mayúsculas.
"""
if ":" not in line:
return None
head, value = line.split(":", 1)
parts = head.split(";")
name = parts[0].strip()
# Quitar el prefijo de grupo "itemN." / "GRUPO." (vCard property grouping).
if "." in name:
name = name.rsplit(".", 1)[-1]
name = name.upper()
params: dict = {}
for part in parts[1:]:
if "=" in part:
k, v = part.split("=", 1)
params[k.strip().upper()] = v.strip()
return name, params, value
def _vcard_to_json(vcard_text: str) -> dict:
"""Convierte un VCARD a un dict JSON con los campos de interés.
Extrae: uid, nombre completo (FN o N reordenado), alias (NICKNAME),
teléfonos (TEL), emails (EMAIL), organización (ORG), nota (NOTE) y el bloque
``osint`` con todas las propiedades ``X-OSINT-*`` (la clave es el sufijo en
minúsculas: ``X-OSINT-DNI`` → ``osint.dni``, ``X-OSINT-PAIS`` →
``osint.pais``). Parseo ligero a mano (sin dependencia de vobject); el vCard
ya viene troceado por ``split_vcards``.
Expone tanto las claves en español que consume el frontend del task
(``nombre``, ``alias``, ``nota``, ``telefonos``) como las formas tipadas con
tipo (``phones``, ``emails`` como objetos ``{value, type}``), para no atar el
frontend a un único shape.
"""
out: dict = {
"uid": None,
"fn": None,
"nickname": None,
"org": None,
"note": None,
"phones": [],
"emails": [],
"osint": {},
}
for line in _unfold_lines(vcard_text):
parsed = _parse_property(line)
if not parsed:
continue
name, params, value = parsed
value = _unescape_ical(value.strip())
if name == "UID":
out["uid"] = value
elif name == "FN":
out["fn"] = value
elif name == "NICKNAME":
out["nickname"] = value
elif name == "ORG":
out["org"] = value.replace(";", " ").strip()
elif name == "NOTE":
out["note"] = value
elif name == "TEL":
out["phones"].append({"value": value, "type": params.get("TYPE", "")})
elif name == "EMAIL":
out["emails"].append({"value": value, "type": params.get("TYPE", "")})
elif name.startswith("X-OSINT-"):
key = name[len("X-OSINT-") :].lower().replace("-", "_")
if key:
out["osint"][key] = value
elif name == "N" and not out["fn"]:
# Nombre estructurado Apellido;Nombre;... -> "Nombre Apellido".
comps = [c for c in value.split(";") if c]
if len(comps) >= 2:
out["fn"] = ("%s %s" % (comps[1], comps[0])).strip()
elif comps:
out["fn"] = comps[0]
# Alias en español que consume el frontend del task (mismo dato, otra clave).
out["nombre"] = out["fn"]
out["alias"] = out["nickname"]
out["nota"] = out["note"]
out["telefonos"] = [p["value"] for p in out["phones"]]
out["correos"] = [e["value"] for e in out["emails"]]
return out
_VEVENT_RE = re.compile(r"BEGIN:VEVENT(.*?)END:VEVENT", re.DOTALL | re.IGNORECASE)
def _vevent_to_json(vevent_block: str) -> dict:
"""Convierte un bloque VEVENT a un dict JSON con los campos de interés.
Extrae: uid, summary, dtstart, dtend, location, description. Las fechas se
devuelven tal cual vienen del servidor (formato iCal, ej. ``20260611T090000Z``
o ``20260611``); el frontend las formatea a europeo. Parseo ligero a mano.
"""
out: dict = {
"uid": None,
"summary": None,
"dtstart": None,
"dtend": None,
"location": None,
"description": None,
}
for line in _unfold_lines(vevent_block):
parsed = _parse_property(line)
if not parsed:
continue
name, _params, value = parsed
value = value.strip()
if name == "UID":
out["uid"] = value
elif name == "SUMMARY":
out["summary"] = _unescape_ical(value)
elif name == "DTSTART":
out["dtstart"] = value
elif name == "DTEND":
out["dtend"] = value
elif name == "LOCATION":
out["location"] = _unescape_ical(value)
elif name == "DESCRIPTION":
out["description"] = _unescape_ical(value)
return out
def _vcalendar_to_events(vcalendar_text: str) -> list:
"""Extrae todos los VEVENT de un VCALENDAR y los convierte a JSON."""
events = []
for block in _VEVENT_RE.findall(vcalendar_text):
events.append(_vevent_to_json("BEGIN:VEVENT" + block + "END:VEVENT"))
return events
def _event_in_range(event: dict, dt_from: str, dt_to: str) -> bool:
"""True si el evento cae (por DTSTART) dentro de ``[dt_from, dt_to]``.
Comparación lexicográfica sobre el prefijo ``YYYYMMDD`` que comparten todos
los formatos iCal (date y date-time). Los límites se normalizan quitando los
guiones, así acepta tanto el formato documentado del endpoint
(``2026-06-11``) como el iCal crudo (``20260611``). ``dt_from``/``dt_to``
vacíos desactivan ese extremo del filtro.
"""
dtstart = (event.get("dtstart") or "").replace("-", "")[:8]
if not dtstart:
return True
if dt_from and dtstart < dt_from.replace("-", "")[:8]:
return False
if dt_to and dtstart > dt_to.replace("-", "")[:8]:
return False
return True
# ---------------------------------------------------------------------------
# Construcción de la app FastAPI
# ---------------------------------------------------------------------------
def create_app(vault_dir: str) -> FastAPI:
"""Crea la app FastAPI ligada a un vault concreto.
Valida que el vault existe al construir el ``VaultState`` (no un 500
silencioso en el primer request). Registra todos los endpoints sobre un
estado compartido en memoria.
"""
state = VaultState(vault_dir)
app = FastAPI(title="osint_web", version="0.1.0")
app.state.vault = state
# -- Vault --
@app.get("/api/health")
def health() -> dict:
"""Health check: confirma que el servidor está vivo y el vault cargado."""
return {
"status": "ok",
"vault": state.vault_dir,
"nodes": len(state.graph["nodes"]),
"edges": len(state.graph["edges"]),
}
@app.get("/api/graph")
def api_graph() -> dict:
"""Grafo completo del vault (nodos + aristas + conteos) para sigma.js.
Cacheado en memoria; usar ``/api/refresh`` para recargar tras editar
notas.
"""
return state.graph_payload()
@app.get("/api/nodes")
def api_nodes(tipo: str = Query("", description="tipo de nodo a filtrar")) -> dict:
"""Filas de la tabla de un tipo concreto (frontmatter aplanado).
Devuelve solo los nodos reales (no fantasma). Sin ``tipo`` devuelve
todos.
"""
rows = state.rows_by_tipo(tipo)
return {"tipo": tipo, "count": len(rows), "rows": rows}
@app.get("/api/node/{slug}")
def api_node(slug: str) -> dict:
"""Ficha de un nodo: frontmatter + body + lista de attachments."""
detail = state.node_detail(slug)
if detail is None:
raise HTTPException(status_code=404, detail="nodo '%s' no encontrado" % slug)
return detail
@app.get("/api/attachment")
def api_attachment(
path: str = Query(..., description="path relativo al vault"),
) -> FileResponse:
"""Sirve el binario de un attachment, con allowlist ESTRICTA al vault.
El ``path`` es relativo al vault. Se resuelve a su realpath y se verifica
que cae dentro del vault: cualquier intento de salir
(``../../etc/passwd``, symlink fuera del vault) devuelve 403. Si el
archivo no existe (pero está dentro del vault), 404.
"""
abs_path = state.resolve_attachment_path(path)
if abs_path is None:
# Distinguimos traversal (403) de inexistente-dentro-del-vault (404).
candidate = os.path.realpath(os.path.join(state._vault_real, path or ""))
if state._is_within_vault(candidate) and candidate != state._vault_real:
raise HTTPException(status_code=404, detail="attachment no encontrado")
raise HTTPException(status_code=403, detail="path fuera del vault")
return FileResponse(abs_path)
@app.get("/api/search")
def api_search(q: str = Query(..., min_length=1)) -> dict:
"""Nodos del grafo cuyas notas matchean la query (substring)."""
results = state.search(q)
return {"query": q, "count": len(results), "results": results}
# -- Xandikos: contactos (CardDAV) --
@app.get("/api/contacts")
def api_contacts() -> JSONResponse:
"""Contactos del addressbook Xandikos, parseados a JSON (cacheados).
Cada contacto: ``{uid, nombre, alias, nota, org, telefonos[], emails[],
osint{dni,pais,sexo,...}}`` (+ las formas tipadas ``phones``/``emails``).
La lista se cachea en memoria al primer acceso (``POST /api/refresh`` la
invalida). Si Xandikos no responde o falta la password → 503 con un JSON
de error claro, nunca un crash.
"""
try:
contacts = state.contacts()
except (RuntimeError, DavUnavailable) as exc:
return JSONResponse(
status_code=503, content={"status": "error", "error": str(exc)}
)
return JSONResponse(
content={"status": "ok", "count": len(contacts), "contacts": contacts}
)
@app.get("/api/contact/{uid}")
def api_contact(uid: str) -> JSONResponse:
"""Un contacto concreto (por UID) parseado a JSON, desde la caché.
Resuelve sobre la lista cacheada de ``/api/contacts`` (mismo parseo
completo, todos los campos). 404 si el UID no existe; 503 si Xandikos no
responde o falta la password.
"""
try:
contacts = state.contacts()
except (RuntimeError, DavUnavailable) as exc:
return JSONResponse(
status_code=503, content={"status": "error", "error": str(exc)}
)
match = next((c for c in contacts if c.get("uid") == uid), None)
if match is None:
# Tolerancia: aceptar también el segmento final del href (nombre del
# recurso .vcf) cuando el UID no coincide literalmente.
match = next(
(
c
for c in contacts
if uid in (c.get("href") or "").rsplit("/", 1)[-1]
),
None,
)
if match is None:
raise HTTPException(
status_code=404, detail="contacto '%s' no encontrado" % uid
)
return JSONResponse(content={"status": "ok", "contact": match})
# -- Xandikos: calendario (CalDAV) --
@app.get("/api/calendar")
def api_calendar(
from_: str = Query("", alias="from", description="fecha inicio YYYY-MM-DD"),
to: str = Query("", description="fecha fin YYYY-MM-DD"),
) -> JSONResponse:
"""Eventos del calendario Xandikos en ``[from, to]`` (cacheados).
Cada evento: ``{uid, summary, dtstart, dtend, location, description}``.
La descarga + parseo completos se cachean (``POST /api/refresh`` los
invalida); el filtro por rango se aplica sobre la caché. Sin ``from``/
``to`` devuelve todos. Si Xandikos no responde o falta la password →
503 con JSON de error claro, nunca un crash.
"""
try:
events = state.calendar(from_, to)
except (RuntimeError, DavUnavailable) as exc:
return JSONResponse(
status_code=503, content={"status": "error", "error": str(exc)}
)
return JSONResponse(
content={"status": "ok", "count": len(events), "events": events}
)
# -- Refresco de cachés --
@app.post("/api/refresh")
def api_refresh() -> dict:
"""Reconstruye la caché del grafo del vault e invalida las cachés DAV.
Re-escanea el vault (grafo + tablas) y vacía las cachés de contactos y
calendario, que se recargarán perezosamente en el siguiente acceso.
Devuelve el conteo del grafo recién reconstruido.
"""
summary = state.refresh()
state.invalidate_dav()
return {"status": "refreshed", **summary}
return app
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def _parse_args(argv: Optional[list] = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Backend osint_web: sirve el vault osint + agenda/calendario Xandikos."
)
parser.add_argument(
"--vault",
default=os.path.expanduser("~/Obsidian/osint"),
help="ruta al vault de Obsidian osint (default: ~/Obsidian/osint)",
)
parser.add_argument(
"--host",
default="127.0.0.1",
help="host de escucha (default: 127.0.0.1 — NO cambiar: datos sensibles)",
)
parser.add_argument(
"--port", type=int, default=8470, help="puerto local (default: 8470)"
)
return parser.parse_args(argv)
def main(argv: Optional[list] = None) -> int:
args = _parse_args(argv)
if args.host != "127.0.0.1":
# Seguridad: el vault tiene PII (DNIs, fotos). Nunca exponer a red.
print(
"ADVERTENCIA: --host distinto de 127.0.0.1. El vault contiene datos "
"personales sensibles; exponerlo a la red es un riesgo.",
file=sys.stderr,
)
try:
app = create_app(args.vault)
except (FileNotFoundError, NotADirectoryError) as exc:
print(f"error: {exc}", file=sys.stderr)
return 2
import uvicorn
state = app.state.vault
print(
f"osint_web backend en http://{args.host}:{args.port} — vault: "
f"{state.vault_dir} ({len(state.graph['nodes'])} nodos, "
f"{len(state.graph['edges'])} aristas)"
)
uvicorn.run(app, host=args.host, port=args.port, log_level="info")
return 0
if __name__ == "__main__":
sys.exit(main())