4f743e0840
Pipeline en cascada que combina extract_iocs (regex, coste 0), GLiNER (zero-shot NER), GLiREL (zero-shot RE) y un fallback LLM opcional para chunks con baja confianza o pocas entidades. Devuelve las listas concatenadas, preparadas para pasar por deduplicate_entities/deduplicate_relations. Cierra 0040.
261 lines
10 KiB
Python
261 lines
10 KiB
Python
"""Pipeline hibrido: extract_iocs + GLiNER + GLiREL + LLM fallback."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import warnings
|
|
from typing import Any, Callable
|
|
|
|
# Put the directory three levels above this file on sys.path (once), presumably
# so the absolute `python.*` imports that follow resolve when this module is
# loaded outside of an installed package.
_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), *([os.pardir] * 3))
)
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)
|
|
|
|
from python.functions.cybersecurity.extract_iocs import extract_iocs
|
|
from python.functions.datascience.extract_entities_gliner import extract_entities_gliner
|
|
from python.functions.datascience.extract_relations_glirel import extract_relations_glirel
|
|
from python.functions.datascience.extract_entities_llm import extract_entities_llm
|
|
from python.functions.datascience.extract_relations_llm import extract_relations_llm
|
|
from python.types.datascience.entity_candidate import EntityCandidate
|
|
from python.types.datascience.relation_candidate import RelationCandidate
|
|
|
|
|
|
# IoC kind -> `type_ref` written on the EntityCandidate built by
# `_ioc_dict_to_candidate`. Every reference is the kind prefixed with "ioc_".
_IOC_TYPE_REF = {
    kind: f"ioc_{kind}"
    for kind in (
        "email",
        "ip_address",
        "domain",
        "file_hash",
        "crypto_wallet",
        "cve_id",
        "mac_address",
        "phone_number",
    )
}
|
|
|
|
# IoC kind -> human-readable `type_label` written on the EntityCandidate built
# by `_ioc_dict_to_candidate`.
_IOC_LABEL = dict(
    email="Email",
    ip_address="IPAddress",
    domain="Domain",
    file_hash="FileHash",
    crypto_wallet="CryptoWallet",
    cve_id="CVE",
    mac_address="MACAddress",
    phone_number="PhoneNumber",
)
|
|
|
|
|
|
def _ioc_dict_to_candidate(ioc: dict, chunk_index: int) -> EntityCandidate:
    """Build an EntityCandidate from one dict produced by extract_iocs.

    The match offsets are copied into ``attributes['start']`` and
    ``attributes['end']`` (``-1`` when the dict lacks them). Per the original
    author's note, this lets extract_relations_glirel map the entity to tokens
    without falling back to ``text.find``.

    Args:
        ioc: Mapping read through the keys ``type``, ``value``, ``start`` and
            ``end``; every key is optional.
        chunk_index: Index of the chunk the IoC was found in.

    Returns:
        A candidate with confidence 1.0 whose ``source_chunk_indices`` is
        ``[chunk_index]``.
    """
    kind = ioc.get("type", "")
    value = ioc.get("value", "")
    # Unknown kinds fall back to "ioc_<kind>" and to the raw kind as label.
    type_ref = _IOC_TYPE_REF.get(kind, f"ioc_{kind}")
    label = _IOC_LABEL.get(kind, kind)
    offsets = {"start": ioc.get("start", -1), "end": ioc.get("end", -1)}
    return EntityCandidate(
        name=value,
        type_ref=type_ref,
        type_label=label,
        attributes=offsets,
        confidence=1.0,
        source_chunk_indices=[chunk_index],
    )
|
|
|
|
|
|
def _mean_confidence(entities: list[EntityCandidate]) -> float:
    """Return the arithmetic mean of ``confidence`` over ``entities``.

    An empty (or otherwise falsy) input yields ``0.0`` instead of dividing by
    zero.
    """
    if not entities:
        return 0.0
    total = 0
    for candidate in entities:
        total = total + candidate.confidence
    return total / len(entities)
|
|
|
|
|
|
def _merge_entities_dedup_by_name_type(
    base: list[EntityCandidate],
    extra: list[EntityCandidate],
) -> list[EntityCandidate]:
    """Return ``base`` followed by the items of ``extra`` not already present.

    Two candidates count as the same when both ``name`` and ``type_ref`` are
    equal. No fuzzy matching happens here; this only avoids adding the exact
    same string with the same ``type_ref`` twice, and the real deduplication
    is left to the caller (``deduplicate_entities``).

    ``base`` is copied as-is, so duplicates already inside it are kept, and
    neither input list is modified.
    """
    merged = list(base)
    known = set()
    for candidate in base:
        known.add((candidate.name, candidate.type_ref))
    for candidate in extra:
        identity = (candidate.name, candidate.type_ref)
        if identity not in known:
            known.add(identity)
            merged.append(candidate)
    return merged
|
|
|
|
|
|
def _run_layer(
    layer_name: str,
    chunk_index: int,
    call: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Run one extraction layer; on any exception warn and return ``[]``."""
    try:
        return call(*args, **kwargs)
    except Exception as exc:  # best-effort: a failing layer must not abort the batch
        warnings.warn(
            f"extract_graph_hybrid: {layer_name} fallo en chunk {chunk_index}: {exc}",
            # Frame 1 is this helper, 2 is extract_graph_hybrid, 3 is its
            # caller, which is where the warning should be attributed.
            stacklevel=3,
        )
        return []


def _tag_entities_with_chunk(entities: list[EntityCandidate], chunk_index: int) -> None:
    """Append ``chunk_index`` to each entity's ``source_chunk_indices`` if missing."""
    for ent in entities:
        if chunk_index not in ent.source_chunk_indices:
            ent.source_chunk_indices.append(chunk_index)


def extract_graph_hybrid(
    chunks: list[str],
    entity_schema: list[dict],
    relation_types: list[str],
    gliner_model: Any,
    glirel_model: Any,
    llm_chat_json: Callable[[list[dict]], dict] | None = None,
    ioc_types: list[str] | None = None,
    confidence_threshold: float = 0.6,
    languages: str = "Respond in Spanish.",
    min_entities_per_chunk: int = 2,
) -> tuple[list[EntityCandidate], list[RelationCandidate]]:
    """Extract entities and relations by cascading regex, GLiNER, GLiREL and an optional LLM.

    Cascade applied to every non-blank chunk:

    1. ``extract_iocs(chunk, ioc_types)`` -> technical indicators, converted
       to candidates with confidence 1.0.
    2. ``extract_entities_gliner(...)`` -> zero-shot semantic entities, merged
       with the IoCs while dropping exact ``(name, type_ref)`` repeats.
    3. If the chunk has fewer than ``min_entities_per_chunk`` entities or the
       mean GLiNER confidence is below ``confidence_threshold`` **and**
       ``llm_chat_json`` is given -> ``extract_entities_llm`` fills the gaps.
    4. With at least two entities -> ``extract_relations_glirel(...)``.
    5. If step 4 produced no relations **and** ``llm_chat_json`` is given ->
       ``extract_relations_llm`` for that chunk.

    Every layer is best-effort: an exception raised by a layer is reported
    with ``warnings.warn`` and that layer contributes nothing for the chunk.
    Falsy or whitespace-only chunks are skipped silently; a non-empty chunk
    that is not a ``str`` is skipped with a warning instead of aborting the
    whole batch.

    Args:
        chunks: Text fragments to process (already split by the caller).
        entity_schema: Schema for GLiNER and the LLM. Per the original
            documentation, a list of dicts with ``type_ref``, ``label`` and an
            optional ``metadata_fields``.
        relation_types: Relation types allowed for GLiREL and the LLM.
        gliner_model: GLiNER instance (loaded with ``gliner_load_model``
            according to the original documentation).
        glirel_model: GLiREL instance (loaded with ``glirel_load_model``
            according to the original documentation).
        llm_chat_json: Injected function taking OpenAI-style messages and
            returning a JSON dict. When ``None`` no LLM fallback is invoked.
        ioc_types: Subset of types forwarded to ``extract_iocs``; ``None``
            is forwarded unchanged (documented originally as "all types").
        confidence_threshold: Passed as ``threshold`` to GLiNER and GLiREL and
            also used as the mean-confidence bar that triggers the LLM entity
            fallback.
        languages: Language instruction forwarded to the LLM functions as
            ``language_instruction``.
        min_entities_per_chunk: A chunk with fewer entities than this
            triggers the LLM entity fallback.

    Returns:
        Tuple ``(entities, relations)`` with the candidates of all chunks
        concatenated in chunk order. The caller is expected to run them
        through ``deduplicate_entities`` and ``deduplicate_relations`` before
        persisting. Each entity has ``i`` in ``source_chunk_indices`` and each
        relation has ``source_chunk_index == i`` for its chunk index ``i``.

    Raises:
        ValueError: If ``chunks`` is not a list or ``entity_schema`` is empty.
    """
    if not isinstance(chunks, list):
        raise ValueError("chunks debe ser una lista")
    if not entity_schema:
        raise ValueError("entity_schema no puede estar vacio")

    all_entities: list[EntityCandidate] = []
    all_relations: list[RelationCandidate] = []
    use_llm = llm_chat_json is not None

    for i, chunk in enumerate(chunks):
        if not chunk:
            continue
        if not isinstance(chunk, str):
            # Without this guard `.strip()` raises AttributeError and one bad
            # element would discard the results of every other chunk.
            warnings.warn(
                f"extract_graph_hybrid: chunk {i} no es str "
                f"({type(chunk).__name__}); se omite",
                stacklevel=2,
            )
            continue
        if not chunk.strip():
            continue

        # -- Layer 1: regex IoCs -------------------------------------------
        ioc_dicts = _run_layer("extract_iocs", i, extract_iocs, chunk, ioc_types)
        ioc_entities = [_ioc_dict_to_candidate(d, i) for d in ioc_dicts]

        # -- Layer 2: GLiNER -----------------------------------------------
        gliner_entities = _run_layer(
            "extract_entities_gliner",
            i,
            extract_entities_gliner,
            text=chunk,
            entity_schema=entity_schema,
            model=gliner_model,
            threshold=confidence_threshold,
        )
        _tag_entities_with_chunk(gliner_entities, i)
        chunk_entities = _merge_entities_dedup_by_name_type(ioc_entities, gliner_entities)

        # -- Layer 3: optional LLM entity fallback -------------------------
        # NOTE(review): GLiNER receives the same threshold, so if it already
        # filters by it the mean check presumably only fires when GLiNER
        # returned nothing (mean of an empty list is 0.0) -- confirm intent.
        needs_entity_llm = (
            len(chunk_entities) < min_entities_per_chunk
            or _mean_confidence(gliner_entities) < confidence_threshold
        )
        if use_llm and needs_entity_llm:
            llm_entities = _run_layer(
                "extract_entities_llm",
                i,
                extract_entities_llm,
                text=chunk,
                entity_schema=entity_schema,
                llm_chat_json=llm_chat_json,
                language_instruction=languages,
            )
            _tag_entities_with_chunk(llm_entities, i)
            chunk_entities = _merge_entities_dedup_by_name_type(chunk_entities, llm_entities)

        all_entities.extend(chunk_entities)

        # A relation needs two endpoints; below that neither relation layer runs.
        if len(chunk_entities) < 2:
            continue

        # -- Layer 4: GLiREL -----------------------------------------------
        chunk_relations = _run_layer(
            "extract_relations_glirel",
            i,
            extract_relations_glirel,
            text=chunk,
            entities=chunk_entities,
            relation_types=relation_types,
            model=glirel_model,
            threshold=confidence_threshold,
        )
        for rel in chunk_relations:
            rel.source_chunk_index = i

        # -- Layer 5: optional LLM relation fallback -----------------------
        if use_llm and not chunk_relations:
            # Rebind instead of extending, so the (empty) object returned by
            # the GLiREL layer is never mutated.
            chunk_relations = _run_layer(
                "extract_relations_llm",
                i,
                extract_relations_llm,
                text=chunk,
                entities=chunk_entities,
                relation_types=relation_types,
                llm_chat_json=llm_chat_json,
                language_instruction=languages,
            )
            for rel in chunk_relations:
                rel.source_chunk_index = i

        all_relations.extend(chunk_relations)

    return all_entities, all_relations
|