Files
fn_registry/python/functions/infra/http_replay_sequence_test.py
T
egutierrez 8742cb25be feat(browser): auto-commit con 60 cambios
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-07 11:42:31 +02:00

121 lines
4.3 KiB
Python

"""Tests para http_replay_sequence.
No dependen de red: mockean requests.Session.request con unittest.mock para
verificar substitucion, extraccion y manejo de errores de transporte.
"""
from unittest.mock import patch
import requests
from .http_replay_sequence import http_replay_sequence
class _FakeResp:
"""Respuesta minima que imita lo que usa la funcion de requests.Response."""
def __init__(self, status_code=200, json_data=None, text="", headers=None):
self.status_code = status_code
self._json = json_data if json_data is not None else {}
self.text = text
self.headers = headers or {}
def json(self):
return self._json
def test_golden_extract_and_subst():
"""2 pasos: extract json del paso 0 -> usado en {{token}} del paso 1.
Verifica que la url y el header del paso 1 llevaron el valor substituido.
"""
sent = [] # captura (method, url, kwargs) de cada request
def fake_request(self, method, url, **kwargs):
sent.append((method, url, kwargs))
if "/uuid" in url:
return _FakeResp(200, json_data={"data": {"items": [{"token": "ABC123"}]}})
return _FakeResp(200, json_data={"echo": True})
calls = [
{"method": "GET", "url": "https://api.example/uuid",
"headers": {"Accept": "application/json"}, "body": None, "body_type": None},
{"method": "POST", "url": "https://api.example/use/{{token}}",
"headers": {"X-Token": "{{token}}"}, "body": '{"k": "v"}', "body_type": "json"},
]
extract = [
{"from": 0, "type": "json", "expr": "data.items.0.token", "as": "token"},
]
with patch.object(requests.Session, "request", fake_request):
result = http_replay_sequence(calls, extract=extract)
assert result["status"] == "ok"
assert result["error"] == ""
assert result["params_final"]["token"] == "ABC123"
# Paso 0 extrajo el token.
assert result["steps"][0]["extracted"] == {"token": "ABC123"}
assert result["steps"][0]["ok"] is True
# Paso 1: la URL fue substituida.
method1, url1, kwargs1 = sent[1]
assert method1 == "POST"
assert url1 == "https://api.example/use/ABC123"
# El header X-Token llevo el valor substituido.
assert kwargs1["headers"]["X-Token"] == "ABC123"
# El body se manda como data= sin re-serializar.
assert kwargs1["data"] == '{"k": "v"}'
assert result["steps"][1]["ok"] is True
assert result["steps"][1]["missing_params"] == []
def test_edge_missing_param():
"""Param faltante -> missing_params poblado y literal {{x}} intacto."""
sent = []
def fake_request(self, method, url, **kwargs):
sent.append((method, url, kwargs))
return _FakeResp(200, json_data={})
calls = [
{"method": "GET", "url": "https://api.example/path/{{missing}}",
"headers": {"X-H": "{{missing}}"}, "body": None, "body_type": None},
]
with patch.object(requests.Session, "request", fake_request):
result = http_replay_sequence(calls)
assert result["status"] == "ok"
# El literal {{missing}} queda intacto tanto en url como en header.
method0, url0, kwargs0 = sent[0]
assert url0 == "https://api.example/path/{{missing}}"
assert kwargs0["headers"]["X-H"] == "{{missing}}"
# El step registra el param faltante (deduplicado, una sola vez).
assert result["steps"][0]["missing_params"] == ["missing"]
def test_error_path_request_exception():
"""La sesion lanza requests.RequestException -> status=error, corta, step.error poblado."""
def fake_request(self, method, url, **kwargs):
raise requests.RequestException("connection refused")
calls = [
{"method": "GET", "url": "https://down.example/a", "headers": {},
"body": None, "body_type": None},
{"method": "GET", "url": "https://down.example/b", "headers": {},
"body": None, "body_type": None},
]
with patch.object(requests.Session, "request", fake_request):
result = http_replay_sequence(calls)
assert result["status"] == "error"
assert "connection refused" in result["error"]
# Corta tras la excepcion: solo se registro el primer step.
assert len(result["steps"]) == 1
assert result["steps"][0]["ok"] is False
assert result["steps"][0]["status_code"] == 0
assert "connection refused" in result["steps"][0]["error"]