feat(cpp/core): sse_client_cpp_core — SSE client con reconnect + Last-Event-ID

Cliente Server-Sent Events C++ reusable (fn_sse::Client) con background
thread, exponential backoff, Last-Event-ID y stop() que no bloquea.

Implementacion clave: fork+execvp curl directamente (sin /bin/sh wrapper)
para tener el PID real del proceso curl en curl_pid_, lo que permite que
stop() → kill(SIGTERM) → fgets NULL → join() funcione sin bloqueo.

4 tests (Catch2): connect_and_receive_3_events, parse_event_field,
reconnect_on_disconnect, stop_kills_thread. Fixture Python SSE con
/health probe via http_request_cpp_core.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-18 19:59:50 +02:00
parent b5affae68c
commit c52846d475
5 changed files with 888 additions and 0 deletions
+360
View File
@@ -0,0 +1,360 @@
// sse_client.cpp — Server-Sent Events client via curl -N (fork+exec direct).
//
// Transport: background thread fork+exec curl DIRECTLY (no /bin/sh wrapper)
// so that the child PID is the curl process itself. This lets stop() call
// kill(curl_pid, SIGTERM) and have fgets() return NULL immediately.
//
// Using /bin/sh -c "curl ..." hides the real PID (it's the grandchild) and
// kill(sh_pid) leaves curl orphaned → fgets() never returns → join() hangs.
//
// No link-time libcurl. curl must be in PATH at runtime.
// Same transport strategy as http_request_cpp_core and llm_anthropic_cpp_core.
#include "core/sse_client.h"
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#ifndef _WIN32
#include <csignal>
#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>
#endif
namespace fn_sse {
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
namespace {
int64_t now_ms() {
using clock = std::chrono::steady_clock;
return std::chrono::duration_cast<std::chrono::milliseconds>(
clock::now().time_since_epoch()).count();
}
void sleep_ms(int ms) {
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}
// Strip trailing \r and \n.
void rstrip_newline(std::string& s) {
while (!s.empty() && (s.back() == '\r' || s.back() == '\n'))
s.pop_back();
}
int parse_int_field(const std::string& s) {
size_t i = 0;
while (i < s.size() && (s[i] == ' ' || s[i] == '\t')) ++i;
if (i == s.size()) return 0;
char* end = nullptr;
long v = std::strtol(s.c_str() + i, &end, 10);
return (end && end != s.c_str() + i) ? (int)v : 0;
}
// Parse one SSE line into the event buffer.
void parse_sse_line(const std::string& line, Event& buf, bool& should_flush) {
should_flush = false;
if (line.empty()) {
should_flush = true;
return;
}
if (line[0] == ':') return; // comment
size_t colon = line.find(':');
std::string field, value;
if (colon == std::string::npos) {
field = line;
value = "";
} else {
field = line.substr(0, colon);
value = line.substr(colon + 1);
if (!value.empty() && value[0] == ' ') value = value.substr(1);
}
if (field == "event") {
buf.event = value;
} else if (field == "data") {
if (!buf.data.empty()) buf.data += '\n';
buf.data += value;
} else if (field == "id") {
if (value.find('\0') == std::string::npos)
buf.id = value;
} else if (field == "retry") {
int ms = parse_int_field(value);
if (ms > 0) buf.retry_ms = ms;
}
}
// Dispatch buffered event, reset mutable fields.
void flush_event(Event& buf, const Client::EventHandler& cb) {
if (buf.data.empty()) {
buf.event = "";
return;
}
if (buf.event.empty()) buf.event = "message";
if (!buf.data.empty() && buf.data.back() == '\n') buf.data.pop_back();
if (cb) cb(buf);
std::string saved_id = buf.id;
int saved_rms = buf.retry_ms;
buf = Event{};
buf.id = saved_id;
buf.retry_ms = saved_rms;
}
#ifndef _WIN32
// Build argv vector for exec. Returns vector of char* pointing into `strs`.
// The last element is nullptr.
struct CurlArgs {
std::vector<std::string> strs;
std::vector<const char*> argv;
void add(const std::string& s) { strs.push_back(s); }
void build() {
argv.clear();
for (const auto& s : strs) argv.push_back(s.c_str());
argv.push_back(nullptr);
}
};
#endif
} // anon
// ---------------------------------------------------------------------------
// Impl
// ---------------------------------------------------------------------------
struct Client::Impl {
std::thread thread_;
std::atomic<bool> stop_requested_{false};
std::atomic<bool> running_{false};
#ifndef _WIN32
std::atomic<pid_t> curl_pid_{0};
#endif
void run(Config cfg, EventHandler on_event, StatusHandler on_status) {
running_ = true;
int backoff_ms = cfg.reconnect_initial_ms;
std::string last_id;
auto emit_status = [&](const std::string& s) {
if (on_status) on_status(s);
};
while (!stop_requested_) {
emit_status("connecting");
#ifndef _WIN32
// Build curl args directly — no shell wrapper, so PID == curl PID.
CurlArgs args;
args.add("curl");
args.add("-N"); // no output buffering
args.add("-sS"); // silent + show errors
args.add("--max-time");
args.add("0"); // no global timeout (stream is indefinite)
args.add("--connect-timeout");
args.add(std::to_string(cfg.connect_timeout_ms / 1000 + 1));
args.add("-H"); args.add("Accept: text/event-stream");
args.add("-H"); args.add("Cache-Control: no-cache");
if (!cfg.bearer_token.empty()) {
args.add("-H");
args.add("Authorization: Bearer " + cfg.bearer_token);
}
if (!last_id.empty()) {
args.add("-H");
args.add("Last-Event-ID: " + last_id);
}
args.add(cfg.url);
args.build();
// Fork+pipe: child exec's curl, parent reads stdout.
int pipefd[2];
if (::pipe(pipefd) != 0) {
emit_status("error: pipe() failed");
if (!cfg.auto_reconnect || stop_requested_) break;
sleep_ms(backoff_ms);
backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms);
continue;
}
pid_t child = ::fork();
if (child < 0) {
::close(pipefd[0]);
::close(pipefd[1]);
emit_status("error: fork() failed");
if (!cfg.auto_reconnect || stop_requested_) break;
sleep_ms(backoff_ms);
backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms);
continue;
}
if (child == 0) {
// Child: redirect stdout to pipe write-end, stderr to /dev/null.
::close(pipefd[0]);
::dup2(pipefd[1], STDOUT_FILENO);
::close(pipefd[1]);
int devnull = ::open("/dev/null", O_WRONLY);
if (devnull >= 0) {
::dup2(devnull, STDERR_FILENO);
::close(devnull);
}
// exec curl directly — no sh wrapper.
::execvp("curl", const_cast<char* const*>(args.argv.data()));
::_exit(127); // exec failed
}
// Parent: close write-end, keep read-end.
::close(pipefd[1]);
curl_pid_ = child;
FILE* pipe_file = ::fdopen(pipefd[0], "r");
if (!pipe_file) {
::close(pipefd[0]);
::kill(child, SIGTERM);
int st; ::waitpid(child, &st, 0);
curl_pid_ = 0;
emit_status("error: fdopen() failed");
if (!cfg.auto_reconnect || stop_requested_) break;
sleep_ms(backoff_ms);
backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms);
continue;
}
#else
// Windows: popen fallback (no reliable PID, stop() may be slow).
std::ostringstream cmd;
cmd << "curl -N -sS --max-time 0 --connect-timeout "
<< (cfg.connect_timeout_ms / 1000 + 1)
<< " -H \"Accept: text/event-stream\""
<< " -H \"Cache-Control: no-cache\"";
if (!cfg.bearer_token.empty())
cmd << " -H \"Authorization: Bearer " + cfg.bearer_token + "\"";
if (!last_id.empty())
cmd << " -H \"Last-Event-ID: " + last_id + "\"";
cmd << " \"" << cfg.url << "\" 2>NUL";
FILE* pipe_file = ::popen(cmd.str().c_str(), "r");
if (!pipe_file) {
emit_status("error: popen() failed");
if (!cfg.auto_reconnect || stop_requested_) break;
sleep_ms(backoff_ms);
backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms);
continue;
}
#endif
// ---------------------------------------------------------------
// Read loop — parse SSE lines as they arrive from curl.
// ---------------------------------------------------------------
Event buf;
buf.id = last_id;
bool connected_emitted = false;
char line_buf[8192];
while (!stop_requested_) {
if (!fgets(line_buf, sizeof(line_buf), pipe_file)) break;
if (!connected_emitted) {
emit_status("connected");
connected_emitted = true;
backoff_ms = cfg.reconnect_initial_ms; // reset on success
}
std::string line(line_buf);
rstrip_newline(line);
bool flush = false;
parse_sse_line(line, buf, flush);
if (flush) {
if (buf.retry_ms > 0) cfg.reconnect_initial_ms = buf.retry_ms;
if (!buf.id.empty()) last_id = buf.id;
flush_event(buf, on_event);
}
}
// ---------------------------------------------------------------
// Cleanup: kill child (in case we exited due to stop_requested_)
// then wait to avoid zombie.
// ---------------------------------------------------------------
#ifndef _WIN32
{
pid_t p = curl_pid_.exchange(0);
if (p > 0) {
::kill(p, SIGTERM);
// Drain to let curl finish writing and avoid SIGPIPE.
char drain[256];
while (fgets(drain, sizeof(drain), pipe_file)) {}
}
fclose(pipe_file);
if (p > 0) {
int st = 0;
::waitpid(p, &st, 0);
}
}
#else
::pclose(pipe_file);
#endif
if (stop_requested_) break;
emit_status("disconnected");
if (!cfg.auto_reconnect) break;
int64_t wait_until = now_ms() + backoff_ms;
while (!stop_requested_ && now_ms() < wait_until)
sleep_ms(50);
backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms);
}
emit_status("disconnected");
running_ = false;
}
};
// ---------------------------------------------------------------------------
// Client
// ---------------------------------------------------------------------------
Client::Client() : impl_(std::make_unique<Impl>()) {}
Client::~Client() { stop(); }
void Client::start(const Config& cfg,
EventHandler on_event,
StatusHandler on_status) {
if (impl_->running_) return;
impl_->stop_requested_ = false;
impl_->thread_ = std::thread([this, cfg, on_event, on_status]() {
impl_->run(cfg, on_event, on_status);
});
}
void Client::stop() {
impl_->stop_requested_ = true;
#ifndef _WIN32
pid_t p = impl_->curl_pid_.exchange(0);
if (p > 0) ::kill(p, SIGTERM);
#endif
if (impl_->thread_.joinable())
impl_->thread_.join();
}
bool Client::is_running() const {
return impl_->running_.load();
}
} // namespace fn_sse
+65
View File
@@ -0,0 +1,65 @@
// sse_client.h — Server-Sent Events C++ client with reconnect + Last-Event-ID.
//
// Background-thread client that reads a text/event-stream via curl -N (no
// buffer). Parses SSE spec fields, fires per-event and status callbacks, and
// reconnects with exponential backoff when the server closes.
//
// No link-time libcurl required — requires only `curl` in PATH at runtime.
// Same transport strategy as http_request_cpp_core and llm_anthropic_cpp_core.
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <string>
#include <thread>
namespace fn_sse {
// A single parsed SSE event per the W3C spec.
struct Event {
std::string event; // "message" if no "event:" field was present
std::string data; // "data:" lines joined with "\n"; trailing "\n" stripped
std::string id; // last "id:" seen (used as Last-Event-ID on reconnect)
int retry_ms = 0; // from "retry:" field, 0 if absent
};
struct Config {
std::string url;
std::string bearer_token; // optional: Authorization: Bearer <token>
int connect_timeout_ms = 5000; // curl --connect-timeout
int reconnect_initial_ms = 1000;
int reconnect_max_ms = 30000; // exponential backoff cap
bool auto_reconnect = true;
};
// Asynchronous SSE client. Call start() once; the background thread loops
// reading the stream and invoking callbacks. Call stop() to tear down.
//
// Thread safety: on_event / on_status are called from the background thread.
// If they touch UI state, callers must protect with their own mutex.
class Client {
public:
using EventHandler = std::function<void(const Event&)>;
// status strings: "connecting" | "connected" | "disconnected" | "error: <msg>"
using StatusHandler = std::function<void(const std::string&)>;
Client();
~Client();
// Start the background thread. Idempotent — no-op if already running.
void start(const Config& cfg,
EventHandler on_event,
StatusHandler on_status = nullptr);
// Signal the thread to stop and join. Safe to call multiple times.
void stop();
bool is_running() const;
private:
struct Impl;
std::unique_ptr<Impl> impl_;
};
} // namespace fn_sse
+94
View File
@@ -0,0 +1,94 @@
---
name: sse_client
kind: function
lang: cpp
domain: core
version: 0.1.0
description: "Cliente Server-Sent Events C++ con reconnect exponencial + Last-Event-ID. Reusable para streams HTTP push (agent_runs, fsnotify, telemetria, dashboards)."
tags: [sse, http, client, network, agents, registry-gap]
purity: impure
signature: "fn_sse::Client::start(cfg, on_event, on_status) -- background thread"
params:
- name: cfg
desc: "Config con url, bearer_token opcional, connect_timeout_ms, reconnect_initial_ms, reconnect_max_ms y auto_reconnect"
- name: on_event
desc: "Callback invocado por cada Event{event,data,id,retry_ms} parseado del stream SSE. Corre en el background thread del cliente."
- name: on_status
desc: "Callback opcional con strings 'connecting'|'connected'|'disconnected'|'error: <msg>' al cambiar estado de conexion."
output: "void start(), void stop(), bool is_running() — control de un thread background que mantiene la conexion SSE viva con reconnect automatico"
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: []
example: |
fn_sse::Config cfg;
cfg.url = "http://127.0.0.1:8403/api/boards/issues/stream";
fn_sse::Client cli;
cli.start(cfg,
[](const fn_sse::Event& e){ /* handle e.data JSON */ },
[](const std::string& s){ std::printf("[sse] %s\n", s.c_str()); });
// ... app vive aqui; cli corre en background
cli.stop();
tested: true
tests:
- connect_and_receive_3_events
- parse_event_field
- reconnect_on_disconnect
- stop_kills_thread
test_file_path: "cpp/tests/test_sse_client.cpp"
file_path: "cpp/functions/core/sse_client.cpp"
framework: ""
notes: "curl popen sin libcurl link, mismo patron que llm_anthropic y http_request. Usa fork+exec en lugar de popen puro para obtener el PID del proceso curl y poder matarlo en stop() con SIGTERM, evitando un join colgado."
---
## Cuando usarla
- App necesita escuchar un stream HTTP push (`text/event-stream`) de forma continua.
- Ejemplos: `kanban_cpp` escucha `/api/boards/{issues,flows}/stream` para updates en vivo cuando un `.md` cambia; `skill_tree` escucha `/api/runs/<id>/sse` para progreso de `agent_runner_api`; dashboard cross-app lee un timeline SSE.
- NO usar para peticiones HTTP one-shot — eso es `http_request_cpp_core`.
## Ejemplo
```cpp
#include "core/sse_client.h"
#include <atomic>
#include <cstdio>
#include <thread>
#include <chrono>
std::atomic<int> events_seen{0};
int main() {
fn_sse::Config cfg;
cfg.url = "http://127.0.0.1:8403/api/boards/issues/stream";
cfg.auto_reconnect = true;
fn_sse::Client cli;
cli.start(cfg,
[](const fn_sse::Event& e) {
events_seen++;
std::printf("event=%s data=%s id=%s\n",
e.event.c_str(), e.data.c_str(), e.id.c_str());
},
[](const std::string& status) {
std::printf("[sse] %s\n", status.c_str());
});
// App vive aqui; cli corre en background con reconnect automatico.
std::this_thread::sleep_for(std::chrono::seconds(60));
cli.stop();
return 0;
}
```
## Gotchas
- `curl -N` (no buffer) es imprescindible — sin esto stdout queda buffered y el cliente no ve eventos hasta que el buffer se llena.
- `--max-time 0` desactiva el timeout global de curl para streams indefinidos. Si quieres limitar la duracion total, setea `cfg.connect_timeout_ms` (solo afecta la conexion inicial).
- Reconnect no es instantaneo: backoff exponencial 1 s..30 s por defecto. El caller debe tolerar gaps.
- `stop()` mata el proceso curl via `SIGTERM` antes del join. Sin esto, el thread se cuelga hasta que el servidor cierre. Con fork+exec obtenemos el PID; si por algun motivo fork falla cae a `popen()` (sin PID conocido).
- Thread safety: `on_event` y `on_status` corren en el background thread del cliente. Si tocan state de ImGui u otra UI, proteger con mutex en el caller — no desde aqui.
- Last-Event-ID: si el servidor no manda `id:` por evento, el reconnect re-recibe el stream desde el inicio. Es responsabilidad del servidor enviar IDs si quiere resume sin repeticion.
- Si el servidor cierra graceful (FIN, exit 0 de curl), el cliente detecta EOF y reconecta segun `auto_reconnect`. Si el servidor devuelve error HTTP (4xx/5xx), curl tambien cierra y el cliente reconecta — no hay distincion entre "servidor no disponible" y "stream terminado limpiamente".
+8
View File
@@ -308,3 +308,11 @@ add_fn_test(test_dod_evidence_panel test_dod_evidence_panel.cpp
# --- Issue 0118 — agent_runs_timeline: helpers puros ----------
add_fn_test(test_agent_runs_timeline test_agent_runs_timeline.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../functions/viz/agent_runs_timeline_helpers.cpp)
# --- sse_client: SSE client C++ (registry function) ----
# Integration test: forks a minimal Python SSE server fixture.
# Uses http_request for the /health readiness probe.
# Skips gracefully if python3 is not available.
add_fn_test(test_sse_client test_sse_client.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../functions/core/sse_client.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../functions/core/http_request.cpp)
+361
View File
@@ -0,0 +1,361 @@
// test_sse_client.cpp — Catch2 integration tests for fn_sse::Client.
//
// Strategy: spin up a minimal Python SSE server fixture per test via fork+exec
// (same pattern as test_http_request.cpp). The server exposes:
// GET /health → 200 OK (instant, used for readiness probe)
// GET /stream → text/event-stream (the SSE endpoint)
//
// Tests:
// 1. connect_and_receive_3_events — basic event delivery
// 2. parse_event_field — "event:" field parsed correctly
// 3. reconnect_on_disconnect — auto-reconnect fires when server closes
// 4. stop_kills_thread — stop() joins quickly, is_running() → false
//
// Tests skip gracefully if python3 is not available.
#include "catch_amalgamated.hpp"
#include "core/sse_client.h"
#include "core/http_request.h" // used for readiness probe only
#include <atomic>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#ifndef _WIN32
#include <signal.h>
#include <sys/wait.h>
#include <unistd.h>
#endif
namespace {
// Pick a port unique-ish per process to avoid collisions.
int pick_port(int offset = 0) {
int base = 47400;
int jitter = (int)(getpid() % 200);
return base + jitter + offset;
}
// ---------------------------------------------------------------------------
// PythonSSEServer — minimal SSE server fixture
// ---------------------------------------------------------------------------
// GET /health → 200 "ok"
// GET /stream → text/event-stream (sends EVENTS then optionally closes)
static const char* kServerScript = R"PYEOF(
import sys, json, time
from http.server import BaseHTTPRequestHandler, HTTPServer
HOST = '127.0.0.1'
PORT = int(sys.argv[1])
EVENTS = json.loads(sys.argv[2])
CLOSE = sys.argv[3] == "true"
class H(BaseHTTPRequestHandler):
def log_message(self, *a, **kw): pass
def do_GET(self):
if self.path == '/health':
body = b'ok'
self.send_response(200)
self.send_header('Content-Type', 'text/plain')
self.send_header('Content-Length', str(len(body)))
self.end_headers()
self.wfile.write(body)
return
# /stream (or anything else) — SSE endpoint
self.send_response(200)
self.send_header('Content-Type', 'text/event-stream')
self.send_header('Cache-Control', 'no-cache')
self.end_headers()
for ev in EVENTS:
if ev.get('id'):
self.wfile.write(('id: ' + ev['id'] + '\n').encode())
if ev.get('type'):
self.wfile.write(('event: ' + ev['type'] + '\n').encode())
self.wfile.write(('data: ' + ev['data'] + '\n\n').encode())
self.wfile.flush()
time.sleep(0.1)
if not CLOSE:
while True:
time.sleep(1)
# close_after=true: return → connection closes
HTTPServer((HOST, PORT), H).serve_forever()
)PYEOF";
struct SseFixture {
pid_t pid = 0;
int port = 0;
// events: list of {type, data, id} (type/id may be empty strings)
bool start(int p,
const std::vector<std::tuple<std::string,std::string,std::string>>& events,
bool close_after) {
#ifdef _WIN32
(void)p; (void)events; (void)close_after;
return false;
#else
port = p;
// Build EVENTS JSON array.
std::string events_json = "[";
for (size_t i = 0; i < events.size(); ++i) {
if (i) events_json += ",";
events_json += "{\"type\":\"" + std::get<0>(events[i]) +
"\",\"data\":\"" + std::get<1>(events[i]) +
"\",\"id\":\"" + std::get<2>(events[i]) + "\"}";
}
events_json += "]";
// Write server script to tmp file.
char tmp_buf[256];
std::snprintf(tmp_buf, sizeof(tmp_buf), "/tmp/fn_sse_srv_%d.py", (int)getpid() + p);
std::string script_path(tmp_buf);
{
std::ofstream f(script_path);
f << kServerScript;
}
pid_t p2 = fork();
if (p2 < 0) return false;
if (p2 == 0) {
freopen("/dev/null", "r", stdin);
freopen("/dev/null", "w", stdout);
freopen("/dev/null", "w", stderr);
execlp("python3", "python3", "-u", script_path.c_str(),
std::to_string(port).c_str(),
events_json.c_str(),
close_after ? "true" : "false",
(char*)nullptr);
_Exit(127);
}
pid = p2;
// Poll /health via http_request until server is ready (~3 s max).
std::string health_url = std::string("http://127.0.0.1:") +
std::to_string(port) + "/health";
for (int i = 0; i < 60; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
fn_http::Request req;
req.url = health_url;
req.timeout_ms = 400;
auto res = fn_http::request(req);
if (res.status == 200) return true;
}
// Server didn't respond in time.
return false;
#endif
}
void stop() {
#ifndef _WIN32
if (pid > 0) {
kill(pid, SIGTERM);
int st = 0;
waitpid(pid, &st, 0);
pid = 0;
}
#endif
}
std::string stream_url() const {
return std::string("http://127.0.0.1:") + std::to_string(port) + "/stream";
}
};
bool python3_available() {
return system("python3 --version > /dev/null 2>&1") == 0;
}
} // anon
// ---------------------------------------------------------------------------
// Test 1: connect_and_receive_3_events
// ---------------------------------------------------------------------------
TEST_CASE("sse_client: connect_and_receive_3_events", "[sse_client]") {
#ifdef _WIN32
SUCCEED("skipped on Windows"); return;
#else
if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }
SseFixture srv;
bool ok = srv.start(pick_port(0),
{{"","hello-0","0"},{"","hello-1","1"},{"","hello-2","2"}},
/*close_after=*/false);
if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; }
fn_sse::Config cfg;
cfg.url = srv.stream_url();
cfg.auto_reconnect = false;
cfg.connect_timeout_ms = 3000;
std::mutex mtx;
std::vector<fn_sse::Event> received;
fn_sse::Client cli;
cli.start(cfg,
[&](const fn_sse::Event& e) {
std::lock_guard<std::mutex> lk(mtx);
received.push_back(e);
});
// Wait up to 5 s for 3 events.
for (int i = 0; i < 100; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::lock_guard<std::mutex> lk(mtx);
if ((int)received.size() >= 3) break;
}
cli.stop();
srv.stop();
std::lock_guard<std::mutex> lk(mtx);
REQUIRE(received.size() >= 3);
REQUIRE(received[0].data == "hello-0");
REQUIRE(received[1].data == "hello-1");
REQUIRE(received[2].data == "hello-2");
// Default event type when "event:" field absent.
REQUIRE(received[0].event == "message");
#endif
}
// ---------------------------------------------------------------------------
// Test 2: parse_event_field
// ---------------------------------------------------------------------------
TEST_CASE("sse_client: parse_event_field", "[sse_client]") {
#ifdef _WIN32
SUCCEED("skipped on Windows"); return;
#else
if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }
SseFixture srv;
bool ok = srv.start(pick_port(2),
{{"card_changed", R"({"id":42})", "1"}},
/*close_after=*/false);
if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; }
fn_sse::Config cfg;
cfg.url = srv.stream_url();
cfg.auto_reconnect = false;
cfg.connect_timeout_ms = 3000;
std::mutex mtx;
fn_sse::Event captured;
fn_sse::Client cli;
cli.start(cfg,
[&](const fn_sse::Event& e) {
std::lock_guard<std::mutex> lk(mtx);
captured = e;
});
for (int i = 0; i < 80; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
std::lock_guard<std::mutex> lk(mtx);
if (!captured.data.empty()) break;
}
cli.stop();
srv.stop();
std::lock_guard<std::mutex> lk(mtx);
REQUIRE(captured.event == "card_changed");
REQUIRE(captured.data == R"({"id":42})");
REQUIRE(captured.id == "1");
#endif
}
// ---------------------------------------------------------------------------
// Test 3: reconnect_on_disconnect
// ---------------------------------------------------------------------------
TEST_CASE("sse_client: reconnect_on_disconnect", "[sse_client]") {
#ifdef _WIN32
SUCCEED("skipped on Windows"); return;
#else
if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }
// Server sends 2 events then closes. With auto_reconnect=true the client
// should reconnect and receive 2 more events (same server re-handles new req).
SseFixture srv;
bool ok = srv.start(pick_port(4),
{{"","wave-0",""},{"","wave-1",""}},
/*close_after=*/true);
if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; }
fn_sse::Config cfg;
cfg.url = srv.stream_url();
cfg.auto_reconnect = true;
cfg.reconnect_initial_ms = 200; // fast backoff for test
cfg.connect_timeout_ms = 3000;
std::atomic<int> total_events{0};
std::atomic<int> disconnects{0};
fn_sse::Client cli;
cli.start(cfg,
[&](const fn_sse::Event&) { total_events++; },
[&](const std::string& s) {
if (s == "disconnected") disconnects++;
});
// Wait up to 8 s for at least 2 disconnects (meaning at least 2 reconnects).
for (int i = 0; i < 160; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (disconnects >= 2) break;
}
cli.stop();
srv.stop();
REQUIRE(total_events >= 2); // at least first batch arrived
REQUIRE(disconnects >= 1); // client detected server close + reconnected
#endif
}
// ---------------------------------------------------------------------------
// Test 4: stop_kills_thread
// ---------------------------------------------------------------------------
TEST_CASE("sse_client: stop_kills_thread", "[sse_client]") {
#ifdef _WIN32
SUCCEED("skipped on Windows"); return;
#else
if (!python3_available()) { SUCCEED("python3 not available — skipped"); return; }
// Start client against an infinite-stream server, then immediately stop.
SseFixture srv;
bool ok = srv.start(pick_port(6),
{{"","ping",""}},
/*close_after=*/false);
if (!ok) { SUCCEED("server fixture failed to start — skipped"); return; }
fn_sse::Config cfg;
cfg.url = srv.stream_url();
cfg.auto_reconnect = true;
cfg.connect_timeout_ms = 3000;
fn_sse::Client cli;
cli.start(cfg, [](const fn_sse::Event&) {});
// Give the thread a moment to start and connect.
std::this_thread::sleep_for(std::chrono::milliseconds(400));
REQUIRE(cli.is_running());
auto t0 = std::chrono::steady_clock::now();
cli.stop();
auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - t0).count();
srv.stop();
REQUIRE_FALSE(cli.is_running());
// stop() should complete in well under 5 s (SIGTERM + join).
REQUIRE(elapsed_ms < 5000);
#endif
}