chore: auto-commit (5 archivos)
- handlers.go - handlers_test.go - main.go - datafactory_events.go - handlers_datafactory.go Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,387 @@
|
||||
package main
|
||||
|
||||
// WebSocket event hub for data_factory.db `runs` table (issue 0097).
|
||||
//
|
||||
// Mirrors CallMonitorHub in events.go but watches `runs` of data_factory.db.
|
||||
// Lazy: ticker only runs while >=1 subscriber is connected.
|
||||
// Snapshot at connect = last 50 runs + active nodes + watermark.
|
||||
// Delta every wsTickInterval = rows with rowid > watermark.
|
||||
//
|
||||
// Reuses subscriber/wsMessage/clientCmd from events.go — those types are
|
||||
// generic enough for any append-only event stream.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"nhooyr.io/websocket"
|
||||
"nhooyr.io/websocket/wsjson"
|
||||
)
|
||||
|
||||
const dfSnapshotLimit = 50
|
||||
|
||||
// dfNode is the JSON shape served by the snapshot's `nodes` field.
|
||||
type dfNode struct {
|
||||
ID string `json:"id"`
|
||||
Kind string `json:"kind"`
|
||||
Name string `json:"name"`
|
||||
FunctionID string `json:"function_id"`
|
||||
Description string `json:"description"`
|
||||
Enabled bool `json:"enabled"`
|
||||
TagsCSV string `json:"tags_csv"`
|
||||
}
|
||||
|
||||
// dfRun is the JSON contract for snapshot.runs and delta.runs.
|
||||
type dfRun struct {
|
||||
Rowid int64 `json:"rowid"`
|
||||
ID string `json:"id"`
|
||||
NodeID string `json:"node_id"`
|
||||
StartedAt string `json:"started_at"`
|
||||
FinishedAt string `json:"finished_at"`
|
||||
Status string `json:"status"`
|
||||
RowsIn int64 `json:"rows_in"`
|
||||
RowsOut int64 `json:"rows_out"`
|
||||
KbIn int64 `json:"kb_in"`
|
||||
KbOut int64 `json:"kb_out"`
|
||||
DurationMS int64 `json:"duration_ms"`
|
||||
Trigger string `json:"trigger"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
// dfMessage is what travels on the WS. Distinct from wsMessage to avoid
|
||||
// schema collision with call_monitor's KPIs/rows.
|
||||
type dfMessage struct {
|
||||
Type string `json:"type"` // "snapshot" | "delta" | "ping"
|
||||
Watermark int64 `json:"watermark,omitempty"`
|
||||
Nodes []dfNode `json:"nodes,omitempty"`
|
||||
Runs []dfRun `json:"runs,omitempty"`
|
||||
ServerTime int64 `json:"server_time"`
|
||||
}
|
||||
|
||||
type dfSubscriber struct {
|
||||
conn *websocket.Conn
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
out chan dfMessage
|
||||
watermark int64
|
||||
}
|
||||
|
||||
// DataFactoryHub gestiona broadcast WS sobre `runs` table de data_factory.db.
|
||||
//
|
||||
// Holds a direct *sql.DB pointer (lazy-opened RW by Server). NOT pulled from
|
||||
// DBPool because data_factory.db is opened by datafactory.Open(), not by the
|
||||
// pool's read-only discovery flow.
|
||||
type DataFactoryHub struct {
|
||||
getDB func() (*sql.DB, error)
|
||||
|
||||
mu sync.Mutex
|
||||
subscribers map[*dfSubscriber]struct{}
|
||||
tickerStop chan struct{}
|
||||
tickerOn bool
|
||||
watermark int64
|
||||
lastEventAt time.Time
|
||||
}
|
||||
|
||||
func NewDataFactoryHub(getDB func() (*sql.DB, error)) *DataFactoryHub {
|
||||
return &DataFactoryHub{
|
||||
getDB: getDB,
|
||||
subscribers: make(map[*dfSubscriber]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DataFactoryHub) register(s *dfSubscriber) {
|
||||
h.mu.Lock()
|
||||
h.subscribers[s] = struct{}{}
|
||||
shouldStart := !h.tickerOn
|
||||
if shouldStart {
|
||||
h.tickerStop = make(chan struct{})
|
||||
h.tickerOn = true
|
||||
h.lastEventAt = time.Now()
|
||||
}
|
||||
h.mu.Unlock()
|
||||
|
||||
if shouldStart {
|
||||
go h.tickerLoop()
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DataFactoryHub) unregister(s *dfSubscriber) {
|
||||
h.mu.Lock()
|
||||
if _, ok := h.subscribers[s]; !ok {
|
||||
h.mu.Unlock()
|
||||
return
|
||||
}
|
||||
delete(h.subscribers, s)
|
||||
close(s.out)
|
||||
shouldStop := h.tickerOn && len(h.subscribers) == 0
|
||||
if shouldStop {
|
||||
close(h.tickerStop)
|
||||
h.tickerOn = false
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *DataFactoryHub) tickerLoop() {
|
||||
interval := wsTickInterval
|
||||
t := time.NewTimer(interval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-h.tickerStop:
|
||||
return
|
||||
case <-t.C:
|
||||
rows, max, err := h.fetchSince(h.getWatermark())
|
||||
if err != nil {
|
||||
log.Printf("[ws-df] fetchSince: %v", err)
|
||||
} else if len(rows) > 0 {
|
||||
h.setWatermark(max)
|
||||
h.recordActivity()
|
||||
h.broadcast(dfMessage{
|
||||
Type: "delta",
|
||||
Watermark: max,
|
||||
Runs: rows,
|
||||
ServerTime: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
if time.Since(h.lastActivityAt()) > wsIdleThreshold {
|
||||
interval = wsTickIntervalIdle
|
||||
} else {
|
||||
interval = wsTickInterval
|
||||
}
|
||||
t.Reset(interval)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *DataFactoryHub) getWatermark() int64 {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.watermark
|
||||
}
|
||||
|
||||
func (h *DataFactoryHub) setWatermark(v int64) {
|
||||
h.mu.Lock()
|
||||
if v > h.watermark {
|
||||
h.watermark = v
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *DataFactoryHub) recordActivity() {
|
||||
h.mu.Lock()
|
||||
h.lastEventAt = time.Now()
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
func (h *DataFactoryHub) lastActivityAt() time.Time {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.lastEventAt
|
||||
}
|
||||
|
||||
func (h *DataFactoryHub) fetchSince(since int64) ([]dfRun, int64, error) {
|
||||
db, err := h.getDB()
|
||||
if err != nil {
|
||||
return nil, since, err
|
||||
}
|
||||
rows, err := db.Query(`
|
||||
SELECT rowid, id, node_id, started_at, COALESCE(finished_at,''), status,
|
||||
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error
|
||||
FROM runs
|
||||
WHERE rowid > ?
|
||||
ORDER BY rowid ASC
|
||||
LIMIT 500`, since)
|
||||
if err != nil {
|
||||
return nil, since, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := make([]dfRun, 0, 16)
|
||||
max := since
|
||||
for rows.Next() {
|
||||
var r dfRun
|
||||
if err := rows.Scan(&r.Rowid, &r.ID, &r.NodeID, &r.StartedAt, &r.FinishedAt,
|
||||
&r.Status, &r.RowsIn, &r.RowsOut, &r.KbIn, &r.KbOut,
|
||||
&r.DurationMS, &r.Trigger, &r.Error); err != nil {
|
||||
return nil, since, err
|
||||
}
|
||||
out = append(out, r)
|
||||
if r.Rowid > max {
|
||||
max = r.Rowid
|
||||
}
|
||||
}
|
||||
return out, max, rows.Err()
|
||||
}
|
||||
|
||||
func (h *DataFactoryHub) fetchSnapshot() ([]dfNode, []dfRun, int64, error) {
|
||||
db, err := h.getDB()
|
||||
if err != nil {
|
||||
return nil, nil, 0, err
|
||||
}
|
||||
|
||||
// Active nodes (enabled=1).
|
||||
nodeRows, err := db.Query(`
|
||||
SELECT id, kind, name, function_id, description, enabled, tags_csv
|
||||
FROM nodes
|
||||
WHERE enabled = 1
|
||||
ORDER BY kind, name`)
|
||||
if err != nil {
|
||||
return nil, nil, 0, err
|
||||
}
|
||||
defer nodeRows.Close()
|
||||
|
||||
nodes := make([]dfNode, 0, 16)
|
||||
for nodeRows.Next() {
|
||||
var n dfNode
|
||||
var enabled int
|
||||
if err := nodeRows.Scan(&n.ID, &n.Kind, &n.Name, &n.FunctionID,
|
||||
&n.Description, &enabled, &n.TagsCSV); err != nil {
|
||||
return nil, nil, 0, err
|
||||
}
|
||||
n.Enabled = enabled != 0
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
if err := nodeRows.Err(); err != nil {
|
||||
return nil, nil, 0, err
|
||||
}
|
||||
|
||||
// Last 50 runs (newest first).
|
||||
runRows, err := db.Query(`
|
||||
SELECT rowid, id, node_id, started_at, COALESCE(finished_at,''), status,
|
||||
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error
|
||||
FROM runs
|
||||
ORDER BY rowid DESC
|
||||
LIMIT ?`, dfSnapshotLimit)
|
||||
if err != nil {
|
||||
return nodes, nil, 0, err
|
||||
}
|
||||
defer runRows.Close()
|
||||
|
||||
runs := make([]dfRun, 0, dfSnapshotLimit)
|
||||
max := int64(0)
|
||||
for runRows.Next() {
|
||||
var r dfRun
|
||||
if err := runRows.Scan(&r.Rowid, &r.ID, &r.NodeID, &r.StartedAt, &r.FinishedAt,
|
||||
&r.Status, &r.RowsIn, &r.RowsOut, &r.KbIn, &r.KbOut,
|
||||
&r.DurationMS, &r.Trigger, &r.Error); err != nil {
|
||||
return nodes, nil, 0, err
|
||||
}
|
||||
runs = append(runs, r)
|
||||
if r.Rowid > max {
|
||||
max = r.Rowid
|
||||
}
|
||||
}
|
||||
return nodes, runs, max, runRows.Err()
|
||||
}
|
||||
|
||||
func (h *DataFactoryHub) broadcast(msg dfMessage) {
|
||||
h.mu.Lock()
|
||||
subs := make([]*dfSubscriber, 0, len(h.subscribers))
|
||||
for s := range h.subscribers {
|
||||
subs = append(subs, s)
|
||||
}
|
||||
h.mu.Unlock()
|
||||
|
||||
for _, s := range subs {
|
||||
select {
|
||||
case s.out <- msg:
|
||||
default:
|
||||
log.Printf("[ws-df] dropping frame for slow subscriber")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleDataFactoryEvents upgradea HTTP a WS y mantiene la conexion.
|
||||
// Endpoint: GET /api/ws/datafactory
|
||||
func (s *Server) handleDataFactoryEvents(hub *DataFactoryHub) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
||||
InsecureSkipVerify: true,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[ws-df] accept: %v", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close(websocket.StatusInternalError, "closing")
|
||||
|
||||
ctx, cancel := context.WithCancel(r.Context())
|
||||
defer cancel()
|
||||
|
||||
sub := &dfSubscriber{
|
||||
conn: conn,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
out: make(chan dfMessage, 64),
|
||||
}
|
||||
hub.register(sub)
|
||||
defer hub.unregister(sub)
|
||||
|
||||
nodes, runs, watermark, err := hub.fetchSnapshot()
|
||||
if err != nil {
|
||||
log.Printf("[ws-df] snapshot: %v", err)
|
||||
conn.Close(websocket.StatusInternalError, "snapshot failed")
|
||||
return
|
||||
}
|
||||
hub.setWatermark(watermark)
|
||||
if err := wsjson.Write(ctx, conn, dfMessage{
|
||||
Type: "snapshot",
|
||||
Watermark: watermark,
|
||||
Nodes: nodes,
|
||||
Runs: runs,
|
||||
ServerTime: time.Now().Unix(),
|
||||
}); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
readErr := make(chan error, 1)
|
||||
go func() {
|
||||
for {
|
||||
var cmd clientCmd
|
||||
if err := wsjson.Read(ctx, conn, &cmd); err != nil {
|
||||
readErr <- err
|
||||
return
|
||||
}
|
||||
if cmd.Watermark > 0 {
|
||||
rows, max, err := hub.fetchSince(cmd.Watermark)
|
||||
if err == nil && len(rows) > 0 {
|
||||
hub.setWatermark(max)
|
||||
select {
|
||||
case sub.out <- dfMessage{
|
||||
Type: "delta",
|
||||
Watermark: max,
|
||||
Runs: rows,
|
||||
ServerTime: time.Now().Unix(),
|
||||
}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case err := <-readErr:
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
case msg, ok := <-sub.out:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
wctx, wcancel := context.WithTimeout(ctx, wsBroadcastTimeout)
|
||||
err := wsjson.Write(wctx, conn, msg)
|
||||
wcancel()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
+54
-5
@@ -2,12 +2,15 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"fn-registry/apps/data_factory/datafactory"
|
||||
"fn-registry/functions/infra"
|
||||
)
|
||||
|
||||
@@ -18,14 +21,48 @@ type Server struct {
|
||||
pool *DBPool
|
||||
registryRoot string // raiz del fn_registry (para exec fn ...)
|
||||
hub *CallMonitorHub
|
||||
dfHub *DataFactoryHub
|
||||
|
||||
// data_factory.db (RW) is opened lazily on first request and reused.
|
||||
dfPath string
|
||||
dfMigrationsDir string
|
||||
dfMu sync.Mutex
|
||||
dfDB *sql.DB
|
||||
dfErr error // sticky if a previous open failed
|
||||
}
|
||||
|
||||
func NewServer(pool *DBPool, registryRoot string) *Server {
|
||||
return &Server{
|
||||
func NewServer(pool *DBPool, registryRoot, dfPath, dfMigrationsDir string) *Server {
|
||||
s := &Server{
|
||||
pool: pool,
|
||||
registryRoot: registryRoot,
|
||||
hub: NewCallMonitorHub(pool),
|
||||
dfPath: dfPath,
|
||||
dfMigrationsDir: dfMigrationsDir,
|
||||
}
|
||||
s.dfHub = NewDataFactoryHub(s.dataFactoryDB)
|
||||
return s
|
||||
}
|
||||
|
||||
// dataFactoryDB returns a RW connection to data_factory.db, lazy-opened on
|
||||
// first call. Subsequent calls return the cached connection. If the open
|
||||
// fails it is cached as a sticky error to avoid retry storms; retry after
|
||||
// process restart.
|
||||
func (s *Server) dataFactoryDB() (*sql.DB, error) {
|
||||
s.dfMu.Lock()
|
||||
defer s.dfMu.Unlock()
|
||||
if s.dfDB != nil {
|
||||
return s.dfDB, nil
|
||||
}
|
||||
if s.dfErr != nil {
|
||||
return nil, s.dfErr
|
||||
}
|
||||
db, err := datafactory.Open(s.dfPath, s.dfMigrationsDir)
|
||||
if err != nil {
|
||||
s.dfErr = err
|
||||
return nil, err
|
||||
}
|
||||
s.dfDB = db
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// Routes registers all API routes on the given mux.
|
||||
@@ -50,6 +87,13 @@ func (s *Server) Routes(mux *http.ServeMux) {
|
||||
// Issue 0086: WebSocket live stream de ops:call_monitor (calls table).
|
||||
// Hub global con ticker bajo demanda (solo corre con >=1 subscriber).
|
||||
mux.HandleFunc("GET /api/events/call_monitor", s.handleEvents(s.hub))
|
||||
|
||||
// Issue 0097: data_factory.db (nodes, runs, databases).
|
||||
// REST read-only + WS live stream sobre `runs`.
|
||||
mux.HandleFunc("GET /api/datafactory/nodes", s.handleDataFactoryNodes)
|
||||
mux.HandleFunc("GET /api/datafactory/runs", s.handleDataFactoryRuns)
|
||||
mux.HandleFunc("GET /api/datafactory/databases", s.handleDataFactoryDatabases)
|
||||
mux.HandleFunc("GET /api/ws/datafactory", s.handleDataFactoryEvents(s.dfHub))
|
||||
}
|
||||
|
||||
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -62,9 +106,14 @@ func (s *Server) handleDatabases(w http.ResponseWriter, r *http.Request) {
|
||||
Alias string `json:"alias"`
|
||||
Kind string `json:"kind"`
|
||||
}
|
||||
out := make([]dbInfo, len(entries))
|
||||
for i, e := range entries {
|
||||
out[i] = dbInfo{Alias: e.Alias, Kind: e.Kind}
|
||||
out := make([]dbInfo, 0, len(entries)+1)
|
||||
for _, e := range entries {
|
||||
out = append(out, dbInfo{Alias: e.Alias, Kind: e.Kind})
|
||||
}
|
||||
// Surface data_factory.db too. It is opened RW outside the pool but
|
||||
// belongs in the discovery list so dashboards can see it.
|
||||
if s.dfPath != "" {
|
||||
out = append(out, dbInfo{Alias: "data_factory", Kind: "data_factory"})
|
||||
}
|
||||
writeJSON(w, http.StatusOK, out)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,205 @@
|
||||
package main
|
||||
|
||||
// REST handlers (read-only) for data_factory.db (issue 0097).
|
||||
//
|
||||
// Endpoints:
|
||||
// GET /api/datafactory/nodes?kind=<kind>
|
||||
// GET /api/datafactory/runs?node_id=<id>&limit=N
|
||||
// GET /api/datafactory/databases
|
||||
//
|
||||
// All endpoints lazy-open data_factory.db on first request (creating the
|
||||
// file + applying migrations if missing). If the open fails, returns 503.
|
||||
// POST trigger is intentionally NOT implemented — sqlite_api is read-only;
|
||||
// run insertion is done out-of-band by DAG steps / function wrappers.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// dataFactoryNode is the JSON row for /api/datafactory/nodes.
|
||||
type dataFactoryNode struct {
|
||||
ID string `json:"id"`
|
||||
Kind string `json:"kind"`
|
||||
Name string `json:"name"`
|
||||
FunctionID string `json:"function_id"`
|
||||
Description string `json:"description"`
|
||||
ConfigJSON string `json:"config_json"`
|
||||
ScheduleCron string `json:"schedule_cron"`
|
||||
Enabled bool `json:"enabled"`
|
||||
TagsCSV string `json:"tags_csv"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
// dataFactoryRun is the JSON row for /api/datafactory/runs.
|
||||
type dataFactoryRun struct {
|
||||
ID string `json:"id"`
|
||||
NodeID string `json:"node_id"`
|
||||
StartedAt string `json:"started_at"`
|
||||
FinishedAt string `json:"finished_at"`
|
||||
Status string `json:"status"`
|
||||
RowsIn int64 `json:"rows_in"`
|
||||
RowsOut int64 `json:"rows_out"`
|
||||
KbIn int64 `json:"kb_in"`
|
||||
KbOut int64 `json:"kb_out"`
|
||||
DurationMS int64 `json:"duration_ms"`
|
||||
Trigger string `json:"trigger"`
|
||||
Error string `json:"error"`
|
||||
Notes string `json:"notes"`
|
||||
}
|
||||
|
||||
// dataFactoryDatabase is the JSON row for /api/datafactory/databases.
|
||||
type dataFactoryDatabase struct {
|
||||
ID string `json:"id"`
|
||||
Kind string `json:"kind"`
|
||||
Label string `json:"label"`
|
||||
URI string `json:"uri"`
|
||||
Description string `json:"description"`
|
||||
TagsCSV string `json:"tags_csv"`
|
||||
LastSeenAt string `json:"last_seen_at"`
|
||||
TableCount int64 `json:"table_count"`
|
||||
SizeBytes int64 `json:"size_bytes"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
func (s *Server) handleDataFactoryNodes(w http.ResponseWriter, r *http.Request) {
|
||||
db, err := s.dataFactoryDB()
|
||||
if err != nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
||||
defer cancel()
|
||||
|
||||
kind := r.URL.Query().Get("kind")
|
||||
var rows *sql.Rows
|
||||
if kind != "" {
|
||||
rows, err = db.QueryContext(ctx, `
|
||||
SELECT id, kind, name, function_id, description, config_json,
|
||||
schedule_cron, enabled, tags_csv, created_at, updated_at
|
||||
FROM nodes
|
||||
WHERE kind = ?
|
||||
ORDER BY kind, name`, kind)
|
||||
} else {
|
||||
rows, err = db.QueryContext(ctx, `
|
||||
SELECT id, kind, name, function_id, description, config_json,
|
||||
schedule_cron, enabled, tags_csv, created_at, updated_at
|
||||
FROM nodes
|
||||
ORDER BY kind, name`)
|
||||
}
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
nodes := make([]dataFactoryNode, 0, 16)
|
||||
for rows.Next() {
|
||||
var n dataFactoryNode
|
||||
var enabled int
|
||||
if err := rows.Scan(&n.ID, &n.Kind, &n.Name, &n.FunctionID, &n.Description,
|
||||
&n.ConfigJSON, &n.ScheduleCron, &enabled, &n.TagsCSV,
|
||||
&n.CreatedAt, &n.UpdatedAt); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
n.Enabled = enabled != 0
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"nodes": nodes, "count": len(nodes)})
|
||||
}
|
||||
|
||||
func (s *Server) handleDataFactoryRuns(w http.ResponseWriter, r *http.Request) {
|
||||
db, err := s.dataFactoryDB()
|
||||
if err != nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
||||
defer cancel()
|
||||
|
||||
nodeID := r.URL.Query().Get("node_id")
|
||||
limit := 100
|
||||
if l := r.URL.Query().Get("limit"); l != "" {
|
||||
if n, err := strconv.Atoi(l); err == nil && n > 0 && n <= 1000 {
|
||||
limit = n
|
||||
}
|
||||
}
|
||||
|
||||
var rows *sql.Rows
|
||||
if nodeID != "" {
|
||||
rows, err = db.QueryContext(ctx, `
|
||||
SELECT id, node_id, started_at, COALESCE(finished_at,''), status,
|
||||
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes
|
||||
FROM runs
|
||||
WHERE node_id = ?
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ?`, nodeID, limit)
|
||||
} else {
|
||||
rows, err = db.QueryContext(ctx, `
|
||||
SELECT id, node_id, started_at, COALESCE(finished_at,''), status,
|
||||
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes
|
||||
FROM runs
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ?`, limit)
|
||||
}
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
runs := make([]dataFactoryRun, 0, 16)
|
||||
for rows.Next() {
|
||||
var rr dataFactoryRun
|
||||
if err := rows.Scan(&rr.ID, &rr.NodeID, &rr.StartedAt, &rr.FinishedAt,
|
||||
&rr.Status, &rr.RowsIn, &rr.RowsOut, &rr.KbIn, &rr.KbOut,
|
||||
&rr.DurationMS, &rr.Trigger, &rr.Error, &rr.Notes); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
runs = append(runs, rr)
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"runs": runs, "count": len(runs)})
|
||||
}
|
||||
|
||||
func (s *Server) handleDataFactoryDatabases(w http.ResponseWriter, r *http.Request) {
|
||||
db, err := s.dataFactoryDB()
|
||||
if err != nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
||||
defer cancel()
|
||||
|
||||
rows, err := db.QueryContext(ctx, `
|
||||
SELECT id, kind, label, uri, description, tags_csv, last_seen_at,
|
||||
table_count, size_bytes, created_at, updated_at
|
||||
FROM databases
|
||||
ORDER BY kind, label`)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := make([]dataFactoryDatabase, 0, 16)
|
||||
for rows.Next() {
|
||||
var d dataFactoryDatabase
|
||||
if err := rows.Scan(&d.ID, &d.Kind, &d.Label, &d.URI, &d.Description,
|
||||
&d.TagsCSV, &d.LastSeenAt, &d.TableCount, &d.SizeBytes,
|
||||
&d.CreatedAt, &d.UpdatedAt); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
out = append(out, d)
|
||||
}
|
||||
writeJSON(w, http.StatusOK, map[string]any{"databases": out, "count": len(out)})
|
||||
}
|
||||
+7
-7
@@ -45,7 +45,7 @@ func setupTestDB(t *testing.T) (*DBPool, string) {
|
||||
|
||||
func TestHealthEndpoint(t *testing.T) {
|
||||
pool := NewDBPool()
|
||||
srv := NewServer(pool, "")
|
||||
srv := NewServer(pool, "", "", "")
|
||||
mux := http.NewServeMux()
|
||||
srv.Routes(mux)
|
||||
|
||||
@@ -68,7 +68,7 @@ func TestDatabasesEndpoint(t *testing.T) {
|
||||
pool.Register(DBEntry{Alias: "registry", Path: "/fake/path", Kind: "registry"})
|
||||
pool.Register(DBEntry{Alias: "ops:myapp", Path: "/fake/path2", Kind: "operations"})
|
||||
|
||||
srv := NewServer(pool, "")
|
||||
srv := NewServer(pool, "", "", "")
|
||||
mux := http.NewServeMux()
|
||||
srv.Routes(mux)
|
||||
|
||||
@@ -90,7 +90,7 @@ func TestQueryEndpoint(t *testing.T) {
|
||||
pool, _ := setupTestDB(t)
|
||||
defer pool.Close()
|
||||
|
||||
srv := NewServer(pool, "")
|
||||
srv := NewServer(pool, "", "", "")
|
||||
mux := http.NewServeMux()
|
||||
srv.Routes(mux)
|
||||
|
||||
@@ -119,7 +119,7 @@ func TestQueryRejectsWrite(t *testing.T) {
|
||||
pool, _ := setupTestDB(t)
|
||||
defer pool.Close()
|
||||
|
||||
srv := NewServer(pool, "")
|
||||
srv := NewServer(pool, "", "", "")
|
||||
mux := http.NewServeMux()
|
||||
srv.Routes(mux)
|
||||
|
||||
@@ -146,7 +146,7 @@ func TestTablesEndpoint(t *testing.T) {
|
||||
pool, _ := setupTestDB(t)
|
||||
defer pool.Close()
|
||||
|
||||
srv := NewServer(pool, "")
|
||||
srv := NewServer(pool, "", "", "")
|
||||
mux := http.NewServeMux()
|
||||
srv.Routes(mux)
|
||||
|
||||
@@ -178,7 +178,7 @@ func TestSchemaEndpoint(t *testing.T) {
|
||||
pool, _ := setupTestDB(t)
|
||||
defer pool.Close()
|
||||
|
||||
srv := NewServer(pool, "")
|
||||
srv := NewServer(pool, "", "", "")
|
||||
mux := http.NewServeMux()
|
||||
srv.Routes(mux)
|
||||
|
||||
@@ -200,7 +200,7 @@ func TestSchemaEndpoint(t *testing.T) {
|
||||
|
||||
func TestNotFoundDB(t *testing.T) {
|
||||
pool := NewDBPool()
|
||||
srv := NewServer(pool, "")
|
||||
srv := NewServer(pool, "", "", "")
|
||||
mux := http.NewServeMux()
|
||||
srv.Routes(mux)
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
func main() {
|
||||
bind := flag.String("bind", "127.0.0.1:8484", "address to bind")
|
||||
dataFactoryDB := flag.String("data-factory-db", "", "path to data_factory.db (default: <root>/apps/data_factory/data_factory.db)")
|
||||
flag.Parse()
|
||||
|
||||
root := findRegistryRoot()
|
||||
@@ -24,7 +25,14 @@ func main() {
|
||||
log.Printf("registered database: %s (%s)", entry.Alias, entry.Path)
|
||||
}
|
||||
|
||||
srv := NewServer(pool, root)
|
||||
dfPath := *dataFactoryDB
|
||||
if dfPath == "" {
|
||||
dfPath = filepath.Join(root, "apps", "data_factory", "data_factory.db")
|
||||
}
|
||||
dfMigrations := filepath.Join(root, "apps", "data_factory", "migrations")
|
||||
log.Printf("data_factory db: %s (migrations: %s)", dfPath, dfMigrations)
|
||||
|
||||
srv := NewServer(pool, root, dfPath, dfMigrations)
|
||||
mux := http.NewServeMux()
|
||||
srv.Routes(mux)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user