63a9cb5273
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift, diff_entities/relations, extract_entities/relations_llm, hotness_score, melt, merge_graphs, pivot, build_entity/relation_schema_prompt. Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order, hawkes_intensity + módulo finance.py. Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py. Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
170 lines
6.1 KiB
Python
170 lines
6.1 KiB
Python
"""merge_graphs — mergea multiples grafos de conocimiento deduplicando entities por similitud."""
|
|
|
|
import sys
|
|
import os
|
|
|
|
# Prefer the shared levenshtein_distance implementation: the "../cybersecurity"
# directory (relative to this file) is put first on sys.path so the import
# below can find it.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "cybersecurity"))

try:
    from cybersecurity import levenshtein_distance
except ImportError:
    # Fallback: inline reimplementation used when the import above fails.
    def levenshtein_distance(a: str, b: str) -> int:
        """Return the Levenshtein (edit) distance between two strings."""
        # Keep the longer string in ``a`` so every DP row has len(b) + 1 cells
        # for the shorter string.
        if len(a) < len(b):
            a, b = b, a
        if not b:
            return len(a)

        previous = list(range(len(b) + 1))
        for row_no, char_a in enumerate(a, start=1):
            current = [row_no]
            for col, char_b in enumerate(b):
                left = current[col] + 1
                above = previous[col + 1] + 1
                diagonal = previous[col] + (0 if char_a == char_b else 1)
                current.append(min(left, above, diagonal))
            previous = current
        return previous[-1]
|
|
|
|
|
|
def _name_similarity(a: str, b: str) -> float:
|
|
"""Similitud de Levenshtein normalizada entre 0 y 1."""
|
|
if not a and not b:
|
|
return 1.0
|
|
max_len = max(len(a), len(b))
|
|
if max_len == 0:
|
|
return 1.0
|
|
dist = levenshtein_distance(a.lower(), b.lower())
|
|
return 1.0 - dist / max_len
|
|
|
|
|
|
def _count_non_null_fields(entity: dict) -> int:
|
|
"""Cuenta campos con valor no-None."""
|
|
return sum(1 for v in entity.values() if v is not None)
|
|
|
|
|
|
def _merge_two_entities(canonical: dict, other: dict) -> dict:
|
|
"""Combina dos entities: union de campos, ultimo gana en conflictos."""
|
|
merged = dict(canonical)
|
|
for k, v in other.items():
|
|
if k not in merged or merged[k] is None:
|
|
merged[k] = v
|
|
# Si ambos tienen valor, el canonical (primero) gana — no sobreescribir
|
|
return merged
|
|
|
|
|
|
def merge_graphs(
    graphs: list[dict],
    entity_key: str = "name",
    similarity_threshold: float = 0.85,
) -> dict:
    """Merge several knowledge graphs into one, deduplicating similar entities.

    Algorithm:
        1. Collect the entities and relations of every graph.
        2. Merge every pair of entities whose ``entity_key`` similarity is
           >= ``similarity_threshold``.
        3. Keep as canonical the entity with more non-None fields (the
           earlier one on ties) and fill its gaps from the other entity.
        4. Re-point relations to the canonical IDs.
        5. Drop duplicate relations (same ``source_id``, ``target_id`` and
           ``relation_type``), keeping the first occurrence.
        6. Record every merge in ``merge_log``.

    Args:
        graphs: List of graphs. Each graph is a dict with the optional keys
            "entities" (list[dict]) and "relations" (list[dict]). Entities
            are expected to carry "id" and the ``entity_key`` field.
            Entities without "id" are left out of the result. When several
            entities share an "id", the last one is used.
        entity_key: Text field used to compute similarity. Default "name".
        similarity_threshold: Minimum normalized Levenshtein similarity
            (0 to 1) for two entities to be merged. Default 0.85.

    Returns:
        Dict with keys:
            "entities": canonical entities, in first-seen order.
            "relations": re-pointed, deduplicated relations.
            "merge_log": one dict per merge with "merged"
                ([absorbed_id, canonical_id]), "into" (canonical_id) and
                "similarity" (rounded to 4 decimals).
    """
    # Gather every entity and relation from all graphs.
    all_entities: list[dict] = []
    all_relations: list[dict] = []
    for graph in graphs:
        all_entities.extend(graph.get("entities", []))
        all_relations.extend(graph.get("relations", []))

    identified = [e for e in all_entities if "id" in e]

    # id_map: original entity id -> id it was merged into (itself if canonical).
    id_map: dict[str, str] = {e["id"]: e["id"] for e in identified}
    entity_by_id: dict[str, dict] = {e["id"]: e for e in identified}
    entity_ids = [e["id"] for e in identified]

    merge_log: list[dict] = []

    def find_canonical(eid: str) -> str:
        """Follow id_map redirections until reaching an id that maps to itself."""
        while id_map.get(eid, eid) != eid:
            eid = id_map[eid]
        return eid

    # Compare all pairs (O(n^2) -- acceptable for typical knowledge graphs).
    for i, raw_i in enumerate(entity_ids):
        for raw_j in entity_ids[i + 1:]:
            id_i = find_canonical(raw_i)
            id_j = find_canonical(raw_j)
            if id_i == id_j:
                continue  # already in the same group

            e_i = entity_by_id[id_i]
            e_j = entity_by_id[id_j]

            name_i = e_i.get(entity_key)
            name_j = e_j.get(entity_key)
            # An entity with no usable key value carries no evidence of being
            # a duplicate; without this guard all such entities would compare
            # as identical and collapse into one.
            if name_i is None or name_j is None or name_i == "" or name_j == "":
                continue

            sim = _name_similarity(str(name_i), str(name_j))
            if sim < similarity_threshold:
                continue

            # Canonical: the entity with more non-None fields (first on ties).
            if _count_non_null_fields(e_i) >= _count_non_null_fields(e_j):
                canonical_id, other_id = id_i, id_j
            else:
                canonical_id, other_id = id_j, id_i

            # Merge the data into a fresh dict; input entities are not mutated.
            entity_by_id[canonical_id] = _merge_two_entities(
                entity_by_id[canonical_id], entity_by_id[other_id]
            )

            # Redirect other_id -> canonical_id.
            id_map[other_id] = canonical_id

            merge_log.append({
                "merged": [other_id, canonical_id],
                "into": canonical_id,
                "similarity": round(sim, 4),
            })

    # Final entity list: canonical entities only. dict.fromkeys removes
    # repeated ids while keeping first-seen order, so the output order is
    # deterministic (iterating a set of ids is not).
    final_entities = [
        entity_by_id[eid]
        for eid in dict.fromkeys(entity_ids)
        if find_canonical(eid) == eid
    ]

    # Re-point relations to canonical ids and deduplicate them by
    # (source_id, target_id, relation_type), keeping the first occurrence.
    unique_relations: dict[tuple, dict] = {}
    for rel in all_relations:
        new_rel = dict(rel)
        if "source_id" in new_rel:
            new_rel["source_id"] = find_canonical(new_rel["source_id"])
        if "target_id" in new_rel:
            new_rel["target_id"] = find_canonical(new_rel["target_id"])

        rel_key = (
            new_rel.get("source_id", ""),
            new_rel.get("target_id", ""),
            new_rel.get("relation_type", ""),
        )
        unique_relations.setdefault(rel_key, new_rel)

    return {
        "entities": final_entities,
        "relations": list(unique_relations.values()),
        "merge_log": merge_log,
    }
|