diff --git a/enrichers/extract_iocs_text/manifest.yaml b/enrichers/extract_iocs_text/manifest.yaml new file mode 100644 index 0000000..3c0599f --- /dev/null +++ b/enrichers/extract_iocs_text/manifest.yaml @@ -0,0 +1,11 @@ +id: extract_iocs_text +name: "Extract IoCs from text" +description: "Extrae IoCs (IPs, emails, dominios, hashes, crypto wallets, CVEs, MAC, telefonos) directamente del texto del nodo. No requiere fetch previo. Sin red." +applies_to: [text, Text] +emits: [Email, IPAddress, Domain, FileHash, CryptoWallet, CVE, MACAddress, Phone] +relations: [EXTRACTED_FROM] +uses_functions: + - extract_iocs_py_cybersecurity +params: + - { name: types, type: string, default: "", description: "CSV de tipos a extraer; vacio = todos" } + - { name: max_entities, type: int, default: 500 } diff --git a/enrichers/extract_iocs_text/run.py b/enrichers/extract_iocs_text/run.py new file mode 100644 index 0000000..b5204a1 --- /dev/null +++ b/enrichers/extract_iocs_text/run.py @@ -0,0 +1,320 @@ +#!/usr/bin/env python3 +"""Enricher extract_iocs_text — variante offline de extract_text_entities. + +A diferencia de extract_text_entities, este enricher NO depende de un +markdown cacheado (fetch_webpage previo). Lee el texto directamente del +nodo (`metadata.text` > `metadata.description` > `metadata.query` > +`node_name`) y aplica el pipeline `extract_iocs` del registry sobre el. + +Sin red, sin dependencias externas — pensado para probar la app +cuando DDG bloquea con captcha o cuando se trabaja en un entorno +offline. + +Wire protocol estandar (issue 0026). + +Grouping (provisional, fase 1 del issue 0035): + Si len(unique_iocs) >= GROUP_THRESHOLD, se crea UN solo Group + heterogeneo con todos los IoCs dentro (independientemente de su tipo). + La decision 6 del issue 0035 ("multi-tipo → un Group por tipo") es + fase 2 — aqui simplificamos para validar el flow end-to-end. 
# Maps the ``type`` reported by ``extract_iocs`` to a pair
# (entity ``type_ref``, metadata field that stores the raw value).
_TYPE_MAP = {
    "email": ("Email", "address"),
    "ip_address": ("IPAddress", "address"),
    "domain": ("Domain", "name"),
    "file_hash": ("FileHash", "value"),
    "crypto_wallet": ("CryptoWallet", "address"),
    "cve_id": ("CVE", "id"),
    "mac_address": ("MACAddress", "address"),
    "phone_number": ("Phone", "number"),
}


# Minimum number of unique IoCs that triggers the creation of a Group node.
DEFAULT_GROUP_THRESHOLD = 50
# Number of leading IoCs left outside the Group when grouping happens.
GROUP_PREVIEW_K = 10
# Fallback for the ``max_entities`` param when it is missing or unparsable.
_DEFAULT_MAX_ENTITIES = 500


def progress(p: float, stage: str = "") -> None:
    """Write a ``PROGRESS:<fraction> <stage>`` line to stderr and flush it."""
    sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
    sys.stderr.flush()


def log(msg: str) -> None:
    """Write a free-form diagnostic line to stderr and flush it."""
    sys.stderr.write(f"{msg}\n")
    sys.stderr.flush()


def now_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def now_ms() -> int:
    """Return the current Unix time in whole milliseconds."""
    return int(time.time() * 1000)


def has_group_id_column(conn: sqlite3.Connection) -> bool:
    """Return True when the ``entities`` table has a ``group_id`` column.

    Any SQLite error while inspecting the table is treated as "column not
    present" on purpose: the caller then falls back to inserts that do not
    mention ``group_id``.
    """
    try:
        columns = conn.execute("PRAGMA table_info(entities)")
        # Index 1 of each table_info row is the column name.
        return any(row[1] == "group_id" for row in columns)
    except sqlite3.Error:
        return False


def read_text(metadata: dict, node_name: str) -> str:
    """Pick the text to analyse for a node.

    The first non-blank string among ``metadata["text"]``,
    ``metadata["description"]`` and ``metadata["query"]`` wins; otherwise the
    node name is used. The result is stripped.

    A ``metadata`` value that is not a dict (for example a JSON-encoded
    string that decodes to a list) is ignored instead of raising.
    """
    if isinstance(metadata, dict):
        for key in ("text", "description", "query"):
            candidate = metadata.get(key)
            if isinstance(candidate, str) and candidate.strip():
                return candidate.strip()
    return (node_name or "").strip()


def _int_param(params: dict, name: str, default: int) -> int:
    """Read an integer param, falling back to ``default`` when unparsable."""
    raw = params.get(name, default)
    try:
        return int(raw)
    except (TypeError, ValueError):
        log(f"param {name}={raw!r} is not an integer; using {default}")
        return default


def _parse_types(raw: object) -> "list[str] | None":
    """Turn the ``types`` param (CSV string or list) into a list, or None for all."""
    if isinstance(raw, (list, tuple)):
        raw = ",".join(str(item) for item in raw)
    if not isinstance(raw, str) or not raw.strip():
        return None
    return [piece.strip() for piece in raw.split(",") if piece.strip()]


def _resolve_ops_db_path(ops_db_path: str, app_dir_raw: str) -> str:
    """Normalise separators and make the ops DB path absolute.

    A relative path is first tried against ``app_dir_raw``; if that candidate
    does not exist the path is resolved against the current directory.
    """
    path = ops_db_path.replace("\\", "/")
    if not os.path.isabs(path) and app_dir_raw and os.path.isdir(app_dir_raw):
        candidate = os.path.normpath(os.path.join(app_dir_raw, path))
        if os.path.exists(candidate):
            path = candidate
    if not os.path.isabs(path):
        path = os.path.abspath(path)
    return path


def _ioc_value(it: dict) -> object:
    """Return the raw value of an IoC dict (``value``/``address``/``name``)."""
    return it.get("value") or it.get("address") or it.get("name") or ""


def _dedupe_iocs(iocs: object, max_entities: int) -> "list[dict]":
    """Drop malformed and repeated IoCs, keeping first-seen order.

    Uniqueness is by ``(type, value)``. ``max_entities <= 0`` means "no
    limit", matching how the sibling split_sentences enricher treats
    ``max_sentences``.
    """
    seen = set()
    unique = []
    for it in iocs or ():
        if not isinstance(it, dict):
            continue
        key = (it.get("type"), _ioc_value(it))
        if not key[0] or not key[1] or key in seen:
            continue
        seen.add(key)
        unique.append(it)
        if 0 < max_entities <= len(unique):
            break
    return unique


def _insert_extracted_from(conn: sqlite3.Connection, rel_id: str,
                           from_id: str, to_id: str, ts: str) -> None:
    """Insert one EXTRACTED_FROM relation row."""
    conn.execute(
        "INSERT INTO relations (id, name, from_entity, to_entity, "
        " created_at, updated_at) "
        "VALUES (?, 'EXTRACTED_FROM', ?, ?, ?, ?)",
        (rel_id, from_id, to_id, ts, ts),
    )


def _create_group(conn: sqlite3.Connection, *, node_id: str, node_name: str,
                  batch_id: str, count: int) -> str:
    """Insert the Group entity plus its EXTRACTED_FROM relation; return its id."""
    ts = now_iso()
    group_id = (f"Group_{now_ms()}_"
                f"{abs(hash(node_id + batch_id)) % 100000}")
    group_name = f"iocs: {node_name or '(text)'} ({count})"
    group_meta = {
        "enricher": "extract_iocs_text",
        "count": count,
        "batch_id": batch_id,
        "source_node_id": node_id,
    }
    conn.execute(
        "INSERT INTO entities (id, name, type_ref, source, metadata, "
        " created_at, updated_at) "
        "VALUES (?, ?, 'Group', 'enricher:extract_iocs_text', ?, ?, ?)",
        (group_id, group_name, json.dumps(group_meta, ensure_ascii=False),
         ts, ts),
    )
    _insert_extracted_from(conn, f"rel_{now_ms()}_g_extracted",
                           group_id, node_id, ts)
    return group_id


def _store_ioc(conn: sqlite3.Connection, it: dict, idx: int, *, node_id: str,
               batch_id: str, group_id: "str | None",
               has_group_col: bool) -> tuple:
    """Persist one IoC and its relation to the source node.

    Returns ``(created_type_ref, relation_added)`` where ``created_type_ref``
    is None when the entity already existed (or the IoC had no value).
    """
    value = _ioc_value(it)
    if not value:
        return None, False
    ioc_type = it.get("type")
    type_ref, value_field = _TYPE_MAP.get(
        ioc_type, (ioc_type or "Text", "value"),
    )
    existing = conn.execute(
        "SELECT id FROM entities WHERE type_ref=? AND name=? LIMIT 1",
        (type_ref, value),
    ).fetchone()
    ts = now_iso()
    created = None
    if existing:
        # An IoC seen in an earlier run keeps whatever group it already had.
        target_id = existing[0]
    else:
        target_id = f"{type_ref}_{now_ms()}_{idx}"
        meta = {value_field: value, "batch_id": batch_id}
        if "start" in it:
            meta["text_offset"] = it["start"]
        meta_json = json.dumps(meta, ensure_ascii=False)
        if has_group_col:
            conn.execute(
                "INSERT INTO entities (id, name, type_ref, source, "
                " metadata, group_id, created_at, updated_at) "
                "VALUES (?, ?, ?, 'enricher:extract_iocs_text', ?, "
                " ?, ?, ?)",
                (target_id, value, type_ref, meta_json, group_id, ts, ts),
            )
        else:
            conn.execute(
                "INSERT INTO entities (id, name, type_ref, source, "
                " metadata, created_at, updated_at) "
                "VALUES (?, ?, ?, 'enricher:extract_iocs_text', ?, "
                " ?, ?)",
                (target_id, value, type_ref, meta_json, ts, ts),
            )
        created = type_ref

    # Every IoC points at the original source node, never at the Group:
    # the Group is only a visual container.
    already_linked = conn.execute(
        "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
        "AND name='EXTRACTED_FROM' LIMIT 1",
        (target_id, node_id),
    ).fetchone()
    if already_linked:
        return created, False
    _insert_extracted_from(conn, f"rel_{now_ms()}_{idx}_extracted",
                           target_id, node_id, ts)
    return created, True


def _write_iocs(conn: sqlite3.Connection, unique: "list[dict]", *,
                node_id: str, node_name: str, batch_id: str) -> dict:
    """Write all IoCs (and the optional Group) and return the counters.

    Grouping is provisional: when the number of IoCs reaches
    ``DEFAULT_GROUP_THRESHOLD`` and the schema has ``group_id``, a single
    heterogeneous Group is created. The first ``GROUP_PREVIEW_K`` IoCs stay
    outside it and the rest carry its id. The module notes say a later phase
    is expected to create one Group per type instead.
    """
    has_group_col = has_group_id_column(conn)
    total = len(unique)
    entities_added = 0
    relations_added = 0
    new_by_type = {}
    group_id = None

    if total >= DEFAULT_GROUP_THRESHOLD and has_group_col:
        group_id = _create_group(conn, node_id=node_id, node_name=node_name,
                                 batch_id=batch_id, count=total)
        entities_added += 1
        relations_added += 1

    n_preview = min(GROUP_PREVIEW_K, total) if group_id else total
    n_grouped = total - n_preview
    for idx, it in enumerate(unique):
        in_group = group_id is not None and idx >= n_preview
        created, linked = _store_ioc(
            conn, it, idx, node_id=node_id, batch_id=batch_id,
            group_id=group_id if in_group else None,
            has_group_col=has_group_col,
        )
        if created is not None:
            entities_added += 1
            new_by_type[created] = new_by_type.get(created, 0) + 1
        if linked:
            relations_added += 1
        if in_group:
            j = idx - n_preview
            if j % 20 == 0:
                progress(0.55 + 0.40 * (j / max(1, n_grouped)), "writing")

    return {
        "by_type": new_by_type,
        "entities_added": entities_added,
        "relations_added": relations_added,
        "group_id": group_id,
    }


def main() -> int:
    """Run the enricher: read the JSON context from stdin, write IoCs to the DB.

    Exit codes: 0 ok; 2 invalid or insufficient input; 5 ``extract_iocs``
    could not be imported; 7 ops DB file not found; 1 SQLite failure while
    writing (reported as a JSON error on stdout instead of a traceback).
    """
    raw = sys.stdin.read()
    try:
        ctx = json.loads(raw)
    except ValueError as e:
        log(f"stdin not valid JSON: {e}")
        return 2
    if not isinstance(ctx, dict):
        log("stdin not valid JSON: expected an object")
        return 2

    node_id = ctx.get("node_id") or ""
    node_name = (ctx.get("node_name") or "").strip()
    metadata = ctx.get("metadata") or {}
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except ValueError:
            metadata = {}
    if not isinstance(metadata, dict):
        metadata = {}
    ops_db_path = ctx.get("ops_db_path") or ""
    app_dir_raw = (ctx.get("app_dir") or "").replace("\\", "/")
    registry_root = ctx.get("registry_root") or ""
    params = ctx.get("params") or {}
    if not isinstance(params, dict):
        params = {}

    types_list = _parse_types(params.get("types"))
    max_entities = _int_param(params, "max_entities", _DEFAULT_MAX_ENTITIES)

    if not node_id or not ops_db_path:
        log("missing node_id / ops_db_path")
        return 2

    ops_db_path = _resolve_ops_db_path(ops_db_path, app_dir_raw)
    if not os.path.exists(ops_db_path):
        log(f"ops_db_path no existe: {ops_db_path}")
        print(json.dumps({"error": "ops_db not found",
                          "ops_db_path": ops_db_path,
                          "entities_added": 0, "relations_added": 0}))
        return 7

    progress(0.10, "reading")
    text = read_text(metadata, node_name)
    if not text:
        msg = ("nodo sin texto. Esperaba metadata.text / description / "
               "query, o un name con contenido")
        log(msg)
        print(json.dumps({"error": msg, "entities_added": 0,
                          "relations_added": 0}))
        return 2

    progress(0.30, "extracting iocs")
    # Vendoring: prefer _vendored/ next to this file when it exists
    # (distributed build); otherwise fall back to registry_root (local dev).
    vendored = os.path.join(os.path.dirname(__file__), "_vendored")
    if os.path.isdir(vendored):
        if vendored not in sys.path:
            sys.path.insert(0, vendored)
    elif registry_root:
        py_funcs = os.path.join(registry_root, "python", "functions")
        if py_funcs not in sys.path:
            sys.path.insert(0, py_funcs)
    try:
        from cybersecurity.extract_iocs import extract_iocs  # type: ignore
    except Exception as e:  # import boundary: any failure means "unavailable"
        log(f"no se pudo importar extract_iocs: {e}")
        print(json.dumps({"error": f"extract_iocs import failed: {e}",
                          "entities_added": 0, "relations_added": 0}))
        return 5

    unique = _dedupe_iocs(extract_iocs(text, types_list), max_entities)

    progress(0.55, "writing")
    batch_id = uuid.uuid4().hex
    try:
        conn = sqlite3.connect(ops_db_path)
        try:
            conn.execute("PRAGMA foreign_keys=OFF")
            written = _write_iocs(conn, unique, node_id=node_id,
                                  node_name=node_name, batch_id=batch_id)
            conn.commit()
        finally:
            # Closing without a commit discards the pending transaction.
            conn.close()
    except sqlite3.Error as e:
        log(f"sqlite error: {e}")
        print(json.dumps({"error": f"sqlite error: {e}",
                          "entities_added": 0, "relations_added": 0}))
        return 1

    progress(1.0, "done")
    group_id = written["group_id"]
    print(json.dumps({
        "iocs_found": len(unique),
        "by_type": written["by_type"],
        "entities_added": written["entities_added"],
        "relations_added": written["relations_added"],
        "batch_id": batch_id,
        "group_id": group_id or "",
        "grouped": bool(group_id),
    }, ensure_ascii=False))
    return 0


if __name__ == "__main__":
    sys.exit(main())
# NOTE(review): `_SENT_SPLIT_RE`, `DEFAULT_GROUP_THRESHOLD` and
# `GROUP_PREVIEW_K` are referenced below but their definitions sit in a part
# of the module that is not visible here; they are deliberately not redefined.


# NOTE(review): only the body of this function is visible in the reviewed
# text; the signature is reconstructed from the names the body uses and from
# the two-argument calls in main() — confirm against the real file.
def progress(p: float, stage: str = "") -> None:
    """Write a ``PROGRESS:<fraction> <stage>`` line to stderr and flush it."""
    sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
    sys.stderr.flush()


def log(msg: str) -> None:
    """Write a free-form diagnostic line to stderr and flush it."""
    sys.stderr.write(f"{msg}\n")
    sys.stderr.flush()


def now_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def now_ms() -> int:
    """Return the current Unix time in whole milliseconds."""
    return int(time.time() * 1000)


def has_group_id_column(conn: sqlite3.Connection) -> bool:
    """Return True when the ``entities`` table has a ``group_id`` column.

    Older databases may lack the column; in that case (or on any SQLite
    error while inspecting the table) callers insert without ``group_id``.
    """
    try:
        columns = conn.execute("PRAGMA table_info(entities)")
        # Index 1 of each table_info row is the column name.
        return any(row[1] == "group_id" for row in columns)
    except sqlite3.Error:
        return False


def read_text(metadata: dict, node_name: str) -> str:
    """Pick the text to split for a node.

    The first non-blank string among ``metadata["text"]``,
    ``metadata["description"]`` and ``metadata["query"]`` wins; otherwise the
    node name is used. The result is stripped. A ``metadata`` value that is
    not a dict is ignored instead of raising.
    """
    if isinstance(metadata, dict):
        for key in ("text", "description", "query"):
            candidate = metadata.get(key)
            if isinstance(candidate, str) and candidate.strip():
                return candidate.strip()
    return (node_name or "").strip()


def split_into_sentences(text: str, min_length: int) -> list:
    """Split ``text`` with ``_SENT_SPLIT_RE`` and keep the long-enough pieces.

    Each fragment is stripped; fragments shorter than ``min_length`` are
    dropped. Empty fragments (and ``None`` entries, which ``re.split`` yields
    for unmatched capture groups) are always dropped, even when
    ``min_length <= 0``, so no blank Sentence node is ever produced.
    """
    sentences = []
    for fragment in _SENT_SPLIT_RE.split(text):
        candidate = (fragment or "").strip()
        if not candidate or len(candidate) < min_length:
            continue
        sentences.append(candidate)
    return sentences


def insert_sentence(conn: sqlite3.Connection, *, sentence: str, rank: int,
                    batch_id: str, group_id: "str | None",
                    has_group_col: bool) -> str:
    """Insert one Sentence entity and return its id.

    No deduplication is done: every run creates new entities, told apart by
    ``rank`` and ``batch_id`` in the metadata. The entity name is the first
    80 characters of the sentence, with ``...`` appended when truncated.
    ``group_id`` is only written when ``has_group_col`` is true.
    """
    ts = now_iso()
    new_id = f"Sentence_{now_ms()}_{rank}"
    name = sentence[:80] + ("..." if len(sentence) > 80 else "")
    meta_json = json.dumps(
        {"text": sentence, "rank": rank, "batch_id": batch_id},
        ensure_ascii=False,
    )
    if has_group_col:
        conn.execute(
            "INSERT INTO entities (id, name, type_ref, source, metadata, "
            " group_id, created_at, updated_at) "
            "VALUES (?, ?, 'Sentence', 'enricher:split_sentences', ?, ?, ?, ?)",
            (new_id, name, meta_json, group_id, ts, ts),
        )
    else:
        conn.execute(
            "INSERT INTO entities (id, name, type_ref, source, metadata, "
            " created_at, updated_at) "
            "VALUES (?, ?, 'Sentence', 'enricher:split_sentences', ?, ?, ?)",
            (new_id, name, meta_json, ts, ts),
        )
    return new_id


def insert_group_entity(conn: sqlite3.Connection, *, source_node_id: str,
                        source_node_name: str, count: int,
                        batch_id: str) -> str:
    """Insert the Group entity for one run and return its id.

    The metadata records the enricher name, the sentence count, the batch id
    and the id of the source node.
    """
    ts = now_iso()
    new_id = f"Group_{now_ms()}_{abs(hash(source_node_id + batch_id)) % 100000}"
    name = f"split_sentences: {source_node_name} ({count})"
    meta_json = json.dumps(
        {
            "enricher": "split_sentences",
            "count": count,
            "batch_id": batch_id,
            "source_node_id": source_node_id,
        },
        ensure_ascii=False,
    )
    conn.execute(
        "INSERT INTO entities (id, name, type_ref, source, metadata, "
        " created_at, updated_at) "
        "VALUES (?, ?, 'Group', 'enricher:split_sentences', ?, ?, ?)",
        (new_id, name, meta_json, ts, ts),
    )
    return new_id


# Per-process counter that keeps relation ids unique within one millisecond.
_REL_COUNTER = 0


def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str,
                    name: str) -> bool:
    """Insert a relation unless an identical one exists.

    Returns True when a row was inserted, False when a relation with the same
    ``from_entity``, ``to_entity`` and ``name`` was already present.
    """
    global _REL_COUNTER
    duplicate = conn.execute(
        "SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
        "AND name=? LIMIT 1",
        (from_id, to_id, name),
    ).fetchone()
    if duplicate:
        return False
    ts = now_iso()
    _REL_COUNTER += 1
    rel_id = f"rel_{now_ms()}_{_REL_COUNTER}_{name.lower()}"
    conn.execute(
        "INSERT INTO relations (id, name, from_entity, to_entity, "
        " created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
        (rel_id, name, from_id, to_id, ts, ts),
    )
    return True


def _int_param(params: dict, name: str, default: int) -> int:
    """Read an integer param, falling back to ``default`` when unparsable."""
    raw = params.get(name, default)
    try:
        return int(raw)
    except (TypeError, ValueError):
        log(f"param {name}={raw!r} is not an integer; using {default}")
        return default


def _resolve_ops_db_path(ops_db_path: str, app_dir_raw: str) -> str:
    """Normalise separators and make the ops DB path absolute.

    A relative path is first tried against ``app_dir_raw``; if that candidate
    does not exist the path is resolved against the current directory.
    """
    path = ops_db_path.replace("\\", "/")
    if not os.path.isabs(path) and app_dir_raw and os.path.isdir(app_dir_raw):
        candidate = os.path.normpath(os.path.join(app_dir_raw, path))
        if os.path.exists(candidate):
            path = candidate
    if not os.path.isabs(path):
        path = os.path.abspath(path)
    return path


def _write_sentences(conn: sqlite3.Connection, sentences: list, *,
                     node_id: str, node_name: str, batch_id: str) -> tuple:
    """Write all sentences (and the optional Group); return the counters.

    When the sentence count reaches ``DEFAULT_GROUP_THRESHOLD`` and the
    schema has ``group_id``, one Group is created: the first
    ``GROUP_PREVIEW_K`` sentences stay ungrouped and the rest carry the
    Group id. Every sentence keeps its own SENTENCE_OF relation to the
    source node.

    Returns ``(entities_added, relations_added, group_id)``.
    """
    has_group_col = has_group_id_column(conn)
    total = len(sentences)
    entities_added = 0
    relations_added = 0
    group_id = None

    if total >= DEFAULT_GROUP_THRESHOLD and has_group_col:
        group_id = insert_group_entity(
            conn,
            source_node_id=node_id,
            source_node_name=node_name or "(text)",
            count=total,
            batch_id=batch_id,
        )
        entities_added += 1
        if insert_relation(conn, group_id, node_id, "SENTENCE_OF"):
            relations_added += 1

    n_grouped = max(0, total - GROUP_PREVIEW_K) if group_id else 0
    for idx, sentence in enumerate(sentences):
        in_group = group_id is not None and idx >= GROUP_PREVIEW_K
        sid = insert_sentence(
            conn, sentence=sentence, rank=idx + 1, batch_id=batch_id,
            group_id=group_id if in_group else None,
            has_group_col=has_group_col,
        )
        entities_added += 1
        if insert_relation(conn, sid, node_id, "SENTENCE_OF"):
            relations_added += 1
        if in_group:
            j = idx - GROUP_PREVIEW_K
            if j % 25 == 0:
                progress(0.55 + 0.40 * (j / max(1, n_grouped)), "writing")

    return entities_added, relations_added, group_id


def main() -> int:
    """Run the enricher: read the JSON context from stdin, write Sentence nodes.

    Exit codes: 0 ok; 2 invalid input, text shorter than ``min_length`` or no
    sentence left after splitting; 7 ops DB file not found; 1 SQLite failure
    while writing (reported as a JSON error on stdout instead of a
    traceback).
    """
    raw = sys.stdin.read()
    try:
        ctx = json.loads(raw)
    except ValueError as e:
        log(f"stdin not valid JSON: {e}")
        return 2
    if not isinstance(ctx, dict):
        log("stdin not valid JSON: expected an object")
        return 2

    node_id = ctx.get("node_id") or ""
    node_name = (ctx.get("node_name") or "").strip()
    metadata = ctx.get("metadata") or {}
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except ValueError:
            metadata = {}
    if not isinstance(metadata, dict):
        metadata = {}
    ops_db_path = ctx.get("ops_db_path") or ""
    params = ctx.get("params") or {}
    if not isinstance(params, dict):
        params = {}
    max_sentences = _int_param(params, "max_sentences", 200)
    min_length = _int_param(params, "min_length", 20)

    if not node_id or not ops_db_path:
        log("missing node_id / ops_db_path")
        return 2

    app_dir_raw = (ctx.get("app_dir") or "").replace("\\", "/")
    ops_db_path = _resolve_ops_db_path(ops_db_path, app_dir_raw)
    if not os.path.exists(ops_db_path):
        log(f"ops_db_path no existe: {ops_db_path}")
        print(json.dumps({"error": "ops_db not found",
                          "ops_db_path": ops_db_path,
                          "entities_added": 0, "relations_added": 0}))
        return 7

    progress(0.10, "reading")
    text = read_text(metadata, node_name)
    if len(text) < min_length:
        msg = (f"texto demasiado corto ({len(text)} chars < {min_length}). "
               f"Esperaba metadata.text / description / query, o un name "
               f"con mas contenido")
        log(msg)
        print(json.dumps({"error": msg, "entities_added": 0,
                          "relations_added": 0}))
        return 2

    progress(0.30, "splitting")
    sentences = split_into_sentences(text, min_length)
    # A non-positive max_sentences means "no limit".
    if max_sentences > 0:
        sentences = sentences[:max_sentences]

    if not sentences:
        msg = (f"sin frases tras split (texto de {len(text)} chars, "
               f"min_length={min_length})")
        log(msg)
        print(json.dumps({"error": msg, "entities_added": 0,
                          "relations_added": 0}))
        return 2

    progress(0.55, "writing")
    batch_id = uuid.uuid4().hex
    try:
        conn = sqlite3.connect(ops_db_path)
        try:
            conn.execute("PRAGMA foreign_keys=OFF")
            entities_added, relations_added, group_id = _write_sentences(
                conn, sentences, node_id=node_id, node_name=node_name,
                batch_id=batch_id,
            )
            conn.commit()
        finally:
            # Closing without a commit discards the pending transaction.
            conn.close()
    except sqlite3.Error as e:
        log(f"sqlite error: {e}")
        print(json.dumps({"error": f"sqlite error: {e}",
                          "entities_added": 0, "relations_added": 0}))
        return 1

    progress(1.0, "done")
    print(json.dumps({
        "sentences": len(sentences),
        "entities_added": entities_added,
        "relations_added": relations_added,
        "batch_id": batch_id,
        "group_id": group_id or "",
        "grouped": bool(group_id),
    }, ensure_ascii=False))
    return 0


if __name__ == "__main__":
    sys.exit(main())
# Short incident report with two emails, two IPs, one CVE and one MD5-length
# hash; used by the small-input tests below.
SAMPLE_TEXT = (
    "Reporte de incidente. Contactar a bad@evil.example o a otra@victim.example. "
    "IPs vistas: 192.0.2.55 y 10.0.0.12. CVE referenciado: CVE-2024-12345. "
    "Hash: 44d88612fea8a8f36de82e1278abb02f."
)


def _ioc_paragraph(n: int) -> str:
    """Build a comma-separated text with roughly ``n`` IoCs.

    Emits ``n // 3 + 1`` items of each kind: emails, 10.x.x.5 addresses and
    CVE ids.
    """
    per_kind = n // 3 + 1
    emails = [f"contact{i:03d}@example{i % 7}.org" for i in range(per_kind)]
    ips = [f"10.{(i // 256) % 256}.{i % 256}.5" for i in range(per_kind)]
    cves = [f"CVE-2024-{10000 + i}" for i in range(per_kind)]
    return ", ".join(emails + ips + cves) + "."


def test_extract_iocs_text_finds_email_and_ip(ops_db, app_dir, registry_root):
    """Text with emails, IPs, a CVE and a hash yields correctly typed entities."""
    make_node(ops_db, node_id="t1", name="incident",
              type_ref="text", metadata={"text": SAMPLE_TEXT})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="incident", node_type="text",
                   metadata={"text": SAMPLE_TEXT})

    rc, out, err = run_enricher("extract_iocs_text", ctx)
    assert rc == 0, err
    assert out is not None
    assert out["entities_added"] >= 3, out

    found_types = {
        entity["type_ref"]
        for entity in list_entities(ops_db)
        if entity["type_ref"] not in ("text", "Group")
    }
    assert "Email" in found_types, found_types
    # Only Email and CVE are required here; whether IPs, hashes or domains
    # show up is left to extract_iocs.
    assert "CVE" in found_types, found_types

    rels = list_relations(ops_db, name="EXTRACTED_FROM")
    assert len(rels) >= 3
    assert all(rel["to_entity"] == "t1" for rel in rels)


def test_extract_iocs_text_uses_metadata_text(ops_db, app_dir, registry_root):
    """metadata.text takes priority over node_name."""
    make_node(ops_db, node_id="t1", name="placeholder",
              type_ref="text", metadata={"text": SAMPLE_TEXT})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="placeholder", node_type="text",
                   metadata={"text": SAMPLE_TEXT})
    rc, out, err = run_enricher("extract_iocs_text", ctx)
    assert rc == 0, err
    # The name "placeholder" holds no IoCs, so any entity proves that
    # metadata.text was the text actually read.
    assert out["entities_added"] >= 2, out


def test_extract_iocs_text_no_text_fails(ops_db, app_dir, registry_root):
    """A node with no text at all exits with code 2 and a clear error."""
    make_node(ops_db, node_id="t1", name="", type_ref="text", metadata={})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="", node_type="text")
    rc, out, err = run_enricher("extract_iocs_text", ctx)
    assert rc == 2
    assert out is not None
    assert "sin texto" in (out.get("error") or "")


def test_extract_iocs_text_above_threshold_creates_group(ops_db, app_dir,
                                                         registry_root):
    """At least 50 IoCs produce one heterogeneous Group (phase 1 behaviour)."""
    text = _ioc_paragraph(180)  # ~60 emails + ~60 IPs + ~60 CVEs
    make_node(ops_db, node_id="t1", name="dump",
              type_ref="text", metadata={"text": text})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="dump", node_type="text",
                   metadata={"text": text})
    rc, out, err = run_enricher("extract_iocs_text", ctx)
    assert rc == 0, err
    assert out["iocs_found"] >= 50, out
    # Asserted unconditionally: guarding the checks below with
    # `if out["grouped"]` would let the test pass without verifying anything
    # if grouping silently stopped working.
    # NOTE(review): assumes the ops_db fixture schema has `group_id`, as the
    # split_sentences grouping test also relies on — confirm in conftest.
    assert out["grouped"], out

    groups = list_entities(ops_db, type_ref="Group")
    assert len(groups) == 1
    g = groups[0]
    assert g["metadata"]["enricher"] == "extract_iocs_text"
    assert g["metadata"]["count"] == out["iocs_found"]
    assert g["metadata"]["source_node_id"] == "t1"

    # First K IoCs stay loose, the rest are grouped (mixed types).
    iocs = [entity for entity in list_entities(ops_db)
            if entity["type_ref"] not in ("text", "Group")]
    loose = [entity for entity in iocs if entity["group_id"] is None]
    members = [entity for entity in iocs if entity["group_id"] == g["id"]]
    # Exactly K=10 loose entities.
    assert len(loose) == 10
    assert len(members) == out["iocs_found"] - 10

    # The Group itself has one EXTRACTED_FROM relation to the source.
    rels = list_relations(ops_db, name="EXTRACTED_FROM")
    group_to_source = [rel for rel in rels
                       if rel["to_entity"] == "t1"
                       and rel["from_entity"] == g["id"]]
    assert len(group_to_source) == 1
# Five-sentence paragraph; every sentence is longer than the default
# min_length of 20 characters.
SAMPLE_TEXT = (
    "El tomate es originario de America. Su cultivo se extendio por Europa "
    "en el siglo XVI. Hoy se considera una hortaliza basica. La variedad "
    "cherry es popular en ensaladas frescas. Existen mas de mil variedades "
    "registradas en el mundo entero."
)


def _build_paragraph(n: int) -> str:
    """Return a text made of ``n`` distinct sentences, each at least 20 chars."""
    return " ".join(
        f"Esta es la frase numero {i:03d} con suficiente contenido "
        f"para superar el min_length por defecto del enricher."
        for i in range(n)
    )


def test_split_sentences_creates_sentence_nodes(ops_db, app_dir, registry_root):
    """Five distinct sentences give 5 Sentence nodes and 5 SENTENCE_OF relations."""
    make_node(ops_db, node_id="t1", name="tomate doc",
              type_ref="text", metadata={"text": SAMPLE_TEXT})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="tomate doc", node_type="text",
                   metadata={"text": SAMPLE_TEXT})

    rc, out, err = run_enricher("split_sentences", ctx)
    assert rc == 0, err
    assert out is not None
    assert out["sentences"] == 5, out
    assert out["entities_added"] == 5
    assert out["grouped"] is False
    assert out["group_id"] == ""

    nodes = list_entities(ops_db, type_ref="Sentence")
    assert len(nodes) == 5
    # Ranks run 1..5 with no gaps.
    assert sorted(node["metadata"]["rank"] for node in nodes) == [1, 2, 3, 4, 5]
    # All nodes of one run share a single batch_id.
    assert len({node["metadata"]["batch_id"] for node in nodes}) == 1

    rels = list_relations(ops_db, name="SENTENCE_OF")
    assert len(rels) == 5
    assert all(rel["to_entity"] == "t1" for rel in rels)


def test_split_sentences_below_threshold_no_group(ops_db, app_dir,
                                                  registry_root):
    """Thirty sentences (below 50) create no Group."""
    text = _build_paragraph(30)
    make_node(ops_db, node_id="t1", name="big doc",
              type_ref="text", metadata={"text": text})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="big doc", node_type="text",
                   metadata={"text": text})
    rc, out, err = run_enricher("split_sentences", ctx)
    assert rc == 0, err
    assert out["sentences"] == 30
    assert out["grouped"] is False
    assert out["group_id"] == ""

    assert list_entities(ops_db, type_ref="Group") == []
    nodes = list_entities(ops_db, type_ref="Sentence")
    assert len(nodes) == 30
    assert all(node["group_id"] is None for node in nodes)


def test_split_sentences_above_threshold_creates_group(ops_db, app_dir,
                                                       registry_root):
    """A hundred sentences give 1 Group, 10 loose nodes and 90 grouped ones."""
    text = _build_paragraph(100)
    make_node(ops_db, node_id="t1", name="huge doc",
              type_ref="text", metadata={"text": text})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="huge doc", node_type="text",
                   metadata={"text": text})
    rc, out, err = run_enricher("split_sentences", ctx)
    assert rc == 0, err
    assert out["sentences"] == 100
    assert out["grouped"] is True
    assert out["group_id"]

    groups = list_entities(ops_db, type_ref="Group")
    assert len(groups) == 1
    g = groups[0]
    group_meta = g["metadata"]
    assert group_meta["count"] == 100
    assert group_meta["enricher"] == "split_sentences"
    assert group_meta["source_node_id"] == "t1"
    assert group_meta.get("batch_id")

    nodes = list_entities(ops_db, type_ref="Sentence")
    assert len(nodes) == 100
    loose = [node for node in nodes if node["group_id"] is None]
    members = [node for node in nodes if node["group_id"] == g["id"]]
    assert len(loose) == 10
    assert len(members) == 90

    # The Group plus the 100 sentences: 101 SENTENCE_OF relations to the source.
    rels = list_relations(ops_db, name="SENTENCE_OF")
    to_source = [rel for rel in rels if rel["to_entity"] == "t1"]
    assert len(to_source) == 101
    assert any(rel["from_entity"] == g["id"] for rel in to_source)


def test_split_sentences_no_text_fails(ops_db, app_dir, registry_root):
    """No metadata text and a one-character name exit with code 2."""
    make_node(ops_db, node_id="t1", name="x", type_ref="text", metadata={})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="x", node_type="text")
    rc, out, err = run_enricher("split_sentences", ctx)
    assert rc == 2
    assert out is not None
    error_text = out.get("error") or ""
    assert "demasiado corto" in error_text or "min_length" in error_text


def test_split_sentences_uses_metadata_text_priority(ops_db, app_dir,
                                                     registry_root):
    """metadata.text wins over node_name even when both hold text."""
    make_node(ops_db, node_id="t1", name="placeholder corto",
              type_ref="text", metadata={"text": SAMPLE_TEXT})
    ctx = base_ctx(ops_db=ops_db, app_dir=app_dir, registry_root=registry_root,
                   node_id="t1", node_name="placeholder corto",
                   node_type="text",
                   metadata={"text": SAMPLE_TEXT})
    rc, out, err = run_enricher("split_sentences", ctx)
    assert rc == 0, err
    assert out["sentences"] == 5  # the 5 sentences of SAMPLE_TEXT, not 1 from the name