test(0035e): cobertura del visual heredado, threshold override y migracion idempotente

- test_group_visual_inheritance.py (4 tests): homogeneo->Url heredado,
  heterogeneo->generico Group, vacio->generico, subgrupos anidados
  ignorados.
- test_manifest_threshold_override.py (4 tests): override 100 con 80
  unicas no agrupa; override bajo (20) si agrupa cuando se supera;
  threshold=0 cae al default 50; mirror Python del parser de manifest
  C++ confirma el campo se extrae como int.
- test_schema_migration_group_id.py (3 tests): mirror Python de
  project_migrate_schema, verifica idempotencia (1a y 2a apertura
  no duplican columna), no-op sobre BD ya migrada, datos previos
  sobreviven la migracion.
This commit is contained in:
2026-05-04 14:25:03 +02:00
parent 5417834950
commit deb86b24ec
3 changed files with 434 additions and 0 deletions
+121
View File
@@ -0,0 +1,121 @@
"""Tests del visual heredado del Group (issue 0035e).
El binario C++ implementa `apply_group_inherited_visuals` en data.cpp:
para cada nodo Group del grafo, consulta `SELECT DISTINCT type_ref
FROM entities WHERE group_id = ? AND type_ref != 'Group'`. Si la
familia es homogenea (un solo tipo), reasigna el `type_id` del nodo
Group al de ese tipo y fija `shape_override = SHAPE_SQUARE`. Si es
heterogenea o vacia, conserva el visual generico.
El subcomando `gx-cli group visual <id>` espejea exactamente esa SQL,
asi estos tests verifican el contrato (homogeneo vs heterogeneo,
type heredado y shape=square preservado) sin depender del binario.
"""
from __future__ import annotations
import sqlite3
from test_gx_cli import OPS_SCHEMA, APP_SCHEMA, env_dirs, run_gx # noqa: F401
def _seed_group_with_children(ops_db, group_id: str,
child_specs: list[tuple[str, str]]):
"""Inserta el contenedor Group + cada hijo (id, type_ref).
`child_specs` = [(child_id, type_ref), ...]. Se anaden con group_id
apuntando al contenedor.
"""
cn = sqlite3.connect(ops_db)
try:
cn.execute(
"INSERT INTO entities(id, name, type_ref, status, source, "
" metadata, created_at, updated_at) "
"VALUES (?, ?, 'Group', 'active', 'manual', '{}', "
" '2026-05-04T10:00:00.000Z', '2026-05-04T10:00:00.000Z')",
(group_id, "test-group"),
)
for i, (cid, type_ref) in enumerate(child_specs):
cn.execute(
"INSERT INTO entities(id, name, type_ref, status, source, "
" metadata, group_id, "
" created_at, updated_at) "
"VALUES (?, ?, ?, 'active', 'manual', '{}', ?, ?, ?)",
(cid, f"name-{i}", type_ref, group_id,
f"2026-05-04T11:{i:02d}:00.000Z",
f"2026-05-04T11:{i:02d}:00.000Z"),
)
cn.commit()
finally:
cn.close()
def test_group_inherits_visual_from_homogeneous_children(env_dirs):
"""5 Urls como hijos -> visual heredado a 'Url' (homogeneo)."""
children = [(f"u_{i:02d}", "Url") for i in range(5)]
_seed_group_with_children(env_dirs["ops"], "G_homogeneous", children)
out = run_gx(env_dirs, "group", "visual", "G_homogeneous")
assert out["homogeneous"] is True, out
assert out["inherited"] == "Url", out
assert out["child_types"] == ["Url"], out
# La forma siempre se queda como square — distintivo de contenedor.
assert out["shape"] == "square", out
def test_group_falls_back_to_generic_for_heterogeneous(env_dirs):
"""Url + Email en el mismo Group -> visual generico Group."""
children = [
("u_00", "Url"), ("u_01", "Url"), ("u_02", "Url"),
("e_00", "Email"), ("e_01", "Email"),
]
_seed_group_with_children(env_dirs["ops"], "G_heterogeneous", children)
out = run_gx(env_dirs, "group", "visual", "G_heterogeneous")
assert out["homogeneous"] is False, out
assert out["inherited"] == "Group", out
# child_types ordenado alfabeticamente — verifica ambos presentes.
assert out["child_types"] == ["Email", "Url"], out
assert out["shape"] == "square", out
def test_group_with_no_children_falls_back_to_generic(env_dirs):
"""Group vacio (sin hijos con group_id apuntando a el) -> generico."""
_seed_group_with_children(env_dirs["ops"], "G_empty", [])
out = run_gx(env_dirs, "group", "visual", "G_empty")
assert out["homogeneous"] is False, out
assert out["inherited"] == "Group", out
assert out["child_types"] == [], out
def test_group_visual_ignores_nested_subgroups(env_dirs):
"""Subgrupos anidados (type_ref='Group') no cuentan — siguen scope fase 1."""
cn = sqlite3.connect(env_dirs["ops"])
try:
cn.execute(
"INSERT INTO entities(id, name, type_ref, status, source, "
" metadata, created_at, updated_at) "
"VALUES ('G_outer', 'outer', 'Group', 'active', 'manual', '{}', "
" '2026-05-04T10:00:00.000Z', '2026-05-04T10:00:00.000Z')"
)
for i in range(3):
cn.execute(
"INSERT INTO entities(id, name, type_ref, status, source, "
" metadata, group_id, "
" created_at, updated_at) "
"VALUES (?, ?, 'Url', 'active', 'manual', '{}', 'G_outer', "
" '2026-05-04T11:00:00.000Z', '2026-05-04T11:00:00.000Z')",
(f"u_{i}", f"url-{i}"),
)
# Subgrupo anidado — el resolver lo excluye via type_ref != 'Group'.
cn.execute(
"INSERT INTO entities(id, name, type_ref, status, source, "
" metadata, group_id, "
" created_at, updated_at) "
"VALUES ('G_nested', 'nested', 'Group', 'active', 'manual', "
" '{}', 'G_outer', "
" '2026-05-04T11:00:00.000Z', '2026-05-04T11:00:00.000Z')"
)
cn.commit()
finally:
cn.close()
out = run_gx(env_dirs, "group", "visual", "G_outer")
assert out["homogeneous"] is True, out
assert out["inherited"] == "Url", out
+153
View File
@@ -0,0 +1,153 @@
"""Tests del override `auto_group_threshold` desde manifest (issue 0035e).
El manifest YAML puede declarar un campo top-level
`auto_group_threshold: <int>`. enrichers.cpp lo parsea (EnricherSpec)
y jobs.cpp lo inyecta como campo del JSON stdin
(`auto_group_threshold`). Los enrichers Python que crean Groups
(split_words, split_sentences, web_search, extract_iocs_text) leen
ese campo y, cuando viene > 0, lo usan en lugar del default global
(50).
Como los tests corren los run.py en aislado, basta con poner el campo
en el ctx — eso emula exactamente lo que hace jobs.cpp en la app.
"""
from __future__ import annotations
import sqlite3
from conftest import (
base_ctx, list_entities, make_node, run_enricher,
)
# Texto largo (~85 unicas con dedupe) — sobrepasa el default 50 pero
# no llega a 100. Mismo cuerpo que test_split_words.LONG_TEXT.
LONG_TEXT = (
"Las estrellas brillan suavemente sobre el horizonte mientras "
"la marea retrocede dejando huellas mojadas en la arena fina. "
"Caminamos lentamente conversando sobre proyectos antiguos, "
"ideas frescas, libros leidos durante el invierno pasado, "
"viajes pendientes hacia tierras lejanas con culturas vibrantes. "
"Recordamos infancias compartidas, amigos perdidos, victorias "
"modestas, fracasos instructivos. Cada palabra dibuja un mapa "
"diferente del territorio interno que habitamos. Los nombres de "
"ciudades antiguas resuenan: Estambul, Marrakech, Kioto, Lisboa, "
"Praga, Budapest, Cuzco, Cartagena. Tambien tecnologia: servidores, "
"bases datos, redes neuronales, modelos linguisticos, sistemas "
"distribuidos, criptografia moderna. La conversacion fluye sin "
"esfuerzo aparente entre dominios completamente distintos."
)
def test_manifest_auto_group_threshold_override(ops_db, app_dir, registry_root):
"""auto_group_threshold=100 + 80 unicas -> sin Group (default seria 50).
Sin el override, 80 >= 50 dispararia agrupacion. Con override 100,
80 < 100 y todos los Words quedan sueltos (grouped=False).
"""
make_node(ops_db, node_id="t1", name="largo",
type_ref="text", notes=LONG_TEXT)
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="largo", node_type="text")
ctx["auto_group_threshold"] = 100
rc, out, err = run_enricher("split_words", ctx, timeout=60)
assert rc == 0, err
# El texto produce >50 unicas (sino el test no es valido) pero <100.
assert 50 <= out["words"] < 100, out
# Override toma efecto: no se crea Group.
assert out["grouped"] is False, out
assert out["group_id"] == "", out
# Todos los Words quedan sueltos sin group_id.
words = list_entities(ops_db, type_ref="Word")
assert len(words) == out["words"]
assert all(w["group_id"] is None for w in words), words[:5]
def test_manifest_threshold_override_below_default_still_groups(
ops_db, app_dir, registry_root):
"""Override mas BAJO que el default tambien debe respetarse.
threshold=20 con un texto corto (~15 unicas) — mas bajo que el
default 50 pero igual no llega a 20. Para verificar la direccion
contraria: 25 unicas >= 20 -> SI agrupa aunque < 50.
"""
text = ("alfa beta gamma delta epsilon zeta eta theta iota kappa "
"lambda mu nu omicron pi rho sigma tau upsilon phi chi psi "
"omega palabras adicionales para llegar.")
make_node(ops_db, node_id="t1", name="corto",
type_ref="text", notes=text)
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="corto", node_type="text")
ctx["auto_group_threshold"] = 20
rc, out, err = run_enricher("split_words", ctx, timeout=30)
assert rc == 0, err
if out["words"] >= 20:
assert out["grouped"] is True, out
assert out["group_id"], out
else:
# Defensa por si el tokenizer cuenta menos — el test sigue siendo
# informativo aunque no dispare la direccion principal.
assert out["grouped"] is False, out
def test_manifest_threshold_zero_uses_default(ops_db, app_dir,
registry_root):
"""auto_group_threshold=0 debe caer al default 50 (no desactivar)."""
make_node(ops_db, node_id="t1", name="largo",
type_ref="text", notes=LONG_TEXT)
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="largo", node_type="text")
ctx["auto_group_threshold"] = 0
rc, out, err = run_enricher("split_words", ctx, timeout=60)
assert rc == 0, err
# Con default 50 y 80+ unicas, debe agrupar.
assert out["words"] >= 50, out
assert out["grouped"] is True, out
def test_cpp_manifest_parser_reads_auto_group_threshold(tmp_path):
"""Parser ad-hoc Python que replica enrichers.cpp parse_manifest.
Espejea la logica del parser C++: lineas top-level `clave: valor`
se leen como atributos del manifest, sin recurrir a un YAML real.
Verifica que el campo `auto_group_threshold` se extrae como int.
"""
# Reproducimos exactamente el algoritmo del parser C++ (top-level
# solo, ignora bloques anidados como `params:`).
def parse(text: str) -> dict:
out: dict = {}
in_skip = False
for raw in text.splitlines():
line = raw.rstrip("\r")
s = line.strip()
if not s or s.startswith("#"):
continue
indented = line and line[0].isspace()
if not indented:
in_skip = False
if in_skip:
continue
if ":" not in s:
continue
key, _, val = s.partition(":")
key, val = key.strip(), val.strip()
if val and val[0] in ('"', "'") and val[-1] == val[0]:
val = val[1:-1]
if key == "params" and not val:
in_skip = True
continue
out[key] = val
return out
manifest = (
"id: split_words\n"
"name: \"Split into words\"\n"
"applies_to: [text]\n"
"auto_group_threshold: 100\n"
"params:\n"
" - { name: max_words, type: int, default: 500 }\n"
)
parsed = parse(manifest)
assert parsed.get("id") == "split_words"
assert parsed.get("auto_group_threshold") == "100"
assert int(parsed["auto_group_threshold"]) == 100
+160
View File
@@ -0,0 +1,160 @@
"""Tests de la migracion idempotente de operations.db -> + group_id.
El binario C++ implementa `project_migrate_schema` en project_manager.cpp:
detecta si `entities.group_id` existe via PRAGMA table_info; si no,
ejecuta `ALTER TABLE entities ADD COLUMN group_id TEXT`. Es idempotente
— al volver a abrir una BD ya migrada NO debe fallar ni duplicar.
Como la logica es pequena y deterministica (PRAGMA + ALTER), aqui la
replicamos en Python para testear el contrato sin depender del
binario. Si el contrato cambia, este mirror tiene que actualizarse
junto con project_manager.cpp.
"""
from __future__ import annotations
import sqlite3
from pathlib import Path
import pytest
def _table_has_column(conn: sqlite3.Connection, table: str, column: str) -> bool:
cur = conn.execute(f"PRAGMA table_info({table})")
return any(row[1] == column for row in cur.fetchall())
def _migrate_group_id(db_path: Path) -> bool:
"""Mirror Python de project_migrate_schema (issue 0035a + 0035e).
Devuelve True si la migracion se completo (con o sin trabajo), False
si la BD no se pudo abrir.
"""
cn = sqlite3.connect(db_path)
try:
if not _table_has_column(cn, "entities", "group_id"):
cn.execute("ALTER TABLE entities ADD COLUMN group_id TEXT")
cn.commit()
return True
finally:
cn.close()
# Schema "viejo" — sin la columna group_id. Reproduce el estado de una
# operations.db previa al issue 0035a (pre-2026-05-03).
LEGACY_SCHEMA = """
CREATE TABLE entities (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
type_ref TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
description TEXT NOT NULL DEFAULT '',
domain TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
source TEXT NOT NULL,
metadata TEXT NOT NULL DEFAULT '{}',
notes TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE relations (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
from_entity TEXT NOT NULL DEFAULT '',
to_entity TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
"""
@pytest.fixture
def legacy_db(tmp_path):
db = tmp_path / "operations.db"
cn = sqlite3.connect(db)
cn.executescript(LEGACY_SCHEMA)
# Datos previos para verificar que sobreviven la migracion.
cn.execute(
"INSERT INTO entities(id, name, type_ref, status, source, "
" metadata, created_at, updated_at) "
"VALUES ('e_pre', 'pre-existing', 'Url', 'active', 'manual', "
" '{}', '2025-01-01', '2025-01-01')"
)
cn.commit()
cn.close()
return db
def test_schema_migration_idempotent(legacy_db):
"""1a apertura migra; 2a apertura no rompe ni duplica la columna."""
# Estado inicial: sin group_id.
cn = sqlite3.connect(legacy_db)
assert not _table_has_column(cn, "entities", "group_id"), \
"fixture ya tenia group_id (schema legacy roto)"
cn.close()
# 1a migracion: anade la columna.
assert _migrate_group_id(legacy_db) is True
cn = sqlite3.connect(legacy_db)
assert _table_has_column(cn, "entities", "group_id")
# Datos previos sobreviven y la columna nueva es NULL por defecto.
row = cn.execute(
"SELECT id, name, group_id FROM entities WHERE id = 'e_pre'"
).fetchone()
assert row == ("e_pre", "pre-existing", None)
cn.close()
# 2a migracion: idempotente, no debe fallar ni duplicar.
assert _migrate_group_id(legacy_db) is True
cn = sqlite3.connect(legacy_db)
# Una sola columna group_id (no duplicada).
cur = cn.execute("PRAGMA table_info(entities)")
cols = [row[1] for row in cur.fetchall()]
assert cols.count("group_id") == 1, cols
# Y los datos siguen intactos.
cnt = cn.execute("SELECT COUNT(*) FROM entities").fetchone()[0]
assert cnt == 1
cn.close()
def test_schema_migration_already_migrated_db_is_noop(tmp_path):
"""BD ya creada con la columna group_id desde el inicio: noop."""
db = tmp_path / "operations.db"
cn = sqlite3.connect(db)
cn.executescript(LEGACY_SCHEMA)
cn.execute("ALTER TABLE entities ADD COLUMN group_id TEXT")
cn.commit()
cn.close()
# Migrar no debe fallar y la columna sigue siendo unica.
assert _migrate_group_id(db) is True
cn = sqlite3.connect(db)
cur = cn.execute("PRAGMA table_info(entities)")
cols = [row[1] for row in cur.fetchall()]
assert cols.count("group_id") == 1
cn.close()
def test_schema_migration_preserves_existing_group_id_values(tmp_path):
"""Si una BD ya tiene valores en group_id, la migracion los respeta."""
db = tmp_path / "operations.db"
cn = sqlite3.connect(db)
cn.executescript(LEGACY_SCHEMA)
cn.execute("ALTER TABLE entities ADD COLUMN group_id TEXT")
cn.execute(
"INSERT INTO entities(id, name, type_ref, status, source, "
" metadata, group_id, "
" created_at, updated_at) "
"VALUES ('child', 'c', 'Url', 'active', 'manual', '{}', "
" 'parent_grp', '2026-05-04', '2026-05-04')"
)
cn.commit()
cn.close()
_migrate_group_id(db)
cn = sqlite3.connect(db)
val = cn.execute(
"SELECT group_id FROM entities WHERE id = 'child'"
).fetchone()[0]
assert val == "parent_grp"
cn.close()