Files
fn_registry/python/functions/datascience/extract_entities_llm_test.py
egutierrez 837563c3ba feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00

165 lines
5.1 KiB
Python

"""Tests para extract_entities_llm."""
import warnings
import sys
import os
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from python.functions.datascience.extract_entities_llm import extract_entities_llm
from python.types.datascience.entity_candidate import EntityCandidate
# Entity schema fixture handed to extract_entities_llm by the tests below:
# two entity types, each with a type_ref, a human-readable label and the
# list of metadata field names declared for that type.
SCHEMA = [
    {
        "type_ref": "osint_person_go_cybersecurity",
        "label": "Person",
        "metadata_fields": [
            "full_name",
            "alias",
            "nationality",
            "dob",
            "risk_score",
        ],
    },
    {
        "type_ref": "osint_domain_go_cybersecurity",
        "label": "Domain",
        "metadata_fields": [
            "fqdn",
            "registrar",
            "created_date",
        ],
    },
]
def make_llm(response: dict):
    """Build a stub LLM callable that always returns ``response``.

    Args:
        response: The object the stub hands back on every call.

    Returns:
        A function that accepts a ``messages`` list, ignores it, and
        returns ``response`` itself (the same object, not a copy).
    """

    def _llm(messages: list[dict]) -> dict:
        # The stub never inspects the prompt; the canned reply is all
        # the tests need.
        return response

    return _llm
def test_texto_con_entidades_claras_retorna_entity_candidate():
    """Text with clear entities yields one candidate per entity.

    The stubbed LLM reports a person and a domain; the test checks that
    each comes back with its name, type_ref, attributes and confidence,
    and that type_label equals the schema label for that type_ref.
    """
    llm = make_llm({
        "entities": [
            {
                "name": "John Smith",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {"full_name": "John Smith", "nationality": "US"},
                "confidence": 0.95,
            },
            {
                "name": "evil-corp.com",
                "type_ref": "osint_domain_go_cybersecurity",
                "attributes": {"fqdn": "evil-corp.com"},
                "confidence": 0.88,
            },
        ]
    })

    result = extract_entities_llm(
        "John Smith, US citizen, linked to evil-corp.com.", SCHEMA, llm
    )

    assert len(result) == 2
    # Index by name so a missing entity shows up as a plain assertion
    # failure rather than a StopIteration from a bare next().
    by_name = {entity.name: entity for entity in result}
    assert set(by_name) == {"John Smith", "evil-corp.com"}

    person = by_name["John Smith"]
    assert person.type_ref == "osint_person_go_cybersecurity"
    assert person.type_label == "Person"
    assert person.attributes["full_name"] == "John Smith"
    assert person.confidence == 0.95

    domain = by_name["evil-corp.com"]
    assert domain.type_ref == "osint_domain_go_cybersecurity"
    assert domain.type_label == "Domain"
    assert domain.attributes["fqdn"] == "evil-corp.com"
    assert domain.confidence == 0.88
def test_texto_sin_entidades_retorna_lista_vacia():
    """An LLM reply with no entities produces an empty result list."""
    llm = make_llm({"entities": []})

    result = extract_entities_llm(
        "The sky is blue and the grass is green.", SCHEMA, llm
    )

    assert result == []
def test_llm_retorna_json_mal_formado_retorna_lista_vacia_con_warning():
    """A failing LLM call yields an empty list plus exactly one LLM-error warning.

    The malformed-JSON case is simulated by a stub that raises ValueError
    when called.
    """

    def bad_llm(messages: list[dict]) -> dict:
        raise ValueError("JSON decode error")

    with warnings.catch_warnings(record=True) as caught:
        # "always" disables once-per-location suppression so the warning
        # is recorded even if an earlier test already triggered it.
        warnings.simplefilter("always")
        result = extract_entities_llm("Some text with entities.", SCHEMA, bad_llm)

    assert result == []
    # Count only the warning under test: with the "always" filter any
    # unrelated warning (e.g. a DeprecationWarning from a dependency)
    # would also be recorded and must not break this check.
    llm_warnings = [
        w for w in caught if "error llamando al LLM" in str(w.message)
    ]
    assert len(llm_warnings) == 1
def test_type_ref_invalido_en_respuesta_se_descarta_con_warning():
    """An entity whose type_ref is not in the schema is dropped with a warning.

    The stubbed reply mixes one entity with a schema type_ref and one with
    an unknown type_ref; only the former must be returned, and some
    recorded warning must mention the unknown type_ref.
    """
    llm = make_llm({
        "entities": [
            {
                "name": "Valid Person",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {},
                "confidence": 0.9,
            },
            {
                "name": "Unknown Thing",
                "type_ref": "nonexistent_type_ref",
                "attributes": {},
                "confidence": 0.8,
            },
        ]
    })

    with warnings.catch_warnings(record=True) as caught:
        # Record every warning, bypassing once-per-location suppression.
        warnings.simplefilter("always")
        result = extract_entities_llm("Text with entities.", SCHEMA, llm)

    assert len(result) == 1
    assert result[0].name == "Valid Person"
    assert any("nonexistent_type_ref" in str(w.message) for w in caught)
def test_confidence_se_propaga_correctamente():
    """Each entity's confidence from the LLM reply reaches the result unchanged."""
    llm = make_llm({
        "entities": [
            {
                "name": "Implied Person",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {},
                "confidence": 0.7,
            },
            {
                "name": "Weakly Implied Domain",
                "type_ref": "osint_domain_go_cybersecurity",
                "attributes": {},
                "confidence": 0.5,
            },
            {
                "name": "Explicit Entity",
                "type_ref": "osint_person_go_cybersecurity",
                "attributes": {},
                "confidence": 1.0,
            },
        ]
    })

    result = extract_entities_llm("Some text.", SCHEMA, llm)

    assert len(result) == 3
    # Look confidences up by name so the check does not depend on the
    # order in which entities are returned.
    confidences = {e.name: e.confidence for e in result}
    assert confidences["Implied Person"] == 0.7
    assert confidences["Weakly Implied Domain"] == 0.5
    assert confidences["Explicit Entity"] == 1.0
def test_schema_vacio_lanza_value_error():
    """An empty entity schema is rejected with ValueError."""
    llm = make_llm({"entities": []})

    # `match` is applied as a regex search against the exception message.
    with pytest.raises(ValueError, match="entity_schema no puede estar vacio"):
        extract_entities_llm("Some text.", [], llm)