--- name: extract_graph_hybrid kind: pipeline lang: py domain: pipelines version: "1.0.0" purity: impure signature: "def extract_graph_hybrid(chunks: list[str], entity_schema: list[dict], relation_types: list[str], gliner_model: Any, glirel_model: Any, llm_chat_json: Callable[[list[dict]], dict] | None = None, ioc_types: list[str] | None = None, confidence_threshold: float = 0.6, languages: str = 'Respond in Spanish.', min_entities_per_chunk: int = 2) -> tuple[list[EntityCandidate], list[RelationCandidate]]" description: "Pipeline hibrido en cascada que combina extract_iocs (regex, coste 0), GLiNER (zero-shot NER, coste bajo), GLiREL (zero-shot RE) y un LLM fallback opcional para chunks complejos o de baja confianza. Devuelve listas concatenadas listas para deduplicate_entities/deduplicate_relations." tags: [pipeline, extraction, entities, relations, gliner, glirel, ioc, regex, llm, nlp, datascience, cybersecurity, hybrid] uses_functions: - extract_iocs_py_cybersecurity - extract_entities_gliner_py_datascience - extract_relations_glirel_py_datascience - extract_entities_llm_py_datascience - extract_relations_llm_py_datascience uses_types: - entity_candidate_py_datascience - relation_candidate_py_datascience returns: - entity_candidate_py_datascience - relation_candidate_py_datascience returns_optional: false error_type: "error_go_core" imports: - typing.Any - typing.Callable - warnings params: - name: chunks desc: "Lista de fragmentos de texto ya cortados (p.ej. via split_text_into_chunks)." - name: entity_schema desc: "Schema para GLiNER y LLM. Lista de dicts con type_ref, label y opcional metadata_fields." - name: relation_types desc: "Tipos de relacion permitidos para GLiREL/LLM (ej: ['operates','owns','communicates_with'])." - name: gliner_model desc: "Instancia GLiNER cargada con gliner_load_model. Inyectada por el caller." - name: glirel_model desc: "Instancia GLiREL cargada con glirel_load_model. Inyectada por el caller." - name: llm_chat_json desc: "Cliente LLM inyectado (sin acoplamiento al proveedor). Si None, no hay fallback LLM." - name: ioc_types desc: "Subset de tipos para extract_iocs (email, ip_address, domain, file_hash, ...). None = todos." - name: confidence_threshold desc: "Por debajo de este umbral, GLiNER se considera de baja confianza y se invoca el LLM." - name: languages desc: "Instruccion de idioma passthrough al LLM (ej: 'Respond in Spanish.')." - name: min_entities_per_chunk desc: "Si un chunk arroja menos entidades que esto, se invoca el LLM como fallback (default 2)." output: "Tupla (entities, relations) con candidatas concatenadas (sin deduplicar). El caller debe pasar por deduplicate_entities y deduplicate_relations." tested: true tests: - "corpus OSINT con IoCs y entidades semanticas devuelve mezcla regex+GLiNER" - "chunks vacios o con solo whitespace se saltan" - "entity_schema vacio lanza ValueError" - "chunks no-lista lanza ValueError" - "GLiNER produciendo pocas entidades dispara fallback LLM si llm_chat_json esta presente" - "sin llm_chat_json no se invoca ningun fallback LLM" - "GLiREL sin relaciones dispara fallback LLM relations" - "ioc_types acota el set de extractores regex" - "errores de extractores se capturan con warnings y no abortan el pipeline" test_file_path: "python/functions/pipelines/tests/test_extract_graph_hybrid.py" file_path: "python/functions/pipelines/extract_graph_hybrid.py" --- ## Ejemplo ```python from python.functions.pipelines.extract_graph_hybrid import extract_graph_hybrid from python.functions.datascience.gliner_load_model import gliner_load_model from python.functions.datascience.glirel_load_model import glirel_load_model from python.functions.datascience.deduplicate_entities import deduplicate_entities from python.functions.datascience.deduplicate_relations import deduplicate_relations gliner = gliner_load_model("urchade/gliner_multi-v2.1", device="auto") glirel = glirel_load_model("jackboyla/glirel-large-v0", device="auto") entity_schema = [ {"type_ref": "osint_person_go_cybersecurity", "label": "Person"}, {"type_ref": "osint_organization_go_cybersecurity", "label": "Organization"}, {"type_ref": "osint_location_go_cybersecurity", "label": "Location"}, ] relation_types = ["operates", "owns", "communicates_with", "employed_by"] chunks = [ "Alice Johnson works at OpenAI in San Francisco. Contact: alice@openai.com.", "The C2 server lives at 192.168.0.1 and resolves to evil-corp.com.", ] # Sin LLM (coste cero, solo regex + GLiNER + GLiREL) entities, relations = extract_graph_hybrid( chunks=chunks, entity_schema=entity_schema, relation_types=relation_types, gliner_model=gliner, glirel_model=glirel, llm_chat_json=None, ) # Con LLM fallback solo en chunks complejos def llm_chat_json(messages): # llamar a OpenAI/Anthropic/Ollama y devolver el JSON ya parseado ... entities, relations = extract_graph_hybrid( chunks=chunks, entity_schema=entity_schema, relation_types=relation_types, gliner_model=gliner, glirel_model=glirel, llm_chat_json=llm_chat_json, confidence_threshold=0.6, min_entities_per_chunk=2, ) # Deduplicar antes de persistir dedup = deduplicate_entities(entities, name_threshold=0.85) final_relations = deduplicate_relations(relations, dedup.name_to_id) ``` ## Algoritmo Por cada chunk: 1. **Regex (capa tecnica)** — `extract_iocs(chunk, ioc_types)` devuelve dicts `{value, start, end, type}` que se mapean a `EntityCandidate` con `type_ref` propio (`ioc_email`, `ioc_ip_address`, `ioc_domain`, ...) y `confidence=1.0`. Los offsets se anotan en `attributes['start'/'end']` para que GLiREL pueda mapearlos a tokens sin fallback `text.find`. 2. **GLiNER (capa semantica)** — `extract_entities_gliner` con el schema y el `confidence_threshold` como filtro de score. 3. **Merge** — IoCs + GLiNER deduplicados por `(name, type_ref)`. NO se colapsa fuzzy aqui; eso lo hace el caller. 4. **LLM fallback (opcional)** — si el chunk tiene menos de `min_entities_per_chunk` entidades **o** `mean(gliner_confidence) < confidence_threshold` **y** `llm_chat_json is not None`, se invoca `extract_entities_llm` y se mezcla. 5. **GLiREL (relaciones zero-shot)** — solo si hay >=2 entidades. 6. **LLM fallback de relaciones (opcional)** — si GLiREL no devolvio nada con >=2 entidades **y** hay `llm_chat_json`, se invoca `extract_relations_llm` para ese chunk. `source_chunk_indices` y `source_chunk_index` se rellenan para que `deduplicate_relations` pueda reconstruir el grafo origen→destino. ## Por que cascada y no all-LLM | Capa | Coste por 100 KB | Latencia | Calidad | |------|------------------|----------|---------| | `extract_iocs` (regex) | 0 | <50 ms | Precision 100% en IoCs tecnicos | | GLiNER (`gliner_multi-v2.1`) | 0 (modelo local, GPU/CPU) | ~1-3 s/chunk en CPU, <0.5 s en GPU | F1 0.7-0.85 en NER zero-shot | | GLiREL (`glirel-large-v0`) | 0 (modelo local) | ~2-4 s/chunk en CPU | F1 0.5-0.75 en RE zero-shot | | LLM (GPT-4 / Claude Sonnet) | $0.5-3 por 100 KB | 5-15 s/chunk | F1 0.85-0.95 | El pipeline hibrido reserva el LLM (caro y lento) para los chunks que GLiNER/GLiREL no resuelven con suficiente confianza. En corpus OSINT tipicos el LLM se invoca en <20% de los chunks → coste total 5-10x menor que un pipeline 100% LLM con perdida de calidad <5 puntos F1. ## Solapamiento IoC ↔ GLiNER GLiNER puede detectar `apple.com` como `Organization` mientras que regex lo detecta como `domain`. **Decision intencional**: ambos se conservan con `type_ref` distinto (`osint_organization_go_cybersecurity` vs `ioc_domain`). `deduplicate_entities(..., same_type_only=True)` no las mezcla. El caller decide si quiere unificar (por ejemplo, anotando una relacion `domain_of` entre las dos). ## Recomendaciones operativas - **Batch size**: ~100-200 chunks de 500-1000 caracteres por llamada al pipeline. Mas chunks → mas paralelismo aprovechable; menos chunks → menos overhead de carga del modelo. - **Latencia esperada (CPU)**: ~3-5 s/chunk sin LLM, +5-15 s/chunk si cae al LLM fallback. - **Latencia esperada (GPU)**: ~0.5-1 s/chunk sin LLM. - **Cuando bajar `confidence_threshold`**: en corpus con jerga muy especifica donde GLiNER no aprendio bien — pero esto incrementa el coste si hay LLM (mas chunks caen al fallback). - **Cuando subir `min_entities_per_chunk`**: si quieres forzar fallback LLM en chunks "ricos" para asegurar cobertura completa. ## Notas - La deduplicacion fuzzy (Levenshtein + Union-Find) la hace `deduplicate_entities` — NO replicar aqui. - Los errores de cualquier extractor en cualquier chunk se capturan con `warnings.warn` y NO abortan el pipeline (robustez sobre completitud). - Las funciones LLM aceptan `language_instruction`; aqui se pasa como `languages` (default `"Respond in Spanish."`). - Pensar en una app `apps/osint_extractor/` que use este pipeline + sigma viz como demo. Fuera de scope de este issue.