merge: issue/0086-monitor-tab — Monitor tab + WS live stream

UI:
- Tab Monitor first/default in TabBar
- 5 KPIs (Calls/Errors/Violations/Copies/Versions)
- Recent Executions table with timestamps
- Date filter (1h/24h/7d/30d/All)
- Live LED + last-event indicator

Backend:
- Hand-rolled RFC6455 WS client (ws_client.{h,cpp})
- Connects to sqlite_api /api/events/call_monitor
- Snapshot + delta application to g_data.claude
- Reconnect with exponential backoff

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-14 00:35:38 +02:00
10 changed files with 1261 additions and 8 deletions
+1
View File
@@ -19,6 +19,7 @@ add_imgui_app(registry_dashboard
data.cpp
data_http.cpp
http_client.cpp
ws_client.cpp
views.cpp
${CMAKE_SOURCE_DIR}/functions/viz/kpi_card.cpp
${CMAKE_SOURCE_DIR}/functions/viz/bar_chart.cpp
+51
View File
@@ -95,6 +95,9 @@ cd cpp && cmake -B build/windows -S . -DCMAKE_TOOLCHAIN_FILE=toolchains/mingw-w6
- [ ] Filtros interactivos por lenguaje/dominio en sidebar
- [ ] Busqueda FTS5 integrada via API
- [ ] Detalles de funcion al hacer click en tabla
- [ ] Extraer ws_client a `cpp/functions/network/` cuando un segundo app C++ necesite WS
- [ ] SHA1 + Sec-WebSocket-Accept verification (hoy se confia en el handshake 101)
- [ ] Migrar `nhooyr.io/websocket``github.com/coder/websocket` (mismo paquete, deprecation upstream)
## Notas
@@ -142,6 +145,54 @@ El cross-compile pasa de thread model `win32` a `posix` (`x86_64-w64-mingw32-g++
- `CMakeLists.txt` limpiado: `fps_overlay.cpp` y `tokens.cpp` ya viven en `fn_framework` — no listarlos explicitamente o el linker da multiple-definition.
- 5 TTFs (Karla/Roboto/DroidSans/Cousine/Tabler) copiadas junto al exe via `add_imgui_app` post-build.
## Fase — Monitor tab + WebSocket live stream `[done 2026-05-14, issue 0086]`
Rebranding "Claude Usage" → **Monitor**, ahora primera y por defecto en el TabBar.
Es el landing del bucke reactivo (construir → ejecutar → recopilar → analizar → mejorar).
Nuevos elementos UI:
- **Toolbar interna** del Monitor con preset de ventana temporal (1h / 24h / 7d / 30d / All),
boton Refresh manual, LED `live`/`offline` con timestamp del ultimo evento WS.
- **5 KPI cards** (era 4): añadido "Errors" derivado de `COUNT(*) FROM calls WHERE success = 0`
filtrado por la ventana activa.
- **Sub-tab "Recent Executions"** (la primera) con columnas: When, Function, Tool, ms, OK, Error.
Backed by `calls` table, sorted by ts DESC, limit 100, filtrada por ventana.
- Violations sub-tab gana columna "When" con ts formateado.
Pipeline en vivo (low-latency, push-based):
```
Hook PostToolUse ──► call_monitor CLI ──► INSERT calls (siempre, sincrono)
ops:call_monitor.db
│ (ticker 250ms cuando subs>0)
sqlite_api /api/events/call_monitor ──► WS hub ──► subscribers (dashboards)
```
- **sqlite_api**: nuevo endpoint `GET /api/events/call_monitor`. Hub gestiona subscribers,
ticker arranca solo con >=1 sub (cero overhead si no hay dashboards). Cliente recibe
snapshot inicial (KPIs + 100 ultimas) y luego deltas (`id > watermark`). Intervalo
adaptativo: 250ms activo → 2s idle (tras 30s sin eventos).
- **registry_dashboard**: cliente WS minimal RFC6455 en `ws_client.{h,cpp}` (no TLS,
thread propio, reconnect exponencial 0.5s → 8s). `main.cpp` consume deltas y los aplica
a `g_data.claude` (incrementa KPIs, anade filas, dedup por id).
- **Fallback**: si WS cae, los datos siguen registrandose en SQLite via call_monitor CLI.
Al reconectar, el cliente puede mandar `{"watermark": N}` para reanudar sin perder eventos.
Cambios en `data.{h,cpp}` y `data_http.{h,cpp}`:
- `RecentExecutionRow`, `window_secs`, `ws_connected`, `last_event_ts`, `last_seen_call_id`
en `ClaudeUsageData`.
- `load_claude_usage_http(api, out, window_secs)` filtra `calls` y `violations` por ventana.
- `load_recent_executions_http()` standalone para refetch parcial (preparado para WS,
no esta cableado todavia — actualmente las deltas WS bastan).
Decisiones de scope:
- Sec-WebSocket-Accept verification se omite (server controlado, localhost only).
- TLS fuera (ws://, no wss://).
- WS client no extraido al registry todavia: hasta que un segundo app C++ lo necesite.
## Notas — Settings submenu + Git column (sesion 2026-04-28)
- `fn_ui::app_menubar` reemplaza el item plano `Settings...` por un `BeginMenu("Settings")` con dos subitems: `Settings...` (existente) y `About...` (nuevo modulo `app_about_cpp_core`). El registry_dashboard cablea la info via `fn_ui::about_window_set_info("fn_registry Dashboard", "0.2.0", "Dashboard ImGui...")` antes de `fn::run_app`.
+61
View File
@@ -140,6 +140,66 @@ struct ProjectDetail {
};
// All data loaded from registry.db in one shot
// Issue 0085: Claude usage telemetry rows from call_monitor.operations.db.
struct ClaudeUsageRow {
std::string function_id;
int calls_total = 0;
int calls_7d = 0;
int errors_total = 0;
double error_rate = 0.0;
double mean_duration_ms = 0.0;
};
struct ClaudeViolationRow {
std::string rule_id;
std::string function_id;
std::string command_snippet;
std::string severity;
long long ts = 0;
};
struct ClaudeCopiedRow {
std::string app_file;
std::string app_function;
std::string registry_id;
std::string kind;
double similarity = 1.0;
};
// Una invocacion concreta del registro de telemetria. Lo que el agente lanzo,
// cuanto tardo, si fallo. Usado por la tabla "Recent Executions" del Monitor.
struct RecentExecutionRow {
long long id = 0; // calls.id (watermark para WS deltas)
long long ts = 0; // epoch seconds
std::string function_id;
std::string tool_used; // mcp / fn_cli_run / bash / heredoc / ...
int duration_ms = 0;
bool success = true;
std::string error_class;
std::string session_id;
};
struct ClaudeUsageData {
bool available = false; // true if call_monitor.operations.db is reachable
int total_calls = 0;
int total_errors = 0;
int total_violations = 0;
int total_copies = 0;
int total_versions = 0;
std::vector<ClaudeUsageRow> top_functions; // top 20 by calls_total
std::vector<ClaudeViolationRow> recent_violations; // last 20
std::vector<ClaudeCopiedRow> copies;
std::vector<RecentExecutionRow> recent_executions; // last N within window
// Filtro de fecha. 0 = All. Otros valores en segundos.
int window_secs = 86400; // default 24h
// WS live state. true cuando hay conexion WS activa al hub de eventos.
bool ws_connected = false;
long long last_event_ts = 0; // ultimo ts recibido por WS
long long last_seen_call_id = 0; // watermark (max id procesado)
};
struct RegistryData {
RegistryStats stats;
std::vector<LangCount> by_lang;
@@ -151,6 +211,7 @@ struct RegistryData {
std::vector<AnalysisRow> analyses;
std::vector<TypeRow> types;
std::vector<ProjectRow> projects;
ClaudeUsageData claude;
int orphan_apps = 0;
int orphan_analyses = 0;
int orphan_vaults = 0;
+191
View File
@@ -536,3 +536,194 @@ bool http_post_add_vault(const std::string& api_url,
b["description"] = description;
return post_json(api_url, "/api/add/vault", b, out_body);
}
// ---- Issue 0085d: Claude usage telemetry ----
// Query against ops:call_monitor instead of registry.
static json call_monitor_query(HttpClient& cli, const char* sql) {
json body;
body["sql"] = sql;
auto res = cli.post("/api/databases/ops:call_monitor/query", body.dump(), "application/json");
if (!res.ok()) {
return nullptr;
}
return json::parse(res.body, nullptr, false);
}
static double extract_row_double(const json& row, size_t idx) {
if (idx >= row.size() || row[idx].is_null()) return 0.0;
if (row[idx].is_number()) return row[idx].get<double>();
if (row[idx].is_string()) return std::atof(row[idx].get<std::string>().c_str());
return 0.0;
}
// Construye un filtro temporal `WHERE ts >= ?` literal embebido (no prepared)
// reemplazando el placeholder. window_secs == 0 -> sin filtro.
static std::string ts_filter(int window_secs, const char* col = "ts",
const char* glue = "WHERE") {
if (window_secs <= 0) return "";
char buf[128];
std::snprintf(buf, sizeof(buf), " %s %s >= (strftime('%%s','now') - %d) ",
glue, col, window_secs);
return std::string(buf);
}
bool load_claude_usage_http(const std::string& api_url, RegistryData& out,
int window_secs) {
// Preservar window y estado WS al recargar.
bool prev_ws = out.claude.ws_connected;
long long prev_last_ev = out.claude.last_event_ts;
long long prev_max_id = out.claude.last_seen_call_id;
out.claude = ClaudeUsageData{};
out.claude.window_secs = window_secs;
out.claude.ws_connected = prev_ws;
out.claude.last_event_ts = prev_last_ev;
out.claude.last_seen_call_id = prev_max_id;
std::string host;
int port;
if (!parse_url(api_url, host, port)) return false;
HttpClient cli(host, port);
// Probe: is ops:call_monitor known?
auto probe = cli.get("/api/databases/ops:call_monitor/tables");
if (!probe.ok()) {
out.claude.available = false;
return true; // not an error: monitor not yet deployed
}
out.claude.available = true;
const std::string wf_calls = ts_filter(window_secs); // " WHERE ts >= ..."
const std::string wf_viol = ts_filter(window_secs);
const std::string wf_calls_and = wf_calls.empty()
? std::string(" WHERE success = 0 ")
: std::string(wf_calls + " AND success = 0 ");
// Totals (filtradas por ventana donde aplica)
{
const std::string sql_calls = "SELECT COUNT(*) FROM calls" + wf_calls;
out.claude.total_calls = extract_int(call_monitor_query(cli, sql_calls.c_str()));
}
{
const std::string sql_err = "SELECT COUNT(*) FROM calls" + wf_calls_and;
out.claude.total_errors = extract_int(call_monitor_query(cli, sql_err.c_str()));
}
{
const std::string sql_viol = "SELECT COUNT(*) FROM violations" + wf_viol;
out.claude.total_violations = extract_int(call_monitor_query(cli, sql_viol.c_str()));
}
out.claude.total_copies = extract_int(call_monitor_query(cli, "SELECT COUNT(*) FROM copied_code"));
out.claude.total_versions = extract_int(call_monitor_query(cli, "SELECT COUNT(*) FROM function_versions"));
// Recent executions (calls table) ordenada por ts DESC
{
std::string sql = "SELECT id, ts, function_id, tool_used, duration_ms, success, error_class, session_id "
"FROM calls" + wf_calls + " ORDER BY ts DESC LIMIT 100";
json rx = call_monitor_query(cli, sql.c_str());
if (rx.is_object() && rx.contains("rows")) {
long long mx = out.claude.last_seen_call_id;
for (const auto& r : rx["rows"]) {
RecentExecutionRow row;
row.id = (long long)extract_row_int(r, 0);
row.ts = (long long)extract_row_int(r, 1);
row.function_id = extract_str(r, 2);
row.tool_used = extract_str(r, 3);
row.duration_ms = extract_row_int(r, 4);
row.success = extract_row_int(r, 5) != 0;
row.error_class = extract_str(r, 6);
row.session_id = extract_str(r, 7);
if (row.id > mx) mx = row.id;
out.claude.recent_executions.push_back(row);
}
out.claude.last_seen_call_id = mx;
}
}
// Top functions by calls_total
json top = call_monitor_query(cli,
"SELECT function_id, calls_total, calls_7d, errors_total, error_rate, mean_duration_ms "
"FROM function_stats ORDER BY calls_total DESC LIMIT 20");
if (top.is_object() && top.contains("rows")) {
for (const auto& r : top["rows"]) {
ClaudeUsageRow row;
row.function_id = extract_str(r, 0);
row.calls_total = extract_row_int(r, 1);
row.calls_7d = extract_row_int(r, 2);
row.errors_total = extract_row_int(r, 3);
row.error_rate = extract_row_double(r, 4);
row.mean_duration_ms = extract_row_double(r, 5);
out.claude.top_functions.push_back(row);
}
}
// Recent violations (filtradas por ventana)
std::string sql_viol_list = "SELECT rule_id, function_id, command_snippet, severity, ts "
"FROM violations" + wf_viol + " ORDER BY ts DESC LIMIT 20";
json viol = call_monitor_query(cli, sql_viol_list.c_str());
if (viol.is_object() && viol.contains("rows")) {
for (const auto& r : viol["rows"]) {
ClaudeViolationRow row;
row.rule_id = extract_str(r, 0);
row.function_id = extract_str(r, 1);
row.command_snippet = extract_str(r, 2);
row.severity = extract_str(r, 3);
row.ts = (long long)extract_row_int(r, 4);
out.claude.recent_violations.push_back(row);
}
}
// Copied code matches
json cp = call_monitor_query(cli,
"SELECT app_file, app_function, registry_id, kind, similarity "
"FROM copied_code ORDER BY detected_at DESC LIMIT 50");
if (cp.is_object() && cp.contains("rows")) {
for (const auto& r : cp["rows"]) {
ClaudeCopiedRow row;
row.app_file = extract_str(r, 0);
row.app_function = extract_str(r, 1);
row.registry_id = extract_str(r, 2);
row.kind = extract_str(r, 3);
row.similarity = extract_row_double(r, 4);
out.claude.copies.push_back(row);
}
}
return true;
}
bool load_recent_executions_http(const std::string& api_url,
int window_secs, int limit,
std::vector<RecentExecutionRow>& out,
long long& out_max_id) {
out.clear();
out_max_id = 0;
std::string host;
int port;
if (!parse_url(api_url, host, port)) return false;
HttpClient cli(host, port);
const std::string wf = ts_filter(window_secs);
char lim_buf[32];
std::snprintf(lim_buf, sizeof(lim_buf), " LIMIT %d", limit > 0 ? limit : 100);
std::string sql = "SELECT id, ts, function_id, tool_used, duration_ms, success, error_class, session_id "
"FROM calls" + wf + " ORDER BY ts DESC" + lim_buf;
json rx = call_monitor_query(cli, sql.c_str());
if (!rx.is_object() || !rx.contains("rows")) return false;
for (const auto& r : rx["rows"]) {
RecentExecutionRow row;
row.id = (long long)extract_row_int(r, 0);
row.ts = (long long)extract_row_int(r, 1);
row.function_id = extract_str(r, 2);
row.tool_used = extract_str(r, 3);
row.duration_ms = extract_row_int(r, 4);
row.success = extract_row_int(r, 5) != 0;
row.error_class = extract_str(r, 6);
row.session_id = extract_str(r, 7);
if (row.id > out_max_id) out_max_id = row.id;
out.push_back(row);
}
return true;
}
+22
View File
@@ -35,6 +35,28 @@ bool load_unit_tests_http(const std::string& api_url,
const std::string& function_id,
std::vector<UnitTestRow>& out);
// Issue 0085d: Load Claude usage telemetry from ops:call_monitor (top
// functions, recent violations, copied code). Si la BD no esta disponible
// setea out.claude.available = false sin error.
//
// `window_secs`: ventana hacia atras desde now. 0 = sin filtro (All).
// Aplica a total_calls, total_errors, top_functions, recent_executions,
// recent_violations. total_versions/total_copies son acumulados (no filtran).
bool load_claude_usage_http(const std::string& api_url, RegistryData& out,
int window_secs);
// Variante legacy (window default 24h). Usar la version con window_secs.
inline bool load_claude_usage_http(const std::string& api_url, RegistryData& out) {
return load_claude_usage_http(api_url, out, 86400);
}
// Carga la ventana de "Recent Executions" (calls table) ordenada por ts DESC.
// Filtrada por window_secs (0 = sin filtro). limit = max filas (default 100).
bool load_recent_executions_http(const std::string& api_url,
int window_secs, int limit,
std::vector<RecentExecutionRow>& out,
long long& out_max_id);
// Operaciones de mutacion (thread-safe porque http_post ya lo es).
// Devuelven el body de respuesta en `out_body`. true si HTTP status 2xx.
bool http_post_reindex(const std::string& api_url, std::string& out_body);
+162 -5
View File
@@ -8,27 +8,152 @@
#include "data.h"
#include "data_http.h"
#include "views.h"
#include "ws_client.h"
#include "nlohmann/json.hpp"
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <fstream>
#include <vector>
static RegistryData g_data;
static std::string g_db_path;
static std::string g_api_url;
static bool g_loaded = false;
static bool g_using_http = false;
static WsClient g_ws;
// Parse "http://host:port" → host, port. Devuelve false si no encaja.
static bool parse_http_url(const std::string& url, std::string& host, int& port) {
static const char* kPrefix = "http://";
if (url.rfind(kPrefix, 0) != 0) return false;
std::string rest = url.substr(std::strlen(kPrefix));
auto colon = rest.find(':');
if (colon == std::string::npos) {
host = rest;
port = 80;
return true;
}
host = rest.substr(0, colon);
auto slash = rest.find('/', colon + 1);
std::string port_str = (slash == std::string::npos)
? rest.substr(colon + 1)
: rest.substr(colon + 1, slash - colon - 1);
port = std::atoi(port_str.c_str());
return port > 0;
}
// Aplica un mensaje JSON recibido por WS a g_data.claude. Tipos:
// - "snapshot": reemplaza KPIs y la lista entera de recent_executions.
// - "delta": append rows (front), dedup por id, recalcula KPIs.
// Devuelve true si el mensaje era valido.
static bool apply_ws_message(const std::string& raw) {
using nlohmann::json;
json msg = json::parse(raw, nullptr, false);
if (!msg.is_object()) return false;
const std::string type = msg.value("type", "");
if (type != "snapshot" && type != "delta") return false;
if (msg.contains("server_time") && msg["server_time"].is_number_integer()) {
g_data.claude.last_event_ts = msg["server_time"].get<long long>();
}
if (msg.contains("watermark") && msg["watermark"].is_number_integer()) {
long long w = msg["watermark"].get<long long>();
if (w > g_data.claude.last_seen_call_id) g_data.claude.last_seen_call_id = w;
}
// Snapshot reemplaza KPIs. Delta los actualiza por incremento.
if (type == "snapshot" && msg.contains("stats") && msg["stats"].is_object()) {
const auto& s = msg["stats"];
g_data.claude.total_calls = s.value("total_calls", 0);
g_data.claude.total_errors = s.value("total_errors", 0);
g_data.claude.total_violations = s.value("total_violations", 0);
g_data.claude.total_copies = s.value("total_copies", 0);
g_data.claude.total_versions = s.value("total_versions", 0);
g_data.claude.available = true;
}
if (!msg.contains("calls") || !msg["calls"].is_array()) return true;
if (type == "snapshot") {
g_data.claude.recent_executions.clear();
}
// Construye filas nuevas
std::vector<RecentExecutionRow> incoming;
incoming.reserve(msg["calls"].size());
int new_errors = 0;
for (const auto& c : msg["calls"]) {
RecentExecutionRow row;
row.id = c.value("id", 0LL);
row.ts = c.value("ts", 0LL);
row.function_id = c.value("function_id", "");
row.tool_used = c.value("tool_used", "");
row.duration_ms = c.value("duration_ms", 0);
row.success = c.value("success", true);
row.error_class = c.value("error_class", "");
row.session_id = c.value("session_id", "");
if (!row.success) new_errors++;
incoming.push_back(std::move(row));
}
if (type == "delta") {
g_data.claude.total_calls += static_cast<int>(incoming.size());
g_data.claude.total_errors += new_errors;
}
// Prepend (newer al frente). Para delta: filas vienen ASC del server,
// las anadimos al frente en orden inverso. Para snapshot: ya vienen DESC.
if (type == "delta") {
for (auto it = incoming.rbegin(); it != incoming.rend(); ++it) {
g_data.claude.recent_executions.insert(
g_data.claude.recent_executions.begin(), std::move(*it));
}
} else {
for (auto& row : incoming) {
g_data.claude.recent_executions.push_back(std::move(row));
}
}
// Cap list (UI muestra ~100). Evita crecer indefinidamente con deltas.
const size_t kCap = 200;
if (g_data.claude.recent_executions.size() > kCap) {
g_data.claude.recent_executions.resize(kCap);
}
return true;
}
static void poll_ws() {
bool connected = g_ws.is_connected();
long long ts = g_ws.last_event_ts();
monitor_set_ws_state(connected, ts);
std::vector<std::string> msgs;
g_ws.drain(msgs, 32);
for (const auto& m : msgs) {
apply_ws_message(m);
}
}
static void reload_data() {
// Conservar la ventana del Monitor entre recargas (no se pierde al refrescar).
int prev_window = g_data.claude.window_secs;
if (prev_window == 0 && g_data.claude.total_calls == 0) prev_window = 86400;
g_data = RegistryData{};
g_data.claude.window_secs = prev_window;
// Try HTTP API first
if (!g_api_url.empty()) {
g_loaded = load_registry_data_http(g_api_url, g_data);
if (g_loaded) {
g_using_http = true;
// Issue 0085d: best-effort load of Claude telemetry from
// ops:call_monitor. Falla silenciosamente si no esta disponible
// (la tab Monitor mostrara placeholder).
load_claude_usage_http(g_api_url, g_data, g_data.claude.window_secs);
return;
}
fprintf(stderr, "HTTP API failed, falling back to SQLite\n");
@@ -44,12 +169,32 @@ static void reload_data() {
}
}
// Refetch SOLO de telemetria del Monitor. Se dispara al cambiar la ventana
// temporal o al recibir un evento WS que invalide el snapshot. No toca el
// registry general.
static void reload_monitor() {
if (g_api_url.empty() || !g_loaded) return;
load_claude_usage_http(g_api_url, g_data, g_data.claude.window_secs);
}
static void render() {
if (ImGui::GetIO().UserData != nullptr) {
ImGui::GetIO().UserData = nullptr;
reload_data();
}
// Issue 0086: el Monitor pide refetch parcial cuando el usuario cambia la
// ventana temporal o pulsa Refresh. No pasa por reload_data() para no
// tirar abajo todo el dataset del registry.
if (monitor_consume_reload_request()) {
reload_monitor();
}
// Issue 0086: drena la cola de mensajes WS y aplica deltas a g_data.
// No bloquea — drain es O(N) sobre los mensajes encolados desde el
// ultimo frame (tipicamente 0-3).
poll_ws();
if (!g_loaded) {
fullscreen_window_begin("##error");
ImGui::TextColored(ImVec4(1, 0.3f, 0.3f, 1),
@@ -117,11 +262,12 @@ int main(int argc, char** argv) {
// Info de la ventana About (submenu Settings → About...)
fn_ui::about_window_set_info(
"fn_registry Dashboard",
"0.3.0",
"Dashboard ImGui para visualizar el estado del fn_registry. "
"Consume datos via sqlite_api HTTP (fallback a SQLite directo). "
"KPIs con sparkline, charts con leyenda, tablas, altura responsive, "
"Status panel en Settings, multi-viewport, dashboard_panel en views."
"0.4.0",
"Dashboard ImGui del fn_registry. Pestana Monitor por defecto con KPIs "
"live (Calls / Errors / Violations / Copies / Versions), Recent Executions "
"con timestamp, filtro de ventana (1h/24h/7d/30d/All) y WS subscription "
"al hub /api/events/call_monitor de sqlite_api. Resto de tabs (Dashboard, "
"Explorer, Projects, Apps, Analysis, Types) sin cambios."
);
// Seccion Status dentro de la ventana Settings (submenu Settings → Settings...).
@@ -164,6 +310,17 @@ int main(int argc, char** argv) {
reload_data();
// Issue 0086: lanza el cliente WS al hub de eventos. El hub solo arranca
// su ticker cuando recibe el primer subscriber, asi que esta conexion
// tambien le dice al servidor "empieza a streamear".
{
std::string ws_host;
int ws_port = 0;
if (parse_http_url(g_api_url, ws_host, ws_port)) {
g_ws.start(ws_host, ws_port, "/api/events/call_monitor");
}
}
return fn::run_app(
{.title = "fn_registry Dashboard",
.width = 1600,
+280 -3
View File
@@ -31,6 +31,7 @@
#include <cstdio>
#include <cstring>
#include <cctype>
#include <ctime>
#include <string>
#include <vector>
@@ -40,6 +41,57 @@
static std::string g_api_url;
static fn_ui::ProcessRunner g_reindex_runner;
// ---------------------------------------------------------------------------
// Monitor (issue 0086) — local state
// ---------------------------------------------------------------------------
// Presets de ventana temporal: 1h, 24h, 7d, 30d, All. Indice por defecto = 24h.
static const char* kMonitorWindowLabels[] = {"1h", "24h", "7d", "30d", "All"};
static const int kMonitorWindowSecs[] = {3600, 86400, 604800, 2592000, 0};
static int g_monitor_window_idx = 1; // 24h por defecto
static bool g_monitor_reload_request = false;
static bool g_monitor_ws_connected = false;
static long long g_monitor_last_event_ts = 0;
bool monitor_consume_reload_request() {
bool r = g_monitor_reload_request;
g_monitor_reload_request = false;
return r;
}
void monitor_set_ws_state(bool connected, long long last_event_ts) {
g_monitor_ws_connected = connected;
if (last_event_ts > 0) g_monitor_last_event_ts = last_event_ts;
}
// Formatea un epoch ts en "YYYY-MM-DD HH:MM:SS" local. Si ts == 0 -> "-".
static std::string format_ts(long long ts) {
if (ts <= 0) return "-";
std::time_t t = static_cast<std::time_t>(ts);
std::tm tm_buf{};
#if defined(_WIN32)
localtime_s(&tm_buf, &t);
#else
localtime_r(&t, &tm_buf);
#endif
char buf[32];
std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", &tm_buf);
return std::string(buf);
}
// Formatea ts relativo a now: "3s", "2m", "1h", "4d". Para "live indicator".
static std::string format_ts_relative(long long ts) {
if (ts <= 0) return "never";
std::time_t now = std::time(nullptr);
long long diff = static_cast<long long>(now) - ts;
if (diff < 0) diff = 0;
char buf[32];
if (diff < 60) std::snprintf(buf, sizeof(buf), "%llds ago", (long long)diff);
else if (diff < 3600) std::snprintf(buf, sizeof(buf), "%lldm ago", (long long)(diff / 60));
else if (diff < 86400) std::snprintf(buf, sizeof(buf), "%lldh ago", (long long)(diff / 3600));
else std::snprintf(buf, sizeof(buf), "%lldd ago", (long long)(diff / 86400));
return std::string(buf);
}
static fn_ui::ProcessRunner g_add_runner;
// Add modal state
@@ -311,6 +363,226 @@ void draw_types_list(const std::vector<TypeRow>& types) {
table_view("##types", headers, cols, cells.data(), static_cast<int>(types.size()));
}
// ---------------------------------------------------------------------------
// Monitor tab (issue 0086) — reads from ops:call_monitor.
// Pestana principal del dashboard. Bucle reactivo: construir / ejecutar /
// recopilar / analizar / mejorar lo vigila desde aqui.
// ---------------------------------------------------------------------------
static void draw_monitor_toolbar(RegistryData& data) {
fn_ui::toolbar_begin();
// Window preset selector. Si cambia, marcamos reload_request para que
// main.cpp recargue solo claude (no toca registry entero).
ImGui::TextUnformatted("Window:");
ImGui::SameLine();
ImGui::SetNextItemWidth(110.0f);
if (ImGui::BeginCombo("##monitor_window", kMonitorWindowLabels[g_monitor_window_idx])) {
const int n = (int)(sizeof(kMonitorWindowLabels) / sizeof(kMonitorWindowLabels[0]));
for (int i = 0; i < n; i++) {
const bool selected = (i == g_monitor_window_idx);
if (ImGui::Selectable(kMonitorWindowLabels[i], selected)) {
if (i != g_monitor_window_idx) {
g_monitor_window_idx = i;
data.claude.window_secs = kMonitorWindowSecs[i];
g_monitor_reload_request = true;
}
}
if (selected) ImGui::SetItemDefaultFocus();
}
ImGui::EndCombo();
}
ImGui::SameLine();
if (fn_ui::button("Refresh", fn_ui::ButtonVariant::Subtle)) {
g_monitor_reload_request = true;
}
// Live LED: verde si WS conectado, gris si caido. Ts ultimo evento.
ImGui::SameLine();
ImGui::Dummy(ImVec2(fn_tokens::spacing::lg, 0));
ImGui::SameLine();
const ImVec4 dot_col = g_monitor_ws_connected
? ImVec4(0.30f, 0.85f, 0.40f, 1.0f)
: ImVec4(0.55f, 0.55f, 0.55f, 1.0f);
ImGui::TextColored(dot_col, "%s", g_monitor_ws_connected ? TI_POINT : TI_CIRCLE_DOTTED);
ImGui::SameLine();
ImGui::TextUnformatted(g_monitor_ws_connected ? "live" : "offline");
if (g_monitor_last_event_ts > 0) {
ImGui::SameLine();
ImGui::PushStyleColor(ImGuiCol_Text, fn_tokens::colors::text_muted);
const std::string rel = format_ts_relative(g_monitor_last_event_ts);
ImGui::Text("(last event: %s)", rel.c_str());
ImGui::PopStyleColor();
}
fn_ui::toolbar_end();
}
void draw_monitor(RegistryData& data) {
auto& cu = data.claude;
// Toolbar siempre visible (date filter + LED) incluso si call_monitor caido.
draw_monitor_toolbar(data);
ImGui::Dummy(ImVec2(0, fn_tokens::spacing::sm));
if (!cu.available) {
ImGui::PushStyleColor(ImGuiCol_Text, fn_tokens::colors::text_muted);
ImGui::TextWrapped("%s call_monitor.operations.db no esta accesible.", TI_ALERT_CIRCLE);
ImGui::TextWrapped("Inicializa con: ./projects/fn_monitoring/apps/call_monitor/call_monitor init");
ImGui::TextWrapped("Despues `systemctl --user restart sqlite_api` para que el datasource ops:call_monitor sea descubierto.");
ImGui::PopStyleColor();
return;
}
// 5 KPI cards: Calls / Errors / Violations / Copies / Versions
const ImGuiTableFlags flags = ImGuiTableFlags_SizingStretchSame | ImGuiTableFlags_NoPadOuterX;
if (ImGui::BeginTable("##monitor_kpi", 5, flags)) {
struct KPI { const char* label; float value; const char* icon; };
const KPI cards[5] = {
{"Calls", static_cast<float>(cu.total_calls), TI_ACTIVITY},
{"Errors", static_cast<float>(cu.total_errors), TI_ALERT_TRIANGLE},
{"Violations", static_cast<float>(cu.total_violations), TI_ALERT_CIRCLE},
{"Copies", static_cast<float>(cu.total_copies), TI_COPY},
{"Versions", static_cast<float>(cu.total_versions), TI_HISTORY},
};
ImGui::TableNextRow();
for (int i = 0; i < 5; i++) {
ImGui::TableSetColumnIndex(i);
kpi_card(cards[i].label, cards[i].value, 0.0f, nullptr, 0, "%.0f", cards[i].icon);
}
ImGui::EndTable();
}
ImGui::Dummy(ImVec2(0, fn_tokens::spacing::md));
// Sub-tabs: Recent Executions (primera) / Top Functions / Violations / Copies
if (ImGui::BeginTabBar("##monitor_sub_tabs")) {
if (ImGui::BeginTabItem("Recent Executions")) {
if (cu.recent_executions.empty()) {
ImGui::TextDisabled("No executions in this window. Try widening (7d/30d/All) or wait for the next call.");
} else {
const ImGuiTableFlags tf = ImGuiTableFlags_RowBg | ImGuiTableFlags_Borders
| ImGuiTableFlags_Resizable | ImGuiTableFlags_ScrollY;
if (ImGui::BeginTable("##monitor_recent", 6, tf, ImVec2(0, 0))) {
ImGui::TableSetupColumn("When");
ImGui::TableSetupColumn("Function");
ImGui::TableSetupColumn("Tool");
ImGui::TableSetupColumn("ms");
ImGui::TableSetupColumn("OK");
ImGui::TableSetupColumn("Error");
ImGui::TableHeadersRow();
for (const auto& r : cu.recent_executions) {
ImGui::TableNextRow();
ImGui::TableSetColumnIndex(0);
ImGui::TextUnformatted(format_ts(r.ts).c_str());
ImGui::TableSetColumnIndex(1);
ImGui::TextUnformatted(r.function_id.empty() ? "-" : r.function_id.c_str());
ImGui::TableSetColumnIndex(2);
ImGui::TextUnformatted(r.tool_used.c_str());
ImGui::TableSetColumnIndex(3);
ImGui::Text("%d", r.duration_ms);
ImGui::TableSetColumnIndex(4);
if (r.success) {
ImGui::PushStyleColor(ImGuiCol_Text, fn_tokens::colors::success);
ImGui::TextUnformatted(TI_CHECK);
ImGui::PopStyleColor();
} else {
ImGui::PushStyleColor(ImGuiCol_Text, fn_tokens::colors::error);
ImGui::TextUnformatted(TI_X);
ImGui::PopStyleColor();
}
ImGui::TableSetColumnIndex(5);
ImGui::TextUnformatted(r.error_class.c_str());
}
ImGui::EndTable();
}
}
ImGui::EndTabItem();
}
if (ImGui::BeginTabItem("Top Functions")) {
if (cu.top_functions.empty()) {
ImGui::TextDisabled("No function calls recorded yet. Hook fires on next session.");
} else {
const ImGuiTableFlags tf = ImGuiTableFlags_RowBg | ImGuiTableFlags_Borders
| ImGuiTableFlags_Resizable | ImGuiTableFlags_ScrollY;
if (ImGui::BeginTable("##monitor_top_fn", 6, tf, ImVec2(0, 0))) {
ImGui::TableSetupColumn("Function ID");
ImGui::TableSetupColumn("Calls");
ImGui::TableSetupColumn("7d");
ImGui::TableSetupColumn("Errors");
ImGui::TableSetupColumn("Error %");
ImGui::TableSetupColumn("Mean ms");
ImGui::TableHeadersRow();
for (const auto& r : cu.top_functions) {
ImGui::TableNextRow();
ImGui::TableSetColumnIndex(0); ImGui::TextUnformatted(r.function_id.c_str());
ImGui::TableSetColumnIndex(1); ImGui::Text("%d", r.calls_total);
ImGui::TableSetColumnIndex(2); ImGui::Text("%d", r.calls_7d);
ImGui::TableSetColumnIndex(3); ImGui::Text("%d", r.errors_total);
ImGui::TableSetColumnIndex(4); ImGui::Text("%.1f%%", r.error_rate * 100.0);
ImGui::TableSetColumnIndex(5); ImGui::Text("%.0f", r.mean_duration_ms);
}
ImGui::EndTable();
}
}
ImGui::EndTabItem();
}
if (ImGui::BeginTabItem("Violations")) {
if (cu.recent_violations.empty()) {
ImGui::TextDisabled("No antipattern violations detected.");
} else {
const ImGuiTableFlags tf = ImGuiTableFlags_RowBg | ImGuiTableFlags_Borders
| ImGuiTableFlags_Resizable | ImGuiTableFlags_ScrollY;
if (ImGui::BeginTable("##monitor_viol", 5, tf, ImVec2(0, 0))) {
ImGui::TableSetupColumn("When");
ImGui::TableSetupColumn("Rule");
ImGui::TableSetupColumn("Severity");
ImGui::TableSetupColumn("Function");
ImGui::TableSetupColumn("Snippet");
ImGui::TableHeadersRow();
for (const auto& r : cu.recent_violations) {
ImGui::TableNextRow();
ImGui::TableSetColumnIndex(0); ImGui::TextUnformatted(format_ts(r.ts).c_str());
ImGui::TableSetColumnIndex(1); ImGui::TextUnformatted(r.rule_id.c_str());
ImGui::TableSetColumnIndex(2); ImGui::TextUnformatted(r.severity.c_str());
ImGui::TableSetColumnIndex(3); ImGui::TextUnformatted(r.function_id.c_str());
ImGui::TableSetColumnIndex(4); ImGui::TextUnformatted(r.command_snippet.c_str());
}
ImGui::EndTable();
}
}
ImGui::EndTabItem();
}
if (ImGui::BeginTabItem("Copied Code")) {
if (cu.copies.empty()) {
ImGui::TextDisabled("No copied code detected. Run `fn doctor copied-code` or `call_monitor copied-code`.");
} else {
const ImGuiTableFlags tf = ImGuiTableFlags_RowBg | ImGuiTableFlags_Borders
| ImGuiTableFlags_Resizable | ImGuiTableFlags_ScrollY;
if (ImGui::BeginTable("##monitor_copies", 5, tf, ImVec2(0, 0))) {
ImGui::TableSetupColumn("Kind");
ImGui::TableSetupColumn("Sim");
ImGui::TableSetupColumn("App File");
ImGui::TableSetupColumn("App Function");
ImGui::TableSetupColumn("Registry ID");
ImGui::TableHeadersRow();
for (const auto& r : cu.copies) {
ImGui::TableNextRow();
ImGui::TableSetColumnIndex(0); ImGui::TextUnformatted(r.kind.c_str());
ImGui::TableSetColumnIndex(1); ImGui::Text("%.2f", r.similarity);
ImGui::TableSetColumnIndex(2); ImGui::TextUnformatted(r.app_file.c_str());
ImGui::TableSetColumnIndex(3); ImGui::TextUnformatted(r.app_function.c_str());
ImGui::TableSetColumnIndex(4); ImGui::TextUnformatted(r.registry_id.c_str());
}
ImGui::EndTable();
}
}
ImGui::EndTabItem();
}
ImGui::EndTabBar();
}
}
// ---------------------------------------------------------------------------
// Projects view
// ---------------------------------------------------------------------------
@@ -887,10 +1159,15 @@ void draw_dashboard(RegistryData& data) {
draw_actions_bar();
ImGui::Dummy(ImVec2(0, fn_tokens::spacing::sm));
// Navegacion top-level: cada tab ocupa toda la zona de contenido. El
// primero ("Dashboard") incluye los KPIs + charts + tabla de recientes;
// los demas son vistas dedicadas a su entidad.
// Navegacion top-level: "Monitor" es la primera y por defecto (issue 0086).
// El bucle reactivo (construir / ejecutar / recopilar / analizar / mejorar)
// se vigila desde alli, asi que pega como landing. Las demas son vistas
// dedicadas a entidades del registry.
if (ImGui::BeginTabBar("##main_tabs", ImGuiTabBarFlags_FittingPolicyScroll)) {
if (ImGui::BeginTabItem("Monitor")) {
draw_monitor(data);
ImGui::EndTabItem();
}
if (ImGui::BeginTabItem("Dashboard")) {
draw_kpi_row(data);
ImGui::Dummy(ImVec2(0, fn_tokens::spacing::md));
+15
View File
@@ -22,3 +22,18 @@ void draw_projects_list(RegistryData& data);
// Explorer: lista navegable de funciones + visor de codigo y documentacion.
// Carga la lista completa al primer render via /api/databases/registry/query.
void draw_functions_explorer();
// Issue 0086: tab "Monitor" (antes "Claude Usage"). Pestana principal y por
// defecto. Muestra KPIs del bucle reactivo + Recent Executions con timestamps,
// top functions, violations, copied code. Filtro de fecha por presets y
// estado WS live (LED + ts ultimo evento). Si data.claude.available == false
// muestra placeholder con instrucciones para inicializar call_monitor.
void draw_monitor(RegistryData& data);
// Flag global: cuando draw_monitor cambia la ventana o WS reconecta y necesita
// refetch parcial. Lo lee main.cpp y llama load_claude_usage_http sin tocar
// el resto del registry (rapido).
bool monitor_consume_reload_request();
// Setter expuesto a main.cpp: refleja estado WS en el LED. true = live.
void monitor_set_ws_state(bool connected, long long last_event_ts);
+404
View File
@@ -0,0 +1,404 @@
#include "ws_client.h"
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <random>
#include <sstream>
#include <vector>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "ws2_32.lib")
typedef SOCKET sock_t;
#define SOCK_INVALID INVALID_SOCKET
#define SOCK_CLOSE closesocket
#define SOCK_ERR WSAGetLastError()
#define FN_SOCK_NONBLOCK(s) do { u_long m = 1; ioctlsocket((s), FIONBIO, &m); } while (0)
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
typedef int sock_t;
#define SOCK_INVALID (-1)
#define SOCK_CLOSE close
#define SOCK_ERR errno
#define FN_SOCK_NONBLOCK(s) do { int f = fcntl((s), F_GETFL, 0); fcntl((s), F_SETFL, f | O_NONBLOCK); } while (0)
#endif
#ifdef _WIN32
static bool wsa_init_ws() {
static bool inited = false;
static std::once_flag flag;
std::call_once(flag, []() {
WSADATA wsa;
WSAStartup(MAKEWORD(2, 2), &wsa);
});
return true;
}
#endif
namespace {
// ----- Base64 (small, sufficient for 16-byte WS key) -----
const char* kBase64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
std::string base64_encode(const uint8_t* in, size_t len) {
std::string out;
out.reserve(((len + 2) / 3) * 4);
size_t i = 0;
for (; i + 3 <= len; i += 3) {
uint32_t v = (uint32_t(in[i]) << 16) | (uint32_t(in[i + 1]) << 8) | uint32_t(in[i + 2]);
out.push_back(kBase64[(v >> 18) & 63]);
out.push_back(kBase64[(v >> 12) & 63]);
out.push_back(kBase64[(v >> 6) & 63]);
out.push_back(kBase64[v & 63]);
}
if (i < len) {
uint32_t v = uint32_t(in[i]) << 16;
if (i + 1 < len) v |= uint32_t(in[i + 1]) << 8;
out.push_back(kBase64[(v >> 18) & 63]);
out.push_back(kBase64[(v >> 12) & 63]);
out.push_back(i + 1 < len ? kBase64[(v >> 6) & 63] : '=');
out.push_back('=');
}
return out;
}
// Send exactly n bytes (blocking).
bool send_all(sock_t sock, const char* data, size_t n) {
size_t sent = 0;
while (sent < n) {
int k = send(sock, data + sent, static_cast<int>(n - sent), 0);
if (k <= 0) return false;
sent += k;
}
return true;
}
// Receive exactly n bytes (blocking on a non-non-blocking socket).
bool recv_all(sock_t sock, char* data, size_t n) {
size_t got = 0;
while (got < n) {
int k = recv(sock, data + got, static_cast<int>(n - got), 0);
if (k <= 0) return false;
got += k;
}
return true;
}
// Receive up to n bytes; returns count, or -1 on error / -2 on would-block.
int recv_some(sock_t sock, char* data, size_t n) {
int k = recv(sock, data, static_cast<int>(n), 0);
if (k > 0) return k;
if (k == 0) return -1;
#ifdef _WIN32
if (WSAGetLastError() == WSAEWOULDBLOCK) return -2;
#else
if (errno == EAGAIN || errno == EWOULDBLOCK) return -2;
#endif
return -1;
}
} // namespace
WsClient::WsClient() = default;
WsClient::~WsClient() {
stop();
}
void WsClient::start(const std::string& host, int port, const std::string& path) {
State expected = State::Idle;
if (!state_.compare_exchange_strong(expected, State::Connecting)) return;
host_ = host;
port_ = port;
path_ = path;
stop_flag_.store(false);
worker_ = std::thread([this]() { this->run(); });
}
void WsClient::stop() {
stop_flag_.store(true);
int s = sock_.exchange(-1);
if (s != -1) SOCK_CLOSE(static_cast<sock_t>(s));
out_cv_.notify_all();
if (worker_.joinable()) worker_.join();
state_.store(State::Stopped);
}
int WsClient::drain(std::vector<std::string>& out, int max) {
std::lock_guard<std::mutex> g(in_mu_);
int n = 0;
while (!in_queue_.empty() && n < max) {
out.emplace_back(std::move(in_queue_.front()));
in_queue_.pop_front();
n++;
}
return n;
}
bool WsClient::send_text(const std::string& payload) {
if (state_.load() != State::Connected) return false;
{
std::lock_guard<std::mutex> g(out_mu_);
out_queue_.push_back(payload);
}
out_cv_.notify_one();
return true;
}
void WsClient::run() {
int backoff_ms = 500;
while (!stop_flag_.load()) {
state_.store(State::Connecting);
if (connect_once()) {
backoff_ms = 500; // reset on successful connect
state_.store(State::Connected);
read_loop();
}
state_.store(State::Backoff);
if (stop_flag_.load()) break;
// Exponential backoff: 0.5s → 1s → 2s → 4s → 8s (cap).
for (int slept = 0; slept < backoff_ms && !stop_flag_.load(); slept += 100) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
backoff_ms = std::min(backoff_ms * 2, 8000);
}
state_.store(State::Stopped);
}
bool WsClient::connect_once() {
#ifdef _WIN32
wsa_init_ws();
#endif
sock_t sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock == SOCK_INVALID) return false;
// 5s connect timeout via SO_*TIMEO. Stays blocking afterwards for the
// handshake; read_loop switches to non-blocking with select().
#ifdef _WIN32
DWORD timeout_ms = 5000;
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
#else
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
#endif
struct sockaddr_in addr;
std::memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(static_cast<uint16_t>(port_));
addr.sin_addr.s_addr = inet_addr(host_.c_str());
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
SOCK_CLOSE(sock);
return false;
}
sock_.store(static_cast<int>(sock));
if (!handshake()) {
int s = sock_.exchange(-1);
if (s != -1) SOCK_CLOSE(static_cast<sock_t>(s));
return false;
}
// Non-blocking for the read loop.
FN_SOCK_NONBLOCK(sock);
return true;
}
bool WsClient::handshake() {
sock_t sock = static_cast<sock_t>(sock_.load());
// 16 random bytes → base64 → Sec-WebSocket-Key.
uint8_t key_raw[16];
std::random_device rd;
for (auto& b : key_raw) b = static_cast<uint8_t>(rd() & 0xff);
std::string key_b64 = base64_encode(key_raw, sizeof(key_raw));
std::ostringstream req;
req << "GET " << path_ << " HTTP/1.1\r\n";
req << "Host: " << host_ << ":" << port_ << "\r\n";
req << "Upgrade: websocket\r\n";
req << "Connection: Upgrade\r\n";
req << "Sec-WebSocket-Key: " << key_b64 << "\r\n";
req << "Sec-WebSocket-Version: 13\r\n";
req << "Origin: http://" << host_ << ":" << port_ << "\r\n";
req << "\r\n";
std::string raw = req.str();
if (!send_all(sock, raw.c_str(), raw.size())) return false;
// Read response headers (up to 4KB).
std::string resp;
char buf[1024];
while (resp.find("\r\n\r\n") == std::string::npos) {
int k = recv(sock, buf, sizeof(buf), 0);
if (k <= 0) return false;
resp.append(buf, k);
if (resp.size() > 4096) return false;
}
// Expect "HTTP/1.1 101".
if (resp.compare(0, 12, "HTTP/1.1 101") != 0 &&
resp.compare(0, 12, "HTTP/1.0 101") != 0) {
fprintf(stderr, "[ws] handshake failed: %.*s\n",
(int)std::min<size_t>(resp.size(), 120), resp.c_str());
return false;
}
// We intentionally skip Sec-WebSocket-Accept verification — controlled
// server, localhost-only, 101 status is enough for this app.
return true;
}
bool WsClient::read_loop() {
sock_t sock = static_cast<sock_t>(sock_.load());
std::vector<uint8_t> rb; // accumulated read buffer
rb.reserve(64 * 1024);
while (!stop_flag_.load()) {
// Block on select() for up to 100ms so we can both read and check
// the outgoing queue.
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(sock, &rfds);
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 100 * 1000;
int sel = select(static_cast<int>(sock) + 1, &rfds, nullptr, nullptr, &tv);
if (sel < 0) return false;
if (sel > 0 && FD_ISSET(sock, &rfds)) {
uint8_t tmp[8192];
int k = recv_some(sock, reinterpret_cast<char*>(tmp), sizeof(tmp));
if (k == -1) return false;
if (k > 0) rb.insert(rb.end(), tmp, tmp + k);
}
// Drain outgoing queue (text frames, masked).
for (;;) {
std::string payload;
{
std::lock_guard<std::mutex> g(out_mu_);
if (out_queue_.empty()) break;
payload = std::move(out_queue_.front());
out_queue_.pop_front();
}
if (!send_frame(0x1, payload)) return false;
}
// Parse frames. RFC6455 minimal: assume server never masks, no
// continuation, opcodes: 0x1 text, 0x8 close, 0x9 ping, 0xA pong.
while (rb.size() >= 2) {
uint8_t b0 = rb[0];
uint8_t b1 = rb[1];
bool fin = (b0 & 0x80) != 0;
(void)fin;
int opcode = b0 & 0x0F;
bool mask = (b1 & 0x80) != 0;
uint64_t len = b1 & 0x7F;
size_t pos = 2;
if (len == 126) {
if (rb.size() < pos + 2) break;
len = (uint64_t(rb[pos]) << 8) | uint64_t(rb[pos + 1]);
pos += 2;
} else if (len == 127) {
if (rb.size() < pos + 8) break;
len = 0;
for (int i = 0; i < 8; i++) len = (len << 8) | rb[pos + i];
pos += 8;
}
uint8_t mkey[4] = {0, 0, 0, 0};
if (mask) {
if (rb.size() < pos + 4) break;
for (int i = 0; i < 4; i++) mkey[i] = rb[pos + i];
pos += 4;
}
if (rb.size() < pos + len) break;
std::string payload;
payload.resize(static_cast<size_t>(len));
for (size_t i = 0; i < len; i++) {
uint8_t c = rb[pos + i];
if (mask) c ^= mkey[i & 3];
payload[i] = static_cast<char>(c);
}
rb.erase(rb.begin(), rb.begin() + pos + len);
switch (opcode) {
case 0x1: { // text
last_event_ts_.store(std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count());
std::lock_guard<std::mutex> g(in_mu_);
in_queue_.emplace_back(std::move(payload));
// Bound the queue. Drop oldest if too big — UI consumes
// each frame so this should only kick in if the dashboard
// is paused (window minimized, etc.).
while (in_queue_.size() > 512) in_queue_.pop_front();
break;
}
case 0x8: // close
return false;
case 0x9: // ping → reply with pong (same payload)
if (!send_frame(0xA, payload)) return false;
break;
case 0xA: // pong, ignore
break;
default:
// 0x0 continuation or unexpected opcode: bail (server
// controlled by us, shouldn't happen).
return false;
}
}
}
return true;
}
bool WsClient::send_frame(int opcode, const std::string& payload) {
sock_t sock = static_cast<sock_t>(sock_.load());
if (sock == SOCK_INVALID || static_cast<int>(sock) < 0) return false;
std::vector<uint8_t> frame;
frame.reserve(payload.size() + 16);
frame.push_back(static_cast<uint8_t>(0x80 | (opcode & 0x0F))); // FIN + opcode
uint64_t len = payload.size();
if (len < 126) {
frame.push_back(static_cast<uint8_t>(0x80 | len)); // mask bit set
} else if (len <= 0xFFFF) {
frame.push_back(static_cast<uint8_t>(0x80 | 126));
frame.push_back(static_cast<uint8_t>((len >> 8) & 0xFF));
frame.push_back(static_cast<uint8_t>(len & 0xFF));
} else {
frame.push_back(static_cast<uint8_t>(0x80 | 127));
for (int i = 7; i >= 0; i--) frame.push_back(static_cast<uint8_t>((len >> (8 * i)) & 0xFF));
}
// Mask key (RFC requires mask for client → server frames).
std::random_device rd;
uint8_t mkey[4];
for (auto& b : mkey) b = static_cast<uint8_t>(rd() & 0xff);
for (int i = 0; i < 4; i++) frame.push_back(mkey[i]);
for (size_t i = 0; i < payload.size(); i++) {
frame.push_back(static_cast<uint8_t>(payload[i]) ^ mkey[i & 3]);
}
return send_all(sock, reinterpret_cast<const char*>(frame.data()), frame.size());
}
+74
View File
@@ -0,0 +1,74 @@
#pragma once
// Minimal WebSocket client (RFC 6455, ws:// only, no TLS) tailored for the
// Monitor tab. Background thread does connect + handshake + read loop and
// pushes incoming text payloads into a thread-safe queue that the ImGui
// thread drains each frame.
//
// Issue 0086 — first consumer of WS in the C++ dashboards. If a second app
// needs WS later, extract this file to cpp/functions/network/ via
// fn-constructor.
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
class WsClient {
public:
WsClient();
~WsClient();
// Non-blocking. Spawns background thread that connects to ws://host:port/path
// and keeps the connection alive with exponential reconnect backoff.
// Safe to call multiple times — second call is a no-op once running.
void start(const std::string& host, int port, const std::string& path);
// Stop the background thread and tear down the connection.
void stop();
// True while a WS connection is up and handshake completed.
bool is_connected() const { return state_.load() == State::Connected; }
// Epoch seconds of last received frame (for the live LED in the Monitor).
long long last_event_ts() const { return last_event_ts_.load(); }
// Drain at most `max` queued text payloads. Returns the number drained.
// Called once per frame from the render thread.
int drain(std::vector<std::string>& out, int max = 64);
// Send a text frame to the server. Returns true if queued for sending.
// Used to send {"watermark": N} commands on reconnect.
bool send_text(const std::string& payload);
private:
enum class State { Idle, Connecting, Connected, Backoff, Stopped };
void run();
bool connect_once();
bool handshake();
bool read_loop();
bool send_frame(int opcode, const std::string& payload);
std::string host_;
int port_ = 0;
std::string path_;
std::atomic<State> state_{State::Idle};
std::atomic<long long> last_event_ts_{0};
std::atomic<int> sock_{-1};
std::thread worker_;
std::atomic<bool> stop_flag_{false};
std::mutex in_mu_;
std::deque<std::string> in_queue_;
std::mutex out_mu_;
std::deque<std::string> out_queue_;
// For waking the writer side of read_loop when send_text is called.
std::condition_variable out_cv_;
};