"""Convierte operations.db al formato JSON de sigma.js para visualizacion de grafos.""" import json import sqlite3 COLOR_MAP = { "person": "#e74c3c", "organization": "#3498db", "ip_address": "#2ecc71", "domain": "#f39c12", "crypto_wallet": "#f1c40f", "trading_signal": "#9b59b6", "vulnerability": "#e67e22", "malware": "#c0392b", "email": "#1abc9c", } DEFAULT_COLOR = "#95a5a6" SIZE_MIN = 5.0 SIZE_MAX = 20.0 def _calculate_degree(entity_id: str, relations: list[dict]) -> int: """Cuenta cuantas relaciones involucran a esta entidad.""" return sum( 1 for r in relations if r["from_entity"] == entity_id or r["to_entity"] == entity_id ) def _calculate_size(entity_id: str, metadata: dict, relations: list[dict]) -> float: """Calcula el tamanio del nodo basado en degree y risk_score opcional.""" degree = _calculate_degree(entity_id, relations) # Normalizar degree a rango [0, 1] asumiendo maximo razonable de 50 degree_norm = min(degree / 50.0, 1.0) if "risk_score" in metadata: try: risk_norm = float(metadata["risk_score"]) / 100.0 risk_norm = max(0.0, min(risk_norm, 1.0)) score = (degree_norm + risk_norm) / 2.0 except (ValueError, TypeError): score = degree_norm else: score = degree_norm return SIZE_MIN + score * (SIZE_MAX - SIZE_MIN) def ops_to_sigma_json(db_path: str) -> dict: """Convierte operations.db al formato JSON esperado por sigma.js. Lee entities y relations de la base de datos de operaciones y construye el dict con nodos y aristas en el formato de graphology/sigma.js. El tamanio de cada nodo se calcula a partir de su degree en el grafo y, si esta disponible, de su risk_score en metadata (media 50/50). Args: db_path: Ruta al archivo operations.db. Returns: Dict con claves 'nodes' y 'edges' compatible con sigma.js / graphology. """ conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row try: cur = conn.cursor() cur.execute( "SELECT id, name, type_ref, status, domain, metadata FROM entities" ) raw_entities = [dict(row) for row in cur.fetchall()] cur.execute( "SELECT id, name, from_entity, to_entity, weight FROM relations" ) raw_relations = [dict(row) for row in cur.fetchall()] finally: conn.close() nodes = [] for entity in raw_entities: try: meta = json.loads(entity["metadata"]) if entity["metadata"] else {} except (json.JSONDecodeError, TypeError): meta = {} type_ref = entity["type_ref"] or "unknown" color = COLOR_MAP.get(type_ref, DEFAULT_COLOR) size = _calculate_size(entity["id"], meta, raw_relations) attributes = { "label": entity["name"] or entity["id"], "entity_type": type_ref, "color": color, "size": size, "domain": entity["domain"] or "", "status": entity["status"] or "", } # Aplana metadata como atributos adicionales (sin sobrescribir campos reservados) reserved = {"label", "entity_type", "color", "size", "domain", "status", "type", "x", "y", "hidden", "zIndex"} for k, v in meta.items(): if k not in reserved: attributes[k] = v nodes.append({"key": entity["id"], "attributes": attributes}) edges = [] for rel in raw_relations: edges.append({ "key": rel["id"], "source": rel["from_entity"], "target": rel["to_entity"], "attributes": { "label": rel["name"] or "", "weight": rel["weight"] if rel["weight"] is not None else 1.0, "type": "arrow", }, }) return {"nodes": nodes, "edges": edges}