837563c3ba
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift, diff_entities/relations, extract_entities/relations_llm, hotness_score, melt, merge_graphs, pivot, build_entity/relation_schema_prompt. Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order, hawkes_intensity + módulo finance.py. Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py. Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
190 lines
6.7 KiB
Python
190 lines
6.7 KiB
Python
"""Deduplica RelationCandidate resolviendo nombres a IDs y colapsando duplicados."""
|
|
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
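# --- Levenshtein distance: local fallback, then optional import from cybersecurity ---
# A pure-Python fallback is defined first; further down, the shared
# implementation from the cybersecurity module is imported when available.
# That import has to work in two contexts:
#   1. Executed from python/functions/datascience/ (local pytest)
#   2. Executed from the registry root (fn run)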
|
|
def _levenshtein_distance(a: str, b: str) -> int:
    """Return the Levenshtein edit distance between two strings.

    The distance is the minimum number of single-character insertions,
    deletions and substitutions needed to turn one string into the other.
    Only two rows of the dynamic-programming table are kept at any time.

    Args:
        a: first string.
        b: second string.

    Returns:
        The edit distance as a non-negative integer.
    """
    # Iterate over the longer string in the outer loop so that each row is
    # sized by the shorter one.
    longer, shorter = (a, b) if len(a) >= len(b) else (b, a)
    if not shorter:
        return len(longer)

    previous = list(range(len(shorter) + 1))
    for row_no, ch_long in enumerate(longer, start=1):
        current = [row_no]
        for col, ch_short in enumerate(shorter):
            insertion = current[col] + 1
            deletion = previous[col + 1] + 1
            substitution = previous[col] + (0 if ch_long == ch_short else 1)
            current.append(min(insertion, deletion, substitution))
        previous = current
    return previous[-1]
|
|
|
|
|
|
# Put the sibling "cybersecurity" directory on sys.path (once) so the shared
# implementation can be imported regardless of the working directory.
_here = os.path.dirname(os.path.abspath(__file__))
_cyber_path = os.path.join(_here, "..", "cybersecurity")
if _cyber_path not in sys.path:
    sys.path.insert(0, _cyber_path)

try:
    from cybersecurity import levenshtein_distance as _lev
except ImportError:
    _lev = None  # type: ignore

# Public alias used by the rest of this module: the imported implementation
# when available, otherwise the local pure-Python fallback.
if _lev is None:
    levenshtein_distance = _levenshtein_distance
else:
    levenshtein_distance = _lev
|
|
|
|
|
|
def _fuzzy_resolve(name: str, entity_id_map: dict[str, str], threshold: int = 3) -> str:
    """Resolve a name to an entity id via the closest map key by edit distance.

    Scans the keys of ``entity_id_map`` and picks the one with the smallest
    Levenshtein distance to ``name``. On ties the first key in iteration order
    wins. The match is accepted only if its distance is <= ``threshold``.

    Empty names and empty keys never match: an empty string is within
    ``threshold`` edits of every string of length <= ``threshold``, so allowing
    them would bind a blank name to an arbitrary short-named entity.

    Args:
        name: name to resolve (expected to be already lowercased and stripped).
        entity_id_map: mapping normalized_name -> entity_id.
        threshold: maximum edit distance to accept a match (default 3).

    Returns:
        The entity_id of the best match, or '' if there is no acceptable match.
    """
    if not name:
        return ""

    best_id = ""
    best_dist = threshold + 1
    name_len = len(name)
    for key, entity_id in entity_id_map.items():
        if not key:
            continue
        # The edit distance is never smaller than the length difference, so a
        # key this far off in length cannot beat the current best; skip the
        # quadratic distance computation.
        if abs(name_len - len(key)) >= best_dist:
            continue
        dist = levenshtein_distance(name, key)
        if dist < best_dist:
            best_dist = dist
            best_id = entity_id
            if dist == 0:
                # Nothing can be strictly closer than an identical key.
                break
    return best_id if best_dist <= threshold else ""
|
|
|
|
|
|
def _load_relation_candidate_type():
    """Import and return the RelationCandidate class.

    First tries a flat import after putting the types directory (resolved
    relative to this file) on sys.path; if that fails, falls back to the
    package-qualified import used when running from the registry root.
    """
    try:
        _types_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "..", "..", "..", "python", "types", "datascience",
        )
        if _types_path not in sys.path:
            sys.path.insert(0, _types_path)
        from relation_candidate import RelationCandidate
    except ImportError:
        from python.types.datascience.relation_candidate import RelationCandidate  # type: ignore
    return RelationCandidate


def _resolve_entity_id(name: str, entity_id_map: dict[str, str], cache: dict[str, str]) -> str:
    """Resolve one relation endpoint name to an entity id, or '' if unresolvable.

    Tries an exact lookup on the lowercased/stripped name first and falls back
    to fuzzy matching. Results are memoized in ``cache`` so a name that occurs
    in many relations triggers at most one fuzzy scan. ``None`` or blank names
    are never fuzzy-matched.
    """
    key = (name or "").lower().strip()
    if key in cache:
        return cache[key]

    entity_id = entity_id_map.get(key, "")
    # A blank key is within a few edits of every short name, so fuzzy matching
    # it would attach the relation to an arbitrary entity.
    if not entity_id and key:
        entity_id = _fuzzy_resolve(key, entity_id_map)

    cache[key] = entity_id
    return entity_id


def deduplicate_relations(
    relations: list,
    entity_id_map: dict[str, str],
) -> list:
    """Deduplicate candidate relations, resolving endpoint names to entity ids.

    Algorithm:
        1. For each RelationCandidate, resolve from_name and to_name to an
           entity id via entity_id_map (exact, case-insensitive lookup first,
           then fuzzy match by Levenshtein distance). Relations whose endpoint
           cannot be resolved - including blank or None names - are dropped
           with a warning.
        2. Drop self-loops (from_id == to_id).
        3. Collapse duplicates keyed by (from_id, to_id, relation_type):
           - description: unique descriptions joined with '; ' in first-seen
             order
           - confidence: maximum of the group
           - names and source_chunk_index: taken from the first member
        4. Return the cleaned list with from_id and to_id populated.

    Args:
        relations: list of RelationCandidate carrying the original
            from_name/to_name.
        entity_id_map: mapping normalized_name -> entity_id (output of
            deduplicate_entities); lets merged names resolve to the final id.

    Returns:
        Deduplicated list of RelationCandidate with resolved from_id/to_id,
        in order of first appearance of each (from_id, to_id, relation_type).
        Empty input yields an empty list without importing the type.
    """
    if not relations:
        return []

    RelationCandidate = _load_relation_candidate_type()

    # Per-call memo: normalized name -> resolved id ('' when unresolvable).
    resolved_ids: dict[str, str] = {}
    resolved: list = []

    for rel in relations:
        from_id = _resolve_entity_id(rel.from_name, entity_id_map, resolved_ids)
        if not from_id:
            logger.warning(
                "deduplicate_relations: no se pudo resolver from_name=%r — descartando",
                rel.from_name,
            )
            continue

        to_id = _resolve_entity_id(rel.to_name, entity_id_map, resolved_ids)
        if not to_id:
            logger.warning(
                "deduplicate_relations: no se pudo resolver to_name=%r — descartando",
                rel.to_name,
            )
            continue

        if from_id == to_id:
            logger.debug(
                "deduplicate_relations: self-loop descartado (from=%r, to=%r, type=%r)",
                rel.from_name,
                rel.to_name,
                rel.relation_type,
            )
            continue

        resolved.append(
            RelationCandidate(
                from_name=rel.from_name,
                to_name=rel.to_name,
                from_id=from_id,
                to_id=to_id,
                relation_type=rel.relation_type,
                description=rel.description,
                confidence=rel.confidence,
                source_chunk_index=rel.source_chunk_index,
            )
        )

    # Group by (from_id, to_id, relation_type); dict order keeps first-seen order.
    groups: dict[tuple, list] = {}
    for rel in resolved:
        groups.setdefault((rel.from_id, rel.to_id, rel.relation_type), []).append(rel)

    result: list = []
    for (from_id, to_id, rel_type), group in groups.items():
        first = group[0]
        if len(group) == 1:
            result.append(first)
            continue

        # dict.fromkeys de-duplicates while preserving first-seen order.
        descriptions = list(dict.fromkeys(r.description for r in group if r.description))
        result.append(
            RelationCandidate(
                from_name=first.from_name,
                to_name=first.to_name,
                from_id=from_id,
                to_id=to_id,
                relation_type=rel_type,
                description="; ".join(descriptions),
                confidence=max(r.confidence for r in group),
                source_chunk_index=first.source_chunk_index,
            )
        )

    return result
|