155a6db824
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
521 lines
15 KiB
Go
521 lines
15 KiB
Go
package main
|
|
|
|
// REST handlers (read-only) for data_factory.db (issue 0097).
|
|
//
|
|
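// Endpoints:
//
//	GET /api/datafactory/nodes?kind=<kind>
//	GET /api/datafactory/runs?node_id=<id>&limit=N
//	GET /api/datafactory/databases
//	GET /api/datafactory/tables
//	GET /api/datafactory/preview?database_id=<id>&table=<name>&limit=N&offset=N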
|
|
//
|
|
// All endpoints lazy-open data_factory.db on first request (creating the
|
|
// file + applying migrations if missing). If the open fails, returns 503.
|
|
// POST trigger is intentionally NOT implemented — sqlite_api is read-only;
|
|
// run insertion is done out-of-band by DAG steps / function wrappers.
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"regexp"
|
|
"strconv"
|
|
)
|
|
|
|
// dataFactoryNode is one element of the "nodes" array returned by
// /api/datafactory/nodes. Fields are declared in the order in which they are
// emitted in the JSON object.
type dataFactoryNode struct {
	ID           string `json:"id"`
	Kind         string `json:"kind"` // column matched by the ?kind= filter
	Name         string `json:"name"`
	FunctionID   string `json:"function_id"`
	Description  string `json:"description"`
	ConfigJSON   string `json:"config_json"`
	ScheduleCron string `json:"schedule_cron"`
	Enabled      bool   `json:"enabled"` // read as an integer column; non-zero means true
	TagsCSV      string `json:"tags_csv"`
	CreatedAt    string `json:"created_at"`
	UpdatedAt    string `json:"updated_at"`
}
|
|
|
|
// dataFactoryRun is one element of the "runs" array returned by
// /api/datafactory/runs. Fields are declared in the order in which they are
// emitted in the JSON object.
type dataFactoryRun struct {
	ID           string `json:"id"`
	NodeID       string `json:"node_id"` // column matched by the ?node_id= filter
	StartedAt    string `json:"started_at"`
	FinishedAt   string `json:"finished_at"` // "" when the column is NULL
	Status       string `json:"status"`
	RowsIn       int64  `json:"rows_in"`
	RowsOut      int64  `json:"rows_out"`
	KbIn         int64  `json:"kb_in"`
	KbOut        int64  `json:"kb_out"`
	DurationMS   int64  `json:"duration_ms"`
	Trigger      string `json:"trigger"`
	Error        string `json:"error"`
	Notes        string `json:"notes"`
	StorageDBID  string `json:"storage_db_id"` // "" when the column is NULL
	StorageTable string `json:"storage_table"` // "" when the column is NULL
}
|
|
|
|
// dataFactoryDatabase is one element of the "databases" array returned by
// /api/datafactory/databases. Fields are declared in the order in which they
// are emitted in the JSON object.
type dataFactoryDatabase struct {
	ID          string `json:"id"`
	Kind        string `json:"kind"` // the preview endpoint handles "sqlite" and "duckdb"
	Label       string `json:"label"`
	URI         string `json:"uri"` // file path; the preview/tables handlers join relative values onto the registry root
	Description string `json:"description"`
	TagsCSV     string `json:"tags_csv"`
	LastSeenAt  string `json:"last_seen_at"`
	TableCount  int64  `json:"table_count"`
	SizeBytes   int64  `json:"size_bytes"`
	CreatedAt   string `json:"created_at"`
	UpdatedAt   string `json:"updated_at"`
}
|
|
|
|
func (s *Server) handleDataFactoryNodes(w http.ResponseWriter, r *http.Request) {
|
|
db, err := s.dataFactoryDB()
|
|
if err != nil {
|
|
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
|
defer cancel()
|
|
|
|
kind := r.URL.Query().Get("kind")
|
|
var rows *sql.Rows
|
|
if kind != "" {
|
|
rows, err = db.QueryContext(ctx, `
|
|
SELECT id, kind, name, function_id, description, config_json,
|
|
schedule_cron, enabled, tags_csv, created_at, updated_at
|
|
FROM nodes
|
|
WHERE kind = ?
|
|
ORDER BY kind, name`, kind)
|
|
} else {
|
|
rows, err = db.QueryContext(ctx, `
|
|
SELECT id, kind, name, function_id, description, config_json,
|
|
schedule_cron, enabled, tags_csv, created_at, updated_at
|
|
FROM nodes
|
|
ORDER BY kind, name`)
|
|
}
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
nodes := make([]dataFactoryNode, 0, 16)
|
|
for rows.Next() {
|
|
var n dataFactoryNode
|
|
var enabled int
|
|
if err := rows.Scan(&n.ID, &n.Kind, &n.Name, &n.FunctionID, &n.Description,
|
|
&n.ConfigJSON, &n.ScheduleCron, &enabled, &n.TagsCSV,
|
|
&n.CreatedAt, &n.UpdatedAt); err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
n.Enabled = enabled != 0
|
|
nodes = append(nodes, n)
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"nodes": nodes, "count": len(nodes)})
|
|
}
|
|
|
|
func (s *Server) handleDataFactoryRuns(w http.ResponseWriter, r *http.Request) {
|
|
db, err := s.dataFactoryDB()
|
|
if err != nil {
|
|
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
|
defer cancel()
|
|
|
|
nodeID := r.URL.Query().Get("node_id")
|
|
limit := 100
|
|
if l := r.URL.Query().Get("limit"); l != "" {
|
|
if n, err := strconv.Atoi(l); err == nil && n > 0 && n <= 1000 {
|
|
limit = n
|
|
}
|
|
}
|
|
|
|
var rows *sql.Rows
|
|
if nodeID != "" {
|
|
rows, err = db.QueryContext(ctx, `
|
|
SELECT id, node_id, started_at, COALESCE(finished_at,''), status,
|
|
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes,
|
|
COALESCE(storage_db_id,''), COALESCE(storage_table,'')
|
|
FROM runs
|
|
WHERE node_id = ?
|
|
ORDER BY started_at DESC
|
|
LIMIT ?`, nodeID, limit)
|
|
} else {
|
|
rows, err = db.QueryContext(ctx, `
|
|
SELECT id, node_id, started_at, COALESCE(finished_at,''), status,
|
|
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes,
|
|
COALESCE(storage_db_id,''), COALESCE(storage_table,'')
|
|
FROM runs
|
|
ORDER BY started_at DESC
|
|
LIMIT ?`, limit)
|
|
}
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
runs := make([]dataFactoryRun, 0, 16)
|
|
for rows.Next() {
|
|
var rr dataFactoryRun
|
|
if err := rows.Scan(&rr.ID, &rr.NodeID, &rr.StartedAt, &rr.FinishedAt,
|
|
&rr.Status, &rr.RowsIn, &rr.RowsOut, &rr.KbIn, &rr.KbOut,
|
|
&rr.DurationMS, &rr.Trigger, &rr.Error, &rr.Notes,
|
|
&rr.StorageDBID, &rr.StorageTable); err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
runs = append(runs, rr)
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"runs": runs, "count": len(runs)})
|
|
}
|
|
|
|
func (s *Server) handleDataFactoryDatabases(w http.ResponseWriter, r *http.Request) {
|
|
db, err := s.dataFactoryDB()
|
|
if err != nil {
|
|
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
|
defer cancel()
|
|
|
|
rows, err := db.QueryContext(ctx, `
|
|
SELECT id, kind, label, uri, description, tags_csv, last_seen_at,
|
|
table_count, size_bytes, created_at, updated_at
|
|
FROM databases
|
|
ORDER BY kind, label`)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
out := make([]dataFactoryDatabase, 0, 16)
|
|
for rows.Next() {
|
|
var d dataFactoryDatabase
|
|
if err := rows.Scan(&d.ID, &d.Kind, &d.Label, &d.URI, &d.Description,
|
|
&d.TagsCSV, &d.LastSeenAt, &d.TableCount, &d.SizeBytes,
|
|
&d.CreatedAt, &d.UpdatedAt); err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
out = append(out, d)
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"databases": out, "count": len(out)})
|
|
}
|
|
|
|
// dataFactoryTable is one element of the "tables" array returned by
// /api/datafactory/tables: a single table of a single registered database.
//
// When a database could not be inspected, one entry is emitted for it with
// Error set and TableName holding a placeholder ("(file not found)" or
// "(error)") instead of a real table name.
type dataFactoryTable struct {
	DatabaseID    string `json:"database_id"`
	DatabaseLabel string `json:"database_label"`
	DatabaseKind  string `json:"database_kind"`
	TableName     string `json:"table_name"`
	RowCount      int64  `json:"row_count"`
	Error         string `json:"error,omitempty"` // left out of the JSON when empty
}
|
|
|
|
// tableNameRE accepts only non-empty names made of ASCII letters, digits,
// underscores and dots. The preview code splices the table name into SQL text
// (inside double quotes), so the name is checked against this pattern first to
// keep quote characters and other SQL syntax out of the statement.
var tableNameRE = regexp.MustCompile(`^[A-Za-z0-9_.]+$`)
|
|
|
|
// previewColumn describes one column of a previewed table: its name and the
// type text reported by the database.
type previewColumn struct {
	Name string `json:"name"`
	Type string `json:"type"`
}
|
|
|
|
// previewResponse is the JSON shape of /api/datafactory/preview.
|
|
type previewResponse struct {
|
|
DatabaseID string `json:"database_id"`
|
|
TableName string `json:"table_name"`
|
|
Columns []previewColumn `json:"columns"`
|
|
Rows [][]string `json:"rows"`
|
|
TotalRows int64 `json:"total_rows"`
|
|
Limit int `json:"limit"`
|
|
Offset int `json:"offset"`
|
|
}
|
|
|
|
// handleDataFactoryPreview serves:
|
|
//
|
|
// GET /api/datafactory/preview?database_id=<id>&table=<name>&limit=N&offset=N
|
|
//
|
|
// Supports kind="sqlite" and kind="duckdb" databases registered in
|
|
// data_factory.databases. Returns up to 1000 rows (default 100).
|
|
func (s *Server) handleDataFactoryPreview(w http.ResponseWriter, r *http.Request) {
|
|
q := r.URL.Query()
|
|
|
|
databaseID := q.Get("database_id")
|
|
tableName := q.Get("table")
|
|
if databaseID == "" || tableName == "" {
|
|
writeError(w, http.StatusBadRequest, "database_id and table are required")
|
|
return
|
|
}
|
|
if !tableNameRE.MatchString(tableName) {
|
|
writeError(w, http.StatusBadRequest,
|
|
fmt.Sprintf("invalid table name %q: only [A-Za-z0-9_.] allowed", tableName))
|
|
return
|
|
}
|
|
|
|
limit := 100
|
|
if l := q.Get("limit"); l != "" {
|
|
if n, err := strconv.Atoi(l); err == nil {
|
|
switch {
|
|
case n < 1:
|
|
limit = 1
|
|
case n > 1000:
|
|
limit = 1000
|
|
default:
|
|
limit = n
|
|
}
|
|
}
|
|
}
|
|
offset := 0
|
|
if o := q.Get("offset"); o != "" {
|
|
if n, err := strconv.Atoi(o); err == nil && n >= 0 {
|
|
offset = n
|
|
}
|
|
}
|
|
|
|
// Look up the database in data_factory.db.
|
|
dfDB, err := s.dataFactoryDB()
|
|
if err != nil {
|
|
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), 30*queryTimeout)
|
|
defer cancel()
|
|
|
|
var dbKind, dbURI string
|
|
row := dfDB.QueryRowContext(ctx, `SELECT kind, uri FROM databases WHERE id = ?`, databaseID)
|
|
if err := row.Scan(&dbKind, &dbURI); err != nil {
|
|
if err == sql.ErrNoRows {
|
|
writeError(w, http.StatusNotFound, fmt.Sprintf("database_id %q not found", databaseID))
|
|
} else {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
}
|
|
return
|
|
}
|
|
|
|
// Resolve absolute path.
|
|
root := os.Getenv("FN_REGISTRY_ROOT")
|
|
if root == "" {
|
|
root = "/home/lucas/fn_registry"
|
|
}
|
|
dbPath := dbURI
|
|
if !filepath.IsAbs(dbPath) {
|
|
dbPath = filepath.Join(root, dbURI)
|
|
}
|
|
|
|
var resp previewResponse
|
|
resp.DatabaseID = databaseID
|
|
resp.TableName = tableName
|
|
resp.Limit = limit
|
|
resp.Offset = offset
|
|
|
|
switch dbKind {
|
|
case "sqlite":
|
|
if err := previewSQLite(dbPath, tableName, limit, offset, &resp); err != nil {
|
|
writeError(w, http.StatusInternalServerError, "sqlite preview: "+err.Error())
|
|
return
|
|
}
|
|
case "duckdb":
|
|
if err := previewDuckDB(dbPath, tableName, limit, offset, &resp); err != nil {
|
|
writeError(w, http.StatusInternalServerError, "duckdb preview: "+err.Error())
|
|
return
|
|
}
|
|
default:
|
|
writeError(w, http.StatusBadRequest, fmt.Sprintf("unsupported database kind: %q", dbKind))
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// previewSQLite fetches schema + rows from a SQLite database.
|
|
func previewSQLite(path, table string, limit, offset int, out *previewResponse) error {
|
|
db, err := sql.Open("sqlite3", "file:"+path+"?mode=ro")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer db.Close()
|
|
|
|
// Schema via PRAGMA table_info.
|
|
rows, err := db.Query(fmt.Sprintf(`PRAGMA table_info("%s")`, table))
|
|
if err != nil {
|
|
return fmt.Errorf("PRAGMA table_info: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var cid int
|
|
var name, typ, dflt string
|
|
var notNull, pk int
|
|
if err := rows.Scan(&cid, &name, &typ, ¬Null, &dflt, &pk); err != nil {
|
|
return err
|
|
}
|
|
out.Columns = append(out.Columns, previewColumn{Name: name, Type: typ})
|
|
}
|
|
_ = rows.Close()
|
|
if len(out.Columns) == 0 {
|
|
return fmt.Errorf("table %q not found or has no columns", table)
|
|
}
|
|
|
|
// Total row count.
|
|
cntRow := db.QueryRow(fmt.Sprintf(`SELECT count(*) FROM "%s"`, table))
|
|
_ = cntRow.Scan(&out.TotalRows)
|
|
|
|
// Data rows.
|
|
dataRows, err := db.Query(
|
|
fmt.Sprintf(`SELECT * FROM "%s" LIMIT ? OFFSET ?`, table),
|
|
limit, offset,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("SELECT: %w", err)
|
|
}
|
|
defer dataRows.Close()
|
|
|
|
nCols := len(out.Columns)
|
|
for dataRows.Next() {
|
|
vals := make([]interface{}, nCols)
|
|
ptrs := make([]interface{}, nCols)
|
|
for i := range vals {
|
|
ptrs[i] = &vals[i]
|
|
}
|
|
if err := dataRows.Scan(ptrs...); err != nil {
|
|
return err
|
|
}
|
|
row := make([]string, nCols)
|
|
for i, v := range vals {
|
|
switch t := v.(type) {
|
|
case nil:
|
|
row[i] = ""
|
|
case []byte:
|
|
row[i] = string(t)
|
|
default:
|
|
row[i] = fmt.Sprintf("%v", t)
|
|
}
|
|
}
|
|
out.Rows = append(out.Rows, row)
|
|
}
|
|
if out.Rows == nil {
|
|
out.Rows = [][]string{}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// duckPreviewResult is used to unmarshal the Python subprocess output.
|
|
type duckPreviewResult struct {
|
|
Columns []previewColumn `json:"columns"`
|
|
Rows [][]string `json:"rows"`
|
|
TotalRows int64 `json:"total_rows"`
|
|
}
|
|
|
|
// previewDuckDB fetches schema + rows from a DuckDB database via Python subprocess.
|
|
func previewDuckDB(path, table string, limit, offset int, out *previewResponse) error {
|
|
pyBin := "/home/lucas/fn_registry/python/.venv/bin/python3"
|
|
script := fmt.Sprintf(
|
|
`import duckdb, json, sys
|
|
path, table, lim, off = %q, %q, %d, %d
|
|
c = duckdb.connect(path, read_only=True)
|
|
cols = c.execute(f"DESCRIBE \"{table}\"").fetchall()
|
|
cols_json = [{"name": r[0], "type": r[1]} for r in cols]
|
|
rows = c.execute(f"SELECT * FROM \"{table}\" LIMIT {lim} OFFSET {off}").fetchall()
|
|
rows_str = [[str(v) if v is not None else "" for v in r] for r in rows]
|
|
total = c.execute(f"SELECT count(*) FROM \"{table}\"").fetchone()[0]
|
|
print(json.dumps({"columns": cols_json, "rows": rows_str, "total_rows": total}))
|
|
`, path, table, limit, offset)
|
|
|
|
cmd := exec.Command(pyBin, "-c", script)
|
|
rawOut, err := cmd.Output()
|
|
if err != nil {
|
|
// Capture stderr for better error messages.
|
|
if ee, ok := err.(*exec.ExitError); ok {
|
|
return fmt.Errorf("python subprocess: %w: %s", err, string(ee.Stderr))
|
|
}
|
|
return fmt.Errorf("python subprocess: %w", err)
|
|
}
|
|
|
|
var result duckPreviewResult
|
|
if err := json.Unmarshal(rawOut, &result); err != nil {
|
|
return fmt.Errorf("parse subprocess output: %w", err)
|
|
}
|
|
out.Columns = result.Columns
|
|
out.TotalRows = result.TotalRows
|
|
out.Rows = result.Rows
|
|
if out.Rows == nil {
|
|
out.Rows = [][]string{}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) handleDataFactoryTables(w http.ResponseWriter, r *http.Request) {
|
|
db, err := s.dataFactoryDB()
|
|
if err != nil {
|
|
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
|
defer cancel()
|
|
|
|
rows, err := db.QueryContext(ctx, `
|
|
SELECT id, kind, label, uri FROM databases ORDER BY kind, label`)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, err.Error())
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
// Resolve registry root for relative URIs.
|
|
root := os.Getenv("FN_REGISTRY_ROOT")
|
|
if root == "" {
|
|
root = "/home/lucas/fn_registry"
|
|
}
|
|
|
|
out := make([]dataFactoryTable, 0, 16)
|
|
for rows.Next() {
|
|
var id, kind, label, uri string
|
|
if err := rows.Scan(&id, &kind, &label, &uri); err != nil {
|
|
continue
|
|
}
|
|
path := uri
|
|
if !filepath.IsAbs(path) {
|
|
path = filepath.Join(root, uri)
|
|
}
|
|
// Existence check.
|
|
if _, errStat := os.Stat(path); errStat != nil {
|
|
out = append(out, dataFactoryTable{
|
|
DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind,
|
|
TableName: "(file not found)", Error: errStat.Error(),
|
|
})
|
|
continue
|
|
}
|
|
tables, errTables := listTablesForDatabase(kind, path)
|
|
if errTables != nil {
|
|
out = append(out, dataFactoryTable{
|
|
DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind,
|
|
TableName: "(error)", Error: errTables.Error(),
|
|
})
|
|
continue
|
|
}
|
|
for _, t := range tables {
|
|
out = append(out, dataFactoryTable{
|
|
DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind,
|
|
TableName: t.Name, RowCount: t.RowCount,
|
|
})
|
|
}
|
|
}
|
|
writeJSON(w, http.StatusOK, map[string]any{"tables": out, "count": len(out)})
|
|
}
|