#!/usr/bin/env python3 """Enricher split_sentences — parte texto en frases (regex puro, offline). Wire protocol estandar (issue 0026): - stdin: JSON con node_id, node_name, metadata, ops_db_path, app_dir, cache_dir, registry_root, params. - stderr: lineas `PROGRESS: ` para feedback de UI. - stdout: una linea JSON al final con resumen. - exit code 0 = ok, !=0 = error. Lectura del texto (en orden de prioridad): 1. `entities.notes` (lo que el usuario escribe en el panel Note via doble click — sitio canonico de texto largo) 2. node_name (titulo del nodo, fallback minimo) Si tras esto el texto es < min_length, falla con exit 2 y mensaje claro. Grouping (issue 0035c, mismo patron que web_search): - Si len(sentences) >= GROUP_THRESHOLD y la BD soporta group_id: * Crea Group `type_ref='Group'` colgando del source con SENTENCE_OF. * Primeras GROUP_PREVIEW_K frases sueltas (group_id=NULL). * Resto con group_id apuntando al Group recien creado. - Si None: sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") sys.stderr.flush() def log(msg: str) -> None: sys.stderr.write(f"{msg}\n") sys.stderr.flush() def now_iso() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") def now_ms() -> int: return int(time.time() * 1000) def has_group_id_column(conn: sqlite3.Connection) -> bool: """Detecta si la columna `group_id` existe en `entities`. El schema actual la incluye (issue 0035a) pero las BDs viejas pueden no tenerla. Si no esta, insertamos sin esa columna. """ try: cur = conn.execute("PRAGMA table_info(entities)") for row in cur: if row[1] == "group_id": return True except sqlite3.Error: pass return False def read_text(ops_db_path: str, node_id: str, node_name: str) -> str: """Resuelve el texto a procesar. Prioridad: 1. `entities.notes` del nodo (lo que el usuario escribe en el panel Note via doble click). Es el sitio canonico para texto largo. 2. `node_name` (titulo del nodo) como fallback minimo. """ notes = "" try: c = sqlite3.connect(ops_db_path) try: row = c.execute( "SELECT notes FROM entities WHERE id=?", (node_id,) ).fetchone() if row and isinstance(row[0], str): notes = row[0] finally: c.close() except sqlite3.Error: notes = "" if notes and notes.strip(): return notes.strip() return (node_name or "").strip() def split_into_sentences(text: str, min_length: int) -> list[str]: """Aplica el regex de split y filtra por longitud minima.""" parts = _SENT_SPLIT_RE.split(text) out: list[str] = [] for p in parts: s = p.strip() if len(s) < min_length: continue out.append(s) return out def insert_sentence(conn: sqlite3.Connection, *, sentence: str, rank: int, batch_id: str, group_id: str | None, has_group_col: bool) -> str: """Inserta un nodo Sentence y devuelve su id. No deduplica — cada ejecucion crea entidades nuevas (las frases pueden repetirse entre ejecuciones distintas y el rank/batch las distingue). """ ts = now_iso() new_id = f"Sentence_{now_ms()}_{rank}" name = sentence[:80] + ("..." if len(sentence) > 80 else "") meta = { "text": sentence, "rank": rank, "batch_id": batch_id, } meta_json = json.dumps(meta, ensure_ascii=False) if has_group_col: conn.execute( "INSERT INTO entities (id, name, type_ref, source, metadata, " " group_id, created_at, updated_at) " "VALUES (?, ?, 'Sentence', 'enricher:split_sentences', ?, ?, ?, ?)", (new_id, name, meta_json, group_id, ts, ts), ) else: conn.execute( "INSERT INTO entities (id, name, type_ref, source, metadata, " " created_at, updated_at) " "VALUES (?, ?, 'Sentence', 'enricher:split_sentences', ?, ?, ?)", (new_id, name, meta_json, ts, ts), ) return new_id def insert_group_entity(conn: sqlite3.Connection, *, source_node_id: str, source_node_name: str, count: int, batch_id: str) -> str: ts = now_iso() new_id = f"Group_{now_ms()}_{abs(hash(source_node_id + batch_id)) % 100000}" name = f"split_sentences: {source_node_name} ({count})" meta = { "enricher": "split_sentences", "count": count, "batch_id": batch_id, "source_node_id": source_node_id, } meta_json = json.dumps(meta, ensure_ascii=False) conn.execute( "INSERT INTO entities (id, name, type_ref, source, metadata, " " created_at, updated_at) " "VALUES (?, ?, 'Group', 'enricher:split_sentences', ?, ?, ?)", (new_id, name, meta_json, ts, ts), ) return new_id _REL_COUNTER = 0 def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str, name: str) -> bool: global _REL_COUNTER cur = conn.execute( "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? " "AND name=? LIMIT 1", (from_id, to_id, name), ) if cur.fetchone(): return False ts = now_iso() _REL_COUNTER += 1 rel_id = f"rel_{now_ms()}_{_REL_COUNTER}_{name.lower()}" conn.execute( "INSERT INTO relations (id, name, from_entity, to_entity, " " created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)", (rel_id, name, from_id, to_id, ts, ts), ) return True def main() -> int: raw = sys.stdin.read() try: ctx = json.loads(raw) except Exception as e: log(f"stdin not valid JSON: {e}") return 2 node_id = ctx.get("node_id") or "" node_name = (ctx.get("node_name") or "").strip() metadata = ctx.get("metadata") or {} if isinstance(metadata, str): try: metadata = json.loads(metadata) except Exception: metadata = {} ops_db_path = ctx.get("ops_db_path") or "" params = ctx.get("params") or {} max_sentences = int(params.get("max_sentences", 200)) min_length = int(params.get("min_length", 20)) if not node_id or not ops_db_path: log("missing node_id / ops_db_path") return 2 # Normalizar y resolver path como en web_search. ops_db_path = ops_db_path.replace("\\", "/") app_dir_raw = (ctx.get("app_dir") or "").replace("\\", "/") if not os.path.isabs(ops_db_path): if app_dir_raw and os.path.isdir(app_dir_raw): cand = os.path.normpath(os.path.join(app_dir_raw, ops_db_path)) if os.path.exists(cand): ops_db_path = cand if not os.path.isabs(ops_db_path): ops_db_path = os.path.abspath(ops_db_path) if not os.path.exists(ops_db_path): log(f"ops_db_path no existe: {ops_db_path}") print(json.dumps({"error": "ops_db not found", "ops_db_path": ops_db_path, "entities_added": 0, "relations_added": 0})) return 7 progress(0.10, "reading") text = read_text(ops_db_path, node_id, node_name) if len(text) < min_length: msg = (f"texto demasiado corto ({len(text)} chars < {min_length}). " f"Escribe el texto en el panel Note del nodo (doble click " f"para abrir) o pon un name mas largo") log(msg) print(json.dumps({"error": msg, "entities_added": 0, "relations_added": 0})) return 2 progress(0.30, "splitting") sentences = split_into_sentences(text, min_length) if max_sentences > 0: sentences = sentences[:max_sentences] if not sentences: msg = (f"sin frases tras split (texto de {len(text)} chars, " f"min_length={min_length})") log(msg) print(json.dumps({"error": msg, "entities_added": 0, "relations_added": 0})) return 2 progress(0.55, "writing") conn = sqlite3.connect(ops_db_path) conn.execute("PRAGMA foreign_keys=OFF") entities_added = 0 relations_added = 0 group_id: str | None = None batch_id = uuid.uuid4().hex try: has_group_col = has_group_id_column(conn) n_total = len(sentences) threshold = DEFAULT_GROUP_THRESHOLD if n_total >= threshold and has_group_col: group_id = insert_group_entity( conn, source_node_id=node_id, source_node_name=node_name or "(text)", count=n_total, batch_id=batch_id, ) entities_added += 1 if insert_relation(conn, group_id, node_id, "SENTENCE_OF"): relations_added += 1 preview = sentences[:GROUP_PREVIEW_K] grouped = sentences[GROUP_PREVIEW_K:] else: preview = sentences grouped = [] # Frases sueltas (preview). for i, s in enumerate(preview): sid = insert_sentence( conn, sentence=s, rank=i + 1, batch_id=batch_id, group_id=None, has_group_col=has_group_col, ) entities_added += 1 if insert_relation(conn, sid, node_id, "SENTENCE_OF"): relations_added += 1 # Frases agrupadas — siguen colgando del source con SENTENCE_OF. for j, s in enumerate(grouped): rank = GROUP_PREVIEW_K + j + 1 sid = insert_sentence( conn, sentence=s, rank=rank, batch_id=batch_id, group_id=group_id, has_group_col=has_group_col, ) entities_added += 1 if insert_relation(conn, sid, node_id, "SENTENCE_OF"): relations_added += 1 if grouped and j % 25 == 0: progress(0.55 + 0.40 * (j / max(1, len(grouped))), "writing") conn.commit() finally: conn.close() progress(1.0, "done") print(json.dumps({ "sentences": len(sentences), "entities_added": entities_added, "relations_added": relations_added, "batch_id": batch_id, "group_id": group_id or "", "grouped": bool(group_id), }, ensure_ascii=False)) return 0 if __name__ == "__main__": sys.exit(main())