feat: cliente HTTP REST + main panel demo (issue 0095 step 3)

- http_client.{h,cpp}: TCP plain (copia de registry_dashboard, sin SSL).
- data_http.{h,cpp}: API DAG (list/get/runs/trigger) + parser nlohmann/json.
- vendor/nlohmann/json.hpp: vendored.
- main.cpp: panel "Fetch /api/dags" demo, lista DAGs con schedule.
- CMakeLists.txt: anade http_client.cpp + data_http.cpp.

Build verificado. WS y tabs (data_table::render) en commits siguientes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-15 16:44:40 +02:00
parent 334943b7db
commit 44026d0a70
7 changed files with 25327 additions and 2 deletions
+2
View File
@@ -1,5 +1,7 @@
add_imgui_app(dag_engine_ui
main.cpp
http_client.cpp
data_http.cpp
)
target_include_directories(dag_engine_ui PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})
+243
View File
@@ -0,0 +1,243 @@
#include "data_http.h"
#include "http_client.h"
#include "vendor/nlohmann/json.hpp"
#include <cstdio>
#include <cstdlib>
using json = nlohmann::json;
namespace dag_ui {
static bool parse_url(const std::string& url, std::string& host, int& port) {
auto pos = url.find("://");
std::string rest = (pos != std::string::npos) ? url.substr(pos + 3) : url;
auto colon = rest.find(':');
if (colon == std::string::npos) {
host = rest;
port = 80;
} else {
host = rest.substr(0, colon);
port = std::atoi(rest.substr(colon + 1).c_str());
}
return !host.empty() && port > 0;
}
static std::string get_str(const json& j, const char* key) {
if (!j.contains(key) || j[key].is_null()) return "";
if (j[key].is_string()) return j[key].get<std::string>();
return j[key].dump();
}
static int get_int(const json& j, const char* key) {
if (!j.contains(key) || j[key].is_null()) return 0;
if (j[key].is_number()) return j[key].get<int>();
return 0;
}
static long long get_int64(const json& j, const char* key) {
if (!j.contains(key) || j[key].is_null()) return 0;
if (j[key].is_number()) return j[key].get<long long>();
return 0;
}
static std::vector<std::string> get_str_array(const json& j, const char* key) {
std::vector<std::string> out;
if (!j.contains(key) || !j[key].is_array()) return out;
for (auto& v : j[key]) {
if (v.is_string()) out.push_back(v.get<std::string>());
}
return out;
}
static void parse_run(const json& j, DagRunRow& r) {
r.id = get_str(j, "id");
r.dag_name = get_str(j, "dag_name");
r.dag_path = get_str(j, "dag_path");
r.status = get_str(j, "status");
r.trigger = get_str(j, "trigger");
r.started_at = get_str(j, "started_at");
r.finished_at = get_str(j, "finished_at");
r.error = get_str(j, "error");
}
static void parse_step(const json& j, DagStepRow& s) {
s.id = get_str(j, "id");
s.run_id = get_str(j, "run_id");
s.step_name = get_str(j, "step_name");
s.status = get_str(j, "status");
s.exit_code = get_int(j, "exit_code");
s.stdout_text = get_str(j, "stdout");
s.stderr_text = get_str(j, "stderr");
s.started_at = get_str(j, "started_at");
s.finished_at = get_str(j, "finished_at");
s.duration_ms = get_int64(j, "duration_ms");
s.error = get_str(j, "error");
}
static void parse_dag_info(const json& j, DagInfo& d) {
d.name = get_str(j, "name");
d.description = get_str(j, "description");
d.schedule = get_str_array(j, "schedule");
d.tags = get_str_array(j, "tags");
d.type = get_str(j, "type");
d.file_path = get_str(j, "file_path");
if (j.contains("valid") && j["valid"].is_boolean()) {
d.valid = j["valid"].get<bool>();
}
if (j.contains("last_run") && j["last_run"].is_object()) {
d.has_last_run = true;
DagRunRow lr;
parse_run(j["last_run"], lr);
d.last_run_id = lr.id;
d.last_run_status = lr.status;
d.last_run_started_at = lr.started_at;
d.last_run_finished_at = lr.finished_at;
}
}
bool list_dags_http(const std::string& api_url, std::vector<DagInfo>& out) {
std::string host;
int port;
if (!parse_url(api_url, host, port)) {
fprintf(stderr, "[dag_http] invalid URL: %s\n", api_url.c_str());
return false;
}
HttpClient cli(host, port);
auto res = cli.get("/api/dags");
if (!res.ok()) {
fprintf(stderr, "[dag_http] list_dags failed: status=%d\n", res.status);
return false;
}
auto j = json::parse(res.body, nullptr, false);
if (!j.is_array()) {
fprintf(stderr, "[dag_http] list_dags: expected array\n");
return false;
}
out.clear();
out.reserve(j.size());
for (auto& item : j) {
DagInfo d;
parse_dag_info(item, d);
out.push_back(std::move(d));
}
return true;
}
bool get_dag_http(const std::string& api_url, const std::string& name,
DagDetail& out) {
std::string host;
int port;
if (!parse_url(api_url, host, port)) return false;
HttpClient cli(host, port);
auto res = cli.get("/api/dags/" + name);
if (!res.ok()) {
fprintf(stderr, "[dag_http] get_dag(%s) failed: status=%d\n",
name.c_str(), res.status);
return false;
}
auto j = json::parse(res.body, nullptr, false);
if (!j.is_object()) return false;
if (j.contains("dag") && j["dag"].is_object()) {
parse_dag_info(j["dag"], out.info);
} else {
parse_dag_info(j, out.info);
}
if (j.contains("recent_runs") && j["recent_runs"].is_array()) {
for (auto& r : j["recent_runs"]) {
DagRunRow row;
parse_run(r, row);
out.recent_runs.push_back(std::move(row));
}
}
if (j.contains("validation") && j["validation"].is_object()) {
auto& v = j["validation"];
if (v.contains("valid") && v["valid"].is_boolean() && !v["valid"].get<bool>()) {
out.validation_error = get_str(v, "error");
}
}
return true;
}
bool list_runs_http(const std::string& api_url, const std::string& dag_name,
int limit, std::vector<DagRunRow>& out) {
std::string host;
int port;
if (!parse_url(api_url, host, port)) return false;
HttpClient cli(host, port);
std::string path = "/api/runs?limit=" + std::to_string(limit);
if (!dag_name.empty()) path += "&dag=" + dag_name;
auto res = cli.get(path);
if (!res.ok()) {
fprintf(stderr, "[dag_http] list_runs failed: status=%d\n", res.status);
return false;
}
auto j = json::parse(res.body, nullptr, false);
if (!j.is_object() || !j.contains("runs") || !j["runs"].is_array()) {
return false;
}
out.clear();
for (auto& r : j["runs"]) {
DagRunRow row;
parse_run(r, row);
out.push_back(std::move(row));
}
return true;
}
bool get_run_http(const std::string& api_url, const std::string& run_id,
DagRunDetail& out) {
std::string host;
int port;
if (!parse_url(api_url, host, port)) return false;
HttpClient cli(host, port);
auto res = cli.get("/api/runs/" + run_id);
if (!res.ok()) {
fprintf(stderr, "[dag_http] get_run(%s) failed: status=%d\n",
run_id.c_str(), res.status);
return false;
}
auto j = json::parse(res.body, nullptr, false);
if (!j.is_object()) return false;
if (j.contains("run") && j["run"].is_object()) {
parse_run(j["run"], out.run);
}
if (j.contains("steps") && j["steps"].is_array()) {
for (auto& s : j["steps"]) {
DagStepRow row;
parse_step(s, row);
out.steps.push_back(std::move(row));
}
}
return true;
}
bool trigger_dag_http(const std::string& api_url, const std::string& name,
std::string& out_run_id, std::string& out_error) {
std::string host;
int port;
if (!parse_url(api_url, host, port)) {
out_error = "invalid URL";
return false;
}
HttpClient cli(host, port);
auto res = cli.post("/api/dags/" + name + "/run", "{}", "application/json");
if (!res.ok() && res.status != 202) {
out_error = "HTTP " + std::to_string(res.status) + ": " + res.body;
return false;
}
auto j = json::parse(res.body, nullptr, false);
if (j.is_object()) {
out_run_id = get_str(j, "run_id");
if (out_run_id.empty()) {
out_run_id = get_str(j, "id");
}
}
return true;
}
} // namespace dag_ui
+84
View File
@@ -0,0 +1,84 @@
#pragma once
// Cliente HTTP REST contra apps/dag_engine (Go backend).
// Endpoints: /api/dags, /api/runs, /api/dags/{name}/run (issue 0095).
#include <string>
#include <vector>
namespace dag_ui {
// --- Modelo de datos (mirror JSON shape de apps/dag_engine) ---
struct DagInfo {
std::string name;
std::string description;
std::vector<std::string> schedule; // cron expressions
std::vector<std::string> tags;
std::string type;
std::string file_path;
bool valid = false;
// last_run inlined fields (extracted from nested object on load)
bool has_last_run = false;
std::string last_run_id;
std::string last_run_status;
std::string last_run_started_at;
std::string last_run_finished_at;
};
struct DagRunRow {
std::string id;
std::string dag_name;
std::string dag_path;
std::string status; // pending|running|success|failed|cancelled
std::string trigger; // manual|cron|api
std::string started_at;
std::string finished_at;
std::string error;
};
struct DagStepRow {
std::string id;
std::string run_id;
std::string step_name;
std::string status;
int exit_code = 0;
std::string stdout_text;
std::string stderr_text;
std::string started_at;
std::string finished_at;
long long duration_ms = 0;
std::string error;
};
struct DagRunDetail {
DagRunRow run;
std::vector<DagStepRow> steps;
};
struct DagDetail {
DagInfo info;
std::vector<DagRunRow> recent_runs;
std::string validation_error; // empty if valid
};
// --- API ---
// api_url: "http://127.0.0.1:8090". Todas las funciones blocking, devuelven
// false si la red falla o el status no es 2xx. Errores van a stderr.
bool list_dags_http(const std::string& api_url, std::vector<DagInfo>& out);
bool get_dag_http(const std::string& api_url, const std::string& name,
DagDetail& out);
bool list_runs_http(const std::string& api_url, const std::string& dag_name,
int limit, std::vector<DagRunRow>& out);
bool get_run_http(const std::string& api_url, const std::string& run_id,
DagRunDetail& out);
// Devuelve el run_id en out_run_id si HTTP 202.
bool trigger_dag_http(const std::string& api_url, const std::string& name,
std::string& out_run_id, std::string& out_error);
} // namespace dag_ui
+173
View File
@@ -0,0 +1,173 @@
#include "http_client.h"
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <sstream>
#include <vector>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#include <mutex>
#pragma comment(lib, "ws2_32.lib")
// std::call_once para evitar race condition si hay peticiones simultaneas
// desde multiples threads (main + runners).
static std::once_flag g_wsa_once;
static bool g_wsa_ok = false;
static bool wsa_init() {
std::call_once(g_wsa_once, []() {
WSADATA wsa;
g_wsa_ok = (WSAStartup(MAKEWORD(2, 2), &wsa) == 0);
});
return g_wsa_ok;
}
typedef SOCKET sock_t;
#define SOCK_INVALID INVALID_SOCKET
#define SOCK_CLOSE closesocket
#define SOCK_ERR WSAGetLastError()
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netdb.h>
#include <errno.h>
typedef int sock_t;
#define SOCK_INVALID (-1)
#define SOCK_CLOSE close
#define SOCK_ERR errno
#endif
HttpClient::HttpClient(const std::string& host, int port)
: host_(host), port_(port) {}
HttpResponse HttpClient::get(const std::string& path) {
return request("GET", path, "", "");
}
HttpResponse HttpClient::post(const std::string& path, const std::string& body,
const std::string& content_type) {
return request("POST", path, body, content_type);
}
HttpResponse HttpClient::request(const std::string& method, const std::string& path,
const std::string& body, const std::string& content_type) {
HttpResponse resp;
#ifdef _WIN32
if (!wsa_init()) {
fprintf(stderr, "[http] WSAStartup failed\n");
return resp;
}
#endif
sock_t sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sock == SOCK_INVALID) {
fprintf(stderr, "[http] socket() failed: %d\n", SOCK_ERR);
return resp;
}
// Timeout — Windows y POSIX usan formatos distintos para SO_{RCV,SND}TIMEO.
// Windows: DWORD milisegundos. POSIX: struct timeval.
#ifdef _WIN32
DWORD timeout_ms = static_cast<DWORD>(timeout_sec_ * 1000);
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_ms, sizeof(timeout_ms));
#else
struct timeval tv;
tv.tv_sec = timeout_sec_;
tv.tv_usec = 0;
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv));
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (const char*)&tv, sizeof(tv));
#endif
// Connect
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(static_cast<uint16_t>(port_));
addr.sin_addr.s_addr = inet_addr(host_.c_str());
if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
int err = SOCK_ERR;
SOCK_CLOSE(sock);
// Reportamos el errno/WSAError en el body para que el toast sea util.
char buf[128];
std::snprintf(buf, sizeof(buf),
"connect() failed to %s:%d (err=%d)",
host_.c_str(), port_, err);
resp.body = buf;
return resp;
}
// Build request
std::ostringstream req;
req << method << " " << path << " HTTP/1.1\r\n";
req << "Host: " << host_ << ":" << port_ << "\r\n";
req << "Connection: close\r\n";
if (!body.empty()) {
req << "Content-Type: " << content_type << "\r\n";
req << "Content-Length: " << body.size() << "\r\n";
}
req << "\r\n";
if (!body.empty()) req << body;
std::string raw_req = req.str();
send(sock, raw_req.c_str(), static_cast<int>(raw_req.size()), 0);
// Read response
std::vector<char> buf(8192);
std::string raw;
for (;;) {
int n = recv(sock, buf.data(), static_cast<int>(buf.size()), 0);
if (n <= 0) break;
raw.append(buf.data(), n);
}
SOCK_CLOSE(sock);
// Parse status line
auto hdr_end = raw.find("\r\n\r\n");
if (hdr_end == std::string::npos) return resp;
// "HTTP/1.1 200 OK\r\n..."
auto first_line_end = raw.find("\r\n");
if (first_line_end == std::string::npos) return resp;
std::string status_line = raw.substr(0, first_line_end);
auto sp1 = status_line.find(' ');
if (sp1 != std::string::npos) {
resp.status = std::atoi(status_line.c_str() + sp1 + 1);
}
resp.body = raw.substr(hdr_end + 4);
// Handle chunked transfer encoding
std::string headers_str = raw.substr(0, hdr_end);
if (headers_str.find("chunked") != std::string::npos) {
// Decode chunked body
std::string decoded;
const char* p = resp.body.c_str();
const char* end = p + resp.body.size();
while (p < end) {
// Read chunk size (hex)
char* chunk_end = nullptr;
long chunk_size = strtol(p, &chunk_end, 16);
if (chunk_size <= 0) break;
// Skip \r\n after size
p = chunk_end;
if (p < end && *p == '\r') p++;
if (p < end && *p == '\n') p++;
// Read chunk data
if (p + chunk_size <= end) {
decoded.append(p, chunk_size);
}
p += chunk_size;
// Skip trailing \r\n
if (p < end && *p == '\r') p++;
if (p < end && *p == '\n') p++;
}
resp.body = decoded;
}
return resp;
}
+30
View File
@@ -0,0 +1,30 @@
#pragma once
// Minimal HTTP client — no threading, no SSL, just plain TCP to localhost.
// Works with both win32 and posix MinGW thread models.
#include <string>
struct HttpResponse {
int status = 0;
std::string body;
bool ok() const { return status >= 200 && status < 300; }
};
// Simple blocking HTTP GET/POST over TCP sockets.
// host: "127.0.0.1", port: 8484
class HttpClient {
public:
HttpClient(const std::string& host, int port);
HttpResponse get(const std::string& path);
HttpResponse post(const std::string& path, const std::string& body,
const std::string& content_type = "application/json");
private:
std::string host_;
int port_;
int timeout_sec_ = 5;
HttpResponse request(const std::string& method, const std::string& path,
const std::string& body, const std::string& content_type);
};
+30 -2
View File
@@ -3,7 +3,17 @@
#include "core/panel_menu.h"
#include "core/icons_tabler.h"
#include "core/logger.h"
// #include "viz/data_table.h" // uncomment to enable data_table::render() panel
#include "data_http.h"
#include <string>
#include <vector>
// Config global del backend HTTP. Persistido por imgui.ini via Mantine-no-go.
static std::string g_api_url = "http://127.0.0.1:8090";
// Cache en memoria del primer fetch. Tabs proximos updates via WS.
static std::vector<dag_ui::DagInfo> g_dags;
static std::string g_last_error;
// Toggles de paneles (visibles desde el menu View del menubar canonico)
static bool g_show_main = true;
@@ -13,7 +23,25 @@ static void draw_main() {
ImGui::End();
return;
}
ImGui::TextUnformatted("Hello from dag_engine_ui");
ImGui::Text("API: %s", g_api_url.c_str());
if (ImGui::Button("Fetch /api/dags")) {
g_dags.clear();
g_last_error.clear();
if (!dag_ui::list_dags_http(g_api_url, g_dags)) {
g_last_error = "list_dags_http failed (see stderr)";
}
}
if (!g_last_error.empty()) {
ImGui::TextColored(ImVec4(1, 0.4f, 0.4f, 1), "%s", g_last_error.c_str());
}
ImGui::Separator();
ImGui::Text("DAGs: %zu", g_dags.size());
for (auto& d : g_dags) {
ImGui::BulletText("%s (%s) schedule=%s",
d.name.c_str(),
d.valid ? "valid" : "invalid",
d.schedule.empty() ? "none" : d.schedule[0].c_str());
}
ImGui::End();
}
+24765
View File
File diff suppressed because it is too large Load Diff