9660a1c432
UnitTest en registry con Insert, GetByFunction, Search FTS5, Purge. E2ETest en fn_operations con Insert, Get, List, UpdateResult, Delete. Ambos con scan helpers y serialización JSON.
1025 lines
30 KiB
Go
1025 lines
30 KiB
Go
package fn_operations
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
func marshalStrings(ss []string) string {
|
|
if ss == nil {
|
|
ss = []string{}
|
|
}
|
|
b, _ := json.Marshal(ss)
|
|
return string(b)
|
|
}
|
|
|
|
func unmarshalStrings(s string) []string {
|
|
var out []string
|
|
json.Unmarshal([]byte(s), &out)
|
|
if out == nil {
|
|
out = []string{}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func marshalJSON(v map[string]any) string {
|
|
if v == nil {
|
|
v = map[string]any{}
|
|
}
|
|
b, _ := json.Marshal(v)
|
|
return string(b)
|
|
}
|
|
|
|
func unmarshalJSON(s string) map[string]any {
|
|
var out map[string]any
|
|
json.Unmarshal([]byte(s), &out)
|
|
if out == nil {
|
|
out = map[string]any{}
|
|
}
|
|
return out
|
|
}
|
|
|
|
// --- TypeSnapshot CRUD ---
|
|
|
|
// InsertTypeSnapshot inserts a type snapshot.
|
|
func (db *DB) InsertTypeSnapshot(ts *TypeSnapshot) error {
|
|
if ts.SnappedAt.IsZero() {
|
|
ts.SnappedAt = time.Now().UTC()
|
|
}
|
|
_, err := db.conn.Exec(`
|
|
INSERT OR IGNORE INTO types_snapshot (id, version, lang, algebraic, definition, description, snapped_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
|
ts.ID, ts.Version, ts.Lang, ts.Algebraic, ts.Definition, ts.Description,
|
|
ts.SnappedAt.Format(time.RFC3339),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// UpdateTypeSnapshot replaces an existing snapshot with a new version.
|
|
func (db *DB) UpdateTypeSnapshot(ts *TypeSnapshot) error {
|
|
if ts.SnappedAt.IsZero() {
|
|
ts.SnappedAt = time.Now().UTC()
|
|
}
|
|
_, err := db.conn.Exec(`
|
|
UPDATE types_snapshot SET version=?, lang=?, algebraic=?, definition=?, description=?, snapped_at=?
|
|
WHERE id=?`,
|
|
ts.Version, ts.Lang, ts.Algebraic, ts.Definition, ts.Description,
|
|
ts.SnappedAt.Format(time.RFC3339), ts.ID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
// GetTypeSnapshot returns a type snapshot by ID.
|
|
func (db *DB) GetTypeSnapshot(id string) (*TypeSnapshot, error) {
|
|
row := db.conn.QueryRow("SELECT id, version, lang, algebraic, definition, description, snapped_at FROM types_snapshot WHERE id = ?", id)
|
|
var ts TypeSnapshot
|
|
var snappedAt string
|
|
err := row.Scan(&ts.ID, &ts.Version, &ts.Lang, &ts.Algebraic, &ts.Definition, &ts.Description, &snappedAt)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning type_snapshot: %w", err)
|
|
}
|
|
ts.SnappedAt, _ = time.Parse(time.RFC3339, snappedAt)
|
|
return &ts, nil
|
|
}
|
|
|
|
// ListTypeSnapshots returns all type snapshots.
|
|
func (db *DB) ListTypeSnapshots() ([]TypeSnapshot, error) {
|
|
rows, err := db.conn.Query("SELECT id, version, lang, algebraic, definition, description, snapped_at FROM types_snapshot ORDER BY id")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var result []TypeSnapshot
|
|
for rows.Next() {
|
|
var ts TypeSnapshot
|
|
var snappedAt string
|
|
if err := rows.Scan(&ts.ID, &ts.Version, &ts.Lang, &ts.Algebraic, &ts.Definition, &ts.Description, &snappedAt); err != nil {
|
|
return nil, fmt.Errorf("scanning type_snapshot: %w", err)
|
|
}
|
|
ts.SnappedAt, _ = time.Parse(time.RFC3339, snappedAt)
|
|
result = append(result, ts)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// --- Entity CRUD ---
|
|
|
|
// InsertEntity inserts or replaces an entity.
|
|
func (db *DB) InsertEntity(e *Entity) error {
|
|
now := time.Now().UTC()
|
|
if e.CreatedAt.IsZero() {
|
|
e.CreatedAt = now
|
|
}
|
|
e.UpdatedAt = now
|
|
|
|
_, err := db.conn.Exec(`
|
|
INSERT OR REPLACE INTO entities (id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
e.ID, e.Name, e.TypeRef, string(e.Status), e.Description, e.Domain,
|
|
marshalStrings(e.Tags), e.Source, marshalJSON(e.Metadata), e.Notes,
|
|
e.CreatedAt.Format(time.RFC3339), e.UpdatedAt.Format(time.RFC3339),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// GetEntity returns an entity by ID.
|
|
func (db *DB) GetEntity(id string) (*Entity, error) {
|
|
row := db.conn.QueryRow(`
|
|
SELECT id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at
|
|
FROM entities WHERE id = ?`, id)
|
|
|
|
var e Entity
|
|
var tagsJSON, metadataJSON, createdAt, updatedAt string
|
|
err := row.Scan(&e.ID, &e.Name, &e.TypeRef, &e.Status, &e.Description, &e.Domain,
|
|
&tagsJSON, &e.Source, &metadataJSON, &e.Notes, &createdAt, &updatedAt)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning entity: %w", err)
|
|
}
|
|
e.Tags = unmarshalStrings(tagsJSON)
|
|
e.Metadata = unmarshalJSON(metadataJSON)
|
|
e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
e.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
|
|
return &e, nil
|
|
}
|
|
|
|
// UpdateEntity updates an existing entity.
|
|
func (db *DB) UpdateEntity(e *Entity) error {
|
|
e.UpdatedAt = time.Now().UTC()
|
|
_, err := db.conn.Exec(`
|
|
UPDATE entities SET name=?, type_ref=?, status=?, description=?, domain=?, tags=?, source=?, metadata=?, notes=?, updated_at=?
|
|
WHERE id=?`,
|
|
e.Name, e.TypeRef, string(e.Status), e.Description, e.Domain,
|
|
marshalStrings(e.Tags), e.Source, marshalJSON(e.Metadata), e.Notes,
|
|
e.UpdatedAt.Format(time.RFC3339), e.ID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
// DeleteEntity removes an entity by ID.
|
|
func (db *DB) DeleteEntity(id string) error {
|
|
_, err := db.conn.Exec("DELETE FROM entities WHERE id = ?", id)
|
|
return err
|
|
}
|
|
|
|
// ListEntities returns entities filtered by domain and/or status.
|
|
func (db *DB) ListEntities(domain string, status EntityStatus) ([]Entity, error) {
|
|
where := []string{}
|
|
args := []any{}
|
|
if domain != "" {
|
|
where = append(where, "domain = ?")
|
|
args = append(args, domain)
|
|
}
|
|
if status != "" {
|
|
where = append(where, "status = ?")
|
|
args = append(args, string(status))
|
|
}
|
|
|
|
q := "SELECT id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at FROM entities"
|
|
if len(where) > 0 {
|
|
q += " WHERE " + strings.Join(where, " AND ")
|
|
}
|
|
q += " ORDER BY name"
|
|
|
|
rows, err := db.conn.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanEntities(rows)
|
|
}
|
|
|
|
// SearchEntities performs FTS search on entities.
|
|
func (db *DB) SearchEntities(query, domain string) ([]Entity, error) {
|
|
where := []string{}
|
|
args := []any{}
|
|
if query != "" {
|
|
where = append(where, "e.id IN (SELECT id FROM entities_fts WHERE entities_fts MATCH ?)")
|
|
args = append(args, query)
|
|
}
|
|
if domain != "" {
|
|
where = append(where, "e.domain = ?")
|
|
args = append(args, domain)
|
|
}
|
|
|
|
q := "SELECT e.id, e.name, e.type_ref, e.status, e.description, e.domain, e.tags, e.source, e.metadata, e.notes, e.created_at, e.updated_at FROM entities e"
|
|
if len(where) > 0 {
|
|
q += " WHERE " + strings.Join(where, " AND ")
|
|
}
|
|
q += " ORDER BY e.name"
|
|
|
|
rows, err := db.conn.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanEntities(rows)
|
|
}
|
|
|
|
func scanEntities(rows *sql.Rows) ([]Entity, error) {
|
|
var result []Entity
|
|
for rows.Next() {
|
|
var e Entity
|
|
var tagsJSON, metadataJSON, createdAt, updatedAt string
|
|
if err := rows.Scan(&e.ID, &e.Name, &e.TypeRef, &e.Status, &e.Description, &e.Domain,
|
|
&tagsJSON, &e.Source, &metadataJSON, &e.Notes, &createdAt, &updatedAt); err != nil {
|
|
return nil, fmt.Errorf("scanning entity: %w", err)
|
|
}
|
|
e.Tags = unmarshalStrings(tagsJSON)
|
|
e.Metadata = unmarshalJSON(metadataJSON)
|
|
e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
e.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
|
|
result = append(result, e)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// --- Relation CRUD ---
|
|
|
|
// InsertRelation inserts or replaces a relation.
|
|
func (db *DB) InsertRelation(r *Relation) error {
|
|
now := time.Now().UTC()
|
|
if r.CreatedAt.IsZero() {
|
|
r.CreatedAt = now
|
|
}
|
|
r.UpdatedAt = now
|
|
|
|
var startedAt, endedAt *string
|
|
if r.StartedAt != nil {
|
|
s := r.StartedAt.Format(time.RFC3339)
|
|
startedAt = &s
|
|
}
|
|
if r.EndedAt != nil {
|
|
s := r.EndedAt.Format(time.RFC3339)
|
|
endedAt = &s
|
|
}
|
|
|
|
_, err := db.conn.Exec(`
|
|
INSERT OR REPLACE INTO relations (id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
r.ID, r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description,
|
|
r.Purity, string(r.Direction), r.Weight, string(r.Status),
|
|
startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes,
|
|
r.CreatedAt.Format(time.RFC3339), r.UpdatedAt.Format(time.RFC3339),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// GetRelation returns a relation by ID.
|
|
func (db *DB) GetRelation(id string) (*Relation, error) {
|
|
row := db.conn.QueryRow(`
|
|
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
|
|
FROM relations WHERE id = ?`, id)
|
|
return scanRelation(row)
|
|
}
|
|
|
|
// UpdateRelation updates an existing relation.
|
|
func (db *DB) UpdateRelation(r *Relation) error {
|
|
r.UpdatedAt = time.Now().UTC()
|
|
|
|
var startedAt, endedAt *string
|
|
if r.StartedAt != nil {
|
|
s := r.StartedAt.Format(time.RFC3339)
|
|
startedAt = &s
|
|
}
|
|
if r.EndedAt != nil {
|
|
s := r.EndedAt.Format(time.RFC3339)
|
|
endedAt = &s
|
|
}
|
|
|
|
_, err := db.conn.Exec(`
|
|
UPDATE relations SET name=?, from_entity=?, to_entity=?, via=?, description=?, purity=?, direction=?, weight=?, status=?, started_at=?, ended_at=?, "order"=?, tags=?, notes=?, updated_at=?
|
|
WHERE id=?`,
|
|
r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description,
|
|
r.Purity, string(r.Direction), r.Weight, string(r.Status),
|
|
startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes,
|
|
r.UpdatedAt.Format(time.RFC3339), r.ID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
// DeleteRelation removes a relation by ID (cascades to relation_inputs).
|
|
func (db *DB) DeleteRelation(id string) error {
|
|
_, err := db.conn.Exec("DELETE FROM relations WHERE id = ?", id)
|
|
return err
|
|
}
|
|
|
|
// ListRelations returns all relations, optionally filtered by entity involvement.
|
|
func (db *DB) ListRelations(entityID string) ([]Relation, error) {
|
|
q := `SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at FROM relations`
|
|
var args []any
|
|
if entityID != "" {
|
|
q += " WHERE from_entity = ? OR to_entity = ?"
|
|
args = append(args, entityID, entityID)
|
|
}
|
|
q += " ORDER BY name"
|
|
|
|
rows, err := db.conn.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var result []Relation
|
|
for rows.Next() {
|
|
r, err := scanRelationFromRows(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result = append(result, *r)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// GetRelationsFrom returns all relations where from_entity matches.
|
|
func (db *DB) GetRelationsFrom(entityID string) ([]Relation, error) {
|
|
rows, err := db.conn.Query(`
|
|
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
|
|
FROM relations WHERE from_entity = ? ORDER BY name`, entityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var result []Relation
|
|
for rows.Next() {
|
|
r, err := scanRelationFromRows(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result = append(result, *r)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// GetRelationsTo returns all relations where to_entity matches.
|
|
func (db *DB) GetRelationsTo(entityID string) ([]Relation, error) {
|
|
rows, err := db.conn.Query(`
|
|
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
|
|
FROM relations WHERE to_entity = ? ORDER BY name`, entityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var result []Relation
|
|
for rows.Next() {
|
|
r, err := scanRelationFromRows(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result = append(result, *r)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func scanRelation(row *sql.Row) (*Relation, error) {
|
|
var r Relation
|
|
var tagsJSON, createdAt, updatedAt string
|
|
var startedAt, endedAt *string
|
|
err := row.Scan(&r.ID, &r.Name, &r.FromEntity, &r.ToEntity, &r.Via, &r.Description,
|
|
&r.Purity, &r.Direction, &r.Weight, &r.Status,
|
|
&startedAt, &endedAt, &r.Order, &tagsJSON, &r.Notes, &createdAt, &updatedAt)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning relation: %w", err)
|
|
}
|
|
r.Tags = unmarshalStrings(tagsJSON)
|
|
r.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
r.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
|
|
if startedAt != nil {
|
|
t, _ := time.Parse(time.RFC3339, *startedAt)
|
|
r.StartedAt = &t
|
|
}
|
|
if endedAt != nil {
|
|
t, _ := time.Parse(time.RFC3339, *endedAt)
|
|
r.EndedAt = &t
|
|
}
|
|
return &r, nil
|
|
}
|
|
|
|
func scanRelationFromRows(rows *sql.Rows) (*Relation, error) {
|
|
var r Relation
|
|
var tagsJSON, createdAt, updatedAt string
|
|
var startedAt, endedAt *string
|
|
err := rows.Scan(&r.ID, &r.Name, &r.FromEntity, &r.ToEntity, &r.Via, &r.Description,
|
|
&r.Purity, &r.Direction, &r.Weight, &r.Status,
|
|
&startedAt, &endedAt, &r.Order, &tagsJSON, &r.Notes, &createdAt, &updatedAt)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning relation: %w", err)
|
|
}
|
|
r.Tags = unmarshalStrings(tagsJSON)
|
|
r.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
r.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
|
|
if startedAt != nil {
|
|
t, _ := time.Parse(time.RFC3339, *startedAt)
|
|
r.StartedAt = &t
|
|
}
|
|
if endedAt != nil {
|
|
t, _ := time.Parse(time.RFC3339, *endedAt)
|
|
r.EndedAt = &t
|
|
}
|
|
return &r, nil
|
|
}
|
|
|
|
// --- RelationInput CRUD ---
|
|
|
|
// InsertRelationInput inserts a relation input.
|
|
func (db *DB) InsertRelationInput(ri *RelationInput) error {
|
|
_, err := db.conn.Exec(`
|
|
INSERT INTO relation_inputs (id, relation_id, entity_id, role, "order")
|
|
VALUES (?, ?, ?, ?, ?)`,
|
|
ri.ID, ri.RelationID, ri.EntityID, ri.Role, ri.Order,
|
|
)
|
|
return err
|
|
}
|
|
|
|
// GetRelationInputs returns all inputs for a relation.
|
|
func (db *DB) GetRelationInputs(relationID string) ([]RelationInput, error) {
|
|
rows, err := db.conn.Query(`
|
|
SELECT id, relation_id, entity_id, role, "order"
|
|
FROM relation_inputs WHERE relation_id = ? ORDER BY "order"`, relationID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var result []RelationInput
|
|
for rows.Next() {
|
|
var ri RelationInput
|
|
if err := rows.Scan(&ri.ID, &ri.RelationID, &ri.EntityID, &ri.Role, &ri.Order); err != nil {
|
|
return nil, fmt.Errorf("scanning relation_input: %w", err)
|
|
}
|
|
result = append(result, ri)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// DeleteRelationInputs removes all inputs for a relation.
|
|
func (db *DB) DeleteRelationInputs(relationID string) error {
|
|
_, err := db.conn.Exec("DELETE FROM relation_inputs WHERE relation_id = ?", relationID)
|
|
return err
|
|
}
|
|
|
|
// --- Execution CRUD ---
|
|
|
|
// InsertExecution inserts an execution record.
|
|
func (db *DB) InsertExecution(e *Execution) error {
|
|
if e.CreatedAt.IsZero() {
|
|
e.CreatedAt = time.Now().UTC()
|
|
}
|
|
|
|
var endedAt *string
|
|
if e.EndedAt != nil {
|
|
s := e.EndedAt.Format(time.RFC3339)
|
|
endedAt = &s
|
|
}
|
|
|
|
_, err := db.conn.Exec(`
|
|
INSERT OR REPLACE INTO executions (
|
|
id, pipeline_id, relation_id, status, started_at, ended_at,
|
|
duration_ms, records_in, records_out, error, metrics, created_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
e.ID, e.PipelineID, e.RelationID, string(e.Status),
|
|
e.StartedAt.Format(time.RFC3339), endedAt,
|
|
e.DurationMs, e.RecordsIn, e.RecordsOut, e.Error,
|
|
marshalJSON(e.Metrics), e.CreatedAt.Format(time.RFC3339),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// GetExecution returns an execution by ID.
|
|
func (db *DB) GetExecution(id string) (*Execution, error) {
|
|
row := db.conn.QueryRow(`
|
|
SELECT id, pipeline_id, relation_id, status, started_at, ended_at,
|
|
duration_ms, records_in, records_out, error, metrics, created_at
|
|
FROM executions WHERE id = ?`, id)
|
|
|
|
var e Execution
|
|
var metricsJSON, createdAt, startedAt string
|
|
var endedAt *string
|
|
err := row.Scan(&e.ID, &e.PipelineID, &e.RelationID, &e.Status,
|
|
&startedAt, &endedAt,
|
|
&e.DurationMs, &e.RecordsIn, &e.RecordsOut, &e.Error,
|
|
&metricsJSON, &createdAt)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning execution: %w", err)
|
|
}
|
|
e.Metrics = unmarshalJSON(metricsJSON)
|
|
e.StartedAt, _ = time.Parse(time.RFC3339, startedAt)
|
|
e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
if endedAt != nil {
|
|
t, _ := time.Parse(time.RFC3339, *endedAt)
|
|
e.EndedAt = &t
|
|
}
|
|
return &e, nil
|
|
}
|
|
|
|
// ListExecutions returns executions filtered by pipeline, relation, and/or status.
|
|
func (db *DB) ListExecutions(pipelineID, relationID string, status ExecutionStatus) ([]Execution, error) {
|
|
where := []string{}
|
|
args := []any{}
|
|
if pipelineID != "" {
|
|
where = append(where, "pipeline_id = ?")
|
|
args = append(args, pipelineID)
|
|
}
|
|
if relationID != "" {
|
|
where = append(where, "relation_id = ?")
|
|
args = append(args, relationID)
|
|
}
|
|
if status != "" {
|
|
where = append(where, "status = ?")
|
|
args = append(args, string(status))
|
|
}
|
|
|
|
q := `SELECT id, pipeline_id, relation_id, status, started_at, ended_at,
|
|
duration_ms, records_in, records_out, error, metrics, created_at
|
|
FROM executions`
|
|
if len(where) > 0 {
|
|
q += " WHERE " + strings.Join(where, " AND ")
|
|
}
|
|
q += " ORDER BY created_at DESC"
|
|
|
|
rows, err := db.conn.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanExecutions(rows)
|
|
}
|
|
|
|
func scanExecutions(rows *sql.Rows) ([]Execution, error) {
|
|
var result []Execution
|
|
for rows.Next() {
|
|
var e Execution
|
|
var metricsJSON, createdAt, startedAt string
|
|
var endedAt *string
|
|
if err := rows.Scan(&e.ID, &e.PipelineID, &e.RelationID, &e.Status,
|
|
&startedAt, &endedAt,
|
|
&e.DurationMs, &e.RecordsIn, &e.RecordsOut, &e.Error,
|
|
&metricsJSON, &createdAt); err != nil {
|
|
return nil, fmt.Errorf("scanning execution: %w", err)
|
|
}
|
|
e.Metrics = unmarshalJSON(metricsJSON)
|
|
e.StartedAt, _ = time.Parse(time.RFC3339, startedAt)
|
|
e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
if endedAt != nil {
|
|
t, _ := time.Parse(time.RFC3339, *endedAt)
|
|
e.EndedAt = &t
|
|
}
|
|
result = append(result, e)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// --- Assertion CRUD ---
|
|
|
|
// InsertAssertion inserts or replaces an assertion.
|
|
func (db *DB) InsertAssertion(a *Assertion) error {
|
|
if a.CreatedAt.IsZero() {
|
|
a.CreatedAt = time.Now().UTC()
|
|
}
|
|
|
|
active := 0
|
|
if a.Active {
|
|
active = 1
|
|
}
|
|
|
|
_, err := db.conn.Exec(`
|
|
INSERT OR REPLACE INTO assertions (
|
|
id, entity_id, name, kind, rule, severity, description, active, created_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
a.ID, a.EntityID, a.Name, a.Kind, a.Rule,
|
|
string(a.Severity), a.Description, active,
|
|
a.CreatedAt.Format(time.RFC3339),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// GetAssertion returns an assertion by ID.
|
|
func (db *DB) GetAssertion(id string) (*Assertion, error) {
|
|
row := db.conn.QueryRow(`
|
|
SELECT id, entity_id, name, kind, rule, severity, description, active, created_at
|
|
FROM assertions WHERE id = ?`, id)
|
|
|
|
var a Assertion
|
|
var active int
|
|
var createdAt string
|
|
err := row.Scan(&a.ID, &a.EntityID, &a.Name, &a.Kind, &a.Rule,
|
|
&a.Severity, &a.Description, &active, &createdAt)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning assertion: %w", err)
|
|
}
|
|
a.Active = active == 1
|
|
a.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
return &a, nil
|
|
}
|
|
|
|
// UpdateAssertion updates an existing assertion.
|
|
func (db *DB) UpdateAssertion(a *Assertion) error {
|
|
active := 0
|
|
if a.Active {
|
|
active = 1
|
|
}
|
|
|
|
_, err := db.conn.Exec(`
|
|
UPDATE assertions SET entity_id=?, name=?, kind=?, rule=?, severity=?,
|
|
description=?, active=?
|
|
WHERE id=?`,
|
|
a.EntityID, a.Name, a.Kind, a.Rule, string(a.Severity),
|
|
a.Description, active, a.ID,
|
|
)
|
|
return err
|
|
}
|
|
|
|
// DeleteAssertion removes an assertion by ID.
|
|
func (db *DB) DeleteAssertion(id string) error {
|
|
_, err := db.conn.Exec("DELETE FROM assertions WHERE id = ?", id)
|
|
return err
|
|
}
|
|
|
|
// ListAssertions returns assertions filtered by entity and/or active state.
|
|
func (db *DB) ListAssertions(entityID string, active *bool) ([]Assertion, error) {
|
|
where := []string{}
|
|
args := []any{}
|
|
if entityID != "" {
|
|
where = append(where, "entity_id = ?")
|
|
args = append(args, entityID)
|
|
}
|
|
if active != nil {
|
|
v := 0
|
|
if *active {
|
|
v = 1
|
|
}
|
|
where = append(where, "active = ?")
|
|
args = append(args, v)
|
|
}
|
|
|
|
q := `SELECT id, entity_id, name, kind, rule, severity, description, active, created_at
|
|
FROM assertions`
|
|
if len(where) > 0 {
|
|
q += " WHERE " + strings.Join(where, " AND ")
|
|
}
|
|
q += " ORDER BY name"
|
|
|
|
rows, err := db.conn.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanAssertions(rows)
|
|
}
|
|
|
|
// SearchAssertions performs FTS search on assertions.
|
|
func (db *DB) SearchAssertions(query, entityID string) ([]Assertion, error) {
|
|
where := []string{}
|
|
args := []any{}
|
|
if query != "" {
|
|
where = append(where, "a.id IN (SELECT id FROM assertions_fts WHERE assertions_fts MATCH ?)")
|
|
args = append(args, query)
|
|
}
|
|
if entityID != "" {
|
|
where = append(where, "a.entity_id = ?")
|
|
args = append(args, entityID)
|
|
}
|
|
|
|
q := `SELECT a.id, a.entity_id, a.name, a.kind, a.rule, a.severity, a.description, a.active, a.created_at
|
|
FROM assertions a`
|
|
if len(where) > 0 {
|
|
q += " WHERE " + strings.Join(where, " AND ")
|
|
}
|
|
q += " ORDER BY a.name"
|
|
|
|
rows, err := db.conn.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanAssertions(rows)
|
|
}
|
|
|
|
func scanAssertions(rows *sql.Rows) ([]Assertion, error) {
|
|
var result []Assertion
|
|
for rows.Next() {
|
|
var a Assertion
|
|
var active int
|
|
var createdAt string
|
|
if err := rows.Scan(&a.ID, &a.EntityID, &a.Name, &a.Kind, &a.Rule,
|
|
&a.Severity, &a.Description, &active, &createdAt); err != nil {
|
|
return nil, fmt.Errorf("scanning assertion: %w", err)
|
|
}
|
|
a.Active = active == 1
|
|
a.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
result = append(result, a)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// --- AssertionResult CRUD ---
|
|
|
|
// InsertAssertionResult inserts an assertion result.
|
|
func (db *DB) InsertAssertionResult(ar *AssertionResult) error {
|
|
if ar.EvaluatedAt.IsZero() {
|
|
ar.EvaluatedAt = time.Now().UTC()
|
|
}
|
|
|
|
_, err := db.conn.Exec(`
|
|
INSERT OR REPLACE INTO assertion_results (
|
|
id, assertion_id, execution_id, status, value, message, evaluated_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
|
ar.ID, ar.AssertionID, ar.ExecutionID, string(ar.Status),
|
|
marshalJSON(ar.Value), ar.Message, ar.EvaluatedAt.Format(time.RFC3339),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// GetAssertionResult returns an assertion result by ID.
|
|
func (db *DB) GetAssertionResult(id string) (*AssertionResult, error) {
|
|
row := db.conn.QueryRow(`
|
|
SELECT id, assertion_id, execution_id, status, value, message, evaluated_at
|
|
FROM assertion_results WHERE id = ?`, id)
|
|
|
|
var ar AssertionResult
|
|
var valueJSON, evaluatedAt string
|
|
err := row.Scan(&ar.ID, &ar.AssertionID, &ar.ExecutionID, &ar.Status,
|
|
&valueJSON, &ar.Message, &evaluatedAt)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning assertion_result: %w", err)
|
|
}
|
|
ar.Value = unmarshalJSON(valueJSON)
|
|
ar.EvaluatedAt, _ = time.Parse(time.RFC3339, evaluatedAt)
|
|
return &ar, nil
|
|
}
|
|
|
|
// ListAssertionResults returns results filtered by assertion and/or execution.
|
|
func (db *DB) ListAssertionResults(assertionID, executionID string) ([]AssertionResult, error) {
|
|
where := []string{}
|
|
args := []any{}
|
|
if assertionID != "" {
|
|
where = append(where, "assertion_id = ?")
|
|
args = append(args, assertionID)
|
|
}
|
|
if executionID != "" {
|
|
where = append(where, "execution_id = ?")
|
|
args = append(args, executionID)
|
|
}
|
|
|
|
q := `SELECT id, assertion_id, execution_id, status, value, message, evaluated_at
|
|
FROM assertion_results`
|
|
if len(where) > 0 {
|
|
q += " WHERE " + strings.Join(where, " AND ")
|
|
}
|
|
q += " ORDER BY evaluated_at DESC"
|
|
|
|
rows, err := db.conn.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanAssertionResults(rows)
|
|
}
|
|
|
|
func scanAssertionResults(rows *sql.Rows) ([]AssertionResult, error) {
|
|
var result []AssertionResult
|
|
for rows.Next() {
|
|
var ar AssertionResult
|
|
var valueJSON, evaluatedAt string
|
|
if err := rows.Scan(&ar.ID, &ar.AssertionID, &ar.ExecutionID, &ar.Status,
|
|
&valueJSON, &ar.Message, &evaluatedAt); err != nil {
|
|
return nil, fmt.Errorf("scanning assertion_result: %w", err)
|
|
}
|
|
ar.Value = unmarshalJSON(valueJSON)
|
|
ar.EvaluatedAt, _ = time.Parse(time.RFC3339, evaluatedAt)
|
|
result = append(result, ar)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// --- Log CRUD ---
|
|
|
|
// InsertLog inserts a log entry.
|
|
func (db *DB) InsertLog(l *Log) error {
|
|
if l.CreatedAt.IsZero() {
|
|
l.CreatedAt = time.Now().UTC()
|
|
}
|
|
|
|
_, err := db.conn.Exec(`
|
|
INSERT INTO logs (id, level, source, entity_id, execution_id, message, metadata, created_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
l.ID, string(l.Level), l.Source, l.EntityID, l.ExecutionID,
|
|
l.Message, marshalJSON(l.Metadata), l.CreatedAt.Format(time.RFC3339),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// GetLog returns a log entry by ID.
|
|
func (db *DB) GetLog(id string) (*Log, error) {
|
|
row := db.conn.QueryRow(`
|
|
SELECT id, level, source, entity_id, execution_id, message, metadata, created_at
|
|
FROM logs WHERE id = ?`, id)
|
|
|
|
var l Log
|
|
var metadataJSON, createdAt string
|
|
err := row.Scan(&l.ID, &l.Level, &l.Source, &l.EntityID, &l.ExecutionID,
|
|
&l.Message, &metadataJSON, &createdAt)
|
|
if err == sql.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scanning log: %w", err)
|
|
}
|
|
l.Metadata = unmarshalJSON(metadataJSON)
|
|
l.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
return &l, nil
|
|
}
|
|
|
|
// ListLogs returns logs filtered by level, source, entity, and/or execution.
|
|
func (db *DB) ListLogs(level LogLevel, source, entityID, executionID string, limit int) ([]Log, error) {
|
|
where := []string{}
|
|
args := []any{}
|
|
if level != "" {
|
|
where = append(where, "level = ?")
|
|
args = append(args, string(level))
|
|
}
|
|
if source != "" {
|
|
where = append(where, "source = ?")
|
|
args = append(args, source)
|
|
}
|
|
if entityID != "" {
|
|
where = append(where, "entity_id = ?")
|
|
args = append(args, entityID)
|
|
}
|
|
if executionID != "" {
|
|
where = append(where, "execution_id = ?")
|
|
args = append(args, executionID)
|
|
}
|
|
|
|
q := `SELECT id, level, source, entity_id, execution_id, message, metadata, created_at FROM logs`
|
|
if len(where) > 0 {
|
|
q += " WHERE " + strings.Join(where, " AND ")
|
|
}
|
|
q += " ORDER BY created_at DESC"
|
|
if limit > 0 {
|
|
q += fmt.Sprintf(" LIMIT %d", limit)
|
|
}
|
|
|
|
rows, err := db.conn.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var result []Log
|
|
for rows.Next() {
|
|
var l Log
|
|
var metadataJSON, createdAt string
|
|
if err := rows.Scan(&l.ID, &l.Level, &l.Source, &l.EntityID, &l.ExecutionID,
|
|
&l.Message, &metadataJSON, &createdAt); err != nil {
|
|
return nil, fmt.Errorf("scanning log: %w", err)
|
|
}
|
|
l.Metadata = unmarshalJSON(metadataJSON)
|
|
l.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
result = append(result, l)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// --- E2E Tests CRUD ---
|
|
|
|
// InsertE2ETest inserts or replaces an e2e test.
|
|
func (db *DB) InsertE2ETest(t *E2ETest) error {
|
|
now := time.Now().UTC()
|
|
if t.CreatedAt.IsZero() {
|
|
t.CreatedAt = now
|
|
}
|
|
t.UpdatedAt = now
|
|
|
|
_, err := db.conn.Exec(`
|
|
INSERT OR REPLACE INTO e2e_tests (
|
|
id, name, description, relation_id, steps, input_fixture,
|
|
expected, last_status, last_run_at, execution_id, duration_ms,
|
|
created_at, updated_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
t.ID, t.Name, t.Description, t.RelationID,
|
|
marshalStrings(t.Steps), marshalJSON(t.InputFixture), marshalJSON(t.Expected),
|
|
string(t.LastStatus), t.LastRunAt, t.ExecutionID, t.DurationMs,
|
|
t.CreatedAt.Format(time.RFC3339), t.UpdatedAt.Format(time.RFC3339),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// GetE2ETest returns an e2e test by ID.
|
|
func (db *DB) GetE2ETest(id string) (*E2ETest, error) {
|
|
row := db.conn.QueryRow(`
|
|
SELECT id, name, description, relation_id, steps, input_fixture,
|
|
expected, last_status, last_run_at, execution_id, duration_ms,
|
|
created_at, updated_at
|
|
FROM e2e_tests WHERE id = ?`, id)
|
|
|
|
t, err := scanE2ETest(row)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("e2e test %q not found: %w", id, err)
|
|
}
|
|
return t, nil
|
|
}
|
|
|
|
// ListE2ETests returns e2e tests with optional status filter.
|
|
func (db *DB) ListE2ETests(status E2ETestStatus) ([]E2ETest, error) {
|
|
where := []string{}
|
|
args := []any{}
|
|
if status != "" {
|
|
where = append(where, "last_status = ?")
|
|
args = append(args, string(status))
|
|
}
|
|
|
|
q := `SELECT id, name, description, relation_id, steps, input_fixture,
|
|
expected, last_status, last_run_at, execution_id, duration_ms,
|
|
created_at, updated_at
|
|
FROM e2e_tests`
|
|
if len(where) > 0 {
|
|
q += " WHERE " + strings.Join(where, " AND ")
|
|
}
|
|
q += " ORDER BY name"
|
|
|
|
rows, err := db.conn.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var result []E2ETest
|
|
for rows.Next() {
|
|
var t E2ETest
|
|
var stepsJSON, fixtureJSON, expectedJSON, createdAt, updatedAt string
|
|
if err := rows.Scan(&t.ID, &t.Name, &t.Description, &t.RelationID,
|
|
&stepsJSON, &fixtureJSON, &expectedJSON,
|
|
&t.LastStatus, &t.LastRunAt, &t.ExecutionID, &t.DurationMs,
|
|
&createdAt, &updatedAt); err != nil {
|
|
return nil, fmt.Errorf("scanning e2e test: %w", err)
|
|
}
|
|
t.Steps = unmarshalStrings(stepsJSON)
|
|
t.InputFixture = unmarshalJSON(fixtureJSON)
|
|
t.Expected = unmarshalJSON(expectedJSON)
|
|
t.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
t.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
|
|
result = append(result, t)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
// UpdateE2ETestResult updates the result fields after running an e2e test.
|
|
func (db *DB) UpdateE2ETestResult(id string, status E2ETestStatus, executionID string, durationMs int64) error {
|
|
now := time.Now().UTC()
|
|
_, err := db.conn.Exec(`
|
|
UPDATE e2e_tests SET last_status=?, last_run_at=?, execution_id=?, duration_ms=?, updated_at=?
|
|
WHERE id=?`,
|
|
string(status), now.Format(time.RFC3339), executionID, durationMs,
|
|
now.Format(time.RFC3339), id,
|
|
)
|
|
return err
|
|
}
|
|
|
|
// DeleteE2ETest removes an e2e test by ID.
|
|
func (db *DB) DeleteE2ETest(id string) error {
|
|
_, err := db.conn.Exec("DELETE FROM e2e_tests WHERE id = ?", id)
|
|
return err
|
|
}
|
|
|
|
func scanE2ETest(row *sql.Row) (*E2ETest, error) {
|
|
var t E2ETest
|
|
var stepsJSON, fixtureJSON, expectedJSON, createdAt, updatedAt string
|
|
err := row.Scan(&t.ID, &t.Name, &t.Description, &t.RelationID,
|
|
&stepsJSON, &fixtureJSON, &expectedJSON,
|
|
&t.LastStatus, &t.LastRunAt, &t.ExecutionID, &t.DurationMs,
|
|
&createdAt, &updatedAt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
t.Steps = unmarshalStrings(stepsJSON)
|
|
t.InputFixture = unmarshalJSON(fixtureJSON)
|
|
t.Expected = unmarshalJSON(expectedJSON)
|
|
t.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
|
t.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
|
|
return &t, nil
|
|
}
|