Files
egutierrez 837563c3ba feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00

190 lines
6.7 KiB
Python

"""Deduplica RelationCandidate resolviendo nombres a IDs y colapsando duplicados."""
import logging
import os
import sys
logger = logging.getLogger(__name__)
# --- Levenshtein distance: local fallback, then optional import from cybersecurity ---
# The import further below supports two execution contexts:
# 1. Run from python/functions/datascience/ (local pytest)
# 2. Run from the registry root (fn run)
def _levenshtein_distance(a: str, b: str) -> int:
"""Calcula la distancia de edicion de Levenshtein entre dos strings."""
if len(a) < len(b):
return _levenshtein_distance(b, a)
if len(b) == 0:
return len(a)
prev_row = list(range(len(b) + 1))
for i, ca in enumerate(a):
curr_row = [i + 1]
for j, cb in enumerate(b):
cost = 0 if ca == cb else 1
curr_row.append(
min(curr_row[j] + 1, prev_row[j + 1] + 1, prev_row[j] + cost)
)
prev_row = curr_row
return prev_row[-1]
# Try to import levenshtein_distance from ``cybersecurity``; if that import
# fails, the local ``_levenshtein_distance`` defined above is used instead.
try:
    _here = os.path.dirname(os.path.abspath(__file__))
    # Sibling path ``../cybersecurity`` relative to this file's directory.
    _cyber_path = os.path.join(_here, "..", "cybersecurity")
    if _cyber_path not in sys.path:
        # Prepend so the import below searches this location first.
        sys.path.insert(0, _cyber_path)
    from cybersecurity import levenshtein_distance as _lev
except ImportError:
    # Import failed: mark the external implementation as unavailable.
    _lev = None # type: ignore
# Name used by the rest of this module: the imported function when available,
# otherwise the pure-Python fallback.
levenshtein_distance = _lev if _lev is not None else _levenshtein_distance
def _fuzzy_resolve(name: str, entity_id_map: dict[str, str], threshold: int = 3) -> str:
"""Intenta resolver un nombre contra las claves del mapa por fuzzy match.
Recorre todas las claves de entity_id_map y busca la mas cercana segun
distancia de Levenshtein. Retorna el entity_id si la distancia es <=
threshold, o '' si no hay match aceptable.
Args:
name: nombre a resolver (ya en lowercase strip).
entity_id_map: mapa nombre_normalizado -> entity_id.
threshold: distancia maxima de edicion para considerar match (default 3).
Returns:
entity_id del mejor match o '' si no hay match.
"""
best_id = ""
best_dist = threshold + 1
for key, entity_id in entity_id_map.items():
dist = levenshtein_distance(name, key)
if dist < best_dist:
best_dist = dist
best_id = entity_id
return best_id if best_dist <= threshold else ""
def deduplicate_relations(
    relations: list,
    entity_id_map: dict[str, str],
) -> list:
    """Deduplicate candidate relations after resolving endpoint names to entity IDs.

    Algorithm:
        1. For each RelationCandidate, resolve ``from_name`` and ``to_name``
           to an entity_id through ``entity_id_map``: exact lookup on the
           lowercased/stripped name first, then a fuzzy match via
           ``_fuzzy_resolve``. Empty names are never fuzzy-matched. If an
           endpoint cannot be resolved, the relation is dropped with a
           warning.
        2. Drop self-loops (from_id == to_id).
        3. Group by (from_id, to_id, relation_type) and merge each group:
           - description: unique non-empty descriptions joined with '; '
           - confidence: maximum of the group
           - from_name / to_name / source_chunk_index: taken from the first
             member of the group
        4. Return the resulting list, in order of first appearance of each
           group key.

    Args:
        relations: list of RelationCandidate carrying the original
            from_name / to_name.
        entity_id_map: mapping normalized_name -> entity_id.

    Returns:
        Deduplicated list of RelationCandidate with from_id and to_id set.
    """
    # Import the type lazily: first via a sys.path entry relative to this
    # file, then via the package-style path.
    try:
        _types_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "..", "..", "..", "python", "types", "datascience",
        )
        if _types_path not in sys.path:
            sys.path.insert(0, _types_path)
        from relation_candidate import RelationCandidate
    except ImportError:
        from python.types.datascience.relation_candidate import RelationCandidate  # type: ignore

    # Names repeat heavily across relations; memoize normalized name ->
    # entity_id so each distinct name triggers at most one fuzzy scan.
    # entity_id_map is not mutated here, so cached results stay valid.
    resolution_cache: dict[str, str] = {}

    def _resolve(raw_name: str) -> str:
        """Return the entity_id for raw_name, or '' if it cannot be resolved."""
        key = raw_name.lower().strip()
        if key in resolution_cache:
            return resolution_cache[key]
        entity_id = entity_id_map.get(key, "")
        # Skip fuzzy matching for empty names: they would match any short key.
        if not entity_id and key:
            entity_id = _fuzzy_resolve(key, entity_id_map)
        resolution_cache[key] = entity_id
        return entity_id

    resolved: list = []
    for rel in relations:
        from_id = _resolve(rel.from_name)
        if not from_id:
            logger.warning(
                "deduplicate_relations: no se pudo resolver from_name=%r — descartando",
                rel.from_name,
            )
            continue

        to_id = _resolve(rel.to_name)
        if not to_id:
            logger.warning(
                "deduplicate_relations: no se pudo resolver to_name=%r — descartando",
                rel.to_name,
            )
            continue

        # Both endpoints collapsed to the same entity: not a real relation.
        if from_id == to_id:
            logger.debug(
                "deduplicate_relations: self-loop descartado (from=%r, to=%r, type=%r)",
                rel.from_name,
                rel.to_name,
                rel.relation_type,
            )
            continue

        resolved.append(
            RelationCandidate(
                from_name=rel.from_name,
                to_name=rel.to_name,
                from_id=from_id,
                to_id=to_id,
                relation_type=rel.relation_type,
                description=rel.description,
                confidence=rel.confidence,
                source_chunk_index=rel.source_chunk_index,
            )
        )

    # Group by (from_id, to_id, relation_type); dict insertion order keeps the
    # output ordered by first appearance of each key.
    groups: dict[tuple, list] = {}
    for rel in resolved:
        groups.setdefault((rel.from_id, rel.to_id, rel.relation_type), []).append(rel)

    result: list = []
    for (from_id, to_id, rel_type), group in groups.items():
        first = group[0]
        if len(group) == 1:
            result.append(first)
            continue

        # dict.fromkeys de-duplicates while preserving first-seen order.
        unique_descriptions = list(
            dict.fromkeys(r.description for r in group if r.description)
        )
        result.append(
            RelationCandidate(
                from_name=first.from_name,
                to_name=first.to_name,
                from_id=from_id,
                to_id=to_id,
                relation_type=rel_type,
                description="; ".join(unique_descriptions),
                confidence=max(r.confidence for r in group),
                source_chunk_index=first.source_chunk_index,
            )
        )
    return result