Files
fn_registry/python/functions/pipelines/extraction_pipeline_test.py
egutierrez 837563c3ba feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00

228 lines
7.6 KiB
Python

"""Tests para extraction_pipeline."""
from __future__ import annotations
import os
import sys
import tempfile
# Registry root: three directories above this test file. It is put at the
# front of sys.path (once) so the absolute ``python.functions...`` import
# that follows can be resolved when the test is run from any directory.
_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), *([".."] * 3))
)
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)
from python.functions.pipelines.extraction_pipeline import extraction_pipeline
# ── LLM stubs ─────────────────────────────────────────────────────────────────
def _llm_with_entities(messages: list[dict]) -> dict:
    """Stub LLM that replies with canned entities or canned relations.

    The reply is chosen from the ``content`` of the first message: when it
    mentions "entity" or "entities" (case-insensitive) the fixed entity list
    is returned; in every other case, including an empty ``messages`` list,
    the fixed relation list is returned.
    """
    prompt = (messages[0]["content"] if messages else "").lower()
    wants_entities = any(marker in prompt for marker in ("entity", "entities"))

    if not wants_entities:
        # Relations call.
        operates = {
            "from_name": "John Smith",
            "to_name": "evil-corp.com",
            "relation_type": "operates",
            "description": "John Smith operates evil-corp.com",
            "confidence": 0.8,
        }
        return {"relations": [operates]}

    person = {
        "name": "John Smith",
        "type_ref": "osint_person_go_cybersecurity",
        "attributes": {"full_name": "John Smith", "nationality": "US"},
        "confidence": 0.95,
    }
    domain = {
        "name": "evil-corp.com",
        "type_ref": "osint_domain_go_cybersecurity",
        "attributes": {"fqdn": "evil-corp.com"},
        "confidence": 0.88,
    }
    return {"entities": [person, domain]}
def _llm_empty(messages: list[dict]) -> dict:
    """Stub LLM that never finds anything.

    Returns an empty ``entities`` list when the first message's content
    contains "entit" (case-insensitive) and an empty ``relations`` list
    otherwise, including when ``messages`` is empty.
    """
    first_content = "" if not messages else messages[0]["content"]
    result_key = "entities" if "entit" in first_content.lower() else "relations"
    return {result_key: []}
# Entity type presets handed to the pipeline in every test: one preset per
# entity type, each with its type reference, display label and the metadata
# field names listed for that type.
ENTITY_PRESETS = [
    {"type_ref": type_ref, "label": label, "metadata_fields": metadata_fields}
    for type_ref, label, metadata_fields in (
        (
            "osint_person_go_cybersecurity",
            "Person",
            ["full_name", "alias", "nationality"],
        ),
        (
            "osint_domain_go_cybersecurity",
            "Domain",
            ["fqdn", "registrar"],
        ),
    )
]

# Relation type names handed to the pipeline in every test.
RELATION_TYPES = [
    "operates",
    "owns",
    "funds",
    "communicates_with",
    "related_to",
]
# ── Tests ──────────────────────────────────────────────────────────────────────
def test_documento_con_entidades_y_relaciones():
    """A document with entities and relations yields a populated result.

    Checks that at least one entity comes back and that the chunk and
    character counters in the stats are non-zero.
    """
    document = (
        "John Smith, a US national, operates the domain evil-corp.com. "
        "He was identified as the main administrator of the infrastructure."
    )
    # delete=False keeps the file on disk after the handle is closed so the
    # pipeline can open it by path; the finally clause removes it.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as handle:
        handle.write(document)
        doc_path = handle.name

    pipeline_kwargs = {
        "file_path": doc_path,
        "entity_presets": ENTITY_PRESETS,
        "relation_types": RELATION_TYPES,
        "llm_chat_json": _llm_with_entities,
        "chunk_size": 500,
        "chunk_overlap": 50,
        "confidence_threshold": 0.5,
        "dedup_threshold": 0.85,
    }
    try:
        outcome = extraction_pipeline(**pipeline_kwargs)
        assert outcome is not None
        assert len(outcome.entities) >= 1
        stats = outcome.stats
        assert stats.total_chunks >= 1
        assert stats.total_chars > 0
    finally:
        os.unlink(doc_path)
def test_documento_vacio():
    """An empty document yields empty entity/relation lists and zero chunks."""
    # delete=False keeps the (empty) file on disk after the handle is closed
    # so the pipeline can open it by path; the finally clause removes it.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as handle:
        handle.write("")
        doc_path = handle.name

    pipeline_kwargs = {
        "file_path": doc_path,
        "entity_presets": ENTITY_PRESETS,
        "relation_types": RELATION_TYPES,
        "llm_chat_json": _llm_empty,
    }
    try:
        outcome = extraction_pipeline(**pipeline_kwargs)
        assert outcome is not None
        assert outcome.entities == []
        assert outcome.relations == []
        assert outcome.stats.total_chunks == 0
    finally:
        os.unlink(doc_path)
def test_documento_sin_entidades_detectables():
    """A document the LLM finds nothing in yields empty lists.

    Uses the always-empty LLM stub and checks that no entities or relations
    are returned and that the raw entity counter stays at zero.
    """
    document = "The weather is nice today. The sun shines brightly over the mountains."
    # delete=False keeps the file on disk after the handle is closed so the
    # pipeline can open it by path; the finally clause removes it.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as handle:
        handle.write(document)
        doc_path = handle.name

    pipeline_kwargs = {
        "file_path": doc_path,
        "entity_presets": ENTITY_PRESETS,
        "relation_types": RELATION_TYPES,
        "llm_chat_json": _llm_empty,
        "confidence_threshold": 0.5,
    }
    try:
        outcome = extraction_pipeline(**pipeline_kwargs)
        assert outcome is not None
        assert outcome.entities == []
        assert outcome.relations == []
        assert outcome.stats.raw_entities_count == 0
    finally:
        os.unlink(doc_path)
def test_archivo_no_encontrado_lanza_filenotfounderror():
    """A missing input file makes the pipeline raise FileNotFoundError."""
    import pytest

    # Build the path inside a freshly created temporary directory: nothing
    # has been written there, so the file is guaranteed not to exist. A
    # hard-coded "/tmp/..." path is POSIX-specific and could be present on a
    # shared machine, which would make this test fail for the wrong reason.
    with tempfile.TemporaryDirectory() as empty_dir:
        missing_path = os.path.join(
            empty_dir, "no_existe_para_test_extraccion_pipeline.txt"
        )
        with pytest.raises(FileNotFoundError):
            extraction_pipeline(
                file_path=missing_path,
                entity_presets=ENTITY_PRESETS,
                relation_types=RELATION_TYPES,
                llm_chat_json=_llm_empty,
            )
def test_entity_presets_vacio_lanza_valueerror():
    """An empty ``entity_presets`` list makes the pipeline raise ValueError."""
    import pytest

    # A real, non-empty file is supplied so the call cannot fail on the
    # missing-file path instead; the finally clause removes it.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as handle:
        handle.write("some text")
        doc_path = handle.name

    pipeline_kwargs = {
        "file_path": doc_path,
        "entity_presets": [],
        "relation_types": RELATION_TYPES,
        "llm_chat_json": _llm_empty,
    }
    try:
        with pytest.raises(ValueError):
            extraction_pipeline(**pipeline_kwargs)
    finally:
        os.unlink(doc_path)
def test_progress_callback_se_invoca():
    """The ``on_progress`` callback is invoked while the pipeline runs.

    Records every (message, percentage) pair and checks that at least one
    call happened and that some message contains "Extracting", "Done" or
    "Dedup".
    """
    progress_events: list[tuple[str, float]] = []

    def _record_progress(msg: str, pct: float) -> None:
        progress_events.append((msg, pct))

    document = "John Smith operates evil-corp.com."
    # delete=False keeps the file on disk after the handle is closed so the
    # pipeline can open it by path; the finally clause removes it.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as handle:
        handle.write(document)
        doc_path = handle.name

    pipeline_kwargs = {
        "file_path": doc_path,
        "entity_presets": ENTITY_PRESETS,
        "relation_types": RELATION_TYPES,
        "llm_chat_json": _llm_with_entities,
        "on_progress": _record_progress,
    }
    try:
        extraction_pipeline(**pipeline_kwargs)
        assert len(progress_events) > 0
        reported = [event[0] for event in progress_events]
        expected_markers = ("Extracting", "Done", "Dedup")
        assert any(
            marker in text for text in reported for marker in expected_markers
        )
    finally:
        os.unlink(doc_path)
def test_stats_se_rellenan_correctamente():
    """The result stats carry character/chunk counts and a processing time."""
    document = "John Smith, a US national, operates the domain evil-corp.com."
    # delete=False keeps the file on disk after the handle is closed so the
    # pipeline can open it by path; the finally clause removes it.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as handle:
        handle.write(document)
        doc_path = handle.name

    pipeline_kwargs = {
        "file_path": doc_path,
        "entity_presets": ENTITY_PRESETS,
        "relation_types": RELATION_TYPES,
        "llm_chat_json": _llm_with_entities,
    }
    try:
        outcome = extraction_pipeline(**pipeline_kwargs)
        assert outcome.stats.total_chars > 0
        assert outcome.stats.total_chunks >= 1
        assert outcome.stats.processing_time_seconds >= 0.0
    finally:
        os.unlink(doc_path)