feat(cpp/core): sse_client_cpp_core — SSE client con reconnect + Last-Event-ID
Cliente Server-Sent Events C++ reusable (fn_sse::Client) con background thread, exponential backoff, Last-Event-ID y stop() que no bloquea. Implementacion clave: fork+execvp curl directamente (sin /bin/sh wrapper) para tener el PID real del proceso curl en curl_pid_, lo que permite que stop() → kill(SIGTERM) → fgets NULL → join() funcione sin bloqueo. 4 tests (Catch2): connect_and_receive_3_events, parse_event_field, reconnect_on_disconnect, stop_kills_thread. Fixture Python SSE con /health probe via http_request_cpp_core. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,360 @@
|
||||
// sse_client.cpp — Server-Sent Events client via curl -N (fork+exec direct).
|
||||
//
|
||||
// Transport: background thread fork+exec curl DIRECTLY (no /bin/sh wrapper)
|
||||
// so that the child PID is the curl process itself. This lets stop() call
|
||||
// kill(curl_pid, SIGTERM) and have fgets() return NULL immediately.
|
||||
//
|
||||
// Using /bin/sh -c "curl ..." hides the real PID (it's the grandchild) and
|
||||
// kill(sh_pid) leaves curl orphaned → fgets() never returns → join() hangs.
|
||||
//
|
||||
// No link-time libcurl. curl must be in PATH at runtime.
|
||||
// Same transport strategy as http_request_cpp_core and llm_anthropic_cpp_core.
|
||||
#include "core/sse_client.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#ifndef _WIN32
|
||||
#include <csignal>
|
||||
#include <fcntl.h>
|
||||
#include <sys/wait.h>
|
||||
#include <unistd.h>
|
||||
#endif
|
||||
|
||||
namespace fn_sse {
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
namespace {
|
||||
|
||||
int64_t now_ms() {
|
||||
using clock = std::chrono::steady_clock;
|
||||
return std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
clock::now().time_since_epoch()).count();
|
||||
}
|
||||
|
||||
void sleep_ms(int ms) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
|
||||
}
|
||||
|
||||
// Strip trailing \r and \n.
|
||||
void rstrip_newline(std::string& s) {
|
||||
while (!s.empty() && (s.back() == '\r' || s.back() == '\n'))
|
||||
s.pop_back();
|
||||
}
|
||||
|
||||
int parse_int_field(const std::string& s) {
|
||||
size_t i = 0;
|
||||
while (i < s.size() && (s[i] == ' ' || s[i] == '\t')) ++i;
|
||||
if (i == s.size()) return 0;
|
||||
char* end = nullptr;
|
||||
long v = std::strtol(s.c_str() + i, &end, 10);
|
||||
return (end && end != s.c_str() + i) ? (int)v : 0;
|
||||
}
|
||||
|
||||
// Parse one SSE line into the event buffer.
|
||||
void parse_sse_line(const std::string& line, Event& buf, bool& should_flush) {
|
||||
should_flush = false;
|
||||
|
||||
if (line.empty()) {
|
||||
should_flush = true;
|
||||
return;
|
||||
}
|
||||
|
||||
if (line[0] == ':') return; // comment
|
||||
|
||||
size_t colon = line.find(':');
|
||||
std::string field, value;
|
||||
if (colon == std::string::npos) {
|
||||
field = line;
|
||||
value = "";
|
||||
} else {
|
||||
field = line.substr(0, colon);
|
||||
value = line.substr(colon + 1);
|
||||
if (!value.empty() && value[0] == ' ') value = value.substr(1);
|
||||
}
|
||||
|
||||
if (field == "event") {
|
||||
buf.event = value;
|
||||
} else if (field == "data") {
|
||||
if (!buf.data.empty()) buf.data += '\n';
|
||||
buf.data += value;
|
||||
} else if (field == "id") {
|
||||
if (value.find('\0') == std::string::npos)
|
||||
buf.id = value;
|
||||
} else if (field == "retry") {
|
||||
int ms = parse_int_field(value);
|
||||
if (ms > 0) buf.retry_ms = ms;
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch buffered event, reset mutable fields.
|
||||
void flush_event(Event& buf, const Client::EventHandler& cb) {
|
||||
if (buf.data.empty()) {
|
||||
buf.event = "";
|
||||
return;
|
||||
}
|
||||
if (buf.event.empty()) buf.event = "message";
|
||||
if (!buf.data.empty() && buf.data.back() == '\n') buf.data.pop_back();
|
||||
if (cb) cb(buf);
|
||||
std::string saved_id = buf.id;
|
||||
int saved_rms = buf.retry_ms;
|
||||
buf = Event{};
|
||||
buf.id = saved_id;
|
||||
buf.retry_ms = saved_rms;
|
||||
}
|
||||
|
||||
#ifndef _WIN32
|
||||
// Build argv vector for exec. Returns vector of char* pointing into `strs`.
|
||||
// The last element is nullptr.
|
||||
struct CurlArgs {
|
||||
std::vector<std::string> strs;
|
||||
std::vector<const char*> argv;
|
||||
|
||||
void add(const std::string& s) { strs.push_back(s); }
|
||||
|
||||
void build() {
|
||||
argv.clear();
|
||||
for (const auto& s : strs) argv.push_back(s.c_str());
|
||||
argv.push_back(nullptr);
|
||||
}
|
||||
};
|
||||
#endif
|
||||
|
||||
} // anon
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Impl
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
struct Client::Impl {
|
||||
std::thread thread_;
|
||||
std::atomic<bool> stop_requested_{false};
|
||||
std::atomic<bool> running_{false};
|
||||
|
||||
#ifndef _WIN32
|
||||
std::atomic<pid_t> curl_pid_{0};
|
||||
#endif
|
||||
|
||||
void run(Config cfg, EventHandler on_event, StatusHandler on_status) {
|
||||
running_ = true;
|
||||
|
||||
int backoff_ms = cfg.reconnect_initial_ms;
|
||||
std::string last_id;
|
||||
|
||||
auto emit_status = [&](const std::string& s) {
|
||||
if (on_status) on_status(s);
|
||||
};
|
||||
|
||||
while (!stop_requested_) {
|
||||
emit_status("connecting");
|
||||
|
||||
#ifndef _WIN32
|
||||
// Build curl args directly — no shell wrapper, so PID == curl PID.
|
||||
CurlArgs args;
|
||||
args.add("curl");
|
||||
args.add("-N"); // no output buffering
|
||||
args.add("-sS"); // silent + show errors
|
||||
args.add("--max-time");
|
||||
args.add("0"); // no global timeout (stream is indefinite)
|
||||
args.add("--connect-timeout");
|
||||
args.add(std::to_string(cfg.connect_timeout_ms / 1000 + 1));
|
||||
args.add("-H"); args.add("Accept: text/event-stream");
|
||||
args.add("-H"); args.add("Cache-Control: no-cache");
|
||||
|
||||
if (!cfg.bearer_token.empty()) {
|
||||
args.add("-H");
|
||||
args.add("Authorization: Bearer " + cfg.bearer_token);
|
||||
}
|
||||
if (!last_id.empty()) {
|
||||
args.add("-H");
|
||||
args.add("Last-Event-ID: " + last_id);
|
||||
}
|
||||
args.add(cfg.url);
|
||||
args.build();
|
||||
|
||||
// Fork+pipe: child exec's curl, parent reads stdout.
|
||||
int pipefd[2];
|
||||
if (::pipe(pipefd) != 0) {
|
||||
emit_status("error: pipe() failed");
|
||||
if (!cfg.auto_reconnect || stop_requested_) break;
|
||||
sleep_ms(backoff_ms);
|
||||
backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms);
|
||||
continue;
|
||||
}
|
||||
|
||||
pid_t child = ::fork();
|
||||
if (child < 0) {
|
||||
::close(pipefd[0]);
|
||||
::close(pipefd[1]);
|
||||
emit_status("error: fork() failed");
|
||||
if (!cfg.auto_reconnect || stop_requested_) break;
|
||||
sleep_ms(backoff_ms);
|
||||
backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (child == 0) {
|
||||
// Child: redirect stdout to pipe write-end, stderr to /dev/null.
|
||||
::close(pipefd[0]);
|
||||
::dup2(pipefd[1], STDOUT_FILENO);
|
||||
::close(pipefd[1]);
|
||||
int devnull = ::open("/dev/null", O_WRONLY);
|
||||
if (devnull >= 0) {
|
||||
::dup2(devnull, STDERR_FILENO);
|
||||
::close(devnull);
|
||||
}
|
||||
// exec curl directly — no sh wrapper.
|
||||
::execvp("curl", const_cast<char* const*>(args.argv.data()));
|
||||
::_exit(127); // exec failed
|
||||
}
|
||||
|
||||
// Parent: close write-end, keep read-end.
|
||||
::close(pipefd[1]);
|
||||
curl_pid_ = child;
|
||||
FILE* pipe_file = ::fdopen(pipefd[0], "r");
|
||||
|
||||
if (!pipe_file) {
|
||||
::close(pipefd[0]);
|
||||
::kill(child, SIGTERM);
|
||||
int st; ::waitpid(child, &st, 0);
|
||||
curl_pid_ = 0;
|
||||
emit_status("error: fdopen() failed");
|
||||
if (!cfg.auto_reconnect || stop_requested_) break;
|
||||
sleep_ms(backoff_ms);
|
||||
backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms);
|
||||
continue;
|
||||
}
|
||||
#else
|
||||
// Windows: popen fallback (no reliable PID, stop() may be slow).
|
||||
std::ostringstream cmd;
|
||||
cmd << "curl -N -sS --max-time 0 --connect-timeout "
|
||||
<< (cfg.connect_timeout_ms / 1000 + 1)
|
||||
<< " -H \"Accept: text/event-stream\""
|
||||
<< " -H \"Cache-Control: no-cache\"";
|
||||
if (!cfg.bearer_token.empty())
|
||||
cmd << " -H \"Authorization: Bearer " + cfg.bearer_token + "\"";
|
||||
if (!last_id.empty())
|
||||
cmd << " -H \"Last-Event-ID: " + last_id + "\"";
|
||||
cmd << " \"" << cfg.url << "\" 2>NUL";
|
||||
FILE* pipe_file = ::popen(cmd.str().c_str(), "r");
|
||||
if (!pipe_file) {
|
||||
emit_status("error: popen() failed");
|
||||
if (!cfg.auto_reconnect || stop_requested_) break;
|
||||
sleep_ms(backoff_ms);
|
||||
backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms);
|
||||
continue;
|
||||
}
|
||||
#endif
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Read loop — parse SSE lines as they arrive from curl.
|
||||
// ---------------------------------------------------------------
|
||||
Event buf;
|
||||
buf.id = last_id;
|
||||
bool connected_emitted = false;
|
||||
char line_buf[8192];
|
||||
|
||||
while (!stop_requested_) {
|
||||
if (!fgets(line_buf, sizeof(line_buf), pipe_file)) break;
|
||||
|
||||
if (!connected_emitted) {
|
||||
emit_status("connected");
|
||||
connected_emitted = true;
|
||||
backoff_ms = cfg.reconnect_initial_ms; // reset on success
|
||||
}
|
||||
|
||||
std::string line(line_buf);
|
||||
rstrip_newline(line);
|
||||
|
||||
bool flush = false;
|
||||
parse_sse_line(line, buf, flush);
|
||||
|
||||
if (flush) {
|
||||
if (buf.retry_ms > 0) cfg.reconnect_initial_ms = buf.retry_ms;
|
||||
if (!buf.id.empty()) last_id = buf.id;
|
||||
flush_event(buf, on_event);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------
|
||||
// Cleanup: kill child (in case we exited due to stop_requested_)
|
||||
// then wait to avoid zombie.
|
||||
// ---------------------------------------------------------------
|
||||
#ifndef _WIN32
|
||||
{
|
||||
pid_t p = curl_pid_.exchange(0);
|
||||
if (p > 0) {
|
||||
::kill(p, SIGTERM);
|
||||
// Drain to let curl finish writing and avoid SIGPIPE.
|
||||
char drain[256];
|
||||
while (fgets(drain, sizeof(drain), pipe_file)) {}
|
||||
}
|
||||
fclose(pipe_file);
|
||||
if (p > 0) {
|
||||
int st = 0;
|
||||
::waitpid(p, &st, 0);
|
||||
}
|
||||
}
|
||||
#else
|
||||
::pclose(pipe_file);
|
||||
#endif
|
||||
|
||||
if (stop_requested_) break;
|
||||
|
||||
emit_status("disconnected");
|
||||
|
||||
if (!cfg.auto_reconnect) break;
|
||||
|
||||
int64_t wait_until = now_ms() + backoff_ms;
|
||||
while (!stop_requested_ && now_ms() < wait_until)
|
||||
sleep_ms(50);
|
||||
|
||||
backoff_ms = std::min(backoff_ms * 2, cfg.reconnect_max_ms);
|
||||
}
|
||||
|
||||
emit_status("disconnected");
|
||||
running_ = false;
|
||||
}
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Client
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
Client::Client() : impl_(std::make_unique<Impl>()) {}
|
||||
Client::~Client() { stop(); }
|
||||
|
||||
void Client::start(const Config& cfg,
|
||||
EventHandler on_event,
|
||||
StatusHandler on_status) {
|
||||
if (impl_->running_) return;
|
||||
impl_->stop_requested_ = false;
|
||||
impl_->thread_ = std::thread([this, cfg, on_event, on_status]() {
|
||||
impl_->run(cfg, on_event, on_status);
|
||||
});
|
||||
}
|
||||
|
||||
void Client::stop() {
|
||||
impl_->stop_requested_ = true;
|
||||
#ifndef _WIN32
|
||||
pid_t p = impl_->curl_pid_.exchange(0);
|
||||
if (p > 0) ::kill(p, SIGTERM);
|
||||
#endif
|
||||
if (impl_->thread_.joinable())
|
||||
impl_->thread_.join();
|
||||
}
|
||||
|
||||
bool Client::is_running() const {
|
||||
return impl_->running_.load();
|
||||
}
|
||||
|
||||
} // namespace fn_sse
|
||||
@@ -0,0 +1,65 @@
|
||||
// sse_client.h — Server-Sent Events C++ client with reconnect + Last-Event-ID.
|
||||
//
|
||||
// Background-thread client that reads a text/event-stream via curl -N (no
|
||||
// buffer). Parses SSE spec fields, fires per-event and status callbacks, and
|
||||
// reconnects with exponential backoff when the server closes.
|
||||
//
|
||||
// No link-time libcurl required — requires only `curl` in PATH at runtime.
|
||||
// Same transport strategy as http_request_cpp_core and llm_anthropic_cpp_core.
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
namespace fn_sse {
|
||||
|
||||
// A single parsed SSE event per the W3C spec.
|
||||
struct Event {
|
||||
std::string event; // "message" if no "event:" field was present
|
||||
std::string data; // "data:" lines joined with "\n"; trailing "\n" stripped
|
||||
std::string id; // last "id:" seen (used as Last-Event-ID on reconnect)
|
||||
int retry_ms = 0; // from "retry:" field, 0 if absent
|
||||
};
|
||||
|
||||
struct Config {
|
||||
std::string url;
|
||||
std::string bearer_token; // optional: Authorization: Bearer <token>
|
||||
int connect_timeout_ms = 5000; // curl --connect-timeout
|
||||
int reconnect_initial_ms = 1000;
|
||||
int reconnect_max_ms = 30000; // exponential backoff cap
|
||||
bool auto_reconnect = true;
|
||||
};
|
||||
|
||||
// Asynchronous SSE client. Call start() once; the background thread loops
|
||||
// reading the stream and invoking callbacks. Call stop() to tear down.
|
||||
//
|
||||
// Thread safety: on_event / on_status are called from the background thread.
|
||||
// If they touch UI state, callers must protect with their own mutex.
|
||||
class Client {
|
||||
public:
|
||||
using EventHandler = std::function<void(const Event&)>;
|
||||
// status strings: "connecting" | "connected" | "disconnected" | "error: <msg>"
|
||||
using StatusHandler = std::function<void(const std::string&)>;
|
||||
|
||||
Client();
|
||||
~Client();
|
||||
|
||||
// Start the background thread. Idempotent — no-op if already running.
|
||||
void start(const Config& cfg,
|
||||
EventHandler on_event,
|
||||
StatusHandler on_status = nullptr);
|
||||
|
||||
// Signal the thread to stop and join. Safe to call multiple times.
|
||||
void stop();
|
||||
|
||||
bool is_running() const;
|
||||
|
||||
private:
|
||||
struct Impl;
|
||||
std::unique_ptr<Impl> impl_;
|
||||
};
|
||||
|
||||
} // namespace fn_sse
|
||||
@@ -0,0 +1,94 @@
|
||||
---
|
||||
name: sse_client
|
||||
kind: function
|
||||
lang: cpp
|
||||
domain: core
|
||||
version: 0.1.0
|
||||
description: "Cliente Server-Sent Events C++ con reconnect exponencial + Last-Event-ID. Reusable para streams HTTP push (agent_runs, fsnotify, telemetria, dashboards)."
|
||||
tags: [sse, http, client, network, agents, registry-gap]
|
||||
purity: impure
|
||||
signature: "fn_sse::Client::start(cfg, on_event, on_status) -- background thread"
|
||||
params:
|
||||
- name: cfg
|
||||
desc: "Config con url, bearer_token opcional, connect_timeout_ms, reconnect_initial_ms, reconnect_max_ms y auto_reconnect"
|
||||
- name: on_event
|
||||
desc: "Callback invocado por cada Event{event,data,id,retry_ms} parseado del stream SSE. Corre en el background thread del cliente."
|
||||
- name: on_status
|
||||
desc: "Callback opcional con strings 'connecting'|'connected'|'disconnected'|'error: <msg>' al cambiar estado de conexion."
|
||||
output: "void start(), void stop(), bool is_running() — control de un thread background que mantiene la conexion SSE viva con reconnect automatico"
|
||||
uses_functions: []
|
||||
uses_types: []
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: "error_go_core"
|
||||
imports: []
|
||||
example: |
|
||||
fn_sse::Config cfg;
|
||||
cfg.url = "http://127.0.0.1:8403/api/boards/issues/stream";
|
||||
fn_sse::Client cli;
|
||||
cli.start(cfg,
|
||||
[](const fn_sse::Event& e){ /* handle e.data JSON */ },
|
||||
[](const std::string& s){ std::printf("[sse] %s\n", s.c_str()); });
|
||||
// ... app vive aqui; cli corre en background
|
||||
cli.stop();
|
||||
tested: true
|
||||
tests:
|
||||
- connect_and_receive_3_events
|
||||
- parse_event_field
|
||||
- reconnect_on_disconnect
|
||||
- stop_kills_thread
|
||||
test_file_path: "cpp/tests/test_sse_client.cpp"
|
||||
file_path: "cpp/functions/core/sse_client.cpp"
|
||||
framework: ""
|
||||
notes: "curl popen sin libcurl link, mismo patron que llm_anthropic y http_request. Usa fork+exec en lugar de popen puro para obtener el PID del proceso curl y poder matarlo en stop() con SIGTERM, evitando un join colgado."
|
||||
---
|
||||
|
||||
## Cuando usarla
|
||||
|
||||
- App necesita escuchar un stream HTTP push (`text/event-stream`) de forma continua.
|
||||
- Ejemplos: `kanban_cpp` escucha `/api/boards/{issues,flows}/stream` para updates en vivo cuando un `.md` cambia; `skill_tree` escucha `/api/runs/<id>/sse` para progreso de `agent_runner_api`; dashboard cross-app lee un timeline SSE.
|
||||
- NO usar para peticiones HTTP one-shot — eso es `http_request_cpp_core`.
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```cpp
|
||||
#include "core/sse_client.h"
|
||||
#include <atomic>
|
||||
#include <cstdio>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
|
||||
std::atomic<int> events_seen{0};
|
||||
|
||||
int main() {
|
||||
fn_sse::Config cfg;
|
||||
cfg.url = "http://127.0.0.1:8403/api/boards/issues/stream";
|
||||
cfg.auto_reconnect = true;
|
||||
|
||||
fn_sse::Client cli;
|
||||
cli.start(cfg,
|
||||
[](const fn_sse::Event& e) {
|
||||
events_seen++;
|
||||
std::printf("event=%s data=%s id=%s\n",
|
||||
e.event.c_str(), e.data.c_str(), e.id.c_str());
|
||||
},
|
||||
[](const std::string& status) {
|
||||
std::printf("[sse] %s\n", status.c_str());
|
||||
});
|
||||
|
||||
// App vive aqui; cli corre en background con reconnect automatico.
|
||||
std::this_thread::sleep_for(std::chrono::seconds(60));
|
||||
cli.stop();
|
||||
return 0;
|
||||
}
|
||||
```
|
||||
|
||||
## Gotchas
|
||||
|
||||
- `curl -N` (no buffer) es imprescindible — sin esto stdout queda buffered y el cliente no ve eventos hasta que el buffer se llena.
|
||||
- `--max-time 0` desactiva el timeout global de curl para streams indefinidos. Si quieres limitar la duracion total, setea `cfg.connect_timeout_ms` (solo afecta la conexion inicial).
|
||||
- Reconnect no es instantaneo: backoff exponencial 1 s..30 s por defecto. El caller debe tolerar gaps.
|
||||
- `stop()` mata el proceso curl via `SIGTERM` antes del join. Sin esto, el thread se cuelga hasta que el servidor cierre. Con fork+exec obtenemos el PID; si por algun motivo fork falla cae a `popen()` (sin PID conocido).
|
||||
- Thread safety: `on_event` y `on_status` corren en el background thread del cliente. Si tocan state de ImGui u otra UI, proteger con mutex en el caller — no desde aqui.
|
||||
- Last-Event-ID: si el servidor no manda `id:` por evento, el reconnect re-recibe el stream desde el inicio. Es responsabilidad del servidor enviar IDs si quiere resume sin repeticion.
|
||||
- Si el servidor cierra graceful (FIN, exit 0 de curl), el cliente detecta EOF y reconecta segun `auto_reconnect`. Si el servidor devuelve error HTTP (4xx/5xx), curl tambien cierra y el cliente reconecta — no hay distincion entre "servidor no disponible" y "stream terminado limpiamente".
|
||||
Reference in New Issue
Block a user