Files
egutierrez 99672a4745 feat: funciones datascience — ops_to_rdf_triples, ops_to_sigma_json, render_sigma_html
Conversión de operations.db a triples RDF y formato sigma.js, más
renderizado HTML standalone con dark theme y ForceAtlas2 layout.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 22:03:51 +02:00

124 lines
3.9 KiB
Python

"""Convierte operations.db al formato JSON de sigma.js para visualizacion de grafos."""
import json
import math
import sqlite3
# Node colour (CSS hex string) keyed by entity type.
COLOR_MAP: dict[str, str] = {
    "person": "#e74c3c",
    "organization": "#3498db",
    "ip_address": "#2ecc71",
    "domain": "#f39c12",
    "crypto_wallet": "#f1c40f",
    "trading_signal": "#9b59b6",
    "vulnerability": "#e67e22",
    "malware": "#c0392b",
    "email": "#1abc9c",
}

# Colour used for entity types that have no entry in COLOR_MAP.
DEFAULT_COLOR: str = "#95a5a6"

# Lower and upper bounds of the node size range.
SIZE_MIN, SIZE_MAX = 5.0, 20.0
def _calculate_degree(entity_id: str, relations: list[dict]) -> int:
    """Count the relations that have this entity as either endpoint.

    A relation whose ``from_entity`` and ``to_entity`` are both
    ``entity_id`` contributes one to the count, not two.
    """
    degree = 0
    for relation in relations:
        is_source = relation["from_entity"] == entity_id
        # ``to_entity`` is only consulted when the source does not match.
        if is_source or relation["to_entity"] == entity_id:
            degree += 1
    return degree
def _calculate_size(entity_id: str, metadata: dict, relations: list[dict]) -> float:
    """Compute a node size from the entity's degree and optional risk score.

    The degree is normalised to [0, 1] against a ceiling of 50 relations.
    When ``metadata`` carries a usable ``risk_score`` it is divided by 100,
    clamped to [0, 1] and averaged 50/50 with the normalised degree;
    otherwise the normalised degree is used on its own.

    Args:
        entity_id: Identifier of the entity being sized.
        metadata: Decoded entity metadata. Anything that is not a dict is
            treated as carrying no risk score.
        relations: Relation rows with ``from_entity`` and ``to_entity`` keys.

    Returns:
        A size between ``SIZE_MIN`` and ``SIZE_MAX`` inclusive.
    """
    degree = _calculate_degree(entity_id, relations)
    # Degrees at or above 50 all map to the maximum degree contribution.
    degree_norm = min(degree / 50.0, 1.0)

    score = degree_norm
    if isinstance(metadata, dict) and "risk_score" in metadata:
        try:
            risk_norm = float(metadata["risk_score"]) / 100.0
        except (ValueError, TypeError, OverflowError):
            # Non-numeric, None, or an int too large for float: ignore it.
            risk_norm = None
        # NaN must be rejected explicitly: min()/max() would otherwise turn
        # it into 0.0 and an invalid score would silently halve the size.
        if risk_norm is not None and not math.isnan(risk_norm):
            risk_norm = max(0.0, min(risk_norm, 1.0))
            score = (degree_norm + risk_norm) / 2.0

    return SIZE_MIN + score * (SIZE_MAX - SIZE_MIN)
def ops_to_sigma_json(db_path: str) -> dict:
    """Convert operations.db into the JSON structure consumed by sigma.js.

    Reads the ``entities`` and ``relations`` tables and builds a dict of
    nodes and edges in the graphology / sigma.js serialised format. Each
    node's size is derived from its degree in the graph and, when present,
    the ``risk_score`` found in its metadata.

    Entity metadata is expected to be a JSON object. Metadata that is empty,
    cannot be decoded, or decodes to something other than an object is
    treated as empty. Metadata keys are copied onto the node attributes
    unless they collide with a reserved attribute name.

    Args:
        db_path: Path to the operations.db SQLite file.

    Returns:
        Dict with ``nodes`` and ``edges`` lists. Every node is
        ``{"key", "attributes"}`` and every edge is
        ``{"key", "source", "target", "attributes"}``.

    Raises:
        sqlite3.Error: Propagated unchanged when the database cannot be
            queried (for example a missing table or column).
    """
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT id, name, type_ref, status, domain, metadata FROM entities"
        )
        raw_entities = [dict(row) for row in cur.fetchall()]
        cur.execute(
            "SELECT id, name, from_entity, to_entity, weight FROM relations"
        )
        raw_relations = [dict(row) for row in cur.fetchall()]
    finally:
        conn.close()

    # Index relations by endpoint in a single pass so that sizing a node
    # only looks at its own relations instead of rescanning every relation.
    # The set collapses self-loops, so they are listed (and counted) once.
    relations_by_entity: dict = {}
    for rel in raw_relations:
        for endpoint in {rel["from_entity"], rel["to_entity"]}:
            relations_by_entity.setdefault(endpoint, []).append(rel)

    # Attribute names that metadata keys are never allowed to overwrite.
    reserved = frozenset({
        "label", "entity_type", "color", "size", "domain", "status",
        "type", "x", "y", "hidden", "zIndex",
    })

    nodes = []
    for entity in raw_entities:
        try:
            meta = json.loads(entity["metadata"]) if entity["metadata"] else {}
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and undecodable byte strings.
            meta = {}
        if not isinstance(meta, dict):
            # Valid JSON that is not an object (list, number, string, ...)
            # has no keys to flatten, so treat it as empty metadata.
            meta = {}

        type_ref = entity["type_ref"] or "unknown"
        entity_relations = relations_by_entity.get(entity["id"], [])
        attributes = {
            "label": entity["name"] or entity["id"],
            "entity_type": type_ref,
            "color": COLOR_MAP.get(type_ref, DEFAULT_COLOR),
            "size": _calculate_size(entity["id"], meta, entity_relations),
            "domain": entity["domain"] or "",
            "status": entity["status"] or "",
        }
        # Flatten metadata into extra attributes without clobbering the
        # reserved ones.
        attributes.update(
            (key, value) for key, value in meta.items() if key not in reserved
        )
        nodes.append({"key": entity["id"], "attributes": attributes})

    edges = [
        {
            "key": rel["id"],
            "source": rel["from_entity"],
            "target": rel["to_entity"],
            "attributes": {
                "label": rel["name"] or "",
                # A NULL weight falls back to 1.0.
                "weight": rel["weight"] if rel["weight"] is not None else 1.0,
                "type": "arrow",
            },
        }
        for rel in raw_relations
    ]
    return {"nodes": nodes, "edges": edges}