"""Convierte operations.db a triples RDF (subject, predicate, object).""" import json import sqlite3 def ops_to_rdf_triples( db_path: str, namespace: str = "http://osint.local/", ) -> list[tuple[str, str, str]]: """Convierte entities y relations de operations.db a triples RDF. Genera triples para: - Tipo de entidad: (entity_uri, rdf:type, type_ref) - Nombre: (entity_uri, name, literal) - Status y domain: (entity_uri, status|domain, literal) - Cada clave de metadata: (entity_uri, key, str(value)) - Relaciones: (from_entity_uri, relation_name, to_entity_uri) Los IDs de entidades se prefijarlos con el namespace para formar URIs. Args: db_path: Ruta al archivo operations.db. namespace: Prefijo de namespace para construir URIs. Default: "http://osint.local/". Returns: Lista de tuplas (subject, predicate, object) representando los triples RDF. """ conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row try: cur = conn.cursor() cur.execute( "SELECT id, name, type_ref, status, domain, metadata FROM entities" ) raw_entities = [dict(row) for row in cur.fetchall()] cur.execute( "SELECT id, name, from_entity, to_entity FROM relations" ) raw_relations = [dict(row) for row in cur.fetchall()] finally: conn.close() ns = namespace.rstrip("/") + "/" triples: list[tuple[str, str, str]] = [] for entity in raw_entities: subject = ns + entity["id"] # rdf:type if entity["type_ref"]: triples.append((subject, "rdf:type", entity["type_ref"])) # name if entity["name"]: triples.append((subject, "name", entity["name"])) # status if entity["status"]: triples.append((subject, "status", entity["status"])) # domain if entity["domain"]: triples.append((subject, "domain", entity["domain"])) # metadata keys try: meta = json.loads(entity["metadata"]) if entity["metadata"] else {} except (json.JSONDecodeError, TypeError): meta = {} for key, value in meta.items(): triples.append((subject, key, str(value))) for rel in raw_relations: from_uri = ns + rel["from_entity"] to_uri = ns + rel["to_entity"] predicate = rel["name"] or rel["id"] triples.append((from_uri, predicate, to_uri)) return triples