feat: add process manager and execution store types (0007b, 0007c)

Process spawn/wait/kill functions for subprocess management with output
capture, timeout, and process group cleanup. DagRun and DagStepResult
types for SQLite execution persistence.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-12 13:05:13 +02:00
parent 3136eb862f
commit 741fdcee24
13 changed files with 579 additions and 0 deletions
+30
View File
@@ -0,0 +1,30 @@
package infra
import "time"
// DagRun represents one execution of a DAG workflow.
type DagRun struct {
ID string
DagName string
DagPath string
Status string // pending, running, success, failed, cancelled
StartedAt time.Time
FinishedAt time.Time
Trigger string // manual, cron, api
Error string
}
// DagStepResult represents the outcome of one step within a DagRun.
type DagStepResult struct {
ID string
RunID string
StepName string
Status string // pending, running, success, failed, skipped
ExitCode int
Stdout string
Stderr string
StartedAt time.Time
FinishedAt time.Time
DurationMs int64
Error string
}
+26
View File
@@ -0,0 +1,26 @@
package infra
import (
"bytes"
"os/exec"
"time"
)
// ProcessHandle represents a running subprocess with output buffers.
type ProcessHandle struct {
Cmd *exec.Cmd
Pid int
StartTime time.Time
Dir string
stdout *bytes.Buffer
stderr *bytes.Buffer
}
// ProcessResult contains the outcome of a completed subprocess.
type ProcessResult struct {
ExitCode int
Stdout string
Stderr string
DurationMs int64
Killed bool
}
+42
View File
@@ -0,0 +1,42 @@
package infra
import (
"fmt"
"syscall"
"time"
)
// ProcessKill sends SIGTERM to the process group of handle, then waits up to
// graceSec seconds for the process to exit. If it is still alive after the
// grace period, SIGKILL is sent. Returns an error only if the signal could not
// be delivered (e.g. the process group does not exist).
func ProcessKill(handle *ProcessHandle, graceSec int) error {
// Send SIGTERM to the process group (negative pid targets the group).
if err := syscall.Kill(-handle.Pid, syscall.SIGTERM); err != nil {
// ESRCH means the process is already gone — not an error from our view.
if err != syscall.ESRCH {
return fmt.Errorf("process_kill: sigterm: %w", err)
}
return nil
}
// Poll until the process exits or the grace period expires.
deadline := time.Now().Add(time.Duration(graceSec) * time.Second)
for time.Now().Before(deadline) {
// Check if process has exited by sending signal 0 (no-op).
err := syscall.Kill(-handle.Pid, 0)
if err == syscall.ESRCH {
// Process group is gone.
return nil
}
time.Sleep(100 * time.Millisecond)
}
// Still alive after grace period — escalate to SIGKILL.
if err := syscall.Kill(-handle.Pid, syscall.SIGKILL); err != nil {
if err != syscall.ESRCH {
return fmt.Errorf("process_kill: sigkill: %w", err)
}
}
return nil
}
+45
View File
@@ -0,0 +1,45 @@
---
name: process_kill
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func ProcessKill(handle *ProcessHandle, graceSec int) error"
description: "Termina un subproceso enviando SIGTERM al process group. Espera hasta graceSec segundos a que el proceso muera voluntariamente. Si sigue vivo, envia SIGKILL. Retorna error solo si la senal no pudo entregarse."
tags: [process, subprocess, kill, signal, sigterm, sigkill, infra]
uses_functions: []
uses_types: [process_handle_go_infra]
returns: []
returns_optional: false
error_type: "error_go_core"
imports: [fmt, syscall, time]
params:
- name: handle
desc: "handle del proceso lanzado por ProcessSpawn"
- name: graceSec
desc: "segundos de gracia entre SIGTERM y SIGKILL; 0 envia SIGKILL inmediatamente"
output: "nil si el proceso fue terminado correctamente; error si la senal no pudo entregarse"
tested: true
tests:
- "kill process"
test_file_path: "functions/infra/process_spawn_test.go"
file_path: "functions/infra/process_kill.go"
---
## Ejemplo
```go
h, err := ProcessSpawn("sleep 300", "", nil, "")
if err != nil {
log.Fatal(err)
}
// Dar 3 segundos de gracia antes de SIGKILL
if err := ProcessKill(h, 3); err != nil {
log.Printf("kill failed: %v", err)
}
```
## Notas
Funcion impura: envia senales al sistema operativo. Usa -handle.Pid (negativo) para direccionar el process group completo, matando tanto al proceso principal como a sus hijos. ESRCH se ignora porque significa que el proceso ya murio, lo cual es el objetivo deseado. Comprueba si el proceso sigue vivo con signal 0 (kill -0) cada 100ms durante el grace period.
+74
View File
@@ -0,0 +1,74 @@
package infra
import (
"bytes"
"fmt"
"os"
"os/exec"
"strings"
"syscall"
"time"
)
// ProcessSpawn launches a subprocess using the given shell.
// If shell is empty, "sh" is used. If command contains newlines it is treated
// as a multi-line script: the content is written to a temp file and executed
// with `shell <tempfile>`. Otherwise it is executed with `shell -c <command>`.
// dir sets the working directory (empty = inherit). env sets the environment
// (nil = inherit parent env). The process group is created with Setpgid so
// that ProcessKill can target the whole group.
func ProcessSpawn(command string, dir string, env []string, shell string) (*ProcessHandle, error) {
if shell == "" {
shell = "sh"
}
var cmd *exec.Cmd
if strings.Contains(command, "\n") {
// Multi-line script: write to a temp file and execute it.
tmp, err := os.CreateTemp("", "fn-proc-*.sh")
if err != nil {
return nil, fmt.Errorf("process_spawn: create temp file: %w", err)
}
if _, err := tmp.WriteString(command); err != nil {
_ = os.Remove(tmp.Name())
return nil, fmt.Errorf("process_spawn: write temp file: %w", err)
}
if err := tmp.Close(); err != nil {
_ = os.Remove(tmp.Name())
return nil, fmt.Errorf("process_spawn: close temp file: %w", err)
}
cmd = exec.Command(shell, tmp.Name())
} else {
cmd = exec.Command(shell, "-c", command)
}
if dir != "" {
cmd.Dir = dir
}
if len(env) > 0 {
cmd.Env = env
}
// New process group so we can kill all children as a group.
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
// Use buffers instead of pipes to avoid race between Wait() and ReadAll().
var stdoutBuf, stderrBuf bytes.Buffer
cmd.Stdout = &stdoutBuf
cmd.Stderr = &stderrBuf
start := time.Now()
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("process_spawn: start: %w", err)
}
return &ProcessHandle{
Cmd: cmd,
Pid: cmd.Process.Pid,
StartTime: start,
Dir: dir,
stdout: &stdoutBuf,
stderr: &stderrBuf,
}, nil
}
+52
View File
@@ -0,0 +1,52 @@
---
name: process_spawn
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func ProcessSpawn(command string, dir string, env []string, shell string) (*ProcessHandle, error)"
description: "Lanza un subproceso usando el shell indicado. Si shell esta vacio usa 'sh'. Comandos con newlines se tratan como scripts multilinea (se escriben a un archivo temporal). Configura un process group propio (Setpgid) para poder matar todos los hijos con ProcessKill. Captura stdout y stderr via pipes."
tags: [process, subprocess, spawn, exec, shell, infra]
uses_functions: []
uses_types: [process_handle_go_infra]
returns: [process_handle_go_infra]
returns_optional: false
error_type: "error_go_core"
imports: [fmt, os, os/exec, strings, syscall, time]
params:
- name: command
desc: "comando shell a ejecutar; si contiene newlines se trata como script multilinea"
- name: dir
desc: "directorio de trabajo del proceso hijo; vacio hereda el del proceso padre"
- name: env
desc: "variables de entorno en formato KEY=VALUE; nil hereda el entorno del proceso padre"
- name: shell
desc: "interprete shell a usar (sh, bash, zsh); vacio usa 'sh'"
output: "handle del proceso lanzado con Cmd, Pid, StartTime, Dir y los pipes de I/O"
tested: true
tests:
- "spawn and wait echo"
- "spawn with timeout kills"
- "spawn with env"
- "spawn script"
- "spawn with working dir"
- "kill process"
test_file_path: "functions/infra/process_spawn_test.go"
file_path: "functions/infra/process_spawn.go"
---
## Ejemplo
```go
h, err := ProcessSpawn("echo hello", "", nil, "")
if err != nil {
log.Fatal(err)
}
res, err := ProcessWait(h, 10)
fmt.Println(res.Stdout) // "hello\n"
```
## Notas
Funcion impura: hace I/O (crea archivo temporal para scripts, lanza proceso). El process group (Setpgid=true) permite a ProcessKill enviar senales al grupo completo con -Pid, afectando a todos los hijos del proceso lanzado. Para scripts multilinea el archivo temporal queda en el directorio temporal del OS y no se limpia automaticamente.
+107
View File
@@ -0,0 +1,107 @@
package infra
import (
"strings"
"testing"
)
func TestProcessSpawn(t *testing.T) {
t.Run("spawn and wait echo", func(t *testing.T) {
h, err := ProcessSpawn("echo hello", "", nil, "")
if err != nil {
t.Fatalf("spawn: %v", err)
}
res, err := ProcessWait(h, 10)
if err != nil {
t.Fatalf("wait: %v", err)
}
if res.ExitCode != 0 {
t.Errorf("exit code: got %d, want 0", res.ExitCode)
}
if !strings.Contains(res.Stdout, "hello") {
t.Errorf("stdout: got %q, want it to contain 'hello'", res.Stdout)
}
})
t.Run("spawn with timeout kills", func(t *testing.T) {
h, err := ProcessSpawn("sleep 60", "", nil, "")
if err != nil {
t.Fatalf("spawn: %v", err)
}
res, err := ProcessWait(h, 2)
if err != nil {
t.Fatalf("wait: %v", err)
}
if !res.Killed {
t.Errorf("killed: got false, want true")
}
if res.ExitCode == 0 {
t.Errorf("exit code: got 0, want != 0 after kill")
}
})
t.Run("spawn with env", func(t *testing.T) {
h, err := ProcessSpawn("echo $TEST_VAR", "", []string{"PATH=/usr/bin:/bin", "TEST_VAR=hello123"}, "")
if err != nil {
t.Fatalf("spawn: %v", err)
}
res, err := ProcessWait(h, 10)
if err != nil {
t.Fatalf("wait: %v", err)
}
if !strings.Contains(res.Stdout, "hello123") {
t.Errorf("stdout: got %q, want it to contain 'hello123'", res.Stdout)
}
})
t.Run("spawn script", func(t *testing.T) {
script := "#!/bin/sh\necho line1\necho line2"
h, err := ProcessSpawn(script, "", nil, "")
if err != nil {
t.Fatalf("spawn: %v", err)
}
res, err := ProcessWait(h, 10)
if err != nil {
t.Fatalf("wait: %v", err)
}
if !strings.Contains(res.Stdout, "line1") {
t.Errorf("stdout: got %q, want it to contain 'line1'", res.Stdout)
}
if !strings.Contains(res.Stdout, "line2") {
t.Errorf("stdout: got %q, want it to contain 'line2'", res.Stdout)
}
})
t.Run("spawn with working dir", func(t *testing.T) {
h, err := ProcessSpawn("pwd", "/tmp", nil, "")
if err != nil {
t.Fatalf("spawn: %v", err)
}
res, err := ProcessWait(h, 10)
if err != nil {
t.Fatalf("wait: %v", err)
}
if !strings.Contains(res.Stdout, "/tmp") {
t.Errorf("stdout: got %q, want it to contain '/tmp'", res.Stdout)
}
})
t.Run("kill process", func(t *testing.T) {
h, err := ProcessSpawn("sleep 60", "", nil, "")
if err != nil {
t.Fatalf("spawn: %v", err)
}
if err := ProcessKill(h, 1); err != nil {
t.Fatalf("kill: %v", err)
}
// After kill, Wait should unblock quickly.
_ = h.Cmd.Wait()
state := h.Cmd.ProcessState
if state == nil {
t.Fatal("process state is nil after kill+wait")
}
if state.ExitCode() == 0 {
t.Errorf("exit code: got 0 after kill, want non-zero")
}
})
}
+51
View File
@@ -0,0 +1,51 @@
package infra
import (
"time"
)
// ProcessWait waits for a subprocess to finish and collects its output.
// If timeoutSec > 0 and the process has not exited by then, ProcessKill is
// called with graceSec=5 and the result is marked Killed=true.
func ProcessWait(handle *ProcessHandle, timeoutSec int) (ProcessResult, error) {
// Wait for the process in a goroutine.
waitCh := make(chan error, 1)
go func() {
waitCh <- handle.Cmd.Wait()
}()
killed := false
if timeoutSec > 0 {
timer := time.NewTimer(time.Duration(timeoutSec) * time.Second)
defer timer.Stop()
select {
case <-timer.C:
// Timeout exceeded — kill the process group.
_ = ProcessKill(handle, 5)
killed = true
<-waitCh
case <-waitCh:
}
} else {
<-waitCh
}
// After Wait() returns, buffers are safe to read.
exitCode := 0
if handle.Cmd.ProcessState != nil {
exitCode = handle.Cmd.ProcessState.ExitCode()
} else if killed {
exitCode = -1
}
duration := time.Since(handle.StartTime)
return ProcessResult{
ExitCode: exitCode,
Stdout: handle.stdout.String(),
Stderr: handle.stderr.String(),
DurationMs: duration.Milliseconds(),
Killed: killed,
}, nil
}
+49
View File
@@ -0,0 +1,49 @@
---
name: process_wait
kind: function
lang: go
domain: infra
version: "1.0.0"
purity: impure
signature: "func ProcessWait(handle *ProcessHandle, timeoutSec int) (ProcessResult, error)"
description: "Espera a que un subproceso termine y recopila su salida. Lee stdout y stderr completos en goroutines para evitar deadlocks en pipes. Si timeoutSec > 0 y el proceso no termina en ese tiempo, llama a ProcessKill y marca el resultado con Killed=true. Retorna el exit code, salida completa y duracion total."
tags: [process, subprocess, wait, timeout, exec, infra]
uses_functions: [process_kill_go_infra]
uses_types: [process_handle_go_infra, process_result_go_infra]
returns: [process_result_go_infra]
returns_optional: false
error_type: "error_go_core"
imports: [fmt, io, time]
params:
- name: handle
desc: "handle del proceso lanzado por ProcessSpawn"
- name: timeoutSec
desc: "segundos maximos de espera; 0 o negativo espera indefinidamente"
output: "resultado con exit code, stdout, stderr, duracion en ms y flag de killed"
tested: true
tests:
- "spawn and wait echo"
- "spawn with timeout kills"
- "spawn with env"
- "spawn script"
- "spawn with working dir"
test_file_path: "functions/infra/process_spawn_test.go"
file_path: "functions/infra/process_wait.go"
---
## Ejemplo
```go
h, err := ProcessSpawn("sleep 60", "", nil, "")
if err != nil {
log.Fatal(err)
}
res, err := ProcessWait(h, 5) // timeout de 5 segundos
if res.Killed {
fmt.Println("proceso terminado por timeout")
}
```
## Notas
Funcion impura: bloquea esperando I/O y posiblemente llama a ProcessKill. Lee stdout y stderr en goroutines separadas antes de llamar a cmd.Wait() para evitar el deadlock clasico donde cmd.Wait() bloquea porque los pipes estan llenos y nadie los lee. El exit code -1 indica que ProcessState no estaba disponible (proceso matado antes de registrar estado).
+27
View File
@@ -0,0 +1,27 @@
---
name: DagRun
lang: go
domain: infra
version: "1.0.0"
algebraic: product
definition: |
type DagRun struct {
ID string
DagName string
DagPath string
Status string
StartedAt time.Time
FinishedAt time.Time
Trigger string
Error string
}
description: "Representa una ejecucion de un workflow DAG. Almacenado en SQLite con estado, timestamps y trigger."
tags: [dag, execution, run, workflow]
uses_types: []
file_path: "functions/infra/dag_run.go"
---
## Notas
Status puede ser: pending, running, success, failed, cancelled.
Trigger puede ser: manual, cron, api.
+29
View File
@@ -0,0 +1,29 @@
---
name: DagStepResult
lang: go
domain: infra
version: "1.0.0"
algebraic: product
definition: |
type DagStepResult struct {
ID string
RunID string
StepName string
Status string
ExitCode int
Stdout string
Stderr string
StartedAt time.Time
FinishedAt time.Time
DurationMs int64
Error string
}
description: "Resultado de la ejecucion de un step individual dentro de un DagRun. Captura exit code, stdout, stderr y duracion."
tags: [dag, execution, step, result]
uses_types: [DagRun_go_infra]
file_path: "functions/infra/dag_run.go"
---
## Notas
Status puede ser: pending, running, success, failed, skipped.
+24
View File
@@ -0,0 +1,24 @@
---
name: process_handle
lang: go
domain: infra
version: "1.0.0"
algebraic: product
definition: |
type ProcessHandle struct {
Cmd *exec.Cmd
Pid int
StartTime time.Time
Dir string
stdout io.ReadCloser
stderr io.ReadCloser
}
description: "Handle de un subproceso en ejecucion. Contiene el comando, PID, tiempo de inicio, directorio de trabajo y los pipes de stdout/stderr (privados, leidos internamente por ProcessWait)."
tags: [process, subprocess, handle, infra, exec]
uses_types: []
file_path: "functions/infra/process_handle.go"
---
## Notas
Tipo producto. Los campos stdout y stderr son privados para evitar lecturas concurrentes externas — ProcessWait los consume internamente. Cmd.SysProcAttr.Setpgid=true garantiza que ProcessKill puede matar el process group completo usando -Pid.
+23
View File
@@ -0,0 +1,23 @@
---
name: process_result
lang: go
domain: infra
version: "1.0.0"
algebraic: product
definition: |
type ProcessResult struct {
ExitCode int
Stdout string
Stderr string
DurationMs int64
Killed bool
}
description: "Resultado de un subproceso completado. Contiene codigo de salida, salida estandar y de error, duracion en milisegundos, y un flag que indica si fue terminado por timeout."
tags: [process, subprocess, result, exit, infra, exec]
uses_types: []
file_path: "functions/infra/process_handle.go"
---
## Notas
Tipo producto — todos los campos siempre presentes. Killed=true indica que ProcessWait agoto el timeout y llamo a ProcessKill; en ese caso ExitCode suele ser -1 o el codigo de SIGKILL segun el OS. DurationMs incluye el tiempo total desde ProcessSpawn hasta que Wait() retorno.