--- id: "0007c" title: "Execution store: persistencia de estado" status: completado type: feature domain: [] scope: multi-app priority: alta depends: [] blocks: [] related: [] created: 2026-05-17 updated: 2026-05-17 tags: [] --- # 0007c — Execution store: persistencia de estado ## Metadata | Campo | Valor | |-------|-------| | **ID** | 0007c | | **Estado** | pendiente | | **Prioridad** | alta | | **Tipo** | feature | ## Dependencias | ID | Título | Estado | Requerido | |----|--------|--------|-----------| | 0007a | Funciones core del DAG engine | pendiente | Si | **Bloqueada por:** `#0007a` **Desbloquea:** `#0007e` --- ## Objetivo Funciones para persistir el estado de ejecuciones de DAGs en SQLite: que DAG se ejecuto, cuando, que steps corrieron, resultado de cada step, logs. Permite historial, reintentos y debugging. ## Contexto - Cada ejecucion de un DAG genera un `run` con multiples `step_results` - Similar a `operations.db` pero especifico para el DAG engine - La BD vive en el directorio de la app del scheduler (no en raiz) - Debe soportar consultas tipo: "ultimas 10 ejecuciones de X", "steps fallidos" ## Arquitectura ``` functions/infra/ ├── dag_store_init.go — NEW: crea schema SQLite para runs/steps ├── dag_store_init.md ├── dag_store_run.go — NEW: CRUD de runs (create, update status, list, get) ├── dag_store_run.md ├── dag_store_step.go — NEW: CRUD de step results dentro de un run ├── dag_store_step.md types/infra/ ├── dag_run.md — NEW: id, dag_name, status, started_at, finished_at, trigger ├── dag_step_result.md — NEW: run_id, step_name, status, exit_code, stdout, stderr, duration_ms ``` ### Patron pure core / impure shell - `infra/` — Todas impuras (SQLite I/O) - Schema sencillo: `dag_runs` + `dag_step_results` ## Tareas ### Fase 1: Tipos y schema - [ ] **1.1** Definir `DagRun` — id (ULID), dag_name, dag_path, status (pending/running/success/failed/cancelled), started_at, finished_at, trigger (manual/schedule/api) - [ ] **1.2** Definir `DagStepResult` — id, run_id, step_name, status, exit_code, stdout, stderr, started_at, finished_at, duration_ms - [ ] **1.3** Schema SQLite: `dag_runs`, `dag_step_results` con indices ### Fase 2: Funciones - [ ] **2.1** `dag_store_init` — crea/migra la BD SQLite - [ ] **2.2** `dag_store_run` — create_run, update_run_status, get_run, list_runs (con filtros) - [ ] **2.3** `dag_store_step` — insert_step_result, list_steps_for_run ### Fase 3: Tests - [ ] **3.1** Ciclo completo: init → create run → insert steps → update status → query - [ ] **3.2** Queries: ultimas N ejecuciones, ejecuciones fallidas, steps de un run ### Fase 4: Cleanup - [ ] `fn index` y verificar IDs --- ## Ejemplo de uso ```go db := dag_store_init("/path/to/scheduler.db") run := dag_store_create_run(db, "my_pipeline", "manual") // run.ID = "01HXZ..." dag_store_insert_step(db, run.ID, DagStepResult{ StepName: "fetch_data", Status: "success", ExitCode: 0, Stdout: "fetched 1000 rows", DurationMs: 1234, }) dag_store_update_status(db, run.ID, "success") // Consultar runs := dag_store_list_runs(db, "my_pipeline", 10) // ultimas 10 ``` ## Decisiones de diseno - **SQLite por DAG engine, no por DAG**: una sola BD para todas las ejecuciones, no una por cada DAG - **ULID para run IDs**: ordenables por tiempo, unicos sin coordinacion - **stdout/stderr en BD**: para steps cortos. Para output grande, guardar path a archivo de log ## Criterios de aceptacion - [ ] Schema se crea correctamente - [ ] CRUD completo funciona - [ ] Queries con filtros funcionan - [ ] Tests pasan - [ ] Indexado en registry.db