// NOTE(review): the target of this #include is missing in this copy of the
// file (it looks like the <...> part was stripped) — restore the header name.
#include
#include "app_base.h"
#include "core/panel_menu.h"
#include "core/icons_tabler.h"
#include "core/logger.h"
#include "data_http.h"
#include "http_client.h"
#include "ws_client.h"
#include "tabs.h"
#include "vendor/nlohmann/json.hpp"
// NOTE(review): four more #include directives with missing targets. The file
// uses std::printf, std::strcmp, std::string and std::vector, so presumably
// these were the matching standard headers — confirm before restoring.
#include
#include
#include
#include

using json = nlohmann::json;

// Global configuration of the HTTP backend.
static std::string g_api_url = "http://127.0.0.1:8090";
static std::string g_ws_host = "127.0.0.1";
static int g_ws_port = 8090;
static std::string g_ws_path = "/api/ws/dagruns";

// In-memory cache of the first fetch plus the latest WS events.
// NOTE(review): the element types of the three vectors below are missing
// (template arguments stripped). g_live_runs and g_runs_all are filled with
// dag_ui::DagRunRow values by the upsert helpers; the element type of g_dags
// is not visible in this file — confirm against data_http.h.
static std::vector g_dags;
static std::vector g_live_runs; // upserted by id from WS messages
static std::vector g_runs_all; // cache for Timeline (REST snapshot + WS upsert)
static long long g_ws_runs_wm = 0;   // last "runs" watermark received over WS
static long long g_ws_steps_wm = 0;  // last "steps" watermark received over WS
static int g_ws_msg_count = 0;       // number of WS messages parsed as JSON objects
static std::string g_last_error;     // shown in red by the Main diagnostic panel

// Upsert by id into g_runs_all: an entry with the same id is overwritten in
// place; otherwise the row is appended at the back (existing order is kept).
static void upsert_run_in_all(const dag_ui::DagRunRow& r) {
    for (auto& existing : g_runs_all) {
        if (existing.id == r.id) { existing = r; return; }
    }
    g_runs_all.push_back(r);
}

static WsClient g_ws;

// Panel visibility toggles (exposed through the View menu of the menubar).
static bool g_show_main = false; // diagnostic panel, off by default
static bool g_show_live = true;
static bool g_show_dag_list = true;
static bool g_show_dag_detail = true;
static bool g_show_run_detail = true;
static bool g_show_timeline = true;
static bool g_show_all_runs = true;
static bool g_show_health = true;
static bool g_show_function_panel = true;

// Auto-fetch the DAG list once at startup.
static bool g_initial_fetched = false;

// Flag set by tabs::draw_dag_list when the user presses Refresh, or when the
// WS reports that a run finished (status != running): re-fetch /api/dags to
// update last_runs.
static bool g_refresh_pending = false;

// C-linkage hook: requests a refresh by raising g_refresh_pending.
extern "C" void dag_list_request_refresh() { g_refresh_pending = true; }

// Upsert by id into g_live_runs.
static void upsert_live_run(const dag_ui::DagRunRow& r) { for (auto& existing : g_live_runs) { if (existing.id == r.id) { existing = r; return; } } g_live_runs.push_back(r); } static void parse_ws_payload(const std::string& payload) { auto j = json::parse(payload, nullptr, false); if (!j.is_object()) return; g_ws_msg_count++; if (j.contains("watermark") && j["watermark"].is_object()) { if (j["watermark"].contains("runs")) g_ws_runs_wm = j["watermark"]["runs"].get(); if (j["watermark"].contains("steps")) g_ws_steps_wm = j["watermark"]["steps"].get(); } if (j.contains("runs") && j["runs"].is_array()) { for (auto& rj : j["runs"]) { dag_ui::DagRunRow r; r.id = rj.value("id", ""); r.dag_name = rj.value("dag_name", ""); r.status = rj.value("status", ""); r.trigger = rj.value("trigger", ""); r.started_at = rj.value("started_at", ""); r.finished_at = rj.value("finished_at", ""); r.error = rj.value("error", ""); upsert_live_run(r); upsert_run_in_all(r); // Cuando un run termina, refresca DAG List para que last_runs // refleje la nueva ejecucion en R1..R5. if (r.status == "success" || r.status == "failed" || r.status == "cancelled") { g_refresh_pending = true; } } } } static void draw_main() { if (!ImGui::Begin(TI_HOME " Main", &g_show_main)) { ImGui::End(); return; } ImGui::Text("API: %s", g_api_url.c_str()); if (ImGui::Button("Fetch /api/dags")) { g_dags.clear(); g_last_error.clear(); if (!dag_ui::list_dags_http(g_api_url, g_dags)) { g_last_error = "list_dags_http failed (see stderr)"; } } if (!g_last_error.empty()) { ImGui::TextColored(ImVec4(1, 0.4f, 0.4f, 1), "%s", g_last_error.c_str()); } ImGui::Separator(); ImGui::Text("DAGs: %zu", g_dags.size()); for (auto& d : g_dags) { ImGui::BulletText("%s (%s) schedule=%s", d.name.c_str(), d.valid ? "valid" : "invalid", d.schedule.empty() ? 
"none" : d.schedule[0].c_str()); } ImGui::End(); } static void draw_live() { if (!ImGui::Begin(TI_BOLT " Live (WS)", &g_show_live)) { ImGui::End(); return; } bool connected = g_ws.is_connected(); ImGui::TextColored(connected ? ImVec4(0.3f, 0.9f, 0.3f, 1) : ImVec4(0.9f, 0.4f, 0.4f, 1), "%s", connected ? "connected" : "disconnected"); ImGui::SameLine(); ImGui::Text("| msgs=%d | wm runs=%lld steps=%lld", g_ws_msg_count, g_ws_runs_wm, g_ws_steps_wm); ImGui::Separator(); ImGui::Text("Live runs: %zu", g_live_runs.size()); for (auto& r : g_live_runs) { ImGui::BulletText("%s [%s] %s @ %s", r.dag_name.c_str(), r.status.c_str(), r.id.c_str(), r.started_at.c_str()); } ImGui::End(); } static void render() { // Auto-fetch DAGs on first frame or on explicit refresh. if (!g_initial_fetched || g_refresh_pending) { g_initial_fetched = true; g_refresh_pending = false; dag_ui::list_dags_http(g_api_url, g_dags); // Tambien snapshot inicial / refresh de runs para Timeline. std::vector tmp; if (dag_ui::list_runs_http(g_api_url, "", 200, tmp)) { for (auto& r : tmp) upsert_run_in_all(r); } } // Drain WS messages this frame (cheap, max 64). { std::vector msgs; g_ws.drain(msgs, 64); for (auto& m : msgs) parse_ws_payload(m); } if (g_show_dag_list) dag_ui_tabs::draw_dag_list(g_api_url, g_dags, g_live_runs); if (g_show_dag_detail) dag_ui_tabs::draw_dag_detail(g_api_url); if (g_show_run_detail) dag_ui_tabs::draw_run_detail(g_api_url); if (g_show_timeline) dag_ui_tabs::draw_timeline(g_api_url, g_runs_all); if (g_show_all_runs) dag_ui_tabs::draw_all_runs(g_api_url, g_runs_all); if (g_show_health) dag_ui_tabs::draw_health(g_api_url, g_runs_all); if (g_show_main) draw_main(); if (g_show_live) draw_live(); if (g_show_function_panel) dag_ui_tabs::draw_function_panel(g_api_url, &g_show_function_panel); } // Self-test: blocking HTTP GET to the dag_engine backend, no GUI. // Returns 0 if reachable (any 2xx), 1 otherwise. 
static int run_self_test() { HttpClient client(g_ws_host, g_ws_port); // Probe /api/dags as a sucedaneo de /health (no dedicated /health helper). HttpResponse resp = client.get("/api/dags"); if (resp.ok()) { std::printf("self-test ok: dag_engine reachable at %s\n", g_api_url.c_str()); return 0; } std::printf("self-test fail: dag_engine unreachable at %s (status=%d)\n", g_api_url.c_str(), resp.status); return 1; } int main(int argc, char** argv) { // CLI flag --self-test: probe backend and exit without opening GUI. for (int i = 1; i < argc; i++) { if (argv[i] && std::strcmp(argv[i], "--self-test") == 0) { return run_self_test(); } } // Conecta WS al backend dag_engine. Reconnect con backoff lo gestiona WsClient. g_ws.start(g_ws_host, g_ws_port, g_ws_path); static fn_ui::PanelToggle panels[] = { { "DAGs", nullptr, &g_show_dag_list }, { "DAG Detail", nullptr, &g_show_dag_detail }, { "Run Detail", nullptr, &g_show_run_detail }, { "Timeline", nullptr, &g_show_timeline }, { "All Runs", nullptr, &g_show_all_runs }, { "Health", nullptr, &g_show_health }, { "Function", nullptr, &g_show_function_panel }, { "Live (WS)", nullptr, &g_show_live }, { "Main (diag)", nullptr, &g_show_main }, }; fn::AppConfig cfg; cfg.title = "dag_engine_ui — Frontend ImGui para dag_engine. Lista, lanza e inspecciona DAGs con live updates via WS."; cfg.about = { "dag_engine_ui", "0.1.0", "Frontend ImGui para dag_engine. Lista, lanza e inspecciona DAGs con live updates via WS." }; cfg.log = { "dag_engine_ui.log", 1 }; cfg.panels = panels; cfg.panel_count = sizeof(panels) / sizeof(panels[0]); return fn::run_app(cfg, render); }