diff --git a/functions/infra/dag_run.go b/functions/infra/dag_run.go
new file mode 100644
index 00000000..6e67705e
--- /dev/null
+++ b/functions/infra/dag_run.go
@@ -0,0 +1,30 @@
+package infra
+
+import "time"
+
+// DagRun represents one execution of a DAG workflow.
+type DagRun struct {
+	ID         string
+	DagName    string
+	DagPath    string
+	Status     string // pending, running, success, failed, cancelled
+	StartedAt  time.Time
+	FinishedAt time.Time
+	Trigger    string // manual, cron, api
+	Error      string
+}
+
+// DagStepResult represents the outcome of one step within a DagRun.
+type DagStepResult struct {
+	ID         string
+	RunID      string
+	StepName   string
+	Status     string // pending, running, success, failed, skipped
+	ExitCode   int
+	Stdout     string
+	Stderr     string
+	StartedAt  time.Time
+	FinishedAt time.Time
+	DurationMs int64
+	Error      string
+}
diff --git a/functions/infra/process_handle.go b/functions/infra/process_handle.go
new file mode 100644
index 00000000..f7afffcf
--- /dev/null
+++ b/functions/infra/process_handle.go
@@ -0,0 +1,26 @@
+package infra
+
+import (
+	"bytes"
+	"os/exec"
+	"time"
+)
+
+// ProcessHandle represents a running subprocess with output buffers.
+type ProcessHandle struct {
+	Cmd       *exec.Cmd
+	Pid       int
+	StartTime time.Time
+	Dir       string
+	stdout    *bytes.Buffer
+	stderr    *bytes.Buffer
+}
+
+// ProcessResult contains the outcome of a completed subprocess.
+type ProcessResult struct {
+	ExitCode   int
+	Stdout     string
+	Stderr     string
+	DurationMs int64
+	Killed     bool
+}
diff --git a/functions/infra/process_kill.go b/functions/infra/process_kill.go
new file mode 100644
index 00000000..3c47fecc
--- /dev/null
+++ b/functions/infra/process_kill.go
@@ -0,0 +1,56 @@
+package infra
+
+import (
+	"fmt"
+	"syscall"
+	"time"
+)
+
+// ProcessKill sends SIGTERM to the process group of handle, then waits up to
+// graceSec seconds for the group to disappear. If it is still alive after the
+// grace period, SIGKILL is sent. With graceSec <= 0 there is no wait and
+// SIGKILL follows SIGTERM immediately.
+//
+// An error is returned when handle is nil or its Pid is <= 1, because negating
+// such a pid would not address the child's group (0 targets the caller's own
+// group, -1 every process the caller may signal), or when a signal cannot be
+// delivered for a reason other than ESRCH. ESRCH means the group is already
+// gone and counts as success.
+//
+// Liveness is probed with signal 0. A leader that has exited but has not been
+// reaped still answers that probe, so the whole grace period elapses unless
+// Cmd.Wait is running concurrently (as it is inside ProcessWait).
+func ProcessKill(handle *ProcessHandle, graceSec int) error {
+	if handle == nil {
+		return fmt.Errorf("process_kill: nil handle")
+	}
+	if handle.Pid <= 1 {
+		return fmt.Errorf("process_kill: invalid pid %d", handle.Pid)
+	}
+	// A negative pid addresses the whole process group led by handle.Pid.
+	target := -handle.Pid
+
+	if err := syscall.Kill(target, syscall.SIGTERM); err != nil {
+		// ESRCH means the group is already gone — not an error from our view.
+		if err == syscall.ESRCH {
+			return nil
+		}
+		return fmt.Errorf("process_kill: sigterm: %w", err)
+	}
+
+	// Poll until the group exits or the grace period expires.
+	deadline := time.Now().Add(time.Duration(graceSec) * time.Second)
+	for time.Now().Before(deadline) {
+		// Signal 0 performs the existence check without delivering anything.
+		if err := syscall.Kill(target, 0); err == syscall.ESRCH {
+			return nil
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	// Still alive after the grace period — escalate to SIGKILL.
+	if err := syscall.Kill(target, syscall.SIGKILL); err != nil && err != syscall.ESRCH {
+		return fmt.Errorf("process_kill: sigkill: %w", err)
+	}
+	return nil
+}
diff --git a/functions/infra/process_kill.md b/functions/infra/process_kill.md
new file mode 100644
index 00000000..6ce9fb0c
--- /dev/null
+++ b/functions/infra/process_kill.md
@@ -0,0 +1,45 @@
+---
+name: process_kill
+kind: function
+lang: go
+domain: infra
+version: "1.0.0"
+purity: impure
+signature: "func ProcessKill(handle *ProcessHandle, graceSec int) error"
+description: "Termina un subproceso enviando SIGTERM al process group. Espera hasta graceSec segundos a que el proceso muera voluntariamente. Si sigue vivo, envia SIGKILL. Retorna error si el handle es invalido o si la senal no pudo entregarse."
+tags: [process, subprocess, kill, signal, sigterm, sigkill, infra]
+uses_functions: []
+uses_types: [process_handle_go_infra]
+returns: []
+returns_optional: false
+error_type: "error_go_core"
+imports: [fmt, syscall, time]
+params:
+  - name: handle
+    desc: "handle del proceso lanzado por ProcessSpawn"
+  - name: graceSec
+    desc: "segundos de gracia entre SIGTERM y SIGKILL; 0 envia SIGKILL inmediatamente despues de SIGTERM"
+output: "nil si el proceso fue terminado correctamente; error si la senal no pudo entregarse"
+tested: true
+tests:
+  - "kill process"
+test_file_path: "functions/infra/process_spawn_test.go"
+file_path: "functions/infra/process_kill.go"
+---
+
+## Ejemplo
+
+```go
+h, err := ProcessSpawn("sleep 300", "", nil, "")
+if err != nil {
+	log.Fatal(err)
+}
+// Dar 3 segundos de gracia antes de SIGKILL
+if err := ProcessKill(h, 3); err != nil {
+	log.Printf("kill failed: %v", err)
+}
+```
+
+## Notas
+
+Funcion impura: envia senales al sistema operativo. Usa -handle.Pid (negativo) para direccionar el process group completo, matando tanto al proceso principal como a sus hijos. ESRCH se ignora porque significa que el proceso ya murio, lo cual es el objetivo deseado. Comprueba si el proceso sigue vivo con signal 0 (kill -0) cada 100ms durante el grace period.
diff --git a/functions/infra/process_spawn.go b/functions/infra/process_spawn.go
new file mode 100644
index 00000000..f2a33364
--- /dev/null
+++ b/functions/infra/process_spawn.go
@@ -0,0 +1,90 @@
+package infra
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"os/exec"
+	"strings"
+	"syscall"
+	"time"
+)
+
+// ProcessSpawn launches command as a subprocess through the given shell.
+//
+// If shell is empty, "sh" is used. A command containing a newline is treated
+// as a multi-line script: it is written to a temp file and run as
+// `shell FILE`; any other command is run as `shell -c COMMAND`.
+// dir sets the working directory (empty = inherit). env sets the environment
+// (nil or empty = inherit the parent's environment).
+//
+// The child is placed in its own process group (Setpgid) so that ProcessKill
+// can signal the whole group. Its stdout and stderr are collected in memory
+// buffers stored on the returned handle.
+//
+// The temp script is removed when spawning fails. After a successful start it
+// is left in the OS temp directory, since the shell may still be reading it.
+func ProcessSpawn(command string, dir string, env []string, shell string) (*ProcessHandle, error) {
+	if shell == "" {
+		shell = "sh"
+	}
+
+	var cmd *exec.Cmd
+	var scriptPath string // set only when command was written to a temp script
+
+	if strings.Contains(command, "\n") {
+		// Multi-line script: write to a temp file and execute it.
+		tmp, err := os.CreateTemp("", "fn-proc-*.sh")
+		if err != nil {
+			return nil, fmt.Errorf("process_spawn: create temp file: %w", err)
+		}
+		scriptPath = tmp.Name()
+		if _, err := tmp.WriteString(command); err != nil {
+			// Release the descriptor before unlinking; both are best effort
+			// because the write error is what gets reported.
+			_ = tmp.Close()
+			_ = os.Remove(scriptPath)
+			return nil, fmt.Errorf("process_spawn: write temp file: %w", err)
+		}
+		if err := tmp.Close(); err != nil {
+			_ = os.Remove(scriptPath) // best effort; the close error is reported
+			return nil, fmt.Errorf("process_spawn: close temp file: %w", err)
+		}
+		cmd = exec.Command(shell, scriptPath)
+	} else {
+		cmd = exec.Command(shell, "-c", command)
+	}
+
+	if dir != "" {
+		cmd.Dir = dir
+	}
+	if len(env) > 0 {
+		cmd.Env = env
+	}
+
+	// New process group so we can kill all children as a group.
+	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+
+	// Plain buffers (not StdoutPipe/StderrPipe): exec copies the output into
+	// them and Cmd.Wait returns only once that copying has finished.
+	var stdoutBuf, stderrBuf bytes.Buffer
+	cmd.Stdout = &stdoutBuf
+	cmd.Stderr = &stderrBuf
+
+	start := time.Now()
+	if err := cmd.Start(); err != nil {
+		if scriptPath != "" {
+			_ = os.Remove(scriptPath) // nothing will ever run it; best effort
+		}
+		return nil, fmt.Errorf("process_spawn: start: %w", err)
+	}
+
+	return &ProcessHandle{
+		Cmd:       cmd,
+		Pid:       cmd.Process.Pid,
+		StartTime: start,
+		Dir:       dir,
+		stdout:    &stdoutBuf,
+		stderr:    &stderrBuf,
+	}, nil
+}
diff --git a/functions/infra/process_spawn.md b/functions/infra/process_spawn.md
new file mode 100644
index 00000000..ec7c439b
--- /dev/null
+++ b/functions/infra/process_spawn.md
@@ -0,0 +1,52 @@
+---
+name: process_spawn
+kind: function
+lang: go
+domain: infra
+version: "1.0.0"
+purity: impure
+signature: "func ProcessSpawn(command string, dir string, env []string, shell string) (*ProcessHandle, error)"
+description: "Lanza un subproceso usando el shell indicado. Si shell esta vacio usa 'sh'. Comandos con newlines se tratan como scripts multilinea (se escriben a un archivo temporal). Configura un process group propio (Setpgid) para poder matar todos los hijos con ProcessKill. Captura stdout y stderr en buffers en memoria."
+tags: [process, subprocess, spawn, exec, shell, infra] +uses_functions: [] +uses_types: [process_handle_go_infra] +returns: [process_handle_go_infra] +returns_optional: false +error_type: "error_go_core" +imports: [bytes, fmt, os, os/exec, strings, syscall, time] +params: + - name: command + desc: "comando shell a ejecutar; si contiene newlines se trata como script multilinea" + - name: dir + desc: "directorio de trabajo del proceso hijo; vacio hereda el del proceso padre" + - name: env + desc: "variables de entorno en formato KEY=VALUE; nil o vacio hereda el entorno del proceso padre" + - name: shell + desc: "interprete shell a usar (sh, bash, zsh); vacio usa 'sh'" +output: "handle del proceso lanzado con Cmd, Pid, StartTime, Dir y los buffers de stdout/stderr" +tested: true +tests: + - "spawn and wait echo" + - "spawn with timeout kills" + - "spawn with env" + - "spawn script" + - "spawn with working dir" + - "kill process" +test_file_path: "functions/infra/process_spawn_test.go" +file_path: "functions/infra/process_spawn.go" +--- + +## Ejemplo + +```go +h, err := ProcessSpawn("echo hello", "", nil, "") +if err != nil { + log.Fatal(err) +} +res, err := ProcessWait(h, 10) +fmt.Println(res.Stdout) // "hello\n" +``` + +## Notas + +Funcion impura: hace I/O (crea archivo temporal para scripts, lanza proceso). El process group (Setpgid=true) permite a ProcessKill enviar senales al grupo completo con -Pid, afectando a todos los hijos del proceso lanzado. Para scripts multilinea el archivo temporal queda en el directorio temporal del OS y no se limpia automaticamente. 
diff --git a/functions/infra/process_spawn_test.go b/functions/infra/process_spawn_test.go
new file mode 100644
index 00000000..d3918044
--- /dev/null
+++ b/functions/infra/process_spawn_test.go
@@ -0,0 +1,107 @@
+package infra
+
+import (
+	"strings"
+	"testing"
+)
+
+func TestProcessSpawn(t *testing.T) {
+	t.Run("spawn and wait echo", func(t *testing.T) {
+		h, err := ProcessSpawn("echo hello", "", nil, "")
+		if err != nil {
+			t.Fatalf("spawn: %v", err)
+		}
+		res, err := ProcessWait(h, 10)
+		if err != nil {
+			t.Fatalf("wait: %v", err)
+		}
+		if res.ExitCode != 0 {
+			t.Errorf("exit code: got %d, want 0", res.ExitCode)
+		}
+		if !strings.Contains(res.Stdout, "hello") {
+			t.Errorf("stdout: got %q, want it to contain 'hello'", res.Stdout)
+		}
+	})
+
+	t.Run("spawn with timeout kills", func(t *testing.T) {
+		h, err := ProcessSpawn("sleep 60", "", nil, "")
+		if err != nil {
+			t.Fatalf("spawn: %v", err)
+		}
+		res, err := ProcessWait(h, 2)
+		if err != nil {
+			t.Fatalf("wait: %v", err)
+		}
+		if !res.Killed {
+			t.Errorf("killed: got false, want true")
+		}
+		if res.ExitCode == 0 {
+			t.Errorf("exit code: got 0, want != 0 after kill")
+		}
+	})
+
+	t.Run("spawn with env", func(t *testing.T) {
+		h, err := ProcessSpawn("echo $TEST_VAR", "", []string{"PATH=/usr/bin:/bin", "TEST_VAR=hello123"}, "")
+		if err != nil {
+			t.Fatalf("spawn: %v", err)
+		}
+		res, err := ProcessWait(h, 10)
+		if err != nil {
+			t.Fatalf("wait: %v", err)
+		}
+		if !strings.Contains(res.Stdout, "hello123") {
+			t.Errorf("stdout: got %q, want it to contain 'hello123'", res.Stdout)
+		}
+	})
+
+	t.Run("spawn script", func(t *testing.T) {
+		script := "#!/bin/sh\necho line1\necho line2"
+		h, err := ProcessSpawn(script, "", nil, "")
+		if err != nil {
+			t.Fatalf("spawn: %v", err)
+		}
+		res, err := ProcessWait(h, 10)
+		if err != nil {
+			t.Fatalf("wait: %v", err)
+		}
+		if !strings.Contains(res.Stdout, "line1") {
+			t.Errorf("stdout: got %q, want it to contain 'line1'", res.Stdout)
+		}
+		if !strings.Contains(res.Stdout, "line2") {
+			t.Errorf("stdout: got %q, want it to contain 'line2'", res.Stdout)
+		}
+	})
+
+	t.Run("spawn with working dir", func(t *testing.T) {
+		h, err := ProcessSpawn("pwd", "/tmp", nil, "")
+		if err != nil {
+			t.Fatalf("spawn: %v", err)
+		}
+		res, err := ProcessWait(h, 10)
+		if err != nil {
+			t.Fatalf("wait: %v", err)
+		}
+		if !strings.Contains(res.Stdout, "/tmp") {
+			t.Errorf("stdout: got %q, want it to contain '/tmp'", res.Stdout)
+		}
+	})
+
+	t.Run("kill process", func(t *testing.T) {
+		h, err := ProcessSpawn("sleep 60", "", nil, "")
+		if err != nil {
+			t.Fatalf("spawn: %v", err)
+		}
+		if err := ProcessKill(h, 1); err != nil {
+			t.Fatalf("kill: %v", err)
+		}
+		// After kill, Wait should unblock quickly.
+		_ = h.Cmd.Wait()
+		state := h.Cmd.ProcessState
+		if state == nil {
+			t.Fatal("process state is nil after kill+wait")
+		}
+		if state.ExitCode() == 0 {
+			t.Errorf("exit code: got 0 after kill, want non-zero")
+		}
+	})
+}
diff --git a/functions/infra/process_wait.go b/functions/infra/process_wait.go
new file mode 100644
index 00000000..6680533f
--- /dev/null
+++ b/functions/infra/process_wait.go
@@ -0,0 +1,74 @@
+package infra
+
+import (
+	"errors"
+	"fmt"
+	"os/exec"
+	"time"
+)
+
+// ProcessWait waits for a subprocess to finish and collects its output.
+//
+// If timeoutSec > 0 and the process has not exited by then, ProcessKill is
+// called with graceSec=5 and the result is marked Killed=true. A timeoutSec
+// of 0 or less waits indefinitely.
+//
+// A non-zero exit (including death by signal) is not an error; it is reported
+// through ExitCode. An error is returned when handle is nil or has no Cmd, or
+// when Cmd.Wait fails for any other reason; in the latter case the result
+// still carries the captured output and ExitCode is -1 if no exit status is
+// available.
+func ProcessWait(handle *ProcessHandle, timeoutSec int) (ProcessResult, error) {
+	if handle == nil || handle.Cmd == nil {
+		return ProcessResult{}, errors.New("process_wait: nil handle")
+	}
+
+	// Wait in a goroutine so the timeout can be observed. The channel is
+	// buffered so the goroutine can always deliver its result and exit.
+	waitCh := make(chan error, 1)
+	go func() {
+		waitCh <- handle.Cmd.Wait()
+	}()
+
+	var waitErr error
+	killed := false
+
+	if timeoutSec > 0 {
+		timer := time.NewTimer(time.Duration(timeoutSec) * time.Second)
+		defer timer.Stop()
+		select {
+		case <-timer.C:
+			// Timeout exceeded — kill the process group. A delivery failure is
+			// ignored on purpose: the outcome is taken from Wait below.
+			_ = ProcessKill(handle, 5)
+			killed = true
+			waitErr = <-waitCh
+		case waitErr = <-waitCh:
+		}
+	} else {
+		waitErr = <-waitCh
+	}
+
+	// No ProcessState means Wait never obtained an exit status.
+	exitCode := -1
+	if state := handle.Cmd.ProcessState; state != nil {
+		exitCode = state.ExitCode()
+	}
+
+	// After Wait() returns, buffers are safe to read.
+	result := ProcessResult{
+		ExitCode:   exitCode,
+		Stdout:     handle.stdout.String(),
+		Stderr:     handle.stderr.String(),
+		DurationMs: time.Since(handle.StartTime).Milliseconds(),
+		Killed:     killed,
+	}
+
+	// *exec.ExitError only says the process ended unsuccessfully, which
+	// ExitCode already conveys. Anything else is a genuine failure of Wait.
+	var exitErr *exec.ExitError
+	if waitErr != nil && !errors.As(waitErr, &exitErr) {
+		return result, fmt.Errorf("process_wait: wait: %w", waitErr)
+	}
+	return result, nil
+}
diff --git a/functions/infra/process_wait.md b/functions/infra/process_wait.md
new file mode 100644
index 00000000..5153a4ee
--- /dev/null
+++ b/functions/infra/process_wait.md
@@ -0,0 +1,49 @@
+---
+name: process_wait
+kind: function
+lang: go
+domain: infra
+version: "1.0.0"
+purity: impure
+signature: "func ProcessWait(handle *ProcessHandle, timeoutSec int) (ProcessResult, error)"
+description: "Espera a que un subproceso termine y recopila su salida. Lee stdout y stderr de los buffers en memoria del handle una vez que Wait() retorna. Si timeoutSec > 0 y el proceso no termina en ese tiempo, llama a ProcessKill y marca el resultado con Killed=true. Retorna el exit code, salida completa y duracion total."
+tags: [process, subprocess, wait, timeout, exec, infra] +uses_functions: [process_kill_go_infra] +uses_types: [process_handle_go_infra, process_result_go_infra] +returns: [process_result_go_infra] +returns_optional: false +error_type: "error_go_core" +imports: [fmt, io, time] +params: + - name: handle + desc: "handle del proceso lanzado por ProcessSpawn" + - name: timeoutSec + desc: "segundos maximos de espera; 0 o negativo espera indefinidamente" +output: "resultado con exit code, stdout, stderr, duracion en ms y flag de killed" +tested: true +tests: + - "spawn and wait echo" + - "spawn with timeout kills" + - "spawn with env" + - "spawn script" + - "spawn with working dir" +test_file_path: "functions/infra/process_spawn_test.go" +file_path: "functions/infra/process_wait.go" +--- + +## Ejemplo + +```go +h, err := ProcessSpawn("sleep 60", "", nil, "") +if err != nil { + log.Fatal(err) +} +res, err := ProcessWait(h, 5) // timeout de 5 segundos +if res.Killed { + fmt.Println("proceso terminado por timeout") +} +``` + +## Notas + +Funcion impura: bloquea esperando I/O y posiblemente llama a ProcessKill. stdout y stderr se acumulan en los buffers en memoria del handle (bytes.Buffer) mientras el proceso corre y solo se leen despues de que cmd.Wait() retorna, por lo que no se leen pipes manualmente ni se lanzan goroutines lectoras propias. El exit code -1 indica que ProcessState no estaba disponible (proceso matado antes de registrar estado). diff --git a/types/infra/dag_run.md b/types/infra/dag_run.md new file mode 100644 index 00000000..53ff6c64 --- /dev/null +++ b/types/infra/dag_run.md @@ -0,0 +1,27 @@ +--- +name: DagRun +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type DagRun struct { + ID string + DagName string + DagPath string + Status string + StartedAt time.Time + FinishedAt time.Time + Trigger string + Error string + } +description: "Representa una ejecucion de un workflow DAG. Almacenado en SQLite con estado, timestamps y trigger." 
+tags: [dag, execution, run, workflow] +uses_types: [] +file_path: "functions/infra/dag_run.go" +--- + +## Notas + +Status puede ser: pending, running, success, failed, cancelled. +Trigger puede ser: manual, cron, api. diff --git a/types/infra/dag_step_result.md b/types/infra/dag_step_result.md new file mode 100644 index 00000000..56780e5c --- /dev/null +++ b/types/infra/dag_step_result.md @@ -0,0 +1,29 @@ +--- +name: DagStepResult +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type DagStepResult struct { + ID string + RunID string + StepName string + Status string + ExitCode int + Stdout string + Stderr string + StartedAt time.Time + FinishedAt time.Time + DurationMs int64 + Error string + } +description: "Resultado de la ejecucion de un step individual dentro de un DagRun. Captura exit code, stdout, stderr y duracion." +tags: [dag, execution, step, result] +uses_types: [DagRun_go_infra] +file_path: "functions/infra/dag_run.go" +--- + +## Notas + +Status puede ser: pending, running, success, failed, skipped. diff --git a/types/infra/process_handle.md b/types/infra/process_handle.md new file mode 100644 index 00000000..83d96fbc --- /dev/null +++ b/types/infra/process_handle.md @@ -0,0 +1,24 @@ +--- +name: process_handle +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type ProcessHandle struct { + Cmd *exec.Cmd + Pid int + StartTime time.Time + Dir string + stdout *bytes.Buffer + stderr *bytes.Buffer + } +description: "Handle de un subproceso en ejecucion. Contiene el comando, PID, tiempo de inicio, directorio de trabajo y los buffers de stdout/stderr (privados, leidos internamente por ProcessWait)." +tags: [process, subprocess, handle, infra, exec] +uses_types: [] +file_path: "functions/infra/process_handle.go" +--- + +## Notas + +Tipo producto. Los campos stdout y stderr son privados para evitar lecturas concurrentes externas — ProcessWait los consume internamente. 
Cmd.SysProcAttr.Setpgid=true garantiza que ProcessKill puede matar el process group completo usando -Pid. diff --git a/types/infra/process_result.md b/types/infra/process_result.md new file mode 100644 index 00000000..4d969124 --- /dev/null +++ b/types/infra/process_result.md @@ -0,0 +1,23 @@ +--- +name: process_result +lang: go +domain: infra +version: "1.0.0" +algebraic: product +definition: | + type ProcessResult struct { + ExitCode int + Stdout string + Stderr string + DurationMs int64 + Killed bool + } +description: "Resultado de un subproceso completado. Contiene codigo de salida, salida estandar y de error, duracion en milisegundos, y un flag que indica si fue terminado por timeout." +tags: [process, subprocess, result, exit, infra, exec] +uses_types: [] +file_path: "functions/infra/process_handle.go" +--- + +## Notas + +Tipo producto — todos los campos siempre presentes. Killed=true indica que ProcessWait agoto el timeout y llamo a ProcessKill; en ese caso ExitCode suele ser -1 o el codigo de SIGKILL segun el OS. DurationMs incluye el tiempo total desde ProcessSpawn hasta que Wait() retorno.