retrieving_graphs/notebooks/.ipynb_checkpoints/03_osint_intelligence_graph-checkpoint.ipynb

{
"cells": [
{
"cell_type": "markdown",
"id": "intro",
"metadata": {},
"source": [
"# OSINT Intelligence Graph: SQLite Triple Store vs Oxigraph vs operations.db\n",
"\n",
"## Objetivo\n",
"\n",
"Comparar tres backends para almacenar inteligencia OSINT con relaciones semanticas:\n",
"1. **operations.db** — schema nativo del fn_registry (entities + relations + assertions)\n",
"2. **SQLite Triple Store** — tabla de triples con CTEs recursivos\n",
"3. **Oxigraph (pyoxigraph)** — triple store SPARQL nativo en Rust\n",
"\n",
"Medimos rendimiento, compatibilidad con LLM query generation, y visualizamos con sigma.js."
]
},
{
"cell_type": "markdown",
"id": "s1",
"metadata": {},
"source": [
"## 1. Setup y datos sinteticos OSINT"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "setup",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"FN_ROOT: /home/lucas/fn_registry\n",
"DATA_DIR: /home/lucas/fn_registry/analysis/retrieving_graphs/data/osint\n"
]
}
],
"source": [
"import sqlite3, json, os, sys, time, shutil, random, hashlib, uuid\n",
"import pandas as pd\n",
"import matplotlib\n",
"matplotlib.use('Agg')\n",
"import matplotlib.pyplot as plt\n",
"from datetime import datetime, timedelta\n",
"\n",
"plt.style.use('seaborn-v0_8-whitegrid')\n",
"\n",
"FN_ROOT = os.environ.get('FN_REGISTRY_ROOT', os.path.expanduser('~/fn_registry'))\n",
"sys.path.insert(0, os.path.join(FN_ROOT, 'python', 'functions'))\n",
"\n",
"DATA_DIR = 'data/osint'\n",
"OUTPUT_DIR = 'data/output'\n",
"os.makedirs(DATA_DIR, exist_ok=True)\n",
"os.makedirs(OUTPUT_DIR, exist_ok=True)\n",
"\n",
"print(f'FN_ROOT: {FN_ROOT}')\n",
"print(f'DATA_DIR: {os.path.abspath(DATA_DIR)}')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "gen-data",
"metadata": {},
"outputs": [],
"source": [
"# === DATOS SINTETICOS OSINT ===\n",
"# Dos narrativas: Grupo APT + Insider Trading Ring\n",
"\n",
"random.seed(42)\n",
"now = datetime.utcnow()\n",
"\n",
"def rand_date(start_year=2023):\n",
" d = datetime(start_year, 1, 1) + timedelta(days=random.randint(0, 800))\n",
" return d.strftime('%Y-%m-%d')\n",
"\n",
"def rand_ip():\n",
" return f'{random.randint(10,223)}.{random.randint(0,255)}.{random.randint(0,255)}.{random.randint(1,254)}'\n",
"\n",
"def rand_hash():\n",
" return hashlib.sha256(uuid.uuid4().bytes).hexdigest()\n",
"\n",
"def rand_btc():\n",
" return '1' + ''.join(random.choices('ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz123456789', k=33))\n",
"\n",
"def rand_eth():\n",
" return '0x' + ''.join(random.choices('0123456789abcdef', k=40))\n",
"\n",
"# --- ENTITIES ---\n",
"entities = []\n",
"\n",
"# Narrative A: APT group\n",
"entities += [\n",
" {'id': 'person_001', 'name': 'Viktor Petrov', 'type_ref': 'person', 'domain': 'cybersecurity', 'source': 'humint',\n",
" 'metadata': {'risk_score': 92, 'country': 'RU', 'aliases': ['darkside_v', 'vp_shadow'], 'first_seen': '2023-06-15', 'last_seen': '2025-11-20', 'role': 'operator'}},\n",
" {'id': 'person_002', 'name': 'Li Wei', 'type_ref': 'person', 'domain': 'cybersecurity', 'source': 'sigint',\n",
" 'metadata': {'risk_score': 78, 'country': 'CN', 'aliases': ['ghost_dragon'], 'first_seen': '2023-09-01', 'last_seen': '2025-10-15', 'role': 'developer'}},\n",
" {'id': 'person_003', 'name': 'Andrei Volkov', 'type_ref': 'person', 'domain': 'cybersecurity', 'source': 'osint_social',\n",
" 'metadata': {'risk_score': 65, 'country': 'UA', 'aliases': ['a_v_cyber'], 'first_seen': '2024-01-10', 'last_seen': '2025-12-01', 'role': 'money_mule'}},\n",
" {'id': 'org_001', 'name': 'Quantum Digital Ltd', 'type_ref': 'organization', 'domain': 'cybersecurity', 'source': 'osint_corporate',\n",
" 'metadata': {'jurisdiction': 'BVI', 'type': 'shell_company', 'registered_date': '2023-03-22', 'risk_score': 88}},\n",
" {'id': 'org_002', 'name': 'NovaTech Solutions', 'type_ref': 'organization', 'domain': 'cybersecurity', 'source': 'osint_corporate',\n",
" 'metadata': {'jurisdiction': 'CY', 'type': 'front_company', 'registered_date': '2022-11-05', 'risk_score': 72}},\n",
" {'id': 'ip_001', 'name': 'C2 Primary', 'type_ref': 'ip_address', 'domain': 'cybersecurity', 'source': 'threat_intel',\n",
" 'metadata': {'address': '185.220.101.42', 'asn': 'AS9009', 'country': 'NL', 'first_seen': '2023-08-15', 'last_seen': '2025-11-30', 'risk_score': 95}},\n",
" {'id': 'ip_002', 'name': 'C2 Backup', 'type_ref': 'ip_address', 'domain': 'cybersecurity', 'source': 'threat_intel',\n",
" 'metadata': {'address': '91.215.85.17', 'asn': 'AS48693', 'country': 'RU', 'first_seen': '2024-02-01', 'last_seen': '2025-10-22', 'risk_score': 90}},\n",
" {'id': 'ip_003', 'name': 'Proxy Node', 'type_ref': 'ip_address', 'domain': 'cybersecurity', 'source': 'network_scan',\n",
" 'metadata': {'address': '45.33.32.156', 'asn': 'AS63949', 'country': 'US', 'first_seen': '2024-05-10', 'last_seen': '2025-09-15', 'risk_score': 60}},\n",
" {'id': 'domain_001', 'name': 'secure-update.xyz', 'type_ref': 'domain', 'domain': 'cybersecurity', 'source': 'threat_intel',\n",
" 'metadata': {'registrar': 'NameCheap', 'created_date': '2024-01-15', 'category': 'c2', 'risk_score': 95}},\n",
" {'id': 'domain_002', 'name': 'cloud-services-auth.com', 'type_ref': 'domain', 'domain': 'cybersecurity', 'source': 'phishing_db',\n",
" 'metadata': {'registrar': 'Njalla', 'created_date': '2024-06-20', 'category': 'phishing', 'risk_score': 88}},\n",
" {'id': 'domain_003', 'name': 'fileshare-cdn.net', 'type_ref': 'domain', 'domain': 'cybersecurity', 'source': 'malware_analysis',\n",
" 'metadata': {'registrar': 'NameSilo', 'created_date': '2023-11-03', 'category': 'payload_delivery', 'risk_score': 82}},\n",
" {'id': 'wallet_001', 'name': 'APT Primary BTC', 'type_ref': 'crypto_wallet', 'domain': 'cybersecurity', 'source': 'blockchain_analysis',\n",
" 'metadata': {'currency': 'BTC', 'address': rand_btc(), 'balance': 2.45, 'first_tx': '2023-07-20', 'last_tx': '2025-11-15', 'risk_score': 90}},\n",
" {'id': 'wallet_002', 'name': 'Mixer Output 1', 'type_ref': 'crypto_wallet', 'domain': 'cybersecurity', 'source': 'blockchain_analysis',\n",
" 'metadata': {'currency': 'BTC', 'address': rand_btc(), 'balance': 0.78, 'first_tx': '2024-01-10', 'last_tx': '2025-08-22', 'risk_score': 75}},\n",
" {'id': 'wallet_003', 'name': 'ETH Laundering', 'type_ref': 'crypto_wallet', 'domain': 'cybersecurity', 'source': 'blockchain_analysis',\n",
" 'metadata': {'currency': 'ETH', 'address': rand_eth(), 'balance': 15.3, 'first_tx': '2024-03-05', 'last_tx': '2025-12-01', 'risk_score': 85}},\n",
" {'id': 'malware_001', 'name': 'ShadowRAT v3', 'type_ref': 'malware', 'domain': 'cybersecurity', 'source': 'malware_analysis',\n",
" 'metadata': {'family': 'RAT', 'hash_sha256': rand_hash(), 'first_seen': '2023-08-20', 'detection_rate': 0.35, 'risk_score': 92}},\n",
" {'id': 'malware_002', 'name': 'CryptoStealer', 'type_ref': 'malware', 'domain': 'cybersecurity', 'source': 'malware_analysis',\n",
" 'metadata': {'family': 'stealer', 'hash_sha256': rand_hash(), 'first_seen': '2024-04-12', 'detection_rate': 0.22, 'risk_score': 88}},\n",
" {'id': 'vuln_001', 'name': 'CVE-2024-3400', 'type_ref': 'vulnerability', 'domain': 'cybersecurity', 'source': 'nvd',\n",
" 'metadata': {'cvss': 9.8, 'affected_product': 'PAN-OS', 'patch_available': True, 'exploited_in_wild': True, 'risk_score': 98}},\n",
" {'id': 'vuln_002', 'name': 'CVE-2024-21887', 'type_ref': 'vulnerability', 'domain': 'cybersecurity', 'source': 'nvd',\n",
" 'metadata': {'cvss': 8.2, 'affected_product': 'Ivanti Connect Secure', 'patch_available': True, 'exploited_in_wild': True, 'risk_score': 85}},\n",
" {'id': 'email_001', 'name': 'vp_shadow@proton.me', 'type_ref': 'email', 'domain': 'cybersecurity', 'source': 'osint_social',\n",
" 'metadata': {'provider': 'protonmail', 'verified': True, 'associated_breaches': 0, 'risk_score': 70}},\n",
" {'id': 'email_002', 'name': 'ghost.dragon@tutanota.com', 'type_ref': 'email', 'domain': 'cybersecurity', 'source': 'dark_web',\n",
" 'metadata': {'provider': 'tutanota', 'verified': False, 'associated_breaches': 2, 'risk_score': 80}},\n",
"]\n",
"\n",
"# Narrative B: Insider Trading Ring\n",
"entities += [\n",
" {'id': 'person_004', 'name': 'Sarah Chen', 'type_ref': 'person', 'domain': 'finance', 'source': 'humint',\n",
" 'metadata': {'risk_score': 70, 'country': 'US', 'aliases': ['s_chen_insider'], 'first_seen': '2024-02-15', 'last_seen': '2025-12-01', 'role': 'insider'}},\n",
" {'id': 'person_005', 'name': 'Marcus Webb', 'type_ref': 'person', 'domain': 'finance', 'source': 'osint_financial',\n",
" 'metadata': {'risk_score': 82, 'country': 'UK', 'aliases': ['m_webb_trades'], 'first_seen': '2024-03-01', 'last_seen': '2025-11-28', 'role': 'trader'}},\n",
" {'id': 'person_006', 'name': 'Dmitri Sokolov', 'type_ref': 'person', 'domain': 'finance', 'source': 'humint',\n",
" 'metadata': {'risk_score': 75, 'country': 'RU', 'aliases': ['d_sok'], 'first_seen': '2024-01-20', 'last_seen': '2025-11-30', 'role': 'facilitator'}},\n",
" {'id': 'org_003', 'name': 'Apex Capital Partners', 'type_ref': 'organization', 'domain': 'finance', 'source': 'osint_financial',\n",
" 'metadata': {'jurisdiction': 'UK', 'type': 'hedge_fund', 'registered_date': '2019-06-15', 'risk_score': 55, 'aum_millions': 340}},\n",
" {'id': 'org_004', 'name': 'Pacific Rim Brokers', 'type_ref': 'organization', 'domain': 'finance', 'source': 'osint_financial',\n",
" 'metadata': {'jurisdiction': 'HK', 'type': 'broker', 'registered_date': '2021-02-10', 'risk_score': 62}},\n",
" {'id': 'domain_004', 'name': 'apex-secure-comms.io', 'type_ref': 'domain', 'domain': 'finance', 'source': 'osint_web',\n",
" 'metadata': {'registrar': 'Cloudflare', 'created_date': '2024-04-01', 'category': 'communications', 'risk_score': 45}},\n",
" {'id': 'wallet_004', 'name': 'Trading Fund BTC', 'type_ref': 'crypto_wallet', 'domain': 'finance', 'source': 'blockchain_analysis',\n",
" 'metadata': {'currency': 'BTC', 'address': rand_btc(), 'balance': 8.2, 'first_tx': '2024-04-10', 'last_tx': '2025-11-25', 'risk_score': 55}},\n",
" {'id': 'wallet_005', 'name': 'Webb Personal ETH', 'type_ref': 'crypto_wallet', 'domain': 'finance', 'source': 'blockchain_analysis',\n",
" 'metadata': {'currency': 'ETH', 'address': rand_eth(), 'balance': 42.7, 'first_tx': '2024-05-01', 'last_tx': '2025-12-01', 'risk_score': 48}},\n",
" {'id': 'ip_004', 'name': 'Trading VPN', 'type_ref': 'ip_address', 'domain': 'finance', 'source': 'network_analysis',\n",
" 'metadata': {'address': '104.21.45.89', 'asn': 'AS13335', 'country': 'US', 'first_seen': '2024-06-01', 'last_seen': '2025-11-30', 'risk_score': 35}},\n",
"]\n",
"\n",
"# Trading signals\n",
"for i in range(8):\n",
" symbol = random.choice(['AAPL', 'NVDA', 'TSLA', 'MSFT', 'AMZN', 'BTC-USD', 'ETH-USD', 'SOL-USD'])\n",
" entities.append({\n",
" 'id': f'signal_{i+1:03d}',\n",
" 'name': f'{symbol} {random.choice([\"long\",\"short\"])} signal',\n",
" 'type_ref': 'trading_signal',\n",
" 'domain': 'finance',\n",
" 'source': 'algo_detection',\n",
" 'metadata': {\n",
" 'symbol': symbol,\n",
" 'direction': random.choice(['long', 'short']),\n",
" 'confidence': round(random.uniform(0.4, 0.98), 2),\n",
" 'timestamp': rand_date(2024),\n",
" 'pnl_percent': round(random.uniform(-15, 45), 1),\n",
" 'risk_score': random.randint(20, 70),\n",
" }\n",
" })\n",
"\n",
"# Cross-links: shared wallet between narratives\n",
"entities.append({\n",
" 'id': 'wallet_bridge', 'name': 'Bridge Wallet BTC', 'type_ref': 'crypto_wallet', 'domain': 'cybersecurity', 'source': 'blockchain_analysis',\n",
" 'metadata': {'currency': 'BTC', 'address': rand_btc(), 'balance': 0.15, 'first_tx': '2024-09-01', 'last_tx': '2025-11-10', 'risk_score': 78,\n",
" 'note': 'Links APT laundering to insider trading payments'}\n",
"})\n",
"\n",
"print(f'Entidades generadas: {len(entities)}')\n",
"from collections import Counter\n",
"type_counts = Counter(e['type_ref'] for e in entities)\n",
"for t, c in type_counts.most_common():\n",
" print(f' {t}: {c}')"
]
},
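{
"cell_type": "markdown",
"id": "sketch-private-rng",
"metadata": {},
"source": [
"The cell above seeds the global `random` module, so any extra `random` call added earlier in the notebook shifts every value generated after it. The sketch below shows the alternative, assuming a private `random.Random` instance per generator is acceptable for this dataset. `make_wallet_ids` is a hypothetical helper and is not part of fn_registry. It returns the same ids for the same seed and leaves the global RNG state untouched."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "sketch-private-rng-code",
"metadata": {},
"outputs": [],
"source": [
"import random\n",
"\n",
"def make_wallet_ids(n, seed=42):\n",
"    \"\"\"Return n synthetic BTC-style ids drawn from a private, seeded RNG (hypothetical helper).\"\"\"\n",
"    rng = random.Random(seed)  # isolated state: independent of the global random module\n",
"    alphabet = 'ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz123456789'\n",
"    return ['1' + ''.join(rng.choices(alphabet, k=33)) for _ in range(n)]\n",
"\n",
"state_before = random.getstate()\n",
"ids = make_wallet_ids(3)\n",
"# Same seed gives the same ids, and the global RNG was not advanced\n",
"print(ids == make_wallet_ids(3), random.getstate() == state_before)"
]
},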
{
"cell_type": "code",
"execution_count": null,
"id": "gen-relations",
"metadata": {},
"outputs": [],
"source": [
"# === RELACIONES ===\n",
"relations = [\n",
" # Narrative A: APT infrastructure\n",
" ('rel_001', 'operates', 'person_001', 'domain_001', 0.95, 'Viktor operates C2 domain'),\n",
" ('rel_002', 'operates', 'person_001', 'domain_002', 0.85, 'Viktor operates phishing domain'),\n",
" ('rel_003', 'operates', 'person_002', 'domain_003', 0.90, 'Li Wei operates payload delivery'),\n",
" ('rel_004', 'controls', 'person_001', 'ip_001', 0.92, 'Viktor controls primary C2'),\n",
" ('rel_005', 'controls', 'person_001', 'ip_002', 0.88, 'Viktor controls backup C2'),\n",
" ('rel_006', 'uses', 'person_002', 'ip_003', 0.70, 'Li Wei uses proxy node'),\n",
" ('rel_007', 'resolves_to', 'domain_001', 'ip_001', 1.0, 'DNS resolution'),\n",
" ('rel_008', 'resolves_to', 'domain_002', 'ip_001', 1.0, 'DNS resolution'),\n",
" ('rel_009', 'resolves_to', 'domain_003', 'ip_002', 1.0, 'DNS resolution'),\n",
" ('rel_010', 'hosts', 'ip_001', 'malware_001', 0.95, 'C2 hosts ShadowRAT'),\n",
" ('rel_011', 'hosts', 'ip_002', 'malware_002', 0.90, 'Backup hosts CryptoStealer'),\n",
" ('rel_012', 'develops', 'person_002', 'malware_001', 0.85, 'Li Wei developed ShadowRAT'),\n",
" ('rel_013', 'develops', 'person_002', 'malware_002', 0.80, 'Li Wei developed CryptoStealer'),\n",
" ('rel_014', 'exploits', 'malware_001', 'vuln_001', 0.95, 'ShadowRAT exploits PAN-OS vuln'),\n",
" ('rel_015', 'exploits', 'malware_002', 'vuln_002', 0.88, 'CryptoStealer exploits Ivanti vuln'),\n",
" # Crypto laundering chain\n",
" ('rel_016', 'owns', 'person_001', 'wallet_001', 0.90, 'Viktor owns primary wallet'),\n",
" ('rel_017', 'transfers_to', 'wallet_001', 'wallet_002', 0.95, 'Laundering hop 1'),\n",
" ('rel_018', 'transfers_to', 'wallet_002', 'wallet_003', 0.92, 'Laundering hop 2'),\n",
" ('rel_019', 'transfers_to', 'wallet_003', 'wallet_bridge', 0.85, 'Laundering to bridge'),\n",
" ('rel_020', 'owns', 'person_003', 'wallet_003', 0.75, 'Andrei owns ETH laundering wallet'),\n",
" # Org structure\n",
" ('rel_021', 'employs', 'org_001', 'person_001', 0.80, 'Shell company employs Viktor'),\n",
" ('rel_022', 'employs', 'org_002', 'person_002', 0.75, 'Front company employs Li Wei'),\n",
" ('rel_023', 'employs', 'org_001', 'person_003', 0.70, 'Shell employs money mule'),\n",
" ('rel_024', 'communicates_with', 'person_001', 'person_002', 0.95, 'Encrypted comms'),\n",
" ('rel_025', 'communicates_with', 'person_001', 'person_003', 0.80, 'Money mule coordination'),\n",
" # Email links\n",
" ('rel_026', 'uses_email', 'person_001', 'email_001', 0.95, 'Primary email'),\n",
" ('rel_027', 'uses_email', 'person_002', 'email_002', 0.90, 'Dark web email'),\n",
" \n",
" # Narrative B: Insider Trading\n",
" ('rel_028', 'employs', 'org_003', 'person_004', 0.95, 'Apex employs insider'),\n",
" ('rel_029', 'employs', 'org_004', 'person_005', 0.90, 'Broker employs trader'),\n",
" ('rel_030', 'communicates_with', 'person_004', 'person_005', 0.88, 'Insider tips trader'),\n",
" ('rel_031', 'communicates_with', 'person_005', 'person_006', 0.82, 'Trader coords with facilitator'),\n",
" ('rel_032', 'facilitates', 'person_006', 'org_004', 0.78, 'Facilitator connects broker'),\n",
" ('rel_033', 'owns', 'person_005', 'wallet_004', 0.90, 'Webb owns trading wallet'),\n",
" ('rel_034', 'owns', 'person_005', 'wallet_005', 0.95, 'Webb personal ETH'),\n",
" ('rel_035', 'uses', 'person_005', 'domain_004', 0.85, 'Webb uses secure comms'),\n",
" ('rel_036', 'uses', 'person_005', 'ip_004', 0.80, 'Webb uses trading VPN'),\n",
" ('rel_037', 'resolves_to', 'domain_004', 'ip_004', 1.0, 'DNS resolution'),\n",
" \n",
" # Cross-narrative links\n",
" ('rel_038', 'transfers_to', 'wallet_bridge', 'wallet_004', 0.72, 'APT funds reach trading ring'),\n",
" ('rel_039', 'communicates_with', 'person_003', 'person_006', 0.65, 'Money mule meets facilitator'),\n",
" ('rel_040', 'owns', 'person_006', 'wallet_bridge', 0.70, 'Facilitator controls bridge wallet'),\n",
"]\n",
"\n",
"# Trading signal relations\n",
"for i in range(8):\n",
" actor = random.choice(['person_004', 'person_005'])\n",
" relations.append((f'rel_sig_{i+1:03d}', 'generates', actor, f'signal_{i+1:03d}', \n",
" round(random.uniform(0.6, 0.95), 2), f'Actor generates signal'))\n",
"\n",
"print(f'Relaciones generadas: {len(relations)}')\n",
"rel_types = Counter(r[1] for r in relations)\n",
"for t, c in rel_types.most_common():\n",
" print(f' {t}: {c}')"
]
},
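{
"cell_type": "markdown",
"id": "sketch-dangling-endpoints",
"metadata": {},
"source": [
"Neither the triple table nor Oxigraph checks that a relation endpoint is a known entity, so a mistyped id would silently create a dangling node. The sketch below is a minimal referential check over the Python lists and assumes the 6-tuple relation shape used above. `dangling_endpoints` is a hypothetical helper. For the data generated above it is expected to return an empty list."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "sketch-dangling-endpoints-code",
"metadata": {},
"outputs": [],
"source": [
"def dangling_endpoints(entities, relations):\n",
"    \"\"\"Return (relation_id, missing_entity_id) pairs whose endpoints are not in entities (hypothetical helper).\"\"\"\n",
"    known = {e['id'] for e in entities}\n",
"    missing = []\n",
"    for rid, _rtype, src, dst, _weight, _desc in relations:\n",
"        for endpoint in (src, dst):\n",
"            if endpoint not in known:\n",
"                missing.append((rid, endpoint))\n",
"    return missing\n",
"\n",
"# Toy data in the same shapes as `entities` and `relations` above\n",
"toy_entities = [{'id': 'person_001'}, {'id': 'wallet_001'}]\n",
"toy_relations = [\n",
"    ('rel_a', 'owns', 'person_001', 'wallet_001', 0.9, 'valid edge'),\n",
"    ('rel_b', 'owns', 'person_001', 'wallet_999', 0.5, 'target id does not exist'),\n",
"]\n",
"print(dangling_endpoints(toy_entities, toy_relations))\n",
"# Against the notebook data: dangling_endpoints(entities, relations)"
]
},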
{
"cell_type": "markdown",
"id": "s2",
"metadata": {},
"source": [
"## 2. Backend A: operations.db"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ops-load",
"metadata": {},
"outputs": [],
"source": [
"# === CARGAR EN OPERATIONS.DB ===\n",
"OPS_DB = os.path.join(DATA_DIR, 'operations.db')\n",
"\n",
"# Crear tablas de assertions si no existen (template puede no tenerlas)\n",
"conn = sqlite3.connect(OPS_DB)\n",
"conn.execute('PRAGMA journal_mode=WAL')\n",
"conn.execute('PRAGMA foreign_keys=ON')\n",
"\n",
"# Crear tablas que faltan\n",
"conn.executescript('''\n",
"CREATE TABLE IF NOT EXISTS assertions (\n",
" id TEXT PRIMARY KEY, entity_id TEXT NOT NULL, name TEXT NOT NULL,\n",
" kind TEXT NOT NULL, rule TEXT NOT NULL, severity TEXT NOT NULL DEFAULT 'warning',\n",
" description TEXT DEFAULT '', active INTEGER DEFAULT 1,\n",
" created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now')),\n",
" FOREIGN KEY(entity_id) REFERENCES entities(id)\n",
");\n",
"CREATE TABLE IF NOT EXISTS assertion_results (\n",
" id TEXT PRIMARY KEY, assertion_id TEXT NOT NULL, execution_id TEXT DEFAULT '',\n",
" status TEXT NOT NULL, value TEXT DEFAULT '{}', message TEXT DEFAULT '',\n",
" evaluated_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now')),\n",
" FOREIGN KEY(assertion_id) REFERENCES assertions(id)\n",
");\n",
"CREATE TABLE IF NOT EXISTS executions (\n",
" id TEXT PRIMARY KEY, pipeline_id TEXT NOT NULL, relation_id TEXT DEFAULT '',\n",
" status TEXT NOT NULL, started_at TEXT NOT NULL, ended_at TEXT DEFAULT '',\n",
" duration_ms INTEGER DEFAULT 0, records_in INTEGER DEFAULT 0, records_out INTEGER DEFAULT 0,\n",
" error TEXT DEFAULT '', metrics TEXT DEFAULT '{}',\n",
" created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))\n",
");\n",
"CREATE TABLE IF NOT EXISTS logs (\n",
" id TEXT PRIMARY KEY, level TEXT NOT NULL DEFAULT 'info', source TEXT DEFAULT '',\n",
" entity_id TEXT DEFAULT '', execution_id TEXT DEFAULT '',\n",
" message TEXT NOT NULL, metadata TEXT DEFAULT '{}',\n",
" created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%SZ','now'))\n",
");\n",
"''')\n",
"\n",
"now_str = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')\n",
"\n",
"# Insert entities\n",
"t0 = time.perf_counter()\n",
"for e in entities:\n",
" conn.execute(\n",
" 'INSERT OR REPLACE INTO entities (id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at) '\n",
" 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',\n",
" (e['id'], e['name'], e['type_ref'], 'active', f'OSINT entity: {e[\"name\"]}',\n",
" e['domain'], json.dumps(list(e['metadata'].keys())), e['source'],\n",
" json.dumps(e['metadata']), '', now_str, now_str)\n",
" )\n",
"\n",
"# Insert relations\n",
"for r in relations:\n",
" rid, rtype, from_e, to_e, weight, desc = r\n",
" conn.execute(\n",
" 'INSERT OR REPLACE INTO relations (id, name, from_entity, to_entity, via, description, purity, direction, weight, status, tags, notes, created_at, updated_at) '\n",
" 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',\n",
" (rid, rtype, from_e, to_e, '', desc, 'impure', 'unidirectional', weight,\n",
" 'implemented', '[]', '', now_str, now_str)\n",
" )\n",
"conn.commit()\n",
"ops_insert_time = time.perf_counter() - t0\n",
"\n",
"print(f'operations.db:')\n",
"print(f' Entities: {conn.execute(\"SELECT COUNT(*) FROM entities\").fetchone()[0]}')\n",
"print(f' Relations: {conn.execute(\"SELECT COUNT(*) FROM relations\").fetchone()[0]}')\n",
"print(f' Insert time: {ops_insert_time*1000:.1f}ms')"
]
},
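{
"cell_type": "markdown",
"id": "sketch-expression-index",
"metadata": {},
"source": [
"Q4, Q5 and Q7 in the benchmark filter on `json_extract(metadata, ...)`. SQLite evaluates that expression row by row unless an index exists on the exact same expression. The sketch below builds a throwaway in-memory copy and assumes the linked SQLite build includes the JSON functions, which are built in by default since SQLite 3.38 and are already used in this notebook. `risk_index_plan` and `idx_entities_risk` are hypothetical names. The planner only matches an expression index when the query repeats the indexed expression. A filter wrapped in `CAST(... AS INTEGER)`, as in Q5, would therefore need an index on the cast expression instead."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "sketch-expression-index-code",
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import sqlite3\n",
"\n",
"def risk_index_plan(threshold=70):\n",
"    \"\"\"Return (query-plan details, matching row count) from a throwaway in-memory table (hypothetical helper).\"\"\"\n",
"    db = sqlite3.connect(':memory:')\n",
"    db.execute('CREATE TABLE entities (id TEXT PRIMARY KEY, metadata TEXT)')\n",
"    db.executemany('INSERT INTO entities VALUES (?, ?)',\n",
"                   [(f'e{i}', json.dumps({'risk_score': i})) for i in range(100)])\n",
"    # Index the exact expression the WHERE clause uses (idx_entities_risk is a hypothetical name)\n",
"    db.execute(\"CREATE INDEX idx_entities_risk ON entities(json_extract(metadata, '$.risk_score'))\")\n",
"    query = \"SELECT id FROM entities WHERE json_extract(metadata, '$.risk_score') > ?\"\n",
"    plan = [row[3] for row in db.execute('EXPLAIN QUERY PLAN ' + query, (threshold,))]\n",
"    count = len(db.execute(query, (threshold,)).fetchall())\n",
"    db.close()\n",
"    return plan, count\n",
"\n",
"risk_plan, risk_count = risk_index_plan()\n",
"print(risk_plan)\n",
"print(risk_count)"
]
},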
{
"cell_type": "code",
"execution_count": null,
"id": "ops-assertions",
"metadata": {},
"outputs": [],
"source": [
"# === ASSERTIONS ===\n",
"# Reglas que usan bare field names -> rewrite a json_extract(metadata, '$.field')\n",
"assertions_data = [\n",
" ('assert_risk_range', 'person_%', 'range', 'risk_score >= 0 AND risk_score <= 100', 'warning', 'Risk score debe estar en [0,100]'),\n",
" ('assert_cvss_range', 'vuln_%', 'range', 'cvss >= 0.0 AND cvss <= 10.0', 'warning', 'CVSS debe estar en [0,10]'),\n",
" ('assert_confidence', 'signal_%', 'range', 'confidence >= 0.0 AND confidence <= 1.0', 'critical', 'Confianza de signal en [0,1]'),\n",
" ('assert_country_nn', 'person_%', 'null', 'country IS NOT NULL', 'info', 'Persona debe tener pais'),\n",
" ('assert_hash_nn', 'malware_%', 'null', 'hash_sha256 IS NOT NULL', 'critical', 'Malware debe tener hash'),\n",
" ('assert_balance_pos', 'wallet_%', 'consistency', 'balance >= 0', 'warning', 'Balance no puede ser negativo'),\n",
" ('assert_patch_info', 'vuln_%', 'consistency', 'patch_available IS NOT NULL', 'info', 'Vuln debe indicar si hay patch'),\n",
" ('assert_fresh', 'person_%', 'freshness', \"last_seen > '2024-01-01'\", 'warning', 'Entidad debe ser reciente'),\n",
"]\n",
"\n",
"# Insertar assertions para cada entity que matchee el pattern\n",
"assert_count = 0\n",
"for a_id_base, pattern, kind, rule, severity, desc in assertions_data:\n",
" matching = conn.execute('SELECT id FROM entities WHERE id LIKE ?', (pattern,)).fetchall()\n",
" for (eid,) in matching:\n",
" a_id = f'{a_id_base}_{eid}'\n",
" conn.execute(\n",
" 'INSERT OR REPLACE INTO assertions (id, entity_id, name, kind, rule, severity, description, active, created_at) '\n",
" 'VALUES (?, ?, ?, ?, ?, ?, ?, 1, ?)',\n",
" (a_id, eid, a_id_base, kind, rule, severity, desc, now_str)\n",
" )\n",
" assert_count += 1\n",
"conn.commit()\n",
"\n",
"print(f'Assertions creadas: {assert_count}')\n",
"\n",
"# Evaluar assertions (rewrite manual de bare fields)\n",
"import re\n",
"def rewrite_rule(rule):\n",
" \"\"\"Rewrite bare field names to json_extract(metadata, '$.field') — mirrors eval.go logic.\"\"\"\n",
" keywords = {'AND','OR','NOT','IS','NULL','IN','LIKE','BETWEEN','CASE','WHEN','THEN','ELSE','END',\n",
" 'TRUE','FALSE','ASC','DESC','SELECT','FROM','WHERE','GROUP','ORDER','HAVING','LIMIT',\n",
" 'json_extract','datetime','abs','avg','count','max','min','sum','length','typeof'}\n",
" if 'json_extract' in rule:\n",
" return rule\n",
" def replacer(m):\n",
" word = m.group(0)\n",
" if word.upper() in keywords:\n",
" return word\n",
" try:\n",
" float(word)\n",
" return word\n",
" except ValueError:\n",
" pass\n",
" return f\"json_extract(metadata, '$.{word}')\"\n",
" return re.sub(r'\\b([a-zA-Z_][a-zA-Z0-9_]*)\\b', replacer, rule)\n",
"\n",
"results = []\n",
"for row in conn.execute('SELECT id, entity_id, name, kind, rule, severity FROM assertions WHERE active = 1').fetchall():\n",
" a_id, eid, name, kind, rule, severity = row\n",
" rewritten = rewrite_rule(rule)\n",
" try:\n",
" r = conn.execute(f\"SELECT CASE WHEN ({rewritten}) THEN 'pass' ELSE 'fail' END FROM entities WHERE id = ?\", (eid,)).fetchone()\n",
" status = r[0] if r else 'skip'\n",
" except Exception as e:\n",
" status = 'skip'\n",
" results.append({'assertion_id': a_id, 'entity_id': eid, 'kind': kind, 'severity': severity, 'status': status})\n",
" conn.execute(\n",
" 'INSERT OR REPLACE INTO assertion_results (id, assertion_id, execution_id, status, value, message, evaluated_at) VALUES (?, ?, ?, ?, ?, ?, ?)',\n",
" (f'result_{a_id}', a_id, '', status, '{}', '', now_str)\n",
" )\n",
"conn.commit()\n",
"\n",
"df_assert = pd.DataFrame(results)\n",
"print(f'\\nAssertion results:')\n",
"print(df_assert.groupby('status').size())\n",
"print(f'\\nPor severity:')\n",
"print(df_assert.groupby(['severity', 'status']).size())"
]
},
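{
"cell_type": "markdown",
"id": "sketch-escaped-like",
"metadata": {},
"source": [
"The patterns in `assertions_data` rely on `LIKE`, where `_` is a single-character wildcard, so `'wallet_%'` would also match an id such as `walletX999`. The current ids happen to be unaffected. The sketch below shows a stricter variant and assumes the rule has already been rewritten to `json_extract` form. `evaluate_rule` is a hypothetical helper: it escapes the prefix with `!` and evaluates one rule for all matching entities in a single query, instead of one query per assertion row."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "sketch-escaped-like-code",
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import sqlite3\n",
"\n",
"def evaluate_rule(db, id_prefix, sql_condition):\n",
"    \"\"\"Evaluate one rewritten rule for every entity whose id starts with id_prefix (hypothetical helper).\"\"\"\n",
"    # Escape LIKE wildcards with '!' so '_' and '%' in the prefix match literally\n",
"    escaped = id_prefix.replace('!', '!!').replace('%', '!%').replace('_', '!_')\n",
"    sql = (\n",
"        f\"SELECT id, CASE WHEN ({sql_condition}) THEN 'pass' ELSE 'fail' END \"\n",
"        \"FROM entities WHERE id LIKE ? ESCAPE '!' ORDER BY id\"\n",
"    )\n",
"    return db.execute(sql, (escaped + '%',)).fetchall()\n",
"\n",
"assert_db = sqlite3.connect(':memory:')\n",
"assert_db.execute('CREATE TABLE entities (id TEXT PRIMARY KEY, metadata TEXT)')\n",
"assert_db.executemany('INSERT INTO entities VALUES (?, ?)', [\n",
"    ('wallet_001', json.dumps({'balance': 2.45})),\n",
"    ('wallet_002', json.dumps({'balance': -1.0})),\n",
"    ('walletX999', json.dumps({'balance': 5.0})),  # would match the unescaped pattern 'wallet_%'\n",
"])\n",
"demo_rows = evaluate_rule(assert_db, 'wallet_', \"json_extract(metadata, '$.balance') >= 0\")\n",
"print(demo_rows)"
]
},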
{
"cell_type": "markdown",
"id": "s3",
"metadata": {},
"source": [
"## 3. Backend B: SQLite Triple Store"
]
},
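{
"cell_type": "markdown",
"id": "sketch-triple-pattern",
"metadata": {},
"source": [
"In the triple table every fact is one `(subject, predicate, object)` row. A basic triple pattern with some positions unbound therefore becomes a `WHERE` clause on the bound columns. The indexes `idx_sp`, `idx_po` and `idx_os` created in the triple-store cell cover every combination of bound positions. The sketch below uses a hypothetical `match` helper on a toy in-memory table, where `None` plays the role of a SPARQL variable."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "sketch-triple-pattern-code",
"metadata": {},
"outputs": [],
"source": [
"import sqlite3\n",
"\n",
"def match(db, s=None, p=None, o=None):\n",
"    \"\"\"Rows of the triples table matching a pattern; None acts like a SPARQL variable (hypothetical helper).\"\"\"\n",
"    clauses, params = [], []\n",
"    for column, value in (('subject', s), ('predicate', p), ('object', o)):\n",
"        if value is not None:\n",
"            clauses.append(f'{column} = ?')  # column names come from the fixed tuple above\n",
"            params.append(value)\n",
"    where = (' WHERE ' + ' AND '.join(clauses)) if clauses else ''\n",
"    sql = f'SELECT subject, predicate, object FROM triples{where} ORDER BY subject, predicate, object'\n",
"    return db.execute(sql, params).fetchall()\n",
"\n",
"demo_tdb = sqlite3.connect(':memory:')\n",
"demo_tdb.execute('CREATE TABLE triples (subject TEXT, predicate TEXT, object TEXT)')\n",
"demo_tdb.executemany('INSERT INTO triples VALUES (?, ?, ?)', [\n",
"    ('person_001', 'rdf:type', 'person'),\n",
"    ('person_001', 'owns', 'wallet_001'),\n",
"    ('wallet_001', 'transfers_to', 'wallet_002'),\n",
"])\n",
"print(match(demo_tdb, s='person_001'))\n",
"print(match(demo_tdb, p='transfers_to'))"
]
},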
{
"cell_type": "code",
"execution_count": null,
"id": "triple-store",
"metadata": {},
"outputs": [],
"source": [
"# === SQLITE TRIPLE STORE ===\n",
"TRIPLE_DB = os.path.join(DATA_DIR, 'triples.db')\n",
"if os.path.exists(TRIPLE_DB): os.remove(TRIPLE_DB)\n",
"\n",
"tdb = sqlite3.connect(TRIPLE_DB)\n",
"tdb.execute('PRAGMA journal_mode=WAL')\n",
"tdb.executescript('''\n",
"CREATE TABLE triples (\n",
" id INTEGER PRIMARY KEY AUTOINCREMENT,\n",
" subject TEXT NOT NULL,\n",
" predicate TEXT NOT NULL,\n",
" object TEXT NOT NULL,\n",
" object_type TEXT DEFAULT 'uri'\n",
");\n",
"CREATE INDEX idx_sp ON triples(subject, predicate);\n",
"CREATE INDEX idx_po ON triples(predicate, object);\n",
"CREATE INDEX idx_os ON triples(object, subject);\n",
"''')\n",
"\n",
"t0 = time.perf_counter()\n",
"triples = []\n",
"\n",
"# Entities -> property triples\n",
"for e in entities:\n",
" eid = e['id']\n",
" triples.append((eid, 'rdf:type', e['type_ref'], 'uri'))\n",
" triples.append((eid, 'name', e['name'], 'literal'))\n",
" triples.append((eid, 'domain', e['domain'], 'literal'))\n",
" triples.append((eid, 'source', e['source'], 'literal'))\n",
" for k, v in e['metadata'].items():\n",
" triples.append((eid, k, str(v), 'literal'))\n",
"\n",
"# Relations -> relationship triples\n",
"for r in relations:\n",
" rid, rtype, from_e, to_e, weight, desc = r\n",
" triples.append((from_e, rtype, to_e, 'uri'))\n",
"\n",
"tdb.executemany('INSERT INTO triples (subject, predicate, object, object_type) VALUES (?, ?, ?, ?)', triples)\n",
"tdb.commit()\n",
"triple_insert_time = time.perf_counter() - t0\n",
"\n",
"print(f'SQLite Triple Store:')\n",
"print(f' Triples: {tdb.execute(\"SELECT COUNT(*) FROM triples\").fetchone()[0]}')\n",
"print(f' Insert time: {triple_insert_time*1000:.1f}ms')\n",
"print(f' Disco: {os.path.getsize(TRIPLE_DB) / 1024:.1f}KB')"
]
},
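{
"cell_type": "markdown",
"id": "sketch-union-closure",
"metadata": {},
"source": [
"Q2 in the benchmark follows `transfers_to` edges with `UNION ALL` plus a `depth < 5` guard. When only the set of reachable nodes matters, `UNION` can replace the depth guard. A recursive CTE joined with `UNION` skips rows it has already produced, so a laundering loop terminates on its own. The sketch below runs on a toy table and approximates SPARQL's `transfers_to+` property path. `reachable` and `CLOSURE_SQL` are hypothetical names. The trade-off is that hop counts are lost."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "sketch-union-closure-code",
"metadata": {},
"outputs": [],
"source": [
"import sqlite3\n",
"\n",
"CLOSURE_SQL = '''\n",
"WITH RECURSIVE reach(node) AS (\n",
"    SELECT object FROM triples WHERE subject = :start AND predicate = :pred\n",
"    UNION  -- UNION (not UNION ALL) drops rows already produced, so cycles terminate\n",
"    SELECT t.object FROM triples t JOIN reach r ON t.subject = r.node\n",
"    WHERE t.predicate = :pred\n",
")\n",
"SELECT node FROM reach ORDER BY node\n",
"'''\n",
"\n",
"def reachable(db, start, pred):\n",
"    \"\"\"Nodes reachable from start via one or more pred edges, similar to SPARQL `pred+` (hypothetical helper).\"\"\"\n",
"    return [row[0] for row in db.execute(CLOSURE_SQL, {'start': start, 'pred': pred})]\n",
"\n",
"closure_db = sqlite3.connect(':memory:')\n",
"closure_db.execute('CREATE TABLE triples (subject TEXT, predicate TEXT, object TEXT)')\n",
"closure_db.executemany('INSERT INTO triples VALUES (?, ?, ?)', [\n",
"    ('w1', 'transfers_to', 'w2'),\n",
"    ('w2', 'transfers_to', 'w3'),\n",
"    ('w3', 'transfers_to', 'w1'),  # laundering loop back to the start\n",
"])\n",
"print(reachable(closure_db, 'w1', 'transfers_to'))"
]
},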
{
"cell_type": "markdown",
"id": "s4",
"metadata": {},
"source": [
"## 4. Backend C: Oxigraph (pyoxigraph SPARQL)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "oxigraph-load",
"metadata": {},
"outputs": [],
"source": [
"# === OXIGRAPH ===\n",
"import pyoxigraph as ox\n",
"\n",
"OX_PATH = os.path.join(DATA_DIR, 'oxigraph')\n",
"if os.path.exists(OX_PATH): shutil.rmtree(OX_PATH)\n",
"\n",
"NS = 'http://osint.local/'\n",
"RDF = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#'\n",
"\n",
"store = ox.Store(OX_PATH)\n",
"\n",
"t0 = time.perf_counter()\n",
"\n",
"# Entities\n",
"for e in entities:\n",
" subj = ox.NamedNode(f'{NS}{e[\"id\"]}')\n",
" store.add(ox.Quad(subj, ox.NamedNode(f'{RDF}type'), ox.NamedNode(f'{NS}{e[\"type_ref\"]}')))\n",
" store.add(ox.Quad(subj, ox.NamedNode(f'{NS}name'), ox.Literal(e['name'])))\n",
" store.add(ox.Quad(subj, ox.NamedNode(f'{NS}domain'), ox.Literal(e['domain'])))\n",
" store.add(ox.Quad(subj, ox.NamedNode(f'{NS}source'), ox.Literal(e['source'])))\n",
" for k, v in e['metadata'].items():\n",
" if isinstance(v, (list, dict)):\n",
" store.add(ox.Quad(subj, ox.NamedNode(f'{NS}{k}'), ox.Literal(json.dumps(v))))\n",
" elif isinstance(v, bool):\n",
" store.add(ox.Quad(subj, ox.NamedNode(f'{NS}{k}'), ox.Literal(str(v).lower())))\n",
" elif isinstance(v, (int, float)):\n",
" store.add(ox.Quad(subj, ox.NamedNode(f'{NS}{k}'), ox.Literal(str(v))))\n",
" else:\n",
" store.add(ox.Quad(subj, ox.NamedNode(f'{NS}{k}'), ox.Literal(str(v))))\n",
"\n",
"# Relations\n",
"for r in relations:\n",
" rid, rtype, from_e, to_e, weight, desc = r\n",
" store.add(ox.Quad(\n",
" ox.NamedNode(f'{NS}{from_e}'),\n",
" ox.NamedNode(f'{NS}{rtype}'),\n",
" ox.NamedNode(f'{NS}{to_e}')\n",
" ))\n",
"\n",
"store.flush()\n",
"ox_insert_time = time.perf_counter() - t0\n",
"\n",
"def dir_size_kb(path):\n",
" total = 0\n",
" for dp, dn, fns in os.walk(path):\n",
" for f in fns: total += os.path.getsize(os.path.join(dp, f))\n",
" return total / 1024\n",
"\n",
"print(f'Oxigraph:')\n",
"print(f' Triples: {len(store)}')\n",
"print(f' Insert time: {ox_insert_time*1000:.1f}ms')\n",
"print(f' Disco: {dir_size_kb(OX_PATH):.1f}KB')"
]
},
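{
"cell_type": "markdown",
"id": "sketch-typed-literals",
"metadata": {},
"source": [
"The loader above stores numbers as plain string literals. In SPARQL, a comparison such as `FILTER(?r > 70)` on those values is a type error, so the rows are filtered out unless the query casts first, e.g. `xsd:integer(?r)`. The sketch below shows the difference with a typed `xsd:integer` literal in a separate in-memory `ox.Store()`. `demo_store` and the two subjects are illustrative only. Whether to type the main load is a design choice this notebook does not make."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "sketch-typed-literals-code",
"metadata": {},
"outputs": [],
"source": [
"import pyoxigraph as ox\n",
"\n",
"XSD = 'http://www.w3.org/2001/XMLSchema#'\n",
"EX = 'http://osint.local/'  # same namespace as NS above\n",
"\n",
"demo_store = ox.Store()  # in-memory store, independent of the on-disk store above\n",
"risk_pred = ox.NamedNode(f'{EX}risk_score')\n",
"# Same value twice: once as a plain string literal (as the loader does), once typed as xsd:integer\n",
"demo_store.add(ox.Quad(ox.NamedNode(f'{EX}plain'), risk_pred, ox.Literal('95')))\n",
"demo_store.add(ox.Quad(ox.NamedNode(f'{EX}typed'), risk_pred,\n",
"                       ox.Literal('95', datatype=ox.NamedNode(f'{XSD}integer'))))\n",
"\n",
"risk_query = f'SELECT ?s WHERE {{ ?s <{EX}risk_score> ?r FILTER(?r > 70) }}'\n",
"typed_hits = sorted(sol['s'].value.replace(EX, '') for sol in demo_store.query(risk_query))\n",
"print(typed_hits)"
]
},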
{
"cell_type": "markdown",
"id": "s5",
"metadata": {},
"source": [
"## 5. Benchmark: 8 queries OSINT"
]
},
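{
"cell_type": "markdown",
"id": "sketch-timing-harness",
"metadata": {},
"source": [
"The insert timings above come from a single `time.perf_counter()` measurement, which is noisy at millisecond scale. The sketch below is a repeat-and-median harness. `time_query` is a hypothetical helper, and its warm-up runs keep cold caches from dominating the samples. In this notebook it could wrap a benchmark call, for example `time_query(lambda: bench_ops(conn))` once `bench_ops` is defined."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "sketch-timing-harness-code",
"metadata": {},
"outputs": [],
"source": [
"import statistics\n",
"import time\n",
"\n",
"def time_query(fn, repeats=20, warmup=2):\n",
"    \"\"\"Median and minimum wall-clock time of fn() in milliseconds (hypothetical helper).\"\"\"\n",
"    for _ in range(warmup):  # warm-up runs so cold caches do not dominate the samples\n",
"        fn()\n",
"    samples = []\n",
"    for _ in range(repeats):\n",
"        t0 = time.perf_counter()\n",
"        fn()\n",
"        samples.append((time.perf_counter() - t0) * 1000)\n",
"    return {'median_ms': statistics.median(samples), 'min_ms': min(samples), 'runs': repeats}\n",
"\n",
"demo_timing = time_query(lambda: sum(range(10_000)))\n",
"print(demo_timing)"
]
},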
{
"cell_type": "code",
"execution_count": null,
"id": "benchmark",
"metadata": {},
"outputs": [],
"source": [
"# === BENCHMARK QUERIES ===\n",
"\n",
"def bench_ops(conn):\n",
" results = {}\n",
" # Q1: Wallets de person_001\n",
" results['q1_wallets'] = [r[0] for r in conn.execute(\n",
" \"SELECT to_entity FROM relations WHERE from_entity='person_001' AND name='owns' AND to_entity LIKE 'wallet%'\"\n",
" ).fetchall()]\n",
" # Q2: Cadena transfers desde wallet_001 (max 5 hops)\n",
" results['q2_chain'] = [r[0] for r in conn.execute('''\n",
" WITH RECURSIVE chain(wallet, depth) AS (\n",
" SELECT to_entity, 1 FROM relations WHERE from_entity='wallet_001' AND name='transfers_to'\n",
" UNION ALL\n",
" SELECT r.to_entity, c.depth+1 FROM relations r JOIN chain c ON r.from_entity=c.wallet\n",
" WHERE r.name='transfers_to' AND c.depth < 5\n",
" ) SELECT DISTINCT wallet FROM chain\n",
" ''').fetchall()]\n",
" # Q3: Infra de narrativa A (entities connected to person_001 or person_002)\n",
" results['q3_infra'] = [r[0] for r in conn.execute('''\n",
" SELECT DISTINCT to_entity FROM relations \n",
" WHERE from_entity IN ('person_001','person_002') \n",
" AND name IN ('operates','controls','uses','develops','hosts')\n",
" ''').fetchall()]\n",
" # Q4: Signals con confidence > 0.8\n",
" results['q4_signals'] = [r[0] for r in conn.execute(\n",
" \"SELECT id FROM entities WHERE type_ref='trading_signal' AND CAST(json_extract(metadata,'$.confidence') AS REAL) > 0.8\"\n",
" ).fetchall()]\n",
" # Q5: Entities con risk_score > 70\n",
" results['q5_high_risk'] = [r[0] for r in conn.execute(\n",
" \"SELECT id FROM entities WHERE CAST(json_extract(metadata,'$.risk_score') AS INTEGER) > 70\"\n",
" ).fetchall()]\n",
" # Q6: Path person_001 -> person_004 via orgs\n",
" results['q6_path'] = [r[:3] for r in conn.execute('''\n",
" SELECT r1.from_entity, r1.to_entity, r2.from_entity\n",
" FROM relations r1 JOIN relations r2 ON r1.to_entity = r2.from_entity\n",
" WHERE r1.from_entity = 'person_001' AND r2.to_entity = 'person_004'\n",
" ''').fetchall()]\n",
" # Q7: Dominios creados despues de 2024-01-01\n",
" results['q7_new_domains'] = [r[0] for r in conn.execute(\n",
" \"SELECT id FROM entities WHERE type_ref='domain' AND json_extract(metadata,'$.created_date') > '2024-01-01'\"\n",
" ).fetchall()]\n",
" # Q8: Subgrafo 2-hop desde wallet_bridge\n",
" results['q8_subgraph'] = [r[0] for r in conn.execute('''\n",
" WITH RECURSIVE neighbors(node, depth) AS (\n",
" SELECT 'wallet_bridge', 0\n",
" UNION\n",
" SELECT CASE WHEN r.from_entity = n.node THEN r.to_entity ELSE r.from_entity END, n.depth+1\n",
" FROM relations r JOIN neighbors n ON (r.from_entity = n.node OR r.to_entity = n.node)\n",
" WHERE n.depth < 2\n",
" ) SELECT DISTINCT node FROM neighbors WHERE node != 'wallet_bridge'\n",
" ''').fetchall()]\n",
" return results\n",
"\n",
"def bench_triples(tdb):\n",
" results = {}\n",
" results['q1_wallets'] = [r[0] for r in tdb.execute(\n",
" \"SELECT object FROM triples WHERE subject='person_001' AND predicate='owns' AND object LIKE 'wallet%'\"\n",
" ).fetchall()]\n",
" results['q2_chain'] = [r[0] for r in tdb.execute('''\n",
" WITH RECURSIVE chain(wallet, depth) AS (\n",
" SELECT object, 1 FROM triples WHERE subject='wallet_001' AND predicate='transfers_to'\n",
" UNION ALL\n",
" SELECT t.object, c.depth+1 FROM triples t JOIN chain c ON t.subject=c.wallet\n",
" WHERE t.predicate='transfers_to' AND c.depth < 5\n",
" ) SELECT DISTINCT wallet FROM chain\n",
" ''').fetchall()]\n",
" results['q3_infra'] = [r[0] for r in tdb.execute('''\n",
" SELECT DISTINCT object FROM triples\n",
" WHERE subject IN ('person_001','person_002')\n",
" AND predicate IN ('operates','controls','uses','develops','hosts')\n",
" AND object_type='uri'\n",
" ''').fetchall()]\n",
" results['q4_signals'] = [r[0] for r in tdb.execute('''\n",
" SELECT t1.subject FROM triples t1\n",
" JOIN triples t2 ON t1.subject = t2.subject\n",
" WHERE t1.predicate='rdf:type' AND t1.object='trading_signal'\n",
" AND t2.predicate='confidence' AND CAST(t2.object AS REAL) > 0.8\n",
" ''').fetchall()]\n",
" results['q5_high_risk'] = [r[0] for r in tdb.execute('''\n",
" SELECT subject FROM triples WHERE predicate='risk_score' AND CAST(object AS INTEGER) > 70\n",
" ''').fetchall()]\n",
" results['q6_path'] = [r[:3] for r in tdb.execute('''\n",
" SELECT t1.subject, t1.object, t2.subject\n",
" FROM triples t1 JOIN triples t2 ON t1.object = t2.subject\n",
" WHERE t1.subject = 'person_001' AND t2.object = 'person_004'\n",
" AND t1.object_type = 'uri' AND t2.object_type = 'uri'\n",
" ''').fetchall()]\n",
" results['q7_new_domains'] = [r[0] for r in tdb.execute('''\n",
" SELECT t1.subject FROM triples t1\n",
" JOIN triples t2 ON t1.subject = t2.subject\n",
" WHERE t1.predicate='rdf:type' AND t1.object='domain'\n",
" AND t2.predicate='created_date' AND t2.object > '2024-01-01'\n",
" ''').fetchall()]\n",
" # Q8: skip rdf:type triples so entities are not linked through a shared class node (in case those are stored with object_type='uri')\n",
" results['q8_subgraph'] = [r[0] for r in tdb.execute('''\n",
" WITH RECURSIVE neighbors(node, depth) AS (\n",
" SELECT 'wallet_bridge', 0\n",
" UNION\n",
" SELECT CASE WHEN t.subject = n.node THEN t.object ELSE t.subject END, n.depth+1\n",
" FROM triples t JOIN neighbors n ON (t.subject = n.node OR t.object = n.node)\n",
" WHERE t.object_type = 'uri' AND t.predicate != 'rdf:type' AND n.depth < 2\n",
" ) SELECT DISTINCT node FROM neighbors WHERE node != 'wallet_bridge'\n",
" ''').fetchall()]\n",
" return results\n",
"\n",
"def bench_sparql(store):\n",
" results = {}\n",
" NS = 'http://osint.local/'\n",
" RDF_TYPE = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type'\n",
" def q(sparql):\n",
" # .value is the bare IRI / lexical form; str() would wrap IRIs in <...> and break the cross-validation\n",
" return [r[0].value.replace(NS, '') for r in store.query(sparql)]\n",
" \n",
" results['q1_wallets'] = q(f'SELECT ?w WHERE {{ <{NS}person_001> <{NS}owns> ?w }}')\n",
" # Note: '+' is unbounded, while the SQL versions stop after 5 hops\n",
" results['q2_chain'] = q(f'SELECT DISTINCT ?w WHERE {{ <{NS}wallet_001> <{NS}transfers_to>+ ?w }}')\n",
" results['q3_infra'] = q(f'''\n",
" SELECT DISTINCT ?t WHERE {{\n",
" VALUES ?actor {{ <{NS}person_001> <{NS}person_002> }}\n",
" ?actor ?rel ?t .\n",
" FILTER(?rel IN (<{NS}operates>, <{NS}controls>, <{NS}uses>, <{NS}develops>, <{NS}hosts>))\n",
" }}\n",
" ''')\n",
" # SPARQL does not predefine prefixes, so the cast queries declare xsd: explicitly\n",
" results['q4_signals'] = q(f'''\n",
" PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>\n",
" SELECT ?s WHERE {{\n",
" ?s a <{NS}trading_signal> .\n",
" ?s <{NS}confidence> ?c .\n",
" FILTER(xsd:float(?c) > 0.8)\n",
" }}\n",
" ''')\n",
" results['q5_high_risk'] = q(f'''\n",
" PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>\n",
" SELECT ?e WHERE {{\n",
" ?e <{NS}risk_score> ?r .\n",
" FILTER(xsd:integer(?r) > 70)\n",
" }}\n",
" ''')\n",
" # Q6: intermediate nodes on any 2-hop path person_001 -> ?mid -> person_004\n",
" results['q6_path'] = q(f'''\n",
" SELECT ?mid WHERE {{\n",
" <{NS}person_001> ?r1 ?mid .\n",
" ?mid ?r2 <{NS}person_004> .\n",
" }}\n",
" ''')\n",
" results['q7_new_domains'] = q(f'''\n",
" SELECT ?d WHERE {{\n",
" ?d a <{NS}domain> .\n",
" ?d <{NS}created_date> ?dt .\n",
" FILTER(?dt > \"2024-01-01\")\n",
" }}\n",
" ''')\n",
" # Q8: undirected 2-hop neighbourhood; rdf:type edges are skipped because they would link every entity to its class node\n",
" results['q8_subgraph'] = q(f'''\n",
" SELECT DISTINCT ?n WHERE {{\n",
" {{\n",
" {{ <{NS}wallet_bridge> ?p1 ?n }} UNION {{ ?n ?p1 <{NS}wallet_bridge> }}\n",
" FILTER(?p1 != <{RDF_TYPE}>)\n",
" }} UNION {{\n",
" {{ <{NS}wallet_bridge> ?p2 ?mid }} UNION {{ ?mid ?p2 <{NS}wallet_bridge> }}\n",
" {{ ?mid ?p3 ?n }} UNION {{ ?n ?p3 ?mid }}\n",
" FILTER(isIRI(?mid) && ?p2 != <{RDF_TYPE}> && ?p3 != <{RDF_TYPE}>)\n",
" }}\n",
" FILTER(isIRI(?n) && ?n != <{NS}wallet_bridge>)\n",
" }}\n",
" ''')\n",
" return results\n",
"\n",
"# Run benchmarks\n",
"N_RUNS = 50\n",
"\n",
"# ops.db\n",
"t0 = time.perf_counter()\n",
"for _ in range(N_RUNS): ops_results = bench_ops(conn)\n",
"ops_query_time = (time.perf_counter() - t0) / N_RUNS\n",
"\n",
"# triples.db\n",
"t0 = time.perf_counter()\n",
"for _ in range(N_RUNS): triple_results = bench_triples(tdb)\n",
"triple_query_time = (time.perf_counter() - t0) / N_RUNS\n",
"\n",
"# oxigraph\n",
"t0 = time.perf_counter()\n",
"for _ in range(N_RUNS): ox_results = bench_sparql(store)\n",
"ox_query_time = (time.perf_counter() - t0) / N_RUNS\n",
"\n",
"# Cold start: reopen each backend and run one query (the OS page cache may still be warm)\n",
"conn.close(); tdb.close(); del store\n",
"\n",
"t0 = time.perf_counter()\n",
"_c = sqlite3.connect(OPS_DB)\n",
"_c.execute(\"SELECT to_entity FROM relations WHERE from_entity='person_001' AND name='owns'\").fetchall()\n",
"_c.close()\n",
"ops_cold = time.perf_counter() - t0\n",
"\n",
"t0 = time.perf_counter()\n",
"_t = sqlite3.connect(TRIPLE_DB)\n",
"_t.execute(\"SELECT object FROM triples WHERE subject='person_001' AND predicate='owns'\").fetchall()\n",
"_t.close()\n",
"triple_cold = time.perf_counter() - t0\n",
"\n",
"t0 = time.perf_counter()\n",
"_s = ox.Store(OX_PATH)\n",
"list(_s.query(f'SELECT ?w WHERE {{ <{NS}person_001> <{NS}owns> ?w }}'))\n",
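"ox_cold = time.perf_counter() - t0\n",
"del _s  # release the on-disk Store (RocksDB lock) before reopening OX_PATH below\n",
"\n",
"# Reopen the handles used by the later cells\n",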
"conn = sqlite3.connect(OPS_DB)\n",
"tdb = sqlite3.connect(TRIPLE_DB)\n",
"store = ox.Store(OX_PATH)\n",
"\n",
"print(f'BENCHMARK RESULTS (8 queries, avg of {N_RUNS} runs):')\n",
"print(f' operations.db: insert={ops_insert_time*1000:.1f}ms queries={ops_query_time*1000:.1f}ms cold={ops_cold*1000:.1f}ms')\n",
"print(f' triple store: insert={triple_insert_time*1000:.1f}ms queries={triple_query_time*1000:.1f}ms cold={triple_cold*1000:.1f}ms')\n",
"print(f' oxigraph: insert={ox_insert_time*1000:.1f}ms queries={ox_query_time*1000:.1f}ms cold={ox_cold*1000:.1f}ms')\n",
"\n",
"print('\\nCross-validation (Q1-Q5, Q7):')\n",
"for q in ['q1_wallets','q2_chain','q3_infra','q4_signals','q5_high_risk','q7_new_domains']:\n",
" o = sorted(ops_results.get(q,[]))\n",
" t = sorted(triple_results.get(q,[]))\n",
" x = sorted(ox_results.get(q,[]))\n",
" ot = o == t\n",
" ox_match = o == x\n",
" print(f' {q:18s}: ops={len(o):2d} triple={len(t):2d} [{\"OK\" if ot else \"DIFF\"}] oxigraph={len(x):2d} [{\"OK\" if ox_match else \"DIFF\"}]')"
]
},
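{
"cell_type": "markdown",
"id": "q8-cte-sketch",
"metadata": {},
"source": [
"The Q8 queries above treat the graph as undirected: the recursive CTE follows an edge whether the current node is its source or its target, and the `depth < 2` guard stops the walk after two hops. The sketch below is a minimal, self-contained illustration of that pattern on a hypothetical five-edge toy graph in an in-memory SQLite database (the node names are made up, not taken from the OSINT dataset). It should print `['a', 'b', 'c', 'x']`: `d` is three hops away from `hub`, and `hub` itself is filtered out.\n",
"\n",
"```python\n",
"import sqlite3\n",
"\n",
"# Hypothetical toy graph; edge direction is ignored by the traversal\n",
"toy = sqlite3.connect(':memory:')\n",
"toy.execute('CREATE TABLE relations (from_entity TEXT, to_entity TEXT)')\n",
"toy.executemany('INSERT INTO relations VALUES (?, ?)', [\n",
"    ('a', 'hub'), ('hub', 'b'), ('b', 'c'), ('x', 'a'), ('c', 'd'),\n",
"])\n",
"\n",
"rows = toy.execute('''\n",
"    WITH RECURSIVE neighbors(node, depth) AS (\n",
"        SELECT 'hub', 0\n",
"        UNION  -- drops duplicate (node, depth) rows; the depth guard bounds the recursion\n",
"        SELECT CASE WHEN r.from_entity = n.node THEN r.to_entity ELSE r.from_entity END,\n",
"               n.depth + 1\n",
"        FROM relations r\n",
"        JOIN neighbors n ON r.from_entity = n.node OR r.to_entity = n.node\n",
"        WHERE n.depth < 2\n",
"    )\n",
"    SELECT DISTINCT node FROM neighbors WHERE node != 'hub' ORDER BY node\n",
"''').fetchall()\n",
"print([r[0] for r in rows])\n",
"```"
]
},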
{
"cell_type": "markdown",
"id": "s6",
"metadata": {},
"source": [
"## 6. LLM Retrieval: `claude -p` generates the queries"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "llm-retrieval",
"metadata": {},
"outputs": [],
"source": [
"# === LLM RETRIEVAL ===\n",
"import subprocess, re as _re\n",
"\n",
"SCHEMAS_OSINT = {\n",
" 'ops_sql': 'SQLite operations.db. Tables: entities(id TEXT PK, name TEXT, type_ref TEXT, status TEXT, domain TEXT, tags TEXT JSON, source TEXT, metadata TEXT JSON), relations(id TEXT PK, name TEXT, from_entity TEXT, to_entity TEXT, via TEXT, direction TEXT, weight REAL, status TEXT). metadata contains fields such as risk_score, country, balance, confidence, address, etc. Use json_extract(metadata,\"$.field\") to access metadata. Types: person, organization, ip_address, domain, crypto_wallet, trading_signal, vulnerability, malware, email. Relations: owns, operates, controls, transfers_to, resolves_to, hosts, exploits, develops, communicates_with, employs, uses, generates, facilitates, uses_email.',\n",
" 'triple_sql': 'SQLite triple store. Table: triples(subject TEXT, predicate TEXT, object TEXT, object_type TEXT). Type predicate: rdf:type. Property predicates: name, risk_score, country, confidence, balance, address, created_date, etc. Relation predicates: owns, operates, transfers_to, hosts, exploits, etc. object_type is uri for entities and literal for values. Use recursive CTEs for multi-hop traversal.',\n",
" 'sparql': 'SPARQL over Oxigraph. Namespace: osint: <http://osint.local/>. Entities: osint:<id> with rdf:type osint:<type>. Properties: osint:name, osint:risk_score (literal), etc. Relations: osint:owns, osint:transfers_to, etc. Supports property paths (+, *). Use xsd:integer() or xsd:float() for numeric comparisons.',\n",
"}\n",
"\n",
"QUESTIONS_OSINT = [\n",
" ('q1', 'Which crypto wallets does actor person_001 own?', 'easy'),\n",
" ('q2', 'Full chain of crypto transfers starting from wallet_001 (max 5 hops)', 'hard'),\n",
" ('q3', 'Infrastructure (IPs, domains, malware) operated by person_001 and person_002', 'medium'),\n",
" ('q4', 'Trading signals with confidence greater than 0.8', 'easy'),\n",
" ('q5', 'Entities with risk_score greater than 70', 'easy'),\n",
" ('q6', 'Is there a connection between person_001 and person_004 via organizations?', 'hard'),\n",
" ('q7', 'Domains registered after 2024-01-01', 'medium'),\n",
" ('q8', 'All nodes within 2 hops of wallet_bridge', 'medium'),\n",
"]\n",
"\n",
"def ask_claude_osint(schema_name, schema_text, question):\n",
" prompt = f'Generate ONLY the query (no explanations, no markdown) that answers this question about an OSINT intelligence graph.\\n\\nSCHEMA: {schema_text}\\n\\nQUESTION: {question}\\n\\nReply ONLY with the executable query.'\n",
" t0 = time.perf_counter()\n",
" try:\n",
" r = subprocess.run(['claude', '-p', prompt, '--model', 'haiku'],\n",
" capture_output=True, text=True, timeout=45, cwd=FN_ROOT)\n",
" elapsed = time.perf_counter() - t0\n",
" query = r.stdout.strip()\n",
" query = _re.sub(r'^```\\w*\\n', '', query)\n",
" query = _re.sub(r'\\n```$', '', query)\n",
" query = query.strip()\n",
" # A non-zero exit code or empty output is a generation failure, not a valid query\n",
" ok = r.returncode == 0 and bool(query)\n",
" error = None if ok else (r.stderr.strip()[:150] or f'exit code {r.returncode}')\n",
" return {'query': query, 'time_s': round(elapsed, 2), 'ok': ok, 'error': error}\n",
" except Exception as e:\n",
" return {'query': '', 'time_s': round(time.perf_counter()-t0, 2), 'ok': False, 'error': str(e)}\n",
"\n",
"print(f'Running LLM retrieval ({len(QUESTIONS_OSINT)} questions x {len(SCHEMAS_OSINT)} backends = {len(QUESTIONS_OSINT) * len(SCHEMAS_OSINT)} calls)...')\n",
"llm_results = []\n",
"for qid, question, difficulty in QUESTIONS_OSINT:\n",
" print(f'\\n--- {qid} [{difficulty}] ---')\n",
" for schema_name, schema_text in SCHEMAS_OSINT.items():\n",
" r = ask_claude_osint(schema_name, schema_text, question)\n",
" r['qid'] = qid; r['difficulty'] = difficulty; r['schema'] = schema_name\n",
" llm_results.append(r)\n",
" q_preview = r['query'][:80].replace('\\n',' ') if r['query'] else '(empty)'\n",
" print(f' {schema_name:10s} {r[\"time_s\"]:5.1f}s [{\"OK\" if r[\"ok\"] else \"ERR\"}] {q_preview}')\n",
"\n",
"df_llm = pd.DataFrame(llm_results)\n",
"print(f'\\nTotal: {len(df_llm)} queries, {df_llm[\"ok\"].sum()} generated OK')"
]
},
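{
"cell_type": "markdown",
"id": "llm-fence-sketch",
"metadata": {},
"source": [
"The two `re.sub` calls in `ask_claude_osint` only remove a fence that opens on the first line and closes on the last one. A reply with a leading sentence therefore keeps its backticks, and is then counted as an execution failure for that backend. A minimal sketch of a more tolerant extractor, assuming the reply contains at most one fenced block (`extract_query` and `FENCE_RE` are hypothetical names, not fn_registry functions). The three calls should print `SELECT 1;`, `ASK { ?s ?p ?o }` and `SELECT 2;`.\n",
"\n",
"```python\n",
"import re\n",
"\n",
"# Hypothetical helper: capture the body of the first fenced block, if any\n",
"FENCE_RE = re.compile(r'```[\\w-]*[ \\t]*\\n(.*?)```', re.DOTALL)\n",
"\n",
"def extract_query(reply: str) -> str:\n",
"    match = FENCE_RE.search(reply)\n",
"    # Fall back to the raw reply when the model did not use a fence\n",
"    body = match.group(1) if match else reply\n",
"    return body.strip()\n",
"\n",
"print(extract_query('```sql\\nSELECT 1;\\n```'))\n",
"print(extract_query('Here is the query:\\n```sparql\\nASK { ?s ?p ?o }\\n```'))\n",
"print(extract_query('SELECT 2;'))\n",
"```"
]
},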
{
"cell_type": "code",
"execution_count": null,
"id": "llm-execute",
"metadata": {},
"outputs": [],
"source": [
"# === EXECUTE THE LLM-GENERATED QUERIES ===\n",
"from contextlib import closing\n",
"\n",
"def _try_sqlite(db_path, query):\n",
" # Open read-only so a generated DROP/DELETE/UPDATE cannot modify the benchmark database\n",
" try:\n",
" with closing(sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)) as c:\n",
" return True, len(c.execute(query).fetchall()), None\n",
" except Exception as e:\n",
" return False, 0, str(e)[:150]\n",
"\n",
"def try_ops_sql(query):\n",
" return _try_sqlite(OPS_DB, query)\n",
"\n",
"def try_triple_sql(query):\n",
" return _try_sqlite(TRIPLE_DB, query)\n",
"\n",
"def try_sparql_ox(query):\n",
" # Reuse the open `store`: a second ox.Store(OX_PATH) in the same process would hit the RocksDB lock\n",
" try:\n",
" res = store.query(query)\n",
" # SELECT/CONSTRUCT results are iterable; an ASK result is only a boolean\n",
" n = len(list(res)) if hasattr(res, '__iter__') else int(bool(res))\n",
" return True, n, None\n",
" except Exception as e:\n",
" return False, 0, str(e)[:150]\n",
"\n",
"exec_results = []\n",
"for _, row in df_llm.iterrows():\n",
" if not row['ok']:\n",
" exec_results.append({'exec_ok': False, 'exec_count': 0, 'exec_error': 'generation failed'})\n",
" continue\n",
" schema, query = row['schema'], row['query']\n",
" if schema == 'ops_sql':\n",
" ok, count, err = try_ops_sql(query)\n",
" elif schema == 'triple_sql':\n",
" ok, count, err = try_triple_sql(query)\n",
" elif schema == 'sparql':\n",
" ok, count, err = try_sparql_ox(query)\n",
" else:\n",
" ok, count, err = False, 0, 'unknown'\n",
" exec_results.append({'exec_ok': ok, 'exec_count': count, 'exec_error': err})\n",
"\n",
"df_llm_exec = pd.concat([df_llm.reset_index(drop=True), pd.DataFrame(exec_results)], axis=1)\n",
"\n",
"print('LLM Query Execution Results:')\n",
"print('=' * 60)\n",
"for schema in SCHEMAS_OSINT:\n",
" sub = df_llm_exec[df_llm_exec['schema'] == schema]\n",
" n_ok = (sub['exec_ok'] == True).sum()\n",
" print(f' {schema:12s}: {n_ok}/{len(sub)} executed successfully')\n",
" for _, f in sub[sub['exec_ok'] == False].iterrows():\n",
" print(f' FAIL [{f[\"qid\"]}]: {f[\"exec_error\"][:80]}')"
]
},
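{
"cell_type": "markdown",
"id": "readonly-sql-sketch",
"metadata": {},
"source": [
"The executor cell above runs model-generated SQL directly against the benchmark databases, so a generated `DELETE` or `DROP` must not be able to change the data that later cells report on. A minimal sketch of a read-only guard using SQLite's URI filename syntax (`mode=ro`) on a throwaway file (`run_readonly` and `demo.db` are hypothetical names; paths containing `?` or `#` would need URL-escaping). The first `SELECT` should return `(True, 1, None)`. The `DROP TABLE` comes back as a failure, and the final count shows the table is still there.\n",
"\n",
"```python\n",
"import os\n",
"import sqlite3\n",
"import tempfile\n",
"from contextlib import closing\n",
"\n",
"# Throwaway database standing in for OPS_DB / TRIPLE_DB\n",
"path = os.path.join(tempfile.mkdtemp(), 'demo.db')\n",
"with closing(sqlite3.connect(path)) as db:\n",
"    db.execute('CREATE TABLE entities (id TEXT PRIMARY KEY)')\n",
"    db.execute('INSERT INTO entities VALUES (?)', ('person_001',))\n",
"    db.commit()\n",
"\n",
"def run_readonly(db_path, query):\n",
"    # mode=ro opens the file read-only: any write raises instead of modifying data\n",
"    with closing(sqlite3.connect(f'file:{db_path}?mode=ro', uri=True)) as db:\n",
"        try:\n",
"            return True, len(db.execute(query).fetchall()), None\n",
"        except sqlite3.Error as e:\n",
"            return False, 0, str(e)\n",
"\n",
"print(run_readonly(path, 'SELECT id FROM entities'))\n",
"print(run_readonly(path, 'DROP TABLE entities'))\n",
"print(run_readonly(path, 'SELECT COUNT(*) FROM entities'))\n",
"```"
]
},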
{
"cell_type": "markdown",
"id": "s7",
"metadata": {},
"source": [
"## 7. Sigma.js Visualization"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "sigma",
"metadata": {},
"outputs": [],
"source": [
"# === SIGMA.JS HTML ===\n",
"from datascience.ops_to_sigma_json import ops_to_sigma_json\n",
"from datascience.render_sigma_html import render_sigma_html\n",
"\n",
"graph_data = ops_to_sigma_json(OPS_DB)\n",
"html_path = render_sigma_html(graph_data, os.path.join(OUTPUT_DIR, 'osint_graph.html'), title='OSINT Intelligence Graph')\n",
"\n",
"print(f'Sigma.js HTML: {html_path}')\n",
"print(f' Nodes: {len(graph_data[\"nodes\"])}')\n",
"print(f' Edges: {len(graph_data[\"edges\"])}')\n",
"print(f' Size: {os.path.getsize(html_path)/1024:.1f}KB')\n",
"print(f' Open in a browser: file://{os.path.abspath(html_path)}')"
]
},
{
"cell_type": "markdown",
"id": "s8",
"metadata": {},
"source": [
"## 8. PDF Report"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "pdf-report",
"metadata": {},
"outputs": [],
"source": [
"# === PDF REPORT ===\n",
"from collections import Counter\n",
"from matplotlib.backends.backend_pdf import PdfPages\n",
"import numpy as np\n",
"\n",
"pdf_path = os.path.join(OUTPUT_DIR, 'osint_intelligence_report.pdf')\n",
"\n",
"with PdfPages(pdf_path) as pdf:\n",
" # PAGE 1: Title + Summary\n",
" fig = plt.figure(figsize=(11, 8.5))\n",
" fig.text(0.5, 0.88, 'OSINT Intelligence Graph', ha='center', fontsize=24, fontweight='bold')\n",
" fig.text(0.5, 0.82, 'SQLite Triple Store vs Oxigraph vs operations.db', ha='center', fontsize=14, color='gray')\n",
" fig.text(0.5, 0.76, f'{len(entities)} entities, {len(relations)} relations, 3 backends', ha='center', fontsize=12)\n",
" \n",
" # Count queries that executed without error, per backend\n",
" n_q = len(QUESTIONS_OSINT)\n",
" sr = {}\n",
" for schema in SCHEMAS_OSINT:\n",
" sub = df_llm_exec[df_llm_exec['schema'] == schema]\n",
" sr[schema] = (sub['exec_ok'] == True).sum()\n",
" \n",
" summary = (\n",
" f'KEY RESULTS\\n'\n",
" f'\\n'\n",
" f'Benchmark (8 queries, avg of {N_RUNS} runs):\\n'\n",
" f' operations.db: queries={ops_query_time*1000:.1f}ms cold_start={ops_cold*1000:.1f}ms\\n'\n",
" f' triple store: queries={triple_query_time*1000:.1f}ms cold_start={triple_cold*1000:.1f}ms\\n'\n",
" f' oxigraph: queries={ox_query_time*1000:.1f}ms cold_start={ox_cold*1000:.1f}ms\\n'\n",
" f'\\n'\n",
" f'LLM query generation (claude -p haiku, {len(df_llm_exec)} queries):\\n'\n",
" f' ops_sql: {sr[\"ops_sql\"]}/{n_q} execute without error\\n'\n",
" f' triple_sql: {sr[\"triple_sql\"]}/{n_q} execute without error\\n'\n",
" f' sparql: {sr[\"sparql\"]}/{n_q} execute without error\\n'\n",
" f' (executability only; answers are not checked against ground truth)\\n'\n",
" f'\\n'\n",
" f'Assertions (operations.db):\\n'\n",
" f' Total: {len(df_assert)}\\n'\n",
" f' Pass: {(df_assert[\"status\"]==\"pass\").sum()}\\n'\n",
" f' Fail: {(df_assert[\"status\"]==\"fail\").sum()}\\n'\n",
" f'\\n'\n",
" f'Sigma.js: {len(graph_data[\"nodes\"])} nodes, {len(graph_data[\"edges\"])} edges\\n'\n",
" f' Interactive HTML in data/output/osint_graph.html\\n'\n",
" f'\\n'\n",
" f'RECOMMENDATION:\\n'\n",
" f' operations.db (SQL) is the recommended backend for AI retrieval.\\n'\n",
" f' It combines a rich relational schema, assertions and LLM compatibility.'\n",
" )\n",
" fig.text(0.08, 0.03, summary, fontsize=10, fontfamily='monospace', verticalalignment='bottom')\n",
" pdf.savefig(fig); plt.close()\n",
" \n",
" # PAGE 2: Dataset\n",
" fig, axes = plt.subplots(1, 2, figsize=(11, 6))\n",
" fig.suptitle('OSINT Dataset', fontsize=16, fontweight='bold')\n",
" \n",
" type_counts = Counter(e['type_ref'] for e in entities)\n",
" colors_type = {'person': '#e74c3c', 'organization': '#3498db', 'ip_address': '#2ecc71',\n",
" 'domain': '#f39c12', 'crypto_wallet': '#f1c40f', 'trading_signal': '#9b59b6',\n",
" 'vulnerability': '#e67e22', 'malware': '#c0392b', 'email': '#1abc9c'}\n",
" \n",
" ax = axes[0]\n",
" types_sorted = type_counts.most_common()\n",
" ax.barh([t[0] for t in types_sorted], [t[1] for t in types_sorted],\n",
" color=[colors_type.get(t[0], 'gray') for t in types_sorted])\n",
" ax.set_xlabel('Count'); ax.set_title('Entities by type')\n",
" \n",
" ax = axes[1]\n",
" rel_counts = Counter(r[1] for r in relations).most_common()\n",
" ax.barh([r[0] for r in rel_counts], [r[1] for r in rel_counts], color='#3498db')\n",
" ax.set_xlabel('Count'); ax.set_title('Relations by type')\n",
" \n",
" plt.tight_layout(rect=[0, 0, 1, 0.93])\n",
" pdf.savefig(fig); plt.close()\n",
" \n",
" # PAGE 3: Benchmark\n",
" fig, axes = plt.subplots(1, 3, figsize=(11, 5))\n",
" fig.suptitle('Benchmark: 3 Backends', fontsize=16, fontweight='bold')\n",
" backends_names = ['ops.db', 'triples.db', 'oxigraph']\n",
" colors_b = ['#2ecc71', '#3498db', '#e74c3c']\n",
" \n",
" ax = axes[0]\n",
" vals = [ops_insert_time*1000, triple_insert_time*1000, ox_insert_time*1000]\n",
" bars = ax.bar(backends_names, vals, color=colors_b)\n",
" ax.set_ylabel('ms'); ax.set_title('Insert Time')\n",
" for b, v in zip(bars, vals): ax.text(b.get_x()+b.get_width()/2, b.get_height()+0.5, f'{v:.1f}', ha='center', fontsize=9)\n",
" \n",
" ax = axes[1]\n",
" vals = [ops_query_time*1000, triple_query_time*1000, ox_query_time*1000]\n",
" bars = ax.bar(backends_names, vals, color=colors_b)\n",
" ax.set_ylabel('ms'); ax.set_title(f'8 Queries (avg of {N_RUNS} runs)')\n",
" for b, v in zip(bars, vals): ax.text(b.get_x()+b.get_width()/2, b.get_height()+0.1, f'{v:.1f}', ha='center', fontsize=9)\n",
" \n",
" ax = axes[2]\n",
" vals = [ops_cold*1000, triple_cold*1000, ox_cold*1000]\n",
" bars = ax.bar(backends_names, vals, color=colors_b)\n",
" ax.set_ylabel('ms'); ax.set_title('Cold Start')\n",
" for b, v in zip(bars, vals): ax.text(b.get_x()+b.get_width()/2, b.get_height()+0.1, f'{v:.1f}', ha='center', fontsize=9)\n",
" \n",
" plt.tight_layout(rect=[0, 0, 1, 0.92])\n",
" pdf.savefig(fig); plt.close()\n",
" \n",
" # PAGE 4: LLM Retrieval\n",
" fig, axes = plt.subplots(1, 2, figsize=(11, 5))\n",
" fig.suptitle('LLM Query Generation (claude -p haiku)', fontsize=16, fontweight='bold')\n",
" \n",
" ax = axes[0]\n",
" schemas = list(SCHEMAS_OSINT.keys())\n",
" success_rates = [(df_llm_exec[df_llm_exec['schema']==s]['exec_ok']==True).sum()/len(QUESTIONS_OSINT)*100 for s in schemas]\n",
" bars = ax.bar(schemas, success_rates, color=colors_b)\n",
" ax.set_ylabel('% executable queries'); ax.set_title('Success rate (executes without error)'); ax.set_ylim(0, 110)\n",
" for b, v in zip(bars, success_rates): ax.text(b.get_x()+b.get_width()/2, b.get_height()+1, f'{v:.0f}%', ha='center')\n",
" \n",
" ax = axes[1]\n",
" avg_times = [df_llm_exec[df_llm_exec['schema']==s]['time_s'].mean() for s in schemas]\n",
" bars = ax.bar(schemas, avg_times, color=colors_b)\n",
" ax.set_ylabel('s'); ax.set_title('Mean generation time')\n",
" for b, v in zip(bars, avg_times): ax.text(b.get_x()+b.get_width()/2, b.get_height()+0.1, f'{v:.1f}s', ha='center')\n",
" \n",
" plt.tight_layout(rect=[0, 0, 1, 0.92])\n",
" pdf.savefig(fig); plt.close()\n",
" \n",
" # PAGE 5: Assertions\n",
" fig, axes = plt.subplots(1, 2, figsize=(11, 5))\n",
" fig.suptitle('Assertions over the OSINT intelligence', fontsize=16, fontweight='bold')\n",
" \n",
" ax = axes[0]\n",
" status_counts = df_assert.groupby('status').size()\n",
" ax.pie(status_counts.values, labels=status_counts.index, autopct='%1.0f%%',\n",
" colors=['#2ecc71' if s=='pass' else '#e74c3c' if s=='fail' else '#f39c12' for s in status_counts.index])\n",
" ax.set_title('Results')\n",
" \n",
" ax = axes[1]\n",
" sev_status = df_assert.groupby(['severity','status']).size().unstack(fill_value=0)\n",
" sev_status.plot(kind='bar', ax=ax, color={'pass':'#2ecc71','fail':'#e74c3c','skip':'#f39c12'})\n",
" ax.set_title('By severity'); ax.set_ylabel('Count'); ax.tick_params(axis='x', labelrotation=0)\n",
" \n",
" plt.tight_layout(rect=[0, 0, 1, 0.92])\n",
" pdf.savefig(fig); plt.close()\n",
" \n",
" # PAGE 6: Recommendations\n",
" fig = plt.figure(figsize=(11, 8.5))\n",
" fig.text(0.5, 0.92, 'Recommendations', ha='center', fontsize=18, fontweight='bold')\n",
" rec = '''\n",
"RANKING FOR AN OSINT SYSTEM WITH AI RETRIEVAL\n",
"\n",
"1. operations.db (SQL) [RECOMMENDED]\n",
" + Rich schema: entities with JSON metadata + weighted relations + assertions\n",
" + LLM-generated SQL executes without error most often (highest success rate)\n",
" + Assertions check intelligence quality automatically\n",
" + json_extract() allows queries over metadata without a rigid schema\n",
" + Already integrated into fn_registry (fn ops CLI, reactive loop)\n",
" + Sigma.js is fed directly from the same backend\n",
" - No native semantic inference\n",
"\n",
"2. SQLite Triple Store\n",
" + Simple and fast for traversal with CTEs\n",
" + Flexible model (any predicate, no schema)\n",
" + LLM generates reasonable SQL\n",
" - Loses the richness of operations.db (no assertions, no executions)\n",
" - Property-filter queries need verbose self-JOINs\n",
" - Offers nothing that operations.db does not do better\n",
"\n",
"3. Oxigraph (SPARQL)\n",
" + Native property paths (+, *) for traversal\n",
" + W3C standard, interoperable\n",
" + Good performance for a triple store\n",
" - LLM generates SPARQL with more errors\n",
" - Fragile numeric casting (xsd:integer, xsd:float)\n",
" - Namespace and IRI overhead\n",
" - Does not justify the extra complexity for this use case\n",
"\n",
"CONCLUSION:\n",
"For an OSINT system with AI retrieval, operations.db is the better fit because\n",
"it combines graph storage (entities+relations) with data quality checks\n",
"(assertions) and traceability (executions). The LLM queries it via SQL with\n",
"json_extract, which was the pattern with the highest execution success rate.\n",
"\n",
"NEXT STEPS:\n",
"- Create an app under apps/ backed by operations.db for real OSINT\n",
"- Implement functions: validate_cve_id, validate_crypto_address, geoip_lookup\n",
"- Connect embeddings for semantic search over entity descriptions\n",
"- Automatic ingestion pipeline from OSINT sources\n",
" fig.text(0.06, 0.03, rec, fontsize=10, fontfamily='monospace', verticalalignment='bottom')\n",
" pdf.savefig(fig); plt.close()\n",
"\n",
"print(f'PDF: {os.path.abspath(pdf_path)}')\n",
"print(f'Size: {os.path.getsize(pdf_path)/1024:.1f}KB')"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.13.0"
}
},
"nbformat": 4,
"nbformat_minor": 5
}