diff --git a/datafactory_events.go b/datafactory_events.go new file mode 100644 index 0000000..1b26d80 --- /dev/null +++ b/datafactory_events.go @@ -0,0 +1,387 @@ +package main + +// WebSocket event hub for data_factory.db `runs` table (issue 0097). +// +// Mirrors CallMonitorHub in events.go but watches `runs` of data_factory.db. +// Lazy: ticker only runs while >=1 subscriber is connected. +// Snapshot at connect = last 50 runs + active nodes + watermark. +// Delta every wsTickInterval = rows with rowid > watermark. +// +// Reuses subscriber/wsMessage/clientCmd from events.go — those types are +// generic enough for any append-only event stream. + +import ( + "context" + "database/sql" + "log" + "net/http" + "sync" + "time" + + "nhooyr.io/websocket" + "nhooyr.io/websocket/wsjson" +) + +const dfSnapshotLimit = 50 + +// dfNode is the JSON shape served by the snapshot's `nodes` field. +type dfNode struct { + ID string `json:"id"` + Kind string `json:"kind"` + Name string `json:"name"` + FunctionID string `json:"function_id"` + Description string `json:"description"` + Enabled bool `json:"enabled"` + TagsCSV string `json:"tags_csv"` +} + +// dfRun is the JSON contract for snapshot.runs and delta.runs. +type dfRun struct { + Rowid int64 `json:"rowid"` + ID string `json:"id"` + NodeID string `json:"node_id"` + StartedAt string `json:"started_at"` + FinishedAt string `json:"finished_at"` + Status string `json:"status"` + RowsIn int64 `json:"rows_in"` + RowsOut int64 `json:"rows_out"` + KbIn int64 `json:"kb_in"` + KbOut int64 `json:"kb_out"` + DurationMS int64 `json:"duration_ms"` + Trigger string `json:"trigger"` + Error string `json:"error"` +} + +// dfMessage is what travels on the WS. Distinct from wsMessage to avoid +// schema collision with call_monitor's KPIs/rows. +type dfMessage struct { + Type string `json:"type"` // "snapshot" | "delta" | "ping" + Watermark int64 `json:"watermark,omitempty"` + Nodes []dfNode `json:"nodes,omitempty"` + Runs []dfRun `json:"runs,omitempty"` + ServerTime int64 `json:"server_time"` +} + +type dfSubscriber struct { + conn *websocket.Conn + ctx context.Context + cancel context.CancelFunc + out chan dfMessage + watermark int64 +} + +// DataFactoryHub gestiona broadcast WS sobre `runs` table de data_factory.db. +// +// Holds a direct *sql.DB pointer (lazy-opened RW by Server). NOT pulled from +// DBPool because data_factory.db is opened by datafactory.Open(), not by the +// pool's read-only discovery flow. +type DataFactoryHub struct { + getDB func() (*sql.DB, error) + + mu sync.Mutex + subscribers map[*dfSubscriber]struct{} + tickerStop chan struct{} + tickerOn bool + watermark int64 + lastEventAt time.Time +} + +func NewDataFactoryHub(getDB func() (*sql.DB, error)) *DataFactoryHub { + return &DataFactoryHub{ + getDB: getDB, + subscribers: make(map[*dfSubscriber]struct{}), + } +} + +func (h *DataFactoryHub) register(s *dfSubscriber) { + h.mu.Lock() + h.subscribers[s] = struct{}{} + shouldStart := !h.tickerOn + if shouldStart { + h.tickerStop = make(chan struct{}) + h.tickerOn = true + h.lastEventAt = time.Now() + } + h.mu.Unlock() + + if shouldStart { + go h.tickerLoop() + } +} + +func (h *DataFactoryHub) unregister(s *dfSubscriber) { + h.mu.Lock() + if _, ok := h.subscribers[s]; !ok { + h.mu.Unlock() + return + } + delete(h.subscribers, s) + close(s.out) + shouldStop := h.tickerOn && len(h.subscribers) == 0 + if shouldStop { + close(h.tickerStop) + h.tickerOn = false + } + h.mu.Unlock() +} + +func (h *DataFactoryHub) tickerLoop() { + interval := wsTickInterval + t := time.NewTimer(interval) + defer t.Stop() + for { + select { + case <-h.tickerStop: + return + case <-t.C: + rows, max, err := h.fetchSince(h.getWatermark()) + if err != nil { + log.Printf("[ws-df] fetchSince: %v", err) + } else if len(rows) > 0 { + h.setWatermark(max) + h.recordActivity() + h.broadcast(dfMessage{ + Type: "delta", + Watermark: max, + Runs: rows, + ServerTime: time.Now().Unix(), + }) + } + if time.Since(h.lastActivityAt()) > wsIdleThreshold { + interval = wsTickIntervalIdle + } else { + interval = wsTickInterval + } + t.Reset(interval) + } + } +} + +func (h *DataFactoryHub) getWatermark() int64 { + h.mu.Lock() + defer h.mu.Unlock() + return h.watermark +} + +func (h *DataFactoryHub) setWatermark(v int64) { + h.mu.Lock() + if v > h.watermark { + h.watermark = v + } + h.mu.Unlock() +} + +func (h *DataFactoryHub) recordActivity() { + h.mu.Lock() + h.lastEventAt = time.Now() + h.mu.Unlock() +} + +func (h *DataFactoryHub) lastActivityAt() time.Time { + h.mu.Lock() + defer h.mu.Unlock() + return h.lastEventAt +} + +func (h *DataFactoryHub) fetchSince(since int64) ([]dfRun, int64, error) { + db, err := h.getDB() + if err != nil { + return nil, since, err + } + rows, err := db.Query(` + SELECT rowid, id, node_id, started_at, COALESCE(finished_at,''), status, + rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error + FROM runs + WHERE rowid > ? + ORDER BY rowid ASC + LIMIT 500`, since) + if err != nil { + return nil, since, err + } + defer rows.Close() + + out := make([]dfRun, 0, 16) + max := since + for rows.Next() { + var r dfRun + if err := rows.Scan(&r.Rowid, &r.ID, &r.NodeID, &r.StartedAt, &r.FinishedAt, + &r.Status, &r.RowsIn, &r.RowsOut, &r.KbIn, &r.KbOut, + &r.DurationMS, &r.Trigger, &r.Error); err != nil { + return nil, since, err + } + out = append(out, r) + if r.Rowid > max { + max = r.Rowid + } + } + return out, max, rows.Err() +} + +func (h *DataFactoryHub) fetchSnapshot() ([]dfNode, []dfRun, int64, error) { + db, err := h.getDB() + if err != nil { + return nil, nil, 0, err + } + + // Active nodes (enabled=1). + nodeRows, err := db.Query(` + SELECT id, kind, name, function_id, description, enabled, tags_csv + FROM nodes + WHERE enabled = 1 + ORDER BY kind, name`) + if err != nil { + return nil, nil, 0, err + } + defer nodeRows.Close() + + nodes := make([]dfNode, 0, 16) + for nodeRows.Next() { + var n dfNode + var enabled int + if err := nodeRows.Scan(&n.ID, &n.Kind, &n.Name, &n.FunctionID, + &n.Description, &enabled, &n.TagsCSV); err != nil { + return nil, nil, 0, err + } + n.Enabled = enabled != 0 + nodes = append(nodes, n) + } + if err := nodeRows.Err(); err != nil { + return nil, nil, 0, err + } + + // Last 50 runs (newest first). + runRows, err := db.Query(` + SELECT rowid, id, node_id, started_at, COALESCE(finished_at,''), status, + rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error + FROM runs + ORDER BY rowid DESC + LIMIT ?`, dfSnapshotLimit) + if err != nil { + return nodes, nil, 0, err + } + defer runRows.Close() + + runs := make([]dfRun, 0, dfSnapshotLimit) + max := int64(0) + for runRows.Next() { + var r dfRun + if err := runRows.Scan(&r.Rowid, &r.ID, &r.NodeID, &r.StartedAt, &r.FinishedAt, + &r.Status, &r.RowsIn, &r.RowsOut, &r.KbIn, &r.KbOut, + &r.DurationMS, &r.Trigger, &r.Error); err != nil { + return nodes, nil, 0, err + } + runs = append(runs, r) + if r.Rowid > max { + max = r.Rowid + } + } + return nodes, runs, max, runRows.Err() +} + +func (h *DataFactoryHub) broadcast(msg dfMessage) { + h.mu.Lock() + subs := make([]*dfSubscriber, 0, len(h.subscribers)) + for s := range h.subscribers { + subs = append(subs, s) + } + h.mu.Unlock() + + for _, s := range subs { + select { + case s.out <- msg: + default: + log.Printf("[ws-df] dropping frame for slow subscriber") + } + } +} + +// handleDataFactoryEvents upgradea HTTP a WS y mantiene la conexion. +// Endpoint: GET /api/ws/datafactory +func (s *Server) handleDataFactoryEvents(hub *DataFactoryHub) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ + InsecureSkipVerify: true, + }) + if err != nil { + log.Printf("[ws-df] accept: %v", err) + return + } + defer conn.Close(websocket.StatusInternalError, "closing") + + ctx, cancel := context.WithCancel(r.Context()) + defer cancel() + + sub := &dfSubscriber{ + conn: conn, + ctx: ctx, + cancel: cancel, + out: make(chan dfMessage, 64), + } + hub.register(sub) + defer hub.unregister(sub) + + nodes, runs, watermark, err := hub.fetchSnapshot() + if err != nil { + log.Printf("[ws-df] snapshot: %v", err) + conn.Close(websocket.StatusInternalError, "snapshot failed") + return + } + hub.setWatermark(watermark) + if err := wsjson.Write(ctx, conn, dfMessage{ + Type: "snapshot", + Watermark: watermark, + Nodes: nodes, + Runs: runs, + ServerTime: time.Now().Unix(), + }); err != nil { + return + } + + readErr := make(chan error, 1) + go func() { + for { + var cmd clientCmd + if err := wsjson.Read(ctx, conn, &cmd); err != nil { + readErr <- err + return + } + if cmd.Watermark > 0 { + rows, max, err := hub.fetchSince(cmd.Watermark) + if err == nil && len(rows) > 0 { + hub.setWatermark(max) + select { + case sub.out <- dfMessage{ + Type: "delta", + Watermark: max, + Runs: rows, + ServerTime: time.Now().Unix(), + }: + default: + } + } + } + } + }() + + for { + select { + case <-ctx.Done(): + return + case err := <-readErr: + if err != nil { + return + } + case msg, ok := <-sub.out: + if !ok { + return + } + wctx, wcancel := context.WithTimeout(ctx, wsBroadcastTimeout) + err := wsjson.Write(wctx, conn, msg) + wcancel() + if err != nil { + return + } + } + } + } +} diff --git a/handlers.go b/handlers.go index 1572e7d..4a22b1f 100644 --- a/handlers.go +++ b/handlers.go @@ -2,12 +2,15 @@ package main import ( "context" + "database/sql" "encoding/json" "fmt" "net/http" "strings" + "sync" "time" + "fn-registry/apps/data_factory/datafactory" "fn-registry/functions/infra" ) @@ -18,14 +21,48 @@ type Server struct { pool *DBPool registryRoot string // raiz del fn_registry (para exec fn ...) hub *CallMonitorHub + dfHub *DataFactoryHub + + // data_factory.db (RW) is opened lazily on first request and reused. + dfPath string + dfMigrationsDir string + dfMu sync.Mutex + dfDB *sql.DB + dfErr error // sticky if a previous open failed } -func NewServer(pool *DBPool, registryRoot string) *Server { - return &Server{ - pool: pool, - registryRoot: registryRoot, - hub: NewCallMonitorHub(pool), +func NewServer(pool *DBPool, registryRoot, dfPath, dfMigrationsDir string) *Server { + s := &Server{ + pool: pool, + registryRoot: registryRoot, + hub: NewCallMonitorHub(pool), + dfPath: dfPath, + dfMigrationsDir: dfMigrationsDir, } + s.dfHub = NewDataFactoryHub(s.dataFactoryDB) + return s +} + +// dataFactoryDB returns a RW connection to data_factory.db, lazy-opened on +// first call. Subsequent calls return the cached connection. If the open +// fails it is cached as a sticky error to avoid retry storms; retry after +// process restart. +func (s *Server) dataFactoryDB() (*sql.DB, error) { + s.dfMu.Lock() + defer s.dfMu.Unlock() + if s.dfDB != nil { + return s.dfDB, nil + } + if s.dfErr != nil { + return nil, s.dfErr + } + db, err := datafactory.Open(s.dfPath, s.dfMigrationsDir) + if err != nil { + s.dfErr = err + return nil, err + } + s.dfDB = db + return db, nil } // Routes registers all API routes on the given mux. @@ -50,6 +87,13 @@ func (s *Server) Routes(mux *http.ServeMux) { // Issue 0086: WebSocket live stream de ops:call_monitor (calls table). // Hub global con ticker bajo demanda (solo corre con >=1 subscriber). mux.HandleFunc("GET /api/events/call_monitor", s.handleEvents(s.hub)) + + // Issue 0097: data_factory.db (nodes, runs, databases). + // REST read-only + WS live stream sobre `runs`. + mux.HandleFunc("GET /api/datafactory/nodes", s.handleDataFactoryNodes) + mux.HandleFunc("GET /api/datafactory/runs", s.handleDataFactoryRuns) + mux.HandleFunc("GET /api/datafactory/databases", s.handleDataFactoryDatabases) + mux.HandleFunc("GET /api/ws/datafactory", s.handleDataFactoryEvents(s.dfHub)) } func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { @@ -62,9 +106,14 @@ func (s *Server) handleDatabases(w http.ResponseWriter, r *http.Request) { Alias string `json:"alias"` Kind string `json:"kind"` } - out := make([]dbInfo, len(entries)) - for i, e := range entries { - out[i] = dbInfo{Alias: e.Alias, Kind: e.Kind} + out := make([]dbInfo, 0, len(entries)+1) + for _, e := range entries { + out = append(out, dbInfo{Alias: e.Alias, Kind: e.Kind}) + } + // Surface data_factory.db too. It is opened RW outside the pool but + // belongs in the discovery list so dashboards can see it. + if s.dfPath != "" { + out = append(out, dbInfo{Alias: "data_factory", Kind: "data_factory"}) } writeJSON(w, http.StatusOK, out) } diff --git a/handlers_datafactory.go b/handlers_datafactory.go new file mode 100644 index 0000000..3d31719 --- /dev/null +++ b/handlers_datafactory.go @@ -0,0 +1,205 @@ +package main + +// REST handlers (read-only) for data_factory.db (issue 0097). +// +// Endpoints: +// GET /api/datafactory/nodes?kind= +// GET /api/datafactory/runs?node_id=&limit=N +// GET /api/datafactory/databases +// +// All endpoints lazy-open data_factory.db on first request (creating the +// file + applying migrations if missing). If the open fails, returns 503. +// POST trigger is intentionally NOT implemented — sqlite_api is read-only; +// run insertion is done out-of-band by DAG steps / function wrappers. + +import ( + "context" + "database/sql" + "net/http" + "strconv" +) + +// dataFactoryNode is the JSON row for /api/datafactory/nodes. +type dataFactoryNode struct { + ID string `json:"id"` + Kind string `json:"kind"` + Name string `json:"name"` + FunctionID string `json:"function_id"` + Description string `json:"description"` + ConfigJSON string `json:"config_json"` + ScheduleCron string `json:"schedule_cron"` + Enabled bool `json:"enabled"` + TagsCSV string `json:"tags_csv"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +// dataFactoryRun is the JSON row for /api/datafactory/runs. +type dataFactoryRun struct { + ID string `json:"id"` + NodeID string `json:"node_id"` + StartedAt string `json:"started_at"` + FinishedAt string `json:"finished_at"` + Status string `json:"status"` + RowsIn int64 `json:"rows_in"` + RowsOut int64 `json:"rows_out"` + KbIn int64 `json:"kb_in"` + KbOut int64 `json:"kb_out"` + DurationMS int64 `json:"duration_ms"` + Trigger string `json:"trigger"` + Error string `json:"error"` + Notes string `json:"notes"` +} + +// dataFactoryDatabase is the JSON row for /api/datafactory/databases. +type dataFactoryDatabase struct { + ID string `json:"id"` + Kind string `json:"kind"` + Label string `json:"label"` + URI string `json:"uri"` + Description string `json:"description"` + TagsCSV string `json:"tags_csv"` + LastSeenAt string `json:"last_seen_at"` + TableCount int64 `json:"table_count"` + SizeBytes int64 `json:"size_bytes"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +func (s *Server) handleDataFactoryNodes(w http.ResponseWriter, r *http.Request) { + db, err := s.dataFactoryDB() + if err != nil { + writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error()) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), queryTimeout) + defer cancel() + + kind := r.URL.Query().Get("kind") + var rows *sql.Rows + if kind != "" { + rows, err = db.QueryContext(ctx, ` + SELECT id, kind, name, function_id, description, config_json, + schedule_cron, enabled, tags_csv, created_at, updated_at + FROM nodes + WHERE kind = ? + ORDER BY kind, name`, kind) + } else { + rows, err = db.QueryContext(ctx, ` + SELECT id, kind, name, function_id, description, config_json, + schedule_cron, enabled, tags_csv, created_at, updated_at + FROM nodes + ORDER BY kind, name`) + } + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + defer rows.Close() + + nodes := make([]dataFactoryNode, 0, 16) + for rows.Next() { + var n dataFactoryNode + var enabled int + if err := rows.Scan(&n.ID, &n.Kind, &n.Name, &n.FunctionID, &n.Description, + &n.ConfigJSON, &n.ScheduleCron, &enabled, &n.TagsCSV, + &n.CreatedAt, &n.UpdatedAt); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + n.Enabled = enabled != 0 + nodes = append(nodes, n) + } + writeJSON(w, http.StatusOK, map[string]any{"nodes": nodes, "count": len(nodes)}) +} + +func (s *Server) handleDataFactoryRuns(w http.ResponseWriter, r *http.Request) { + db, err := s.dataFactoryDB() + if err != nil { + writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error()) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), queryTimeout) + defer cancel() + + nodeID := r.URL.Query().Get("node_id") + limit := 100 + if l := r.URL.Query().Get("limit"); l != "" { + if n, err := strconv.Atoi(l); err == nil && n > 0 && n <= 1000 { + limit = n + } + } + + var rows *sql.Rows + if nodeID != "" { + rows, err = db.QueryContext(ctx, ` + SELECT id, node_id, started_at, COALESCE(finished_at,''), status, + rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes + FROM runs + WHERE node_id = ? + ORDER BY started_at DESC + LIMIT ?`, nodeID, limit) + } else { + rows, err = db.QueryContext(ctx, ` + SELECT id, node_id, started_at, COALESCE(finished_at,''), status, + rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes + FROM runs + ORDER BY started_at DESC + LIMIT ?`, limit) + } + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + defer rows.Close() + + runs := make([]dataFactoryRun, 0, 16) + for rows.Next() { + var rr dataFactoryRun + if err := rows.Scan(&rr.ID, &rr.NodeID, &rr.StartedAt, &rr.FinishedAt, + &rr.Status, &rr.RowsIn, &rr.RowsOut, &rr.KbIn, &rr.KbOut, + &rr.DurationMS, &rr.Trigger, &rr.Error, &rr.Notes); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + runs = append(runs, rr) + } + writeJSON(w, http.StatusOK, map[string]any{"runs": runs, "count": len(runs)}) +} + +func (s *Server) handleDataFactoryDatabases(w http.ResponseWriter, r *http.Request) { + db, err := s.dataFactoryDB() + if err != nil { + writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error()) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), queryTimeout) + defer cancel() + + rows, err := db.QueryContext(ctx, ` + SELECT id, kind, label, uri, description, tags_csv, last_seen_at, + table_count, size_bytes, created_at, updated_at + FROM databases + ORDER BY kind, label`) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + defer rows.Close() + + out := make([]dataFactoryDatabase, 0, 16) + for rows.Next() { + var d dataFactoryDatabase + if err := rows.Scan(&d.ID, &d.Kind, &d.Label, &d.URI, &d.Description, + &d.TagsCSV, &d.LastSeenAt, &d.TableCount, &d.SizeBytes, + &d.CreatedAt, &d.UpdatedAt); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + out = append(out, d) + } + writeJSON(w, http.StatusOK, map[string]any{"databases": out, "count": len(out)}) +} diff --git a/handlers_test.go b/handlers_test.go index a9a1882..ee1b563 100644 --- a/handlers_test.go +++ b/handlers_test.go @@ -45,7 +45,7 @@ func setupTestDB(t *testing.T) (*DBPool, string) { func TestHealthEndpoint(t *testing.T) { pool := NewDBPool() - srv := NewServer(pool, "") + srv := NewServer(pool, "", "", "") mux := http.NewServeMux() srv.Routes(mux) @@ -68,7 +68,7 @@ func TestDatabasesEndpoint(t *testing.T) { pool.Register(DBEntry{Alias: "registry", Path: "/fake/path", Kind: "registry"}) pool.Register(DBEntry{Alias: "ops:myapp", Path: "/fake/path2", Kind: "operations"}) - srv := NewServer(pool, "") + srv := NewServer(pool, "", "", "") mux := http.NewServeMux() srv.Routes(mux) @@ -90,7 +90,7 @@ func TestQueryEndpoint(t *testing.T) { pool, _ := setupTestDB(t) defer pool.Close() - srv := NewServer(pool, "") + srv := NewServer(pool, "", "", "") mux := http.NewServeMux() srv.Routes(mux) @@ -119,7 +119,7 @@ func TestQueryRejectsWrite(t *testing.T) { pool, _ := setupTestDB(t) defer pool.Close() - srv := NewServer(pool, "") + srv := NewServer(pool, "", "", "") mux := http.NewServeMux() srv.Routes(mux) @@ -146,7 +146,7 @@ func TestTablesEndpoint(t *testing.T) { pool, _ := setupTestDB(t) defer pool.Close() - srv := NewServer(pool, "") + srv := NewServer(pool, "", "", "") mux := http.NewServeMux() srv.Routes(mux) @@ -178,7 +178,7 @@ func TestSchemaEndpoint(t *testing.T) { pool, _ := setupTestDB(t) defer pool.Close() - srv := NewServer(pool, "") + srv := NewServer(pool, "", "", "") mux := http.NewServeMux() srv.Routes(mux) @@ -200,7 +200,7 @@ func TestSchemaEndpoint(t *testing.T) { func TestNotFoundDB(t *testing.T) { pool := NewDBPool() - srv := NewServer(pool, "") + srv := NewServer(pool, "", "", "") mux := http.NewServeMux() srv.Routes(mux) diff --git a/main.go b/main.go index e5ad5a6..797ca31 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( func main() { bind := flag.String("bind", "127.0.0.1:8484", "address to bind") + dataFactoryDB := flag.String("data-factory-db", "", "path to data_factory.db (default: /apps/data_factory/data_factory.db)") flag.Parse() root := findRegistryRoot() @@ -24,7 +25,14 @@ func main() { log.Printf("registered database: %s (%s)", entry.Alias, entry.Path) } - srv := NewServer(pool, root) + dfPath := *dataFactoryDB + if dfPath == "" { + dfPath = filepath.Join(root, "apps", "data_factory", "data_factory.db") + } + dfMigrations := filepath.Join(root, "apps", "data_factory", "migrations") + log.Printf("data_factory db: %s (migrations: %s)", dfPath, dfMigrations) + + srv := NewServer(pool, root, dfPath, dfMigrations) mux := http.NewServeMux() srv.Routes(mux)