Files
fn_registry/dev/issues/completed/0007c-execution-store.md

3.7 KiB

id, title, status, type, domain, scope, priority, depends, blocks, related, created, updated, tags
id title status type domain scope priority depends blocks related created updated tags
0007c Execution store: persistencia de estado completado feature
multi-app alta
2026-05-17 2026-05-17

0007c — Execution store: persistencia de estado

Metadata

Campo Valor
ID 0007c
Estado pendiente
Prioridad alta
Tipo feature

Dependencias

ID Título Estado Requerido
0007a Funciones core del DAG engine pendiente Si

Bloqueada por: #0007a

Desbloquea: #0007e


Objetivo

Funciones para persistir el estado de ejecuciones de DAGs en SQLite: que DAG se ejecuto, cuando, que steps corrieron, resultado de cada step, logs. Permite historial, reintentos y debugging.

Contexto

  • Cada ejecucion de un DAG genera un run con multiples step_results
  • Similar a operations.db pero especifico para el DAG engine
  • La BD vive en el directorio de la app del scheduler (no en raiz)
  • Debe soportar consultas tipo: "ultimas 10 ejecuciones de X", "steps fallidos"

Arquitectura

functions/infra/
├── dag_store_init.go       — NEW: crea schema SQLite para runs/steps
├── dag_store_init.md
├── dag_store_run.go        — NEW: CRUD de runs (create, update status, list, get)
├── dag_store_run.md
├── dag_store_step.go       — NEW: CRUD de step results dentro de un run
├── dag_store_step.md

types/infra/
├── dag_run.md              — NEW: id, dag_name, status, started_at, finished_at, trigger
├── dag_step_result.md      — NEW: run_id, step_name, status, exit_code, stdout, stderr, duration_ms

Patron pure core / impure shell

  • infra/ — Todas impuras (SQLite I/O)
  • Schema sencillo: dag_runs + dag_step_results

Tareas

Fase 1: Tipos y schema

  • 1.1 Definir DagRun — id (ULID), dag_name, dag_path, status (pending/running/success/failed/cancelled), started_at, finished_at, trigger (manual/schedule/api)
  • 1.2 Definir DagStepResult — id, run_id, step_name, status, exit_code, stdout, stderr, started_at, finished_at, duration_ms
  • 1.3 Schema SQLite: dag_runs, dag_step_results con indices

Fase 2: Funciones

  • 2.1 dag_store_init — crea/migra la BD SQLite
  • 2.2 dag_store_run — create_run, update_run_status, get_run, list_runs (con filtros)
  • 2.3 dag_store_step — insert_step_result, list_steps_for_run

Fase 3: Tests

  • 3.1 Ciclo completo: init → create run → insert steps → update status → query
  • 3.2 Queries: ultimas N ejecuciones, ejecuciones fallidas, steps de un run

Fase 4: Cleanup

  • fn index y verificar IDs

Ejemplo de uso

db := dag_store_init("/path/to/scheduler.db")

run := dag_store_create_run(db, "my_pipeline", "manual")
// run.ID = "01HXZ..."

dag_store_insert_step(db, run.ID, DagStepResult{
    StepName:   "fetch_data",
    Status:     "success",
    ExitCode:   0,
    Stdout:     "fetched 1000 rows",
    DurationMs: 1234,
})

dag_store_update_status(db, run.ID, "success")

// Consultar
runs := dag_store_list_runs(db, "my_pipeline", 10) // ultimas 10

Decisiones de diseno

  • SQLite por DAG engine, no por DAG: una sola BD para todas las ejecuciones, no una por cada DAG
  • ULID para run IDs: ordenables por tiempo, unicos sin coordinacion
  • stdout/stderr en BD: para steps cortos. Para output grande, guardar path a archivo de log

Criterios de aceptacion

  • Schema se crea correctamente
  • CRUD completo funciona
  • Queries con filtros funcionan
  • Tests pasan
  • Indexado en registry.db