#!/usr/bin/env python3 """Enricher paste_extract — modo preview puro para el panel "Paste & Extract". A diferencia del resto de enrichers, este NO escribe a operations.db. Recibe el texto via `params.text` y devuelve un JSON con las entidades y relaciones propuestas. La aplicacion (panel C++) procesa la propuesta, el usuario marca cuales aceptar y la propia app persiste con dedupe via entity_ops. Cascada de extraccion (graceful fallback): 1. `extract_iocs(text, types_list)` — regex puro, siempre disponible. 2. Si `use_hybrid=true` y los modelos cargan correctamente → ademas `extract_graph_hybrid` (GLiNER + GLiREL + LLM opcional). Si la carga del modelo falla por dependencias o por tiempo se ignora silenciosamente — el panel muestra solo lo que llego. Wire protocol estandar (issue 0026), pero tolera ausencia de `node_id` y de `ops_db_path`: este enricher es global, no pertenece a un nodo. Output JSON: { "entities": [ {"id": "", "type_ref": "Email", "name": "x@y.z", "metadata": {...}, "source": "regex|gliner|llm", "start": 12, "end": 27, "confidence": 1.0} ], "relations": [ {"from_id": "", "to_id": "", "name": "RELATED_TO", "source": "glirel|llm", "confidence": 0.7} ], "stats": {"layers": ["regex"], "n_entities": N, "n_relations": M} } """ from __future__ import annotations import json import os import sys _TYPE_MAP = { "email": ("Email", "address"), "ip_address": ("IPAddress", "address"), "domain": ("Domain", "name"), "file_hash": ("FileHash", "value"), "crypto_wallet": ("CryptoWallet", "address"), "cve_id": ("CVE", "id"), "mac_address": ("MACAddress", "address"), "phone_number": ("Phone", "number"), } def progress(p: float, stage: str = "") -> None: sys.stderr.write(f"PROGRESS:{p:.2f} {stage}\n") sys.stderr.flush() def log(msg: str) -> None: sys.stderr.write(f"{msg}\n") sys.stderr.flush() def _setup_registry_path(registry_root: str) -> None: """Intenta hacer importables los paquetes del registry. Estrategia identica a otros enrichers: 1. `/_vendored/` — para distribuciones binarias. 2. `/python/functions/` — modo dev local. """ vendored = os.path.join(os.path.dirname(__file__), "_vendored") if os.path.isdir(vendored): if vendored not in sys.path: sys.path.insert(0, vendored) return if registry_root: py_funcs = os.path.join(registry_root, "python", "functions") if os.path.isdir(py_funcs) and py_funcs not in sys.path: sys.path.insert(0, py_funcs) def _run_regex(text: str, types_list, max_entities: int) -> tuple[list, list]: """Capa 1: extract_iocs. Devuelve (entities, relations).""" try: from cybersecurity.extract_iocs import extract_iocs # type: ignore except Exception as e: log(f"extract_iocs no importable: {e}") return [], [] iocs = extract_iocs(text, types_list) seen = set() entities = [] for i, it in enumerate(iocs): ioc_type = it.get("type") value = it.get("value") or it.get("address") or it.get("name") or "" if not ioc_type or not value: continue key = (ioc_type, value) if key in seen: continue seen.add(key) type_ref, value_field = _TYPE_MAP.get(ioc_type, (ioc_type, "value")) entities.append({ "id": f"tmp_e_{i}", "type_ref": type_ref, "name": value, "metadata": {value_field: value}, "source": "regex", "start": int(it.get("start", -1)), "end": int(it.get("end", -1)), "confidence": 1.0, }) if len(entities) >= max_entities: break return entities, [] # regex no produce relaciones def _try_run_hybrid(text: str, registry_root: str, confidence_threshold: float = 0.6, ) -> tuple[list, list]: """Capa 2 opcional: extract_graph_hybrid. Devuelve (entities, relations). Cualquier error o ausencia de dependencias se traga silenciosamente — GLiNER/GLiREL pueden no estar instalados. La idea es no romper el flow del panel cuando solo regex esta disponible. """ try: # El pipeline esta en `pipelines/`, importarlo via path absoluto si # el registry_root esta disponible. En modo vendored este pipeline # quizas no este — fallback silencioso. sys.path.insert(0, os.path.join(registry_root, "python", "functions", "pipelines")) from extract_graph_hybrid import extract_graph_hybrid # type: ignore from datascience.gliner_load_model import gliner_load_model # type: ignore from datascience.glirel_load_model import glirel_load_model # type: ignore except Exception as e: log(f"hybrid pipeline no disponible: {e}") return [], [] # Schema generico para entidades semanticas. Si el caller quiere un # schema custom, lo extendemos via params en una iteracion futura. entity_schema = [ {"type_ref": "Person", "label": "person"}, {"type_ref": "Organization", "label": "organization"}, {"type_ref": "Location", "label": "location"}, ] relation_types = ["works_at", "located_in", "part_of", "related_to"] try: gliner_model = gliner_load_model() glirel_model = glirel_load_model() except Exception as e: log(f"hybrid: load modelo fallo: {e}") return [], [] try: ents, rels = extract_graph_hybrid( chunks=[text], entity_schema=entity_schema, relation_types=relation_types, gliner_model=gliner_model, glirel_model=glirel_model, llm_chat_json=None, confidence_threshold=confidence_threshold, ) except Exception as e: log(f"hybrid: extract fallo: {e}") return [], [] out_entities = [] name_to_idx: dict[tuple[str, str], int] = {} for i, ec in enumerate(ents): idx = len(out_entities) out_entities.append({ "id": f"tmp_h_{idx}", "type_ref": ec.type_ref, "name": ec.name, "metadata": dict(getattr(ec, "attributes", {}) or {}), "source": "hybrid", "start": int((ec.attributes or {}).get("start", -1)) if hasattr(ec, "attributes") else -1, "end": int((ec.attributes or {}).get("end", -1)) if hasattr(ec, "attributes") else -1, "confidence": float(ec.confidence), }) name_to_idx[(ec.type_ref, ec.name)] = idx out_relations = [] for rc in rels: # Mapear from/to (RelationCandidate) a tmp_id si los podemos casar. from_key = (getattr(rc, "from_type_ref", None), getattr(rc, "from_name", None)) to_key = (getattr(rc, "to_type_ref", None), getattr(rc, "to_name", None)) if None in from_key or None in to_key: continue fi = name_to_idx.get(from_key) ti = name_to_idx.get(to_key) if fi is None or ti is None: continue out_relations.append({ "from_id": f"tmp_h_{fi}", "to_id": f"tmp_h_{ti}", "name": getattr(rc, "name", "RELATED_TO") or "RELATED_TO", "source": "hybrid", "confidence": float(getattr(rc, "confidence", 0.0)), }) return out_entities, out_relations def main() -> int: raw = sys.stdin.read() try: ctx = json.loads(raw) except Exception as e: log(f"stdin not valid JSON: {e}") return 2 params = ctx.get("params") or {} registry_root = ctx.get("registry_root") or "" text = (params.get("text") or "").strip() if not text: log("missing params.text") print(json.dumps({"error": "missing params.text", "entities": [], "relations": [], "stats": {"layers": [], "n_entities": 0, "n_relations": 0}})) return 2 types_csv = (params.get("types") or "").strip() types_list = [t.strip() for t in types_csv.split(",") if t.strip()] \ if types_csv else None max_entities = int(params.get("max_entities", 200)) use_hybrid_raw = params.get("use_hybrid", False) if isinstance(use_hybrid_raw, str): use_hybrid = use_hybrid_raw.strip().lower() in ("1", "true", "yes", "on") else: use_hybrid = bool(use_hybrid_raw) progress(0.05, "init") _setup_registry_path(registry_root) progress(0.20, "regex") regex_entities, _ = _run_regex(text, types_list, max_entities) layers = ["regex"] hybrid_entities: list = [] hybrid_relations: list = [] if use_hybrid: progress(0.40, "hybrid") hybrid_entities, hybrid_relations = _try_run_hybrid(text, registry_root) if hybrid_entities or hybrid_relations: layers.append("hybrid") # Mergear regex + hybrid evitando duplicados exactos (type_ref, name). progress(0.85, "merge") seen = set() entities: list[dict] = [] for src in (regex_entities, hybrid_entities): for e in src: key = (e["type_ref"], e["name"]) if key in seen: continue seen.add(key) entities.append(e) if len(entities) >= max_entities: break if len(entities) >= max_entities: break # Reasignar tmp ids tras merge para que sean estables 0..N-1. id_remap: dict[str, str] = {} for i, e in enumerate(entities): new_id = f"tmp_{i}" id_remap[e["id"]] = new_id e["id"] = new_id relations: list[dict] = [] for r in hybrid_relations: fi = id_remap.get(r["from_id"]) ti = id_remap.get(r["to_id"]) if fi is None or ti is None: continue relations.append({ "from_id": fi, "to_id": ti, "name": r.get("name") or "RELATED_TO", "source": r.get("source") or "hybrid", "confidence": float(r.get("confidence", 0.0)), }) progress(1.0, "done") print(json.dumps({ "entities": entities, "relations": relations, "stats": { "layers": layers, "n_entities": len(entities), "n_relations": len(relations), }, }, ensure_ascii=False)) return 0 if __name__ == "__main__": sys.exit(main())