Files
fn_registry/cpp/tests/test_sse_client.cpp
T
egutierrez c52846d475 feat(cpp/core): sse_client_cpp_core — SSE client con reconnect + Last-Event-ID
Cliente Server-Sent Events C++ reusable (fn_sse::Client) con background
thread, exponential backoff, Last-Event-ID y stop() que no bloquea.

Implementación clave: fork+execvp curl directamente (sin /bin/sh wrapper)
para tener el PID real del proceso curl en curl_pid_, lo que permite que
stop() → kill(SIGTERM) → fgets NULL → join() funcione sin bloqueo.

4 tests (Catch2): connect_and_receive_3_events, parse_event_field,
reconnect_on_disconnect, stop_kills_thread. Fixture Python SSE con
/health probe via http_request_cpp_core.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-18 19:59:50 +02:00

362 lines
12 KiB
C++

// test_sse_client.cpp — Catch2 integration tests for fn_sse::Client.
//
// Strategy: spin up a minimal Python SSE server fixture per test via fork+exec
// (same pattern as test_http_request.cpp). The server exposes:
// GET /health → 200 OK (instant, used for readiness probe)
// GET /stream → text/event-stream (the SSE endpoint)
//
// Tests:
// 1. connect_and_receive_3_events — basic event delivery
// 2. parse_event_field — "event:" field parsed correctly
// 3. reconnect_on_disconnect — auto-reconnect fires when server closes
// 4. stop_kills_thread — stop() joins quickly, is_running() → false
//
// Tests skip gracefully if python3 is not available.
#include "catch_amalgamated.hpp"
#include "core/sse_client.h"
#include "core/http_request.h" // used for readiness probe only
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <vector>
#ifndef _WIN32
#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>
#endif
namespace {
// Pick a port unique-ish per process to avoid collisions.
// Returns a TCP port derived from the current process id:
// 47400 + (pid % 200) + offset. Callers pass distinct offsets so that
// fixtures started by the same process do not share a port.
int pick_port(int offset = 0) {
    constexpr int kBasePort = 47400;
    constexpr int kPidSpread = 200;
    const int pid_slot = static_cast<int>(getpid() % kPidSpread);
    return kBasePort + pid_slot + offset;
}
// ---------------------------------------------------------------------------
// PythonSSEServer — minimal SSE server fixture
// ---------------------------------------------------------------------------
// Python source of the fixture server. It is written to a temp file and run as
//   python3 -u <script> <port> <events-json> <"true"|"false">
//
//   GET /health      → 200 "ok" (text/plain)
//   GET <any other>  → 200 text/event-stream; writes every entry of EVENTS
//                      (optional "id:" / "event:" lines, then "data:"), with a
//                      0.1 s pause after each one. If the third argument is
//                      "true" the handler then returns, which closes the
//                      connection; otherwise it sleeps forever and keeps the
//                      stream open.
//
// The Python block structure below is significant: the class and handler
// bodies must stay indented or the interpreter rejects the script.
static const char* kServerScript = R"PYEOF(
import sys, json, time
from http.server import BaseHTTPRequestHandler, HTTPServer
HOST = '127.0.0.1'
PORT = int(sys.argv[1])
EVENTS = json.loads(sys.argv[2])
CLOSE = sys.argv[3] == "true"
class H(BaseHTTPRequestHandler):
    def log_message(self, *a, **kw): pass
    def do_GET(self):
        if self.path == '/health':
            body = b'ok'
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain')
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            return
        # /stream (or anything else) — SSE endpoint
        self.send_response(200)
        self.send_header('Content-Type', 'text/event-stream')
        self.send_header('Cache-Control', 'no-cache')
        self.end_headers()
        for ev in EVENTS:
            if ev.get('id'):
                self.wfile.write(('id: ' + ev['id'] + '\n').encode())
            if ev.get('type'):
                self.wfile.write(('event: ' + ev['type'] + '\n').encode())
            self.wfile.write(('data: ' + ev['data'] + '\n\n').encode())
            self.wfile.flush()
            time.sleep(0.1)
        if not CLOSE:
            while True:
                time.sleep(1)
        # close_after=true: return → connection closes
HTTPServer((HOST, PORT), H).serve_forever()
)PYEOF";
// Owns one python3 child process running kServerScript on 127.0.0.1:<port>.
//
// The fixture is non-copyable and its destructor calls stop(), so the child
// process and the temporary script file are cleaned up on every exit path,
// including a failed start() and an exception thrown by the test body.
struct SseFixture {
    pid_t pid = 0;            // child process id; 0 when no server is running
    int port = 0;             // port passed to the most recent start()
    std::string script_path;  // temp script written by start(); removed by stop()

    SseFixture() = default;
    SseFixture(const SseFixture&) = delete;
    SseFixture& operator=(const SseFixture&) = delete;
    ~SseFixture() { stop(); }

    // Escapes `raw` so it can be embedded between double quotes in a JSON
    // document. Without this, payloads containing '"' or '\' (for example
    // {"id":42}) produce invalid JSON and the server dies in json.loads.
    static std::string json_escape(const std::string& raw) {
        std::string out;
        out.reserve(raw.size() + 2);
        for (const char ch : raw) {
            switch (ch) {
                case '"':  out += "\\\""; break;
                case '\\': out += "\\\\"; break;
                case '\n': out += "\\n";  break;
                case '\r': out += "\\r";  break;
                case '\t': out += "\\t";  break;
                default:
                    if (static_cast<unsigned char>(ch) < 0x20) {
                        // Remaining control characters need the \u00XX form.
                        char buf[8];
                        std::snprintf(buf, sizeof(buf), "\\u%04x",
                                      static_cast<unsigned>(static_cast<unsigned char>(ch)));
                        out += buf;
                    } else {
                        out += ch;
                    }
            }
        }
        return out;
    }

    // Launches the server on port `p` and waits until GET /health returns 200.
    //
    // events:      list of {type, data, id}; type/id may be empty strings, in
    //              which case the script omits the "event:" / "id:" line.
    // close_after: when true the server closes each stream after sending
    //              `events`; when false it keeps the stream open.
    //
    // Returns true once the server answers the health probe. Returns false on
    // Windows, when the script cannot be written, when fork() fails, when the
    // child exits early, or when all 60 probe attempts fail (each attempt is a
    // 50 ms sleep plus a request with a 400 ms timeout). On failure nothing is
    // left running.
    bool start(int p,
               const std::vector<std::tuple<std::string,std::string,std::string>>& events,
               bool close_after) {
#ifdef _WIN32
        (void)p; (void)events; (void)close_after;
        return false;
#else
        stop();  // release anything left over from a previous start()
        port = p;

        // Build EVENTS JSON array.
        std::string events_json = "[";
        for (size_t i = 0; i < events.size(); ++i) {
            if (i) events_json += ",";
            events_json += "{\"type\":\"" + json_escape(std::get<0>(events[i])) +
                           "\",\"data\":\"" + json_escape(std::get<1>(events[i])) +
                           "\",\"id\":\"" + json_escape(std::get<2>(events[i])) + "\"}";
        }
        events_json += "]";

        // Write server script to tmp file.
        char tmp_buf[256];
        std::snprintf(tmp_buf, sizeof(tmp_buf), "/tmp/fn_sse_srv_%d.py", (int)getpid() + p);
        script_path = tmp_buf;
        {
            std::ofstream f(script_path);
            f << kServerScript;
            f.flush();
            if (!f) { stop(); return false; }
        }

        // Build every argv string before fork(): the child of a process that
        // may have other threads should not allocate memory before exec.
        const std::string port_arg = std::to_string(port);
        const char* const close_arg = close_after ? "true" : "false";

        pid_t p2 = fork();
        if (p2 < 0) { stop(); return false; }
        if (p2 == 0) {
            // Child: silence stdio, then become the Python server.
            freopen("/dev/null", "r", stdin);
            freopen("/dev/null", "w", stdout);
            freopen("/dev/null", "w", stderr);
            execlp("python3", "python3", "-u", script_path.c_str(),
                   port_arg.c_str(),
                   events_json.c_str(),
                   close_arg,
                   (char*)nullptr);
            _Exit(127);  // exec failed
        }
        pid = p2;

        // Poll /health via http_request until the server is ready.
        const std::string health_url = std::string("http://127.0.0.1:") +
                                       std::to_string(port) + "/health";
        for (int i = 0; i < 60; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));

            // Fail fast if the child already exited (e.g. the script crashed).
            int st = 0;
            if (waitpid(pid, &st, WNOHANG) == pid) {
                pid = 0;  // already reaped; stop() must not signal this pid
                stop();
                return false;
            }

            fn_http::Request req;
            req.url = health_url;
            req.timeout_ms = 400;
            auto res = fn_http::request(req);
            if (res.status == 200) return true;
        }
        // Server didn't respond in time: do not leave the child behind.
        stop();
        return false;
#endif
    }

    // Terminates the server with SIGTERM, reaps it, and deletes the temp
    // script. Safe to call repeatedly and when nothing was started.
    void stop() {
#ifndef _WIN32
        if (pid > 0) {
            kill(pid, SIGTERM);
            int st = 0;
            // Retry when a signal interrupts the wait so the child is reaped.
            while (waitpid(pid, &st, 0) < 0 && errno == EINTR) {}
            pid = 0;
        }
        if (!script_path.empty()) {
            std::remove(script_path.c_str());
            script_path.clear();
        }
#endif
    }

    // URL of the SSE endpoint served on the current port.
    std::string stream_url() const {
        return std::string("http://127.0.0.1:") + std::to_string(port) + "/stream";
    }
};
// Reports whether running `python3 --version` through the shell exits with
// status 0. All output of the probe command is discarded.
bool python3_available() {
    const int probe_status = system("python3 --version > /dev/null 2>&1");
    return probe_status == 0;
}
} // anon
// ---------------------------------------------------------------------------
// Test 1: connect_and_receive_3_events
// ---------------------------------------------------------------------------
TEST_CASE("sse_client: connect_and_receive_3_events", "[sse_client]") {
#ifdef _WIN32
    SUCCEED("skipped on Windows"); return;
#else
    if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }

    // Server that sends three events without an "event:" field (ids 0..2)
    // and then keeps the stream open.
    SseFixture server;
    const bool started = server.start(pick_port(0),
        {{"","hello-0","0"},{"","hello-1","1"},{"","hello-2","2"}},
        /*close_after=*/false);
    if (!started) { SUCCEED("server fixture failed to start — skipped"); return; }

    fn_sse::Config config;
    config.url = server.stream_url();
    config.auto_reconnect = false;
    config.connect_timeout_ms = 3000;

    // The event callback is passed to the client, so the vector is guarded
    // by a mutex while this thread polls it.
    std::mutex events_mutex;
    std::vector<fn_sse::Event> events;
    fn_sse::Client client;
    client.start(config,
        [&](const fn_sse::Event& incoming) {
            std::lock_guard<std::mutex> guard(events_mutex);
            events.push_back(incoming);
        });

    // Poll every 50 ms, at most 100 times (about 5 s), for the three events.
    bool have_all = false;
    int polls_left = 100;
    while (!have_all && polls_left-- > 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        std::lock_guard<std::mutex> guard(events_mutex);
        have_all = events.size() >= 3;
    }

    client.stop();
    server.stop();

    std::lock_guard<std::mutex> guard(events_mutex);
    REQUIRE(events.size() >= 3);
    REQUIRE(events[0].data == "hello-0");
    REQUIRE(events[1].data == "hello-1");
    REQUIRE(events[2].data == "hello-2");
    // With no "event:" field on the wire the event type is expected to be "message".
    REQUIRE(events[0].event == "message");
#endif
}
// ---------------------------------------------------------------------------
// Test 2: parse_event_field
// ---------------------------------------------------------------------------
TEST_CASE("sse_client: parse_event_field", "[sse_client]") {
#ifdef _WIN32
    SUCCEED("skipped on Windows"); return;
#else
    if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }

    // One event carrying all three fields: type, a JSON payload, and an id.
    SseFixture server;
    const bool started = server.start(pick_port(2),
        {{"card_changed", R"({"id":42})", "1"}},
        /*close_after=*/false);
    if (!started) { SUCCEED("server fixture failed to start — skipped"); return; }

    fn_sse::Config config;
    config.url = server.stream_url();
    config.auto_reconnect = false;
    config.connect_timeout_ms = 3000;

    // Holds the most recent event handed to the callback.
    std::mutex last_mutex;
    fn_sse::Event last_event;
    fn_sse::Client client;
    client.start(config,
        [&](const fn_sse::Event& incoming) {
            std::lock_guard<std::mutex> guard(last_mutex);
            last_event = incoming;
        });

    // Poll every 50 ms, at most 80 times (about 4 s), until data arrives.
    bool got_data = false;
    int polls_left = 80;
    while (!got_data && polls_left-- > 0) {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        std::lock_guard<std::mutex> guard(last_mutex);
        got_data = !last_event.data.empty();
    }

    client.stop();
    server.stop();

    std::lock_guard<std::mutex> guard(last_mutex);
    REQUIRE(last_event.event == "card_changed");
    REQUIRE(last_event.data == R"({"id":42})");
    REQUIRE(last_event.id == "1");
#endif
}
// ---------------------------------------------------------------------------
// Test 3: reconnect_on_disconnect
// ---------------------------------------------------------------------------
TEST_CASE("sse_client: reconnect_on_disconnect", "[sse_client]") {
#ifdef _WIN32
    SUCCEED("skipped on Windows"); return;
#else
    if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }
    // The server sends 2 events and then closes the connection, and does so
    // again for every new request. With auto_reconnect=true the client is
    // expected to reconnect, so a second batch of 2 events and a second
    // "disconnected" status are the evidence that the reconnect happened.
    SseFixture srv;
    bool ok = srv.start(pick_port(4),
        {{"","wave-0",""},{"","wave-1",""}},
        /*close_after=*/true);
    if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; }

    fn_sse::Config cfg;
    cfg.url = srv.stream_url();
    cfg.auto_reconnect = true;
    cfg.reconnect_initial_ms = 200; // fast backoff for test
    cfg.connect_timeout_ms = 3000;

    std::atomic<int> total_events{0};
    std::atomic<int> disconnects{0};
    fn_sse::Client cli;
    cli.start(cfg,
        [&](const fn_sse::Event&) { total_events++; },
        [&](const std::string& s) {
            if (s == "disconnected") disconnects++;
        });

    // Wait up to 8 s (160 polls of 50 ms) for two server-side closes.
    for (int i = 0; i < 160; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        if (disconnects.load() >= 2) break;
    }

    // Snapshot before stop(): whether stop() itself reports "disconnected" is
    // not visible from this file, and such a report must not count as a
    // reconnect.
    const int disconnects_before_stop = disconnects.load();
    cli.stop();
    srv.stop();
    const int events_seen = total_events.load();

    // Staged assertions so a failure shows how far the client got.
    REQUIRE(events_seen >= 2);              // first batch arrived
    REQUIRE(disconnects_before_stop >= 1);  // client noticed the server close
    // The previous assertions stopped here and therefore passed even when the
    // client never reconnected. Require the second connection as well.
    REQUIRE(events_seen >= 4);              // second batch arrived after reconnect
    REQUIRE(disconnects_before_stop >= 2);  // second connection was closed too
#endif
}
// ---------------------------------------------------------------------------
// Test 4: stop_kills_thread
// ---------------------------------------------------------------------------
TEST_CASE("sse_client: stop_kills_thread", "[sse_client]") {
#ifdef _WIN32
    SUCCEED("skipped on Windows"); return;
#else
    if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }

    // Server that sends one event and then holds the stream open, so the
    // client is still connected when stop() is called.
    SseFixture server;
    const bool started = server.start(pick_port(6),
        {{"","ping",""}},
        /*close_after=*/false);
    if (!started) { SUCCEED("server fixture failed to start — skipped"); return; }

    fn_sse::Config config;
    config.url = server.stream_url();
    config.auto_reconnect = true;
    config.connect_timeout_ms = 3000;

    fn_sse::Client client;
    client.start(config, [](const fn_sse::Event&) {});

    // Let the client run for 400 ms before stopping it.
    std::this_thread::sleep_for(std::chrono::milliseconds(400));
    REQUIRE(client.is_running());

    // Time only the stop() call itself.
    using Clock = std::chrono::steady_clock;
    const Clock::time_point stop_begin = Clock::now();
    client.stop();
    const Clock::time_point stop_end = Clock::now();
    auto elapsed_ms =
        std::chrono::duration_cast<std::chrono::milliseconds>(stop_end - stop_begin).count();

    server.stop();

    REQUIRE_FALSE(client.is_running());
    // stop() must return in under 5 s.
    REQUIRE(elapsed_ms < 5000);
#endif
}