Files
fn_registry/fn_operations/store.go
T
egutierrez 67401cb967 feat: fn_operations library — entities, relations, types_snapshot con FTS y ciclos
Paquete Go completo con modelos (Entity, Relation, RelationInput, TypeSnapshot),
DB SQLite con WAL + FTS5 en entities, CRUD para las 4 tablas, validacion de
integridad, deteccion de ciclos solo en relaciones causales (via != ''), y
operaciones de alto nivel con snapshot automatico de tipos del registry.
9 tests, todos pasan.
2026-03-28 04:37:50 +01:00

460 lines
14 KiB
Go

package fn_operations
import (
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"
)
func marshalStrings(ss []string) string {
if ss == nil {
ss = []string{}
}
b, _ := json.Marshal(ss)
return string(b)
}
func unmarshalStrings(s string) []string {
var out []string
json.Unmarshal([]byte(s), &out)
if out == nil {
out = []string{}
}
return out
}
func marshalJSON(v map[string]any) string {
if v == nil {
v = map[string]any{}
}
b, _ := json.Marshal(v)
return string(b)
}
func unmarshalJSON(s string) map[string]any {
var out map[string]any
json.Unmarshal([]byte(s), &out)
if out == nil {
out = map[string]any{}
}
return out
}
// --- TypeSnapshot CRUD ---
// InsertTypeSnapshot inserts a type snapshot.
func (db *DB) InsertTypeSnapshot(ts *TypeSnapshot) error {
if ts.SnappedAt.IsZero() {
ts.SnappedAt = time.Now().UTC()
}
_, err := db.conn.Exec(`
INSERT OR IGNORE INTO types_snapshot (id, version, lang, algebraic, definition, description, snapped_at)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
ts.ID, ts.Version, ts.Lang, ts.Algebraic, ts.Definition, ts.Description,
ts.SnappedAt.Format(time.RFC3339),
)
return err
}
// GetTypeSnapshot returns a type snapshot by ID.
func (db *DB) GetTypeSnapshot(id string) (*TypeSnapshot, error) {
row := db.conn.QueryRow("SELECT id, version, lang, algebraic, definition, description, snapped_at FROM types_snapshot WHERE id = ?", id)
var ts TypeSnapshot
var snappedAt string
err := row.Scan(&ts.ID, &ts.Version, &ts.Lang, &ts.Algebraic, &ts.Definition, &ts.Description, &snappedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("scanning type_snapshot: %w", err)
}
ts.SnappedAt, _ = time.Parse(time.RFC3339, snappedAt)
return &ts, nil
}
// ListTypeSnapshots returns all type snapshots.
func (db *DB) ListTypeSnapshots() ([]TypeSnapshot, error) {
rows, err := db.conn.Query("SELECT id, version, lang, algebraic, definition, description, snapped_at FROM types_snapshot ORDER BY id")
if err != nil {
return nil, err
}
defer rows.Close()
var result []TypeSnapshot
for rows.Next() {
var ts TypeSnapshot
var snappedAt string
if err := rows.Scan(&ts.ID, &ts.Version, &ts.Lang, &ts.Algebraic, &ts.Definition, &ts.Description, &snappedAt); err != nil {
return nil, fmt.Errorf("scanning type_snapshot: %w", err)
}
ts.SnappedAt, _ = time.Parse(time.RFC3339, snappedAt)
result = append(result, ts)
}
return result, nil
}
// --- Entity CRUD ---
// InsertEntity inserts or replaces an entity.
func (db *DB) InsertEntity(e *Entity) error {
now := time.Now().UTC()
if e.CreatedAt.IsZero() {
e.CreatedAt = now
}
e.UpdatedAt = now
_, err := db.conn.Exec(`
INSERT OR REPLACE INTO entities (id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
e.ID, e.Name, e.TypeRef, string(e.Status), e.Description, e.Domain,
marshalStrings(e.Tags), e.Source, marshalJSON(e.Metadata), e.Notes,
e.CreatedAt.Format(time.RFC3339), e.UpdatedAt.Format(time.RFC3339),
)
return err
}
// GetEntity returns an entity by ID.
func (db *DB) GetEntity(id string) (*Entity, error) {
row := db.conn.QueryRow(`
SELECT id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at
FROM entities WHERE id = ?`, id)
var e Entity
var tagsJSON, metadataJSON, createdAt, updatedAt string
err := row.Scan(&e.ID, &e.Name, &e.TypeRef, &e.Status, &e.Description, &e.Domain,
&tagsJSON, &e.Source, &metadataJSON, &e.Notes, &createdAt, &updatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("scanning entity: %w", err)
}
e.Tags = unmarshalStrings(tagsJSON)
e.Metadata = unmarshalJSON(metadataJSON)
e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
e.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
return &e, nil
}
// UpdateEntity updates an existing entity.
func (db *DB) UpdateEntity(e *Entity) error {
e.UpdatedAt = time.Now().UTC()
_, err := db.conn.Exec(`
UPDATE entities SET name=?, type_ref=?, status=?, description=?, domain=?, tags=?, source=?, metadata=?, notes=?, updated_at=?
WHERE id=?`,
e.Name, e.TypeRef, string(e.Status), e.Description, e.Domain,
marshalStrings(e.Tags), e.Source, marshalJSON(e.Metadata), e.Notes,
e.UpdatedAt.Format(time.RFC3339), e.ID,
)
return err
}
// DeleteEntity removes an entity by ID.
func (db *DB) DeleteEntity(id string) error {
_, err := db.conn.Exec("DELETE FROM entities WHERE id = ?", id)
return err
}
// ListEntities returns entities filtered by domain and/or status.
func (db *DB) ListEntities(domain string, status EntityStatus) ([]Entity, error) {
where := []string{}
args := []any{}
if domain != "" {
where = append(where, "domain = ?")
args = append(args, domain)
}
if status != "" {
where = append(where, "status = ?")
args = append(args, string(status))
}
q := "SELECT id, name, type_ref, status, description, domain, tags, source, metadata, notes, created_at, updated_at FROM entities"
if len(where) > 0 {
q += " WHERE " + strings.Join(where, " AND ")
}
q += " ORDER BY name"
rows, err := db.conn.Query(q, args...)
if err != nil {
return nil, err
}
defer rows.Close()
return scanEntities(rows)
}
// SearchEntities performs FTS search on entities.
func (db *DB) SearchEntities(query, domain string) ([]Entity, error) {
where := []string{}
args := []any{}
if query != "" {
where = append(where, "e.id IN (SELECT id FROM entities_fts WHERE entities_fts MATCH ?)")
args = append(args, query)
}
if domain != "" {
where = append(where, "e.domain = ?")
args = append(args, domain)
}
q := "SELECT e.id, e.name, e.type_ref, e.status, e.description, e.domain, e.tags, e.source, e.metadata, e.notes, e.created_at, e.updated_at FROM entities e"
if len(where) > 0 {
q += " WHERE " + strings.Join(where, " AND ")
}
q += " ORDER BY e.name"
rows, err := db.conn.Query(q, args...)
if err != nil {
return nil, err
}
defer rows.Close()
return scanEntities(rows)
}
func scanEntities(rows *sql.Rows) ([]Entity, error) {
var result []Entity
for rows.Next() {
var e Entity
var tagsJSON, metadataJSON, createdAt, updatedAt string
if err := rows.Scan(&e.ID, &e.Name, &e.TypeRef, &e.Status, &e.Description, &e.Domain,
&tagsJSON, &e.Source, &metadataJSON, &e.Notes, &createdAt, &updatedAt); err != nil {
return nil, fmt.Errorf("scanning entity: %w", err)
}
e.Tags = unmarshalStrings(tagsJSON)
e.Metadata = unmarshalJSON(metadataJSON)
e.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
e.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
result = append(result, e)
}
return result, nil
}
// --- Relation CRUD ---
// InsertRelation inserts or replaces a relation.
func (db *DB) InsertRelation(r *Relation) error {
now := time.Now().UTC()
if r.CreatedAt.IsZero() {
r.CreatedAt = now
}
r.UpdatedAt = now
var startedAt, endedAt *string
if r.StartedAt != nil {
s := r.StartedAt.Format(time.RFC3339)
startedAt = &s
}
if r.EndedAt != nil {
s := r.EndedAt.Format(time.RFC3339)
endedAt = &s
}
_, err := db.conn.Exec(`
INSERT OR REPLACE INTO relations (id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
r.ID, r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description,
r.Purity, string(r.Direction), r.Weight, string(r.Status),
startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes,
r.CreatedAt.Format(time.RFC3339), r.UpdatedAt.Format(time.RFC3339),
)
return err
}
// GetRelation returns a relation by ID.
func (db *DB) GetRelation(id string) (*Relation, error) {
row := db.conn.QueryRow(`
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
FROM relations WHERE id = ?`, id)
return scanRelation(row)
}
// UpdateRelation updates an existing relation.
func (db *DB) UpdateRelation(r *Relation) error {
r.UpdatedAt = time.Now().UTC()
var startedAt, endedAt *string
if r.StartedAt != nil {
s := r.StartedAt.Format(time.RFC3339)
startedAt = &s
}
if r.EndedAt != nil {
s := r.EndedAt.Format(time.RFC3339)
endedAt = &s
}
_, err := db.conn.Exec(`
UPDATE relations SET name=?, from_entity=?, to_entity=?, via=?, description=?, purity=?, direction=?, weight=?, status=?, started_at=?, ended_at=?, "order"=?, tags=?, notes=?, updated_at=?
WHERE id=?`,
r.Name, r.FromEntity, r.ToEntity, r.Via, r.Description,
r.Purity, string(r.Direction), r.Weight, string(r.Status),
startedAt, endedAt, r.Order, marshalStrings(r.Tags), r.Notes,
r.UpdatedAt.Format(time.RFC3339), r.ID,
)
return err
}
// DeleteRelation removes a relation by ID (cascades to relation_inputs).
func (db *DB) DeleteRelation(id string) error {
_, err := db.conn.Exec("DELETE FROM relations WHERE id = ?", id)
return err
}
// ListRelations returns all relations, optionally filtered by entity involvement.
func (db *DB) ListRelations(entityID string) ([]Relation, error) {
q := `SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at FROM relations`
var args []any
if entityID != "" {
q += " WHERE from_entity = ? OR to_entity = ?"
args = append(args, entityID, entityID)
}
q += " ORDER BY name"
rows, err := db.conn.Query(q, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var result []Relation
for rows.Next() {
r, err := scanRelationFromRows(rows)
if err != nil {
return nil, err
}
result = append(result, *r)
}
return result, nil
}
// GetRelationsFrom returns all relations where from_entity matches.
func (db *DB) GetRelationsFrom(entityID string) ([]Relation, error) {
rows, err := db.conn.Query(`
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
FROM relations WHERE from_entity = ? ORDER BY name`, entityID)
if err != nil {
return nil, err
}
defer rows.Close()
var result []Relation
for rows.Next() {
r, err := scanRelationFromRows(rows)
if err != nil {
return nil, err
}
result = append(result, *r)
}
return result, nil
}
// GetRelationsTo returns all relations where to_entity matches.
func (db *DB) GetRelationsTo(entityID string) ([]Relation, error) {
rows, err := db.conn.Query(`
SELECT id, name, from_entity, to_entity, via, description, purity, direction, weight, status, started_at, ended_at, "order", tags, notes, created_at, updated_at
FROM relations WHERE to_entity = ? ORDER BY name`, entityID)
if err != nil {
return nil, err
}
defer rows.Close()
var result []Relation
for rows.Next() {
r, err := scanRelationFromRows(rows)
if err != nil {
return nil, err
}
result = append(result, *r)
}
return result, nil
}
func scanRelation(row *sql.Row) (*Relation, error) {
var r Relation
var tagsJSON, createdAt, updatedAt string
var startedAt, endedAt *string
err := row.Scan(&r.ID, &r.Name, &r.FromEntity, &r.ToEntity, &r.Via, &r.Description,
&r.Purity, &r.Direction, &r.Weight, &r.Status,
&startedAt, &endedAt, &r.Order, &tagsJSON, &r.Notes, &createdAt, &updatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("scanning relation: %w", err)
}
r.Tags = unmarshalStrings(tagsJSON)
r.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
r.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
if startedAt != nil {
t, _ := time.Parse(time.RFC3339, *startedAt)
r.StartedAt = &t
}
if endedAt != nil {
t, _ := time.Parse(time.RFC3339, *endedAt)
r.EndedAt = &t
}
return &r, nil
}
func scanRelationFromRows(rows *sql.Rows) (*Relation, error) {
var r Relation
var tagsJSON, createdAt, updatedAt string
var startedAt, endedAt *string
err := rows.Scan(&r.ID, &r.Name, &r.FromEntity, &r.ToEntity, &r.Via, &r.Description,
&r.Purity, &r.Direction, &r.Weight, &r.Status,
&startedAt, &endedAt, &r.Order, &tagsJSON, &r.Notes, &createdAt, &updatedAt)
if err != nil {
return nil, fmt.Errorf("scanning relation: %w", err)
}
r.Tags = unmarshalStrings(tagsJSON)
r.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
r.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
if startedAt != nil {
t, _ := time.Parse(time.RFC3339, *startedAt)
r.StartedAt = &t
}
if endedAt != nil {
t, _ := time.Parse(time.RFC3339, *endedAt)
r.EndedAt = &t
}
return &r, nil
}
// --- RelationInput CRUD ---
// InsertRelationInput inserts a relation input.
func (db *DB) InsertRelationInput(ri *RelationInput) error {
_, err := db.conn.Exec(`
INSERT INTO relation_inputs (id, relation_id, entity_id, role, "order")
VALUES (?, ?, ?, ?, ?)`,
ri.ID, ri.RelationID, ri.EntityID, ri.Role, ri.Order,
)
return err
}
// GetRelationInputs returns all inputs for a relation.
func (db *DB) GetRelationInputs(relationID string) ([]RelationInput, error) {
rows, err := db.conn.Query(`
SELECT id, relation_id, entity_id, role, "order"
FROM relation_inputs WHERE relation_id = ? ORDER BY "order"`, relationID)
if err != nil {
return nil, err
}
defer rows.Close()
var result []RelationInput
for rows.Next() {
var ri RelationInput
if err := rows.Scan(&ri.ID, &ri.RelationID, &ri.EntityID, &ri.Role, &ri.Order); err != nil {
return nil, fmt.Errorf("scanning relation_input: %w", err)
}
result = append(result, ri)
}
return result, nil
}
// DeleteRelationInputs removes all inputs for a relation.
func (db *DB) DeleteRelationInputs(relationID string) error {
_, err := db.conn.Exec("DELETE FROM relation_inputs WHERE relation_id = ?", relationID)
return err
}