Files
egutierrez 4f743e0840 feat(pipelines): extract_graph_hybrid (regex + GLiNER + GLiREL + LLM fallback)
Pipeline en cascada que combina extract_iocs (regex, coste 0), GLiNER
(zero-shot NER), GLiREL (zero-shot RE) y un fallback LLM opcional para
chunks con baja confianza o pocas entidades. Devuelve listas concatenadas
listas para deduplicate_entities/deduplicate_relations.

Cierra 0040.
2026-04-30 16:52:46 +02:00

261 lines
10 KiB
Python

"""Pipeline hibrido: extract_iocs + GLiNER + GLiREL + LLM fallback."""
from __future__ import annotations
import os
import sys
import warnings
from typing import Any, Callable
# Put the directory three levels above this file on sys.path so that the
# absolute `python.*` imports that follow can be resolved.
_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), *([os.pardir] * 3))
)
if _ROOT not in sys.path:
    # Prepend so this tree is searched before any other sys.path entry.
    sys.path.insert(0, _ROOT)
from python.functions.cybersecurity.extract_iocs import extract_iocs
from python.functions.datascience.extract_entities_gliner import extract_entities_gliner
from python.functions.datascience.extract_relations_glirel import extract_relations_glirel
from python.functions.datascience.extract_entities_llm import extract_entities_llm
from python.functions.datascience.extract_relations_llm import extract_relations_llm
from python.types.datascience.entity_candidate import EntityCandidate
from python.types.datascience.relation_candidate import RelationCandidate
# Display label for each IoC kind that has an explicit mapping. The keys are
# the values looked up from an IoC dict's "type" entry.
_IOC_LABEL = {
    "email": "Email",
    "ip_address": "IPAddress",
    "domain": "Domain",
    "file_hash": "FileHash",
    "crypto_wallet": "CryptoWallet",
    "cve_id": "CVE",
    "mac_address": "MACAddress",
    "phone_number": "PhoneNumber",
}

# type_ref for each of those kinds: the kind name prefixed with "ioc_"
# (e.g. "email" -> "ioc_email"), in the same key order as _IOC_LABEL.
_IOC_TYPE_REF = {kind: f"ioc_{kind}" for kind in _IOC_LABEL}
def _ioc_dict_to_candidate(ioc: dict, chunk_index: int) -> EntityCandidate:
    """Build an `EntityCandidate` from one IoC dict.

    Every field is read with `dict.get`, so missing keys fall back to
    defaults: `"type"` and `"value"` to an empty string, `"start"` and
    `"end"` to -1. Kinds without an entry in the lookup tables get
    `type_ref="ioc_<kind>"` and the raw kind as label.

    The offsets are stored under `attributes["start"]` / `attributes["end"]`.
    Per the original author's note this lets `extract_relations_glirel` map
    entities to tokens without falling back to `text.find` (not verifiable
    from this file).

    Args:
        ioc: Dict describing one indicator of compromise.
        chunk_index: Index of the chunk the IoC was found in.

    Returns:
        A candidate with confidence 1.0 and
        `source_chunk_indices=[chunk_index]`.
    """
    kind = ioc.get("type", "")
    value = ioc.get("value", "")
    offsets = {"start": ioc.get("start", -1), "end": ioc.get("end", -1)}
    candidate = EntityCandidate(
        name=value,
        type_ref=_IOC_TYPE_REF.get(kind, f"ioc_{kind}"),
        type_label=_IOC_LABEL.get(kind, kind),
        attributes=offsets,
        confidence=1.0,
        source_chunk_indices=[chunk_index],
    )
    return candidate
def _mean_confidence(entities: list[EntityCandidate]) -> float:
    """Return the arithmetic mean of `confidence` over `entities`.

    An empty (falsy) input yields 0.0 instead of dividing by zero.
    """
    if entities:
        total = sum(candidate.confidence for candidate in entities)
        return total / len(entities)
    return 0.0
def _merge_entities_dedup_by_name_type(
    base: list[EntityCandidate],
    extra: list[EntityCandidate],
) -> list[EntityCandidate]:
    """Return `base` followed by the items of `extra` not already present.

    Two candidates count as the same when both `name` and `type_ref` are
    equal (exact match, no fuzzy comparison). Items of `base` are always kept
    as they are, even if `base` itself contains repeats; only `extra` is
    filtered, against `base` and against the items of `extra` accepted so
    far. Neither input list is modified.

    Per the original author's note, full deduplication is left to the caller
    via `deduplicate_entities`; this only avoids the trivial exact repeat.
    """
    known_keys = {(cand.name, cand.type_ref) for cand in base}
    merged = list(base)
    for cand in extra:
        identity = (cand.name, cand.type_ref)
        if identity not in known_keys:
            known_keys.add(identity)
            merged.append(cand)
    return merged
def _run_layer(
    layer_name: str,
    chunk_index: int,
    func: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Call one extraction layer, downgrading any failure to a warning.

    Args:
        layer_name: Name of the layer, used verbatim in the warning text.
        chunk_index: Index of the chunk being processed, for the warning text.
        func: The extractor to call.
        *args: Positional arguments forwarded to `func`.
        **kwargs: Keyword arguments forwarded to `func`.

    Returns:
        Whatever `func` returns, or an empty list if it raised.
    """
    try:
        return func(*args, **kwargs)
    except Exception as exc:
        # Deliberate best-effort: a failing layer must not abort the batch.
        warnings.warn(
            f"extract_graph_hybrid: {layer_name} fallo en chunk {chunk_index}: {exc}",
            # 3 = skip this helper and extract_graph_hybrid, so the warning
            # points at the code that called the pipeline.
            stacklevel=3,
        )
        return []


def _tag_source_chunk(entities: list[EntityCandidate], chunk_index: int) -> None:
    """Ensure `chunk_index` is listed in each entity's `source_chunk_indices`."""
    for ent in entities:
        if chunk_index not in ent.source_chunk_indices:
            ent.source_chunk_indices.append(chunk_index)


def extract_graph_hybrid(
    chunks: list[str],
    entity_schema: list[dict],
    relation_types: list[str],
    gliner_model: Any,
    glirel_model: Any,
    llm_chat_json: Callable[[list[dict]], dict] | None = None,
    ioc_types: list[str] | None = None,
    confidence_threshold: float = 0.6,
    languages: str = "Respond in Spanish.",
    min_entities_per_chunk: int = 2,
) -> tuple[list[EntityCandidate], list[RelationCandidate]]:
    """Extract entities and relations by cascading regex, GLiNER, GLiREL and an LLM.

    Per chunk:
      1. `extract_iocs(chunk, ioc_types)` yields IoC entities, each converted
         to a candidate with confidence 1.0.
      2. `extract_entities_gliner` is called with `threshold=confidence_threshold`
         and its entities are merged with the IoC ones.
      3. If the chunk has fewer than `min_entities_per_chunk` entities, or the
         mean confidence of the GLiNER entities is below
         `confidence_threshold`, and `llm_chat_json` is given,
         `extract_entities_llm` is called and its entities are merged in.
      4. With at least two entities, `extract_relations_glirel` is called.
      5. If step 4 ran but produced no relations and `llm_chat_json` is given,
         `extract_relations_llm` is called instead.

    Any exception raised by one of those extractors is reported with
    `warnings.warn` and treated as an empty result, so one failing layer or
    chunk does not abort the batch. Falsy or whitespace-only chunks are
    skipped silently; a chunk that is not a `str` is skipped with a warning.

    Args:
        chunks: Text fragments to process.
        entity_schema: Schema passed to GLiNER and to the LLM entity extractor.
        relation_types: Relation types passed to GLiREL and to the LLM
            relation extractor.
        gliner_model: Model object passed to `extract_entities_gliner`.
        glirel_model: Model object passed to `extract_relations_glirel`.
        llm_chat_json: Callable passed through to the LLM extractors. When
            None, no LLM fallback is ever invoked.
        ioc_types: Passed through to `extract_iocs` as its second argument.
        confidence_threshold: Threshold given to GLiNER and GLiREL, and the
            mean-confidence level below which the entity LLM fallback runs.
        languages: Passed to the LLM extractors as `language_instruction`.
        min_entities_per_chunk: A chunk with fewer entities than this
            triggers the entity LLM fallback.

    Returns:
        `(entities, relations)`: all per-chunk candidates concatenated in
        chunk order, without cross-chunk deduplication. Entities coming from
        GLiNER/LLM have the chunk index added to `source_chunk_indices`;
        every relation has `source_chunk_index` set to its chunk index.

    Raises:
        ValueError: If `chunks` is not a list or `entity_schema` is empty.
    """
    if not isinstance(chunks, list):
        raise ValueError("chunks debe ser una lista")
    if not entity_schema:
        raise ValueError("entity_schema no puede estar vacio")

    use_llm = llm_chat_json is not None
    all_entities: list[EntityCandidate] = []
    all_relations: list[RelationCandidate] = []

    for i, chunk in enumerate(chunks):
        if not chunk:
            continue
        if not isinstance(chunk, str):
            # A stray non-string item used to crash the whole batch on
            # `.strip()`; skip it like any other per-chunk failure.
            warnings.warn(
                f"extract_graph_hybrid: chunk {i} no es str "
                f"({type(chunk).__name__}); se omite",
                stacklevel=2,
            )
            continue
        if not chunk.strip():
            continue

        # Layer 1: regex IoCs.
        ioc_dicts = _run_layer("extract_iocs", i, extract_iocs, chunk, ioc_types)
        ioc_entities = [_ioc_dict_to_candidate(d, i) for d in ioc_dicts]

        # Layer 2: GLiNER.
        gliner_entities = _run_layer(
            "extract_entities_gliner",
            i,
            extract_entities_gliner,
            text=chunk,
            entity_schema=entity_schema,
            model=gliner_model,
            threshold=confidence_threshold,
        )
        _tag_source_chunk(gliner_entities, i)
        chunk_entities = _merge_entities_dedup_by_name_type(
            ioc_entities, gliner_entities
        )

        # Layer 3: optional LLM entity fallback.
        # NOTE(review): the mean is taken over GLiNER entities only, so an
        # empty GLiNER result (mean 0.0) always triggers the fallback, even
        # when the regex layer found plenty of IoCs. Confirm this is intended.
        needs_entity_llm = (
            len(chunk_entities) < min_entities_per_chunk
            or _mean_confidence(gliner_entities) < confidence_threshold
        )
        if needs_entity_llm and use_llm:
            llm_entities = _run_layer(
                "extract_entities_llm",
                i,
                extract_entities_llm,
                text=chunk,
                entity_schema=entity_schema,
                llm_chat_json=llm_chat_json,
                language_instruction=languages,
            )
            _tag_source_chunk(llm_entities, i)
            chunk_entities = _merge_entities_dedup_by_name_type(
                chunk_entities, llm_entities
            )
        all_entities.extend(chunk_entities)

        # A relation needs two endpoints; with fewer entities both relation
        # layers are skipped.
        if len(chunk_entities) < 2:
            continue

        # Layer 4: GLiREL.
        glirel_relations = _run_layer(
            "extract_relations_glirel",
            i,
            extract_relations_glirel,
            text=chunk,
            entities=chunk_entities,
            relation_types=relation_types,
            model=glirel_model,
            threshold=confidence_threshold,
        )
        for rel in glirel_relations:
            rel.source_chunk_index = i
        all_relations.extend(glirel_relations)

        # Layer 5: optional LLM relation fallback, only when GLiREL found
        # nothing. Results go straight into the accumulator so the list
        # returned by the GLiREL extractor is never mutated.
        if use_llm and not glirel_relations:
            llm_relations = _run_layer(
                "extract_relations_llm",
                i,
                extract_relations_llm,
                text=chunk,
                entities=chunk_entities,
                relation_types=relation_types,
                llm_chat_json=llm_chat_json,
                language_instruction=languages,
            )
            for rel in llm_relations:
                rel.source_chunk_index = i
            all_relations.extend(llm_relations)

    return all_entities, all_relations