eb57e83392
Cliente Server-Sent Events C++ reusable (fn_sse::Client) con background thread, exponential backoff, Last-Event-ID y stop() que no bloquea. Implementacion clave: fork+execvp curl directamente (sin /bin/sh wrapper) para tener el PID real del proceso curl en curl_pid_, lo que permite que stop() → kill(SIGTERM) → fgets NULL → join() funcione sin bloqueo. 4 tests (Catch2): connect_and_receive_3_events, parse_event_field, reconnect_on_disconnect, stop_kills_thread. Fixture Python SSE con /health probe via http_request_cpp_core. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
362 lines
12 KiB
C++
362 lines
12 KiB
C++
// test_sse_client.cpp — Catch2 integration tests for fn_sse::Client.
|
|
//
|
|
// Strategy: spin up a minimal Python SSE server fixture per test via fork+exec
|
|
// (same pattern as test_http_request.cpp). The server exposes:
|
|
// GET /health → 200 OK (instant, used for readiness probe)
|
|
// GET /stream → text/event-stream (the SSE endpoint)
|
|
//
|
|
// Tests:
|
|
// 1. connect_and_receive_3_events — basic event delivery
|
|
// 2. parse_event_field — "event:" field parsed correctly
|
|
// 3. reconnect_on_disconnect — auto-reconnect fires when server closes
|
|
// 4. stop_kills_thread — stop() joins quickly, is_running() → false
|
|
//
|
|
// Tests skip gracefully if python3 is not available.
|
|
#include "catch_amalgamated.hpp"
|
|
#include "core/sse_client.h"
|
|
#include "core/http_request.h" // used for readiness probe only
|
|
|
|
#include <atomic>
|
|
#include <chrono>
|
|
#include <cstdio>
|
|
#include <cstdlib>
|
|
#include <fstream>
|
|
#include <mutex>
|
|
#include <string>
|
|
#include <thread>
|
|
#include <vector>
|
|
|
|
#ifndef _WIN32
|
|
#include <signal.h>
|
|
#include <sys/wait.h>
|
|
#include <unistd.h>
|
|
#endif
|
|
|
|
namespace {
|
|
|
|
// Pick a port unique-ish per process to avoid collisions.
|
|
int pick_port(int offset = 0) {
|
|
int base = 47400;
|
|
int jitter = (int)(getpid() % 200);
|
|
return base + jitter + offset;
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// PythonSSEServer — minimal SSE server fixture
|
|
// ---------------------------------------------------------------------------
|
|
// GET /health → 200 "ok"
|
|
// GET /stream → text/event-stream (sends EVENTS then optionally closes)
|
|
|
|
static const char* kServerScript = R"PYEOF(
|
|
import sys, json, time
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
|
|
HOST = '127.0.0.1'
|
|
PORT = int(sys.argv[1])
|
|
EVENTS = json.loads(sys.argv[2])
|
|
CLOSE = sys.argv[3] == "true"
|
|
|
|
class H(BaseHTTPRequestHandler):
|
|
def log_message(self, *a, **kw): pass
|
|
|
|
def do_GET(self):
|
|
if self.path == '/health':
|
|
body = b'ok'
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', 'text/plain')
|
|
self.send_header('Content-Length', str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
return
|
|
|
|
# /stream (or anything else) — SSE endpoint
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', 'text/event-stream')
|
|
self.send_header('Cache-Control', 'no-cache')
|
|
self.end_headers()
|
|
for ev in EVENTS:
|
|
if ev.get('id'):
|
|
self.wfile.write(('id: ' + ev['id'] + '\n').encode())
|
|
if ev.get('type'):
|
|
self.wfile.write(('event: ' + ev['type'] + '\n').encode())
|
|
self.wfile.write(('data: ' + ev['data'] + '\n\n').encode())
|
|
self.wfile.flush()
|
|
time.sleep(0.1)
|
|
if not CLOSE:
|
|
while True:
|
|
time.sleep(1)
|
|
# close_after=true: return → connection closes
|
|
|
|
HTTPServer((HOST, PORT), H).serve_forever()
|
|
)PYEOF";
|
|
|
|
struct SseFixture {
|
|
pid_t pid = 0;
|
|
int port = 0;
|
|
|
|
// events: list of {type, data, id} (type/id may be empty strings)
|
|
bool start(int p,
|
|
const std::vector<std::tuple<std::string,std::string,std::string>>& events,
|
|
bool close_after) {
|
|
#ifdef _WIN32
|
|
(void)p; (void)events; (void)close_after;
|
|
return false;
|
|
#else
|
|
port = p;
|
|
|
|
// Build EVENTS JSON array.
|
|
std::string events_json = "[";
|
|
for (size_t i = 0; i < events.size(); ++i) {
|
|
if (i) events_json += ",";
|
|
events_json += "{\"type\":\"" + std::get<0>(events[i]) +
|
|
"\",\"data\":\"" + std::get<1>(events[i]) +
|
|
"\",\"id\":\"" + std::get<2>(events[i]) + "\"}";
|
|
}
|
|
events_json += "]";
|
|
|
|
// Write server script to tmp file.
|
|
char tmp_buf[256];
|
|
std::snprintf(tmp_buf, sizeof(tmp_buf), "/tmp/fn_sse_srv_%d.py", (int)getpid() + p);
|
|
std::string script_path(tmp_buf);
|
|
{
|
|
std::ofstream f(script_path);
|
|
f << kServerScript;
|
|
}
|
|
|
|
pid_t p2 = fork();
|
|
if (p2 < 0) return false;
|
|
if (p2 == 0) {
|
|
freopen("/dev/null", "r", stdin);
|
|
freopen("/dev/null", "w", stdout);
|
|
freopen("/dev/null", "w", stderr);
|
|
execlp("python3", "python3", "-u", script_path.c_str(),
|
|
std::to_string(port).c_str(),
|
|
events_json.c_str(),
|
|
close_after ? "true" : "false",
|
|
(char*)nullptr);
|
|
_Exit(127);
|
|
}
|
|
pid = p2;
|
|
|
|
// Poll /health via http_request until server is ready (~3 s max).
|
|
std::string health_url = std::string("http://127.0.0.1:") +
|
|
std::to_string(port) + "/health";
|
|
for (int i = 0; i < 60; ++i) {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
|
fn_http::Request req;
|
|
req.url = health_url;
|
|
req.timeout_ms = 400;
|
|
auto res = fn_http::request(req);
|
|
if (res.status == 200) return true;
|
|
}
|
|
// Server didn't respond in time.
|
|
return false;
|
|
#endif
|
|
}
|
|
|
|
void stop() {
|
|
#ifndef _WIN32
|
|
if (pid > 0) {
|
|
kill(pid, SIGTERM);
|
|
int st = 0;
|
|
waitpid(pid, &st, 0);
|
|
pid = 0;
|
|
}
|
|
#endif
|
|
}
|
|
|
|
std::string stream_url() const {
|
|
return std::string("http://127.0.0.1:") + std::to_string(port) + "/stream";
|
|
}
|
|
};
|
|
|
|
bool python3_available() {
|
|
return system("python3 --version > /dev/null 2>&1") == 0;
|
|
}
|
|
|
|
} // anon
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Test 1: connect_and_receive_3_events
|
|
// ---------------------------------------------------------------------------
|
|
TEST_CASE("sse_client: connect_and_receive_3_events", "[sse_client]") {
|
|
#ifdef _WIN32
|
|
SUCCEED("skipped on Windows"); return;
|
|
#else
|
|
if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }
|
|
|
|
SseFixture srv;
|
|
bool ok = srv.start(pick_port(0),
|
|
{{"","hello-0","0"},{"","hello-1","1"},{"","hello-2","2"}},
|
|
/*close_after=*/false);
|
|
if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; }
|
|
|
|
fn_sse::Config cfg;
|
|
cfg.url = srv.stream_url();
|
|
cfg.auto_reconnect = false;
|
|
cfg.connect_timeout_ms = 3000;
|
|
|
|
std::mutex mtx;
|
|
std::vector<fn_sse::Event> received;
|
|
|
|
fn_sse::Client cli;
|
|
cli.start(cfg,
|
|
[&](const fn_sse::Event& e) {
|
|
std::lock_guard<std::mutex> lk(mtx);
|
|
received.push_back(e);
|
|
});
|
|
|
|
// Wait up to 5 s for 3 events.
|
|
for (int i = 0; i < 100; ++i) {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
|
std::lock_guard<std::mutex> lk(mtx);
|
|
if ((int)received.size() >= 3) break;
|
|
}
|
|
|
|
cli.stop();
|
|
srv.stop();
|
|
|
|
std::lock_guard<std::mutex> lk(mtx);
|
|
REQUIRE(received.size() >= 3);
|
|
REQUIRE(received[0].data == "hello-0");
|
|
REQUIRE(received[1].data == "hello-1");
|
|
REQUIRE(received[2].data == "hello-2");
|
|
// Default event type when "event:" field absent.
|
|
REQUIRE(received[0].event == "message");
|
|
#endif
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Test 2: parse_event_field
|
|
// ---------------------------------------------------------------------------
|
|
TEST_CASE("sse_client: parse_event_field", "[sse_client]") {
|
|
#ifdef _WIN32
|
|
SUCCEED("skipped on Windows"); return;
|
|
#else
|
|
if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }
|
|
|
|
SseFixture srv;
|
|
bool ok = srv.start(pick_port(2),
|
|
{{"card_changed", R"({"id":42})", "1"}},
|
|
/*close_after=*/false);
|
|
if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; }
|
|
|
|
fn_sse::Config cfg;
|
|
cfg.url = srv.stream_url();
|
|
cfg.auto_reconnect = false;
|
|
cfg.connect_timeout_ms = 3000;
|
|
|
|
std::mutex mtx;
|
|
fn_sse::Event captured;
|
|
|
|
fn_sse::Client cli;
|
|
cli.start(cfg,
|
|
[&](const fn_sse::Event& e) {
|
|
std::lock_guard<std::mutex> lk(mtx);
|
|
captured = e;
|
|
});
|
|
|
|
for (int i = 0; i < 80; ++i) {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
|
std::lock_guard<std::mutex> lk(mtx);
|
|
if (!captured.data.empty()) break;
|
|
}
|
|
|
|
cli.stop();
|
|
srv.stop();
|
|
|
|
std::lock_guard<std::mutex> lk(mtx);
|
|
REQUIRE(captured.event == "card_changed");
|
|
REQUIRE(captured.data == R"({"id":42})");
|
|
REQUIRE(captured.id == "1");
|
|
#endif
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Test 3: reconnect_on_disconnect
|
|
// ---------------------------------------------------------------------------
|
|
TEST_CASE("sse_client: reconnect_on_disconnect", "[sse_client]") {
|
|
#ifdef _WIN32
|
|
SUCCEED("skipped on Windows"); return;
|
|
#else
|
|
if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }
|
|
|
|
// Server sends 2 events then closes. With auto_reconnect=true the client
|
|
// should reconnect and receive 2 more events (same server re-handles new req).
|
|
SseFixture srv;
|
|
bool ok = srv.start(pick_port(4),
|
|
{{"","wave-0",""},{"","wave-1",""}},
|
|
/*close_after=*/true);
|
|
if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; }
|
|
|
|
fn_sse::Config cfg;
|
|
cfg.url = srv.stream_url();
|
|
cfg.auto_reconnect = true;
|
|
cfg.reconnect_initial_ms = 200; // fast backoff for test
|
|
cfg.connect_timeout_ms = 3000;
|
|
|
|
std::atomic<int> total_events{0};
|
|
std::atomic<int> disconnects{0};
|
|
|
|
fn_sse::Client cli;
|
|
cli.start(cfg,
|
|
[&](const fn_sse::Event&) { total_events++; },
|
|
[&](const std::string& s) {
|
|
if (s == "disconnected") disconnects++;
|
|
});
|
|
|
|
// Wait up to 8 s for at least 2 disconnects (meaning at least 2 reconnects).
|
|
for (int i = 0; i < 160; ++i) {
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
|
if (disconnects >= 2) break;
|
|
}
|
|
|
|
cli.stop();
|
|
srv.stop();
|
|
|
|
REQUIRE(total_events >= 2); // at least first batch arrived
|
|
REQUIRE(disconnects >= 1); // client detected server close + reconnected
|
|
#endif
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Test 4: stop_kills_thread
|
|
// ---------------------------------------------------------------------------
|
|
TEST_CASE("sse_client: stop_kills_thread", "[sse_client]") {
|
|
#ifdef _WIN32
|
|
SUCCEED("skipped on Windows"); return;
|
|
#else
|
|
if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }
|
|
|
|
// Start client against an infinite-stream server, then immediately stop.
|
|
SseFixture srv;
|
|
bool ok = srv.start(pick_port(6),
|
|
{{"","ping",""}},
|
|
/*close_after=*/false);
|
|
if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; }
|
|
|
|
fn_sse::Config cfg;
|
|
cfg.url = srv.stream_url();
|
|
cfg.auto_reconnect = true;
|
|
cfg.connect_timeout_ms = 3000;
|
|
|
|
fn_sse::Client cli;
|
|
cli.start(cfg, [](const fn_sse::Event&) {});
|
|
|
|
// Give the thread a moment to start and connect.
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(400));
|
|
REQUIRE(cli.is_running());
|
|
|
|
auto t0 = std::chrono::steady_clock::now();
|
|
cli.stop();
|
|
auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
|
|
std::chrono::steady_clock::now() - t0).count();
|
|
|
|
srv.stop();
|
|
|
|
REQUIRE_FALSE(cli.is_running());
|
|
// stop() should complete in well under 5 s (SIGTERM + join).
|
|
REQUIRE(elapsed_ms < 5000);
|
|
#endif
|
|
}
|