"""merge_graphs — mergea multiples grafos de conocimiento deduplicando entities por similitud.""" import sys import os # Importar levenshtein_distance desde el registry sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "cybersecurity")) try: from cybersecurity import levenshtein_distance except ImportError: # Fallback: reimplementacion inline si el paquete no esta disponible def levenshtein_distance(a: str, b: str) -> int: """Calcula la distancia de Levenshtein entre dos strings.""" if len(a) < len(b): return levenshtein_distance(b, a) if len(b) == 0: return len(a) prev_row = list(range(len(b) + 1)) for i, ca in enumerate(a): curr_row = [i + 1] for j, cb in enumerate(b): cost = 0 if ca == cb else 1 curr_row.append( min(curr_row[j] + 1, prev_row[j + 1] + 1, prev_row[j] + cost) ) prev_row = curr_row return prev_row[-1] def _name_similarity(a: str, b: str) -> float: """Similitud de Levenshtein normalizada entre 0 y 1.""" if not a and not b: return 1.0 max_len = max(len(a), len(b)) if max_len == 0: return 1.0 dist = levenshtein_distance(a.lower(), b.lower()) return 1.0 - dist / max_len def _count_non_null_fields(entity: dict) -> int: """Cuenta campos con valor no-None.""" return sum(1 for v in entity.values() if v is not None) def _merge_two_entities(canonical: dict, other: dict) -> dict: """Combina dos entities: union de campos, ultimo gana en conflictos.""" merged = dict(canonical) for k, v in other.items(): if k not in merged or merged[k] is None: merged[k] = v # Si ambos tienen valor, el canonical (primero) gana — no sobreescribir return merged def merge_graphs( graphs: list[dict], entity_key: str = "name", similarity_threshold: float = 0.85, ) -> dict: """Mergea multiples grafos de conocimiento en uno, deduplicando entities por similitud. Algoritmo: 1. Juntar todas las entities de todos los grafos (con ID de origen). 2. Para cada par con similitud de nombre >= threshold, mergear. 3. Elegir entity canonica (la que tiene mas campos no-null). 4. Re-apuntar relaciones al ID canonico. 5. Deduplicar relaciones identicas (mismo source, target, type). 6. Registrar cada merge en merge_log. Args: graphs: Lista de grafos. Cada grafo es un dict con keys: "entities" (list[dict]) y "relations" (list[dict]). Las entities deben tener "id" y el campo entity_key. entity_key: Campo de texto usado para calcular similitud. Default "name". similarity_threshold: Umbral de similitud Levenshtein normalizada [0,1]. Default 0.85. Returns: Dict con keys: entities, relations, merge_log. """ # Recopilar todas las entities y relaciones all_entities: list[dict] = [] all_relations: list[dict] = [] for graph in graphs: all_entities.extend(graph.get("entities", [])) all_relations.extend(graph.get("relations", [])) # Construir union-find para agrupar entities similares # id_map: entity_id original -> entity_id canonico id_map: dict[str, str] = {e["id"]: e["id"] for e in all_entities if "id" in e} entity_by_id: dict[str, dict] = {e["id"]: e for e in all_entities if "id" in e} merge_log: list[dict] = [] def find_canonical(eid: str) -> str: while id_map.get(eid, eid) != eid: eid = id_map[eid] return eid entity_ids = [e["id"] for e in all_entities if "id" in e] # Comparar todos los pares (O(n^2) — aceptable para grafos de knowledge tipicos) for i in range(len(entity_ids)): for j in range(i + 1, len(entity_ids)): id_i = find_canonical(entity_ids[i]) id_j = find_canonical(entity_ids[j]) if id_i == id_j: continue # ya mergeados e_i = entity_by_id.get(id_i) e_j = entity_by_id.get(id_j) if e_i is None or e_j is None: continue name_i = str(e_i.get(entity_key, "")) name_j = str(e_j.get(entity_key, "")) sim = _name_similarity(name_i, name_j) if sim >= similarity_threshold: # Elegir canonical: el que tiene mas campos no-null if _count_non_null_fields(e_i) >= _count_non_null_fields(e_j): canonical_id, other_id = id_i, id_j else: canonical_id, other_id = id_j, id_i # Mergear datos merged = _merge_two_entities(entity_by_id[canonical_id], entity_by_id[other_id]) entity_by_id[canonical_id] = merged # Redirigir other_id -> canonical_id id_map[other_id] = canonical_id merge_log.append({ "merged": [other_id, canonical_id], "into": canonical_id, "similarity": round(sim, 4), }) # Construir lista final de entities (solo canonicas) canonical_ids = {eid for eid in entity_ids if find_canonical(eid) == eid} final_entities = [entity_by_id[eid] for eid in canonical_ids if eid in entity_by_id] # Re-apuntar relaciones a IDs canonicos final_relations_set: dict[tuple, dict] = {} for rel in all_relations: new_rel = dict(rel) if "source_id" in new_rel: new_rel["source_id"] = find_canonical(new_rel["source_id"]) if "target_id" in new_rel: new_rel["target_id"] = find_canonical(new_rel["target_id"]) # Deduplicar por (source_id, target_id, relation_type) rel_key = ( new_rel.get("source_id", ""), new_rel.get("target_id", ""), new_rel.get("relation_type", ""), ) if rel_key not in final_relations_set: final_relations_set[rel_key] = new_rel return { "entities": final_entities, "relations": list(final_relations_set.values()), "merge_log": merge_log, }