feat: funciones datascience — ops_to_rdf_triples, ops_to_sigma_json, render_sigma_html

Conversión de operations.db a triples RDF y formato sigma.js, más
renderizado HTML standalone con dark theme y ForceAtlas2 layout.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-02 22:03:51 +02:00
parent fc6681a31a
commit 99672a4745
6 changed files with 595 additions and 0 deletions
@@ -0,0 +1,55 @@
---
name: ops_to_rdf_triples
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def ops_to_rdf_triples(db_path: str, namespace: str = 'http://osint.local/') -> list[tuple[str, str, str]]"
description: "Convierte entities y relations de operations.db a triples RDF (subject, predicate, object). Prefija IDs con namespace para formar URIs. Solo stdlib."
tags: [rdf, graph, osint, knowledge-graph, triples, operations, semantic-web]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [json, sqlite3]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/ops_to_rdf_triples.py"
---
## Ejemplo
```python
from datascience.ops_to_rdf_triples import ops_to_rdf_triples
triples = ops_to_rdf_triples("apps/my_analysis/operations.db")
for s, p, o in triples[:5]:
print(f"{s} -- {p} --> {o}")
# Con namespace personalizado
triples_ns = ops_to_rdf_triples(
"apps/my_analysis/operations.db",
namespace="http://mi-empresa.com/osint/"
)
```
## Notas
Funcion pura — solo abre la DB en lectura, no escribe nada.
Triples generados por entidad:
- `(ns+id, rdf:type, type_ref)` — si type_ref no es None
- `(ns+id, name, literal)` — si name no es None
- `(ns+id, status, literal)` — si status no es None
- `(ns+id, domain, literal)` — si domain no es None
- `(ns+id, key, str(value))` — por cada clave en el JSON de metadata
Triples generados por relacion:
- `(ns+from_entity, relation_name, ns+to_entity)`
Los subjects de relaciones tipo URI reciben el prefijo de namespace. Los predicados literales (name, status, etc.) no llevan prefijo. Esta separacion sigue la convencion RDF de distinguir recursos de literales sin introducir dependencias externas (rdflib u otras).
Para exportar a Turtle (.ttl) o N-Triples, el notebook puede iterar la lista y formatear segun necesite.
@@ -0,0 +1,82 @@
"""Convierte operations.db a triples RDF (subject, predicate, object)."""
import json
import sqlite3
def ops_to_rdf_triples(
db_path: str,
namespace: str = "http://osint.local/",
) -> list[tuple[str, str, str]]:
"""Convierte entities y relations de operations.db a triples RDF.
Genera triples para:
- Tipo de entidad: (entity_uri, rdf:type, type_ref)
- Nombre: (entity_uri, name, literal)
- Status y domain: (entity_uri, status|domain, literal)
- Cada clave de metadata: (entity_uri, key, str(value))
- Relaciones: (from_entity_uri, relation_name, to_entity_uri)
Los IDs de entidades se prefijarlos con el namespace para formar URIs.
Args:
db_path: Ruta al archivo operations.db.
namespace: Prefijo de namespace para construir URIs. Default: "http://osint.local/".
Returns:
Lista de tuplas (subject, predicate, object) representando los triples RDF.
"""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
cur = conn.cursor()
cur.execute(
"SELECT id, name, type_ref, status, domain, metadata FROM entities"
)
raw_entities = [dict(row) for row in cur.fetchall()]
cur.execute(
"SELECT id, name, from_entity, to_entity FROM relations"
)
raw_relations = [dict(row) for row in cur.fetchall()]
finally:
conn.close()
ns = namespace.rstrip("/") + "/"
triples: list[tuple[str, str, str]] = []
for entity in raw_entities:
subject = ns + entity["id"]
# rdf:type
if entity["type_ref"]:
triples.append((subject, "rdf:type", entity["type_ref"]))
# name
if entity["name"]:
triples.append((subject, "name", entity["name"]))
# status
if entity["status"]:
triples.append((subject, "status", entity["status"]))
# domain
if entity["domain"]:
triples.append((subject, "domain", entity["domain"]))
# metadata keys
try:
meta = json.loads(entity["metadata"]) if entity["metadata"] else {}
except (json.JSONDecodeError, TypeError):
meta = {}
for key, value in meta.items():
triples.append((subject, key, str(value)))
for rel in raw_relations:
from_uri = ns + rel["from_entity"]
to_uri = ns + rel["to_entity"]
predicate = rel["name"] or rel["id"]
triples.append((from_uri, predicate, to_uri))
return triples
@@ -0,0 +1,44 @@
---
name: ops_to_sigma_json
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: pure
signature: "def ops_to_sigma_json(db_path: str) -> dict"
description: "Convierte operations.db al formato JSON de sigma.js/graphology. Lee entities y relations, asigna colores por tipo y calcula tamanio de nodo combinando degree y risk_score."
tags: [graph, sigma, osint, visualization, operations, network]
uses_functions: []
uses_types: []
returns: []
returns_optional: false
error_type: ""
imports: [json, sqlite3]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/ops_to_sigma_json.py"
---
## Ejemplo
```python
from datascience.ops_to_sigma_json import ops_to_sigma_json
graph = ops_to_sigma_json("apps/my_analysis/operations.db")
print(len(graph["nodes"])) # numero de entidades
print(len(graph["edges"])) # numero de relaciones
```
## Notas
Funcion pura en el sentido de que no escribe ningun archivo ni tiene efectos secundarios observables mas alla de abrir y cerrar la conexion SQLite en modo lectura.
COLOR_MAP cubre los tipos de entidad OSINT mas comunes. Tipos desconocidos reciben `#95a5a6` (gris).
El tamanio del nodo (`size`) se calcula en el rango [5, 20]:
- Si la entidad tiene `risk_score` en metadata: `(degree_norm + risk_norm) / 2`
- Si no: `degree_norm` puro
- degree_norm = `min(degree / 50, 1.0)`
La metadata se aplana como atributos adicionales del nodo, sin sobrescribir campos reservados (`label`, `type`, `color`, `size`, `domain`, `status`).
@@ -0,0 +1,123 @@
"""Convierte operations.db al formato JSON de sigma.js para visualizacion de grafos."""
import json
import sqlite3
COLOR_MAP = {
"person": "#e74c3c",
"organization": "#3498db",
"ip_address": "#2ecc71",
"domain": "#f39c12",
"crypto_wallet": "#f1c40f",
"trading_signal": "#9b59b6",
"vulnerability": "#e67e22",
"malware": "#c0392b",
"email": "#1abc9c",
}
DEFAULT_COLOR = "#95a5a6"
SIZE_MIN = 5.0
SIZE_MAX = 20.0
def _calculate_degree(entity_id: str, relations: list[dict]) -> int:
"""Cuenta cuantas relaciones involucran a esta entidad."""
return sum(
1 for r in relations
if r["from_entity"] == entity_id or r["to_entity"] == entity_id
)
def _calculate_size(entity_id: str, metadata: dict, relations: list[dict]) -> float:
"""Calcula el tamanio del nodo basado en degree y risk_score opcional."""
degree = _calculate_degree(entity_id, relations)
# Normalizar degree a rango [0, 1] asumiendo maximo razonable de 50
degree_norm = min(degree / 50.0, 1.0)
if "risk_score" in metadata:
try:
risk_norm = float(metadata["risk_score"]) / 100.0
risk_norm = max(0.0, min(risk_norm, 1.0))
score = (degree_norm + risk_norm) / 2.0
except (ValueError, TypeError):
score = degree_norm
else:
score = degree_norm
return SIZE_MIN + score * (SIZE_MAX - SIZE_MIN)
def ops_to_sigma_json(db_path: str) -> dict:
"""Convierte operations.db al formato JSON esperado por sigma.js.
Lee entities y relations de la base de datos de operaciones y construye
el dict con nodos y aristas en el formato de graphology/sigma.js.
El tamanio de cada nodo se calcula a partir de su degree en el grafo
y, si esta disponible, de su risk_score en metadata (media 50/50).
Args:
db_path: Ruta al archivo operations.db.
Returns:
Dict con claves 'nodes' y 'edges' compatible con sigma.js / graphology.
"""
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
cur = conn.cursor()
cur.execute(
"SELECT id, name, type_ref, status, domain, metadata FROM entities"
)
raw_entities = [dict(row) for row in cur.fetchall()]
cur.execute(
"SELECT id, name, from_entity, to_entity, weight FROM relations"
)
raw_relations = [dict(row) for row in cur.fetchall()]
finally:
conn.close()
nodes = []
for entity in raw_entities:
try:
meta = json.loads(entity["metadata"]) if entity["metadata"] else {}
except (json.JSONDecodeError, TypeError):
meta = {}
type_ref = entity["type_ref"] or "unknown"
color = COLOR_MAP.get(type_ref, DEFAULT_COLOR)
size = _calculate_size(entity["id"], meta, raw_relations)
attributes = {
"label": entity["name"] or entity["id"],
"entity_type": type_ref,
"color": color,
"size": size,
"domain": entity["domain"] or "",
"status": entity["status"] or "",
}
# Aplana metadata como atributos adicionales (sin sobrescribir campos reservados)
reserved = {"label", "entity_type", "color", "size", "domain", "status", "type", "x", "y", "hidden", "zIndex"}
for k, v in meta.items():
if k not in reserved:
attributes[k] = v
nodes.append({"key": entity["id"], "attributes": attributes})
edges = []
for rel in raw_relations:
edges.append({
"key": rel["id"],
"source": rel["from_entity"],
"target": rel["to_entity"],
"attributes": {
"label": rel["name"] or "",
"weight": rel["weight"] if rel["weight"] is not None else 1.0,
"type": "arrow",
},
})
return {"nodes": nodes, "edges": edges}
@@ -0,0 +1,57 @@
---
name: render_sigma_html
kind: function
lang: py
domain: datascience
version: "1.0.0"
purity: impure
signature: "def render_sigma_html(graph_data: dict, output_path: str, title: str = 'OSINT Graph') -> str"
description: "Genera un archivo HTML standalone con sigma.js v2.4 que visualiza un grafo OSINT. Aplica ForceAtlas2, dark theme, filtros por tipo de nodo y tooltip con metadata. Retorna el path absoluto del archivo escrito."
tags: [graph, sigma, osint, visualization, html, forceatlas2, network, dark-theme]
uses_functions: [ops_to_sigma_json_py_datascience]
uses_types: []
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [json, os]
tested: false
tests: []
test_file_path: ""
file_path: "python/functions/datascience/render_sigma_html.py"
---
## Ejemplo
```python
from datascience.ops_to_sigma_json import ops_to_sigma_json
from datascience.render_sigma_html import render_sigma_html
graph = ops_to_sigma_json("apps/osint_analysis/operations.db")
path = render_sigma_html(graph, "/tmp/osint_graph.html", title="Red OSINT")
print(f"Abre en el browser: {path}")
```
## Notas
Funcion impura — escribe un archivo en disco.
El HTML es completamente standalone: no necesita servidor web ni backend. Todos los assets se cargan desde jsDelivr CDN:
- graphology 0.25.4
- graphology-layout-forceatlas2 0.10.1
- sigma 2.4.0
El JSON del grafo se embebe directamente en el `<script>` para evitar peticiones CORS al abrir desde `file://`.
ForceAtlas2 se ejecuta sincrono con 500 iteraciones antes de instanciar el renderer. Para grafos grandes (>500 nodos) considerar reducir iteraciones o usar `barnesHutOptimize: true` (que se activa automaticamente para grafos >300 nodos).
Panel lateral (fijo, top-right):
- Titulo del grafo
- Contador de nodos y aristas
- Checkboxes de filtro por tipo (con color dot)
Tooltip (hover sobre nodo):
- Label y tipo del nodo
- status, domain
- Todos los campos adicionales de metadata aplanados
Los caracteres especiales en labels y metadata se escapan correctamente para evitar XSS al abrir el HTML.
@@ -0,0 +1,234 @@
"""Renderiza un grafo sigma.js como HTML standalone con dark theme y layout ForceAtlas2."""
import json
import os
_HTML_TEMPLATE = """\
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>{title}</title>
<script src="https://cdn.jsdelivr.net/npm/graphology@0.25.4/dist/graphology.umd.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/graphology-library@0.8.0/dist/graphology-library.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/sigma@2.4.0/build/sigma.min.js"></script>
<style>
* {{ box-sizing: border-box; margin: 0; padding: 0; }}
body {{ background: #1a1a2e; color: #eee; font-family: 'Segoe UI', system-ui, sans-serif; overflow: hidden; }}
#container {{ width: 100vw; height: 100vh; }}
#panel {{
position: absolute; top: 12px; right: 12px;
background: rgba(10, 10, 30, 0.88);
border: 1px solid rgba(255,255,255,0.12);
padding: 16px; border-radius: 10px;
z-index: 10; min-width: 200px; max-width: 260px;
backdrop-filter: blur(6px);
}}
#panel h3 {{ font-size: 14px; font-weight: 600; margin-bottom: 12px; color: #a0c4ff; letter-spacing: 0.5px; }}
#stats {{ font-size: 11px; color: #888; margin-bottom: 12px; }}
#filters {{ display: flex; flex-direction: column; gap: 6px; }}
.filter-item {{ display: flex; align-items: center; gap: 8px; font-size: 12px; cursor: pointer; }}
.filter-item input {{ cursor: pointer; accent-color: #a0c4ff; }}
.color-dot {{ width: 10px; height: 10px; border-radius: 50%; flex-shrink: 0; }}
#tooltip {{
position: absolute; display: none;
background: rgba(5, 5, 20, 0.95);
border: 1px solid rgba(255,255,255,0.15);
padding: 10px 14px; border-radius: 8px;
pointer-events: none; z-index: 20;
max-width: 300px; font-size: 12px; line-height: 1.6;
}}
#tooltip .tt-title {{ font-weight: 600; color: #a0c4ff; margin-bottom: 6px; font-size: 13px; }}
#tooltip .tt-row {{ display: flex; gap: 6px; }}
#tooltip .tt-key {{ color: #888; min-width: 80px; }}
#tooltip .tt-val {{ color: #eee; word-break: break-all; }}
</style>
</head>
<body>
<div id="container"></div>
<div id="panel">
<h3>{title}</h3>
<div id="stats"></div>
<div id="filters"></div>
</div>
<div id="tooltip"></div>
<script>
(function () {{
const graphData = {json_data};
// ── Build graphology graph ──────────────────────────────────────────────
const Graph = graphology.Graph || graphology;
const g = new Graph({{ multi: true, type: 'directed' }});
// Assign random initial positions
graphData.nodes.forEach(function (n) {{
g.addNode(n.key, Object.assign({{
x: (Math.random() - 0.5) * 10,
y: (Math.random() - 0.5) * 10,
}}, n.attributes));
}});
graphData.edges.forEach(function (e) {{
try {{
g.addEdgeWithKey(e.key, e.source, e.target, e.attributes || {{}});
}} catch (err) {{
// skip duplicate edge keys gracefully
}}
}});
// ── ForceAtlas2 layout (synchronous, 500 iterations) ───────────────────
const FA2 = graphologyLibrary.layoutForceAtlas2;
FA2.assign(g, {{
iterations: 500,
settings: {{
gravity: 1,
scalingRatio: 2,
slowDown: 5,
barnesHutOptimize: g.order > 300,
}},
}});
// ── Sigma renderer ──────────────────────────────────────────────────────
const renderer = new Sigma(g, document.getElementById('container'), {{
renderEdgeLabels: false,
defaultEdgeColor: '#444',
defaultNodeColor: '#95a5a6',
labelColor: {{ color: '#ccc' }},
labelSize: 11,
edgeReducer: function (edge, data) {{
return Object.assign({{}}, data, {{ size: Math.max(1, (data.weight || 1) * 0.8) }});
}},
}});
// ── Stats panel ─────────────────────────────────────────────────────────
document.getElementById('stats').textContent =
graphData.nodes.length + ' nodes · ' + graphData.edges.length + ' edges';
// ── Filter panel by node type ───────────────────────────────────────────
const typeColors = {{}};
graphData.nodes.forEach(function (n) {{
const t = n.attributes.entity_type || 'unknown';
typeColors[t] = n.attributes.color || '#95a5a6';
}});
const hiddenTypes = new Set();
const filtersDiv = document.getElementById('filters');
Object.keys(typeColors).sort().forEach(function (type) {{
const color = typeColors[type];
const label = document.createElement('label');
label.className = 'filter-item';
const cb = document.createElement('input');
cb.type = 'checkbox';
cb.checked = true;
cb.addEventListener('change', function () {{
if (cb.checked) hiddenTypes.delete(type);
else hiddenTypes.add(type);
renderer.refresh();
}});
const dot = document.createElement('span');
dot.className = 'color-dot';
dot.style.background = color;
label.appendChild(cb);
label.appendChild(dot);
label.appendChild(document.createTextNode(type));
filtersDiv.appendChild(label);
}});
// Node reducer applies type filter
renderer.setSetting('nodeReducer', function (node, data) {{
if (hiddenTypes.has(data.entity_type)) return Object.assign({{}}, data, {{ hidden: true }});
return data;
}});
// ── Tooltip on hover ────────────────────────────────────────────────────
const tooltip = document.getElementById('tooltip');
renderer.on('enterNode', function (ref) {{
const nodeAttrs = g.getNodeAttributes(ref.node);
const reserved = new Set(['x', 'y', 'size', 'color', 'label', 'type', 'hidden']);
let html = '<div class="tt-title">' + escHtml(nodeAttrs.label || ref.node) + '</div>';
html += '<div class="tt-row"><span class="tt-key">type</span><span class="tt-val">' + escHtml(nodeAttrs.entity_type || '') + '</span></div>';
html += '<div class="tt-row"><span class="tt-key">status</span><span class="tt-val">' + escHtml(nodeAttrs.status || '') + '</span></div>';
html += '<div class="tt-row"><span class="tt-key">domain</span><span class="tt-val">' + escHtml(nodeAttrs.domain || '') + '</span></div>';
Object.keys(nodeAttrs).sort().forEach(function (k) {{
if (!reserved.has(k) && !['status', 'domain', 'type', 'label'].includes(k)) {{
html += '<div class="tt-row"><span class="tt-key">' + escHtml(k) + '</span><span class="tt-val">' + escHtml(String(nodeAttrs[k])) + '</span></div>';
}}
}});
tooltip.innerHTML = html;
tooltip.style.display = 'block';
}});
renderer.on('leaveNode', function () {{
tooltip.style.display = 'none';
}});
document.getElementById('container').addEventListener('mousemove', function (e) {{
tooltip.style.left = (e.clientX + 16) + 'px';
tooltip.style.top = (e.clientY + 16) + 'px';
}});
function escHtml(str) {{
return String(str)
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;');
}}
}})();
</script>
</body>
</html>
"""
def render_sigma_html(
graph_data: dict,
output_path: str,
title: str = "OSINT Graph",
) -> str:
"""Genera un HTML standalone con sigma.js que visualiza el grafo OSINT.
Recibe el dict producido por ops_to_sigma_json, embebe los datos como JSON
en el HTML, aplica ForceAtlas2 (500 iteraciones sincrono) y renderiza con
sigma.js v2.4. Incluye dark theme, panel de filtros por tipo de nodo y
tooltip con metadata al hacer hover.
Args:
graph_data: Dict con claves 'nodes' y 'edges' en formato graphology/sigma.
output_path: Ruta del archivo HTML a escribir.
title: Titulo del grafo mostrado en el panel y la pestana.
Returns:
Ruta absoluta del archivo HTML escrito.
Raises:
Exception: Si no se puede escribir el archivo en output_path.
"""
json_data = json.dumps(graph_data, ensure_ascii=False)
html = _HTML_TEMPLATE.format(
title=title,
json_data=json_data,
)
abs_path = os.path.abspath(output_path)
os.makedirs(os.path.dirname(abs_path) or ".", exist_ok=True)
try:
with open(abs_path, "w", encoding="utf-8") as f:
f.write(html)
except OSError as exc:
raise Exception(f"render_sigma_html: no se pudo escribir '{abs_path}': {exc}") from exc
return abs_path