Files
ontology_graph/lib/extraction_pipeline.py
T
fn-registry agent 40bea81603 chore: initial sync
2026-04-28 22:13:08 +02:00

209 lines
8.6 KiB
Python

"""Pipeline de extraccion de entidades y relaciones desde un documento."""
from __future__ import annotations
import sys
import os
import time
import warnings
from typing import Callable
# Soporte para ejecucion desde la raiz del registry o desde el directorio del archivo
from extract_text_from_file import extract_text_from_file
from core_functions import preprocess_text
from split_text_into_chunks import split_text_into_chunks
from build_entity_schema_prompt import build_entity_schema_prompt
from build_relation_schema_prompt import build_relation_schema_prompt
from extract_entities_llm import extract_entities_llm
from extract_relations_llm import extract_relations_llm
from deduplicate_entities import deduplicate_entities
from deduplicate_relations import deduplicate_relations
from entity_candidate import EntityCandidate
from extraction_result import ExtractionResult
from extraction_stats import ExtractionStats
def extraction_pipeline(
    file_path: str,
    entity_presets: list[dict],
    relation_types: list[str],
    llm_chat_json: Callable[[list[dict]], dict],
    chunk_size: int = 500,
    chunk_overlap: int = 50,
    confidence_threshold: float = 0.5,
    dedup_threshold: float = 0.85,
    on_progress: Callable[[str, float], None] | None = None,
) -> ExtractionResult:
    """Run the full entity/relation extraction pipeline over one document.

    Orchestrates extract_text_from_file -> preprocess_text ->
    split_text_into_chunks -> extract_entities_llm per chunk ->
    deduplicate_entities -> extract_relations_llm per chunk ->
    deduplicate_relations.

    Failures in text extraction or in a per-chunk LLM call are downgraded to
    warnings: a failed text extraction is treated as an empty document and a
    failed chunk contributes no entities/relations.

    Args:
        file_path: path of the file to process (PDF, Markdown, TXT).
        entity_presets: list of dicts with type_ref, label and metadata_fields.
            Example: [{"type_ref": "osint_person_go_cybersecurity",
                       "label": "Person",
                       "metadata_fields": ["full_name", "nationality"]}]
        relation_types: relation types allowed during extraction.
            Example: ["funds", "employs", "communicates_with", "owns"]
        llm_chat_json: injected function that receives OpenAI-style messages
            and returns the already parsed JSON response as a dict, so the
            pipeline is not coupled to any provider.
        chunk_size: number of characters per chunk (default 500).
        chunk_overlap: overlap between consecutive chunks (default 50).
        confidence_threshold: minimum confidence a candidate entity needs to
            be kept before deduplication (default 0.5).
        dedup_threshold: minimum similarity score passed to
            deduplicate_entities as name_threshold (default 0.85).
        on_progress: optional progress callback (message: str, pct: float in
            0-1). Entity extraction reports 0.0 up to 0.4, entity
            deduplication reports 0.4, relation extraction reports 0.4 up to
            0.8, relation deduplication reports 0.8 and completion reports
            1.0. Exceptions raised by the callback are reported as warnings
            and never abort the pipeline.

    Returns:
        ExtractionResult with the deduplicated entities and relations plus
        the stats gathered during the run. If the document yields no chunks,
        the result has empty entity and relation lists.

    Raises:
        ValueError: if entity_presets is empty.
        FileNotFoundError: if file_path does not exist.
    """
    if not entity_presets:
        raise ValueError("entity_presets no puede estar vacio")
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Archivo no encontrado: {file_path}")

    def _progress(msg: str, pct: float) -> None:
        """Forward a progress event; a failing callback must not stop the run."""
        if on_progress is None:
            return
        try:
            on_progress(msg, pct)
        except Exception as exc:
            # Best-effort notification: surface the problem like every other
            # recoverable failure in this function instead of hiding it.
            warnings.warn(
                f"extraction_pipeline: error en on_progress callback: {exc}"
            )

    start_time = time.monotonic()
    stats = ExtractionStats()

    # ── Step 1: extract text ───────────────────────────────────────────────
    _progress("Extracting text from file...", 0.0)
    try:
        raw_text = extract_text_from_file(file_path)
    except Exception as exc:
        warnings.warn(f"extraction_pipeline: error al extraer texto: {exc}")
        raw_text = ""

    # ── Step 2: preprocess ─────────────────────────────────────────────────
    clean_text = preprocess_text(raw_text)
    stats.total_chars = len(clean_text)

    # ── Step 3: split into chunks ──────────────────────────────────────────
    chunks = split_text_into_chunks(
        clean_text, chunk_size=chunk_size, overlap=chunk_overlap
    )
    n = len(chunks)
    stats.total_chunks = n
    if n == 0:
        stats.processing_time_seconds = time.monotonic() - start_time
        # Signal completion so progress consumers are not left at 0%.
        _progress("Done", 1.0)
        return ExtractionResult(entities=[], relations=[], stats=stats)

    # ── Step 4: extract entities per chunk ─────────────────────────────────
    all_raw_entities: list[EntityCandidate] = []
    for i, chunk in enumerate(chunks):
        _progress(f"Extracting entities from chunk {i + 1}/{n}", (i / n) * 0.4)
        try:
            candidates = extract_entities_llm(
                text=chunk,
                entity_schema=entity_presets,
                llm_chat_json=llm_chat_json,
            )
        except Exception as exc:
            warnings.warn(
                f"extraction_pipeline: error en extract_entities_llm chunk {i}: {exc}"
            )
            candidates = []
        for candidate in candidates:
            # Record the chunk this candidate came from.
            if i not in candidate.source_chunk_indices:
                candidate.source_chunk_indices.append(i)
            all_raw_entities.append(candidate)

    # ── Step 5: filter by confidence ───────────────────────────────────────
    filtered_entities = [
        e for e in all_raw_entities if e.confidence >= confidence_threshold
    ]
    # Note: this count is taken after the confidence filter.
    stats.raw_entities_count = len(filtered_entities)
    # Per-type tally of the candidates that survived the filter.
    for ent in filtered_entities:
        stats.entity_types_found[ent.type_ref] = (
            stats.entity_types_found.get(ent.type_ref, 0) + 1
        )

    # ── Step 6: deduplicate entities ───────────────────────────────────────
    _progress("Deduplicating entities...", 0.4)
    dedup_result = deduplicate_entities(
        filtered_entities, name_threshold=dedup_threshold
    )
    stats.final_entities_count = dedup_result.total_after
    stats.entities_merged = dedup_result.total_before - dedup_result.total_after
    final_entities = dedup_result.entities
    entity_id_map = dedup_result.name_to_id  # original name -> entity_id

    # ── Step 7: extract relations per chunk ────────────────────────────────
    all_raw_relations = []
    for i, chunk in enumerate(chunks):
        _progress("Extracting relations...", 0.4 + (i / n) * 0.4)
        # Entities whose recorded source chunks include this chunk.
        chunk_entities = [
            e for e in final_entities if i in e.source_chunk_indices
        ]
        # No entity is tied to this chunk: fall back to every entity.
        if not chunk_entities:
            chunk_entities = final_entities
        # A relation needs two endpoints; skip the LLM call otherwise.
        if len(chunk_entities) < 2:
            continue
        try:
            chunk_relations = extract_relations_llm(
                text=chunk,
                entities=chunk_entities,
                relation_types=relation_types,
                llm_chat_json=llm_chat_json,
            )
        except Exception as exc:
            warnings.warn(
                f"extraction_pipeline: error en extract_relations_llm chunk {i}: {exc}"
            )
            chunk_relations = []
        for rel in chunk_relations:
            rel.source_chunk_index = i
        all_raw_relations.extend(chunk_relations)

    stats.raw_relations_count = len(all_raw_relations)
    # Per-type tally of the extracted relations (before deduplication).
    for rel in all_raw_relations:
        stats.relation_types_found[rel.relation_type] = (
            stats.relation_types_found.get(rel.relation_type, 0) + 1
        )

    # ── Step 8: deduplicate relations ──────────────────────────────────────
    _progress("Deduplicating relations...", 0.8)
    final_relations = deduplicate_relations(all_raw_relations, entity_id_map)
    stats.final_relations_count = len(final_relations)
    stats.relations_merged = stats.raw_relations_count - len(final_relations)

    stats.processing_time_seconds = time.monotonic() - start_time
    _progress("Done", 1.0)
    return ExtractionResult(
        entities=final_entities,
        relations=final_relations,
        stats=stats,
    )