{ "cells": [ { "cell_type": "markdown", "id": "intro", "metadata": {}, "source": [ "# Comparativa: bases de datos de grafos embebidas + LLM retrieval\n", "\n", "## Objetivo\n", "\n", "1. Cargar el grafo de dependencias de fn_registry en 5 backends de grafos\n", "2. Benchmark: insercion, traversal, persistencia\n", "3. Evaluar como un LLM (`claude -p`) genera queries para recuperar datos de cada backend\n", "\n", "## Backends\n", "\n", "| Backend | Query Language | Tipo |\n", "|---|---|---|\n", "| **Kuzu** | Cypher | Graph DB embebida |\n", "| **NetworkX** | API Python | Libreria in-memory |\n", "| **SQLite + CTEs** | SQL recursivo | Relacional |\n", "| **RDFLib** | SPARQL | Triple store |\n", "| **igraph** | API Python | Libreria C/Python |\n", "\n", "## Grafo de prueba\n", "\n", "El propio fn_registry: ~354 funciones + 39 tipos como nodos, dependencias (uses_functions, uses_types, returns) como aristas." ] }, { "cell_type": "markdown", "id": "section-1", "metadata": {}, "source": [ "## 1. Extraer grafo desde registry.db" ] }, { "cell_type": "code", "execution_count": null, "id": "extract-graph", "metadata": {}, "outputs": [], "source": [ "import sqlite3\n", "import json\n", "import os\n", "import time\n", "import shutil\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "\n", "plt.style.use('seaborn-v0_8-whitegrid')\n", "\n", "FN_ROOT = os.environ.get('FN_REGISTRY_ROOT', os.path.expanduser('~/fn_registry'))\n", "DB_PATH = os.path.join(FN_ROOT, 'registry.db')\n", "\n", "conn = sqlite3.connect(DB_PATH)\n", "conn.row_factory = sqlite3.Row\n", "\n", "# Nodos: funciones\n", "functions = [dict(r) for r in conn.execute(\n", " 'SELECT id, name, kind, lang, domain, purity, signature, description, '\n", " 'uses_functions, uses_types, returns, returns_optional, error_type, tags '\n", " 'FROM functions ORDER BY name'\n", ").fetchall()]\n", "\n", "# Nodos: tipos\n", "types = [dict(r) for r in conn.execute(\n", " 'SELECT id, name, lang, domain, algebraic, description, uses_types, tags '\n", " 'FROM types ORDER BY name'\n", ").fetchall()]\n", "\n", "conn.close()\n", "\n", "# Construir aristas\n", "edges = [] # (source_id, target_id, relation_type)\n", "\n", "for f in functions:\n", " fid = f['id']\n", " # uses_functions\n", " for dep in json.loads(f.get('uses_functions') or '[]'):\n", " edges.append((fid, dep, 'uses_function'))\n", " # uses_types\n", " for dep in json.loads(f.get('uses_types') or '[]'):\n", " edges.append((fid, dep, 'uses_type'))\n", " # returns\n", " ret = f.get('returns') or ''\n", " if ret:\n", " edges.append((fid, ret, 'returns'))\n", " # error_type\n", " err = f.get('error_type') or ''\n", " if err:\n", " edges.append((fid, err, 'error_type'))\n", "\n", "for t in types:\n", " tid = t['id']\n", " for dep in json.loads(t.get('uses_types') or '[]'):\n", " edges.append((tid, dep, 'uses_type'))\n", "\n", "# Todos los IDs de nodos referenciados\n", "all_node_ids = set(f['id'] for f in functions) | set(t['id'] for t in types)\n", "# Nodos por ID para lookup rapido\n", "node_map = {f['id']: {**f, 'node_type': 'function'} for f in functions}\n", "node_map.update({t['id']: {**t, 'node_type': 'type'} for t in types})\n", "\n", "# Filtrar aristas a nodos que existen\n", "valid_edges = [(s, t, r) for s, t, r in edges if s in all_node_ids and t in all_node_ids]\n", "\n", "print(f'Nodos: {len(all_node_ids)} ({len(functions)} funciones + {len(types)} tipos)')\n", "print(f'Aristas: {len(valid_edges)} (de {len(edges)} totales, {len(edges) - len(valid_edges)} con target inexistente)')\n", "print(f'Relaciones: {set(r for _, _, r in valid_edges)}')" ] }, { "cell_type": "markdown", "id": "section-2", "metadata": {}, "source": [ "## 2. Benchmark framework" ] }, { "cell_type": "code", "execution_count": null, "id": "bench-framework", "metadata": {}, "outputs": [], "source": [ "DATA_DIR = 'data/graph_bench'\n", "os.makedirs(DATA_DIR, exist_ok=True)\n", "\n", "# Queries de traversal para benchmark (respuestas verificables contra el grafo)\n", "BENCH_QUERIES = [\n", " ('direct_deps', 'Funciones que usa directamente filter_slice_go_core'),\n", " ('reverse_deps', 'Funciones que dependen de error_go_core'),\n", " ('two_hop', 'Dependencias a 2 saltos desde init_metabase_go_pipelines'),\n", " ('domain_subgraph', 'Todas las aristas entre funciones del dominio finance'),\n", " ('most_connected', 'Top 5 nodos con mas conexiones (in + out degree)'),\n", " ('path_exists', 'Existe un camino entre cualquier funcion de finance y error_go_core?'),\n", " ('isolated', 'Funciones sin ninguna dependencia (ni entrante ni saliente)'),\n", " ('type_users', 'Funciones que usan el tipo SMA_go_finance'),\n", "]\n", "\n", "def dir_size_mb(path):\n", " total = 0\n", " if os.path.isfile(path):\n", " return os.path.getsize(path) / (1024*1024)\n", " if not os.path.exists(path):\n", " return 0\n", " for dp, dn, fns in os.walk(path):\n", " for f in fns:\n", " fp = os.path.join(dp, f)\n", " if os.path.exists(fp):\n", " total += os.path.getsize(fp)\n", " return total / (1024*1024)\n", "\n", "def cleanup_path(path):\n", " if os.path.isfile(path):\n", " os.remove(path)\n", " elif os.path.isdir(path):\n", " shutil.rmtree(path, ignore_errors=True)\n", " for suffix in ['.db', '.pickle', '.graphml']:\n", " p = path + suffix\n", " if os.path.exists(p):\n", " os.remove(p)\n", "\n", "print(f'Benchmark queries: {len(BENCH_QUERIES)}')\n", "for qid, desc in BENCH_QUERIES:\n", " print(f' {qid}: {desc}')" ] }, { "cell_type": "markdown", "id": "section-3", "metadata": {}, "source": [ "## 3. Backend: NetworkX (baseline)" ] }, { "cell_type": "code", "execution_count": null, "id": "networkx-impl", "metadata": {}, "outputs": [], "source": [ "import networkx as nx\n", "import pickle\n", "\n", "def nx_insert(nodes, edges_list, path):\n", " G = nx.DiGraph()\n", " for nid, attrs in nodes.items():\n", " G.add_node(nid, **{k: v for k, v in attrs.items() if isinstance(v, (str, int, float, bool))})\n", " for src, tgt, rel in edges_list:\n", " G.add_edge(src, tgt, relation=rel)\n", " return G\n", "\n", "def nx_queries(G):\n", " results = {}\n", " \n", " # direct_deps\n", " if 'filter_slice_go_core' in G:\n", " results['direct_deps'] = list(G.successors('filter_slice_go_core'))\n", " else:\n", " results['direct_deps'] = []\n", " \n", " # reverse_deps\n", " if 'error_go_core' in G:\n", " results['reverse_deps'] = list(G.predecessors('error_go_core'))\n", " else:\n", " results['reverse_deps'] = []\n", " \n", " # two_hop\n", " two_hop = set()\n", " if 'init_metabase_go_pipelines' in G:\n", " for n1 in G.successors('init_metabase_go_pipelines'):\n", " for n2 in G.successors(n1):\n", " two_hop.add(n2)\n", " results['two_hop'] = list(two_hop)\n", " \n", " # domain_subgraph\n", " finance_nodes = [n for n, d in G.nodes(data=True) if d.get('domain') == 'finance']\n", " finance_edges = [(u, v) for u, v in G.edges() if u in finance_nodes and v in finance_nodes]\n", " results['domain_subgraph'] = finance_edges\n", " \n", " # most_connected\n", " degree = sorted(((n, G.in_degree(n) + G.out_degree(n)) for n in G.nodes()), key=lambda x: -x[1])[:5]\n", " results['most_connected'] = degree\n", " \n", " # path_exists\n", " if 'error_go_core' in G:\n", " has_path = any(\n", " nx.has_path(G, n, 'error_go_core')\n", " for n in finance_nodes if n != 'error_go_core'\n", " )\n", " results['path_exists'] = has_path\n", " else:\n", " results['path_exists'] = False\n", " \n", " # isolated\n", " results['isolated'] = [n for n in G.nodes() if G.degree(n) == 0]\n", " \n", " # type_users\n", " if 'SMA_go_finance' in G:\n", " results['type_users'] = list(G.predecessors('SMA_go_finance'))\n", " else:\n", " results['type_users'] = []\n", " \n", " return results\n", "\n", "def nx_save(G, path):\n", " with open(path + '.pickle', 'wb') as f:\n", " pickle.dump(G, f)\n", "\n", "def nx_load(path):\n", " with open(path + '.pickle', 'rb') as f:\n", " return pickle.load(f)\n", "\n", "# Benchmark\n", "path = os.path.join(DATA_DIR, 'networkx')\n", "cleanup_path(path)\n", "\n", "t0 = time.perf_counter()\n", "G_nx = nx_insert(node_map, valid_edges, path)\n", "nx_insert_time = time.perf_counter() - t0\n", "\n", "t0 = time.perf_counter()\n", "nx_results = nx_queries(G_nx)\n", "nx_query_time = time.perf_counter() - t0\n", "\n", "t0 = time.perf_counter()\n", "nx_save(G_nx, path)\n", "nx_save_time = time.perf_counter() - t0\n", "\n", "t0 = time.perf_counter()\n", "G_loaded = nx_load(path)\n", "_ = list(G_loaded.successors(list(G_loaded.nodes())[0]))\n", "nx_load_time = time.perf_counter() - t0\n", "\n", "nx_disk = dir_size_mb(path + '.pickle')\n", "\n", "print(f'NetworkX: {G_nx.number_of_nodes()} nodos, {G_nx.number_of_edges()} aristas')\n", "print(f' Insert: {nx_insert_time*1000:.1f}ms')\n", "print(f' Queries (8): {nx_query_time*1000:.1f}ms')\n", "print(f' Save: {nx_save_time*1000:.1f}ms')\n", "print(f' Load+query: {nx_load_time*1000:.1f}ms')\n", "print(f' Disco: {nx_disk:.2f}MB')\n", "print()\n", "for k, v in nx_results.items():\n", " if isinstance(v, list) and len(v) > 5:\n", " print(f' {k}: {len(v)} resultados — {v[:3]}...')\n", " else:\n", " print(f' {k}: {v}')" ] }, { "cell_type": "markdown", "id": "section-4", "metadata": {}, "source": [ "## 4. Backend: Kuzu (Cypher embebido)" ] }, { "cell_type": "code", "execution_count": null, "id": "kuzu-impl", "metadata": {}, "outputs": [], "source": [ "import kuzu\n", "\n", "def kuzu_setup(nodes, edges_list, path):\n", " cleanup_path(path)\n", " os.makedirs(path, exist_ok=True)\n", " db = kuzu.Database(path)\n", " conn = kuzu.Connection(db)\n", " \n", " # Schema\n", " conn.execute('CREATE NODE TABLE FnNode(id STRING, name STRING, node_type STRING, '\n", " 'kind STRING, lang STRING, domain STRING, purity STRING, '\n", " 'description STRING, PRIMARY KEY(id))')\n", " conn.execute('CREATE REL TABLE DEPENDS_ON(FROM FnNode TO FnNode, relation STRING)')\n", " \n", " # Insert nodos\n", " for nid, attrs in nodes.items():\n", " conn.execute(\n", " 'CREATE (n:FnNode {id: $id, name: $name, node_type: $node_type, '\n", " 'kind: $kind, lang: $lang, domain: $domain, purity: $purity, '\n", " 'description: $desc})',\n", " parameters={\n", " 'id': nid,\n", " 'name': attrs.get('name', ''),\n", " 'node_type': attrs.get('node_type', ''),\n", " 'kind': attrs.get('kind', ''),\n", " 'lang': attrs.get('lang', ''),\n", " 'domain': attrs.get('domain', ''),\n", " 'purity': attrs.get('purity', ''),\n", " 'desc': attrs.get('description', ''),\n", " }\n", " )\n", " \n", " # Insert aristas\n", " for src, tgt, rel in edges_list:\n", " conn.execute(\n", " 'MATCH (a:FnNode {id: $src}), (b:FnNode {id: $tgt}) '\n", " 'CREATE (a)-[:DEPENDS_ON {relation: $rel}]->(b)',\n", " parameters={'src': src, 'tgt': tgt, 'rel': rel}\n", " )\n", " \n", " return db, conn\n", "\n", "def kuzu_queries(conn):\n", " results = {}\n", " \n", " # direct_deps\n", " r = conn.execute('MATCH (a:FnNode {id: \"filter_slice_go_core\"})-[:DEPENDS_ON]->(b) RETURN b.id')\n", " results['direct_deps'] = [row[0] for row in r.get_as_df().values]\n", " \n", " # reverse_deps\n", " r = conn.execute('MATCH (a)-[:DEPENDS_ON]->(b:FnNode {id: \"error_go_core\"}) RETURN a.id')\n", " results['reverse_deps'] = [row[0] for row in r.get_as_df().values]\n", " \n", " # two_hop\n", " r = conn.execute(\n", " 'MATCH (a:FnNode {id: \"init_metabase_go_pipelines\"})-[:DEPENDS_ON]->()-[:DEPENDS_ON]->(c) '\n", " 'RETURN DISTINCT c.id'\n", " )\n", " results['two_hop'] = [row[0] for row in r.get_as_df().values]\n", " \n", " # domain_subgraph\n", " r = conn.execute(\n", " 'MATCH (a:FnNode {domain: \"finance\"})-[e:DEPENDS_ON]->(b:FnNode {domain: \"finance\"}) '\n", " 'RETURN a.id, b.id'\n", " )\n", " results['domain_subgraph'] = [(row[0], row[1]) for row in r.get_as_df().values]\n", " \n", " # most_connected (in+out degree via counting edges)\n", " r = conn.execute(\n", " 'MATCH (n:FnNode) '\n", " 'OPTIONAL MATCH (n)-[e1:DEPENDS_ON]->() '\n", " 'OPTIONAL MATCH ()-[e2:DEPENDS_ON]->(n) '\n", " 'RETURN n.id, count(DISTINCT e1) + count(DISTINCT e2) AS deg '\n", " 'ORDER BY deg DESC LIMIT 5'\n", " )\n", " results['most_connected'] = [(row[0], row[1]) for row in r.get_as_df().values]\n", " \n", " # path_exists\n", " r = conn.execute(\n", " 'MATCH (a:FnNode {domain: \"finance\"})-[:DEPENDS_ON* 1..5]->(b:FnNode {id: \"error_go_core\"}) '\n", " 'RETURN a.id LIMIT 1'\n", " )\n", " results['path_exists'] = len(r.get_as_df()) > 0\n", " \n", " # isolated\n", " r = conn.execute(\n", " 'MATCH (n:FnNode) WHERE NOT EXISTS { MATCH (n)-[:DEPENDS_ON]->() } '\n", " 'AND NOT EXISTS { MATCH ()-[:DEPENDS_ON]->(n) } RETURN n.id'\n", " )\n", " results['isolated'] = [row[0] for row in r.get_as_df().values]\n", " \n", " # type_users\n", " r = conn.execute(\n", " 'MATCH (a)-[:DEPENDS_ON {relation: \"uses_type\"}]->(b:FnNode {id: \"SMA_go_finance\"}) RETURN a.id'\n", " )\n", " results['type_users'] = [row[0] for row in r.get_as_df().values]\n", " \n", " return results\n", "\n", "# Benchmark\n", "path = os.path.join(DATA_DIR, 'kuzu')\n", "\n", "t0 = time.perf_counter()\n", "kuzu_db, kuzu_conn = kuzu_setup(node_map, valid_edges, path)\n", "kuzu_insert_time = time.perf_counter() - t0\n", "\n", "t0 = time.perf_counter()\n", "kuzu_results = kuzu_queries(kuzu_conn)\n", "kuzu_query_time = time.perf_counter() - t0\n", "\n", "kuzu_disk = dir_size_mb(path)\n", "\n", "# Load from cold\n", "del kuzu_conn, kuzu_db\n", "t0 = time.perf_counter()\n", "kuzu_db2 = kuzu.Database(path)\n", "kuzu_conn2 = kuzu.Connection(kuzu_db2)\n", "r = kuzu_conn2.execute('MATCH (a:FnNode {id: \"filter_slice_go_core\"})-[:DEPENDS_ON]->(b) RETURN b.id')\n", "_ = r.get_as_df()\n", "kuzu_load_time = time.perf_counter() - t0\n", "del kuzu_conn2, kuzu_db2\n", "\n", "print(f'Kuzu:')\n", "print(f' Insert: {kuzu_insert_time*1000:.1f}ms')\n", "print(f' Queries (8): {kuzu_query_time*1000:.1f}ms')\n", "print(f' Load+query: {kuzu_load_time*1000:.1f}ms')\n", "print(f' Disco: {kuzu_disk:.2f}MB')\n", "print()\n", "for k, v in kuzu_results.items():\n", " if isinstance(v, list) and len(v) > 5:\n", " print(f' {k}: {len(v)} resultados — {v[:3]}...')\n", " else:\n", " print(f' {k}: {v}')" ] }, { "cell_type": "markdown", "id": "section-5", "metadata": {}, "source": [ "## 5. Backend: SQLite + CTEs recursivos" ] }, { "cell_type": "code", "execution_count": null, "id": "sqlite-impl", "metadata": {}, "outputs": [], "source": [ "def sqlite_setup(nodes, edges_list, path):\n", " dbpath = path + '.db'\n", " cleanup_path(dbpath)\n", " db = sqlite3.connect(dbpath)\n", " db.execute('CREATE TABLE nodes (id TEXT PRIMARY KEY, name TEXT, node_type TEXT, '\n", " 'kind TEXT, lang TEXT, domain TEXT, purity TEXT, description TEXT)')\n", " db.execute('CREATE TABLE edges (src TEXT, tgt TEXT, relation TEXT, '\n", " 'FOREIGN KEY(src) REFERENCES nodes(id), FOREIGN KEY(tgt) REFERENCES nodes(id))')\n", " db.execute('CREATE INDEX idx_edges_src ON edges(src)')\n", " db.execute('CREATE INDEX idx_edges_tgt ON edges(tgt)')\n", " db.execute('CREATE INDEX idx_edges_rel ON edges(relation)')\n", " db.execute('CREATE INDEX idx_nodes_domain ON nodes(domain)')\n", " \n", " db.executemany(\n", " 'INSERT INTO nodes VALUES (?,?,?,?,?,?,?,?)',\n", " [(nid, a.get('name',''), a.get('node_type',''), a.get('kind',''),\n", " a.get('lang',''), a.get('domain',''), a.get('purity',''),\n", " a.get('description','')) for nid, a in nodes.items()]\n", " )\n", " db.executemany('INSERT INTO edges VALUES (?,?,?)', edges_list)\n", " db.commit()\n", " return db\n", "\n", "def sqlite_queries(db):\n", " results = {}\n", " \n", " # direct_deps\n", " results['direct_deps'] = [r[0] for r in db.execute(\n", " 'SELECT tgt FROM edges WHERE src = \"filter_slice_go_core\"'\n", " ).fetchall()]\n", " \n", " # reverse_deps\n", " results['reverse_deps'] = [r[0] for r in db.execute(\n", " 'SELECT src FROM edges WHERE tgt = \"error_go_core\"'\n", " ).fetchall()]\n", " \n", " # two_hop (CTE recursivo)\n", " results['two_hop'] = [r[0] for r in db.execute(\n", " 'WITH hop1 AS (SELECT tgt FROM edges WHERE src = \"init_metabase_go_pipelines\"), '\n", " 'hop2 AS (SELECT DISTINCT e.tgt FROM edges e JOIN hop1 h ON e.src = h.tgt) '\n", " 'SELECT tgt FROM hop2'\n", " ).fetchall()]\n", " \n", " # domain_subgraph\n", " results['domain_subgraph'] = db.execute(\n", " 'SELECT e.src, e.tgt FROM edges e '\n", " 'JOIN nodes n1 ON e.src = n1.id JOIN nodes n2 ON e.tgt = n2.id '\n", " 'WHERE n1.domain = \"finance\" AND n2.domain = \"finance\"'\n", " ).fetchall()\n", " \n", " # most_connected\n", " results['most_connected'] = db.execute(\n", " 'SELECT id, (SELECT COUNT(*) FROM edges WHERE src=id) + '\n", " '(SELECT COUNT(*) FROM edges WHERE tgt=id) AS deg '\n", " 'FROM nodes ORDER BY deg DESC LIMIT 5'\n", " ).fetchall()\n", " \n", " # path_exists (CTE recursivo con limite de profundidad)\n", " results['path_exists'] = len(db.execute(\n", " 'WITH RECURSIVE reachable(id, depth) AS ('\n", " ' SELECT src, 0 FROM edges e JOIN nodes n ON e.src = n.id WHERE n.domain = \"finance\" '\n", " ' UNION '\n", " ' SELECT e.tgt, r.depth + 1 FROM edges e JOIN reachable r ON e.src = r.id WHERE r.depth < 5'\n", " ') SELECT 1 FROM reachable WHERE id = \"error_go_core\" LIMIT 1'\n", " ).fetchall()) > 0\n", " \n", " # isolated\n", " results['isolated'] = [r[0] for r in db.execute(\n", " 'SELECT n.id FROM nodes n '\n", " 'WHERE NOT EXISTS (SELECT 1 FROM edges WHERE src = n.id) '\n", " 'AND NOT EXISTS (SELECT 1 FROM edges WHERE tgt = n.id)'\n", " ).fetchall()]\n", " \n", " # type_users\n", " results['type_users'] = [r[0] for r in db.execute(\n", " 'SELECT src FROM edges WHERE tgt = \"SMA_go_finance\" AND relation = \"uses_type\"'\n", " ).fetchall()]\n", " \n", " return results\n", "\n", "# Benchmark\n", "path = os.path.join(DATA_DIR, 'sqlite_graph')\n", "\n", "t0 = time.perf_counter()\n", "sqlite_db = sqlite_setup(node_map, valid_edges, path)\n", "sqlite_insert_time = time.perf_counter() - t0\n", "\n", "t0 = time.perf_counter()\n", "sqlite_results = sqlite_queries(sqlite_db)\n", "sqlite_query_time = time.perf_counter() - t0\n", "\n", "sqlite_db.close()\n", "sqlite_disk = dir_size_mb(path + '.db')\n", "\n", "t0 = time.perf_counter()\n", "db2 = sqlite3.connect(path + '.db')\n", "_ = db2.execute('SELECT tgt FROM edges WHERE src = \"filter_slice_go_core\"').fetchall()\n", "db2.close()\n", "sqlite_load_time = time.perf_counter() - t0\n", "\n", "print(f'SQLite + CTEs:')\n", "print(f' Insert: {sqlite_insert_time*1000:.1f}ms')\n", "print(f' Queries (8): {sqlite_query_time*1000:.1f}ms')\n", "print(f' Load+query: {sqlite_load_time*1000:.1f}ms')\n", "print(f' Disco: {sqlite_disk:.2f}MB')\n", "print()\n", "for k, v in sqlite_results.items():\n", " if isinstance(v, list) and len(v) > 5:\n", " print(f' {k}: {len(v)} resultados — {v[:3]}...')\n", " else:\n", " print(f' {k}: {v}')" ] }, { "cell_type": "markdown", "id": "section-6", "metadata": {}, "source": [ "## 6. Backend: RDFLib (SPARQL)" ] }, { "cell_type": "code", "execution_count": null, "id": "rdflib-impl", "metadata": {}, "outputs": [], "source": [ "from rdflib import Graph as RDFGraph, Namespace, Literal, URIRef\n", "from rdflib.namespace import RDF, RDFS\n", "\n", "FN = Namespace('http://fn-registry.local/')\n", "FNREL = Namespace('http://fn-registry.local/rel/')\n", "FNPROP = Namespace('http://fn-registry.local/prop/')\n", "\n", "def rdf_setup(nodes, edges_list, path):\n", " g = RDFGraph()\n", " g.bind('fn', FN)\n", " g.bind('fnrel', FNREL)\n", " g.bind('fnprop', FNPROP)\n", " \n", " for nid, attrs in nodes.items():\n", " uri = FN[nid]\n", " g.add((uri, RDF.type, FN['Function'] if attrs.get('node_type') == 'function' else FN['Type']))\n", " for prop in ['name', 'kind', 'lang', 'domain', 'purity', 'description']:\n", " val = attrs.get(prop, '')\n", " if val:\n", " g.add((uri, FNPROP[prop], Literal(val)))\n", " \n", " for src, tgt, rel in edges_list:\n", " g.add((FN[src], FNREL[rel], FN[tgt]))\n", " \n", " return g\n", "\n", "def rdf_queries(g):\n", " results = {}\n", " \n", " # direct_deps\n", " r = g.query('SELECT ?b WHERE { fn:filter_slice_go_core ?rel ?b . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) }',\n", " initNs={'fn': FN, 'fnrel': FNREL})\n", " results['direct_deps'] = [str(row[0]).replace(str(FN), '') for row in r]\n", " \n", " # reverse_deps\n", " r = g.query('SELECT ?a WHERE { ?a ?rel fn:error_go_core . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) }',\n", " initNs={'fn': FN, 'fnrel': FNREL})\n", " results['reverse_deps'] = [str(row[0]).replace(str(FN), '') for row in r]\n", " \n", " # two_hop\n", " r = g.query(\n", " 'SELECT DISTINCT ?c WHERE { fn:init_metabase_go_pipelines ?r1 ?b . ?b ?r2 ?c . '\n", " 'FILTER(STRSTARTS(STR(?r1), STR(fnrel:))) FILTER(STRSTARTS(STR(?r2), STR(fnrel:))) }',\n", " initNs={'fn': FN, 'fnrel': FNREL}\n", " )\n", " results['two_hop'] = [str(row[0]).replace(str(FN), '') for row in r]\n", " \n", " # domain_subgraph\n", " r = g.query(\n", " 'SELECT ?a ?b WHERE { ?a fnprop:domain \"finance\" . ?b fnprop:domain \"finance\" . '\n", " '?a ?rel ?b . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) }',\n", " initNs={'fn': FN, 'fnrel': FNREL, 'fnprop': FNPROP}\n", " )\n", " results['domain_subgraph'] = [(str(row[0]).replace(str(FN), ''), str(row[1]).replace(str(FN), '')) for row in r]\n", " \n", " # most_connected (SPARQL no tiene degree nativo, contamos)\n", " r = g.query(\n", " 'SELECT ?n (COUNT(DISTINCT ?e) AS ?deg) WHERE { '\n", " '{ ?n ?rel ?o . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) BIND(?o AS ?e) } '\n", " 'UNION '\n", " '{ ?s ?rel ?n . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) BIND(?s AS ?e) } '\n", " '} GROUP BY ?n ORDER BY DESC(?deg) LIMIT 5',\n", " initNs={'fn': FN, 'fnrel': FNREL}\n", " )\n", " results['most_connected'] = [(str(row[0]).replace(str(FN), ''), int(row[1])) for row in r]\n", " \n", " # path_exists (SPARQL 1.1 property paths, max 5 hops)\n", " r = g.query(\n", " 'ASK WHERE { ?a fnprop:domain \"finance\" . '\n", " '?a (fnrel:uses_function|fnrel:uses_type|fnrel:returns|fnrel:error_type){1,5} fn:error_go_core }',\n", " initNs={'fn': FN, 'fnrel': FNREL, 'fnprop': FNPROP}\n", " )\n", " results['path_exists'] = bool(r)\n", " \n", " # isolated\n", " r = g.query(\n", " 'SELECT ?n WHERE { ?n a ?type . FILTER(?type IN (fn:Function, fn:Type)) '\n", " 'FILTER NOT EXISTS { ?n ?rel ?o . FILTER(STRSTARTS(STR(?rel), STR(fnrel:))) } '\n", " 'FILTER NOT EXISTS { ?s ?rel2 ?n . FILTER(STRSTARTS(STR(?rel2), STR(fnrel:))) } }',\n", " initNs={'fn': FN, 'fnrel': FNREL}\n", " )\n", " results['isolated'] = [str(row[0]).replace(str(FN), '') for row in r]\n", " \n", " # type_users\n", " r = g.query('SELECT ?a WHERE { ?a fnrel:uses_type fn:SMA_go_finance }',\n", " initNs={'fn': FN, 'fnrel': FNREL})\n", " results['type_users'] = [str(row[0]).replace(str(FN), '') for row in r]\n", " \n", " return results\n", "\n", "# Benchmark\n", "path = os.path.join(DATA_DIR, 'rdflib')\n", "\n", "t0 = time.perf_counter()\n", "g_rdf = rdf_setup(node_map, valid_edges, path)\n", "rdf_insert_time = time.perf_counter() - t0\n", "\n", "t0 = time.perf_counter()\n", "rdf_results = rdf_queries(g_rdf)\n", "rdf_query_time = time.perf_counter() - t0\n", "\n", "t0 = time.perf_counter()\n", "g_rdf.serialize(destination=path + '.ttl', format='turtle')\n", "rdf_save_time = time.perf_counter() - t0\n", "\n", "rdf_disk = dir_size_mb(path + '.ttl')\n", "\n", "t0 = time.perf_counter()\n", "g2 = RDFGraph()\n", "g2.parse(path + '.ttl', format='turtle')\n", "_ = list(g2.query('SELECT ?b WHERE { fn:filter_slice_go_core ?r ?b . FILTER(STRSTARTS(STR(?r), STR(fnrel:))) }',\n", " initNs={'fn': FN, 'fnrel': FNREL}))\n", "rdf_load_time = time.perf_counter() - t0\n", "\n", "print(f'RDFLib: {len(g_rdf)} triples')\n", "print(f' Insert: {rdf_insert_time*1000:.1f}ms')\n", "print(f' Queries (8): {rdf_query_time*1000:.1f}ms')\n", "print(f' Save (turtle): {rdf_save_time*1000:.1f}ms')\n", "print(f' Load+query: {rdf_load_time*1000:.1f}ms')\n", "print(f' Disco: {rdf_disk:.2f}MB')\n", "print()\n", "for k, v in rdf_results.items():\n", " if isinstance(v, list) and len(v) > 5:\n", " print(f' {k}: {len(v)} resultados — {v[:3]}...')\n", " else:\n", " print(f' {k}: {v}')" ] }, { "cell_type": "markdown", "id": "section-7", "metadata": {}, "source": [ "## 7. Backend: igraph" ] }, { "cell_type": "code", "execution_count": null, "id": "igraph-impl", "metadata": {}, "outputs": [], "source": [ "import igraph as ig\n", "\n", "def igraph_setup(nodes, edges_list, path):\n", " node_ids = list(nodes.keys())\n", " id_to_idx = {nid: i for i, nid in enumerate(node_ids)}\n", " \n", " g = ig.Graph(directed=True)\n", " g.add_vertices(len(node_ids))\n", " g.vs['node_id'] = node_ids\n", " g.vs['name'] = [nodes[nid].get('name', '') for nid in node_ids]\n", " g.vs['node_type'] = [nodes[nid].get('node_type', '') for nid in node_ids]\n", " g.vs['domain'] = [nodes[nid].get('domain', '') for nid in node_ids]\n", " g.vs['purity'] = [nodes[nid].get('purity', '') for nid in node_ids]\n", " g.vs['kind'] = [nodes[nid].get('kind', '') for nid in node_ids]\n", " g.vs['lang'] = [nodes[nid].get('lang', '') for nid in node_ids]\n", " \n", " edge_tuples = [(id_to_idx[s], id_to_idx[t]) for s, t, _ in edges_list]\n", " edge_rels = [r for _, _, r in edges_list]\n", " g.add_edges(edge_tuples)\n", " g.es['relation'] = edge_rels\n", " \n", " return g, id_to_idx\n", "\n", "def igraph_queries(g, id_to_idx):\n", " results = {}\n", " idx_to_id = {v: k for k, v in id_to_idx.items()}\n", " \n", " # direct_deps\n", " if 'filter_slice_go_core' in id_to_idx:\n", " idx = id_to_idx['filter_slice_go_core']\n", " results['direct_deps'] = [idx_to_id[n] for n in g.neighbors(idx, mode='out')]\n", " else:\n", " results['direct_deps'] = []\n", " \n", " # reverse_deps\n", " if 'error_go_core' in id_to_idx:\n", " idx = id_to_idx['error_go_core']\n", " results['reverse_deps'] = [idx_to_id[n] for n in g.neighbors(idx, mode='in')]\n", " else:\n", " results['reverse_deps'] = []\n", " \n", " # two_hop\n", " if 'init_metabase_go_pipelines' in id_to_idx:\n", " idx = id_to_idx['init_metabase_go_pipelines']\n", " hop1 = g.neighbors(idx, mode='out')\n", " hop2 = set()\n", " for n in hop1:\n", " hop2.update(g.neighbors(n, mode='out'))\n", " results['two_hop'] = [idx_to_id[n] for n in hop2]\n", " else:\n", " results['two_hop'] = []\n", " \n", " # domain_subgraph\n", " finance_idxs = set(v.index for v in g.vs.select(domain='finance'))\n", " finance_edges = [(idx_to_id[e.source], idx_to_id[e.target])\n", " for e in g.es if e.source in finance_idxs and e.target in finance_idxs]\n", " results['domain_subgraph'] = finance_edges\n", " \n", " # most_connected\n", " degrees = [(idx_to_id[i], g.degree(i, mode='all')) for i in range(g.vcount())]\n", " degrees.sort(key=lambda x: -x[1])\n", " results['most_connected'] = degrees[:5]\n", " \n", " # path_exists\n", " if 'error_go_core' in id_to_idx:\n", " target = id_to_idx['error_go_core']\n", " finance_idxs_list = list(finance_idxs)\n", " has_path = any(\n", " len(g.get_shortest_paths(src, to=target, mode='out')[0]) > 0\n", " for src in finance_idxs_list if src != target\n", " )\n", " results['path_exists'] = has_path\n", " else:\n", " results['path_exists'] = False\n", " \n", " # isolated\n", " results['isolated'] = [idx_to_id[v.index] for v in g.vs if g.degree(v.index, mode='all') == 0]\n", " \n", " # type_users\n", " if 'SMA_go_finance' in id_to_idx:\n", " idx = id_to_idx['SMA_go_finance']\n", " preds = g.neighbors(idx, mode='in')\n", " # Filtrar por relacion uses_type\n", " type_user_idxs = []\n", " for p in preds:\n", " eid = g.get_eid(p, idx)\n", " if g.es[eid]['relation'] == 'uses_type':\n", " type_user_idxs.append(p)\n", " results['type_users'] = [idx_to_id[n] for n in type_user_idxs]\n", " else:\n", " results['type_users'] = []\n", " \n", " return results\n", "\n", "# Benchmark\n", "path = os.path.join(DATA_DIR, 'igraph')\n", "\n", "t0 = time.perf_counter()\n", "g_ig, ig_id_map = igraph_setup(node_map, valid_edges, path)\n", "ig_insert_time = time.perf_counter() - t0\n", "\n", "t0 = time.perf_counter()\n", "ig_results = igraph_queries(g_ig, ig_id_map)\n", "ig_query_time = time.perf_counter() - t0\n", "\n", "t0 = time.perf_counter()\n", "g_ig.write_pickle(path + '.pickle')\n", "ig_save_time = time.perf_counter() - t0\n", "\n", "ig_disk = dir_size_mb(path + '.pickle')\n", "\n", "t0 = time.perf_counter()\n", "g_loaded = ig.Graph.Read_Pickle(path + '.pickle')\n", "_ = g_loaded.neighbors(0, mode='out')\n", "ig_load_time = time.perf_counter() - t0\n", "\n", "print(f'igraph: {g_ig.vcount()} vertices, {g_ig.ecount()} aristas')\n", "print(f' Insert: {ig_insert_time*1000:.1f}ms')\n", "print(f' Queries (8): {ig_query_time*1000:.1f}ms')\n", "print(f' Save: {ig_save_time*1000:.1f}ms')\n", "print(f' Load+query: {ig_load_time*1000:.1f}ms')\n", "print(f' Disco: {ig_disk:.2f}MB')\n", "print()\n", "for k, v in ig_results.items():\n", " if isinstance(v, list) and len(v) > 5:\n", " print(f' {k}: {len(v)} resultados — {v[:3]}...')\n", " else:\n", " print(f' {k}: {v}')" ] }, { "cell_type": "markdown", "id": "section-8", "metadata": {}, "source": [ "## 8. Tabla resumen y visualizaciones" ] }, { "cell_type": "code", "execution_count": null, "id": "summary", "metadata": {}, "outputs": [], "source": [ "summary_data = [\n", " {'Backend': 'NetworkX', 'Insert (ms)': round(nx_insert_time*1000, 1),\n", " 'Queries 8x (ms)': round(nx_query_time*1000, 1),\n", " 'Save (ms)': round(nx_save_time*1000, 1),\n", " 'Load+query (ms)': round(nx_load_time*1000, 1),\n", " 'Disk (MB)': round(nx_disk, 2),\n", " 'Query Language': 'Python API'},\n", " {'Backend': 'Kuzu', 'Insert (ms)': round(kuzu_insert_time*1000, 1),\n", " 'Queries 8x (ms)': round(kuzu_query_time*1000, 1),\n", " 'Save (ms)': 0, # auto-persist\n", " 'Load+query (ms)': round(kuzu_load_time*1000, 1),\n", " 'Disk (MB)': round(kuzu_disk, 2),\n", " 'Query Language': 'Cypher'},\n", " {'Backend': 'SQLite+CTE', 'Insert (ms)': round(sqlite_insert_time*1000, 1),\n", " 'Queries 8x (ms)': round(sqlite_query_time*1000, 1),\n", " 'Save (ms)': 0, # auto-persist\n", " 'Load+query (ms)': round(sqlite_load_time*1000, 1),\n", " 'Disk (MB)': round(sqlite_disk, 2),\n", " 'Query Language': 'SQL + CTEs'},\n", " {'Backend': 'RDFLib', 'Insert (ms)': round(rdf_insert_time*1000, 1),\n", " 'Queries 8x (ms)': round(rdf_query_time*1000, 1),\n", " 'Save (ms)': round(rdf_save_time*1000, 1),\n", " 'Load+query (ms)': round(rdf_load_time*1000, 1),\n", " 'Disk (MB)': round(rdf_disk, 2),\n", " 'Query Language': 'SPARQL'},\n", " {'Backend': 'igraph', 'Insert (ms)': round(ig_insert_time*1000, 1),\n", " 'Queries 8x (ms)': round(ig_query_time*1000, 1),\n", " 'Save (ms)': round(ig_save_time*1000, 1),\n", " 'Load+query (ms)': round(ig_load_time*1000, 1),\n", " 'Disk (MB)': round(ig_disk, 2),\n", " 'Query Language': 'Python API'},\n", "]\n", "\n", "df_summary = pd.DataFrame(summary_data)\n", "print(df_summary.to_string(index=False))\n", "print()\n", "\n", "# Grafico comparativo\n", "fig, axes = plt.subplots(1, 3, figsize=(18, 6))\n", "colors = {'NetworkX': '#e74c3c', 'Kuzu': '#3498db', 'SQLite+CTE': '#2ecc71', 'RDFLib': '#f39c12', 'igraph': '#9b59b6'}\n", "\n", "# Insert\n", "ax = axes[0]\n", "ax.barh(df_summary['Backend'], df_summary['Insert (ms)'], color=[colors[b] for b in df_summary['Backend']])\n", "ax.set_xlabel('ms')\n", "ax.set_title('Insert (nodos + aristas)')\n", "\n", "# Queries\n", "ax = axes[1]\n", "ax.barh(df_summary['Backend'], df_summary['Queries 8x (ms)'], color=[colors[b] for b in df_summary['Backend']])\n", "ax.set_xlabel('ms')\n", "ax.set_title('8 queries de traversal')\n", "\n", "# Load + query\n", "ax = axes[2]\n", "ax.barh(df_summary['Backend'], df_summary['Load+query (ms)'], color=[colors[b] for b in df_summary['Backend']])\n", "ax.set_xlabel('ms')\n", "ax.set_title('Cold start: load + 1 query')\n", "\n", "plt.suptitle(f'Comparativa de graph backends ({len(all_node_ids)} nodos, {len(valid_edges)} aristas)', fontsize=14)\n", "plt.tight_layout()\n", "plt.show()" ] }, { "cell_type": "markdown", "id": "section-9", "metadata": {}, "source": [ "## 9. Validacion cruzada de resultados\n", "\n", "Verificamos que todos los backends devuelven los mismos resultados para cada query." ] }, { "cell_type": "code", "execution_count": null, "id": "cross-validate", "metadata": {}, "outputs": [], "source": [ "all_backend_results = {\n", " 'NetworkX': nx_results,\n", " 'Kuzu': kuzu_results,\n", " 'SQLite+CTE': sqlite_results,\n", " 'RDFLib': rdf_results,\n", " 'igraph': ig_results,\n", "}\n", "\n", "print('Validacion cruzada de resultados:')\n", "print('=' * 60)\n", "\n", "for query_name in ['direct_deps', 'reverse_deps', 'two_hop', 'isolated', 'type_users', 'path_exists']:\n", " print(f'\\n{query_name}:')\n", " values = {}\n", " for backend, results in all_backend_results.items():\n", " val = results.get(query_name)\n", " if isinstance(val, list):\n", " values[backend] = sorted(str(v) for v in val)\n", " else:\n", " values[backend] = val\n", " \n", " # Comparar\n", " ref_backend = 'NetworkX'\n", " ref_val = values[ref_backend]\n", " all_match = True\n", " for backend, val in values.items():\n", " match = val == ref_val\n", " status = 'OK' if match else 'DIFF'\n", " if isinstance(val, list):\n", " print(f' {backend:12s}: {len(val)} items [{status}]')\n", " else:\n", " print(f' {backend:12s}: {val} [{status}]')\n", " if not match:\n", " all_match = False\n", " if isinstance(val, list) and isinstance(ref_val, list):\n", " extra = set(val) - set(ref_val)\n", " missing = set(ref_val) - set(val)\n", " if extra: print(f' extra: {list(extra)[:5]}')\n", " if missing: print(f' missing: {list(missing)[:5]}')" ] }, { "cell_type": "markdown", "id": "section-10", "metadata": {}, "source": [ "## 10. Conclusiones notebook 01\n", "\n", "Este notebook establece:\n", "- El grafo de dependencias del fn_registry cargado en 5 backends\n", "- Benchmark de rendimiento (insert, queries, persistencia)\n", "- Validacion cruzada de correctitud\n", "\n", "**Siguiente notebook (02):** LLM retrieval — usar `claude -p` para generar queries en cada lenguaje (Cypher, SQL, SPARQL, Python API) y evaluar correctitud vs las respuestas verificadas." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.7" } }, "nbformat": 4, "nbformat_minor": 5 }