feat: extraccion masiva footprint_aurgi (41 funcs + 4 types + stack Docker geo)

Extrae al registry funciones del proyecto interno footprint_aurgi:
- core (6): slugify_ascii, normalize_for_join, cp_provincia_es, infer_provincia_from_cp, safe_read_csv_fallback, csv_to_parquet_duckdb
- geo puras (7): haversine_km, point_in_ring, point_in_polygon, point_in_polygons_bbox, polygon_bbox, extent_with_padding, distance_bucket
- geo I/O (4): load_geojson_polygons, load_boundary_gdf, add_basemap_osm, add_basemap_with_timeout
- valhalla client (4): valhalla_route, valhalla_isochrone, valhalla_isochrones_async, valhalla_matrix_1_to_n
- datascience stats (7): trimmed_mean, geometric_mean, detect_distribution_type, best_central_tendency, summary_stats, kde_density_levels, alpha_shape_concave_hull
- datascience fuzzy (3): fuzzy_merge_adaptive (rapidfuzz), words_to_dataset, remove_words_from_column
- datascience viz (2): plot_kde_2d, plot_heatmap_log
- infra (4): compress_pdf_ghostscript, render_table_page_pdfpages, add_header_logo, osm2pgsql_ingest
- pipelines (4): setup_geo_stack_docker, compute_centers_reachability, generate_isochrones_by_zone, count_points_per_zone
- types geo (4): LonLat, BBox, IsochroneRequest, Centro

Incluye:
- apps/footprint_geo_stack/ (PostGIS + Martin + Valhalla via docker-compose)
- 131/132 tests pasan (1 skip esperado: osm2pgsql en PATH)
- Issue tracker dev/issues/0052-footprint-aurgi-extraction.md
- Atribucion uniforme: source_repo internal:footprint_aurgi, source_license internal-aurgi
- Build con 9 agentes en paralelo (8 wave 1 + 1 wave 2 pipelines)

Tambien commitea trabajo previo no commiteado: aggregate_extraction_results, chunk_with_overlap, clean_pdf_text, merge_entity_aliases, extract_graph_gliner2, extract_relations_mrebel, extract_triples_spacy_es, gliner2/mrebel/marianmt/rebel/spacy_es load_model, parse_rebel_output, translate_es_to_en, issue 0050/0051.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-04 23:35:22 +02:00
parent f73ea072bd
commit faac610745
193 changed files with 13146 additions and 3 deletions
@@ -0,0 +1,103 @@
"""Tests para align_relations_to_entities."""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
from python.functions.datascience.align_relations_to_entities import align_relations_to_entities
def _t(head, head_type, relation, tail, tail_type):
return {
"head": head,
"head_type": head_type,
"type": relation,
"tail": tail,
"tail_type": tail_type,
}
def test_match_exacto_case_insensitive_resuelve_correctamente():
    """Lower-cased spans resolve to the entity names with their original casing."""
    known_entities = ["Pablo Isla", "Inditex"]
    raw = [_t("pablo isla", "per", "employer", "inditex", "org")]
    aligned = align_relations_to_entities(raw, known_entities)
    assert len(aligned) == 1
    edge = aligned[0]
    assert edge["from"] == "Pablo Isla"
    assert edge["to"] == "Inditex"
    assert edge["kind"] == "employer"
def test_substring_entity_en_span_del_head():
    """An entity name embedded in a longer head span resolves to that entity."""
    # mREBEL emits "esta en Bilbao" but the entity is just "Bilbao".
    known_entities = ["Bilbao", "Espana"]
    raw = [_t("esta en Bilbao", "loc", "located in", "Espana", "loc")]
    aligned = align_relations_to_entities(raw, known_entities)
    assert len(aligned) == 1
    edge = aligned[0]
    assert edge["from"] == "Bilbao"
    assert edge["to"] == "Espana"
def test_substring_span_dentro_del_nombre_de_entidad():
    """A span that is part of a longer entity name resolves to the full name."""
    # The span "Santander" is contained in the entity name "Banco Santander".
    known_entities = ["Banco Santander", "Openbank"]
    raw = [_t("Santander", "org", "owns", "Openbank", "org")]
    aligned = align_relations_to_entities(raw, known_entities)
    assert len(aligned) == 1
    edge = aligned[0]
    assert edge["from"] == "Banco Santander"
    assert edge["to"] == "Openbank"
def test_gana_nombre_de_entidad_mas_largo_en_ambiguedad():
    """The ambiguous span "Madrid" must still resolve to a known entity.

    Both "Madrid" and "Comunidad de Madrid" contain the span. Only the
    surviving triplet and the membership of its endpoints in the entity
    list are checked; which candidate wins is not pinned here.
    """
    # NOTE(review): despite the test name, the winner is not asserted. The
    # original notes mention both a longest-name substring rule and an exact
    # case-insensitive match taking precedence -- confirm against the
    # implementation before tightening this assertion.
    known_entities = ["Madrid", "Comunidad de Madrid", "Espana"]
    raw = [_t("Madrid", "loc", "capital of", "Espana", "loc")]
    aligned = align_relations_to_entities(raw, known_entities)
    assert len(aligned) == 1
    edge = aligned[0]
    assert edge["from"] in known_entities
    assert edge["to"] in known_entities
def test_triplet_sin_match_se_descarta():
    """A triplet whose spans match no known entity is dropped."""
    known_entities = ["Pablo Isla", "Inditex"]
    raw = [_t("Unknown Entity", "per", "works for", "Another Unknown", "org")]
    aligned = align_relations_to_entities(raw, known_entities)
    assert aligned == []
def test_triplet_con_head_igual_tail_se_descarta_self_loop():
    """A triplet whose head and tail are the same entity is dropped."""
    known_entities = ["Inditex", "Zara"]
    raw = [_t("Inditex", "org", "owns", "Inditex", "org")]
    aligned = align_relations_to_entities(raw, known_entities)
    assert aligned == []
def test_lista_triplets_vacia_retorna_vacia():
    """An empty triplet list yields an empty result."""
    no_triplets = []
    aligned = align_relations_to_entities(no_triplets, ["Pablo Isla", "Inditex"])
    assert aligned == []
def test_lista_entity_names_vacia_retorna_vacia():
    """With no known entities, nothing can be aligned."""
    raw = [_t("Pablo Isla", "per", "employer", "Inditex", "org")]
    no_entities = []
    aligned = align_relations_to_entities(raw, no_entities)
    assert aligned == []
def test_multiples_triplets_con_mezcla_de_matches_y_descartes():
    """Only the alignable, non-self-loop triplet survives a mixed batch."""
    matching = _t("Pablo Isla", "per", "employer", "Inditex", "org")
    unmatched_head = _t("Ghost Entity", "per", "employer", "Inditex", "org")
    self_loop = _t("Pablo Isla", "per", "employer", "Pablo Isla", "per")
    known_entities = ["Pablo Isla", "Inditex"]
    aligned = align_relations_to_entities(
        [matching, unmatched_head, self_loop], known_entities
    )
    assert len(aligned) == 1
    edge = aligned[0]
    assert edge["from"] == "Pablo Isla"
    assert edge["to"] == "Inditex"
@@ -0,0 +1,38 @@
"""Tests para alpha_shape_concave_hull."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from alpha_shape_concave_hull import alpha_shape_concave_hull
def test_alpha_shape_square_large_alpha():
    """The four corners of a unit square with alpha=10 produce a non-None result."""
    corners = [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)]
    hull = alpha_shape_concave_hull(corners, alpha=10.0)
    assert hull is not None
def test_alpha_shape_too_few_points():
    """Three points are expected to yield ``None``."""
    three_points = [(0, 0), (1, 0), (0, 1)]
    hull = alpha_shape_concave_hull(three_points, alpha=10.0)
    assert hull is None
def test_alpha_shape_very_small_alpha_returns_none():
    """An alpha too small for any triangle circumradius to fit yields ``None``."""
    corners = [(0.0, 0.0), (100.0, 0.0), (100.0, 100.0), (0.0, 100.0)]
    hull = alpha_shape_concave_hull(corners, alpha=0.0001)
    assert hull is None
def test_alpha_shape_5_points_returns_geometry():
    """A square plus its centre point with alpha=5 produces a non-None result."""
    square = [(0.0, 0.0), (2.0, 0.0), (2.0, 2.0), (0.0, 2.0)]
    centre = (1.0, 1.0)
    hull = alpha_shape_concave_hull(square + [centre], alpha=5.0)
    assert hull is not None
@@ -0,0 +1,47 @@
"""Tests para best_central_tendency."""
import math
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from best_central_tendency import best_central_tendency
def test_best_central_tendency_normal_ish():
    """The "normal-ish" distribution type selects the mean."""
    sample = [1, 2, 3, 4, 5]
    label, value = best_central_tendency(sample, "normal-ish")
    assert label == "mean"
    assert abs(value - 3.0) < 1e-9
def test_best_central_tendency_right_skewed():
    """The "right-skewed" distribution type selects the median."""
    sample = [1, 2, 3, 4, 5]
    label, value = best_central_tendency(sample, "right-skewed")
    assert label == "median"
    assert abs(value - 3.0) < 1e-9
def test_best_central_tendency_left_skewed():
    """The "left-skewed" distribution type selects the median."""
    sample = [1, 2, 3, 4, 5]
    label, _value = best_central_tendency(sample, "left-skewed")
    assert label == "median"
def test_best_central_tendency_lognormal_ish():
    """The "lognormal-ish" distribution type selects the geometric mean."""
    powers_of_two = [1, 2, 4, 8]
    label, value = best_central_tendency(powers_of_two, "lognormal-ish")
    assert label == "geometric_mean"
    # The geometric mean of 1, 2, 4, 8 is 2 ** 1.5.
    assert abs(value - 2 ** 1.5) < 1e-6
def test_best_central_tendency_heavy_tail():
    """The "heavy-tail" distribution type selects a 5% trimmed mean with a real value."""
    sample_with_outlier = [1, 2, 3, 4, 5, 100]
    label, value = best_central_tendency(sample_with_outlier, "heavy-tail")
    assert label == "trimmed_mean_5%"
    assert not math.isnan(value)
def test_best_central_tendency_empty():
    """An empty sample yields NaN as the value."""
    _label, value = best_central_tendency([], "normal-ish")
    assert math.isnan(value)
def test_best_central_tendency_default():
    """An unrecognised distribution type ("other") falls back to the median."""
    sample = [1, 2, 3, 4, 5]
    label, _value = best_central_tendency(sample, "other")
    assert label == "median"
@@ -0,0 +1,45 @@
"""Tests para detect_distribution_type."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from detect_distribution_type import detect_distribution_type
import numpy as np
def test_detect_too_few_samples():
    """Five samples are reported as "too_few_samples"."""
    five_ones = [1] * 5
    report = detect_distribution_type(five_ones)
    assert report["type"] == "too_few_samples"
def test_detect_normal_ish():
    """200 seeded standard-normal draws are classified as "normal-ish"."""
    rng = np.random.default_rng(42)
    draws = rng.normal(0, 1, 200).tolist()
    report = detect_distribution_type(draws)
    kind = report["type"]
    assert kind == "normal-ish", f"Got {kind}"
def test_detect_right_skewed():
    """Seeded exponential draws land in one of the right-tailed categories."""
    rng = np.random.default_rng(0)
    # An exponential distribution is heavily right-skewed.
    draws = rng.exponential(scale=1.0, size=200).tolist()
    report = detect_distribution_type(draws)
    kind = report["type"]
    accepted = ("right-skewed", "lognormal-ish", "heavy-tail")
    assert kind in accepted, f"Got {kind}"
def test_detect_stats_keys():
    """The result carries a ``stats`` mapping whose ``n`` equals the sample size."""
    rng = np.random.default_rng(7)
    draws = rng.normal(5, 2, 100).tolist()
    report = detect_distribution_type(draws)
    assert "stats" in report
    stats = report["stats"]
    assert "n" in stats
    assert stats["n"] == 100
def test_detect_exactly_30():
    """Exactly 30 samples are not reported as "too_few_samples"."""
    rng = np.random.default_rng(1)
    draws = rng.normal(0, 1, 30).tolist()
    report = detect_distribution_type(draws)
    assert report["type"] != "too_few_samples"
@@ -0,0 +1,67 @@
"""Tests para extract_graph_gliner2.
Usa un stub GLiNER2 para validar el contrato sin descargar el modelo real.
"""
from __future__ import annotations
import os
import sys
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
from python.functions.datascience.extract_graph_gliner2 import extract_graph_gliner2
class _Schema:
def entities(self, labels):
self._entities = labels
return self
def relations(self, labels):
self._relations = labels
return self
class _StubModel:
"""Stub que devuelve entidades y relaciones conocidas."""
_extract_result = {
"entities": {"person": ["Pablo Isla"], "organization": ["Inditex"]},
"relation_extraction": {"ceo_of": [("Pablo Isla", "Inditex")]},
}
def create_schema(self):
return _Schema()
def extract(self, text, schema=None, threshold=0.3, include_confidence=False):
return self._extract_result
def test_output_tiene_claves_entities_relation_extraction_elapsed_s():
    """The output exposes entities, relation_extraction and a float elapsed_s."""
    output = extract_graph_gliner2(
        text="Pablo Isla es CEO de Inditex.",
        entity_labels=["person", "organization"],
        relation_labels=["ceo_of"],
        model=_StubModel(),
    )
    for expected_key in ("entities", "relation_extraction", "elapsed_s"):
        assert expected_key in output
    assert isinstance(output["elapsed_s"], float)
def test_stub_model_retorna_shape_correcto():
    """The stub's entities and relations are passed through to the result."""
    output = extract_graph_gliner2(
        text="Texto cualquiera.",
        entity_labels=["person"],
        relation_labels=["works_at"],
        model=_StubModel(),
        threshold=0.3,
    )
    expected_entities = {"person": ["Pablo Isla"], "organization": ["Inditex"]}
    assert output["entities"] == expected_entities
    assert "ceo_of" in output["relation_extraction"]
@@ -0,0 +1,112 @@
"""Tests para extract_relations_mrebel con stubs de modelo."""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
from python.functions.datascience.extract_relations_mrebel import extract_relations_mrebel
from python.types.datascience.entity_candidate import EntityCandidate
from python.types.datascience.relation_candidate import RelationCandidate
# ---------------------------------------------------------------------------
# Stubs
# ---------------------------------------------------------------------------
class _TokenizerStub:
"""Tokenizer stub que devuelve inputs triviales y decodifica el wire format canonico."""
def __init__(self, decoded_output: str = ""):
self._decoded = decoded_output
def __call__(self, text, return_tensors=None, max_length=512, truncation=True):
return {"input_ids": [[1, 2, 3]]}
def decode(self, token_ids, skip_special_tokens=True):
return self._decoded
class _ModelStub:
"""Modelo stub que devuelve tokens triviales."""
def generate(self, input_ids=None, num_beams=4, length_penalty=1.0, max_length=256, **kwargs):
return [[10, 11, 12]]
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_flujo_completo_con_stub_produce_relation_candidates_correctos():
    """One valid decoded triplet becomes one fully populated RelationCandidate."""
    # Canonical wire format holding a single valid triplet.
    wire = "<triplet> Pablo Isla <per> Inditex <org> employer"
    tokenizer = _TokenizerStub(decoded_output=wire)
    stub_model = _ModelStub()
    candidates_in = [
        EntityCandidate(name="Pablo Isla", type_label="PER", confidence=0.95),
        EntityCandidate(name="Inditex", type_label="ORG", confidence=0.92),
    ]
    sentence = "Pablo Isla es el presidente de Inditex."
    relations = extract_relations_mrebel(sentence, candidates_in, tokenizer, stub_model)
    assert len(relations) == 1
    relation = relations[0]
    assert isinstance(relation, RelationCandidate)
    assert relation.from_name == "Pablo Isla"
    assert relation.to_name == "Inditex"
    assert relation.relation_type == "employer"
    assert relation.confidence == 1.0
def test_menos_de_2_entidades_retorna_vacio():
    """A single entity is not enough to produce any relation."""
    tokenizer = _TokenizerStub()
    stub_model = _ModelStub()
    only_one = [EntityCandidate(name="Pablo Isla", type_label="PER")]
    relations = extract_relations_mrebel("Texto cualquiera.", only_one, tokenizer, stub_model)
    assert relations == []
def test_texto_vacio_retorna_vacio():
    """Empty input text produces no relations."""
    tokenizer = _TokenizerStub()
    stub_model = _ModelStub()
    candidates_in = [
        EntityCandidate(name="A", type_label="PER"),
        EntityCandidate(name="B", type_label="ORG"),
    ]
    relations = extract_relations_mrebel("", candidates_in, tokenizer, stub_model)
    assert relations == []
def test_triplets_no_alineables_se_descartan():
    """Triplets naming entities outside the candidate list are discarded."""
    # The stub emits entities that are absent from the candidate list.
    wire = "<triplet> Ghost Entity <per> Unknown Org <org> some relation"
    tokenizer = _TokenizerStub(decoded_output=wire)
    stub_model = _ModelStub()
    candidates_in = [
        EntityCandidate(name="Pablo Isla", type_label="PER"),
        EntityCandidate(name="Inditex", type_label="ORG"),
    ]
    relations = extract_relations_mrebel(
        "Texto largo suficiente.", candidates_in, tokenizer, stub_model
    )
    assert relations == []
def test_multiples_frases_generan_multiples_candidates():
    """Two-sentence input yields at least one RelationCandidate."""
    # The stub emits the same valid triplet on every decode call.
    wire = "<triplet> Pablo Isla <per> Inditex <org> employer"
    tokenizer = _TokenizerStub(decoded_output=wire)
    stub_model = _ModelStub()
    candidates_in = [
        EntityCandidate(name="Pablo Isla", type_label="PER"),
        EntityCandidate(name="Inditex", type_label="ORG"),
    ]
    # Two sentences separated by ". ".
    two_sentences = "Pablo Isla es el presidente de Inditex. Inditex tiene sedes en todo el mundo."
    relations = extract_relations_mrebel(two_sentences, candidates_in, tokenizer, stub_model)
    # There may be 1 or 2 results depending on deduplication; it must not be empty.
    assert len(relations) >= 1
    for relation in relations:
        assert isinstance(relation, RelationCandidate)
@@ -0,0 +1,81 @@
"""Tests para extract_triples_spacy_es.
Requiere spaCy y es_core_news_md instalados. Si no estan, los tests se omiten.
"""
from __future__ import annotations
import os
import sys
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
from python.functions.datascience.extract_triples_spacy_es import extract_triples_spacy_es
spacy = pytest.importorskip("spacy", reason="spacy not installed — skip")
def _load_nlp():
    """Load the ``es_core_news_md`` pipeline, or return ``None`` on ``OSError``."""
    try:
        nlp = spacy.load("es_core_news_md")
    except OSError:
        nlp = None
    return nlp
_NLP = _load_nlp()
pytestmark = pytest.mark.skipif(
_NLP is None,
reason="es_core_news_md not installed — run: python -m spacy download es_core_news_md",
)
def test_oracion_simple_produce_tripleta_con_sujeto_verbo_objeto():
    """A simple sentence yields at least one triple whose subject mentions Enmanuel."""
    output = extract_triples_spacy_es("Enmanuel quiere a Ashlly.", _NLP)
    triples = output["triples"]
    assert len(triples) >= 1
    # The case-insensitive check subsumes the exact-case one.
    subjects = [triple["subject"] for triple in triples]
    assert any("enmanuel" in subject.lower() for subject in subjects)
def test_carlos_torres_preside_bbva():
    """"Carlos Torres preside BBVA." yields a triple whose relation contains "presidir"."""
    output = extract_triples_spacy_es("Carlos Torres preside BBVA.", _NLP)
    triples = output["triples"]
    assert len(triples) >= 1
    # The case-insensitive check subsumes the exact-case one.
    relations = [triple["relation"] for triple in triples]
    assert any("presidir" in relation.lower() for relation in relations)
def test_amancio_ortega_fundo_inditex_en_1985():
    """A sentence with a direct object and a date yields subject/object triples.

    Requires at least one triple, a subject mentioning "Amancio" or "Ortega",
    and an object mentioning "Inditex" or "1985".
    """
    output = extract_triples_spacy_es(
        "Amancio Ortega fundo Inditex en 1985.", _NLP
    )
    triples = output["triples"]
    # Only a lower bound of one triple is enforced here.
    assert len(triples) >= 1
    subjects = {triple["subject"] for triple in triples}
    assert any("Amancio" in subject or "Ortega" in subject for subject in subjects)
    # Either the direct object (Inditex) or the oblique (1985) must show up.
    objects = {triple["object"] for triple in triples}
    assert any("Inditex" in obj or "1985" in obj for obj in objects)
def test_texto_sin_verbos_produce_tripletas_vacias():
    """Text without verbs produces an empty triple list."""
    verbless_text = "BBVA Santander Inditex."
    output = extract_triples_spacy_es(verbless_text, _NLP)
    assert output["triples"] == []
def test_entities_ner_detecta_categorias():
    """NER output includes at least one of the PER, ORG or LOC labels."""
    output = extract_triples_spacy_es(
        "Carlos Torres es presidente de BBVA en Bilbao.", _NLP
    )
    found_labels = {entity["label"] for entity in output["entities"]}
    # At least one of PER, ORG or LOC must be detected.
    assert found_labels & {"PER", "ORG", "LOC"}
@@ -0,0 +1,67 @@
"""Tests para fuzzy_merge_adaptive."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from fuzzy_merge_adaptive import fuzzy_merge_adaptive
def test_left_join_con_typo():
    """A misspelled left key ("Barclona") still joins to its right-hand row."""
    left_rows = [{"name": "Madrid"}, {"name": "Barclona"}]
    right_rows = [{"name": "Madrid", "cp": "28"}, {"name": "Barcelona", "cp": "08"}]
    merged = fuzzy_merge_adaptive(left_rows, right_rows, left_key="name", right_key="name")
    assert len(merged) == 2
    scores = [row["match_score"] for row in merged]
    assert all(score >= 80 for score in scores), f"Scores bajos: {scores}"
    first, second = merged[0], merged[1]
    assert first["cp"] == "28"
    assert second["cp"] == "08"
def test_inner_join_excluye_sin_match():
    """An inner join drops left rows that found no match."""
    left_rows = [{"name": "Madrid"}, {"name": "ZZZinexistente"}]
    right_rows = [{"name": "Madrid", "cp": "28"}]
    merged = fuzzy_merge_adaptive(
        left_rows,
        right_rows,
        left_key="name",
        right_key="name",
        thresholds=[90, 80, 70],
        how="inner",
    )
    assert len(merged) == 1
    assert merged[0]["fuzzy_match"] == "Madrid"
def test_left_join_sin_match_devuelve_none():
    """A left join keeps unmatched rows with empty match metadata."""
    left_rows = [{"name": "ZZZinexistente"}]
    right_rows = [{"name": "Madrid", "cp": "28"}]
    merged = fuzzy_merge_adaptive(
        left_rows,
        right_rows,
        left_key="name",
        right_key="name",
        thresholds=[95],
        how="left",
    )
    assert len(merged) == 1
    unmatched = merged[0]
    assert unmatched["fuzzy_match"] is None
    assert unmatched["match_score"] == 0
    assert unmatched["threshold_used"] is None
def test_threshold_adaptativo():
    """When a match is found, the threshold used never exceeds the highest one (90)."""
    left_rows = [{"name": "Bcn"}]
    right_rows = [{"name": "Barcelona", "cp": "08"}]
    merged = fuzzy_merge_adaptive(
        left_rows,
        right_rows,
        left_key="name",
        right_key="name",
        thresholds=[90, 80, 70, 60, 50],
    )
    assert len(merged) == 1
    # It may or may not match depending on the score; only a match is checked.
    used = merged[0]["threshold_used"]
    if used is not None:
        assert used <= 90
def test_colision_de_claves_usa_sufijos():
    """A non-key column present on both sides is kept as ``_left`` / ``_right``."""
    left_rows = [{"name": "Madrid", "info": "left_info"}]
    right_rows = [{"name": "Madrid", "info": "right_info"}]
    merged = fuzzy_merge_adaptive(left_rows, right_rows, left_key="name", right_key="name")
    assert len(merged) == 1
    row = merged[0]
    assert "info_left" in row
    assert "info_right" in row
    assert row["info_left"] == "left_info"
    assert row["info_right"] == "right_info"
@@ -0,0 +1,35 @@
"""Tests para geometric_mean."""
import math
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from geometric_mean import geometric_mean
def test_geometric_mean_powers_of_two():
    """The geometric mean of 1, 2, 4, 8 is 2 ** 1.5 (about 2.828)."""
    expected = 2 ** 1.5
    result = geometric_mean([1, 2, 4, 8])
    delta = abs(result - expected)
    assert delta < 1e-6, f"Expected ~{expected}, got {result}"
def test_geometric_mean_filters_non_positive():
    """Negative values are ignored: [1, -2, 3] behaves like [1, 3]."""
    log_sum = math.log(1) + math.log(3)
    expected = math.exp(log_sum / 2)
    actual = geometric_mean([1, -2, 3])
    assert abs(actual - expected) < 1e-6
def test_geometric_mean_empty_returns_nan():
    """An empty input yields NaN."""
    actual = geometric_mean([])
    assert math.isnan(actual)
def test_geometric_mean_all_negative_returns_nan():
    """An input with only negative values yields NaN."""
    all_negative = [-1, -2, -3]
    actual = geometric_mean(all_negative)
    assert math.isnan(actual)
def test_geometric_mean_single_positive():
    """A single positive value is its own geometric mean."""
    actual = geometric_mean([9.0])
    assert abs(actual - 9.0) < 1e-9
@@ -0,0 +1,84 @@
"""Tests para gliner2_load_model.
El modelo real (gliner2) es opcional. Los tests usan un stub para validar
el cache sin descargar el modelo. Tests que requieran el modelo real se
marcan con pytest.importorskip('gliner2').
"""
from __future__ import annotations
import os
import sys
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
from python.functions.datascience.gliner2_load_model import (
_MODEL_CACHE,
_resolve_device,
gliner2_load_model,
)
class _StubGLiNER2:
"""Stub duck-typed para validar el cache sin descargar el modelo real."""
@classmethod
def from_pretrained(cls, model_name: str) -> "_StubGLiNER2":
return cls()
def create_schema(self):
return self
def entities(self, labels):
return self
def relations(self, labels):
return self
def extract(self, text, **kwargs):
return {"entities": {}, "relation_extraction": {}}
def test_cache_devuelve_la_misma_instancia(monkeypatch):
    """A pre-seeded cache entry is returned as-is by the loader.

    The cache is seeded with a stub under the key this test assumes the
    loader uses, ``(model_name, device)``, so no real model is loaded.
    The cache is always emptied afterwards, even if the assertion fails.
    """
    # Defensive: if the module exposes a ``GLiNER2`` name, point it at the
    # stub. ``raising=False`` tolerates the attribute being absent.
    monkeypatch.setattr(
        "python.functions.datascience.gliner2_load_model.GLiNER2",
        _StubGLiNER2,
        raising=False,
    )
    key = ("fastino/gliner2-large-v1", "cpu")
    stub = _StubGLiNER2()
    _MODEL_CACHE.clear()
    # Seed the cache directly to simulate a previous first load.
    _MODEL_CACHE[key] = stub
    try:
        result = gliner2_load_model(model_name="fastino/gliner2-large-v1", device="cpu")
        assert result is stub
    finally:
        # Never leak the stub into other tests through the shared cache.
        _MODEL_CACHE.clear()
def test_device_auto_resuelve_a_cpu_si_torch_no_esta(monkeypatch):
    """``device="auto"`` resolves to "cpu" when torch cannot be imported."""
    # A ``None`` entry in ``sys.modules`` makes ``import torch`` fail,
    # simulating that torch is unavailable.
    monkeypatch.setitem(sys.modules, "torch", None)
    assert _resolve_device("auto") == "cpu"
def test_import_error_si_gliner2_no_esta_instalado():
    """Loading the model raises ImportError when ``gliner2`` cannot be imported.

    The missing package is simulated, so the test runs whether or not
    ``gliner2`` is installed.
    """
    # NOTE(review): assumes the loader imports ``gliner2`` lazily on a cache
    # miss and lets the ImportError propagate -- confirm against the loader.
    from unittest import mock

    _MODEL_CACHE.clear()
    try:
        # A ``None`` entry in ``sys.modules`` makes ``import gliner2`` fail.
        with mock.patch.dict(sys.modules, {"gliner2": None}):
            with pytest.raises(ImportError):
                gliner2_load_model(model_name="fastino/gliner2-large-v1", device="cpu")
    finally:
        _MODEL_CACHE.clear()
@@ -0,0 +1,46 @@
"""Tests para kde_density_levels."""
import sys
import os
import numpy as np
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from kde_density_levels import kde_density_levels
def test_kde_density_levels_returns_dict_for_50_points():
    """Fifty seeded points produce a result with method, densities and both levels."""
    rng = np.random.default_rng(42)
    xs = rng.normal(0, 1, 50).tolist()
    ys = rng.normal(0, 1, 50).tolist()
    levels = kde_density_levels(xs, ys)
    assert levels is not None
    assert "method" in levels
    assert levels["method"] in ("kde", "hist")
    assert "densities" in levels
    # One density value per input point.
    assert len(levels["densities"]) == 50
    assert "abs_level" in levels
    assert "dense_level" in levels
def test_kde_density_levels_none_for_few_points():
    """Three points yield ``None``."""
    coords = [1.0, 2.0, 3.0]
    levels = kde_density_levels(coords, [1.0, 2.0, 3.0])
    assert levels is None
def test_kde_density_levels_none_for_4_points():
    """Four points still yield ``None``."""
    xs = [1, 2, 3, 4]
    ys = [1, 2, 3, 4]
    levels = kde_density_levels(xs, ys)
    assert levels is None
def test_kde_density_levels_levels_ordered():
    """With abs_quantile < dense_quantile, abs_level does not exceed dense_level."""
    rng = np.random.default_rng(0)
    xs = rng.uniform(0, 10, 100).tolist()
    ys = rng.uniform(0, 10, 100).tolist()
    levels = kde_density_levels(xs, ys, abs_quantile=0.1, dense_quantile=0.85)
    assert levels is not None
    assert levels["abs_level"] <= levels["dense_level"]
def test_kde_density_levels_mismatched_lengths():
    """Coordinate lists of different lengths yield ``None``."""
    five_xs = [1, 2, 3, 4, 5]
    three_ys = [1, 2, 3]
    levels = kde_density_levels(five_xs, three_ys)
    assert levels is None
@@ -0,0 +1,75 @@
"""Tests para parse_rebel_output."""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
from python.functions.datascience.parse_rebel_output import parse_rebel_output
def test_string_vacio_retorna_lista_vacia():
    """An empty string parses to an empty list."""
    parsed = parse_rebel_output("")
    assert parsed == []
def test_string_solo_espacios_retorna_lista_vacia():
    """A whitespace-only string parses to an empty list."""
    parsed = parse_rebel_output(" ")
    assert parsed == []
def test_un_triplet_completo_retorna_un_dict_con_campos_correctos():
    """One complete triplet parses into a dict with all five fields filled."""
    wire = "tp_XX<triplet> Pablo Isla <per> Inditex <org> employer"
    parsed = parse_rebel_output(wire)
    assert len(parsed) == 1
    triplet = parsed[0]
    assert triplet["head"] == "Pablo Isla"
    assert triplet["head_type"] == "per"
    assert triplet["tail"] == "Inditex"
    assert triplet["tail_type"] == "org"
    assert triplet["type"] == "employer"
def test_dos_triplets_retorna_dos_dicts():
    """Two consecutive triplets parse into two dicts, in order."""
    wire = (
        "tp_XX<triplet> Pablo Isla <per> Inditex <org> employer "
        "<triplet> Arteixo <loc> A Coruna <loc> located in the administrative territorial entity"
    )
    parsed = parse_rebel_output(wire)
    assert len(parsed) == 2
    first, second = parsed[0], parsed[1]
    assert first["head"] == "Pablo Isla"
    assert first["tail"] == "Inditex"
    assert second["head"] == "Arteixo"
    assert second["tail"] == "A Coruna"
    assert "located" in second["type"]
def test_triplet_incompleto_sin_cierre_no_rompe():
    """A truncated triplet (head span only) does not raise and returns a list."""
    # Only the head span is present: no tail and no relation.
    truncated = "tp_XX<triplet> Pablo Isla"
    parsed = parse_rebel_output(truncated)
    # The list may be empty or partial; only its type is checked.
    assert isinstance(parsed, list)
def test_tokens_angulares_desconocidos_no_lanzan_excepcion():
    """An unknown angle-bracket type token does not break the parser."""
    # An unknown type such as <unknown_type> must not raise.
    wire = "<triplet> Entity One <unknown_type> Entity Two <org> some relation"
    parsed = parse_rebel_output(wire)
    assert isinstance(parsed, list)
def test_sin_prefijo_tp_xx_funciona():
    """Output without the ``tp_XX`` prefix is parsed the same way."""
    # Monolingual REBEL does not emit tp_XX.
    wire = "<triplet> Barack Obama <per> United States <org> president of"
    parsed = parse_rebel_output(wire)
    assert len(parsed) == 1
    triplet = parsed[0]
    assert triplet["head"] == "Barack Obama"
    assert triplet["tail"] == "United States"
    assert triplet["type"] == "president of"
def test_strip_tags_s_pad():
    """Surrounding ``<s>``, ``<pad>`` and ``</s>`` tags do not affect parsing."""
    wire = "<s><pad>tp_XX<triplet> Ana <per> BBVA <org> works at</s>"
    parsed = parse_rebel_output(wire)
    assert len(parsed) == 1
    triplet = parsed[0]
    assert triplet["head"] == "Ana"
    assert triplet["tail"] == "BBVA"
@@ -0,0 +1,38 @@
"""Tests para plot_heatmap_log."""
import sys
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from datascience.plot_heatmap_log import plot_heatmap_log
def test_100_puntos_no_lanza_excepcion():
    """Drawing a heatmap of 100 seeded points onto an axes does not raise."""
    import matplotlib.pyplot as plt
    import numpy as np

    rng = np.random.default_rng(0)
    xs = rng.uniform(-4.0, -3.5, 100)
    ys = rng.uniform(40.3, 40.6, 100)
    fig, ax = plt.subplots()
    try:
        plot_heatmap_log(ax, xs, ys, extent=(-4.0, -3.5, 40.3, 40.6), bins=50)
    finally:
        # Close the figure even on failure so it is not left open for later tests.
        plt.close(fig)
def test_ax_tiene_imagen_tras_la_llamada():
    """After the call, the axes holds at least one image artist."""
    import matplotlib.pyplot as plt
    import numpy as np

    rng = np.random.default_rng(1)
    xs = rng.uniform(-4.0, -3.5, 100)
    ys = rng.uniform(40.3, 40.6, 100)
    fig, ax = plt.subplots()
    try:
        plot_heatmap_log(ax, xs, ys, extent=(-4.0, -3.5, 40.3, 40.6), bins=50)
        assert len(ax.images) > 0, "ax should have at least one image after heatmap"
    finally:
        # Close the figure even when the assertion fails.
        plt.close(fig)
@@ -0,0 +1,32 @@
"""Tests para plot_kde_2d."""
import sys
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from datascience.plot_kde_2d import plot_kde_2d
def test_50_puntos_aleatorios_no_lanza_excepcion():
    """Drawing a 2D KDE of 50 seeded normal points onto an axes does not raise."""
    import matplotlib.pyplot as plt
    import numpy as np

    rng = np.random.default_rng(42)
    xs = rng.normal(0, 1, 50)
    ys = rng.normal(0, 1, 50)
    fig, ax = plt.subplots()
    try:
        plot_kde_2d(ax, xs, ys)
    finally:
        # Close the figure even on failure so it is not left open for later tests.
        plt.close(fig)
def test_arrays_vacios_retorna_sin_error():
    """Empty coordinate lists are accepted without raising."""
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots()
    try:
        plot_kde_2d(ax, [], [])
    finally:
        # Close the figure even on failure so it is not left open for later tests.
        plt.close(fig)
@@ -0,0 +1,42 @@
"""Tests para remove_words_from_column."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from remove_words_from_column import remove_words_from_column
def test_elimina_palabras_case_insensitive():
    """Lower-case stop words remove their capitalised occurrences."""
    addresses = ["Calle Mayor 14", "Avenida del Sol"]
    to_remove = ["calle", "avenida", "del"]
    cleaned = remove_words_from_column(addresses, words=to_remove)
    assert cleaned == ["Mayor 14", "Sol"]
def test_none_devuelve_string_vacio():
    """A ``None`` value becomes an empty string; other values are cleaned normally."""
    cleaned = remove_words_from_column([None, "hola mundo"], words=["hola"])
    from_none, from_text = cleaned[0], cleaned[1]
    assert from_none == ""
    assert from_text == "mundo"
def test_colapsa_espacios_multiples():
    """Removing a middle word leaves the remaining words separated by one space."""
    cleaned = remove_words_from_column(["uno dos tres"], words=["dos"])
    assert cleaned[0] == "uno tres"
def test_palabras_vacias_no_modifica():
    """An empty word list leaves every value unchanged."""
    originals = ["hola mundo", "foo bar"]
    cleaned = remove_words_from_column(originals, words=[])
    assert cleaned == ["hola mundo", "foo bar"]
def test_palabra_completa_no_parcial():
    """Only whole words are removed: "calle" must not touch "calleja"."""
    cleaned = remove_words_from_column(["calleja mayor"], words=["calle"])
    assert cleaned[0] == "calleja mayor"
def test_lista_vacia():
    """An empty value list yields an empty list."""
    cleaned = remove_words_from_column([], words=["foo"])
    assert cleaned == []
@@ -0,0 +1,46 @@
"""Tests para spacy_es_load_model."""
from __future__ import annotations
import os
import sys
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
from python.functions.datascience.spacy_es_load_model import (
_MODEL_CACHE,
spacy_es_load_model,
)
spacy = pytest.importorskip("spacy", reason="spacy not installed — skip")
def _has_model(model_name: str) -> bool:
    """Return whether ``spacy.load`` can load ``model_name``.

    An ``OSError`` from ``spacy.load`` is treated as "not available".
    """
    try:
        spacy.load(model_name)
    except OSError:
        return False
    else:
        return True
@pytest.mark.skipif(
    not _has_model("es_core_news_md"),
    reason="es_core_news_md not installed",
)
def test_cache_devuelve_la_misma_instancia():
    """Two loads of the same model name return the very same object."""
    _MODEL_CACHE.clear()
    try:
        m1 = spacy_es_load_model("es_core_news_md")
        m2 = spacy_es_load_model("es_core_news_md")
        assert m1 is m2
    finally:
        # Empty the shared cache even when the assertion fails.
        _MODEL_CACHE.clear()
def test_oserror_si_el_modelo_no_esta_instalado():
    """Loading a model name that does not exist raises ``OSError``."""
    _MODEL_CACHE.clear()
    try:
        with pytest.raises(OSError):
            spacy_es_load_model("es_nonexistent_model_xyz")
    finally:
        # Empty the shared cache even when the expected error is not raised.
        _MODEL_CACHE.clear()
@@ -0,0 +1,38 @@
"""Tests para summary_stats."""
import math
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from summary_stats import summary_stats
def test_summary_stats_basic():
    """Count, mean, median and quartiles of 1..5 match the expected values."""
    stats = summary_stats([1, 2, 3, 4, 5])
    assert stats["n"] == 5
    assert abs(stats["mean"] - 3.0) < 1e-9
    assert abs(stats["median"] - 3.0) < 1e-9
    # Quartiles are checked with a looser tolerance.
    assert abs(stats["p25"] - 2.0) < 0.01
    assert abs(stats["p75"] - 4.0) < 0.01
def test_summary_stats_empty():
    """An empty input reports n == 0 and NaN for every statistic."""
    stats = summary_stats([])
    assert stats["n"] == 0
    for field in ("mean", "median", "p25", "p75"):
        assert math.isnan(stats[field])
def test_summary_stats_single():
    """A single value is both the mean and the median."""
    stats = summary_stats([7.0])
    assert stats["n"] == 1
    assert abs(stats["mean"] - 7.0) < 1e-9
    assert abs(stats["median"] - 7.0) < 1e-9
def test_summary_stats_keys():
    """The result has exactly the keys n, mean, median, p25 and p75."""
    expected_keys = {"n", "mean", "median", "p25", "p75"}
    stats = summary_stats([1, 2, 3])
    assert set(stats.keys()) == expected_keys
@@ -0,0 +1,62 @@
"""Tests para translate_es_to_en — smoke tests con modelo stub."""
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
from python.functions.datascience.translate_es_to_en import translate_es_to_en
class _StubTokenizer:
"""Tokenizer stub que devuelve inputs triviales."""
def __call__(self, text, return_tensors=None, max_length=512, truncation=True):
# Devuelve un dict con una clave 'input_ids' que el modelo stub acepta.
return {"input_ids": [[1, 2, 3]], "_text": text}
def decode(self, token_ids, skip_special_tokens=True):
# Devuelve siempre "translated" para testing.
return "translated"
class _StubModel:
"""Modelo stub que devuelve tokens triviales."""
def generate(self, input_ids=None, num_beams=4, max_length=512, **kwargs):
return [[10, 11, 12]]
def test_texto_vacio_retorna_string_vacio():
    """Empty input translates to an empty string."""
    tokenizer, stub_model = _StubTokenizer(), _StubModel()
    translated = translate_es_to_en("", tokenizer, stub_model)
    assert translated == ""
def test_solo_espacios_retorna_string_vacio():
    """Whitespace-only input translates to an empty string."""
    tokenizer, stub_model = _StubTokenizer(), _StubModel()
    translated = translate_es_to_en(" ", tokenizer, stub_model)
    assert translated == ""
def test_una_frase_en_espanol_produce_output_no_vacio():
    """A single Spanish sentence yields a non-empty string."""
    tokenizer, stub_model = _StubTokenizer(), _StubModel()
    sentence = "Pablo Isla es presidente de Inditex."
    translated = translate_es_to_en(sentence, tokenizer, stub_model)
    assert isinstance(translated, str)
    assert len(translated) > 0
def test_multiples_frases_se_unen_con_espacio():
    """Per-sentence translations are joined with single spaces."""
    tokenizer, stub_model = _StubTokenizer(), _StubModel()
    # The stub decodes every sentence to the word "translated".
    three_sentences = "Primera frase. Segunda frase. Tercera frase."
    translated = translate_es_to_en(three_sentences, tokenizer, stub_model)
    # Every space-separated piece must be exactly the stub's output.
    pieces = translated.split(" ")
    for piece in pieces:
        assert piece == "translated"
    assert len(pieces) >= 1
@@ -0,0 +1,33 @@
"""Tests para trimmed_mean."""
import math
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from trimmed_mean import trimmed_mean
def test_trimmed_mean_basic():
    """Trimming 10% off [1..5, 100] gives a value within 0.5 of 3.5."""
    sample_with_outlier = [1, 2, 3, 4, 5, 100]
    result = trimmed_mean(sample_with_outlier, 0.1)
    deviation = abs(result - 3.5)
    assert deviation < 0.5, f"Expected ~3.5, got {result}"
def test_trimmed_mean_empty_returns_nan():
    """An empty input yields NaN."""
    actual = trimmed_mean([], 0.05)
    assert math.isnan(actual)
def test_trimmed_mean_no_trim():
    """A trim fraction of 0.0 gives the plain mean."""
    sample = [1.0, 2.0, 3.0, 4.0, 5.0]
    actual = trimmed_mean(sample, 0.0)
    assert abs(actual - 3.0) < 1e-9
def test_trimmed_mean_single_element():
    """A single element is returned unchanged."""
    actual = trimmed_mean([42.0], 0.05)
    assert abs(actual - 42.0) < 1e-9
def test_trimmed_mean_uniform():
    """A constant sample keeps its value after trimming."""
    constant_sample = [5.0, 5.0, 5.0, 5.0, 5.0]
    actual = trimmed_mean(constant_sample, 0.1)
    assert abs(actual - 5.0) < 1e-9
@@ -0,0 +1,49 @@
"""Tests para words_to_dataset."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from words_to_dataset import words_to_dataset
def test_cuenta_palabras_repetidas():
    """A word appearing in two texts is counted twice, under its upper-case form."""
    texts = ["calle mayor", "calle del sol", "avenida principal"]
    rows = words_to_dataset(texts)
    counts_by_word = {row["palabra"]: row["ocurrencias"] for row in rows}
    assert counts_by_word["CALLE"] == 2
def test_eliminar_stopwords_filtra_del():
    """With ``eliminar_stopwords=True`` the word "DEL" is absent from the result."""
    texts = ["calle mayor", "calle del sol", "avenida principal"]
    rows = words_to_dataset(texts, eliminar_stopwords=True)
    words_found = {row["palabra"] for row in rows}
    assert "DEL" not in words_found
def test_min_ocurrencias_filtra():
    """``min_ocurrencias=2`` keeps "CALLE" (2 hits) and drops "MAYOR" (1 hit)."""
    texts = ["calle mayor", "calle del sol", "avenida principal"]
    rows = words_to_dataset(texts, min_ocurrencias=2)
    counts_by_word = {row["palabra"]: row["ocurrencias"] for row in rows}
    assert "CALLE" in counts_by_word
    assert "MAYOR" not in counts_by_word
def test_none_ignorados():
    """``None`` entries are skipped and do not disturb the counts."""
    texts = ["hola mundo", None, "hola"]
    rows = words_to_dataset(texts)
    counts_by_word = {row["palabra"]: row["ocurrencias"] for row in rows}
    assert counts_by_word["HOLA"] == 2
def test_lista_vacia():
    """An empty text list yields an empty dataset."""
    rows = words_to_dataset([])
    assert rows == []
def test_orden_descendente():
    """Rows come back ordered by occurrence count, highest first."""
    texts = ["a a a", "b b", "c"]
    rows = words_to_dataset(texts)
    counts = [row["ocurrencias"] for row in rows]
    descending = sorted(counts, reverse=True)
    assert counts == descending