Files
egutierrez 52495af779 feat(0035e): manifest auto_group_threshold override + propagacion a Python
Manifest YAML puede declarar 'auto_group_threshold: <int>' a nivel
top-level. enrichers.cpp lo parsea y lo guarda en EnricherSpec.
jobs.cpp lo inyecta como campo opcional 'auto_group_threshold' en el
JSON stdin del subprocess. Los enrichers Python que crean Groups
(web_search, split_words, split_sentences, extract_iocs_text) leen el
campo y, si viene > 0, lo usan en lugar de su DEFAULT_GROUP_THRESHOLD.
Helper _coerce_threshold tolera int / str / None / 0 cayendo al default.
2026-05-04 14:20:52 +02:00

339 lines
11 KiB
Python

#!/usr/bin/env python3
"""Enricher split_words — tokeniza texto en palabras (regex puro, offline).
Pensado para probar el grouping de issue 0035 con volumen alto: cualquier
parrafo decente supera el umbral de 50 trivialmente, asi se ve un Group
cuadrado por flujo.
Wire protocol estandar (issue 0026):
- stdin: JSON con node_id, node_name, metadata, ops_db_path, app_dir,
cache_dir, registry_root, params.
- stderr: lineas `PROGRESS:<float> <stage>` para feedback de UI.
- stdout: una linea JSON al final con resumen.
- exit code 0 = ok, !=0 = error.
Lectura del texto (igual que split_sentences):
1. `entities.notes` (panel Note del Inspector).
2. node_name (fallback minimo).
Tokenizacion: split por whitespace + puntuacion, lowercased. Filtra
tokens con `len < min_length` para evitar ruido (a, el, de, y, o, ...).
Por defecto deduplica para devolver vocabulario unico (mas util para
explorar contenido); con `dedupe=false` cada ocurrencia es un nodo
(util para volumen / stress).
Grouping (issue 0035c): mismo patron que split_sentences y web_search.
"""
from __future__ import annotations
import json
import os
import re
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timezone
DEFAULT_GROUP_THRESHOLD = 50
GROUP_PREVIEW_K = 10
def _coerce_threshold(raw, default: int) -> int:
"""Acepta int / str numerico / None, devuelve >0 o el default (issue 0035e)."""
if raw is None or raw == "":
return default
try:
v = int(raw)
except (TypeError, ValueError):
return default
return v if v > 0 else default
# Tokenizer: secuencias de letras (incluye acentos espanyoles + apostrofo
# interno tipo "don't"). Mas robusto que split por espacios para texto
# real con puntuacion adyacente. Numeros se ignoran — pensado para
# contenido natural, no datos.
_TOKEN_RE = re.compile(r"[A-Za-zÁÉÍÓÚÜÑáéíóúüñ][A-Za-zÁÉÍÓÚÜÑáéíóúüñ'-]*")
def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def has_group_id_column(conn: sqlite3.Connection) -> bool:
try:
cur = conn.execute("PRAGMA table_info(entities)")
for row in cur:
if row[1] == "group_id":
return True
except sqlite3.Error:
pass
return False
def read_text(ops_db_path: str, node_id: str, node_name: str) -> str:
notes = ""
try:
c = sqlite3.connect(ops_db_path)
try:
row = c.execute(
"SELECT notes FROM entities WHERE id=?", (node_id,)
).fetchone()
if row and isinstance(row[0], str):
notes = row[0]
finally:
c.close()
except sqlite3.Error:
notes = ""
if notes and notes.strip():
return notes.strip()
return (node_name or "").strip()
def tokenize(text: str, *, min_length: int, dedupe: bool) -> list[str]:
"""Devuelve lista de tokens (lower) filtrados por min_length.
Si `dedupe=True`, conserva solo la primera aparicion (preserva orden).
"""
seen: set[str] = set()
out: list[str] = []
for m in _TOKEN_RE.finditer(text):
tok = m.group(0).lower()
if len(tok) < min_length:
continue
if dedupe:
if tok in seen:
continue
seen.add(tok)
out.append(tok)
return out
def insert_word(conn: sqlite3.Connection, *, word: str, rank: int,
batch_id: str, group_id: str | None,
has_group_col: bool) -> str:
ts = now_iso()
new_id = f"Word_{now_ms()}_{rank}"
meta = {
"word": word,
"rank": rank,
"batch_id": batch_id,
}
meta_json = json.dumps(meta, ensure_ascii=False)
if has_group_col:
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" group_id, created_at, updated_at) "
"VALUES (?, ?, 'Word', 'enricher:split_words', ?, ?, ?, ?)",
(new_id, word, meta_json, group_id, ts, ts),
)
else:
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" created_at, updated_at) "
"VALUES (?, ?, 'Word', 'enricher:split_words', ?, ?, ?)",
(new_id, word, meta_json, ts, ts),
)
return new_id
def insert_group_entity(conn: sqlite3.Connection, *, source_node_id: str,
source_node_name: str, count: int,
batch_id: str) -> str:
ts = now_iso()
new_id = f"Group_{now_ms()}_{abs(hash(source_node_id + batch_id)) % 100000}"
name = f"split_words: {source_node_name} ({count})"
meta = {
"enricher": "split_words",
"count": count,
"batch_id": batch_id,
"source_node_id": source_node_id,
}
meta_json = json.dumps(meta, ensure_ascii=False)
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" created_at, updated_at) "
"VALUES (?, ?, 'Group', 'enricher:split_words', ?, ?, ?)",
(new_id, name, meta_json, ts, ts),
)
return new_id
_REL_COUNTER = 0
def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str,
name: str) -> bool:
global _REL_COUNTER
cur = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
"AND name=? LIMIT 1",
(from_id, to_id, name),
)
if cur.fetchone():
return False
ts = now_iso()
_REL_COUNTER += 1
rel_id = f"rel_{now_ms()}_{_REL_COUNTER}_{name.lower()}"
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, "
" created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
(rel_id, name, from_id, to_id, ts, ts),
)
return True
def main() -> int:
raw = sys.stdin.read()
try:
ctx = json.loads(raw)
except Exception as e:
log(f"stdin not valid JSON: {e}")
return 2
node_id = ctx.get("node_id") or ""
node_name = (ctx.get("node_name") or "").strip()
ops_db_path = ctx.get("ops_db_path") or ""
params = ctx.get("params") or {}
max_words = int(params.get("max_words", 500))
min_length = int(params.get("min_length", 3))
dedupe_raw = params.get("dedupe", True)
if isinstance(dedupe_raw, str):
dedupe = dedupe_raw.lower() not in ("false", "0", "no", "")
else:
dedupe = bool(dedupe_raw)
if not node_id or not ops_db_path:
log("missing node_id / ops_db_path")
return 2
ops_db_path = ops_db_path.replace("\\", "/")
app_dir_raw = (ctx.get("app_dir") or "").replace("\\", "/")
if not os.path.isabs(ops_db_path):
if app_dir_raw and os.path.isdir(app_dir_raw):
cand = os.path.normpath(os.path.join(app_dir_raw, ops_db_path))
if os.path.exists(cand):
ops_db_path = cand
if not os.path.isabs(ops_db_path):
ops_db_path = os.path.abspath(ops_db_path)
if not os.path.exists(ops_db_path):
log(f"ops_db_path no existe: {ops_db_path}")
print(json.dumps({"error": "ops_db not found",
"ops_db_path": ops_db_path,
"entities_added": 0, "relations_added": 0}))
return 7
progress(0.10, "reading")
text = read_text(ops_db_path, node_id, node_name)
# min_length aplica a tokens, no al texto entrante. Para texto entrante
# exigimos algo razonable: al menos un token posible.
if len(text.strip()) < min_length:
msg = (f"texto demasiado corto ({len(text)} chars). Escribe el "
f"contenido en el panel Note del nodo (doble click) o "
f"pon un name mas largo.")
log(msg)
print(json.dumps({"error": msg, "entities_added": 0,
"relations_added": 0}))
return 2
progress(0.30, "tokenizing")
words = tokenize(text, min_length=min_length, dedupe=dedupe)
if max_words > 0:
words = words[:max_words]
if not words:
msg = (f"sin tokens tras filtrar (texto de {len(text)} chars, "
f"min_length={min_length}, dedupe={dedupe})")
log(msg)
print(json.dumps({"error": msg, "entities_added": 0,
"relations_added": 0}))
return 2
progress(0.55, "writing")
conn = sqlite3.connect(ops_db_path)
conn.execute("PRAGMA foreign_keys=OFF")
entities_added = 0
relations_added = 0
group_id: str | None = None
batch_id = uuid.uuid4().hex
try:
has_group_col = has_group_id_column(conn)
n_total = len(words)
# Issue 0035e: respeta override del manifest si viene en ctx.
threshold = _coerce_threshold(ctx.get("auto_group_threshold"),
DEFAULT_GROUP_THRESHOLD)
if n_total >= threshold and has_group_col:
group_id = insert_group_entity(
conn,
source_node_id=node_id,
source_node_name=node_name or "(text)",
count=n_total,
batch_id=batch_id,
)
entities_added += 1
if insert_relation(conn, group_id, node_id, "WORD_OF"):
relations_added += 1
preview = words[:GROUP_PREVIEW_K]
grouped = words[GROUP_PREVIEW_K:]
else:
preview = words
grouped = []
for i, w in enumerate(preview):
wid = insert_word(
conn, word=w, rank=i + 1, batch_id=batch_id,
group_id=None, has_group_col=has_group_col,
)
entities_added += 1
if insert_relation(conn, wid, node_id, "WORD_OF"):
relations_added += 1
for j, w in enumerate(grouped):
rank = GROUP_PREVIEW_K + j + 1
wid = insert_word(
conn, word=w, rank=rank, batch_id=batch_id,
group_id=group_id, has_group_col=has_group_col,
)
entities_added += 1
if insert_relation(conn, wid, node_id, "WORD_OF"):
relations_added += 1
if grouped and j % 50 == 0:
progress(0.55 + 0.40 * (j / max(1, len(grouped))), "writing")
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"words": len(words),
"entities_added": entities_added,
"relations_added": relations_added,
"batch_id": batch_id,
"group_id": group_id or "",
"grouped": bool(group_id),
"deduped": dedupe,
}, ensure_ascii=False))
return 0
if __name__ == "__main__":
sys.exit(main())