registry_dashboard/ws_client.h
egutierrez 9205567d5f feat(monitor): WS client live stream + apply snapshot/delta to UI
Hand-rolled minimal RFC6455 WebSocket client (~330 LOC) tailored to the
Monitor tab's needs. Single endpoint, text frames, masked client→server,
exponential reconnect backoff (0.5s → 8s cap), thread-safe in/out queues.
TLS is intentionally out of scope (localhost-only). Sec-WebSocket-Accept
verification is skipped — the server is controlled, 101 status is enough.
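
The reconnect schedule described above could be sketched as follows. This is a minimal illustration, not the code in ws_client.cpp: the helper name `backoff_delay` is hypothetical, and the doubling factor is an assumption (the commit only states the 0.5s start and the 8s cap).

```cpp
#include <algorithm>
#include <chrono>

// Hypothetical helper (not from ws_client.cpp): delay to wait before
// reconnect attempt number `attempt` (0-based).
// Assumption: the delay doubles on each failed attempt.
inline std::chrono::milliseconds backoff_delay(int attempt) {
    using std::chrono::milliseconds;
    constexpr milliseconds kInitial{500};  // 0.5s before the first retry
    constexpr milliseconds kCap{8000};     // never wait longer than 8s
    // 500ms * 2^4 == 8000ms, so attempts >= 4 are already at the cap.
    // Clamping the shift also avoids overflow for large attempt counts.
    const int shift = std::clamp(attempt, 0, 4);
    return std::min(kInitial * (1 << shift), kCap);
}
```

Under these assumptions the worker would sleep for `backoff_delay(n)` after the n-th consecutive failure and reset `n` to 0 once a handshake succeeds.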

Files:
- ws_client.{h,cpp}: WsClient with start(host,port,path), drain(), send_text(),
  is_connected(), last_event_ts(). Worker thread does connect + handshake +
  read_loop + reconnect.
- CMakeLists.txt: pulls ws_client.cpp into the dashboard target. ws2_32 was
  already linked for http_client.cpp.
- main.cpp: parses host:port from --api URL, spawns a global WsClient, and
  drains its queue once per render frame via poll_ws(). apply_ws_message()
  routes JSON to g_data.claude:
    snapshot → replace KPIs + recent_executions
    delta    → append rows, increment total_calls / total_errors
  monitor_set_ws_state() forwards connection state + last_event_ts to the
  Monitor toolbar LED.
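
The snapshot/delta routing listed above can be illustrated without any JSON dependency. Everything in this sketch is a stand-in: `ExecRow`, `MonitorData`, `apply_snapshot` and `apply_delta` are hypothetical names, the real layout of g_data.claude is not shown in this commit, and counting one call per delta row is an assumption about how the counters are incremented.

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Illustrative stand-ins for the dashboard state (hypothetical types).
struct ExecRow {
    std::string name;
    bool success = true;
};

struct MonitorData {
    std::int64_t total_calls = 0;
    std::int64_t total_errors = 0;
    std::vector<ExecRow> recent_executions;
};

// snapshot: the server state is authoritative, so replace KPIs and rows.
inline void apply_snapshot(MonitorData& d, std::int64_t calls,
                           std::int64_t errors, std::vector<ExecRow> rows) {
    d.total_calls = calls;
    d.total_errors = errors;
    d.recent_executions = std::move(rows);
}

// delta: append the new rows and bump the counters.
// Assumption: each row is one call, and success == false is one error.
inline void apply_delta(MonitorData& d, const std::vector<ExecRow>& rows) {
    for (const ExecRow& r : rows) {
        d.recent_executions.push_back(r);
        ++d.total_calls;
        if (!r.success) ++d.total_errors;
    }
}
```

Keeping the two paths separate means a reconnect can always resynchronise from a fresh snapshot, while deltas stay cheap enough to apply every frame.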

End-to-end smoke test passed against sqlite_api on localhost:8484:
- Snapshot received with KPIs + 100 recent rows.
- After INSERT INTO calls, delta arrives within ~250ms (server ticker).
- Errors (success=0) propagate correctly and bump the Errors KPI.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 00:34:03 +02:00


#pragma once
// Minimal WebSocket client (RFC 6455, ws:// only, no TLS) tailored for the
// Monitor tab. Background thread does connect + handshake + read loop and
// pushes incoming text payloads into a thread-safe queue that the ImGui
// thread drains each frame.
//
// Issue 0086 — first consumer of WS in the C++ dashboards. If a second app
// needs WS later, extract this file to cpp/functions/network/ via
// fn-constructor.
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>  // std::vector is used in the signature of drain()

class WsClient {
public:
    WsClient();
    ~WsClient();

    // Non-blocking. Spawns background thread that connects to ws://host:port/path
    // and keeps the connection alive with exponential reconnect backoff.
    // Safe to call multiple times; a second call is a no-op once running.
    void start(const std::string& host, int port, const std::string& path);

    // Stop the background thread and tear down the connection.
    void stop();

    // True while a WS connection is up and handshake completed.
    bool is_connected() const { return state_.load() == State::Connected; }

    // Epoch seconds of last received frame (for the live LED in the Monitor).
    long long last_event_ts() const { return last_event_ts_.load(); }

    // Drain at most `max` queued text payloads. Returns the number drained.
    // Called once per frame from the render thread.
    int drain(std::vector<std::string>& out, int max = 64);

    // Send a text frame to the server. Returns true if queued for sending.
    // Used to send {"watermark": N} commands on reconnect.
    bool send_text(const std::string& payload);

private:
    enum class State { Idle, Connecting, Connected, Backoff, Stopped };

    void run();
    bool connect_once();
    bool handshake();
    bool read_loop();
    bool send_frame(int opcode, const std::string& payload);

    std::string host_;
    int port_ = 0;
    std::string path_;

    std::atomic<State> state_{State::Idle};
    std::atomic<long long> last_event_ts_{0};
    std::atomic<int> sock_{-1};
    std::thread worker_;
    std::atomic<bool> stop_flag_{false};

    std::mutex in_mu_;
    std::deque<std::string> in_queue_;
    std::mutex out_mu_;
    std::deque<std::string> out_queue_;
    // For waking the writer side of read_loop when send_text is called.
    std::condition_variable out_cv_;
};
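
The header declares `send_frame(int opcode, const std::string& payload)` but its body lives in ws_client.cpp and is not shown here. As a rough sketch of what masked client-to-server framing under RFC 6455 involves, the free function below builds a single final frame. The name `encode_client_frame` and the explicit `key` parameter are hypothetical; a real client should generate a fresh random 4-byte masking key for every frame.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper (not from ws_client.cpp): encodes one unfragmented,
// masked client frame. opcode 0x1 is a text frame.
inline std::vector<std::uint8_t> encode_client_frame(
        int opcode, const std::string& payload,
        const std::array<std::uint8_t, 4>& key) {
    std::vector<std::uint8_t> f;
    const std::uint64_t n = payload.size();

    // Byte 0: FIN bit set plus the 4-bit opcode.
    f.push_back(static_cast<std::uint8_t>(0x80 | (opcode & 0x0F)));

    // Byte 1: MASK bit set plus the 7-bit length, or a 126/127 marker
    // followed by a 16-bit or 64-bit big-endian extended length.
    if (n <= 125) {
        f.push_back(static_cast<std::uint8_t>(0x80 | n));
    } else if (n <= 0xFFFF) {
        f.push_back(static_cast<std::uint8_t>(0x80 | 126));
        f.push_back(static_cast<std::uint8_t>(n >> 8));
        f.push_back(static_cast<std::uint8_t>(n & 0xFF));
    } else {
        f.push_back(static_cast<std::uint8_t>(0x80 | 127));
        for (int shift = 56; shift >= 0; shift -= 8)
            f.push_back(static_cast<std::uint8_t>((n >> shift) & 0xFF));
    }

    // Masking key, then the payload XORed with key[i % 4].
    f.insert(f.end(), key.begin(), key.end());
    for (std::size_t i = 0; i < payload.size(); ++i)
        f.push_back(static_cast<std::uint8_t>(
            static_cast<std::uint8_t>(payload[i]) ^ key[i % 4]));
    return f;
}
```

With the key `37 fa 21 3d` and the payload "Hello", this produces the masked text-frame example given in RFC 6455 section 5.7: `81 85 37 fa 21 3d 7f 9f 4d 51 58`.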