"""Deduplicate RelationCandidate objects by resolving names to IDs and collapsing duplicates."""

import logging
import os
import sys

logger = logging.getLogger(__name__)


# --- Import levenshtein_distance from cybersecurity ---
# Two execution contexts are supported:
#   1. Run from python/functions/datascience/ (local pytest)
#   2. Run from the registry root (fn run)
# A local pure-Python implementation is used when that import is unavailable.
def _levenshtein_distance(a: str, b: str) -> int:
    """Return the Levenshtein edit distance between two strings.

    Uses the two-row dynamic-programming formulation; the shorter string is
    kept as the row dimension so the working rows stay as small as possible.
    """
    if len(a) < len(b):
        a, b = b, a
    if not b:
        return len(a)

    prev_row = list(range(len(b) + 1))
    for i, ca in enumerate(a):
        curr_row = [i + 1]
        for j, cb in enumerate(b):
            cost = 0 if ca == cb else 1
            curr_row.append(
                min(curr_row[j] + 1, prev_row[j + 1] + 1, prev_row[j] + cost)
            )
        prev_row = curr_row
    return prev_row[-1]


try:
    _here = os.path.dirname(os.path.abspath(__file__))
    _cyber_path = os.path.join(_here, "..", "cybersecurity")
    if _cyber_path not in sys.path:
        sys.path.insert(0, _cyber_path)
    from cybersecurity import levenshtein_distance as _lev
except ImportError:
    _lev = None  # type: ignore

levenshtein_distance = _lev if _lev is not None else _levenshtein_distance


def _fuzzy_resolve(name: str, entity_id_map: dict[str, str], threshold: int = 3) -> str:
    """Resolve a name against the map keys by fuzzy (edit-distance) matching.

    Scans every key of ``entity_id_map`` and keeps the closest one according
    to ``levenshtein_distance``. On ties the first key in map order wins.

    Args:
        name: name to resolve (expected to be already lowercased/stripped).
        entity_id_map: mapping normalized_name -> entity_id.
        threshold: maximum edit distance accepted as a match (default 3).

    Returns:
        The entity_id of the best match, or '' when no key is within
        ``threshold`` edits. An empty ``name`` never matches, and empty keys
        are never fuzzy-matched.
    """
    # An empty string is within `threshold` edits of every short string, so
    # fuzzy matching it would attach the name to an arbitrary short entity.
    if not name:
        return ""

    best_id = ""
    best_dist = threshold + 1
    name_len = len(name)
    for key, entity_id in entity_id_map.items():
        if not key:
            # Same reasoning as for an empty name, seen from the key side.
            continue
        # The length difference is a lower bound on the edit distance, so a
        # key that differs by that much can never beat the current best.
        if abs(len(key) - name_len) >= best_dist:
            continue
        dist = levenshtein_distance(name, key)
        if dist < best_dist:
            best_dist = dist
            best_id = entity_id
            if dist == 0:
                break  # cannot improve on an exact match
    return best_id if best_dist <= threshold else ""


def _resolve_name(raw_name, entity_id_map: dict[str, str], cache: dict[str, str]) -> str:
    """Resolve one raw name to an entity_id: exact lookup first, then fuzzy.

    Results (including failures, stored as '') are memoized in ``cache`` under
    the normalized name so repeated names do not rescan the whole map.
    """
    key = (raw_name or "").lower().strip()
    if key in cache:
        return cache[key]
    entity_id = entity_id_map.get(key, "") or _fuzzy_resolve(key, entity_id_map)
    cache[key] = entity_id
    return entity_id


def _load_relation_candidate_type():
    """Import and return the RelationCandidate class.

    Tries the import from the datascience types directory first (added to
    ``sys.path``), then falls back to the package path from the registry root.
    """
    try:
        _types_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "..",
            "..",
            "..",
            "python",
            "types",
            "datascience",
        )
        if _types_path not in sys.path:
            sys.path.insert(0, _types_path)
        from relation_candidate import RelationCandidate
    except ImportError:
        from python.types.datascience.relation_candidate import RelationCandidate  # type: ignore
    return RelationCandidate


def deduplicate_relations(
    relations: list,
    entity_id_map: dict[str, str],
) -> list:
    """Deduplicate candidate relations by resolving names to final entity IDs.

    Algorithm:
    1. For each RelationCandidate, resolve from_name and to_name to an
       entity_id via entity_id_map (exact lookup on the lowercased, stripped
       name first; then fuzzy match with levenshtein_distance). Relations
       whose endpoints cannot be resolved are dropped with a warning. Empty
       or blank names are never fuzzy-matched.
    2. Drop self-loops (from_id == to_id).
    3. Deduplicate by (from_id, to_id, relation_type):
       - description: unique descriptions joined with '; ' in first-seen order
       - confidence: maximum of the group
       - names and source_chunk_index: taken from the first relation of the group
    4. Return the cleaned list of RelationCandidate with from_id/to_id set.

    Args:
        relations: list of RelationCandidate carrying the original
            from_name/to_name.
        entity_id_map: mapping normalized_name -> entity_id (output of
            deduplicate_entities); lets merged names be resolved.

    Returns:
        Deduplicated list of RelationCandidate with from_id and to_id resolved,
        in order of first appearance of each (from_id, to_id, relation_type).
    """
    RelationCandidate = _load_relation_candidate_type()

    # Normalized name -> entity_id ('' when unresolvable), shared by both
    # endpoints so each distinct name triggers at most one fuzzy scan.
    resolution_cache: dict[str, str] = {}

    resolved: list = []
    for rel in relations:
        # --- Resolve from_name ---
        from_id = _resolve_name(rel.from_name, entity_id_map, resolution_cache)
        if not from_id:
            logger.warning(
                "deduplicate_relations: no se pudo resolver from_name=%r — descartando",
                rel.from_name,
            )
            continue

        # --- Resolve to_name ---
        to_id = _resolve_name(rel.to_name, entity_id_map, resolution_cache)
        if not to_id:
            logger.warning(
                "deduplicate_relations: no se pudo resolver to_name=%r — descartando",
                rel.to_name,
            )
            continue

        # --- Drop self-loops ---
        if from_id == to_id:
            logger.debug(
                "deduplicate_relations: self-loop descartado (from=%r, to=%r, type=%r)",
                rel.from_name,
                rel.to_name,
                rel.relation_type,
            )
            continue

        resolved.append(
            RelationCandidate(
                from_name=rel.from_name,
                to_name=rel.to_name,
                from_id=from_id,
                to_id=to_id,
                relation_type=rel.relation_type,
                description=rel.description,
                confidence=rel.confidence,
                source_chunk_index=rel.source_chunk_index,
            )
        )

    # --- Group by (from_id, to_id, relation_type), keeping first-seen order ---
    groups: dict[tuple, list] = {}
    for rel in resolved:
        key = (rel.from_id, rel.to_id, rel.relation_type)
        groups.setdefault(key, []).append(rel)

    result: list = []
    for (from_id, to_id, rel_type), group in groups.items():
        if len(group) == 1:
            # Nothing to merge; keep the resolved relation untouched.
            result.append(group[0])
            continue

        # Merge: max confidence + union of unique, non-empty descriptions.
        best_confidence = max(r.confidence for r in group)
        seen_desc: set[str] = set()
        descriptions: list[str] = []
        for r in group:
            if r.description and r.description not in seen_desc:
                descriptions.append(r.description)
                seen_desc.add(r.description)

        result.append(
            RelationCandidate(
                from_name=group[0].from_name,
                to_name=group[0].to_name,
                from_id=from_id,
                to_id=to_id,
                relation_type=rel_type,
                description="; ".join(descriptions),
                confidence=best_confidence,
                source_chunk_index=group[0].source_chunk_index,
            )
        )

    return result