"""Extrae entidades de un chunk de texto usando GLiNER (zero-shot NER).""" from __future__ import annotations import os import sys import warnings from typing import Any sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..")) from python.types.datascience.entity_candidate import EntityCandidate def _build_label_maps(entity_schema: list[dict]) -> tuple[list[str], dict[str, str], dict[str, str]]: """Traduce el schema al formato que espera GLiNER. Returns: labels: lista de strings (lo que se pasa a model.predict_entities). label_to_type_ref: dict para mapear el label predicho al type_ref. label_to_label: dict label -> label legible (para `type_label`). """ labels: list[str] = [] label_to_type_ref: dict[str, str] = {} label_to_label: dict[str, str] = {} for entry in entity_schema: label = entry.get("label", "").strip() type_ref = entry.get("type_ref", "").strip() if not label or not type_ref: continue labels.append(label) # last-wins si dos type_refs comparten label. label_to_type_ref[label] = type_ref label_to_label[label] = label return labels, label_to_type_ref, label_to_label def extract_entities_gliner( text: str, entity_schema: list[dict], model: Any, threshold: float = 0.5, flat_ner: bool = True, ) -> list[EntityCandidate]: """Extrae entidades zero-shot con GLiNER, contrato drop-in con `extract_entities_llm`. Cada `entity_schema` entry usa su `label` como label de GLiNER. El type_ref se reconstruye desde `label_to_type_ref`. Offsets de span se anotan en `attributes["start"]` y `attributes["end"]` para que el caller pueda reconciliar con regex IoCs (ver `extract_iocs`). Args: text: Chunk a analizar. entity_schema: Misma estructura que `extract_entities_llm` — lista de dicts con `type_ref` y `label`. model: Instancia GLiNER cargada con `gliner_load_model`. Inyectada por el caller para evitar penalty de carga en batch. threshold: Score minimo para aceptar una entidad (0.0-1.0). flat_ner: True = sin entidades anidadas. False = anidadas (puede producir spans solapados). Returns: Lista de EntityCandidate. Vacia si el modelo no detecta nada o si entity_schema queda sin labels validos tras filtrar. Raises: ValueError: Si entity_schema esta vacio. """ if not entity_schema: raise ValueError("entity_schema no puede estar vacio") labels, label_to_type_ref, label_to_label = _build_label_maps(entity_schema) if not labels: warnings.warn( "extract_entities_gliner: ningun entry del schema tiene " "label+type_ref validos; retornando vacio.", stacklevel=2, ) return [] try: raw_entities = model.predict_entities( text, labels, threshold=threshold, flat_ner=flat_ner, ) except Exception as exc: warnings.warn( f"extract_entities_gliner: error invocando model.predict_entities: {exc}", stacklevel=2, ) return [] if not isinstance(raw_entities, list): warnings.warn( "extract_entities_gliner: predict_entities no retorno una lista; " "retornando vacio.", stacklevel=2, ) return [] candidates: list[EntityCandidate] = [] for item in raw_entities: if not isinstance(item, dict): continue span_text = item.get("text", "") label = item.get("label", "") if not span_text or label not in label_to_type_ref: continue score = item.get("score", 0.0) if not isinstance(score, (int, float)): score = 0.0 confidence = float(max(0.0, min(1.0, score))) start = item.get("start") end = item.get("end") attributes: dict = {} if isinstance(start, int): attributes["start"] = start if isinstance(end, int): attributes["end"] = end candidates.append( EntityCandidate( name=span_text, type_ref=label_to_type_ref[label], type_label=label_to_label.get(label, label), attributes=attributes, confidence=confidence, ) ) return candidates