#!/usr/bin/env python3 """Backend FastAPI de la app osint_web. Sirve, en JSON (salvo el endpoint de attachments, que sirve binarios), tres fuentes de datos para un frontend web local: 1. El vault de Obsidian ``osint`` (grafo de nodos + aristas, tablas por tipo, fichas con galería de attachments y búsqueda global). 2. La agenda CardDAV del servidor Xandikos (contactos). 3. El calendario CalDAV del servidor Xandikos (eventos). Registry-first: este servidor NO reimplementa parseo de Markdown, resolución de embeds ni protocolo DAV. Orquesta funciones del registry de los grupos ``obsidian`` (parseo del vault) y ``dav`` (CardDAV/CalDAV) + ``pass_get_secret`` para la credencial de Xandikos. La única lógica propia de la app es: cacheo en memoria, allowlist de path traversal, aplanado del frontmatter para las tablas y el parseo ligero de vCard/iCalendar a JSON. Seguridad: el vault contiene datos personales sensibles (DNIs, fotos), así que el servidor escucha SOLO en ``127.0.0.1`` y nunca expone a la red. El endpoint ``/api/attachment`` valida con ``os.path.realpath`` que el archivo solicitado cae dentro del vault; cualquier intento de path traversal devuelve 403. Uso: python3 server/main.py --vault /home/enmanuel/Obsidian/osint --port 8470 Endpoints (JSON salvo /api/attachment): GET /api/health estado + tamaño del grafo cacheado GET /api/graph grafo completo {nodes, edges} para sigma.js GET /api/nodes?tipo=persona filas de la tabla de ese tipo GET /api/node/ ficha: frontmatter + body + attachments GET /api/attachment?path=.. binario del attachment (path relativo al vault) GET /api/search?q=... nodos cuyo contenido matchea la query GET /api/contacts contactos del addressbook Xandikos (CardDAV) GET /api/contact/ un vCard concreto a JSON GET /api/calendar?from=&to= eventos del calendario Xandikos (CalDAV) POST /api/refresh re-escanea el vault y reconstruye la caché """ from __future__ import annotations import argparse import importlib.util import os import re import sys import threading from typing import Optional def _registry_functions_dir() -> str: """Localiza ``python/functions`` del fn_registry sin paths hardcodeados. Prueba primero las variables de entorno ``FN_REGISTRY_FUNCTIONS`` y ``FN_REGISTRY_ROOT``, después sube por los directorios padre de este archivo hasta encontrar una raíz que contenga ``python/functions/obsidian``, y por último cae al layout estándar del PC (``/home/enmanuel/fn_registry``). Así el backend funciona en cualquier PC con el layout estándar del registry (la app vive en ``/projects/osint/apps/osint_web/server/``) sin hardcodear el home de un usuario concreto (memoria ``hardcoded-lucas-paths``). """ env_functions = os.environ.get("FN_REGISTRY_FUNCTIONS") if env_functions and os.path.isdir(os.path.join(env_functions, "obsidian")): return env_functions candidates: list[str] = [] env_root = os.environ.get("FN_REGISTRY_ROOT") if env_root: candidates.append(env_root) current = os.path.dirname(os.path.abspath(__file__)) while True: candidates.append(current) parent = os.path.dirname(current) if parent == current: break current = parent candidates.append("/home/enmanuel/fn_registry") for root in candidates: functions_dir = os.path.join(root, "python", "functions") if os.path.isdir(os.path.join(functions_dir, "obsidian")): return functions_dir raise RuntimeError( "no se encontró python/functions/obsidian subiendo desde " f"{os.path.abspath(__file__)}; define FN_REGISTRY_ROOT con la raíz " "del fn_registry" ) _FUNCTIONS_DIR = _registry_functions_dir() sys.path.insert(0, _FUNCTIONS_DIR) from fastapi import FastAPI, HTTPException, Query # noqa: E402 from fastapi.responses import FileResponse, JSONResponse # noqa: E402 # --- Grupo de capacidad obsidian (parseo del vault) --- # El paquete obsidian tiene un __init__ ligero (sin dependencias pesadas), así # que se importa directamente. from obsidian import ( # noqa: E402 (sys.path debe resolverse antes) build_obsidian_graph, extract_obsidian_embeds, list_obsidian_notes, read_obsidian_note, resolve_obsidian_embed, search_obsidian_notes, slugify_obsidian_name, ) def _load_infra_fn(module_name: str, attr: str): """Carga una función del paquete ``infra`` por archivo, sin tocar su __init__. El ``infra/__init__.py`` del registry importa de forma eager ``generate_app_icon`` (que necesita Pillow/PIL) y otros módulos pesados que esta app no usa. Importar ``from infra.dav_list_resources import ...`` arrastraría ese __init__ y exigiría PIL como dependencia. Para evitarlo, se carga cada módulo concreto del grupo ``dav`` directamente por su ruta de archivo con ``importlib``, sin ejecutar el __init__ del paquete. Sigue siendo registry-first: se usa la función del registry sin reimplementarla, solo se importa de forma quirúrgica. """ file_path = os.path.join(_FUNCTIONS_DIR, "infra", module_name + ".py") spec = importlib.util.spec_from_file_location("infra_%s" % module_name, file_path) if spec is None or spec.loader is None: # pragma: no cover - defensivo raise ImportError("no se pudo cargar %s" % file_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) return getattr(module, attr) # --- Grupo de capacidad dav (CardDAV / CalDAV contra Xandikos) + pass --- dav_list_resources = _load_infra_fn("dav_list_resources", "dav_list_resources") dav_get_resource = _load_infra_fn("dav_get_resource", "dav_get_resource") split_vcards = _load_infra_fn("split_vcards", "split_vcards") pass_get_secret = _load_infra_fn("pass_get_secret", "pass_get_secret") # --------------------------------------------------------------------------- # Configuración Xandikos (CardDAV / CalDAV) # --------------------------------------------------------------------------- XANDIKOS_BASE_URL = "https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com" XANDIKOS_USERNAME = "enmanuel" XANDIKOS_PASS_ENTRY = "dav/xandikos-enmanuel" XANDIKOS_CONTACTS_COLLECTION = "/enmanuel/contacts/addressbook/" XANDIKOS_CALENDAR_COLLECTION = "/enmanuel/calendars/calendar/" # Extensiones de imagen que el frontend muestra en la galería con lightbox. _IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"} def _attachment_kind(name: str) -> str: """Clasifica un attachment por extensión: ``image`` | ``pdf`` | ``other``.""" ext = os.path.splitext(name)[1].lower() if ext in _IMAGE_EXTS: return "image" if ext == ".pdf": return "pdf" return "other" def _read_pass_secret(entry: str) -> str: """Lee la contraseña (primera línea) de una entrada de ``pass``. Wrapper fino sobre la función del registry ``pass_get_secret_py_infra`` (grupo ``infra``/``flow-replay``), que ejecuta ``pass show `` sin shell y nunca logea el valor. Convierte su resultado ``{status, value|error}`` en un ``str`` o un ``RuntimeError`` con mensaje claro, para que los endpoints DAV degraden con un error explicable en vez de un 500 silencioso. """ res = pass_get_secret(entry) if res.get("status") != "ok": raise RuntimeError( "no se pudo leer la entrada '%s' de pass: %s" % (entry, res.get("error", "error desconocido")) ) value = res.get("value", "") if not value.strip(): raise RuntimeError("la entrada '%s' de pass está vacía" % entry) return value.strip() # --------------------------------------------------------------------------- # Estado del servidor: caché del vault + password Xandikos # --------------------------------------------------------------------------- class VaultState: """Caché en memoria del vault: grafo agregado + índice slug → nota. Se construye al arrancar y se reconstruye bajo demanda con ``refresh()`` (botón "refrescar" del frontend → ``POST /api/refresh``). Thread-safe mediante un lock sobre la reconstrucción. La password de Xandikos se lee de ``pass`` perezosamente y se cachea en memoria. Raises: FileNotFoundError: si ``vault_dir`` no existe (error claro al arrancar, nunca un 500 silencioso). NotADirectoryError: si ``vault_dir`` no es un directorio. """ def __init__(self, vault_dir: str): if not os.path.exists(vault_dir): raise FileNotFoundError(f"el vault no existe: {vault_dir}") if not os.path.isdir(vault_dir): raise NotADirectoryError(f"el vault no es un directorio: {vault_dir}") self.vault_dir = os.path.abspath(vault_dir) self._vault_real = os.path.realpath(self.vault_dir) self._lock = threading.Lock() self.graph: dict = {"nodes": [], "edges": []} self.note_index: dict = {} # slug -> {"path", "tipo", "label"} self._xandikos_password: Optional[str] = None # Cachés DAV en memoria (igual que el grafo): se llenan perezosamente al # primer acceso y se invalidan en POST /api/refresh. None = sin cargar. self._dav_lock = threading.Lock() self._contacts_cache: Optional[list] = None self._calendar_cache: Optional[list] = None self.refresh() # --- vault -------------------------------------------------------------- def refresh(self) -> dict: """Re-escanea el vault: reconstruye grafo + índice de notas. Devuelve un resumen ``{"nodes": N, "edges": M}`` para el frontend. """ with self._lock: graph = build_obsidian_graph(self.vault_dir, include_dangling=True) nodes_by_id = {n["id"]: n for n in graph["nodes"]} note_index: dict = {} for path in list_obsidian_notes(self.vault_dir): slug = os.path.splitext(os.path.basename(path))[0] if not slug or slug in note_index: continue node = nodes_by_id.get(slug, {}) note_index[slug] = { "path": path, "tipo": node.get("tipo", "nota"), "label": node.get("label", slug), } self.graph = graph self.note_index = note_index return {"nodes": len(graph["nodes"]), "edges": len(graph["edges"])} def graph_payload(self) -> dict: """Grafo + conteos por tipo para la leyenda de sigma.js.""" counts: dict[str, int] = {} for node in self.graph["nodes"]: counts[node["tipo"]] = counts.get(node["tipo"], 0) + 1 return { "nodes": self.graph["nodes"], "edges": self.graph["edges"], "counts": counts, "total_nodes": len(self.graph["nodes"]), "total_edges": len(self.graph["edges"]), } def rows_by_tipo(self, tipo: str) -> list: """Filas de la tabla de un tipo: nodos reales (no fantasma) filtrados. Cada fila lleva ``id``, ``label``, ``tipo`` y el ``frontmatter`` completo — el frontend aplana las columnas que le interesen. Sin ``tipo`` devuelve todos los nodos reales. """ rows = [] for node in self.graph["nodes"]: if node.get("dangling"): continue if tipo and node["tipo"] != tipo: continue rows.append( { "id": node["id"], "label": node["label"], "tipo": node["tipo"], "frontmatter": node["frontmatter"], } ) return rows def _resolve_embed(self, embed_name: str) -> str: """Resuelve un embed ``![[...]]`` a un path absoluto dentro del vault. El vault osint usa dos formas de embed: por nombre de archivo (``![[foto.jpg]]``) y por path relativo al vault (``![[attachments/personas//foto.png]]``). La función del registry ``resolve_obsidian_embed`` resuelve solo por basename, así que primero se intenta el embed como path literal relativo al vault (cubre la forma con ruta), y si no existe se cae al resolutor por basename del registry. Devuelve cadena vacía si ninguna forma resuelve. """ # Forma 1: el embed ya es un path relativo al vault. literal = os.path.realpath(os.path.join(self._vault_real, embed_name)) if self._is_within_vault(literal) and os.path.isfile(literal): return literal # Forma 2: resolución por basename via función del registry. return resolve_obsidian_embed(self.vault_dir, embed_name) def node_detail(self, slug: str): """Ficha completa de un nodo: frontmatter + body + attachments. Los attachments salen de los embeds ``![[...]]`` del cuerpo, resueltos a paths reales con ``_resolve_embed`` (que compone ``resolve_obsidian_embed``) y devueltos como paths **relativos al vault** (lo que consume ``/api/attachment``). Un embed que no resuelve se reporta con ``kind: "missing"`` y path vacío. Devuelve ``None`` si el slug no corresponde a ninguna nota del vault. """ info = self.note_index.get(slug) if info is None: # Tolerancia: aceptar también nombres sin slugificar. info = self.note_index.get(slugify_obsidian_name(slug)) if info is None: return None note = read_obsidian_note(info["path"]) attachments = [] for name in extract_obsidian_embeds(note["body"]): abs_path = self._resolve_embed(name) if not abs_path: attachments.append({"name": name, "path": "", "kind": "missing"}) continue real = os.path.realpath(abs_path) # Defensa en profundidad: solo attachments dentro del vault. if not self._is_within_vault(real): continue rel = os.path.relpath(real, self._vault_real) attachments.append( {"name": name, "path": rel, "kind": _attachment_kind(abs_path)} ) return { "id": os.path.splitext(os.path.basename(info["path"]))[0], "tipo": info["tipo"], "label": info["label"], "frontmatter": note["frontmatter"], "body": note["body"], "tags": note["tags"], "wikilinks": note["wikilinks"], "attachments": attachments, } def _is_within_vault(self, candidate_real_path: str) -> bool: """True si ``candidate_real_path`` (ya realpath) está dentro del vault. Añade el separador final al vault para que ``/vault-evil`` no cuele como prefijo de ``/vault``. """ return ( candidate_real_path == self._vault_real or candidate_real_path.startswith(self._vault_real + os.sep) ) def resolve_attachment_path(self, rel_path: str): """Resuelve un path relativo de attachment a absoluto, SOLO dentro del vault. Bloquea path traversal: normaliza con ``realpath`` (colapsa ``..`` y sigue symlinks) y exige que el resultado quede estrictamente bajo la raíz real del vault. Devuelve ``None`` (→ 403/404) ante cualquier intento de salir del vault, paths absolutos, o archivos inexistentes. """ if not rel_path: return None candidate = os.path.realpath(os.path.join(self._vault_real, rel_path)) if candidate == self._vault_real: return None if not candidate.startswith(self._vault_real + os.sep): return None if not os.path.isfile(candidate): return None return candidate def search(self, query: str) -> list: """Búsqueda global: nodos cuyas notas matchean la query (substring). Compone ``search_obsidian_notes`` y mapea cada hit a su nodo (slug, label, tipo) + las líneas que matchean. """ results = [] for hit in search_obsidian_notes(self.vault_dir, query): slug = os.path.splitext(os.path.basename(hit["path"]))[0] info = self.note_index.get(slug, {}) results.append( { "id": slug, "label": info.get("label", slug), "tipo": info.get("tipo", "nota"), "matches": hit.get("matches", []), } ) return results # --- Xandikos ----------------------------------------------------------- def xandikos_password(self) -> str: """Password de Xandikos desde ``pass``, cacheada en memoria.""" with self._lock: if self._xandikos_password is None: self._xandikos_password = _read_pass_secret(XANDIKOS_PASS_ENTRY) return self._xandikos_password # --------------------------------------------------------------------------- # Helpers DAV: parseo ligero de vCard / iCalendar a JSON # --------------------------------------------------------------------------- def _unescape_ical(value: str) -> str: """Des-escapa los caracteres de un valor iCalendar/vCard (RFC 5545/6350).""" return ( value.replace("\\n", "\n") .replace("\\N", "\n") .replace("\\,", ",") .replace("\\;", ";") .replace("\\\\", "\\") ) def _unfold_lines(text: str) -> list: """Des-pliega las líneas continuadas (folding) de un vCard/iCalendar. RFC 5545/6350: una línea que empieza por espacio o tab es continuación de la anterior. Esta función las une para parsearlas como propiedades completas. """ raw_lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n") unfolded: list = [] for line in raw_lines: if line[:1] in (" ", "\t") and unfolded: unfolded[-1] += line[1:] else: unfolded.append(line) return unfolded def _parse_property(line: str) -> Optional[tuple]: """Parsea una línea de propiedad vCard/iCal a ``(nombre, params, valor)``. Formato: ``[itemN.]NAME;PARAM=val;PARAM2=val:value``. Devuelve ``None`` si la línea no es una propiedad (sin ``:``). El nombre se devuelve en mayúsculas y SIN el prefijo de grupo ``itemN.`` / ``GRUPO.`` que añaden Apple/Google a las propiedades agrupadas (``item1.TEL``, ``item2.EMAIL``); los params como dict con claves en mayúsculas. """ if ":" not in line: return None head, value = line.split(":", 1) parts = head.split(";") name = parts[0].strip() # Quitar el prefijo de grupo "itemN." / "GRUPO." (vCard property grouping). if "." in name: name = name.rsplit(".", 1)[-1] name = name.upper() params: dict = {} for part in parts[1:]: if "=" in part: k, v = part.split("=", 1) params[k.strip().upper()] = v.strip() return name, params, value def _vcard_to_json(vcard_text: str) -> dict: """Convierte un VCARD a un dict JSON con los campos de interés. Extrae: uid, nombre completo (FN o N reordenado), alias (NICKNAME), teléfonos (TEL), emails (EMAIL), organización (ORG), nota (NOTE) y el bloque ``osint`` con todas las propiedades ``X-OSINT-*`` (la clave es el sufijo en minúsculas: ``X-OSINT-DNI`` → ``osint.dni``, ``X-OSINT-PAIS`` → ``osint.pais``). Parseo ligero a mano (sin dependencia de vobject); el vCard ya viene troceado por ``split_vcards``. Expone tanto las claves en español que consume el frontend del task (``nombre``, ``alias``, ``nota``, ``telefonos``) como las formas tipadas con tipo (``phones``, ``emails`` como objetos ``{value, type}``), para no atar el frontend a un único shape. """ out: dict = { "uid": None, "fn": None, "nickname": None, "org": None, "note": None, "phones": [], "emails": [], "osint": {}, } for line in _unfold_lines(vcard_text): parsed = _parse_property(line) if not parsed: continue name, params, value = parsed value = _unescape_ical(value.strip()) if name == "UID": out["uid"] = value elif name == "FN": out["fn"] = value elif name == "NICKNAME": out["nickname"] = value elif name == "ORG": out["org"] = value.replace(";", " ").strip() elif name == "NOTE": out["note"] = value elif name == "TEL": out["phones"].append({"value": value, "type": params.get("TYPE", "")}) elif name == "EMAIL": out["emails"].append({"value": value, "type": params.get("TYPE", "")}) elif name.startswith("X-OSINT-"): key = name[len("X-OSINT-") :].lower().replace("-", "_") if key: out["osint"][key] = value elif name == "N" and not out["fn"]: # Nombre estructurado Apellido;Nombre;... -> "Nombre Apellido". comps = [c for c in value.split(";") if c] if len(comps) >= 2: out["fn"] = ("%s %s" % (comps[1], comps[0])).strip() elif comps: out["fn"] = comps[0] # Alias en español que consume el frontend del task (mismo dato, otra clave). out["nombre"] = out["fn"] out["alias"] = out["nickname"] out["nota"] = out["note"] out["telefonos"] = [p["value"] for p in out["phones"]] out["correos"] = [e["value"] for e in out["emails"]] return out _VEVENT_RE = re.compile(r"BEGIN:VEVENT(.*?)END:VEVENT", re.DOTALL | re.IGNORECASE) def _vevent_to_json(vevent_block: str) -> dict: """Convierte un bloque VEVENT a un dict JSON con los campos de interés. Extrae: uid, summary, dtstart, dtend, location, description. Las fechas se devuelven tal cual vienen del servidor (formato iCal, ej. ``20260611T090000Z`` o ``20260611``); el frontend las formatea a europeo. Parseo ligero a mano. """ out: dict = { "uid": None, "summary": None, "dtstart": None, "dtend": None, "location": None, "description": None, } for line in _unfold_lines(vevent_block): parsed = _parse_property(line) if not parsed: continue name, _params, value = parsed value = value.strip() if name == "UID": out["uid"] = value elif name == "SUMMARY": out["summary"] = _unescape_ical(value) elif name == "DTSTART": out["dtstart"] = value elif name == "DTEND": out["dtend"] = value elif name == "LOCATION": out["location"] = _unescape_ical(value) elif name == "DESCRIPTION": out["description"] = _unescape_ical(value) return out def _vcalendar_to_events(vcalendar_text: str) -> list: """Extrae todos los VEVENT de un VCALENDAR y los convierte a JSON.""" events = [] for block in _VEVENT_RE.findall(vcalendar_text): events.append(_vevent_to_json("BEGIN:VEVENT" + block + "END:VEVENT")) return events def _event_in_range(event: dict, dt_from: str, dt_to: str) -> bool: """True si el evento cae (por DTSTART) dentro de ``[dt_from, dt_to]``. La comparación es lexicográfica sobre el prefijo de fecha ``YYYYMMDD`` que comparten todos los formatos iCal (date y date-time). ``dt_from``/``dt_to`` vacíos desactivan ese extremo del filtro. """ dtstart = (event.get("dtstart") or "")[:8] if not dtstart: return True if dt_from and dtstart < dt_from[:8]: return False if dt_to and dtstart > dt_to[:8]: return False return True def _uid_to_href(uid: str, resources: list) -> Optional[str]: """Localiza el href de un recurso DAV cuyo último segmento contiene el uid.""" for res in resources: href = res.get("href", "") tail = href.rstrip("/").rsplit("/", 1)[-1] if uid in tail or tail.startswith(uid): return href return None # --------------------------------------------------------------------------- # Construcción de la app FastAPI # --------------------------------------------------------------------------- def create_app(vault_dir: str) -> FastAPI: """Crea la app FastAPI ligada a un vault concreto. Valida que el vault existe al construir el ``VaultState`` (no un 500 silencioso en el primer request). Registra todos los endpoints sobre un estado compartido en memoria. """ state = VaultState(vault_dir) app = FastAPI(title="osint_web", version="0.1.0") app.state.vault = state # -- Vault -- @app.get("/api/health") def health() -> dict: """Health check: confirma que el servidor está vivo y el vault cargado.""" return { "status": "ok", "vault": state.vault_dir, "nodes": len(state.graph["nodes"]), "edges": len(state.graph["edges"]), } @app.get("/api/graph") def api_graph() -> dict: """Grafo completo del vault (nodos + aristas + conteos) para sigma.js. Cacheado en memoria; usar ``/api/refresh`` para recargar tras editar notas. """ return state.graph_payload() @app.get("/api/nodes") def api_nodes(tipo: str = Query("", description="tipo de nodo a filtrar")) -> dict: """Filas de la tabla de un tipo concreto (frontmatter aplanado). Devuelve solo los nodos reales (no fantasma). Sin ``tipo`` devuelve todos. """ rows = state.rows_by_tipo(tipo) return {"tipo": tipo, "count": len(rows), "rows": rows} @app.get("/api/node/{slug}") def api_node(slug: str) -> dict: """Ficha de un nodo: frontmatter + body + lista de attachments.""" detail = state.node_detail(slug) if detail is None: raise HTTPException(status_code=404, detail="nodo '%s' no encontrado" % slug) return detail @app.get("/api/attachment") def api_attachment( path: str = Query(..., description="path relativo al vault"), ) -> FileResponse: """Sirve el binario de un attachment, con allowlist ESTRICTA al vault. El ``path`` es relativo al vault. Se resuelve a su realpath y se verifica que cae dentro del vault: cualquier intento de salir (``../../etc/passwd``, symlink fuera del vault) devuelve 403. Si el archivo no existe (pero está dentro del vault), 404. """ abs_path = state.resolve_attachment_path(path) if abs_path is None: # Distinguimos traversal (403) de inexistente-dentro-del-vault (404). candidate = os.path.realpath(os.path.join(state._vault_real, path or "")) if state._is_within_vault(candidate) and candidate != state._vault_real: raise HTTPException(status_code=404, detail="attachment no encontrado") raise HTTPException(status_code=403, detail="path fuera del vault") return FileResponse(abs_path) @app.get("/api/search") def api_search(q: str = Query(..., min_length=1)) -> dict: """Nodos del grafo cuyas notas matchean la query (substring).""" results = state.search(q) return {"query": q, "count": len(results), "results": results} # -- Xandikos: contactos (CardDAV) -- @app.get("/api/contacts") def api_contacts() -> JSONResponse: """Contactos del addressbook Xandikos, parseados a JSON. Lista los recursos de la colección CardDAV, descarga cada ``.vcf`` y lo parsea con ``split_vcards`` + parseo ligero. Si Xandikos no responde (sin red) devuelve un error claro (502), no un crash. """ try: password = state.xandikos_password() except RuntimeError as exc: return JSONResponse(status_code=503, content={"status": "error", "error": str(exc)}) listing = dav_list_resources( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, XANDIKOS_CONTACTS_COLLECTION, ) if listing.get("status") != "ok": return JSONResponse( status_code=502, content={"status": "error", "error": "Xandikos no responde: %s" % listing.get("error")}, ) contacts: list = [] for res in listing.get("resources", []): href = res.get("href") if not href or not href.lower().endswith(".vcf"): continue got = dav_get_resource(XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, href) if got.get("status") != "ok": continue for card_text in split_vcards(got.get("text", "")): card = _vcard_to_json(card_text) card["etag"] = res.get("etag") contacts.append(card) contacts.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower()) return JSONResponse(content={"status": "ok", "count": len(contacts), "contacts": contacts}) @app.get("/api/contact/{uid}") def api_contact(uid: str) -> JSONResponse: """Un vCard concreto (por UID) a JSON.""" try: password = state.xandikos_password() except RuntimeError as exc: return JSONResponse(status_code=503, content={"status": "error", "error": str(exc)}) listing = dav_list_resources( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, XANDIKOS_CONTACTS_COLLECTION ) if listing.get("status") != "ok": return JSONResponse( status_code=502, content={"status": "error", "error": "Xandikos no responde: %s" % listing.get("error")}, ) href = _uid_to_href(uid, listing.get("resources", [])) if not href: raise HTTPException(status_code=404, detail="contacto '%s' no encontrado" % uid) got = dav_get_resource(XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, href) if got.get("status") != "ok": return JSONResponse( status_code=502, content={"status": "error", "error": "no se pudo descargar el vCard"}, ) cards = split_vcards(got.get("text", "")) if not cards: raise HTTPException(status_code=404, detail="vCard vacío") return JSONResponse(content={"status": "ok", "contact": _vcard_to_json(cards[0])}) # -- Xandikos: calendario (CalDAV) -- @app.get("/api/calendar") def api_calendar( from_: str = Query("", alias="from", description="fecha inicio YYYYMMDD"), to: str = Query("", description="fecha fin YYYYMMDD"), ) -> JSONResponse: """Eventos del calendario Xandikos en ``[from, to]``, parseados a JSON. Lista los recursos de la colección CalDAV, descarga cada ``.ics``, extrae sus VEVENT y los filtra por DTSTART dentro del rango. Sin red -> error claro (502), no crash. """ try: password = state.xandikos_password() except RuntimeError as exc: return JSONResponse(status_code=503, content={"status": "error", "error": str(exc)}) listing = dav_list_resources( XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, XANDIKOS_CALENDAR_COLLECTION ) if listing.get("status") != "ok": return JSONResponse( status_code=502, content={"status": "error", "error": "Xandikos no responde: %s" % listing.get("error")}, ) events: list = [] for res in listing.get("resources", []): href = res.get("href") if not href or not href.lower().endswith(".ics"): continue got = dav_get_resource(XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, href) if got.get("status") != "ok": continue for event in _vcalendar_to_events(got.get("text", "")): if _event_in_range(event, from_, to): events.append(event) events.sort(key=lambda e: e.get("dtstart") or "") return JSONResponse(content={"status": "ok", "count": len(events), "events": events}) # -- Refresco de cachés -- @app.post("/api/refresh") def api_refresh() -> dict: """Invalida y reconstruye la caché del grafo del vault. Los datos DAV no se cachean, así que esto solo afecta al grafo/tablas del vault. Devuelve el conteo del grafo recién reconstruido. """ summary = state.refresh() return {"status": "refreshed", **summary} return app # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def _parse_args(argv: Optional[list] = None) -> argparse.Namespace: parser = argparse.ArgumentParser( description="Backend osint_web: sirve el vault osint + agenda/calendario Xandikos." ) parser.add_argument( "--vault", default=os.path.expanduser("~/Obsidian/osint"), help="ruta al vault de Obsidian osint (default: ~/Obsidian/osint)", ) parser.add_argument( "--host", default="127.0.0.1", help="host de escucha (default: 127.0.0.1 — NO cambiar: datos sensibles)", ) parser.add_argument( "--port", type=int, default=8470, help="puerto local (default: 8470)" ) return parser.parse_args(argv) def main(argv: Optional[list] = None) -> int: args = _parse_args(argv) if args.host != "127.0.0.1": # Seguridad: el vault tiene PII (DNIs, fotos). Nunca exponer a red. print( "ADVERTENCIA: --host distinto de 127.0.0.1. El vault contiene datos " "personales sensibles; exponerlo a la red es un riesgo.", file=sys.stderr, ) try: app = create_app(args.vault) except (FileNotFoundError, NotADirectoryError) as exc: print(f"error: {exc}", file=sys.stderr) return 2 import uvicorn state = app.state.vault print( f"osint_web backend en http://{args.host}:{args.port} — vault: " f"{state.vault_dir} ({len(state.graph['nodes'])} nodos, " f"{len(state.graph['edges'])} aristas)" ) uvicorn.run(app, host=args.host, port=args.port, log_level="info") return 0 if __name__ == "__main__": sys.exit(main())