chore: sync from fn-registry agent

This commit is contained in:
fn-registry agent
2026-05-16 16:34:49 +02:00
commit 6337d93491
18 changed files with 26836 additions and 0 deletions
+20
View File
@@ -0,0 +1,20 @@
add_imgui_app(data_factory
main.cpp
http_client.cpp
data_http.cpp
ws_client.cpp
tabs.cpp
${CMAKE_SOURCE_DIR}/functions/core/empty_state.cpp
${CMAKE_SOURCE_DIR}/functions/core/badge.cpp
)
target_include_directories(data_factory PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})
# fn_table_viz: optional, kept for parity with dag_engine_ui.
if(TARGET fn_table_viz)
target_link_libraries(data_factory PRIVATE fn_table_viz)
endif()
if(WIN32)
target_link_libraries(data_factory PRIVATE ws2_32)
set_target_properties(data_factory PROPERTIES WIN32_EXECUTABLE TRUE)
endif()
+72
View File
@@ -0,0 +1,72 @@
---
name: data_factory
lang: cpp
domain: tools
description: "Factorio-style data pipeline factory: extractors, transformers, databases, sinks. Live updates via WS pubsub."
tags: [imgui, dashboard, data-pipeline, factory, http, websocket]
uses_functions:
- empty_state_cpp_core
- badge_cpp_core
uses_types: []
framework: "imgui"
entry_point: "main.cpp"
dir_path: "apps/data_factory"
repo_url: "https://gitea.organic-machine.com/dataforge/data_factory"
e2e_checks:
- id: build_cmake
cmd: "cmake --build cpp/build -j --target data_factory"
timeout_s: 300
severity: critical
- id: binary_exists
cmd: "test -x cpp/build/linux/apps/data_factory/data_factory || test -x cpp/build/apps/data_factory/data_factory"
timeout_s: 5
severity: critical
- id: self_test
cmd: "(cpp/build/linux/apps/data_factory/data_factory --self-test 2>&1 || cpp/build/apps/data_factory/data_factory --self-test 2>&1) | head -20"
timeout_s: 10
expect_stdout_contains: "self-test"
severity: warning
- id: cpp_apps_conformance
cmd: "./fn doctor cpp-apps 2>&1 | grep -A1 data_factory || echo 'no issues'"
expect_stdout_contains: "no issues"
severity: critical
---
# data_factory
Factorio-style data pipeline factory: extractors, transformers, databases, sinks. Live updates via WS pubsub.
## Build
```bash
cd cpp && cmake --build build --target data_factory -j
```
## Run
```bash
./cpp/build/data_factory
```
## Paneles
| Panel | Que muestra |
|---|---|
| Map | Arbol plano agrupado por kind (extractor/transformer/database/sink/validator). Placeholder de futuro grafo. |
| Extractors | Tabla kind=extractor: Name / Function / Schedule / Last Run / Status / Rows-KB. |
| Transformers | Tabla kind=transformer: Name / Function / Last Run / Status / Rows-KB. |
| Databases | Tabla de DBs registradas: Label / Kind / URI / Tables / Size / Last Seen. |
| Sinks | Tabla kind=sink: Name / Function / Last Run / Status / Rows-KB. |
| Health | KPIs: runs_24h / success_rate / failed_24h+pending / throughput rows-KB 24h. |
| Node Detail | Panel lateral con node seleccionado: metadata + FnInfo card + ultimas 10 runs. |
| Live (WS) | Diag de conexion WS al /api/ws/datafactory de sqlite_api. |
## Backend
Apunta a `http://127.0.0.1:8484` (sqlite_api). Endpoints consumidos:
- `GET /api/datafactory/nodes?kind=...`
- `GET /api/datafactory/runs?node_id=...&limit=N`
- `GET /api/datafactory/databases`
- `GET /api/functions/{id}` (registry metadata para FnInfo)
- `GET /api/ws/datafactory` (snapshot + delta runs cada 250ms)
BIN
View File
Binary file not shown.

After

Width:  |  Height:  |  Size: 9.1 KiB

BIN
View File
Binary file not shown.
Binary file not shown.
Binary file not shown.
+222
View File
@@ -0,0 +1,222 @@
#include "data_http.h"
#include "http_client.h"
#include "vendor/nlohmann/json.hpp"
#include <cstdio>
#include <cstdlib>
using json = nlohmann::json;
namespace data_factory {
static bool parse_url(const std::string& url, std::string& host, int& port) {
auto pos = url.find("://");
std::string rest = (pos != std::string::npos) ? url.substr(pos + 3) : url;
auto colon = rest.find(':');
if (colon == std::string::npos) {
host = rest;
port = 80;
} else {
host = rest.substr(0, colon);
port = std::atoi(rest.substr(colon + 1).c_str());
}
return !host.empty() && port > 0;
}
static std::string get_str(const json& j, const char* key) {
if (!j.contains(key) || j[key].is_null()) return "";
if (j[key].is_string()) return j[key].get<std::string>();
return j[key].dump();
}
static long long get_int64(const json& j, const char* key) {
if (!j.contains(key) || j[key].is_null()) return 0;
if (j[key].is_number()) return j[key].get<long long>();
return 0;
}
static bool get_bool(const json& j, const char* key, bool def) {
if (!j.contains(key) || j[key].is_null()) return def;
if (j[key].is_boolean()) return j[key].get<bool>();
if (j[key].is_number()) return j[key].get<int>() != 0;
return def;
}
static std::vector<std::string> get_str_array(const json& j, const char* key) {
std::vector<std::string> out;
if (!j.contains(key) || !j[key].is_array()) return out;
for (auto& v : j[key]) {
if (v.is_string()) out.push_back(v.get<std::string>());
}
return out;
}
// tags can be stored as csv string or array. Tolerate both.
static std::vector<std::string> get_tags(const json& j) {
std::vector<std::string> out;
if (j.contains("tags") && j["tags"].is_array()) {
for (auto& v : j["tags"]) {
if (v.is_string()) out.push_back(v.get<std::string>());
}
return out;
}
std::string csv = get_str(j, "tags_csv");
if (csv.empty()) csv = get_str(j, "tags");
if (csv.empty()) return out;
std::string cur;
for (char c : csv) {
if (c == ',') {
if (!cur.empty()) out.push_back(cur);
cur.clear();
} else {
cur.push_back(c);
}
}
if (!cur.empty()) out.push_back(cur);
return out;
}
static void parse_node(const json& j, Node& n) {
n.id = get_str(j, "id");
n.kind = get_str(j, "kind");
n.name = get_str(j, "name");
n.function_id = get_str(j, "function_id");
n.description = get_str(j, "description");
n.schedule_cron = get_str(j, "schedule_cron");
n.enabled = get_bool(j, "enabled", true);
n.tags = get_tags(j);
n.created_at = get_str(j, "created_at");
n.updated_at = get_str(j, "updated_at");
}
static void parse_run(const json& j, Run& r) {
r.id = get_str(j, "id");
r.node_id = get_str(j, "node_id");
r.started_at = get_str(j, "started_at");
r.finished_at = get_str(j, "finished_at");
r.status = get_str(j, "status");
r.rows_in = get_int64(j, "rows_in");
r.rows_out = get_int64(j, "rows_out");
r.kb_in = get_int64(j, "kb_in");
r.kb_out = get_int64(j, "kb_out");
r.duration_ms = get_int64(j, "duration_ms");
r.trigger = get_str(j, "trigger");
r.error = get_str(j, "error");
}
static void parse_db(const json& j, DatabaseInfo& d) {
d.id = get_str(j, "id");
d.kind = get_str(j, "kind");
d.label = get_str(j, "label");
d.uri = get_str(j, "uri");
d.description = get_str(j, "description");
d.table_count = get_int64(j, "table_count");
d.size_bytes = get_int64(j, "size_bytes");
d.last_seen_at = get_str(j, "last_seen_at");
}
bool list_nodes_http(const std::string& api_url, const std::string& kind,
std::vector<Node>& out) {
std::string host;
int port;
if (!parse_url(api_url, host, port)) return false;
HttpClient cli(host, port);
std::string path = "/api/datafactory/nodes";
if (!kind.empty()) path += "?kind=" + kind;
auto res = cli.get(path);
if (!res.ok()) {
fprintf(stderr, "[df_http] list_nodes failed: status=%d\n", res.status);
return false;
}
auto j = json::parse(res.body, nullptr, false);
if (!j.is_object() || !j.contains("nodes") || !j["nodes"].is_array()) {
return false;
}
out.clear();
for (auto& item : j["nodes"]) {
Node n;
parse_node(item, n);
out.push_back(std::move(n));
}
return true;
}
bool list_runs_http(const std::string& api_url, const std::string& node_id,
int limit, std::vector<Run>& out) {
std::string host;
int port;
if (!parse_url(api_url, host, port)) return false;
HttpClient cli(host, port);
std::string path = "/api/datafactory/runs?limit=" + std::to_string(limit);
if (!node_id.empty()) path += "&node_id=" + node_id;
auto res = cli.get(path);
if (!res.ok()) {
fprintf(stderr, "[df_http] list_runs failed: status=%d\n", res.status);
return false;
}
auto j = json::parse(res.body, nullptr, false);
if (!j.is_object() || !j.contains("runs") || !j["runs"].is_array()) {
return false;
}
out.clear();
for (auto& item : j["runs"]) {
Run r;
parse_run(item, r);
out.push_back(std::move(r));
}
return true;
}
bool list_databases_http(const std::string& api_url,
std::vector<DatabaseInfo>& out) {
std::string host;
int port;
if (!parse_url(api_url, host, port)) return false;
HttpClient cli(host, port);
auto res = cli.get("/api/datafactory/databases");
if (!res.ok()) {
fprintf(stderr, "[df_http] list_databases failed: status=%d\n", res.status);
return false;
}
auto j = json::parse(res.body, nullptr, false);
if (!j.is_object() || !j.contains("databases") || !j["databases"].is_array()) {
return false;
}
out.clear();
for (auto& item : j["databases"]) {
DatabaseInfo d;
parse_db(item, d);
out.push_back(std::move(d));
}
return true;
}
bool get_function_http(const std::string& api_url,
const std::string& function_id,
FnInfo& out) {
std::string host;
int port;
if (!parse_url(api_url, host, port)) return false;
if (function_id.empty()) return false;
HttpClient cli(host, port);
auto res = cli.get("/api/functions/" + function_id);
if (!res.ok()) {
fprintf(stderr, "[df_http] get_function(%s) failed: status=%d\n",
function_id.c_str(), res.status);
return false;
}
auto j = json::parse(res.body, nullptr, false);
if (!j.is_object()) return false;
out.id = get_str(j, "id");
out.name = get_str(j, "name");
out.description = get_str(j, "description");
out.signature = get_str(j, "signature");
out.purity = get_str(j, "purity");
out.domain = get_str(j, "domain");
out.lang = get_str(j, "lang");
out.uses_functions = get_str_array(j, "uses_functions");
out.uses_types = get_str_array(j, "uses_types");
return true;
}
} // namespace data_factory
+76
View File
@@ -0,0 +1,76 @@
#pragma once
// HTTP REST client for sqlite_api /api/datafactory/* endpoints.
// All calls are blocking; return false on net failure or non-2xx.
#include <string>
#include <vector>
namespace data_factory {
struct Node {
std::string id;
std::string kind; // extractor|transformer|database|sink|validator
std::string name;
std::string function_id;
std::string description;
std::string schedule_cron;
bool enabled = true;
std::vector<std::string> tags;
std::string created_at;
std::string updated_at;
};
struct Run {
std::string id;
std::string node_id;
std::string started_at;
std::string finished_at;
std::string status; // running|success|failed|cancelled
long long rows_in = 0;
long long rows_out = 0;
long long kb_in = 0;
long long kb_out = 0;
long long duration_ms = 0;
std::string trigger;
std::string error;
};
struct DatabaseInfo {
std::string id;
std::string kind;
std::string label;
std::string uri;
std::string description;
long long table_count = 0;
long long size_bytes = 0;
std::string last_seen_at;
};
// Mirrors dag_engine_ui FnInfo (response shape of GET /api/functions/{id}).
struct FnInfo {
std::string id;
std::string name;
std::string description;
std::string signature;
std::string purity;
std::string domain;
std::string lang;
std::vector<std::string> uses_functions;
std::vector<std::string> uses_types;
};
bool list_nodes_http(const std::string& api_url, const std::string& kind,
std::vector<Node>& out);
bool list_runs_http(const std::string& api_url, const std::string& node_id,
int limit, std::vector<Run>& out);
bool list_databases_http(const std::string& api_url,
std::vector<DatabaseInfo>& out);
bool get_function_http(const std::string& api_url,
const std::string& function_id,
FnInfo& out);
} // namespace data_factory
+77
View File
@@ -0,0 +1,77 @@
// Package datafactory opens and migrates data_factory.db. It is consumed by
// projects/fn_monitoring/apps/sqlite_api to expose REST + WS endpoints over
// the data_factory schema (nodes, connections, runs, databases).
//
// The C++ app in apps/data_factory (main.cpp) does NOT depend on this
// package. The Go subpackage lives in apps/data_factory/datafactory/ so it
// does not collide with main.cpp at the Go-toolchain level.
//
// Migrations are read from disk at runtime (apps/data_factory/migrations/
// relative to FN_REGISTRY_ROOT, or via an explicit migrationsDir argument).
// This keeps the SQL as the single source of truth — the C++ side reads
// the same files via its own bridge.
package datafactory
import (
"database/sql"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
_ "github.com/mattn/go-sqlite3"
)
// Open opens data_factory.db at dbPath (creating the file and the parent
// directory if needed) and applies every *.sql under migrationsDir in
// lexical order. Idempotent — re-running on an already-migrated DB is a
// no-op (duplicate-column / already-exists errors are tolerated).
// Returns a RW *sql.DB; callers are responsible for Close().
func Open(dbPath, migrationsDir string) (*sql.DB, error) {
if err := os.MkdirAll(filepath.Dir(dbPath), 0o755); err != nil {
return nil, fmt.Errorf("datafactory: mkdir %s: %w", filepath.Dir(dbPath), err)
}
dsn := dbPath + "?_journal_mode=WAL&_busy_timeout=5000&_foreign_keys=on"
db, err := sql.Open("sqlite3", dsn)
if err != nil {
return nil, fmt.Errorf("datafactory: open %s: %w", dbPath, err)
}
if err := db.Ping(); err != nil {
db.Close()
return nil, fmt.Errorf("datafactory: ping %s: %w", dbPath, err)
}
if err := applyMigrations(db, migrationsDir); err != nil {
db.Close()
return nil, fmt.Errorf("datafactory: migrate: %w", err)
}
return db, nil
}
func applyMigrations(conn *sql.DB, dir string) error {
entries, err := os.ReadDir(dir)
if err != nil {
return fmt.Errorf("read migrations dir %s: %w", dir, err)
}
names := make([]string, 0, len(entries))
for _, e := range entries {
if !e.IsDir() && strings.HasSuffix(e.Name(), ".sql") {
names = append(names, e.Name())
}
}
sort.Strings(names)
for _, n := range names {
b, err := os.ReadFile(filepath.Join(dir, n))
if err != nil {
return fmt.Errorf("%s: read: %w", n, err)
}
if _, err := conn.Exec(string(b)); err != nil {
if strings.Contains(err.Error(), "duplicate column") ||
strings.Contains(err.Error(), "already exists") {
continue
}
return fmt.Errorf("%s: %w", n, err)
}
}
return nil
}
+173
View File
@@ -0,0 +1,173 @@
#include "http_client.h"
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <sstream>
#include <vector>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#include <mutex>
#pragma comment(lib, "ws2_32.lib")
// std::call_once para evitar race condition si hay peticiones simultaneas
// desde multiples threads (main + runners).
static std::once_flag g_wsa_once;
static bool g_wsa_ok = false;
static bool wsa_init() {
std::call_once(g_wsa_once, []() {
WSADATA wsa;
g_wsa_ok = (WSAStartup(MAKEWORD(2, 2), &wsa) == 0);
});
return g_wsa_ok;
}
typedef SOCKET sock_t;
#define SOCK_INVALID INVALID_SOCKET
#define SOCK_CLOSE closesocket
#define SOCK_ERR WSAGetLastError()
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netdb.h>
#include <errno.h>
typedef int sock_t;
#define SOCK_INVALID (-1)
#define SOCK_CLOSE close
#define SOCK_ERR errno
#endif
HttpClient::HttpClient(const std::string& host, int port)
: host_(host), port_(port) {}
HttpResponse HttpClient::get(const std::string& path) {
return request("GET", path, "", "");
}
HttpResponse HttpClient::post(const std::string& path, const std::string& body,
const std::string& content_type) {
return request("POST", path, body, content_type);
}
HttpResponse HttpClient::request(const std::string& method, const std::string& path,
const std::string& body, const std::string& content_type) {
HttpResponse resp;
#ifdef _WIN32
if (!wsa_init()) {
fprintf(stderr, "[http] WSAStartup failed\n");
return resp;
}
#endif
sock_t sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock == SOCK_INVALID) {
fprintf(stderr, "[http] socket() failed: %d\n", SOCK_ERR);
return resp;
}
// Timeout — Windows y POSIX usan formatos distintos para SO_{RCV,SND}TIMEO.
// Windows: DWORD milisegundos. POSIX: struct timeval.
#ifdef _WIN32
DWORD timeout_ms = static_cast<DWORD>(timeout_sec_ * 1000);
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
#else
struct timeval tv;
tv.tv_sec = timeout_sec_;
tv.tv_usec = 0;
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv));
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (const char*)&tv, sizeof(tv));
#endif
// Connect
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(static_cast<uint16_t>(port_));
addr.sin_addr.s_addr = inet_addr(host_.c_str());
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
int err = SOCK_ERR;
SOCK_CLOSE(sock);
// Reportamos el errno/WSAError en el body para que el toast sea util.
char buf[128];
std::snprintf(buf, sizeof(buf),
"connect() failed to %s:%d (err=%d)",
host_.c_str(), port_, err);
resp.body = buf;
return resp;
}
// Build request
std::ostringstream req;
req << method << " " << path << " HTTP/1.1\r\n";
req << "Host: " << host_ << ":" << port_ << "\r\n";
req << "Connection: close\r\n";
if (!body.empty()) {
req << "Content-Type: " << content_type << "\r\n";
req << "Content-Length: " << body.size() << "\r\n";
}
req << "\r\n";
if (!body.empty()) req << body;
std::string raw_req = req.str();
send(sock, raw_req.c_str(), static_cast<int>(raw_req.size()), 0);
// Read response
std::vector<char> buf(8192);
std::string raw;
for (;;) {
int n = recv(sock, buf.data(), static_cast<int>(buf.size()), 0);
if (n <= 0) break;
raw.append(buf.data(), n);
}
SOCK_CLOSE(sock);
// Parse status line
auto hdr_end = raw.find("\r\n\r\n");
if (hdr_end == std::string::npos) return resp;
// "HTTP/1.1 200 OK\r\n..."
auto first_line_end = raw.find("\r\n");
if (first_line_end == std::string::npos) return resp;
std::string status_line = raw.substr(0, first_line_end);
auto sp1 = status_line.find(' ');
if (sp1 != std::string::npos) {
resp.status = std::atoi(status_line.c_str() + sp1 + 1);
}
resp.body = raw.substr(hdr_end + 4);
// Handle chunked transfer encoding
std::string headers_str = raw.substr(0, hdr_end);
if (headers_str.find("chunked") != std::string::npos) {
// Decode chunked body
std::string decoded;
const char* p = resp.body.c_str();
const char* end = p + resp.body.size();
while (p < end) {
// Read chunk size (hex)
char* chunk_end = nullptr;
long chunk_size = strtol(p, &chunk_end, 16);
if (chunk_size <= 0) break;
// Skip \r\n after size
p = chunk_end;
if (p < end && *p == '\r') p++;
if (p < end && *p == '\n') p++;
// Read chunk data
if (p + chunk_size <= end) {
decoded.append(p, chunk_size);
}
p += chunk_size;
// Skip trailing \r\n
if (p < end && *p == '\r') p++;
if (p < end && *p == '\n') p++;
}
resp.body = decoded;
}
return resp;
}
+30
View File
@@ -0,0 +1,30 @@
#pragma once
// Minimal HTTP client — no threading, no SSL, just plain TCP to localhost.
// Works with both win32 and posix MinGW thread models.
#include <string>
struct HttpResponse {
int status = 0;
std::string body;
bool ok() const { return status >= 200 && status < 300; }
};
// Simple blocking HTTP GET/POST over TCP sockets.
// host: "127.0.0.1", port: 8484
class HttpClient {
public:
HttpClient(const std::string& host, int port);
HttpResponse get(const std::string& path);
HttpResponse post(const std::string& path, const std::string& body,
const std::string& content_type = "application/json");
private:
std::string host_;
int port_;
int timeout_sec_ = 5;
HttpResponse request(const std::string& method, const std::string& path,
const std::string& body, const std::string& content_type);
};
+200
View File
@@ -0,0 +1,200 @@
#include <imgui.h>
#include "app_base.h"
#include "core/panel_menu.h"
#include "core/icons_tabler.h"
#include "core/logger.h"
#include "data_http.h"
#include "http_client.h"
#include "ws_client.h"
#include "tabs.h"
#include "vendor/nlohmann/json.hpp"
#include <cstdio>
#include <cstring>
#include <string>
#include <vector>
using json = nlohmann::json;
// Backend config.
static std::string g_api_url = "http://127.0.0.1:8484";
static std::string g_ws_host = "127.0.0.1";
static int g_ws_port = 8484;
static std::string g_ws_path = "/api/ws/datafactory";
// Caches populated by REST initial fetch + WS deltas.
static std::vector<data_factory::Node> g_nodes;
static std::vector<data_factory::Run> g_runs_all;
static std::vector<data_factory::DatabaseInfo> g_databases;
static WsClient g_ws;
static int g_ws_msg_count = 0;
static bool g_initial_fetched = false;
static bool g_refresh_pending = false;
// Panel toggles.
static bool g_show_map = true;
static bool g_show_extractors = true;
static bool g_show_transformers= true;
static bool g_show_databases = true;
static bool g_show_sinks = true;
static bool g_show_health = true;
static bool g_show_detail = true;
static bool g_show_live = false;
static void upsert_run(const data_factory::Run& r) {
for (auto& existing : g_runs_all) {
if (existing.id == r.id) {
existing = r;
return;
}
}
g_runs_all.push_back(r);
}
static void parse_ws_payload(const std::string& payload) {
auto j = json::parse(payload, nullptr, false);
if (!j.is_object()) return;
g_ws_msg_count++;
// Initial snapshot may include "nodes" / "databases".
if (j.contains("nodes") && j["nodes"].is_array()) {
g_nodes.clear();
for (auto& nj : j["nodes"]) {
data_factory::Node n;
n.id = nj.value("id", "");
n.kind = nj.value("kind", "");
n.name = nj.value("name", "");
n.function_id = nj.value("function_id", "");
n.description = nj.value("description", "");
n.schedule_cron = nj.value("schedule_cron", "");
n.enabled = nj.value("enabled", true);
g_nodes.push_back(std::move(n));
}
}
if (j.contains("databases") && j["databases"].is_array()) {
g_databases.clear();
for (auto& dj : j["databases"]) {
data_factory::DatabaseInfo d;
d.id = dj.value("id", "");
d.kind = dj.value("kind", "");
d.label = dj.value("label", "");
d.uri = dj.value("uri", "");
d.description = dj.value("description", "");
d.table_count = dj.value("table_count", (long long)0);
d.size_bytes = dj.value("size_bytes", (long long)0);
d.last_seen_at = dj.value("last_seen_at", "");
g_databases.push_back(std::move(d));
}
}
if (j.contains("runs") && j["runs"].is_array()) {
for (auto& rj : j["runs"]) {
data_factory::Run r;
r.id = rj.value("id", "");
r.node_id = rj.value("node_id", "");
r.started_at = rj.value("started_at", "");
r.finished_at = rj.value("finished_at", "");
r.status = rj.value("status", "");
r.rows_in = rj.value("rows_in", (long long)0);
r.rows_out = rj.value("rows_out", (long long)0);
r.kb_in = rj.value("kb_in", (long long)0);
r.kb_out = rj.value("kb_out", (long long)0);
r.duration_ms = rj.value("duration_ms", (long long)0);
r.trigger = rj.value("trigger", "");
r.error = rj.value("error", "");
upsert_run(r);
if (r.status == "success" || r.status == "failed" ||
r.status == "cancelled") {
g_refresh_pending = true;
}
}
}
}
static void draw_live() {
if (!ImGui::Begin(TI_BOLT " Live (WS)", &g_show_live)) {
ImGui::End();
return;
}
bool connected = g_ws.is_connected();
ImGui::TextColored(connected ? ImVec4(0.3f, 0.9f, 0.3f, 1) : ImVec4(0.9f, 0.4f, 0.4f, 1),
"%s", connected ? "connected" : "disconnected");
ImGui::SameLine();
ImGui::Text("| msgs=%d | nodes=%zu runs=%zu dbs=%zu",
g_ws_msg_count, g_nodes.size(), g_runs_all.size(), g_databases.size());
ImGui::Separator();
if (ImGui::Button("Refresh REST")) g_refresh_pending = true;
ImGui::End();
}
static void render() {
if (!g_initial_fetched || g_refresh_pending) {
g_initial_fetched = true;
g_refresh_pending = false;
data_factory::list_nodes_http(g_api_url, "", g_nodes);
data_factory::list_databases_http(g_api_url, g_databases);
std::vector<data_factory::Run> tmp;
if (data_factory::list_runs_http(g_api_url, "", 200, tmp)) {
for (auto& r : tmp) upsert_run(r);
}
}
// Drain WS messages this frame (cheap, max 64).
{
std::vector<std::string> msgs;
g_ws.drain(msgs, 64);
for (auto& m : msgs) parse_ws_payload(m);
}
if (g_show_map) data_factory_ui::draw_map(g_api_url, g_nodes);
if (g_show_extractors) data_factory_ui::draw_extractors(g_api_url, g_nodes, g_runs_all);
if (g_show_transformers) data_factory_ui::draw_transformers(g_api_url, g_nodes, g_runs_all);
if (g_show_databases) data_factory_ui::draw_databases(g_api_url, g_databases);
if (g_show_sinks) data_factory_ui::draw_sinks(g_api_url, g_nodes, g_runs_all);
if (g_show_health) data_factory_ui::draw_health(g_api_url, g_runs_all);
if (g_show_detail) data_factory_ui::draw_node_detail_panel(
g_api_url, g_nodes, g_runs_all, &g_show_detail);
if (g_show_live) draw_live();
}
// Self-test: blocking HTTP GET to sqlite_api /api/datafactory/nodes. No GUI.
static int run_self_test() {
HttpClient client(g_ws_host, g_ws_port);
HttpResponse resp = client.get("/api/datafactory/nodes");
if (resp.ok()) {
std::printf("self-test ok: sqlite_api reachable at %s\n", g_api_url.c_str());
return 0;
}
std::printf("self-test fail: sqlite_api unreachable at %s (status=%d)\n",
g_api_url.c_str(), resp.status);
return 1;
}
int main(int argc, char** argv) {
for (int i = 1; i < argc; i++) {
if (argv[i] && std::strcmp(argv[i], "--self-test") == 0) {
return run_self_test();
}
}
g_ws.start(g_ws_host, g_ws_port, g_ws_path);
static fn_ui::PanelToggle panels[] = {
{ "Map", nullptr, &g_show_map },
{ "Extractors", nullptr, &g_show_extractors },
{ "Transformers", nullptr, &g_show_transformers },
{ "Databases", nullptr, &g_show_databases },
{ "Sinks", nullptr, &g_show_sinks },
{ "Health", nullptr, &g_show_health },
{ "Node Detail", nullptr, &g_show_detail },
{ "Live (WS)", nullptr, &g_show_live },
};
fn::AppConfig cfg;
cfg.title = "data_factory — Factorio-style data pipeline factory: extractors, transformers, databases, sinks. Live updates via WS pubsub.";
cfg.about = { "data_factory", "0.1.0", "Factorio-style data pipeline factory: extractors, transformers, databases, sinks. Live updates via WS pubsub." };
cfg.log = { "data_factory.log", 1 };
cfg.panels = panels;
cfg.panel_count = sizeof(panels) / sizeof(panels[0]);
return fn::run_app(cfg, render);
}
+76
View File
@@ -0,0 +1,76 @@
-- data_factory init schema (issue 0097).
-- Idempotente. Aplicado al arrancar via embed.FS (Go side / bridge a C++).
--
-- Tablas:
-- nodes — extractors/transformers/databases/sinks declarados localmente
-- connections — aristas (lineage) entre nodes
-- runs — historico de ejecuciones (rows/kb/duration por run)
-- databases — registro de DBs externas que sirven como sink/source
--
-- Datos NUNCA viajan entre PCs por sync (per-PC, igual que dag_engine.db).
CREATE TABLE IF NOT EXISTS nodes (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK(kind IN ('extractor','transformer','database','sink','validator')),
name TEXT NOT NULL,
function_id TEXT NOT NULL DEFAULT '', -- FK logico a registry.functions.id (sin enforcement)
description TEXT NOT NULL DEFAULT '',
config_json TEXT NOT NULL DEFAULT '{}',
schedule_cron TEXT NOT NULL DEFAULT '', -- '' = manual, sino expresion cron (parseable con parse_cron_expr_go_core)
enabled INTEGER NOT NULL DEFAULT 1,
tags_csv TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_nodes_kind ON nodes(kind);
CREATE INDEX IF NOT EXISTS idx_nodes_function_id ON nodes(function_id);
CREATE INDEX IF NOT EXISTS idx_nodes_enabled ON nodes(enabled);
CREATE TABLE IF NOT EXISTS connections (
id TEXT PRIMARY KEY,
src_node TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
dst_node TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
payload_schema TEXT NOT NULL DEFAULT '{}', -- JSON schema del dato que viaja
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_connections_src ON connections(src_node);
CREATE INDEX IF NOT EXISTS idx_connections_dst ON connections(dst_node);
CREATE UNIQUE INDEX IF NOT EXISTS uniq_connection_edge ON connections(src_node, dst_node);
CREATE TABLE IF NOT EXISTS runs (
id TEXT PRIMARY KEY,
node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
started_at TEXT NOT NULL,
finished_at TEXT,
status TEXT NOT NULL DEFAULT 'running' CHECK(status IN ('running','success','failed','cancelled')),
rows_in INTEGER NOT NULL DEFAULT 0,
rows_out INTEGER NOT NULL DEFAULT 0,
kb_in INTEGER NOT NULL DEFAULT 0,
kb_out INTEGER NOT NULL DEFAULT 0,
duration_ms INTEGER NOT NULL DEFAULT 0,
trigger TEXT NOT NULL DEFAULT 'manual' CHECK(trigger IN ('manual','cron','dag','api')),
error TEXT NOT NULL DEFAULT '',
notes TEXT NOT NULL DEFAULT ''
);
CREATE INDEX IF NOT EXISTS idx_runs_node ON runs(node_id);
CREATE INDEX IF NOT EXISTS idx_runs_status ON runs(status);
CREATE INDEX IF NOT EXISTS idx_runs_started ON runs(started_at DESC);
CREATE TABLE IF NOT EXISTS databases (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK(kind IN ('sqlite','postgres','bigquery','duckdb','mongo','parquet','csv','other')),
label TEXT NOT NULL,
uri TEXT NOT NULL DEFAULT '', -- connection string redacted (no creds)
description TEXT NOT NULL DEFAULT '',
tags_csv TEXT NOT NULL DEFAULT '',
last_seen_at TEXT NOT NULL DEFAULT '',
table_count INTEGER NOT NULL DEFAULT 0,
size_bytes INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_databases_kind ON databases(kind);
+584
View File
@@ -0,0 +1,584 @@
#include "tabs.h"
#include "core/icons_tabler.h"
#include "core/empty_state.h"
#include "core/badge.h"
#include <imgui.h>
#include <algorithm>
#include <cstdio>
#include <ctime>
#include <map>
#include <string>
namespace data_factory_ui {
// ---------------------------------------------------------------------------
// Globals
// ---------------------------------------------------------------------------
Selection& selection() {
static Selection s;
return s;
}
FunctionCache& function_cache() {
static FunctionCache fc;
return fc;
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
static std::string format_duration(long long ms) {
if (ms <= 0) return "-";
if (ms < 1000) return std::to_string(ms) + "ms";
char buf[32];
std::snprintf(buf, sizeof(buf), "%.2fs", ms / 1000.0);
return buf;
}
static std::string format_bytes(long long bytes) {
if (bytes <= 0) return "-";
const double kb = 1024.0;
const double mb = kb * 1024.0;
const double gb = mb * 1024.0;
char buf[32];
if (bytes < (long long)kb) std::snprintf(buf, sizeof(buf), "%lld B", bytes);
else if (bytes < (long long)mb) std::snprintf(buf, sizeof(buf), "%.1f KB", bytes / kb);
else if (bytes < (long long)gb) std::snprintf(buf, sizeof(buf), "%.1f MB", bytes / mb);
else std::snprintf(buf, sizeof(buf), "%.2f GB", bytes / gb);
return buf;
}
static time_t parse_rfc3339(const std::string& s) {
if (s.size() < 19) return 0;
std::tm tm{};
tm.tm_year = std::atoi(s.substr(0, 4).c_str()) - 1900;
tm.tm_mon = std::atoi(s.substr(5, 2).c_str()) - 1;
tm.tm_mday = std::atoi(s.substr(8, 2).c_str());
tm.tm_hour = std::atoi(s.substr(11, 2).c_str());
tm.tm_min = std::atoi(s.substr(14, 2).c_str());
tm.tm_sec = std::atoi(s.substr(17, 2).c_str());
tm.tm_isdst = -1;
return std::mktime(&tm);
}
static BadgeVariant variant_for_status(const std::string& st) {
if (st == "success") return BadgeVariant::Success;
if (st == "failed") return BadgeVariant::Error;
if (st == "running") return BadgeVariant::Warning;
if (st == "cancelled") return BadgeVariant::Default;
return BadgeVariant::Default;
}
// Pick most-recent run per node from runs_all.
static const data_factory::Run* last_run_for(
const std::string& node_id,
const std::vector<data_factory::Run>& runs_all)
{
const data_factory::Run* best = nullptr;
for (auto& r : runs_all) {
if (r.node_id != node_id) continue;
if (!best || r.started_at > best->started_at) best = &r;
}
return best;
}
// ---------------------------------------------------------------------------
// Generic kind table — used by extractors / transformers / sinks
// ---------------------------------------------------------------------------
static void draw_node_table(const char* table_id,
const std::vector<data_factory::Node>& nodes,
const std::string& filter_kind,
const std::vector<data_factory::Run>& runs_all,
bool show_schedule)
{
int cols = show_schedule ? 6 : 5;
if (!ImGui::BeginTable(table_id, cols,
ImGuiTableFlags_Borders | ImGuiTableFlags_RowBg |
ImGuiTableFlags_Resizable | ImGuiTableFlags_ScrollY))
{
return;
}
ImGui::TableSetupColumn("Name", ImGuiTableColumnFlags_WidthStretch, 0.20f);
ImGui::TableSetupColumn("Function", ImGuiTableColumnFlags_WidthStretch, 0.30f);
if (show_schedule)
ImGui::TableSetupColumn("Schedule", ImGuiTableColumnFlags_WidthStretch, 0.12f);
ImGui::TableSetupColumn("Last Run", ImGuiTableColumnFlags_WidthStretch, 0.18f);
ImGui::TableSetupColumn("Status", ImGuiTableColumnFlags_WidthStretch, 0.10f);
ImGui::TableSetupColumn("Rows/KB", ImGuiTableColumnFlags_WidthStretch, 0.10f);
ImGui::TableHeadersRow();
int row_idx = 0;
for (auto& n : nodes) {
if (n.kind != filter_kind) continue;
ImGui::PushID(row_idx++);
ImGui::TableNextRow();
// Name (selectable -> sets selection)
ImGui::TableNextColumn();
bool selected = (selection().node_id == n.id);
if (ImGui::Selectable(n.name.c_str(), selected,
ImGuiSelectableFlags_SpanAllColumns))
{
selection().node_id = n.id;
// Invalidate function cache if function_id changed.
if (function_cache().function_id != n.function_id) {
function_cache() = {};
function_cache().function_id = n.function_id;
}
}
if (!n.enabled) {
ImGui::SameLine();
badge("disabled", BadgeVariant::Default);
}
// Function id
ImGui::TableNextColumn();
if (n.function_id.empty()) ImGui::TextDisabled("(none)");
else ImGui::TextUnformatted(n.function_id.c_str());
// Schedule
if (show_schedule) {
ImGui::TableNextColumn();
if (n.schedule_cron.empty()) ImGui::TextDisabled("manual");
else ImGui::TextUnformatted(n.schedule_cron.c_str());
}
// Last run
const data_factory::Run* lr = last_run_for(n.id, runs_all);
ImGui::TableNextColumn();
if (lr) ImGui::TextUnformatted(lr->started_at.c_str());
else ImGui::TextDisabled("-");
// Status badge
ImGui::TableNextColumn();
if (lr) badge(lr->status.c_str(), variant_for_status(lr->status));
else ImGui::TextDisabled("-");
// Rows / KB
ImGui::TableNextColumn();
if (lr) ImGui::Text("%lld / %lld", lr->rows_out, lr->kb_out);
else ImGui::TextDisabled("-");
ImGui::PopID();
}
ImGui::EndTable();
}
// ---------------------------------------------------------------------------
// Extractors
// ---------------------------------------------------------------------------
void draw_extractors(const std::string& /*api_url*/,
const std::vector<data_factory::Node>& nodes,
const std::vector<data_factory::Run>& runs_all)
{
if (!ImGui::Begin(TI_DOWNLOAD " Extractors")) {
ImGui::End();
return;
}
int count = 0;
for (auto& n : nodes) if (n.kind == "extractor") count++;
if (count == 0) {
empty_state(TI_DOWNLOAD, "No extractors",
"Register an extractor node via sqlite_api.");
ImGui::End();
return;
}
ImGui::TextDisabled("%d extractor nodes. Click row -> see Node Detail.", count);
draw_node_table("##df_extractors", nodes, "extractor", runs_all, true);
ImGui::End();
}
// ---------------------------------------------------------------------------
// Transformers
// ---------------------------------------------------------------------------
void draw_transformers(const std::string& /*api_url*/,
const std::vector<data_factory::Node>& nodes,
const std::vector<data_factory::Run>& runs_all)
{
if (!ImGui::Begin(TI_REFRESH " Transformers")) {
ImGui::End();
return;
}
int count = 0;
for (auto& n : nodes) if (n.kind == "transformer") count++;
if (count == 0) {
empty_state(TI_REFRESH, "No transformers",
"Register a transformer node via sqlite_api.");
ImGui::End();
return;
}
ImGui::TextDisabled("%d transformer nodes.", count);
draw_node_table("##df_transformers", nodes, "transformer", runs_all, false);
ImGui::End();
}
// ---------------------------------------------------------------------------
// Sinks
// ---------------------------------------------------------------------------
void draw_sinks(const std::string& /*api_url*/,
const std::vector<data_factory::Node>& nodes,
const std::vector<data_factory::Run>& runs_all)
{
if (!ImGui::Begin(TI_UPLOAD " Sinks")) {
ImGui::End();
return;
}
int count = 0;
for (auto& n : nodes) if (n.kind == "sink") count++;
if (count == 0) {
empty_state(TI_UPLOAD, "No sinks",
"Register a sink node via sqlite_api.");
ImGui::End();
return;
}
ImGui::TextDisabled("%d sink nodes.", count);
draw_node_table("##df_sinks", nodes, "sink", runs_all, false);
ImGui::End();
}
// ---------------------------------------------------------------------------
// Databases
// ---------------------------------------------------------------------------
void draw_databases(const std::string& /*api_url*/,
const std::vector<data_factory::DatabaseInfo>& dbs)
{
if (!ImGui::Begin(TI_DATABASE " Databases")) {
ImGui::End();
return;
}
if (dbs.empty()) {
empty_state(TI_DATABASE, "No databases registered",
"POST /api/datafactory/databases to register a DB.");
ImGui::End();
return;
}
if (ImGui::BeginTable("##df_databases", 6,
ImGuiTableFlags_Borders | ImGuiTableFlags_RowBg |
ImGuiTableFlags_Resizable | ImGuiTableFlags_ScrollY))
{
ImGui::TableSetupColumn("Label", ImGuiTableColumnFlags_WidthStretch, 0.18f);
ImGui::TableSetupColumn("Kind", ImGuiTableColumnFlags_WidthStretch, 0.10f);
ImGui::TableSetupColumn("URI", ImGuiTableColumnFlags_WidthStretch, 0.32f);
ImGui::TableSetupColumn("Tables", ImGuiTableColumnFlags_WidthStretch, 0.10f);
ImGui::TableSetupColumn("Size", ImGuiTableColumnFlags_WidthStretch, 0.15f);
ImGui::TableSetupColumn("Last Seen",ImGuiTableColumnFlags_WidthStretch, 0.15f);
ImGui::TableHeadersRow();
for (auto& d : dbs) {
ImGui::TableNextRow();
ImGui::TableNextColumn();
ImGui::TextUnformatted(d.label.empty() ? d.id.c_str() : d.label.c_str());
ImGui::TableNextColumn();
badge(d.kind.c_str(), BadgeVariant::Info);
ImGui::TableNextColumn();
ImGui::TextUnformatted(d.uri.c_str());
ImGui::TableNextColumn();
ImGui::Text("%lld", d.table_count);
ImGui::TableNextColumn();
ImGui::TextUnformatted(format_bytes(d.size_bytes).c_str());
ImGui::TableNextColumn();
if (d.last_seen_at.empty()) ImGui::TextDisabled("-");
else ImGui::TextUnformatted(d.last_seen_at.c_str());
}
ImGui::EndTable();
}
ImGui::End();
}
// ---------------------------------------------------------------------------
// Health
// ---------------------------------------------------------------------------
void draw_health(const std::string& /*api_url*/,
const std::vector<data_factory::Run>& runs_all)
{
if (!ImGui::Begin(TI_ACTIVITY " Health")) {
ImGui::End();
return;
}
if (runs_all.empty()) {
empty_state(TI_ACTIVITY, "No runs yet",
"Trigger a node to populate health metrics.");
ImGui::End();
return;
}
const time_t now = std::time(nullptr);
const time_t cutoff_24h = now - 86400;
int runs_24h = 0, success_24h = 0, failed_24h = 0;
int pending_total = 0;
long long rows_24h = 0, kb_24h = 0;
int success_all = 0, failed_all = 0, cancelled_all = 0;
for (auto& r : runs_all) {
if (r.status == "running" || r.status == "pending") pending_total++;
if (r.status == "success") success_all++;
if (r.status == "failed") failed_all++;
if (r.status == "cancelled") cancelled_all++;
time_t t = parse_rfc3339(r.started_at);
if (t == 0 || t < cutoff_24h) continue;
runs_24h++;
rows_24h += r.rows_out;
kb_24h += r.kb_out;
if (r.status == "success") success_24h++;
if (r.status == "failed") failed_24h++;
}
int terminal = success_all + failed_all + cancelled_all;
float success_rate = (terminal > 0)
? (100.0f * (float)success_all / (float)terminal) : 0.0f;
if (ImGui::BeginTable("##df_kpis", 4,
ImGuiTableFlags_Borders | ImGuiTableFlags_SizingStretchSame))
{
ImGui::TableNextRow();
ImGui::TableNextColumn();
ImGui::Text("%s Runs (24h)", TI_ACTIVITY);
ImGui::Text("%d", runs_24h);
ImGui::TextDisabled("success: %d", success_24h);
ImGui::TableNextColumn();
ImGui::Text("%s Success rate", TI_CHECK);
ImGui::TextColored(ImVec4(0.30f, 0.85f, 0.40f, 1), "%.1f%%", success_rate);
ImGui::TextDisabled("%d / %d terminal", success_all, terminal);
ImGui::TableNextColumn();
ImGui::Text("%s Failed (24h)", TI_ALERT_TRIANGLE);
if (failed_24h > 0)
ImGui::TextColored(ImVec4(0.95f, 0.35f, 0.30f, 1), "%d", failed_24h);
else
ImGui::Text("%d", failed_24h);
ImGui::TextDisabled("pending: %d", pending_total);
ImGui::TableNextColumn();
ImGui::Text("%s Throughput (24h)", TI_BOLT);
ImGui::Text("%lld rows", rows_24h);
ImGui::TextDisabled("%lld KB", kb_24h);
ImGui::EndTable();
}
ImGui::Separator();
ImGui::TextDisabled("Computed client-side from %zu runs in cache.", runs_all.size());
ImGui::End();
}
// ---------------------------------------------------------------------------
// Map (placeholder: flat tree by kind)
// ---------------------------------------------------------------------------
void draw_map(const std::string& /*api_url*/,
const std::vector<data_factory::Node>& nodes)
{
if (!ImGui::Begin(TI_LIST " Map")) {
ImGui::End();
return;
}
if (nodes.empty()) {
empty_state(TI_LIST, "No nodes",
"Register nodes via sqlite_api /api/datafactory/nodes.");
ImGui::End();
return;
}
// Group by kind.
std::map<std::string, std::vector<const data_factory::Node*>> by_kind;
for (auto& n : nodes) by_kind[n.kind].push_back(&n);
static const char* kind_order[] = {
"extractor", "transformer", "database", "sink", "validator"
};
static const char* kind_label[] = {
"Extractors", "Transformers", "Databases", "Sinks", "Validators"
};
const int n_kinds = sizeof(kind_order) / sizeof(kind_order[0]);
for (int i = 0; i < n_kinds; i++) {
auto it = by_kind.find(kind_order[i]);
int count = (it != by_kind.end()) ? (int)it->second.size() : 0;
char header[128];
std::snprintf(header, sizeof(header), "%s (%d)###%s_hdr",
kind_label[i], count, kind_order[i]);
if (!ImGui::TreeNodeEx(header, ImGuiTreeNodeFlags_DefaultOpen)) continue;
if (count == 0) {
ImGui::TextDisabled(" (empty)");
} else {
for (auto* p : it->second) {
ImGui::PushID(p->id.c_str());
ImGui::Bullet();
ImGui::SameLine();
bool sel = (selection().node_id == p->id);
if (ImGui::Selectable(p->name.c_str(), sel)) {
selection().node_id = p->id;
if (function_cache().function_id != p->function_id) {
function_cache() = {};
function_cache().function_id = p->function_id;
}
}
ImGui::SameLine();
if (!p->function_id.empty()) {
ImGui::TextDisabled(" -> %s", p->function_id.c_str());
}
ImGui::PopID();
}
}
ImGui::TreePop();
}
ImGui::End();
}
// ---------------------------------------------------------------------------
// Node detail panel (side)
// ---------------------------------------------------------------------------
void draw_node_detail_panel(const std::string& api_url,
const std::vector<data_factory::Node>& nodes,
const std::vector<data_factory::Run>& runs_all,
bool* p_open)
{
if (!ImGui::Begin(TI_INFO_CIRCLE " Node Detail", p_open)) {
ImGui::End();
return;
}
const std::string& nid = selection().node_id;
if (nid.empty()) {
empty_state(TI_INFO_CIRCLE, "Nothing selected",
"Click a row in any tab to inspect the node.");
ImGui::End();
return;
}
const data_factory::Node* node = nullptr;
for (auto& n : nodes) if (n.id == nid) { node = &n; break; }
if (!node) {
ImGui::TextDisabled("Node not in current cache: %s", nid.c_str());
ImGui::End();
return;
}
// Header
ImGui::Text("%s %s", TI_INFO_CIRCLE, node->name.c_str());
ImGui::SameLine();
badge(node->kind.c_str(), BadgeVariant::Info);
if (!node->enabled) {
ImGui::SameLine();
badge("disabled", BadgeVariant::Default);
}
ImGui::Separator();
if (!node->description.empty()) {
ImGui::TextWrapped("%s", node->description.c_str());
ImGui::Separator();
}
ImGui::Text("id: %s", node->id.c_str());
ImGui::Text("kind: %s", node->kind.c_str());
if (!node->function_id.empty()) {
ImGui::Text("function: %s", node->function_id.c_str());
}
if (!node->schedule_cron.empty()) {
ImGui::Text("schedule: %s", node->schedule_cron.c_str());
}
if (!node->tags.empty()) {
ImGui::Text("tags:");
for (auto& t : node->tags) {
ImGui::SameLine();
badge(t.c_str(), BadgeVariant::Default);
}
}
// Function metadata card (lazy load)
if (!node->function_id.empty()) {
ImGui::Separator();
auto& fc = function_cache();
if (fc.function_id != node->function_id) {
fc = {};
fc.function_id = node->function_id;
}
if (!fc.loaded && fc.error.empty()) {
if (data_factory::get_function_http(api_url, fc.function_id, fc.info)) {
fc.loaded = true;
} else {
fc.error = "Failed to fetch /api/functions/" + fc.function_id;
}
}
if (fc.loaded) {
ImGui::Text("%s Registry function", TI_BOX);
if (!fc.info.domain.empty()) {
ImGui::SameLine();
badge(fc.info.domain.c_str(), BadgeVariant::Info);
}
if (!fc.info.purity.empty()) {
ImGui::SameLine();
BadgeVariant v = (fc.info.purity == "pure")
? BadgeVariant::Success : BadgeVariant::Warning;
badge(fc.info.purity.c_str(), v);
}
if (!fc.info.lang.empty()) {
ImGui::SameLine();
badge(fc.info.lang.c_str(), BadgeVariant::Default);
}
if (!fc.info.signature.empty()) {
ImGui::TextWrapped("sig: %s", fc.info.signature.c_str());
}
if (!fc.info.description.empty()) {
ImGui::TextWrapped("%s", fc.info.description.c_str());
}
} else if (!fc.error.empty()) {
ImGui::TextColored(ImVec4(0.95f, 0.4f, 0.4f, 1), "%s", fc.error.c_str());
} else {
ImGui::TextDisabled("Loading function metadata...");
}
}
// Recent runs (top 10)
ImGui::Separator();
ImGui::Text("%s Recent runs", TI_HISTORY);
int shown = 0;
if (ImGui::BeginTable("##df_node_runs", 5,
ImGuiTableFlags_Borders | ImGuiTableFlags_RowBg))
{
ImGui::TableSetupColumn("Started", ImGuiTableColumnFlags_WidthStretch, 0.30f);
ImGui::TableSetupColumn("Status", ImGuiTableColumnFlags_WidthStretch, 0.12f);
ImGui::TableSetupColumn("Duration", ImGuiTableColumnFlags_WidthStretch, 0.13f);
ImGui::TableSetupColumn("Rows", ImGuiTableColumnFlags_WidthStretch, 0.10f);
ImGui::TableSetupColumn("Trigger", ImGuiTableColumnFlags_WidthStretch, 0.15f);
ImGui::TableHeadersRow();
for (auto& r : runs_all) {
if (r.node_id != nid) continue;
if (shown >= 10) break;
shown++;
ImGui::TableNextRow();
ImGui::TableNextColumn();
ImGui::TextUnformatted(r.started_at.c_str());
ImGui::TableNextColumn();
badge(r.status.c_str(), variant_for_status(r.status));
ImGui::TableNextColumn();
ImGui::TextUnformatted(format_duration(r.duration_ms).c_str());
ImGui::TableNextColumn();
ImGui::Text("%lld", r.rows_out);
ImGui::TableNextColumn();
ImGui::TextUnformatted(r.trigger.c_str());
}
ImGui::EndTable();
}
if (shown == 0) {
ImGui::TextDisabled("(no runs for this node yet)");
}
ImGui::End();
}
} // namespace data_factory_ui
+62
View File
@@ -0,0 +1,62 @@
#pragma once
// Tabs / panels for data_factory:
// - Map (flat tree by kind, placeholder for future graph)
// - Extractors (table of kind=extractor nodes)
// - Transformers (kind=transformer)
// - Databases (registered databases)
// - Sinks (kind=sink)
// - Health (KPIs derived from runs cache)
// - Node Detail (side panel, fn metadata + recent runs)
#include "data_http.h"
#include <string>
#include <vector>
namespace data_factory_ui {
// Cross-tab selection.
struct Selection {
std::string node_id; // "" = none
std::string run_id; // "" = none
};
Selection& selection();
// Cached function metadata for the node detail panel.
struct FunctionCache {
std::string function_id;
data_factory::FnInfo info;
bool loaded = false;
std::string error;
};
FunctionCache& function_cache();
void draw_map(const std::string& api_url,
const std::vector<data_factory::Node>& nodes);
void draw_extractors(const std::string& api_url,
const std::vector<data_factory::Node>& nodes,
const std::vector<data_factory::Run>& runs_all);
void draw_transformers(const std::string& api_url,
const std::vector<data_factory::Node>& nodes,
const std::vector<data_factory::Run>& runs_all);
void draw_databases(const std::string& api_url,
const std::vector<data_factory::DatabaseInfo>& dbs);
void draw_sinks(const std::string& api_url,
const std::vector<data_factory::Node>& nodes,
const std::vector<data_factory::Run>& runs_all);
void draw_health(const std::string& api_url,
const std::vector<data_factory::Run>& runs_all);
void draw_node_detail_panel(const std::string& api_url,
const std::vector<data_factory::Node>& nodes,
const std::vector<data_factory::Run>& runs_all,
bool* p_open);
} // namespace data_factory_ui
+24765
View File
File diff suppressed because it is too large Load Diff
+404
View File
@@ -0,0 +1,404 @@
#include "ws_client.h"
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <random>
#include <sstream>
#include <vector>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "ws2_32.lib")
typedef SOCKET sock_t;
#define SOCK_INVALID INVALID_SOCKET
#define SOCK_CLOSE closesocket
#define SOCK_ERR WSAGetLastError()
#define FN_SOCK_NONBLOCK(s) do { u_long m = 1; ioctlsocket((s), FIONBIO, &m); } while (0)
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
typedef int sock_t;
#define SOCK_INVALID (-1)
#define SOCK_CLOSE close
#define SOCK_ERR errno
#define FN_SOCK_NONBLOCK(s) do { int f = fcntl((s), F_GETFL, 0); fcntl((s), F_SETFL, f | O_NONBLOCK); } while (0)
#endif
#ifdef _WIN32
static bool wsa_init_ws() {
static bool inited = false;
static std::once_flag flag;
std::call_once(flag, []() {
WSADATA wsa;
WSAStartup(MAKEWORD(2, 2), &wsa);
});
return true;
}
#endif
namespace {
// ----- Base64 (small, sufficient for 16-byte WS key) -----
const char* kBase64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
std::string base64_encode(const uint8_t* in, size_t len) {
std::string out;
out.reserve(((len + 2) / 3) * 4);
size_t i = 0;
for (; i + 3 <= len; i += 3) {
uint32_t v = (uint32_t(in[i]) << 16) | (uint32_t(in[i + 1]) << 8) | uint32_t(in[i + 2]);
out.push_back(kBase64[(v >> 18) & 63]);
out.push_back(kBase64[(v >> 12) & 63]);
out.push_back(kBase64[(v >> 6) & 63]);
out.push_back(kBase64[v & 63]);
}
if (i < len) {
uint32_t v = uint32_t(in[i]) << 16;
if (i + 1 < len) v |= uint32_t(in[i + 1]) << 8;
out.push_back(kBase64[(v >> 18) & 63]);
out.push_back(kBase64[(v >> 12) & 63]);
out.push_back(i + 1 < len ? kBase64[(v >> 6) & 63] : '=');
out.push_back('=');
}
return out;
}
// Send exactly n bytes (blocking).
bool send_all(sock_t sock, const char* data, size_t n) {
size_t sent = 0;
while (sent < n) {
int k = send(sock, data + sent, static_cast<int>(n - sent), 0);
if (k <= 0) return false;
sent += k;
}
return true;
}
// Receive exactly n bytes (blocking on a non-non-blocking socket).
bool recv_all(sock_t sock, char* data, size_t n) {
size_t got = 0;
while (got < n) {
int k = recv(sock, data + got, static_cast<int>(n - got), 0);
if (k <= 0) return false;
got += k;
}
return true;
}
// Receive up to n bytes; returns count, or -1 on error / -2 on would-block.
int recv_some(sock_t sock, char* data, size_t n) {
int k = recv(sock, data, static_cast<int>(n), 0);
if (k > 0) return k;
if (k == 0) return -1;
#ifdef _WIN32
if (WSAGetLastError() == WSAEWOULDBLOCK) return -2;
#else
if (errno == EAGAIN || errno == EWOULDBLOCK) return -2;
#endif
return -1;
}
} // namespace
WsClient::WsClient() = default;
WsClient::~WsClient() {
stop();
}
void WsClient::start(const std::string& host, int port, const std::string& path) {
State expected = State::Idle;
if (!state_.compare_exchange_strong(expected, State::Connecting)) return;
host_ = host;
port_ = port;
path_ = path;
stop_flag_.store(false);
worker_ = std::thread([this]() { this->run(); });
}
void WsClient::stop() {
stop_flag_.store(true);
int s = sock_.exchange(-1);
if (s != -1) SOCK_CLOSE(static_cast<sock_t>(s));
out_cv_.notify_all();
if (worker_.joinable()) worker_.join();
state_.store(State::Stopped);
}
int WsClient::drain(std::vector<std::string>& out, int max) {
std::lock_guard<std::mutex> g(in_mu_);
int n = 0;
while (!in_queue_.empty() && n < max) {
out.emplace_back(std::move(in_queue_.front()));
in_queue_.pop_front();
n++;
}
return n;
}
bool WsClient::send_text(const std::string& payload) {
if (state_.load() != State::Connected) return false;
{
std::lock_guard<std::mutex> g(out_mu_);
out_queue_.push_back(payload);
}
out_cv_.notify_one();
return true;
}
void WsClient::run() {
int backoff_ms = 500;
while (!stop_flag_.load()) {
state_.store(State::Connecting);
if (connect_once()) {
backoff_ms = 500; // reset on successful connect
state_.store(State::Connected);
read_loop();
}
state_.store(State::Backoff);
if (stop_flag_.load()) break;
// Exponential backoff: 0.5s → 1s → 2s → 4s → 8s (cap).
for (int slept = 0; slept < backoff_ms && !stop_flag_.load(); slept += 100) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
backoff_ms = std::min(backoff_ms * 2, 8000);
}
state_.store(State::Stopped);
}
bool WsClient::connect_once() {
#ifdef _WIN32
wsa_init_ws();
#endif
sock_t sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock == SOCK_INVALID) return false;
// 5s connect timeout via SO_*TIMEO. Stays blocking afterwards for the
// handshake; read_loop switches to non-blocking with select().
#ifdef _WIN32
DWORD timeout_ms = 5000;
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
#else
struct timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv));
#endif
struct sockaddr_in addr;
std::memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(static_cast<uint16_t>(port_));
addr.sin_addr.s_addr = inet_addr(host_.c_str());
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
SOCK_CLOSE(sock);
return false;
}
sock_.store(static_cast<int>(sock));
if (!handshake()) {
int s = sock_.exchange(-1);
if (s != -1) SOCK_CLOSE(static_cast<sock_t>(s));
return false;
}
// Non-blocking for the read loop.
FN_SOCK_NONBLOCK(sock);
return true;
}
bool WsClient::handshake() {
sock_t sock = static_cast<sock_t>(sock_.load());
// 16 random bytes → base64 → Sec-WebSocket-Key.
uint8_t key_raw[16];
std::random_device rd;
for (auto& b : key_raw) b = static_cast<uint8_t>(rd() & 0xff);
std::string key_b64 = base64_encode(key_raw, sizeof(key_raw));
std::ostringstream req;
req << "GET " << path_ << " HTTP/1.1\r\n";
req << "Host: " << host_ << ":" << port_ << "\r\n";
req << "Upgrade: websocket\r\n";
req << "Connection: Upgrade\r\n";
req << "Sec-WebSocket-Key: " << key_b64 << "\r\n";
req << "Sec-WebSocket-Version: 13\r\n";
req << "Origin: http://" << host_ << ":" << port_ << "\r\n";
req << "\r\n";
std::string raw = req.str();
if (!send_all(sock, raw.c_str(), raw.size())) return false;
// Read response headers (up to 4KB).
std::string resp;
char buf[1024];
while (resp.find("\r\n\r\n") == std::string::npos) {
int k = recv(sock, buf, sizeof(buf), 0);
if (k <= 0) return false;
resp.append(buf, k);
if (resp.size() > 4096) return false;
}
// Expect "HTTP/1.1 101".
if (resp.compare(0, 12, "HTTP/1.1 101") != 0 &&
resp.compare(0, 12, "HTTP/1.0 101") != 0) {
fprintf(stderr, "[ws] handshake failed: %.*s\n",
(int)std::min<size_t>(resp.size(), 120), resp.c_str());
return false;
}
// We intentionally skip Sec-WebSocket-Accept verification — controlled
// server, localhost-only, 101 status is enough for this app.
return true;
}
bool WsClient::read_loop() {
sock_t sock = static_cast<sock_t>(sock_.load());
std::vector<uint8_t> rb; // accumulated read buffer
rb.reserve(64 * 1024);
while (!stop_flag_.load()) {
// Block on select() for up to 100ms so we can both read and check
// the outgoing queue.
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(sock, &rfds);
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 100 * 1000;
int sel = select(static_cast<int>(sock) + 1, &rfds, nullptr, nullptr, &tv);
if (sel < 0) return false;
if (sel > 0 && FD_ISSET(sock, &rfds)) {
uint8_t tmp[8192];
int k = recv_some(sock, reinterpret_cast<char*>(tmp), sizeof(tmp));
if (k == -1) return false;
if (k > 0) rb.insert(rb.end(), tmp, tmp + k);
}
// Drain outgoing queue (text frames, masked).
for (;;) {
std::string payload;
{
std::lock_guard<std::mutex> g(out_mu_);
if (out_queue_.empty()) break;
payload = std::move(out_queue_.front());
out_queue_.pop_front();
}
if (!send_frame(0x1, payload)) return false;
}
// Parse frames. RFC6455 minimal: assume server never masks, no
// continuation, opcodes: 0x1 text, 0x8 close, 0x9 ping, 0xA pong.
while (rb.size() >= 2) {
uint8_t b0 = rb[0];
uint8_t b1 = rb[1];
bool fin = (b0 & 0x80) != 0;
(void)fin;
int opcode = b0 & 0x0F;
bool mask = (b1 & 0x80) != 0;
uint64_t len = b1 & 0x7F;
size_t pos = 2;
if (len == 126) {
if (rb.size() < pos + 2) break;
len = (uint64_t(rb[pos]) << 8) | uint64_t(rb[pos + 1]);
pos += 2;
} else if (len == 127) {
if (rb.size() < pos + 8) break;
len = 0;
for (int i = 0; i < 8; i++) len = (len << 8) | rb[pos + i];
pos += 8;
}
uint8_t mkey[4] = {0, 0, 0, 0};
if (mask) {
if (rb.size() < pos + 4) break;
for (int i = 0; i < 4; i++) mkey[i] = rb[pos + i];
pos += 4;
}
if (rb.size() < pos + len) break;
std::string payload;
payload.resize(static_cast<size_t>(len));
for (size_t i = 0; i < len; i++) {
uint8_t c = rb[pos + i];
if (mask) c ^= mkey[i & 3];
payload[i] = static_cast<char>(c);
}
rb.erase(rb.begin(), rb.begin() + pos + len);
switch (opcode) {
case 0x1: { // text
last_event_ts_.store(std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()).count());
std::lock_guard<std::mutex> g(in_mu_);
in_queue_.emplace_back(std::move(payload));
// Bound the queue. Drop oldest if too big — UI consumes
// each frame so this should only kick in if the dashboard
// is paused (window minimized, etc.).
while (in_queue_.size() > 512) in_queue_.pop_front();
break;
}
case 0x8: // close
return false;
case 0x9: // ping → reply with pong (same payload)
if (!send_frame(0xA, payload)) return false;
break;
case 0xA: // pong, ignore
break;
default:
// 0x0 continuation or unexpected opcode: bail (server
// controlled by us, shouldn't happen).
return false;
}
}
}
return true;
}
bool WsClient::send_frame(int opcode, const std::string& payload) {
sock_t sock = static_cast<sock_t>(sock_.load());
if (sock == SOCK_INVALID || static_cast<int>(sock) < 0) return false;
std::vector<uint8_t> frame;
frame.reserve(payload.size() + 16);
frame.push_back(static_cast<uint8_t>(0x80 | (opcode & 0x0F))); // FIN + opcode
uint64_t len = payload.size();
if (len < 126) {
frame.push_back(static_cast<uint8_t>(0x80 | len)); // mask bit set
} else if (len <= 0xFFFF) {
frame.push_back(static_cast<uint8_t>(0x80 | 126));
frame.push_back(static_cast<uint8_t>((len >> 8) & 0xFF));
frame.push_back(static_cast<uint8_t>(len & 0xFF));
} else {
frame.push_back(static_cast<uint8_t>(0x80 | 127));
for (int i = 7; i >= 0; i--) frame.push_back(static_cast<uint8_t>((len >> (8 * i)) & 0xFF));
}
// Mask key (RFC requires mask for client → server frames).
std::random_device rd;
uint8_t mkey[4];
for (auto& b : mkey) b = static_cast<uint8_t>(rd() & 0xff);
for (int i = 0; i < 4; i++) frame.push_back(mkey[i]);
for (size_t i = 0; i < payload.size(); i++) {
frame.push_back(static_cast<uint8_t>(payload[i]) ^ mkey[i & 3]);
}
return send_all(sock, reinterpret_cast<const char*>(frame.data()), frame.size());
}
+75
View File
@@ -0,0 +1,75 @@
#pragma once
// Minimal WebSocket client (RFC 6455, ws:// only, no TLS) tailored for the
// Monitor tab. Background thread does connect + handshake + read loop and
// pushes incoming text payloads into a thread-safe queue that the ImGui
// thread drains each frame.
//
// Issue 0086 — first consumer of WS in the C++ dashboards. If a second app
// needs WS later, extract this file to cpp/functions/network/ via
// fn-constructor.
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
class WsClient {
public:
WsClient();
~WsClient();
// Non-blocking. Spawns background thread that connects to ws://host:port/path
// and keeps the connection alive with exponential reconnect backoff.
// Safe to call multiple times — second call is a no-op once running.
void start(const std::string& host, int port, const std::string& path);
// Stop the background thread and tear down the connection.
void stop();
// True while a WS connection is up and handshake completed.
bool is_connected() const { return state_.load() == State::Connected; }
// Epoch seconds of last received frame (for the live LED in the Monitor).
long long last_event_ts() const { return last_event_ts_.load(); }
// Drain at most `max` queued text payloads. Returns the number drained.
// Called once per frame from the render thread.
int drain(std::vector<std::string>& out, int max = 64);
// Send a text frame to the server. Returns true if queued for sending.
// Used to send {"watermark": N} commands on reconnect.
bool send_text(const std::string& payload);
private:
enum class State { Idle, Connecting, Connected, Backoff, Stopped };
void run();
bool connect_once();
bool handshake();
bool read_loop();
bool send_frame(int opcode, const std::string& payload);
std::string host_;
int port_ = 0;
std::string path_;
std::atomic<State> state_{State::Idle};
std::atomic<long long> last_event_ts_{0};
std::atomic<int> sock_{-1};
std::thread worker_;
std::atomic<bool> stop_flag_{false};
std::mutex in_mu_;
std::deque<std::string> in_queue_;
std::mutex out_mu_;
std::deque<std::string> out_queue_;
// For waking the writer side of read_loop when send_text is called.
std::condition_variable out_cv_;
};