Files
fn_registry/python/functions/infra/http_post_json_test.py
T
egutierrez 5a324f6554 feat: funciones Python infra y tipos Python (core, datascience, infra)
Infra: cache_to_file, cache_to_sqlite, http_download_file, http_get_json,
http_post_json, read_file_with_encoding, safe_extract_zip, scan_directory,
setup_logger, normalize_zip_filenames.
Tipos: 30+ tipos core (agent_action, context, task, message, parse_result...),
6 tipos datascience (entity_candidate, extraction_result...), 2 tipos infra.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:43 +02:00

77 lines
2.4 KiB
Python

"""Tests para http_post_json."""
import json
import sys
import unittest
import urllib.error
from io import BytesIO
from unittest.mock import MagicMock, patch
sys.path.insert(0, "/home/lucas/fn_registry/python/functions")
from infra.http_post_json import http_post_json
def _make_response(data: bytes, status: int = 200):
resp = MagicMock()
resp.read.return_value = data
resp.status = status
resp.__enter__ = lambda s: s
resp.__exit__ = MagicMock(return_value=False)
return resp
class TestHttpPostJson(unittest.TestCase):
def test_mock_post_body_serializado_correctamente(self):
captured = []
def fake_urlopen(req, timeout=None):
captured.append(req.data)
return _make_response(b'{"created": true}')
body = {"name": "test", "value": 99}
with patch("urllib.request.urlopen", side_effect=fake_urlopen):
http_post_json("http://example.com/api", body)
sent = json.loads(captured[0])
self.assertEqual(sent["name"], "test")
self.assertEqual(sent["value"], 99)
def test_mock_respuesta_201(self):
mock_resp = _make_response(b'{"id": 1}', status=201)
with patch("urllib.request.urlopen", return_value=mock_resp):
result = http_post_json("http://example.com/api", {"x": 1})
self.assertEqual(result, {"id": 1})
def test_mock_respuesta_500_error(self):
err = urllib.error.HTTPError(
url="http://example.com/api",
code=500,
msg="Internal Server Error",
hdrs=None, # type: ignore[arg-type]
fp=BytesIO(b"server error details"),
)
with patch("urllib.request.urlopen", side_effect=err):
with self.assertRaises(RuntimeError) as ctx:
http_post_json("http://example.com/api", {"x": 1})
self.assertIn("500", str(ctx.exception))
def test_body_con_unicode(self):
captured = []
def fake_urlopen(req, timeout=None):
captured.append(req.data)
return _make_response(b'{"ok": true}')
body = {"mensaje": "Hola mundo \u00e9\u00e0\u00fc \U0001f600"}
with patch("urllib.request.urlopen", side_effect=fake_urlopen):
http_post_json("http://example.com/api", body)
decoded = json.loads(captured[0].decode("utf-8"))
self.assertEqual(decoded["mensaje"], body["mensaje"])
if __name__ == "__main__":
unittest.main()