feat: cliente WebSocket + panel Live (issue 0095 step 4)
- ws_client.{h,cpp}: copia de registry_dashboard (RFC 6455 manual sobre TCP, sin TLS).
Background thread, reconnect con backoff, drain por frame.
- main.cpp: arranca WsClient apuntando a /api/ws/dagruns. Drain por frame.
Parse JSON snapshot/delta -> upsert g_live_runs por id.
Panel "Live (WS)" muestra estado conexion, watermarks runs/steps, lista live runs.
- CMakeLists.txt: ws_client.cpp en sources.
Build verificado. Tabs DAG List/Detail/Run Detail con data_table::render() en commits siguientes.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,7 @@ add_imgui_app(dag_engine_ui
|
||||
main.cpp
|
||||
http_client.cpp
|
||||
data_http.cpp
|
||||
ws_client.cpp
|
||||
)
|
||||
target_include_directories(dag_engine_ui PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})
|
||||
|
||||
|
||||
@@ -4,19 +4,67 @@
|
||||
#include "core/icons_tabler.h"
|
||||
#include "core/logger.h"
|
||||
#include "data_http.h"
|
||||
#include "ws_client.h"
|
||||
#include "vendor/nlohmann/json.hpp"
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
// Config global del backend HTTP. Persistido por imgui.ini via Mantine-no-go.
|
||||
static std::string g_api_url = "http://127.0.0.1:8090";
|
||||
using json = nlohmann::json;
|
||||
|
||||
// Cache en memoria del primer fetch. Tabs proximos updates via WS.
|
||||
static std::vector<dag_ui::DagInfo> g_dags;
|
||||
static std::string g_last_error;
|
||||
// Config global del backend HTTP.
|
||||
static std::string g_api_url = "http://127.0.0.1:8090";
|
||||
static std::string g_ws_host = "127.0.0.1";
|
||||
static int g_ws_port = 8090;
|
||||
static std::string g_ws_path = "/api/ws/dagruns";
|
||||
|
||||
// Cache en memoria del primer fetch + ultimos eventos WS.
|
||||
static std::vector<dag_ui::DagInfo> g_dags;
|
||||
static std::vector<dag_ui::DagRunRow> g_live_runs; // upsert por id desde WS
|
||||
static long long g_ws_runs_wm = 0;
|
||||
static long long g_ws_steps_wm = 0;
|
||||
static int g_ws_msg_count = 0;
|
||||
static std::string g_last_error;
|
||||
|
||||
static WsClient g_ws;
|
||||
|
||||
// Toggles de paneles (visibles desde el menu View del menubar canonico)
|
||||
static bool g_show_main = true;
|
||||
static bool g_show_live = true;
|
||||
|
||||
// Upsert por id en g_live_runs.
|
||||
static void upsert_live_run(const dag_ui::DagRunRow& r) {
|
||||
for (auto& existing : g_live_runs) {
|
||||
if (existing.id == r.id) {
|
||||
existing = r;
|
||||
return;
|
||||
}
|
||||
}
|
||||
g_live_runs.push_back(r);
|
||||
}
|
||||
|
||||
static void parse_ws_payload(const std::string& payload) {
|
||||
auto j = json::parse(payload, nullptr, false);
|
||||
if (!j.is_object()) return;
|
||||
g_ws_msg_count++;
|
||||
if (j.contains("watermark") && j["watermark"].is_object()) {
|
||||
if (j["watermark"].contains("runs")) g_ws_runs_wm = j["watermark"]["runs"].get<long long>();
|
||||
if (j["watermark"].contains("steps")) g_ws_steps_wm = j["watermark"]["steps"].get<long long>();
|
||||
}
|
||||
if (j.contains("runs") && j["runs"].is_array()) {
|
||||
for (auto& rj : j["runs"]) {
|
||||
dag_ui::DagRunRow r;
|
||||
r.id = rj.value("id", "");
|
||||
r.dag_name = rj.value("dag_name", "");
|
||||
r.status = rj.value("status", "");
|
||||
r.trigger = rj.value("trigger", "");
|
||||
r.started_at = rj.value("started_at", "");
|
||||
r.finished_at = rj.value("finished_at", "");
|
||||
r.error = rj.value("error", "");
|
||||
upsert_live_run(r);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void draw_main() {
|
||||
if (!ImGui::Begin(TI_HOME " Main", &g_show_main)) {
|
||||
@@ -45,21 +93,46 @@ static void draw_main() {
|
||||
ImGui::End();
|
||||
}
|
||||
|
||||
static void render() {
|
||||
// El framework dibuja menubar (View/Layouts/Settings/About) y un
|
||||
// DockSpaceOverViewport central (auto_dockspace=true por defecto).
|
||||
// Aqui solo se dibujan los paneles propios de la app.
|
||||
if (g_show_main) draw_main();
|
||||
static void draw_live() {
|
||||
if (!ImGui::Begin(TI_BOLT " Live (WS)", &g_show_live)) {
|
||||
ImGui::End();
|
||||
return;
|
||||
}
|
||||
bool connected = g_ws.is_connected();
|
||||
ImGui::TextColored(connected ? ImVec4(0.3f, 0.9f, 0.3f, 1) : ImVec4(0.9f, 0.4f, 0.4f, 1),
|
||||
"%s", connected ? "connected" : "disconnected");
|
||||
ImGui::SameLine();
|
||||
ImGui::Text("| msgs=%d | wm runs=%lld steps=%lld",
|
||||
g_ws_msg_count, g_ws_runs_wm, g_ws_steps_wm);
|
||||
ImGui::Separator();
|
||||
ImGui::Text("Live runs: %zu", g_live_runs.size());
|
||||
for (auto& r : g_live_runs) {
|
||||
ImGui::BulletText("%s [%s] %s @ %s",
|
||||
r.dag_name.c_str(), r.status.c_str(),
|
||||
r.id.c_str(), r.started_at.c_str());
|
||||
}
|
||||
ImGui::End();
|
||||
}
|
||||
|
||||
// === Data panel (uncomment to enable) ===
|
||||
// static data_table::State data_state;
|
||||
// static std::vector<data_table::TableInput> data_tables; // populate from your source
|
||||
// data_table::render("main_data", data_tables, data_state);
|
||||
static void render() {
|
||||
// Drain WS messages this frame (cheap, max 64).
|
||||
{
|
||||
std::vector<std::string> msgs;
|
||||
g_ws.drain(msgs, 64);
|
||||
for (auto& m : msgs) parse_ws_payload(m);
|
||||
}
|
||||
|
||||
if (g_show_main) draw_main();
|
||||
if (g_show_live) draw_live();
|
||||
}
|
||||
|
||||
int main(int /*argc*/, char** /*argv*/) {
|
||||
// Conecta WS al backend dag_engine. Reconnect con backoff lo gestiona WsClient.
|
||||
g_ws.start(g_ws_host, g_ws_port, g_ws_path);
|
||||
|
||||
static fn_ui::PanelToggle panels[] = {
|
||||
{ "Main", nullptr, &g_show_main },
|
||||
{ "Live (WS)", nullptr, &g_show_live },
|
||||
};
|
||||
|
||||
fn::AppConfig cfg;
|
||||
|
||||
+404
@@ -0,0 +1,404 @@
|
||||
#include "ws_client.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <cstring>
|
||||
#include <random>
|
||||
#include <sstream>
|
||||
#include <vector>
|
||||
|
||||
#ifdef _WIN32
|
||||
#include <winsock2.h>
|
||||
#include <ws2tcpip.h>
|
||||
#pragma comment(lib, "ws2_32.lib")
|
||||
typedef SOCKET sock_t;
|
||||
#define SOCK_INVALID INVALID_SOCKET
|
||||
#define SOCK_CLOSE closesocket
|
||||
#define SOCK_ERR WSAGetLastError()
|
||||
#define FN_SOCK_NONBLOCK(s) do { u_long m = 1; ioctlsocket((s), FIONBIO, &m); } while (0)
|
||||
#else
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <arpa/inet.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
#include <errno.h>
|
||||
typedef int sock_t;
|
||||
#define SOCK_INVALID (-1)
|
||||
#define SOCK_CLOSE close
|
||||
#define SOCK_ERR errno
|
||||
#define FN_SOCK_NONBLOCK(s) do { int f = fcntl((s), F_GETFL, 0); fcntl((s), F_SETFL, f | O_NONBLOCK); } while (0)
|
||||
#endif
|
||||
|
||||
#ifdef _WIN32
|
||||
static bool wsa_init_ws() {
|
||||
static bool inited = false;
|
||||
static std::once_flag flag;
|
||||
std::call_once(flag, []() {
|
||||
WSADATA wsa;
|
||||
WSAStartup(MAKEWORD(2, 2), &wsa);
|
||||
});
|
||||
return true;
|
||||
}
|
||||
#endif
|
||||
|
||||
namespace {
|
||||
|
||||
// ----- Base64 (small, sufficient for 16-byte WS key) -----
|
||||
const char* kBase64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
|
||||
|
||||
std::string base64_encode(const uint8_t* in, size_t len) {
|
||||
std::string out;
|
||||
out.reserve(((len + 2) / 3) * 4);
|
||||
size_t i = 0;
|
||||
for (; i + 3 <= len; i += 3) {
|
||||
uint32_t v = (uint32_t(in[i]) << 16) | (uint32_t(in[i + 1]) << 8) | uint32_t(in[i + 2]);
|
||||
out.push_back(kBase64[(v >> 18) & 63]);
|
||||
out.push_back(kBase64[(v >> 12) & 63]);
|
||||
out.push_back(kBase64[(v >> 6) & 63]);
|
||||
out.push_back(kBase64[v & 63]);
|
||||
}
|
||||
if (i < len) {
|
||||
uint32_t v = uint32_t(in[i]) << 16;
|
||||
if (i + 1 < len) v |= uint32_t(in[i + 1]) << 8;
|
||||
out.push_back(kBase64[(v >> 18) & 63]);
|
||||
out.push_back(kBase64[(v >> 12) & 63]);
|
||||
out.push_back(i + 1 < len ? kBase64[(v >> 6) & 63] : '=');
|
||||
out.push_back('=');
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
// Send exactly n bytes (blocking).
|
||||
bool send_all(sock_t sock, const char* data, size_t n) {
|
||||
size_t sent = 0;
|
||||
while (sent < n) {
|
||||
int k = send(sock, data + sent, static_cast<int>(n - sent), 0);
|
||||
if (k <= 0) return false;
|
||||
sent += k;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Receive exactly n bytes (blocking on a non-non-blocking socket).
|
||||
bool recv_all(sock_t sock, char* data, size_t n) {
|
||||
size_t got = 0;
|
||||
while (got < n) {
|
||||
int k = recv(sock, data + got, static_cast<int>(n - got), 0);
|
||||
if (k <= 0) return false;
|
||||
got += k;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Receive up to n bytes; returns count, or -1 on error / -2 on would-block.
|
||||
int recv_some(sock_t sock, char* data, size_t n) {
|
||||
int k = recv(sock, data, static_cast<int>(n), 0);
|
||||
if (k > 0) return k;
|
||||
if (k == 0) return -1;
|
||||
#ifdef _WIN32
|
||||
if (WSAGetLastError() == WSAEWOULDBLOCK) return -2;
|
||||
#else
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) return -2;
|
||||
#endif
|
||||
return -1;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
WsClient::WsClient() = default;
|
||||
|
||||
WsClient::~WsClient() {
|
||||
stop();
|
||||
}
|
||||
|
||||
void WsClient::start(const std::string& host, int port, const std::string& path) {
|
||||
State expected = State::Idle;
|
||||
if (!state_.compare_exchange_strong(expected, State::Connecting)) return;
|
||||
|
||||
host_ = host;
|
||||
port_ = port;
|
||||
path_ = path;
|
||||
stop_flag_.store(false);
|
||||
|
||||
worker_ = std::thread([this]() { this->run(); });
|
||||
}
|
||||
|
||||
void WsClient::stop() {
|
||||
stop_flag_.store(true);
|
||||
int s = sock_.exchange(-1);
|
||||
if (s != -1) SOCK_CLOSE(static_cast<sock_t>(s));
|
||||
out_cv_.notify_all();
|
||||
if (worker_.joinable()) worker_.join();
|
||||
state_.store(State::Stopped);
|
||||
}
|
||||
|
||||
int WsClient::drain(std::vector<std::string>& out, int max) {
|
||||
std::lock_guard<std::mutex> g(in_mu_);
|
||||
int n = 0;
|
||||
while (!in_queue_.empty() && n < max) {
|
||||
out.emplace_back(std::move(in_queue_.front()));
|
||||
in_queue_.pop_front();
|
||||
n++;
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
bool WsClient::send_text(const std::string& payload) {
|
||||
if (state_.load() != State::Connected) return false;
|
||||
{
|
||||
std::lock_guard<std::mutex> g(out_mu_);
|
||||
out_queue_.push_back(payload);
|
||||
}
|
||||
out_cv_.notify_one();
|
||||
return true;
|
||||
}
|
||||
|
||||
void WsClient::run() {
|
||||
int backoff_ms = 500;
|
||||
while (!stop_flag_.load()) {
|
||||
state_.store(State::Connecting);
|
||||
if (connect_once()) {
|
||||
backoff_ms = 500; // reset on successful connect
|
||||
state_.store(State::Connected);
|
||||
read_loop();
|
||||
}
|
||||
state_.store(State::Backoff);
|
||||
if (stop_flag_.load()) break;
|
||||
|
||||
// Exponential backoff: 0.5s → 1s → 2s → 4s → 8s (cap).
|
||||
for (int slept = 0; slept < backoff_ms && !stop_flag_.load(); slept += 100) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
backoff_ms = std::min(backoff_ms * 2, 8000);
|
||||
}
|
||||
state_.store(State::Stopped);
|
||||
}
|
||||
|
||||
bool WsClient::connect_once() {
|
||||
#ifdef _WIN32
|
||||
wsa_init_ws();
|
||||
#endif
|
||||
|
||||
sock_t sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
||||
if (sock == SOCK_INVALID) return false;
|
||||
|
||||
// 5s connect timeout via SO_*TIMEO. Stays blocking afterwards for the
|
||||
// handshake; read_loop switches to non-blocking with select().
|
||||
#ifdef _WIN32
|
||||
DWORD timeout_ms = 5000;
|
||||
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
|
||||
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
|
||||
#else
|
||||
struct timeval tv;
|
||||
tv.tv_sec = 5;
|
||||
tv.tv_usec = 0;
|
||||
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
|
||||
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
|
||||
#endif
|
||||
|
||||
struct sockaddr_in addr;
|
||||
std::memset(&addr, 0, sizeof(addr));
|
||||
addr.sin_family = AF_INET;
|
||||
addr.sin_port = htons(static_cast<uint16_t>(port_));
|
||||
addr.sin_addr.s_addr = inet_addr(host_.c_str());
|
||||
|
||||
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
|
||||
SOCK_CLOSE(sock);
|
||||
return false;
|
||||
}
|
||||
|
||||
sock_.store(static_cast<int>(sock));
|
||||
|
||||
if (!handshake()) {
|
||||
int s = sock_.exchange(-1);
|
||||
if (s != -1) SOCK_CLOSE(static_cast<sock_t>(s));
|
||||
return false;
|
||||
}
|
||||
|
||||
// Non-blocking for the read loop.
|
||||
FN_SOCK_NONBLOCK(sock);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool WsClient::handshake() {
|
||||
sock_t sock = static_cast<sock_t>(sock_.load());
|
||||
|
||||
// 16 random bytes → base64 → Sec-WebSocket-Key.
|
||||
uint8_t key_raw[16];
|
||||
std::random_device rd;
|
||||
for (auto& b : key_raw) b = static_cast<uint8_t>(rd() & 0xff);
|
||||
std::string key_b64 = base64_encode(key_raw, sizeof(key_raw));
|
||||
|
||||
std::ostringstream req;
|
||||
req << "GET " << path_ << " HTTP/1.1\r\n";
|
||||
req << "Host: " << host_ << ":" << port_ << "\r\n";
|
||||
req << "Upgrade: websocket\r\n";
|
||||
req << "Connection: Upgrade\r\n";
|
||||
req << "Sec-WebSocket-Key: " << key_b64 << "\r\n";
|
||||
req << "Sec-WebSocket-Version: 13\r\n";
|
||||
req << "Origin: http://" << host_ << ":" << port_ << "\r\n";
|
||||
req << "\r\n";
|
||||
|
||||
std::string raw = req.str();
|
||||
if (!send_all(sock, raw.c_str(), raw.size())) return false;
|
||||
|
||||
// Read response headers (up to 4KB).
|
||||
std::string resp;
|
||||
char buf[1024];
|
||||
while (resp.find("\r\n\r\n") == std::string::npos) {
|
||||
int k = recv(sock, buf, sizeof(buf), 0);
|
||||
if (k <= 0) return false;
|
||||
resp.append(buf, k);
|
||||
if (resp.size() > 4096) return false;
|
||||
}
|
||||
|
||||
// Expect "HTTP/1.1 101".
|
||||
if (resp.compare(0, 12, "HTTP/1.1 101") != 0 &&
|
||||
resp.compare(0, 12, "HTTP/1.0 101") != 0) {
|
||||
fprintf(stderr, "[ws] handshake failed: %.*s\n",
|
||||
(int)std::min<size_t>(resp.size(), 120), resp.c_str());
|
||||
return false;
|
||||
}
|
||||
// We intentionally skip Sec-WebSocket-Accept verification — controlled
|
||||
// server, localhost-only, 101 status is enough for this app.
|
||||
return true;
|
||||
}
|
||||
|
||||
bool WsClient::read_loop() {
|
||||
sock_t sock = static_cast<sock_t>(sock_.load());
|
||||
|
||||
std::vector<uint8_t> rb; // accumulated read buffer
|
||||
rb.reserve(64 * 1024);
|
||||
|
||||
while (!stop_flag_.load()) {
|
||||
// Block on select() for up to 100ms so we can both read and check
|
||||
// the outgoing queue.
|
||||
fd_set rfds;
|
||||
FD_ZERO(&rfds);
|
||||
FD_SET(sock, &rfds);
|
||||
struct timeval tv;
|
||||
tv.tv_sec = 0;
|
||||
tv.tv_usec = 100 * 1000;
|
||||
int sel = select(static_cast<int>(sock) + 1, &rfds, nullptr, nullptr, &tv);
|
||||
if (sel < 0) return false;
|
||||
|
||||
if (sel > 0 && FD_ISSET(sock, &rfds)) {
|
||||
uint8_t tmp[8192];
|
||||
int k = recv_some(sock, reinterpret_cast<char*>(tmp), sizeof(tmp));
|
||||
if (k == -1) return false;
|
||||
if (k > 0) rb.insert(rb.end(), tmp, tmp + k);
|
||||
}
|
||||
|
||||
// Drain outgoing queue (text frames, masked).
|
||||
for (;;) {
|
||||
std::string payload;
|
||||
{
|
||||
std::lock_guard<std::mutex> g(out_mu_);
|
||||
if (out_queue_.empty()) break;
|
||||
payload = std::move(out_queue_.front());
|
||||
out_queue_.pop_front();
|
||||
}
|
||||
if (!send_frame(0x1, payload)) return false;
|
||||
}
|
||||
|
||||
// Parse frames. RFC6455 minimal: assume server never masks, no
|
||||
// continuation, opcodes: 0x1 text, 0x8 close, 0x9 ping, 0xA pong.
|
||||
while (rb.size() >= 2) {
|
||||
uint8_t b0 = rb[0];
|
||||
uint8_t b1 = rb[1];
|
||||
bool fin = (b0 & 0x80) != 0;
|
||||
(void)fin;
|
||||
int opcode = b0 & 0x0F;
|
||||
bool mask = (b1 & 0x80) != 0;
|
||||
uint64_t len = b1 & 0x7F;
|
||||
size_t pos = 2;
|
||||
if (len == 126) {
|
||||
if (rb.size() < pos + 2) break;
|
||||
len = (uint64_t(rb[pos]) << 8) | uint64_t(rb[pos + 1]);
|
||||
pos += 2;
|
||||
} else if (len == 127) {
|
||||
if (rb.size() < pos + 8) break;
|
||||
len = 0;
|
||||
for (int i = 0; i < 8; i++) len = (len << 8) | rb[pos + i];
|
||||
pos += 8;
|
||||
}
|
||||
uint8_t mkey[4] = {0, 0, 0, 0};
|
||||
if (mask) {
|
||||
if (rb.size() < pos + 4) break;
|
||||
for (int i = 0; i < 4; i++) mkey[i] = rb[pos + i];
|
||||
pos += 4;
|
||||
}
|
||||
if (rb.size() < pos + len) break;
|
||||
|
||||
std::string payload;
|
||||
payload.resize(static_cast<size_t>(len));
|
||||
for (size_t i = 0; i < len; i++) {
|
||||
uint8_t c = rb[pos + i];
|
||||
if (mask) c ^= mkey[i & 3];
|
||||
payload[i] = static_cast<char>(c);
|
||||
}
|
||||
rb.erase(rb.begin(), rb.begin() + pos + len);
|
||||
|
||||
switch (opcode) {
|
||||
case 0x1: { // text
|
||||
last_event_ts_.store(std::chrono::duration_cast<std::chrono::seconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()).count());
|
||||
std::lock_guard<std::mutex> g(in_mu_);
|
||||
in_queue_.emplace_back(std::move(payload));
|
||||
// Bound the queue. Drop oldest if too big — UI consumes
|
||||
// each frame so this should only kick in if the dashboard
|
||||
// is paused (window minimized, etc.).
|
||||
while (in_queue_.size() > 512) in_queue_.pop_front();
|
||||
break;
|
||||
}
|
||||
case 0x8: // close
|
||||
return false;
|
||||
case 0x9: // ping → reply with pong (same payload)
|
||||
if (!send_frame(0xA, payload)) return false;
|
||||
break;
|
||||
case 0xA: // pong, ignore
|
||||
break;
|
||||
default:
|
||||
// 0x0 continuation or unexpected opcode: bail (server
|
||||
// controlled by us, shouldn't happen).
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool WsClient::send_frame(int opcode, const std::string& payload) {
|
||||
sock_t sock = static_cast<sock_t>(sock_.load());
|
||||
if (sock == SOCK_INVALID || static_cast<int>(sock) < 0) return false;
|
||||
|
||||
std::vector<uint8_t> frame;
|
||||
frame.reserve(payload.size() + 16);
|
||||
frame.push_back(static_cast<uint8_t>(0x80 | (opcode & 0x0F))); // FIN + opcode
|
||||
|
||||
uint64_t len = payload.size();
|
||||
if (len < 126) {
|
||||
frame.push_back(static_cast<uint8_t>(0x80 | len)); // mask bit set
|
||||
} else if (len <= 0xFFFF) {
|
||||
frame.push_back(static_cast<uint8_t>(0x80 | 126));
|
||||
frame.push_back(static_cast<uint8_t>((len >> 8) & 0xFF));
|
||||
frame.push_back(static_cast<uint8_t>(len & 0xFF));
|
||||
} else {
|
||||
frame.push_back(static_cast<uint8_t>(0x80 | 127));
|
||||
for (int i = 7; i >= 0; i--) frame.push_back(static_cast<uint8_t>((len >> (8 * i)) & 0xFF));
|
||||
}
|
||||
|
||||
// Mask key (RFC requires mask for client → server frames).
|
||||
std::random_device rd;
|
||||
uint8_t mkey[4];
|
||||
for (auto& b : mkey) b = static_cast<uint8_t>(rd() & 0xff);
|
||||
for (int i = 0; i < 4; i++) frame.push_back(mkey[i]);
|
||||
|
||||
for (size_t i = 0; i < payload.size(); i++) {
|
||||
frame.push_back(static_cast<uint8_t>(payload[i]) ^ mkey[i & 3]);
|
||||
}
|
||||
|
||||
return send_all(sock, reinterpret_cast<const char*>(frame.data()), frame.size());
|
||||
}
|
||||
+75
@@ -0,0 +1,75 @@
|
||||
#pragma once
|
||||
// Minimal WebSocket client (RFC 6455, ws:// only, no TLS) tailored for the
|
||||
// Monitor tab. Background thread does connect + handshake + read loop and
|
||||
// pushes incoming text payloads into a thread-safe queue that the ImGui
|
||||
// thread drains each frame.
|
||||
//
|
||||
// Issue 0086 — first consumer of WS in the C++ dashboards. If a second app
|
||||
// needs WS later, extract this file to cpp/functions/network/ via
|
||||
// fn-constructor.
|
||||
|
||||
#include <atomic>
|
||||
#include <condition_variable>
|
||||
#include <deque>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
class WsClient {
|
||||
public:
|
||||
WsClient();
|
||||
~WsClient();
|
||||
|
||||
// Non-blocking. Spawns background thread that connects to ws://host:port/path
|
||||
// and keeps the connection alive with exponential reconnect backoff.
|
||||
// Safe to call multiple times — second call is a no-op once running.
|
||||
void start(const std::string& host, int port, const std::string& path);
|
||||
|
||||
// Stop the background thread and tear down the connection.
|
||||
void stop();
|
||||
|
||||
// True while a WS connection is up and handshake completed.
|
||||
bool is_connected() const { return state_.load() == State::Connected; }
|
||||
|
||||
// Epoch seconds of last received frame (for the live LED in the Monitor).
|
||||
long long last_event_ts() const { return last_event_ts_.load(); }
|
||||
|
||||
// Drain at most `max` queued text payloads. Returns the number drained.
|
||||
// Called once per frame from the render thread.
|
||||
int drain(std::vector<std::string>& out, int max = 64);
|
||||
|
||||
// Send a text frame to the server. Returns true if queued for sending.
|
||||
// Used to send {"watermark": N} commands on reconnect.
|
||||
bool send_text(const std::string& payload);
|
||||
|
||||
private:
|
||||
enum class State { Idle, Connecting, Connected, Backoff, Stopped };
|
||||
|
||||
void run();
|
||||
bool connect_once();
|
||||
bool handshake();
|
||||
bool read_loop();
|
||||
bool send_frame(int opcode, const std::string& payload);
|
||||
|
||||
std::string host_;
|
||||
int port_ = 0;
|
||||
std::string path_;
|
||||
|
||||
std::atomic<State> state_{State::Idle};
|
||||
std::atomic<long long> last_event_ts_{0};
|
||||
std::atomic<int> sock_{-1};
|
||||
|
||||
std::thread worker_;
|
||||
std::atomic<bool> stop_flag_{false};
|
||||
|
||||
std::mutex in_mu_;
|
||||
std::deque<std::string> in_queue_;
|
||||
|
||||
std::mutex out_mu_;
|
||||
std::deque<std::string> out_queue_;
|
||||
|
||||
// For waking the writer side of read_loop when send_text is called.
|
||||
std::condition_variable out_cv_;
|
||||
};
|
||||
Reference in New Issue
Block a user