feat: funciones datascience — ops_to_rdf_triples, ops_to_sigma_json, render_sigma_html
Conversión de operations.db a triples RDF y formato sigma.js, más renderizado HTML standalone con dark theme y ForceAtlas2 layout. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,82 @@
|
||||
"""Convierte operations.db a triples RDF (subject, predicate, object)."""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
|
||||
|
||||
def ops_to_rdf_triples(
|
||||
db_path: str,
|
||||
namespace: str = "http://osint.local/",
|
||||
) -> list[tuple[str, str, str]]:
|
||||
"""Convierte entities y relations de operations.db a triples RDF.
|
||||
|
||||
Genera triples para:
|
||||
- Tipo de entidad: (entity_uri, rdf:type, type_ref)
|
||||
- Nombre: (entity_uri, name, literal)
|
||||
- Status y domain: (entity_uri, status|domain, literal)
|
||||
- Cada clave de metadata: (entity_uri, key, str(value))
|
||||
- Relaciones: (from_entity_uri, relation_name, to_entity_uri)
|
||||
|
||||
Los IDs de entidades se prefijarlos con el namespace para formar URIs.
|
||||
|
||||
Args:
|
||||
db_path: Ruta al archivo operations.db.
|
||||
namespace: Prefijo de namespace para construir URIs. Default: "http://osint.local/".
|
||||
|
||||
Returns:
|
||||
Lista de tuplas (subject, predicate, object) representando los triples RDF.
|
||||
"""
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"SELECT id, name, type_ref, status, domain, metadata FROM entities"
|
||||
)
|
||||
raw_entities = [dict(row) for row in cur.fetchall()]
|
||||
|
||||
cur.execute(
|
||||
"SELECT id, name, from_entity, to_entity FROM relations"
|
||||
)
|
||||
raw_relations = [dict(row) for row in cur.fetchall()]
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
ns = namespace.rstrip("/") + "/"
|
||||
triples: list[tuple[str, str, str]] = []
|
||||
|
||||
for entity in raw_entities:
|
||||
subject = ns + entity["id"]
|
||||
|
||||
# rdf:type
|
||||
if entity["type_ref"]:
|
||||
triples.append((subject, "rdf:type", entity["type_ref"]))
|
||||
|
||||
# name
|
||||
if entity["name"]:
|
||||
triples.append((subject, "name", entity["name"]))
|
||||
|
||||
# status
|
||||
if entity["status"]:
|
||||
triples.append((subject, "status", entity["status"]))
|
||||
|
||||
# domain
|
||||
if entity["domain"]:
|
||||
triples.append((subject, "domain", entity["domain"]))
|
||||
|
||||
# metadata keys
|
||||
try:
|
||||
meta = json.loads(entity["metadata"]) if entity["metadata"] else {}
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
meta = {}
|
||||
|
||||
for key, value in meta.items():
|
||||
triples.append((subject, key, str(value)))
|
||||
|
||||
for rel in raw_relations:
|
||||
from_uri = ns + rel["from_entity"]
|
||||
to_uri = ns + rel["to_entity"]
|
||||
predicate = rel["name"] or rel["id"]
|
||||
triples.append((from_uri, predicate, to_uri))
|
||||
|
||||
return triples
|
||||
Reference in New Issue
Block a user