b10c545479
Funciones nuevas en python/functions/datascience/: - gliner_load_model: carga + cachea modelo GLiNER por (name, device). device='auto' resuelve a cuda/cpu segun torch.cuda.is_available, sin fallar si torch no esta instalado. ImportError claro si falta gliner. - extract_entities_gliner: contrato drop-in de extract_entities_llm (mismo entity_schema, mismo list[EntityCandidate]). El caller inyecta el modelo (cargado UNA vez por proceso). Anota offsets start/end en attributes para reconciliar con extract_iocs (issue 0040). Diferencias vs LLM extractor: - 50-200x mas rapido en GPU, 0 USD/token. - Malo con IoCs tecnicos (lo cubre 0037). - Threshold y flat_ner ajustables por dominio. pyproject.toml: gliner como extra opcional `[nlp]` para no inflar el .venv de quien no use NER. Instalacion: `uv pip install -e '.[nlp]'`. Refs #0038 — Desbloquea 0039 (GLiREL) y 0040 (pipeline hibrido). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
137 lines
4.4 KiB
Python
137 lines
4.4 KiB
Python
"""Extrae entidades de un chunk de texto usando GLiNER (zero-shot NER)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import warnings
|
|
from typing import Any
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
|
|
|
|
from python.types.datascience.entity_candidate import EntityCandidate
|
|
|
|
|
|
def _build_label_maps(entity_schema: list[dict]) -> tuple[list[str], dict[str, str], dict[str, str]]:
|
|
"""Traduce el schema al formato que espera GLiNER.
|
|
|
|
Returns:
|
|
labels: lista de strings (lo que se pasa a model.predict_entities).
|
|
label_to_type_ref: dict para mapear el label predicho al type_ref.
|
|
label_to_label: dict label -> label legible (para `type_label`).
|
|
"""
|
|
labels: list[str] = []
|
|
label_to_type_ref: dict[str, str] = {}
|
|
label_to_label: dict[str, str] = {}
|
|
for entry in entity_schema:
|
|
label = entry.get("label", "").strip()
|
|
type_ref = entry.get("type_ref", "").strip()
|
|
if not label or not type_ref:
|
|
continue
|
|
labels.append(label)
|
|
# last-wins si dos type_refs comparten label.
|
|
label_to_type_ref[label] = type_ref
|
|
label_to_label[label] = label
|
|
return labels, label_to_type_ref, label_to_label
|
|
|
|
|
|
def extract_entities_gliner(
    text: str,
    entity_schema: list[dict],
    model: Any,
    threshold: float = 0.5,
    flat_ner: bool = True,
) -> list[EntityCandidate]:
    """Extract zero-shot entities with GLiNER.

    Intended as a drop-in counterpart of ``extract_entities_llm``: it takes
    the same ``entity_schema`` and returns a list of ``EntityCandidate``.

    Each schema entry's ``label`` is used as the GLiNER label, and the
    ``type_ref`` is recovered from the predicted label. Span offsets, when
    the model reports them as ints, are stored in ``attributes["start"]``
    and ``attributes["end"]`` so the caller can reconcile them with other
    extractors (e.g. regex-based IoC extraction).

    Failures while invoking the model are reported through
    ``warnings.warn`` and yield an empty list instead of propagating.

    Args:
        text: Chunk to analyse. Empty or whitespace-only text returns an
            empty list without invoking the model.
        entity_schema: List of dicts with ``type_ref`` and ``label``.
        model: Object exposing ``predict_entities(text, labels,
            threshold=..., flat_ner=...)``. It is injected by the caller so
            it can be loaded once and reused across a batch.
        threshold: Minimum score for the model to accept an entity
            (0.0-1.0); forwarded to the model unchanged.
        flat_ner: Forwarded to the model unchanged. True requests no nested
            entities; False allows nesting (may produce overlapping spans).

    Returns:
        List of EntityCandidate. Empty when the text is blank, the schema
        has no valid label/type_ref pair, the model call fails or returns a
        non-list, or nothing is detected.

    Raises:
        ValueError: If ``entity_schema`` is empty.
    """
    if not entity_schema:
        raise ValueError("entity_schema no puede estar vacio")

    # Blank text cannot contain entities; skip the (comparatively costly)
    # model call instead of relying on the model to cope with empty input.
    if not text or (isinstance(text, str) and not text.strip()):
        return []

    labels, label_to_type_ref, label_to_label = _build_label_maps(entity_schema)
    if not labels:
        warnings.warn(
            "extract_entities_gliner: ningun entry del schema tiene "
            "label+type_ref validos; retornando vacio.",
            stacklevel=2,
        )
        return []

    try:
        raw_entities = model.predict_entities(
            text,
            labels,
            threshold=threshold,
            flat_ner=flat_ner,
        )
    except Exception as exc:
        # Deliberate best-effort: a failing model must not abort the
        # caller's batch, so the error is surfaced as a warning.
        warnings.warn(
            f"extract_entities_gliner: error invocando model.predict_entities: {exc}",
            stacklevel=2,
        )
        return []

    if not isinstance(raw_entities, list):
        warnings.warn(
            "extract_entities_gliner: predict_entities no retorno una lista; "
            "retornando vacio.",
            stacklevel=2,
        )
        return []

    candidates: list[EntityCandidate] = []
    for item in raw_entities:
        if not isinstance(item, dict):
            continue

        span_text = item.get("text", "")
        label = item.get("label", "")
        # The isinstance guard keeps an unhashable label from raising
        # TypeError in the dict membership test below.
        if not span_text or not isinstance(label, str) or label not in label_to_type_ref:
            continue

        score = item.get("score", 0.0)
        # `score != score` is true only for NaN. Without this check a NaN
        # would survive the clamp as 1.0, because min(1.0, nan) returns 1.0.
        if not isinstance(score, (int, float)) or score != score:
            score = 0.0
        confidence = float(max(0.0, min(1.0, score)))

        attributes: dict = {}
        start = item.get("start")
        end = item.get("end")
        if isinstance(start, int):
            attributes["start"] = start
        if isinstance(end, int):
            attributes["end"] = end

        candidates.append(
            EntityCandidate(
                name=span_text,
                type_ref=label_to_type_ref[label],
                type_label=label_to_label.get(label, label),
                attributes=attributes,
                confidence=confidence,
            )
        )

    return candidates
|