#include #include "app_base.h" #include "core/panel_menu.h" #include "core/icons_tabler.h" #include "core/logger.h" #include "data_http.h" #include "http_client.h" #include "ws_client.h" #include "tabs.h" #include "vendor/nlohmann/json.hpp" #include #include #include #include using json = nlohmann::json; // Backend config. static std::string g_api_url = "http://127.0.0.1:8484"; static std::string g_ws_host = "127.0.0.1"; static int g_ws_port = 8484; static std::string g_ws_path = "/api/ws/datafactory"; // Caches populated by REST initial fetch + WS deltas. static std::vector g_nodes; static std::vector g_runs_all; static std::vector g_databases; static std::vector g_tables; static WsClient g_ws; static int g_ws_msg_count = 0; static bool g_initial_fetched = false; static bool g_refresh_pending = false; // Panel toggles. static bool g_show_map = true; static bool g_show_extractors = true; static bool g_show_transformers = true; static bool g_show_databases = true; static bool g_show_tables = true; static bool g_show_sinks = true; static bool g_show_health = true; static bool g_show_detail = true; static bool g_show_table_preview = true; static bool g_show_live = false; static void upsert_run(const data_factory::Run& r) { for (auto& existing : g_runs_all) { if (existing.id == r.id) { existing = r; return; } } g_runs_all.push_back(r); } static void parse_ws_payload(const std::string& payload) { auto j = json::parse(payload, nullptr, false); if (!j.is_object()) return; g_ws_msg_count++; // Initial snapshot may include "nodes" / "databases". if (j.contains("nodes") && j["nodes"].is_array()) { g_nodes.clear(); for (auto& nj : j["nodes"]) { data_factory::Node n; n.id = nj.value("id", ""); n.kind = nj.value("kind", ""); n.name = nj.value("name", ""); n.function_id = nj.value("function_id", ""); n.description = nj.value("description", ""); n.schedule_cron = nj.value("schedule_cron", ""); n.enabled = nj.value("enabled", true); g_nodes.push_back(std::move(n)); } } if (j.contains("databases") && j["databases"].is_array()) { g_databases.clear(); for (auto& dj : j["databases"]) { data_factory::DatabaseInfo d; d.id = dj.value("id", ""); d.kind = dj.value("kind", ""); d.label = dj.value("label", ""); d.uri = dj.value("uri", ""); d.description = dj.value("description", ""); d.table_count = dj.value("table_count", (long long)0); d.size_bytes = dj.value("size_bytes", (long long)0); d.last_seen_at = dj.value("last_seen_at", ""); g_databases.push_back(std::move(d)); } } if (j.contains("runs") && j["runs"].is_array()) { for (auto& rj : j["runs"]) { data_factory::Run r; r.id = rj.value("id", ""); r.node_id = rj.value("node_id", ""); r.started_at = rj.value("started_at", ""); r.finished_at = rj.value("finished_at", ""); r.status = rj.value("status", ""); r.rows_in = rj.value("rows_in", (long long)0); r.rows_out = rj.value("rows_out", (long long)0); r.kb_in = rj.value("kb_in", (long long)0); r.kb_out = rj.value("kb_out", (long long)0); r.duration_ms = rj.value("duration_ms", (long long)0); r.trigger = rj.value("trigger", ""); r.error = rj.value("error", ""); upsert_run(r); if (r.status == "success" || r.status == "failed" || r.status == "cancelled") { g_refresh_pending = true; } } } } static void draw_live() { if (!ImGui::Begin(TI_BOLT " Live (WS)", &g_show_live)) { ImGui::End(); return; } bool connected = g_ws.is_connected(); ImGui::TextColored(connected ? ImVec4(0.3f, 0.9f, 0.3f, 1) : ImVec4(0.9f, 0.4f, 0.4f, 1), "%s", connected ? "connected" : "disconnected"); ImGui::SameLine(); ImGui::Text("| msgs=%d | nodes=%zu runs=%zu dbs=%zu", g_ws_msg_count, g_nodes.size(), g_runs_all.size(), g_databases.size()); ImGui::Separator(); if (ImGui::Button("Refresh REST")) g_refresh_pending = true; ImGui::End(); } static void render() { if (!g_initial_fetched || g_refresh_pending) { g_initial_fetched = true; g_refresh_pending = false; data_factory::list_nodes_http(g_api_url, "", g_nodes); data_factory::list_databases_http(g_api_url, g_databases); data_factory::list_tables_http(g_api_url, g_tables); std::vector tmp; if (data_factory::list_runs_http(g_api_url, "", 200, tmp)) { for (auto& r : tmp) upsert_run(r); } } // Drain WS messages this frame (cheap, max 64). { std::vector msgs; g_ws.drain(msgs, 64); for (auto& m : msgs) parse_ws_payload(m); } if (g_show_map) data_factory_ui::draw_map(g_api_url, g_nodes); if (g_show_extractors) data_factory_ui::draw_extractors(g_api_url, g_nodes, g_runs_all); if (g_show_transformers) data_factory_ui::draw_transformers(g_api_url, g_nodes, g_runs_all); if (g_show_databases) data_factory_ui::draw_databases(g_api_url, g_databases); if (g_show_tables) data_factory_ui::draw_tables(g_api_url, g_tables); if (g_show_sinks) data_factory_ui::draw_sinks(g_api_url, g_nodes, g_runs_all); if (g_show_health) data_factory_ui::draw_health(g_api_url, g_runs_all); if (g_show_detail) data_factory_ui::draw_node_detail_panel( g_api_url, g_nodes, g_runs_all, &g_show_detail); if (g_show_table_preview) data_factory_ui::draw_table_preview_panel( g_api_url, &g_show_table_preview); if (g_show_live) draw_live(); } // Self-test: blocking HTTP GET to sqlite_api /api/datafactory/nodes. No GUI. static int run_self_test() { HttpClient client(g_ws_host, g_ws_port); HttpResponse resp = client.get("/api/datafactory/nodes"); if (resp.ok()) { std::printf("self-test ok: sqlite_api reachable at %s\n", g_api_url.c_str()); return 0; } std::printf("self-test fail: sqlite_api unreachable at %s (status=%d)\n", g_api_url.c_str(), resp.status); return 1; } int main(int argc, char** argv) { for (int i = 1; i < argc; i++) { if (argv[i] && std::strcmp(argv[i], "--self-test") == 0) { return run_self_test(); } } g_ws.start(g_ws_host, g_ws_port, g_ws_path); static fn_ui::PanelToggle panels[] = { { "Map", nullptr, &g_show_map }, { "Extractors", nullptr, &g_show_extractors }, { "Transformers", nullptr, &g_show_transformers }, { "Databases", nullptr, &g_show_databases }, { "Tables", nullptr, &g_show_tables }, { "Table Preview", nullptr, &g_show_table_preview }, { "Sinks", nullptr, &g_show_sinks }, { "Health", nullptr, &g_show_health }, { "Node Detail", nullptr, &g_show_detail }, { "Live (WS)", nullptr, &g_show_live }, }; fn::AppConfig cfg; cfg.title = "data_factory — Factorio-style data pipeline factory: extractors, transformers, databases, sinks. Live updates via WS pubsub."; cfg.about = { "data_factory", "0.1.0", "Factorio-style data pipeline factory: extractors, transformers, databases, sinks. Live updates via WS pubsub." }; cfg.log = { "data_factory.log", 1 }; cfg.panels = panels; cfg.panel_count = sizeof(panels) / sizeof(panels[0]); return fn::run_app(cfg, render); }