837563c3ba
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift, diff_entities/relations, extract_entities/relations_llm, hotness_score, melt, merge_graphs, pivot, build_entity/relation_schema_prompt. Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order, hawkes_intensity + módulo finance.py. Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py. Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
228 lines
7.6 KiB
Python
228 lines
7.6 KiB
Python
"""Tests para extraction_pipeline."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
|
|
# Put the directory three levels above this file on sys.path so the absolute
# ``python.functions...`` import that follows can be resolved.
_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir)
)
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)
|
|
|
|
from python.functions.pipelines.extraction_pipeline import extraction_pipeline
|
|
|
|
|
|
# ── LLM stubs ─────────────────────────────────────────────────────────────────
|
|
|
|
def _llm_with_entities(messages: list[dict]) -> dict:
    """LLM stub that answers with a fixed entity or relation payload.

    The ``content`` of the first message is inspected case-insensitively.
    When it mentions "entity" or "entities" the canned entity list is
    returned; any other prompt (including an empty ``messages`` list) is
    treated as the relation-extraction call and gets the canned relation.

    Args:
        messages: Chat messages; only ``messages[0]["content"]`` is read.

    Returns:
        Either ``{"entities": [...]}`` or ``{"relations": [...]}``.
    """
    prompt = (messages[0]["content"] if messages else "").lower()

    # Anything that does not look like an entity prompt is the relation call.
    if not any(keyword in prompt for keyword in ("entity", "entities")):
        return {
            "relations": [
                {
                    "from_name": "John Smith",
                    "to_name": "evil-corp.com",
                    "relation_type": "operates",
                    "description": "John Smith operates evil-corp.com",
                    "confidence": 0.8,
                }
            ]
        }

    person = {
        "name": "John Smith",
        "type_ref": "osint_person_go_cybersecurity",
        "attributes": {"full_name": "John Smith", "nationality": "US"},
        "confidence": 0.95,
    }
    domain = {
        "name": "evil-corp.com",
        "type_ref": "osint_domain_go_cybersecurity",
        "attributes": {"fqdn": "evil-corp.com"},
        "confidence": 0.88,
    }
    return {"entities": [person, domain]}
|
|
|
|
|
|
def _llm_empty(messages: list[dict]) -> dict:
    """LLM stub that always answers with an empty result.

    The key of the returned dict depends on the first message's ``content``:
    ``"entities"`` when it contains "entit" (case-insensitive), otherwise
    ``"relations"``. An empty ``messages`` list yields the relations form.
    """
    prompt = messages[0]["content"] if messages else ""
    result_key = "entities" if "entit" in prompt.lower() else "relations"
    return {result_key: []}
|
|
|
|
|
|
# Entity presets passed to the pipeline as ``entity_presets``: one entry per
# entity type, with its type reference, display label and metadata field names.
ENTITY_PRESETS = [
    {
        "type_ref": "osint_person_go_cybersecurity",
        "label": "Person",
        "metadata_fields": ["full_name", "alias", "nationality"],
    },
    {
        "type_ref": "osint_domain_go_cybersecurity",
        "label": "Domain",
        "metadata_fields": ["fqdn", "registrar"],
    },
]

# Relation type names passed to the pipeline as ``relation_types``.
RELATION_TYPES = [
    "operates",
    "owns",
    "funds",
    "communicates_with",
    "related_to",
]
|
|
|
|
|
|
# ── Tests ──────────────────────────────────────────────────────────────────────
|
|
|
|
def test_documento_con_entidades_y_relaciones():
    """A document with entities and relations yields a populated result.

    A short text is written to a temporary ``.txt`` file and run through the
    pipeline with the ``_llm_with_entities`` stub. The test checks that at
    least one entity is returned and that chunk and character counts are
    positive. The temporary file is removed afterwards.
    """
    sentences = [
        "John Smith, a US national, operates the domain evil-corp.com. ",
        "He was identified as the main administrator of the infrastructure.",
    ]
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as handle:
        handle.write("".join(sentences))
    doc_path = handle.name

    options = {
        "file_path": doc_path,
        "entity_presets": ENTITY_PRESETS,
        "relation_types": RELATION_TYPES,
        "llm_chat_json": _llm_with_entities,
        "chunk_size": 500,
        "chunk_overlap": 50,
        "confidence_threshold": 0.5,
        "dedup_threshold": 0.85,
    }
    try:
        result = extraction_pipeline(**options)
        assert result is not None
        assert len(result.entities) >= 1
        assert result.stats.total_chunks >= 1
        assert result.stats.total_chars > 0
    finally:
        # delete=False above means the file must be removed by hand.
        os.unlink(doc_path)
|
|
|
|
|
|
def test_documento_vacio():
    """An empty file yields empty entity/relation lists and zero chunks."""
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as empty_file:
        empty_file.write("")
    empty_path = empty_file.name

    try:
        outcome = extraction_pipeline(
            file_path=empty_path,
            entity_presets=ENTITY_PRESETS,
            relation_types=RELATION_TYPES,
            llm_chat_json=_llm_empty,
        )
        assert outcome is not None
        assert outcome.entities == []
        assert outcome.relations == []
        assert outcome.stats.total_chunks == 0
    finally:
        # The file was created with delete=False, so clean it up explicitly.
        os.unlink(empty_path)
|
|
|
|
|
|
def test_documento_sin_entidades_detectables():
    """A document for which the LLM finds nothing yields empty lists.

    The ``_llm_empty`` stub is used, so the pipeline receives no entities or
    relations; the raw entity count in the stats must therefore be zero.
    """
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as handle:
        handle.write(
            "The weather is nice today. The sun shines brightly over the mountains."
        )
    doc_path = handle.name

    options = {
        "file_path": doc_path,
        "entity_presets": ENTITY_PRESETS,
        "relation_types": RELATION_TYPES,
        "llm_chat_json": _llm_empty,
        "confidence_threshold": 0.5,
    }
    try:
        outcome = extraction_pipeline(**options)
        assert outcome is not None
        assert outcome.entities == []
        assert outcome.relations == []
        assert outcome.stats.raw_entities_count == 0
    finally:
        # Temporary file was created with delete=False; remove it ourselves.
        os.unlink(doc_path)
|
|
|
|
|
|
def test_archivo_no_encontrado_lanza_filenotfounderror():
    """A path that does not exist raises FileNotFoundError.

    The missing path is built inside a freshly created temporary directory,
    so it is guaranteed to be absent. The previous hard-coded ``/tmp/...``
    location depended on the host filesystem layout and on nobody having
    created that file.
    """
    import pytest

    with tempfile.TemporaryDirectory() as scratch_dir:
        missing_path = os.path.join(
            scratch_dir, "no_existe_para_test_extraccion_pipeline.txt"
        )
        with pytest.raises(FileNotFoundError):
            extraction_pipeline(
                file_path=missing_path,
                entity_presets=ENTITY_PRESETS,
                relation_types=RELATION_TYPES,
                llm_chat_json=_llm_empty,
            )
|
|
|
|
|
|
def test_entity_presets_vacio_lanza_valueerror():
    """An empty ``entity_presets`` list raises ValueError.

    A real, non-empty input file is supplied so that the only invalid
    argument is the empty preset list.
    """
    import pytest

    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as handle:
        handle.write("some text")
    doc_path = handle.name

    no_presets = []
    try:
        with pytest.raises(ValueError):
            extraction_pipeline(
                file_path=doc_path,
                entity_presets=no_presets,
                relation_types=RELATION_TYPES,
                llm_chat_json=_llm_empty,
            )
    finally:
        # delete=False above means the file must be removed by hand.
        os.unlink(doc_path)
|
|
|
|
|
|
def test_progress_callback_se_invoca():
    """The ``on_progress`` callback is invoked while the pipeline runs.

    Every ``(message, percentage)`` pair received is recorded. The test
    requires at least one call, and that at least one message contains
    "Extracting", "Done" or "Dedup".
    """
    calls: list[tuple[str, float]] = []

    def _on_progress(msg: str, pct: float) -> None:
        # Record the call; the parameter names are kept in case the pipeline
        # passes them by keyword.
        calls.append((msg, pct))

    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as handle:
        handle.write("John Smith operates evil-corp.com.")
    doc_path = handle.name

    expected_markers = ("Extracting", "Done", "Dedup")
    try:
        extraction_pipeline(
            file_path=doc_path,
            entity_presets=ENTITY_PRESETS,
            relation_types=RELATION_TYPES,
            llm_chat_json=_llm_with_entities,
            on_progress=_on_progress,
        )
        assert len(calls) > 0
        reported = [entry[0] for entry in calls]
        assert any(
            marker in text for text in reported for marker in expected_markers
        )
    finally:
        # Temporary file was created with delete=False; remove it ourselves.
        os.unlink(doc_path)
|
|
|
|
|
|
def test_stats_se_rellenan_correctamente():
    """Result stats carry character/chunk counts and a processing time.

    Checks that ``total_chars`` is positive, ``total_chunks`` is at least one
    and ``processing_time_seconds`` is non-negative for a one-sentence file.
    """
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as handle:
        handle.write("John Smith, a US national, operates the domain evil-corp.com.")
    doc_path = handle.name

    options = {
        "file_path": doc_path,
        "entity_presets": ENTITY_PRESETS,
        "relation_types": RELATION_TYPES,
        "llm_chat_json": _llm_with_entities,
    }
    try:
        outcome = extraction_pipeline(**options)
        assert outcome.stats.total_chars > 0
        assert outcome.stats.total_chunks >= 1
        assert outcome.stats.processing_time_seconds >= 0.0
    finally:
        # delete=False above means the file must be removed by hand.
        os.unlink(doc_path)
|