"""Hybrid pipeline: extract_iocs + GLiNER + GLiREL + LLM fallback."""

from __future__ import annotations

import os
import sys
import warnings
from typing import Any, Callable

# Make the repository root importable so the absolute `python.*` imports below
# resolve when this module is loaded directly.
_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)

from python.functions.cybersecurity.extract_iocs import extract_iocs
from python.functions.datascience.extract_entities_gliner import extract_entities_gliner
from python.functions.datascience.extract_relations_glirel import extract_relations_glirel
from python.functions.datascience.extract_entities_llm import extract_entities_llm
from python.functions.datascience.extract_relations_llm import extract_relations_llm
from python.types.datascience.entity_candidate import EntityCandidate
from python.types.datascience.relation_candidate import RelationCandidate

# IoC type (as reported by `extract_iocs`) -> `type_ref` used on the candidate.
_IOC_TYPE_REF = {
    "email": "ioc_email",
    "ip_address": "ioc_ip_address",
    "domain": "ioc_domain",
    "file_hash": "ioc_file_hash",
    "crypto_wallet": "ioc_crypto_wallet",
    "cve_id": "ioc_cve_id",
    "mac_address": "ioc_mac_address",
    "phone_number": "ioc_phone_number",
}

# IoC type (as reported by `extract_iocs`) -> human-readable `type_label`.
_IOC_LABEL = {
    "email": "Email",
    "ip_address": "IPAddress",
    "domain": "Domain",
    "file_hash": "FileHash",
    "crypto_wallet": "CryptoWallet",
    "cve_id": "CVE",
    "mac_address": "MACAddress",
    "phone_number": "PhoneNumber",
}


def _ioc_dict_to_candidate(ioc: dict, chunk_index: int) -> EntityCandidate:
    """Build an `EntityCandidate` from one dict produced by `extract_iocs`.

    The `start` / `end` values of the dict (or -1 when absent) are copied into
    `attributes` so that the relation extractor can use the offsets instead of
    searching the text again (intent stated by the original author).

    Unknown IoC types fall back to `ioc_<type>` as `type_ref` and to the raw
    type string as `type_label`. The confidence is always 1.0.
    """
    kind = ioc.get("type", "")
    offsets = {"start": ioc.get("start", -1), "end": ioc.get("end", -1)}
    return EntityCandidate(
        name=ioc.get("value", ""),
        type_ref=_IOC_TYPE_REF.get(kind, f"ioc_{kind}"),
        type_label=_IOC_LABEL.get(kind, kind),
        attributes=offsets,
        confidence=1.0,
        source_chunk_indices=[chunk_index],
    )


def _mean_confidence(entities: list[EntityCandidate]) -> float:
    """Return the arithmetic mean of `confidence`, or 0.0 for an empty input."""
    if entities:
        total = sum(item.confidence for item in entities)
        return total / len(entities)
    return 0.0


def _merge_entities_dedup_by_name_type(
    base: list[EntityCandidate],
    extra: list[EntityCandidate],
) -> list[EntityCandidate]:
    """Return `base` followed by the items of `extra` not already present.

    Two candidates are considered the same when both `name` and `type_ref`
    match exactly. No fuzzy matching is done here; per the original notes the
    final deduplication is the caller's job (`deduplicate_entities`). Items
    already in `base` are kept as they are, and neither input list is mutated.
    """
    merged = list(base)
    known = {(item.name, item.type_ref) for item in base}
    for candidate in extra:
        identity = (candidate.name, candidate.type_ref)
        if identity not in known:
            known.add(identity)
            merged.append(candidate)
    return merged


def extract_graph_hybrid(
    chunks: list[str],
    entity_schema: list[dict],
    relation_types: list[str],
    gliner_model: Any,
    glirel_model: Any,
    llm_chat_json: Callable[[list[dict]], dict] | None = None,
    ioc_types: list[str] | None = None,
    confidence_threshold: float = 0.6,
    languages: str = "Respond in Spanish.",
    min_entities_per_chunk: int = 2,
) -> tuple[list[EntityCandidate], list[RelationCandidate]]:
    """Extract entities and relations by chaining IoC extraction, GLiNER, GLiREL and an LLM.

    Each non-blank chunk goes through this cascade:

    1. `extract_iocs(chunk, ioc_types)`; every hit becomes a candidate with
       confidence 1.0.
    2. `extract_entities_gliner(...)` with `confidence_threshold` as threshold.
    3. When the chunk has fewer than `min_entities_per_chunk` entities, or the
       mean confidence of the GLiNER entities is below `confidence_threshold`,
       and `llm_chat_json` was supplied: `extract_entities_llm(...)`.
    4. With at least two entities: `extract_relations_glirel(...)`.
    5. With at least two entities, no relations from step 4 and `llm_chat_json`
       supplied: `extract_relations_llm(...)`.

    A failure in any extractor is reported through `warnings.warn` and treated
    as an empty result for that step; processing continues.

    Args:
        chunks: Text fragments to process. Empty or whitespace-only chunks are
            skipped.
        entity_schema: Schema handed to the GLiNER and LLM entity extractors.
        relation_types: Relation types handed to the GLiREL and LLM relation
            extractors.
        gliner_model: Model object forwarded to `extract_entities_gliner`.
        glirel_model: Model object forwarded to `extract_relations_glirel`.
        llm_chat_json: Callable forwarded to the LLM extractors. When None, no
            LLM fallback is ever invoked.
        ioc_types: Forwarded to `extract_iocs` as its second argument.
        confidence_threshold: Threshold given to GLiNER and GLiREL, and the
            limit under which the entity LLM fallback is requested.
        languages: Forwarded to the LLM extractors as `language_instruction`.
        min_entities_per_chunk: Minimum entity count below which the entity LLM
            fallback is requested.

    Returns:
        `(entities, relations)`: the per-chunk results concatenated in chunk
        order. Entities carry the chunk index in `source_chunk_indices` and
        relations in `source_chunk_index`. Nothing is deduplicated across
        chunks; the original notes say the caller is expected to do that.

    Raises:
        ValueError: If `chunks` is not a list or `entity_schema` is empty.
    """
    if not isinstance(chunks, list):
        raise ValueError("chunks debe ser una lista")
    if not entity_schema:
        raise ValueError("entity_schema no puede estar vacio")

    collected_entities: list[EntityCandidate] = []
    collected_relations: list[RelationCandidate] = []

    for i, chunk in enumerate(chunks):
        if not (chunk and chunk.strip()):
            continue

        # -- Layer 1: IoC extraction ------------------------------------------
        raw_iocs = []
        try:
            raw_iocs = extract_iocs(chunk, ioc_types)
        except Exception as exc:
            warnings.warn(
                f"extract_graph_hybrid: extract_iocs fallo en chunk {i}: {exc}",
                stacklevel=2,
            )
        from_iocs = [_ioc_dict_to_candidate(raw, i) for raw in raw_iocs]

        # -- Layer 2: GLiNER ---------------------------------------------------
        from_gliner = []
        try:
            from_gliner = extract_entities_gliner(
                text=chunk,
                entity_schema=entity_schema,
                model=gliner_model,
                threshold=confidence_threshold,
            )
        except Exception as exc:
            warnings.warn(
                f"extract_graph_hybrid: extract_entities_gliner fallo en chunk {i}: {exc}",
                stacklevel=2,
            )
        # Record the originating chunk on every GLiNER entity.
        for candidate in from_gliner:
            if i not in candidate.source_chunk_indices:
                candidate.source_chunk_indices.append(i)

        per_chunk = _merge_entities_dedup_by_name_type(from_iocs, from_gliner)

        # -- Layer 3: optional LLM entity fallback -----------------------------
        # The mean is taken over the GLiNER entities only, and only evaluated
        # when the entity count alone did not already request the fallback.
        wants_llm_entities = (
            len(per_chunk) < min_entities_per_chunk
            or _mean_confidence(from_gliner) < confidence_threshold
        )
        if wants_llm_entities and llm_chat_json is not None:
            from_llm = []
            try:
                from_llm = extract_entities_llm(
                    text=chunk,
                    entity_schema=entity_schema,
                    llm_chat_json=llm_chat_json,
                    language_instruction=languages,
                )
            except Exception as exc:
                warnings.warn(
                    f"extract_graph_hybrid: extract_entities_llm fallo en chunk {i}: {exc}",
                    stacklevel=2,
                )
            for candidate in from_llm:
                if i not in candidate.source_chunk_indices:
                    candidate.source_chunk_indices.append(i)
            per_chunk = _merge_entities_dedup_by_name_type(per_chunk, from_llm)

        collected_entities += per_chunk

        # -- Layer 4: GLiREL ---------------------------------------------------
        # A relation needs two endpoints, so smaller chunks skip this layer.
        enough_entities = len(per_chunk) >= 2
        chunk_relations = []
        if enough_entities:
            try:
                chunk_relations = extract_relations_glirel(
                    text=chunk,
                    entities=per_chunk,
                    relation_types=relation_types,
                    model=glirel_model,
                    threshold=confidence_threshold,
                )
            except Exception as exc:
                warnings.warn(
                    f"extract_graph_hybrid: extract_relations_glirel fallo en chunk {i}: {exc}",
                    stacklevel=2,
                )
                chunk_relations = []
        for relation in chunk_relations:
            relation.source_chunk_index = i

        # -- Layer 5: optional LLM relation fallback ---------------------------
        if llm_chat_json is not None and enough_entities and not chunk_relations:
            llm_found = []
            try:
                llm_found = extract_relations_llm(
                    text=chunk,
                    entities=per_chunk,
                    relation_types=relation_types,
                    llm_chat_json=llm_chat_json,
                    language_instruction=languages,
                )
            except Exception as exc:
                warnings.warn(
                    f"extract_graph_hybrid: extract_relations_llm fallo en chunk {i}: {exc}",
                    stacklevel=2,
                )
            for relation in llm_found:
                relation.source_chunk_index = i
            chunk_relations.extend(llm_found)

        collected_relations += chunk_relations

    return collected_entities, collected_relations