Files
osint_db/server/db.py
T

120 lines
4.3 KiB
Python

"""Capa de acceso a la base DuckDB: single-writer + migraciones.
Single-writer: SOLO este service escribe osint.duckdb. DuckDB bloquea el
archivo a un escritor exclusivo, así que la conexión de escritura se abre bajo
demanda (migraciones, ingest, rebuild de derivadas), serializada con un lock de
proceso, y se cierra inmediatamente al terminar. Así, fuera de una escritura en
curso, las lecturas de /api/query pueden abrir su propia conexión read_only via
duckdb_query_readonly sin conflicto de lock.
Migraciones (regla db_migrations adaptada a DuckDB): archivos numerados
migrations/NNN_*.sql, aditivos e idempotentes, aplicados en orden al arrancar.
La tabla _migrations registra cuáles ya se aplicaron para no repetirlas.
"""
from __future__ import annotations
import glob
import os
import threading
from contextlib import contextmanager
import duckdb
# Lock de proceso: serializa todas las aperturas de la conexión de escritura.
_WRITE_LOCK = threading.Lock()
MIGRATIONS_DIR = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "migrations"
)
@contextmanager
def write_conn(db_path: str):
"""Abre la conexión de escritura del service de forma exclusiva.
Context manager: adquiere el lock de proceso, abre DuckDB en lectura y
escritura, cede la conexión y la cierra siempre al salir. Crea el
directorio padre del archivo si no existe.
"""
parent = os.path.dirname(os.path.abspath(db_path))
if parent:
os.makedirs(parent, exist_ok=True)
with _WRITE_LOCK:
conn = duckdb.connect(db_path)
try:
yield conn
finally:
conn.close()
def apply_migrations(db_path: str) -> list:
"""Aplica las migraciones pendientes en orden y devuelve las aplicadas.
Lee migrations/NNN_*.sql ordenados por nombre, salta las que ya están
registradas en _migrations y ejecuta el resto dentro de la conexión de
escritura. Idempotente: una segunda llamada no aplica nada.
"""
applied: list = []
files = sorted(glob.glob(os.path.join(MIGRATIONS_DIR, "*.sql")))
with write_conn(db_path) as conn:
conn.execute(
"CREATE TABLE IF NOT EXISTS _migrations ("
"name TEXT PRIMARY KEY, applied_at TIMESTAMP DEFAULT now())"
)
done = {row[0] for row in conn.execute("SELECT name FROM _migrations").fetchall()}
for path in files:
name = os.path.basename(path)
if name in done:
continue
with open(path, "r", encoding="utf-8") as fh:
sql = fh.read()
conn.execute("BEGIN")
try:
conn.execute(sql)
conn.execute("INSERT INTO _migrations (name) VALUES (?)", [name])
conn.execute("COMMIT")
except Exception:
conn.execute("ROLLBACK")
raise
applied.append(name)
return applied
def list_tables(db_path: str) -> list:
"""Inventario de tablas para /api/tables.
Devuelve una lista de dicts {schema, name, kind, row_count, columns} para
las tablas de los schemas main y derived (excluye _migrations). kind es
'master' para main y 'derived' para derived.
"""
out: list = []
conn = duckdb.connect(db_path, read_only=True)
try:
tables = conn.execute(
"SELECT table_schema, table_name FROM information_schema.tables "
"WHERE table_schema IN ('main', 'derived') AND table_name != '_migrations' "
"ORDER BY table_schema, table_name"
).fetchall()
for schema, name in tables:
cols = conn.execute(
"SELECT column_name, data_type FROM information_schema.columns "
"WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position",
[schema, name],
).fetchall()
row_count = conn.execute(
f'SELECT COUNT(*) FROM "{schema}"."{name}"'
).fetchone()[0]
out.append(
{
"schema": schema,
"name": name,
"kind": "derived" if schema == "derived" else "master",
"row_count": row_count,
"columns": [{"name": c, "type": t} for c, t in cols],
}
)
finally:
conn.close()
return out