4abc3f97ec
- CMakeLists.txt - app.md - data_http.cpp - data_http.h - main.cpp - tabs.cpp - tabs.h - appicon.ico Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
279 lines
8.9 KiB
C++
279 lines
8.9 KiB
C++
#include "data_http.h"
|
|
#include "http_client.h"
|
|
#include "vendor/nlohmann/json.hpp"
|
|
|
|
#include <cstdio>
|
|
#include <cstdlib>
|
|
|
|
using json = nlohmann::json;
|
|
|
|
namespace dag_ui {
|
|
|
|
static bool parse_url(const std::string& url, std::string& host, int& port) {
|
|
auto pos = url.find("://");
|
|
std::string rest = (pos != std::string::npos) ? url.substr(pos + 3) : url;
|
|
auto colon = rest.find(':');
|
|
if (colon == std::string::npos) {
|
|
host = rest;
|
|
port = 80;
|
|
} else {
|
|
host = rest.substr(0, colon);
|
|
port = std::atoi(rest.substr(colon + 1).c_str());
|
|
}
|
|
return !host.empty() && port > 0;
|
|
}
|
|
|
|
static std::string get_str(const json& j, const char* key) {
|
|
if (!j.contains(key) || j[key].is_null()) return "";
|
|
if (j[key].is_string()) return j[key].get<std::string>();
|
|
return j[key].dump();
|
|
}
|
|
|
|
static int get_int(const json& j, const char* key) {
|
|
if (!j.contains(key) || j[key].is_null()) return 0;
|
|
if (j[key].is_number()) return j[key].get<int>();
|
|
return 0;
|
|
}
|
|
|
|
static long long get_int64(const json& j, const char* key) {
|
|
if (!j.contains(key) || j[key].is_null()) return 0;
|
|
if (j[key].is_number()) return j[key].get<long long>();
|
|
return 0;
|
|
}
|
|
|
|
static std::vector<std::string> get_str_array(const json& j, const char* key) {
|
|
std::vector<std::string> out;
|
|
if (!j.contains(key) || !j[key].is_array()) return out;
|
|
for (auto& v : j[key]) {
|
|
if (v.is_string()) out.push_back(v.get<std::string>());
|
|
}
|
|
return out;
|
|
}
|
|
|
|
static void parse_run(const json& j, DagRunRow& r) {
|
|
r.id = get_str(j, "id");
|
|
r.dag_name = get_str(j, "dag_name");
|
|
r.dag_path = get_str(j, "dag_path");
|
|
r.status = get_str(j, "status");
|
|
r.trigger = get_str(j, "trigger");
|
|
r.started_at = get_str(j, "started_at");
|
|
r.finished_at = get_str(j, "finished_at");
|
|
r.error = get_str(j, "error");
|
|
}
|
|
|
|
static void parse_step(const json& j, DagStepRow& s) {
|
|
s.id = get_str(j, "id");
|
|
s.run_id = get_str(j, "run_id");
|
|
s.step_name = get_str(j, "step_name");
|
|
s.status = get_str(j, "status");
|
|
s.exit_code = get_int(j, "exit_code");
|
|
s.stdout_text = get_str(j, "stdout");
|
|
s.stderr_text = get_str(j, "stderr");
|
|
s.started_at = get_str(j, "started_at");
|
|
s.finished_at = get_str(j, "finished_at");
|
|
s.duration_ms = get_int64(j, "duration_ms");
|
|
s.error = get_str(j, "error");
|
|
s.function_id = get_str(j, "function_id");
|
|
}
|
|
|
|
static void parse_dag_info(const json& j, DagInfo& d) {
|
|
d.name = get_str(j, "name");
|
|
d.description = get_str(j, "description");
|
|
d.schedule = get_str_array(j, "schedule");
|
|
d.tags = get_str_array(j, "tags");
|
|
d.type = get_str(j, "type");
|
|
d.file_path = get_str(j, "file_path");
|
|
if (j.contains("valid") && j["valid"].is_boolean()) {
|
|
d.valid = j["valid"].get<bool>();
|
|
}
|
|
if (j.contains("last_run") && j["last_run"].is_object()) {
|
|
d.has_last_run = true;
|
|
DagRunRow lr;
|
|
parse_run(j["last_run"], lr);
|
|
d.last_run_id = lr.id;
|
|
d.last_run_status = lr.status;
|
|
d.last_run_started_at = lr.started_at;
|
|
d.last_run_finished_at = lr.finished_at;
|
|
}
|
|
if (j.contains("last_runs") && j["last_runs"].is_array()) {
|
|
for (auto& r : j["last_runs"]) {
|
|
d.last_runs_status.push_back(get_str(r, "status"));
|
|
}
|
|
}
|
|
}
|
|
|
|
bool list_dags_http(const std::string& api_url, std::vector<DagInfo>& out) {
|
|
std::string host;
|
|
int port;
|
|
if (!parse_url(api_url, host, port)) {
|
|
fprintf(stderr, "[dag_http] invalid URL: %s\n", api_url.c_str());
|
|
return false;
|
|
}
|
|
HttpClient cli(host, port);
|
|
auto res = cli.get("/api/dags");
|
|
if (!res.ok()) {
|
|
fprintf(stderr, "[dag_http] list_dags failed: status=%d\n", res.status);
|
|
return false;
|
|
}
|
|
auto j = json::parse(res.body, nullptr, false);
|
|
if (!j.is_array()) {
|
|
fprintf(stderr, "[dag_http] list_dags: expected array\n");
|
|
return false;
|
|
}
|
|
out.clear();
|
|
out.reserve(j.size());
|
|
for (auto& item : j) {
|
|
DagInfo d;
|
|
parse_dag_info(item, d);
|
|
out.push_back(std::move(d));
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool get_dag_http(const std::string& api_url, const std::string& name,
|
|
DagDetail& out) {
|
|
std::string host;
|
|
int port;
|
|
if (!parse_url(api_url, host, port)) return false;
|
|
HttpClient cli(host, port);
|
|
auto res = cli.get("/api/dags/" + name);
|
|
if (!res.ok()) {
|
|
fprintf(stderr, "[dag_http] get_dag(%s) failed: status=%d\n",
|
|
name.c_str(), res.status);
|
|
return false;
|
|
}
|
|
auto j = json::parse(res.body, nullptr, false);
|
|
if (!j.is_object()) return false;
|
|
|
|
if (j.contains("dag") && j["dag"].is_object()) {
|
|
parse_dag_info(j["dag"], out.info);
|
|
} else {
|
|
parse_dag_info(j, out.info);
|
|
}
|
|
if (j.contains("recent_runs") && j["recent_runs"].is_array()) {
|
|
for (auto& r : j["recent_runs"]) {
|
|
DagRunRow row;
|
|
parse_run(r, row);
|
|
out.recent_runs.push_back(std::move(row));
|
|
}
|
|
}
|
|
if (j.contains("validation") && j["validation"].is_object()) {
|
|
auto& v = j["validation"];
|
|
if (v.contains("valid") && v["valid"].is_boolean() && !v["valid"].get<bool>()) {
|
|
out.validation_error = get_str(v, "error");
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool list_runs_http(const std::string& api_url, const std::string& dag_name,
|
|
int limit, std::vector<DagRunRow>& out) {
|
|
std::string host;
|
|
int port;
|
|
if (!parse_url(api_url, host, port)) return false;
|
|
HttpClient cli(host, port);
|
|
|
|
std::string path = "/api/runs?limit=" + std::to_string(limit);
|
|
if (!dag_name.empty()) path += "&dag=" + dag_name;
|
|
|
|
auto res = cli.get(path);
|
|
if (!res.ok()) {
|
|
fprintf(stderr, "[dag_http] list_runs failed: status=%d\n", res.status);
|
|
return false;
|
|
}
|
|
auto j = json::parse(res.body, nullptr, false);
|
|
if (!j.is_object() || !j.contains("runs") || !j["runs"].is_array()) {
|
|
return false;
|
|
}
|
|
out.clear();
|
|
for (auto& r : j["runs"]) {
|
|
DagRunRow row;
|
|
parse_run(r, row);
|
|
out.push_back(std::move(row));
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool get_run_http(const std::string& api_url, const std::string& run_id,
|
|
DagRunDetail& out) {
|
|
std::string host;
|
|
int port;
|
|
if (!parse_url(api_url, host, port)) return false;
|
|
HttpClient cli(host, port);
|
|
auto res = cli.get("/api/runs/" + run_id);
|
|
if (!res.ok()) {
|
|
fprintf(stderr, "[dag_http] get_run(%s) failed: status=%d\n",
|
|
run_id.c_str(), res.status);
|
|
return false;
|
|
}
|
|
auto j = json::parse(res.body, nullptr, false);
|
|
if (!j.is_object()) return false;
|
|
|
|
if (j.contains("run") && j["run"].is_object()) {
|
|
parse_run(j["run"], out.run);
|
|
}
|
|
if (j.contains("steps") && j["steps"].is_array()) {
|
|
for (auto& s : j["steps"]) {
|
|
DagStepRow row;
|
|
parse_step(s, row);
|
|
out.steps.push_back(std::move(row));
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool trigger_dag_http(const std::string& api_url, const std::string& name,
|
|
std::string& out_run_id, std::string& out_error) {
|
|
std::string host;
|
|
int port;
|
|
if (!parse_url(api_url, host, port)) {
|
|
out_error = "invalid URL";
|
|
return false;
|
|
}
|
|
HttpClient cli(host, port);
|
|
auto res = cli.post("/api/dags/" + name + "/run", "{}", "application/json");
|
|
if (!res.ok() && res.status != 202) {
|
|
out_error = "HTTP " + std::to_string(res.status) + ": " + res.body;
|
|
return false;
|
|
}
|
|
auto j = json::parse(res.body, nullptr, false);
|
|
if (j.is_object()) {
|
|
out_run_id = get_str(j, "run_id");
|
|
if (out_run_id.empty()) {
|
|
out_run_id = get_str(j, "id");
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool get_function_http(const std::string& api_url,
|
|
const std::string& function_id,
|
|
FnInfo& out) {
|
|
std::string host;
|
|
int port;
|
|
if (!parse_url(api_url, host, port)) return false;
|
|
if (function_id.empty()) return false;
|
|
HttpClient cli(host, port);
|
|
auto res = cli.get("/api/functions/" + function_id);
|
|
if (!res.ok()) {
|
|
fprintf(stderr, "[dag_http] get_function(%s) failed: status=%d\n",
|
|
function_id.c_str(), res.status);
|
|
return false;
|
|
}
|
|
auto j = json::parse(res.body, nullptr, false);
|
|
if (!j.is_object()) return false;
|
|
|
|
out.id = get_str(j, "id");
|
|
out.name = get_str(j, "name");
|
|
out.description = get_str(j, "description");
|
|
out.signature = get_str(j, "signature");
|
|
out.purity = get_str(j, "purity");
|
|
out.domain = get_str(j, "domain");
|
|
out.lang = get_str(j, "lang");
|
|
out.uses_functions = get_str_array(j, "uses_functions");
|
|
out.uses_types = get_str_array(j, "uses_types");
|
|
return true;
|
|
}
|
|
|
|
} // namespace dag_ui
|