Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 96915d31ab | |||
| 155a6db824 | |||
| 11c986edc7 | |||
| 8a71839520 |
@@ -2,6 +2,7 @@
|
|||||||
name: sqlite_api
|
name: sqlite_api
|
||||||
lang: go
|
lang: go
|
||||||
domain: infra
|
domain: infra
|
||||||
|
version: 0.1.0
|
||||||
description: "API REST HTTP read-only sobre registry.db y operations.db de cada app. Permite consultas SQL (solo SELECT/PRAGMA), busqueda FTS5, exploracion de tablas y schema. Bind por defecto a localhost:8484."
|
description: "API REST HTTP read-only sobre registry.db y operations.db de cada app. Permite consultas SQL (solo SELECT/PRAGMA), busqueda FTS5, exploracion de tablas y schema. Bind por defecto a localhost:8484."
|
||||||
tags: [service, api, sqlite, http, registry, fts5]
|
tags: [service, api, sqlite, http, registry, fts5]
|
||||||
uses_functions:
|
uses_functions:
|
||||||
@@ -10,6 +11,18 @@ uses_types: []
|
|||||||
framework: "net/http"
|
framework: "net/http"
|
||||||
entry_point: "main.go"
|
entry_point: "main.go"
|
||||||
dir_path: "projects/fn_monitoring/apps/sqlite_api"
|
dir_path: "projects/fn_monitoring/apps/sqlite_api"
|
||||||
|
service:
|
||||||
|
port: 8484
|
||||||
|
health_endpoint: /api/databases
|
||||||
|
health_timeout_s: 3
|
||||||
|
systemd_unit: sqlite_api.service
|
||||||
|
systemd_scope: user
|
||||||
|
restart_policy: always
|
||||||
|
runtime: systemd-user
|
||||||
|
pc_targets:
|
||||||
|
- aurgi-pc
|
||||||
|
- home-wsl
|
||||||
|
is_local_only: false
|
||||||
---
|
---
|
||||||
|
|
||||||
## Uso
|
## Uso
|
||||||
@@ -81,3 +94,13 @@ El servicio pasa de read-only puro a soportar mutaciones. Split de handlers:
|
|||||||
|
|
||||||
- Permisos/auth: ahora cualquier cliente local puede escribir. Para deploy a un VPS habria que anadir token header (similar a `X-Registry-Token` del `registry_api`). Por ahora bind 127.0.0.1 mitiga.
|
- Permisos/auth: ahora cualquier cliente local puede escribir. Para deploy a un VPS habria que anadir token header (similar a `X-Registry-Token` del `registry_api`). Por ahora bind 127.0.0.1 mitiga.
|
||||||
- `/api/add/function` y `/api/add/type` (kinds que hoy no se exponen via HTTP). El CLI ya lo soporta; el endpoint seria casi paralelo a `/api/add/app`.
|
- `/api/add/function` y `/api/add/type` (kinds que hoy no se exponen via HTTP). El CLI ya lo soporta; el endpoint seria casi paralelo a `/api/add/app`.
|
||||||
|
|
||||||
|
|
||||||
|
## Capability growth log
|
||||||
|
|
||||||
|
Una linea por bump SemVer. Bump-type segun `.claude/commands/version.md`:
|
||||||
|
- `major`: breaking observable (CLI args, schema BBDD propia, formato wire).
|
||||||
|
- `minor`: feature aditiva (nuevo panel, endpoint, opcion).
|
||||||
|
- `patch`: bugfix sin cambio observable.
|
||||||
|
|
||||||
|
- v0.1.0 (2026-05-18) — baseline.
|
||||||
|
|||||||
@@ -0,0 +1,387 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
// WebSocket event hub for data_factory.db `runs` table (issue 0097).
|
||||||
|
//
|
||||||
|
// Mirrors CallMonitorHub in events.go but watches `runs` of data_factory.db.
|
||||||
|
// Lazy: ticker only runs while >=1 subscriber is connected.
|
||||||
|
// Snapshot at connect = last 50 runs + active nodes + watermark.
|
||||||
|
// Delta every wsTickInterval = rows with rowid > watermark.
|
||||||
|
//
|
||||||
|
// Reuses subscriber/wsMessage/clientCmd from events.go — those types are
|
||||||
|
// generic enough for any append-only event stream.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"nhooyr.io/websocket"
|
||||||
|
"nhooyr.io/websocket/wsjson"
|
||||||
|
)
|
||||||
|
|
||||||
|
const dfSnapshotLimit = 50
|
||||||
|
|
||||||
|
// dfNode is the JSON shape served by the snapshot's `nodes` field.
|
||||||
|
type dfNode struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Kind string `json:"kind"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
FunctionID string `json:"function_id"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
Enabled bool `json:"enabled"`
|
||||||
|
TagsCSV string `json:"tags_csv"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// dfRun is the JSON contract for snapshot.runs and delta.runs.
|
||||||
|
type dfRun struct {
|
||||||
|
Rowid int64 `json:"rowid"`
|
||||||
|
ID string `json:"id"`
|
||||||
|
NodeID string `json:"node_id"`
|
||||||
|
StartedAt string `json:"started_at"`
|
||||||
|
FinishedAt string `json:"finished_at"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
RowsIn int64 `json:"rows_in"`
|
||||||
|
RowsOut int64 `json:"rows_out"`
|
||||||
|
KbIn int64 `json:"kb_in"`
|
||||||
|
KbOut int64 `json:"kb_out"`
|
||||||
|
DurationMS int64 `json:"duration_ms"`
|
||||||
|
Trigger string `json:"trigger"`
|
||||||
|
Error string `json:"error"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// dfMessage is what travels on the WS. Distinct from wsMessage to avoid
|
||||||
|
// schema collision with call_monitor's KPIs/rows.
|
||||||
|
type dfMessage struct {
|
||||||
|
Type string `json:"type"` // "snapshot" | "delta" | "ping"
|
||||||
|
Watermark int64 `json:"watermark,omitempty"`
|
||||||
|
Nodes []dfNode `json:"nodes,omitempty"`
|
||||||
|
Runs []dfRun `json:"runs,omitempty"`
|
||||||
|
ServerTime int64 `json:"server_time"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type dfSubscriber struct {
|
||||||
|
conn *websocket.Conn
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
out chan dfMessage
|
||||||
|
watermark int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// DataFactoryHub gestiona broadcast WS sobre `runs` table de data_factory.db.
|
||||||
|
//
|
||||||
|
// Holds a direct *sql.DB pointer (lazy-opened RW by Server). NOT pulled from
|
||||||
|
// DBPool because data_factory.db is opened by datafactory.Open(), not by the
|
||||||
|
// pool's read-only discovery flow.
|
||||||
|
type DataFactoryHub struct {
|
||||||
|
getDB func() (*sql.DB, error)
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
subscribers map[*dfSubscriber]struct{}
|
||||||
|
tickerStop chan struct{}
|
||||||
|
tickerOn bool
|
||||||
|
watermark int64
|
||||||
|
lastEventAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDataFactoryHub(getDB func() (*sql.DB, error)) *DataFactoryHub {
|
||||||
|
return &DataFactoryHub{
|
||||||
|
getDB: getDB,
|
||||||
|
subscribers: make(map[*dfSubscriber]struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DataFactoryHub) register(s *dfSubscriber) {
|
||||||
|
h.mu.Lock()
|
||||||
|
h.subscribers[s] = struct{}{}
|
||||||
|
shouldStart := !h.tickerOn
|
||||||
|
if shouldStart {
|
||||||
|
h.tickerStop = make(chan struct{})
|
||||||
|
h.tickerOn = true
|
||||||
|
h.lastEventAt = time.Now()
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
|
||||||
|
if shouldStart {
|
||||||
|
go h.tickerLoop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DataFactoryHub) unregister(s *dfSubscriber) {
|
||||||
|
h.mu.Lock()
|
||||||
|
if _, ok := h.subscribers[s]; !ok {
|
||||||
|
h.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
delete(h.subscribers, s)
|
||||||
|
close(s.out)
|
||||||
|
shouldStop := h.tickerOn && len(h.subscribers) == 0
|
||||||
|
if shouldStop {
|
||||||
|
close(h.tickerStop)
|
||||||
|
h.tickerOn = false
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DataFactoryHub) tickerLoop() {
|
||||||
|
interval := wsTickInterval
|
||||||
|
t := time.NewTimer(interval)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-h.tickerStop:
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
rows, max, err := h.fetchSince(h.getWatermark())
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ws-df] fetchSince: %v", err)
|
||||||
|
} else if len(rows) > 0 {
|
||||||
|
h.setWatermark(max)
|
||||||
|
h.recordActivity()
|
||||||
|
h.broadcast(dfMessage{
|
||||||
|
Type: "delta",
|
||||||
|
Watermark: max,
|
||||||
|
Runs: rows,
|
||||||
|
ServerTime: time.Now().Unix(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if time.Since(h.lastActivityAt()) > wsIdleThreshold {
|
||||||
|
interval = wsTickIntervalIdle
|
||||||
|
} else {
|
||||||
|
interval = wsTickInterval
|
||||||
|
}
|
||||||
|
t.Reset(interval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DataFactoryHub) getWatermark() int64 {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
return h.watermark
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DataFactoryHub) setWatermark(v int64) {
|
||||||
|
h.mu.Lock()
|
||||||
|
if v > h.watermark {
|
||||||
|
h.watermark = v
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DataFactoryHub) recordActivity() {
|
||||||
|
h.mu.Lock()
|
||||||
|
h.lastEventAt = time.Now()
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DataFactoryHub) lastActivityAt() time.Time {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
return h.lastEventAt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DataFactoryHub) fetchSince(since int64) ([]dfRun, int64, error) {
|
||||||
|
db, err := h.getDB()
|
||||||
|
if err != nil {
|
||||||
|
return nil, since, err
|
||||||
|
}
|
||||||
|
rows, err := db.Query(`
|
||||||
|
SELECT rowid, id, node_id, started_at, COALESCE(finished_at,''), status,
|
||||||
|
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error
|
||||||
|
FROM runs
|
||||||
|
WHERE rowid > ?
|
||||||
|
ORDER BY rowid ASC
|
||||||
|
LIMIT 500`, since)
|
||||||
|
if err != nil {
|
||||||
|
return nil, since, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
out := make([]dfRun, 0, 16)
|
||||||
|
max := since
|
||||||
|
for rows.Next() {
|
||||||
|
var r dfRun
|
||||||
|
if err := rows.Scan(&r.Rowid, &r.ID, &r.NodeID, &r.StartedAt, &r.FinishedAt,
|
||||||
|
&r.Status, &r.RowsIn, &r.RowsOut, &r.KbIn, &r.KbOut,
|
||||||
|
&r.DurationMS, &r.Trigger, &r.Error); err != nil {
|
||||||
|
return nil, since, err
|
||||||
|
}
|
||||||
|
out = append(out, r)
|
||||||
|
if r.Rowid > max {
|
||||||
|
max = r.Rowid
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out, max, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DataFactoryHub) fetchSnapshot() ([]dfNode, []dfRun, int64, error) {
|
||||||
|
db, err := h.getDB()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Active nodes (enabled=1).
|
||||||
|
nodeRows, err := db.Query(`
|
||||||
|
SELECT id, kind, name, function_id, description, enabled, tags_csv
|
||||||
|
FROM nodes
|
||||||
|
WHERE enabled = 1
|
||||||
|
ORDER BY kind, name`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, 0, err
|
||||||
|
}
|
||||||
|
defer nodeRows.Close()
|
||||||
|
|
||||||
|
nodes := make([]dfNode, 0, 16)
|
||||||
|
for nodeRows.Next() {
|
||||||
|
var n dfNode
|
||||||
|
var enabled int
|
||||||
|
if err := nodeRows.Scan(&n.ID, &n.Kind, &n.Name, &n.FunctionID,
|
||||||
|
&n.Description, &enabled, &n.TagsCSV); err != nil {
|
||||||
|
return nil, nil, 0, err
|
||||||
|
}
|
||||||
|
n.Enabled = enabled != 0
|
||||||
|
nodes = append(nodes, n)
|
||||||
|
}
|
||||||
|
if err := nodeRows.Err(); err != nil {
|
||||||
|
return nil, nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Last 50 runs (newest first).
|
||||||
|
runRows, err := db.Query(`
|
||||||
|
SELECT rowid, id, node_id, started_at, COALESCE(finished_at,''), status,
|
||||||
|
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error
|
||||||
|
FROM runs
|
||||||
|
ORDER BY rowid DESC
|
||||||
|
LIMIT ?`, dfSnapshotLimit)
|
||||||
|
if err != nil {
|
||||||
|
return nodes, nil, 0, err
|
||||||
|
}
|
||||||
|
defer runRows.Close()
|
||||||
|
|
||||||
|
runs := make([]dfRun, 0, dfSnapshotLimit)
|
||||||
|
max := int64(0)
|
||||||
|
for runRows.Next() {
|
||||||
|
var r dfRun
|
||||||
|
if err := runRows.Scan(&r.Rowid, &r.ID, &r.NodeID, &r.StartedAt, &r.FinishedAt,
|
||||||
|
&r.Status, &r.RowsIn, &r.RowsOut, &r.KbIn, &r.KbOut,
|
||||||
|
&r.DurationMS, &r.Trigger, &r.Error); err != nil {
|
||||||
|
return nodes, nil, 0, err
|
||||||
|
}
|
||||||
|
runs = append(runs, r)
|
||||||
|
if r.Rowid > max {
|
||||||
|
max = r.Rowid
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nodes, runs, max, runRows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *DataFactoryHub) broadcast(msg dfMessage) {
|
||||||
|
h.mu.Lock()
|
||||||
|
subs := make([]*dfSubscriber, 0, len(h.subscribers))
|
||||||
|
for s := range h.subscribers {
|
||||||
|
subs = append(subs, s)
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
|
||||||
|
for _, s := range subs {
|
||||||
|
select {
|
||||||
|
case s.out <- msg:
|
||||||
|
default:
|
||||||
|
log.Printf("[ws-df] dropping frame for slow subscriber")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleDataFactoryEvents upgradea HTTP a WS y mantiene la conexion.
|
||||||
|
// Endpoint: GET /api/ws/datafactory
|
||||||
|
func (s *Server) handleDataFactoryEvents(hub *DataFactoryHub) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ws-df] accept: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer conn.Close(websocket.StatusInternalError, "closing")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(r.Context())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
sub := &dfSubscriber{
|
||||||
|
conn: conn,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
out: make(chan dfMessage, 64),
|
||||||
|
}
|
||||||
|
hub.register(sub)
|
||||||
|
defer hub.unregister(sub)
|
||||||
|
|
||||||
|
nodes, runs, watermark, err := hub.fetchSnapshot()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[ws-df] snapshot: %v", err)
|
||||||
|
conn.Close(websocket.StatusInternalError, "snapshot failed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
hub.setWatermark(watermark)
|
||||||
|
if err := wsjson.Write(ctx, conn, dfMessage{
|
||||||
|
Type: "snapshot",
|
||||||
|
Watermark: watermark,
|
||||||
|
Nodes: nodes,
|
||||||
|
Runs: runs,
|
||||||
|
ServerTime: time.Now().Unix(),
|
||||||
|
}); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
readErr := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
var cmd clientCmd
|
||||||
|
if err := wsjson.Read(ctx, conn, &cmd); err != nil {
|
||||||
|
readErr <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if cmd.Watermark > 0 {
|
||||||
|
rows, max, err := hub.fetchSince(cmd.Watermark)
|
||||||
|
if err == nil && len(rows) > 0 {
|
||||||
|
hub.setWatermark(max)
|
||||||
|
select {
|
||||||
|
case sub.out <- dfMessage{
|
||||||
|
Type: "delta",
|
||||||
|
Watermark: max,
|
||||||
|
Runs: rows,
|
||||||
|
ServerTime: time.Now().Unix(),
|
||||||
|
}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case err := <-readErr:
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case msg, ok := <-sub.out:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
wctx, wcancel := context.WithTimeout(ctx, wsBroadcastTimeout)
|
||||||
|
err := wsjson.Write(wctx, conn, msg)
|
||||||
|
wcancel()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,92 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"os/exec"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type tableInfo struct {
|
||||||
|
Name string
|
||||||
|
RowCount int64
|
||||||
|
}
|
||||||
|
|
||||||
|
// listTablesForDatabase returns the list of tables for a given DB file.
|
||||||
|
// Supports kind="sqlite" (uses database/sql + sqlite3 driver) and
|
||||||
|
// kind="duckdb" (uses python venv subprocess since go-duckdb is not linked
|
||||||
|
// in sqlite_api — it lives in the root module, not here).
|
||||||
|
func listTablesForDatabase(kind, path string) ([]tableInfo, error) {
|
||||||
|
switch kind {
|
||||||
|
case "sqlite":
|
||||||
|
return listSQLiteTables(path)
|
||||||
|
case "duckdb":
|
||||||
|
return listDuckDBTablesViaCLI(path)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unsupported kind: %s", kind)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func listSQLiteTables(path string) ([]tableInfo, error) {
|
||||||
|
db, err := sql.Open("sqlite3", "file:"+path+"?mode=ro")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
rows, err := db.Query(`
|
||||||
|
SELECT name FROM sqlite_master
|
||||||
|
WHERE type='table'
|
||||||
|
AND name NOT LIKE 'sqlite_%'
|
||||||
|
AND name != '_migrations'
|
||||||
|
ORDER BY name`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var out []tableInfo
|
||||||
|
for rows.Next() {
|
||||||
|
var name string
|
||||||
|
if err := rows.Scan(&name); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var cnt int64
|
||||||
|
row := db.QueryRow(`SELECT count(*) FROM "` + name + `"`)
|
||||||
|
_ = row.Scan(&cnt)
|
||||||
|
out = append(out, tableInfo{Name: name, RowCount: cnt})
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func listDuckDBTablesViaCLI(path string) ([]tableInfo, error) {
|
||||||
|
// Use the python venv's duckdb module to avoid linking go-duckdb.
|
||||||
|
pyBin := "/home/lucas/fn_registry/python/.venv/bin/python3"
|
||||||
|
script := fmt.Sprintf(
|
||||||
|
`import duckdb,sys
|
||||||
|
c=duckdb.connect(%q, read_only=True)
|
||||||
|
for (name,) in c.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='main' ORDER BY table_name").fetchall():
|
||||||
|
cnt=c.execute(f'SELECT count(*) FROM "{name}"').fetchone()[0]
|
||||||
|
print(f"{name}\t{cnt}")
|
||||||
|
`, path)
|
||||||
|
cmd := exec.Command(pyBin, "-c", script)
|
||||||
|
out, err := cmd.CombinedOutput()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("duckdb via python: %w (output: %s)", err, string(out))
|
||||||
|
}
|
||||||
|
|
||||||
|
var result []tableInfo
|
||||||
|
for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
|
||||||
|
if line == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
parts := strings.SplitN(line, "\t", 2)
|
||||||
|
if len(parts) != 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cnt, _ := strconv.ParseInt(strings.TrimSpace(parts[1]), 10, 64)
|
||||||
|
result = append(result, tableInfo{Name: parts[0], RowCount: cnt})
|
||||||
|
}
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
+60
-8
@@ -2,12 +2,15 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"fn-registry/apps/data_factory/datafactory"
|
||||||
"fn-registry/functions/infra"
|
"fn-registry/functions/infra"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -18,14 +21,48 @@ type Server struct {
|
|||||||
pool *DBPool
|
pool *DBPool
|
||||||
registryRoot string // raiz del fn_registry (para exec fn ...)
|
registryRoot string // raiz del fn_registry (para exec fn ...)
|
||||||
hub *CallMonitorHub
|
hub *CallMonitorHub
|
||||||
|
dfHub *DataFactoryHub
|
||||||
|
|
||||||
|
// data_factory.db (RW) is opened lazily on first request and reused.
|
||||||
|
dfPath string
|
||||||
|
dfMigrationsDir string
|
||||||
|
dfMu sync.Mutex
|
||||||
|
dfDB *sql.DB
|
||||||
|
dfErr error // sticky if a previous open failed
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer(pool *DBPool, registryRoot string) *Server {
|
func NewServer(pool *DBPool, registryRoot, dfPath, dfMigrationsDir string) *Server {
|
||||||
return &Server{
|
s := &Server{
|
||||||
pool: pool,
|
pool: pool,
|
||||||
registryRoot: registryRoot,
|
registryRoot: registryRoot,
|
||||||
hub: NewCallMonitorHub(pool),
|
hub: NewCallMonitorHub(pool),
|
||||||
|
dfPath: dfPath,
|
||||||
|
dfMigrationsDir: dfMigrationsDir,
|
||||||
}
|
}
|
||||||
|
s.dfHub = NewDataFactoryHub(s.dataFactoryDB)
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// dataFactoryDB returns a RW connection to data_factory.db, lazy-opened on
|
||||||
|
// first call. Subsequent calls return the cached connection. If the open
|
||||||
|
// fails it is cached as a sticky error to avoid retry storms; retry after
|
||||||
|
// process restart.
|
||||||
|
func (s *Server) dataFactoryDB() (*sql.DB, error) {
|
||||||
|
s.dfMu.Lock()
|
||||||
|
defer s.dfMu.Unlock()
|
||||||
|
if s.dfDB != nil {
|
||||||
|
return s.dfDB, nil
|
||||||
|
}
|
||||||
|
if s.dfErr != nil {
|
||||||
|
return nil, s.dfErr
|
||||||
|
}
|
||||||
|
db, err := datafactory.Open(s.dfPath, s.dfMigrationsDir)
|
||||||
|
if err != nil {
|
||||||
|
s.dfErr = err
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
s.dfDB = db
|
||||||
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Routes registers all API routes on the given mux.
|
// Routes registers all API routes on the given mux.
|
||||||
@@ -50,6 +87,16 @@ func (s *Server) Routes(mux *http.ServeMux) {
|
|||||||
// Issue 0086: WebSocket live stream de ops:call_monitor (calls table).
|
// Issue 0086: WebSocket live stream de ops:call_monitor (calls table).
|
||||||
// Hub global con ticker bajo demanda (solo corre con >=1 subscriber).
|
// Hub global con ticker bajo demanda (solo corre con >=1 subscriber).
|
||||||
mux.HandleFunc("GET /api/events/call_monitor", s.handleEvents(s.hub))
|
mux.HandleFunc("GET /api/events/call_monitor", s.handleEvents(s.hub))
|
||||||
|
|
||||||
|
// Issue 0097: data_factory.db (nodes, runs, databases).
|
||||||
|
// REST read-only + WS live stream sobre `runs`.
|
||||||
|
mux.HandleFunc("GET /api/datafactory/nodes", s.handleDataFactoryNodes)
|
||||||
|
mux.HandleFunc("GET /api/datafactory/runs", s.handleDataFactoryRuns)
|
||||||
|
mux.HandleFunc("GET /api/datafactory/databases", s.handleDataFactoryDatabases)
|
||||||
|
mux.HandleFunc("GET /api/datafactory/tables", s.handleDataFactoryTables)
|
||||||
|
mux.HandleFunc("GET /api/datafactory/preview", s.handleDataFactoryPreview)
|
||||||
|
mux.HandleFunc("GET /api/functions/{id}", s.handleFunctionByID)
|
||||||
|
mux.HandleFunc("GET /api/ws/datafactory", s.handleDataFactoryEvents(s.dfHub))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||||
@@ -62,9 +109,14 @@ func (s *Server) handleDatabases(w http.ResponseWriter, r *http.Request) {
|
|||||||
Alias string `json:"alias"`
|
Alias string `json:"alias"`
|
||||||
Kind string `json:"kind"`
|
Kind string `json:"kind"`
|
||||||
}
|
}
|
||||||
out := make([]dbInfo, len(entries))
|
out := make([]dbInfo, 0, len(entries)+1)
|
||||||
for i, e := range entries {
|
for _, e := range entries {
|
||||||
out[i] = dbInfo{Alias: e.Alias, Kind: e.Kind}
|
out = append(out, dbInfo{Alias: e.Alias, Kind: e.Kind})
|
||||||
|
}
|
||||||
|
// Surface data_factory.db too. It is opened RW outside the pool but
|
||||||
|
// belongs in the discovery list so dashboards can see it.
|
||||||
|
if s.dfPath != "" {
|
||||||
|
out = append(out, dbInfo{Alias: "data_factory", Kind: "data_factory"})
|
||||||
}
|
}
|
||||||
writeJSON(w, http.StatusOK, out)
|
writeJSON(w, http.StatusOK, out)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,520 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
// REST handlers (read-only) for data_factory.db (issue 0097).
|
||||||
|
//
|
||||||
|
// Endpoints:
|
||||||
|
// GET /api/datafactory/nodes?kind=<kind>
|
||||||
|
// GET /api/datafactory/runs?node_id=<id>&limit=N
|
||||||
|
// GET /api/datafactory/databases
|
||||||
|
//
|
||||||
|
// All endpoints lazy-open data_factory.db on first request (creating the
|
||||||
|
// file + applying migrations if missing). If the open fails, returns 503.
|
||||||
|
// POST trigger is intentionally NOT implemented — sqlite_api is read-only;
|
||||||
|
// run insertion is done out-of-band by DAG steps / function wrappers.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
)
|
||||||
|
|
||||||
|
// dataFactoryNode is the JSON row for /api/datafactory/nodes.
|
||||||
|
type dataFactoryNode struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Kind string `json:"kind"`
|
||||||
|
Name string `json:"name"`
|
||||||
|
FunctionID string `json:"function_id"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
ConfigJSON string `json:"config_json"`
|
||||||
|
ScheduleCron string `json:"schedule_cron"`
|
||||||
|
Enabled bool `json:"enabled"`
|
||||||
|
TagsCSV string `json:"tags_csv"`
|
||||||
|
CreatedAt string `json:"created_at"`
|
||||||
|
UpdatedAt string `json:"updated_at"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// dataFactoryRun is the JSON row for /api/datafactory/runs.
|
||||||
|
type dataFactoryRun struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
NodeID string `json:"node_id"`
|
||||||
|
StartedAt string `json:"started_at"`
|
||||||
|
FinishedAt string `json:"finished_at"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
RowsIn int64 `json:"rows_in"`
|
||||||
|
RowsOut int64 `json:"rows_out"`
|
||||||
|
KbIn int64 `json:"kb_in"`
|
||||||
|
KbOut int64 `json:"kb_out"`
|
||||||
|
DurationMS int64 `json:"duration_ms"`
|
||||||
|
Trigger string `json:"trigger"`
|
||||||
|
Error string `json:"error"`
|
||||||
|
Notes string `json:"notes"`
|
||||||
|
StorageDBID string `json:"storage_db_id"`
|
||||||
|
StorageTable string `json:"storage_table"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// dataFactoryDatabase is the JSON row for /api/datafactory/databases.
|
||||||
|
type dataFactoryDatabase struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Kind string `json:"kind"`
|
||||||
|
Label string `json:"label"`
|
||||||
|
URI string `json:"uri"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
TagsCSV string `json:"tags_csv"`
|
||||||
|
LastSeenAt string `json:"last_seen_at"`
|
||||||
|
TableCount int64 `json:"table_count"`
|
||||||
|
SizeBytes int64 `json:"size_bytes"`
|
||||||
|
CreatedAt string `json:"created_at"`
|
||||||
|
UpdatedAt string `json:"updated_at"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleDataFactoryNodes(w http.ResponseWriter, r *http.Request) {
|
||||||
|
db, err := s.dataFactoryDB()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
kind := r.URL.Query().Get("kind")
|
||||||
|
var rows *sql.Rows
|
||||||
|
if kind != "" {
|
||||||
|
rows, err = db.QueryContext(ctx, `
|
||||||
|
SELECT id, kind, name, function_id, description, config_json,
|
||||||
|
schedule_cron, enabled, tags_csv, created_at, updated_at
|
||||||
|
FROM nodes
|
||||||
|
WHERE kind = ?
|
||||||
|
ORDER BY kind, name`, kind)
|
||||||
|
} else {
|
||||||
|
rows, err = db.QueryContext(ctx, `
|
||||||
|
SELECT id, kind, name, function_id, description, config_json,
|
||||||
|
schedule_cron, enabled, tags_csv, created_at, updated_at
|
||||||
|
FROM nodes
|
||||||
|
ORDER BY kind, name`)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
nodes := make([]dataFactoryNode, 0, 16)
|
||||||
|
for rows.Next() {
|
||||||
|
var n dataFactoryNode
|
||||||
|
var enabled int
|
||||||
|
if err := rows.Scan(&n.ID, &n.Kind, &n.Name, &n.FunctionID, &n.Description,
|
||||||
|
&n.ConfigJSON, &n.ScheduleCron, &enabled, &n.TagsCSV,
|
||||||
|
&n.CreatedAt, &n.UpdatedAt); err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n.Enabled = enabled != 0
|
||||||
|
nodes = append(nodes, n)
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"nodes": nodes, "count": len(nodes)})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleDataFactoryRuns(w http.ResponseWriter, r *http.Request) {
|
||||||
|
db, err := s.dataFactoryDB()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
nodeID := r.URL.Query().Get("node_id")
|
||||||
|
limit := 100
|
||||||
|
if l := r.URL.Query().Get("limit"); l != "" {
|
||||||
|
if n, err := strconv.Atoi(l); err == nil && n > 0 && n <= 1000 {
|
||||||
|
limit = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var rows *sql.Rows
|
||||||
|
if nodeID != "" {
|
||||||
|
rows, err = db.QueryContext(ctx, `
|
||||||
|
SELECT id, node_id, started_at, COALESCE(finished_at,''), status,
|
||||||
|
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes,
|
||||||
|
COALESCE(storage_db_id,''), COALESCE(storage_table,'')
|
||||||
|
FROM runs
|
||||||
|
WHERE node_id = ?
|
||||||
|
ORDER BY started_at DESC
|
||||||
|
LIMIT ?`, nodeID, limit)
|
||||||
|
} else {
|
||||||
|
rows, err = db.QueryContext(ctx, `
|
||||||
|
SELECT id, node_id, started_at, COALESCE(finished_at,''), status,
|
||||||
|
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes,
|
||||||
|
COALESCE(storage_db_id,''), COALESCE(storage_table,'')
|
||||||
|
FROM runs
|
||||||
|
ORDER BY started_at DESC
|
||||||
|
LIMIT ?`, limit)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
runs := make([]dataFactoryRun, 0, 16)
|
||||||
|
for rows.Next() {
|
||||||
|
var rr dataFactoryRun
|
||||||
|
if err := rows.Scan(&rr.ID, &rr.NodeID, &rr.StartedAt, &rr.FinishedAt,
|
||||||
|
&rr.Status, &rr.RowsIn, &rr.RowsOut, &rr.KbIn, &rr.KbOut,
|
||||||
|
&rr.DurationMS, &rr.Trigger, &rr.Error, &rr.Notes,
|
||||||
|
&rr.StorageDBID, &rr.StorageTable); err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
runs = append(runs, rr)
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"runs": runs, "count": len(runs)})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleDataFactoryDatabases(w http.ResponseWriter, r *http.Request) {
|
||||||
|
db, err := s.dataFactoryDB()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
rows, err := db.QueryContext(ctx, `
|
||||||
|
SELECT id, kind, label, uri, description, tags_csv, last_seen_at,
|
||||||
|
table_count, size_bytes, created_at, updated_at
|
||||||
|
FROM databases
|
||||||
|
ORDER BY kind, label`)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
out := make([]dataFactoryDatabase, 0, 16)
|
||||||
|
for rows.Next() {
|
||||||
|
var d dataFactoryDatabase
|
||||||
|
if err := rows.Scan(&d.ID, &d.Kind, &d.Label, &d.URI, &d.Description,
|
||||||
|
&d.TagsCSV, &d.LastSeenAt, &d.TableCount, &d.SizeBytes,
|
||||||
|
&d.CreatedAt, &d.UpdatedAt); err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
out = append(out, d)
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"databases": out, "count": len(out)})
|
||||||
|
}
|
||||||
|
|
||||||
|
// dataFactoryTable is the JSON row for /api/datafactory/tables.
|
||||||
|
type dataFactoryTable struct {
|
||||||
|
DatabaseID string `json:"database_id"`
|
||||||
|
DatabaseLabel string `json:"database_label"`
|
||||||
|
DatabaseKind string `json:"database_kind"`
|
||||||
|
TableName string `json:"table_name"`
|
||||||
|
RowCount int64 `json:"row_count"`
|
||||||
|
Error string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// tableNameRE restricts table names to alphanumeric + underscore + dot.
|
||||||
|
// This prevents SQL injection in the preview queries.
|
||||||
|
var tableNameRE = regexp.MustCompile(`^[A-Za-z0-9_.]+$`)
|
||||||
|
|
||||||
|
// previewColumn is a JSON column descriptor for the preview response.
|
||||||
|
type previewColumn struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// previewResponse is the JSON shape of /api/datafactory/preview.
|
||||||
|
type previewResponse struct {
|
||||||
|
DatabaseID string `json:"database_id"`
|
||||||
|
TableName string `json:"table_name"`
|
||||||
|
Columns []previewColumn `json:"columns"`
|
||||||
|
Rows [][]string `json:"rows"`
|
||||||
|
TotalRows int64 `json:"total_rows"`
|
||||||
|
Limit int `json:"limit"`
|
||||||
|
Offset int `json:"offset"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleDataFactoryPreview serves:
|
||||||
|
//
|
||||||
|
// GET /api/datafactory/preview?database_id=<id>&table=<name>&limit=N&offset=N
|
||||||
|
//
|
||||||
|
// Supports kind="sqlite" and kind="duckdb" databases registered in
|
||||||
|
// data_factory.databases. Returns up to 1000 rows (default 100).
|
||||||
|
func (s *Server) handleDataFactoryPreview(w http.ResponseWriter, r *http.Request) {
|
||||||
|
q := r.URL.Query()
|
||||||
|
|
||||||
|
databaseID := q.Get("database_id")
|
||||||
|
tableName := q.Get("table")
|
||||||
|
if databaseID == "" || tableName == "" {
|
||||||
|
writeError(w, http.StatusBadRequest, "database_id and table are required")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !tableNameRE.MatchString(tableName) {
|
||||||
|
writeError(w, http.StatusBadRequest,
|
||||||
|
fmt.Sprintf("invalid table name %q: only [A-Za-z0-9_.] allowed", tableName))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
limit := 100
|
||||||
|
if l := q.Get("limit"); l != "" {
|
||||||
|
if n, err := strconv.Atoi(l); err == nil {
|
||||||
|
switch {
|
||||||
|
case n < 1:
|
||||||
|
limit = 1
|
||||||
|
case n > 1000:
|
||||||
|
limit = 1000
|
||||||
|
default:
|
||||||
|
limit = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
offset := 0
|
||||||
|
if o := q.Get("offset"); o != "" {
|
||||||
|
if n, err := strconv.Atoi(o); err == nil && n >= 0 {
|
||||||
|
offset = n
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look up the database in data_factory.db.
|
||||||
|
dfDB, err := s.dataFactoryDB()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), 30*queryTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var dbKind, dbURI string
|
||||||
|
row := dfDB.QueryRowContext(ctx, `SELECT kind, uri FROM databases WHERE id = ?`, databaseID)
|
||||||
|
if err := row.Scan(&dbKind, &dbURI); err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
writeError(w, http.StatusNotFound, fmt.Sprintf("database_id %q not found", databaseID))
|
||||||
|
} else {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve absolute path.
|
||||||
|
root := os.Getenv("FN_REGISTRY_ROOT")
|
||||||
|
if root == "" {
|
||||||
|
root = "/home/lucas/fn_registry"
|
||||||
|
}
|
||||||
|
dbPath := dbURI
|
||||||
|
if !filepath.IsAbs(dbPath) {
|
||||||
|
dbPath = filepath.Join(root, dbURI)
|
||||||
|
}
|
||||||
|
|
||||||
|
var resp previewResponse
|
||||||
|
resp.DatabaseID = databaseID
|
||||||
|
resp.TableName = tableName
|
||||||
|
resp.Limit = limit
|
||||||
|
resp.Offset = offset
|
||||||
|
|
||||||
|
switch dbKind {
|
||||||
|
case "sqlite":
|
||||||
|
if err := previewSQLite(dbPath, tableName, limit, offset, &resp); err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, "sqlite preview: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case "duckdb":
|
||||||
|
if err := previewDuckDB(dbPath, tableName, limit, offset, &resp); err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, "duckdb preview: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
writeError(w, http.StatusBadRequest, fmt.Sprintf("unsupported database kind: %q", dbKind))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
writeJSON(w, http.StatusOK, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// previewSQLite fetches schema + rows from a SQLite database.
|
||||||
|
func previewSQLite(path, table string, limit, offset int, out *previewResponse) error {
|
||||||
|
db, err := sql.Open("sqlite3", "file:"+path+"?mode=ro")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
// Schema via PRAGMA table_info.
|
||||||
|
rows, err := db.Query(fmt.Sprintf(`PRAGMA table_info("%s")`, table))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("PRAGMA table_info: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
for rows.Next() {
|
||||||
|
var cid int
|
||||||
|
var name, typ, dflt string
|
||||||
|
var notNull, pk int
|
||||||
|
if err := rows.Scan(&cid, &name, &typ, ¬Null, &dflt, &pk); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
out.Columns = append(out.Columns, previewColumn{Name: name, Type: typ})
|
||||||
|
}
|
||||||
|
_ = rows.Close()
|
||||||
|
if len(out.Columns) == 0 {
|
||||||
|
return fmt.Errorf("table %q not found or has no columns", table)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Total row count.
|
||||||
|
cntRow := db.QueryRow(fmt.Sprintf(`SELECT count(*) FROM "%s"`, table))
|
||||||
|
_ = cntRow.Scan(&out.TotalRows)
|
||||||
|
|
||||||
|
// Data rows.
|
||||||
|
dataRows, err := db.Query(
|
||||||
|
fmt.Sprintf(`SELECT * FROM "%s" LIMIT ? OFFSET ?`, table),
|
||||||
|
limit, offset,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("SELECT: %w", err)
|
||||||
|
}
|
||||||
|
defer dataRows.Close()
|
||||||
|
|
||||||
|
nCols := len(out.Columns)
|
||||||
|
for dataRows.Next() {
|
||||||
|
vals := make([]interface{}, nCols)
|
||||||
|
ptrs := make([]interface{}, nCols)
|
||||||
|
for i := range vals {
|
||||||
|
ptrs[i] = &vals[i]
|
||||||
|
}
|
||||||
|
if err := dataRows.Scan(ptrs...); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
row := make([]string, nCols)
|
||||||
|
for i, v := range vals {
|
||||||
|
switch t := v.(type) {
|
||||||
|
case nil:
|
||||||
|
row[i] = ""
|
||||||
|
case []byte:
|
||||||
|
row[i] = string(t)
|
||||||
|
default:
|
||||||
|
row[i] = fmt.Sprintf("%v", t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out.Rows = append(out.Rows, row)
|
||||||
|
}
|
||||||
|
if out.Rows == nil {
|
||||||
|
out.Rows = [][]string{}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// duckPreviewResult is used to unmarshal the Python subprocess output.
|
||||||
|
type duckPreviewResult struct {
|
||||||
|
Columns []previewColumn `json:"columns"`
|
||||||
|
Rows [][]string `json:"rows"`
|
||||||
|
TotalRows int64 `json:"total_rows"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// previewDuckDB fetches schema + rows from a DuckDB database via Python subprocess.
|
||||||
|
func previewDuckDB(path, table string, limit, offset int, out *previewResponse) error {
|
||||||
|
pyBin := "/home/lucas/fn_registry/python/.venv/bin/python3"
|
||||||
|
script := fmt.Sprintf(
|
||||||
|
`import duckdb, json, sys
|
||||||
|
path, table, lim, off = %q, %q, %d, %d
|
||||||
|
c = duckdb.connect(path, read_only=True)
|
||||||
|
cols = c.execute(f"DESCRIBE \"{table}\"").fetchall()
|
||||||
|
cols_json = [{"name": r[0], "type": r[1]} for r in cols]
|
||||||
|
rows = c.execute(f"SELECT * FROM \"{table}\" LIMIT {lim} OFFSET {off}").fetchall()
|
||||||
|
rows_str = [[str(v) if v is not None else "" for v in r] for r in rows]
|
||||||
|
total = c.execute(f"SELECT count(*) FROM \"{table}\"").fetchone()[0]
|
||||||
|
print(json.dumps({"columns": cols_json, "rows": rows_str, "total_rows": total}))
|
||||||
|
`, path, table, limit, offset)
|
||||||
|
|
||||||
|
cmd := exec.Command(pyBin, "-c", script)
|
||||||
|
rawOut, err := cmd.Output()
|
||||||
|
if err != nil {
|
||||||
|
// Capture stderr for better error messages.
|
||||||
|
if ee, ok := err.(*exec.ExitError); ok {
|
||||||
|
return fmt.Errorf("python subprocess: %w: %s", err, string(ee.Stderr))
|
||||||
|
}
|
||||||
|
return fmt.Errorf("python subprocess: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var result duckPreviewResult
|
||||||
|
if err := json.Unmarshal(rawOut, &result); err != nil {
|
||||||
|
return fmt.Errorf("parse subprocess output: %w", err)
|
||||||
|
}
|
||||||
|
out.Columns = result.Columns
|
||||||
|
out.TotalRows = result.TotalRows
|
||||||
|
out.Rows = result.Rows
|
||||||
|
if out.Rows == nil {
|
||||||
|
out.Rows = [][]string{}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleDataFactoryTables(w http.ResponseWriter, r *http.Request) {
|
||||||
|
db, err := s.dataFactoryDB()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
rows, err := db.QueryContext(ctx, `
|
||||||
|
SELECT id, kind, label, uri FROM databases ORDER BY kind, label`)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
// Resolve registry root for relative URIs.
|
||||||
|
root := os.Getenv("FN_REGISTRY_ROOT")
|
||||||
|
if root == "" {
|
||||||
|
root = "/home/lucas/fn_registry"
|
||||||
|
}
|
||||||
|
|
||||||
|
out := make([]dataFactoryTable, 0, 16)
|
||||||
|
for rows.Next() {
|
||||||
|
var id, kind, label, uri string
|
||||||
|
if err := rows.Scan(&id, &kind, &label, &uri); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
path := uri
|
||||||
|
if !filepath.IsAbs(path) {
|
||||||
|
path = filepath.Join(root, uri)
|
||||||
|
}
|
||||||
|
// Existence check.
|
||||||
|
if _, errStat := os.Stat(path); errStat != nil {
|
||||||
|
out = append(out, dataFactoryTable{
|
||||||
|
DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind,
|
||||||
|
TableName: "(file not found)", Error: errStat.Error(),
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
tables, errTables := listTablesForDatabase(kind, path)
|
||||||
|
if errTables != nil {
|
||||||
|
out = append(out, dataFactoryTable{
|
||||||
|
DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind,
|
||||||
|
TableName: "(error)", Error: errTables.Error(),
|
||||||
|
})
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, t := range tables {
|
||||||
|
out = append(out, dataFactoryTable{
|
||||||
|
DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind,
|
||||||
|
TableName: t.Name, RowCount: t.RowCount,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"tables": out, "count": len(out)})
|
||||||
|
}
|
||||||
@@ -0,0 +1,132 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
registryDBOnce sync.Once
|
||||||
|
registryDBConn *sql.DB
|
||||||
|
registryDBErr error
|
||||||
|
)
|
||||||
|
|
||||||
|
func resolveRegistryDBPath() string {
|
||||||
|
if p := os.Getenv("FN_REGISTRY_DB"); p != "" {
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
if root := os.Getenv("FN_REGISTRY_ROOT"); root != "" {
|
||||||
|
return filepath.Join(root, "registry.db")
|
||||||
|
}
|
||||||
|
return "/home/lucas/fn_registry/registry.db"
|
||||||
|
}
|
||||||
|
|
||||||
|
func openRegistryDB() (*sql.DB, error) {
|
||||||
|
registryDBOnce.Do(func() {
|
||||||
|
path := resolveRegistryDBPath()
|
||||||
|
// Read-only access — registry.db es source of truth gestionada via `fn index`.
|
||||||
|
db, err := sql.Open("sqlite3", "file:"+path+"?mode=ro&_query_only=1")
|
||||||
|
if err != nil {
|
||||||
|
registryDBErr = err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
registryDBConn = db
|
||||||
|
})
|
||||||
|
return registryDBConn, registryDBErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleFunctionByID(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id := r.PathValue("id")
|
||||||
|
if id == "" {
|
||||||
|
writeError(w, http.StatusBadRequest, "id is required")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := openRegistryDB()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Try functions table first.
|
||||||
|
const fnQuery = `SELECT id, name, kind, lang, domain, version, purity, signature,
|
||||||
|
description, tags, returns, returns_optional, error_type, file_path, code, documentation
|
||||||
|
FROM functions WHERE id = ?`
|
||||||
|
var (
|
||||||
|
fnID, name, kind, lang, domain, version, purity, signature string
|
||||||
|
description, tags, returns, errorType, filePath string
|
||||||
|
code, documentation string
|
||||||
|
returnsOptional bool
|
||||||
|
)
|
||||||
|
err = db.QueryRowContext(ctx, fnQuery, id).Scan(
|
||||||
|
&fnID, &name, &kind, &lang, &domain, &version, &purity, &signature,
|
||||||
|
&description, &tags, &returns, &returnsOptional, &errorType, &filePath,
|
||||||
|
&code, &documentation,
|
||||||
|
)
|
||||||
|
if err == nil {
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{
|
||||||
|
"entity": "function",
|
||||||
|
"id": fnID,
|
||||||
|
"name": name,
|
||||||
|
"kind": kind,
|
||||||
|
"lang": lang,
|
||||||
|
"domain": domain,
|
||||||
|
"version": version,
|
||||||
|
"purity": purity,
|
||||||
|
"signature": signature,
|
||||||
|
"description": description,
|
||||||
|
"tags": tags,
|
||||||
|
"returns": returns,
|
||||||
|
"returns_optional": returnsOptional,
|
||||||
|
"error_type": errorType,
|
||||||
|
"file_path": filePath,
|
||||||
|
"code": code,
|
||||||
|
"documentation": documentation,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != sql.ErrNoRows {
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback: types table.
|
||||||
|
const typeQuery = `SELECT id, name, lang, domain, version, description, tags, definition, file_path
|
||||||
|
FROM types WHERE id = ?`
|
||||||
|
var (
|
||||||
|
tID, tName, tLang, tDomain, tVersion string
|
||||||
|
tDescription, tTags, tDefinition string
|
||||||
|
tFilePath string
|
||||||
|
)
|
||||||
|
err = db.QueryRowContext(ctx, typeQuery, id).Scan(
|
||||||
|
&tID, &tName, &tLang, &tDomain, &tVersion,
|
||||||
|
&tDescription, &tTags, &tDefinition, &tFilePath,
|
||||||
|
)
|
||||||
|
if err == nil {
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{
|
||||||
|
"entity": "type",
|
||||||
|
"id": tID,
|
||||||
|
"name": tName,
|
||||||
|
"lang": tLang,
|
||||||
|
"domain": tDomain,
|
||||||
|
"version": tVersion,
|
||||||
|
"description": tDescription,
|
||||||
|
"tags": tTags,
|
||||||
|
"definition": tDefinition,
|
||||||
|
"file_path": tFilePath,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
writeError(w, http.StatusNotFound, "not_found")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
|
}
|
||||||
+7
-7
@@ -45,7 +45,7 @@ func setupTestDB(t *testing.T) (*DBPool, string) {
|
|||||||
|
|
||||||
func TestHealthEndpoint(t *testing.T) {
|
func TestHealthEndpoint(t *testing.T) {
|
||||||
pool := NewDBPool()
|
pool := NewDBPool()
|
||||||
srv := NewServer(pool, "")
|
srv := NewServer(pool, "", "", "")
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
srv.Routes(mux)
|
srv.Routes(mux)
|
||||||
|
|
||||||
@@ -68,7 +68,7 @@ func TestDatabasesEndpoint(t *testing.T) {
|
|||||||
pool.Register(DBEntry{Alias: "registry", Path: "/fake/path", Kind: "registry"})
|
pool.Register(DBEntry{Alias: "registry", Path: "/fake/path", Kind: "registry"})
|
||||||
pool.Register(DBEntry{Alias: "ops:myapp", Path: "/fake/path2", Kind: "operations"})
|
pool.Register(DBEntry{Alias: "ops:myapp", Path: "/fake/path2", Kind: "operations"})
|
||||||
|
|
||||||
srv := NewServer(pool, "")
|
srv := NewServer(pool, "", "", "")
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
srv.Routes(mux)
|
srv.Routes(mux)
|
||||||
|
|
||||||
@@ -90,7 +90,7 @@ func TestQueryEndpoint(t *testing.T) {
|
|||||||
pool, _ := setupTestDB(t)
|
pool, _ := setupTestDB(t)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
srv := NewServer(pool, "")
|
srv := NewServer(pool, "", "", "")
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
srv.Routes(mux)
|
srv.Routes(mux)
|
||||||
|
|
||||||
@@ -119,7 +119,7 @@ func TestQueryRejectsWrite(t *testing.T) {
|
|||||||
pool, _ := setupTestDB(t)
|
pool, _ := setupTestDB(t)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
srv := NewServer(pool, "")
|
srv := NewServer(pool, "", "", "")
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
srv.Routes(mux)
|
srv.Routes(mux)
|
||||||
|
|
||||||
@@ -146,7 +146,7 @@ func TestTablesEndpoint(t *testing.T) {
|
|||||||
pool, _ := setupTestDB(t)
|
pool, _ := setupTestDB(t)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
srv := NewServer(pool, "")
|
srv := NewServer(pool, "", "", "")
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
srv.Routes(mux)
|
srv.Routes(mux)
|
||||||
|
|
||||||
@@ -178,7 +178,7 @@ func TestSchemaEndpoint(t *testing.T) {
|
|||||||
pool, _ := setupTestDB(t)
|
pool, _ := setupTestDB(t)
|
||||||
defer pool.Close()
|
defer pool.Close()
|
||||||
|
|
||||||
srv := NewServer(pool, "")
|
srv := NewServer(pool, "", "", "")
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
srv.Routes(mux)
|
srv.Routes(mux)
|
||||||
|
|
||||||
@@ -200,7 +200,7 @@ func TestSchemaEndpoint(t *testing.T) {
|
|||||||
|
|
||||||
func TestNotFoundDB(t *testing.T) {
|
func TestNotFoundDB(t *testing.T) {
|
||||||
pool := NewDBPool()
|
pool := NewDBPool()
|
||||||
srv := NewServer(pool, "")
|
srv := NewServer(pool, "", "", "")
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
srv.Routes(mux)
|
srv.Routes(mux)
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
bind := flag.String("bind", "127.0.0.1:8484", "address to bind")
|
bind := flag.String("bind", "127.0.0.1:8484", "address to bind")
|
||||||
|
dataFactoryDB := flag.String("data-factory-db", "", "path to data_factory.db (default: <root>/apps/data_factory/data_factory.db)")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
root := findRegistryRoot()
|
root := findRegistryRoot()
|
||||||
@@ -24,7 +25,14 @@ func main() {
|
|||||||
log.Printf("registered database: %s (%s)", entry.Alias, entry.Path)
|
log.Printf("registered database: %s (%s)", entry.Alias, entry.Path)
|
||||||
}
|
}
|
||||||
|
|
||||||
srv := NewServer(pool, root)
|
dfPath := *dataFactoryDB
|
||||||
|
if dfPath == "" {
|
||||||
|
dfPath = filepath.Join(root, "apps", "data_factory", "data_factory.db")
|
||||||
|
}
|
||||||
|
dfMigrations := filepath.Join(root, "apps", "data_factory", "migrations")
|
||||||
|
log.Printf("data_factory db: %s (migrations: %s)", dfPath, dfMigrations)
|
||||||
|
|
||||||
|
srv := NewServer(pool, root, dfPath, dfMigrations)
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
srv.Routes(mux)
|
srv.Routes(mux)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user