diff --git a/datafactory_tables.go b/datafactory_tables.go new file mode 100644 index 0000000..e8e2df2 --- /dev/null +++ b/datafactory_tables.go @@ -0,0 +1,92 @@ +package main + +import ( + "database/sql" + "fmt" + "os/exec" + "strconv" + "strings" +) + +type tableInfo struct { + Name string + RowCount int64 +} + +// listTablesForDatabase returns the list of tables for a given DB file. +// Supports kind="sqlite" (uses database/sql + sqlite3 driver) and +// kind="duckdb" (uses python venv subprocess since go-duckdb is not linked +// in sqlite_api — it lives in the root module, not here). +func listTablesForDatabase(kind, path string) ([]tableInfo, error) { + switch kind { + case "sqlite": + return listSQLiteTables(path) + case "duckdb": + return listDuckDBTablesViaCLI(path) + default: + return nil, fmt.Errorf("unsupported kind: %s", kind) + } +} + +func listSQLiteTables(path string) ([]tableInfo, error) { + db, err := sql.Open("sqlite3", "file:"+path+"?mode=ro") + if err != nil { + return nil, err + } + defer db.Close() + + rows, err := db.Query(` + SELECT name FROM sqlite_master + WHERE type='table' + AND name NOT LIKE 'sqlite_%' + AND name != '_migrations' + ORDER BY name`) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []tableInfo + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + continue + } + var cnt int64 + row := db.QueryRow(`SELECT count(*) FROM "` + name + `"`) + _ = row.Scan(&cnt) + out = append(out, tableInfo{Name: name, RowCount: cnt}) + } + return out, nil +} + +func listDuckDBTablesViaCLI(path string) ([]tableInfo, error) { + // Use the python venv's duckdb module to avoid linking go-duckdb. + pyBin := "/home/lucas/fn_registry/python/.venv/bin/python3" + script := fmt.Sprintf( + `import duckdb,sys +c=duckdb.connect(%q, read_only=True) +for (name,) in c.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='main' ORDER BY table_name").fetchall(): + cnt=c.execute(f'SELECT count(*) FROM "{name}"').fetchone()[0] + print(f"{name}\t{cnt}") +`, path) + cmd := exec.Command(pyBin, "-c", script) + out, err := cmd.CombinedOutput() + if err != nil { + return nil, fmt.Errorf("duckdb via python: %w (output: %s)", err, string(out)) + } + + var result []tableInfo + for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") { + if line == "" { + continue + } + parts := strings.SplitN(line, "\t", 2) + if len(parts) != 2 { + continue + } + cnt, _ := strconv.ParseInt(strings.TrimSpace(parts[1]), 10, 64) + result = append(result, tableInfo{Name: parts[0], RowCount: cnt}) + } + return result, nil +} diff --git a/handlers.go b/handlers.go index 4a22b1f..a11acca 100644 --- a/handlers.go +++ b/handlers.go @@ -93,6 +93,9 @@ func (s *Server) Routes(mux *http.ServeMux) { mux.HandleFunc("GET /api/datafactory/nodes", s.handleDataFactoryNodes) mux.HandleFunc("GET /api/datafactory/runs", s.handleDataFactoryRuns) mux.HandleFunc("GET /api/datafactory/databases", s.handleDataFactoryDatabases) + mux.HandleFunc("GET /api/datafactory/tables", s.handleDataFactoryTables) + mux.HandleFunc("GET /api/datafactory/preview", s.handleDataFactoryPreview) + mux.HandleFunc("GET /api/functions/{id}", s.handleFunctionByID) mux.HandleFunc("GET /api/ws/datafactory", s.handleDataFactoryEvents(s.dfHub)) } diff --git a/handlers_datafactory.go b/handlers_datafactory.go index 3d31719..bd83908 100644 --- a/handlers_datafactory.go +++ b/handlers_datafactory.go @@ -15,7 +15,13 @@ package main import ( "context" "database/sql" + "encoding/json" + "fmt" "net/http" + "os" + "os/exec" + "path/filepath" + "regexp" "strconv" ) @@ -49,6 +55,8 @@ type dataFactoryRun struct { Trigger string `json:"trigger"` Error string `json:"error"` Notes string `json:"notes"` + StorageDBID string `json:"storage_db_id"` + StorageTable string `json:"storage_table"` } // dataFactoryDatabase is the JSON row for /api/datafactory/databases. @@ -136,7 +144,8 @@ func (s *Server) handleDataFactoryRuns(w http.ResponseWriter, r *http.Request) { if nodeID != "" { rows, err = db.QueryContext(ctx, ` SELECT id, node_id, started_at, COALESCE(finished_at,''), status, - rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes + rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes, + COALESCE(storage_db_id,''), COALESCE(storage_table,'') FROM runs WHERE node_id = ? ORDER BY started_at DESC @@ -144,7 +153,8 @@ func (s *Server) handleDataFactoryRuns(w http.ResponseWriter, r *http.Request) { } else { rows, err = db.QueryContext(ctx, ` SELECT id, node_id, started_at, COALESCE(finished_at,''), status, - rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes + rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes, + COALESCE(storage_db_id,''), COALESCE(storage_table,'') FROM runs ORDER BY started_at DESC LIMIT ?`, limit) @@ -160,7 +170,8 @@ func (s *Server) handleDataFactoryRuns(w http.ResponseWriter, r *http.Request) { var rr dataFactoryRun if err := rows.Scan(&rr.ID, &rr.NodeID, &rr.StartedAt, &rr.FinishedAt, &rr.Status, &rr.RowsIn, &rr.RowsOut, &rr.KbIn, &rr.KbOut, - &rr.DurationMS, &rr.Trigger, &rr.Error, &rr.Notes); err != nil { + &rr.DurationMS, &rr.Trigger, &rr.Error, &rr.Notes, + &rr.StorageDBID, &rr.StorageTable); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } @@ -203,3 +214,307 @@ func (s *Server) handleDataFactoryDatabases(w http.ResponseWriter, r *http.Reque } writeJSON(w, http.StatusOK, map[string]any{"databases": out, "count": len(out)}) } + +// dataFactoryTable is the JSON row for /api/datafactory/tables. +type dataFactoryTable struct { + DatabaseID string `json:"database_id"` + DatabaseLabel string `json:"database_label"` + DatabaseKind string `json:"database_kind"` + TableName string `json:"table_name"` + RowCount int64 `json:"row_count"` + Error string `json:"error,omitempty"` +} + +// tableNameRE restricts table names to alphanumeric + underscore + dot. +// This prevents SQL injection in the preview queries. +var tableNameRE = regexp.MustCompile(`^[A-Za-z0-9_.]+$`) + +// previewColumn is a JSON column descriptor for the preview response. +type previewColumn struct { + Name string `json:"name"` + Type string `json:"type"` +} + +// previewResponse is the JSON shape of /api/datafactory/preview. +type previewResponse struct { + DatabaseID string `json:"database_id"` + TableName string `json:"table_name"` + Columns []previewColumn `json:"columns"` + Rows [][]string `json:"rows"` + TotalRows int64 `json:"total_rows"` + Limit int `json:"limit"` + Offset int `json:"offset"` +} + +// handleDataFactoryPreview serves: +// +// GET /api/datafactory/preview?database_id=&table=&limit=N&offset=N +// +// Supports kind="sqlite" and kind="duckdb" databases registered in +// data_factory.databases. Returns up to 1000 rows (default 100). +func (s *Server) handleDataFactoryPreview(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + + databaseID := q.Get("database_id") + tableName := q.Get("table") + if databaseID == "" || tableName == "" { + writeError(w, http.StatusBadRequest, "database_id and table are required") + return + } + if !tableNameRE.MatchString(tableName) { + writeError(w, http.StatusBadRequest, + fmt.Sprintf("invalid table name %q: only [A-Za-z0-9_.] allowed", tableName)) + return + } + + limit := 100 + if l := q.Get("limit"); l != "" { + if n, err := strconv.Atoi(l); err == nil { + switch { + case n < 1: + limit = 1 + case n > 1000: + limit = 1000 + default: + limit = n + } + } + } + offset := 0 + if o := q.Get("offset"); o != "" { + if n, err := strconv.Atoi(o); err == nil && n >= 0 { + offset = n + } + } + + // Look up the database in data_factory.db. + dfDB, err := s.dataFactoryDB() + if err != nil { + writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error()) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 30*queryTimeout) + defer cancel() + + var dbKind, dbURI string + row := dfDB.QueryRowContext(ctx, `SELECT kind, uri FROM databases WHERE id = ?`, databaseID) + if err := row.Scan(&dbKind, &dbURI); err != nil { + if err == sql.ErrNoRows { + writeError(w, http.StatusNotFound, fmt.Sprintf("database_id %q not found", databaseID)) + } else { + writeError(w, http.StatusInternalServerError, err.Error()) + } + return + } + + // Resolve absolute path. + root := os.Getenv("FN_REGISTRY_ROOT") + if root == "" { + root = "/home/lucas/fn_registry" + } + dbPath := dbURI + if !filepath.IsAbs(dbPath) { + dbPath = filepath.Join(root, dbURI) + } + + var resp previewResponse + resp.DatabaseID = databaseID + resp.TableName = tableName + resp.Limit = limit + resp.Offset = offset + + switch dbKind { + case "sqlite": + if err := previewSQLite(dbPath, tableName, limit, offset, &resp); err != nil { + writeError(w, http.StatusInternalServerError, "sqlite preview: "+err.Error()) + return + } + case "duckdb": + if err := previewDuckDB(dbPath, tableName, limit, offset, &resp); err != nil { + writeError(w, http.StatusInternalServerError, "duckdb preview: "+err.Error()) + return + } + default: + writeError(w, http.StatusBadRequest, fmt.Sprintf("unsupported database kind: %q", dbKind)) + return + } + + writeJSON(w, http.StatusOK, resp) +} + +// previewSQLite fetches schema + rows from a SQLite database. +func previewSQLite(path, table string, limit, offset int, out *previewResponse) error { + db, err := sql.Open("sqlite3", "file:"+path+"?mode=ro") + if err != nil { + return err + } + defer db.Close() + + // Schema via PRAGMA table_info. + rows, err := db.Query(fmt.Sprintf(`PRAGMA table_info("%s")`, table)) + if err != nil { + return fmt.Errorf("PRAGMA table_info: %w", err) + } + defer rows.Close() + for rows.Next() { + var cid int + var name, typ, dflt string + var notNull, pk int + if err := rows.Scan(&cid, &name, &typ, ¬Null, &dflt, &pk); err != nil { + return err + } + out.Columns = append(out.Columns, previewColumn{Name: name, Type: typ}) + } + _ = rows.Close() + if len(out.Columns) == 0 { + return fmt.Errorf("table %q not found or has no columns", table) + } + + // Total row count. + cntRow := db.QueryRow(fmt.Sprintf(`SELECT count(*) FROM "%s"`, table)) + _ = cntRow.Scan(&out.TotalRows) + + // Data rows. + dataRows, err := db.Query( + fmt.Sprintf(`SELECT * FROM "%s" LIMIT ? OFFSET ?`, table), + limit, offset, + ) + if err != nil { + return fmt.Errorf("SELECT: %w", err) + } + defer dataRows.Close() + + nCols := len(out.Columns) + for dataRows.Next() { + vals := make([]interface{}, nCols) + ptrs := make([]interface{}, nCols) + for i := range vals { + ptrs[i] = &vals[i] + } + if err := dataRows.Scan(ptrs...); err != nil { + return err + } + row := make([]string, nCols) + for i, v := range vals { + switch t := v.(type) { + case nil: + row[i] = "" + case []byte: + row[i] = string(t) + default: + row[i] = fmt.Sprintf("%v", t) + } + } + out.Rows = append(out.Rows, row) + } + if out.Rows == nil { + out.Rows = [][]string{} + } + return nil +} + +// duckPreviewResult is used to unmarshal the Python subprocess output. +type duckPreviewResult struct { + Columns []previewColumn `json:"columns"` + Rows [][]string `json:"rows"` + TotalRows int64 `json:"total_rows"` +} + +// previewDuckDB fetches schema + rows from a DuckDB database via Python subprocess. +func previewDuckDB(path, table string, limit, offset int, out *previewResponse) error { + pyBin := "/home/lucas/fn_registry/python/.venv/bin/python3" + script := fmt.Sprintf( + `import duckdb, json, sys +path, table, lim, off = %q, %q, %d, %d +c = duckdb.connect(path, read_only=True) +cols = c.execute(f"DESCRIBE \"{table}\"").fetchall() +cols_json = [{"name": r[0], "type": r[1]} for r in cols] +rows = c.execute(f"SELECT * FROM \"{table}\" LIMIT {lim} OFFSET {off}").fetchall() +rows_str = [[str(v) if v is not None else "" for v in r] for r in rows] +total = c.execute(f"SELECT count(*) FROM \"{table}\"").fetchone()[0] +print(json.dumps({"columns": cols_json, "rows": rows_str, "total_rows": total})) +`, path, table, limit, offset) + + cmd := exec.Command(pyBin, "-c", script) + rawOut, err := cmd.Output() + if err != nil { + // Capture stderr for better error messages. + if ee, ok := err.(*exec.ExitError); ok { + return fmt.Errorf("python subprocess: %w: %s", err, string(ee.Stderr)) + } + return fmt.Errorf("python subprocess: %w", err) + } + + var result duckPreviewResult + if err := json.Unmarshal(rawOut, &result); err != nil { + return fmt.Errorf("parse subprocess output: %w", err) + } + out.Columns = result.Columns + out.TotalRows = result.TotalRows + out.Rows = result.Rows + if out.Rows == nil { + out.Rows = [][]string{} + } + return nil +} + +func (s *Server) handleDataFactoryTables(w http.ResponseWriter, r *http.Request) { + db, err := s.dataFactoryDB() + if err != nil { + writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error()) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), queryTimeout) + defer cancel() + + rows, err := db.QueryContext(ctx, ` + SELECT id, kind, label, uri FROM databases ORDER BY kind, label`) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + defer rows.Close() + + // Resolve registry root for relative URIs. + root := os.Getenv("FN_REGISTRY_ROOT") + if root == "" { + root = "/home/lucas/fn_registry" + } + + out := make([]dataFactoryTable, 0, 16) + for rows.Next() { + var id, kind, label, uri string + if err := rows.Scan(&id, &kind, &label, &uri); err != nil { + continue + } + path := uri + if !filepath.IsAbs(path) { + path = filepath.Join(root, uri) + } + // Existence check. + if _, errStat := os.Stat(path); errStat != nil { + out = append(out, dataFactoryTable{ + DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind, + TableName: "(file not found)", Error: errStat.Error(), + }) + continue + } + tables, errTables := listTablesForDatabase(kind, path) + if errTables != nil { + out = append(out, dataFactoryTable{ + DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind, + TableName: "(error)", Error: errTables.Error(), + }) + continue + } + for _, t := range tables { + out = append(out, dataFactoryTable{ + DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind, + TableName: t.Name, RowCount: t.RowCount, + }) + } + } + writeJSON(w, http.StatusOK, map[string]any{"tables": out, "count": len(out)}) +} diff --git a/handlers_functions.go b/handlers_functions.go new file mode 100644 index 0000000..ff3207b --- /dev/null +++ b/handlers_functions.go @@ -0,0 +1,132 @@ +package main + +import ( + "context" + "database/sql" + "net/http" + "os" + "path/filepath" + "sync" +) + +var ( + registryDBOnce sync.Once + registryDBConn *sql.DB + registryDBErr error +) + +func resolveRegistryDBPath() string { + if p := os.Getenv("FN_REGISTRY_DB"); p != "" { + return p + } + if root := os.Getenv("FN_REGISTRY_ROOT"); root != "" { + return filepath.Join(root, "registry.db") + } + return "/home/lucas/fn_registry/registry.db" +} + +func openRegistryDB() (*sql.DB, error) { + registryDBOnce.Do(func() { + path := resolveRegistryDBPath() + // Read-only access — registry.db es source of truth gestionada via `fn index`. + db, err := sql.Open("sqlite3", "file:"+path+"?mode=ro&_query_only=1") + if err != nil { + registryDBErr = err + return + } + registryDBConn = db + }) + return registryDBConn, registryDBErr +} + +func (s *Server) handleFunctionByID(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + if id == "" { + writeError(w, http.StatusBadRequest, "id is required") + return + } + + db, err := openRegistryDB() + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), queryTimeout) + defer cancel() + + // Try functions table first. + const fnQuery = `SELECT id, name, kind, lang, domain, version, purity, signature, + description, tags, returns, returns_optional, error_type, file_path, code, documentation + FROM functions WHERE id = ?` + var ( + fnID, name, kind, lang, domain, version, purity, signature string + description, tags, returns, errorType, filePath string + code, documentation string + returnsOptional bool + ) + err = db.QueryRowContext(ctx, fnQuery, id).Scan( + &fnID, &name, &kind, &lang, &domain, &version, &purity, &signature, + &description, &tags, &returns, &returnsOptional, &errorType, &filePath, + &code, &documentation, + ) + if err == nil { + writeJSON(w, http.StatusOK, map[string]any{ + "entity": "function", + "id": fnID, + "name": name, + "kind": kind, + "lang": lang, + "domain": domain, + "version": version, + "purity": purity, + "signature": signature, + "description": description, + "tags": tags, + "returns": returns, + "returns_optional": returnsOptional, + "error_type": errorType, + "file_path": filePath, + "code": code, + "documentation": documentation, + }) + return + } + if err != sql.ErrNoRows { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + // Fallback: types table. + const typeQuery = `SELECT id, name, lang, domain, version, description, tags, definition, file_path + FROM types WHERE id = ?` + var ( + tID, tName, tLang, tDomain, tVersion string + tDescription, tTags, tDefinition string + tFilePath string + ) + err = db.QueryRowContext(ctx, typeQuery, id).Scan( + &tID, &tName, &tLang, &tDomain, &tVersion, + &tDescription, &tTags, &tDefinition, &tFilePath, + ) + if err == nil { + writeJSON(w, http.StatusOK, map[string]any{ + "entity": "type", + "id": tID, + "name": tName, + "lang": tLang, + "domain": tDomain, + "version": tVersion, + "description": tDescription, + "tags": tTags, + "definition": tDefinition, + "file_path": tFilePath, + }) + return + } + if err == sql.ErrNoRows { + writeError(w, http.StatusNotFound, "not_found") + return + } + writeError(w, http.StatusInternalServerError, err.Error()) +}