4cb36a92e8
- tabs.cpp draw_timeline: scatter ImPlot con eje X tiempo (UseLocalTime), eje Y categorico DAG (SetupAxisTicks), 1 serie por status con color consistente (verde/rojo/amarillo/gris). - Combo ventana: 15m/1h/6h/24h/7d. Default 24h. - Hover tooltip: punto mas cercano en pixel-space -> muestra status, dag, run id, started/finished, trigger, error. - main.cpp: g_runs_all cache. Snapshot inicial via list_runs_http(limit=200) + upserts desde WS deltas. Auto-refresh por g_refresh_pending. - Panel toggle "Timeline" en el menu View. - Helper parse_rfc3339 inline (ignora offset, asume hora local — coherente con ImPlot::UseLocalTime). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
200 lines
7.3 KiB
C++
200 lines
7.3 KiB
C++
#include <imgui.h>
|
|
#include "app_base.h"
|
|
#include "core/panel_menu.h"
|
|
#include "core/icons_tabler.h"
|
|
#include "core/logger.h"
|
|
#include "data_http.h"
|
|
#include "ws_client.h"
|
|
#include "tabs.h"
|
|
#include "vendor/nlohmann/json.hpp"
|
|
|
|
#include <string>
|
|
#include <vector>
|
|
|
|
using json = nlohmann::json;
|
|
|
|
// Global backend configuration.
// g_api_url is the base URL handed to the dag_ui HTTP helpers; the g_ws_*
// values are the host, port and path handed to WsClient::start() in main().
|
|
static std::string g_api_url = "http://127.0.0.1:8090";
|
|
static std::string g_ws_host = "127.0.0.1";
|
|
static int g_ws_port = 8090;
|
|
static std::string g_ws_path = "/api/ws/dagruns";
|
|
|
|
// In-memory cache: result of the HTTP fetches plus the latest WS events.
|
|
static std::vector<dag_ui::DagInfo> g_dags;
|
|
static std::vector<dag_ui::DagRunRow> g_live_runs; // upserted by id from WS messages
|
|
static std::vector<dag_ui::DagRunRow> g_runs_all; // Timeline cache (REST snapshot + WS upserts)
|
|
static long long g_ws_runs_wm = 0; // last "watermark.runs" value received over WS
|
|
static long long g_ws_steps_wm = 0; // last "watermark.steps" value received over WS
|
|
static int g_ws_msg_count = 0; // number of WS payloads that parsed as a JSON object
|
|
static std::string g_last_error; // last fetch error, displayed in the Main panel
|
|
|
|
// Upsert por id en g_runs_all. Mantiene mas reciente al frente.
|
|
static void upsert_run_in_all(const dag_ui::DagRunRow& r) {
|
|
for (auto& existing : g_runs_all) {
|
|
if (existing.id == r.id) {
|
|
existing = r;
|
|
return;
|
|
}
|
|
}
|
|
g_runs_all.push_back(r);
|
|
}
|
|
|
|
// WebSocket client shared by the UI: started in main(), drained once per
// frame in render(), and queried for connection state in draw_live().
static WsClient g_ws;
|
|
|
|
// Panel visibility toggles. Their addresses are registered in the PanelToggle
// table built in main() (shown in the View menu of the canonical menubar) and
// render() checks them every frame to decide which panels to draw.
|
|
static bool g_show_main = false; // diagnostics panel, off by default
|
|
static bool g_show_live = true;
|
|
static bool g_show_dag_list = true;
|
|
static bool g_show_dag_detail = true;
|
|
static bool g_show_run_detail = true;
|
|
static bool g_show_timeline = true;
|
|
|
|
// Set once render() has performed the automatic fetch on the first frame.
|
|
static bool g_initial_fetched = false;
|
|
// Refresh request flag. Set through dag_list_request_refresh() (per the
|
|
// original note, when the user presses Refresh in the DAG list panel) and by
|
|
// parse_ws_payload() when a run reports success/failed/cancelled; render()
// consumes it and re-fetches the DAG list (to update last_runs) and the runs.
|
|
static bool g_refresh_pending = false;
|
|
|
|
// C-linkage hook that asks for a data refresh: it only raises
// g_refresh_pending; the actual re-fetch happens on the next render() call.
extern "C" void dag_list_request_refresh() {
    g_refresh_pending = true;
}
|
|
|
|
// Upsert por id en g_live_runs.
|
|
static void upsert_live_run(const dag_ui::DagRunRow& r) {
|
|
for (auto& existing : g_live_runs) {
|
|
if (existing.id == r.id) {
|
|
existing = r;
|
|
return;
|
|
}
|
|
}
|
|
g_live_runs.push_back(r);
|
|
}
|
|
|
|
static void parse_ws_payload(const std::string& payload) {
|
|
auto j = json::parse(payload, nullptr, false);
|
|
if (!j.is_object()) return;
|
|
g_ws_msg_count++;
|
|
if (j.contains("watermark") && j["watermark"].is_object()) {
|
|
if (j["watermark"].contains("runs")) g_ws_runs_wm = j["watermark"]["runs"].get<long long>();
|
|
if (j["watermark"].contains("steps")) g_ws_steps_wm = j["watermark"]["steps"].get<long long>();
|
|
}
|
|
if (j.contains("runs") && j["runs"].is_array()) {
|
|
for (auto& rj : j["runs"]) {
|
|
dag_ui::DagRunRow r;
|
|
r.id = rj.value("id", "");
|
|
r.dag_name = rj.value("dag_name", "");
|
|
r.status = rj.value("status", "");
|
|
r.trigger = rj.value("trigger", "");
|
|
r.started_at = rj.value("started_at", "");
|
|
r.finished_at = rj.value("finished_at", "");
|
|
r.error = rj.value("error", "");
|
|
upsert_live_run(r);
|
|
upsert_run_in_all(r);
|
|
// Cuando un run termina, refresca DAG List para que last_runs
|
|
// refleje la nueva ejecucion en R1..R5.
|
|
if (r.status == "success" || r.status == "failed" ||
|
|
r.status == "cancelled") {
|
|
g_refresh_pending = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
static void draw_main() {
|
|
if (!ImGui::Begin(TI_HOME " Main", &g_show_main)) {
|
|
ImGui::End();
|
|
return;
|
|
}
|
|
ImGui::Text("API: %s", g_api_url.c_str());
|
|
if (ImGui::Button("Fetch /api/dags")) {
|
|
g_dags.clear();
|
|
g_last_error.clear();
|
|
if (!dag_ui::list_dags_http(g_api_url, g_dags)) {
|
|
g_last_error = "list_dags_http failed (see stderr)";
|
|
}
|
|
}
|
|
if (!g_last_error.empty()) {
|
|
ImGui::TextColored(ImVec4(1, 0.4f, 0.4f, 1), "%s", g_last_error.c_str());
|
|
}
|
|
ImGui::Separator();
|
|
ImGui::Text("DAGs: %zu", g_dags.size());
|
|
for (auto& d : g_dags) {
|
|
ImGui::BulletText("%s (%s) schedule=%s",
|
|
d.name.c_str(),
|
|
d.valid ? "valid" : "invalid",
|
|
d.schedule.empty() ? "none" : d.schedule[0].c_str());
|
|
}
|
|
ImGui::End();
|
|
}
|
|
|
|
static void draw_live() {
|
|
if (!ImGui::Begin(TI_BOLT " Live (WS)", &g_show_live)) {
|
|
ImGui::End();
|
|
return;
|
|
}
|
|
bool connected = g_ws.is_connected();
|
|
ImGui::TextColored(connected ? ImVec4(0.3f, 0.9f, 0.3f, 1) : ImVec4(0.9f, 0.4f, 0.4f, 1),
|
|
"%s", connected ? "connected" : "disconnected");
|
|
ImGui::SameLine();
|
|
ImGui::Text("| msgs=%d | wm runs=%lld steps=%lld",
|
|
g_ws_msg_count, g_ws_runs_wm, g_ws_steps_wm);
|
|
ImGui::Separator();
|
|
ImGui::Text("Live runs: %zu", g_live_runs.size());
|
|
for (auto& r : g_live_runs) {
|
|
ImGui::BulletText("%s [%s] %s @ %s",
|
|
r.dag_name.c_str(), r.status.c_str(),
|
|
r.id.c_str(), r.started_at.c_str());
|
|
}
|
|
ImGui::End();
|
|
}
|
|
|
|
static void render() {
|
|
// Auto-fetch DAGs on first frame or on explicit refresh.
|
|
if (!g_initial_fetched || g_refresh_pending) {
|
|
g_initial_fetched = true;
|
|
g_refresh_pending = false;
|
|
dag_ui::list_dags_http(g_api_url, g_dags);
|
|
// Tambien snapshot inicial / refresh de runs para Timeline.
|
|
std::vector<dag_ui::DagRunRow> tmp;
|
|
if (dag_ui::list_runs_http(g_api_url, "", 200, tmp)) {
|
|
for (auto& r : tmp) upsert_run_in_all(r);
|
|
}
|
|
}
|
|
|
|
// Drain WS messages this frame (cheap, max 64).
|
|
{
|
|
std::vector<std::string> msgs;
|
|
g_ws.drain(msgs, 64);
|
|
for (auto& m : msgs) parse_ws_payload(m);
|
|
}
|
|
|
|
if (g_show_dag_list) dag_ui_tabs::draw_dag_list(g_api_url, g_dags, g_live_runs);
|
|
if (g_show_dag_detail) dag_ui_tabs::draw_dag_detail(g_api_url);
|
|
if (g_show_run_detail) dag_ui_tabs::draw_run_detail(g_api_url);
|
|
if (g_show_timeline) dag_ui_tabs::draw_timeline(g_api_url, g_runs_all);
|
|
if (g_show_main) draw_main();
|
|
if (g_show_live) draw_live();
|
|
}
|
|
|
|
int main(int /*argc*/, char** /*argv*/) {
|
|
// Conecta WS al backend dag_engine. Reconnect con backoff lo gestiona WsClient.
|
|
g_ws.start(g_ws_host, g_ws_port, g_ws_path);
|
|
|
|
static fn_ui::PanelToggle panels[] = {
|
|
{ "DAGs", nullptr, &g_show_dag_list },
|
|
{ "DAG Detail", nullptr, &g_show_dag_detail },
|
|
{ "Run Detail", nullptr, &g_show_run_detail },
|
|
{ "Timeline", nullptr, &g_show_timeline },
|
|
{ "Live (WS)", nullptr, &g_show_live },
|
|
{ "Main (diag)", nullptr, &g_show_main },
|
|
};
|
|
|
|
fn::AppConfig cfg;
|
|
cfg.title = "dag_engine_ui — Frontend ImGui para dag_engine. Lista, lanza e inspecciona DAGs con live updates via WS.";
|
|
cfg.about = { "dag_engine_ui", "0.1.0", "Frontend ImGui para dag_engine. Lista, lanza e inspecciona DAGs con live updates via WS." };
|
|
cfg.log = { "dag_engine_ui.log", 1 };
|
|
cfg.panels = panels;
|
|
cfg.panel_count = sizeof(panels) / sizeof(panels[0]);
|
|
|
|
return fn::run_app(cfg, render);
|
|
}
|