Files
registry_dashboard/main.cpp
T
egutierrez f7109a8ca0 docs(monitor): bump to v0.4.0, update app.md with Monitor + WS section
- About info updated to reflect Monitor as primary tab and the WS feature set.
- app.md gains a dedicated "Fase — Monitor tab + WebSocket live stream" section
  documenting the architecture, KPIs, WS hub design, fallback semantics, and
  the scope decisions (no TLS, skip Accept verification, WS client not yet
  extracted to cpp/functions/).
- Roadmap items added: extract ws_client when a second consumer arrives,
  Sec-WebSocket-Accept verification, migrate to coder/websocket on the server.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 00:35:26 +02:00

335 lines
12 KiB
C++

#include "app_base.h"
#include "imgui.h"
#include "core/fullscreen_window.h"
#include "core/app_menubar.h"
#include "core/app_about.h"
#include "core/app_settings.h"
#include "core/tokens.h"
#include "data.h"
#include "data_http.h"
#include "views.h"
#include "ws_client.h"
#include "nlohmann/json.hpp"
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iterator>
#include <string>
#include <utility>
#include <vector>
// Registry dataset drawn by the dashboard. Rebuilt from scratch by
// reload_data() and patched in place by apply_ws_message().
static RegistryData g_data;
// SQLite database path used as the fallback source (resolved in main from argv).
static std::string g_db_path;
// Base URL of the HTTP API (from --api, or the localhost default set in main).
static std::string g_api_url;
// Result of the most recent load attempt made by reload_data().
static bool g_loaded = false;
// True when the last successful load came from the HTTP API, false when
// reload_data() fell back to SQLite.
static bool g_using_http = false;
// WebSocket client: started in main, polled every frame by poll_ws().
static WsClient g_ws;
// Splits a plain "http://host[:port][/path][?query][#fragment]" URL into its
// host and port.
//
// @param url   URL to parse; only the "http://" scheme is accepted.
// @param host  Receives the host name. Written only on success.
// @param port  Receives the port; 80 when the URL has no explicit port.
//              Written only on success.
// @return true when the URL has a non-empty host and, if a port is given, it
//         is a decimal number in [1, 65535]; false otherwise.
//
// Userinfo ("user@host") and bracketed IPv6 literals are not handled; the
// latter are rejected because the first ':' is taken as the port separator.
static bool parse_http_url(const std::string& url, std::string& host, int& port) {
    static const char kPrefix[] = "http://";
    const std::string::size_type prefix_len = sizeof(kPrefix) - 1;
    if (url.compare(0, prefix_len, kPrefix) != 0) return false;

    // Isolate the authority first so that a path, query or fragment can
    // neither leak into the host nor contribute a ':' mistaken for the port.
    const std::string::size_type authority_end = url.find_first_of("/?#", prefix_len);
    const std::string authority =
        (authority_end == std::string::npos)
            ? url.substr(prefix_len)
            : url.substr(prefix_len, authority_end - prefix_len);

    const std::string::size_type colon = authority.find(':');
    const std::string parsed_host = authority.substr(0, colon);
    if (parsed_host.empty()) return false;

    int parsed_port = 80;
    if (colon != std::string::npos) {
        const std::string digits = authority.substr(colon + 1);
        if (digits.empty()) return false;

        // Strict decimal parse: no sign, no whitespace, no trailing junk.
        // The range check inside the loop also keeps the accumulator from
        // overflowing on absurdly long inputs.
        parsed_port = 0;
        for (const char c : digits) {
            if (c < '0' || c > '9') return false;
            parsed_port = parsed_port * 10 + (c - '0');
            if (parsed_port > 65535) return false;
        }
        if (parsed_port == 0) return false;
    }

    host = parsed_host;
    port = parsed_port;
    return true;
}
// Aplica un mensaje JSON recibido por WS a g_data.claude. Tipos:
// - "snapshot": reemplaza KPIs y la lista entera de recent_executions.
// - "delta": append rows (front), dedup por id, recalcula KPIs.
// Devuelve true si el mensaje era valido.
static bool apply_ws_message(const std::string& raw) {
using nlohmann::json;
json msg = json::parse(raw, nullptr, false);
if (!msg.is_object()) return false;
const std::string type = msg.value("type", "");
if (type != "snapshot" && type != "delta") return false;
if (msg.contains("server_time") && msg["server_time"].is_number_integer()) {
g_data.claude.last_event_ts = msg["server_time"].get<long long>();
}
if (msg.contains("watermark") && msg["watermark"].is_number_integer()) {
long long w = msg["watermark"].get<long long>();
if (w > g_data.claude.last_seen_call_id) g_data.claude.last_seen_call_id = w;
}
// Snapshot reemplaza KPIs. Delta los actualiza por incremento.
if (type == "snapshot" && msg.contains("stats") && msg["stats"].is_object()) {
const auto& s = msg["stats"];
g_data.claude.total_calls = s.value("total_calls", 0);
g_data.claude.total_errors = s.value("total_errors", 0);
g_data.claude.total_violations = s.value("total_violations", 0);
g_data.claude.total_copies = s.value("total_copies", 0);
g_data.claude.total_versions = s.value("total_versions", 0);
g_data.claude.available = true;
}
if (!msg.contains("calls") || !msg["calls"].is_array()) return true;
if (type == "snapshot") {
g_data.claude.recent_executions.clear();
}
// Construye filas nuevas
std::vector<RecentExecutionRow> incoming;
incoming.reserve(msg["calls"].size());
int new_errors = 0;
for (const auto& c : msg["calls"]) {
RecentExecutionRow row;
row.id = c.value("id", 0LL);
row.ts = c.value("ts", 0LL);
row.function_id = c.value("function_id", "");
row.tool_used = c.value("tool_used", "");
row.duration_ms = c.value("duration_ms", 0);
row.success = c.value("success", true);
row.error_class = c.value("error_class", "");
row.session_id = c.value("session_id", "");
if (!row.success) new_errors++;
incoming.push_back(std::move(row));
}
if (type == "delta") {
g_data.claude.total_calls += static_cast<int>(incoming.size());
g_data.claude.total_errors += new_errors;
}
// Prepend (newer al frente). Para delta: filas vienen ASC del server,
// las anadimos al frente en orden inverso. Para snapshot: ya vienen DESC.
if (type == "delta") {
for (auto it = incoming.rbegin(); it != incoming.rend(); ++it) {
g_data.claude.recent_executions.insert(
g_data.claude.recent_executions.begin(), std::move(*it));
}
} else {
for (auto& row : incoming) {
g_data.claude.recent_executions.push_back(std::move(row));
}
}
// Cap list (UI muestra ~100). Evita crecer indefinidamente con deltas.
const size_t kCap = 200;
if (g_data.claude.recent_executions.size() > kCap) {
g_data.claude.recent_executions.resize(kCap);
}
return true;
}
// Per-frame WebSocket pump: publishes the client's connection state to the
// Monitor view, then drains queued messages and applies each one to g_data.
// The second drain() argument (32) is presumably the per-call message limit
// -- confirm against WsClient.
static void poll_ws() {
    const bool ws_up = g_ws.is_connected();
    const long long last_event = g_ws.last_event_ts();
    monitor_set_ws_state(ws_up, last_event);

    std::vector<std::string> pending;
    g_ws.drain(pending, 32);
    for (size_t i = 0; i < pending.size(); ++i) {
        apply_ws_message(pending[i]);
    }
}
// Reloads the whole registry dataset into g_data.
//
// The HTTP API is tried first when an API URL is configured; on success the
// Monitor telemetry is fetched as well (best effort, result ignored). If the
// API fails, the SQLite file at g_db_path is used instead. g_loaded and
// g_using_http reflect the outcome.
static void reload_data() {
    // Carry the Monitor window across the reset so a refresh keeps it.
    // A zero window with no calls recorded yet is replaced by 86400.
    const bool nothing_yet =
        (g_data.claude.window_secs == 0 && g_data.claude.total_calls == 0);
    const int kept_window = nothing_yet ? 86400 : g_data.claude.window_secs;

    g_data = RegistryData{};
    g_data.claude.window_secs = kept_window;

    // Preferred source: HTTP API.
    const bool api_configured = !g_api_url.empty();
    if (api_configured) {
        g_loaded = load_registry_data_http(g_api_url, g_data);
        if (!g_loaded) {
            fprintf(stderr, "HTTP API failed, falling back to SQLite\n");
        } else {
            g_using_http = true;
            // Issue 0085d: best-effort load of the Claude telemetry from
            // ops:call_monitor. The result is deliberately ignored; when it
            // is unavailable the Monitor tab shows a placeholder.
            load_claude_usage_http(g_api_url, g_data, g_data.claude.window_secs);
            return;
        }
    }

    // Fallback source: direct SQLite access.
    g_using_http = false;
    if (g_db_path.empty()) return;

    g_loaded = load_registry_data(g_db_path.c_str(), g_data);
    if (g_loaded) return;
    fprintf(stderr, "Failed to load registry data from: %s\n", g_db_path.c_str());
}
// Refetches ONLY the Monitor telemetry, leaving the rest of the registry
// dataset untouched. Triggered when the time window changes or when a WS
// event invalidates the snapshot. Does nothing unless an API URL is set and
// the registry is currently loaded.
static void reload_monitor() {
    const bool can_refetch = !g_api_url.empty() && g_loaded;
    if (can_refetch) {
        load_claude_usage_http(g_api_url, g_data, g_data.claude.window_secs);
    }
}
static void render() {
if (ImGui::GetIO().UserData != nullptr) {
ImGui::GetIO().UserData = nullptr;
reload_data();
}
// Issue 0086: el Monitor pide refetch parcial cuando el usuario cambia la
// ventana temporal o pulsa Refresh. No pasa por reload_data() para no
// tirar abajo todo el dataset del registry.
if (monitor_consume_reload_request()) {
reload_monitor();
}
// Issue 0086: drena la cola de mensajes WS y aplica deltas a g_data.
// No bloquea — drain es O(N) sobre los mensajes encolados desde el
// ultimo frame (tipicamente 0-3).
poll_ws();
if (!g_loaded) {
fullscreen_window_begin("##error");
ImGui::TextColored(ImVec4(1, 0.3f, 0.3f, 1),
"Could not load registry data");
ImGui::Spacing();
if (!g_api_url.empty())
ImGui::Text("API: %s (unreachable)", g_api_url.c_str());
if (!g_db_path.empty())
ImGui::Text("DB: %s", g_db_path.c_str());
ImGui::Spacing();
ImGui::TextWrapped(
"Usage: registry_dashboard [--api URL] [db_path ...]\n"
" --api URL Connect to sqlite_api (default: http://127.0.0.1:8484)\n"
" db_path Direct SQLite path(s) as fallback");
ImGui::Spacing();
if (ImGui::Button("Retry")) {
reload_data();
}
fullscreen_window_end();
return;
}
draw_dashboard(g_data);
}
int main(int argc, char** argv) {
// Parse --api flag
std::vector<std::string> db_candidates;
bool api_explicit = false;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--api") == 0 && i + 1 < argc) {
g_api_url = argv[++i];
api_explicit = true;
} else if (strncmp(argv[i], "--api=", 6) == 0) {
g_api_url = argv[i] + 6;
api_explicit = true;
} else {
db_candidates.push_back(argv[i]);
}
}
// Default: try localhost API if no --api given
if (!api_explicit) {
g_api_url = "http://127.0.0.1:8484";
}
// Resolve SQLite fallback path
for (auto& candidate : db_candidates) {
if (std::ifstream(candidate).good()) {
g_db_path = candidate;
fprintf(stdout, "SQLite fallback: %s\n", g_db_path.c_str());
break;
}
fprintf(stderr, "Not found: %s\n", candidate.c_str());
}
if (!db_candidates.empty()) {
if (g_db_path.empty()) g_db_path = db_candidates.back();
}
// Compartir el API URL con las vistas (para reindex/add desde la toolbar)
views_set_api_url(g_api_url);
// Info de la ventana About (submenu Settings → About...)
fn_ui::about_window_set_info(
"fn_registry Dashboard",
"0.4.0",
"Dashboard ImGui del fn_registry. Pestana Monitor por defecto con KPIs "
"live (Calls / Errors / Violations / Copies / Versions), Recent Executions "
"con timestamp, filtro de ventana (1h/24h/7d/30d/All) y WS subscription "
"al hub /api/events/call_monitor de sqlite_api. Resto de tabs (Dashboard, "
"Explorer, Projects, Apps, Analysis, Types) sin cambios."
);
// Seccion Status dentro de la ventana Settings (submenu Settings → Settings...).
// Muestra fuente activa de datos, URL del API y path SQLite fallback.
fn_ui::settings_window_add_section("status", "Status", []{
using namespace fn_tokens;
ImGui::PushStyleColor(ImGuiCol_Text, colors::text_muted);
ImGui::TextUnformatted("Source:");
ImGui::PopStyleColor();
ImGui::SameLine();
if (g_loaded) {
ImGui::PushStyleColor(ImGuiCol_Text, colors::success);
ImGui::TextUnformatted(g_using_http ? "HTTP API (connected)" : "SQLite (direct)");
ImGui::PopStyleColor();
} else {
ImGui::PushStyleColor(ImGuiCol_Text, colors::error);
ImGui::TextUnformatted("not connected");
ImGui::PopStyleColor();
}
if (!g_api_url.empty()) {
ImGui::PushStyleColor(ImGuiCol_Text, colors::text_muted);
ImGui::TextUnformatted("API:");
ImGui::PopStyleColor();
ImGui::SameLine();
ImGui::TextUnformatted(g_api_url.c_str());
}
if (!g_db_path.empty()) {
ImGui::PushStyleColor(ImGuiCol_Text, colors::text_muted);
ImGui::TextUnformatted("DB:");
ImGui::PopStyleColor();
ImGui::SameLine();
ImGui::TextUnformatted(g_db_path.c_str());
}
ImGui::Spacing();
if (ImGui::Button("Reload")) {
ImGui::GetIO().UserData = reinterpret_cast<void*>(1);
}
});
reload_data();
// Issue 0086: lanza el cliente WS al hub de eventos. El hub solo arranca
// su ticker cuando recibe el primer subscriber, asi que esta conexion
// tambien le dice al servidor "empieza a streamear".
{
std::string ws_host;
int ws_port = 0;
if (parse_http_url(g_api_url, ws_host, ws_port)) {
g_ws.start(ws_host, ws_port, "/api/events/call_monitor");
}
}
return fn::run_app(
{.title = "fn_registry Dashboard",
.width = 1600,
.height = 1000,
.viewports = true,
.about = {"fn_registry Dashboard", "0.1.0",
"Dashboard del registry: funciones, tipos, apps, drift."},
.log = {"registry_dashboard.log", 1}},
render
);
}