package main // REST handlers (read-only) for data_factory.db (issue 0097). // // Endpoints: // GET /api/datafactory/nodes?kind= // GET /api/datafactory/runs?node_id=&limit=N // GET /api/datafactory/databases // // All endpoints lazy-open data_factory.db on first request (creating the // file + applying migrations if missing). If the open fails, returns 503. // POST trigger is intentionally NOT implemented — sqlite_api is read-only; // run insertion is done out-of-band by DAG steps / function wrappers. import ( "context" "database/sql" "encoding/json" "fmt" "net/http" "os" "os/exec" "path/filepath" "regexp" "strconv" ) // dataFactoryNode is the JSON row for /api/datafactory/nodes. type dataFactoryNode struct { ID string `json:"id"` Kind string `json:"kind"` Name string `json:"name"` FunctionID string `json:"function_id"` Description string `json:"description"` ConfigJSON string `json:"config_json"` ScheduleCron string `json:"schedule_cron"` Enabled bool `json:"enabled"` TagsCSV string `json:"tags_csv"` CreatedAt string `json:"created_at"` UpdatedAt string `json:"updated_at"` } // dataFactoryRun is the JSON row for /api/datafactory/runs. type dataFactoryRun struct { ID string `json:"id"` NodeID string `json:"node_id"` StartedAt string `json:"started_at"` FinishedAt string `json:"finished_at"` Status string `json:"status"` RowsIn int64 `json:"rows_in"` RowsOut int64 `json:"rows_out"` KbIn int64 `json:"kb_in"` KbOut int64 `json:"kb_out"` DurationMS int64 `json:"duration_ms"` Trigger string `json:"trigger"` Error string `json:"error"` Notes string `json:"notes"` StorageDBID string `json:"storage_db_id"` StorageTable string `json:"storage_table"` } // dataFactoryDatabase is the JSON row for /api/datafactory/databases. type dataFactoryDatabase struct { ID string `json:"id"` Kind string `json:"kind"` Label string `json:"label"` URI string `json:"uri"` Description string `json:"description"` TagsCSV string `json:"tags_csv"` LastSeenAt string `json:"last_seen_at"` TableCount int64 `json:"table_count"` SizeBytes int64 `json:"size_bytes"` CreatedAt string `json:"created_at"` UpdatedAt string `json:"updated_at"` } func (s *Server) handleDataFactoryNodes(w http.ResponseWriter, r *http.Request) { db, err := s.dataFactoryDB() if err != nil { writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error()) return } ctx, cancel := context.WithTimeout(r.Context(), queryTimeout) defer cancel() kind := r.URL.Query().Get("kind") var rows *sql.Rows if kind != "" { rows, err = db.QueryContext(ctx, ` SELECT id, kind, name, function_id, description, config_json, schedule_cron, enabled, tags_csv, created_at, updated_at FROM nodes WHERE kind = ? ORDER BY kind, name`, kind) } else { rows, err = db.QueryContext(ctx, ` SELECT id, kind, name, function_id, description, config_json, schedule_cron, enabled, tags_csv, created_at, updated_at FROM nodes ORDER BY kind, name`) } if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } defer rows.Close() nodes := make([]dataFactoryNode, 0, 16) for rows.Next() { var n dataFactoryNode var enabled int if err := rows.Scan(&n.ID, &n.Kind, &n.Name, &n.FunctionID, &n.Description, &n.ConfigJSON, &n.ScheduleCron, &enabled, &n.TagsCSV, &n.CreatedAt, &n.UpdatedAt); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } n.Enabled = enabled != 0 nodes = append(nodes, n) } writeJSON(w, http.StatusOK, map[string]any{"nodes": nodes, "count": len(nodes)}) } func (s *Server) handleDataFactoryRuns(w http.ResponseWriter, r *http.Request) { db, err := s.dataFactoryDB() if err != nil { writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error()) return } ctx, cancel := context.WithTimeout(r.Context(), queryTimeout) defer cancel() nodeID := r.URL.Query().Get("node_id") limit := 100 if l := r.URL.Query().Get("limit"); l != "" { if n, err := strconv.Atoi(l); err == nil && n > 0 && n <= 1000 { limit = n } } var rows *sql.Rows if nodeID != "" { rows, err = db.QueryContext(ctx, ` SELECT id, node_id, started_at, COALESCE(finished_at,''), status, rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes, COALESCE(storage_db_id,''), COALESCE(storage_table,'') FROM runs WHERE node_id = ? ORDER BY started_at DESC LIMIT ?`, nodeID, limit) } else { rows, err = db.QueryContext(ctx, ` SELECT id, node_id, started_at, COALESCE(finished_at,''), status, rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes, COALESCE(storage_db_id,''), COALESCE(storage_table,'') FROM runs ORDER BY started_at DESC LIMIT ?`, limit) } if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } defer rows.Close() runs := make([]dataFactoryRun, 0, 16) for rows.Next() { var rr dataFactoryRun if err := rows.Scan(&rr.ID, &rr.NodeID, &rr.StartedAt, &rr.FinishedAt, &rr.Status, &rr.RowsIn, &rr.RowsOut, &rr.KbIn, &rr.KbOut, &rr.DurationMS, &rr.Trigger, &rr.Error, &rr.Notes, &rr.StorageDBID, &rr.StorageTable); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } runs = append(runs, rr) } writeJSON(w, http.StatusOK, map[string]any{"runs": runs, "count": len(runs)}) } func (s *Server) handleDataFactoryDatabases(w http.ResponseWriter, r *http.Request) { db, err := s.dataFactoryDB() if err != nil { writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error()) return } ctx, cancel := context.WithTimeout(r.Context(), queryTimeout) defer cancel() rows, err := db.QueryContext(ctx, ` SELECT id, kind, label, uri, description, tags_csv, last_seen_at, table_count, size_bytes, created_at, updated_at FROM databases ORDER BY kind, label`) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } defer rows.Close() out := make([]dataFactoryDatabase, 0, 16) for rows.Next() { var d dataFactoryDatabase if err := rows.Scan(&d.ID, &d.Kind, &d.Label, &d.URI, &d.Description, &d.TagsCSV, &d.LastSeenAt, &d.TableCount, &d.SizeBytes, &d.CreatedAt, &d.UpdatedAt); err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } out = append(out, d) } writeJSON(w, http.StatusOK, map[string]any{"databases": out, "count": len(out)}) } // dataFactoryTable is the JSON row for /api/datafactory/tables. type dataFactoryTable struct { DatabaseID string `json:"database_id"` DatabaseLabel string `json:"database_label"` DatabaseKind string `json:"database_kind"` TableName string `json:"table_name"` RowCount int64 `json:"row_count"` Error string `json:"error,omitempty"` } // tableNameRE restricts table names to alphanumeric + underscore + dot. // This prevents SQL injection in the preview queries. var tableNameRE = regexp.MustCompile(`^[A-Za-z0-9_.]+$`) // previewColumn is a JSON column descriptor for the preview response. type previewColumn struct { Name string `json:"name"` Type string `json:"type"` } // previewResponse is the JSON shape of /api/datafactory/preview. type previewResponse struct { DatabaseID string `json:"database_id"` TableName string `json:"table_name"` Columns []previewColumn `json:"columns"` Rows [][]string `json:"rows"` TotalRows int64 `json:"total_rows"` Limit int `json:"limit"` Offset int `json:"offset"` } // handleDataFactoryPreview serves: // // GET /api/datafactory/preview?database_id=&table=&limit=N&offset=N // // Supports kind="sqlite" and kind="duckdb" databases registered in // data_factory.databases. Returns up to 1000 rows (default 100). func (s *Server) handleDataFactoryPreview(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() databaseID := q.Get("database_id") tableName := q.Get("table") if databaseID == "" || tableName == "" { writeError(w, http.StatusBadRequest, "database_id and table are required") return } if !tableNameRE.MatchString(tableName) { writeError(w, http.StatusBadRequest, fmt.Sprintf("invalid table name %q: only [A-Za-z0-9_.] allowed", tableName)) return } limit := 100 if l := q.Get("limit"); l != "" { if n, err := strconv.Atoi(l); err == nil { switch { case n < 1: limit = 1 case n > 1000: limit = 1000 default: limit = n } } } offset := 0 if o := q.Get("offset"); o != "" { if n, err := strconv.Atoi(o); err == nil && n >= 0 { offset = n } } // Look up the database in data_factory.db. dfDB, err := s.dataFactoryDB() if err != nil { writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error()) return } ctx, cancel := context.WithTimeout(r.Context(), 30*queryTimeout) defer cancel() var dbKind, dbURI string row := dfDB.QueryRowContext(ctx, `SELECT kind, uri FROM databases WHERE id = ?`, databaseID) if err := row.Scan(&dbKind, &dbURI); err != nil { if err == sql.ErrNoRows { writeError(w, http.StatusNotFound, fmt.Sprintf("database_id %q not found", databaseID)) } else { writeError(w, http.StatusInternalServerError, err.Error()) } return } // Resolve absolute path. root := os.Getenv("FN_REGISTRY_ROOT") if root == "" { root = "/home/lucas/fn_registry" } dbPath := dbURI if !filepath.IsAbs(dbPath) { dbPath = filepath.Join(root, dbURI) } var resp previewResponse resp.DatabaseID = databaseID resp.TableName = tableName resp.Limit = limit resp.Offset = offset switch dbKind { case "sqlite": if err := previewSQLite(dbPath, tableName, limit, offset, &resp); err != nil { writeError(w, http.StatusInternalServerError, "sqlite preview: "+err.Error()) return } case "duckdb": if err := previewDuckDB(dbPath, tableName, limit, offset, &resp); err != nil { writeError(w, http.StatusInternalServerError, "duckdb preview: "+err.Error()) return } default: writeError(w, http.StatusBadRequest, fmt.Sprintf("unsupported database kind: %q", dbKind)) return } writeJSON(w, http.StatusOK, resp) } // previewSQLite fetches schema + rows from a SQLite database. func previewSQLite(path, table string, limit, offset int, out *previewResponse) error { db, err := sql.Open("sqlite3", "file:"+path+"?mode=ro") if err != nil { return err } defer db.Close() // Schema via PRAGMA table_info. rows, err := db.Query(fmt.Sprintf(`PRAGMA table_info("%s")`, table)) if err != nil { return fmt.Errorf("PRAGMA table_info: %w", err) } defer rows.Close() for rows.Next() { var cid int var name, typ, dflt string var notNull, pk int if err := rows.Scan(&cid, &name, &typ, ¬Null, &dflt, &pk); err != nil { return err } out.Columns = append(out.Columns, previewColumn{Name: name, Type: typ}) } _ = rows.Close() if len(out.Columns) == 0 { return fmt.Errorf("table %q not found or has no columns", table) } // Total row count. cntRow := db.QueryRow(fmt.Sprintf(`SELECT count(*) FROM "%s"`, table)) _ = cntRow.Scan(&out.TotalRows) // Data rows. dataRows, err := db.Query( fmt.Sprintf(`SELECT * FROM "%s" LIMIT ? OFFSET ?`, table), limit, offset, ) if err != nil { return fmt.Errorf("SELECT: %w", err) } defer dataRows.Close() nCols := len(out.Columns) for dataRows.Next() { vals := make([]interface{}, nCols) ptrs := make([]interface{}, nCols) for i := range vals { ptrs[i] = &vals[i] } if err := dataRows.Scan(ptrs...); err != nil { return err } row := make([]string, nCols) for i, v := range vals { switch t := v.(type) { case nil: row[i] = "" case []byte: row[i] = string(t) default: row[i] = fmt.Sprintf("%v", t) } } out.Rows = append(out.Rows, row) } if out.Rows == nil { out.Rows = [][]string{} } return nil } // duckPreviewResult is used to unmarshal the Python subprocess output. type duckPreviewResult struct { Columns []previewColumn `json:"columns"` Rows [][]string `json:"rows"` TotalRows int64 `json:"total_rows"` } // previewDuckDB fetches schema + rows from a DuckDB database via Python subprocess. func previewDuckDB(path, table string, limit, offset int, out *previewResponse) error { pyBin := "/home/lucas/fn_registry/python/.venv/bin/python3" script := fmt.Sprintf( `import duckdb, json, sys path, table, lim, off = %q, %q, %d, %d c = duckdb.connect(path, read_only=True) cols = c.execute(f"DESCRIBE \"{table}\"").fetchall() cols_json = [{"name": r[0], "type": r[1]} for r in cols] rows = c.execute(f"SELECT * FROM \"{table}\" LIMIT {lim} OFFSET {off}").fetchall() rows_str = [[str(v) if v is not None else "" for v in r] for r in rows] total = c.execute(f"SELECT count(*) FROM \"{table}\"").fetchone()[0] print(json.dumps({"columns": cols_json, "rows": rows_str, "total_rows": total})) `, path, table, limit, offset) cmd := exec.Command(pyBin, "-c", script) rawOut, err := cmd.Output() if err != nil { // Capture stderr for better error messages. if ee, ok := err.(*exec.ExitError); ok { return fmt.Errorf("python subprocess: %w: %s", err, string(ee.Stderr)) } return fmt.Errorf("python subprocess: %w", err) } var result duckPreviewResult if err := json.Unmarshal(rawOut, &result); err != nil { return fmt.Errorf("parse subprocess output: %w", err) } out.Columns = result.Columns out.TotalRows = result.TotalRows out.Rows = result.Rows if out.Rows == nil { out.Rows = [][]string{} } return nil } func (s *Server) handleDataFactoryTables(w http.ResponseWriter, r *http.Request) { db, err := s.dataFactoryDB() if err != nil { writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error()) return } ctx, cancel := context.WithTimeout(r.Context(), queryTimeout) defer cancel() rows, err := db.QueryContext(ctx, ` SELECT id, kind, label, uri FROM databases ORDER BY kind, label`) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) return } defer rows.Close() // Resolve registry root for relative URIs. root := os.Getenv("FN_REGISTRY_ROOT") if root == "" { root = "/home/lucas/fn_registry" } out := make([]dataFactoryTable, 0, 16) for rows.Next() { var id, kind, label, uri string if err := rows.Scan(&id, &kind, &label, &uri); err != nil { continue } path := uri if !filepath.IsAbs(path) { path = filepath.Join(root, uri) } // Existence check. if _, errStat := os.Stat(path); errStat != nil { out = append(out, dataFactoryTable{ DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind, TableName: "(file not found)", Error: errStat.Error(), }) continue } tables, errTables := listTablesForDatabase(kind, path) if errTables != nil { out = append(out, dataFactoryTable{ DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind, TableName: "(error)", Error: errTables.Error(), }) continue } for _, t := range tables { out = append(out, dataFactoryTable{ DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind, TableName: t.Name, RowCount: t.RowCount, }) } } writeJSON(w, http.StatusOK, map[string]any{"tables": out, "count": len(out)}) }