837563c3ba
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift, diff_entities/relations, extract_entities/relations_llm, hotness_score, melt, merge_graphs, pivot, build_entity/relation_schema_prompt. Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order, hawkes_intensity + módulo finance.py. Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py. Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
212 lines
9.1 KiB
Python
212 lines
9.1 KiB
Python
"""Pipeline de extraccion de entidades y relaciones desde un documento."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
import warnings
|
|
from typing import Callable
|
|
|
|
# Make the registry root importable whether this module is run from the
# registry root or from the directory that contains this file. The root is
# three directory levels above this file.
_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)
)
if _ROOT not in sys.path:
    # Insert at the front so the registry's packages win over same-named ones.
    sys.path.insert(0, _ROOT)
|
|
|
|
from python.functions.core.extract_text_from_file import extract_text_from_file
|
|
from python.functions.core.core import preprocess_text
|
|
from python.functions.core.split_text_into_chunks import split_text_into_chunks
|
|
from python.functions.datascience.build_entity_schema_prompt import build_entity_schema_prompt
|
|
from python.functions.datascience.build_relation_schema_prompt import build_relation_schema_prompt
|
|
from python.functions.datascience.extract_entities_llm import extract_entities_llm
|
|
from python.functions.datascience.extract_relations_llm import extract_relations_llm
|
|
from python.functions.datascience.deduplicate_entities import deduplicate_entities
|
|
from python.functions.datascience.deduplicate_relations import deduplicate_relations
|
|
from python.types.datascience.entity_candidate import EntityCandidate
|
|
from python.types.datascience.extraction_result import ExtractionResult
|
|
from python.types.datascience.extraction_stats import ExtractionStats
|
|
|
|
|
|
def extraction_pipeline(
    file_path: str,
    entity_presets: list[dict],
    relation_types: list[str],
    llm_chat_json: Callable[[list[dict]], dict],
    chunk_size: int = 500,
    chunk_overlap: int = 50,
    confidence_threshold: float = 0.5,
    dedup_threshold: float = 0.85,
    on_progress: Callable[[str, float], None] | None = None,
) -> ExtractionResult:
    """Run the full entity and relation extraction pipeline on one document.

    Orchestrates extract_text_from_file -> preprocess_text ->
    split_text_into_chunks -> extract_entities_llm per chunk ->
    confidence filter -> deduplicate_entities -> extract_relations_llm per
    chunk -> deduplicate_relations.

    Failures are handled on a best-effort basis: an error while extracting the
    text is reported with ``warnings.warn`` and the document is treated as
    empty; an error in an LLM call for one chunk is reported the same way and
    that chunk contributes nothing.

    Args:
        file_path: path of the file to process (PDF, Markdown, TXT).
        entity_presets: list of dicts with type_ref, label and metadata_fields.
            Example: [{"type_ref": "osint_person_go_cybersecurity",
                       "label": "Person",
                       "metadata_fields": ["full_name", "nationality"]}]
        relation_types: relation types allowed during extraction.
            Example: ["funds", "employs", "communicates_with", "owns"]
        llm_chat_json: injected function that receives OpenAI-style messages
            and returns the already-parsed JSON response as a dict, so the
            pipeline is not coupled to any provider.
        chunk_size: chunk size forwarded to split_text_into_chunks
            (default 500).
        chunk_overlap: overlap between consecutive chunks (default 50).
        confidence_threshold: minimum confidence a candidate entity needs to
            be kept before deduplication (default 0.5).
        dedup_threshold: minimum name similarity score used to merge entities
            (default 0.85).
        on_progress: optional callback ``(message, fraction)`` with fraction in
            0-1. Reported milestones: 0.0 text extraction, 0.0-0.4 entity
            extraction, 0.4 entity deduplication, 0.4-0.8 relation extraction,
            0.8 relation deduplication, 1.0 done. Exceptions raised by the
            callback are reported as warnings and never abort the pipeline.

    Returns:
        ExtractionResult with the deduplicated entities and relations plus the
        stats gathered along the way.

    Raises:
        ValueError: if entity_presets is empty (checked first).
        FileNotFoundError: if file_path does not exist.
    """
    if not entity_presets:
        raise ValueError("entity_presets no puede estar vacio")

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"Archivo no encontrado: {file_path}")

    def _progress(msg: str, pct: float) -> None:
        """Forward a progress event without letting the callback break the run."""
        if on_progress is None:
            return
        try:
            on_progress(msg, pct)
        except Exception as exc:
            # Progress reporting is best-effort, but a broken callback should
            # be visible instead of being swallowed silently.
            warnings.warn(f"extraction_pipeline: error en on_progress: {exc}")

    start_time = time.monotonic()
    stats = ExtractionStats()

    # -- Step 1: extract text --------------------------------------------------
    _progress("Extracting text from file...", 0.0)
    try:
        raw_text = extract_text_from_file(file_path)
    except Exception as exc:
        warnings.warn(f"extraction_pipeline: error al extraer texto: {exc}")
        raw_text = ""

    # -- Step 2: preprocess ----------------------------------------------------
    clean_text = preprocess_text(raw_text)
    stats.total_chars = len(clean_text)

    # -- Step 3: split into chunks ---------------------------------------------
    chunks = split_text_into_chunks(clean_text, chunk_size=chunk_size, overlap=chunk_overlap)
    n = len(chunks)
    stats.total_chunks = n

    if n == 0:
        # Nothing to analyse: still signal completion so progress consumers
        # are not left waiting for a final event.
        stats.processing_time_seconds = time.monotonic() - start_time
        _progress("Done", 1.0)
        return ExtractionResult(entities=[], relations=[], stats=stats)

    # -- Step 4: extract entities per chunk ------------------------------------
    all_raw_entities: list[EntityCandidate] = []

    for i, chunk in enumerate(chunks):
        _progress(f"Extracting entities from chunk {i + 1}/{n}", (i / n) * 0.4)
        try:
            candidates = extract_entities_llm(
                text=chunk,
                entity_schema=entity_presets,
                llm_chat_json=llm_chat_json,
            )
        except Exception as exc:
            warnings.warn(
                f"extraction_pipeline: error en extract_entities_llm chunk {i}: {exc}"
            )
            candidates = []

        for candidate in candidates:
            # Record the chunk each candidate came from; step 7 uses it to
            # pick the entities relevant to a chunk.
            if i not in candidate.source_chunk_indices:
                candidate.source_chunk_indices.append(i)
            all_raw_entities.append(candidate)

    # -- Step 5: filter by confidence ------------------------------------------
    filtered_entities = [
        e for e in all_raw_entities if e.confidence >= confidence_threshold
    ]
    # NOTE(review): this counts candidates *after* the confidence filter, not
    # every candidate returned by the LLM -- confirm that is the intended
    # meaning of "raw".
    stats.raw_entities_count = len(filtered_entities)

    # Tally how many kept candidates there are per entity type.
    for ent in filtered_entities:
        stats.entity_types_found[ent.type_ref] = (
            stats.entity_types_found.get(ent.type_ref, 0) + 1
        )

    # -- Step 6: deduplicate entities ------------------------------------------
    _progress("Deduplicating entities...", 0.4)
    dedup_result = deduplicate_entities(filtered_entities, name_threshold=dedup_threshold)

    stats.final_entities_count = dedup_result.total_after
    stats.entities_merged = dedup_result.total_before - dedup_result.total_after

    final_entities = dedup_result.entities
    entity_id_map = dedup_result.name_to_id  # original name -> entity id

    # -- Step 7: extract relations per chunk -----------------------------------
    all_raw_relations = []

    for i, chunk in enumerate(chunks):
        _progress(f"Extracting relations from chunk {i + 1}/{n}", 0.4 + (i / n) * 0.4)

        # Prefer the entities that were seen in this chunk; fall back to the
        # whole entity list when none is attributed to it.
        chunk_entities = [
            e for e in final_entities if i in e.source_chunk_indices
        ]
        if not chunk_entities:
            chunk_entities = final_entities

        # A relation needs two endpoints, so skip the LLM call otherwise.
        if len(chunk_entities) < 2:
            continue

        try:
            chunk_relations = extract_relations_llm(
                text=chunk,
                entities=chunk_entities,
                relation_types=relation_types,
                llm_chat_json=llm_chat_json,
            )
        except Exception as exc:
            warnings.warn(
                f"extraction_pipeline: error en extract_relations_llm chunk {i}: {exc}"
            )
            chunk_relations = []

        for rel in chunk_relations:
            rel.source_chunk_index = i
        all_raw_relations.extend(chunk_relations)

    stats.raw_relations_count = len(all_raw_relations)

    # Tally how many raw relations there are per relation type.
    for rel in all_raw_relations:
        stats.relation_types_found[rel.relation_type] = (
            stats.relation_types_found.get(rel.relation_type, 0) + 1
        )

    # -- Step 8: deduplicate relations -----------------------------------------
    _progress("Deduplicating relations...", 0.8)
    final_relations = deduplicate_relations(all_raw_relations, entity_id_map)

    stats.final_relations_count = len(final_relations)
    stats.relations_merged = stats.raw_relations_count - len(final_relations)
    stats.processing_time_seconds = time.monotonic() - start_time

    _progress("Done", 1.0)

    return ExtractionResult(
        entities=final_entities,
        relations=final_relations,
        stats=stats,
    )
|