From eb57e833922ec815c9bdf58d6614fb2a3d95930e Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Mon, 18 May 2026 19:59:50 +0200 Subject: [PATCH] =?UTF-8?q?feat(cpp/core):=20sse=5Fclient=5Fcpp=5Fcore=20?= =?UTF-8?q?=E2=80=94=20SSE=20client=20con=20reconnect=20+=20Last-Event-ID?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cliente Server-Sent Events C++ reusable (fn_sse::Client) con background thread, exponential backoff, Last-Event-ID y stop() que no bloquea. Implementacion clave: fork+execvp curl directamente (sin /bin/sh wrapper) para tener el PID real del proceso curl en curl_pid_, lo que permite que stop() → kill(SIGTERM) → fgets NULL → join() funcione sin bloqueo. 4 tests (Catch2): connect_and_receive_3_events, parse_event_field, reconnect_on_disconnect, stop_kills_thread. Fixture Python SSE con /health probe via http_request_cpp_core. Co-Authored-By: Claude Sonnet 4.6 --- cpp/functions/core/sse_client.cpp | 360 +++++++++++++++++++++++++++++ cpp/functions/core/sse_client.h | 65 ++++++ cpp/functions/core/sse_client.md | 94 ++++++++ cpp/tests/CMakeLists.txt | 8 + cpp/tests/test_sse_client.cpp | 361 ++++++++++++++++++++++++++++++ 5 files changed, 888 insertions(+) create mode 100644 cpp/functions/core/sse_client.cpp create mode 100644 cpp/functions/core/sse_client.h create mode 100644 cpp/functions/core/sse_client.md create mode 100644 cpp/tests/test_sse_client.cpp diff --git a/cpp/functions/core/sse_client.cpp b/cpp/functions/core/sse_client.cpp new file mode 100644 index 00000000..98773d55 --- /dev/null +++ b/cpp/functions/core/sse_client.cpp @@ -0,0 +1,360 @@ +// sse_client.cpp — Server-Sent Events client via curl -N (fork+exec direct). +// +// Transport: background thread fork+exec curl DIRECTLY (no /bin/sh wrapper) +// so that the child PID is the curl process itself. This lets stop() call +// kill(curl_pid, SIGTERM) and have fgets() return NULL immediately. +// +// Using /bin/sh -c "curl ..." hides the real PID (it's the grandchild) and +// kill(sh_pid) leaves curl orphaned → fgets() never returns → join() hangs. +// +// No link-time libcurl. curl must be in PATH at runtime. +// Same transport strategy as http_request_cpp_core and llm_anthropic_cpp_core. +#include "core/sse_client.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef _WIN32 +#include +#include +#include +#include +#endif + +namespace fn_sse { + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +namespace { + +int64_t now_ms() { + using clock = std::chrono::steady_clock; + return std::chrono::duration_cast( + clock::now().time_since_epoch()).count(); +} + +void sleep_ms(int ms) { + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); +} + +// Strip trailing \r and \n. +void rstrip_newline(std::string& s) { + while (!s.empty() && (s.back() == '\r' || s.back() == '\n')) + s.pop_back(); +} + +int parse_int_field(const std::string& s) { + size_t i = 0; + while (i < s.size() && (s[i] == ' ' || s[i] == '\t')) ++i; + if (i == s.size()) return 0; + char* end = nullptr; + long v = std::strtol(s.c_str() + i, &end, 10); + return (end && end != s.c_str() + i) ? (int)v : 0; +} + +// Parse one SSE line into the event buffer. +void parse_sse_line(const std::string& line, Event& buf, bool& should_flush) { + should_flush = false; + + if (line.empty()) { + should_flush = true; + return; + } + + if (line[0] == ':') return; // comment + + size_t colon = line.find(':'); + std::string field, value; + if (colon == std::string::npos) { + field = line; + value = ""; + } else { + field = line.substr(0, colon); + value = line.substr(colon + 1); + if (!value.empty() && value[0] == ' ') value = value.substr(1); + } + + if (field == "event") { + buf.event = value; + } else if (field == "data") { + if (!buf.data.empty()) buf.data += '\n'; + buf.data += value; + } else if (field == "id") { + if (value.find('\0') == std::string::npos) + buf.id = value; + } else if (field == "retry") { + int ms = parse_int_field(value); + if (ms > 0) buf.retry_ms = ms; + } +} + +// Dispatch buffered event, reset mutable fields. +void flush_event(Event& buf, const Client::EventHandler& cb) { + if (buf.data.empty()) { + buf.event = ""; + return; + } + if (buf.event.empty()) buf.event = "message"; + if (!buf.data.empty() && buf.data.back() == '\n') buf.data.pop_back(); + if (cb) cb(buf); + std::string saved_id = buf.id; + int saved_rms = buf.retry_ms; + buf = Event{}; + buf.id = saved_id; + buf.retry_ms = saved_rms; +} + +#ifndef _WIN32 +// Build argv vector for exec. Returns vector of char* pointing into `strs`. +// The last element is nullptr. +struct CurlArgs { + std::vector strs; + std::vector argv; + + void add(const std::string& s) { strs.push_back(s); } + + void build() { + argv.clear(); + for (const auto& s : strs) argv.push_back(s.c_str()); + argv.push_back(nullptr); + } +}; +#endif + +} // anon + +// --------------------------------------------------------------------------- +// Impl +// --------------------------------------------------------------------------- + +struct Client::Impl { + std::thread thread_; + std::atomic stop_requested_{false}; + std::atomic running_{false}; + +#ifndef _WIN32 + std::atomic curl_pid_{0}; +#endif + + void run(Config cfg, EventHandler on_event, StatusHandler on_status) { + running_ = true; + + int backoff_ms = cfg.reconnect_initial_ms; + std::string last_id; + + auto emit_status = [&](const std::string& s) { + if (on_status) on_status(s); + }; + + while (!stop_requested_) { + emit_status("connecting"); + +#ifndef _WIN32 + // Build curl args directly — no shell wrapper, so PID == curl PID. + CurlArgs args; + args.add("curl"); + args.add("-N"); // no output buffering + args.add("-sS"); // silent + show errors + args.add("--max-time"); + args.add("0"); // no global timeout (stream is indefinite) + args.add("--connect-timeout"); + args.add(std::to_string(cfg.connect_timeout_ms / 1000 + 1)); + args.add("-H"); args.add("Accept: text/event-stream"); + args.add("-H"); args.add("Cache-Control: no-cache"); + + if (!cfg.bearer_token.empty()) { + args.add("-H"); + args.add("Authorization: Bearer " + cfg.bearer_token); + } + if (!last_id.empty()) { + args.add("-H"); + args.add("Last-Event-ID: " + last_id); + } + args.add(cfg.url); + args.build(); + + // Fork+pipe: child exec's curl, parent reads stdout. + int pipefd[2]; + if (::pipe(pipefd) != 0) { + emit_status("error: pipe() failed"); + if (!cfg.auto_reconnect || stop_requested_) break; + sleep_ms(backoff_ms); + backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms); + continue; + } + + pid_t child = ::fork(); + if (child < 0) { + ::close(pipefd[0]); + ::close(pipefd[1]); + emit_status("error: fork() failed"); + if (!cfg.auto_reconnect || stop_requested_) break; + sleep_ms(backoff_ms); + backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms); + continue; + } + + if (child == 0) { + // Child: redirect stdout to pipe write-end, stderr to /dev/null. + ::close(pipefd[0]); + ::dup2(pipefd[1], STDOUT_FILENO); + ::close(pipefd[1]); + int devnull = ::open("/dev/null", O_WRONLY); + if (devnull >= 0) { + ::dup2(devnull, STDERR_FILENO); + ::close(devnull); + } + // exec curl directly — no sh wrapper. + ::execvp("curl", const_cast(args.argv.data())); + ::_exit(127); // exec failed + } + + // Parent: close write-end, keep read-end. + ::close(pipefd[1]); + curl_pid_ = child; + FILE* pipe_file = ::fdopen(pipefd[0], "r"); + + if (!pipe_file) { + ::close(pipefd[0]); + ::kill(child, SIGTERM); + int st; ::waitpid(child, &st, 0); + curl_pid_ = 0; + emit_status("error: fdopen() failed"); + if (!cfg.auto_reconnect || stop_requested_) break; + sleep_ms(backoff_ms); + backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms); + continue; + } +#else + // Windows: popen fallback (no reliable PID, stop() may be slow). + std::ostringstream cmd; + cmd << "curl -N -sS --max-time 0 --connect-timeout " + << (cfg.connect_timeout_ms / 1000 + 1) + << " -H \"Accept: text/event-stream\"" + << " -H \"Cache-Control: no-cache\""; + if (!cfg.bearer_token.empty()) + cmd << " -H \"Authorization: Bearer " + cfg.bearer_token + "\""; + if (!last_id.empty()) + cmd << " -H \"Last-Event-ID: " + last_id + "\""; + cmd << " \"" << cfg.url << "\" 2>NUL"; + FILE* pipe_file = ::popen(cmd.str().c_str(), "r"); + if (!pipe_file) { + emit_status("error: popen() failed"); + if (!cfg.auto_reconnect || stop_requested_) break; + sleep_ms(backoff_ms); + backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms); + continue; + } +#endif + + // --------------------------------------------------------------- + // Read loop — parse SSE lines as they arrive from curl. + // --------------------------------------------------------------- + Event buf; + buf.id = last_id; + bool connected_emitted = false; + char line_buf[8192]; + + while (!stop_requested_) { + if (!fgets(line_buf, sizeof(line_buf), pipe_file)) break; + + if (!connected_emitted) { + emit_status("connected"); + connected_emitted = true; + backoff_ms = cfg.reconnect_initial_ms; // reset on success + } + + std::string line(line_buf); + rstrip_newline(line); + + bool flush = false; + parse_sse_line(line, buf, flush); + + if (flush) { + if (buf.retry_ms > 0) cfg.reconnect_initial_ms = buf.retry_ms; + if (!buf.id.empty()) last_id = buf.id; + flush_event(buf, on_event); + } + } + + // --------------------------------------------------------------- + // Cleanup: kill child (in case we exited due to stop_requested_) + // then wait to avoid zombie. + // --------------------------------------------------------------- +#ifndef _WIN32 + { + pid_t p = curl_pid_.exchange(0); + if (p > 0) { + ::kill(p, SIGTERM); + // Drain to let curl finish writing and avoid SIGPIPE. + char drain[256]; + while (fgets(drain, sizeof(drain), pipe_file)) {} + } + fclose(pipe_file); + if (p > 0) { + int st = 0; + ::waitpid(p, &st, 0); + } + } +#else + ::pclose(pipe_file); +#endif + + if (stop_requested_) break; + + emit_status("disconnected"); + + if (!cfg.auto_reconnect) break; + + int64_t wait_until = now_ms() + backoff_ms; + while (!stop_requested_ && now_ms() < wait_until) + sleep_ms(50); + + backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms); + } + + emit_status("disconnected"); + running_ = false; + } +}; + +// --------------------------------------------------------------------------- +// Client +// --------------------------------------------------------------------------- + +Client::Client() : impl_(std::make_unique()) {} +Client::~Client() { stop(); } + +void Client::start(const Config& cfg, + EventHandler on_event, + StatusHandler on_status) { + if (impl_->running_) return; + impl_->stop_requested_ = false; + impl_->thread_ = std::thread([this, cfg, on_event, on_status]() { + impl_->run(cfg, on_event, on_status); + }); +} + +void Client::stop() { + impl_->stop_requested_ = true; +#ifndef _WIN32 + pid_t p = impl_->curl_pid_.exchange(0); + if (p > 0) ::kill(p, SIGTERM); +#endif + if (impl_->thread_.joinable()) + impl_->thread_.join(); +} + +bool Client::is_running() const { + return impl_->running_.load(); +} + +} // namespace fn_sse diff --git a/cpp/functions/core/sse_client.h b/cpp/functions/core/sse_client.h new file mode 100644 index 00000000..b4e2be61 --- /dev/null +++ b/cpp/functions/core/sse_client.h @@ -0,0 +1,65 @@ +// sse_client.h — Server-Sent Events C++ client with reconnect + Last-Event-ID. +// +// Background-thread client that reads a text/event-stream via curl -N (no +// buffer). Parses SSE spec fields, fires per-event and status callbacks, and +// reconnects with exponential backoff when the server closes. +// +// No link-time libcurl required — requires only `curl` in PATH at runtime. +// Same transport strategy as http_request_cpp_core and llm_anthropic_cpp_core. +#pragma once + +#include +#include +#include +#include +#include + +namespace fn_sse { + +// A single parsed SSE event per the W3C spec. +struct Event { + std::string event; // "message" if no "event:" field was present + std::string data; // "data:" lines joined with "\n"; trailing "\n" stripped + std::string id; // last "id:" seen (used as Last-Event-ID on reconnect) + int retry_ms = 0; // from "retry:" field, 0 if absent +}; + +struct Config { + std::string url; + std::string bearer_token; // optional: Authorization: Bearer + int connect_timeout_ms = 5000; // curl --connect-timeout + int reconnect_initial_ms = 1000; + int reconnect_max_ms = 30000; // exponential backoff cap + bool auto_reconnect = true; +}; + +// Asynchronous SSE client. Call start() once; the background thread loops +// reading the stream and invoking callbacks. Call stop() to tear down. +// +// Thread safety: on_event / on_status are called from the background thread. +// If they touch UI state, callers must protect with their own mutex. +class Client { +public: + using EventHandler = std::function; + // status strings: "connecting" | "connected" | "disconnected" | "error: " + using StatusHandler = std::function; + + Client(); + ~Client(); + + // Start the background thread. Idempotent — no-op if already running. + void start(const Config& cfg, + EventHandler on_event, + StatusHandler on_status = nullptr); + + // Signal the thread to stop and join. Safe to call multiple times. + void stop(); + + bool is_running() const; + +private: + struct Impl; + std::unique_ptr impl_; +}; + +} // namespace fn_sse diff --git a/cpp/functions/core/sse_client.md b/cpp/functions/core/sse_client.md new file mode 100644 index 00000000..fa6d3705 --- /dev/null +++ b/cpp/functions/core/sse_client.md @@ -0,0 +1,94 @@ +--- +name: sse_client +kind: function +lang: cpp +domain: core +version: 0.1.0 +description: "Cliente Server-Sent Events C++ con reconnect exponencial + Last-Event-ID. Reusable para streams HTTP push (agent_runs, fsnotify, telemetria, dashboards)." +tags: [sse, http, client, network, agents, registry-gap] +purity: impure +signature: "fn_sse::Client::start(cfg, on_event, on_status) -- background thread" +params: + - name: cfg + desc: "Config con url, bearer_token opcional, connect_timeout_ms, reconnect_initial_ms, reconnect_max_ms y auto_reconnect" + - name: on_event + desc: "Callback invocado por cada Event{event,data,id,retry_ms} parseado del stream SSE. Corre en el background thread del cliente." + - name: on_status + desc: "Callback opcional con strings 'connecting'|'connected'|'disconnected'|'error: ' al cambiar estado de conexion." +output: "void start(), void stop(), bool is_running() — control de un thread background que mantiene la conexion SSE viva con reconnect automatico" +uses_functions: [] +uses_types: [] +returns: [] +returns_optional: false +error_type: "error_go_core" +imports: [] +example: | + fn_sse::Config cfg; + cfg.url = "http://127.0.0.1:8403/api/boards/issues/stream"; + fn_sse::Client cli; + cli.start(cfg, + [](const fn_sse::Event& e){ /* handle e.data JSON */ }, + [](const std::string& s){ std::printf("[sse] %s\n", s.c_str()); }); + // ... app vive aqui; cli corre en background + cli.stop(); +tested: true +tests: + - connect_and_receive_3_events + - parse_event_field + - reconnect_on_disconnect + - stop_kills_thread +test_file_path: "cpp/tests/test_sse_client.cpp" +file_path: "cpp/functions/core/sse_client.cpp" +framework: "" +notes: "curl popen sin libcurl link, mismo patron que llm_anthropic y http_request. Usa fork+exec en lugar de popen puro para obtener el PID del proceso curl y poder matarlo en stop() con SIGTERM, evitando un join colgado." +--- + +## Cuando usarla + +- App necesita escuchar un stream HTTP push (`text/event-stream`) de forma continua. +- Ejemplos: `kanban_cpp` escucha `/api/boards/{issues,flows}/stream` para updates en vivo cuando un `.md` cambia; `skill_tree` escucha `/api/runs//sse` para progreso de `agent_runner_api`; dashboard cross-app lee un timeline SSE. +- NO usar para peticiones HTTP one-shot — eso es `http_request_cpp_core`. + +## Ejemplo + +```cpp +#include "core/sse_client.h" +#include +#include +#include +#include + +std::atomic events_seen{0}; + +int main() { + fn_sse::Config cfg; + cfg.url = "http://127.0.0.1:8403/api/boards/issues/stream"; + cfg.auto_reconnect = true; + + fn_sse::Client cli; + cli.start(cfg, + [](const fn_sse::Event& e) { + events_seen++; + std::printf("event=%s data=%s id=%s\n", + e.event.c_str(), e.data.c_str(), e.id.c_str()); + }, + [](const std::string& status) { + std::printf("[sse] %s\n", status.c_str()); + }); + + // App vive aqui; cli corre en background con reconnect automatico. + std::this_thread::sleep_for(std::chrono::seconds(60)); + cli.stop(); + return 0; +} +``` + +## Gotchas + +- `curl -N` (no buffer) es imprescindible — sin esto stdout queda buffered y el cliente no ve eventos hasta que el buffer se llena. +- `--max-time 0` desactiva el timeout global de curl para streams indefinidos. Si quieres limitar la duracion total, setea `cfg.connect_timeout_ms` (solo afecta la conexion inicial). +- Reconnect no es instantaneo: backoff exponencial 1 s..30 s por defecto. El caller debe tolerar gaps. +- `stop()` mata el proceso curl via `SIGTERM` antes del join. Sin esto, el thread se cuelga hasta que el servidor cierre. Con fork+exec obtenemos el PID; si por algun motivo fork falla cae a `popen()` (sin PID conocido). +- Thread safety: `on_event` y `on_status` corren en el background thread del cliente. Si tocan state de ImGui u otra UI, proteger con mutex en el caller — no desde aqui. +- Last-Event-ID: si el servidor no manda `id:` por evento, el reconnect re-recibe el stream desde el inicio. Es responsabilidad del servidor enviar IDs si quiere resume sin repeticion. +- Si el servidor cierra graceful (FIN, exit 0 de curl), el cliente detecta EOF y reconecta segun `auto_reconnect`. Si el servidor devuelve error HTTP (4xx/5xx), curl tambien cierra y el cliente reconecta — no hay distincion entre "servidor no disponible" y "stream terminado limpiamente". diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index e4127422..771bbba2 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -308,3 +308,11 @@ add_fn_test(test_dod_evidence_panel test_dod_evidence_panel.cpp # --- Issue 0118 — agent_runs_timeline: helpers puros ---------- add_fn_test(test_agent_runs_timeline test_agent_runs_timeline.cpp ${CMAKE_CURRENT_SOURCE_DIR}/../functions/viz/agent_runs_timeline_helpers.cpp) + +# --- sse_client: SSE client C++ (registry function) ---- +# Integration test: forks a minimal Python SSE server fixture. +# Uses http_request for the /health readiness probe. +# Skips gracefully if python3 is not available. +add_fn_test(test_sse_client test_sse_client.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../functions/core/sse_client.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/../functions/core/http_request.cpp) diff --git a/cpp/tests/test_sse_client.cpp b/cpp/tests/test_sse_client.cpp new file mode 100644 index 00000000..7fa5e6b3 --- /dev/null +++ b/cpp/tests/test_sse_client.cpp @@ -0,0 +1,361 @@ +// test_sse_client.cpp — Catch2 integration tests for fn_sse::Client. +// +// Strategy: spin up a minimal Python SSE server fixture per test via fork+exec +// (same pattern as test_http_request.cpp). The server exposes: +// GET /health → 200 OK (instant, used for readiness probe) +// GET /stream → text/event-stream (the SSE endpoint) +// +// Tests: +// 1. connect_and_receive_3_events — basic event delivery +// 2. parse_event_field — "event:" field parsed correctly +// 3. reconnect_on_disconnect — auto-reconnect fires when server closes +// 4. stop_kills_thread — stop() joins quickly, is_running() → false +// +// Tests skip gracefully if python3 is not available. +#include "catch_amalgamated.hpp" +#include "core/sse_client.h" +#include "core/http_request.h" // used for readiness probe only + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef _WIN32 +#include +#include +#include +#endif + +namespace { + +// Pick a port unique-ish per process to avoid collisions. +int pick_port(int offset = 0) { + int base = 47400; + int jitter = (int)(getpid() % 200); + return base + jitter + offset; +} + +// --------------------------------------------------------------------------- +// PythonSSEServer — minimal SSE server fixture +// --------------------------------------------------------------------------- +// GET /health → 200 "ok" +// GET /stream → text/event-stream (sends EVENTS then optionally closes) + +static const char* kServerScript = R"PYEOF( +import sys, json, time +from http.server import BaseHTTPRequestHandler, HTTPServer + +HOST = '127.0.0.1' +PORT = int(sys.argv[1]) +EVENTS = json.loads(sys.argv[2]) +CLOSE = sys.argv[3] == "true" + +class H(BaseHTTPRequestHandler): + def log_message(self, *a, **kw): pass + + def do_GET(self): + if self.path == '/health': + body = b'ok' + self.send_response(200) + self.send_header('Content-Type', 'text/plain') + self.send_header('Content-Length', str(len(body))) + self.end_headers() + self.wfile.write(body) + return + + # /stream (or anything else) — SSE endpoint + self.send_response(200) + self.send_header('Content-Type', 'text/event-stream') + self.send_header('Cache-Control', 'no-cache') + self.end_headers() + for ev in EVENTS: + if ev.get('id'): + self.wfile.write(('id: ' + ev['id'] + '\n').encode()) + if ev.get('type'): + self.wfile.write(('event: ' + ev['type'] + '\n').encode()) + self.wfile.write(('data: ' + ev['data'] + '\n\n').encode()) + self.wfile.flush() + time.sleep(0.1) + if not CLOSE: + while True: + time.sleep(1) + # close_after=true: return → connection closes + +HTTPServer((HOST, PORT), H).serve_forever() +)PYEOF"; + +struct SseFixture { + pid_t pid = 0; + int port = 0; + + // events: list of {type, data, id} (type/id may be empty strings) + bool start(int p, + const std::vector>& events, + bool close_after) { +#ifdef _WIN32 + (void)p; (void)events; (void)close_after; + return false; +#else + port = p; + + // Build EVENTS JSON array. + std::string events_json = "["; + for (size_t i = 0; i < events.size(); ++i) { + if (i) events_json += ","; + events_json += "{\"type\":\"" + std::get<0>(events[i]) + + "\",\"data\":\"" + std::get<1>(events[i]) + + "\",\"id\":\"" + std::get<2>(events[i]) + "\"}"; + } + events_json += "]"; + + // Write server script to tmp file. + char tmp_buf[256]; + std::snprintf(tmp_buf, sizeof(tmp_buf), "/tmp/fn_sse_srv_%d.py", (int)getpid() + p); + std::string script_path(tmp_buf); + { + std::ofstream f(script_path); + f << kServerScript; + } + + pid_t p2 = fork(); + if (p2 < 0) return false; + if (p2 == 0) { + freopen("/dev/null", "r", stdin); + freopen("/dev/null", "w", stdout); + freopen("/dev/null", "w", stderr); + execlp("python3", "python3", "-u", script_path.c_str(), + std::to_string(port).c_str(), + events_json.c_str(), + close_after ? "true" : "false", + (char*)nullptr); + _Exit(127); + } + pid = p2; + + // Poll /health via http_request until server is ready (~3 s max). + std::string health_url = std::string("http://127.0.0.1:") + + std::to_string(port) + "/health"; + for (int i = 0; i < 60; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + fn_http::Request req; + req.url = health_url; + req.timeout_ms = 400; + auto res = fn_http::request(req); + if (res.status == 200) return true; + } + // Server didn't respond in time. + return false; +#endif + } + + void stop() { +#ifndef _WIN32 + if (pid > 0) { + kill(pid, SIGTERM); + int st = 0; + waitpid(pid, &st, 0); + pid = 0; + } +#endif + } + + std::string stream_url() const { + return std::string("http://127.0.0.1:") + std::to_string(port) + "/stream"; + } +}; + +bool python3_available() { + return system("python3 --version > /dev/null 2>&1") == 0; +} + +} // anon + +// --------------------------------------------------------------------------- +// Test 1: connect_and_receive_3_events +// --------------------------------------------------------------------------- +TEST_CASE("sse_client: connect_and_receive_3_events", "[sse_client]") { +#ifdef _WIN32 + SUCCEED("skipped on Windows"); return; +#else + if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; } + + SseFixture srv; + bool ok = srv.start(pick_port(0), + {{"","hello-0","0"},{"","hello-1","1"},{"","hello-2","2"}}, + /*close_after=*/false); + if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; } + + fn_sse::Config cfg; + cfg.url = srv.stream_url(); + cfg.auto_reconnect = false; + cfg.connect_timeout_ms = 3000; + + std::mutex mtx; + std::vector received; + + fn_sse::Client cli; + cli.start(cfg, + [&](const fn_sse::Event& e) { + std::lock_guard lk(mtx); + received.push_back(e); + }); + + // Wait up to 5 s for 3 events. + for (int i = 0; i < 100; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::lock_guard lk(mtx); + if ((int)received.size() >= 3) break; + } + + cli.stop(); + srv.stop(); + + std::lock_guard lk(mtx); + REQUIRE(received.size() >= 3); + REQUIRE(received[0].data == "hello-0"); + REQUIRE(received[1].data == "hello-1"); + REQUIRE(received[2].data == "hello-2"); + // Default event type when "event:" field absent. + REQUIRE(received[0].event == "message"); +#endif +} + +// --------------------------------------------------------------------------- +// Test 2: parse_event_field +// --------------------------------------------------------------------------- +TEST_CASE("sse_client: parse_event_field", "[sse_client]") { +#ifdef _WIN32 + SUCCEED("skipped on Windows"); return; +#else + if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; } + + SseFixture srv; + bool ok = srv.start(pick_port(2), + {{"card_changed", R"({"id":42})", "1"}}, + /*close_after=*/false); + if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; } + + fn_sse::Config cfg; + cfg.url = srv.stream_url(); + cfg.auto_reconnect = false; + cfg.connect_timeout_ms = 3000; + + std::mutex mtx; + fn_sse::Event captured; + + fn_sse::Client cli; + cli.start(cfg, + [&](const fn_sse::Event& e) { + std::lock_guard lk(mtx); + captured = e; + }); + + for (int i = 0; i < 80; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::lock_guard lk(mtx); + if (!captured.data.empty()) break; + } + + cli.stop(); + srv.stop(); + + std::lock_guard lk(mtx); + REQUIRE(captured.event == "card_changed"); + REQUIRE(captured.data == R"({"id":42})"); + REQUIRE(captured.id == "1"); +#endif +} + +// --------------------------------------------------------------------------- +// Test 3: reconnect_on_disconnect +// --------------------------------------------------------------------------- +TEST_CASE("sse_client: reconnect_on_disconnect", "[sse_client]") { +#ifdef _WIN32 + SUCCEED("skipped on Windows"); return; +#else + if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; } + + // Server sends 2 events then closes. With auto_reconnect=true the client + // should reconnect and receive 2 more events (same server re-handles new req). + SseFixture srv; + bool ok = srv.start(pick_port(4), + {{"","wave-0",""},{"","wave-1",""}}, + /*close_after=*/true); + if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; } + + fn_sse::Config cfg; + cfg.url = srv.stream_url(); + cfg.auto_reconnect = true; + cfg.reconnect_initial_ms = 200; // fast backoff for test + cfg.connect_timeout_ms = 3000; + + std::atomic total_events{0}; + std::atomic disconnects{0}; + + fn_sse::Client cli; + cli.start(cfg, + [&](const fn_sse::Event&) { total_events++; }, + [&](const std::string& s) { + if (s == "disconnected") disconnects++; + }); + + // Wait up to 8 s for at least 2 disconnects (meaning at least 2 reconnects). + for (int i = 0; i < 160; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + if (disconnects >= 2) break; + } + + cli.stop(); + srv.stop(); + + REQUIRE(total_events >= 2); // at least first batch arrived + REQUIRE(disconnects >= 1); // client detected server close + reconnected +#endif +} + +// --------------------------------------------------------------------------- +// Test 4: stop_kills_thread +// --------------------------------------------------------------------------- +TEST_CASE("sse_client: stop_kills_thread", "[sse_client]") { +#ifdef _WIN32 + SUCCEED("skipped on Windows"); return; +#else + if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; } + + // Start client against an infinite-stream server, then immediately stop. + SseFixture srv; + bool ok = srv.start(pick_port(6), + {{"","ping",""}}, + /*close_after=*/false); + if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; } + + fn_sse::Config cfg; + cfg.url = srv.stream_url(); + cfg.auto_reconnect = true; + cfg.connect_timeout_ms = 3000; + + fn_sse::Client cli; + cli.start(cfg, [](const fn_sse::Event&) {}); + + // Give the thread a moment to start and connect. + std::this_thread::sleep_for(std::chrono::milliseconds(400)); + REQUIRE(cli.is_running()); + + auto t0 = std::chrono::steady_clock::now(); + cli.stop(); + auto elapsed_ms = std::chrono::duration_cast( + std::chrono::steady_clock::now() - t0).count(); + + srv.stop(); + + REQUIRE_FALSE(cli.is_running()); + // stop() should complete in well under 5 s (SIGTERM + join). + REQUIRE(elapsed_ms < 5000); +#endif +}