Files
fn_registry/python/functions/datascience/extract_entities_gliner.py
egutierrez 0076870e99 feat(datascience): GLiNER entity extractor (zero-shot NER) drop-in con LLM
Funciones nuevas en python/functions/datascience/:
- gliner_load_model: carga + cachea modelo GLiNER por (name, device).
  device='auto' resuelve a cuda/cpu segun torch.cuda.is_available, sin
  fallar si torch no esta instalado. ImportError claro si falta gliner.
- extract_entities_gliner: contrato drop-in de extract_entities_llm
  (mismo entity_schema, mismo list[EntityCandidate]). El caller inyecta
  el modelo (cargado UNA vez por proceso). Anota offsets start/end en
  attributes para reconciliar con extract_iocs (issue 0040).

Diferencias vs LLM extractor:
- 50-200x mas rapido en GPU, 0 USD/token.
- Malo con IoCs tecnicos (lo cubre 0037).
- Threshold y flat_ner ajustables por dominio.

pyproject.toml: gliner como extra opcional `[nlp]` para no inflar el
.venv de quien no use NER. Instalacion: `uv pip install -e '.[nlp]'`.

Refs #0038 — Desbloquea 0039 (GLiREL) y 0040 (pipeline hibrido).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 16:41:30 +02:00

137 lines
4.4 KiB
Python

"""Extrae entidades de un chunk de texto usando GLiNER (zero-shot NER)."""
from __future__ import annotations
import os
import sys
import warnings
from typing import Any
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from python.types.datascience.entity_candidate import EntityCandidate
def _build_label_maps(entity_schema: list[dict]) -> tuple[list[str], dict[str, str], dict[str, str]]:
"""Traduce el schema al formato que espera GLiNER.
Returns:
labels: lista de strings (lo que se pasa a model.predict_entities).
label_to_type_ref: dict para mapear el label predicho al type_ref.
label_to_label: dict label -> label legible (para `type_label`).
"""
labels: list[str] = []
label_to_type_ref: dict[str, str] = {}
label_to_label: dict[str, str] = {}
for entry in entity_schema:
label = entry.get("label", "").strip()
type_ref = entry.get("type_ref", "").strip()
if not label or not type_ref:
continue
labels.append(label)
# last-wins si dos type_refs comparten label.
label_to_type_ref[label] = type_ref
label_to_label[label] = label
return labels, label_to_type_ref, label_to_label
def extract_entities_gliner(
text: str,
entity_schema: list[dict],
model: Any,
threshold: float = 0.5,
flat_ner: bool = True,
) -> list[EntityCandidate]:
"""Extrae entidades zero-shot con GLiNER, contrato drop-in con `extract_entities_llm`.
Cada `entity_schema` entry usa su `label` como label de GLiNER. El
type_ref se reconstruye desde `label_to_type_ref`. Offsets de span
se anotan en `attributes["start"]` y `attributes["end"]` para que
el caller pueda reconciliar con regex IoCs (ver `extract_iocs`).
Args:
text: Chunk a analizar.
entity_schema: Misma estructura que `extract_entities_llm` —
lista de dicts con `type_ref` y `label`.
model: Instancia GLiNER cargada con `gliner_load_model`. Inyectada
por el caller para evitar penalty de carga en batch.
threshold: Score minimo para aceptar una entidad (0.0-1.0).
flat_ner: True = sin entidades anidadas. False = anidadas (puede
producir spans solapados).
Returns:
Lista de EntityCandidate. Vacia si el modelo no detecta nada o
si entity_schema queda sin labels validos tras filtrar.
Raises:
ValueError: Si entity_schema esta vacio.
"""
if not entity_schema:
raise ValueError("entity_schema no puede estar vacio")
labels, label_to_type_ref, label_to_label = _build_label_maps(entity_schema)
if not labels:
warnings.warn(
"extract_entities_gliner: ningun entry del schema tiene "
"label+type_ref validos; retornando vacio.",
stacklevel=2,
)
return []
try:
raw_entities = model.predict_entities(
text,
labels,
threshold=threshold,
flat_ner=flat_ner,
)
except Exception as exc:
warnings.warn(
f"extract_entities_gliner: error invocando model.predict_entities: {exc}",
stacklevel=2,
)
return []
if not isinstance(raw_entities, list):
warnings.warn(
"extract_entities_gliner: predict_entities no retorno una lista; "
"retornando vacio.",
stacklevel=2,
)
return []
candidates: list[EntityCandidate] = []
for item in raw_entities:
if not isinstance(item, dict):
continue
span_text = item.get("text", "")
label = item.get("label", "")
if not span_text or label not in label_to_type_ref:
continue
score = item.get("score", 0.0)
if not isinstance(score, (int, float)):
score = 0.0
confidence = float(max(0.0, min(1.0, score)))
start = item.get("start")
end = item.get("end")
attributes: dict = {}
if isinstance(start, int):
attributes["start"] = start
if isinstance(end, int):
attributes["end"] = end
candidates.append(
EntityCandidate(
name=span_text,
type_ref=label_to_type_ref[label],
type_label=label_to_label.get(label, label),
attributes=attributes,
confidence=confidence,
)
)
return candidates