Files
fn_registry/cpp/tests/test_compute_pipeline.cpp
T
egutierrez a03675113a chore: auto-commit (286 archivos)
- .claude/agents/fn-orquestador/SKILL.md
- .claude/commands/fn_claude.md
- .claude/rules/INDEX.md
- .claude/rules/cpp_apps.md
- .claude/rules/ids_naming.md
- CHANGELOG.md
- apps/dag_engine/README.md
- apps/dag_engine/api.go
- apps/dag_engine/dags_migrated/example.yaml
- apps/dag_engine/dags_migrated/example_lineage_tracking.yaml
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:33:22 +02:00

169 lines
5.9 KiB
C++

// Tests for data_table::compute_pipeline (cpp/functions/core/compute_pipeline.cpp).
// Pure logic: no ImGui, no I/O. Exercises stage chaining.
#define CATCH_CONFIG_MAIN
#include "catch_amalgamated.hpp"
#include "core/compute_pipeline.h"
#include <string>
#include <vector>
using namespace data_table;
// ---------------------------------------------------------------------------
// Helper
// ---------------------------------------------------------------------------
namespace {
struct Table {
std::vector<std::string> backing;
std::vector<const char*> cells;
std::vector<std::string> headers;
std::vector<ColumnType> types;
int rows = 0, cols = 0;
void add_row(std::initializer_list<const char*> row) {
for (const char* s : row) backing.emplace_back(s ? s : "");
++rows;
}
void finalize() {
cols = headers.empty() ? 0 : (int)headers.size();
cells.clear();
cells.reserve(backing.size());
for (const auto& s : backing) cells.push_back(s.c_str());
}
};
} // anon
// ---------------------------------------------------------------------------
// Test: empty stages returns passthrough
// ---------------------------------------------------------------------------
TEST_CASE("compute_pipeline empty stages returns passthrough") {
Table t;
t.headers = {"x", "y"};
t.types = {ColumnType::Int, ColumnType::Int};
t.add_row({"1", "2"});
t.add_row({"3", "4"});
t.finalize();
StageOutput out = compute_pipeline(t.cells.data(), t.rows, t.cols,
t.headers, t.types, {});
REQUIRE(out.rows == 2);
REQUIRE(out.cols == 2);
}
// ---------------------------------------------------------------------------
// Test: single stage pipeline equals compute_stage directly
// ---------------------------------------------------------------------------
TEST_CASE("compute_pipeline single stage equals compute_stage") {
Table t;
t.headers = {"dept", "amount"};
t.types = {ColumnType::String, ColumnType::Float};
t.add_row({"eng", "100"});
t.add_row({"mktg", "200"});
t.add_row({"eng", "150"});
t.finalize();
Stage stage;
Filter f; f.col = 0; f.op = Op::Eq; f.value = "eng";
stage.filters.push_back(f);
StageOutput direct = compute_stage(t.cells.data(), t.rows, t.cols,
t.headers, t.types, stage);
StageOutput via_pipe = compute_pipeline(t.cells.data(), t.rows, t.cols,
t.headers, t.types, {stage});
REQUIRE(direct.rows == via_pipe.rows);
REQUIRE(direct.cols == via_pipe.cols);
}
// ---------------------------------------------------------------------------
// Test: two-stage chain — filter then group+sum
// ---------------------------------------------------------------------------
TEST_CASE("compute_pipeline two stages chain filter then group") {
Table t;
t.headers = {"region", "type", "revenue"};
t.types = {ColumnType::String, ColumnType::String, ColumnType::Float};
t.add_row({"EU", "A", "100"});
t.add_row({"US", "A", "200"});
t.add_row({"EU", "B", "300"});
t.add_row({"EU", "A", "50"});
t.finalize();
// Stage 0: filter EU only.
Stage s0;
Filter f; f.col = 0; f.op = Op::Eq; f.value = "EU";
s0.filters.push_back(f);
// Stage 1: group by type + sum revenue, sort desc.
Stage s1;
s1.breakouts = {"type"};
Aggregation agg; agg.fn = AggFn::Sum; agg.col = "revenue";
s1.aggregations.push_back(agg);
SortClause sc; sc.col = "sum_revenue"; sc.desc = true;
s1.sorts.push_back(sc);
StageOutput out = compute_pipeline(t.cells.data(), t.rows, t.cols,
t.headers, t.types, {s0, s1});
// EU rows: A=100+50=150, B=300. Sorted desc -> B first.
REQUIRE(out.rows == 2);
REQUIRE(std::string(out.cells[0 * out.cols + 0]) == "B");
REQUIRE(std::string(out.cells[1 * out.cols + 0]) == "A");
double rev_b = std::stod(out.cells[0 * out.cols + 1]);
double rev_a = std::stod(out.cells[1 * out.cols + 1]);
REQUIRE(rev_b == 300.0);
REQUIRE(rev_a == 150.0);
}
// ---------------------------------------------------------------------------
// Test: three-stage chain — group, then filter on aggregated column, then sort
// ---------------------------------------------------------------------------
TEST_CASE("compute_pipeline three stage chain") {
Table t;
t.headers = {"cat", "val"};
t.types = {ColumnType::String, ColumnType::Int};
t.add_row({"A", "10"});
t.add_row({"A", "20"});
t.add_row({"B", "5"});
t.add_row({"C", "100"});
t.add_row({"C", "50"});
t.finalize();
// Stage 0: group by cat, sum val.
Stage s0;
s0.breakouts = {"cat"};
Aggregation agg; agg.fn = AggFn::Sum; agg.col = "val";
s0.aggregations.push_back(agg);
// Stage 1: filter where sum_val > 10.
Stage s1;
Filter f; f.col = 1; f.op = Op::Gt; f.value = "10";
s1.filters.push_back(f);
// Stage 2: sort asc by sum_val.
Stage s2;
SortClause sc; sc.col = "sum_val"; sc.desc = false;
s2.sorts.push_back(sc);
StageOutput out = compute_pipeline(t.cells.data(), t.rows, t.cols,
t.headers, t.types, {s0, s1, s2});
// A=30, B=5 (filtered out), C=150. Sorted asc -> A(30), C(150).
REQUIRE(out.rows == 2);
REQUIRE(std::string(out.cells[0 * out.cols + 0]) == "A");
REQUIRE(std::string(out.cells[1 * out.cols + 0]) == "C");
}
// ---------------------------------------------------------------------------
// Test: pipeline with empty input table
// ---------------------------------------------------------------------------
TEST_CASE("compute_pipeline empty table") {
Stage s0;
Filter f; f.col = 0; f.op = Op::Eq; f.value = "x";
s0.filters.push_back(f);
StageOutput out = compute_pipeline(nullptr, 0, 0, {}, {}, {s0});
REQUIRE(out.rows == 0);
}