docs(flows): DoD obligatorio con user-facing surface + abrir issues 0100-0103 (taxonomia, frontmatter migration, dev_console, work dashboard)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Egutierrez
2026-05-17 00:07:04 +02:00
parent 11c986edc7
commit 155a6db824
4 changed files with 545 additions and 3 deletions
+318 -3
View File
@@ -15,7 +15,13 @@ package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
)
@@ -49,6 +55,8 @@ type dataFactoryRun struct {
Trigger string `json:"trigger"`
Error string `json:"error"`
Notes string `json:"notes"`
StorageDBID string `json:"storage_db_id"`
StorageTable string `json:"storage_table"`
}
// dataFactoryDatabase is the JSON row for /api/datafactory/databases.
@@ -136,7 +144,8 @@ func (s *Server) handleDataFactoryRuns(w http.ResponseWriter, r *http.Request) {
if nodeID != "" {
rows, err = db.QueryContext(ctx, `
SELECT id, node_id, started_at, COALESCE(finished_at,''), status,
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes,
COALESCE(storage_db_id,''), COALESCE(storage_table,'')
FROM runs
WHERE node_id = ?
ORDER BY started_at DESC
@@ -144,7 +153,8 @@ func (s *Server) handleDataFactoryRuns(w http.ResponseWriter, r *http.Request) {
} else {
rows, err = db.QueryContext(ctx, `
SELECT id, node_id, started_at, COALESCE(finished_at,''), status,
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes
rows_in, rows_out, kb_in, kb_out, duration_ms, trigger, error, notes,
COALESCE(storage_db_id,''), COALESCE(storage_table,'')
FROM runs
ORDER BY started_at DESC
LIMIT ?`, limit)
@@ -160,7 +170,8 @@ func (s *Server) handleDataFactoryRuns(w http.ResponseWriter, r *http.Request) {
var rr dataFactoryRun
if err := rows.Scan(&rr.ID, &rr.NodeID, &rr.StartedAt, &rr.FinishedAt,
&rr.Status, &rr.RowsIn, &rr.RowsOut, &rr.KbIn, &rr.KbOut,
&rr.DurationMS, &rr.Trigger, &rr.Error, &rr.Notes); err != nil {
&rr.DurationMS, &rr.Trigger, &rr.Error, &rr.Notes,
&rr.StorageDBID, &rr.StorageTable); err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
@@ -203,3 +214,307 @@ func (s *Server) handleDataFactoryDatabases(w http.ResponseWriter, r *http.Reque
}
writeJSON(w, http.StatusOK, map[string]any{"databases": out, "count": len(out)})
}
// dataFactoryTable is the JSON row for /api/datafactory/tables.
type dataFactoryTable struct {
DatabaseID string `json:"database_id"`
DatabaseLabel string `json:"database_label"`
DatabaseKind string `json:"database_kind"`
TableName string `json:"table_name"`
RowCount int64 `json:"row_count"`
Error string `json:"error,omitempty"`
}
// tableNameRE restricts table names to alphanumeric + underscore + dot.
// This prevents SQL injection in the preview queries.
var tableNameRE = regexp.MustCompile(`^[A-Za-z0-9_.]+$`)
// previewColumn is a JSON column descriptor for the preview response.
type previewColumn struct {
Name string `json:"name"`
Type string `json:"type"`
}
// previewResponse is the JSON shape of /api/datafactory/preview.
type previewResponse struct {
DatabaseID string `json:"database_id"`
TableName string `json:"table_name"`
Columns []previewColumn `json:"columns"`
Rows [][]string `json:"rows"`
TotalRows int64 `json:"total_rows"`
Limit int `json:"limit"`
Offset int `json:"offset"`
}
// handleDataFactoryPreview serves:
//
// GET /api/datafactory/preview?database_id=<id>&table=<name>&limit=N&offset=N
//
// Supports kind="sqlite" and kind="duckdb" databases registered in
// data_factory.databases. Returns up to 1000 rows (default 100).
func (s *Server) handleDataFactoryPreview(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
databaseID := q.Get("database_id")
tableName := q.Get("table")
if databaseID == "" || tableName == "" {
writeError(w, http.StatusBadRequest, "database_id and table are required")
return
}
if !tableNameRE.MatchString(tableName) {
writeError(w, http.StatusBadRequest,
fmt.Sprintf("invalid table name %q: only [A-Za-z0-9_.] allowed", tableName))
return
}
limit := 100
if l := q.Get("limit"); l != "" {
if n, err := strconv.Atoi(l); err == nil {
switch {
case n < 1:
limit = 1
case n > 1000:
limit = 1000
default:
limit = n
}
}
}
offset := 0
if o := q.Get("offset"); o != "" {
if n, err := strconv.Atoi(o); err == nil && n >= 0 {
offset = n
}
}
// Look up the database in data_factory.db.
dfDB, err := s.dataFactoryDB()
if err != nil {
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
return
}
ctx, cancel := context.WithTimeout(r.Context(), 30*queryTimeout)
defer cancel()
var dbKind, dbURI string
row := dfDB.QueryRowContext(ctx, `SELECT kind, uri FROM databases WHERE id = ?`, databaseID)
if err := row.Scan(&dbKind, &dbURI); err != nil {
if err == sql.ErrNoRows {
writeError(w, http.StatusNotFound, fmt.Sprintf("database_id %q not found", databaseID))
} else {
writeError(w, http.StatusInternalServerError, err.Error())
}
return
}
// Resolve absolute path.
root := os.Getenv("FN_REGISTRY_ROOT")
if root == "" {
root = "/home/lucas/fn_registry"
}
dbPath := dbURI
if !filepath.IsAbs(dbPath) {
dbPath = filepath.Join(root, dbURI)
}
var resp previewResponse
resp.DatabaseID = databaseID
resp.TableName = tableName
resp.Limit = limit
resp.Offset = offset
switch dbKind {
case "sqlite":
if err := previewSQLite(dbPath, tableName, limit, offset, &resp); err != nil {
writeError(w, http.StatusInternalServerError, "sqlite preview: "+err.Error())
return
}
case "duckdb":
if err := previewDuckDB(dbPath, tableName, limit, offset, &resp); err != nil {
writeError(w, http.StatusInternalServerError, "duckdb preview: "+err.Error())
return
}
default:
writeError(w, http.StatusBadRequest, fmt.Sprintf("unsupported database kind: %q", dbKind))
return
}
writeJSON(w, http.StatusOK, resp)
}
// previewSQLite fetches schema + rows from a SQLite database.
func previewSQLite(path, table string, limit, offset int, out *previewResponse) error {
db, err := sql.Open("sqlite3", "file:"+path+"?mode=ro")
if err != nil {
return err
}
defer db.Close()
// Schema via PRAGMA table_info.
rows, err := db.Query(fmt.Sprintf(`PRAGMA table_info("%s")`, table))
if err != nil {
return fmt.Errorf("PRAGMA table_info: %w", err)
}
defer rows.Close()
for rows.Next() {
var cid int
var name, typ, dflt string
var notNull, pk int
if err := rows.Scan(&cid, &name, &typ, &notNull, &dflt, &pk); err != nil {
return err
}
out.Columns = append(out.Columns, previewColumn{Name: name, Type: typ})
}
_ = rows.Close()
if len(out.Columns) == 0 {
return fmt.Errorf("table %q not found or has no columns", table)
}
// Total row count.
cntRow := db.QueryRow(fmt.Sprintf(`SELECT count(*) FROM "%s"`, table))
_ = cntRow.Scan(&out.TotalRows)
// Data rows.
dataRows, err := db.Query(
fmt.Sprintf(`SELECT * FROM "%s" LIMIT ? OFFSET ?`, table),
limit, offset,
)
if err != nil {
return fmt.Errorf("SELECT: %w", err)
}
defer dataRows.Close()
nCols := len(out.Columns)
for dataRows.Next() {
vals := make([]interface{}, nCols)
ptrs := make([]interface{}, nCols)
for i := range vals {
ptrs[i] = &vals[i]
}
if err := dataRows.Scan(ptrs...); err != nil {
return err
}
row := make([]string, nCols)
for i, v := range vals {
switch t := v.(type) {
case nil:
row[i] = ""
case []byte:
row[i] = string(t)
default:
row[i] = fmt.Sprintf("%v", t)
}
}
out.Rows = append(out.Rows, row)
}
if out.Rows == nil {
out.Rows = [][]string{}
}
return nil
}
// duckPreviewResult is used to unmarshal the Python subprocess output.
type duckPreviewResult struct {
Columns []previewColumn `json:"columns"`
Rows [][]string `json:"rows"`
TotalRows int64 `json:"total_rows"`
}
// previewDuckDB fetches schema + rows from a DuckDB database via Python subprocess.
func previewDuckDB(path, table string, limit, offset int, out *previewResponse) error {
pyBin := "/home/lucas/fn_registry/python/.venv/bin/python3"
script := fmt.Sprintf(
`import duckdb, json, sys
path, table, lim, off = %q, %q, %d, %d
c = duckdb.connect(path, read_only=True)
cols = c.execute(f"DESCRIBE \"{table}\"").fetchall()
cols_json = [{"name": r[0], "type": r[1]} for r in cols]
rows = c.execute(f"SELECT * FROM \"{table}\" LIMIT {lim} OFFSET {off}").fetchall()
rows_str = [[str(v) if v is not None else "" for v in r] for r in rows]
total = c.execute(f"SELECT count(*) FROM \"{table}\"").fetchone()[0]
print(json.dumps({"columns": cols_json, "rows": rows_str, "total_rows": total}))
`, path, table, limit, offset)
cmd := exec.Command(pyBin, "-c", script)
rawOut, err := cmd.Output()
if err != nil {
// Capture stderr for better error messages.
if ee, ok := err.(*exec.ExitError); ok {
return fmt.Errorf("python subprocess: %w: %s", err, string(ee.Stderr))
}
return fmt.Errorf("python subprocess: %w", err)
}
var result duckPreviewResult
if err := json.Unmarshal(rawOut, &result); err != nil {
return fmt.Errorf("parse subprocess output: %w", err)
}
out.Columns = result.Columns
out.TotalRows = result.TotalRows
out.Rows = result.Rows
if out.Rows == nil {
out.Rows = [][]string{}
}
return nil
}
func (s *Server) handleDataFactoryTables(w http.ResponseWriter, r *http.Request) {
db, err := s.dataFactoryDB()
if err != nil {
writeError(w, http.StatusServiceUnavailable, "data_factory.db unavailable: "+err.Error())
return
}
ctx, cancel := context.WithTimeout(r.Context(), queryTimeout)
defer cancel()
rows, err := db.QueryContext(ctx, `
SELECT id, kind, label, uri FROM databases ORDER BY kind, label`)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
defer rows.Close()
// Resolve registry root for relative URIs.
root := os.Getenv("FN_REGISTRY_ROOT")
if root == "" {
root = "/home/lucas/fn_registry"
}
out := make([]dataFactoryTable, 0, 16)
for rows.Next() {
var id, kind, label, uri string
if err := rows.Scan(&id, &kind, &label, &uri); err != nil {
continue
}
path := uri
if !filepath.IsAbs(path) {
path = filepath.Join(root, uri)
}
// Existence check.
if _, errStat := os.Stat(path); errStat != nil {
out = append(out, dataFactoryTable{
DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind,
TableName: "(file not found)", Error: errStat.Error(),
})
continue
}
tables, errTables := listTablesForDatabase(kind, path)
if errTables != nil {
out = append(out, dataFactoryTable{
DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind,
TableName: "(error)", Error: errTables.Error(),
})
continue
}
for _, t := range tables {
out = append(out, dataFactoryTable{
DatabaseID: id, DatabaseLabel: label, DatabaseKind: kind,
TableName: t.Name, RowCount: t.RowCount,
})
}
}
writeJSON(w, http.StatusOK, map[string]any{"tables": out, "count": len(out)})
}