f851988d6f
Conversión de operations.db a triples RDF y formato sigma.js, más renderizado HTML standalone con dark theme y ForceAtlas2 layout. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
124 lines
3.9 KiB
Python
124 lines
3.9 KiB
Python
"""Convierte operations.db al formato JSON de sigma.js para visualizacion de grafos."""
|
|
|
|
import json
import math
import sqlite3
|
|
|
|
|
|
# Node color (hex RGB string) for each known entity type_ref.
# ops_to_sigma_json looks the entity's type_ref up here and falls back to
# DEFAULT_COLOR for any type that has no entry.
COLOR_MAP: dict[str, str] = {
    "person": "#e74c3c",
    "organization": "#3498db",
    "ip_address": "#2ecc71",
    "domain": "#f39c12",
    "crypto_wallet": "#f1c40f",
    "trading_signal": "#9b59b6",
    "vulnerability": "#e67e22",
    "malware": "#c0392b",
    "email": "#1abc9c",
}
|
|
|
|
# Color given to nodes whose type_ref has no entry in COLOR_MAP.
DEFAULT_COLOR: str = "#95a5a6"
# Bounds of the node size range: _calculate_size returns SIZE_MIN for a score
# of 0 and SIZE_MAX for a score of 1, interpolating linearly in between.
SIZE_MIN: float = 5.0
SIZE_MAX: float = 20.0
|
|
|
|
|
|
def _calculate_degree(entity_id: str, relations: list[dict]) -> int:
    """Return how many relations have this entity as one of their endpoints.

    A relation is counted once when ``entity_id`` equals its ``from_entity``
    or its ``to_entity``; a self-loop therefore contributes 1, not 2.

    Args:
        entity_id: Identifier of the entity whose degree is wanted.
        relations: Relation records, each providing ``from_entity`` and
            ``to_entity`` keys.

    Returns:
        The number of matching relations (0 for an empty list).
    """
    degree = 0
    for relation in relations:
        # Skip relations that touch the entity on neither side. The source
        # endpoint is checked first and the target only when it did not match.
        if (
            relation["from_entity"] != entity_id
            and relation["to_entity"] != entity_id
        ):
            continue
        degree += 1
    return degree
|
|
|
|
|
|
def _calculate_size(entity_id: str, metadata: dict, relations: list[dict]) -> float:
    """Return the display size of a node, scaled into [SIZE_MIN, SIZE_MAX].

    The size is a linear interpolation driven by a score in [0, 1]:

    * Degree component: the number of relations touching the entity divided
      by 50 (taken as a reasonable maximum) and capped at 1.0.
    * Risk component (optional): ``metadata["risk_score"]`` divided by 100
      and clamped to [0, 1].

    When a usable risk score is present the score is the plain average of
    both components; otherwise it is the degree component alone.

    Args:
        entity_id: Identifier of the entity being sized.
        metadata: Decoded metadata of the entity; only ``risk_score`` is read.
        relations: Relation records used to compute the entity's degree.

    Returns:
        A float between SIZE_MIN and SIZE_MAX inclusive.
    """
    degree = _calculate_degree(entity_id, relations)

    # Normalize the degree to [0, 1], assuming a reasonable maximum of 50.
    degree_norm = min(degree / 50.0, 1.0)

    score = degree_norm
    if "risk_score" in metadata:
        try:
            risk_norm = float(metadata["risk_score"]) / 100.0
        except (ValueError, TypeError, OverflowError):
            # Non-numeric values, None, or integers too large for a float:
            # treat the risk score as absent instead of failing.
            risk_norm = None
        # NaN would slip through the clamp below as 0.0 and silently halve
        # the score, so it is treated as "no risk score" as well.
        if risk_norm is not None and not math.isnan(risk_norm):
            risk_norm = max(0.0, min(risk_norm, 1.0))
            score = (degree_norm + risk_norm) / 2.0

    return SIZE_MIN + score * (SIZE_MAX - SIZE_MIN)
|
|
|
|
|
|
def _parse_metadata(raw) -> dict:
    """Decode an entity's metadata column into a dict, or ``{}`` if unusable.

    Empty/NULL values, content that cannot be parsed as JSON, and JSON whose
    top-level value is not an object (a list, string, number, ...) all yield
    an empty dict, so callers can always treat the result as a mapping.
    """
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except (ValueError, TypeError):
        # ValueError covers json.JSONDecodeError as well as the
        # UnicodeDecodeError raised for bytes that are not valid text.
        return {}
    return parsed if isinstance(parsed, dict) else {}


def ops_to_sigma_json(db_path: str) -> dict:
    """Convert operations.db into the JSON structure expected by sigma.js.

    Reads the ``entities`` and ``relations`` tables and builds a dict of
    nodes and edges in the graphology/sigma.js layout. Each node's size is
    derived from its degree in the graph and, when available, from the
    ``risk_score`` found in its metadata (50/50 average).

    Metadata keys are flattened into the node attributes, except for keys
    that would overwrite the attributes set here or other reserved names
    (``type``, ``x``, ``y``, ``hidden``, ``zIndex``). Metadata that is
    missing, unparsable or not a JSON object is ignored.

    Edges are emitted exactly as stored; no check is made that their
    endpoints exist among the returned nodes.

    Args:
        db_path: Path to the operations.db file.

    Returns:
        Dict with keys ``'nodes'`` and ``'edges'`` compatible with
        sigma.js / graphology.

    Raises:
        sqlite3.Error: Propagated from the connection or the queries, for
            example when the expected tables are absent.
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        cur = conn.cursor()

        cur.execute(
            "SELECT id, name, type_ref, status, domain, metadata FROM entities"
        )
        raw_entities = [dict(row) for row in cur.fetchall()]

        cur.execute(
            "SELECT id, name, from_entity, to_entity, weight FROM relations"
        )
        raw_relations = [dict(row) for row in cur.fetchall()]
    finally:
        conn.close()

    # Index relations by endpoint once, so sizing each node only looks at the
    # relations that touch it instead of rescanning the whole table per
    # entity. The endpoints go through a set so a self-loop is listed a
    # single time, which keeps the resulting degree the same as a full scan.
    relations_by_entity: dict = {}
    for rel in raw_relations:
        for endpoint in {rel["from_entity"], rel["to_entity"]}:
            relations_by_entity.setdefault(endpoint, []).append(rel)

    # Attribute names that flattened metadata must never overwrite.
    reserved = frozenset({
        "label", "entity_type", "color", "size", "domain", "status",
        "type", "x", "y", "hidden", "zIndex",
    })

    nodes = []
    for entity in raw_entities:
        entity_id = entity["id"]
        meta = _parse_metadata(entity["metadata"])
        type_ref = entity["type_ref"] or "unknown"

        attributes = {
            "label": entity["name"] or entity_id,
            "entity_type": type_ref,
            "color": COLOR_MAP.get(type_ref, DEFAULT_COLOR),
            "size": _calculate_size(
                entity_id, meta, relations_by_entity.get(entity_id, [])
            ),
            "domain": entity["domain"] or "",
            "status": entity["status"] or "",
        }
        # Flatten metadata into extra attributes without touching reserved
        # fields.
        attributes.update(
            (key, value) for key, value in meta.items() if key not in reserved
        )

        nodes.append({"key": entity_id, "attributes": attributes})

    edges = [
        {
            "key": rel["id"],
            "source": rel["from_entity"],
            "target": rel["to_entity"],
            "attributes": {
                "label": rel["name"] or "",
                # A NULL weight defaults to 1.0; a stored 0 is kept as-is.
                "weight": rel["weight"] if rel["weight"] is not None else 1.0,
                "type": "arrow",
            },
        }
        for rel in raw_relations
    ]

    return {"nodes": nodes, "edges": edges}
|