Files
fn_registry/python/functions/datascience/deduplicate_entities.py
T
egutierrez 63a9cb5273 feat: funciones Python datascience, finance, cybersecurity y pipelines
Datascience: aggregate_by_group, deduplicate_entities/relations, detect_drift,
diff_entities/relations, extract_entities/relations_llm, hotness_score, melt,
merge_graphs, pivot, build_entity/relation_schema_prompt.
Finance: avellaneda_stoikov_quotes, generate_gbm_prices, generate_taker_order,
hawkes_intensity + módulo finance.py.
Cybersecurity: envelope_encrypt/decrypt + módulo cybersecurity.py.
Pipelines: extraction_pipeline, monte_carlo_market, run_market_sim.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-05 17:11:32 +02:00

284 lines
9.7 KiB
Python

"""Deduplica entidades candidatas usando fuzzy matching de nombres."""
from __future__ import annotations
import sys
import os
import uuid
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", ".."))
from python.types.datascience.entity_candidate import EntityCandidate
from python.types.datascience.deduplication_result import DeduplicationResult
from python.functions.core.normalize_entity_name import normalize_entity_name
from python.functions.core.merge_entity_attributes import merge_entity_attributes
# ── Similitud helpers ──────────────────────────────────────────────────────────
def _levenshtein(a: str, b: str) -> int:
"""Distancia de edicion Levenshtein entre dos strings."""
if a == b:
return 0
if not a:
return len(b)
if not b:
return len(a)
prev = list(range(len(b) + 1))
for i, ca in enumerate(a, 1):
curr = [i]
for j, cb in enumerate(b, 1):
cost = 0 if ca == cb else 1
curr.append(min(prev[j] + 1, curr[j - 1] + 1, prev[j - 1] + cost))
prev = curr
return prev[-1]
def _jaccard(tokens_a: list[str], tokens_b: list[str]) -> float:
"""Similitud de Jaccard entre dos conjuntos de tokens."""
set_a = set(tokens_a)
set_b = set(tokens_b)
if not set_a and not set_b:
return 1.0
inter = len(set_a & set_b)
union = len(set_a | set_b)
return inter / union if union else 0.0
def _name_similarity(a: str, b: str) -> float:
    """Return a similarity score in [0, 1] between two normalized names.

    The base score is the higher of the length-normalized Levenshtein
    similarity and the token-level Jaccard similarity. Two bonuses of +0.3
    each (capped at 1.0) are then applied:

    * containment: one (non-empty) name is a substring of the other;
    * acronym: either name equals the initials of the other's tokens
      (e.g. "FBI" ~ "Federal Bureau of Investigation").
    """
    if a == b:
        return 1.0
    # a != b, so at least one name is non-empty and max_len > 0.
    max_len = max(len(a), len(b))
    lev_sim = 1.0 - _levenshtein(a, b) / max_len
    tokens_a = a.split()
    tokens_b = b.split()
    score = max(lev_sim, _jaccard(tokens_a, tokens_b))
    # Containment bonus. "" is a substring of every string, so require both
    # names to be non-empty; otherwise an empty name would always get +0.3.
    if a and b and (a in b or b in a):
        score = min(1.0, score + 0.3)
    if _is_acronym_of(a, tokens_b) or _is_acronym_of(b, tokens_a):
        score = min(1.0, score + 0.3)
    return score
def _is_acronym_of(candidate: str, tokens: list[str]) -> bool:
"""Comprueba si candidate es un acronimo formado por las iniciales de tokens."""
if not candidate or not tokens:
return False
initials = "".join(t[0] for t in tokens if t).upper()
return candidate.upper() == initials
_EXACT_TYPES = {"ip", "email", "domain", "crypto_wallet", "phone"}
def _is_exact_type(entity_type: str) -> bool:
"""Tipos tecnicos donde solo se acepta matching exacto."""
return entity_type.lower() in _EXACT_TYPES
# ── Union-Find ─────────────────────────────────────────────────────────────────
class _UnionFind:
def __init__(self, n: int) -> None:
self._parent = list(range(n))
self._rank = [0] * n
def find(self, x: int) -> int:
while self._parent[x] != x:
self._parent[x] = self._parent[self._parent[x]]
x = self._parent[x]
return x
def union(self, x: int, y: int) -> None:
rx, ry = self.find(x), self.find(y)
if rx == ry:
return
if self._rank[rx] < self._rank[ry]:
rx, ry = ry, rx
self._parent[ry] = rx
if self._rank[rx] == self._rank[ry]:
self._rank[rx] += 1
# ── Implementacion principal ────────────────────────────────────────────────────
def _collect_merge_pairs(
    candidates: list[EntityCandidate],
    normalized: list[str],
    uf: _UnionFind,
    name_threshold: float,
    same_type_only: bool,
) -> list[tuple[int, int, float]]:
    """Union every matching candidate pair in ``uf``; return the matches as ``(i, j, score)``."""
    # When only same-type pairs may merge, bucket indices by type_ref so
    # cross-type pairs are never visited. Indices stay ascending inside a
    # bucket, so each pair is still examined exactly once with i < j.
    buckets: dict[object, list[int]] = {}
    for idx, cand in enumerate(candidates):
        key = cand.type_ref if same_type_only else None
        buckets.setdefault(key, []).append(idx)

    exact = [_is_exact_type(c.type_ref) for c in candidates]
    pairs: list[tuple[int, int, float]] = []
    for members in buckets.values():
        for pos, i in enumerate(members):
            for j in members[pos + 1:]:
                ni, nj = normalized[i], normalized[j]
                if exact[i] or exact[j]:
                    # Technical identifiers (ip, email, ...): only an exact
                    # normalized match counts, whatever the threshold. Checking
                    # both sides keeps the rule symmetric when types are mixed.
                    matched = ni == nj
                    score = 1.0
                else:
                    score = _name_similarity(ni, nj)
                    matched = score >= name_threshold
                if matched:
                    uf.union(i, j)
                    pairs.append((i, j, score))
    return pairs


def _merge_cluster(
    members: list[EntityCandidate],
    member_norms: list[str],
) -> tuple[EntityCandidate, EntityCandidate]:
    """Build the canonical entity of one cluster; return ``(entity, canonical_member)``."""
    if len(members) == 1:
        only = members[0]
        entity = EntityCandidate(
            name=only.name,
            name_normalized=member_norms[0],
            type_ref=only.type_ref,
            type_label=only.type_label,
            # NOTE(review): singletons reuse the input's attributes object
            # rather than a copy.
            attributes=only.attributes,
            confidence=only.confidence,
            source_chunk_indices=list(only.source_chunk_indices),
            merged_from=list(only.merged_from) if only.merged_from else [only.name],
        )
        return entity, only

    # The member with the highest confidence (first one on ties) is canonical
    # and supplies the name, normalized name, type and confidence.
    best_pos = max(range(len(members)), key=lambda k: members[k].confidence)
    best = members[best_pos]
    # dict.fromkeys de-duplicates while keeping first-occurrence order.
    merged_chunks = list(
        dict.fromkeys(idx for c in members for idx in c.source_chunk_indices)
    )
    merged_from = list(
        dict.fromkeys(nm for c in members for nm in (c.merged_from or [c.name]))
    )
    entity = EntityCandidate(
        name=best.name,
        name_normalized=member_norms[best_pos],
        type_ref=best.type_ref,
        type_label=best.type_label,
        attributes=merge_entity_attributes([c.attributes for c in members]),
        confidence=best.confidence,
        source_chunk_indices=merged_chunks,
        merged_from=merged_from,
    )
    return entity, best


def deduplicate_entities(
    candidates: list[EntityCandidate],
    name_threshold: float = 0.85,
    same_type_only: bool = True,
) -> DeduplicationResult:
    """Group candidate entities that refer to the same real-world entity.

    Names are normalized with ``normalize_entity_name`` and compared pairwise
    with a fuzzy score (Levenshtein + token Jaccard, plus containment and
    acronym bonuses). Matching pairs are joined with Union-Find, so clusters
    are transitive. Each cluster yields one canonical entity whose name, type
    and confidence come from its highest-confidence member. Attributes are
    combined with ``merge_entity_attributes``, and source chunks and
    ``merged_from`` names are de-duplicated in order.

    For technical types (ip, email, domain, crypto_wallet, phone) only an
    exact normalized match is accepted, and the name threshold is ignored.
    When ``same_type_only`` is False, a pair is held to this exact rule if
    either member has a technical type.

    Args:
        candidates: EntityCandidate objects to deduplicate.
        name_threshold: minimum similarity score (0-1) for two names to merge.
        same_type_only: if True, only candidates with equal ``type_ref`` are
            compared.

    Returns:
        DeduplicationResult with the deduplicated entities (ordered by each
        cluster's first candidate), resolution maps from names to freshly
        generated UUID strings, a merge log entry per multi-member cluster,
        and the before/after counts. NOTE(review): ``entity_id_map`` and
        ``name_to_id`` are keyed by name only, so two clusters that share a
        normalized name (possible across types) overwrite each other's entry.
    """
    if not candidates:
        return DeduplicationResult(
            entities=[],
            entity_id_map={},
            name_to_id={},
            merge_log=[],
            total_before=0,
            total_after=0,
        )

    n = len(candidates)
    normalized = [normalize_entity_name(c.name, c.type_ref) for c in candidates]
    uf = _UnionFind(n)
    merge_pairs = _collect_merge_pairs(
        candidates, normalized, uf, name_threshold, same_type_only
    )

    # Group indices by root. Insertion order follows each cluster's lowest
    # index, so output order does not depend on the order of the unions.
    clusters: dict[int, list[int]] = {}
    for i in range(n):
        clusters.setdefault(uf.find(i), []).append(i)

    # Highest pair score observed inside each cluster, for the merge log.
    best_score: dict[int, float] = {}
    for i, _j, score in merge_pairs:
        root = uf.find(i)
        best_score[root] = max(score, best_score.get(root, score))

    merged_entities: list[EntityCandidate] = []
    entity_id_map: dict[str, str] = {}
    name_to_id: dict[str, str] = {}
    merge_log: list[dict] = []

    for root, indices in clusters.items():
        members = [candidates[k] for k in indices]
        entity, canonical = _merge_cluster(members, [normalized[k] for k in indices])

        if len(members) > 1:
            merge_log.append(
                {
                    "canonical": entity.name,
                    "merged": [c.name for c in members if c is not canonical],
                    "score": round(best_score.get(root, 1.0), 4),
                    "reason": "fuzzy_name",
                }
            )

        ent_id = str(uuid.uuid4())
        merged_entities.append(entity)
        # Resolution maps: canonical normalized name plus every original name.
        entity_id_map[entity.name_normalized] = ent_id
        for orig_name in entity.merged_from:
            name_to_id[orig_name] = ent_id
        name_to_id[entity.name_normalized] = ent_id

    return DeduplicationResult(
        entities=merged_entities,
        entity_id_map=entity_id_map,
        name_to_id=name_to_id,
        merge_log=merge_log,
        total_before=n,
        total_after=len(merged_entities),
    )