Files
fn_registry/python/functions/pipelines/duckdb_to_postgres_test.py
T
egutierrez 927437a8d8 feat(infra): grupo claude-fleet — FleetView TUI + orquestacion de Claudes
Sistema FleetView para centralizar la flota de procesos Claude Code vivos en una
sola ventana kitty + tmux (socket aislado -L fleet) con un panel TUI:

- list_claude_fleet (+ tipo claude_fleet): escanea ~/.claude/sessions + goals +
  runtime, valida procesos vivos (anti-PID-reciclado), join por sessionId.
- list_resumable_claudes (+ tipo resumable_claude): sesiones cerradas reanudables.
- wrappers tmux: tmux_new_claude_window (con --resume), tmux_swap_window_into_console
  (preserva ancho del sidebar), tmux_map_claude_panes.
- launch_kittyclaude: comando entrypoint; instala atajos alt+flechas/enter/n/0/k/r,
  mouse on, remain-on-exit off; fija el ancho del sidebar con hooks.
- docs/capabilities/claude-fleet.md + entrada en el INDEX.

Incluye ademas funciones datascience en progreso (excel/duckdb/postgres) y ajustes
varios de docs e infra de otra sesion, agrupados aqui para no perderlos.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-17 00:04:41 +02:00

146 lines
4.6 KiB
Python

"""Tests para el pipeline duckdb_to_postgres.
Los tests que tocan PostgreSQL hacen skip elegante si no hay PG_TEST_DSN. El mapeo
de tipos y la construccion de DDL se prueban sin Postgres (logica pura interna).
"""
import os
import sys
import pytest
sys.path.insert(0, os.path.dirname(__file__))
import duckdb # noqa: E402
from duckdb_to_postgres import ( # noqa: E402
_build_ddl,
_map_duckdb_type_to_pg,
duckdb_to_postgres,
)
PG_DSN = os.environ.get("PG_TEST_DSN")
# --- Tests sin Postgres: mapeo de tipos y DDL ---
def test_map_tipos_duckdb_a_postgres():
assert _map_duckdb_type_to_pg("BIGINT") == "BIGINT"
assert _map_duckdb_type_to_pg("INTEGER") == "BIGINT"
assert _map_duckdb_type_to_pg("DOUBLE") == "DOUBLE PRECISION"
assert _map_duckdb_type_to_pg("FLOAT") == "DOUBLE PRECISION"
assert _map_duckdb_type_to_pg("VARCHAR") == "TEXT"
assert _map_duckdb_type_to_pg("TEXT") == "TEXT"
assert _map_duckdb_type_to_pg("BOOLEAN") == "BOOLEAN"
assert _map_duckdb_type_to_pg("DATE") == "DATE"
assert _map_duckdb_type_to_pg("TIMESTAMP") == "TIMESTAMP"
# Parametrizados normalizan al tipo base.
assert _map_duckdb_type_to_pg("DECIMAL(10,2)") == "TEXT"
assert _map_duckdb_type_to_pg("VARCHAR(50)") == "TEXT"
# Desconocido -> TEXT (con posible perdida de tipado).
assert _map_duckdb_type_to_pg("STRUCT(a INT)") == "TEXT"
def test_build_ddl_con_pk_y_drop():
cols = [
{"name": "id", "type": "BIGINT"},
{"name": "nombre", "type": "VARCHAR"},
]
ddl = _build_ddl("destino", cols, ["id"], drop_first=True)
assert "DROP TABLE IF EXISTS \"destino\";" in ddl
assert 'CREATE TABLE IF NOT EXISTS "destino"' in ddl
assert '"id" BIGINT' in ddl
assert '"nombre" TEXT' in ddl
assert 'PRIMARY KEY ("id")' in ddl
def test_build_ddl_sin_pk_ni_drop():
cols = [{"name": "x", "type": "DOUBLE"}]
ddl = _build_ddl("t", cols, [], drop_first=False)
assert "DROP TABLE" not in ddl
assert '"x" DOUBLE PRECISION' in ddl
assert "PRIMARY KEY" not in ddl
# --- Validaciones de entrada (sin Postgres) ---
def test_identificador_tabla_invalido(tmp_path):
res = duckdb_to_postgres(str(tmp_path / "x.duckdb"), "t; DROP", "dsn")
assert res["status"] == "error"
assert "invalid table identifier" in res["error"]
def test_mode_invalido(tmp_path):
db = tmp_path / "x.duckdb"
con = duckdb.connect(str(db))
con.execute("CREATE TABLE t (id BIGINT)")
con.close()
res = duckdb_to_postgres(str(db), "t", "dsn", mode="merge")
assert res["status"] == "error"
assert "invalid mode" in res["error"]
# --- Tests end-to-end con Postgres ---
@pytest.mark.skipif(not PG_DSN, reason="PG_TEST_DSN no definido")
def test_replace_sincroniza_filas(tmp_path):
db = tmp_path / "src.duckdb"
con = duckdb.connect(str(db))
con.execute("CREATE TABLE ventas (id BIGINT, region VARCHAR, total DOUBLE)")
con.execute(
"INSERT INTO ventas VALUES (1,'norte',10.5),(2,'sur',20.0),(3,'norte',5.25)"
)
con.close()
pgt = "test_duckdb_to_pg_ventas"
res = duckdb_to_postgres(str(db), "ventas", PG_DSN, pg_table=pgt, mode="replace")
assert res["status"] == "ok", res
assert res["pg_table"] == pgt
assert res["rows_synced"] == 3
assert res["created"] is True
import psycopg2
conn = psycopg2.connect(PG_DSN)
try:
with conn.cursor() as cur:
cur.execute(f'SELECT COUNT(*) FROM "{pgt}"')
assert cur.fetchone()[0] == 3
cur.execute(f'DROP TABLE IF EXISTS "{pgt}"')
conn.commit()
finally:
conn.close()
@pytest.mark.skipif(not PG_DSN, reason="PG_TEST_DSN no definido")
def test_upsert_idempotente_con_key_cols(tmp_path):
db = tmp_path / "src.duckdb"
con = duckdb.connect(str(db))
con.execute("CREATE TABLE u (id BIGINT, v VARCHAR)")
con.execute("INSERT INTO u VALUES (1,'a'),(2,'b')")
con.close()
pgt = "test_duckdb_to_pg_upsert"
r1 = duckdb_to_postgres(
str(db), "u", PG_DSN, pg_table=pgt, mode="replace", key_cols=["id"]
)
assert r1["status"] == "ok", r1
# Re-sync en modo upsert: no debe duplicar (idempotente).
r2 = duckdb_to_postgres(
str(db), "u", PG_DSN, pg_table=pgt, mode="upsert", key_cols=["id"]
)
assert r2["status"] == "ok", r2
import psycopg2
conn = psycopg2.connect(PG_DSN)
try:
with conn.cursor() as cur:
cur.execute(f'SELECT COUNT(*) FROM "{pgt}"')
assert cur.fetchone()[0] == 2
cur.execute(f'DROP TABLE IF EXISTS "{pgt}"')
conn.commit()
finally:
conn.close()