feat(infra): direct connection and querying of SQL Server (Navision) via pymssql

New capability group 'sql-connect' (3 functions) for connecting to a
Microsoft SQL Server (where Navision runs) and querying it directly,
instead of the manual round trip of pasting CSVs.

- mssql_connect_py_infra: opens a pymssql connection (bounded login_timeout,
  credentials passed as arguments, clear RuntimeError on failure).
- mssql_query_py_infra: parameterized SELECT with safe binding (no
  injection) over an open connection; returns {columns, rows, row_count};
  0 rows -> empty list; max_rows via fetchmany; read-only (no commit).
- run_mssql_query_py_pipelines: one-shot that composes connect+query and
  always closes; CLI prints JSON or CSV; password read from an env var (pass).

Parent page docs/capabilities/sql-connect.md + row in INDEX.md.
Dependency pymssql>=2.3.13 added to python/pyproject.toml + uv.lock.
Mock-based tests (11) green; error path verified end-to-end against the
real driver (unreachable host -> RuntimeError, bounded by login_timeout).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-22 11:29:49 +02:00
parent c1f355ffa5
commit 86d68dc9f0
13 changed files with 930 additions and 0 deletions
+81
@@ -0,0 +1,81 @@
---
name: mssql_connect
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def mssql_connect(host: str, database: str, user: str, password: str, port: int = 1433, login_timeout: int = 15, query_timeout: int = 30) -> pymssql.Connection"
description: "Opens a pymssql connection to a Microsoft SQL Server (where Navision runs). Credentials always arrive as arguments (the caller reads them from pass/env), never hardcoded. login_timeout bounds the connect/login phase so an unreachable host cannot hang the call. Returns the pymssql connection object for running queries afterwards."
tags: [mssql, sqlserver, navision, sql-connect, infra]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [pymssql]
params:
- name: host
  desc: "Host or IP of the SQL Server. From WSL2 this must be the Windows LAN IP (e.g. 10.0.0.5), not localhost."
- name: database
  desc: "Name of the database to connect to (e.g. navdb)."
- name: user
  desc: "SQL Server login user (e.g. sa)."
- name: password
  desc: "Password for the login user. Passed in from pass/env, never as a literal."
- name: port
  desc: "TCP port of the SQL Server. Defaults to 1433. The function converts it to a string because pymssql requires it in that form."
- name: login_timeout
  desc: "Seconds allowed for the connect/login phase before failing. Defaults to 15. Keeps an unreachable host from hanging indefinitely."
- name: query_timeout
  desc: "Seconds allowed for each query executed on the returned connection before it times out. Defaults to 30."
output: "An open pymssql.Connection object. The caller is responsible for closing it with .close() when done."
tested: true
tests: ["test_golden_connect_passes_string_port_and_kwargs", "test_error_path_wraps_failure_with_host"]
test_file_path: "python/functions/infra/mssql_connect_test.py"
file_path: "python/functions/infra/mssql_connect.py"
---
## Example
```python
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "python", "functions"))
from infra.mssql_connect import mssql_connect

# The IP must be the LAN IP of the Windows server: from WSL2, "localhost" does
# NOT reach the Windows host. The password comes from the environment, never
# from a literal.
conn = mssql_connect(
    host="10.0.0.5",
    database="navdb",
    user="sa",
    password=os.environ["MSSQL_PASSWORD"],
    port=1433,
    login_timeout=15,
)
try:
    with conn.cursor() as cur:
        cur.execute("SELECT TOP 1 name FROM sys.databases")
        print(cur.fetchone())
finally:
    conn.close()
```
## When to use it
Use it when you need to open a connection to a Microsoft SQL Server (where
Navision runs) before running queries with `mssql_query`. It is the first step
of any pipeline that reads data from Navision: open the connection once,
reuse it for several queries, and close it at the end. Triggers: "connect to
Navision", "read from SQL Server", "open mssql connection".
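
The open-once, reuse, close-at-the-end pattern described above can be sketched as a small context manager. This is only an illustration: `open_connection` and `DemoConnection` are hypothetical names that are not part of this commit, and the stand-in connection exists so the sketch runs without a SQL Server.

```python
from contextlib import contextmanager


@contextmanager
def open_connection(connect, **kwargs):
    """Open a connection with `connect(**kwargs)` and always close it."""
    conn = connect(**kwargs)
    try:
        yield conn
    finally:
        # Runs on normal exit and on exceptions raised inside the with block.
        conn.close()


class DemoConnection:
    """Hypothetical stand-in for a driver connection (no server needed)."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs
        self.closed = False

    def close(self):
        self.closed = True


with open_connection(DemoConnection, host="10.0.0.5", database="navdb") as conn:
    # Several mssql_query(conn, ...) calls would go here, sharing one connection.
    open_inside_block = not conn.closed

closed_after_block = conn.closed
```

With the real function the call would presumably read `with open_connection(mssql_connect, host=..., database=..., user=..., password=...) as conn:`. The standard library's `contextlib.closing(...)` gives the same guarantee for any object that has a `.close()` method.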
## Gotchas
- WSL2 -> Windows: use the LAN IP of the Windows server, NEVER `localhost`. From inside WSL2, `localhost` does not reach the Windows host in the default NAT networking mode (localhost forwarding only works Windows -> WSL, not the other way around).
- pymssql needs the port as a string. The function already converts `port` with `str(port)` internally, so you pass a plain int.
- `login_timeout` is bounded (15s by default) precisely so that an unreachable or misconfigured host fails with a clear RuntimeError instead of hanging indefinitely. Raise it if the network is slow, but do not leave it unbounded.
- Credentials are NEVER hardcoded: `user`/`password` arrive as arguments from `pass`/env. Do not write them as literals in the caller's code.
- Close the connection with `.close()` when done (ideally in a `finally`). The function returns an open handle and does not manage its lifecycle.
- Requires `pymssql` installed in the venv (lazy import: the module imports without the dependency, but the call fails with a clear RuntimeError if it is missing).
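
One way to keep credentials out of the caller's code, as the notes above require, is to collect the connection settings from the environment in one place. This is a minimal sketch under stated assumptions: `load_mssql_settings` is a hypothetical helper, and every variable name other than `MSSQL_PASSWORD` (which the example above uses) is invented for illustration.

```python
import os


def load_mssql_settings(env=None):
    """Build keyword arguments for mssql_connect from environment variables.

    Variable names other than MSSQL_PASSWORD are assumptions of this sketch.
    """
    env = os.environ if env is None else env
    required = ("MSSQL_HOST", "MSSQL_DATABASE", "MSSQL_USER", "MSSQL_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Fail early with the variable names only, never with their values.
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return {
        "host": env["MSSQL_HOST"],
        "database": env["MSSQL_DATABASE"],
        "user": env["MSSQL_USER"],
        "password": env["MSSQL_PASSWORD"],
        # Stay an int here; mssql_connect converts the port to a string itself.
        "port": int(env.get("MSSQL_PORT", "1433")),
        "login_timeout": int(env.get("MSSQL_LOGIN_TIMEOUT", "15")),
    }


# Demo mapping standing in for os.environ so the sketch runs anywhere.
demo_env = {
    "MSSQL_HOST": "10.0.0.5",
    "MSSQL_DATABASE": "navdb",
    "MSSQL_USER": "readonly",
    "MSSQL_PASSWORD": "example-only",
}
settings = load_mssql_settings(demo_env)
```

A caller would then write `conn = mssql_connect(**load_mssql_settings())`, so neither the password nor the host appears in source code.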
+65
@@ -0,0 +1,65 @@
"""Open a connection to a Microsoft SQL Server (Navision) via pymssql."""
from __future__ import annotations


def mssql_connect(host: str, database: str, user: str, password: str,
                  port: int = 1433, login_timeout: int = 15,
                  query_timeout: int = 30):
    """Open a connection to a Microsoft SQL Server instance (e.g. Navision).

    Uses the pymssql driver. Credentials are always supplied by the caller
    (typically read from `pass`/env) and never hardcoded. The connection is
    impure I/O: it touches the network and the database server.

    pymssql expects the TCP port as a string, so `port` is converted before
    being passed through. `login_timeout` bounds the connect/login phase, which
    is what keeps an invalid host from hanging indefinitely; `query_timeout`
    bounds individual queries run on the resulting connection.

    Args:
        host: SQL Server host or IP. From WSL2 this must be the Windows LAN IP
            (e.g. "10.0.0.5"), not "localhost": localhost does not reach the
            Windows host from inside WSL2.
        database: Name of the database to connect to (e.g. "navdb").
        user: SQL Server login user (e.g. "sa").
        password: Password for the login user. Pass it from `pass`/env, never
            as a string literal.
        port: TCP port of the SQL Server instance. Defaults to 1433. Converted
            to a string internally because pymssql requires a string port.
        login_timeout: Seconds allowed for the connect/login phase before it
            fails. Defaults to 15. Keeps an unreachable host from hanging.
        query_timeout: Seconds allowed for each query executed on the returned
            connection before it times out. Defaults to 30.

    Returns:
        An open pymssql.Connection. The caller is responsible for closing it
        with `.close()` when done.

    Raises:
        RuntimeError: If pymssql is not installed, or if the connection/login
            fails. The message includes host:port and database for context and
            the original exception is chained for debugging.
    """
    # Lazy import so the module loads even without pymssql installed.
    try:
        import pymssql
    except ImportError as exc:  # pragma: no cover - exercised only without dep
        raise RuntimeError(
            "pymssql is required for mssql_connect; install pymssql"
        ) from exc
    try:
        return pymssql.connect(
            server=host,
            user=user,
            password=password,
            database=database,
            port=str(port),
            login_timeout=login_timeout,
            timeout=query_timeout,
        )
    except Exception as exc:
        raise RuntimeError(
            f"mssql_connect failed connecting to {host}:{port}/{database}: {exc}"
        ) from exc
@@ -0,0 +1,59 @@
"""Tests for mssql_connect (mock-based, no real SQL Server)."""
from __future__ import annotations

import os
import sys

import pytest

sys.path.insert(0, os.path.dirname(__file__))
from mssql_connect import mssql_connect


def test_golden_connect_passes_string_port_and_kwargs(monkeypatch):
    """Golden path: returns the driver connection and forwards the right kwargs.

    The TCP port must reach pymssql as a STRING, and login_timeout must default
    to 15 when not supplied.
    """
    captured: dict = {}
    sentinel = object()

    def fake_connect(**kwargs):
        captured.update(kwargs)
        return sentinel

    monkeypatch.setattr("pymssql.connect", fake_connect)
    result = mssql_connect("10.0.0.5", "navdb", "sa", "pw", port=1433)
    assert result is sentinel
    assert captured["server"] == "10.0.0.5"
    assert captured["database"] == "navdb"
    assert captured["user"] == "sa"
    assert captured["password"] == "pw"
    assert captured["port"] == "1433"
    assert isinstance(captured["port"], str)
    assert captured["login_timeout"] == 15
    assert captured["timeout"] == 30


def test_error_path_wraps_failure_with_host(monkeypatch):
    """Error path: a driver failure becomes a clear RuntimeError, not a hang.

    The wrapped message must include the host and the phrase 'failed connecting'
    so callers can diagnose connectivity problems.
    """
    def fake_connect(**kwargs):
        raise Exception("login timeout")

    monkeypatch.setattr("pymssql.connect", fake_connect)
    with pytest.raises(RuntimeError) as excinfo:
        mssql_connect("10.0.0.5", "navdb", "sa", "pw", port=1433)
    message = str(excinfo.value)
    assert "10.0.0.5" in message
    assert "failed connecting" in message
+78
@@ -0,0 +1,78 @@
---
name: mssql_query
kind: function
lang: py
domain: infra
version: "1.0.0"
purity: impure
signature: "def mssql_query(conn, sql: str, params=None, max_rows: int | None = None) -> dict"
description: "Runs a parameterized SELECT (pymssql safe binding, no injection) over an already-open SQL Server/Navision connection and returns {columns, rows as a list of dicts, row_count}. Optional max_rows limits the number of rows."
tags: [mssql, sqlserver, navision, sql-connect, infra]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
tested: true
tests: ["test_golden_maps_rows_to_dicts", "test_binding_passes_params_to_driver", "test_zero_rows_no_error", "test_max_rows_uses_fetchmany", "test_description_none_empty_columns", "test_execution_error_raises_runtimeerror"]
test_file_path: "python/functions/infra/mssql_query_test.py"
params:
- name: conn
  desc: "Open connection (the one returned by mssql_connect). It is neither opened nor closed here; it is reused by duck typing via conn.cursor()."
- name: sql
  desc: "SELECT statement with pymssql placeholders %s (positional) or %(name)s (named) for the values to bind."
- name: params
  desc: "Tuple/list for positional placeholders, dict for named ones, or None. Passed to cursor.execute(sql, params) for safe driver-side binding (never interpolation)."
- name: max_rows
  desc: "If an int > 0, limits the result to the first max_rows rows (fetchmany). If None, returns all rows (fetchall)."
output: "Dict with three keys: 'columns' (list of column names in order, empty if there was no result set), 'rows' (list of column->value dicts, one per row), 'row_count' (int, len(rows))."
file_path: "python/functions/infra/mssql_query.py"
---
## Example
```python
import sys, os
sys.path.insert(0, os.path.join("python", "functions"))
from infra.mssql_connect import mssql_connect
from infra.mssql_query import mssql_query

# The password comes from the environment (e.g. exported from pass), never
# from a literal in the code.
conn = mssql_connect(
    host="10.0.0.5", database="navdb", user="readonly",
    password=os.environ["MSSQL_PASSWORD"],
)
try:
    res = mssql_query(
        conn,
        "SELECT TOP 10 No_, Amount FROM [dbo].[Cartera] WHERE [Customer No_] = %s",
        ("CLI-0001",),
    )
    print(res["columns"])    # ['No_', 'Amount']
    print(res["row_count"])  # number of rows returned
    for row in res["rows"]:
        print(row["No_"], row["Amount"])
finally:
    conn.close()
```
## When to use it
Use it when you already have an open connection from `mssql_connect` and want
to run several SELECT queries against Navision / SQL Server without reopening
the connection for each one. Pass variable values as `params` so the driver
binds them safely (no injection) instead of building the SQL with f-strings.
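
A common case where f-strings are tempting is an `IN (...)` filter with a variable number of values. A minimal sketch of how that can still go through `params`: only the `%s` placeholders are generated, and the values themselves never enter the SQL text. `build_in_query` is a hypothetical helper, and the table and column names are taken from the example above.

```python
def build_in_query(customer_ids):
    """Return (sql, params) for an IN filter with one %s per value."""
    ids = tuple(customer_ids)
    if not ids:
        # "IN ()" is not valid SQL, so an empty filter is rejected up front.
        raise ValueError("customer_ids must not be empty")
    # Only placeholders are interpolated here; the values travel in params.
    placeholders = ", ".join(["%s"] * len(ids))
    sql = (
        "SELECT No_, Amount FROM [dbo].[Cartera] "
        f"WHERE [Customer No_] IN ({placeholders})"
    )
    return sql, ids


sql, params = build_in_query(["CLI-0001", "CLI-0002"])
```

The pair can then be passed on as `mssql_query(conn, sql, params)`, so binding stays with the driver.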
## Gotchas
- pymssql placeholders are `%s` (positional) and `%(name)s` (named),
  NOT the `?` used by pyodbc. With the wrong placeholder, binding fails.
- ALWAYS pass values through the `params` argument, never with an f-string or
  `%` inside the SQL: interpolating opens the door to SQL injection.
- It never commits: it is meant for SELECT and read-only use. The statement
  itself is not validated, so passing only SELECTs is up to the caller.
- It does not close the connection; the caller manages it (open once, query
  many times, close at the end).
- `max_rows` uses `cursor.fetchmany(max_rows)`; with None (or a value that is
  not positive) it uses `fetchall()`.
- If the statement produces no result set (`cursor.description is None`),
  `columns` and `rows` come back as empty lists instead of failing.
- The error message is generic on purpose: it does not include the SQL or the
  params, so sensitive data is not leaked.
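
The `{columns, rows, row_count}` shape described above maps directly onto CSV and JSON output. The sketch below is illustrative only and is not the pipeline's actual CLI code, which is not shown here; `result_to_csv` and `result_to_json` are hypothetical names.

```python
import csv
import io
import json
from decimal import Decimal


def result_to_csv(result):
    """Render a mssql_query-style result dict as CSV text with a header row."""
    if not result["columns"]:
        # No result set (description was None): nothing to write.
        return ""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=result["columns"], lineterminator="\n")
    writer.writeheader()
    writer.writerows(result["rows"])
    return buf.getvalue()


def result_to_json(result):
    """Render the result as JSON; non-JSON types (Decimal, dates) become strings."""
    return json.dumps(result, default=str)


# Hand-written sample in the documented shape, so no database is needed.
sample = {
    "columns": ["No_", "Amount"],
    "rows": [{"No_": "CLI-1", "Amount": Decimal("100.50")}],
    "row_count": 1,
}
csv_text = result_to_csv(sample)
json_text = result_to_json(sample)
```

`default=str` is a deliberate simplification: SQL Server columns often arrive as `Decimal` or `datetime` objects, which `json.dumps` cannot encode on its own.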
+77
@@ -0,0 +1,77 @@
"""Run a parameterized SELECT over an open pymssql (SQL Server / Navision) connection."""
from __future__ import annotations


def mssql_query(conn, sql: str, params=None, max_rows: int | None = None) -> dict:
    """Execute a SELECT on an already-open connection and map rows to dicts.

    The connection is supplied by the caller (typically from `mssql_connect`),
    so a single connection can be opened once and reused for many queries. This
    function never opens or closes the connection; it only borrows it. It is
    impure I/O: it touches the database over an existing connection.

    Parameter binding is delegated to the driver: `params` is passed straight to
    `cursor.execute(sql, params)`. NEVER interpolate values into `sql` with
    f-strings or `%` formatting, because that opens the door to SQL injection.
    Use the pymssql placeholders `%s` (positional) or `%(name)s` (named) in
    `sql` and let the driver bind safely. When `params is None`, the SQL is
    executed with no bound parameters.

    The query runs read-only: no commit is issued. The cursor opened here is
    always closed before returning (try/finally), even on error.

    Args:
        conn: An open connection object (e.g. the one returned by
            `mssql_connect`). Used by duck typing via `conn.cursor()`, so the
            concrete driver does not matter and the function stays testable.
        sql: The SELECT statement, using pymssql placeholders `%s` (positional)
            or `%(name)s` (named) for any bound values.
        params: A tuple/list for positional placeholders, a dict for named
            placeholders, or None for a query with no parameters. Passed to
            `cursor.execute(sql, params)` for safe driver-side binding.
        max_rows: If a positive int, only the first `max_rows` rows are fetched
            (via `cursor.fetchmany(max_rows)`). If None, all rows are fetched
            (via `cursor.fetchall()`).

    Returns:
        A dict with three keys:
        - "columns": list of column names in result order (empty list if the
          statement produced no result set, i.e. `cursor.description is None`).
        - "rows": list of dicts, one per row, mapping each column name to its
          value. Empty list when the query returned no rows.
        - "row_count": int, equal to `len(rows)`.

    Raises:
        RuntimeError: If executing or fetching the query fails. The message is
            deliberately generic (it does not include the SQL or the params,
            which may carry sensitive data) and the original exception is
            chained for debugging.
    """
    cur = conn.cursor()
    try:
        try:
            if params is None:
                cur.execute(sql)
            else:
                cur.execute(sql, params)
            description = cur.description
            if description is None:
                columns: list = []
                raw_rows: list = []
            else:
                columns = [d[0] for d in description]
                if max_rows is not None and max_rows > 0:
                    raw_rows = cur.fetchmany(max_rows)
                else:
                    raw_rows = cur.fetchall()
        except Exception as exc:
            # Keep the message generic, as documented: driver errors can echo
            # SQL text or values, so only the exception type is named here.
            # The original exception stays reachable through __cause__.
            raise RuntimeError(
                f"mssql_query failed executing query ({type(exc).__name__})"
            ) from exc
    finally:
        cur.close()
    rows = [dict(zip(columns, row)) for row in raw_rows]
    return {"columns": columns, "rows": rows, "row_count": len(rows)}
+133
@@ -0,0 +1,133 @@
"""Tests para mssql_query usando un doble de prueba (sin servidor real)."""
from __future__ import annotations

import os
import sys

import pytest

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
from functions.infra.mssql_query import mssql_query


def _desc(*names):
    """Build a DB-API style description: one 7-element tuple per column."""
    return [(name, None, None, None, None, None, None) for name in names]


class FakeCursor:
    """Test double for a DB-API (pymssql-like) cursor."""

    def __init__(self, description=None, rows=None):
        self.description = description
        self._rows = list(rows or [])
        self.executed = None       # (sql, params) of the last execute call
        self.fetchmany_calls = []  # sizes requested from fetchmany
        self.closed = False

    def execute(self, sql, params=None):
        self.executed = (sql, params)

    def fetchall(self):
        return list(self._rows)

    def fetchmany(self, size):
        self.fetchmany_calls.append(size)
        return list(self._rows[:size])

    def close(self):
        self.closed = True


class FakeConn:
    """Test double for a connection: returns a fixed FakeCursor."""

    def __init__(self, cursor):
        self._cursor = cursor

    def cursor(self):
        return self._cursor


def test_golden_maps_rows_to_dicts():
    cur = FakeCursor(
        description=_desc("No_", "Amount"),
        rows=[("CLI-1", 100), ("CLI-2", 200)],
    )
    conn = FakeConn(cur)
    result = mssql_query(conn, "SELECT No_, Amount FROM Cartera")
    assert result == {
        "columns": ["No_", "Amount"],
        "rows": [
            {"No_": "CLI-1", "Amount": 100},
            {"No_": "CLI-2", "Amount": 200},
        ],
        "row_count": 2,
    }
    assert cur.closed is True


def test_binding_passes_params_to_driver():
    cur = FakeCursor(description=_desc("No_"), rows=[("CLI-0001",)])
    conn = FakeConn(cur)
    sql = "SELECT No_ FROM Cartera WHERE [Customer No_] = %s"
    mssql_query(conn, sql, params=("CLI-0001",))
    # SQL and params reach the driver untouched: binding, not interpolation.
    assert cur.executed == (sql, ("CLI-0001",))


def test_zero_rows_no_error():
    cur = FakeCursor(description=_desc("No_", "Amount"), rows=[])
    conn = FakeConn(cur)
    result = mssql_query(conn, "SELECT No_, Amount FROM Cartera WHERE 1 = 0")
    assert result["rows"] == []
    assert result["row_count"] == 0
    assert result["columns"] == ["No_", "Amount"]


def test_max_rows_uses_fetchmany():
    cur = FakeCursor(
        description=_desc("No_"),
        rows=[("CLI-1",), ("CLI-2",), ("CLI-3",)],
    )
    conn = FakeConn(cur)
    result = mssql_query(conn, "SELECT No_ FROM Cartera", max_rows=1)
    assert cur.fetchmany_calls == [1]
    assert result["row_count"] == 1
    assert result["rows"] == [{"No_": "CLI-1"}]


def test_description_none_empty_columns():
    cur = FakeCursor(description=None, rows=[])
    conn = FakeConn(cur)
    result = mssql_query(conn, "SET NOCOUNT ON")
    assert result["columns"] == []
    assert result["rows"] == []
    assert result["row_count"] == 0


def test_execution_error_raises_runtimeerror():
    class BoomCursor(FakeCursor):
        def execute(self, sql, params=None):
            raise ValueError("boom")

    cur = BoomCursor()
    conn = FakeConn(cur)
    with pytest.raises(RuntimeError, match="mssql_query failed executing query"):
        mssql_query(conn, "SELECT 1")
    # The cursor is closed even on error (try/finally).
    assert cur.closed is True