feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift, diff_entities/relations, extract_entities/relations_llm, hotness_score, melt, merge_graphs, pivot, build_entity/relation_schema_prompt. Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order, hawkes_intensity + módulo finance.py. Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py. Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,164 @@
|
||||
"""Tests para extract_entities_llm."""
|
||||
|
||||
import warnings
|
||||
import sys
|
||||
import os
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
|
||||
|
||||
from python.functions.datascience.extract_entities_llm import extract_entities_llm
|
||||
from python.types.datascience.entity_candidate import EntityCandidate
|
||||
|
||||
# Entity schema shared by every test in this module: one dict per entity
# type. "type_ref" is the identifier the stubbed LLM responses refer to,
# "label" is the value the tests expect back as `type_label` on each
# extracted candidate, and "metadata_fields" lists attribute names.
SCHEMA = [
    {
        "type_ref": "osint_person_go_cybersecurity",
        "label": "Person",
        "metadata_fields": ["full_name", "alias", "nationality", "dob", "risk_score"],
    },
    {
        "type_ref": "osint_domain_go_cybersecurity",
        "label": "Domain",
        "metadata_fields": ["fqdn", "registrar", "created_date"],
    },
]
|
||||
|
||||
|
||||
def make_llm(response: dict):
    """Build an LLM stub that always answers with ``response``.

    Args:
        response: The object the stub hands back on every call.

    Returns:
        A callable that accepts a ``messages`` list and returns ``response``
        itself (the same object, not a copy), regardless of the messages.
    """

    def _llm(messages: list[dict]) -> dict:
        # The prompt is deliberately ignored; the canned reply is fixed.
        return response

    return _llm
|
||||
|
||||
|
||||
def test_texto_con_entidades_claras_retorna_entity_candidate():
    """Text with clear entities yields one candidate per reported entity.

    The stubbed LLM reports a person and a domain. Each returned candidate
    must carry its type_ref, the matching schema label as ``type_label``,
    its attributes, and the confidence given by the LLM.
    """
    llm = make_llm({
        "entities": [
            {
                "name": "John Smith",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {"full_name": "John Smith", "nationality": "US"},
                "confidence": 0.95,
            },
            {
                "name": "evil-corp.com",
                "type_ref": "osint_domain_go_cybersecurity",
                "attributes": {"fqdn": "evil-corp.com"},
                "confidence": 0.88,
            },
        ]
    })

    result = extract_entities_llm(
        "John Smith, US citizen, linked to evil-corp.com.", SCHEMA, llm
    )

    assert len(result) == 2

    # Index by name so a missing entity fails on a readable assertion
    # instead of a bare StopIteration raised by next() on an empty generator.
    by_name = {e.name: e for e in result}
    assert set(by_name) == {"John Smith", "evil-corp.com"}

    person = by_name["John Smith"]
    assert person.type_ref == "osint_person_go_cybersecurity"
    assert person.type_label == "Person"
    assert person.attributes["full_name"] == "John Smith"
    assert person.confidence == 0.95

    domain = by_name["evil-corp.com"]
    assert domain.type_ref == "osint_domain_go_cybersecurity"
    assert domain.type_label == "Domain"
    assert domain.attributes["fqdn"] == "evil-corp.com"
    assert domain.confidence == 0.88
|
||||
|
||||
|
||||
def test_texto_sin_entidades_retorna_lista_vacia():
    """Text without entities yields an empty list.

    The stubbed LLM answers with an empty "entities" array, so the
    extraction result must be exactly ``[]``.
    """
    llm = make_llm({"entities": []})

    result = extract_entities_llm(
        "The sky is blue and the grass is green.", SCHEMA, llm
    )

    assert result == []
|
||||
|
||||
|
||||
def test_llm_retorna_json_mal_formado_retorna_lista_vacia_con_warning():
    """A failing LLM call yields an empty list plus a warning.

    The stub raises ValueError to simulate a malformed-JSON reply. The
    extractor is expected to return ``[]`` and emit a warning whose message
    contains "error llamando al LLM".
    """

    def bad_llm(messages: list[dict]) -> dict:
        raise ValueError("JSON decode error")

    with warnings.catch_warnings(record=True) as caught:
        # Record every warning, including ones a default filter would hide.
        warnings.simplefilter("always")
        result = extract_entities_llm("Some text with entities.", SCHEMA, bad_llm)

    assert result == []

    # Count only the warning under test: with the "always" filter active,
    # unrelated warnings (e.g. deprecations raised by imported libraries)
    # are recorded too and would break a bare len(caught) == 1 check.
    llm_errors = [w for w in caught if "error llamando al LLM" in str(w.message)]
    assert len(llm_errors) == 1
|
||||
|
||||
|
||||
def test_type_ref_invalido_en_respuesta_se_descarta_con_warning():
    """An entity with an unknown type_ref is dropped and warned about.

    The stubbed LLM reports one entity whose type_ref is in SCHEMA and one
    whose type_ref is not. Only the first must be returned, and some
    recorded warning must mention the offending type_ref.
    """
    llm = make_llm({
        "entities": [
            {
                "name": "Valid Person",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {},
                "confidence": 0.9,
            },
            {
                "name": "Unknown Thing",
                "type_ref": "nonexistent_type_ref",
                "attributes": {},
                "confidence": 0.8,
            },
        ]
    })

    with warnings.catch_warnings(record=True) as caught:
        # Record every warning, including ones a default filter would hide.
        warnings.simplefilter("always")
        result = extract_entities_llm("Text with entities.", SCHEMA, llm)

    assert len(result) == 1
    assert result[0].name == "Valid Person"
    assert any("nonexistent_type_ref" in str(w.message) for w in caught)
|
||||
|
||||
|
||||
def test_confidence_se_propaga_correctamente():
    """Confidence values from the LLM reach the candidates unchanged.

    Three entities with confidences 0.7, 0.5 and 1.0 are reported by the
    stub; each returned candidate must expose exactly the value given.
    """
    llm = make_llm({
        "entities": [
            {
                "name": "Implied Person",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {},
                "confidence": 0.7,
            },
            {
                "name": "Weakly Implied Domain",
                "type_ref": "osint_domain_go_cybersecurity",
                "attributes": {},
                "confidence": 0.5,
            },
            {
                "name": "Explicit Entity",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {},
                "confidence": 1.0,
            },
        ]
    })

    result = extract_entities_llm("Some text.", SCHEMA, llm)

    assert len(result) == 3

    # Look up by name so the check does not depend on result ordering.
    confidences = {e.name: e.confidence for e in result}
    assert confidences["Implied Person"] == 0.7
    assert confidences["Weakly Implied Domain"] == 0.5
    assert confidences["Explicit Entity"] == 1.0
|
||||
|
||||
|
||||
def test_schema_vacio_lanza_value_error():
    """An empty schema makes the extractor raise ValueError.

    The error message must match "entity_schema no puede estar vacio".
    """
    llm = make_llm({"entities": []})

    with pytest.raises(ValueError, match="entity_schema no puede estar vacio"):
        extract_entities_llm("Some text.", [], llm)
|
||||
Reference in New Issue
Block a user