Files
fn_registry/python/functions/datascience/merge_graphs.py
T
egutierrez 63a9cb5273 feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00

170 lines
6.1 KiB
Python

"""merge_graphs — mergea multiples grafos de conocimiento deduplicando entities por similitud."""
import sys
import os
# Import levenshtein_distance from the registry's cybersecurity functions.
# NOTE(review): this prepends a directory to sys.path for the whole process,
# so any module in that directory can shadow same-named modules imported
# later — confirm this is intended.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "cybersecurity"))
try:
    from cybersecurity import levenshtein_distance
except ImportError:
    # Fallback: self-contained implementation used when the registry module
    # cannot be imported or does not export levenshtein_distance.

    def levenshtein_distance(a: str, b: str) -> int:
        """Return the Levenshtein edit distance between ``a`` and ``b``.

        The distance is the minimum number of single-character insertions,
        deletions and substitutions that turn one string into the other.
        Only two rows of the dynamic-programming table are kept, each sized
        by the shorter string.
        """
        longer, shorter = (a, b) if len(a) >= len(b) else (b, a)
        if not shorter:
            return len(longer)
        previous = list(range(len(shorter) + 1))
        for row, long_ch in enumerate(longer, start=1):
            current = [row]
            for col, short_ch in enumerate(shorter, start=1):
                substitution = previous[col - 1] + (long_ch != short_ch)
                current.append(min(current[col - 1] + 1, previous[col] + 1, substitution))
            previous = current
        return previous[-1]
def _name_similarity(a: str, b: str) -> float:
"""Similitud de Levenshtein normalizada entre 0 y 1."""
if not a and not b:
return 1.0
max_len = max(len(a), len(b))
if max_len == 0:
return 1.0
dist = levenshtein_distance(a.lower(), b.lower())
return 1.0 - dist / max_len
def _count_non_null_fields(entity: dict) -> int:
"""Cuenta campos con valor no-None."""
return sum(1 for v in entity.values() if v is not None)
def _merge_two_entities(canonical: dict, other: dict) -> dict:
"""Combina dos entities: union de campos, ultimo gana en conflictos."""
merged = dict(canonical)
for k, v in other.items():
if k not in merged or merged[k] is None:
merged[k] = v
# Si ambos tienen valor, el canonical (primero) gana — no sobreescribir
return merged
def merge_graphs(
    graphs: list[dict],
    entity_key: str = "name",
    similarity_threshold: float = 0.85,
) -> dict:
    """Merge several knowledge graphs into one, deduplicating entities by name similarity.

    Algorithm:
      1. Collect every entity and relation from every graph.
      2. Compare every pair of entity groups (initially one entity each). When
         the normalized Levenshtein similarity of their ``entity_key`` values
         is >= ``similarity_threshold``, merge the two groups.
      3. Of the two merged entities, the one with more non-None fields becomes
         canonical (ties go to the group of the earlier entity). The other one
         only fills fields the canonical entity lacks or holds as None.
      4. Re-point relation ``source_id`` / ``target_id`` to canonical IDs.
      5. Deduplicate relations with the same (source_id, target_id,
         relation_type); the first occurrence is kept.
      6. Record each merge in ``merge_log``.

    Entities whose ``entity_key`` is missing, None or empty are never merged
    by name, because the absence of a name is no evidence that two entities
    match. Entities without an ``"id"`` are dropped. When the same ``"id"``
    appears more than once, only the last entity dict with that ID is kept.

    Args:
        graphs: Graphs to merge. Each is a dict with keys ``"entities"``
            (list[dict]) and ``"relations"`` (list[dict]); missing or None
            values are treated as empty lists.
        entity_key: Entity field compared for similarity. Default ``"name"``.
        similarity_threshold: Minimum normalized Levenshtein similarity in
            [0, 1] for two entities to be merged. Default 0.85.

    Returns:
        Dict with keys:
          - ``"entities"``: canonical entities, in order of first appearance
            of their ID (deterministic across runs).
          - ``"relations"``: re-pointed, deduplicated relations (shallow copies).
          - ``"merge_log"``: one dict per merge with keys ``"merged"``
            ([other_id, canonical_id]), ``"into"`` and ``"similarity"``
            (rounded to 4 decimals).
    """
    # Collect every entity and relation; tolerate missing or None keys.
    all_entities: list[dict] = []
    all_relations: list[dict] = []
    for graph in graphs:
        all_entities.extend(graph.get("entities") or [])
        all_relations.extend(graph.get("relations") or [])

    # Index entities by ID. entity_ids holds each ID once, in order of first
    # appearance, which keeps the pair loop and the output order deterministic.
    # NOTE(review): a repeated ID keeps only its LAST entity dict (earlier ones
    # are discarded) — confirm whether IDs are globally unique across graphs.
    entity_by_id: dict[str, dict] = {}
    entity_ids: list[str] = []
    for entity in all_entities:
        if "id" not in entity:
            continue  # entities without an "id" cannot be addressed and are dropped
        eid = entity["id"]
        if eid not in entity_by_id:
            entity_ids.append(eid)
        entity_by_id[eid] = entity

    # Union-find: parent maps an ID to the ID it was merged into; roots map to themselves.
    parent: dict[str, str] = {eid: eid for eid in entity_ids}
    merge_log: list[dict] = []

    def find_canonical(eid: str) -> str:
        """Return the root ID of eid's group (eid itself when it is not a known entity)."""
        root = eid
        while parent.get(root, root) != root:
            root = parent[root]
        # Path compression: point every node on the walked path straight at the root.
        while eid != root:
            next_id = parent[eid]
            parent[eid] = root
            eid = next_id
        return root

    def name_of(eid: str) -> str:
        """Return the comparable name of entity eid ("" when missing or None)."""
        value = entity_by_id[eid].get(entity_key)
        return "" if value is None else str(value)

    # Compare all pairs (O(n^2) — acceptable for typical knowledge graphs).
    count = len(entity_ids)
    for i in range(count):
        for j in range(i + 1, count):
            id_i = find_canonical(entity_ids[i])
            id_j = find_canonical(entity_ids[j])
            if id_i == id_j:
                continue  # already in the same group

            name_i = name_of(id_i)
            name_j = name_of(id_j)
            if not name_i or not name_j:
                continue  # a missing or empty name is no evidence of identity

            sim = _name_similarity(name_i, name_j)
            if sim < similarity_threshold:
                continue

            # The entity with more non-None fields becomes canonical.
            if _count_non_null_fields(entity_by_id[id_i]) >= _count_non_null_fields(entity_by_id[id_j]):
                canonical_id, other_id = id_i, id_j
            else:
                canonical_id, other_id = id_j, id_i

            entity_by_id[canonical_id] = _merge_two_entities(
                entity_by_id[canonical_id], entity_by_id[other_id]
            )
            parent[other_id] = canonical_id
            merge_log.append({
                "merged": [other_id, canonical_id],
                "into": canonical_id,
                "similarity": round(sim, 4),
            })

    # Keep only group roots, in order of first appearance.
    final_entities = [entity_by_id[eid] for eid in entity_ids if find_canonical(eid) == eid]

    # Re-point relations to canonical IDs and deduplicate them.
    # NOTE(review): relations without a "relation_type" key all share the type ""
    # for deduplication — confirm the relation schema.
    final_relations: dict[tuple, dict] = {}
    for rel in all_relations:
        new_rel = dict(rel)
        if "source_id" in new_rel:
            new_rel["source_id"] = find_canonical(new_rel["source_id"])
        if "target_id" in new_rel:
            new_rel["target_id"] = find_canonical(new_rel["target_id"])
        rel_key = (
            new_rel.get("source_id", ""),
            new_rel.get("target_id", ""),
            new_rel.get("relation_type", ""),
        )
        if rel_key not in final_relations:
            final_relations[rel_key] = new_rel

    return {
        "entities": final_entities,
        "relations": list(final_relations.values()),
        "merge_log": merge_log,
    }