Merge issue/split-words-enricher

This commit is contained in:
2026-05-04 00:14:57 +02:00
4 changed files with 501 additions and 0 deletions
+10
View File
@@ -0,0 +1,10 @@
id: split_words
name: "Split text into words"
description: "Tokeniza el texto del nodo (entities.notes con fallback a name) en palabras y crea un nodo Word por cada token. Pensado para probar grouping con volumen alto: cualquier parrafo decente supera el umbral de 50 trivialmente."
applies_to: [text, Text]
emits: [Word]
relations: [WORD_OF]
params:
- { name: max_words, type: int, default: 500 }
- { name: min_length, type: int, default: 3, description: "ignora tokens con menos de N caracteres (filtra ruido tipo 'a', 'el', 'de')" }
- { name: dedupe, type: bool, default: true, description: "si true, una palabra repetida produce un solo nodo" }
+325
View File
@@ -0,0 +1,325 @@
#!/usr/bin/env python3
"""Enricher split_words — tokeniza texto en palabras (regex puro, offline).
Pensado para probar el grouping de issue 0035 con volumen alto: cualquier
parrafo decente supera el umbral de 50 trivialmente, asi se ve un Group
cuadrado por flujo.
Wire protocol estandar (issue 0026):
- stdin: JSON con node_id, node_name, metadata, ops_db_path, app_dir,
cache_dir, registry_root, params.
- stderr: lineas `PROGRESS:<float> <stage>` para feedback de UI.
- stdout: una linea JSON al final con resumen.
- exit code 0 = ok, !=0 = error.
Lectura del texto (igual que split_sentences):
1. `entities.notes` (panel Note del Inspector).
2. node_name (fallback minimo).
Tokenizacion: split por whitespace + puntuacion, lowercased. Filtra
tokens con `len < min_length` para evitar ruido (a, el, de, y, o, ...).
Por defecto deduplica para devolver vocabulario unico (mas util para
explorar contenido); con `dedupe=false` cada ocurrencia es un nodo
(util para volumen / stress).
Grouping (issue 0035c): mismo patron que split_sentences y web_search.
"""
from __future__ import annotations
import json
import os
import re
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timezone
DEFAULT_GROUP_THRESHOLD = 50
GROUP_PREVIEW_K = 10
# Tokenizer: secuencias de letras (incluye acentos espanyoles + apostrofo
# interno tipo "don't"). Mas robusto que split por espacios para texto
# real con puntuacion adyacente. Numeros se ignoran — pensado para
# contenido natural, no datos.
_TOKEN_RE = re.compile(r"[A-Za-zÁÉÍÓÚÜÑáéíóúüñ][A-Za-zÁÉÍÓÚÜÑáéíóúüñ'-]*")
def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def has_group_id_column(conn: sqlite3.Connection) -> bool:
try:
cur = conn.execute("PRAGMA table_info(entities)")
for row in cur:
if row[1] == "group_id":
return True
except sqlite3.Error:
pass
return False
def read_text(ops_db_path: str, node_id: str, node_name: str) -> str:
notes = ""
try:
c = sqlite3.connect(ops_db_path)
try:
row = c.execute(
"SELECT notes FROM entities WHERE id=?", (node_id,)
).fetchone()
if row and isinstance(row[0], str):
notes = row[0]
finally:
c.close()
except sqlite3.Error:
notes = ""
if notes and notes.strip():
return notes.strip()
return (node_name or "").strip()
def tokenize(text: str, *, min_length: int, dedupe: bool) -> list[str]:
"""Devuelve lista de tokens (lower) filtrados por min_length.
Si `dedupe=True`, conserva solo la primera aparicion (preserva orden).
"""
seen: set[str] = set()
out: list[str] = []
for m in _TOKEN_RE.finditer(text):
tok = m.group(0).lower()
if len(tok) < min_length:
continue
if dedupe:
if tok in seen:
continue
seen.add(tok)
out.append(tok)
return out
def insert_word(conn: sqlite3.Connection, *, word: str, rank: int,
batch_id: str, group_id: str | None,
has_group_col: bool) -> str:
ts = now_iso()
new_id = f"Word_{now_ms()}_{rank}"
meta = {
"word": word,
"rank": rank,
"batch_id": batch_id,
}
meta_json = json.dumps(meta, ensure_ascii=False)
if has_group_col:
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" group_id, created_at, updated_at) "
"VALUES (?, ?, 'Word', 'enricher:split_words', ?, ?, ?, ?)",
(new_id, word, meta_json, group_id, ts, ts),
)
else:
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" created_at, updated_at) "
"VALUES (?, ?, 'Word', 'enricher:split_words', ?, ?, ?)",
(new_id, word, meta_json, ts, ts),
)
return new_id
def insert_group_entity(conn: sqlite3.Connection, *, source_node_id: str,
source_node_name: str, count: int,
batch_id: str) -> str:
ts = now_iso()
new_id = f"Group_{now_ms()}_{abs(hash(source_node_id + batch_id)) % 100000}"
name = f"split_words: {source_node_name} ({count})"
meta = {
"enricher": "split_words",
"count": count,
"batch_id": batch_id,
"source_node_id": source_node_id,
}
meta_json = json.dumps(meta, ensure_ascii=False)
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" created_at, updated_at) "
"VALUES (?, ?, 'Group', 'enricher:split_words', ?, ?, ?)",
(new_id, name, meta_json, ts, ts),
)
return new_id
_REL_COUNTER = 0
def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str,
name: str) -> bool:
global _REL_COUNTER
cur = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
"AND name=? LIMIT 1",
(from_id, to_id, name),
)
if cur.fetchone():
return False
ts = now_iso()
_REL_COUNTER += 1
rel_id = f"rel_{now_ms()}_{_REL_COUNTER}_{name.lower()}"
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, "
" created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
(rel_id, name, from_id, to_id, ts, ts),
)
return True
def main() -> int:
raw = sys.stdin.read()
try:
ctx = json.loads(raw)
except Exception as e:
log(f"stdin not valid JSON: {e}")
return 2
node_id = ctx.get("node_id") or ""
node_name = (ctx.get("node_name") or "").strip()
ops_db_path = ctx.get("ops_db_path") or ""
params = ctx.get("params") or {}
max_words = int(params.get("max_words", 500))
min_length = int(params.get("min_length", 3))
dedupe_raw = params.get("dedupe", True)
if isinstance(dedupe_raw, str):
dedupe = dedupe_raw.lower() not in ("false", "0", "no", "")
else:
dedupe = bool(dedupe_raw)
if not node_id or not ops_db_path:
log("missing node_id / ops_db_path")
return 2
ops_db_path = ops_db_path.replace("\\", "/")
app_dir_raw = (ctx.get("app_dir") or "").replace("\\", "/")
if not os.path.isabs(ops_db_path):
if app_dir_raw and os.path.isdir(app_dir_raw):
cand = os.path.normpath(os.path.join(app_dir_raw, ops_db_path))
if os.path.exists(cand):
ops_db_path = cand
if not os.path.isabs(ops_db_path):
ops_db_path = os.path.abspath(ops_db_path)
if not os.path.exists(ops_db_path):
log(f"ops_db_path no existe: {ops_db_path}")
print(json.dumps({"error": "ops_db not found",
"ops_db_path": ops_db_path,
"entities_added": 0, "relations_added": 0}))
return 7
progress(0.10, "reading")
text = read_text(ops_db_path, node_id, node_name)
# min_length aplica a tokens, no al texto entrante. Para texto entrante
# exigimos algo razonable: al menos un token posible.
if len(text.strip()) < min_length:
msg = (f"texto demasiado corto ({len(text)} chars). Escribe el "
f"contenido en el panel Note del nodo (doble click) o "
f"pon un name mas largo.")
log(msg)
print(json.dumps({"error": msg, "entities_added": 0,
"relations_added": 0}))
return 2
progress(0.30, "tokenizing")
words = tokenize(text, min_length=min_length, dedupe=dedupe)
if max_words > 0:
words = words[:max_words]
if not words:
msg = (f"sin tokens tras filtrar (texto de {len(text)} chars, "
f"min_length={min_length}, dedupe={dedupe})")
log(msg)
print(json.dumps({"error": msg, "entities_added": 0,
"relations_added": 0}))
return 2
progress(0.55, "writing")
conn = sqlite3.connect(ops_db_path)
conn.execute("PRAGMA foreign_keys=OFF")
entities_added = 0
relations_added = 0
group_id: str | None = None
batch_id = uuid.uuid4().hex
try:
has_group_col = has_group_id_column(conn)
n_total = len(words)
threshold = DEFAULT_GROUP_THRESHOLD
if n_total >= threshold and has_group_col:
group_id = insert_group_entity(
conn,
source_node_id=node_id,
source_node_name=node_name or "(text)",
count=n_total,
batch_id=batch_id,
)
entities_added += 1
if insert_relation(conn, group_id, node_id, "WORD_OF"):
relations_added += 1
preview = words[:GROUP_PREVIEW_K]
grouped = words[GROUP_PREVIEW_K:]
else:
preview = words
grouped = []
for i, w in enumerate(preview):
wid = insert_word(
conn, word=w, rank=i + 1, batch_id=batch_id,
group_id=None, has_group_col=has_group_col,
)
entities_added += 1
if insert_relation(conn, wid, node_id, "WORD_OF"):
relations_added += 1
for j, w in enumerate(grouped):
rank = GROUP_PREVIEW_K + j + 1
wid = insert_word(
conn, word=w, rank=rank, batch_id=batch_id,
group_id=group_id, has_group_col=has_group_col,
)
entities_added += 1
if insert_relation(conn, wid, node_id, "WORD_OF"):
relations_added += 1
if grouped and j % 50 == 0:
progress(0.55 + 0.40 * (j / max(1, len(grouped))), "writing")
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"words": len(words),
"entities_added": entities_added,
"relations_added": relations_added,
"batch_id": batch_id,
"group_id": group_id or "",
"grouped": bool(group_id),
"deduped": dedupe,
}, ensure_ascii=False))
return 0
if __name__ == "__main__":
sys.exit(main())
+12
View File
@@ -134,6 +134,18 @@ entities:
- { name: rank, type: int }
- { name: batch_id, type: string }
# Word — token emitido por split_words. Pensado para probar grouping
# con volumen alto: cualquier parrafo decente supera el umbral de 50
# tokens trivialmente. Color amarillo para distinguir de Sentence.
- name: Word
color: "#FCD34D"
icon: ti-letter-w
principal_field: word
fields:
- { name: word, type: string, required: true }
- { name: rank, type: int }
- { name: batch_id, type: string }
# Nodo grupo — cuadrado (regla de forma). Issue 0035: contenedor para
# agrupar resultados de enrichers cuando exceden el umbral. Los hijos
# son entidades reales con `group_id` apuntando al Group.
+154
View File
@@ -0,0 +1,154 @@
"""Tests del enricher split_words (offline, regex puro).
Mismo patron que test_split_sentences.py: nodo text con `notes` largo,
verificamos tokens, fallback a name, threshold/grouping y dedupe.
"""
from __future__ import annotations
from conftest import (
base_ctx, list_entities, list_relations, make_node, run_enricher,
)
# Texto con suficientes palabras unicas para superar threshold (50)
# si dedupe=true: cuento ~85 unicas a ojo. Sin dedupe (count=true)
# todas las ocurrencias dan ~140+ tokens.
LONG_TEXT = (
"Las estrellas brillan suavemente sobre el horizonte mientras "
"la marea retrocede dejando huellas mojadas en la arena fina. "
"Caminamos lentamente conversando sobre proyectos antiguos, "
"ideas frescas, libros leidos durante el invierno pasado, "
"viajes pendientes hacia tierras lejanas con culturas vibrantes. "
"Recordamos infancias compartidas, amigos perdidos, victorias "
"modestas, fracasos instructivos. Cada palabra dibuja un mapa "
"diferente del territorio interno que habitamos. Los nombres de "
"ciudades antiguas resuenan: Estambul, Marrakech, Kioto, Lisboa, "
"Praga, Budapest, Cuzco, Cartagena. Tambien tecnologia: servidores, "
"bases datos, redes neuronales, modelos linguisticos, sistemas "
"distribuidos, criptografia moderna. La conversacion fluye sin "
"esfuerzo aparente entre dominios completamente distintos."
)
def test_split_words_creates_word_nodes(ops_db, app_dir, registry_root):
"""Texto corto < threshold genera Words sueltos sin Group."""
short_text = "uno dos tres cuatro cinco seis siete ocho nueve diez."
make_node(ops_db, node_id="t1", name="short",
type_ref="text", notes=short_text)
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="short", node_type="text")
rc, out, err = run_enricher("split_words", ctx)
assert rc == 0, err
assert out["grouped"] is False
assert out["entities_added"] == out["words"]
assert out["words"] >= 5 # filtra <3 chars: dos, tres, cuatro, ...
words = list_entities(ops_db, type_ref="Word")
assert len(words) == out["words"]
def test_split_words_above_threshold_creates_group(ops_db, app_dir,
registry_root):
"""Texto largo (≥50 tokens unicos) → Group + 10 sueltos + resto."""
make_node(ops_db, node_id="t1", name="largo",
type_ref="text", notes=LONG_TEXT)
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="largo", node_type="text")
rc, out, err = run_enricher("split_words", ctx, timeout=60)
assert rc == 0, err
assert out["words"] >= 50
assert out["grouped"] is True
assert out["group_id"]
# Group + words = entities_added.
assert out["entities_added"] == out["words"] + 1
# Hay 10 Words sueltos (group_id NULL) y resto agrupados.
import sqlite3
cn = sqlite3.connect(ops_db)
n_loose = cn.execute(
"SELECT count(*) FROM entities WHERE type_ref='Word' "
"AND group_id IS NULL"
).fetchone()[0]
n_grouped = cn.execute(
"SELECT count(*) FROM entities WHERE type_ref='Word' "
"AND group_id = ?", (out["group_id"],)
).fetchone()[0]
cn.close()
assert n_loose == 10
assert n_loose + n_grouped == out["words"]
def test_split_words_dedupe_default_true(ops_db, app_dir, registry_root):
"""Por defecto dedupe=true: 'casa casa casa' produce 1 Word."""
make_node(ops_db, node_id="t1", name="dup",
type_ref="text", notes="casa casa casa perro perro gato.")
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="dup", node_type="text")
rc, out, err = run_enricher("split_words", ctx)
assert rc == 0, err
assert out["deduped"] is True
# casa, perro, gato (todas ≥3 chars).
assert out["words"] == 3
def test_split_words_dedupe_false(ops_db, app_dir, registry_root):
"""Con dedupe=false cada ocurrencia es un nodo Word."""
make_node(ops_db, node_id="t1", name="dup",
type_ref="text", notes="casa casa casa perro perro gato.")
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="dup", node_type="text",
params={"dedupe": False})
rc, out, err = run_enricher("split_words", ctx)
assert rc == 0, err
assert out["deduped"] is False
# 3 casa + 2 perro + 1 gato = 6 tokens.
assert out["words"] == 6
def test_split_words_min_length_filters(ops_db, app_dir, registry_root):
"""min_length filtra tokens cortos. Default 3."""
make_node(ops_db, node_id="t1", name="cortos",
type_ref="text",
notes="a el de la y o un casa perro elefante.")
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="cortos", node_type="text")
rc, out, err = run_enricher("split_words", ctx)
assert rc == 0, err
# >=3 chars: casa, perro, elefante. (un=2, de=2, la=2 quedan fuera).
assert out["words"] == 3
def test_split_words_uses_notes_priority(ops_db, app_dir, registry_root):
"""Lee `entities.notes` por encima de node_name."""
make_node(ops_db, node_id="t1", name="ignorame",
type_ref="text",
notes="estos cinco tokens deberian ganar.")
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="ignorame", node_type="text")
rc, out, err = run_enricher("split_words", ctx)
assert rc == 0, err
# estos, cinco, tokens, deberian, ganar (todos ≥3).
assert out["words"] == 5
def test_split_words_no_text_fails(ops_db, app_dir, registry_root):
"""Sin notes y name corto → exit 2."""
make_node(ops_db, node_id="t1", name="x", type_ref="text", metadata={})
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="x", node_type="text")
rc, out, err = run_enricher("split_words", ctx)
assert rc == 2
assert out is not None
assert "demasiado corto" in (out.get("error") or "")
def test_split_words_max_words_truncates(ops_db, app_dir, registry_root):
"""max_words limita el output."""
make_node(ops_db, node_id="t1", name="largo",
type_ref="text", notes=LONG_TEXT)
ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
node_id="t1", node_name="largo", node_type="text",
params={"max_words": 12})
rc, out, err = run_enricher("split_words", ctx)
assert rc == 0, err
assert out["words"] == 12