fn_registry/python/functions/pipelines/extraction_pipeline.md
egutierrez 63a9cb5273 feat: Python functions for datascience, finance, cybersecurity and pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + finance.py module.
Cybersecurity: envelope_encrypt/decrypt + cybersecurity.py module.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00


---
name: extraction_pipeline
kind: pipeline
lang: py
domain: pipelines
version: "1.0.0"
purity: impure
signature: "def extraction_pipeline(file_path: str, entity_presets: list[dict], relation_types: list[str], llm_chat_json: Callable[[list[dict]], dict], chunk_size: int = 500, chunk_overlap: int = 50, confidence_threshold: float = 0.5, dedup_threshold: float = 0.85, on_progress: Callable[[str, float], None] | None = None) -> ExtractionResult"
description: "Complete pipeline for extracting entities and relations from a document. Orchestrates extract_text_from_file -> preprocess_text -> split_text_into_chunks -> extract_entities_llm per chunk -> deduplicate_entities -> extract_relations_llm per chunk -> deduplicate_relations."
tags: [pipeline, extraction, entities, relations, llm, nlp, fuzzygraph, datascience]
uses_functions:
- extract_text_from_file_py_core
- preprocess_text_py_core
- split_text_into_chunks_py_core
- build_entity_schema_prompt_py_datascience
- build_relation_schema_prompt_py_datascience
- extract_entities_llm_py_datascience
- extract_relations_llm_py_datascience
- deduplicate_entities_py_datascience
- deduplicate_relations_py_datascience
uses_types:
- entity_candidate_py_datascience
- extraction_result_py_datascience
- extraction_stats_py_datascience
- relation_candidate_py_datascience
returns:
- extraction_result_py_datascience
returns_optional: false
error_type: "error_go_core"
imports:
- time
- warnings
- typing.Callable
tested: true
tests:
- "document with entities and relations returns a complete ExtractionResult"
- "empty document returns ExtractionResult with empty lists"
- "document with no detectable entities returns empty lists"
- "file not found raises FileNotFoundError"
- "empty entity presets raises ValueError"
- "progress callback is invoked during execution"
- "stats are filled in correctly with counts and elapsed time"
test_file_path: "python/functions/pipelines/extraction_pipeline_test.py"
file_path: "python/functions/pipelines/extraction_pipeline.py"
---
## Example
```python
from python.functions.pipelines.extraction_pipeline import extraction_pipeline

entity_presets = [
    {
        "type_ref": "osint_person_go_cybersecurity",
        "label": "Person",
        "metadata_fields": ["full_name", "alias", "nationality"],
    },
    {
        "type_ref": "osint_domain_go_cybersecurity",
        "label": "Domain",
        "metadata_fields": ["fqdn", "registrar"],
    },
]
relation_types = ["operates", "owns", "funds", "communicates_with", "related_to"]


# Inject a real LLM client
def llm_chat_json(messages):
    # call the chosen LLM provider here
    ...


result = extraction_pipeline(
    file_path="report.pdf",
    entity_presets=entity_presets,
    relation_types=relation_types,
    llm_chat_json=llm_chat_json,
    chunk_size=500,
    chunk_overlap=50,
    confidence_threshold=0.5,
    dedup_threshold=0.85,
    on_progress=lambda msg, pct: print(f"[{pct:.0%}] {msg}"),
)
print(f"Entities: {len(result.entities)}, Relations: {len(result.relations)}")
print(f"Stats: {result.stats}")

# Integrate with fuzzygraph / operations.db
# (`db` is an already-open database handle; it is not created in this example)
for entity in result.entities:
    db.add_entity(
        name=entity.name,
        type_ref=entity.type_ref,
        metadata=entity.attributes,
    )
for relation in result.relations:
    db.add_relation(
        name=relation.relation_type,
        from_entity=relation.from_id,
        to_entity=relation.to_id,
    )
```
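For trying the pipeline without a provider, a stub can stand in for `llm_chat_json`. The following is a minimal sketch, assuming only the `Callable[[list[dict]], dict]` shape from the signature. The helper name `make_stub_llm` and the response keys (`entities`, `name`, `confidence`) are hypothetical; the real response schema is whatever `extract_entities_llm` expects.

```python
from typing import Callable


def make_stub_llm(canned_response: dict) -> Callable[[list[dict]], dict]:
    """Build a fake llm_chat_json that records prompts and replays one response.

    Hypothetical test helper, not part of the registry.
    """
    calls: list[list[dict]] = []

    def llm_chat_json(messages: list[dict]) -> dict:
        calls.append(messages)        # keep each prompt for later inspection
        return dict(canned_response)  # shallow copy so callers cannot swap top-level keys

    llm_chat_json.calls = calls  # expose the recorded prompts on the function object
    return llm_chat_json


# Assumed response shape, for illustration only.
stub = make_stub_llm({"entities": [{"name": "Acme Corp", "confidence": 0.9}]})
reply = stub([{"role": "user", "content": "Extract entities from: Acme Corp ..."}])
```

Passing `stub` as `llm_chat_json` keeps the run deterministic, and `stub.calls` shows which prompts the pipeline actually built.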
## Algorithm
1. **Extract:** `extract_text_from_file(file_path)`: raw text from PDF, TXT, or Markdown
2. **Preprocess:** `preprocess_text(text)`: normalizes whitespace and special characters
3. **Split:** `split_text_into_chunks(text, chunk_size, chunk_overlap)`: splits the text into overlapping windows
4. **Extract entities per chunk (0-40%):** For each chunk, calls `extract_entities_llm` with the schema built from the presets. Records `source_chunk_index` on each candidate
5. **Filter:** keeps candidates with `confidence >= confidence_threshold`
6. **Deduplicate entities (40%):** `deduplicate_entities` with fuzzy matching, which produces `entity_id_map`
7. **Extract relations per chunk (40-80%):** For each chunk, collects the entities found in that chunk and calls `extract_relations_llm`
8. **Deduplicate relations (80-100%):** `deduplicate_relations` resolves names to IDs and collapses duplicates
9. **Return:** `ExtractionResult` with the entities, relations, and run stats
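Steps 3 to 5 can be sketched as follows. This is an illustration under stated assumptions, not the registry's implementation: `split_with_overlap` and `extract_stage` are hypothetical names, chunk sizes are assumed to be measured in characters, candidates are modeled as plain dicts, and `fake_extract` replaces the real `extract_entities_llm` call.

```python
from typing import Callable, Optional


def split_with_overlap(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> list[str]:
    """Split text into windows of chunk_size characters that share chunk_overlap characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


def extract_stage(
    chunks: list[str],
    extract_fn: Callable[[str], list[dict]],
    confidence_threshold: float = 0.5,
    on_progress: Optional[Callable[[str, float], None]] = None,
) -> list[dict]:
    """Run extract_fn per chunk, tag each candidate with its chunk, filter by confidence."""
    kept = []
    for index, chunk in enumerate(chunks):
        for candidate in extract_fn(chunk):
            candidate = {**candidate, "source_chunk_index": index}
            if candidate["confidence"] >= confidence_threshold:
                kept.append(candidate)
        if on_progress is not None:
            # The entity stage owns the 0-40% band of the overall progress.
            fraction = 0.4 * ((index + 1) / len(chunks))
            on_progress(f"entities: chunk {index + 1}/{len(chunks)}", fraction)
    return kept


def fake_extract(chunk: str) -> list[dict]:
    # Stand-in for the LLM call: one confident and one weak candidate per chunk.
    return [{"name": "Acme", "confidence": 0.9}, {"name": "maybe", "confidence": 0.3}]


chunks = split_with_overlap("x" * 1200)
progress: list[float] = []
entities = extract_stage(chunks, fake_extract, on_progress=lambda msg, pct: progress.append(pct))
```

With the default sizes the window advances 450 characters at a time, so consecutive chunks share 50 characters and an entity that straddles a boundary still appears whole in at least one chunk.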
## Notes
- The `llm_chat_json` parameter injects the LLM client, so the pipeline is not coupled to any provider (OpenAI, Anthropic, Ollama, etc.)
- The progress callback covers: 0-40% entity extraction, 40-80% relation extraction, 80-100% deduplication
- If the file does not exist, `FileNotFoundError` is raised before any LLM call is made
- If `entity_presets` is empty, `ValueError` is raised
- Errors in individual chunks are caught and reported as warnings, and processing continues with the remaining chunks (robustness)
- The `entity_id_map` from `deduplicate_entities` links the original names found in the text to the final UUID IDs used by `deduplicate_relations`
- The returned `ExtractionResult` is ready to be inserted into `operations.db` via `fn ops entity add` / `fn ops relation add`
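Two of the notes above, per-chunk error capture and name-to-ID resolution through `entity_id_map`, can be illustrated with a small sketch. The helper names `run_per_chunk` and `resolve_relations` and the `from_name` / `to_name` keys are hypothetical; only `relation_type`, `from_id`, and `to_id` appear in the example above. Dropping relations with an unknown endpoint is an assumption made here, not documented behavior.

```python
import uuid
import warnings


def run_per_chunk(chunks, process_chunk):
    """Apply process_chunk to every chunk; a failing chunk emits a warning and is skipped."""
    results = []
    for index, chunk in enumerate(chunks):
        try:
            results.extend(process_chunk(chunk))
        except Exception as exc:
            warnings.warn(f"chunk {index} failed: {exc!r}", RuntimeWarning, stacklevel=2)
    return results


def resolve_relations(relations, entity_id_map):
    """Rewrite name-based relations to ID-based ones and collapse exact duplicates."""
    seen = set()
    resolved = []
    for rel in relations:
        from_id = entity_id_map.get(rel["from_name"])
        to_id = entity_id_map.get(rel["to_name"])
        if from_id is None or to_id is None:
            continue  # assumption: drop relations whose endpoint has no known ID
        key = (from_id, rel["relation_type"], to_id)
        if key not in seen:
            seen.add(key)
            resolved.append(
                {"from_id": from_id, "relation_type": rel["relation_type"], "to_id": to_id}
            )
    return resolved


def flaky(chunk):
    # Simulates a chunk whose LLM response cannot be parsed.
    if "bad" in chunk:
        raise ValueError("malformed LLM response")
    return [chunk.upper()]


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    processed = run_per_chunk(["alpha", "bad chunk", "gamma"], flaky)

acme_id, doe_id = str(uuid.uuid4()), str(uuid.uuid4())
# Two surface forms of the same entity map to one final ID after deduplication.
entity_id_map = {"ACME Corp": acme_id, "Acme Corporation": acme_id, "J. Doe": doe_id}
relations = [
    {"from_name": "J. Doe", "relation_type": "owns", "to_name": "ACME Corp"},
    {"from_name": "J. Doe", "relation_type": "owns", "to_name": "Acme Corporation"},
    {"from_name": "J. Doe", "relation_type": "funds", "to_name": "Unknown Ltd"},
]
resolved = resolve_relations(relations, entity_id_map)
```

Because both spellings of the company resolve to the same ID, the two `owns` relations collapse into one, which is the effect the `entity_id_map` note describes.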