#!/usr/bin/env python3
"""Enricher split_words: tokenize text into words (pure regex, offline).

Meant to exercise the grouping from issue 0035 at high volume: any decent
paragraph trivially exceeds the threshold of 50, so one square Group shows
up per flow.

Standard wire protocol (issue 0026):
- stdin: JSON with node_id, node_name, metadata, ops_db_path, app_dir,
  cache_dir, registry_root, params.
- stderr: `PROGRESS: ` lines for UI feedback.
- stdout: one JSON line at the end with a summary.
- exit code 0 = ok, !=0 = error.

Reading the text (same as split_sentences):
1. `entities.notes` (the Note panel of the Inspector).
2. node_name (minimal fallback).

Tokenization: runs of letters (whitespace and punctuation act as
separators), lowercased. Tokens with `len < min_length` are dropped to
avoid noise (a, el, de, y, o, ...). By default tokens are deduplicated so
the result is a unique vocabulary (more useful for exploring content);
with `dedupe=false` every occurrence becomes a node (useful for volume /
stress testing).

Grouping (issue 0035c): same pattern as split_sentences and web_search.
"""

from __future__ import annotations

import json
import os
import re
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timezone

# Minimum number of extracted words at which results are grouped under a
# single Group entity (can be overridden per manifest, issue 0035e).
DEFAULT_GROUP_THRESHOLD = 50
# How many leading words stay ungrouped (visible preview) when grouping
# kicks in.
GROUP_PREVIEW_K = 10


def _coerce_threshold(raw, default: int) -> int:
    """Coerce a grouping-threshold override into a positive int (issue 0035e).

    Args:
        raw: override value as received (int, numeric string, or None).
        default: value used when ``raw`` is missing or unusable.

    Returns:
        ``int(raw)`` when it converts and is > 0; otherwise ``default``.
        Never raises.
    """
    if raw is None or raw == "":
        return default
    try:
        v = int(raw)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: json.loads accepts `Infinity` by default and
        # int(float("inf")) raises it instead of ValueError.
        return default
    return v if v > 0 else default


# Tokenizer pattern used by tokenize(): sequences of letters. Numbers are
# ignored, since this is meant for natural-language content, not data.
_TOKEN_RE = re.compile(r"[A-Za-zÁÉÍÓÚÜÑáéíóúüñ][A-Za-zÁÉÍÓÚÜÑáéíóúüñ'-]*") def progress(p: float, stage: str = "") -> None: sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") sys.stderr.flush() def log(msg: str) -> None: sys.stderr.write(f"{msg}\n") sys.stderr.flush() def now_iso() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def now_ms() -> int: return int(time.time() * 1000) def has_group_id_column(conn: sqlite3.Connection) -> bool: try: cur = conn.execute("PRAGMA table_info(entities)") for row in cur: if row[1] == "group_id": return True except sqlite3.Error: pass return False def read_text(ops_db_path: str, node_id: str, node_name: str) -> str: notes = "" try: c = sqlite3.connect(ops_db_path) try: row = c.execute( "SELECT notes FROM entities WHERE id=?", (node_id,) ).fetchone() if row and isinstance(row[0], str): notes = row[0] finally: c.close() except sqlite3.Error: notes = "" if notes and notes.strip(): return notes.strip() return (node_name or "").strip() def tokenize(text: str, *, min_length: int, dedupe: bool) -> list[str]: """Devuelve lista de tokens (lower) filtrados por min_length. Si `dedupe=True`, conserva solo la primera aparicion (preserva orden). 
""" seen: set[str] = set() out: list[str] = [] for m in _TOKEN_RE.finditer(text): tok = m.group(0).lower() if len(tok) < min_length: continue if dedupe: if tok in seen: continue seen.add(tok) out.append(tok) return out def insert_word(conn: sqlite3.Connection, *, word: str, rank: int, batch_id: str, group_id: str | None, has_group_col: bool) -> str: ts = now_iso() new_id = f"Word_{now_ms()}_{rank}" meta = { "word": word, "rank": rank, "batch_id": batch_id, } meta_json = json.dumps(meta, ensure_ascii=False) if has_group_col: conn.execute( "INSERT INTO entities (id, name, type_ref, source, metadata, " " group_id, created_at, updated_at) " "VALUES (?, ?, 'Word', 'enricher:split_words', ?, ?, ?, ?)", (new_id, word, meta_json, group_id, ts, ts), ) else: conn.execute( "INSERT INTO entities (id, name, type_ref, source, metadata, " " created_at, updated_at) " "VALUES (?, ?, 'Word', 'enricher:split_words', ?, ?, ?)", (new_id, word, meta_json, ts, ts), ) return new_id def insert_group_entity(conn: sqlite3.Connection, *, source_node_id: str, source_node_name: str, count: int, batch_id: str) -> str: ts = now_iso() new_id = f"Group_{now_ms()}_{abs(hash(source_node_id + batch_id)) % 100000}" name = f"split_words: {source_node_name} ({count})" meta = { "enricher": "split_words", "count": count, "batch_id": batch_id, "source_node_id": source_node_id, } meta_json = json.dumps(meta, ensure_ascii=False) conn.execute( "INSERT INTO entities (id, name, type_ref, source, metadata, " " created_at, updated_at) " "VALUES (?, ?, 'Group', 'enricher:split_words', ?, ?, ?)", (new_id, name, meta_json, ts, ts), ) return new_id _REL_COUNTER = 0 def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool: global _REL_COUNTER cur = conn.execute( "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? " "AND name=? 
LIMIT 1", (from_id, to_id, name), ) if cur.fetchone(): return False ts = now_iso() _REL_COUNTER += 1 rel_id = f"rel_{now_ms()}_{_REL_COUNTER}_{name.lower()}" conn.execute( "INSERT INTO relations (id, name, from_entity, to_entity, " " created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)", (rel_id, name, from_id, to_id, ts, ts), ) return True def main() -> int: raw = sys.stdin.read() try: ctx = json.loads(raw) except Exception as e: log(f"stdin not valid JSON: {e}") return 2 node_id = ctx.get("node_id") or "" node_name = (ctx.get("node_name") or "").strip() ops_db_path = ctx.get("ops_db_path") or "" params = ctx.get("params") or {} max_words = int(params.get("max_words", 500)) min_length = int(params.get("min_length", 3)) dedupe_raw = params.get("dedupe", True) if isinstance(dedupe_raw, str): dedupe = dedupe_raw.lower() not in ("false", "0", "no", "") else: dedupe = bool(dedupe_raw) if not node_id or not ops_db_path: log("missing node_id / ops_db_path") return 2 ops_db_path = ops_db_path.replace("\\", "/") app_dir_raw = (ctx.get("app_dir") or "").replace("\\", "/") if not os.path.isabs(ops_db_path): if app_dir_raw and os.path.isdir(app_dir_raw): cand = os.path.normpath(os.path.join(app_dir_raw, ops_db_path)) if os.path.exists(cand): ops_db_path = cand if not os.path.isabs(ops_db_path): ops_db_path = os.path.abspath(ops_db_path) if not os.path.exists(ops_db_path): log(f"ops_db_path no existe: {ops_db_path}") print(json.dumps({"error": "ops_db not found", "ops_db_path": ops_db_path, "entities_added": 0, "relations_added": 0})) return 7 progress(0.10, "reading") text = read_text(ops_db_path, node_id, node_name) # min_length aplica a tokens, no al texto entrante. Para texto entrante # exigimos algo razonable: al menos un token posible. if len(text.strip()) < min_length: msg = (f"texto demasiado corto ({len(text)} chars). 
Escribe el " f"contenido en el panel Note del nodo (doble click) o " f"pon un name mas largo.") log(msg) print(json.dumps({"error": msg, "entities_added": 0, "relations_added": 0})) return 2 progress(0.30, "tokenizing") words = tokenize(text, min_length=min_length, dedupe=dedupe) if max_words > 0: words = words[:max_words] if not words: msg = (f"sin tokens tras filtrar (texto de {len(text)} chars, " f"min_length={min_length}, dedupe={dedupe})") log(msg) print(json.dumps({"error": msg, "entities_added": 0, "relations_added": 0})) return 2 progress(0.55, "writing") conn = sqlite3.connect(ops_db_path) conn.execute("PRAGMA foreign_keys=OFF") entities_added = 0 relations_added = 0 group_id: str | None = None batch_id = uuid.uuid4().hex try: has_group_col = has_group_id_column(conn) n_total = len(words) # Issue 0035e: respeta override del manifest si viene en ctx. threshold = _coerce_threshold(ctx.get("auto_group_threshold"), DEFAULT_GROUP_THRESHOLD) if n_total >= threshold and has_group_col: group_id = insert_group_entity( conn, source_node_id=node_id, source_node_name=node_name or "(text)", count=n_total, batch_id=batch_id, ) entities_added += 1 if insert_relation(conn, group_id, node_id, "WORD_OF"): relations_added += 1 preview = words[:GROUP_PREVIEW_K] grouped = words[GROUP_PREVIEW_K:] else: preview = words grouped = [] for i, w in enumerate(preview): wid = insert_word( conn, word=w, rank=i + 1, batch_id=batch_id, group_id=None, has_group_col=has_group_col, ) entities_added += 1 if insert_relation(conn, wid, node_id, "WORD_OF"): relations_added += 1 for j, w in enumerate(grouped): rank = GROUP_PREVIEW_K + j + 1 wid = insert_word( conn, word=w, rank=rank, batch_id=batch_id, group_id=group_id, has_group_col=has_group_col, ) entities_added += 1 if insert_relation(conn, wid, node_id, "WORD_OF"): relations_added += 1 if grouped and j % 50 == 0: progress(0.55 + 0.40 * (j / max(1, len(grouped))), "writing") conn.commit() finally: conn.close() progress(1.0, "done") 
print(json.dumps({ "words": len(words), "entities_added": entities_added, "relations_added": relations_added, "batch_id": batch_id, "group_id": group_id or "", "grouped": bool(group_id), "deduped": dedupe, }, ensure_ascii=False)) return 0 if __name__ == "__main__": sys.exit(main())