120 lines
4.3 KiB
Python
120 lines
4.3 KiB
Python
"""Capa de acceso a la base DuckDB: single-writer + migraciones.
|
|
|
|
Single-writer: SOLO este service escribe osint.duckdb. DuckDB bloquea el
|
|
archivo a un escritor exclusivo, así que la conexión de escritura se abre bajo
|
|
demanda (migraciones, ingest, rebuild de derivadas), serializada con un lock de
|
|
proceso, y se cierra inmediatamente al terminar. Así, fuera de una escritura en
|
|
curso, las lecturas de /api/query pueden abrir su propia conexión read_only via
|
|
duckdb_query_readonly sin conflicto de lock.
|
|
|
|
Migraciones (regla db_migrations adaptada a DuckDB): archivos numerados
|
|
migrations/NNN_*.sql, aditivos e idempotentes, aplicados en orden al arrancar.
|
|
La tabla _migrations registra cuáles ya se aplicaron para no repetirlas.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import glob
|
|
import os
|
|
import threading
|
|
from contextlib import contextmanager
|
|
|
|
import duckdb
|
|
|
|
# Lock de proceso: serializa todas las aperturas de la conexión de escritura.
|
|
_WRITE_LOCK = threading.Lock()
|
|
|
|
MIGRATIONS_DIR = os.path.join(
|
|
os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "migrations"
|
|
)
|
|
|
|
|
|
@contextmanager
|
|
def write_conn(db_path: str):
|
|
"""Abre la conexión de escritura del service de forma exclusiva.
|
|
|
|
Context manager: adquiere el lock de proceso, abre DuckDB en lectura y
|
|
escritura, cede la conexión y la cierra siempre al salir. Crea el
|
|
directorio padre del archivo si no existe.
|
|
"""
|
|
parent = os.path.dirname(os.path.abspath(db_path))
|
|
if parent:
|
|
os.makedirs(parent, exist_ok=True)
|
|
with _WRITE_LOCK:
|
|
conn = duckdb.connect(db_path)
|
|
try:
|
|
yield conn
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def apply_migrations(db_path: str) -> list:
|
|
"""Aplica las migraciones pendientes en orden y devuelve las aplicadas.
|
|
|
|
Lee migrations/NNN_*.sql ordenados por nombre, salta las que ya están
|
|
registradas en _migrations y ejecuta el resto dentro de la conexión de
|
|
escritura. Idempotente: una segunda llamada no aplica nada.
|
|
"""
|
|
applied: list = []
|
|
files = sorted(glob.glob(os.path.join(MIGRATIONS_DIR, "*.sql")))
|
|
with write_conn(db_path) as conn:
|
|
conn.execute(
|
|
"CREATE TABLE IF NOT EXISTS _migrations ("
|
|
"name TEXT PRIMARY KEY, applied_at TIMESTAMP DEFAULT now())"
|
|
)
|
|
done = {row[0] for row in conn.execute("SELECT name FROM _migrations").fetchall()}
|
|
for path in files:
|
|
name = os.path.basename(path)
|
|
if name in done:
|
|
continue
|
|
with open(path, "r", encoding="utf-8") as fh:
|
|
sql = fh.read()
|
|
conn.execute("BEGIN")
|
|
try:
|
|
conn.execute(sql)
|
|
conn.execute("INSERT INTO _migrations (name) VALUES (?)", [name])
|
|
conn.execute("COMMIT")
|
|
except Exception:
|
|
conn.execute("ROLLBACK")
|
|
raise
|
|
applied.append(name)
|
|
return applied
|
|
|
|
|
|
def list_tables(db_path: str) -> list:
|
|
"""Inventario de tablas para /api/tables.
|
|
|
|
Devuelve una lista de dicts {schema, name, kind, row_count, columns} para
|
|
las tablas de los schemas main y derived (excluye _migrations). kind es
|
|
'master' para main y 'derived' para derived.
|
|
"""
|
|
out: list = []
|
|
conn = duckdb.connect(db_path, read_only=True)
|
|
try:
|
|
tables = conn.execute(
|
|
"SELECT table_schema, table_name FROM information_schema.tables "
|
|
"WHERE table_schema IN ('main', 'derived') AND table_name != '_migrations' "
|
|
"ORDER BY table_schema, table_name"
|
|
).fetchall()
|
|
for schema, name in tables:
|
|
cols = conn.execute(
|
|
"SELECT column_name, data_type FROM information_schema.columns "
|
|
"WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position",
|
|
[schema, name],
|
|
).fetchall()
|
|
row_count = conn.execute(
|
|
f'SELECT COUNT(*) FROM "{schema}"."{name}"'
|
|
).fetchone()[0]
|
|
out.append(
|
|
{
|
|
"schema": schema,
|
|
"name": name,
|
|
"kind": "derived" if schema == "derived" else "master",
|
|
"row_count": row_count,
|
|
"columns": [{"name": c, "type": t} for c, t in cols],
|
|
}
|
|
)
|
|
finally:
|
|
conn.close()
|
|
return out
|