feat(viz): graph_sources lector operations.db + streaming (issue 0049g)
- graph_load_from_operations: SQLite read-only, schema-detect (type_ref/type, from_entity/source, to_entity/target, name/type, weight, updated_at). - 16-color indigo palette por hash FNV1a32 del nombre de tipo. user_data por nodo es FNV1a64(entity.id) — deterministico entre cargas. - Label pool interno: metadata.name (JSON simple) > entities.name > id. - graph_free libera nodes/edges/types/rel_types/labels/strdup'd names via arena_map (GraphData* -> arena). - Streaming pull-based con tiebreak (updated_at, id) y crecimiento x2 de capacidad. Tipos nuevos descubiertos en stream se anaden a types. - Tests: fixture in-memory (3 entity types, 2 rel types, 10 entities, 15 relations) + smoke contra apps/script_navegador/operations.db. - Issue movido a completed/. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,675 @@
|
||||
#include "graph_sources.h"
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "../../vendor/sqlite3/sqlite3.h"
|
||||
|
||||
namespace graph {
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Hash y palette por defecto
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
static uint32_t fnv1a32(const char* s) {
|
||||
uint32_t h = 2166136261u;
|
||||
for (; s && *s; ++s) {
|
||||
h ^= (uint8_t)*s;
|
||||
h *= 16777619u;
|
||||
}
|
||||
return h;
|
||||
}
|
||||
|
||||
static uint64_t fnv1a64(const char* s) {
|
||||
uint64_t h = 1469598103934665603ULL;
|
||||
for (; s && *s; ++s) {
|
||||
h ^= (uint8_t)*s;
|
||||
h *= 1099511628211ULL;
|
||||
}
|
||||
return h;
|
||||
}
|
||||
|
||||
// 16 colores indigo-friendly RGBA8 (R en LSB). Suficiente variedad pero
|
||||
// armonia visual: hue rotando ~22 grados, saturacion media, luminancia
|
||||
// estable. Si dos tipos colisionan en el palette, no es critico — el caller
|
||||
// puede aplicar overrides via types.yaml.
|
||||
static const uint32_t kDefaultPalette[16] = {
|
||||
0xFF7C6FECu, 0xFFE0703Cu, 0xFF36C2A8u, 0xFFD96EB6u,
|
||||
0xFF8FB85Eu, 0xFFE0C24Au, 0xFF5BA8E0u, 0xFFC97070u,
|
||||
0xFFA67BD9u, 0xFF60B89Bu, 0xFFE08C4Au, 0xFF7995E0u,
|
||||
0xFFB8607Au, 0xFF6FB4D9u, 0xFFC09A4Au, 0xFF8FA0E0u,
|
||||
};
|
||||
|
||||
static uint32_t default_color_for(const char* type_name) {
|
||||
return kDefaultPalette[fnv1a32(type_name) & 0xFu];
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// LoaderArena: dueno de la memoria que cuelga del GraphData devuelto.
|
||||
// Mantenemos un magic header en `nodes`/`edges` para que graph_free pueda
|
||||
// localizar el arena en O(1) sin un mapa global.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
struct LoaderArena {
|
||||
GraphNode* nodes = nullptr;
|
||||
GraphEdge* edges = nullptr;
|
||||
EntityType* types = nullptr;
|
||||
RelationType* rel_types = nullptr;
|
||||
// Nombres de tipos (strdup'd; los apunta types[i].name / rel_types[i].name).
|
||||
std::vector<char*> type_names;
|
||||
std::vector<char*> rel_type_names;
|
||||
// String pool de labels (idx 0 reservado a "").
|
||||
std::vector<char*> labels;
|
||||
};
|
||||
|
||||
// Mapa GraphData* → LoaderArena*. Como el caller puede tener varios
|
||||
// GraphData a la vez, usar un static unordered_map es la opcion mas simple
|
||||
// que no contamina la struct publica (que es POD para vertex pulling).
|
||||
static std::unordered_map<const GraphData*, LoaderArena*>& arena_map() {
|
||||
static std::unordered_map<const GraphData*, LoaderArena*> m;
|
||||
return m;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Stats helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
static void zero_stats(GraphLoadStats* s) {
|
||||
if (!s) return;
|
||||
s->nodes_loaded = 0;
|
||||
s->edges_loaded = 0;
|
||||
s->types_discovered = 0;
|
||||
s->rel_types_discovered = 0;
|
||||
s->errors = 0;
|
||||
s->error_msg[0] = '\0';
|
||||
}
|
||||
|
||||
static void set_err(GraphLoadStats* s, const char* msg) {
|
||||
if (!s) return;
|
||||
s->errors++;
|
||||
std::snprintf(s->error_msg, sizeof(s->error_msg), "%s", msg ? msg : "");
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Schema detection: `entities` y `relations` cambian de columnas entre
|
||||
// versiones de operations.db. Detectamos por PRAGMA table_info.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
static bool table_exists(sqlite3* db, const char* name) {
|
||||
sqlite3_stmt* st = nullptr;
|
||||
const char* q = "SELECT 1 FROM sqlite_master WHERE type='table' AND name=?";
|
||||
if (sqlite3_prepare_v2(db, q, -1, &st, nullptr) != SQLITE_OK) return false;
|
||||
sqlite3_bind_text(st, 1, name, -1, SQLITE_STATIC);
|
||||
bool found = (sqlite3_step(st) == SQLITE_ROW);
|
||||
sqlite3_finalize(st);
|
||||
return found;
|
||||
}
|
||||
|
||||
static bool column_exists(sqlite3* db, const char* table, const char* column) {
|
||||
char sql[256];
|
||||
std::snprintf(sql, sizeof(sql), "PRAGMA table_info(%s)", table);
|
||||
sqlite3_stmt* st = nullptr;
|
||||
if (sqlite3_prepare_v2(db, sql, -1, &st, nullptr) != SQLITE_OK) return false;
|
||||
bool found = false;
|
||||
while (sqlite3_step(st) == SQLITE_ROW) {
|
||||
const unsigned char* n = sqlite3_column_text(st, 1);
|
||||
if (n && std::strcmp((const char*)n, column) == 0) { found = true; break; }
|
||||
}
|
||||
sqlite3_finalize(st);
|
||||
return found;
|
||||
}
|
||||
|
||||
struct Schema {
|
||||
std::string entity_type_col; // type_ref | type
|
||||
std::string entity_meta_col; // metadata (puede no existir)
|
||||
std::string entity_updated; // updated_at
|
||||
std::string rel_src_col; // from_entity | source
|
||||
std::string rel_tgt_col; // to_entity | target
|
||||
std::string rel_type_col; // name | type
|
||||
std::string rel_weight_col; // weight (puede no existir)
|
||||
std::string rel_updated; // updated_at
|
||||
};
|
||||
|
||||
static bool detect_schema(sqlite3* db, Schema* s, GraphLoadStats* stats) {
|
||||
if (!table_exists(db, "entities")) {
|
||||
set_err(stats, "missing table: entities");
|
||||
return false;
|
||||
}
|
||||
if (!table_exists(db, "relations")) {
|
||||
set_err(stats, "missing table: relations");
|
||||
return false;
|
||||
}
|
||||
s->entity_type_col = column_exists(db, "entities", "type_ref") ? "type_ref"
|
||||
: column_exists(db, "entities", "type") ? "type"
|
||||
: "";
|
||||
if (s->entity_type_col.empty()) {
|
||||
set_err(stats, "entities: missing type_ref/type column");
|
||||
return false;
|
||||
}
|
||||
s->entity_meta_col = column_exists(db, "entities", "metadata") ? "metadata" : "";
|
||||
s->entity_updated = column_exists(db, "entities", "updated_at") ? "updated_at" : "";
|
||||
|
||||
s->rel_src_col = column_exists(db, "relations", "from_entity") ? "from_entity"
|
||||
: column_exists(db, "relations", "source") ? "source"
|
||||
: "";
|
||||
s->rel_tgt_col = column_exists(db, "relations", "to_entity") ? "to_entity"
|
||||
: column_exists(db, "relations", "target") ? "target"
|
||||
: "";
|
||||
if (s->rel_src_col.empty() || s->rel_tgt_col.empty()) {
|
||||
set_err(stats, "relations: missing from_entity/to_entity columns");
|
||||
return false;
|
||||
}
|
||||
// El "tipo" de relacion: priorizamos `type` si existe; si no, `name`.
|
||||
// En operations.db del registry no hay `type` en relations, pero `name`
|
||||
// suele encodear la relacion (ej: "owns", "connects").
|
||||
s->rel_type_col = column_exists(db, "relations", "type") ? "type"
|
||||
: column_exists(db, "relations", "name") ? "name"
|
||||
: "";
|
||||
s->rel_weight_col = column_exists(db, "relations", "weight") ? "weight" : "";
|
||||
s->rel_updated = column_exists(db, "relations", "updated_at") ? "updated_at" : "";
|
||||
return true;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// JSON metadata.name extractor (super basico, sin parser): busca "name" : "X".
|
||||
// Si no encuentra, devuelve "". Sirve para entities donde metadata es JSON
|
||||
// pequeno; para casos complicados, el caller debe extender.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
static std::string json_get_name(const char* json) {
|
||||
if (!json) return "";
|
||||
const char* p = std::strstr(json, "\"name\"");
|
||||
if (!p) return "";
|
||||
p += 6;
|
||||
while (*p == ' ' || *p == '\t' || *p == ':') ++p;
|
||||
if (*p != '"') return "";
|
||||
++p;
|
||||
const char* end = std::strchr(p, '"');
|
||||
if (!end) return "";
|
||||
return std::string(p, end - p);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// String pool helper
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
static uint32_t intern_label(LoaderArena* arena, const std::string& s) {
|
||||
if (s.empty()) return 0;
|
||||
char* dup = (char*)std::malloc(s.size() + 1);
|
||||
std::memcpy(dup, s.c_str(), s.size() + 1);
|
||||
arena->labels.push_back(dup);
|
||||
// idx 0 esta reservado para "no label", asi que el pool real empieza en 1.
|
||||
return (uint32_t)arena->labels.size();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Carga principal
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
bool graph_load_from_operations(const char* db_path, GraphData* out, GraphLoadStats* stats) {
|
||||
zero_stats(stats);
|
||||
if (!db_path || !out) {
|
||||
set_err(stats, "null db_path or out");
|
||||
return false;
|
||||
}
|
||||
*out = GraphData{};
|
||||
|
||||
sqlite3* db = nullptr;
|
||||
if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
|
||||
char buf[256];
|
||||
std::snprintf(buf, sizeof(buf), "open: %s", sqlite3_errmsg(db));
|
||||
set_err(stats, buf);
|
||||
if (db) sqlite3_close(db);
|
||||
return false;
|
||||
}
|
||||
|
||||
Schema sch;
|
||||
if (!detect_schema(db, &sch, stats)) {
|
||||
sqlite3_close(db);
|
||||
return false;
|
||||
}
|
||||
|
||||
auto* arena = new LoaderArena();
|
||||
|
||||
// --- 1) Tipos de entidad (DISTINCT entity_type) ---
|
||||
std::unordered_map<std::string, uint16_t> type_idx;
|
||||
{
|
||||
std::string q = "SELECT DISTINCT " + sch.entity_type_col +
|
||||
" FROM entities WHERE " + sch.entity_type_col + " IS NOT NULL";
|
||||
sqlite3_stmt* st = nullptr;
|
||||
if (sqlite3_prepare_v2(db, q.c_str(), -1, &st, nullptr) != SQLITE_OK) {
|
||||
set_err(stats, sqlite3_errmsg(db));
|
||||
delete arena;
|
||||
sqlite3_close(db);
|
||||
return false;
|
||||
}
|
||||
std::vector<std::string> names;
|
||||
while (sqlite3_step(st) == SQLITE_ROW) {
|
||||
const unsigned char* n = sqlite3_column_text(st, 0);
|
||||
if (!n) continue;
|
||||
names.emplace_back((const char*)n);
|
||||
}
|
||||
sqlite3_finalize(st);
|
||||
|
||||
arena->types = (EntityType*)std::calloc(names.size() ? names.size() : 1, sizeof(EntityType));
|
||||
for (size_t i = 0; i < names.size(); ++i) {
|
||||
char* dup = (char*)std::malloc(names[i].size() + 1);
|
||||
std::memcpy(dup, names[i].c_str(), names[i].size() + 1);
|
||||
arena->type_names.push_back(dup);
|
||||
arena->types[i].color = default_color_for(dup);
|
||||
arena->types[i].shape = SHAPE_CIRCLE;
|
||||
arena->types[i].icon_id = 0;
|
||||
arena->types[i].default_size = 6.0f;
|
||||
arena->types[i].name = dup;
|
||||
type_idx[names[i]] = (uint16_t)i;
|
||||
}
|
||||
out->types = arena->types;
|
||||
out->type_count = (int)names.size();
|
||||
if (stats) stats->types_discovered = (int)names.size();
|
||||
}
|
||||
|
||||
// --- 2) Tipos de relacion (DISTINCT rel_type) ---
|
||||
std::unordered_map<std::string, uint16_t> rel_type_idx;
|
||||
if (!sch.rel_type_col.empty()) {
|
||||
std::string q = "SELECT DISTINCT " + sch.rel_type_col +
|
||||
" FROM relations WHERE " + sch.rel_type_col + " IS NOT NULL";
|
||||
sqlite3_stmt* st = nullptr;
|
||||
if (sqlite3_prepare_v2(db, q.c_str(), -1, &st, nullptr) == SQLITE_OK) {
|
||||
std::vector<std::string> names;
|
||||
while (sqlite3_step(st) == SQLITE_ROW) {
|
||||
const unsigned char* n = sqlite3_column_text(st, 0);
|
||||
if (!n) continue;
|
||||
names.emplace_back((const char*)n);
|
||||
}
|
||||
sqlite3_finalize(st);
|
||||
|
||||
arena->rel_types = (RelationType*)std::calloc(names.size() ? names.size() : 1, sizeof(RelationType));
|
||||
for (size_t i = 0; i < names.size(); ++i) {
|
||||
char* dup = (char*)std::malloc(names[i].size() + 1);
|
||||
std::memcpy(dup, names[i].c_str(), names[i].size() + 1);
|
||||
arena->rel_type_names.push_back(dup);
|
||||
arena->rel_types[i].color = default_color_for(dup);
|
||||
arena->rel_types[i].style = EDGE_SOLID;
|
||||
arena->rel_types[i].width = 1.0f;
|
||||
arena->rel_types[i].name = dup;
|
||||
rel_type_idx[names[i]] = (uint16_t)i;
|
||||
}
|
||||
out->rel_types = arena->rel_types;
|
||||
out->rel_type_count = (int)names.size();
|
||||
if (stats) stats->rel_types_discovered = (int)names.size();
|
||||
}
|
||||
}
|
||||
|
||||
// --- 3) Entidades ---
|
||||
std::unordered_map<std::string, uint32_t> id_to_idx;
|
||||
{
|
||||
std::string q = "SELECT id, " + sch.entity_type_col;
|
||||
if (!sch.entity_meta_col.empty()) q += ", " + sch.entity_meta_col;
|
||||
// Aniadir name si existe (para etiqueta sin parsear metadata)
|
||||
bool has_name_col = column_exists(db, "entities", "name");
|
||||
if (has_name_col) q += ", name";
|
||||
q += " FROM entities";
|
||||
|
||||
sqlite3_stmt* st = nullptr;
|
||||
if (sqlite3_prepare_v2(db, q.c_str(), -1, &st, nullptr) != SQLITE_OK) {
|
||||
set_err(stats, sqlite3_errmsg(db));
|
||||
sqlite3_close(db);
|
||||
// arena queda con types alocados; el caller debe llamar graph_free.
|
||||
arena_map()[out] = arena;
|
||||
return false;
|
||||
}
|
||||
// Reservamos generosamente — sera el caller quien decida si hace
|
||||
// shrink despues. Para v1, alocamos exactamente lo que devuelva la
|
||||
// query usando un primer pase con COUNT, pero eso obliga a 2 queries.
|
||||
// Simplificamos: pasada en vector y luego copia al array final.
|
||||
std::vector<GraphNode> rows;
|
||||
while (sqlite3_step(st) == SQLITE_ROW) {
|
||||
const unsigned char* id_c = sqlite3_column_text(st, 0);
|
||||
const unsigned char* tp_c = sqlite3_column_text(st, 1);
|
||||
if (!id_c || !tp_c) continue;
|
||||
std::string id_s = (const char*)id_c;
|
||||
std::string type_s = (const char*)tp_c;
|
||||
|
||||
std::string label;
|
||||
int col = 2;
|
||||
if (!sch.entity_meta_col.empty()) {
|
||||
const unsigned char* m = sqlite3_column_text(st, col++);
|
||||
if (m) label = json_get_name((const char*)m);
|
||||
}
|
||||
if (label.empty() && has_name_col) {
|
||||
const unsigned char* nm = sqlite3_column_text(st, col);
|
||||
if (nm && *nm) label = (const char*)nm;
|
||||
}
|
||||
if (label.empty()) label = id_s;
|
||||
|
||||
auto it = type_idx.find(type_s);
|
||||
if (it == type_idx.end()) continue; // no deberia pasar (DISTINCT cubre)
|
||||
|
||||
GraphNode n = graph_node(0.0f, 0.0f, it->second);
|
||||
n.user_data = fnv1a64(id_s.c_str());
|
||||
n.label_idx = intern_label(arena, label);
|
||||
id_to_idx[id_s] = (uint32_t)rows.size();
|
||||
rows.push_back(n);
|
||||
}
|
||||
sqlite3_finalize(st);
|
||||
|
||||
if (!rows.empty()) {
|
||||
arena->nodes = (GraphNode*)std::malloc(rows.size() * sizeof(GraphNode));
|
||||
std::memcpy(arena->nodes, rows.data(), rows.size() * sizeof(GraphNode));
|
||||
}
|
||||
out->nodes = arena->nodes;
|
||||
out->node_count = (int)rows.size();
|
||||
out->node_capacity = (int)rows.size();
|
||||
if (stats) stats->nodes_loaded = (int)rows.size();
|
||||
}
|
||||
|
||||
// --- 4) Relaciones ---
|
||||
{
|
||||
std::string q = "SELECT " + sch.rel_src_col + ", " + sch.rel_tgt_col;
|
||||
if (!sch.rel_type_col.empty()) q += ", " + sch.rel_type_col;
|
||||
if (!sch.rel_weight_col.empty()) q += ", " + sch.rel_weight_col;
|
||||
q += " FROM relations";
|
||||
|
||||
sqlite3_stmt* st = nullptr;
|
||||
if (sqlite3_prepare_v2(db, q.c_str(), -1, &st, nullptr) != SQLITE_OK) {
|
||||
set_err(stats, sqlite3_errmsg(db));
|
||||
} else {
|
||||
std::vector<GraphEdge> rows;
|
||||
while (sqlite3_step(st) == SQLITE_ROW) {
|
||||
const unsigned char* src = sqlite3_column_text(st, 0);
|
||||
const unsigned char* tgt = sqlite3_column_text(st, 1);
|
||||
if (!src || !tgt) {
|
||||
if (stats) stats->errors++;
|
||||
continue;
|
||||
}
|
||||
auto sit = id_to_idx.find((const char*)src);
|
||||
auto tit = id_to_idx.find((const char*)tgt);
|
||||
if (sit == id_to_idx.end() || tit == id_to_idx.end()) {
|
||||
if (stats) stats->errors++;
|
||||
continue;
|
||||
}
|
||||
int col = 2;
|
||||
uint16_t rt = 0;
|
||||
if (!sch.rel_type_col.empty()) {
|
||||
const unsigned char* tp = sqlite3_column_text(st, col++);
|
||||
if (tp) {
|
||||
auto rit = rel_type_idx.find((const char*)tp);
|
||||
if (rit != rel_type_idx.end()) rt = rit->second;
|
||||
}
|
||||
}
|
||||
float w = 1.0f;
|
||||
if (!sch.rel_weight_col.empty()) {
|
||||
if (sqlite3_column_type(st, col) != SQLITE_NULL)
|
||||
w = (float)sqlite3_column_double(st, col);
|
||||
col++;
|
||||
}
|
||||
rows.push_back(graph_edge(sit->second, tit->second, w, rt));
|
||||
}
|
||||
sqlite3_finalize(st);
|
||||
|
||||
if (!rows.empty()) {
|
||||
arena->edges = (GraphEdge*)std::malloc(rows.size() * sizeof(GraphEdge));
|
||||
std::memcpy(arena->edges, rows.data(), rows.size() * sizeof(GraphEdge));
|
||||
}
|
||||
out->edges = arena->edges;
|
||||
out->edge_count = (int)rows.size();
|
||||
out->edge_capacity = (int)rows.size();
|
||||
if (stats) stats->edges_loaded = (int)rows.size();
|
||||
}
|
||||
}
|
||||
|
||||
sqlite3_close(db);
|
||||
arena_map()[out] = arena;
|
||||
out->update_bounds();
|
||||
return true;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// graph_free
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
void graph_free(GraphData* graph) {
|
||||
if (!graph) return;
|
||||
auto& m = arena_map();
|
||||
auto it = m.find(graph);
|
||||
if (it != m.end()) {
|
||||
LoaderArena* a = it->second;
|
||||
std::free(a->nodes);
|
||||
std::free(a->edges);
|
||||
std::free(a->types);
|
||||
std::free(a->rel_types);
|
||||
for (char* p : a->type_names) std::free(p);
|
||||
for (char* p : a->rel_type_names) std::free(p);
|
||||
for (char* p : a->labels) std::free(p);
|
||||
delete a;
|
||||
m.erase(it);
|
||||
}
|
||||
*graph = GraphData{};
|
||||
}
|
||||
|
||||
const char* graph_label(const GraphData* graph, uint32_t label_idx) {
|
||||
if (!graph || label_idx == 0) return "";
|
||||
auto& m = arena_map();
|
||||
auto it = m.find(graph);
|
||||
if (it == m.end()) return "";
|
||||
LoaderArena* a = it->second;
|
||||
if (label_idx > a->labels.size()) return "";
|
||||
return a->labels[label_idx - 1];
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Streaming
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
struct GraphStreamSource {
|
||||
std::string db_path;
|
||||
int poll_ms;
|
||||
Schema schema;
|
||||
// Tiebreak: (updated_at, id) > last_seen. Cuando no hay updated_at,
|
||||
// caemos a un seek por id ordenado.
|
||||
std::string last_ent_updated;
|
||||
std::string last_ent_id;
|
||||
std::string last_rel_updated;
|
||||
std::string last_rel_id;
|
||||
};
|
||||
|
||||
GraphStreamSource* graph_stream_operations_open(const char* db_path, int poll_ms) {
|
||||
if (!db_path) return nullptr;
|
||||
sqlite3* db = nullptr;
|
||||
if (sqlite3_open_v2(db_path, &db, SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
|
||||
if (db) sqlite3_close(db);
|
||||
return nullptr;
|
||||
}
|
||||
auto* src = new GraphStreamSource();
|
||||
src->db_path = db_path;
|
||||
src->poll_ms = poll_ms;
|
||||
GraphLoadStats dummy{};
|
||||
if (!detect_schema(db, &src->schema, &dummy)) {
|
||||
sqlite3_close(db);
|
||||
delete src;
|
||||
return nullptr;
|
||||
}
|
||||
// Inicializamos last_seen al MAX actual para que el primer pull devuelva
|
||||
// solo lo que llegue despues de open(). Si el caller quisiera todo lo
|
||||
// existente, primero deberia hacer graph_load_from_operations.
|
||||
auto fetch_max = [&](const std::string& table, const std::string& upd_col,
|
||||
std::string* out_upd, std::string* out_id) {
|
||||
std::string q;
|
||||
if (!upd_col.empty()) {
|
||||
q = "SELECT COALESCE(MAX(" + upd_col + "), ''), COALESCE(MAX(id), '') FROM " + table;
|
||||
} else {
|
||||
q = "SELECT '', COALESCE(MAX(id), '') FROM " + table;
|
||||
}
|
||||
sqlite3_stmt* st = nullptr;
|
||||
if (sqlite3_prepare_v2(db, q.c_str(), -1, &st, nullptr) == SQLITE_OK) {
|
||||
if (sqlite3_step(st) == SQLITE_ROW) {
|
||||
const unsigned char* a = sqlite3_column_text(st, 0);
|
||||
const unsigned char* b = sqlite3_column_text(st, 1);
|
||||
*out_upd = a ? (const char*)a : "";
|
||||
*out_id = b ? (const char*)b : "";
|
||||
}
|
||||
sqlite3_finalize(st);
|
||||
}
|
||||
};
|
||||
fetch_max("entities", src->schema.entity_updated,
|
||||
&src->last_ent_updated, &src->last_ent_id);
|
||||
fetch_max("relations", src->schema.rel_updated,
|
||||
&src->last_rel_updated, &src->last_rel_id);
|
||||
sqlite3_close(db);
|
||||
return src;
|
||||
}
|
||||
|
||||
// Helpers para crecer arrays si no hay sitio. Mantienen sincronizados arena y
|
||||
// graph (que comparten el mismo puntero base).
|
||||
static void ensure_node_capacity(LoaderArena* arena, GraphData* g, int needed) {
|
||||
if (g->node_capacity >= needed) return;
|
||||
int new_cap = g->node_capacity > 0 ? g->node_capacity * 2 : 64;
|
||||
while (new_cap < needed) new_cap *= 2;
|
||||
arena->nodes = (GraphNode*)std::realloc(arena->nodes, new_cap * sizeof(GraphNode));
|
||||
g->nodes = arena->nodes;
|
||||
g->node_capacity = new_cap;
|
||||
}
|
||||
|
||||
static void ensure_edge_capacity(LoaderArena* arena, GraphData* g, int needed) {
|
||||
if (g->edge_capacity >= needed) return;
|
||||
int new_cap = g->edge_capacity > 0 ? g->edge_capacity * 2 : 64;
|
||||
while (new_cap < needed) new_cap *= 2;
|
||||
arena->edges = (GraphEdge*)std::realloc(arena->edges, new_cap * sizeof(GraphEdge));
|
||||
g->edges = arena->edges;
|
||||
g->edge_capacity = new_cap;
|
||||
}
|
||||
|
||||
int graph_stream_pull(GraphStreamSource* src, GraphData* graph) {
|
||||
if (!src || !graph) return 0;
|
||||
auto it = arena_map().find(graph);
|
||||
if (it == arena_map().end()) return 0; // GraphData no fue cargado por nosotros
|
||||
LoaderArena* arena = it->second;
|
||||
|
||||
sqlite3* db = nullptr;
|
||||
if (sqlite3_open_v2(src->db_path.c_str(), &db, SQLITE_OPEN_READONLY, nullptr) != SQLITE_OK) {
|
||||
if (db) sqlite3_close(db);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int appended = 0;
|
||||
auto& sch = src->schema;
|
||||
|
||||
// --- entidades nuevas ---
|
||||
{
|
||||
std::string q;
|
||||
const bool has_upd = !sch.entity_updated.empty();
|
||||
if (has_upd) {
|
||||
q = "SELECT id, " + sch.entity_type_col + ", " + sch.entity_updated +
|
||||
" FROM entities WHERE (" + sch.entity_updated + " > ?1) OR (" +
|
||||
sch.entity_updated + " = ?1 AND id > ?2) ORDER BY " +
|
||||
sch.entity_updated + ", id";
|
||||
} else {
|
||||
q = "SELECT id, " + sch.entity_type_col +
|
||||
", '' FROM entities WHERE id > ?2 ORDER BY id";
|
||||
}
|
||||
sqlite3_stmt* st = nullptr;
|
||||
if (sqlite3_prepare_v2(db, q.c_str(), -1, &st, nullptr) == SQLITE_OK) {
|
||||
sqlite3_bind_text(st, 1, src->last_ent_updated.c_str(), -1, SQLITE_TRANSIENT);
|
||||
sqlite3_bind_text(st, 2, src->last_ent_id.c_str(), -1, SQLITE_TRANSIENT);
|
||||
while (sqlite3_step(st) == SQLITE_ROW) {
|
||||
ensure_node_capacity(arena, graph, graph->node_count + 1);
|
||||
const unsigned char* id_c = sqlite3_column_text(st, 0);
|
||||
const unsigned char* tp_c = sqlite3_column_text(st, 1);
|
||||
const unsigned char* up_c = sqlite3_column_text(st, 2);
|
||||
if (!id_c || !tp_c) continue;
|
||||
// Buscamos type_id; si es nuevo, lo anadimos al final de
|
||||
// arena->types con un realloc (caso poco frecuente).
|
||||
uint16_t type_id = 0;
|
||||
bool found_type = false;
|
||||
for (int i = 0; i < graph->type_count; ++i) {
|
||||
if (graph->types[i].name && std::strcmp(graph->types[i].name, (const char*)tp_c) == 0) {
|
||||
type_id = (uint16_t)i;
|
||||
found_type = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!found_type) {
|
||||
int new_count = graph->type_count + 1;
|
||||
arena->types = (EntityType*)std::realloc(arena->types, new_count * sizeof(EntityType));
|
||||
graph->types = arena->types;
|
||||
char* dup = (char*)std::malloc(std::strlen((const char*)tp_c) + 1);
|
||||
std::strcpy(dup, (const char*)tp_c);
|
||||
arena->type_names.push_back(dup);
|
||||
arena->types[graph->type_count].color = default_color_for(dup);
|
||||
arena->types[graph->type_count].shape = SHAPE_CIRCLE;
|
||||
arena->types[graph->type_count].icon_id = 0;
|
||||
arena->types[graph->type_count].default_size = 6.0f;
|
||||
arena->types[graph->type_count].name = dup;
|
||||
type_id = (uint16_t)graph->type_count;
|
||||
graph->type_count = new_count;
|
||||
}
|
||||
GraphNode n = graph_node(0.0f, 0.0f, type_id);
|
||||
n.user_data = fnv1a64((const char*)id_c);
|
||||
n.label_idx = intern_label(arena, (const char*)id_c);
|
||||
graph->nodes[graph->node_count++] = n;
|
||||
|
||||
if (has_upd && up_c) src->last_ent_updated = (const char*)up_c;
|
||||
src->last_ent_id = (const char*)id_c;
|
||||
appended++;
|
||||
}
|
||||
sqlite3_finalize(st);
|
||||
}
|
||||
}
|
||||
|
||||
// --- relaciones nuevas ---
|
||||
{
|
||||
std::string q;
|
||||
const bool has_upd = !sch.rel_updated.empty();
|
||||
if (has_upd) {
|
||||
q = "SELECT id, " + sch.rel_src_col + ", " + sch.rel_tgt_col +
|
||||
", " + sch.rel_updated +
|
||||
" FROM relations WHERE (" + sch.rel_updated + " > ?1) OR (" +
|
||||
sch.rel_updated + " = ?1 AND id > ?2) ORDER BY " +
|
||||
sch.rel_updated + ", id";
|
||||
} else {
|
||||
q = "SELECT id, " + sch.rel_src_col + ", " + sch.rel_tgt_col +
|
||||
", '' FROM relations WHERE id > ?2 ORDER BY id";
|
||||
}
|
||||
sqlite3_stmt* st = nullptr;
|
||||
if (sqlite3_prepare_v2(db, q.c_str(), -1, &st, nullptr) == SQLITE_OK) {
|
||||
sqlite3_bind_text(st, 1, src->last_rel_updated.c_str(), -1, SQLITE_TRANSIENT);
|
||||
sqlite3_bind_text(st, 2, src->last_rel_id.c_str(), -1, SQLITE_TRANSIENT);
|
||||
while (sqlite3_step(st) == SQLITE_ROW) {
|
||||
ensure_edge_capacity(arena, graph, graph->edge_count + 1);
|
||||
const unsigned char* id_c = sqlite3_column_text(st, 0);
|
||||
const unsigned char* src_c = sqlite3_column_text(st, 1);
|
||||
const unsigned char* tgt_c = sqlite3_column_text(st, 2);
|
||||
const unsigned char* up_c = sqlite3_column_text(st, 3);
|
||||
if (!id_c || !src_c || !tgt_c) continue;
|
||||
uint64_t sh = fnv1a64((const char*)src_c);
|
||||
uint64_t th = fnv1a64((const char*)tgt_c);
|
||||
int sidx = graph->find_node_by_user_data(sh);
|
||||
int tidx = graph->find_node_by_user_data(th);
|
||||
if (sidx < 0 || tidx < 0) {
|
||||
if (has_upd && up_c) src->last_rel_updated = (const char*)up_c;
|
||||
src->last_rel_id = (const char*)id_c;
|
||||
continue;
|
||||
}
|
||||
graph->edges[graph->edge_count++] = graph_edge((uint32_t)sidx, (uint32_t)tidx);
|
||||
if (has_upd && up_c) src->last_rel_updated = (const char*)up_c;
|
||||
src->last_rel_id = (const char*)id_c;
|
||||
appended++;
|
||||
}
|
||||
sqlite3_finalize(st);
|
||||
}
|
||||
}
|
||||
|
||||
sqlite3_close(db);
|
||||
return appended;
|
||||
}
|
||||
|
||||
void graph_stream_close(GraphStreamSource* src) {
|
||||
delete src;
|
||||
}
|
||||
|
||||
} // namespace graph
|
||||
Reference in New Issue
Block a user