Files
ontology_graph/lib/merge_entity_attributes.py
fn-registry agent 40bea81603 chore: initial sync
2026-04-28 22:13:08 +02:00

79 lines
2.5 KiB
Python

"""Combina atributos de multiples candidatos de la misma entidad."""
from __future__ import annotations
# Fields whose conflicting values are resolved by taking the largest one.
_NUMERIC_FIELDS = {"risk_score", "balance", "cvss"}
# Date fields where the smallest (earliest) value wins.
_DATE_MIN_FIELDS = {"first_seen", "created_date"}
# Date fields where the largest (latest) value wins.
_DATE_MAX_FIELDS = {"last_seen", "expires_date"}
# Flag fields combined with a logical OR.
_BOOL_FIELDS = {"verified", "exploited"}


def _union_preserving_order(values: list) -> list:
    """Return the de-duplicated union of *values* in order of first appearance.

    Each element of *values* is normally a list; a non-list element is treated
    as a single item so that one scalar mixed in among lists neither crashes
    the merge nor gets split into characters.
    """
    union: list = []
    for value in values:
        items = value if isinstance(value, list) else [value]
        for item in items:
            # Linear membership test on purpose: items may be unhashable.
            if item not in union:
                union.append(item)
    return union


def merge_entity_attributes(attr_list: list[dict]) -> dict:
    """Merge the attributes of several candidates for the same entity.

    For every key present in any candidate, the non-None values are collected
    and reduced to a single value:

    - no non-None value: the merged value is None
    - a single value, or all values equal: the first value, returned as-is
    - numeric fields (risk_score, balance, cvss): max
    - earliest-date fields (first_seen, created_date): min
    - latest-date fields (last_seen, expires_date): max
    - boolean fields (verified, exploited): logical OR
    - any value is a list: union without duplicates, in order of first
      appearance (non-list values count as single elements)
    - anything else: the value whose string representation is longest; the
      original value is returned, not its string form, and ties go to the
      earliest candidate

    Field-name rules take precedence over the list rule.

    Args:
        attr_list: One attribute dict per candidate.

    Returns:
        A new dict with the merged attributes, keyed in the order the keys
        first appear across ``attr_list``. Empty when ``attr_list`` is empty.

    Raises:
        TypeError: Propagated from ``max``/``min`` when the values of a
            numeric or date field cannot be compared with each other.
    """
    if not attr_list:
        return {}

    # A dict (rather than a set) keeps keys in first-seen order, so the output
    # layout does not depend on string hash randomisation.
    all_keys = dict.fromkeys(key for attrs in attr_list for key in attrs)

    merged: dict = {}
    for key in all_keys:
        # Missing keys and explicit None both mean "no opinion".
        values = [attrs[key] for attrs in attr_list if attrs.get(key) is not None]
        if not values:
            merged[key] = None
            continue

        first = values[0]
        if len(values) == 1 or all(value == first for value in values):
            # No conflict to resolve.
            merged[key] = first
            continue

        # Resolve the conflict according to the kind of field.
        # NOTE(review): date values are compared as given; this presumes they
        # are mutually comparable (e.g. ISO-8601 strings) - confirm with callers.
        if key in _NUMERIC_FIELDS:
            merged[key] = max(values)
        elif key in _DATE_MIN_FIELDS:
            merged[key] = min(values)
        elif key in _DATE_MAX_FIELDS:
            merged[key] = max(values)
        elif key in _BOOL_FIELDS:
            merged[key] = any(values)
        elif any(isinstance(value, list) for value in values):
            merged[key] = _union_preserving_order(values)
        else:
            # Keep the original object so ints, bools, dicts, ... are not
            # silently turned into strings.
            merged[key] = max(values, key=lambda value: len(str(value)))
    return merged