fn_registry/python/functions/pipelines/run_mssql_query.py
egutierrez 86d68dc9f0 feat(infra): direct connection and query against SQL Server (Navision) via pymssql
New capability group 'sql-connect' (3 functions) for connecting to a
Microsoft SQL Server (where Navision runs) and querying it directly,
instead of the manual round trip of pasting CSVs.

- mssql_connect_py_infra: opens a pymssql connection (bounded
  login_timeout, credentials passed as arguments, clear RuntimeError on
  failure).
- mssql_query_py_infra: parameterized SELECT with safe binding (no
  injection) over an open connection; returns {columns, rows, row_count};
  0 rows -> empty list; max_rows via fetchmany; read-only.
- run_mssql_query_py_pipelines: one-shot that composes connect+query and
  always closes; the CLI prints JSON or CSV; password from an env var (pass).

Parent page docs/capabilities/sql-connect.md + row in INDEX.md.
Dependency pymssql>=2.3.13 added to python/pyproject.toml + uv.lock.
Mock-based tests (11) green; error path verified end-to-end against the
real driver (unreachable host -> RuntimeError, bounded by login_timeout).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-22 11:29:49 +02:00
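
The result contract the commit message gives for `mssql_query` (a `{columns, rows, row_count}` dict, an empty list for zero rows, `max_rows` applied through `fetchmany`) can be illustrated with a minimal sketch. This is not the registry implementation, which is not shown on this page: `query_sketch` and the `item` table are hypothetical, rows are assumed to be dicts keyed by column name (which is what `_to_csv` in this file expects), and the standard-library `sqlite3` driver stands in for pymssql so the snippet runs without a server. Note that `sqlite3` uses `?` placeholders where pymssql uses `%s`.

```python
import sqlite3


def query_sketch(conn, sql, params=None, max_rows=None):
    """Hypothetical stand-in for the result contract; not the registry code."""
    cur = conn.cursor()
    try:
        # Values are bound by the driver, never interpolated into the SQL text.
        cur.execute(sql, params or ())
        columns = [d[0] for d in cur.description]
        # fetchmany caps how many rows are pulled; fetchall drains the cursor.
        raw = cur.fetchmany(max_rows) if max_rows else cur.fetchall()
        rows = [dict(zip(columns, r)) for r in raw]
        return {"columns": columns, "rows": rows, "row_count": len(rows)}
    finally:
        cur.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE item (no TEXT, qty INTEGER)")
conn.executemany("INSERT INTO item VALUES (?, ?)", [("A", 1), ("B", 2), ("C", 3)])

# Two rows match, but max_rows=1 keeps only the first.
capped = query_sketch(
    conn, "SELECT no, qty FROM item WHERE qty >= ? ORDER BY no", (2,), max_rows=1
)
# No rows match: "rows" is an empty list, not None.
empty = query_sketch(conn, "SELECT no FROM item WHERE qty > ?", (99,))
conn.close()

print(capped)
print(empty)
```

Returning an empty list for zero rows lets callers iterate over `rows` without a separate `None` check.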

141 lines
5.7 KiB
Python

"""One-shot SQL Server (Navision) query: connect, run a SELECT, print rows.
Composes the registry functions `mssql_connect` and `mssql_query` so a single
`fn run run_mssql_query ...` opens a connection, runs one parameterized SELECT,
closes the connection, and prints the rows as JSON or CSV. Built to make
iterating over Navision queries a one-command loop instead of a manual
copy-paste round trip.
"""
from __future__ import annotations

import os
import sys

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

from infra.mssql_connect import mssql_connect
from infra.mssql_query import mssql_query


def run_mssql_query(host: str, database: str, user: str, password: str,
                    sql: str, params=None, port: int = 1433,
                    max_rows: int | None = None, login_timeout: int = 15,
                    query_timeout: int = 30) -> dict:
    """Open a SQL Server connection, run one SELECT, close, return the rows.

    Thin impure composition of `mssql_connect` + `mssql_query`. The connection
    is always closed (try/finally), even on error. Credentials are supplied by
    the caller (read from `pass`/env) and never hardcoded.

    Args:
        host: SQL Server host or IP. From WSL2 use the Windows LAN IP, not
            "localhost".
        database: Database name to connect to.
        user: SQL Server login user.
        password: Password for the login user. Pass it from `pass`/env, never a
            string literal in code.
        sql: The SELECT statement, using pymssql placeholders `%s` (positional)
            or `%(name)s` (named) for any bound values.
        params: Tuple/list for positional placeholders, dict for named
            placeholders, or None. Bound safely by the driver (no injection).
        port: TCP port of the SQL Server instance. Defaults to 1433.
        max_rows: If a positive int, only the first `max_rows` rows are
            returned. If None, all rows are returned.
        login_timeout: Seconds allowed for the connect/login phase. Defaults to
            15. Keeps an unreachable host from hanging.
        query_timeout: Seconds allowed for the query. Defaults to 30.

    Returns:
        The dict returned by `mssql_query`: {"columns": [...], "rows": [...],
        "row_count": int}.

    Raises:
        RuntimeError: If the connection or the query fails. The original
            exception (from `mssql_connect` / `mssql_query`) is chained.
    """
    conn = mssql_connect(
        host, database, user, password,
        port=port, login_timeout=login_timeout, query_timeout=query_timeout,
    )
    try:
        return mssql_query(conn, sql, params=params, max_rows=max_rows)
    finally:
        try:
            conn.close()
        except Exception:  # pragma: no cover - close errors are non-fatal here
            pass


def _to_csv(result: dict) -> str:
    """Render a query result dict as CSV text (header + rows)."""
    import csv
    import io

    buf = io.StringIO()
    writer = csv.writer(buf)
    columns = result.get("columns", [])
    writer.writerow(columns)
    for row in result.get("rows", []):
        writer.writerow([row.get(col) for col in columns])
    return buf.getvalue()


if __name__ == "__main__":
    import argparse
    import json

    parser = argparse.ArgumentParser(
        description=(
            "Runs a SELECT against a SQL Server (Navision) and prints the "
            "rows. Composes mssql_connect + mssql_query."
        )
    )
    parser.add_argument("--host", required=True, help="SQL Server host/IP (the Windows LAN IP from WSL2).")
    parser.add_argument("--database", required=True, help="Database name.")
    parser.add_argument("--user", required=True, help="Login user.")
    parser.add_argument(
        "--password-env", default="MSSQL_PASSWORD",
        help="Environment variable to read the password from (default MSSQL_PASSWORD). "
             "Recommended usage: MSSQL_PASSWORD=$(pass navision/password) fn run run_mssql_query ...",
    )
    parser.add_argument(
        "--password", default="",
        help="Literal password (DISCOURAGED: visible in the process list). "
             "Prefer --password-env.",
    )
    parser.add_argument("--sql", required=True, help="SELECT statement (placeholders %%s or %%(name)s).")
    parser.add_argument(
        "--param", action="append", default=None, dest="params",
        help="Positional parameter for the %%s placeholders. Repeatable, in order.",
    )
    parser.add_argument("--port", type=int, default=1433, help="TCP port. Default 1433.")
    parser.add_argument("--max-rows", type=int, default=None, help="Limit on rows returned.")
    parser.add_argument("--login-timeout", type=int, default=15, help="Login timeout in seconds.")
    parser.add_argument("--query-timeout", type=int, default=30, help="Query timeout in seconds.")
    parser.add_argument(
        "--format", choices=["json", "csv"], default="json", dest="fmt",
        help="Output format. Default json.",
    )
    args = parser.parse_args()

    # An explicit --password wins; otherwise fall back to the named env var.
    password = args.password or os.environ.get(args.password_env, "")
    if not password:
        parser.error(
            f"no password: set the env var {args.password_env!r} "
            "(e.g. MSSQL_PASSWORD=$(pass navision/password)) or pass --password."
        )

    # --param values arrive as strings and are bound positionally, in order.
    params = tuple(args.params) if args.params else None
    result = run_mssql_query(
        args.host, args.database, args.user, password,
        args.sql, params=params, port=args.port, max_rows=args.max_rows,
        login_timeout=args.login_timeout, query_timeout=args.query_timeout,
    )
    if args.fmt == "csv":
        sys.stdout.write(_to_csv(result))
    else:
        # default=str covers values json cannot encode natively (dates, decimals).
        print(json.dumps(result, default=str, ensure_ascii=False))
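
The JSON branch passes `default=str` because SQL Server columns typically come back from the driver as `Decimal` or `datetime` values, which `json.dumps` cannot serialize on its own. Below is a small sketch of both output formats. The result dict is made up (the column names and values are illustrative, not Navision data), and `to_csv_sketch` is a hypothetical mirror of `_to_csv` so the snippet runs without importing this module.

```python
import csv
import io
import json
from datetime import datetime
from decimal import Decimal

# Illustrative result in the {columns, rows, row_count} shape; values are made up.
result = {
    "columns": ["no", "amount", "posted"],
    "rows": [
        {
            "no": "A-1",
            "amount": Decimal("12.50"),
            "posted": datetime(2026, 6, 22, 11, 29, 49),
        },
    ],
    "row_count": 1,
}

# default=str is invoked only for objects json cannot encode natively.
as_json = json.dumps(result, default=str, ensure_ascii=False)


def to_csv_sketch(res: dict) -> str:
    """Hypothetical mirror of _to_csv: header row, then one line per row dict."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(res["columns"])
    for row in res["rows"]:
        # csv.writer stringifies non-string values with str().
        writer.writerow([row.get(col) for col in res["columns"]])
    return buf.getvalue()


as_csv = to_csv_sketch(result)
print(as_json)
print(as_csv, end="")
```

In both formats the decimal and the timestamp end up as plain strings, so a consumer that needs numeric or date types has to parse them back. `csv.writer` also emits `\r\n` line endings by default, which matters when diffing the CSV output against files written with `\n`.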