feat(monitor): WS client live stream + apply snapshot/delta to UI

Hand-rolled minimal RFC6455 WebSocket client (~330 LOC) tailored to the
Monitor tab's needs. Single endpoint, text frames, masked client→server,
exponential reconnect backoff (0.5s → 8s cap), thread-safe in/out queues.
TLS is intentionally out of scope (localhost-only). Sec-WebSocket-Accept
verification is skipped — the server is controlled, 101 status is enough.

Files:
- ws_client.{h,cpp}: WsClient with start(host,port,path), drain(), send_text(),
  is_connected(), last_event_ts(). Worker thread does connect + handshake +
  read_loop + reconnect.
- CMakeLists.txt: pulls ws_client.cpp into the dashboard target. ws2_32 was
  already linked for http_client.cpp.
- main.cpp: parses host:port from --api URL, spawns a global WsClient, and
  drains its queue once per render frame via poll_ws(). apply_ws_message()
  routes JSON to g_data.claude:
    snapshot → replace KPIs + recent_executions
    delta    → append rows, increment total_calls / total_errors
  monitor_set_ws_state() forwards connection state + last_event_ts to the
  Monitor toolbar LED.

End-to-end smoke test passed against sqlite_api on localhost:8484:
- Snapshot received with KPIs + 100 recent rows.
- After INSERT INTO calls, delta arrives within ~250ms (server ticker).
- Errors (success=0) propagate correctly and bump the Errors KPI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-14 00:34:03 +02:00
parent 87b7ef45ff
commit 9205567d5f
4 changed files with 612 additions and 0 deletions
+1
View File
@@ -19,6 +19,7 @@ add_imgui_app(registry_dashboard
data.cpp
data_http.cpp
http_client.cpp
ws_client.cpp
views.cpp
${CMAKE_SOURCE_DIR}/functions/viz/kpi_card.cpp
${CMAKE_SOURCE_DIR}/functions/viz/bar_chart.cpp
+133
View File
@@ -8,18 +8,135 @@
#include "data.h"
#include "data_http.h"
#include "views.h"
#include "ws_client.h"
#include "nlohmann/json.hpp"
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <fstream>
#include <vector>
static RegistryData g_data;
static std::string g_db_path;
static std::string g_api_url;
static bool g_loaded = false;
static bool g_using_http = false;
static WsClient g_ws;
// Parse "http://host:port" → host, port. Devuelve false si no encaja.
static bool parse_http_url(const std::string& url, std::string& host, int& port) {
static const char* kPrefix = "http://";
if (url.rfind(kPrefix, 0) != 0) return false;
std::string rest = url.substr(std::strlen(kPrefix));
auto colon = rest.find(':');
if (colon == std::string::npos) {
host = rest;
port = 80;
return true;
}
host = rest.substr(0, colon);
auto slash = rest.find('/', colon + 1);
std::string port_str = (slash == std::string::npos)
? rest.substr(colon + 1)
: rest.substr(colon + 1, slash - colon - 1);
port = std::atoi(port_str.c_str());
return port > 0;
}
// Aplica un mensaje JSON recibido por WS a g_data.claude. Tipos:
// - "snapshot": reemplaza KPIs y la lista entera de recent_executions.
// - "delta": append rows (front), dedup por id, recalcula KPIs.
// Devuelve true si el mensaje era valido.
static bool apply_ws_message(const std::string& raw) {
using nlohmann::json;
json msg = json::parse(raw, nullptr, false);
if (!msg.is_object()) return false;
const std::string type = msg.value("type", "");
if (type != "snapshot" && type != "delta") return false;
if (msg.contains("server_time") && msg["server_time"].is_number_integer()) {
g_data.claude.last_event_ts = msg["server_time"].get<long long>();
}
if (msg.contains("watermark") && msg["watermark"].is_number_integer()) {
long long w = msg["watermark"].get<long long>();
if (w > g_data.claude.last_seen_call_id) g_data.claude.last_seen_call_id = w;
}
// Snapshot reemplaza KPIs. Delta los actualiza por incremento.
if (type == "snapshot" && msg.contains("stats") && msg["stats"].is_object()) {
const auto& s = msg["stats"];
g_data.claude.total_calls = s.value("total_calls", 0);
g_data.claude.total_errors = s.value("total_errors", 0);
g_data.claude.total_violations = s.value("total_violations", 0);
g_data.claude.total_copies = s.value("total_copies", 0);
g_data.claude.total_versions = s.value("total_versions", 0);
g_data.claude.available = true;
}
if (!msg.contains("calls") || !msg["calls"].is_array()) return true;
if (type == "snapshot") {
g_data.claude.recent_executions.clear();
}
// Construye filas nuevas
std::vector<RecentExecutionRow> incoming;
incoming.reserve(msg["calls"].size());
int new_errors = 0;
for (const auto& c : msg["calls"]) {
RecentExecutionRow row;
row.id = c.value("id", 0LL);
row.ts = c.value("ts", 0LL);
row.function_id = c.value("function_id", "");
row.tool_used = c.value("tool_used", "");
row.duration_ms = c.value("duration_ms", 0);
row.success = c.value("success", true);
row.error_class = c.value("error_class", "");
row.session_id = c.value("session_id", "");
if (!row.success) new_errors++;
incoming.push_back(std::move(row));
}
if (type == "delta") {
g_data.claude.total_calls += static_cast<int>(incoming.size());
g_data.claude.total_errors += new_errors;
}
// Prepend (newer al frente). Para delta: filas vienen ASC del server,
// las anadimos al frente en orden inverso. Para snapshot: ya vienen DESC.
if (type == "delta") {
for (auto it = incoming.rbegin(); it != incoming.rend(); ++it) {
g_data.claude.recent_executions.insert(
g_data.claude.recent_executions.begin(), std::move(*it));
}
} else {
for (auto& row : incoming) {
g_data.claude.recent_executions.push_back(std::move(row));
}
}
// Cap list (UI muestra ~100). Evita crecer indefinidamente con deltas.
const size_t kCap = 200;
if (g_data.claude.recent_executions.size() > kCap) {
g_data.claude.recent_executions.resize(kCap);
}
return true;
}
static void poll_ws() {
bool connected = g_ws.is_connected();
long long ts = g_ws.last_event_ts();
monitor_set_ws_state(connected, ts);
std::vector<std::string> msgs;
g_ws.drain(msgs, 32);
for (const auto& m : msgs) {
apply_ws_message(m);
}
}
static void reload_data() {
// Conservar la ventana del Monitor entre recargas (no se pierde al refrescar).
@@ -73,6 +190,11 @@ static void render() {
reload_monitor();
}
// Issue 0086: drena la cola de mensajes WS y aplica deltas a g_data.
// No bloquea — drain es O(N) sobre los mensajes encolados desde el
// ultimo frame (tipicamente 0-3).
poll_ws();
if (!g_loaded) {
fullscreen_window_begin("##error");
ImGui::TextColored(ImVec4(1, 0.3f, 0.3f, 1),
@@ -187,6 +309,17 @@ int main(int argc, char** argv) {
reload_data();
// Issue 0086: lanza el cliente WS al hub de eventos. El hub solo arranca
// su ticker cuando recibe el primer subscriber, asi que esta conexion
// tambien le dice al servidor "empieza a streamear".
{
std::string ws_host;
int ws_port = 0;
if (parse_http_url(g_api_url, ws_host, ws_port)) {
g_ws.start(ws_host, ws_port, "/api/events/call_monitor");
}
}
return fn::run_app(
{.title = "fn_registry Dashboard",
.width = 1600,
+404
View File
@@ -0,0 +1,404 @@
#include "ws_client.h"
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <random>
#include <sstream>
#include <vector>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "ws2_32.lib")
typedef SOCKET sock_t;
#define SOCK_INVALID INVALID_SOCKET
#define SOCK_CLOSE closesocket
#define SOCK_ERR WSAGetLastError()
#define FN_SOCK_NONBLOCK(s) do { u_long m = 1; ioctlsocket((s), FIONBIO, &m); } while (0)
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
typedef int sock_t;
#define SOCK_INVALID (-1)
#define SOCK_CLOSE close
#define SOCK_ERR errno
#define FN_SOCK_NONBLOCK(s) do { int f = fcntl((s), F_GETFL, 0); fcntl((s), F_SETFL, f | O_NONBLOCK); } while (0)
#endif
#ifdef _WIN32
static bool wsa_init_ws() {
static bool inited = false;
static std::once_flag flag;
std::call_once(flag, []() {
WSADATA wsa;
WSAStartup(MAKEWORD(2, 2), &wsa);
});
return true;
}
#endif
namespace {
// ----- Base64 (small, sufficient for 16-byte WS key) -----
const char* kBase64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
std::string base64_encode(const uint8_t* in, size_t len) {
std::string out;
out.reserve(((len + 2) / 3) * 4);
size_t i = 0;
for (; i + 3 <= len; i += 3) {
uint32_t v = (uint32_t(in[i]) << 16) | (uint32_t(in[i + 1]) << 8) | uint32_t(in[i + 2]);
out.push_back(kBase64[(v >> 18) & 63]);
out.push_back(kBase64[(v >> 12) & 63]);
out.push_back(kBase64[(v >> 6) & 63]);
out.push_back(kBase64[v & 63]);
}
if (i < len) {
uint32_t v = uint32_t(in[i]) << 16;
if (i + 1 < len) v |= uint32_t(in[i + 1]) << 8;
out.push_back(kBase64[(v >> 18) & 63]);
out.push_back(kBase64[(v >> 12) & 63]);
out.push_back(i + 1 < len ? kBase64[(v >> 6) & 63] : '=');
out.push_back('=');
}
return out;
}
// Send exactly n bytes (blocking).
bool send_all(sock_t sock, const char* data, size_t n) {
size_t sent = 0;
while (sent < n) {
int k = send(sock, data + sent, static_cast<int>(n - sent), 0);
if (k <= 0) return false;
sent += k;
}
return true;
}
// Receive exactly n bytes (blocking on a non-non-blocking socket).
bool recv_all(sock_t sock, char* data, size_t n) {
size_t got = 0;
while (got < n) {
int k = recv(sock, data + got, static_cast<int>(n - got), 0);
if (k <= 0) return false;
got += k;
}
return true;
}
// Receive up to n bytes; returns count, or -1 on error / -2 on would-block.
int recv_some(sock_t sock, char* data, size_t n) {
int k = recv(sock, data, static_cast<int>(n), 0);
if (k > 0) return k;
if (k == 0) return -1;
#ifdef _WIN32
if (WSAGetLastError() == WSAEWOULDBLOCK) return -2;
#else
if (errno == EAGAIN || errno == EWOULDBLOCK) return -2;
#endif
return -1;
}
} // namespace
WsClient::WsClient() = default;
WsClient::~WsClient() {
stop();
}
void WsClient::start(const std::string& host, int port, const std::string& path) {
State expected = State::Idle;
if (!state_.compare_exchange_strong(expected, State::Connecting)) return;
host_ = host;
port_ = port;
path_ = path;
stop_flag_.store(false);
worker_ = std::thread([this]() { this->run(); });
}
void WsClient::stop() {
stop_flag_.store(true);
int s = sock_.exchange(-1);
if (s != -1) SOCK_CLOSE(static_cast<sock_t>(s));
out_cv_.notify_all();
if (worker_.joinable()) worker_.join();
state_.store(State::Stopped);
}
int WsClient::drain(std::vector<std::string>& out, int max) {
std::lock_guard<std::mutex> g(in_mu_);
int n = 0;
while (!in_queue_.empty() && n < max) {
out.emplace_back(std::move(in_queue_.front()));
in_queue_.pop_front();
n++;
}
return n;
}
bool WsClient::send_text(const std::string& payload) {
if (state_.load() != State::Connected) return false;
{
std::lock_guard<std::mutex> g(out_mu_);
out_queue_.push_back(payload);
}
out_cv_.notify_one();
return true;
}
void WsClient::run() {
int backoff_ms = 500;
while (!stop_flag_.load()) {
state_.store(State::Connecting);
if (connect_once()) {
backoff_ms = 500; // reset on successful connect
state_.store(State::Connected);
read_loop();
}
state_.store(State::Backoff);
if (stop_flag_.load()) break;
// Exponential backoff: 0.5s → 1s → 2s → 4s → 8s (cap).
for (int slept = 0; slept < backoff_ms && !stop_flag_.load(); slept += 100) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
backoff_ms = std::min(backoff_ms * 2, 8000);
}
state_.store(State::Stopped);
}
bool WsClient::connect_once() {
#ifdef _WIN32
wsa_init_ws();
#endif
sock_t sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock == SOCK_INVALID) return false;
// 5s connect timeout via SO_*TIMEO. Stays blocking afterwards for the
// handshake; read_loop switches to non-blocking with select().
#ifdef _WIN32
DWORD timeout_ms = 5000;
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
#else
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
#endif
struct sockaddr_in addr;
std::memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(static_cast<uint16_t>(port_));
addr.sin_addr.s_addr = inet_addr(host_.c_str());
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
SOCK_CLOSE(sock);
return false;
}
sock_.store(static_cast<int>(sock));
if (!handshake()) {
int s = sock_.exchange(-1);
if (s != -1) SOCK_CLOSE(static_cast<sock_t>(s));
return false;
}
// Non-blocking for the read loop.
FN_SOCK_NONBLOCK(sock);
return true;
}
bool WsClient::handshake() {
sock_t sock = static_cast<sock_t>(sock_.load());
// 16 random bytes → base64 → Sec-WebSocket-Key.
uint8_t key_raw[16];
std::random_device rd;
for (auto& b : key_raw) b = static_cast<uint8_t>(rd() & 0xff);
std::string key_b64 = base64_encode(key_raw, sizeof(key_raw));
std::ostringstream req;
req << "GET " << path_ << " HTTP/1.1\r\n";
req << "Host: " << host_ << ":" << port_ << "\r\n";
req << "Upgrade: websocket\r\n";
req << "Connection: Upgrade\r\n";
req << "Sec-WebSocket-Key: " << key_b64 << "\r\n";
req << "Sec-WebSocket-Version: 13\r\n";
req << "Origin: http://" << host_ << ":" << port_ << "\r\n";
req << "\r\n";
std::string raw = req.str();
if (!send_all(sock, raw.c_str(), raw.size())) return false;
// Read response headers (up to 4KB).
std::string resp;
char buf[1024];
while (resp.find("\r\n\r\n") == std::string::npos) {
int k = recv(sock, buf, sizeof(buf), 0);
if (k <= 0) return false;
resp.append(buf, k);
if (resp.size() > 4096) return false;
}
// Expect "HTTP/1.1 101".
if (resp.compare(0, 12, "HTTP/1.1 101") != 0 &&
resp.compare(0, 12, "HTTP/1.0 101") != 0) {
fprintf(stderr, "[ws] handshake failed: %.*s\n",
(int)std::min<size_t>(resp.size(), 120), resp.c_str());
return false;
}
// We intentionally skip Sec-WebSocket-Accept verification — controlled
// server, localhost-only, 101 status is enough for this app.
return true;
}
bool WsClient::read_loop() {
sock_t sock = static_cast<sock_t>(sock_.load());
std::vector<uint8_t> rb; // accumulated read buffer
rb.reserve(64 * 1024);
while (!stop_flag_.load()) {
// Block on select() for up to 100ms so we can both read and check
// the outgoing queue.
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(sock, &rfds);
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 100 * 1000;
int sel = select(static_cast<int>(sock) + 1, &rfds, nullptr, nullptr, &tv);
if (sel < 0) return false;
if (sel > 0 && FD_ISSET(sock, &rfds)) {
uint8_t tmp[8192];
int k = recv_some(sock, reinterpret_cast<char*>(tmp), sizeof(tmp));
if (k == -1) return false;
if (k > 0) rb.insert(rb.end(), tmp, tmp + k);
}
// Drain outgoing queue (text frames, masked).
for (;;) {
std::string payload;
{
std::lock_guard<std::mutex> g(out_mu_);
if (out_queue_.empty()) break;
payload = std::move(out_queue_.front());
out_queue_.pop_front();
}
if (!send_frame(0x1, payload)) return false;
}
// Parse frames. RFC6455 minimal: assume server never masks, no
// continuation, opcodes: 0x1 text, 0x8 close, 0x9 ping, 0xA pong.
while (rb.size() >= 2) {
uint8_t b0 = rb[0];
uint8_t b1 = rb[1];
bool fin = (b0 & 0x80) != 0;
(void)fin;
int opcode = b0 & 0x0F;
bool mask = (b1 & 0x80) != 0;
uint64_t len = b1 & 0x7F;
size_t pos = 2;
if (len == 126) {
if (rb.size() < pos + 2) break;
len = (uint64_t(rb[pos]) << 8) | uint64_t(rb[pos + 1]);
pos += 2;
} else if (len == 127) {
if (rb.size() < pos + 8) break;
len = 0;
for (int i = 0; i < 8; i++) len = (len << 8) | rb[pos + i];
pos += 8;
}
uint8_t mkey[4] = {0, 0, 0, 0};
if (mask) {
if (rb.size() < pos + 4) break;
for (int i = 0; i < 4; i++) mkey[i] = rb[pos + i];
pos += 4;
}
if (rb.size() < pos + len) break;
std::string payload;
payload.resize(static_cast<size_t>(len));
for (size_t i = 0; i < len; i++) {
uint8_t c = rb[pos + i];
if (mask) c ^= mkey[i & 3];
payload[i] = static_cast<char>(c);
}
rb.erase(rb.begin(), rb.begin() + pos + len);
switch (opcode) {
case 0x1: { // text
last_event_ts_.store(std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count());
std::lock_guard<std::mutex> g(in_mu_);
in_queue_.emplace_back(std::move(payload));
// Bound the queue. Drop oldest if too big — UI consumes
// each frame so this should only kick in if the dashboard
// is paused (window minimized, etc.).
while (in_queue_.size() > 512) in_queue_.pop_front();
break;
}
case 0x8: // close
return false;
case 0x9: // ping → reply with pong (same payload)
if (!send_frame(0xA, payload)) return false;
break;
case 0xA: // pong, ignore
break;
default:
// 0x0 continuation or unexpected opcode: bail (server
// controlled by us, shouldn't happen).
return false;
}
}
}
return true;
}
bool WsClient::send_frame(int opcode, const std::string& payload) {
sock_t sock = static_cast<sock_t>(sock_.load());
if (sock == SOCK_INVALID || static_cast<int>(sock) < 0) return false;
std::vector<uint8_t> frame;
frame.reserve(payload.size() + 16);
frame.push_back(static_cast<uint8_t>(0x80 | (opcode & 0x0F))); // FIN + opcode
uint64_t len = payload.size();
if (len < 126) {
frame.push_back(static_cast<uint8_t>(0x80 | len)); // mask bit set
} else if (len <= 0xFFFF) {
frame.push_back(static_cast<uint8_t>(0x80 | 126));
frame.push_back(static_cast<uint8_t>((len >> 8) & 0xFF));
frame.push_back(static_cast<uint8_t>(len & 0xFF));
} else {
frame.push_back(static_cast<uint8_t>(0x80 | 127));
for (int i = 7; i >= 0; i--) frame.push_back(static_cast<uint8_t>((len >> (8 * i)) & 0xFF));
}
// Mask key (RFC requires mask for client → server frames).
std::random_device rd;
uint8_t mkey[4];
for (auto& b : mkey) b = static_cast<uint8_t>(rd() & 0xff);
for (int i = 0; i < 4; i++) frame.push_back(mkey[i]);
for (size_t i = 0; i < payload.size(); i++) {
frame.push_back(static_cast<uint8_t>(payload[i]) ^ mkey[i & 3]);
}
return send_all(sock, reinterpret_cast<const char*>(frame.data()), frame.size());
}
+74
View File
@@ -0,0 +1,74 @@
#pragma once
// Minimal WebSocket client (RFC 6455, ws:// only, no TLS) tailored for the
// Monitor tab. Background thread does connect + handshake + read loop and
// pushes incoming text payloads into a thread-safe queue that the ImGui
// thread drains each frame.
//
// Issue 0086 — first consumer of WS in the C++ dashboards. If a second app
// needs WS later, extract this file to cpp/functions/network/ via
// fn-constructor.
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
class WsClient {
public:
WsClient();
~WsClient();
// Non-blocking. Spawns background thread that connects to ws://host:port/path
// and keeps the connection alive with exponential reconnect backoff.
// Safe to call multiple times — second call is a no-op once running.
void start(const std::string& host, int port, const std::string& path);
// Stop the background thread and tear down the connection.
void stop();
// True while a WS connection is up and handshake completed.
bool is_connected() const { return state_.load() == State::Connected; }
// Epoch seconds of last received frame (for the live LED in the Monitor).
long long last_event_ts() const { return last_event_ts_.load(); }
// Drain at most `max` queued text payloads. Returns the number drained.
// Called once per frame from the render thread.
int drain(std::vector<std::string>& out, int max = 64);
// Send a text frame to the server. Returns true if queued for sending.
// Used to send {"watermark": N} commands on reconnect.
bool send_text(const std::string& payload);
private:
enum class State { Idle, Connecting, Connected, Backoff, Stopped };
void run();
bool connect_once();
bool handshake();
bool read_loop();
bool send_frame(int opcode, const std::string& payload);
std::string host_;
int port_ = 0;
std::string path_;
std::atomic<State> state_{State::Idle};
std::atomic<long long> last_event_ts_{0};
std::atomic<int> sock_{-1};
std::thread worker_;
std::atomic<bool> stop_flag_{false};
std::mutex in_mu_;
std::deque<std::string> in_queue_;
std::mutex out_mu_;
std::deque<std::string> out_queue_;
// For waking the writer side of read_loop when send_text is called.
std::condition_variable out_cv_;
};