From d01c7157a14946d02a8bb4446a455ce574548b5a Mon Sep 17 00:00:00 2001 From: Egutierrez Date: Fri, 15 May 2026 16:47:18 +0200 Subject: [PATCH] feat: cliente WebSocket + panel Live (issue 0095 step 4) - ws_client.{h,cpp}: copia de registry_dashboard (RFC 6455 manual sobre TCP, sin TLS). Background thread, reconnect con backoff, drain por frame. - main.cpp: arranca WsClient apuntando a /api/ws/dagruns. Drain por frame. Parse JSON snapshot/delta -> upsert g_live_runs por id. Panel "Live (WS)" muestra estado conexion, watermarks runs/steps, lista live runs. - CMakeLists.txt: ws_client.cpp en sources. Build verificado. Tabs DAG List/Detail/Run Detail con data_table::render() en commits siguientes. Co-Authored-By: Claude Opus 4.7 (1M context) --- CMakeLists.txt | 1 + main.cpp | 101 +++++++++++-- ws_client.cpp | 404 +++++++++++++++++++++++++++++++++++++++++++++++++ ws_client.h | 75 +++++++++ 4 files changed, 567 insertions(+), 14 deletions(-) create mode 100644 ws_client.cpp create mode 100644 ws_client.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 30d4305..2fd3cfb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,6 +2,7 @@ add_imgui_app(dag_engine_ui main.cpp http_client.cpp data_http.cpp + ws_client.cpp ) target_include_directories(dag_engine_ui PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) diff --git a/main.cpp b/main.cpp index 0175588..5929e6b 100644 --- a/main.cpp +++ b/main.cpp @@ -4,19 +4,67 @@ #include "core/icons_tabler.h" #include "core/logger.h" #include "data_http.h" +#include "ws_client.h" +#include "vendor/nlohmann/json.hpp" #include #include -// Config global del backend HTTP. Persistido por imgui.ini via Mantine-no-go. -static std::string g_api_url = "http://127.0.0.1:8090"; +using json = nlohmann::json; -// Cache en memoria del primer fetch. Tabs proximos updates via WS. -static std::vector g_dags; -static std::string g_last_error; +// Config global del backend HTTP. +static std::string g_api_url = "http://127.0.0.1:8090"; +static std::string g_ws_host = "127.0.0.1"; +static int g_ws_port = 8090; +static std::string g_ws_path = "/api/ws/dagruns"; + +// Cache en memoria del primer fetch + ultimos eventos WS. +static std::vector g_dags; +static std::vector g_live_runs; // upsert por id desde WS +static long long g_ws_runs_wm = 0; +static long long g_ws_steps_wm = 0; +static int g_ws_msg_count = 0; +static std::string g_last_error; + +static WsClient g_ws; // Toggles de paneles (visibles desde el menu View del menubar canonico) static bool g_show_main = true; +static bool g_show_live = true; + +// Upsert por id en g_live_runs. +static void upsert_live_run(const dag_ui::DagRunRow& r) { + for (auto& existing : g_live_runs) { + if (existing.id == r.id) { + existing = r; + return; + } + } + g_live_runs.push_back(r); +} + +static void parse_ws_payload(const std::string& payload) { + auto j = json::parse(payload, nullptr, false); + if (!j.is_object()) return; + g_ws_msg_count++; + if (j.contains("watermark") && j["watermark"].is_object()) { + if (j["watermark"].contains("runs")) g_ws_runs_wm = j["watermark"]["runs"].get(); + if (j["watermark"].contains("steps")) g_ws_steps_wm = j["watermark"]["steps"].get(); + } + if (j.contains("runs") && j["runs"].is_array()) { + for (auto& rj : j["runs"]) { + dag_ui::DagRunRow r; + r.id = rj.value("id", ""); + r.dag_name = rj.value("dag_name", ""); + r.status = rj.value("status", ""); + r.trigger = rj.value("trigger", ""); + r.started_at = rj.value("started_at", ""); + r.finished_at = rj.value("finished_at", ""); + r.error = rj.value("error", ""); + upsert_live_run(r); + } + } +} static void draw_main() { if (!ImGui::Begin(TI_HOME " Main", &g_show_main)) { @@ -45,21 +93,46 @@ static void draw_main() { ImGui::End(); } -static void render() { - // El framework dibuja menubar (View/Layouts/Settings/About) y un - // DockSpaceOverViewport central (auto_dockspace=true por defecto). - // Aqui solo se dibujan los paneles propios de la app. - if (g_show_main) draw_main(); +static void draw_live() { + if (!ImGui::Begin(TI_BOLT " Live (WS)", &g_show_live)) { + ImGui::End(); + return; + } + bool connected = g_ws.is_connected(); + ImGui::TextColored(connected ? ImVec4(0.3f, 0.9f, 0.3f, 1) : ImVec4(0.9f, 0.4f, 0.4f, 1), + "%s", connected ? "connected" : "disconnected"); + ImGui::SameLine(); + ImGui::Text("| msgs=%d | wm runs=%lld steps=%lld", + g_ws_msg_count, g_ws_runs_wm, g_ws_steps_wm); + ImGui::Separator(); + ImGui::Text("Live runs: %zu", g_live_runs.size()); + for (auto& r : g_live_runs) { + ImGui::BulletText("%s [%s] %s @ %s", + r.dag_name.c_str(), r.status.c_str(), + r.id.c_str(), r.started_at.c_str()); + } + ImGui::End(); +} - // === Data panel (uncomment to enable) === - // static data_table::State data_state; - // static std::vector data_tables; // populate from your source - // data_table::render("main_data", data_tables, data_state); +static void render() { + // Drain WS messages this frame (cheap, max 64). + { + std::vector msgs; + g_ws.drain(msgs, 64); + for (auto& m : msgs) parse_ws_payload(m); + } + + if (g_show_main) draw_main(); + if (g_show_live) draw_live(); } int main(int /*argc*/, char** /*argv*/) { + // Conecta WS al backend dag_engine. Reconnect con backoff lo gestiona WsClient. + g_ws.start(g_ws_host, g_ws_port, g_ws_path); + static fn_ui::PanelToggle panels[] = { { "Main", nullptr, &g_show_main }, + { "Live (WS)", nullptr, &g_show_live }, }; fn::AppConfig cfg; diff --git a/ws_client.cpp b/ws_client.cpp new file mode 100644 index 0000000..7a6d14e --- /dev/null +++ b/ws_client.cpp @@ -0,0 +1,404 @@ +#include "ws_client.h" + +#include +#include +#include +#include +#include +#include +#include + +#ifdef _WIN32 +#include +#include +#pragma comment(lib, "ws2_32.lib") +typedef SOCKET sock_t; +#define SOCK_INVALID INVALID_SOCKET +#define SOCK_CLOSE closesocket +#define SOCK_ERR WSAGetLastError() +#define FN_SOCK_NONBLOCK(s) do { u_long m = 1; ioctlsocket((s), FIONBIO, &m); } while (0) +#else +#include +#include +#include +#include +#include +#include +typedef int sock_t; +#define SOCK_INVALID (-1) +#define SOCK_CLOSE close +#define SOCK_ERR errno +#define FN_SOCK_NONBLOCK(s) do { int f = fcntl((s), F_GETFL, 0); fcntl((s), F_SETFL, f | O_NONBLOCK); } while (0) +#endif + +#ifdef _WIN32 +static bool wsa_init_ws() { + static bool inited = false; + static std::once_flag flag; + std::call_once(flag, []() { + WSADATA wsa; + WSAStartup(MAKEWORD(2, 2), &wsa); + }); + return true; +} +#endif + +namespace { + +// ----- Base64 (small, sufficient for 16-byte WS key) ----- +const char* kBase64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + +std::string base64_encode(const uint8_t* in, size_t len) { + std::string out; + out.reserve(((len + 2) / 3) * 4); + size_t i = 0; + for (; i + 3 <= len; i += 3) { + uint32_t v = (uint32_t(in[i]) << 16) | (uint32_t(in[i + 1]) << 8) | uint32_t(in[i + 2]); + out.push_back(kBase64[(v >> 18) & 63]); + out.push_back(kBase64[(v >> 12) & 63]); + out.push_back(kBase64[(v >> 6) & 63]); + out.push_back(kBase64[v & 63]); + } + if (i < len) { + uint32_t v = uint32_t(in[i]) << 16; + if (i + 1 < len) v |= uint32_t(in[i + 1]) << 8; + out.push_back(kBase64[(v >> 18) & 63]); + out.push_back(kBase64[(v >> 12) & 63]); + out.push_back(i + 1 < len ? kBase64[(v >> 6) & 63] : '='); + out.push_back('='); + } + return out; +} + +// Send exactly n bytes (blocking). +bool send_all(sock_t sock, const char* data, size_t n) { + size_t sent = 0; + while (sent < n) { + int k = send(sock, data + sent, static_cast(n - sent), 0); + if (k <= 0) return false; + sent += k; + } + return true; +} + +// Receive exactly n bytes (blocking on a non-non-blocking socket). +bool recv_all(sock_t sock, char* data, size_t n) { + size_t got = 0; + while (got < n) { + int k = recv(sock, data + got, static_cast(n - got), 0); + if (k <= 0) return false; + got += k; + } + return true; +} + +// Receive up to n bytes; returns count, or -1 on error / -2 on would-block. +int recv_some(sock_t sock, char* data, size_t n) { + int k = recv(sock, data, static_cast(n), 0); + if (k > 0) return k; + if (k == 0) return -1; +#ifdef _WIN32 + if (WSAGetLastError() == WSAEWOULDBLOCK) return -2; +#else + if (errno == EAGAIN || errno == EWOULDBLOCK) return -2; +#endif + return -1; +} + +} // namespace + +WsClient::WsClient() = default; + +WsClient::~WsClient() { + stop(); +} + +void WsClient::start(const std::string& host, int port, const std::string& path) { + State expected = State::Idle; + if (!state_.compare_exchange_strong(expected, State::Connecting)) return; + + host_ = host; + port_ = port; + path_ = path; + stop_flag_.store(false); + + worker_ = std::thread([this]() { this->run(); }); +} + +void WsClient::stop() { + stop_flag_.store(true); + int s = sock_.exchange(-1); + if (s != -1) SOCK_CLOSE(static_cast(s)); + out_cv_.notify_all(); + if (worker_.joinable()) worker_.join(); + state_.store(State::Stopped); +} + +int WsClient::drain(std::vector& out, int max) { + std::lock_guard g(in_mu_); + int n = 0; + while (!in_queue_.empty() && n < max) { + out.emplace_back(std::move(in_queue_.front())); + in_queue_.pop_front(); + n++; + } + return n; +} + +bool WsClient::send_text(const std::string& payload) { + if (state_.load() != State::Connected) return false; + { + std::lock_guard g(out_mu_); + out_queue_.push_back(payload); + } + out_cv_.notify_one(); + return true; +} + +void WsClient::run() { + int backoff_ms = 500; + while (!stop_flag_.load()) { + state_.store(State::Connecting); + if (connect_once()) { + backoff_ms = 500; // reset on successful connect + state_.store(State::Connected); + read_loop(); + } + state_.store(State::Backoff); + if (stop_flag_.load()) break; + + // Exponential backoff: 0.5s → 1s → 2s → 4s → 8s (cap). + for (int slept = 0; slept < backoff_ms && !stop_flag_.load(); slept += 100) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + backoff_ms = std::min(backoff_ms * 2, 8000); + } + state_.store(State::Stopped); +} + +bool WsClient::connect_once() { +#ifdef _WIN32 + wsa_init_ws(); +#endif + + sock_t sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (sock == SOCK_INVALID) return false; + + // 5s connect timeout via SO_*TIMEO. Stays blocking afterwards for the + // handshake; read_loop switches to non-blocking with select(). +#ifdef _WIN32 + DWORD timeout_ms = 5000; + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms)); + setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms)); +#else + struct timeval tv; + tv.tv_sec = 5; + tv.tv_usec = 0; + setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); +#endif + + struct sockaddr_in addr; + std::memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(static_cast(port_)); + addr.sin_addr.s_addr = inet_addr(host_.c_str()); + + if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) { + SOCK_CLOSE(sock); + return false; + } + + sock_.store(static_cast(sock)); + + if (!handshake()) { + int s = sock_.exchange(-1); + if (s != -1) SOCK_CLOSE(static_cast(s)); + return false; + } + + // Non-blocking for the read loop. + FN_SOCK_NONBLOCK(sock); + return true; +} + +bool WsClient::handshake() { + sock_t sock = static_cast(sock_.load()); + + // 16 random bytes → base64 → Sec-WebSocket-Key. + uint8_t key_raw[16]; + std::random_device rd; + for (auto& b : key_raw) b = static_cast(rd() & 0xff); + std::string key_b64 = base64_encode(key_raw, sizeof(key_raw)); + + std::ostringstream req; + req << "GET " << path_ << " HTTP/1.1\r\n"; + req << "Host: " << host_ << ":" << port_ << "\r\n"; + req << "Upgrade: websocket\r\n"; + req << "Connection: Upgrade\r\n"; + req << "Sec-WebSocket-Key: " << key_b64 << "\r\n"; + req << "Sec-WebSocket-Version: 13\r\n"; + req << "Origin: http://" << host_ << ":" << port_ << "\r\n"; + req << "\r\n"; + + std::string raw = req.str(); + if (!send_all(sock, raw.c_str(), raw.size())) return false; + + // Read response headers (up to 4KB). + std::string resp; + char buf[1024]; + while (resp.find("\r\n\r\n") == std::string::npos) { + int k = recv(sock, buf, sizeof(buf), 0); + if (k <= 0) return false; + resp.append(buf, k); + if (resp.size() > 4096) return false; + } + + // Expect "HTTP/1.1 101". + if (resp.compare(0, 12, "HTTP/1.1 101") != 0 && + resp.compare(0, 12, "HTTP/1.0 101") != 0) { + fprintf(stderr, "[ws] handshake failed: %.*s\n", + (int)std::min(resp.size(), 120), resp.c_str()); + return false; + } + // We intentionally skip Sec-WebSocket-Accept verification — controlled + // server, localhost-only, 101 status is enough for this app. + return true; +} + +bool WsClient::read_loop() { + sock_t sock = static_cast(sock_.load()); + + std::vector rb; // accumulated read buffer + rb.reserve(64 * 1024); + + while (!stop_flag_.load()) { + // Block on select() for up to 100ms so we can both read and check + // the outgoing queue. + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(sock, &rfds); + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 100 * 1000; + int sel = select(static_cast(sock) + 1, &rfds, nullptr, nullptr, &tv); + if (sel < 0) return false; + + if (sel > 0 && FD_ISSET(sock, &rfds)) { + uint8_t tmp[8192]; + int k = recv_some(sock, reinterpret_cast(tmp), sizeof(tmp)); + if (k == -1) return false; + if (k > 0) rb.insert(rb.end(), tmp, tmp + k); + } + + // Drain outgoing queue (text frames, masked). + for (;;) { + std::string payload; + { + std::lock_guard g(out_mu_); + if (out_queue_.empty()) break; + payload = std::move(out_queue_.front()); + out_queue_.pop_front(); + } + if (!send_frame(0x1, payload)) return false; + } + + // Parse frames. RFC6455 minimal: assume server never masks, no + // continuation, opcodes: 0x1 text, 0x8 close, 0x9 ping, 0xA pong. + while (rb.size() >= 2) { + uint8_t b0 = rb[0]; + uint8_t b1 = rb[1]; + bool fin = (b0 & 0x80) != 0; + (void)fin; + int opcode = b0 & 0x0F; + bool mask = (b1 & 0x80) != 0; + uint64_t len = b1 & 0x7F; + size_t pos = 2; + if (len == 126) { + if (rb.size() < pos + 2) break; + len = (uint64_t(rb[pos]) << 8) | uint64_t(rb[pos + 1]); + pos += 2; + } else if (len == 127) { + if (rb.size() < pos + 8) break; + len = 0; + for (int i = 0; i < 8; i++) len = (len << 8) | rb[pos + i]; + pos += 8; + } + uint8_t mkey[4] = {0, 0, 0, 0}; + if (mask) { + if (rb.size() < pos + 4) break; + for (int i = 0; i < 4; i++) mkey[i] = rb[pos + i]; + pos += 4; + } + if (rb.size() < pos + len) break; + + std::string payload; + payload.resize(static_cast(len)); + for (size_t i = 0; i < len; i++) { + uint8_t c = rb[pos + i]; + if (mask) c ^= mkey[i & 3]; + payload[i] = static_cast(c); + } + rb.erase(rb.begin(), rb.begin() + pos + len); + + switch (opcode) { + case 0x1: { // text + last_event_ts_.store(std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()).count()); + std::lock_guard g(in_mu_); + in_queue_.emplace_back(std::move(payload)); + // Bound the queue. Drop oldest if too big — UI consumes + // each frame so this should only kick in if the dashboard + // is paused (window minimized, etc.). + while (in_queue_.size() > 512) in_queue_.pop_front(); + break; + } + case 0x8: // close + return false; + case 0x9: // ping → reply with pong (same payload) + if (!send_frame(0xA, payload)) return false; + break; + case 0xA: // pong, ignore + break; + default: + // 0x0 continuation or unexpected opcode: bail (server + // controlled by us, shouldn't happen). + return false; + } + } + } + return true; +} + +bool WsClient::send_frame(int opcode, const std::string& payload) { + sock_t sock = static_cast(sock_.load()); + if (sock == SOCK_INVALID || static_cast(sock) < 0) return false; + + std::vector frame; + frame.reserve(payload.size() + 16); + frame.push_back(static_cast(0x80 | (opcode & 0x0F))); // FIN + opcode + + uint64_t len = payload.size(); + if (len < 126) { + frame.push_back(static_cast(0x80 | len)); // mask bit set + } else if (len <= 0xFFFF) { + frame.push_back(static_cast(0x80 | 126)); + frame.push_back(static_cast((len >> 8) & 0xFF)); + frame.push_back(static_cast(len & 0xFF)); + } else { + frame.push_back(static_cast(0x80 | 127)); + for (int i = 7; i >= 0; i--) frame.push_back(static_cast((len >> (8 * i)) & 0xFF)); + } + + // Mask key (RFC requires mask for client → server frames). + std::random_device rd; + uint8_t mkey[4]; + for (auto& b : mkey) b = static_cast(rd() & 0xff); + for (int i = 0; i < 4; i++) frame.push_back(mkey[i]); + + for (size_t i = 0; i < payload.size(); i++) { + frame.push_back(static_cast(payload[i]) ^ mkey[i & 3]); + } + + return send_all(sock, reinterpret_cast(frame.data()), frame.size()); +} diff --git a/ws_client.h b/ws_client.h new file mode 100644 index 0000000..0e7f69d --- /dev/null +++ b/ws_client.h @@ -0,0 +1,75 @@ +#pragma once +// Minimal WebSocket client (RFC 6455, ws:// only, no TLS) tailored for the +// Monitor tab. Background thread does connect + handshake + read loop and +// pushes incoming text payloads into a thread-safe queue that the ImGui +// thread drains each frame. +// +// Issue 0086 — first consumer of WS in the C++ dashboards. If a second app +// needs WS later, extract this file to cpp/functions/network/ via +// fn-constructor. + +#include +#include +#include +#include +#include +#include +#include +#include + +class WsClient { +public: + WsClient(); + ~WsClient(); + + // Non-blocking. Spawns background thread that connects to ws://host:port/path + // and keeps the connection alive with exponential reconnect backoff. + // Safe to call multiple times — second call is a no-op once running. + void start(const std::string& host, int port, const std::string& path); + + // Stop the background thread and tear down the connection. + void stop(); + + // True while a WS connection is up and handshake completed. + bool is_connected() const { return state_.load() == State::Connected; } + + // Epoch seconds of last received frame (for the live LED in the Monitor). + long long last_event_ts() const { return last_event_ts_.load(); } + + // Drain at most `max` queued text payloads. Returns the number drained. + // Called once per frame from the render thread. + int drain(std::vector& out, int max = 64); + + // Send a text frame to the server. Returns true if queued for sending. + // Used to send {"watermark": N} commands on reconnect. + bool send_text(const std::string& payload); + +private: + enum class State { Idle, Connecting, Connected, Backoff, Stopped }; + + void run(); + bool connect_once(); + bool handshake(); + bool read_loop(); + bool send_frame(int opcode, const std::string& payload); + + std::string host_; + int port_ = 0; + std::string path_; + + std::atomic state_{State::Idle}; + std::atomic last_event_ts_{0}; + std::atomic sock_{-1}; + + std::thread worker_; + std::atomic stop_flag_{false}; + + std::mutex in_mu_; + std::deque in_queue_; + + std::mutex out_mu_; + std::deque out_queue_; + + // For waking the writer side of read_loop when send_text is called. + std::condition_variable out_cv_; +};