feat: enrichers offline split_sentences + extract_iocs_text

Para probar la app sin depender de red (DDG bloquea con captcha desde
ciertas IPs). Ambos aplican grouping (umbral 50, preview K=10) replicando
el patron de web_search.

- split_sentences: parte texto en frases (regex), crea nodos Sentence
  conectados con SENTENCE_OF.
- extract_iocs_text: variante de extract_text_entities que lee directo
  metadata.text/description/name, sin requerir fetch previo. Vendoriza
  extract_iocs_py_cybersecurity. Multi-tipo, agrupado en un solo Group
  heterogeneo (decision 6 multi-grupo-por-tipo es fase 2).
- Tipo Sentence en types.yaml.

Tests pytest cubren below/above threshold para ambos.
This commit is contained in:
2026-05-03 15:20:39 +02:00
parent 092ad2801e
commit 0e435c2e21
7 changed files with 934 additions and 0 deletions
+11
View File
@@ -0,0 +1,11 @@
id: extract_iocs_text
name: "Extract IoCs from text"
description: "Extrae IoCs (IPs, emails, dominios, hashes, crypto wallets, CVEs, MAC, telefonos) directamente del texto del nodo. No requiere fetch previo. Sin red."
applies_to: [text, Text]
emits: [Email, IPAddress, Domain, FileHash, CryptoWallet, CVE, MACAddress, Phone]
relations: [EXTRACTED_FROM]
uses_functions:
- extract_iocs_py_cybersecurity
params:
- { name: types, type: string, default: "", description: "CSV de tipos a extraer; vacio = todos" }
- { name: max_entities, type: int, default: 500 }
+320
View File
@@ -0,0 +1,320 @@
#!/usr/bin/env python3
"""Enricher extract_iocs_text — variante offline de extract_text_entities.
A diferencia de extract_text_entities, este enricher NO depende de un
markdown cacheado (fetch_webpage previo). Lee el texto directamente del
nodo (`metadata.text` > `metadata.description` > `metadata.query` >
`node_name`) y aplica el pipeline `extract_iocs` del registry sobre el.
Sin red, sin dependencias externas — pensado para probar la app
cuando DDG bloquea con captcha o cuando se trabaja en un entorno
offline.
Wire protocol estandar (issue 0026).
Grouping (provisional, fase 1 del issue 0035):
Si len(unique_iocs) >= GROUP_THRESHOLD, se crea UN solo Group
heterogeneo con todos los IoCs dentro (independientemente de su tipo).
La decision 6 del issue 0035 ("multi-tipo → un Group por tipo") es
fase 2 — aqui simplificamos para validar el flow end-to-end.
Cuando llegue la fase 2, esta logica se sustituira por una que cree
N grupos (uno por type_ref que exceda el umbral).
"""
from __future__ import annotations
import json
import os
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timezone
_TYPE_MAP = {
"email": ("Email", "address"),
"ip_address": ("IPAddress", "address"),
"domain": ("Domain", "name"),
"file_hash": ("FileHash", "value"),
"crypto_wallet": ("CryptoWallet", "address"),
"cve_id": ("CVE", "id"),
"mac_address": ("MACAddress", "address"),
"phone_number": ("Phone", "number"),
}
DEFAULT_GROUP_THRESHOLD = 50
GROUP_PREVIEW_K = 10
def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def has_group_id_column(conn: sqlite3.Connection) -> bool:
try:
cur = conn.execute("PRAGMA table_info(entities)")
for row in cur:
if row[1] == "group_id":
return True
except sqlite3.Error:
pass
return False
def read_text(metadata: dict, node_name: str) -> str:
for key in ("text", "description", "query"):
v = metadata.get(key)
if isinstance(v, str) and v.strip():
return v.strip()
return (node_name or "").strip()
def main() -> int:
raw = sys.stdin.read()
try:
ctx = json.loads(raw)
except Exception as e:
log(f"stdin not valid JSON: {e}")
return 2
node_id = ctx.get("node_id") or ""
node_name = (ctx.get("node_name") or "").strip()
metadata = ctx.get("metadata") or {}
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except Exception:
metadata = {}
ops_db_path = ctx.get("ops_db_path") or ""
app_dir_raw = (ctx.get("app_dir") or "").replace("\\", "/")
registry_root = ctx.get("registry_root") or ""
params = ctx.get("params") or {}
types_csv = (params.get("types") or "").strip()
types_list = ([t.strip() for t in types_csv.split(",") if t.strip()]
if types_csv else None)
max_entities = int(params.get("max_entities", 500))
if not node_id or not ops_db_path:
log("missing node_id / ops_db_path")
return 2
ops_db_path = ops_db_path.replace("\\", "/")
if not os.path.isabs(ops_db_path):
if app_dir_raw and os.path.isdir(app_dir_raw):
cand = os.path.normpath(os.path.join(app_dir_raw, ops_db_path))
if os.path.exists(cand):
ops_db_path = cand
if not os.path.isabs(ops_db_path):
ops_db_path = os.path.abspath(ops_db_path)
if not os.path.exists(ops_db_path):
log(f"ops_db_path no existe: {ops_db_path}")
print(json.dumps({"error": "ops_db not found",
"ops_db_path": ops_db_path,
"entities_added": 0, "relations_added": 0}))
return 7
progress(0.10, "reading")
text = read_text(metadata, node_name)
if not text:
msg = ("nodo sin texto. Esperaba metadata.text / description / "
"query, o un name con contenido")
log(msg)
print(json.dumps({"error": msg, "entities_added": 0,
"relations_added": 0}))
return 2
progress(0.30, "extracting iocs")
# Vendoring: prefiere _vendored/ si existe (binario distribuido); si
# no, fallback al registry_root para modo dev local.
vendored = os.path.join(os.path.dirname(__file__), "_vendored")
if os.path.isdir(vendored):
if vendored not in sys.path:
sys.path.insert(0, vendored)
elif registry_root:
py_funcs = os.path.join(registry_root, "python", "functions")
if py_funcs not in sys.path:
sys.path.insert(0, py_funcs)
try:
from cybersecurity.extract_iocs import extract_iocs # type: ignore
except Exception as e:
log(f"no se pudo importar extract_iocs: {e}")
print(json.dumps({"error": f"extract_iocs import failed: {e}",
"entities_added": 0, "relations_added": 0}))
return 5
iocs = extract_iocs(text, types_list)
# Dedup por (type, value).
seen = set()
unique: list[dict] = []
for it in iocs:
t = it.get("type")
v = it.get("value") or it.get("address") or it.get("name") or ""
if not t or not v:
continue
key = (t, v)
if key in seen:
continue
seen.add(key)
unique.append(it)
if len(unique) >= max_entities:
break
progress(0.55, "writing")
conn = sqlite3.connect(ops_db_path)
conn.execute("PRAGMA foreign_keys=OFF")
entities_added = 0
relations_added = 0
new_by_type: dict[str, int] = {}
group_id: str | None = None
batch_id = uuid.uuid4().hex
try:
has_group_col = has_group_id_column(conn)
n_total = len(unique)
threshold = DEFAULT_GROUP_THRESHOLD
if n_total >= threshold and has_group_col:
# Group heterogeneo (provisional, ver docstring).
ts = now_iso()
group_id = (f"Group_{now_ms()}_"
f"{abs(hash(node_id + batch_id)) % 100000}")
group_name = f"iocs: {node_name or '(text)'} ({n_total})"
group_meta = {
"enricher": "extract_iocs_text",
"count": n_total,
"batch_id": batch_id,
"source_node_id": node_id,
}
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" created_at, updated_at) "
"VALUES (?, ?, 'Group', 'enricher:extract_iocs_text', ?, ?, ?)",
(group_id, group_name, json.dumps(group_meta, ensure_ascii=False),
ts, ts),
)
entities_added += 1
# Relacion EXTRACTED_FROM del Group al source.
rel_id = f"rel_{now_ms()}_g_extracted"
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, "
" created_at, updated_at) "
"VALUES (?, 'EXTRACTED_FROM', ?, ?, ?, ?)",
(rel_id, group_id, node_id, ts, ts),
)
relations_added += 1
preview = unique[:GROUP_PREVIEW_K]
grouped = unique[GROUP_PREVIEW_K:]
else:
preview = unique
grouped = []
def _insert_one(it: dict, idx: int, *, in_group: bool) -> None:
nonlocal entities_added, relations_added
ioc_type = it.get("type")
value = (it.get("value") or it.get("address")
or it.get("name") or "")
if not value:
return
type_ref, value_field = _TYPE_MAP.get(
ioc_type, (ioc_type or "Text", "value"),
)
existed = conn.execute(
"SELECT id, group_id FROM entities WHERE type_ref=? "
"AND name=? LIMIT 1" if has_group_col else
"SELECT id FROM entities WHERE type_ref=? AND name=? LIMIT 1",
(type_ref, value),
).fetchone()
ts = now_iso()
if existed:
target_id = existed[0]
# No machacamos group_id existente — un IoC repetido entre
# ejecuciones mantiene su primer agrupamiento.
else:
target_id = f"{type_ref}_{now_ms()}_{idx}"
meta = {value_field: value, "batch_id": batch_id}
if "start" in it:
meta["text_offset"] = it["start"]
node_group = group_id if in_group else None
if has_group_col:
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, "
" metadata, group_id, created_at, updated_at) "
"VALUES (?, ?, ?, 'enricher:extract_iocs_text', ?, "
" ?, ?, ?)",
(target_id, value, type_ref,
json.dumps(meta, ensure_ascii=False),
node_group, ts, ts),
)
else:
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, "
" metadata, created_at, updated_at) "
"VALUES (?, ?, ?, 'enricher:extract_iocs_text', ?, "
" ?, ?)",
(target_id, value, type_ref,
json.dumps(meta, ensure_ascii=False), ts, ts),
)
entities_added += 1
new_by_type[type_ref] = new_by_type.get(type_ref, 0) + 1
# Cada IoC mantiene su EXTRACTED_FROM al source original (no
# al Group). El Group solo es contenedor visual.
rel_exists = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
"AND name='EXTRACTED_FROM' LIMIT 1",
(target_id, node_id),
).fetchone()
if not rel_exists:
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, "
" created_at, updated_at) "
"VALUES (?, 'EXTRACTED_FROM', ?, ?, ?, ?)",
(f"rel_{now_ms()}_{idx}_extracted",
target_id, node_id, ts, ts),
)
relations_added += 1
n_preview = len(preview)
n_grouped = len(grouped)
for i, it in enumerate(preview):
_insert_one(it, i, in_group=False)
for j, it in enumerate(grouped):
_insert_one(it, n_preview + j, in_group=True)
if n_grouped > 0 and j % 20 == 0:
progress(0.55 + 0.40 * (j / max(1, n_grouped)), "writing")
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"iocs_found": len(unique),
"by_type": new_by_type,
"entities_added": entities_added,
"relations_added": relations_added,
"batch_id": batch_id,
"group_id": group_id or "",
"grouped": bool(group_id),
}, ensure_ascii=False))
return 0
if __name__ == "__main__":
sys.exit(main())
+9
View File
@@ -0,0 +1,9 @@
id: split_sentences
name: "Split text into sentences"
description: "Parte el texto del nodo en frases y crea nodos Sentence conectados con SENTENCE_OF al origen. Sin red, puro regex."
applies_to: [text, Text]
emits: [Sentence]
relations: [SENTENCE_OF]
params:
- { name: max_sentences, type: int, default: 200 }
- { name: min_length, type: int, default: 20, description: "ignora frases con menos de N caracteres" }
+319
View File
@@ -0,0 +1,319 @@
#!/usr/bin/env python3
"""Enricher split_sentences — parte texto en frases (regex puro, offline).
Wire protocol estandar (issue 0026):
- stdin: JSON con node_id, node_name, metadata, ops_db_path, app_dir,
cache_dir, registry_root, params.
- stderr: lineas `PROGRESS:<float> <stage>` para feedback de UI.
- stdout: una linea JSON al final con resumen.
- exit code 0 = ok, !=0 = error.
Lectura del texto (en orden de prioridad):
1. metadata.text (campo canonico de un nodo Text)
2. metadata.description
3. metadata.query (compatible con nodos creados desde la barra de busqueda)
4. node_name (fallback minimo)
Si tras esto el texto es < min_length, falla con exit 2 y mensaje claro.
Grouping (issue 0035c, mismo patron que web_search):
- Si len(sentences) >= GROUP_THRESHOLD y la BD soporta group_id:
* Crea Group `type_ref='Group'` colgando del source con SENTENCE_OF.
* Primeras GROUP_PREVIEW_K frases sueltas (group_id=NULL).
* Resto con group_id apuntando al Group recien creado.
- Si <threshold: todas sueltas, sin Group.
"""
from __future__ import annotations
import json
import os
import re
import sqlite3
import sys
import time
import uuid
from datetime import datetime, timezone
DEFAULT_GROUP_THRESHOLD = 50
GROUP_PREVIEW_K = 10
# Split por delimitador de oracion (.!?) seguido de whitespace seguido de
# inicial de oracion en mayusculas (incluye acentos espanoles). Robusto
# para texto en espanol e ingles. Casos limite (abreviaturas como "Sr.",
# "Dr.") quedan como falsos negativos aceptables — el split es heuristico.
_SENT_SPLIT_RE = re.compile(r'(?<=[.!?])\s+(?=[A-ZÁÉÍÓÚÜÑ])')
def progress(p: float, stage: str = "") -> None:
sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n")
sys.stderr.flush()
def log(msg: str) -> None:
sys.stderr.write(f"{msg}\n")
sys.stderr.flush()
def now_iso() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
def now_ms() -> int:
return int(time.time() * 1000)
def has_group_id_column(conn: sqlite3.Connection) -> bool:
"""Detecta si la columna `group_id` existe en `entities`.
El schema actual la incluye (issue 0035a) pero las BDs viejas pueden
no tenerla. Si no esta, insertamos sin esa columna.
"""
try:
cur = conn.execute("PRAGMA table_info(entities)")
for row in cur:
if row[1] == "group_id":
return True
except sqlite3.Error:
pass
return False
def read_text(metadata: dict, node_name: str) -> str:
"""Resuelve el texto a partir del orden de prioridad documentado."""
for key in ("text", "description", "query"):
v = metadata.get(key)
if isinstance(v, str) and v.strip():
return v.strip()
return (node_name or "").strip()
def split_into_sentences(text: str, min_length: int) -> list[str]:
"""Aplica el regex de split y filtra por longitud minima."""
parts = _SENT_SPLIT_RE.split(text)
out: list[str] = []
for p in parts:
s = p.strip()
if len(s) < min_length:
continue
out.append(s)
return out
def insert_sentence(conn: sqlite3.Connection, *, sentence: str, rank: int,
batch_id: str, group_id: str | None,
has_group_col: bool) -> str:
"""Inserta un nodo Sentence y devuelve su id. No deduplica — cada
ejecucion crea entidades nuevas (las frases pueden repetirse entre
ejecuciones distintas y el rank/batch las distingue).
"""
ts = now_iso()
new_id = f"Sentence_{now_ms()}_{rank}"
name = sentence[:80] + ("..." if len(sentence) > 80 else "")
meta = {
"text": sentence,
"rank": rank,
"batch_id": batch_id,
}
meta_json = json.dumps(meta, ensure_ascii=False)
if has_group_col:
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" group_id, created_at, updated_at) "
"VALUES (?, ?, 'Sentence', 'enricher:split_sentences', ?, ?, ?, ?)",
(new_id, name, meta_json, group_id, ts, ts),
)
else:
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" created_at, updated_at) "
"VALUES (?, ?, 'Sentence', 'enricher:split_sentences', ?, ?, ?)",
(new_id, name, meta_json, ts, ts),
)
return new_id
def insert_group_entity(conn: sqlite3.Connection, *, source_node_id: str,
source_node_name: str, count: int,
batch_id: str) -> str:
ts = now_iso()
new_id = f"Group_{now_ms()}_{abs(hash(source_node_id + batch_id)) % 100000}"
name = f"split_sentences: {source_node_name} ({count})"
meta = {
"enricher": "split_sentences",
"count": count,
"batch_id": batch_id,
"source_node_id": source_node_id,
}
meta_json = json.dumps(meta, ensure_ascii=False)
conn.execute(
"INSERT INTO entities (id, name, type_ref, source, metadata, "
" created_at, updated_at) "
"VALUES (?, ?, 'Group', 'enricher:split_sentences', ?, ?, ?)",
(new_id, name, meta_json, ts, ts),
)
return new_id
_REL_COUNTER = 0
def insert_relation(conn: sqlite3.Connection, from_id: str, to_id: str,
name: str) -> bool:
global _REL_COUNTER
cur = conn.execute(
"SELECT 1 FROM relations WHERE from_entity=? AND to_entity=? "
"AND name=? LIMIT 1",
(from_id, to_id, name),
)
if cur.fetchone():
return False
ts = now_iso()
_REL_COUNTER += 1
rel_id = f"rel_{now_ms()}_{_REL_COUNTER}_{name.lower()}"
conn.execute(
"INSERT INTO relations (id, name, from_entity, to_entity, "
" created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
(rel_id, name, from_id, to_id, ts, ts),
)
return True
def main() -> int:
raw = sys.stdin.read()
try:
ctx = json.loads(raw)
except Exception as e:
log(f"stdin not valid JSON: {e}")
return 2
node_id = ctx.get("node_id") or ""
node_name = (ctx.get("node_name") or "").strip()
metadata = ctx.get("metadata") or {}
if isinstance(metadata, str):
try:
metadata = json.loads(metadata)
except Exception:
metadata = {}
ops_db_path = ctx.get("ops_db_path") or ""
params = ctx.get("params") or {}
max_sentences = int(params.get("max_sentences", 200))
min_length = int(params.get("min_length", 20))
if not node_id or not ops_db_path:
log("missing node_id / ops_db_path")
return 2
# Normalizar y resolver path como en web_search.
ops_db_path = ops_db_path.replace("\\", "/")
app_dir_raw = (ctx.get("app_dir") or "").replace("\\", "/")
if not os.path.isabs(ops_db_path):
if app_dir_raw and os.path.isdir(app_dir_raw):
cand = os.path.normpath(os.path.join(app_dir_raw, ops_db_path))
if os.path.exists(cand):
ops_db_path = cand
if not os.path.isabs(ops_db_path):
ops_db_path = os.path.abspath(ops_db_path)
if not os.path.exists(ops_db_path):
log(f"ops_db_path no existe: {ops_db_path}")
print(json.dumps({"error": "ops_db not found",
"ops_db_path": ops_db_path,
"entities_added": 0, "relations_added": 0}))
return 7
progress(0.10, "reading")
text = read_text(metadata, node_name)
if len(text) < min_length:
msg = (f"texto demasiado corto ({len(text)} chars < {min_length}). "
f"Esperaba metadata.text / description / query, o un name "
f"con mas contenido")
log(msg)
print(json.dumps({"error": msg, "entities_added": 0,
"relations_added": 0}))
return 2
progress(0.30, "splitting")
sentences = split_into_sentences(text, min_length)
if max_sentences > 0:
sentences = sentences[:max_sentences]
if not sentences:
msg = (f"sin frases tras split (texto de {len(text)} chars, "
f"min_length={min_length})")
log(msg)
print(json.dumps({"error": msg, "entities_added": 0,
"relations_added": 0}))
return 2
progress(0.55, "writing")
conn = sqlite3.connect(ops_db_path)
conn.execute("PRAGMA foreign_keys=OFF")
entities_added = 0
relations_added = 0
group_id: str | None = None
batch_id = uuid.uuid4().hex
try:
has_group_col = has_group_id_column(conn)
n_total = len(sentences)
threshold = DEFAULT_GROUP_THRESHOLD
if n_total >= threshold and has_group_col:
group_id = insert_group_entity(
conn,
source_node_id=node_id,
source_node_name=node_name or "(text)",
count=n_total,
batch_id=batch_id,
)
entities_added += 1
if insert_relation(conn, group_id, node_id, "SENTENCE_OF"):
relations_added += 1
preview = sentences[:GROUP_PREVIEW_K]
grouped = sentences[GROUP_PREVIEW_K:]
else:
preview = sentences
grouped = []
# Frases sueltas (preview).
for i, s in enumerate(preview):
sid = insert_sentence(
conn, sentence=s, rank=i + 1, batch_id=batch_id,
group_id=None, has_group_col=has_group_col,
)
entities_added += 1
if insert_relation(conn, sid, node_id, "SENTENCE_OF"):
relations_added += 1
# Frases agrupadas — siguen colgando del source con SENTENCE_OF.
for j, s in enumerate(grouped):
rank = GROUP_PREVIEW_K + j + 1
sid = insert_sentence(
conn, sentence=s, rank=rank, batch_id=batch_id,
group_id=group_id, has_group_col=has_group_col,
)
entities_added += 1
if insert_relation(conn, sid, node_id, "SENTENCE_OF"):
relations_added += 1
if grouped and j % 25 == 0:
progress(0.55 + 0.40 * (j / max(1, len(grouped))), "writing")
conn.commit()
finally:
conn.close()
progress(1.0, "done")
print(json.dumps({
"sentences": len(sentences),
"entities_added": entities_added,
"relations_added": relations_added,
"batch_id": batch_id,
"group_id": group_id or "",
"grouped": bool(group_id),
}, ensure_ascii=False))
return 0
if __name__ == "__main__":
sys.exit(main())