43889bfc07
Añade alta, edición y borrado de contactos (personas y organizaciones) a la
app osint_web. La fuente de verdad es la ficha .md del vault Obsidian
(CONVENTIONS.md §3b/§6); Xandikos es el retransmisor al móvil.
Backend (server/main.py):
- POST /api/contact: genera slug, escribe la ficha .md con el frontmatter
canónico + PUT del vCard a Xandikos. 409 si el slug ya existe.
- PUT /api/contact/{slug}: merge del frontmatter (preserva campos heredados)
+ re-PUT del vCard. 404 si no existe.
- DELETE /api/contact/{slug}: borra la ficha .md + DELETE del vCard. 404 si
no existe.
Cada escritura invalida la caché DAV para que el cambio se vea ya en la app.
Registry-first: orquesta create/update/delete_obsidian_note del grupo obsidian
y carddav_put_vcard/dav_delete_resource del grupo dav (sin reimplementar
parseo ni HTTP). Mapea los campos OSINT a propiedades X-OSINT-* del vCard.
Frontend (ContactsView.tsx + api.ts + format.ts):
- Botón "Nuevo contacto" → modal con formulario Mantine (TextInput,
TagsInput aliases, Select contexto, Textarea notas).
- Detalle: botones "Editar" (formulario precargado) y "Borrar" (con
confirmación). Tras guardar refresca la lista.
- Helper slugify (replica slugify_obsidian_name) para resolver la ficha.
Tests: 6 nuevos casos (ciclo crear→editar→borrar con .md real + reflejo vCard
mockeado, organización, 404s, tipo inválido, preserva campos heredados). Suite
27 passed. Ciclo e2e real verificado contra Xandikos + vault (vCard creado,
editado y borrado; slug zz-test-crud limpiado). pnpm build verde (React 19 +
Mantine v9).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1466 lines
59 KiB
Python
1466 lines
59 KiB
Python
#!/usr/bin/env python3
|
|
"""Backend FastAPI de la app osint_web.
|
|
|
|
Sirve, en JSON (salvo el endpoint de attachments, que sirve binarios), tres
|
|
fuentes de datos para un frontend web local:
|
|
|
|
1. El vault de Obsidian ``osint`` (grafo de nodos + aristas, tablas por tipo,
|
|
fichas con galería de attachments y búsqueda global).
|
|
2. La agenda CardDAV del servidor Xandikos (contactos).
|
|
3. El calendario CalDAV del servidor Xandikos (eventos).
|
|
|
|
Registry-first: este servidor NO reimplementa parseo de Markdown, resolución de
|
|
embeds ni protocolo DAV. Orquesta funciones del registry de los grupos
|
|
``obsidian`` (parseo del vault) y ``dav`` (CardDAV/CalDAV) + ``pass_get_secret``
|
|
para la credencial de Xandikos. La única lógica propia de la app es: cacheo en
|
|
memoria, allowlist de path traversal, aplanado del frontmatter para las tablas y
|
|
el parseo ligero de vCard/iCalendar a JSON.
|
|
|
|
Seguridad: el vault contiene datos personales sensibles (DNIs, fotos), así que
|
|
el servidor escucha SOLO en ``127.0.0.1`` y nunca expone a la red. El endpoint
|
|
``/api/attachment`` valida con ``os.path.realpath`` que el archivo solicitado
|
|
cae dentro del vault; cualquier intento de path traversal devuelve 403.
|
|
|
|
Uso:
|
|
python3 server/main.py --vault /home/enmanuel/Obsidian/osint --port 8470
|
|
|
|
Endpoints (JSON salvo /api/attachment):
|
|
GET /api/health estado + tamaño del grafo cacheado
|
|
GET /api/graph grafo completo {nodes, edges} para sigma.js
|
|
GET /api/nodes?tipo=persona filas de la tabla de ese tipo
|
|
GET /api/node/<slug> ficha: frontmatter + body + attachments
|
|
GET /api/attachment?path=.. binario del attachment (path relativo al vault)
|
|
GET /api/search?q=... nodos cuyo contenido matchea la query
|
|
GET /api/contacts contactos del addressbook Xandikos (CardDAV)
|
|
GET /api/contact/<uid> un vCard concreto a JSON
|
|
GET /api/calendar?from=&to= eventos del calendario Xandikos (CalDAV)
|
|
POST /api/refresh re-escanea el vault y reconstruye la caché
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import importlib.util
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import threading
|
|
import time
|
|
from typing import Optional
|
|
|
|
|
|
def _registry_functions_dir() -> str:
|
|
"""Localiza ``python/functions`` del fn_registry sin paths hardcodeados.
|
|
|
|
Prueba primero las variables de entorno ``FN_REGISTRY_FUNCTIONS`` y
|
|
``FN_REGISTRY_ROOT``, después sube por los directorios padre de este archivo
|
|
hasta encontrar una raíz que contenga ``python/functions/obsidian``, y por
|
|
último cae al layout estándar del PC (``/home/enmanuel/fn_registry``). Así el
|
|
backend funciona en cualquier PC con el layout estándar del registry (la app
|
|
vive en ``<root>/projects/osint/apps/osint_web/server/``) sin hardcodear el
|
|
home de un usuario concreto (memoria ``hardcoded-lucas-paths``).
|
|
"""
|
|
env_functions = os.environ.get("FN_REGISTRY_FUNCTIONS")
|
|
if env_functions and os.path.isdir(os.path.join(env_functions, "obsidian")):
|
|
return env_functions
|
|
|
|
candidates: list[str] = []
|
|
env_root = os.environ.get("FN_REGISTRY_ROOT")
|
|
if env_root:
|
|
candidates.append(env_root)
|
|
current = os.path.dirname(os.path.abspath(__file__))
|
|
while True:
|
|
candidates.append(current)
|
|
parent = os.path.dirname(current)
|
|
if parent == current:
|
|
break
|
|
current = parent
|
|
candidates.append("/home/enmanuel/fn_registry")
|
|
for root in candidates:
|
|
functions_dir = os.path.join(root, "python", "functions")
|
|
if os.path.isdir(os.path.join(functions_dir, "obsidian")):
|
|
return functions_dir
|
|
raise RuntimeError(
|
|
"no se encontró python/functions/obsidian subiendo desde "
|
|
f"{os.path.abspath(__file__)}; define FN_REGISTRY_ROOT con la raíz "
|
|
"del fn_registry"
|
|
)
|
|
|
|
|
|
_FUNCTIONS_DIR = _registry_functions_dir()
|
|
sys.path.insert(0, _FUNCTIONS_DIR)
|
|
|
|
from fastapi import Body, FastAPI, HTTPException, Query # noqa: E402
|
|
from fastapi.responses import FileResponse, JSONResponse # noqa: E402
|
|
from pydantic import BaseModel, Field # noqa: E402
|
|
|
|
# --- Grupo de capacidad obsidian (parseo del vault) ---
|
|
# El paquete obsidian tiene un __init__ ligero (sin dependencias pesadas), así
|
|
# que se importa directamente.
|
|
from obsidian import ( # noqa: E402 (sys.path debe resolverse antes)
|
|
build_obsidian_graph,
|
|
create_obsidian_note,
|
|
delete_obsidian_note,
|
|
extract_obsidian_embeds,
|
|
list_obsidian_notes,
|
|
read_obsidian_note,
|
|
resolve_obsidian_embed,
|
|
search_obsidian_notes,
|
|
slugify_obsidian_name,
|
|
update_obsidian_note,
|
|
)
|
|
|
|
|
|
def _load_infra_fn(module_name: str, attr: str):
|
|
"""Carga una función del paquete ``infra`` por archivo, sin tocar su __init__.
|
|
|
|
El ``infra/__init__.py`` del registry importa de forma eager
|
|
``generate_app_icon`` (que necesita Pillow/PIL) y otros módulos pesados que
|
|
esta app no usa. Importar ``from infra.dav_list_resources import ...``
|
|
arrastraría ese __init__ y exigiría PIL como dependencia. Para evitarlo, se
|
|
carga cada módulo concreto del grupo ``dav`` directamente por su ruta de
|
|
archivo con ``importlib``, sin ejecutar el __init__ del paquete. Sigue siendo
|
|
registry-first: se usa la función del registry sin reimplementarla, solo se
|
|
importa de forma quirúrgica.
|
|
"""
|
|
file_path = os.path.join(_FUNCTIONS_DIR, "infra", module_name + ".py")
|
|
spec = importlib.util.spec_from_file_location("infra_%s" % module_name, file_path)
|
|
if spec is None or spec.loader is None: # pragma: no cover - defensivo
|
|
raise ImportError("no se pudo cargar %s" % file_path)
|
|
module = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(module)
|
|
return getattr(module, attr)
|
|
|
|
|
|
# --- Grupo de capacidad dav (CardDAV / CalDAV contra Xandikos) + pass ---
|
|
# dav_get_collection trae TODOS los recursos (vCards / VCALENDARs) de una
|
|
# colección en UNA petición REPORT con el contenido inline; dav_collection_ctag
|
|
# lee el ctag de la colección (PROPFIND Depth:0 barato) para validar la caché en
|
|
# disco sin descargar nada cuando nada cambió.
|
|
dav_get_collection = _load_infra_fn("dav_get_collection", "dav_get_collection")
|
|
dav_collection_ctag = _load_infra_fn("dav_collection_ctag", "dav_collection_ctag")
|
|
split_vcards = _load_infra_fn("split_vcards", "split_vcards")
|
|
pass_get_secret = _load_infra_fn("pass_get_secret", "pass_get_secret")
|
|
# Escritura CardDAV: PUT (crear/editar) y DELETE (borrar) un vCard. El cambio en
|
|
# el vault .md es la fuente de verdad; estas reflejan el cambio en Xandikos de
|
|
# inmediato para que la app y el móvil lo vean ya, sin esperar al sync periódico.
|
|
carddav_put_vcard = _load_infra_fn("carddav_put_vcard", "carddav_put_vcard")
|
|
dav_delete_resource = _load_infra_fn("dav_delete_resource", "dav_delete_resource")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Configuración Xandikos (CardDAV / CalDAV)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
XANDIKOS_BASE_URL = "https://dav-eedeb681c4ab89ab8e444ac9.organic-machine.com"
|
|
XANDIKOS_USERNAME = "enmanuel"
|
|
XANDIKOS_PASS_ENTRY = "dav/xandikos-enmanuel"
|
|
XANDIKOS_CONTACTS_COLLECTION = "/enmanuel/contacts/addressbook/"
|
|
XANDIKOS_CALENDAR_COLLECTION = "/enmanuel/calendars/calendar/"
|
|
|
|
# Caché en disco de los datos DAV ya parseados, indexada por el ctag de la
|
|
# colección. Al arrancar el server lee el ctag (PROPFIND barato ~0.1s) y, si
|
|
# coincide con el de la caché en disco, sirve los contactos/eventos ya parseados
|
|
# sin descargar ni reparsear nada (arranque instantáneo). El directorio vive
|
|
# junto al server y está gitignored (datos personales sensibles + regenerable).
|
|
_CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".cache")
|
|
_CONTACTS_CACHE_FILE = os.path.join(_CACHE_DIR, "contacts.json")
|
|
_CALENDAR_CACHE_FILE = os.path.join(_CACHE_DIR, "calendar.json")
|
|
|
|
# Extensiones de imagen que el frontend muestra en la galería con lightbox.
|
|
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"}
|
|
|
|
|
|
class DavUnavailable(Exception):
|
|
"""Xandikos no responde (sin red, timeout, auth caída).
|
|
|
|
Los endpoints DAV la capturan y devuelven un 503 JSON claro, para que un
|
|
fallo de la agenda/calendario NUNCA tumbe el server ni afecte a los
|
|
endpoints del vault, que deben seguir funcionando offline.
|
|
"""
|
|
|
|
|
|
def _attachment_kind(name: str) -> str:
|
|
"""Clasifica un attachment por extensión: ``image`` | ``pdf`` | ``other``."""
|
|
ext = os.path.splitext(name)[1].lower()
|
|
if ext in _IMAGE_EXTS:
|
|
return "image"
|
|
if ext == ".pdf":
|
|
return "pdf"
|
|
return "other"
|
|
|
|
|
|
def _read_pass_secret(entry: str) -> str:
|
|
"""Lee la contraseña (primera línea) de una entrada de ``pass``.
|
|
|
|
Wrapper fino sobre la función del registry ``pass_get_secret_py_infra``
|
|
(grupo ``infra``/``flow-replay``), que ejecuta ``pass show <entry>`` sin
|
|
shell y nunca logea el valor. Convierte su resultado ``{status, value|error}``
|
|
en un ``str`` o un ``RuntimeError`` con mensaje claro, para que los endpoints
|
|
DAV degraden con un error explicable en vez de un 500 silencioso.
|
|
"""
|
|
res = pass_get_secret(entry)
|
|
if res.get("status") != "ok":
|
|
raise RuntimeError(
|
|
"no se pudo leer la entrada '%s' de pass: %s"
|
|
% (entry, res.get("error", "error desconocido"))
|
|
)
|
|
value = res.get("value", "")
|
|
if not value.strip():
|
|
raise RuntimeError("la entrada '%s' de pass está vacía" % entry)
|
|
return value.strip()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Caché en disco de los datos DAV ya parseados (indexada por ctag)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _read_disk_cache(path: str) -> Optional[dict]:
|
|
"""Lee una caché DAV del disco: ``{"ctag": str, "items": list}`` o None.
|
|
|
|
Devuelve None (recargar de la red) ante cualquier problema: archivo
|
|
inexistente, JSON corrupto, o estructura inesperada. Nunca lanza: la caché
|
|
es un acelerador, no una fuente de verdad — si falla, se cae a la descarga.
|
|
"""
|
|
try:
|
|
with open(path, "r", encoding="utf-8") as fh:
|
|
data = json.load(fh)
|
|
except (OSError, ValueError):
|
|
return None
|
|
if not isinstance(data, dict) or not isinstance(data.get("items"), list):
|
|
return None
|
|
return data
|
|
|
|
|
|
def _write_disk_cache(path: str, ctag: str, items: list) -> None:
|
|
"""Escribe la caché DAV al disco de forma atómica (tmp + rename).
|
|
|
|
Persiste ``{"ctag": ctag, "items": items, "saved_at": epoch}``. Errores de
|
|
escritura se ignoran (no deben tumbar el endpoint): la caché en memoria sigue
|
|
sirviendo y el disco se reintentará en el siguiente refresco.
|
|
"""
|
|
try:
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
tmp = path + ".tmp"
|
|
with open(tmp, "w", encoding="utf-8") as fh:
|
|
json.dump(
|
|
{"ctag": ctag, "items": items, "saved_at": time.time()},
|
|
fh,
|
|
ensure_ascii=False,
|
|
)
|
|
os.replace(tmp, path)
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Estado del servidor: caché del vault + password Xandikos
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class VaultState:
|
|
"""Caché en memoria del vault: grafo agregado + índice slug → nota.
|
|
|
|
Se construye al arrancar y se reconstruye bajo demanda con ``refresh()``
|
|
(botón "refrescar" del frontend → ``POST /api/refresh``). Thread-safe
|
|
mediante un lock sobre la reconstrucción. La password de Xandikos se lee de
|
|
``pass`` perezosamente y se cachea en memoria.
|
|
|
|
Raises:
|
|
FileNotFoundError: si ``vault_dir`` no existe (error claro al arrancar,
|
|
nunca un 500 silencioso).
|
|
NotADirectoryError: si ``vault_dir`` no es un directorio.
|
|
"""
|
|
|
|
def __init__(self, vault_dir: str):
|
|
if not os.path.exists(vault_dir):
|
|
raise FileNotFoundError(f"el vault no existe: {vault_dir}")
|
|
if not os.path.isdir(vault_dir):
|
|
raise NotADirectoryError(f"el vault no es un directorio: {vault_dir}")
|
|
self.vault_dir = os.path.abspath(vault_dir)
|
|
self._vault_real = os.path.realpath(self.vault_dir)
|
|
self._lock = threading.Lock()
|
|
self.graph: dict = {"nodes": [], "edges": []}
|
|
self.note_index: dict = {} # slug -> {"path", "tipo", "label"}
|
|
self._xandikos_password: Optional[str] = None
|
|
# Cachés DAV en memoria (igual que el grafo): se llenan perezosamente al
|
|
# primer acceso y se invalidan en POST /api/refresh. None = sin cargar.
|
|
# Cada caché lleva su ctag para servir del disco sin red cuando la
|
|
# colección no cambió. _force_reload (set por /api/refresh) salta la
|
|
# validación de ctag en el siguiente acceso.
|
|
self._dav_lock = threading.Lock()
|
|
self._contacts_cache: Optional[list] = None
|
|
self._calendar_cache: Optional[list] = None
|
|
self._contacts_ctag: Optional[str] = None
|
|
self._calendar_ctag: Optional[str] = None
|
|
self._force_reload = False
|
|
self.refresh()
|
|
|
|
# --- vault --------------------------------------------------------------
|
|
|
|
def refresh(self) -> dict:
|
|
"""Re-escanea el vault: reconstruye grafo + índice de notas.
|
|
|
|
Devuelve un resumen ``{"nodes": N, "edges": M}`` para el frontend.
|
|
"""
|
|
with self._lock:
|
|
graph = build_obsidian_graph(self.vault_dir, include_dangling=True)
|
|
nodes_by_id = {n["id"]: n for n in graph["nodes"]}
|
|
note_index: dict = {}
|
|
for path in list_obsidian_notes(self.vault_dir):
|
|
slug = os.path.splitext(os.path.basename(path))[0]
|
|
if not slug or slug in note_index:
|
|
continue
|
|
node = nodes_by_id.get(slug, {})
|
|
note_index[slug] = {
|
|
"path": path,
|
|
"tipo": node.get("tipo", "nota"),
|
|
"label": node.get("label", slug),
|
|
}
|
|
self.graph = graph
|
|
self.note_index = note_index
|
|
return {"nodes": len(graph["nodes"]), "edges": len(graph["edges"])}
|
|
|
|
def graph_payload(self) -> dict:
|
|
"""Grafo + conteos por tipo para la leyenda de sigma.js."""
|
|
counts: dict[str, int] = {}
|
|
for node in self.graph["nodes"]:
|
|
counts[node["tipo"]] = counts.get(node["tipo"], 0) + 1
|
|
return {
|
|
"nodes": self.graph["nodes"],
|
|
"edges": self.graph["edges"],
|
|
"counts": counts,
|
|
"total_nodes": len(self.graph["nodes"]),
|
|
"total_edges": len(self.graph["edges"]),
|
|
}
|
|
|
|
def rows_by_tipo(self, tipo: str) -> list:
|
|
"""Filas de la tabla de un tipo: nodos reales (no fantasma) filtrados.
|
|
|
|
Cada fila lleva ``id``, ``label``, ``tipo`` y el ``frontmatter``
|
|
completo — el frontend aplana las columnas que le interesen. Sin
|
|
``tipo`` devuelve todos los nodos reales.
|
|
"""
|
|
rows = []
|
|
for node in self.graph["nodes"]:
|
|
if node.get("dangling"):
|
|
continue
|
|
if tipo and node["tipo"] != tipo:
|
|
continue
|
|
rows.append(
|
|
{
|
|
"id": node["id"],
|
|
"label": node["label"],
|
|
"tipo": node["tipo"],
|
|
"frontmatter": node["frontmatter"],
|
|
}
|
|
)
|
|
return rows
|
|
|
|
def _resolve_embed(self, embed_name: str) -> str:
|
|
"""Resuelve un embed ``![[...]]`` a un path absoluto dentro del vault.
|
|
|
|
El vault osint usa dos formas de embed: por nombre de archivo
|
|
(``![[foto.jpg]]``) y por path relativo al vault
|
|
(``![[attachments/personas/<slug>/foto.png]]``). La función del registry
|
|
``resolve_obsidian_embed`` resuelve solo por basename, así que primero se
|
|
intenta el embed como path literal relativo al vault (cubre la forma con
|
|
ruta), y si no existe se cae al resolutor por basename del registry.
|
|
Devuelve cadena vacía si ninguna forma resuelve.
|
|
"""
|
|
# Forma 1: el embed ya es un path relativo al vault.
|
|
literal = os.path.realpath(os.path.join(self._vault_real, embed_name))
|
|
if self._is_within_vault(literal) and os.path.isfile(literal):
|
|
return literal
|
|
# Forma 2: resolución por basename via función del registry.
|
|
return resolve_obsidian_embed(self.vault_dir, embed_name)
|
|
|
|
def node_detail(self, slug: str):
|
|
"""Ficha completa de un nodo: frontmatter + body + attachments.
|
|
|
|
Los attachments salen de los embeds ``![[...]]`` del cuerpo, resueltos a
|
|
paths reales con ``_resolve_embed`` (que compone ``resolve_obsidian_embed``)
|
|
y devueltos como paths **relativos al vault** (lo que consume
|
|
``/api/attachment``). Un embed que no resuelve se reporta con
|
|
``kind: "missing"`` y path vacío. Devuelve ``None`` si el slug no
|
|
corresponde a ninguna nota del vault.
|
|
"""
|
|
info = self.note_index.get(slug)
|
|
if info is None:
|
|
# Tolerancia: aceptar también nombres sin slugificar.
|
|
info = self.note_index.get(slugify_obsidian_name(slug))
|
|
if info is None:
|
|
return None
|
|
note = read_obsidian_note(info["path"])
|
|
attachments = []
|
|
for name in extract_obsidian_embeds(note["body"]):
|
|
abs_path = self._resolve_embed(name)
|
|
if not abs_path:
|
|
attachments.append({"name": name, "path": "", "kind": "missing"})
|
|
continue
|
|
real = os.path.realpath(abs_path)
|
|
# Defensa en profundidad: solo attachments dentro del vault.
|
|
if not self._is_within_vault(real):
|
|
continue
|
|
rel = os.path.relpath(real, self._vault_real)
|
|
attachments.append(
|
|
{"name": name, "path": rel, "kind": _attachment_kind(abs_path)}
|
|
)
|
|
return {
|
|
"id": os.path.splitext(os.path.basename(info["path"]))[0],
|
|
"tipo": info["tipo"],
|
|
"label": info["label"],
|
|
"frontmatter": note["frontmatter"],
|
|
"body": note["body"],
|
|
"tags": note["tags"],
|
|
"wikilinks": note["wikilinks"],
|
|
"attachments": attachments,
|
|
}
|
|
|
|
def _is_within_vault(self, candidate_real_path: str) -> bool:
|
|
"""True si ``candidate_real_path`` (ya realpath) está dentro del vault.
|
|
|
|
Añade el separador final al vault para que ``/vault-evil`` no cuele como
|
|
prefijo de ``/vault``.
|
|
"""
|
|
return (
|
|
candidate_real_path == self._vault_real
|
|
or candidate_real_path.startswith(self._vault_real + os.sep)
|
|
)
|
|
|
|
def resolve_attachment_path(self, rel_path: str):
|
|
"""Resuelve un path relativo de attachment a absoluto, SOLO dentro del vault.
|
|
|
|
Bloquea path traversal: normaliza con ``realpath`` (colapsa ``..`` y
|
|
sigue symlinks) y exige que el resultado quede estrictamente bajo la
|
|
raíz real del vault. Devuelve ``None`` (→ 403/404) ante cualquier intento
|
|
de salir del vault, paths absolutos, o archivos inexistentes.
|
|
"""
|
|
if not rel_path:
|
|
return None
|
|
candidate = os.path.realpath(os.path.join(self._vault_real, rel_path))
|
|
if candidate == self._vault_real:
|
|
return None
|
|
if not candidate.startswith(self._vault_real + os.sep):
|
|
return None
|
|
if not os.path.isfile(candidate):
|
|
return None
|
|
return candidate
|
|
|
|
def search(self, query: str) -> list:
|
|
"""Búsqueda global: nodos cuyas notas matchean la query (substring).
|
|
|
|
Compone ``search_obsidian_notes`` y mapea cada hit a su nodo (slug,
|
|
label, tipo) + las líneas que matchean.
|
|
"""
|
|
results = []
|
|
for hit in search_obsidian_notes(self.vault_dir, query):
|
|
slug = os.path.splitext(os.path.basename(hit["path"]))[0]
|
|
info = self.note_index.get(slug, {})
|
|
results.append(
|
|
{
|
|
"id": slug,
|
|
"label": info.get("label", slug),
|
|
"tipo": info.get("tipo", "nota"),
|
|
"matches": hit.get("matches", []),
|
|
}
|
|
)
|
|
return results
|
|
|
|
# --- Xandikos -----------------------------------------------------------
|
|
|
|
def xandikos_password(self) -> str:
|
|
"""Password de Xandikos desde ``pass``, cacheada en memoria."""
|
|
with self._lock:
|
|
if self._xandikos_password is None:
|
|
self._xandikos_password = _read_pass_secret(XANDIKOS_PASS_ENTRY)
|
|
return self._xandikos_password
|
|
|
|
def _collection_ctag(self, collection_path: str, password: str) -> Optional[str]:
|
|
"""Lee el ctag de una colección (PROPFIND barato), o None si no se puede.
|
|
|
|
El ctag es el token de versión de la colección: si no cambió, la caché en
|
|
disco sigue vigente. Devolver None significa "no pude validar" → se
|
|
recarga de la red por seguridad (nunca se sirve caché potencialmente
|
|
obsoleta sin confirmación). No lanza.
|
|
"""
|
|
res = dav_collection_ctag(
|
|
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, collection_path
|
|
)
|
|
return res.get("ctag") if res.get("status") == "ok" else None
|
|
|
|
def _load_collection(
|
|
self,
|
|
collection_path: str,
|
|
content_type: str,
|
|
cache_file: str,
|
|
parse_items,
|
|
) -> tuple:
|
|
"""Carga una colección DAV con caché en disco validada por ctag.
|
|
|
|
Flujo (1 o 2 peticiones, nunca N):
|
|
1. Lee el ctag de la colección (PROPFIND Depth:0, ~0.1s).
|
|
2. Si el ctag coincide con el de la caché en disco (y no hay refresh
|
|
forzado), parsea los items del disco y devuelve sin descargar.
|
|
3. Si no, hace UN REPORT ``dav_get_collection`` que trae todos los
|
|
recursos con su contenido inline, los parsea con ``parse_items`` y
|
|
reescribe la caché en disco con el nuevo ctag.
|
|
|
|
``parse_items(resources) -> list`` transforma la lista
|
|
``[{href, etag, data}]`` del registry en la lista de objetos JSON que
|
|
sirve el endpoint (contactos o eventos).
|
|
|
|
Returns:
|
|
tuple ``(items, ctag)``.
|
|
|
|
Raises:
|
|
DavUnavailable: si Xandikos no responde al REPORT cuando hay que
|
|
descargar (sin red, timeout, auth).
|
|
"""
|
|
password = self.xandikos_password()
|
|
ctag = self._collection_ctag(collection_path, password)
|
|
|
|
# Caché en disco vigente: mismo ctag y sin refresh forzado → sin red.
|
|
if ctag is not None and not self._force_reload:
|
|
disk = _read_disk_cache(cache_file)
|
|
if disk is not None and disk.get("ctag") == ctag:
|
|
return parse_items(disk["items"]), ctag
|
|
|
|
# Hay que (re)descargar: UN REPORT trae todo con el contenido inline.
|
|
got = dav_get_collection(
|
|
XANDIKOS_BASE_URL,
|
|
XANDIKOS_USERNAME,
|
|
password,
|
|
collection_path,
|
|
content_type,
|
|
)
|
|
if got.get("status") != "ok":
|
|
raise DavUnavailable("Xandikos no responde: %s" % got.get("error"))
|
|
resources = got.get("resources", [])
|
|
items = parse_items(resources)
|
|
# Persistir en disco para el arranque instantáneo de la próxima vez. Si
|
|
# no obtuvimos ctag, guardamos cadena vacía: nunca matcheará un ctag real,
|
|
# así que la próxima vez se revalidará (cae con elegancia a "siempre
|
|
# recargar" sin romper).
|
|
_write_disk_cache(cache_file, ctag or "", items)
|
|
return items, (ctag or "")
|
|
|
|
@staticmethod
|
|
def _parse_contacts(items: list) -> list:
|
|
"""Parsea ``[{href, etag, data}]`` (o items cacheados) a contactos JSON.
|
|
|
|
Acepta dos formas de ``items``: la lista de recursos del registry (cada
|
|
uno con ``data`` = texto vCard, posible multi-tarjeta) que hay que
|
|
parsear, o la lista de contactos ya parseados (caché en disco), que se
|
|
devuelve tal cual. Se distinguen por la presencia de la clave ``data``.
|
|
"""
|
|
if items and "data" in items[0]:
|
|
contacts: list = []
|
|
for res in items:
|
|
href = res.get("href")
|
|
for card_text in split_vcards(res.get("data", "")):
|
|
card = _vcard_to_json(card_text)
|
|
card["etag"] = res.get("etag")
|
|
card["href"] = href
|
|
contacts.append(card)
|
|
contacts.sort(key=lambda c: (c.get("fn") or c.get("uid") or "").lower())
|
|
return contacts
|
|
return list(items)
|
|
|
|
@staticmethod
|
|
def _parse_events(items: list) -> list:
|
|
"""Parsea ``[{href, etag, data}]`` (o items cacheados) a eventos JSON.
|
|
|
|
Igual criterio que ``_parse_contacts``: si los items llevan ``data`` son
|
|
recursos del registry (texto VCALENDAR) y se parsean; si no, ya son
|
|
eventos cacheados y se devuelven tal cual.
|
|
"""
|
|
if items and "data" in items[0]:
|
|
events: list = []
|
|
for res in items:
|
|
href = res.get("href")
|
|
for event in _vcalendar_to_events(res.get("data", "")):
|
|
event["etag"] = res.get("etag")
|
|
event["href"] = href
|
|
events.append(event)
|
|
events.sort(key=lambda e: e.get("dtstart") or "")
|
|
return events
|
|
return list(items)
|
|
|
|
def contacts(self) -> list:
|
|
"""Contactos del addressbook Xandikos, parseados y cacheados.
|
|
|
|
Caché en dos niveles: memoria (mientras vive el proceso) y disco
|
|
(``.cache/contacts.json``, validada por ctag para arranque instantáneo).
|
|
Al primer acceso descarga TODO en UNA petición REPORT
|
|
(``dav_get_collection``) en vez de un GET por ``.vcf``.
|
|
|
|
Raises:
|
|
RuntimeError: si no se puede leer la password de ``pass``.
|
|
DavUnavailable: si Xandikos no responde (sin red, timeout, auth).
|
|
"""
|
|
with self._dav_lock:
|
|
if self._contacts_cache is not None and not self._force_reload:
|
|
return self._contacts_cache
|
|
contacts, ctag = self._load_collection(
|
|
XANDIKOS_CONTACTS_COLLECTION,
|
|
"vcard",
|
|
_CONTACTS_CACHE_FILE,
|
|
self._parse_contacts,
|
|
)
|
|
self._contacts_cache = contacts
|
|
self._contacts_ctag = ctag
|
|
self._maybe_clear_force_reload()
|
|
return contacts
|
|
|
|
def calendar(self, dt_from: str = "", dt_to: str = "") -> list:
|
|
"""Eventos del calendario Xandikos, cacheados; filtrados por rango.
|
|
|
|
Misma caché en dos niveles que ``contacts``. La descarga + parseo
|
|
completos se cachean (UNA petición REPORT); el filtro por ``[from, to]``
|
|
se aplica sobre la caché en cada llamada (barato). Sin ``from``/``to``
|
|
devuelve todos.
|
|
|
|
Raises:
|
|
RuntimeError: si no se puede leer la password de ``pass``.
|
|
DavUnavailable: si Xandikos no responde (sin red, timeout, auth).
|
|
"""
|
|
with self._dav_lock:
|
|
if self._calendar_cache is None or self._force_reload:
|
|
events, ctag = self._load_collection(
|
|
XANDIKOS_CALENDAR_COLLECTION,
|
|
"ical",
|
|
_CALENDAR_CACHE_FILE,
|
|
self._parse_events,
|
|
)
|
|
self._calendar_cache = events
|
|
self._calendar_ctag = ctag
|
|
self._maybe_clear_force_reload()
|
|
all_events = self._calendar_cache
|
|
if not dt_from and not dt_to:
|
|
return list(all_events)
|
|
return [e for e in all_events if _event_in_range(e, dt_from, dt_to)]
|
|
|
|
def _maybe_clear_force_reload(self) -> None:
|
|
"""Apaga el flag de refresh forzado una vez consumido por una recarga.
|
|
|
|
Llamado bajo ``_dav_lock`` tras recargar una colección. El flag lo activa
|
|
``invalidate_dav`` (POST /api/refresh) para forzar UNA recarga que ignore
|
|
el ctag; tras ella vuelve a la validación normal por ctag.
|
|
"""
|
|
self._force_reload = False
|
|
|
|
def invalidate_dav(self) -> None:
|
|
"""Vacía las cachés de contactos y calendario y fuerza una recarga.
|
|
|
|
Limpia las cachés en memoria y marca ``_force_reload`` para que el
|
|
siguiente acceso a cada colección ignore el ctag cacheado y vuelva a
|
|
descargar del servidor (el botón "refrescar" debe traer cambios aunque el
|
|
ctag no se haya actualizado todavía). No borra la password.
|
|
"""
|
|
with self._dav_lock:
|
|
self._contacts_cache = None
|
|
self._calendar_cache = None
|
|
self._force_reload = True
|
|
|
|
# --- Escritura de contactos: ficha .md (verdad) + reflejo en Xandikos ----
|
|
|
|
def _contact_note_path(self, tipo: str, slug: str) -> str:
|
|
"""Path absoluto de la ficha ``.md`` de un contacto en el vault.
|
|
|
|
``personas/<slug>.md`` o ``organizaciones/<slug>.md`` según el tipo.
|
|
"""
|
|
folder = _TIPO_FOLDER.get(tipo, "personas")
|
|
return os.path.join(self._vault_real, folder, slug + ".md")
|
|
|
|
def _put_vcard(self, slug: str, vcard_text: str) -> dict:
|
|
"""Sube (PUT) un vCard a Xandikos por su UID=slug. No lanza por sí sola.
|
|
|
|
Reflejo inmediato del cambio en la ficha del vault, para que el contacto
|
|
se vea ya en la app y en el móvil sin esperar al sync periódico. Devuelve
|
|
el dict ``{status, http_status|error}`` de ``carddav_put_vcard``.
|
|
"""
|
|
password = self.xandikos_password()
|
|
return carddav_put_vcard(
|
|
XANDIKOS_BASE_URL,
|
|
XANDIKOS_USERNAME,
|
|
password,
|
|
XANDIKOS_CONTACTS_COLLECTION,
|
|
slug,
|
|
vcard_text,
|
|
)
|
|
|
|
def _delete_vcard(self, slug: str) -> dict:
|
|
"""Borra (DELETE) el vCard ``<slug>.vcf`` de Xandikos. No lanza.
|
|
|
|
Compone ``dav_delete_resource`` con el href del recurso (mismo nombre que
|
|
usó el PUT: ``<slug>.vcf``). Trata 404 como idempotente (ya no existía →
|
|
objetivo cumplido), igual que un borrado repetido.
|
|
"""
|
|
password = self.xandikos_password()
|
|
resource_path = XANDIKOS_CONTACTS_COLLECTION + slug + ".vcf"
|
|
res = dav_delete_resource(
|
|
XANDIKOS_BASE_URL, XANDIKOS_USERNAME, password, resource_path
|
|
)
|
|
if res.get("status") != "ok" and res.get("http_status") == 404:
|
|
return {"status": "ok", "http_status": 404, "idempotent": True}
|
|
return res
|
|
|
|
def create_contact(self, data: "ContactIn") -> dict:
|
|
"""Crea un contacto: ficha ``.md`` (verdad) + reflejo del vCard.
|
|
|
|
1. Genera el slug del nombre. 409 si ya existe la ficha.
|
|
2. Escribe la ficha ``.md`` con el frontmatter canónico (acción primaria).
|
|
3. Hace PUT del vCard a Xandikos (reflejo inmediato; un fallo NO revierte
|
|
la ficha — el sync periódico reconciliará).
|
|
4. Invalida las cachés DAV para que el contacto aparezca ya en la app.
|
|
|
|
Returns:
|
|
dict ``{slug, uid, path, dav}``.
|
|
|
|
Raises:
|
|
HTTPException(409): si ya existe una ficha con ese slug.
|
|
HTTPException(400): si el tipo no es 'persona'|'organizacion' o el
|
|
nombre está vacío.
|
|
"""
|
|
tipo = (data.tipo or "persona").strip()
|
|
if tipo not in _TIPO_FOLDER:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="tipo inválido '%s' (persona|organizacion)" % tipo,
|
|
)
|
|
if not data.nombre.strip():
|
|
raise HTTPException(status_code=400, detail="el nombre es obligatorio")
|
|
slug = slugify_obsidian_name(data.nombre)
|
|
if not slug:
|
|
raise HTTPException(
|
|
status_code=400, detail="el nombre no produce un slug válido"
|
|
)
|
|
note_path = self._contact_note_path(tipo, slug)
|
|
if os.path.exists(note_path):
|
|
raise HTTPException(
|
|
status_code=409, detail="ya existe un contacto con slug '%s'" % slug
|
|
)
|
|
frontmatter = _contact_frontmatter(data, slug)
|
|
body = _contact_body(data.notas)
|
|
folder = _TIPO_FOLDER[tipo]
|
|
# Acción primaria: la ficha del vault es la fuente de verdad.
|
|
create_obsidian_note(
|
|
self.vault_dir,
|
|
os.path.join(folder, slug),
|
|
body=body,
|
|
frontmatter=frontmatter,
|
|
)
|
|
# Reflejo inmediato en Xandikos (no rompe el alta si Xandikos cae).
|
|
vcard_fm = dict(frontmatter)
|
|
vcard_fm["_notas"] = _norm_str(data.notas)
|
|
dav = self._put_vcard(slug, _build_vcard(vcard_fm, slug))
|
|
self.refresh()
|
|
self.invalidate_dav()
|
|
return {"slug": slug, "uid": slug, "path": note_path, "dav": dav}
|
|
|
|
def update_contact(self, slug: str, data: "ContactIn") -> dict:
|
|
"""Edita un contacto existente: merge del frontmatter + re-PUT del vCard.
|
|
|
|
Localiza la ficha por slug (en personas/ u organizaciones/ según el tipo
|
|
de la ficha actual). 404 si no existe. Hace merge de los campos editables
|
|
sobre el frontmatter actual (preserva campos heredados no tocados como
|
|
``sexo``, ``fecha_nacimiento``, ``horoscopo``), reescribe el body
|
|
``## Notas`` y re-sube el vCard. Invalida las cachés DAV.
|
|
|
|
Returns:
|
|
dict ``{slug, uid, path, dav}``.
|
|
|
|
Raises:
|
|
HTTPException(404): si no existe la ficha del contacto.
|
|
"""
|
|
path = self._find_contact_note(slug)
|
|
if path is None:
|
|
raise HTTPException(
|
|
status_code=404, detail="contacto '%s' no encontrado" % slug
|
|
)
|
|
note = read_obsidian_note(path)
|
|
current = dict(note.get("frontmatter") or {})
|
|
# Merge de los campos editables (preserva los heredados no tocados).
|
|
merged = {
|
|
"nombre": data.nombre.strip() or current.get("nombre") or slug,
|
|
"aliases": _norm_list(data.aliases),
|
|
"telefono": _norm_str(data.telefono),
|
|
"email": _norm_str(data.email),
|
|
"direccion": _norm_str(data.direccion),
|
|
"pais": _norm_str(data.pais),
|
|
"relaciones": _norm_list(data.relaciones),
|
|
"contexto": _norm_str(data.contexto),
|
|
}
|
|
if current.get("tipo") != "organizacion":
|
|
merged["dni"] = _norm_str(data.dni)
|
|
current.update(merged)
|
|
update_obsidian_note(
|
|
path, body=_contact_body(data.notas), set_frontmatter=current
|
|
)
|
|
vcard_fm = dict(current)
|
|
vcard_fm["_notas"] = _norm_str(data.notas)
|
|
dav = self._put_vcard(slug, _build_vcard(vcard_fm, slug))
|
|
self.refresh()
|
|
self.invalidate_dav()
|
|
return {"slug": slug, "uid": slug, "path": path, "dav": dav}
|
|
|
|
def delete_contact(self, slug: str) -> dict:
|
|
"""Borra un contacto: elimina la ficha ``.md`` + el vCard en Xandikos.
|
|
|
|
404 si la ficha no existe. Borra el archivo ``.md`` (acción primaria) y
|
|
el recurso ``<slug>.vcf`` de Xandikos (reflejo). Invalida las cachés DAV.
|
|
|
|
Returns:
|
|
dict ``{slug, deleted: True, dav}``.
|
|
|
|
Raises:
|
|
HTTPException(404): si no existe la ficha del contacto.
|
|
"""
|
|
path = self._find_contact_note(slug)
|
|
if path is None:
|
|
raise HTTPException(
|
|
status_code=404, detail="contacto '%s' no encontrado" % slug
|
|
)
|
|
delete_obsidian_note(path)
|
|
dav = self._delete_vcard(slug)
|
|
self.refresh()
|
|
self.invalidate_dav()
|
|
return {"slug": slug, "deleted": True, "dav": dav}
|
|
|
|
def _find_contact_note(self, slug: str):
|
|
"""Localiza la ficha ``.md`` de un contacto por slug, o None.
|
|
|
|
Busca ``personas/<slug>.md`` y ``organizaciones/<slug>.md`` (los dos
|
|
tipos de contacto). Devuelve el primer path existente o None.
|
|
"""
|
|
for folder in ("personas", "organizaciones"):
|
|
candidate = os.path.join(self._vault_real, folder, slug + ".md")
|
|
if os.path.isfile(candidate):
|
|
return candidate
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers DAV: parseo ligero de vCard / iCalendar a JSON
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _unescape_ical(value: str) -> str:
|
|
"""Des-escapa los caracteres de un valor iCalendar/vCard (RFC 5545/6350)."""
|
|
return (
|
|
value.replace("\\n", "\n")
|
|
.replace("\\N", "\n")
|
|
.replace("\\,", ",")
|
|
.replace("\\;", ";")
|
|
.replace("\\\\", "\\")
|
|
)
|
|
|
|
|
|
def _unfold_lines(text: str) -> list:
|
|
"""Des-pliega las líneas continuadas (folding) de un vCard/iCalendar.
|
|
|
|
RFC 5545/6350: una línea que empieza por espacio o tab es continuación de la
|
|
anterior. Esta función las une para parsearlas como propiedades completas.
|
|
"""
|
|
raw_lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
|
|
unfolded: list = []
|
|
for line in raw_lines:
|
|
if line[:1] in (" ", "\t") and unfolded:
|
|
unfolded[-1] += line[1:]
|
|
else:
|
|
unfolded.append(line)
|
|
return unfolded
|
|
|
|
|
|
def _parse_property(line: str) -> Optional[tuple]:
|
|
"""Parsea una línea de propiedad vCard/iCal a ``(nombre, params, valor)``.
|
|
|
|
Formato: ``[itemN.]NAME;PARAM=val;PARAM2=val:value``. Devuelve ``None`` si
|
|
la línea no es una propiedad (sin ``:``). El nombre se devuelve en
|
|
mayúsculas y SIN el prefijo de grupo ``itemN.`` / ``GRUPO.`` que añaden
|
|
Apple/Google a las propiedades agrupadas (``item1.TEL``, ``item2.EMAIL``);
|
|
los params como dict con claves en mayúsculas.
|
|
"""
|
|
if ":" not in line:
|
|
return None
|
|
head, value = line.split(":", 1)
|
|
parts = head.split(";")
|
|
name = parts[0].strip()
|
|
# Quitar el prefijo de grupo "itemN." / "GRUPO." (vCard property grouping).
|
|
if "." in name:
|
|
name = name.rsplit(".", 1)[-1]
|
|
name = name.upper()
|
|
params: dict = {}
|
|
for part in parts[1:]:
|
|
if "=" in part:
|
|
k, v = part.split("=", 1)
|
|
params[k.strip().upper()] = v.strip()
|
|
return name, params, value
|
|
|
|
|
|
def _vcard_to_json(vcard_text: str) -> dict:
|
|
"""Convierte un VCARD a un dict JSON con los campos de interés.
|
|
|
|
Extrae: uid, nombre completo (FN o N reordenado), alias (NICKNAME),
|
|
teléfonos (TEL), emails (EMAIL), organización (ORG), nota (NOTE) y el bloque
|
|
``osint`` con todas las propiedades ``X-OSINT-*`` (la clave es el sufijo en
|
|
minúsculas: ``X-OSINT-DNI`` → ``osint.dni``, ``X-OSINT-PAIS`` →
|
|
``osint.pais``). Parseo ligero a mano (sin dependencia de vobject); el vCard
|
|
ya viene troceado por ``split_vcards``.
|
|
|
|
Expone tanto las claves en español que consume el frontend del task
|
|
(``nombre``, ``alias``, ``nota``, ``telefonos``) como las formas tipadas con
|
|
tipo (``phones``, ``emails`` como objetos ``{value, type}``), para no atar el
|
|
frontend a un único shape.
|
|
"""
|
|
out: dict = {
|
|
"uid": None,
|
|
"fn": None,
|
|
"nickname": None,
|
|
"org": None,
|
|
"note": None,
|
|
"phones": [],
|
|
"emails": [],
|
|
"osint": {},
|
|
}
|
|
for line in _unfold_lines(vcard_text):
|
|
parsed = _parse_property(line)
|
|
if not parsed:
|
|
continue
|
|
name, params, value = parsed
|
|
value = _unescape_ical(value.strip())
|
|
if name == "UID":
|
|
out["uid"] = value
|
|
elif name == "FN":
|
|
out["fn"] = value
|
|
elif name == "NICKNAME":
|
|
out["nickname"] = value
|
|
elif name == "ORG":
|
|
out["org"] = value.replace(";", " ").strip()
|
|
elif name == "NOTE":
|
|
out["note"] = value
|
|
elif name == "TEL":
|
|
out["phones"].append({"value": value, "type": params.get("TYPE", "")})
|
|
elif name == "EMAIL":
|
|
out["emails"].append({"value": value, "type": params.get("TYPE", "")})
|
|
elif name.startswith("X-OSINT-"):
|
|
key = name[len("X-OSINT-") :].lower().replace("-", "_")
|
|
if key:
|
|
out["osint"][key] = value
|
|
elif name == "N" and not out["fn"]:
|
|
# Nombre estructurado Apellido;Nombre;... -> "Nombre Apellido".
|
|
comps = [c for c in value.split(";") if c]
|
|
if len(comps) >= 2:
|
|
out["fn"] = ("%s %s" % (comps[1], comps[0])).strip()
|
|
elif comps:
|
|
out["fn"] = comps[0]
|
|
# Alias en español que consume el frontend del task (mismo dato, otra clave).
|
|
out["nombre"] = out["fn"]
|
|
out["alias"] = out["nickname"]
|
|
out["nota"] = out["note"]
|
|
out["telefonos"] = [p["value"] for p in out["phones"]]
|
|
out["correos"] = [e["value"] for e in out["emails"]]
|
|
return out
|
|
|
|
|
|
_VEVENT_RE = re.compile(r"BEGIN:VEVENT(.*?)END:VEVENT", re.DOTALL | re.IGNORECASE)
|
|
|
|
|
|
def _vevent_to_json(vevent_block: str) -> dict:
|
|
"""Convierte un bloque VEVENT a un dict JSON con los campos de interés.
|
|
|
|
Extrae: uid, summary, dtstart, dtend, location, description. Las fechas se
|
|
devuelven tal cual vienen del servidor (formato iCal, ej. ``20260611T090000Z``
|
|
o ``20260611``); el frontend las formatea a europeo. Parseo ligero a mano.
|
|
"""
|
|
out: dict = {
|
|
"uid": None,
|
|
"summary": None,
|
|
"dtstart": None,
|
|
"dtend": None,
|
|
"location": None,
|
|
"description": None,
|
|
}
|
|
for line in _unfold_lines(vevent_block):
|
|
parsed = _parse_property(line)
|
|
if not parsed:
|
|
continue
|
|
name, _params, value = parsed
|
|
value = value.strip()
|
|
if name == "UID":
|
|
out["uid"] = value
|
|
elif name == "SUMMARY":
|
|
out["summary"] = _unescape_ical(value)
|
|
elif name == "DTSTART":
|
|
out["dtstart"] = value
|
|
elif name == "DTEND":
|
|
out["dtend"] = value
|
|
elif name == "LOCATION":
|
|
out["location"] = _unescape_ical(value)
|
|
elif name == "DESCRIPTION":
|
|
out["description"] = _unescape_ical(value)
|
|
return out
|
|
|
|
|
|
def _vcalendar_to_events(vcalendar_text: str) -> list:
|
|
"""Extrae todos los VEVENT de un VCALENDAR y los convierte a JSON."""
|
|
events = []
|
|
for block in _VEVENT_RE.findall(vcalendar_text):
|
|
events.append(_vevent_to_json("BEGIN:VEVENT" + block + "END:VEVENT"))
|
|
return events
|
|
|
|
|
|
def _event_in_range(event: dict, dt_from: str, dt_to: str) -> bool:
|
|
"""True si el evento cae (por DTSTART) dentro de ``[dt_from, dt_to]``.
|
|
|
|
Comparación lexicográfica sobre el prefijo ``YYYYMMDD`` que comparten todos
|
|
los formatos iCal (date y date-time). Los límites se normalizan quitando los
|
|
guiones, así acepta tanto el formato documentado del endpoint
|
|
(``2026-06-11``) como el iCal crudo (``20260611``). ``dt_from``/``dt_to``
|
|
vacíos desactivan ese extremo del filtro.
|
|
"""
|
|
dtstart = (event.get("dtstart") or "").replace("-", "")[:8]
|
|
if not dtstart:
|
|
return True
|
|
if dt_from and dtstart < dt_from.replace("-", "")[:8]:
|
|
return False
|
|
if dt_to and dtstart > dt_to.replace("-", "")[:8]:
|
|
return False
|
|
return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Escritura de contactos: ficha .md del vault (fuente de verdad) + vCard
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Subcarpeta del vault por tipo de contacto. La fuente de verdad de un contacto
|
|
# es su ficha en el vault (CONVENTIONS.md §3b para persona, §6 para organización);
|
|
# Xandikos es solo el retransmisor al móvil.
|
|
_TIPO_FOLDER = {"persona": "personas", "organizacion": "organizaciones"}
|
|
|
|
# Tags por defecto de cada tipo (CONVENTIONS.md). Se preservan si la ficha ya
|
|
# trae otros tags al editar.
|
|
_TIPO_TAGS = {
|
|
"persona": ["persona", "osint"],
|
|
"organizacion": ["organizacion", "osint"],
|
|
}
|
|
|
|
|
|
class ContactIn(BaseModel):
|
|
"""Cuerpo de POST/PUT de un contacto (persona u organización).
|
|
|
|
Refleja el esquema canónico del frontmatter (CONVENTIONS.md §3b/§6). Los
|
|
campos vacíos se normalizan a ``null``/``[]`` al escribir la ficha, nunca se
|
|
omiten, para que el score de completitud sea consistente.
|
|
"""
|
|
|
|
tipo: str = Field(default="persona")
|
|
nombre: str
|
|
aliases: list[str] = Field(default_factory=list)
|
|
telefono: Optional[str] = None
|
|
email: Optional[str] = None
|
|
dni: Optional[str] = None
|
|
direccion: Optional[str] = None
|
|
pais: Optional[str] = None
|
|
contexto: Optional[str] = None
|
|
relaciones: list[str] = Field(default_factory=list)
|
|
notas: Optional[str] = None
|
|
|
|
|
|
def _norm_str(value: Optional[str]) -> Optional[str]:
|
|
"""Normaliza un string opcional: trim; cadena vacía → None."""
|
|
if value is None:
|
|
return None
|
|
value = value.strip()
|
|
return value or None
|
|
|
|
|
|
def _norm_list(values: Optional[list]) -> list:
|
|
"""Normaliza una lista de strings: trim cada item y descarta los vacíos."""
|
|
if not values:
|
|
return []
|
|
out = []
|
|
for v in values:
|
|
s = (v or "").strip()
|
|
if s:
|
|
out.append(s)
|
|
return out
|
|
|
|
|
|
def _contact_frontmatter(data: "ContactIn", slug: str) -> dict:
|
|
"""Construye el frontmatter canónico de la ficha de un contacto.
|
|
|
|
Para ``tipo: persona`` sigue el esquema completo de CONVENTIONS.md §3b
|
|
(todos los campos presentes, ``null``/``[]`` si vacíos). Para
|
|
``tipo: organizacion`` usa el subconjunto de §6. El orden de claves se
|
|
preserva (``yaml.safe_dump(sort_keys=False)`` en ``format_obsidian_note``).
|
|
"""
|
|
nombre = data.nombre.strip()
|
|
aliases = _norm_list(data.aliases)
|
|
relaciones = _norm_list(data.relaciones)
|
|
if data.tipo == "organizacion":
|
|
return {
|
|
"tipo": "organizacion",
|
|
"nombre": nombre,
|
|
"slug": slug,
|
|
"aliases": aliases,
|
|
"telefono": _norm_str(data.telefono),
|
|
"email": _norm_str(data.email),
|
|
"direccion": _norm_str(data.direccion),
|
|
"pais": _norm_str(data.pais),
|
|
"relaciones": relaciones,
|
|
"contexto": _norm_str(data.contexto),
|
|
"fuente": "osint_web (alta manual)",
|
|
"tags": list(_TIPO_TAGS["organizacion"]),
|
|
}
|
|
# Persona: esquema canónico §3b.
|
|
return {
|
|
"tipo": "persona",
|
|
"nombre": nombre,
|
|
"slug": slug,
|
|
"aliases": aliases,
|
|
"sexo": None,
|
|
"fecha_nacimiento": None,
|
|
"dni": _norm_str(data.dni),
|
|
"telefono": _norm_str(data.telefono),
|
|
"email": _norm_str(data.email),
|
|
"direccion": _norm_str(data.direccion),
|
|
"pais": _norm_str(data.pais),
|
|
"relaciones": relaciones,
|
|
"contexto": _norm_str(data.contexto),
|
|
"fuente": "osint_web (alta manual)",
|
|
"tags": list(_TIPO_TAGS["persona"]),
|
|
}
|
|
|
|
|
|
def _contact_body(notas: Optional[str]) -> str:
|
|
"""Cuerpo Markdown de la ficha: sección ``## Notas`` con el texto libre."""
|
|
notas = _norm_str(notas)
|
|
if notas:
|
|
return "## Notas\n\n%s\n" % notas
|
|
return "## Notas\n"
|
|
|
|
|
|
def _vcard_escape(value: str) -> str:
|
|
"""Escapa un valor de texto para una línea vCard (RFC 6350)."""
|
|
return (
|
|
value.replace("\\", "\\\\")
|
|
.replace("\n", "\\n")
|
|
.replace(",", "\\,")
|
|
.replace(";", "\\;")
|
|
)
|
|
|
|
|
|
def _build_vcard(frontmatter: dict, slug: str) -> str:
|
|
"""Serializa un frontmatter de contacto a un VCARD 3.0 con el UID = slug.
|
|
|
|
Mapea: nombre→FN, aliases→NICKNAME, telefono→TEL, email→EMAIL, notas→NOTE,
|
|
organización→ORG; y los campos OSINT (dni, direccion, pais, contexto, sexo,
|
|
fecha_nacimiento) a propiedades ``X-OSINT-*`` que el parser ``_vcard_to_json``
|
|
ya entiende. El UID es el slug → idempotente: re-subir el mismo slug
|
|
sobrescribe el recurso ``<slug>.vcf``.
|
|
"""
|
|
nombre = (frontmatter.get("nombre") or slug).strip()
|
|
lines = [
|
|
"BEGIN:VCARD",
|
|
"VERSION:3.0",
|
|
"UID:%s" % slug,
|
|
"FN:%s" % _vcard_escape(nombre),
|
|
]
|
|
aliases = frontmatter.get("aliases") or []
|
|
if aliases:
|
|
lines.append("NICKNAME:%s" % _vcard_escape(",".join(str(a) for a in aliases)))
|
|
if frontmatter.get("tipo") == "organizacion":
|
|
lines.append("ORG:%s" % _vcard_escape(nombre))
|
|
tel = frontmatter.get("telefono")
|
|
if tel:
|
|
lines.append("TEL;TYPE=CELL:%s" % _vcard_escape(str(tel)))
|
|
email = frontmatter.get("email")
|
|
if email:
|
|
lines.append("EMAIL;TYPE=INTERNET:%s" % _vcard_escape(str(email)))
|
|
# Campos OSINT → X-OSINT-* (los recoge _vcard_to_json en el bloque osint).
|
|
for fm_key, x_name in (
|
|
("dni", "X-OSINT-DNI"),
|
|
("direccion", "X-OSINT-DIRECCION"),
|
|
("pais", "X-OSINT-PAIS"),
|
|
("contexto", "X-OSINT-CONTEXTO"),
|
|
("sexo", "X-OSINT-SEXO"),
|
|
("fecha_nacimiento", "X-OSINT-FECHA-NACIMIENTO"),
|
|
):
|
|
val = frontmatter.get(fm_key)
|
|
if val:
|
|
lines.append("%s:%s" % (x_name, _vcard_escape(str(val))))
|
|
notas = frontmatter.get("_notas")
|
|
if notas:
|
|
lines.append("NOTE:%s" % _vcard_escape(str(notas)))
|
|
lines.append("END:VCARD")
|
|
return "\r\n".join(lines) + "\r\n"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Construcción de la app FastAPI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def create_app(vault_dir: str) -> FastAPI:
|
|
"""Crea la app FastAPI ligada a un vault concreto.
|
|
|
|
Valida que el vault existe al construir el ``VaultState`` (no un 500
|
|
silencioso en el primer request). Registra todos los endpoints sobre un
|
|
estado compartido en memoria.
|
|
"""
|
|
state = VaultState(vault_dir)
|
|
app = FastAPI(title="osint_web", version="0.1.0")
|
|
app.state.vault = state
|
|
|
|
# -- Vault --
|
|
|
|
@app.get("/api/health")
|
|
def health() -> dict:
|
|
"""Health check: confirma que el servidor está vivo y el vault cargado."""
|
|
return {
|
|
"status": "ok",
|
|
"vault": state.vault_dir,
|
|
"nodes": len(state.graph["nodes"]),
|
|
"edges": len(state.graph["edges"]),
|
|
}
|
|
|
|
@app.get("/api/graph")
|
|
def api_graph() -> dict:
|
|
"""Grafo completo del vault (nodos + aristas + conteos) para sigma.js.
|
|
|
|
Cacheado en memoria; usar ``/api/refresh`` para recargar tras editar
|
|
notas.
|
|
"""
|
|
return state.graph_payload()
|
|
|
|
@app.get("/api/nodes")
|
|
def api_nodes(tipo: str = Query("", description="tipo de nodo a filtrar")) -> dict:
|
|
"""Filas de la tabla de un tipo concreto (frontmatter aplanado).
|
|
|
|
Devuelve solo los nodos reales (no fantasma). Sin ``tipo`` devuelve
|
|
todos.
|
|
"""
|
|
rows = state.rows_by_tipo(tipo)
|
|
return {"tipo": tipo, "count": len(rows), "rows": rows}
|
|
|
|
@app.get("/api/node/{slug}")
|
|
def api_node(slug: str) -> dict:
|
|
"""Ficha de un nodo: frontmatter + body + lista de attachments."""
|
|
detail = state.node_detail(slug)
|
|
if detail is None:
|
|
raise HTTPException(status_code=404, detail="nodo '%s' no encontrado" % slug)
|
|
return detail
|
|
|
|
@app.get("/api/attachment")
|
|
def api_attachment(
|
|
path: str = Query(..., description="path relativo al vault"),
|
|
) -> FileResponse:
|
|
"""Sirve el binario de un attachment, con allowlist ESTRICTA al vault.
|
|
|
|
El ``path`` es relativo al vault. Se resuelve a su realpath y se verifica
|
|
que cae dentro del vault: cualquier intento de salir
|
|
(``../../etc/passwd``, symlink fuera del vault) devuelve 403. Si el
|
|
archivo no existe (pero está dentro del vault), 404.
|
|
"""
|
|
abs_path = state.resolve_attachment_path(path)
|
|
if abs_path is None:
|
|
# Distinguimos traversal (403) de inexistente-dentro-del-vault (404).
|
|
candidate = os.path.realpath(os.path.join(state._vault_real, path or ""))
|
|
if state._is_within_vault(candidate) and candidate != state._vault_real:
|
|
raise HTTPException(status_code=404, detail="attachment no encontrado")
|
|
raise HTTPException(status_code=403, detail="path fuera del vault")
|
|
return FileResponse(abs_path)
|
|
|
|
@app.get("/api/search")
|
|
def api_search(q: str = Query(..., min_length=1)) -> dict:
|
|
"""Nodos del grafo cuyas notas matchean la query (substring)."""
|
|
results = state.search(q)
|
|
return {"query": q, "count": len(results), "results": results}
|
|
|
|
# -- Xandikos: contactos (CardDAV) --
|
|
|
|
@app.get("/api/contacts")
|
|
def api_contacts() -> JSONResponse:
|
|
"""Contactos del addressbook Xandikos, parseados a JSON (cacheados).
|
|
|
|
Cada contacto: ``{uid, nombre, alias, nota, org, telefonos[], emails[],
|
|
osint{dni,pais,sexo,...}}`` (+ las formas tipadas ``phones``/``emails``).
|
|
La lista se cachea en memoria al primer acceso (``POST /api/refresh`` la
|
|
invalida). Si Xandikos no responde o falta la password → 503 con un JSON
|
|
de error claro, nunca un crash.
|
|
"""
|
|
try:
|
|
contacts = state.contacts()
|
|
except (RuntimeError, DavUnavailable) as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(
|
|
content={"status": "ok", "count": len(contacts), "contacts": contacts}
|
|
)
|
|
|
|
@app.get("/api/contact/{uid}")
|
|
def api_contact(uid: str) -> JSONResponse:
|
|
"""Un contacto concreto (por UID) parseado a JSON, desde la caché.
|
|
|
|
Resuelve sobre la lista cacheada de ``/api/contacts`` (mismo parseo
|
|
completo, todos los campos). 404 si el UID no existe; 503 si Xandikos no
|
|
responde o falta la password.
|
|
"""
|
|
try:
|
|
contacts = state.contacts()
|
|
except (RuntimeError, DavUnavailable) as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
match = next((c for c in contacts if c.get("uid") == uid), None)
|
|
if match is None:
|
|
# Tolerancia: aceptar también el segmento final del href (nombre del
|
|
# recurso .vcf) cuando el UID no coincide literalmente.
|
|
match = next(
|
|
(
|
|
c
|
|
for c in contacts
|
|
if uid in (c.get("href") or "").rsplit("/", 1)[-1]
|
|
),
|
|
None,
|
|
)
|
|
if match is None:
|
|
raise HTTPException(
|
|
status_code=404, detail="contacto '%s' no encontrado" % uid
|
|
)
|
|
return JSONResponse(content={"status": "ok", "contact": match})
|
|
|
|
# -- Contactos: CRUD (ficha .md del vault = verdad, vCard = reflejo) --
|
|
|
|
@app.post("/api/contact")
|
|
def api_create_contact(data: ContactIn = Body(...)) -> JSONResponse:
|
|
"""Crea un contacto: escribe la ficha ``.md`` del vault + el vCard.
|
|
|
|
La ficha del vault es la fuente de verdad (CONVENTIONS.md §3b/§6);
|
|
Xandikos se actualiza de inmediato para que el contacto se vea ya en la
|
|
app y en el móvil. 409 si el slug ya existe. Devuelve ``{slug, uid}``.
|
|
"""
|
|
result = state.create_contact(data)
|
|
return JSONResponse(status_code=201, content={"status": "ok", **result})
|
|
|
|
@app.put("/api/contact/{slug}")
|
|
def api_update_contact(slug: str, data: ContactIn = Body(...)) -> JSONResponse:
|
|
"""Edita la ficha de un contacto (merge frontmatter) + re-sube el vCard.
|
|
|
|
404 si no existe la ficha. Preserva campos heredados no editables
|
|
(``sexo``, ``fecha_nacimiento``, ...). Devuelve ``{slug, uid}``.
|
|
"""
|
|
result = state.update_contact(slug, data)
|
|
return JSONResponse(content={"status": "ok", **result})
|
|
|
|
@app.delete("/api/contact/{slug}")
|
|
def api_delete_contact(slug: str) -> JSONResponse:
|
|
"""Borra un contacto: elimina la ficha ``.md`` + el vCard en Xandikos.
|
|
|
|
404 si no existe la ficha. Devuelve confirmación ``{slug, deleted}``.
|
|
"""
|
|
result = state.delete_contact(slug)
|
|
return JSONResponse(content={"status": "ok", **result})
|
|
|
|
# -- Xandikos: calendario (CalDAV) --
|
|
|
|
@app.get("/api/calendar")
|
|
def api_calendar(
|
|
from_: str = Query("", alias="from", description="fecha inicio YYYY-MM-DD"),
|
|
to: str = Query("", description="fecha fin YYYY-MM-DD"),
|
|
) -> JSONResponse:
|
|
"""Eventos del calendario Xandikos en ``[from, to]`` (cacheados).
|
|
|
|
Cada evento: ``{uid, summary, dtstart, dtend, location, description}``.
|
|
La descarga + parseo completos se cachean (``POST /api/refresh`` los
|
|
invalida); el filtro por rango se aplica sobre la caché. Sin ``from``/
|
|
``to`` devuelve todos. Si Xandikos no responde o falta la password →
|
|
503 con JSON de error claro, nunca un crash.
|
|
"""
|
|
try:
|
|
events = state.calendar(from_, to)
|
|
except (RuntimeError, DavUnavailable) as exc:
|
|
return JSONResponse(
|
|
status_code=503, content={"status": "error", "error": str(exc)}
|
|
)
|
|
return JSONResponse(
|
|
content={"status": "ok", "count": len(events), "events": events}
|
|
)
|
|
|
|
# -- Refresco de cachés --
|
|
|
|
@app.post("/api/refresh")
|
|
def api_refresh() -> dict:
|
|
"""Reconstruye la caché del grafo del vault e invalida las cachés DAV.
|
|
|
|
Re-escanea el vault (grafo + tablas) y vacía las cachés de contactos y
|
|
calendario, que se recargarán perezosamente en el siguiente acceso.
|
|
Devuelve el conteo del grafo recién reconstruido.
|
|
"""
|
|
summary = state.refresh()
|
|
state.invalidate_dav()
|
|
return {"status": "refreshed", **summary}
|
|
|
|
return app
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _parse_args(argv: Optional[list] = None) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(
|
|
description="Backend osint_web: sirve el vault osint + agenda/calendario Xandikos."
|
|
)
|
|
parser.add_argument(
|
|
"--vault",
|
|
default=os.path.expanduser("~/Obsidian/osint"),
|
|
help="ruta al vault de Obsidian osint (default: ~/Obsidian/osint)",
|
|
)
|
|
parser.add_argument(
|
|
"--host",
|
|
default="127.0.0.1",
|
|
help="host de escucha (default: 127.0.0.1 — NO cambiar: datos sensibles)",
|
|
)
|
|
parser.add_argument(
|
|
"--port", type=int, default=8470, help="puerto local (default: 8470)"
|
|
)
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def main(argv: Optional[list] = None) -> int:
|
|
args = _parse_args(argv)
|
|
if args.host != "127.0.0.1":
|
|
# Seguridad: el vault tiene PII (DNIs, fotos). Nunca exponer a red.
|
|
print(
|
|
"ADVERTENCIA: --host distinto de 127.0.0.1. El vault contiene datos "
|
|
"personales sensibles; exponerlo a la red es un riesgo.",
|
|
file=sys.stderr,
|
|
)
|
|
try:
|
|
app = create_app(args.vault)
|
|
except (FileNotFoundError, NotADirectoryError) as exc:
|
|
print(f"error: {exc}", file=sys.stderr)
|
|
return 2
|
|
|
|
import uvicorn
|
|
|
|
state = app.state.vault
|
|
print(
|
|
f"osint_web backend en http://{args.host}:{args.port} — vault: "
|
|
f"{state.vault_dir} ({len(state.graph['nodes'])} nodos, "
|
|
f"{len(state.graph['edges'])} aristas)"
|
|
)
|
|
uvicorn.run(app, host=args.host, port=args.port, log_level="info")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|