{
"cells": [
{
"cell_type": "markdown",
"id": "intro",
"metadata": {},
"source": [
"# Comparison: embedded graph databases + LLM retrieval\n",
"\n",
"## Goal\n",
"\n",
"1. Load the fn_registry dependency graph into 5 graph backends\n",
"2. Benchmark: insertion, traversal, persistence\n",
"3. Evaluate how an LLM (`claude -p`) generates queries to retrieve data from each backend\n",
"\n",
"## Backends\n",
"\n",
"| Backend | Query language | Type |\n",
"|---|---|---|\n",
"| **Kuzu** | Cypher | Embedded graph DB |\n",
"| **NetworkX** | Python API | In-memory library |\n",
"| **SQLite + CTEs** | Recursive SQL | Relational |\n",
"| **RDFLib** | SPARQL | Triple store |\n",
"| **igraph** | Python API | C/Python library |\n",
"\n",
"## Test graph\n",
"\n",
"fn_registry itself: ~354 functions + 39 types as nodes, with dependencies (uses_functions, uses_types, returns, error_type) as edges."
]
},
{
"cell_type": "markdown",
"id": "section-1",
"metadata": {},
"source": [
"## 1. Extract the graph from registry.db"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "extract-graph",
"metadata": {},
"outputs": [],
"source": [
"import sqlite3\n",
"import json\n",
"import os\n",
"import time\n",
"import shutil\n",
"import pandas as pd\n",
"import matplotlib.pyplot as plt\n",
"\n",
"plt.style.use('seaborn-v0_8-whitegrid')\n",
"\n",
"FN_ROOT = os.environ.get('FN_REGISTRY_ROOT', os.path.expanduser('~/fn_registry'))\n",
"DB_PATH = os.path.join(FN_ROOT, 'registry.db')\n",
"\n",
"conn = sqlite3.connect(DB_PATH)\n",
"conn.row_factory = sqlite3.Row\n",
"\n",
"# Nodes: functions\n",
"functions = [dict(r) for r in conn.execute(\n",
"    'SELECT id, name, kind, lang, domain, purity, signature, description, '\n",
"    'uses_functions, uses_types, returns, returns_optional, error_type, tags '\n",
"    'FROM functions ORDER BY name'\n",
").fetchall()]\n",
"\n",
"# Nodes: types\n",
"types = [dict(r) for r in conn.execute(\n",
"    'SELECT id, name, lang, domain, algebraic, description, uses_types, tags '\n",
"    'FROM types ORDER BY name'\n",
").fetchall()]\n",
"\n",
"conn.close()\n",
"\n",
"# Build edges\n",
"edges = []  # (source_id, target_id, relation_type)\n",
"\n",
"for f in functions:\n",
"    fid = f['id']\n",
"    # uses_functions (stored as a JSON list)\n",
"    for dep in json.loads(f.get('uses_functions') or '[]'):\n",
"        edges.append((fid, dep, 'uses_function'))\n",
"    # uses_types (stored as a JSON list)\n",
"    for dep in json.loads(f.get('uses_types') or '[]'):\n",
"        edges.append((fid, dep, 'uses_type'))\n",
"    # returns\n",
"    ret = f.get('returns') or ''\n",
"    if ret:\n",
"        edges.append((fid, ret, 'returns'))\n",
"    # error_type\n",
"    err = f.get('error_type') or ''\n",
"    if err:\n",
"        edges.append((fid, err, 'error_type'))\n",
"\n",
"for t in types:\n",
"    tid = t['id']\n",
"    for dep in json.loads(t.get('uses_types') or '[]'):\n",
"        edges.append((tid, dep, 'uses_type'))\n",
"\n",
"# All known node IDs\n",
"all_node_ids = set(f['id'] for f in functions) | set(t['id'] for t in types)\n",
"# Nodes by ID for fast lookup\n",
"node_map = {f['id']: {**f, 'node_type': 'function'} for f in functions}\n",
"node_map.update({t['id']: {**t, 'node_type': 'type'} for t in types})\n",
"\n",
"# Keep only edges whose endpoints both exist as nodes\n",
"valid_edges = [(s, t, r) for s, t, r in edges if s in all_node_ids and t in all_node_ids]\n",
"\n",
"print(f'Nodes: {len(all_node_ids)} ({len(functions)} functions + {len(types)} types)')\n",
"print(f'Edges: {len(valid_edges)} (of {len(edges)} total, {len(edges) - len(valid_edges)} with a missing endpoint)')\n",
"print(f'Relations: {set(r for _, _, r in valid_edges)}')"
]
},
{
"cell_type": "markdown",
"id": "section-2",
"metadata": {},
"source": [
"## 2. Benchmark framework"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bench-framework",
"metadata": {},
"outputs": [],
"source": [
"DATA_DIR = 'data/graph_bench'\n",
"os.makedirs(DATA_DIR, exist_ok=True)\n",
"\n",
"# Traversal queries for the benchmark (answers can be verified against the graph)\n",
"BENCH_QUERIES = [\n",
"    ('direct_deps', 'Functions used directly by filter_slice_go_core'),\n",
"    ('reverse_deps', 'Functions that depend on error_go_core'),\n",
"    ('two_hop', 'Dependencies 2 hops away from init_metabase_go_pipelines'),\n",
"    ('domain_subgraph', 'All edges between functions in the finance domain'),\n",
"    ('most_connected', 'Top 5 most connected nodes (in + out degree)'),\n",
"    ('path_exists', 'Is there a path between any finance function and error_go_core?'),\n",
"    ('isolated', 'Functions with no dependencies (neither incoming nor outgoing)'),\n",
"    ('type_users', 'Functions that use the type SMA_go_finance'),\n",
"]\n",
"\n",
"def dir_size_mb(path):\n",
"    # Size in MB of a single file, or of every file under a directory\n",
"    total = 0\n",
"    if os.path.isfile(path):\n",
"        return os.path.getsize(path) / (1024*1024)\n",
"    if not os.path.exists(path):\n",
"        return 0\n",
"    for dp, dn, fns in os.walk(path):\n",
"        for f in fns:\n",
"            fp = os.path.join(dp, f)\n",
"            if os.path.exists(fp):\n",
"                total += os.path.getsize(fp)\n",
"    return total / (1024*1024)\n",
"\n",
"def cleanup_path(path):\n",
"    # Remove the path itself plus any sibling artifacts written with a suffix\n",
"    if os.path.isfile(path):\n",
"        os.remove(path)\n",
"    elif os.path.isdir(path):\n",
"        shutil.rmtree(path, ignore_errors=True)\n",
"    for suffix in ['.db', '.pickle', '.graphml']:\n",
"        p = path + suffix\n",
"        if os.path.exists(p):\n",
"            os.remove(p)\n",
"\n",
"print(f'Benchmark queries: {len(BENCH_QUERIES)}')\n",
"for qid, desc in BENCH_QUERIES:\n",
"    print(f'  {qid}: {desc}')"
]
},
{
"cell_type": "markdown",
"id": "section-3",
"metadata": {},
"source": [
"## 3. Backend: NetworkX (baseline)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "networkx-impl",
"metadata": {},
"outputs": [],
"source": [
"import networkx as nx\n",
"import pickle\n",
"\n",
"def nx_insert(nodes, edges_list, path):\n",
"    # MultiDiGraph keeps parallel edges, so two nodes linked by several\n",
"    # relations (e.g. uses_type and returns) retain every relation label.\n",
"    G = nx.MultiDiGraph()\n",
"    for nid, attrs in nodes.items():\n",
"        G.add_node(nid, **{k: v for k, v in attrs.items() if isinstance(v, (str, int, float, bool))})\n",
"    for src, tgt, rel in edges_list:\n",
"        G.add_edge(src, tgt, relation=rel)\n",
"    return G\n",
"\n",
"def nx_queries(G):\n",
"    results = {}\n",
"\n",
"    # direct_deps\n",
"    if 'filter_slice_go_core' in G:\n",
"        results['direct_deps'] = list(G.successors('filter_slice_go_core'))\n",
"    else:\n",
"        results['direct_deps'] = []\n",
"\n",
"    # reverse_deps\n",
"    if 'error_go_core' in G:\n",
"        results['reverse_deps'] = list(G.predecessors('error_go_core'))\n",
"    else:\n",
"        results['reverse_deps'] = []\n",
"\n",
"    # two_hop\n",
"    two_hop = set()\n",
"    if 'init_metabase_go_pipelines' in G:\n",
"        for n1 in G.successors('init_metabase_go_pipelines'):\n",
"            for n2 in G.successors(n1):\n",
"                two_hop.add(n2)\n",
"    results['two_hop'] = list(two_hop)\n",
"\n",
"    # domain_subgraph\n",
"    finance_nodes = [n for n, d in G.nodes(data=True) if d.get('domain') == 'finance']\n",
"    finance_edges = [(u, v) for u, v in G.edges() if u in finance_nodes and v in finance_nodes]\n",
"    results['domain_subgraph'] = finance_edges\n",
"\n",
"    # most_connected\n",
"    degree = sorted(((n, G.in_degree(n) + G.out_degree(n)) for n in G.nodes()), key=lambda x: -x[1])[:5]\n",
"    results['most_connected'] = degree\n",
"\n",
"    # path_exists\n",
"    if 'error_go_core' in G:\n",
"        has_path = any(\n",
"            nx.has_path(G, n, 'error_go_core')\n",
"            for n in finance_nodes if n != 'error_go_core'\n",
"        )\n",
"        results['path_exists'] = has_path\n",
"    else:\n",
"        results['path_exists'] = False\n",
"\n",
"    # isolated\n",
"    results['isolated'] = [n for n in G.nodes() if G.degree(n) == 0]\n",
"\n",
"    # type_users (only uses_type edges, matching the filter used by the other backends)\n",
"    if 'SMA_go_finance' in G:\n",
"        results['type_users'] = [\n",
"            u for u, _, d in G.in_edges('SMA_go_finance', data=True)\n",
"            if d.get('relation') == 'uses_type'\n",
"        ]\n",
"    else:\n",
"        results['type_users'] = []\n",
"\n",
"    return results\n",
"\n",
"def nx_save(G, path):\n",
"    with open(path + '.pickle', 'wb') as f:\n",
"        pickle.dump(G, f)\n",
"\n",
"def nx_load(path):\n",
"    with open(path + '.pickle', 'rb') as f:\n",
"        return pickle.load(f)\n",
"\n",
"# Benchmark\n",
"path = os.path.join(DATA_DIR, 'networkx')\n",
"cleanup_path(path)\n",
"\n",
"t0 = time.perf_counter()\n",
"G_nx = nx_insert(node_map, valid_edges, path)\n",
"nx_insert_time = time.perf_counter() - t0\n",
"\n",
"t0 = time.perf_counter()\n",
"nx_results = nx_queries(G_nx)\n",
"nx_query_time = time.perf_counter() - t0\n",
"\n",
"t0 = time.perf_counter()\n",
"nx_save(G_nx, path)\n",
"nx_save_time = time.perf_counter() - t0\n",
"\n",
"t0 = time.perf_counter()\n",
"G_loaded = nx_load(path)\n",
"_ = list(G_loaded.successors(list(G_loaded.nodes())[0]))\n",
"nx_load_time = time.perf_counter() - t0\n",
"\n",
"nx_disk = dir_size_mb(path + '.pickle')\n",
"\n",
"print(f'NetworkX: {G_nx.number_of_nodes()} nodes, {G_nx.number_of_edges()} edges')\n",
"print(f'  Insert: {nx_insert_time*1000:.1f}ms')\n",
"print(f'  Queries (8): {nx_query_time*1000:.1f}ms')\n",
"print(f'  Save: {nx_save_time*1000:.1f}ms')\n",
"print(f'  Load+query: {nx_load_time*1000:.1f}ms')\n",
"print(f'  Disk: {nx_disk:.2f}MB')\n",
"print()\n",
"for k, v in nx_results.items():\n",
"    if isinstance(v, list) and len(v) > 5:\n",
"        print(f'  {k}: {len(v)} results, e.g. {v[:3]}...')\n",
"    else:\n",
"        print(f'  {k}: {v}')"
]
},
{
"cell_type": "markdown",
"id": "section-4",
"metadata": {},
"source": [
"## 4. Backend: Kuzu (embedded Cypher)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "kuzu-impl",
"metadata": {},
"outputs": [],
"source": [
"import kuzu\n",
"\n",
"def kuzu_setup(nodes, edges_list, path):\n",
"    cleanup_path(path)\n",
"    os.makedirs(path, exist_ok=True)\n",
"    db = kuzu.Database(path)\n",
"    conn = kuzu.Connection(db)\n",
"\n",
"    # Schema\n",
"    conn.execute('CREATE NODE TABLE FnNode(id STRING, name STRING, node_type STRING, '\n",
"                 'kind STRING, lang STRING, domain STRING, purity STRING, '\n",
"                 'description STRING, PRIMARY KEY(id))')\n",
"    conn.execute('CREATE REL TABLE DEPENDS_ON(FROM FnNode TO FnNode, relation STRING)')\n",
"\n",
"    # Insert nodes\n",
"    for nid, attrs in nodes.items():\n",
"        conn.execute(\n",
"            'CREATE (n:FnNode {id: $id, name: $name, node_type: $node_type, '\n",
"            'kind: $kind, lang: $lang, domain: $domain, purity: $purity, '\n",
"            'description: $desc})',\n",
"            parameters={\n",
"                'id': nid,\n",
"                'name': attrs.get('name', ''),\n",
"                'node_type': attrs.get('node_type', ''),\n",
"                'kind': attrs.get('kind', ''),\n",
"                'lang': attrs.get('lang', ''),\n",
"                'domain': attrs.get('domain', ''),\n",
"                'purity': attrs.get('purity', ''),\n",
"                'desc': attrs.get('description', ''),\n",
"            }\n",
"        )\n",
"\n",
"    # Insert edges\n",
"    for src, tgt, rel in edges_list:\n",
"        conn.execute(\n",
"            'MATCH (a:FnNode {id: $src}), (b:FnNode {id: $tgt}) '\n",
"            'CREATE (a)-[:DEPENDS_ON {relation: $rel}]->(b)',\n",
"            parameters={'src': src, 'tgt': tgt, 'rel': rel}\n",
"        )\n",
"\n",
"    return db, conn\n",
"\n",
"def kuzu_queries(conn):\n",
"    results = {}\n",
"\n",
"    # direct_deps\n",
"    r = conn.execute('MATCH (a:FnNode {id: \"filter_slice_go_core\"})-[:DEPENDS_ON]->(b) RETURN b.id')\n",
"    results['direct_deps'] = [row[0] for row in r.get_as_df().values]\n",
"\n",
"    # reverse_deps\n",
"    r = conn.execute('MATCH (a)-[:DEPENDS_ON]->(b:FnNode {id: \"error_go_core\"}) RETURN a.id')\n",
"    results['reverse_deps'] = [row[0] for row in r.get_as_df().values]\n",
"\n",
"    # two_hop\n",
"    r = conn.execute(\n",
"        'MATCH (a:FnNode {id: \"init_metabase_go_pipelines\"})-[:DEPENDS_ON]->()-[:DEPENDS_ON]->(c) '\n",
"        'RETURN DISTINCT c.id'\n",
"    )\n",
"    results['two_hop'] = [row[0] for row in r.get_as_df().values]\n",
"\n",
"    # domain_subgraph\n",
"    r = conn.execute(\n",
"        'MATCH (a:FnNode {domain: \"finance\"})-[e:DEPENDS_ON]->(b:FnNode {domain: \"finance\"}) '\n",
"        'RETURN a.id, b.id'\n",
"    )\n",
"    results['domain_subgraph'] = [(row[0], row[1]) for row in r.get_as_df().values]\n",
"\n",
"    # most_connected (in + out degree, obtained by counting edges)\n",
"    r = conn.execute(\n",
"        'MATCH (n:FnNode) '\n",
"        'OPTIONAL MATCH (n)-[e1:DEPENDS_ON]->() '\n",
"        'OPTIONAL MATCH ()-[e2:DEPENDS_ON]->(n) '\n",
"        'RETURN n.id, count(DISTINCT e1) + count(DISTINCT e2) AS deg '\n",
"        'ORDER BY deg DESC LIMIT 5'\n",
"    )\n",
"    results['most_connected'] = [(row[0], row[1]) for row in r.get_as_df().values]\n",
"\n",
"    # path_exists (variable-length path, at most 5 hops)\n",
"    r = conn.execute(\n",
"        'MATCH (a:FnNode {domain: \"finance\"})-[:DEPENDS_ON* 1..5]->(b:FnNode {id: \"error_go_core\"}) '\n",
"        'RETURN a.id LIMIT 1'\n",
"    )\n",
"    results['path_exists'] = len(r.get_as_df()) > 0\n",
"\n",
"    # isolated\n",
"    r = conn.execute(\n",
"        'MATCH (n:FnNode) WHERE NOT EXISTS { MATCH (n)-[:DEPENDS_ON]->() } '\n",
"        'AND NOT EXISTS { MATCH ()-[:DEPENDS_ON]->(n) } RETURN n.id'\n",
"    )\n",
"    results['isolated'] = [row[0] for row in r.get_as_df().values]\n",
"\n",
"    # type_users\n",
"    r = conn.execute(\n",
"        'MATCH (a)-[:DEPENDS_ON {relation: \"uses_type\"}]->(b:FnNode {id: \"SMA_go_finance\"}) RETURN a.id'\n",
"    )\n",
"    results['type_users'] = [row[0] for row in r.get_as_df().values]\n",
"\n",
"    return results\n",
"\n",
"# Benchmark\n",
"path = os.path.join(DATA_DIR, 'kuzu')\n",
"\n",
"t0 = time.perf_counter()\n",
"kuzu_db, kuzu_conn = kuzu_setup(node_map, valid_edges, path)\n",
"kuzu_insert_time = time.perf_counter() - t0\n",
"\n",
"t0 = time.perf_counter()\n",
"kuzu_results = kuzu_queries(kuzu_conn)\n",
"kuzu_query_time = time.perf_counter() - t0\n",
"\n",
"kuzu_disk = dir_size_mb(path)\n",
"\n",
"# Cold load: drop the handles, reopen the database and run one query\n",
"del kuzu_conn, kuzu_db\n",
"t0 = time.perf_counter()\n",
"kuzu_db2 = kuzu.Database(path)\n",
"kuzu_conn2 = kuzu.Connection(kuzu_db2)\n",
"r = kuzu_conn2.execute('MATCH (a:FnNode {id: \"filter_slice_go_core\"})-[:DEPENDS_ON]->(b) RETURN b.id')\n",
"_ = r.get_as_df()\n",
"kuzu_load_time = time.perf_counter() - t0\n",
"del kuzu_conn2, kuzu_db2\n",
"\n",
"print('Kuzu:')\n",
"print(f'  Insert: {kuzu_insert_time*1000:.1f}ms')\n",
"print(f'  Queries (8): {kuzu_query_time*1000:.1f}ms')\n",
"print(f'  Load+query: {kuzu_load_time*1000:.1f}ms')\n",
"print(f'  Disk: {kuzu_disk:.2f}MB')\n",
"print()\n",
"for k, v in kuzu_results.items():\n",
"    if isinstance(v, list) and len(v) > 5:\n",
"        print(f'  {k}: {len(v)} results, e.g. {v[:3]}...')\n",
"    else:\n",
"        print(f'  {k}: {v}')"
]
},
{
"cell_type": "markdown",
"id": "section-5",
"metadata": {},
"source": [
"## 5. Backend: SQLite + recursive CTEs"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "sqlite-impl",
"metadata": {},
"outputs": [],
"source": [
"def sqlite_setup(nodes, edges_list, path):\n",
"    dbpath = path + '.db'\n",
"    cleanup_path(dbpath)\n",
"    db = sqlite3.connect(dbpath)\n",
"    db.execute('CREATE TABLE nodes (id TEXT PRIMARY KEY, name TEXT, node_type TEXT, '\n",
"               'kind TEXT, lang TEXT, domain TEXT, purity TEXT, description TEXT)')\n",
"    db.execute('CREATE TABLE edges (src TEXT, tgt TEXT, relation TEXT, '\n",
"               'FOREIGN KEY(src) REFERENCES nodes(id), FOREIGN KEY(tgt) REFERENCES nodes(id))')\n",
"    db.execute('CREATE INDEX idx_edges_src ON edges(src)')\n",
"    db.execute('CREATE INDEX idx_edges_tgt ON edges(tgt)')\n",
"    db.execute('CREATE INDEX idx_edges_rel ON edges(relation)')\n",
"    db.execute('CREATE INDEX idx_nodes_domain ON nodes(domain)')\n",
"\n",
"    db.executemany(\n",
"        'INSERT INTO nodes VALUES (?,?,?,?,?,?,?,?)',\n",
"        [(nid, a.get('name',''), a.get('node_type',''), a.get('kind',''),\n",
"          a.get('lang',''), a.get('domain',''), a.get('purity',''),\n",
"          a.get('description','')) for nid, a in nodes.items()]\n",
"    )\n",
"    db.executemany('INSERT INTO edges VALUES (?,?,?)', edges_list)\n",
"    db.commit()\n",
"    return db\n",
"\n",
"def sqlite_queries(db):\n",
"    # SQL string literals use single quotes; double quotes denote identifiers\n",
"    # in standard SQL and are only accepted as strings by a legacy SQLite quirk.\n",
"    results = {}\n",
"\n",
"    # direct_deps\n",
"    results['direct_deps'] = [r[0] for r in db.execute(\n",
"        \"SELECT tgt FROM edges WHERE src = 'filter_slice_go_core'\"\n",
"    ).fetchall()]\n",
"\n",
"    # reverse_deps\n",
"    results['reverse_deps'] = [r[0] for r in db.execute(\n",
"        \"SELECT src FROM edges WHERE tgt = 'error_go_core'\"\n",
"    ).fetchall()]\n",
"\n",
"    # two_hop (two chained, non-recursive CTEs)\n",
"    results['two_hop'] = [r[0] for r in db.execute(\n",
"        \"WITH hop1 AS (SELECT tgt FROM edges WHERE src = 'init_metabase_go_pipelines'), \"\n",
"        \"hop2 AS (SELECT DISTINCT e.tgt FROM edges e JOIN hop1 h ON e.src = h.tgt) \"\n",
"        \"SELECT tgt FROM hop2\"\n",
"    ).fetchall()]\n",
"\n",
"    # domain_subgraph\n",
"    results['domain_subgraph'] = db.execute(\n",
"        \"SELECT e.src, e.tgt FROM edges e \"\n",
"        \"JOIN nodes n1 ON e.src = n1.id JOIN nodes n2 ON e.tgt = n2.id \"\n",
"        \"WHERE n1.domain = 'finance' AND n2.domain = 'finance'\"\n",
"    ).fetchall()\n",
"\n",
"    # most_connected\n",
"    results['most_connected'] = db.execute(\n",
"        \"SELECT id, (SELECT COUNT(*) FROM edges WHERE src=id) + \"\n",
"        \"(SELECT COUNT(*) FROM edges WHERE tgt=id) AS deg \"\n",
"        \"FROM nodes ORDER BY deg DESC LIMIT 5\"\n",
"    ).fetchall()\n",
"\n",
"    # path_exists (recursive CTE with a depth limit of 5)\n",
"    results['path_exists'] = len(db.execute(\n",
"        \"WITH RECURSIVE reachable(id, depth) AS (\"\n",
"        \"  SELECT src, 0 FROM edges e JOIN nodes n ON e.src = n.id WHERE n.domain = 'finance' \"\n",
"        \"  UNION \"\n",
"        \"  SELECT e.tgt, r.depth + 1 FROM edges e JOIN reachable r ON e.src = r.id WHERE r.depth < 5\"\n",
"        \") SELECT 1 FROM reachable WHERE id = 'error_go_core' LIMIT 1\"\n",
"    ).fetchall()) > 0\n",
"\n",
"    # isolated\n",
"    results['isolated'] = [r[0] for r in db.execute(\n",
"        \"SELECT n.id FROM nodes n \"\n",
"        \"WHERE NOT EXISTS (SELECT 1 FROM edges WHERE src = n.id) \"\n",
"        \"AND NOT EXISTS (SELECT 1 FROM edges WHERE tgt = n.id)\"\n",
"    ).fetchall()]\n",
"\n",
"    # type_users\n",
"    results['type_users'] = [r[0] for r in db.execute(\n",
"        \"SELECT src FROM edges WHERE tgt = 'SMA_go_finance' AND relation = 'uses_type'\"\n",
"    ).fetchall()]\n",
"\n",
"    return results\n",
"\n",
"# Benchmark\n",
"path = os.path.join(DATA_DIR, 'sqlite_graph')\n",
"\n",
"t0 = time.perf_counter()\n",
"sqlite_db = sqlite_setup(node_map, valid_edges, path)\n",
"sqlite_insert_time = time.perf_counter() - t0\n",
"\n",
"t0 = time.perf_counter()\n",
"sqlite_results = sqlite_queries(sqlite_db)\n",
"sqlite_query_time = time.perf_counter() - t0\n",
"\n",
"sqlite_db.close()\n",
"sqlite_disk = dir_size_mb(path + '.db')\n",
"\n",
"t0 = time.perf_counter()\n",
"db2 = sqlite3.connect(path + '.db')\n",
"_ = db2.execute(\"SELECT tgt FROM edges WHERE src = 'filter_slice_go_core'\").fetchall()\n",
"db2.close()\n",
"sqlite_load_time = time.perf_counter() - t0\n",
"\n",
"print('SQLite + CTEs:')\n",
"print(f'  Insert: {sqlite_insert_time*1000:.1f}ms')\n",
"print(f'  Queries (8): {sqlite_query_time*1000:.1f}ms')\n",
"print(f'  Load+query: {sqlite_load_time*1000:.1f}ms')\n",
"print(f'  Disk: {sqlite_disk:.2f}MB')\n",
"print()\n",
"for k, v in sqlite_results.items():\n",
"    if isinstance(v, list) and len(v) > 5:\n",
"        print(f'  {k}: {len(v)} results, e.g. {v[:3]}...')\n",
"    else:\n",
"        print(f'  {k}: {v}')"
]
},
{
"cell_type": "markdown",
"id": "section-6",
"metadata": {},
"source": [
"## 6. Backend: RDFLib (SPARQL)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "rdflib-impl",
"metadata": {},
"outputs": [],
"source": [
"from rdflib import Graph as RDFGraph, Namespace, Literal, URIRef\n",
"from rdflib.namespace import RDF, RDFS\n",
"\n",
"FN = Namespace('http://fn-registry.local/')\n",
"FNREL = Namespace('http://fn-registry.local/rel/')\n",
"FNPROP = Namespace('http://fn-registry.local/prop/')\n",
"\n",
"def rdf_setup(nodes, edges_list, path):\n",
"    g = RDFGraph()\n",
"    g.bind('fn', FN)\n",
"    g.bind('fnrel', FNREL)\n",
"    g.bind('fnprop', FNPROP)\n",
"\n",
"    for nid, attrs in nodes.items():\n",
"        uri = FN[nid]\n",
"        g.add((uri, RDF.type, FN['Function'] if attrs.get('node_type') == 'function' else FN['Type']))\n",
"        for prop in ['name', 'kind', 'lang', 'domain', 'purity', 'description']:\n",
"            val = attrs.get(prop, '')\n",
"            if val:\n",
"                g.add((uri, FNPROP[prop], Literal(val)))\n",
"\n",
"    for src, tgt, rel in edges_list:\n",
"        g.add((FN[src], FNREL[rel], FN[tgt]))\n",
"\n",
"    return g\n",
"\n",
"def rdf_queries(g):\n",
"    results = {}\n",
"\n",
"    # direct_deps\n",
"    r = g.query('SELECT ?b WHERE { fn:filter_slice_go_core ?rel ?b . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) }',\n",
"                initNs={'fn': FN, 'fnrel': FNREL})\n",
"    results['direct_deps'] = [str(row[0]).replace(str(FN), '') for row in r]\n",
"\n",
"    # reverse_deps\n",
"    r = g.query('SELECT ?a WHERE { ?a ?rel fn:error_go_core . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) }',\n",
"                initNs={'fn': FN, 'fnrel': FNREL})\n",
"    results['reverse_deps'] = [str(row[0]).replace(str(FN), '') for row in r]\n",
"\n",
"    # two_hop\n",
"    r = g.query(\n",
"        'SELECT DISTINCT ?c WHERE { fn:init_metabase_go_pipelines ?r1 ?b . ?b ?r2 ?c . '\n",
"        'FILTER(STRSTARTS(STR(?r1), STR(fnrel:))) FILTER(STRSTARTS(STR(?r2), STR(fnrel:))) }',\n",
"        initNs={'fn': FN, 'fnrel': FNREL}\n",
"    )\n",
"    results['two_hop'] = [str(row[0]).replace(str(FN), '') for row in r]\n",
"\n",
"    # domain_subgraph\n",
"    r = g.query(\n",
"        'SELECT ?a ?b WHERE { ?a fnprop:domain \"finance\" . ?b fnprop:domain \"finance\" . '\n",
"        '?a ?rel ?b . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) }',\n",
"        initNs={'fn': FN, 'fnrel': FNREL, 'fnprop': FNPROP}\n",
"    )\n",
"    results['domain_subgraph'] = [(str(row[0]).replace(str(FN), ''), str(row[1]).replace(str(FN), '')) for row in r]\n",
"\n",
"    # most_connected (SPARQL has no native degree, so we count neighbours)\n",
"    r = g.query(\n",
"        'SELECT ?n (COUNT(DISTINCT ?e) AS ?deg) WHERE { '\n",
"        '{ ?n ?rel ?o . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) BIND(?o AS ?e) } '\n",
"        'UNION '\n",
"        '{ ?s ?rel ?n . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) BIND(?s AS ?e) } '\n",
"        '} GROUP BY ?n ORDER BY DESC(?deg) LIMIT 5',\n",
"        initNs={'fn': FN, 'fnrel': FNREL}\n",
"    )\n",
"    results['most_connected'] = [(str(row[0]).replace(str(FN), ''), int(row[1])) for row in r]\n",
"\n",
"    # path_exists (SPARQL 1.1 property path; '+' means one or more hops.\n",
"    # The bounded {n,m} form is not part of SPARQL 1.1, so no depth limit is applied here.)\n",
"    r = g.query(\n",
"        'ASK WHERE { ?a fnprop:domain \"finance\" . '\n",
"        '?a (fnrel:uses_function|fnrel:uses_type|fnrel:returns|fnrel:error_type)+ fn:error_go_core }',\n",
"        initNs={'fn': FN, 'fnrel': FNREL, 'fnprop': FNPROP}\n",
"    )\n",
"    results['path_exists'] = bool(r)\n",
"\n",
"    # isolated\n",
"    r = g.query(\n",
"        'SELECT ?n WHERE { ?n a ?type . FILTER(?type IN (fn:Function, fn:Type)) '\n",
"        'FILTER NOT EXISTS { ?n ?rel ?o . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) } '\n",
"        'FILTER NOT EXISTS { ?s ?rel2 ?n . FILTER(STRSTARTS(STR(?rel2), STR(fnrel:))) } }',\n",
"        initNs={'fn': FN, 'fnrel': FNREL}\n",
"    )\n",
"    results['isolated'] = [str(row[0]).replace(str(FN), '') for row in r]\n",
"\n",
"    # type_users\n",
"    r = g.query('SELECT ?a WHERE { ?a fnrel:uses_type fn:SMA_go_finance }',\n",
"                initNs={'fn': FN, 'fnrel': FNREL})\n",
"    results['type_users'] = [str(row[0]).replace(str(FN), '') for row in r]\n",
"\n",
"    return results\n",
"\n",
"# Benchmark\n",
"path = os.path.join(DATA_DIR, 'rdflib')\n",
"\n",
"t0 = time.perf_counter()\n",
"g_rdf = rdf_setup(node_map, valid_edges, path)\n",
"rdf_insert_time = time.perf_counter() - t0\n",
"\n",
"t0 = time.perf_counter()\n",
"rdf_results = rdf_queries(g_rdf)\n",
"rdf_query_time = time.perf_counter() - t0\n",
"\n",
"t0 = time.perf_counter()\n",
"g_rdf.serialize(destination=path + '.ttl', format='turtle')\n",
"rdf_save_time = time.perf_counter() - t0\n",
"\n",
"rdf_disk = dir_size_mb(path + '.ttl')\n",
"\n",
"t0 = time.perf_counter()\n",
"g2 = RDFGraph()\n",
"g2.parse(path + '.ttl', format='turtle')\n",
"_ = list(g2.query('SELECT ?b WHERE { fn:filter_slice_go_core ?r ?b . FILTER(STRSTARTS(STR(?r), STR(fnrel:))) }',\n",
"                  initNs={'fn': FN, 'fnrel': FNREL}))\n",
"rdf_load_time = time.perf_counter() - t0\n",
"\n",
"print(f'RDFLib: {len(g_rdf)} triples')\n",
"print(f'  Insert: {rdf_insert_time*1000:.1f}ms')\n",
"print(f'  Queries (8): {rdf_query_time*1000:.1f}ms')\n",
"print(f'  Save (turtle): {rdf_save_time*1000:.1f}ms')\n",
"print(f'  Load+query: {rdf_load_time*1000:.1f}ms')\n",
"print(f'  Disk: {rdf_disk:.2f}MB')\n",
"print()\n",
"for k, v in rdf_results.items():\n",
"    if isinstance(v, list) and len(v) > 5:\n",
"        print(f'  {k}: {len(v)} results, e.g. {v[:3]}...')\n",
"    else:\n",
"        print(f'  {k}: {v}')"
]
},
{
"cell_type": "markdown",
"id": "section-7",
"metadata": {},
"source": [
"## 7. Backend: igraph"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "igraph-impl",
"metadata": {},
"outputs": [],
"source": [
"import igraph as ig\n",
"\n",
"def igraph_setup(nodes, edges_list, path):\n",
"    node_ids = list(nodes.keys())\n",
"    id_to_idx = {nid: i for i, nid in enumerate(node_ids)}\n",
"\n",
"    g = ig.Graph(directed=True)\n",
"    g.add_vertices(len(node_ids))\n",
"    g.vs['node_id'] = node_ids\n",
"    g.vs['name'] = [nodes[nid].get('name', '') for nid in node_ids]\n",
"    g.vs['node_type'] = [nodes[nid].get('node_type', '') for nid in node_ids]\n",
"    g.vs['domain'] = [nodes[nid].get('domain', '') for nid in node_ids]\n",
"    g.vs['purity'] = [nodes[nid].get('purity', '') for nid in node_ids]\n",
"    g.vs['kind'] = [nodes[nid].get('kind', '') for nid in node_ids]\n",
"    g.vs['lang'] = [nodes[nid].get('lang', '') for nid in node_ids]\n",
"\n",
"    edge_tuples = [(id_to_idx[s], id_to_idx[t]) for s, t, _ in edges_list]\n",
"    edge_rels = [r for _, _, r in edges_list]\n",
"    g.add_edges(edge_tuples)\n",
"    g.es['relation'] = edge_rels\n",
"\n",
"    return g, id_to_idx\n",
"\n",
"def igraph_queries(g, id_to_idx):\n",
"    results = {}\n",
"    idx_to_id = {v: k for k, v in id_to_idx.items()}\n",
"\n",
"    # direct_deps\n",
"    if 'filter_slice_go_core' in id_to_idx:\n",
"        idx = id_to_idx['filter_slice_go_core']\n",
"        results['direct_deps'] = [idx_to_id[n] for n in g.neighbors(idx, mode='out')]\n",
"    else:\n",
"        results['direct_deps'] = []\n",
"\n",
"    # reverse_deps\n",
"    if 'error_go_core' in id_to_idx:\n",
"        idx = id_to_idx['error_go_core']\n",
"        results['reverse_deps'] = [idx_to_id[n] for n in g.neighbors(idx, mode='in')]\n",
"    else:\n",
"        results['reverse_deps'] = []\n",
"\n",
"    # two_hop\n",
"    if 'init_metabase_go_pipelines' in id_to_idx:\n",
"        idx = id_to_idx['init_metabase_go_pipelines']\n",
"        hop1 = g.neighbors(idx, mode='out')\n",
"        hop2 = set()\n",
"        for n in hop1:\n",
"            hop2.update(g.neighbors(n, mode='out'))\n",
"        results['two_hop'] = [idx_to_id[n] for n in hop2]\n",
"    else:\n",
"        results['two_hop'] = []\n",
"\n",
"    # domain_subgraph\n",
"    finance_idxs = set(v.index for v in g.vs.select(domain='finance'))\n",
"    finance_edges = [(idx_to_id[e.source], idx_to_id[e.target])\n",
"                     for e in g.es if e.source in finance_idxs and e.target in finance_idxs]\n",
"    results['domain_subgraph'] = finance_edges\n",
"\n",
"    # most_connected\n",
"    degrees = [(idx_to_id[i], g.degree(i, mode='all')) for i in range(g.vcount())]\n",
"    degrees.sort(key=lambda x: -x[1])\n",
"    results['most_connected'] = degrees[:5]\n",
"\n",
"    # path_exists\n",
"    if 'error_go_core' in id_to_idx:\n",
"        target = id_to_idx['error_go_core']\n",
"        finance_idxs_list = list(finance_idxs)\n",
"        has_path = any(\n",
"            len(g.get_shortest_paths(src, to=target, mode='out')[0]) > 0\n",
"            for src in finance_idxs_list if src != target\n",
"        )\n",
"        results['path_exists'] = has_path\n",
"    else:\n",
"        results['path_exists'] = False\n",
"\n",
"    # isolated\n",
"    results['isolated'] = [idx_to_id[v.index] for v in g.vs if g.degree(v.index, mode='all') == 0]\n",
"\n",
"    # type_users: scan the incoming edges and keep only uses_type relations\n",
"    # (a single get_eid lookup could pick the wrong edge when parallel edges exist)\n",
"    if 'SMA_go_finance' in id_to_idx:\n",
"        idx = id_to_idx['SMA_go_finance']\n",
"        type_user_idxs = [\n",
"            g.es[eid].source for eid in g.incident(idx, mode='in')\n",
"            if g.es[eid]['relation'] == 'uses_type'\n",
"        ]\n",
"        results['type_users'] = [idx_to_id[n] for n in type_user_idxs]\n",
"    else:\n",
"        results['type_users'] = []\n",
"\n",
"    return results\n",
"\n",
"# Benchmark\n",
"path = os.path.join(DATA_DIR, 'igraph')\n",
"\n",
"t0 = time.perf_counter()\n",
"g_ig, ig_id_map = igraph_setup(node_map, valid_edges, path)\n",
"ig_insert_time = time.perf_counter() - t0\n",
"\n",
"t0 = time.perf_counter()\n",
"ig_results = igraph_queries(g_ig, ig_id_map)\n",
"ig_query_time = time.perf_counter() - t0\n",
"\n",
"t0 = time.perf_counter()\n",
"g_ig.write_pickle(path + '.pickle')\n",
"ig_save_time = time.perf_counter() - t0\n",
"\n",
"ig_disk = dir_size_mb(path + '.pickle')\n",
"\n",
"t0 = time.perf_counter()\n",
"g_loaded = ig.Graph.Read_Pickle(path + '.pickle')\n",
"_ = g_loaded.neighbors(0, mode='out')\n",
"ig_load_time = time.perf_counter() - t0\n",
"\n",
"print(f'igraph: {g_ig.vcount()} vertices, {g_ig.ecount()} edges')\n",
"print(f'  Insert: {ig_insert_time*1000:.1f}ms')\n",
"print(f'  Queries (8): {ig_query_time*1000:.1f}ms')\n",
"print(f'  Save: {ig_save_time*1000:.1f}ms')\n",
"print(f'  Load+query: {ig_load_time*1000:.1f}ms')\n",
"print(f'  Disk: {ig_disk:.2f}MB')\n",
"print()\n",
"for k, v in ig_results.items():\n",
"    if isinstance(v, list) and len(v) > 5:\n",
"        print(f'  {k}: {len(v)} results, e.g. {v[:3]}...')\n",
"    else:\n",
"        print(f'  {k}: {v}')"
]
},
{
"cell_type": "markdown",
"id": "section-8",
"metadata": {},
"source": [
"## 8. Summary table and visualizations"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "summary",
"metadata": {},
"outputs": [],
"source": [
"summary_data = [\n",
"    {'Backend': 'NetworkX', 'Insert (ms)': round(nx_insert_time*1000, 1),\n",
"     'Queries 8x (ms)': round(nx_query_time*1000, 1),\n",
"     'Save (ms)': round(nx_save_time*1000, 1),\n",
"     'Load+query (ms)': round(nx_load_time*1000, 1),\n",
"     'Disk (MB)': round(nx_disk, 2),\n",
"     'Query Language': 'Python API'},\n",
"    {'Backend': 'Kuzu', 'Insert (ms)': round(kuzu_insert_time*1000, 1),\n",
"     'Queries 8x (ms)': round(kuzu_query_time*1000, 1),\n",
"     'Save (ms)': 0,  # auto-persist\n",
"     'Load+query (ms)': round(kuzu_load_time*1000, 1),\n",
"     'Disk (MB)': round(kuzu_disk, 2),\n",
"     'Query Language': 'Cypher'},\n",
"    {'Backend': 'SQLite+CTE', 'Insert (ms)': round(sqlite_insert_time*1000, 1),\n",
"     'Queries 8x (ms)': round(sqlite_query_time*1000, 1),\n",
"     'Save (ms)': 0,  # auto-persist\n",
"     'Load+query (ms)': round(sqlite_load_time*1000, 1),\n",
"     'Disk (MB)': round(sqlite_disk, 2),\n",
"     'Query Language': 'SQL + CTEs'},\n",
"    {'Backend': 'RDFLib', 'Insert (ms)': round(rdf_insert_time*1000, 1),\n",
"     'Queries 8x (ms)': round(rdf_query_time*1000, 1),\n",
"     'Save (ms)': round(rdf_save_time*1000, 1),\n",
"     'Load+query (ms)': round(rdf_load_time*1000, 1),\n",
"     'Disk (MB)': round(rdf_disk, 2),\n",
"     'Query Language': 'SPARQL'},\n",
"    {'Backend': 'igraph', 'Insert (ms)': round(ig_insert_time*1000, 1),\n",
"     'Queries 8x (ms)': round(ig_query_time*1000, 1),\n",
"     'Save (ms)': round(ig_save_time*1000, 1),\n",
"     'Load+query (ms)': round(ig_load_time*1000, 1),\n",
"     'Disk (MB)': round(ig_disk, 2),\n",
"     'Query Language': 'Python API'},\n",
"]\n",
"\n",
"df_summary = pd.DataFrame(summary_data)\n",
"print(df_summary.to_string(index=False))\n",
"print()\n",
"\n",
"# Comparison chart\n",
"fig, axes = plt.subplots(1, 3, figsize=(18, 6))\n",
"colors = {'NetworkX': '#e74c3c', 'Kuzu': '#3498db', 'SQLite+CTE': '#2ecc71', 'RDFLib': '#f39c12', 'igraph': '#9b59b6'}\n",
"\n",
"# Insert\n",
"ax = axes[0]\n",
"ax.barh(df_summary['Backend'], df_summary['Insert (ms)'], color=[colors[b] for b in df_summary['Backend']])\n",
"ax.set_xlabel('ms')\n",
"ax.set_title('Insert (nodes + edges)')\n",
"\n",
"# Queries\n",
"ax = axes[1]\n",
"ax.barh(df_summary['Backend'], df_summary['Queries 8x (ms)'], color=[colors[b] for b in df_summary['Backend']])\n",
"ax.set_xlabel('ms')\n",
"ax.set_title('8 traversal queries')\n",
"\n",
"# Load + query\n",
"ax = axes[2]\n",
"ax.barh(df_summary['Backend'], df_summary['Load+query (ms)'], color=[colors[b] for b in df_summary['Backend']])\n",
"ax.set_xlabel('ms')\n",
"ax.set_title('Cold start: load + 1 query')\n",
"\n",
"plt.suptitle(f'Graph backend comparison ({len(all_node_ids)} nodes, {len(valid_edges)} edges)', fontsize=14)\n",
"plt.tight_layout()\n",
"plt.show()"
]
},
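{
"cell_type": "markdown",
"id": "section-9",
"metadata": {},
"source": [
"## 9. Cross-validation of results\n",
"\n",
"We check that every backend returns the same results as the NetworkX reference for six of the eight queries (direct_deps, reverse_deps, two_hop, isolated, type_users, path_exists). domain_subgraph and most_connected are not compared here."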
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cross-validate",
"metadata": {},
"outputs": [],
"source": [
"all_backend_results = {\n",
"    'NetworkX': nx_results,\n",
"    'Kuzu': kuzu_results,\n",
"    'SQLite+CTE': sqlite_results,\n",
"    'RDFLib': rdf_results,\n",
"    'igraph': ig_results,\n",
"}\n",
"\n",
"print('Cross-validation of results:')\n",
"print('=' * 60)\n",
"\n",
"for query_name in ['direct_deps', 'reverse_deps', 'two_hop', 'isolated', 'type_users', 'path_exists']:\n",
"    print(f'\\n{query_name}:')\n",
"    values = {}\n",
"    for backend, results in all_backend_results.items():\n",
"        val = results.get(query_name)\n",
"        if isinstance(val, list):\n",
"            # Sort so that result order does not affect the comparison\n",
"            values[backend] = sorted(str(v) for v in val)\n",
"        else:\n",
"            values[backend] = val\n",
"\n",
"    # Compare every backend against the NetworkX reference\n",
"    ref_backend = 'NetworkX'\n",
"    ref_val = values[ref_backend]\n",
"    all_match = True\n",
"    for backend, val in values.items():\n",
"        match = val == ref_val\n",
"        status = 'OK' if match else 'DIFF'\n",
"        if isinstance(val, list):\n",
"            print(f'  {backend:12s}: {len(val)} items [{status}]')\n",
"        else:\n",
"            print(f'  {backend:12s}: {val} [{status}]')\n",
"        if not match:\n",
"            all_match = False\n",
"            if isinstance(val, list) and isinstance(ref_val, list):\n",
"                extra = set(val) - set(ref_val)\n",
"                missing = set(ref_val) - set(val)\n",
"                if extra: print(f'    extra: {list(extra)[:5]}')\n",
"                if missing: print(f'    missing: {list(missing)[:5]}')\n",
"    print('  all backends agree' if all_match else '  at least one backend differs')"
]
},
{
"cell_type": "markdown",
"id": "section-10",
"metadata": {},
"source": [
"## 10. Conclusions for notebook 01\n",
"\n",
"This notebook establishes:\n",
"- The fn_registry dependency graph loaded into 5 backends\n",
"- A performance benchmark (insert, queries, persistence)\n",
"- Cross-validation of correctness\n",
"\n",
"**Next notebook (02):** LLM retrieval: use `claude -p` to generate queries in each language (Cypher, SQL, SPARQL, Python API) and evaluate their correctness against the verified answers."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.13.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}