diff --git a/enrichers/split_words/manifest.yaml b/enrichers/split_words/manifest.yaml new file mode 100644 index 0000000..dd26215 --- /dev/null +++ b/enrichers/split_words/manifest.yaml @@ -0,0 +1,10 @@ +id: split_words +name: "Split text into words" +description: "Tokeniza el texto del nodo (entities.notes con fallback a name) en palabras y crea un nodo Word por cada token. Pensado para probar grouping con volumen alto: cualquier parrafo decente supera el umbral de 50 trivialmente." +applies_to: [text, Text] +emits: [Word] +relations: [WORD_OF] +params: + - { name: max_words, type: int, default: 500 } + - { name: min_length, type: int, default: 3, description: "ignora tokens con menos de N caracteres (filtra ruido tipo 'a', 'el', 'de')" } + - { name: dedupe, type: bool, default: true, description: "si true, una palabra repetida produce un solo nodo" } diff --git a/enrichers/split_words/run.py b/enrichers/split_words/run.py new file mode 100644 index 0000000..9cdaf0a --- /dev/null +++ b/enrichers/split_words/run.py @@ -0,0 +1,325 @@ +#!/usr/bin/env python3 +"""Enricher split_words — tokeniza texto en palabras (regex puro, offline). + +Pensado para probar el grouping de issue 0035 con volumen alto: cualquier +parrafo decente supera el umbral de 50 trivialmente, asi se ve un Group +cuadrado por flujo. + +Wire protocol estandar (issue 0026): + - stdin: JSON con node_id, node_name, metadata, ops_db_path, app_dir, + cache_dir, registry_root, params. + - stderr: lineas `PROGRESS: ` para feedback de UI. + - stdout: una linea JSON al final con resumen. + - exit code 0 = ok, !=0 = error. + +Lectura del texto (igual que split_sentences): + 1. `entities.notes` (panel Note del Inspector). + 2. node_name (fallback minimo). + +Tokenizacion: split por whitespace + puntuacion, lowercased. Filtra +tokens con `len < min_length` para evitar ruido (a, el, de, y, o, ...). +Por defecto deduplica para devolver vocabulario unico (mas util para +explorar contenido); con `dedupe=false` cada ocurrencia es un nodo +(util para volumen / stress). + +Grouping (issue 0035c): mismo patron que split_sentences y web_search. +""" +from __future__ import annotations + +import json +import os +import re +import sqlite3 +import sys +import time +import uuid +from datetime import datetime, timezone + + +DEFAULT_GROUP_THRESHOLD = 50 +GROUP_PREVIEW_K = 10 + +# Tokenizer: secuencias de letras (incluye acentos espanyoles + apostrofo +# interno tipo "don't"). Mas robusto que split por espacios para texto +# real con puntuacion adyacente. Numeros se ignoran — pensado para +# contenido natural, no datos. +_TOKEN_RE = re.compile(r"[A-Za-zÁÉÍÓÚÜÑáéíóúüñ][A-Za-zÁÉÍÓÚÜÑáéíóúüñ'-]*") + + +def progress(p: float, stage: str = "") -> None: + sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") + sys.stderr.flush() + + +def log(msg: str) -> None: + sys.stderr.write(f"{msg}\n") + sys.stderr.flush() + + +def now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def now_ms() -> int: + return int(time.time() * 1000) + + +def has_group_id_column(conn: sqlite3.Connection) -> bool: + try: + cur = conn.execute("PRAGMA table_info(entities)") + for row in cur: + if row[1] == "group_id": + return True + except sqlite3.Error: + pass + return False + + +def read_text(ops_db_path: str, node_id: str, node_name: str) -> str: + notes = "" + try: + c = sqlite3.connect(ops_db_path) + try: + row = c.execute( + "SELECT notes FROM entities WHERE id=?", (node_id,) + ).fetchone() + if row and isinstance(row[0], str): + notes = row[0] + finally: + c.close() + except sqlite3.Error: + notes = "" + if notes and notes.strip(): + return notes.strip() + return (node_name or "").strip() + + +def tokenize(text: str, *, min_length: int, dedupe: bool) -> list[str]: + """Devuelve lista de tokens (lower) filtrados por min_length. + + Si `dedupe=True`, conserva solo la primera aparicion (preserva orden). + """ + seen: set[str] = set() + out: list[str] = [] + for m in _TOKEN_RE.finditer(text): + tok = m.group(0).lower() + if len(tok) < min_length: + continue + if dedupe: + if tok in seen: + continue + seen.add(tok) + out.append(tok) + return out + + +def insert_word(conn: sqlite3.Connection, *, word: str, rank: int, + batch_id: str, group_id: str | None, + has_group_col: bool) -> str: + ts = now_iso() + new_id = f"Word_{now_ms()}_{rank}" + meta = { + "word": word, + "rank": rank, + "batch_id": batch_id, + } + meta_json = json.dumps(meta, ensure_ascii=False) + if has_group_col: + conn.execute( + "INSERT INTO entities (id, name, type_ref, source, metadata, " + " group_id, created_at, updated_at) " + "VALUES (?, ?, 'Word', 'enricher:split_words', ?, ?, ?, ?)", + (new_id, word, meta_json, group_id, ts, ts), + ) + else: + conn.execute( + "INSERT INTO entities (id, name, type_ref, source, metadata, " + " created_at, updated_at) " + "VALUES (?, ?, 'Word', 'enricher:split_words', ?, ?, ?)", + (new_id, word, meta_json, ts, ts), + ) + return new_id + + +def insert_group_entity(conn: sqlite3.Connection, *, source_node_id: str, + source_node_name: str, count: int, + batch_id: str) -> str: + ts = now_iso() + new_id = f"Group_{now_ms()}_{abs(hash(source_node_id + batch_id)) % 100000}" + name = f"split_words: {source_node_name} ({count})" + meta = { + "enricher": "split_words", + "count": count, + "batch_id": batch_id, + "source_node_id": source_node_id, + } + meta_json = json.dumps(meta, ensure_ascii=False) + conn.execute( + "INSERT INTO entities (id, name, type_ref, source, metadata, " + " created_at, updated_at) " + "VALUES (?, ?, 'Group', 'enricher:split_words', ?, ?, ?)", + (new_id, name, meta_json, ts, ts), + ) + return new_id + + +_REL_COUNTER = 0 + + +def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str, + name: str) -> bool: + global _REL_COUNTER + cur = conn.execute( + "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? " + "AND name=? LIMIT 1", + (from_id, to_id, name), + ) + if cur.fetchone(): + return False + ts = now_iso() + _REL_COUNTER += 1 + rel_id = f"rel_{now_ms()}_{_REL_COUNTER}_{name.lower()}" + conn.execute( + "INSERT INTO relations (id, name, from_entity, to_entity, " + " created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)", + (rel_id, name, from_id, to_id, ts, ts), + ) + return True + + +def main() -> int: + raw = sys.stdin.read() + try: + ctx = json.loads(raw) + except Exception as e: + log(f"stdin not valid JSON: {e}") + return 2 + + node_id = ctx.get("node_id") or "" + node_name = (ctx.get("node_name") or "").strip() + ops_db_path = ctx.get("ops_db_path") or "" + params = ctx.get("params") or {} + max_words = int(params.get("max_words", 500)) + min_length = int(params.get("min_length", 3)) + dedupe_raw = params.get("dedupe", True) + if isinstance(dedupe_raw, str): + dedupe = dedupe_raw.lower() not in ("false", "0", "no", "") + else: + dedupe = bool(dedupe_raw) + + if not node_id or not ops_db_path: + log("missing node_id / ops_db_path") + return 2 + + ops_db_path = ops_db_path.replace("\\", "/") + app_dir_raw = (ctx.get("app_dir") or "").replace("\\", "/") + if not os.path.isabs(ops_db_path): + if app_dir_raw and os.path.isdir(app_dir_raw): + cand = os.path.normpath(os.path.join(app_dir_raw, ops_db_path)) + if os.path.exists(cand): + ops_db_path = cand + if not os.path.isabs(ops_db_path): + ops_db_path = os.path.abspath(ops_db_path) + + if not os.path.exists(ops_db_path): + log(f"ops_db_path no existe: {ops_db_path}") + print(json.dumps({"error": "ops_db not found", + "ops_db_path": ops_db_path, + "entities_added": 0, "relations_added": 0})) + return 7 + + progress(0.10, "reading") + text = read_text(ops_db_path, node_id, node_name) + # min_length aplica a tokens, no al texto entrante. Para texto entrante + # exigimos algo razonable: al menos un token posible. + if len(text.strip()) < min_length: + msg = (f"texto demasiado corto ({len(text)} chars). Escribe el " + f"contenido en el panel Note del nodo (doble click) o " + f"pon un name mas largo.") + log(msg) + print(json.dumps({"error": msg, "entities_added": 0, + "relations_added": 0})) + return 2 + + progress(0.30, "tokenizing") + words = tokenize(text, min_length=min_length, dedupe=dedupe) + if max_words > 0: + words = words[:max_words] + + if not words: + msg = (f"sin tokens tras filtrar (texto de {len(text)} chars, " + f"min_length={min_length}, dedupe={dedupe})") + log(msg) + print(json.dumps({"error": msg, "entities_added": 0, + "relations_added": 0})) + return 2 + + progress(0.55, "writing") + conn = sqlite3.connect(ops_db_path) + conn.execute("PRAGMA foreign_keys=OFF") + entities_added = 0 + relations_added = 0 + group_id: str | None = None + batch_id = uuid.uuid4().hex + try: + has_group_col = has_group_id_column(conn) + n_total = len(words) + threshold = DEFAULT_GROUP_THRESHOLD + + if n_total >= threshold and has_group_col: + group_id = insert_group_entity( + conn, + source_node_id=node_id, + source_node_name=node_name or "(text)", + count=n_total, + batch_id=batch_id, + ) + entities_added += 1 + if insert_relation(conn, group_id, node_id, "WORD_OF"): + relations_added += 1 + preview = words[:GROUP_PREVIEW_K] + grouped = words[GROUP_PREVIEW_K:] + else: + preview = words + grouped = [] + + for i, w in enumerate(preview): + wid = insert_word( + conn, word=w, rank=i + 1, batch_id=batch_id, + group_id=None, has_group_col=has_group_col, + ) + entities_added += 1 + if insert_relation(conn, wid, node_id, "WORD_OF"): + relations_added += 1 + + for j, w in enumerate(grouped): + rank = GROUP_PREVIEW_K + j + 1 + wid = insert_word( + conn, word=w, rank=rank, batch_id=batch_id, + group_id=group_id, has_group_col=has_group_col, + ) + entities_added += 1 + if insert_relation(conn, wid, node_id, "WORD_OF"): + relations_added += 1 + if grouped and j % 50 == 0: + progress(0.55 + 0.40 * (j / max(1, len(grouped))), "writing") + + conn.commit() + finally: + conn.close() + + progress(1.0, "done") + print(json.dumps({ + "words": len(words), + "entities_added": entities_added, + "relations_added": relations_added, + "batch_id": batch_id, + "group_id": group_id or "", + "grouped": bool(group_id), + "deduped": dedupe, + }, ensure_ascii=False)) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/examples/types.yaml b/examples/types.yaml index c326081..3d66260 100644 --- a/examples/types.yaml +++ b/examples/types.yaml @@ -134,6 +134,18 @@ entities: - { name: rank, type: int } - { name: batch_id, type: string } + # Word — token emitido por split_words. Pensado para probar grouping + # con volumen alto: cualquier parrafo decente supera el umbral de 50 + # tokens trivialmente. Color amarillo para distinguir de Sentence. + - name: Word + color: "#FCD34D" + icon: ti-letter-w + principal_field: word + fields: + - { name: word, type: string, required: true } + - { name: rank, type: int } + - { name: batch_id, type: string } + # Nodo grupo — cuadrado (regla de forma). Issue 0035: contenedor para # agrupar resultados de enrichers cuando exceden el umbral. Los hijos # son entidades reales con `group_id` apuntando al Group. diff --git a/tests/test_split_words.py b/tests/test_split_words.py new file mode 100644 index 0000000..f883e00 --- /dev/null +++ b/tests/test_split_words.py @@ -0,0 +1,154 @@ +"""Tests del enricher split_words (offline, regex puro). + +Mismo patron que test_split_sentences.py: nodo text con `notes` largo, +verificamos tokens, fallback a name, threshold/grouping y dedupe. +""" +from __future__ import annotations + +from conftest import ( + base_ctx, list_entities, list_relations, make_node, run_enricher, +) + + +# Texto con suficientes palabras unicas para superar threshold (50) +# si dedupe=true: cuento ~85 unicas a ojo. Sin dedupe (count=true) +# todas las ocurrencias dan ~140+ tokens. +LONG_TEXT = ( + "Las estrellas brillan suavemente sobre el horizonte mientras " + "la marea retrocede dejando huellas mojadas en la arena fina. " + "Caminamos lentamente conversando sobre proyectos antiguos, " + "ideas frescas, libros leidos durante el invierno pasado, " + "viajes pendientes hacia tierras lejanas con culturas vibrantes. " + "Recordamos infancias compartidas, amigos perdidos, victorias " + "modestas, fracasos instructivos. Cada palabra dibuja un mapa " + "diferente del territorio interno que habitamos. Los nombres de " + "ciudades antiguas resuenan: Estambul, Marrakech, Kioto, Lisboa, " + "Praga, Budapest, Cuzco, Cartagena. Tambien tecnologia: servidores, " + "bases datos, redes neuronales, modelos linguisticos, sistemas " + "distribuidos, criptografia moderna. La conversacion fluye sin " + "esfuerzo aparente entre dominios completamente distintos." +) + + +def test_split_words_creates_word_nodes(ops_db, app_dir, registry_root): + """Texto corto < threshold genera Words sueltos sin Group.""" + short_text = "uno dos tres cuatro cinco seis siete ocho nueve diez." + make_node(ops_db, node_id="t1", name="short", + type_ref="text", notes=short_text) + ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, + node_id="t1", node_name="short", node_type="text") + rc, out, err = run_enricher("split_words", ctx) + assert rc == 0, err + assert out["grouped"] is False + assert out["entities_added"] == out["words"] + assert out["words"] >= 5 # filtra <3 chars: dos, tres, cuatro, ... + + words = list_entities(ops_db, type_ref="Word") + assert len(words) == out["words"] + + +def test_split_words_above_threshold_creates_group(ops_db, app_dir, + registry_root): + """Texto largo (≥50 tokens unicos) → Group + 10 sueltos + resto.""" + make_node(ops_db, node_id="t1", name="largo", + type_ref="text", notes=LONG_TEXT) + ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, + node_id="t1", node_name="largo", node_type="text") + rc, out, err = run_enricher("split_words", ctx, timeout=60) + assert rc == 0, err + assert out["words"] >= 50 + assert out["grouped"] is True + assert out["group_id"] + # Group + words = entities_added. + assert out["entities_added"] == out["words"] + 1 + + # Hay 10 Words sueltos (group_id NULL) y resto agrupados. + import sqlite3 + cn = sqlite3.connect(ops_db) + n_loose = cn.execute( + "SELECT count(*) FROM entities WHERE type_ref='Word' " + "AND group_id IS NULL" + ).fetchone()[0] + n_grouped = cn.execute( + "SELECT count(*) FROM entities WHERE type_ref='Word' " + "AND group_id = ?", (out["group_id"],) + ).fetchone()[0] + cn.close() + assert n_loose == 10 + assert n_loose + n_grouped == out["words"] + + +def test_split_words_dedupe_default_true(ops_db, app_dir, registry_root): + """Por defecto dedupe=true: 'casa casa casa' produce 1 Word.""" + make_node(ops_db, node_id="t1", name="dup", + type_ref="text", notes="casa casa casa perro perro gato.") + ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, + node_id="t1", node_name="dup", node_type="text") + rc, out, err = run_enricher("split_words", ctx) + assert rc == 0, err + assert out["deduped"] is True + # casa, perro, gato (todas ≥3 chars). + assert out["words"] == 3 + + +def test_split_words_dedupe_false(ops_db, app_dir, registry_root): + """Con dedupe=false cada ocurrencia es un nodo Word.""" + make_node(ops_db, node_id="t1", name="dup", + type_ref="text", notes="casa casa casa perro perro gato.") + ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, + node_id="t1", node_name="dup", node_type="text", + params={"dedupe": False}) + rc, out, err = run_enricher("split_words", ctx) + assert rc == 0, err + assert out["deduped"] is False + # 3 casa + 2 perro + 1 gato = 6 tokens. + assert out["words"] == 6 + + +def test_split_words_min_length_filters(ops_db, app_dir, registry_root): + """min_length filtra tokens cortos. Default 3.""" + make_node(ops_db, node_id="t1", name="cortos", + type_ref="text", + notes="a el de la y o un casa perro elefante.") + ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, + node_id="t1", node_name="cortos", node_type="text") + rc, out, err = run_enricher("split_words", ctx) + assert rc == 0, err + # >=3 chars: casa, perro, elefante. (un=2, de=2, la=2 quedan fuera). + assert out["words"] == 3 + + +def test_split_words_uses_notes_priority(ops_db, app_dir, registry_root): + """Lee `entities.notes` por encima de node_name.""" + make_node(ops_db, node_id="t1", name="ignorame", + type_ref="text", + notes="estos cinco tokens deberian ganar.") + ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, + node_id="t1", node_name="ignorame", node_type="text") + rc, out, err = run_enricher("split_words", ctx) + assert rc == 0, err + # estos, cinco, tokens, deberian, ganar (todos ≥3). + assert out["words"] == 5 + + +def test_split_words_no_text_fails(ops_db, app_dir, registry_root): + """Sin notes y name corto → exit 2.""" + make_node(ops_db, node_id="t1", name="x", type_ref="text", metadata={}) + ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, + node_id="t1", node_name="x", node_type="text") + rc, out, err = run_enricher("split_words", ctx) + assert rc == 2 + assert out is not None + assert "demasiado corto" in (out.get("error") or "") + + +def test_split_words_max_words_truncates(ops_db, app_dir, registry_root): + """max_words limita el output.""" + make_node(ops_db, node_id="t1", name="largo", + type_ref="text", notes=LONG_TEXT) + ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root, + node_id="t1", node_name="largo", node_type="text", + params={"max_words": 12}) + rc, out, err = run_enricher("split_words", ctx) + assert rc == 0, err + assert out["words"] == 12