Files
fn_registry/apps/dag_engine/store/store.go
T
egutierrez 212875ed0d chore: auto-commit (286 archivos)
- .claude/agents/fn-orquestador/SKILL.md
- .claude/commands/fn_claude.md
- .claude/rules/INDEX.md
- .claude/rules/cpp_apps.md
- .claude/rules/ids_naming.md
- CHANGELOG.md
- apps/dag_engine/README.md
- apps/dag_engine/api.go
- apps/dag_engine/dags_migrated/example.yaml
- apps/dag_engine/dags_migrated/example_lineage_tracking.yaml
- ...

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-16 16:33:22 +02:00

267 lines
7.5 KiB
Go

package store
import (
"database/sql"
"embed"
"fmt"
"io/fs"
"sort"
"strings"
"time"
_ "github.com/mattn/go-sqlite3"
)
//go:embed migrations/*.sql
var migrationsFS embed.FS
// applyMigrations executes every embedded migrations/*.sql in order.
// Each statement is idempotent (IF NOT EXISTS / ADD COLUMN). Duplicate-column
// errors from re-running ALTER TABLE ADD COLUMN are tolerated.
func applyMigrations(conn *sql.DB) error {
files, err := fs.Glob(migrationsFS, "migrations/*.sql")
if err != nil {
return err
}
sort.Strings(files)
for _, f := range files {
b, err := migrationsFS.ReadFile(f)
if err != nil {
return fmt.Errorf("%s: read: %w", f, err)
}
if _, err := conn.Exec(string(b)); err != nil {
if strings.Contains(err.Error(), "duplicate column") ||
strings.Contains(err.Error(), "already exists") {
continue
}
return fmt.Errorf("%s: %w", f, err)
}
}
return nil
}
// DB wraps a SQLite connection for DAG run persistence.
type DB struct {
conn *sql.DB
path string
}
// Open opens or creates a DAG engine database at the given path.
func Open(path string) (*DB, error) {
conn, err := sql.Open("sqlite3", path+"?_journal_mode=WAL&_busy_timeout=5000&_foreign_keys=on")
if err != nil {
return nil, fmt.Errorf("store: open %s: %w", path, err)
}
if err := applyMigrations(conn); err != nil {
conn.Close()
return nil, fmt.Errorf("store: migrate: %w", err)
}
return &DB{conn: conn, path: path}, nil
}
// Close closes the database connection.
func (db *DB) Close() error {
return db.conn.Close()
}
// Conn exposes the underlying *sql.DB for read-only queries from other
// packages (e.g. WS hub in events.go). Do not Close() the returned conn.
func (db *DB) Conn() *sql.DB {
return db.conn
}
// --- DagRun CRUD ---
// DagRun mirrors infra.DagRun for the store layer.
type DagRun struct {
ID string `json:"id"`
DagName string `json:"dag_name"`
DagPath string `json:"dag_path"`
Status string `json:"status"`
Trigger string `json:"trigger"`
StartedAt time.Time `json:"started_at"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
Error string `json:"error,omitempty"`
}
// CreateRun inserts a new run record.
func (db *DB) CreateRun(run *DagRun) error {
_, err := db.conn.Exec(
`INSERT INTO dag_runs (id, dag_name, dag_path, status, trigger, started_at, error)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
run.ID, run.DagName, run.DagPath, run.Status, run.Trigger,
run.StartedAt.Format(time.RFC3339), run.Error,
)
return err
}
// UpdateRunStatus updates a run's status and optionally its finished_at and error.
func (db *DB) UpdateRunStatus(id, status string, finishedAt *time.Time, errMsg string) error {
var fin *string
if finishedAt != nil {
s := finishedAt.Format(time.RFC3339)
fin = &s
}
_, err := db.conn.Exec(
`UPDATE dag_runs SET status=?, finished_at=?, error=? WHERE id=?`,
status, fin, errMsg, id,
)
return err
}
// GetRun retrieves a single run by ID.
func (db *DB) GetRun(id string) (*DagRun, error) {
row := db.conn.QueryRow(
`SELECT id, dag_name, dag_path, status, trigger, started_at, finished_at, error
FROM dag_runs WHERE id=?`, id,
)
return scanRun(row)
}
// ListRuns returns runs, newest first, with optional dag name filter.
func (db *DB) ListRuns(dagName string, limit, offset int) ([]DagRun, int, error) {
var total int
var args []interface{}
where := ""
if dagName != "" {
where = " WHERE dag_name=?"
args = append(args, dagName)
}
err := db.conn.QueryRow("SELECT COUNT(*) FROM dag_runs"+where, args...).Scan(&total)
if err != nil {
return nil, 0, err
}
query := "SELECT id, dag_name, dag_path, status, trigger, started_at, finished_at, error FROM dag_runs" +
where + " ORDER BY started_at DESC LIMIT ? OFFSET ?"
args = append(args, limit, offset)
rows, err := db.conn.Query(query, args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
var runs []DagRun
for rows.Next() {
r, err := scanRunRows(rows)
if err != nil {
return nil, 0, err
}
runs = append(runs, *r)
}
return runs, total, rows.Err()
}
// --- DagStepResult CRUD ---
// DagStepResult mirrors infra.DagStepResult for the store layer.
type DagStepResult struct {
ID string `json:"id"`
RunID string `json:"run_id"`
StepName string `json:"step_name"`
FunctionID string `json:"function_id,omitempty"`
Status string `json:"status"`
ExitCode int `json:"exit_code"`
Stdout string `json:"stdout,omitempty"`
Stderr string `json:"stderr,omitempty"`
StartedAt *time.Time `json:"started_at,omitempty"`
FinishedAt *time.Time `json:"finished_at,omitempty"`
DurationMs int64 `json:"duration_ms"`
Error string `json:"error,omitempty"`
}
// InsertStepResult inserts a new step result.
func (db *DB) InsertStepResult(r *DagStepResult) error {
var startedAt, finishedAt *string
if r.StartedAt != nil {
s := r.StartedAt.Format(time.RFC3339)
startedAt = &s
}
if r.FinishedAt != nil {
s := r.FinishedAt.Format(time.RFC3339)
finishedAt = &s
}
_, err := db.conn.Exec(
`INSERT INTO dag_step_results (id, run_id, step_name, function_id, status, exit_code, stdout, stderr, started_at, finished_at, duration_ms, error)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
r.ID, r.RunID, r.StepName, r.FunctionID, r.Status, r.ExitCode, r.Stdout, r.Stderr,
startedAt, finishedAt, r.DurationMs, r.Error,
)
return err
}
// UpdateStepResult updates a step result by ID.
func (db *DB) UpdateStepResult(id, status string, exitCode int, stdout, stderr string, finishedAt *time.Time, durationMs int64, errMsg string) error {
var fin *string
if finishedAt != nil {
s := finishedAt.Format(time.RFC3339)
fin = &s
}
_, err := db.conn.Exec(
`UPDATE dag_step_results SET status=?, exit_code=?, stdout=?, stderr=?, finished_at=?, duration_ms=?, error=? WHERE id=?`,
status, exitCode, stdout, stderr, fin, durationMs, errMsg, id,
)
return err
}
// ListStepResults returns all step results for a given run.
func (db *DB) ListStepResults(runID string) ([]DagStepResult, error) {
rows, err := db.conn.Query(
`SELECT id, run_id, step_name, function_id, status, exit_code, stdout, stderr, started_at, finished_at, duration_ms, error
FROM dag_step_results WHERE run_id=? ORDER BY started_at ASC`, runID,
)
if err != nil {
return nil, err
}
defer rows.Close()
var results []DagStepResult
for rows.Next() {
var r DagStepResult
var startedAt, finishedAt sql.NullString
if err := rows.Scan(&r.ID, &r.RunID, &r.StepName, &r.FunctionID, &r.Status, &r.ExitCode,
&r.Stdout, &r.Stderr, &startedAt, &finishedAt, &r.DurationMs, &r.Error); err != nil {
return nil, err
}
if startedAt.Valid {
t, _ := time.Parse(time.RFC3339, startedAt.String)
r.StartedAt = &t
}
if finishedAt.Valid {
t, _ := time.Parse(time.RFC3339, finishedAt.String)
r.FinishedAt = &t
}
results = append(results, r)
}
return results, rows.Err()
}
// --- scan helpers ---
type scanner interface {
Scan(dest ...interface{}) error
}
func scanRun(s scanner) (*DagRun, error) {
var r DagRun
var startedAt string
var finishedAt sql.NullString
if err := s.Scan(&r.ID, &r.DagName, &r.DagPath, &r.Status, &r.Trigger, &startedAt, &finishedAt, &r.Error); err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
r.StartedAt, _ = time.Parse(time.RFC3339, startedAt)
if finishedAt.Valid {
t, _ := time.Parse(time.RFC3339, finishedAt.String)
r.FinishedAt = &t
}
return &r, nil
}
func scanRunRows(rows *sql.Rows) (*DagRun, error) {
return scanRun(rows)
}