Files
fn_registry/fn_operations/store.go
T
egutierrez 169cb0853b feat: modelo Log y CRUD en fn_operations
Tipo Log con niveles debug/info/warn/error, source, entity_id y execution_id
opcionales. Migración 003_logs.sql y funciones InsertLog, GetLog, ListLogs
con filtros combinables.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-29 17:31:03 +02:00

906 lines
26 KiB
Go

package fn_operations
import (
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"
)
// marshalStrings encodes ss as a JSON array string. A nil slice is encoded
// as "[]" rather than "null".
func marshalStrings(ss []string) string {
	items := ss
	if items == nil {
		items = make([]string, 0)
	}
	// Encoding a []string cannot fail, so the error is discarded.
	encoded, _ := json.Marshal(items)
	return string(encoded)
}
// unmarshalStrings decodes a JSON array of strings. The decode is
// best-effort: the error is dropped, and when nothing was decoded (empty
// input, "null", or malformed JSON) an empty non-nil slice is returned.
func unmarshalStrings(s string) []string {
	var decoded []string
	_ = json.Unmarshal([]byte(s), &decoded)
	if decoded != nil {
		return decoded
	}
	return []string{}
}
// marshalJSON encodes v as a JSON object string for storage in a text column.
// A nil map is encoded as "{}" rather than "null". If v holds a value that
// encoding/json cannot represent (for example a channel or a NaN float), the
// result falls back to "{}" so the returned text is always valid JSON instead
// of an empty string.
func marshalJSON(v map[string]any) string {
	if v == nil {
		return "{}"
	}
	b, err := json.Marshal(v)
	if err != nil {
		// The signature cannot report the error; keep the column parseable.
		return "{}"
	}
	return string(b)
}
// unmarshalJSON decodes a JSON object into a map. The decode is best-effort:
// the error is dropped, and when nothing was decoded (empty input, "null", or
// malformed JSON) an empty non-nil map is returned.
func unmarshalJSON(s string) map[string]any {
	var decoded map[string]any
	_ = json.Unmarshal([]byte(s), &decoded)
	if decoded != nil {
		return decoded
	}
	return map[string]any{}
}
// --- TypeSnapshot CRUD ---
// InsertTypeSnapshot stores ts in the types_snapshot table. The statement is
// INSERT OR IGNORE, so a conflicting row is presumably left untouched rather
// than reported as an error. A zero ts.SnappedAt is replaced (on ts itself)
// with the current UTC time; the timestamp is stored as RFC 3339 text.
func (db *DB) InsertTypeSnapshot(ts *TypeSnapshot) error {
	if ts.SnappedAt.IsZero() {
		ts.SnappedAt = time.Now().UTC()
	}
	snappedAt := ts.SnappedAt.Format(time.RFC3339)

	const stmt = `
INSERT OR IGNORE INTO types_snapshot (id, version, lang, algebraic, definition, description, snapped_at)
VALUES (?, ?, ?, ?, ?, ?, ?)`
	_, err := db.conn.Exec(stmt,
		ts.ID, ts.Version, ts.Lang, ts.Algebraic, ts.Definition, ts.Description, snappedAt)
	return err
}
// UpdateTypeSnapshot overwrites every column of the types_snapshot row whose
// id equals ts.ID. A zero ts.SnappedAt is replaced (on ts itself) with the
// current UTC time. No check is made that a row was actually updated.
func (db *DB) UpdateTypeSnapshot(ts *TypeSnapshot) error {
	if ts.SnappedAt.IsZero() {
		ts.SnappedAt = time.Now().UTC()
	}
	snappedAt := ts.SnappedAt.Format(time.RFC3339)

	const stmt = `
UPDATE types_snapshot SET version=?, lang=?, algebraic=?, definition=?, description=?, snapped_at=?
WHERE id=?`
	_, err := db.conn.Exec(stmt,
		ts.Version, ts.Lang, ts.Algebraic, ts.Definition, ts.Description, snappedAt, ts.ID)
	return err
}
// GetTypeSnapshot looks up a type snapshot by ID. It returns (nil, nil) when
// no row matches, and a wrapped error when scanning fails.
func (db *DB) GetTypeSnapshot(id string) (*TypeSnapshot, error) {
	var snap TypeSnapshot
	var rawSnappedAt string

	err := db.conn.
		QueryRow("SELECT id, version, lang, algebraic, definition, description, snapped_at FROM types_snapshot WHERE id = ?", id).
		Scan(&snap.ID, &snap.Version, &snap.Lang, &snap.Algebraic, &snap.Definition, &snap.Description, &rawSnappedAt)
	switch {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, fmt.Errorf("scanning type_snapshot: %w", err)
	}

	// A malformed timestamp is tolerated and leaves SnappedAt at its zero value.
	snap.SnappedAt, _ = time.Parse(time.RFC3339, rawSnappedAt)
	return &snap, nil
}
// ListTypeSnapshots returns every row of types_snapshot ordered by id.
// The result is nil when the table is empty. Scan failures and errors
// encountered while iterating the result set are returned wrapped.
func (db *DB) ListTypeSnapshots() ([]TypeSnapshot, error) {
	rows, err := db.conn.Query("SELECT id, version, lang, algebraic, definition, description, snapped_at FROM types_snapshot ORDER BY id")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []TypeSnapshot
	for rows.Next() {
		var ts TypeSnapshot
		var snappedAt string
		if err := rows.Scan(&ts.ID, &ts.Version, &ts.Lang, &ts.Algebraic, &ts.Definition, &ts.Description, &snappedAt); err != nil {
			return nil, fmt.Errorf("scanning type_snapshot: %w", err)
		}
		// A malformed timestamp is tolerated and leaves SnappedAt zero.
		ts.SnappedAt, _ = time.Parse(time.RFC3339, snappedAt)
		result = append(result, ts)
	}
	// rows.Next returns false on both exhaustion and failure; without this
	// check a mid-iteration error would yield a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating type_snapshots: %w", err)
	}
	return result, nil
}
// --- Entity CRUD ---
// InsertEntity writes e to the entities table using INSERT OR REPLACE.
// e.UpdatedAt is always set to the current UTC time, and e.CreatedAt is set
// to that same instant when it is zero; both mutations are visible to the
// caller. Tags and Metadata are stored as JSON text.
func (db *DB) InsertEntity(e *Entity) error {
	stamp := time.Now().UTC()
	if e.CreatedAt.IsZero() {
		e.CreatedAt = stamp
	}
	e.UpdatedAt = stamp

	const stmt = `
INSERT OR REPLACE INTO entities (id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
	params := []any{
		e.ID, e.Name, e.TypeRef, string(e.Status), e.Description, e.Domain,
		marshalStrings(e.Tags), e.Source, marshalJSON(e.Metadata), e.Notes,
		e.CreatedAt.Format(time.RFC3339), e.UpdatedAt.Format(time.RFC3339),
	}
	_, err := db.conn.Exec(stmt, params...)
	return err
}
// GetEntity looks up an entity by ID. It returns (nil, nil) when no row
// matches, and a wrapped error when scanning fails.
func (db *DB) GetEntity(id string) (*Entity, error) {
	const query = `
SELECT id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at
FROM entities WHERE id = ?`

	var ent Entity
	var rawTags, rawMetadata, rawCreated, rawUpdated string
	dest := []any{
		&ent.ID, &ent.Name, &ent.TypeRef, &ent.Status, &ent.Description, &ent.Domain,
		&rawTags, &ent.Source, &rawMetadata, &ent.Notes, &rawCreated, &rawUpdated,
	}
	if err := db.conn.QueryRow(query, id).Scan(dest...); err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}
		return nil, fmt.Errorf("scanning entity: %w", err)
	}

	ent.Tags = unmarshalStrings(rawTags)
	ent.Metadata = unmarshalJSON(rawMetadata)
	// Malformed timestamps are tolerated and leave the field at its zero value.
	ent.CreatedAt, _ = time.Parse(time.RFC3339, rawCreated)
	ent.UpdatedAt, _ = time.Parse(time.RFC3339, rawUpdated)
	return &ent, nil
}
// UpdateEntity overwrites the mutable columns of the entities row whose id
// equals e.ID. e.UpdatedAt is set to the current UTC time (mutating e);
// created_at is not touched. No check is made that a row was updated.
func (db *DB) UpdateEntity(e *Entity) error {
	e.UpdatedAt = time.Now().UTC()

	const stmt = `
UPDATE entities SET name=?, type_ref=?, status=?, description=?, domain=?, tags=?, source=?, metadata=?, notes=?, updated_at=?
WHERE id=?`
	params := []any{
		e.Name, e.TypeRef, string(e.Status), e.Description, e.Domain,
		marshalStrings(e.Tags), e.Source, marshalJSON(e.Metadata), e.Notes,
		e.UpdatedAt.Format(time.RFC3339), e.ID,
	}
	_, err := db.conn.Exec(stmt, params...)
	return err
}
// DeleteEntity deletes the entities row with the given ID. Only the error
// from executing the statement is reported; the affected-row count is not
// inspected.
func (db *DB) DeleteEntity(id string) error {
	const stmt = "DELETE FROM entities WHERE id = ?"
	_, err := db.conn.Exec(stmt, id)
	return err
}
// ListEntities returns entities ordered by name. An empty domain or status
// disables that filter; when both are given they are combined with AND.
func (db *DB) ListEntities(domain string, status EntityStatus) ([]Entity, error) {
	conds := make([]string, 0, 2)
	params := make([]any, 0, 2)
	if domain != "" {
		conds, params = append(conds, "domain = ?"), append(params, domain)
	}
	if status != "" {
		conds, params = append(conds, "status = ?"), append(params, string(status))
	}

	var sb strings.Builder
	sb.WriteString("SELECT id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at FROM entities")
	if len(conds) > 0 {
		sb.WriteString(" WHERE ")
		sb.WriteString(strings.Join(conds, " AND "))
	}
	sb.WriteString(" ORDER BY name")

	rows, err := db.conn.Query(sb.String(), params...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return scanEntities(rows)
}
// SearchEntities returns entities ordered by name, optionally restricted to
// those whose id matches a full-text MATCH against entities_fts and/or to a
// given domain. An empty query or domain disables that filter. The query
// string is passed to MATCH as a bound parameter, unmodified.
func (db *DB) SearchEntities(query, domain string) ([]Entity, error) {
	conds := make([]string, 0, 2)
	params := make([]any, 0, 2)
	addFilter := func(clause string, value any) {
		conds = append(conds, clause)
		params = append(params, value)
	}
	if query != "" {
		addFilter("e.id IN (SELECT id FROM entities_fts WHERE entities_fts MATCH ?)", query)
	}
	if domain != "" {
		addFilter("e.domain = ?", domain)
	}

	stmt := "SELECT e.id, e.name, e.type_ref, e.status, e.description, e.domain, e.tags, e.source, e.metadata, e.notes, e.created_at, e.updated_at FROM entities e"
	if len(conds) > 0 {
		stmt += " WHERE " + strings.Join(conds, " AND ")
	}
	stmt += " ORDER BY e.name"

	rows, err := db.conn.Query(stmt, params...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return scanEntities(rows)
}
// scanEntities reads every remaining row of rows into Entity values. The
// rows must carry the twelve entity columns in the order id, name, type_ref,
// status, description, domain, tags, source, metadata, notes, created_at,
// updated_at. The caller remains responsible for closing rows. Scan failures
// and errors encountered while iterating are returned wrapped.
func scanEntities(rows *sql.Rows) ([]Entity, error) {
	var result []Entity
	for rows.Next() {
		var e Entity
		var tagsJSON, metadataJSON, createdAt, updatedAt string
		if err := rows.Scan(&e.ID, &e.Name, &e.TypeRef, &e.Status, &e.Description, &e.Domain,
			&tagsJSON, &e.Source, &metadataJSON, &e.Notes, &createdAt, &updatedAt); err != nil {
			return nil, fmt.Errorf("scanning entity: %w", err)
		}
		e.Tags = unmarshalStrings(tagsJSON)
		e.Metadata = unmarshalJSON(metadataJSON)
		// Malformed timestamps are tolerated and leave the field zero.
		e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
		e.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
		result = append(result, e)
	}
	// rows.Next returns false on both exhaustion and failure; without this
	// check a mid-iteration error would yield a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating entities: %w", err)
	}
	return result, nil
}
// --- Relation CRUD ---
// InsertRelation writes r to the relations table using INSERT OR REPLACE.
// r.UpdatedAt is always set to the current UTC time, and r.CreatedAt is set
// to that same instant when it is zero; both mutations are visible to the
// caller. Nil StartedAt/EndedAt are stored as SQL NULL.
func (db *DB) InsertRelation(r *Relation) error {
	stamp := time.Now().UTC()
	if r.CreatedAt.IsZero() {
		r.CreatedAt = stamp
	}
	r.UpdatedAt = stamp

	// Optional timestamps: a nil *string binds as NULL.
	var startedText *string
	if t := r.StartedAt; t != nil {
		formatted := t.Format(time.RFC3339)
		startedText = &formatted
	}
	var endedText *string
	if t := r.EndedAt; t != nil {
		formatted := t.Format(time.RFC3339)
		endedText = &formatted
	}

	const stmt = `
INSERT OR REPLACE INTO relations (id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
	params := []any{
		r.ID, r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description,
		r.Purity, string(r.Direction), r.Weight, string(r.Status),
		startedText, endedText, r.Order, marshalStrings(r.Tags), r.Notes,
		r.CreatedAt.Format(time.RFC3339), r.UpdatedAt.Format(time.RFC3339),
	}
	_, err := db.conn.Exec(stmt, params...)
	return err
}
// GetRelation looks up a relation by ID, delegating row decoding to
// scanRelation (which yields (nil, nil) when no row matches).
func (db *DB) GetRelation(id string) (*Relation, error) {
	const query = `
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
FROM relations WHERE id = ?`
	return scanRelation(db.conn.QueryRow(query, id))
}
// UpdateRelation overwrites the mutable columns of the relations row whose
// id equals r.ID. r.UpdatedAt is set to the current UTC time (mutating r);
// created_at is not touched. Nil StartedAt/EndedAt are stored as SQL NULL.
// No check is made that a row was updated.
func (db *DB) UpdateRelation(r *Relation) error {
	r.UpdatedAt = time.Now().UTC()

	// Optional timestamps: a nil *string binds as NULL.
	var startedText *string
	if t := r.StartedAt; t != nil {
		formatted := t.Format(time.RFC3339)
		startedText = &formatted
	}
	var endedText *string
	if t := r.EndedAt; t != nil {
		formatted := t.Format(time.RFC3339)
		endedText = &formatted
	}

	const stmt = `
UPDATE relations SET name=?, from_entity=?, to_entity=?, via=?, description=?, purity=?, direction=?, weight=?, status=?, started_at=?, ended_at=?, "order"=?, tags=?, notes=?, updated_at=?
WHERE id=?`
	params := []any{
		r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description,
		r.Purity, string(r.Direction), r.Weight, string(r.Status),
		startedText, endedText, r.Order, marshalStrings(r.Tags), r.Notes,
		r.UpdatedAt.Format(time.RFC3339), r.ID,
	}
	_, err := db.conn.Exec(stmt, params...)
	return err
}
// DeleteRelation deletes the relations row with the given ID. The delete is
// expected to cascade to relation_inputs; that is presumably enforced by the
// schema, which is not visible here. The affected-row count is not inspected.
func (db *DB) DeleteRelation(id string) error {
	const stmt = "DELETE FROM relations WHERE id = ?"
	_, err := db.conn.Exec(stmt, id)
	return err
}
// ListRelations returns relations ordered by name. When entityID is
// non-empty only relations whose from_entity or to_entity equals it are
// returned; otherwise every relation is returned. Errors encountered while
// iterating the result set are returned wrapped.
func (db *DB) ListRelations(entityID string) ([]Relation, error) {
	q := `SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at FROM relations`
	var args []any
	if entityID != "" {
		q += " WHERE from_entity = ? OR to_entity = ?"
		args = append(args, entityID, entityID)
	}
	q += " ORDER BY name"

	rows, err := db.conn.Query(q, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []Relation
	for rows.Next() {
		r, err := scanRelationFromRows(rows)
		if err != nil {
			return nil, err
		}
		result = append(result, *r)
	}
	// rows.Next returns false on both exhaustion and failure; without this
	// check a mid-iteration error would yield a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating relations: %w", err)
	}
	return result, nil
}
// GetRelationsFrom returns the relations whose from_entity equals entityID,
// ordered by name. Errors encountered while iterating the result set are
// returned wrapped.
func (db *DB) GetRelationsFrom(entityID string) ([]Relation, error) {
	rows, err := db.conn.Query(`
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
FROM relations WHERE from_entity = ? ORDER BY name`, entityID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []Relation
	for rows.Next() {
		r, err := scanRelationFromRows(rows)
		if err != nil {
			return nil, err
		}
		result = append(result, *r)
	}
	// rows.Next returns false on both exhaustion and failure; without this
	// check a mid-iteration error would yield a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating relations: %w", err)
	}
	return result, nil
}
// GetRelationsTo returns the relations whose to_entity equals entityID,
// ordered by name. Errors encountered while iterating the result set are
// returned wrapped.
func (db *DB) GetRelationsTo(entityID string) ([]Relation, error) {
	rows, err := db.conn.Query(`
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
FROM relations WHERE to_entity = ? ORDER BY name`, entityID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []Relation
	for rows.Next() {
		r, err := scanRelationFromRows(rows)
		if err != nil {
			return nil, err
		}
		result = append(result, *r)
	}
	// rows.Next returns false on both exhaustion and failure; without this
	// check a mid-iteration error would yield a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating relations: %w", err)
	}
	return result, nil
}
// scanRelation decodes a single-row query result into a Relation. It returns
// (nil, nil) when the query matched no row and a wrapped error on any other
// scan failure. NULL started_at/ended_at columns leave the corresponding
// pointer fields nil.
func scanRelation(row *sql.Row) (*Relation, error) {
	var rel Relation
	var rawTags, rawCreated, rawUpdated string
	var rawStarted, rawEnded *string

	dest := []any{
		&rel.ID, &rel.Name, &rel.FromEntity, &rel.ToEntity, &rel.Via, &rel.Description,
		&rel.Purity, &rel.Direction, &rel.Weight, &rel.Status,
		&rawStarted, &rawEnded, &rel.Order, &rawTags, &rel.Notes, &rawCreated, &rawUpdated,
	}
	if err := row.Scan(dest...); err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}
		return nil, fmt.Errorf("scanning relation: %w", err)
	}

	rel.Tags = unmarshalStrings(rawTags)
	// Malformed timestamps are tolerated and decode to the zero time.
	rel.CreatedAt, _ = time.Parse(time.RFC3339, rawCreated)
	rel.UpdatedAt, _ = time.Parse(time.RFC3339, rawUpdated)
	if rawStarted != nil {
		parsed, _ := time.Parse(time.RFC3339, *rawStarted)
		rel.StartedAt = &parsed
	}
	if rawEnded != nil {
		parsed, _ := time.Parse(time.RFC3339, *rawEnded)
		rel.EndedAt = &parsed
	}
	return &rel, nil
}
// scanRelationFromRows decodes the current row of rows into a Relation. The
// caller must have advanced the cursor with rows.Next. Any scan failure is
// returned wrapped. NULL started_at/ended_at columns leave the corresponding
// pointer fields nil.
func scanRelationFromRows(rows *sql.Rows) (*Relation, error) {
	var rel Relation
	var rawTags, rawCreated, rawUpdated string
	var rawStarted, rawEnded *string

	dest := []any{
		&rel.ID, &rel.Name, &rel.FromEntity, &rel.ToEntity, &rel.Via, &rel.Description,
		&rel.Purity, &rel.Direction, &rel.Weight, &rel.Status,
		&rawStarted, &rawEnded, &rel.Order, &rawTags, &rel.Notes, &rawCreated, &rawUpdated,
	}
	if err := rows.Scan(dest...); err != nil {
		return nil, fmt.Errorf("scanning relation: %w", err)
	}

	rel.Tags = unmarshalStrings(rawTags)
	// Malformed timestamps are tolerated and decode to the zero time.
	rel.CreatedAt, _ = time.Parse(time.RFC3339, rawCreated)
	rel.UpdatedAt, _ = time.Parse(time.RFC3339, rawUpdated)
	if rawStarted != nil {
		parsed, _ := time.Parse(time.RFC3339, *rawStarted)
		rel.StartedAt = &parsed
	}
	if rawEnded != nil {
		parsed, _ := time.Parse(time.RFC3339, *rawEnded)
		rel.EndedAt = &parsed
	}
	return &rel, nil
}
// --- RelationInput CRUD ---
// InsertRelationInput adds ri to the relation_inputs table with a plain
// INSERT, so any constraint conflict is reported as an error rather than
// ignored or replaced.
func (db *DB) InsertRelationInput(ri *RelationInput) error {
	const stmt = `
INSERT INTO relation_inputs (id, relation_id, entity_id, role, "order")
VALUES (?, ?, ?, ?, ?)`
	_, err := db.conn.Exec(stmt, ri.ID, ri.RelationID, ri.EntityID, ri.Role, ri.Order)
	return err
}
// GetRelationInputs returns the relation_inputs rows belonging to
// relationID, sorted by their "order" column. The result is nil when there
// are none. Scan failures and errors encountered while iterating the result
// set are returned wrapped.
func (db *DB) GetRelationInputs(relationID string) ([]RelationInput, error) {
	rows, err := db.conn.Query(`
SELECT id, relation_id, entity_id, role, "order"
FROM relation_inputs WHERE relation_id = ? ORDER BY "order"`, relationID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []RelationInput
	for rows.Next() {
		var ri RelationInput
		if err := rows.Scan(&ri.ID, &ri.RelationID, &ri.EntityID, &ri.Role, &ri.Order); err != nil {
			return nil, fmt.Errorf("scanning relation_input: %w", err)
		}
		result = append(result, ri)
	}
	// rows.Next returns false on both exhaustion and failure; without this
	// check a mid-iteration error would yield a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating relation_inputs: %w", err)
	}
	return result, nil
}
// DeleteRelationInputs deletes every relation_inputs row whose relation_id
// equals relationID. The affected-row count is not inspected.
func (db *DB) DeleteRelationInputs(relationID string) error {
	const stmt = "DELETE FROM relation_inputs WHERE relation_id = ?"
	_, err := db.conn.Exec(stmt, relationID)
	return err
}
// --- Execution CRUD ---
// InsertExecution writes e to the executions table using INSERT OR REPLACE.
// A zero e.CreatedAt is replaced (on e itself) with the current UTC time.
// A nil e.EndedAt is stored as SQL NULL; e.StartedAt is always formatted,
// even when it is the zero time. Metrics are stored as JSON text.
func (db *DB) InsertExecution(e *Execution) error {
	if e.CreatedAt.IsZero() {
		e.CreatedAt = time.Now().UTC()
	}

	// Optional end timestamp: a nil *string binds as NULL.
	var endedText *string
	if t := e.EndedAt; t != nil {
		formatted := t.Format(time.RFC3339)
		endedText = &formatted
	}

	const stmt = `
INSERT OR REPLACE INTO executions (
id, pipeline_id, relation_id, status, started_at, ended_at,
duration_ms, records_in, records_out, error, metrics, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
	params := []any{
		e.ID, e.PipelineID, e.RelationID, string(e.Status),
		e.StartedAt.Format(time.RFC3339), endedText,
		e.DurationMs, e.RecordsIn, e.RecordsOut, e.Error,
		marshalJSON(e.Metrics), e.CreatedAt.Format(time.RFC3339),
	}
	_, err := db.conn.Exec(stmt, params...)
	return err
}
// GetExecution looks up an execution by ID. It returns (nil, nil) when no
// row matches, and a wrapped error when scanning fails. A NULL ended_at
// leaves EndedAt nil.
func (db *DB) GetExecution(id string) (*Execution, error) {
	const query = `
SELECT id, pipeline_id, relation_id, status, started_at, ended_at,
duration_ms, records_in, records_out, error, metrics, created_at
FROM executions WHERE id = ?`

	var exec Execution
	var rawMetrics, rawCreated, rawStarted string
	var rawEnded *string
	dest := []any{
		&exec.ID, &exec.PipelineID, &exec.RelationID, &exec.Status,
		&rawStarted, &rawEnded,
		&exec.DurationMs, &exec.RecordsIn, &exec.RecordsOut, &exec.Error,
		&rawMetrics, &rawCreated,
	}
	if err := db.conn.QueryRow(query, id).Scan(dest...); err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}
		return nil, fmt.Errorf("scanning execution: %w", err)
	}

	exec.Metrics = unmarshalJSON(rawMetrics)
	// Malformed timestamps are tolerated and decode to the zero time.
	exec.StartedAt, _ = time.Parse(time.RFC3339, rawStarted)
	exec.CreatedAt, _ = time.Parse(time.RFC3339, rawCreated)
	if rawEnded != nil {
		parsed, _ := time.Parse(time.RFC3339, *rawEnded)
		exec.EndedAt = &parsed
	}
	return &exec, nil
}
// ListExecutions returns executions, newest created_at first. Each of
// pipelineID, relationID and status is an optional filter that is skipped
// when empty; the filters that are given are combined with AND.
func (db *DB) ListExecutions(pipelineID, relationID string, status ExecutionStatus) ([]Execution, error) {
	conds := make([]string, 0, 3)
	params := make([]any, 0, 3)
	addFilter := func(clause string, value any) {
		conds = append(conds, clause)
		params = append(params, value)
	}
	if pipelineID != "" {
		addFilter("pipeline_id = ?", pipelineID)
	}
	if relationID != "" {
		addFilter("relation_id = ?", relationID)
	}
	if status != "" {
		addFilter("status = ?", string(status))
	}

	var sb strings.Builder
	sb.WriteString(`SELECT id, pipeline_id, relation_id, status, started_at, ended_at,
duration_ms, records_in, records_out, error, metrics, created_at
FROM executions`)
	if len(conds) > 0 {
		sb.WriteString(" WHERE ")
		sb.WriteString(strings.Join(conds, " AND "))
	}
	sb.WriteString(" ORDER BY created_at DESC")

	rows, err := db.conn.Query(sb.String(), params...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return scanExecutions(rows)
}
// scanExecutions reads every remaining row of rows into Execution values.
// The rows must carry the twelve execution columns in the order id,
// pipeline_id, relation_id, status, started_at, ended_at, duration_ms,
// records_in, records_out, error, metrics, created_at. The caller remains
// responsible for closing rows. Scan failures and errors encountered while
// iterating are returned wrapped.
func scanExecutions(rows *sql.Rows) ([]Execution, error) {
	var result []Execution
	for rows.Next() {
		var e Execution
		var metricsJSON, createdAt, startedAt string
		var endedAt *string
		if err := rows.Scan(&e.ID, &e.PipelineID, &e.RelationID, &e.Status,
			&startedAt, &endedAt,
			&e.DurationMs, &e.RecordsIn, &e.RecordsOut, &e.Error,
			&metricsJSON, &createdAt); err != nil {
			return nil, fmt.Errorf("scanning execution: %w", err)
		}
		e.Metrics = unmarshalJSON(metricsJSON)
		// Malformed timestamps are tolerated and decode to the zero time.
		e.StartedAt, _ = time.Parse(time.RFC3339, startedAt)
		e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
		if endedAt != nil {
			t, _ := time.Parse(time.RFC3339, *endedAt)
			e.EndedAt = &t
		}
		result = append(result, e)
	}
	// rows.Next returns false on both exhaustion and failure; without this
	// check a mid-iteration error would yield a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating executions: %w", err)
	}
	return result, nil
}
// --- Assertion CRUD ---
// InsertAssertion writes a to the assertions table using INSERT OR REPLACE.
// A zero a.CreatedAt is replaced (on a itself) with the current UTC time.
// The Active flag is stored as the integer 1 (true) or 0 (false).
func (db *DB) InsertAssertion(a *Assertion) error {
	if a.CreatedAt.IsZero() {
		a.CreatedAt = time.Now().UTC()
	}

	var activeFlag int
	if a.Active {
		activeFlag = 1
	}

	const stmt = `
INSERT OR REPLACE INTO assertions (
id, entity_id, name, kind, rule, severity, description, active, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`
	params := []any{
		a.ID, a.EntityID, a.Name, a.Kind, a.Rule,
		string(a.Severity), a.Description, activeFlag,
		a.CreatedAt.Format(time.RFC3339),
	}
	_, err := db.conn.Exec(stmt, params...)
	return err
}
// GetAssertion looks up an assertion by ID. It returns (nil, nil) when no
// row matches, and a wrapped error when scanning fails. Active is true only
// when the stored integer is exactly 1.
func (db *DB) GetAssertion(id string) (*Assertion, error) {
	const query = `
SELECT id, entity_id, name, kind, rule, severity, description, active, created_at
FROM assertions WHERE id = ?`

	var found Assertion
	var activeFlag int
	var rawCreated string
	dest := []any{
		&found.ID, &found.EntityID, &found.Name, &found.Kind, &found.Rule,
		&found.Severity, &found.Description, &activeFlag, &rawCreated,
	}
	if err := db.conn.QueryRow(query, id).Scan(dest...); err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}
		return nil, fmt.Errorf("scanning assertion: %w", err)
	}

	found.Active = activeFlag == 1
	// A malformed timestamp is tolerated and leaves CreatedAt zero.
	found.CreatedAt, _ = time.Parse(time.RFC3339, rawCreated)
	return &found, nil
}
// UpdateAssertion overwrites the mutable columns of the assertions row whose
// id equals a.ID; created_at is not touched. The Active flag is stored as
// the integer 1 or 0. No check is made that a row was updated.
func (db *DB) UpdateAssertion(a *Assertion) error {
	var activeFlag int
	if a.Active {
		activeFlag = 1
	}

	const stmt = `
UPDATE assertions SET entity_id=?, name=?, kind=?, rule=?, severity=?,
description=?, active=?
WHERE id=?`
	params := []any{
		a.EntityID, a.Name, a.Kind, a.Rule, string(a.Severity),
		a.Description, activeFlag, a.ID,
	}
	_, err := db.conn.Exec(stmt, params...)
	return err
}
// DeleteAssertion deletes the assertions row with the given ID. The
// affected-row count is not inspected.
func (db *DB) DeleteAssertion(id string) error {
	const stmt = "DELETE FROM assertions WHERE id = ?"
	_, err := db.conn.Exec(stmt, id)
	return err
}
// ListAssertions returns assertions ordered by name. An empty entityID
// disables the entity filter and a nil active disables the active-state
// filter; when both are given they are combined with AND.
func (db *DB) ListAssertions(entityID string, active *bool) ([]Assertion, error) {
	conds := make([]string, 0, 2)
	params := make([]any, 0, 2)
	if entityID != "" {
		conds, params = append(conds, "entity_id = ?"), append(params, entityID)
	}
	if active != nil {
		// The column stores booleans as 0/1 integers.
		var flag int
		if *active {
			flag = 1
		}
		conds, params = append(conds, "active = ?"), append(params, flag)
	}

	var sb strings.Builder
	sb.WriteString(`SELECT id, entity_id, name, kind, rule, severity, description, active, created_at
FROM assertions`)
	if len(conds) > 0 {
		sb.WriteString(" WHERE ")
		sb.WriteString(strings.Join(conds, " AND "))
	}
	sb.WriteString(" ORDER BY name")

	rows, err := db.conn.Query(sb.String(), params...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return scanAssertions(rows)
}
// SearchAssertions returns assertions ordered by name, optionally restricted
// to those whose id matches a full-text MATCH against assertions_fts and/or
// to a given entity. An empty query or entityID disables that filter. The
// query string is passed to MATCH as a bound parameter, unmodified.
func (db *DB) SearchAssertions(query, entityID string) ([]Assertion, error) {
	conds := make([]string, 0, 2)
	params := make([]any, 0, 2)
	addFilter := func(clause string, value any) {
		conds = append(conds, clause)
		params = append(params, value)
	}
	if query != "" {
		addFilter("a.id IN (SELECT id FROM assertions_fts WHERE assertions_fts MATCH ?)", query)
	}
	if entityID != "" {
		addFilter("a.entity_id = ?", entityID)
	}

	stmt := `SELECT a.id, a.entity_id, a.name, a.kind, a.rule, a.severity, a.description, a.active, a.created_at
FROM assertions a`
	if len(conds) > 0 {
		stmt += " WHERE " + strings.Join(conds, " AND ")
	}
	stmt += " ORDER BY a.name"

	rows, err := db.conn.Query(stmt, params...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return scanAssertions(rows)
}
// scanAssertions reads every remaining row of rows into Assertion values.
// The rows must carry the nine assertion columns in the order id, entity_id,
// name, kind, rule, severity, description, active, created_at. Active is true
// only when the stored integer is exactly 1. The caller remains responsible
// for closing rows. Scan failures and errors encountered while iterating are
// returned wrapped.
func scanAssertions(rows *sql.Rows) ([]Assertion, error) {
	var result []Assertion
	for rows.Next() {
		var a Assertion
		var active int
		var createdAt string
		if err := rows.Scan(&a.ID, &a.EntityID, &a.Name, &a.Kind, &a.Rule,
			&a.Severity, &a.Description, &active, &createdAt); err != nil {
			return nil, fmt.Errorf("scanning assertion: %w", err)
		}
		a.Active = active == 1
		// A malformed timestamp is tolerated and leaves CreatedAt zero.
		a.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
		result = append(result, a)
	}
	// rows.Next returns false on both exhaustion and failure; without this
	// check a mid-iteration error would yield a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating assertions: %w", err)
	}
	return result, nil
}
// --- AssertionResult CRUD ---
// InsertAssertionResult writes ar to the assertion_results table using
// INSERT OR REPLACE. A zero ar.EvaluatedAt is replaced (on ar itself) with
// the current UTC time. Value is stored as JSON text.
func (db *DB) InsertAssertionResult(ar *AssertionResult) error {
	if ar.EvaluatedAt.IsZero() {
		ar.EvaluatedAt = time.Now().UTC()
	}

	const stmt = `
INSERT OR REPLACE INTO assertion_results (
id, assertion_id, execution_id, status, value, message, evaluated_at
) VALUES (?, ?, ?, ?, ?, ?, ?)`
	params := []any{
		ar.ID, ar.AssertionID, ar.ExecutionID, string(ar.Status),
		marshalJSON(ar.Value), ar.Message, ar.EvaluatedAt.Format(time.RFC3339),
	}
	_, err := db.conn.Exec(stmt, params...)
	return err
}
// GetAssertionResult looks up an assertion result by ID. It returns
// (nil, nil) when no row matches, and a wrapped error when scanning fails.
func (db *DB) GetAssertionResult(id string) (*AssertionResult, error) {
	const query = `
SELECT id, assertion_id, execution_id, status, value, message, evaluated_at
FROM assertion_results WHERE id = ?`

	var res AssertionResult
	var rawValue, rawEvaluated string
	dest := []any{
		&res.ID, &res.AssertionID, &res.ExecutionID, &res.Status,
		&rawValue, &res.Message, &rawEvaluated,
	}
	if err := db.conn.QueryRow(query, id).Scan(dest...); err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}
		return nil, fmt.Errorf("scanning assertion_result: %w", err)
	}

	res.Value = unmarshalJSON(rawValue)
	// A malformed timestamp is tolerated and leaves EvaluatedAt zero.
	res.EvaluatedAt, _ = time.Parse(time.RFC3339, rawEvaluated)
	return &res, nil
}
// ListAssertionResults returns assertion results, newest evaluated_at first.
// An empty assertionID or executionID disables that filter; when both are
// given they are combined with AND.
func (db *DB) ListAssertionResults(assertionID, executionID string) ([]AssertionResult, error) {
	conds := make([]string, 0, 2)
	params := make([]any, 0, 2)
	if assertionID != "" {
		conds, params = append(conds, "assertion_id = ?"), append(params, assertionID)
	}
	if executionID != "" {
		conds, params = append(conds, "execution_id = ?"), append(params, executionID)
	}

	var sb strings.Builder
	sb.WriteString(`SELECT id, assertion_id, execution_id, status, value, message, evaluated_at
FROM assertion_results`)
	if len(conds) > 0 {
		sb.WriteString(" WHERE ")
		sb.WriteString(strings.Join(conds, " AND "))
	}
	sb.WriteString(" ORDER BY evaluated_at DESC")

	rows, err := db.conn.Query(sb.String(), params...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return scanAssertionResults(rows)
}
// scanAssertionResults reads every remaining row of rows into
// AssertionResult values. The rows must carry the seven columns in the order
// id, assertion_id, execution_id, status, value, message, evaluated_at. The
// caller remains responsible for closing rows. Scan failures and errors
// encountered while iterating are returned wrapped.
func scanAssertionResults(rows *sql.Rows) ([]AssertionResult, error) {
	var result []AssertionResult
	for rows.Next() {
		var ar AssertionResult
		var valueJSON, evaluatedAt string
		if err := rows.Scan(&ar.ID, &ar.AssertionID, &ar.ExecutionID, &ar.Status,
			&valueJSON, &ar.Message, &evaluatedAt); err != nil {
			return nil, fmt.Errorf("scanning assertion_result: %w", err)
		}
		ar.Value = unmarshalJSON(valueJSON)
		// A malformed timestamp is tolerated and leaves EvaluatedAt zero.
		ar.EvaluatedAt, _ = time.Parse(time.RFC3339, evaluatedAt)
		result = append(result, ar)
	}
	// rows.Next returns false on both exhaustion and failure; without this
	// check a mid-iteration error would yield a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating assertion_results: %w", err)
	}
	return result, nil
}
// --- Log CRUD ---
// InsertLog adds l to the logs table with a plain INSERT, so a conflicting
// id is reported as an error rather than replaced. A zero l.CreatedAt is
// replaced (on l itself) with the current UTC time. Metadata is stored as
// JSON text.
func (db *DB) InsertLog(l *Log) error {
	if l.CreatedAt.IsZero() {
		l.CreatedAt = time.Now().UTC()
	}

	const stmt = `
INSERT INTO logs (id, level, source, entity_id, execution_id, message, metadata, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)`
	params := []any{
		l.ID, string(l.Level), l.Source, l.EntityID, l.ExecutionID,
		l.Message, marshalJSON(l.Metadata), l.CreatedAt.Format(time.RFC3339),
	}
	_, err := db.conn.Exec(stmt, params...)
	return err
}
// GetLog looks up a log entry by ID. It returns (nil, nil) when no row
// matches, and a wrapped error when scanning fails.
func (db *DB) GetLog(id string) (*Log, error) {
	const query = `
SELECT id, level, source, entity_id, execution_id, message, metadata, created_at
FROM logs WHERE id = ?`

	var entry Log
	var rawMetadata, rawCreated string
	dest := []any{
		&entry.ID, &entry.Level, &entry.Source, &entry.EntityID, &entry.ExecutionID,
		&entry.Message, &rawMetadata, &rawCreated,
	}
	if err := db.conn.QueryRow(query, id).Scan(dest...); err != nil {
		if err == sql.ErrNoRows {
			return nil, nil
		}
		return nil, fmt.Errorf("scanning log: %w", err)
	}

	entry.Metadata = unmarshalJSON(rawMetadata)
	// A malformed timestamp is tolerated and leaves CreatedAt zero.
	entry.CreatedAt, _ = time.Parse(time.RFC3339, rawCreated)
	return &entry, nil
}
// ListLogs returns log entries, newest created_at first. Each of level,
// source, entityID and executionID is an optional filter that is skipped
// when empty; the filters that are given are combined with AND. A positive
// limit caps the number of rows returned; zero or a negative value means no
// cap. Scan failures and errors encountered while iterating the result set
// are returned wrapped.
func (db *DB) ListLogs(level LogLevel, source, entityID, executionID string, limit int) ([]Log, error) {
	where := []string{}
	args := []any{}
	if level != "" {
		where = append(where, "level = ?")
		args = append(args, string(level))
	}
	if source != "" {
		where = append(where, "source = ?")
		args = append(args, source)
	}
	if entityID != "" {
		where = append(where, "entity_id = ?")
		args = append(args, entityID)
	}
	if executionID != "" {
		where = append(where, "execution_id = ?")
		args = append(args, executionID)
	}

	q := `SELECT id, level, source, entity_id, execution_id, message, metadata, created_at FROM logs`
	if len(where) > 0 {
		q += " WHERE " + strings.Join(where, " AND ")
	}
	q += " ORDER BY created_at DESC"
	if limit > 0 {
		// limit is an int formatted with %d, so no untrusted text reaches the SQL.
		q += fmt.Sprintf(" LIMIT %d", limit)
	}

	rows, err := db.conn.Query(q, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []Log
	for rows.Next() {
		var l Log
		var metadataJSON, createdAt string
		if err := rows.Scan(&l.ID, &l.Level, &l.Source, &l.EntityID, &l.ExecutionID,
			&l.Message, &metadataJSON, &createdAt); err != nil {
			return nil, fmt.Errorf("scanning log: %w", err)
		}
		l.Metadata = unmarshalJSON(metadataJSON)
		// A malformed timestamp is tolerated and leaves CreatedAt zero.
		l.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
		result = append(result, l)
	}
	// rows.Next returns false on both exhaustion and failure; without this
	// check a mid-iteration error would yield a silently truncated list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("iterating logs: %w", err)
	}
	return result, nil
}