63a9cb5273
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift, diff_entities/relations, extract_entities/relations_llm, hotness_score, melt, merge_graphs, pivot, build_entity/relation_schema_prompt. Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order, hawkes_intensity + módulo finance.py. Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py. Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
165 lines
5.1 KiB
Python
165 lines
5.1 KiB
Python
"""Tests para extract_entities_llm."""
|
|
|
|
import warnings
|
|
import sys
|
|
import os
|
|
import pytest
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
|
|
|
|
from python.functions.datascience.extract_entities_llm import extract_entities_llm
|
|
from python.types.datascience.entity_candidate import EntityCandidate
|
|
|
|
# Entity schema passed to extract_entities_llm by the tests in this module.
# Each entry declares one entity type: the ``type_ref`` the stubbed LLM
# responses refer to, a ``label`` (the tests expect it to surface as the
# candidate's ``type_label``), and the list of metadata field names.
SCHEMA = [
    {
        "type_ref": "osint_person_go_cybersecurity",
        "label": "Person",
        "metadata_fields": ["full_name", "alias", "nationality", "dob", "risk_score"],
    },
    {
        "type_ref": "osint_domain_go_cybersecurity",
        "label": "Domain",
        "metadata_fields": ["fqdn", "registrar", "created_date"],
    },
]
|
|
|
|
|
|
def make_llm(response: dict):
    """Build a stub LLM callable that always returns ``response``.

    Args:
        response: The object the stub hands back on every call.

    Returns:
        A function that accepts a ``messages`` list, ignores it, and returns
        ``response`` itself (the same object, not a copy).
    """
    def _llm(messages: list[dict]) -> dict:
        # The stub never inspects the prompt; the canned answer is fixed.
        return response

    return _llm
|
|
|
|
|
|
def test_texto_con_entidades_claras_retorna_entity_candidate():
    """Text with clear entities yields one EntityCandidate per LLM entity."""
    llm = make_llm({
        "entities": [
            {
                "name": "John Smith",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {"full_name": "John Smith", "nationality": "US"},
                "confidence": 0.95,
            },
            {
                "name": "evil-corp.com",
                "type_ref": "osint_domain_go_cybersecurity",
                "attributes": {"fqdn": "evil-corp.com"},
                "confidence": 0.88,
            },
        ]
    })

    result = extract_entities_llm(
        "John Smith, US citizen, linked to evil-corp.com.", SCHEMA, llm
    )

    assert len(result) == 2
    # The test name promises EntityCandidate instances, so check the type.
    assert all(isinstance(e, EntityCandidate) for e in result)

    # Index by name so a missing entity fails on a readable assertion instead
    # of an opaque StopIteration raised by next() on an exhausted generator.
    by_name = {e.name: e for e in result}
    assert set(by_name) == {"John Smith", "evil-corp.com"}

    person = by_name["John Smith"]
    assert person.type_ref == "osint_person_go_cybersecurity"
    assert person.type_label == "Person"
    assert person.attributes["full_name"] == "John Smith"
    assert person.confidence == 0.95

    domain = by_name["evil-corp.com"]
    assert domain.type_ref == "osint_domain_go_cybersecurity"
    assert domain.type_label == "Domain"
    assert domain.attributes["fqdn"] == "evil-corp.com"
    assert domain.confidence == 0.88
|
|
|
|
|
|
def test_texto_sin_entidades_retorna_lista_vacia():
    """Text without entities yields an empty list."""
    # The stubbed LLM reports no entities at all.
    llm = make_llm({"entities": []})

    result = extract_entities_llm(
        "The sky is blue and the grass is green.", SCHEMA, llm
    )

    assert result == []
|
|
|
|
|
|
def test_llm_retorna_json_mal_formado_retorna_lista_vacia_con_warning():
    """A failing LLM call yields an empty list and exactly one warning."""
    def bad_llm(messages: list[dict]) -> dict:
        # Simulates the LLM client failing to decode a malformed response.
        raise ValueError("JSON decode error")

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        result = extract_entities_llm("Some text with entities.", SCHEMA, bad_llm)

    assert result == []
    # simplefilter("always") records every warning raised in the block, so
    # count only the LLM-failure ones; unrelated warnings (e.g. deprecations
    # from dependencies) must not break this test.
    llm_errors = [w for w in caught if "error llamando al LLM" in str(w.message)]
    assert len(llm_errors) == 1
|
|
|
|
|
|
def test_type_ref_invalido_en_respuesta_se_descarta_con_warning():
    """An entity with a type_ref absent from the schema is dropped with a warning."""
    llm = make_llm({
        "entities": [
            {
                "name": "Valid Person",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {},
                "confidence": 0.9,
            },
            {
                # This type_ref does not appear in SCHEMA.
                "name": "Unknown Thing",
                "type_ref": "nonexistent_type_ref",
                "attributes": {},
                "confidence": 0.8,
            },
        ]
    })

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        result = extract_entities_llm("Text with entities.", SCHEMA, llm)

    # Only the entity whose type_ref is in SCHEMA survives.
    assert len(result) == 1
    assert result[0].name == "Valid Person"
    # At least one recorded warning names the rejected type_ref.
    assert any("nonexistent_type_ref" in str(w.message) for w in caught)
|
|
|
|
|
|
def test_confidence_se_propaga_correctamente():
    """Each candidate carries the confidence value the LLM reported for it."""
    llm = make_llm({
        "entities": [
            {
                "name": "Implied Person",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {},
                "confidence": 0.7,
            },
            {
                "name": "Weakly Implied Domain",
                "type_ref": "osint_domain_go_cybersecurity",
                "attributes": {},
                "confidence": 0.5,
            },
            {
                "name": "Explicit Entity",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {},
                "confidence": 1.0,
            },
        ]
    })

    result = extract_entities_llm("Some text.", SCHEMA, llm)

    assert len(result) == 3
    # One mapping comparison reports every mismatching entity at once.
    confidences = {e.name: e.confidence for e in result}
    assert confidences == {
        "Implied Person": 0.7,
        "Weakly Implied Domain": 0.5,
        "Explicit Entity": 1.0,
    }
|
|
|
|
|
|
def test_schema_vacio_lanza_value_error():
    """An empty entity schema raises ValueError with the expected message."""
    llm = make_llm({"entities": []})

    # ``match`` is applied as a regex search against the exception text.
    with pytest.raises(ValueError, match="entity_schema no puede estar vacio"):
        extract_entities_llm("Some text.", [], llm)
|