#include "data_http.h"
#include "http_client.h"
#include "vendor/nlohmann/json.hpp"

#include <cstdio>
#include <cstdlib>
#include <string>
#include <utility>
#include <vector>

using json = nlohmann::json;

namespace dag_ui {

// ---------------------------------------------------------------------------
// URL helpers
// ---------------------------------------------------------------------------

// Splits `url` into `host` and `port`.
//
// An optional "scheme://" prefix is skipped and anything from the first '/',
// '?' or '#' after the authority is ignored, so "http://localhost:8080/" and
// "localhost:8080" both yield ("localhost", 8080). When no port is given the
// port defaults to 80 (the scheme is not consulted).
//
// Returns true only when the host is non-empty and the port is a plain decimal
// number in 1..65535.
static bool parse_url(const std::string& url, std::string& host, int& port) {
    const auto scheme_end = url.find("://");
    std::string authority =
        (scheme_end != std::string::npos) ? url.substr(scheme_end + 3) : url;

    // Keep only "host[:port]"; a trailing path must not leak into the host name.
    const auto tail = authority.find_first_of("/?#");
    if (tail != std::string::npos) authority.erase(tail);

    const auto colon = authority.find(':');
    if (colon == std::string::npos) {
        host = authority;
        port = 80;
    } else {
        host = authority.substr(0, colon);
        const std::string port_text = authority.substr(colon + 1);
        char* end = nullptr;
        const long value = std::strtol(port_text.c_str(), &end, 10);
        // Reject empty ports, trailing garbage ("80abc") and out-of-range values.
        const bool numeric = !port_text.empty() && end != nullptr && *end == '\0';
        port = (numeric && value >= 1 && value <= 65535) ? static_cast<int>(value) : 0;
    }
    return !host.empty() && port > 0;
}

// Percent-encodes `text` for use inside a request target.
//
// RFC 3986 "unreserved" characters (ALPHA / DIGIT / '-' / '.' / '_' / '~') are
// copied verbatim; every other byte becomes "%XX". With `keep_slash` set, '/'
// is also copied verbatim so identifiers that are themselves path-like still
// produce the same request path as plain concatenation did.
static std::string url_encode(const std::string& text, bool keep_slash) {
    static const char kHex[] = "0123456789ABCDEF";
    std::string encoded;
    encoded.reserve(text.size());
    for (const char ch : text) {
        const unsigned char c = static_cast<unsigned char>(ch);
        const bool unreserved = (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') ||
                                (c >= '0' && c <= '9') || c == '-' || c == '.' ||
                                c == '_' || c == '~';
        if (unreserved || (keep_slash && c == '/')) {
            encoded.push_back(ch);
        } else {
            encoded.push_back('%');
            encoded.push_back(kHex[c >> 4]);
            encoded.push_back(kHex[c & 0x0F]);
        }
    }
    return encoded;
}

// ---------------------------------------------------------------------------
// Tolerant JSON field accessors: a missing, null or wrongly-typed field yields
// a neutral default instead of an error.
// ---------------------------------------------------------------------------

// Returns field `key` as a string. Missing/null -> "". Non-string values are
// returned as their JSON text (via dump()).
static std::string get_str(const json& j, const char* key) {
    if (!j.contains(key) || j[key].is_null()) return "";
    if (j[key].is_string()) return j[key].get<std::string>();
    return j[key].dump();
}

// Returns field `key` as an int, or 0 when it is missing, null or not a number.
static int get_int(const json& j, const char* key) {
    if (!j.contains(key) || j[key].is_null()) return 0;
    if (j[key].is_number()) return j[key].get<int>();
    return 0;
}

// Returns field `key` as a long long, or 0 when it is missing, null or not a number.
static long long get_int64(const json& j, const char* key) {
    if (!j.contains(key) || j[key].is_null()) return 0;
    if (j[key].is_number()) return j[key].get<long long>();
    return 0;
}

// Returns the string elements of array field `key`; non-string elements are
// skipped. A missing or non-array field yields an empty vector.
static std::vector<std::string> get_str_array(const json& j, const char* key) {
    std::vector<std::string> out;
    if (!j.contains(key) || !j[key].is_array()) return out;
    for (const auto& v : j[key]) {
        if (v.is_string()) out.push_back(v.get<std::string>());
    }
    return out;
}

// ---------------------------------------------------------------------------
// JSON -> row converters
// ---------------------------------------------------------------------------

// Fills `r` from a run object.
static void parse_run(const json& j, DagRunRow& r) {
    r.id          = get_str(j, "id");
    r.dag_name    = get_str(j, "dag_name");
    r.dag_path    = get_str(j, "dag_path");
    r.status      = get_str(j, "status");
    r.trigger     = get_str(j, "trigger");
    r.started_at  = get_str(j, "started_at");
    r.finished_at = get_str(j, "finished_at");
    r.error       = get_str(j, "error");
}

// Fills `s` from a step object. Note the JSON keys "stdout"/"stderr" map to
// the `stdout_text`/`stderr_text` members.
static void parse_step(const json& j, DagStepRow& s) {
    s.id          = get_str(j, "id");
    s.run_id      = get_str(j, "run_id");
    s.step_name   = get_str(j, "step_name");
    s.status      = get_str(j, "status");
    s.exit_code   = get_int(j, "exit_code");
    s.stdout_text = get_str(j, "stdout");
    s.stderr_text = get_str(j, "stderr");
    s.started_at  = get_str(j, "started_at");
    s.finished_at = get_str(j, "finished_at");
    s.duration_ms = get_int64(j, "duration_ms");
    s.error       = get_str(j, "error");
    s.function_id = get_str(j, "function_id");
}

// Fills `d` from a DAG description object.
//
// `valid`, the `last_run_*` members and `last_runs_status` are only touched
// when the corresponding key is present (and `last_runs_status` is appended
// to), so callers should pass a freshly initialised DagInfo.
static void parse_dag_info(const json& j, DagInfo& d) {
    d.name        = get_str(j, "name");
    d.description = get_str(j, "description");
    d.schedule    = get_str_array(j, "schedule");
    d.tags        = get_str_array(j, "tags");
    d.type        = get_str(j, "type");
    d.file_path   = get_str(j, "file_path");

    if (j.contains("valid") && j["valid"].is_boolean()) {
        d.valid = j["valid"].get<bool>();
    }

    if (j.contains("last_run") && j["last_run"].is_object()) {
        d.has_last_run = true;
        DagRunRow lr;
        parse_run(j["last_run"], lr);
        d.last_run_id          = lr.id;
        d.last_run_status      = lr.status;
        d.last_run_started_at  = lr.started_at;
        d.last_run_finished_at = lr.finished_at;
    }

    if (j.contains("last_runs") && j["last_runs"].is_array()) {
        for (const auto& r : j["last_runs"]) {
            d.last_runs_status.push_back(get_str(r, "status"));
        }
    }
}

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

// GET /api/dags. On success replaces `out` with one DagInfo per array element
// and returns true. Returns false (leaving `out` untouched) on a bad URL, a
// failed request, or a body that is not a JSON array.
bool list_dags_http(const std::string& api_url, std::vector<DagInfo>& out) {
    std::string host;
    int port = 0;
    if (!parse_url(api_url, host, port)) {
        fprintf(stderr, "[dag_http] invalid URL: %s\n", api_url.c_str());
        return false;
    }

    HttpClient cli(host, port);
    auto res = cli.get("/api/dags");
    if (!res.ok()) {
        fprintf(stderr, "[dag_http] list_dags failed: status=%d\n", res.status);
        return false;
    }

    // allow_exceptions=false: malformed JSON yields a discarded value, which
    // fails the is_array() check below instead of throwing.
    auto j = json::parse(res.body, nullptr, false);
    if (!j.is_array()) {
        fprintf(stderr, "[dag_http] list_dags: expected array\n");
        return false;
    }

    out.clear();
    out.reserve(j.size());
    for (const auto& item : j) {
        DagInfo d;
        parse_dag_info(item, d);
        out.push_back(std::move(d));
    }
    return true;
}

// GET /api/dags/<name>. On success resets and fills `out.info`,
// `out.recent_runs` and `out.validation_error`, and returns true. The DAG
// description is read from the "dag" member when present, otherwise from the
// top-level object. Returns false on a bad URL, a failed request, or a
// non-object body; `out` is left untouched in that case.
bool get_dag_http(const std::string& api_url, const std::string& name, DagDetail& out) {
    std::string host;
    int port = 0;
    if (!parse_url(api_url, host, port)) return false;

    HttpClient cli(host, port);
    auto res = cli.get("/api/dags/" + url_encode(name, /*keep_slash=*/true));
    if (!res.ok()) {
        fprintf(stderr, "[dag_http] get_dag(%s) failed: status=%d\n", name.c_str(), res.status);
        return false;
    }

    auto j = json::parse(res.body, nullptr, false);
    if (!j.is_object()) return false;

    // Start from a clean slate so a reused `out` does not accumulate rows or
    // keep a validation error from an earlier call.
    out.info = DagInfo{};
    out.recent_runs.clear();
    out.validation_error = std::string();

    if (j.contains("dag") && j["dag"].is_object()) {
        parse_dag_info(j["dag"], out.info);
    } else {
        parse_dag_info(j, out.info);
    }

    if (j.contains("recent_runs") && j["recent_runs"].is_array()) {
        for (const auto& r : j["recent_runs"]) {
            DagRunRow row;
            parse_run(r, row);
            out.recent_runs.push_back(std::move(row));
        }
    }

    if (j.contains("validation") && j["validation"].is_object()) {
        const auto& v = j["validation"];
        // Only an explicit `"valid": false` carries an error message.
        if (v.contains("valid") && v["valid"].is_boolean() && !v["valid"].get<bool>()) {
            out.validation_error = get_str(v, "error");
        }
    }
    return true;
}

// GET /api/runs?limit=<limit>[&dag=<dag_name>]. On success replaces `out` with
// the rows of the response's "runs" array and returns true. Returns false
// (leaving `out` untouched) on a bad URL, a failed request, or a body without
// a "runs" array.
bool list_runs_http(const std::string& api_url, const std::string& dag_name, int limit,
                    std::vector<DagRunRow>& out) {
    std::string host;
    int port = 0;
    if (!parse_url(api_url, host, port)) return false;

    HttpClient cli(host, port);
    std::string path = "/api/runs?limit=" + std::to_string(limit);
    // The filter is a query value: encode it so '&', '#', spaces etc. in a DAG
    // name cannot alter the query string.
    if (!dag_name.empty()) path += "&dag=" + url_encode(dag_name, /*keep_slash=*/false);

    auto res = cli.get(path);
    if (!res.ok()) {
        fprintf(stderr, "[dag_http] list_runs failed: status=%d\n", res.status);
        return false;
    }

    auto j = json::parse(res.body, nullptr, false);
    if (!j.is_object() || !j.contains("runs") || !j["runs"].is_array()) {
        return false;
    }

    out.clear();
    for (const auto& r : j["runs"]) {
        DagRunRow row;
        parse_run(r, row);
        out.push_back(std::move(row));
    }
    return true;
}

// GET /api/runs/<run_id>. On success resets `out.run` / `out.steps`, fills
// them from the "run" object and "steps" array when present, and returns
// true. Returns false (leaving `out` untouched) on a bad URL, a failed
// request, or a non-object body.
bool get_run_http(const std::string& api_url, const std::string& run_id, DagRunDetail& out) {
    std::string host;
    int port = 0;
    if (!parse_url(api_url, host, port)) return false;

    HttpClient cli(host, port);
    auto res = cli.get("/api/runs/" + url_encode(run_id, /*keep_slash=*/true));
    if (!res.ok()) {
        fprintf(stderr, "[dag_http] get_run(%s) failed: status=%d\n", run_id.c_str(), res.status);
        return false;
    }

    auto j = json::parse(res.body, nullptr, false);
    if (!j.is_object()) return false;

    // Drop anything left over from a previous call on the same `out`.
    out.run = DagRunRow{};
    out.steps.clear();

    if (j.contains("run") && j["run"].is_object()) {
        parse_run(j["run"], out.run);
    }
    if (j.contains("steps") && j["steps"].is_array()) {
        for (const auto& s : j["steps"]) {
            DagStepRow row;
            parse_step(s, row);
            out.steps.push_back(std::move(row));
        }
    }
    return true;
}

// POST /api/dags/<name>/run with an empty JSON object as body.
//
// Returns true when the request succeeds or the status is 202. `out_run_id`
// then holds the response's "run_id" (falling back to "id"), or stays empty
// when the body is not a JSON object carrying either. On failure returns
// false with a description in `out_error`. Both out-parameters are cleared on
// entry.
bool trigger_dag_http(const std::string& api_url, const std::string& name,
                      std::string& out_run_id, std::string& out_error) {
    out_run_id.clear();
    out_error.clear();

    std::string host;
    int port = 0;
    if (!parse_url(api_url, host, port)) {
        out_error = "invalid URL";
        return false;
    }

    HttpClient cli(host, port);
    auto res = cli.post("/api/dags/" + url_encode(name, /*keep_slash=*/true) + "/run", "{}",
                        "application/json");
    if (!res.ok() && res.status != 202) {
        out_error = "HTTP " + std::to_string(res.status) + ": " + res.body;
        return false;
    }

    auto j = json::parse(res.body, nullptr, false);
    if (j.is_object()) {
        out_run_id = get_str(j, "run_id");
        if (out_run_id.empty()) {
            out_run_id = get_str(j, "id");
        }
    }
    return true;
}

// GET /api/functions/<function_id>. On success overwrites every field of
// `out` from the response object and returns true. Returns false on a bad
// URL, an empty `function_id`, a failed request, or a non-object body.
bool get_function_http(const std::string& api_url, const std::string& function_id, FnInfo& out) {
    std::string host;
    int port = 0;
    if (!parse_url(api_url, host, port)) return false;
    if (function_id.empty()) return false;

    HttpClient cli(host, port);
    auto res = cli.get("/api/functions/" + url_encode(function_id, /*keep_slash=*/true));
    if (!res.ok()) {
        fprintf(stderr, "[dag_http] get_function(%s) failed: status=%d\n", function_id.c_str(),
                res.status);
        return false;
    }

    auto j = json::parse(res.body, nullptr, false);
    if (!j.is_object()) return false;

    out.id             = get_str(j, "id");
    out.name           = get_str(j, "name");
    out.description    = get_str(j, "description");
    out.signature      = get_str(j, "signature");
    out.purity         = get_str(j, "purity");
    out.domain         = get_str(j, "domain");
    out.lang           = get_str(j, "lang");
    out.uses_functions = get_str_array(j, "uses_functions");
    out.uses_types     = get_str_array(j, "uses_types");
    return true;
}

}  // namespace dag_ui