Files
osint_web/server/main.py
T
agent 6af9a56c28 feat: initial scaffold of osint_web — backend Python sobre el grupo obsidian
Fase 5b del issue 0172. Backend stdlib http (solo 127.0.0.1) que orquesta
las funciones del grupo obsidian del fn_registry para servir el vault OSINT:
grafo agregado (/api/graph), tablas por tipo (/api/nodes), fichas con
attachments (/api/node, /api/attachment con bloqueo de path traversal) y
busqueda (/api/search). Cache en memoria con POST /api/refresh.

Tests pytest (10) sobre vault fixture: grafo golden, tipo filtrado, ficha
con attachments, wikilink dangling, slug con acentos, traversal bloqueado,
vault inexistente (exit 2) y e2e HTTP en puerto efimero. Frontend (React +
Vite + Mantine + sigma.js) queda para la fase siguiente.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 22:36:07 +02:00

427 lines
16 KiB
Python

#!/usr/bin/env python3
"""Backend de osint_web: sirve el vault OSINT de Obsidian como API JSON local.
Lee directamente los ``.md`` del vault (sin BD intermedia, decisión KISS del
issue 0172) y expone el grafo agregado, las tablas por tipo, las fichas con
attachments y la búsqueda global. Registry-first: todo el parseo del vault lo
hacen las funciones del grupo de capacidad ``obsidian`` del fn_registry — este
módulo solo orquesta y sirve HTTP.
Seguridad: el vault contiene datos personales sensibles (DNIs, fotos), por lo
que el servidor escucha exclusivamente en ``127.0.0.1`` (no hay flag para
exponerlo) y el endpoint de attachments bloquea cualquier path fuera del vault
(path traversal). No es un service desplegable a VPS.
Uso:
python3 server/main.py --vault /home/enmanuel/Obsidian/osint --port 8470
Endpoints (JSON salvo /api/attachment):
GET /api/health estado + tamaño del grafo cacheado
GET /api/graph grafo completo {nodes, edges} para sigma.js
GET /api/nodes?tipo=persona filas de la tabla de ese tipo
GET /api/node/<slug> ficha: frontmatter + body + attachments
GET /api/attachment?path=.. binario del attachment (path relativo al vault)
GET /api/search?q=... nodos cuyo contenido matchea la query
POST /api/refresh re-escanea el vault y reconstruye la caché
"""
import argparse
import datetime
import json
import mimetypes
import os
import sys
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import parse_qs, unquote, urlparse
def _registry_functions_dir() -> str:
"""Localiza ``python/functions`` del fn_registry sin paths hardcodeados.
Prueba primero la variable de entorno ``FN_REGISTRY_ROOT`` y después sube
por los directorios padre de este archivo hasta encontrar una raíz que
contenga ``python/functions/obsidian``. Así el backend funciona en
cualquier PC con el layout estándar del registry (la app vive en
``<root>/projects/osint/apps/osint_web/server/``).
"""
candidates = []
env_root = os.environ.get("FN_REGISTRY_ROOT")
if env_root:
candidates.append(env_root)
current = os.path.dirname(os.path.abspath(__file__))
while True:
candidates.append(current)
parent = os.path.dirname(current)
if parent == current:
break
current = parent
for root in candidates:
functions_dir = os.path.join(root, "python", "functions")
if os.path.isdir(os.path.join(functions_dir, "obsidian")):
return functions_dir
raise RuntimeError(
"no se encontró python/functions/obsidian subiendo desde "
f"{os.path.abspath(__file__)}; define FN_REGISTRY_ROOT con la raíz "
"del fn_registry"
)
sys.path.insert(0, _registry_functions_dir())
from obsidian import ( # noqa: E402 (sys.path debe resolverse antes)
build_obsidian_graph,
extract_obsidian_embeds,
list_obsidian_notes,
read_obsidian_note,
resolve_obsidian_embed,
search_obsidian_notes,
slugify_obsidian_name,
)
# Extensiones de imagen que el frontend muestra en la galería con lightbox.
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"}
def _json_default(value):
"""Serializa tipos no-JSON del frontmatter YAML (fechas, etc.).
PyYAML parsea ``fecha_nacimiento: 1980-05-01`` como ``datetime.date``;
sin esto ``json.dumps`` revienta con el vault real. Las fechas viajan en
ISO (``YYYY-MM-DD``, ordenable); el frontend las muestra en formato
europeo DD/MM/AAAA. Cualquier otro tipo raro cae a ``str``.
"""
if isinstance(value, (datetime.date, datetime.datetime)):
return value.isoformat()
return str(value)
def _attachment_kind(name: str) -> str:
"""Clasifica un attachment por extensión: ``image`` | ``pdf`` | ``other``."""
ext = os.path.splitext(name)[1].lower()
if ext in _IMAGE_EXTS:
return "image"
if ext == ".pdf":
return "pdf"
return "other"
class VaultState:
"""Caché en memoria del vault: grafo agregado + índice slug → nota.
Se construye al arrancar y se reconstruye bajo demanda con ``refresh()``
(botón "refrescar" del frontend → ``POST /api/refresh``). Thread-safe
para el ThreadingHTTPServer mediante un lock sobre la reconstrucción.
Raises:
FileNotFoundError: si ``vault_dir`` no existe (error claro al
arrancar, nunca un 500 silencioso).
NotADirectoryError: si ``vault_dir`` no es un directorio.
"""
def __init__(self, vault_dir: str):
if not os.path.exists(vault_dir):
raise FileNotFoundError(f"el vault no existe: {vault_dir}")
if not os.path.isdir(vault_dir):
raise NotADirectoryError(f"el vault no es un directorio: {vault_dir}")
self.vault_dir = os.path.abspath(vault_dir)
self._vault_real = os.path.realpath(self.vault_dir)
self._lock = threading.Lock()
self.graph: dict = {"nodes": [], "edges": []}
self.note_index: dict = {} # slug -> {"path", "tipo", "label"}
self.refresh()
def refresh(self) -> dict:
"""Re-escanea el vault: reconstruye grafo + índice de notas.
Devuelve un resumen ``{"nodes": N, "edges": M}`` para el frontend.
"""
with self._lock:
graph = build_obsidian_graph(self.vault_dir, include_dangling=True)
nodes_by_id = {n["id"]: n for n in graph["nodes"]}
note_index: dict = {}
for path in list_obsidian_notes(self.vault_dir):
slug = os.path.splitext(os.path.basename(path))[0]
if not slug or slug in note_index:
continue
node = nodes_by_id.get(slug, {})
note_index[slug] = {
"path": path,
"tipo": node.get("tipo", "nota"),
"label": node.get("label", slug),
}
self.graph = graph
self.note_index = note_index
return {"nodes": len(graph["nodes"]), "edges": len(graph["edges"])}
def rows_by_tipo(self, tipo: str) -> list:
"""Filas de la tabla de un tipo: nodos reales (no fantasma) filtrados.
Cada fila lleva ``id``, ``label``, ``tipo`` y el ``frontmatter``
completo — el frontend aplana las columnas que le interesen.
Sin ``tipo`` devuelve todos los nodos reales.
"""
rows = []
for node in self.graph["nodes"]:
if node.get("dangling"):
continue
if tipo and node["tipo"] != tipo:
continue
rows.append(
{
"id": node["id"],
"label": node["label"],
"tipo": node["tipo"],
"frontmatter": node["frontmatter"],
}
)
return rows
def node_detail(self, slug: str):
"""Ficha completa de un nodo: frontmatter + body + attachments.
Los attachments salen de los embeds ``![[...]]`` del cuerpo, resueltos
a paths reales con ``resolve_obsidian_embed`` y devueltos como paths
**relativos al vault** (lo que consume ``/api/attachment``). Un embed
que no resuelve se reporta con ``kind: "missing"`` y path vacío.
Devuelve ``None`` si el slug no corresponde a ninguna nota del vault.
"""
info = self.note_index.get(slug)
if info is None:
# Tolerancia: aceptar también nombres sin slugificar.
info = self.note_index.get(slugify_obsidian_name(slug))
if info is None:
return None
note = read_obsidian_note(info["path"])
attachments = []
for name in extract_obsidian_embeds(note["body"]):
abs_path = resolve_obsidian_embed(self.vault_dir, name)
if not abs_path:
attachments.append({"name": name, "path": "", "kind": "missing"})
continue
rel = os.path.relpath(os.path.realpath(abs_path), self._vault_real)
attachments.append(
{"name": name, "path": rel, "kind": _attachment_kind(abs_path)}
)
return {
"id": os.path.splitext(os.path.basename(info["path"]))[0],
"tipo": info["tipo"],
"label": info["label"],
"frontmatter": note["frontmatter"],
"body": note["body"],
"tags": note["tags"],
"wikilinks": note["wikilinks"],
"attachments": attachments,
}
def resolve_attachment_path(self, rel_path: str):
"""Resuelve un path relativo de attachment a absoluto, SOLO dentro del vault.
Bloquea path traversal: normaliza con ``realpath`` y exige que el
resultado quede estrictamente bajo la raíz real del vault. Devuelve
``None`` (→ 403/404) ante cualquier intento de salir del vault, paths
absolutos, o archivos inexistentes.
"""
if not rel_path:
return None
candidate = os.path.realpath(os.path.join(self._vault_real, rel_path))
if candidate == self._vault_real:
return None
if not candidate.startswith(self._vault_real + os.sep):
return None
if not os.path.isfile(candidate):
return None
return candidate
def search(self, query: str) -> list:
"""Búsqueda global: nodos cuyas notas matchean la query (substring).
Compone ``search_obsidian_notes`` y mapea cada hit a su nodo
(slug, label, tipo) + las líneas que matchean.
"""
results = []
for hit in search_obsidian_notes(self.vault_dir, query):
slug = os.path.splitext(os.path.basename(hit["path"]))[0]
info = self.note_index.get(slug, {})
results.append(
{
"id": slug,
"label": info.get("label", slug),
"tipo": info.get("tipo", "nota"),
"matches": hit.get("matches", []),
}
)
return results
class OsintWebHandler(BaseHTTPRequestHandler):
"""Router HTTP fino sobre VaultState. Solo GET (+ POST /api/refresh)."""
# Inyectado por make_server(); class attribute para que cada request
# (instancia nueva por conexión) comparta la misma caché.
state: VaultState = None
quiet = False
# --- helpers de respuesta -------------------------------------------------
def _send_json(self, status: int, payload) -> None:
body = json.dumps(payload, ensure_ascii=False, default=_json_default).encode(
"utf-8"
)
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
# El frontend (vite dev server en otro puerto local) necesita CORS.
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(body)
def _send_file(self, abs_path: str) -> None:
ctype = mimetypes.guess_type(abs_path)[0] or "application/octet-stream"
with open(abs_path, "rb") as f:
data = f.read()
self.send_response(200)
self.send_header("Content-Type", ctype)
self.send_header("Content-Length", str(len(data)))
self.send_header("Access-Control-Allow-Origin", "*")
self.end_headers()
self.wfile.write(data)
# --- rutas ----------------------------------------------------------------
def do_GET(self) -> None: # noqa: N802 (API de BaseHTTPRequestHandler)
parsed = urlparse(self.path)
route = parsed.path
params = parse_qs(parsed.query)
try:
if route == "/" or route == "/api":
self._send_json(
200,
{
"app": "osint_web",
"vault": self.state.vault_dir,
"endpoints": [
"/api/health",
"/api/graph",
"/api/nodes?tipo=<tipo>",
"/api/node/<slug>",
"/api/attachment?path=<rel>",
"/api/search?q=<query>",
"POST /api/refresh",
],
},
)
elif route == "/api/health":
self._send_json(
200,
{
"status": "ok",
"vault": self.state.vault_dir,
"nodes": len(self.state.graph["nodes"]),
"edges": len(self.state.graph["edges"]),
},
)
elif route == "/api/graph":
self._send_json(200, self.state.graph)
elif route == "/api/nodes":
tipo = params.get("tipo", [""])[0]
self._send_json(200, self.state.rows_by_tipo(tipo))
elif route.startswith("/api/node/"):
slug = unquote(route[len("/api/node/") :]).strip("/")
detail = self.state.node_detail(slug)
if detail is None:
self._send_json(404, {"error": f"nodo no encontrado: {slug}"})
else:
self._send_json(200, detail)
elif route == "/api/attachment":
rel = params.get("path", [""])[0]
abs_path = self.state.resolve_attachment_path(rel)
if abs_path is None:
self._send_json(
403, {"error": "attachment fuera del vault o inexistente"}
)
else:
self._send_file(abs_path)
elif route == "/api/search":
query = params.get("q", [""])[0]
if not query:
self._send_json(400, {"error": "falta el parámetro q"})
else:
self._send_json(200, self.state.search(query))
else:
self._send_json(404, {"error": f"ruta desconocida: {route}"})
except BrokenPipeError:
pass
except Exception as exc: # noqa: BLE001 — nunca tumbar el server
self._send_json(500, {"error": f"{type(exc).__name__}: {exc}"})
def do_POST(self) -> None: # noqa: N802
route = urlparse(self.path).path
try:
if route == "/api/refresh":
summary = self.state.refresh()
self._send_json(200, {"status": "refreshed", **summary})
else:
self._send_json(404, {"error": f"ruta desconocida: {route}"})
except Exception as exc: # noqa: BLE001
self._send_json(500, {"error": f"{type(exc).__name__}: {exc}"})
def log_message(self, fmt, *args): # noqa: A003
if not self.quiet:
sys.stderr.write(
"%s - %s\n" % (self.address_string(), fmt % args)
)
def make_server(vault_dir: str, port: int, quiet: bool = False) -> ThreadingHTTPServer:
"""Construye el HTTPServer ligado a 127.0.0.1 con la caché del vault lista.
Separado de ``main()`` para que los tests arranquen el server en un puerto
efímero (``port=0``) sin pasar por argparse.
"""
state = VaultState(vault_dir)
handler = type(
"BoundOsintWebHandler", (OsintWebHandler,), {"state": state, "quiet": quiet}
)
return ThreadingHTTPServer(("127.0.0.1", port), handler)
def main(argv=None) -> int:
parser = argparse.ArgumentParser(
description="Backend local de osint_web: sirve el vault OSINT como API JSON."
)
parser.add_argument(
"--vault",
default=os.path.expanduser("~/Obsidian/osint"),
help="ruta a la raíz del vault de Obsidian (default: ~/Obsidian/osint)",
)
parser.add_argument(
"--port", type=int, default=8470, help="puerto local (default: 8470)"
)
args = parser.parse_args(argv)
try:
server = make_server(args.vault, args.port)
except (FileNotFoundError, NotADirectoryError) as exc:
print(f"error: {exc}", file=sys.stderr)
return 2
state = server.RequestHandlerClass.state
print(
f"osint_web backend en http://127.0.0.1:{args.port} — vault: "
f"{state.vault_dir} ({len(state.graph['nodes'])} nodos, "
f"{len(state.graph['edges'])} aristas)"
)
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nparando osint_web backend")
finally:
server.server_close()
return 0
if __name__ == "__main__":
sys.exit(main())