diff --git a/cpp/functions/core/data_table_types.h b/cpp/functions/core/data_table_types.h index 383aff25..b35170d2 100644 --- a/cpp/functions/core/data_table_types.h +++ b/cpp/functions/core/data_table_types.h @@ -379,12 +379,27 @@ struct StringPool { } // intern: inserta si no existe. Devuelve indice estable. + // INVARIANTE: reserve() ANTES del primer intern() por columna para evitar + // reallocs que invalidarian los string_view del mapa. Si la estimacion fue + // insuficiente, forzamos reserve(size+1) ANTES de emplace_back para que + // la realloc ocurra antes de que cualquier sv del mapa apunte al buffer + // viejo — y reconstruimos el mapa desde cero tras la realloc. uint32_t intern(std::string_view sv) { auto it = index.find(sv); if (it != index.end()) return it->second; uint32_t id = (uint32_t)strings.size(); + if (strings.size() == strings.capacity()) { + // Realloc inminente: hacerlo ANTES de insertar en index para que + // los string_view existentes no queden dangling. Tras el reserve, + // reconstruimos el index desde cero porque los punteros cambiaron. + strings.reserve(strings.capacity() == 0 ? 64 : strings.capacity() * 2); + index.clear(); + for (uint32_t i = 0; i < (uint32_t)strings.size(); ++i) + index.emplace(std::string_view(strings[i]), i); + } strings.emplace_back(sv); - // Re-apuntar el string_view al almacenamiento interno (strings[id]). + // string_view apunta al almacenamiento interno (strings[id]), estable + // porque acabamos de garantizar capacidad suficiente. index.emplace(std::string_view(strings[id]), id); return id; } diff --git a/modules/data_table/data_table.cpp b/modules/data_table/data_table.cpp index b4d4d8c7..9f58fa23 100644 --- a/modules/data_table/data_table.cpp +++ b/modules/data_table/data_table.cpp @@ -71,6 +71,7 @@ #include #include +#include #include #include #include @@ -315,7 +316,185 @@ static std::string row_to_tsv(const char* const* cells, int rows, int cols, return out; } +// --------------------------------------------------------------------------- +// Issue 0133 — Change 3: Reader rewire helpers. +// +// snap_cell: devuelve el string de una celda desde el snapshot columnar cuando +// la columna esta en rango, con fallback al raw cells array. +// Para columnas Int/Float usa un buffer thread_local de 32 bytes (evita alloc). +// --------------------------------------------------------------------------- +static inline const char* snap_cell(int r, int c, + const SnapshotCache& snap, + const StringPool& pool, + char (&tmp)[32]) +{ + if (c >= 0 && c < (int)snap.cols.size()) { + const ColumnSnapshot& cs = snap.cols[(size_t)c]; + if (cs.type == ColumnType::Int) { + std::snprintf(tmp, sizeof(tmp), "%" PRId64, cs.i64[(size_t)r]); + return tmp; + } else if (cs.type == ColumnType::Float) { + std::snprintf(tmp, sizeof(tmp), "%.17g", cs.f64[(size_t)r]); + return tmp; + } else if (!cs.str_ids.empty()) { + return pool.at(cs.str_ids[(size_t)r]).c_str(); + } + } + (void)tmp; + return nullptr; // caller must fallback to cells[r*cols+c] +} + +// compare_snap: evaluates filter f against row r using snapshot when available. +// Falls back to raw cells if column not in snapshot. +static inline bool compare_snap(int r, int f_col, + const char* f_val, Op f_op, + const char* const* cells, int cols, + const SnapshotCache& snap, + const StringPool& pool) +{ + // Fast numeric path: avoid string conversion for numeric comparisons. + if (f_col >= 0 && f_col < (int)snap.cols.size()) { + const ColumnSnapshot& cs = snap.cols[(size_t)f_col]; + if (cs.type == ColumnType::Int && + (f_op == Op::Eq || f_op == Op::Neq || f_op == Op::Gt || + f_op == Op::Gte || f_op == Op::Lt || f_op == Op::Lte)) { + double fv; + if (parse_number(f_val, fv)) { + int64_t av = cs.i64[(size_t)r]; + int64_t bv = (int64_t)fv; + switch (f_op) { + case Op::Eq: return av == bv; + case Op::Neq: return av != bv; + case Op::Gt: return av > bv; + case Op::Gte: return av >= bv; + case Op::Lt: return av < bv; + case Op::Lte: return av <= bv; + default: break; + } + } + } + if (cs.type == ColumnType::Float && + (f_op == Op::Eq || f_op == Op::Neq || f_op == Op::Gt || + f_op == Op::Gte || f_op == Op::Lt || f_op == Op::Lte)) { + double fv; + if (parse_number(f_val, fv)) { + double av = cs.f64[(size_t)r]; + switch (f_op) { + case Op::Eq: return av == fv; + case Op::Neq: return av != fv; + case Op::Gt: return av > fv; + case Op::Gte: return av >= fv; + case Op::Lt: return av < fv; + case Op::Lte: return av <= fv; + default: break; + } + } + } + // String column: snapshot offers no speed advantage for substring ops + // (Contains/NotContains/StartsWith/EndsWith need full string scan regardless). + // Only use intern path for equality (id compare avoids strcmp). + if (!cs.str_ids.empty()) { + if (f_op == Op::Eq || f_op == Op::Neq) { + // Find the interned id of f_val (if not found, no row can match Eq, + // and all rows match Neq). + std::string_view fv_sv(f_val); + auto fv_it = pool.index.find(fv_sv); + if (f_op == Op::Eq) { + if (fv_it == pool.index.end()) return false; // f_val not interned => no match + return cs.str_ids[(size_t)r] == fv_it->second; + } else { // Op::Neq + if (fv_it == pool.index.end()) return true; // f_val not interned => all differ + return cs.str_ids[(size_t)r] != fv_it->second; + } + } + // For substring / prefix / suffix ops: fall through to raw cells (no snapshot benefit). + } + } + // Fallback: raw cells (e.g. derived column not in snapshot, or string substring op). + const char* cell = (f_col >= 0 && f_col < cols) ? cells[r * cols + f_col] : nullptr; + return compare(cell, f_val, f_op); +} + // compute_visible_rows: applies stage-0 filters + optional sort, returns matching row indices. +// Issue 0133 — Change 3: overload with snapshot for columnar reads. +static std::vector compute_visible_rows(const char* const* cells, + int rows, int cols, + const State& st, + const SnapshotCache& snap, + const StringPool& pool) +{ + std::vector out; + out.reserve(rows); + const Stage& s = st.raw(); + for (int r = 0; r < rows; ++r) { + bool keep = true; + for (const auto& f : s.filters) { + if (f.col < 0 || f.col >= cols) continue; + if (!compare_snap(r, f.col, f.value.c_str(), f.op, + cells, cols, snap, pool)) { + keep = false; break; + } + } + if (keep) out.push_back(r); + } + if (!s.sorts.empty()) { + const SortClause& sc0 = s.sorts.front(); + int sc = -1; + if (!sc0.col.empty() && sc0.col[0] == '@') { + sc = std::atoi(sc0.col.c_str() + 1); + } + bool desc = sc0.desc; + if (sc >= 0 && sc < cols) { + // Fast numeric sort via snapshot. + if (sc < (int)snap.cols.size()) { + const ColumnSnapshot& cs = snap.cols[(size_t)sc]; + if (cs.type == ColumnType::Int) { + std::sort(out.begin(), out.end(), [&](int a, int b) { + int64_t va = cs.i64[(size_t)a]; + int64_t vb = cs.i64[(size_t)b]; + return desc ? (va > vb) : (va < vb); + }); + goto sort_done; + } else if (cs.type == ColumnType::Float) { + std::sort(out.begin(), out.end(), [&](int a, int b) { + double va = cs.f64[(size_t)a]; + double vb = cs.f64[(size_t)b]; + return desc ? (va > vb) : (va < vb); + }); + goto sort_done; + } else if (!cs.str_ids.empty()) { + // String sort: compare uint32_t ids first (if equal -> same string). + std::sort(out.begin(), out.end(), [&](int a, int b) { + uint32_t ia = cs.str_ids[(size_t)a]; + uint32_t ib = cs.str_ids[(size_t)b]; + if (ia == ib) return false; // equal + int cmp = std::strcmp(pool.at(ia).c_str(), pool.at(ib).c_str()); + return desc ? (cmp > 0) : (cmp < 0); + }); + goto sort_done; + } + } + // Fallback sort via raw cells. + std::sort(out.begin(), out.end(), [&](int a, int b) { + const char* ca = cells[a * cols + sc]; + const char* cb = cells[b * cols + sc]; + if (!ca) ca = ""; + if (!cb) cb = ""; + double na, nb; + bool num = parse_number(ca, na) && parse_number(cb, nb); + int cmp; + if (num) cmp = (na < nb) ? -1 : (na > nb ? 1 : 0); + else cmp = std::strcmp(ca, cb); + return desc ? (cmp > 0) : (cmp < 0); + }); + sort_done:; + } + } + return out; +} + +// compute_visible_rows: legacy overload without snapshot (used by stage>0 path +// which operates on materialized StageOutput — not the raw cells snapshot). static std::vector compute_visible_rows(const char* const* cells, int rows, int cols, const State& st) @@ -830,7 +1009,18 @@ void render(const char* id, // StringPool.clear() + rebuild siempre que el snapshot se reconstruya, // para mantener coherencia de indices entre pool y snapshot. // ------------------------------------------------------------------------- - if (U.snapshot.last_cells_ptr != cells) { + // Snapshot invalido si: + // 1. El puntero de cells cambio (nuevos datos). + // 2. El pool fue limpiado despues del build (st.string_pool es un nuevo State + // o fue cleared externamente): pool_size_built != strings.size(). + // Esto cubre el caso "begin_scenario crea nuevo State con pool vacio pero + // same cells pointer" — sin este check los str_ids apuntarian a un pool + // vacio y se crashearia en pool.at(str_ids[r]). + const bool snap_stale = (U.snapshot.last_cells_ptr != cells) || + (U.snapshot.pool_size_built != + (uint32_t)st.string_pool.strings.size() && + !U.snapshot.cols.empty()); + if (snap_stale) { // Invalidar y reconstruir. U.snapshot.last_cells_ptr = cells; U.snapshot.cols.clear(); @@ -875,14 +1065,49 @@ void render(const char* id, } } else { // String, Bool, Date, Json, Auto → intern as string. - cs.str_ids.resize((size_t)row_count); - for (int r = 0; r < row_count; ++r) { + // Cardinality cap: if >2048 unique values seen in first 25% of rows, + // skip interning this column (high-cardinality cols like timestamps + // offer no compression benefit and hurt cache). str_ids stays empty; + // compare_snap falls back to raw cells for this column. + static const int kCardinalityCap = 2048; + const int sample_n = (row_count < 4) ? row_count : (row_count / 4); + uint32_t pool_before = (uint32_t)st.string_pool.strings.size(); + bool skip_intern = false; + for (int r = 0; r < sample_n; ++r) { const char* sv = cells[(size_t)(r * orig_cols + c)]; std::string_view svv = sv ? std::string_view(sv) : std::string_view(""); - cs.str_ids[(size_t)r] = st.string_pool.intern(svv); + st.string_pool.intern(svv); + if ((int)st.string_pool.strings.size() - (int)pool_before > kCardinalityCap) { + skip_intern = true; + break; + } + } + if (skip_intern) { + // Rollback pool entries added during sample (remove tail entries). + // Simpler: just leave pool with sample entries and mark col as no-intern + // by keeping str_ids empty. Pool entries are harmless (amortized). + // cs.str_ids stays empty → compare_snap falls through to raw cells. + cs.str_ids.clear(); + } else { + // Low cardinality: intern all rows. + cs.str_ids.resize((size_t)row_count); + // Fill already-sampled rows from pool (intern is idempotent). + for (int r = 0; r < sample_n; ++r) { + const char* sv = cells[(size_t)(r * orig_cols + c)]; + std::string_view svv = sv ? std::string_view(sv) : std::string_view(""); + cs.str_ids[(size_t)r] = st.string_pool.intern(svv); + } + // Intern remaining rows. + for (int r = sample_n; r < row_count; ++r) { + const char* sv = cells[(size_t)(r * orig_cols + c)]; + std::string_view svv = sv ? std::string_view(sv) : std::string_view(""); + cs.str_ids[(size_t)r] = st.string_pool.intern(svv); + } } } } + // Record pool size at end of build so validity check is accurate. + U.snapshot.pool_size_built = (uint32_t)st.string_pool.strings.size(); } // Build eff_headers / src_for_eff / eff_types para STAGE 0. @@ -1060,7 +1285,10 @@ void render(const char* id, st_tmp.stages[0].sorts.push_back({tmp, sc0.desc}); } } - auto visible_rows = compute_visible_rows(cells, row_count, orig_cols, st_tmp); + // Issue 0133 — Change 3: use snapshot-aware overload when snapshot is valid. + auto visible_rows = (U.snapshot.last_cells_ptr == cells && !U.snapshot.cols.empty()) + ? compute_visible_rows(cells, row_count, orig_cols, st_tmp, U.snapshot, st.string_pool) + : compute_visible_rows(cells, row_count, orig_cols, st_tmp); int visible_cols = 0; for (int k = 0; k < eff_cols; ++k) if (st.col_visible[k]) ++visible_cols; @@ -1107,7 +1335,13 @@ void render(const char* id, if (!st.col_visible[c]) continue; int src = src_for_eff[c]; if (!first) out += ','; - out += csv_escape(cells[r * orig_cols + src]); + // Issue 0133 — Change 3: use snapshot for orig cols. + char tmp32[32]; + const char* cv = (src < orig_cols && src < (int)U.snapshot.cols.size()) + ? snap_cell(r, src, U.snapshot, st.string_pool, tmp32) + : nullptr; + if (!cv) cv = cells[r * orig_cols + src]; + out += csv_escape(cv); first = false; } out += '\n'; @@ -1160,6 +1394,8 @@ void render(const char* id, for (int r : visible_rows) { for (int c : vcols) { if (c < orig_cols) { + // Raw pointer: materialization copies to string anyway — snapshot + // path offers no benefit here and adds snprintf overhead for Int/Float. const char* p = cells[r * orig_cols + c]; so_main.cell_backing.emplace_back(p ? p : ""); } else { @@ -1230,6 +1466,7 @@ void render(const char* id, } } else { int src = src_for_eff[c]; + // Raw pointer: materialization copies to string anyway. const char* p = cells[r * orig_cols + src]; s0_backing.emplace_back(p ? p : ""); } @@ -1282,7 +1519,10 @@ void render(const char* id, st_tmp.stages[0].sorts.push_back({tmp, sc0.desc}); } } - auto vrows = compute_visible_rows(cells, row_count, orig_cols, st_tmp); + // Issue 0133 — Change 3: use snapshot-aware filter/sort when available. + auto vrows = (U.snapshot.last_cells_ptr == cells && !U.snapshot.cols.empty()) + ? compute_visible_rows(cells, row_count, orig_cols, st_tmp, U.snapshot, st.string_pool) + : compute_visible_rows(cells, row_count, orig_cols, st_tmp); // Materializar stage0 output: cells (eff_cols) con derived evaluadas. std::vector mat_backing; @@ -1292,10 +1532,9 @@ void render(const char* id, for (int r : vrows) { for (int c = 0; c < eff_cols; ++c) { - const char* p; - std::string buf; if (c < orig_cols) { - p = cells[r * orig_cols + c]; + // Raw pointer: materialization copies to string anyway. + const char* p = cells[r * orig_cols + c]; mat_backing.emplace_back(p ? p : ""); } else { const DerivedColumn& d = stage0.derived[c - orig_cols]; @@ -1318,7 +1557,7 @@ void render(const char* id, lua_engine::eval(lua_engine::get(), d.lua_id, ctx, &err)); } } else { - // retipo puro + // retipo puro — raw pointer from orig cells. int src = d.source_col; const char* sp = (src >= 0 && src < orig_cols) ? cells[r * orig_cols + src] : ""; mat_backing.emplace_back(sp ? sp : ""); diff --git a/modules/data_table/data_table_internal.h b/modules/data_table/data_table_internal.h index 870aef49..5db9ca9c 100644 --- a/modules/data_table/data_table_internal.h +++ b/modules/data_table/data_table_internal.h @@ -116,7 +116,8 @@ struct ColumnSnapshot { // (post-join, pre-stages). Se invalida por pointer-identity de `cells`. // --------------------------------------------------------------------------- struct SnapshotCache { - const char* const* last_cells_ptr = nullptr; // sentinel de invalidacion + const char* const* last_cells_ptr = nullptr; // sentinel de invalidacion por ptr + uint32_t pool_size_built = 0; // strings.size() cuando se construyo std::vector cols; // un entry por columna efectiva };