"""Pipeline: sincroniza una tabla DuckDB a una tabla PostgreSQL. Esto es lo que desbloquea que herramientas BI (Metabase, Grafana, Superset) lean los datos que viven en un archivo DuckDB: esas herramientas NO hablan DuckDB nativo, pero todas hablan PostgreSQL. El pipeline lee el schema y las filas de la tabla DuckDB, crea (o recrea) la tabla equivalente en PostgreSQL con un mapeo de tipos DuckDB -> PostgreSQL, y vuelca las filas en lotes. Funcion impura de tipo pipeline: compone funciones del registry y NO reescribe su logica. - duckdb_table_schema -> lee columnas y tipos de la tabla DuckDB. - duckdb_query_readonly -> lee las filas (paginadas con LIMIT/OFFSET). - pg_apply_sql -> aplica el DDL (CREATE/DROP) escrito a un .sql temporal. - pg_insert_rows -> inserta lotes (camino replace / append sin clave). - pg_upsert (opcional) -> upsert idempotente cuando hay key_cols y mode!='replace'. pg_upsert se importa detras de un check: si todavia no esta en el registry, el pipeline sigue funcionando para el camino replace/insert. Devuelve un dict sin lanzar, estilo del grupo: {status:'ok', ...} en exito y {status:'error', error:str} en fallo. """ import os import re import sys import tempfile # Las funciones del registry se importan, no se reescriben. sys.path apunta al # directorio de funciones del registry (mismo patron que usan las apps Python). _FUNCTIONS_DIR = os.path.join( os.path.dirname(__file__), "..", ".." ) # python/ _FUNCTIONS_DIR = os.path.abspath(os.path.join(_FUNCTIONS_DIR, "functions")) if _FUNCTIONS_DIR not in sys.path: sys.path.insert(0, _FUNCTIONS_DIR) from infra.duckdb_query_readonly import duckdb_query_readonly # noqa: E402 from infra.duckdb_table_schema import duckdb_table_schema # noqa: E402 from infra.pg_apply_sql import pg_apply_sql # noqa: E402 from infra.pg_insert_rows import pg_insert_rows # noqa: E402 # pg_upsert puede no existir aun (lo construye otro agente en paralelo). Lo # cargamos detras de un check; sin el, el camino upsert no esta disponible pero # el resto del pipeline funciona. try: from infra.pg_upsert import pg_upsert # noqa: E402 _HAS_UPSERT = True except Exception: # noqa: BLE001 - cualquier fallo de import deja el camino off pg_upsert = None _HAS_UPSERT = False _VALID_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") def _map_duckdb_type_to_pg(duck_type: str) -> str: """Mapea un tipo DuckDB a su equivalente PostgreSQL. El mapeo es conservador: tipos numericos/temporales/booleanos conocidos se mapean a su equivalente PG natural; cualquier otro tipo (incluidos compuestos como LIST/STRUCT/MAP, o DECIMAL con escala) cae a TEXT, que siempre acepta el valor serializado. Puede haber perdida de tipado fuerte para esos casos. """ t = (duck_type or "").strip().upper() # Normalizar tipos parametrizados: DECIMAL(10,2) -> DECIMAL, VARCHAR(50) -> VARCHAR. base = t.split("(")[0].strip() mapping = { "BIGINT": "BIGINT", "INT8": "BIGINT", "LONG": "BIGINT", "INTEGER": "BIGINT", "INT": "BIGINT", "INT4": "BIGINT", "SMALLINT": "BIGINT", "INT2": "BIGINT", "TINYINT": "BIGINT", "INT1": "BIGINT", "HUGEINT": "TEXT", # 128-bit: no cabe en BIGINT, serializar a texto. "UBIGINT": "TEXT", "DOUBLE": "DOUBLE PRECISION", "FLOAT8": "DOUBLE PRECISION", "FLOAT": "DOUBLE PRECISION", "FLOAT4": "DOUBLE PRECISION", "REAL": "DOUBLE PRECISION", "VARCHAR": "TEXT", "TEXT": "TEXT", "STRING": "TEXT", "CHAR": "TEXT", "BPCHAR": "TEXT", "BOOLEAN": "BOOLEAN", "BOOL": "BOOLEAN", "LOGICAL": "BOOLEAN", "DATE": "DATE", "TIMESTAMP": "TIMESTAMP", "DATETIME": "TIMESTAMP", "TIMESTAMP_S": "TIMESTAMP", "TIMESTAMP_MS": "TIMESTAMP", "TIMESTAMP_NS": "TIMESTAMP", } return mapping.get(base, "TEXT") def _build_ddl( pg_table: str, columns: list, key_cols: list, drop_first: bool, ) -> str: """Construye el DDL CREATE (y opcional DROP) para la tabla destino en PG. columns: lista de {name, type} (tipo DuckDB). key_cols: columnas de la PK (puede ser None/[]). drop_first: si True antepone DROP TABLE IF EXISTS. """ col_defs = [] for col in columns: pg_type = _map_duckdb_type_to_pg(col["type"]) col_defs.append(f' "{col["name"]}" {pg_type}') pk_clause = "" if key_cols: pk_cols = ", ".join(f'"{c}"' for c in key_cols) pk_clause = f",\n PRIMARY KEY ({pk_cols})" parts = [] if drop_first: parts.append(f'DROP TABLE IF EXISTS "{pg_table}";') parts.append( f'CREATE TABLE IF NOT EXISTS "{pg_table}" (\n' + ",\n".join(col_defs) + pk_clause + "\n);" ) return "\n".join(parts) def duckdb_to_postgres( duckdb_path: str, table: str, pg_dsn: str, pg_table: str = None, mode: str = "replace", key_cols: list = None, batch_size: int = 5000, ) -> dict: """Sincroniza una tabla DuckDB a PostgreSQL (puente para BI: Metabase/Grafana). Args: duckdb_path: ruta al archivo DuckDB de origen (se lee en modo read_only). table: nombre de la tabla DuckDB a sincronizar. Validado como identificador. pg_dsn: cadena de conexion PostgreSQL, p.ej. "postgresql://user:pass@host:5432/db". pg_table: nombre de la tabla destino en PostgreSQL. None (default) usa el mismo nombre que `table`. Validado como identificador. mode: 'replace' (default) hace DROP TABLE IF EXISTS + CREATE + INSERT de todas las filas (snapshot completo). 'append'/'upsert' crean la tabla si no existe (CREATE TABLE IF NOT EXISTS) y luego: si key_cols esta presente usan pg_upsert (idempotente); si no, hacen INSERT append-only con pg_insert_rows. Cualquier otro valor devuelve {status:'error', ...}. key_cols: lista de columnas de la PRIMARY KEY. Se incluyen en el CREATE como PRIMARY KEY y, en modo != 'replace', habilitan el upsert idempotente. None/[] (default) = sin PK, solo INSERT. batch_size: numero de filas por lote de insercion/upsert (default 5000). Returns: dict. En exito: {status:'ok', pg_table:str, rows_synced:int, created:bool} donde rows_synced es el total de filas volcadas y created indica si se ejecuto el CREATE/DROP del schema. En error (sin lanzar): {status:'error', error:str}. """ # --- Validaciones de entrada --- if not isinstance(table, str) or not _VALID_IDENT.match(table): return {"status": "error", "error": f"invalid table identifier: {table!r}"} target = pg_table if pg_table is not None else table if not isinstance(target, str) or not _VALID_IDENT.match(target): return {"status": "error", "error": f"invalid pg_table identifier: {target!r}"} if mode not in ("replace", "append", "upsert"): return { "status": "error", "error": f"invalid mode: {mode!r} (expected 'replace'|'append'|'upsert')", } keys = list(key_cols) if key_cols else [] for k in keys: if not isinstance(k, str) or not _VALID_IDENT.match(k): return {"status": "error", "error": f"invalid key_col identifier: {k!r}"} if not isinstance(batch_size, int) or batch_size <= 0: return {"status": "error", "error": f"invalid batch_size: {batch_size!r}"} use_upsert = bool(keys) and mode != "replace" if use_upsert and not _HAS_UPSERT: return { "status": "error", "error": ( "key_cols + mode!='replace' requiere pg_upsert_py_infra, que no " "esta disponible en este entorno" ), } # --- (a) Schema de la tabla DuckDB --- schema = duckdb_table_schema(duckdb_path, table) if schema.get("status") != "ok": return { "status": "error", "error": f"no se pudo leer el schema de {table!r}: {schema.get('error')}", } columns = schema["columns"] if not columns: return {"status": "error", "error": f"la tabla {table!r} no tiene columnas"} col_names = [c["name"] for c in columns] # Validar que las key_cols existen en el schema. for k in keys: if k not in col_names: return { "status": "error", "error": f"key_col {k!r} no esta en las columnas de {table!r}", } # --- (b) DDL: crear/recrear la tabla en PostgreSQL via pg_apply_sql --- drop_first = mode == "replace" ddl = _build_ddl(target, columns, keys, drop_first) tmp_sql_path = None try: fd, tmp_sql_path = tempfile.mkstemp(suffix=".sql", prefix="duckdb_to_pg_") with os.fdopen(fd, "w", encoding="utf-8") as fh: fh.write(ddl) pg_apply_sql(pg_dsn, tmp_sql_path) # lanza RuntimeError si falla created = True except Exception as e: # noqa: BLE001 - convertir el raise de pg_apply_sql a dict return {"status": "error", "error": f"DDL fallo: {e}"} finally: if tmp_sql_path is not None and os.path.exists(tmp_sql_path): try: os.remove(tmp_sql_path) except OSError: pass # --- (c) Leer filas de DuckDB y volcarlas en PostgreSQL por lotes --- quoted = '"' + table.replace('"', '""') + '"' offset = 0 rows_synced = 0 try: while True: page = duckdb_query_readonly( duckdb_path, f"SELECT * FROM {quoted} LIMIT ? OFFSET ?", params=[batch_size, offset], max_rows=batch_size, ) if page.get("status") != "ok": return { "status": "error", "error": f"lectura de filas fallo en offset {offset}: " f"{page.get('error')}", } batch = page["rows"] if not batch: break if use_upsert: res = pg_upsert(pg_dsn, target, batch, keys) if res.get("status") != "ok": return { "status": "error", "error": f"pg_upsert fallo en offset {offset}: " f"{res.get('error')}", } rows_synced += res.get("inserted", 0) + res.get("updated", 0) else: # pg_insert_rows lanza RuntimeError si falla; add_snapshot_date=False # para no inyectar columnas que el schema DuckDB no tiene. inserted = pg_insert_rows( pg_dsn, target, batch, add_snapshot_date=False ) rows_synced += inserted offset += len(batch) if len(batch) < batch_size: break except Exception as e: # noqa: BLE001 - convertir raises de pg_insert_rows a dict return {"status": "error", "error": f"insercion fallo: {e}"} return { "status": "ok", "pg_table": target, "rows_synced": rows_synced, "created": created, } if __name__ == "__main__": # Ejecucion directa con `fn run`: demo minima contra una base DuckDB temporal y # un PostgreSQL apuntado por PG_TEST_DSN (si esta disponible). import json dsn = os.environ.get("PG_TEST_DSN") if not dsn: print(json.dumps({"status": "skipped", "reason": "PG_TEST_DSN no definido"})) sys.exit(0) demo_db = os.environ.get("DUCKDB_DEMO_PATH", "/tmp/duckdb_to_pg_demo.duckdb") import duckdb # noqa: E402 con = duckdb.connect(demo_db) con.execute("CREATE OR REPLACE TABLE demo (id BIGINT, nombre VARCHAR, total DOUBLE)") con.execute("INSERT INTO demo VALUES (1, 'ana', 10.5), (2, 'luis', 20.0)") con.close() print(json.dumps(duckdb_to_postgres(demo_db, "demo", dsn, mode="replace")))