5e6a974a5d
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
3.7 KiB
3.7 KiB
id, title, status, type, domain, scope, priority, depends, blocks, related, created, updated, tags
| id | title | status | type | domain | scope | priority | depends | blocks | related | created | updated | tags |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0007c | Execution store: persistencia de estado | completado | feature | multi-app | alta | 2026-05-17 | 2026-05-17 |
0007c — Execution store: persistencia de estado
Metadata
| Campo | Valor |
|---|---|
| ID | 0007c |
| Estado | pendiente |
| Prioridad | alta |
| Tipo | feature |
Dependencias
| ID | Título | Estado | Requerido |
|---|---|---|---|
| 0007a | Funciones core del DAG engine | pendiente | Si |
Bloqueada por: #0007a
Desbloquea: #0007e
Objetivo
Funciones para persistir el estado de ejecuciones de DAGs en SQLite: que DAG se ejecuto, cuando, que steps corrieron, resultado de cada step, logs. Permite historial, reintentos y debugging.
Contexto
- Cada ejecucion de un DAG genera un
runcon multiplesstep_results - Similar a
operations.dbpero especifico para el DAG engine - La BD vive en el directorio de la app del scheduler (no en raiz)
- Debe soportar consultas tipo: "ultimas 10 ejecuciones de X", "steps fallidos"
Arquitectura
functions/infra/
├── dag_store_init.go — NEW: crea schema SQLite para runs/steps
├── dag_store_init.md
├── dag_store_run.go — NEW: CRUD de runs (create, update status, list, get)
├── dag_store_run.md
├── dag_store_step.go — NEW: CRUD de step results dentro de un run
├── dag_store_step.md
types/infra/
├── dag_run.md — NEW: id, dag_name, status, started_at, finished_at, trigger
├── dag_step_result.md — NEW: run_id, step_name, status, exit_code, stdout, stderr, duration_ms
Patron pure core / impure shell
infra/— Todas impuras (SQLite I/O)- Schema sencillo:
dag_runs+dag_step_results
Tareas
Fase 1: Tipos y schema
- 1.1 Definir
DagRun— id (ULID), dag_name, dag_path, status (pending/running/success/failed/cancelled), started_at, finished_at, trigger (manual/schedule/api) - 1.2 Definir
DagStepResult— id, run_id, step_name, status, exit_code, stdout, stderr, started_at, finished_at, duration_ms - 1.3 Schema SQLite:
dag_runs,dag_step_resultscon indices
Fase 2: Funciones
- 2.1
dag_store_init— crea/migra la BD SQLite - 2.2
dag_store_run— create_run, update_run_status, get_run, list_runs (con filtros) - 2.3
dag_store_step— insert_step_result, list_steps_for_run
Fase 3: Tests
- 3.1 Ciclo completo: init → create run → insert steps → update status → query
- 3.2 Queries: ultimas N ejecuciones, ejecuciones fallidas, steps de un run
Fase 4: Cleanup
fn indexy verificar IDs
Ejemplo de uso
db := dag_store_init("/path/to/scheduler.db")
run := dag_store_create_run(db, "my_pipeline", "manual")
// run.ID = "01HXZ..."
dag_store_insert_step(db, run.ID, DagStepResult{
StepName: "fetch_data",
Status: "success",
ExitCode: 0,
Stdout: "fetched 1000 rows",
DurationMs: 1234,
})
dag_store_update_status(db, run.ID, "success")
// Consultar
runs := dag_store_list_runs(db, "my_pipeline", 10) // ultimas 10
Decisiones de diseno
- SQLite por DAG engine, no por DAG: una sola BD para todas las ejecuciones, no una por cada DAG
- ULID para run IDs: ordenables por tiempo, unicos sin coordinacion
- stdout/stderr en BD: para steps cortos. Para output grande, guardar path a archivo de log
Criterios de aceptacion
- Schema se crea correctamente
- CRUD completo funciona
- Queries con filtros funcionan
- Tests pasan
- Indexado en registry.db