"""Tests para el pipeline duckdb_to_postgres. Los tests que tocan PostgreSQL hacen skip elegante si no hay PG_TEST_DSN. El mapeo de tipos y la construccion de DDL se prueban sin Postgres (logica pura interna). """ import os import sys import pytest sys.path.insert(0, os.path.dirname(__file__)) import duckdb # noqa: E402 from duckdb_to_postgres import ( # noqa: E402 _build_ddl, _map_duckdb_type_to_pg, duckdb_to_postgres, ) PG_DSN = os.environ.get("PG_TEST_DSN") # --- Tests sin Postgres: mapeo de tipos y DDL --- def test_map_tipos_duckdb_a_postgres(): assert _map_duckdb_type_to_pg("BIGINT") == "BIGINT" assert _map_duckdb_type_to_pg("INTEGER") == "BIGINT" assert _map_duckdb_type_to_pg("DOUBLE") == "DOUBLE PRECISION" assert _map_duckdb_type_to_pg("FLOAT") == "DOUBLE PRECISION" assert _map_duckdb_type_to_pg("VARCHAR") == "TEXT" assert _map_duckdb_type_to_pg("TEXT") == "TEXT" assert _map_duckdb_type_to_pg("BOOLEAN") == "BOOLEAN" assert _map_duckdb_type_to_pg("DATE") == "DATE" assert _map_duckdb_type_to_pg("TIMESTAMP") == "TIMESTAMP" # Parametrizados normalizan al tipo base. assert _map_duckdb_type_to_pg("DECIMAL(10,2)") == "TEXT" assert _map_duckdb_type_to_pg("VARCHAR(50)") == "TEXT" # Desconocido -> TEXT (con posible perdida de tipado). assert _map_duckdb_type_to_pg("STRUCT(a INT)") == "TEXT" def test_build_ddl_con_pk_y_drop(): cols = [ {"name": "id", "type": "BIGINT"}, {"name": "nombre", "type": "VARCHAR"}, ] ddl = _build_ddl("destino", cols, ["id"], drop_first=True) assert "DROP TABLE IF EXISTS \"destino\";" in ddl assert 'CREATE TABLE IF NOT EXISTS "destino"' in ddl assert '"id" BIGINT' in ddl assert '"nombre" TEXT' in ddl assert 'PRIMARY KEY ("id")' in ddl def test_build_ddl_sin_pk_ni_drop(): cols = [{"name": "x", "type": "DOUBLE"}] ddl = _build_ddl("t", cols, [], drop_first=False) assert "DROP TABLE" not in ddl assert '"x" DOUBLE PRECISION' in ddl assert "PRIMARY KEY" not in ddl # --- Validaciones de entrada (sin Postgres) --- def test_identificador_tabla_invalido(tmp_path): res = duckdb_to_postgres(str(tmp_path / "x.duckdb"), "t; DROP", "dsn") assert res["status"] == "error" assert "invalid table identifier" in res["error"] def test_mode_invalido(tmp_path): db = tmp_path / "x.duckdb" con = duckdb.connect(str(db)) con.execute("CREATE TABLE t (id BIGINT)") con.close() res = duckdb_to_postgres(str(db), "t", "dsn", mode="merge") assert res["status"] == "error" assert "invalid mode" in res["error"] # --- Tests end-to-end con Postgres --- @pytest.mark.skipif(not PG_DSN, reason="PG_TEST_DSN no definido") def test_replace_sincroniza_filas(tmp_path): db = tmp_path / "src.duckdb" con = duckdb.connect(str(db)) con.execute("CREATE TABLE ventas (id BIGINT, region VARCHAR, total DOUBLE)") con.execute( "INSERT INTO ventas VALUES (1,'norte',10.5),(2,'sur',20.0),(3,'norte',5.25)" ) con.close() pgt = "test_duckdb_to_pg_ventas" res = duckdb_to_postgres(str(db), "ventas", PG_DSN, pg_table=pgt, mode="replace") assert res["status"] == "ok", res assert res["pg_table"] == pgt assert res["rows_synced"] == 3 assert res["created"] is True import psycopg2 conn = psycopg2.connect(PG_DSN) try: with conn.cursor() as cur: cur.execute(f'SELECT COUNT(*) FROM "{pgt}"') assert cur.fetchone()[0] == 3 cur.execute(f'DROP TABLE IF EXISTS "{pgt}"') conn.commit() finally: conn.close() @pytest.mark.skipif(not PG_DSN, reason="PG_TEST_DSN no definido") def test_upsert_idempotente_con_key_cols(tmp_path): db = tmp_path / "src.duckdb" con = duckdb.connect(str(db)) con.execute("CREATE TABLE u (id BIGINT, v VARCHAR)") con.execute("INSERT INTO u VALUES (1,'a'),(2,'b')") con.close() pgt = "test_duckdb_to_pg_upsert" r1 = duckdb_to_postgres( str(db), "u", PG_DSN, pg_table=pgt, mode="replace", key_cols=["id"] ) assert r1["status"] == "ok", r1 # Re-sync en modo upsert: no debe duplicar (idempotente). r2 = duckdb_to_postgres( str(db), "u", PG_DSN, pg_table=pgt, mode="upsert", key_cols=["id"] ) assert r2["status"] == "ok", r2 import psycopg2 conn = psycopg2.connect(PG_DSN) try: with conn.cursor() as cur: cur.execute(f'SELECT COUNT(*) FROM "{pgt}"') assert cur.fetchone()[0] == 2 cur.execute(f'DROP TABLE IF EXISTS "{pgt}"') conn.commit() finally: conn.close()