"""Capa de acceso a la base DuckDB: single-writer + migraciones. Single-writer: SOLO este service escribe osint.duckdb. DuckDB bloquea el archivo a un escritor exclusivo, así que la conexión de escritura se abre bajo demanda (migraciones, ingest, rebuild de derivadas), serializada con un lock de proceso, y se cierra inmediatamente al terminar. Así, fuera de una escritura en curso, las lecturas de /api/query pueden abrir su propia conexión read_only via duckdb_query_readonly sin conflicto de lock. Migraciones (regla db_migrations adaptada a DuckDB): archivos numerados migrations/NNN_*.sql, aditivos e idempotentes, aplicados en orden al arrancar. La tabla _migrations registra cuáles ya se aplicaron para no repetirlas. """ from __future__ import annotations import glob import os import threading from contextlib import contextmanager import duckdb # Lock de proceso: serializa todas las aperturas de la conexión de escritura. _WRITE_LOCK = threading.Lock() MIGRATIONS_DIR = os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "migrations" ) @contextmanager def write_conn(db_path: str): """Abre la conexión de escritura del service de forma exclusiva. Context manager: adquiere el lock de proceso, abre DuckDB en lectura y escritura, cede la conexión y la cierra siempre al salir. Crea el directorio padre del archivo si no existe. """ parent = os.path.dirname(os.path.abspath(db_path)) if parent: os.makedirs(parent, exist_ok=True) with _WRITE_LOCK: conn = duckdb.connect(db_path) try: yield conn finally: conn.close() def apply_migrations(db_path: str) -> list: """Aplica las migraciones pendientes en orden y devuelve las aplicadas. Lee migrations/NNN_*.sql ordenados por nombre, salta las que ya están registradas en _migrations y ejecuta el resto dentro de la conexión de escritura. Idempotente: una segunda llamada no aplica nada. """ applied: list = [] files = sorted(glob.glob(os.path.join(MIGRATIONS_DIR, "*.sql"))) with write_conn(db_path) as conn: conn.execute( "CREATE TABLE IF NOT EXISTS _migrations (" "name TEXT PRIMARY KEY, applied_at TIMESTAMP DEFAULT now())" ) done = {row[0] for row in conn.execute("SELECT name FROM _migrations").fetchall()} for path in files: name = os.path.basename(path) if name in done: continue with open(path, "r", encoding="utf-8") as fh: sql = fh.read() conn.execute("BEGIN") try: conn.execute(sql) conn.execute("INSERT INTO _migrations (name) VALUES (?)", [name]) conn.execute("COMMIT") except Exception: conn.execute("ROLLBACK") raise applied.append(name) return applied def list_tables(db_path: str) -> list: """Inventario de tablas para /api/tables. Devuelve una lista de dicts {schema, name, kind, row_count, columns} para las tablas de los schemas main y derived (excluye _migrations). kind es 'master' para main y 'derived' para derived. """ out: list = [] conn = duckdb.connect(db_path, read_only=True) try: tables = conn.execute( "SELECT table_schema, table_name FROM information_schema.tables " "WHERE table_schema IN ('main', 'derived') AND table_name != '_migrations' " "ORDER BY table_schema, table_name" ).fetchall() for schema, name in tables: cols = conn.execute( "SELECT column_name, data_type FROM information_schema.columns " "WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position", [schema, name], ).fetchall() row_count = conn.execute( f'SELECT COUNT(*) FROM "{schema}"."{name}"' ).fetchone()[0] out.append( { "schema": schema, "name": name, "kind": "derived" if schema == "derived" else "master", "row_count": row_count, "columns": [{"name": c, "type": t} for c, t in cols], } ) finally: conn.close() return out