#!/usr/bin/env python3 """Service FastAPI osint_db: dueño único de la base DuckDB data/osint.duckdb. La base es la fuente de verdad estructurada del ecosistema OSINT: índice del vault de Obsidian (notes + entidades con note_path), maestras DAV importadas de Xandikos (contacts, events) y derivadas computadas (schema derived, sin referencias a notas). Solo este proceso escribe la base; las lecturas de /api/query usan una conexión read_only separada (duckdb_query_readonly). Registry-first: el service NO reimplementa parseo de Markdown, protocolo DAV, acceso a pass, ejecución read-only de DuckDB, render de tablas Markdown ni bloques sentinel — importa esas funciones del registry (server/registry_bridge.py) y solo aporta la lógica propia del dominio (mapeo vault→tablas, matching contacto→ficha, derivadas y la API). Seguridad: el vault contiene datos personales sensibles, así que el server escucha SOLO en 127.0.0.1 y /api/query es estrictamente de solo lectura. Uso: .venv/bin/python server/main.py --vault ~/Obsidian/osint --port 8771 Contrato (el plugin de Obsidian parsea el body, no el código HTTP: los endpoints de datos responden SIEMPRE 200 con status ok|error en el body): GET /api/health estado + db_path + número de tablas GET /api/tables inventario de tablas (schema, kind, filas, columnas) POST /api/query SELECT arbitrario read-only {sql, params, max_rows} GET /api/queries catálogo de queries con nombre POST /api/query/named ejecuta una query del catálogo {name, max_rows} POST /api/ingest/vault re-escanea el vault y reconstruye maestras+derivadas POST /api/ingest/dav baja Xandikos y reconstruye contacts/events+derivadas POST /api/push/dav push masivo HTTP de contacts+events (N PUT, fallback sin SSH) POST /api/push/dav-bulk push masivo de contacts por DISCO (1 rsync + 1 commit, requiere SSH) POST /api/sync/dav-pull sync inverso incremental por etag (ediciones del móvil) POST /api/render/note ejecuta query y la upserta como bloque sentinel en una nota """ from __future__ import annotations import argparse import os import sys # Permite ejecutar tanto `python server/main.py` como importar `server.main`. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from fastapi import FastAPI # noqa: E402 from pydantic import BaseModel, Field # noqa: E402 from server.config import SENTINEL_MARKER, Config # noqa: E402 from server.db import apply_migrations, list_tables # noqa: E402 from server.ingest import ingest_dav, ingest_vault # noqa: E402 from server.named_queries import NAMED_QUERIES # noqa: E402 from server.registry_bridge import ( # noqa: E402 create_obsidian_note, duckdb_query_readonly, read_obsidian_note, render_markdown_table, update_obsidian_note, upsert_sentinel_block, ) from server import writes # noqa: E402 # Tope de filas que un render vuelca en una nota (las notas no son un export). RENDER_MAX_ROWS = 200 class QueryBody(BaseModel): """Body de POST /api/query.""" sql: str params: list = Field(default_factory=list) max_rows: int = 500 class NamedQueryBody(BaseModel): """Body de POST /api/query/named.""" name: str max_rows: int = 500 class RenderNoteBody(BaseModel): """Body de POST /api/render/note. Exactamente uno de sql|query.""" note_path: str block_id: str sql: str | None = None query: str | None = None title: str | None = None class PersonBody(BaseModel): """Body de POST/PUT /api/person: ficha de persona multi-valor. Las listas telefonos/emails/direcciones son la fuente de verdad; los singulares de la nota se derivan del primer elemento al materializar. """ slug: str | None = None nombre: str | None = None aliases: list = Field(default_factory=list) sexo: str | None = None fecha_nacimiento: str | None = None dni: str | None = None pais: str | None = None contexto: str | None = None telefonos: list = Field(default_factory=list) emails: list = Field(default_factory=list) direcciones: list = Field(default_factory=list) tags: list = Field(default_factory=list) class ContactBody(BaseModel): """Body de POST/PUT /api/contact: contacto CardDAV multi-valor.""" uid: str | None = None nombre: str | None = None fn: str | None = None collection: str | None = None tels: list = Field(default_factory=list) telefonos: list = Field(default_factory=list) emails: list = Field(default_factory=list) correos: list = Field(default_factory=list) direcciones: list = Field(default_factory=list) class EventBody(BaseModel): """Body de POST/PUT /api/event: evento CalDAV.""" uid: str | None = None calendar: str | None = None summary: str | None = None dtstart: str | None = None dtend: str | None = None all_day: bool = False location: str | None = None rrule: str | None = None class AddressbookBody(BaseModel): """Body de POST /api/addressbook: crea libreta CardDAV.""" slug: str display_name: str | None = None description: str | None = None color: str | None = None class CalendarBody(BaseModel): """Body de POST /api/calendar: crea calendario CalDAV.""" slug: str display_name: str | None = None color: str | None = None def create_app(cfg: Config) -> FastAPI: """Construye la app FastAPI con la configuración dada (inyectable en tests).""" app = FastAPI(title="osint_db", docs_url=None, redoc_url=None) # Anti DNS-rebinding: solo acepta requests cuyo Host sea localhost. Sin esto, una # web maliciosa podría rebindear su dominio a 127.0.0.1 y, desde el navegador del # usuario, alcanzar este service (que no tiene auth por ser de uso local) y abusar # de /api/query. "testserver" permite el TestClient de los tests. from starlette.middleware.trustedhost import TrustedHostMiddleware app.add_middleware( TrustedHostMiddleware, allowed_hosts=["127.0.0.1", "localhost", "testserver"], ) # Anti-CSRF de navegador: rechaza las peticiones mutantes que el navegador # marca como cross-site (header Sec-Fetch-Site). Cierra el hueco de las # peticiones "simples" (POST sin preflight CORS) que el TrustedHost no filtra # porque su Host sigue siendo 127.0.0.1. Los clientes server-to-server (urllib, # curl) y el frontend mismo-origen no envían 'cross-site', así que no se ven # afectados. from starlette.responses import JSONResponse @app.middleware("http") async def _reject_cross_site(request, call_next): if request.method in ("POST", "PUT", "PATCH", "DELETE"): if request.headers.get("sec-fetch-site") == "cross-site": return JSONResponse( status_code=403, content={"status": "error", "error": "petición cross-site rechazada"}, ) return await call_next(request) def run_readonly(sql: str, params: list, max_rows: int) -> dict: """Ejecuta un SELECT con la conexión read_only del registry, acotado.""" max_rows = max(1, min(int(max_rows), 10000)) return duckdb_query_readonly(cfg.db_path, sql, params or [], max_rows) @app.get("/api/health") def health() -> dict: try: tables = list_tables(cfg.db_path) except Exception as e: # noqa: BLE001 return {"status": "error", "db_path": cfg.db_path, "error": str(e)} return {"status": "ok", "db_path": cfg.db_path, "tables": len(tables)} @app.get("/api/tables") def tables() -> dict: try: return {"status": "ok", "tables": list_tables(cfg.db_path)} except Exception as e: # noqa: BLE001 return {"status": "error", "error": str(e)} @app.post("/api/query") def query(body: QueryBody) -> dict: return run_readonly(body.sql, body.params, body.max_rows) @app.get("/api/queries") def queries() -> dict: return { "status": "ok", "queries": [ {"name": name, "description": q["description"], "sql": q["sql"]} for name, q in NAMED_QUERIES.items() ], } @app.post("/api/query/named") def query_named(body: NamedQueryBody) -> dict: entry = NAMED_QUERIES.get(body.name) if entry is None: return { "status": "error", "error": f"query con nombre desconocida: {body.name!r} " f"(disponibles: {', '.join(sorted(NAMED_QUERIES))})", } return run_readonly(entry["sql"], [], body.max_rows) @app.post("/api/ingest/vault") def api_ingest_vault() -> dict: try: return ingest_vault(cfg) except Exception as e: # noqa: BLE001 return {"status": "error", "error": str(e)} @app.post("/api/ingest/dav") def api_ingest_dav() -> dict: try: return ingest_dav(cfg) except Exception as e: # noqa: BLE001 return {"status": "error", "error": str(e)} @app.post("/api/render/note") def render_note(body: RenderNoteBody) -> dict: # Resolver el SQL: o viene literal, o por nombre del catálogo. if bool(body.sql) == bool(body.query): return { "status": "error", "error": "indica exactamente uno de los campos 'sql' o 'query' (named)", } if body.query: entry = NAMED_QUERIES.get(body.query) if entry is None: return { "status": "error", "error": f"query con nombre desconocida: {body.query!r}", } sql = entry["sql"] else: sql = body.sql result = run_readonly(sql, [], RENDER_MAX_ROWS) if result.get("status") != "ok": return result table_md = render_markdown_table( result["rows"], columns=result["columns"], max_rows=RENDER_MAX_ROWS ) content = table_md if table_md else "_(sin filas)_" if body.title: content = f"### {body.title}\n\n{content}" # La nota se referencia por path relativo al vault; el path no puede # escapar del vault (mismo criterio anti-traversal que osint_web). rel = body.note_path if body.note_path.endswith(".md") else body.note_path + ".md" abs_path = os.path.abspath(os.path.join(cfg.vault_dir, rel)) vault_real = os.path.realpath(cfg.vault_dir) if not os.path.realpath(abs_path).startswith(vault_real + os.sep): return {"status": "error", "error": f"note_path fuera del vault: {body.note_path!r}"} try: if os.path.exists(abs_path): note = read_obsidian_note(abs_path) new_body = upsert_sentinel_block( note.get("body", "") or "", body.block_id, content, marker=SENTINEL_MARKER, ) update_obsidian_note(abs_path, body=new_body) else: new_body = upsert_sentinel_block( "", body.block_id, content, marker=SENTINEL_MARKER ) create_obsidian_note( cfg.vault_dir, rel, body=new_body, frontmatter={"tipo": "tablero", "tags": ["osintdb"]}, ) except Exception as e: # noqa: BLE001 return {"status": "error", "error": str(e)} return { "status": "ok", "note_path": rel, "rows_rendered": result["row_count"], } # --- Escritura estructurada (DB fuente de verdad) --------------------- # Todos responden 200 + {status}. La escritura DB va bajo el lock del # service; el push DAV y el render ocurren tras cerrar la transacción. def _guard(fn) -> dict: try: return fn() except Exception as e: # noqa: BLE001 return {"status": "error", "error": str(e)} @app.post("/api/person") def create_person(body: PersonBody) -> dict: if not body.slug: return {"status": "error", "error": "falta 'slug'"} return _guard(lambda: writes.upsert_person(cfg, body.slug, body.model_dump())) @app.put("/api/person/{slug}") def update_person(slug: str, body: PersonBody) -> dict: return _guard(lambda: writes.upsert_person(cfg, slug, body.model_dump())) @app.delete("/api/person/{slug}") def remove_person(slug: str) -> dict: return _guard(lambda: writes.delete_person(cfg, slug)) @app.post("/api/person/{slug}/render") def materialize_person(slug: str) -> dict: return _guard(lambda: writes.render_person(cfg, slug)) @app.post("/api/contact") def create_contact(body: ContactBody) -> dict: if not body.uid: return {"status": "error", "error": "falta 'uid'"} return _guard(lambda: writes.upsert_contact(cfg, body.uid, body.model_dump())) @app.put("/api/contact/{uid}") def update_contact(uid: str, body: ContactBody) -> dict: return _guard(lambda: writes.upsert_contact(cfg, uid, body.model_dump())) @app.delete("/api/contact/{uid}") def remove_contact(uid: str) -> dict: return _guard(lambda: writes.delete_contact(cfg, uid)) @app.post("/api/event") def create_event(body: EventBody) -> dict: if not body.uid: return {"status": "error", "error": "falta 'uid'"} return _guard(lambda: writes.upsert_event(cfg, body.uid, body.model_dump())) @app.put("/api/event/{uid}") def update_event(uid: str, body: EventBody) -> dict: return _guard(lambda: writes.upsert_event(cfg, uid, body.model_dump())) @app.delete("/api/event/{uid}") def remove_event(uid: str) -> dict: return _guard(lambda: writes.delete_event(cfg, uid)) @app.post("/api/addressbook") def create_addressbook(body: AddressbookBody) -> dict: return _guard(lambda: writes.make_addressbook(cfg, body.model_dump())) @app.post("/api/calendar") def create_calendar(body: CalendarBody) -> dict: return _guard(lambda: writes.make_calendar(cfg, body.model_dump())) @app.post("/api/push/dav") def push_dav() -> dict: return _guard(lambda: writes.push_all_dav(cfg)) @app.post("/api/push/dav-bulk") def push_dav_bulk() -> dict: # Vía RÁPIDA del push de contactos: 1 rsync + 1 commit + 1 PROPFIND en # el host de Xandikos (requiere SSH por clave). Si no hay SSH, usar # /api/push/dav (HTTP) como fallback. return _guard(lambda: writes.push_all_dav_bulk(cfg)) @app.post("/api/sync/dav-pull") def sync_dav_pull() -> dict: # Sync inverso: trae a la DB las ediciones del móvil/DAVx5, # last-write-wins por etag (incremental, distinto de ingest_dav). return _guard(lambda: writes.pull_dav(cfg)) @app.post("/api/org/render-contacts") def org_render_contacts() -> dict: # Materializa en la ficha .md de cada organización la tabla de sus # personas de contacto con teléfono y rol (desde derived.org_contacts). return _guard(lambda: writes.render_all_org_contacts(cfg)) return app def main() -> int: """Punto de entrada CLI: valida el vault, migra la base y arranca uvicorn.""" parser = argparse.ArgumentParser( description="Service osint_db: DuckDB fuente de verdad del ecosistema OSINT." ) parser.add_argument("--vault", default=None, help="directorio del vault Obsidian") parser.add_argument("--db", default=None, help="ruta del archivo osint.duckdb") parser.add_argument("--port", type=int, default=None, help="puerto de escucha") parser.add_argument( "--host", default="127.0.0.1", help="host de escucha (datos sensibles: déjalo en 127.0.0.1)", ) args = parser.parse_args() cfg = Config() if args.vault: cfg.vault_dir = os.path.expanduser(args.vault) if args.db: cfg.db_path = os.path.expanduser(args.db) if args.port is not None: cfg.port = args.port cfg.host = args.host if not os.path.isdir(cfg.vault_dir): print( f"ERROR: el vault no existe o no es un directorio: {cfg.vault_dir}", file=sys.stderr, ) return 2 if cfg.host != "127.0.0.1": print( "AVISO: la base contiene datos personales sensibles; escuchar fuera " "de 127.0.0.1 no está soportado.", file=sys.stderr, ) applied = apply_migrations(cfg.db_path) if applied: print(f"migraciones aplicadas: {', '.join(applied)}") import uvicorn app = create_app(cfg) uvicorn.run(app, host=cfg.host, port=cfg.port, log_level="warning") return 0 if __name__ == "__main__": sys.exit(main())