merge: issue/0007-dag-engine — Motor de DAGs con CLI, web frontend y SQLite
Reemplaza Dagu con implementacion propia compatible con formato YAML existente. Incluye parser, validador, topo sort, process manager, execution store SQLite, scheduler cron, CLI (run/list/status/validate/server) y frontend React/Mantine.
This commit is contained in:
@@ -55,3 +55,4 @@ Thumbs.db
|
||||
|
||||
broken_paths.txt
|
||||
imgui.ini
|
||||
prompts/
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"dag-engine": {
|
||||
"enabled": false,
|
||||
"enabled": true,
|
||||
"issue": "0007",
|
||||
"description": "Sistema propio de orquestacion de DAGs para reemplazar Dagu. Incluye parser YAML, executor con paralelismo, process manager, execution store y scheduler cron."
|
||||
"description": "Sistema propio de orquestacion de DAGs para reemplazar Dagu. Incluye parser YAML, executor con paralelismo, process manager, execution store SQLite, scheduler cron, CLI y web frontend."
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,9 +8,9 @@
|
||||
| 0004 | Jupyter discover multiple instances | completado | — | feature | — |
|
||||
| 0005 | Jupyter write batch | completado | — | feature | — |
|
||||
| 0006 | Jupyter exec outputs keyerror | completado | — | bugfix | — |
|
||||
| **0007a** | **DAG engine: core (parse, validate, topo sort)** | pendiente | alta | feature | 0007b-e |
|
||||
| **0007b** | **DAG engine: process manager (spawn, wait, kill)** | pendiente | alta | feature | 0007e |
|
||||
| **0007c** | **DAG engine: execution store (SQLite)** | pendiente | alta | feature | 0007e |
|
||||
| **0007d** | **DAG engine: scheduler (cron parser, ticker)** | pendiente | media | feature | 0007e |
|
||||
| **0007e** | **DAG engine: app CLI que reemplaza Dagu** | pendiente | alta | feature | — |
|
||||
| [0007a](completed/0007a-dag-core.md) | DAG engine: core (parse, validate, topo sort) | completado | alta | feature | 0007b-e |
|
||||
| [0007b](completed/0007b-process-manager.md) | DAG engine: process manager (spawn, wait, kill) | completado | alta | feature | 0007e |
|
||||
| [0007c](completed/0007c-execution-store.md) | DAG engine: execution store (SQLite) | completado | alta | feature | 0007e |
|
||||
| [0007d](completed/0007d-scheduler.md) | DAG engine: scheduler (cron match) | completado | media | feature | 0007e |
|
||||
| [0007e](completed/0007e-dag-executor-app.md) | DAG engine: CLI + web app que reemplaza Dagu | completado | alta | feature | — |
|
||||
| **0008** | **SQLite API Web** | pendiente | alta | feature | — |
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
package core
|
||||
|
||||
import "time"
|
||||
|
||||
// CronMatch returns true if time t matches all fields of the cron schedule.
|
||||
func CronMatch(sched CronSchedule, t time.Time) bool {
|
||||
return intIn(t.Minute(), sched.Minute) &&
|
||||
intIn(t.Hour(), sched.Hour) &&
|
||||
intIn(t.Day(), sched.DayOfMonth) &&
|
||||
intIn(int(t.Month()), sched.Month) &&
|
||||
intIn(int(t.Weekday()), sched.DayOfWeek)
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
---
|
||||
name: cron_match
|
||||
kind: function
|
||||
lang: go
|
||||
domain: core
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "func CronMatch(sched CronSchedule, t time.Time) bool"
|
||||
description: "Verifica si un instante de tiempo coincide con un cron schedule. Compara los 5 campos (minuto, hora, dia del mes, mes, dia de la semana) y retorna true si todos coinciden."
|
||||
tags: [cron, scheduling, matching, time, pure]
|
||||
uses_functions: []
|
||||
uses_types: [cron_schedule_go_core]
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [time]
|
||||
params:
|
||||
- name: sched
|
||||
desc: "CronSchedule con listas de valores validos por campo (resultado de ParseCronExpr)"
|
||||
- name: t
|
||||
desc: "instante de tiempo a verificar contra el schedule"
|
||||
output: "true si t coincide con todos los campos del cron schedule"
|
||||
tested: true
|
||||
tests:
|
||||
- "9:00 AM coincide con 0 9 * * *"
|
||||
- "9:15 AM NO coincide con 0 9 * * *"
|
||||
- "lunes a las 9 coincide con 0 9 * * 1"
|
||||
- "domingo a las 9 NO coincide con 0 9 * * 1"
|
||||
- "wildcard * coincide con cualquier valor"
|
||||
- "specific month"
|
||||
test_file_path: "functions/core/cron_match_test.go"
|
||||
file_path: "functions/core/cron_match.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```go
|
||||
sched, _ := ParseCronExpr("0 9 * * *")
|
||||
t := time.Date(2026, 4, 11, 9, 0, 0, 0, time.UTC)
|
||||
CronMatch(sched, t) // true
|
||||
|
||||
t2 := time.Date(2026, 4, 11, 10, 0, 0, 0, time.UTC)
|
||||
CronMatch(sched, t2) // false
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Funcion pura. Usa AND semantics para day_of_month y day_of_week (ambos deben coincidir), igual que NextCronTime en el mismo paquete.
|
||||
Reutiliza el helper intIn definido en next_cron_time.go (mismo paquete core).
|
||||
@@ -0,0 +1,88 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestCronMatch(t *testing.T) {
|
||||
t.Run("9:00 AM coincide con 0 9 * * *", func(t *testing.T) {
|
||||
sched, err := ParseCronExpr("0 9 * * *")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseCronExpr: %v", err)
|
||||
}
|
||||
ts := time.Date(2026, 4, 11, 9, 0, 0, 0, time.UTC)
|
||||
if !CronMatch(sched, ts) {
|
||||
t.Errorf("expected match for 9:00 AM with '0 9 * * *'")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("9:15 AM NO coincide con 0 9 * * *", func(t *testing.T) {
|
||||
sched, err := ParseCronExpr("0 9 * * *")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseCronExpr: %v", err)
|
||||
}
|
||||
ts := time.Date(2026, 4, 11, 9, 15, 0, 0, time.UTC)
|
||||
if CronMatch(sched, ts) {
|
||||
t.Errorf("expected no match for 9:15 AM with '0 9 * * *'")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("lunes a las 9 coincide con 0 9 * * 1", func(t *testing.T) {
|
||||
sched, err := ParseCronExpr("0 9 * * 1")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseCronExpr: %v", err)
|
||||
}
|
||||
// 2026-04-13 is a Monday
|
||||
ts := time.Date(2026, 4, 13, 9, 0, 0, 0, time.UTC)
|
||||
if !CronMatch(sched, ts) {
|
||||
t.Errorf("expected match for Monday 9:00 with '0 9 * * 1'")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("domingo a las 9 NO coincide con 0 9 * * 1", func(t *testing.T) {
|
||||
sched, err := ParseCronExpr("0 9 * * 1")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseCronExpr: %v", err)
|
||||
}
|
||||
// 2026-04-12 is a Sunday (weekday 0)
|
||||
ts := time.Date(2026, 4, 12, 9, 0, 0, 0, time.UTC)
|
||||
if CronMatch(sched, ts) {
|
||||
t.Errorf("expected no match for Sunday 9:00 with '0 9 * * 1'")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("wildcard * coincide con cualquier valor", func(t *testing.T) {
|
||||
sched, err := ParseCronExpr("*/15 * * * *")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseCronExpr: %v", err)
|
||||
}
|
||||
// 9:30 should match */15 (0,15,30,45)
|
||||
ts := time.Date(2026, 4, 11, 9, 30, 0, 0, time.UTC)
|
||||
if !CronMatch(sched, ts) {
|
||||
t.Errorf("expected match for 9:30 with '*/15 * * * *'")
|
||||
}
|
||||
// 9:07 should NOT match */15
|
||||
ts2 := time.Date(2026, 4, 11, 9, 7, 0, 0, time.UTC)
|
||||
if CronMatch(sched, ts2) {
|
||||
t.Errorf("expected no match for 9:07 with '*/15 * * * *'")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("specific month", func(t *testing.T) {
|
||||
sched, err := ParseCronExpr("0 0 1 4 *")
|
||||
if err != nil {
|
||||
t.Fatalf("ParseCronExpr: %v", err)
|
||||
}
|
||||
// April 1st matches
|
||||
ts := time.Date(2026, 4, 1, 0, 0, 0, 0, time.UTC)
|
||||
if !CronMatch(sched, ts) {
|
||||
t.Errorf("expected match for April 1 with '0 0 1 4 *'")
|
||||
}
|
||||
// March 1st does NOT match
|
||||
ts2 := time.Date(2026, 3, 1, 0, 0, 0, 0, time.UTC)
|
||||
if CronMatch(sched, ts2) {
|
||||
t.Errorf("expected no match for March 1 with '0 0 1 4 *'")
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
package core
|
||||
|
||||
// DagContinueOn controls whether a step continues on failure/skip.
|
||||
type DagContinueOn struct {
|
||||
Failure bool
|
||||
Skipped bool
|
||||
}
|
||||
|
||||
// DagRetryPolicy configures automatic retries for a step.
|
||||
type DagRetryPolicy struct {
|
||||
Limit int
|
||||
IntervalSec int
|
||||
}
|
||||
|
||||
// DagStep represents a single step in a DAG workflow.
|
||||
type DagStep struct {
|
||||
Name string
|
||||
ID string
|
||||
Description string
|
||||
Command string
|
||||
Script string
|
||||
Args []string
|
||||
Shell string
|
||||
Dir string
|
||||
Depends []string
|
||||
Env map[string]string
|
||||
ContinueOn DagContinueOn
|
||||
RetryPolicy DagRetryPolicy
|
||||
TimeoutSec int
|
||||
Output string
|
||||
Tags []string
|
||||
}
|
||||
|
||||
// DagHandlers contains lifecycle handler steps.
|
||||
type DagHandlers struct {
|
||||
Init []DagStep
|
||||
Success []DagStep
|
||||
Failure []DagStep
|
||||
Exit []DagStep
|
||||
}
|
||||
|
||||
// DagDefinition is a complete DAG workflow parsed from YAML.
|
||||
type DagDefinition struct {
|
||||
Name string
|
||||
Description string
|
||||
Group string
|
||||
Type string // "graph" or "" (chain/sequential)
|
||||
WorkingDir string
|
||||
Shell string
|
||||
Env map[string]string
|
||||
Schedule []string
|
||||
Steps []DagStep
|
||||
HandlerOn DagHandlers
|
||||
Tags []string
|
||||
TimeoutSec int
|
||||
FilePath string
|
||||
}
|
||||
|
||||
// DagValidationResult contains validation output.
|
||||
type DagValidationResult struct {
|
||||
Valid bool
|
||||
Errors []string
|
||||
Warnings []string
|
||||
Levels [][]string // topological levels (step names/IDs)
|
||||
}
|
||||
@@ -0,0 +1,281 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// rawDagStep is the loosely-typed intermediate representation of a DAG step.
|
||||
type rawDagStep struct {
|
||||
Name string `yaml:"name"`
|
||||
ID string `yaml:"id"`
|
||||
Description string `yaml:"description"`
|
||||
Command string `yaml:"command"`
|
||||
Script string `yaml:"script"`
|
||||
Args []string `yaml:"args"`
|
||||
Shell string `yaml:"shell"`
|
||||
Dir string `yaml:"dir"`
|
||||
WorkingDir string `yaml:"working_dir"`
|
||||
Depends []string `yaml:"depends"`
|
||||
Env interface{} `yaml:"env"`
|
||||
ContinueOn rawDagContinueOn `yaml:"continue_on"`
|
||||
RetryPolicy rawDagRetryPolicy `yaml:"retry_policy"`
|
||||
TimeoutSec int `yaml:"timeout_sec"`
|
||||
Output string `yaml:"output"`
|
||||
Tags []string `yaml:"tags"`
|
||||
}
|
||||
|
||||
type rawDagContinueOn struct {
|
||||
Failure bool `yaml:"failure"`
|
||||
Skipped bool `yaml:"skipped"`
|
||||
}
|
||||
|
||||
type rawDagRetryPolicy struct {
|
||||
Limit int `yaml:"limit"`
|
||||
IntervalSec int `yaml:"interval_sec"`
|
||||
}
|
||||
|
||||
// rawDagHandlers handles both handler_on and handlers aliases.
|
||||
type rawDagHandlers struct {
|
||||
Init interface{} `yaml:"init"`
|
||||
Success interface{} `yaml:"success"`
|
||||
Failure interface{} `yaml:"failure"`
|
||||
Exit interface{} `yaml:"exit"`
|
||||
}
|
||||
|
||||
// rawDag is the loosely-typed intermediate representation of a DAG YAML.
|
||||
type rawDag struct {
|
||||
Name string `yaml:"name"`
|
||||
Description string `yaml:"description"`
|
||||
Group string `yaml:"group"`
|
||||
Type string `yaml:"type"`
|
||||
WorkingDir string `yaml:"working_dir"`
|
||||
Shell string `yaml:"shell"`
|
||||
Env interface{} `yaml:"env"`
|
||||
Schedule interface{} `yaml:"schedule"`
|
||||
Steps []rawDagStep `yaml:"steps"`
|
||||
HandlerOn *rawDagHandlers `yaml:"handler_on"`
|
||||
Handlers *rawDagHandlers `yaml:"handlers"`
|
||||
Tags []string `yaml:"tags"`
|
||||
TimeoutSec int `yaml:"timeout_sec"`
|
||||
}
|
||||
|
||||
// DagParse parses a YAML DAG definition compatible with Dagu format.
|
||||
// Handles schedule as string or list, env as list of single-key maps,
|
||||
// handler_on and handlers as aliases, and steps with command/script/depends.
|
||||
func DagParse(data []byte) (DagDefinition, error) {
|
||||
var raw rawDag
|
||||
if err := yaml.Unmarshal(data, &raw); err != nil {
|
||||
return DagDefinition{}, fmt.Errorf("dag_parse: yaml unmarshal: %w", err)
|
||||
}
|
||||
|
||||
def := DagDefinition{
|
||||
Name: raw.Name,
|
||||
Description: raw.Description,
|
||||
Group: raw.Group,
|
||||
Type: raw.Type,
|
||||
WorkingDir: raw.WorkingDir,
|
||||
Shell: raw.Shell,
|
||||
Tags: raw.Tags,
|
||||
TimeoutSec: raw.TimeoutSec,
|
||||
}
|
||||
|
||||
// Normalize env (list of single-key maps or plain map).
|
||||
dagEnv, err := normalizeEnv(raw.Env)
|
||||
if err != nil {
|
||||
return DagDefinition{}, fmt.Errorf("dag_parse: env: %w", err)
|
||||
}
|
||||
def.Env = dagEnv
|
||||
|
||||
// Normalize schedule (string or list).
|
||||
def.Schedule = normalizeSchedule(raw.Schedule)
|
||||
|
||||
// Normalize steps.
|
||||
steps := make([]DagStep, 0, len(raw.Steps))
|
||||
for i, rs := range raw.Steps {
|
||||
step, err := normalizeStep(rs)
|
||||
if err != nil {
|
||||
return DagDefinition{}, fmt.Errorf("dag_parse: step[%d]: %w", i, err)
|
||||
}
|
||||
steps = append(steps, step)
|
||||
}
|
||||
def.Steps = steps
|
||||
|
||||
// Normalize handlers: handler_on takes precedence, fallback to handlers.
|
||||
rawHandlers := raw.HandlerOn
|
||||
if rawHandlers == nil {
|
||||
rawHandlers = raw.Handlers
|
||||
}
|
||||
if rawHandlers != nil {
|
||||
handlers, err := normalizeHandlers(rawHandlers)
|
||||
if err != nil {
|
||||
return DagDefinition{}, fmt.Errorf("dag_parse: handlers: %w", err)
|
||||
}
|
||||
def.HandlerOn = handlers
|
||||
}
|
||||
|
||||
return def, nil
|
||||
}
|
||||
|
||||
// normalizeSchedule converts schedule from string or []interface{} to []string.
|
||||
func normalizeSchedule(v interface{}) []string {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
switch t := v.(type) {
|
||||
case string:
|
||||
s := strings.TrimSpace(t)
|
||||
if s == "" {
|
||||
return nil
|
||||
}
|
||||
return []string{s}
|
||||
case []interface{}:
|
||||
result := make([]string, 0, len(t))
|
||||
for _, item := range t {
|
||||
if s, ok := item.(string); ok {
|
||||
s = strings.TrimSpace(s)
|
||||
if s != "" {
|
||||
result = append(result, s)
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// normalizeEnv converts env from Dagu format (list of single-key maps) or plain map to map[string]string.
|
||||
func normalizeEnv(v interface{}) (map[string]string, error) {
|
||||
if v == nil {
|
||||
return nil, nil
|
||||
}
|
||||
switch t := v.(type) {
|
||||
case map[string]interface{}:
|
||||
// Plain YAML map.
|
||||
result := make(map[string]string, len(t))
|
||||
for k, val := range t {
|
||||
result[k] = fmt.Sprintf("%v", val)
|
||||
}
|
||||
return result, nil
|
||||
case []interface{}:
|
||||
// Dagu format: list of single-key maps [KEY: value, KEY2: value2].
|
||||
result := make(map[string]string)
|
||||
for _, item := range t {
|
||||
m, ok := item.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for k, val := range m {
|
||||
result[k] = fmt.Sprintf("%v", val)
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
return nil, fmt.Errorf("unexpected env type %T", v)
|
||||
}
|
||||
|
||||
// normalizeStep converts a rawDagStep to a DagStep.
|
||||
func normalizeStep(rs rawDagStep) (DagStep, error) {
|
||||
stepEnv, err := normalizeEnv(rs.Env)
|
||||
if err != nil {
|
||||
return DagStep{}, fmt.Errorf("env: %w", err)
|
||||
}
|
||||
|
||||
// working_dir is an alias for dir at the step level.
|
||||
dir := rs.Dir
|
||||
if dir == "" {
|
||||
dir = rs.WorkingDir
|
||||
}
|
||||
|
||||
return DagStep{
|
||||
Name: rs.Name,
|
||||
ID: rs.ID,
|
||||
Description: rs.Description,
|
||||
Command: rs.Command,
|
||||
Script: rs.Script,
|
||||
Args: rs.Args,
|
||||
Shell: rs.Shell,
|
||||
Dir: dir,
|
||||
Depends: rs.Depends,
|
||||
Env: stepEnv,
|
||||
ContinueOn: DagContinueOn{
|
||||
Failure: rs.ContinueOn.Failure,
|
||||
Skipped: rs.ContinueOn.Skipped,
|
||||
},
|
||||
RetryPolicy: DagRetryPolicy{
|
||||
Limit: rs.RetryPolicy.Limit,
|
||||
IntervalSec: rs.RetryPolicy.IntervalSec,
|
||||
},
|
||||
TimeoutSec: rs.TimeoutSec,
|
||||
Output: rs.Output,
|
||||
Tags: rs.Tags,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// normalizeHandlers converts rawDagHandlers to DagHandlers.
|
||||
// Each handler field can be a single step object or a list of steps.
|
||||
func normalizeHandlers(rh *rawDagHandlers) (DagHandlers, error) {
|
||||
var h DagHandlers
|
||||
var err error
|
||||
h.Init, err = normalizeHandlerField(rh.Init)
|
||||
if err != nil {
|
||||
return DagHandlers{}, fmt.Errorf("init: %w", err)
|
||||
}
|
||||
h.Success, err = normalizeHandlerField(rh.Success)
|
||||
if err != nil {
|
||||
return DagHandlers{}, fmt.Errorf("success: %w", err)
|
||||
}
|
||||
h.Failure, err = normalizeHandlerField(rh.Failure)
|
||||
if err != nil {
|
||||
return DagHandlers{}, fmt.Errorf("failure: %w", err)
|
||||
}
|
||||
h.Exit, err = normalizeHandlerField(rh.Exit)
|
||||
if err != nil {
|
||||
return DagHandlers{}, fmt.Errorf("exit: %w", err)
|
||||
}
|
||||
return h, nil
|
||||
}
|
||||
|
||||
// normalizeHandlerField converts a handler field (single step or list) to []DagStep.
|
||||
func normalizeHandlerField(v interface{}) ([]DagStep, error) {
|
||||
if v == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Re-marshal and unmarshal to handle polymorphic types cleanly.
|
||||
b, err := yaml.Marshal(v)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("re-marshal handler: %w", err)
|
||||
}
|
||||
|
||||
// Try as a list first.
|
||||
var rawList []rawDagStep
|
||||
if err := yaml.Unmarshal(b, &rawList); err == nil && len(rawList) > 0 {
|
||||
steps := make([]DagStep, 0, len(rawList))
|
||||
for i, rs := range rawList {
|
||||
step, err := normalizeStep(rs)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("step[%d]: %w", i, err)
|
||||
}
|
||||
steps = append(steps, step)
|
||||
}
|
||||
return steps, nil
|
||||
}
|
||||
|
||||
// Try as single step.
|
||||
var rs rawDagStep
|
||||
if err := yaml.Unmarshal(b, &rs); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal handler step: %w", err)
|
||||
}
|
||||
step, err := normalizeStep(rs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// If the step has no meaningful content, return nil.
|
||||
if step.Name == "" && step.ID == "" && step.Command == "" && step.Script == "" {
|
||||
return nil, nil
|
||||
}
|
||||
return []DagStep{step}, nil
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
---
|
||||
name: dag_parse
|
||||
kind: function
|
||||
lang: go
|
||||
domain: core
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "func DagParse(data []byte) (DagDefinition, error)"
|
||||
description: "Parsea YAML de definicion de DAG en formato compatible con Dagu. Soporta schedule como string o lista, env como lista de maps single-key (formato Dagu), handler_on y handlers como aliases, steps con command/script/depends/continue_on, y type graph."
|
||||
tags: [dag, yaml, parsing, workflow, dagu, pure]
|
||||
uses_functions: []
|
||||
uses_types: [dag_definition_go_core, dag_step_go_core, dag_handlers_go_core]
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [fmt, strings, gopkg.in/yaml.v3]
|
||||
params:
|
||||
- name: data
|
||||
desc: "contenido YAML de un archivo de definicion de DAG en formato Dagu"
|
||||
output: "DagDefinition con todos los campos normalizados; error si el YAML es sintaticamente invalido"
|
||||
tested: true
|
||||
tests:
|
||||
- "parsea DAG simple con steps y depends"
|
||||
- "parsea schedule como string y como lista"
|
||||
- "parsea env en formato lista de maps"
|
||||
- "parsea handler_on y handlers como alias"
|
||||
- "parsea continue_on y working_dir a nivel step"
|
||||
- "parsea type graph"
|
||||
test_file_path: "functions/core/dag_parse_test.go"
|
||||
file_path: "functions/core/dag_parse.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```go
|
||||
data := []byte(`
|
||||
name: mi-dag
|
||||
schedule: "0 9 * * *"
|
||||
steps:
|
||||
- name: hello
|
||||
command: echo "hello"
|
||||
- name: world
|
||||
command: echo "world"
|
||||
depends: [hello]
|
||||
`)
|
||||
dag, err := DagParse(data)
|
||||
// dag.Name = "mi-dag"
|
||||
// dag.Schedule = ["0 9 * * *"]
|
||||
// dag.Steps[1].Depends = ["hello"]
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Funcion pura (el YAML es inmutable, no hay I/O). Internamente usa un struct rawDag para deserializar loosely y luego normaliza campos polimorficos. La estrategia de normalizacion: schedule string->[]string, env lista->map, handlers single-o-lista->[]DagStep. handler_on tiene precedencia sobre handlers si ambos estan presentes.
|
||||
@@ -0,0 +1,213 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestDagParse(t *testing.T) {
|
||||
t.Run("parsea DAG simple con steps y depends", func(t *testing.T) {
|
||||
data := []byte(`
|
||||
name: example
|
||||
description: Example workflow
|
||||
steps:
|
||||
- name: hello
|
||||
command: echo "Hello!"
|
||||
- name: list_files
|
||||
command: ls -la /tmp
|
||||
depends:
|
||||
- hello
|
||||
`)
|
||||
dag, err := DagParse(data)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if dag.Name != "example" {
|
||||
t.Errorf("Name: got %q, want %q", dag.Name, "example")
|
||||
}
|
||||
if len(dag.Steps) != 2 {
|
||||
t.Fatalf("Steps: got %d, want 2", len(dag.Steps))
|
||||
}
|
||||
if dag.Steps[0].Name != "hello" {
|
||||
t.Errorf("Steps[0].Name: got %q, want %q", dag.Steps[0].Name, "hello")
|
||||
}
|
||||
if dag.Steps[1].Name != "list_files" {
|
||||
t.Errorf("Steps[1].Name: got %q, want %q", dag.Steps[1].Name, "list_files")
|
||||
}
|
||||
if len(dag.Steps[1].Depends) != 1 || dag.Steps[1].Depends[0] != "hello" {
|
||||
t.Errorf("Steps[1].Depends: got %v, want [hello]", dag.Steps[1].Depends)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("parsea schedule como string y como lista", func(t *testing.T) {
|
||||
// Schedule as string.
|
||||
dataStr := []byte(`
|
||||
name: dag-string-schedule
|
||||
schedule: "0 9 * * 5"
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo ok
|
||||
`)
|
||||
dagStr, err := DagParse(dataStr)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(dagStr.Schedule) != 1 || dagStr.Schedule[0] != "0 9 * * 5" {
|
||||
t.Errorf("Schedule (string): got %v, want [\"0 9 * * 5\"]", dagStr.Schedule)
|
||||
}
|
||||
|
||||
// Schedule as list.
|
||||
dataList := []byte(`
|
||||
name: dag-list-schedule
|
||||
schedule:
|
||||
- "0 9 * * *"
|
||||
- "0 18 * * *"
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo ok
|
||||
`)
|
||||
dagList, err := DagParse(dataList)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(dagList.Schedule) != 2 {
|
||||
t.Fatalf("Schedule (list): got %d items, want 2", len(dagList.Schedule))
|
||||
}
|
||||
if dagList.Schedule[0] != "0 9 * * *" {
|
||||
t.Errorf("Schedule[0]: got %q, want %q", dagList.Schedule[0], "0 9 * * *")
|
||||
}
|
||||
if dagList.Schedule[1] != "0 18 * * *" {
|
||||
t.Errorf("Schedule[1]: got %q, want %q", dagList.Schedule[1], "0 18 * * *")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("parsea env en formato lista de maps", func(t *testing.T) {
|
||||
data := []byte(`
|
||||
name: dag-env
|
||||
env:
|
||||
- PROJECT_DIR: /home/lucas/analysis
|
||||
- PYTHON: /home/lucas/.venv/bin/python
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo ${PROJECT_DIR}
|
||||
`)
|
||||
dag, err := DagParse(data)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if dag.Env["PROJECT_DIR"] != "/home/lucas/analysis" {
|
||||
t.Errorf("Env[PROJECT_DIR]: got %q, want %q", dag.Env["PROJECT_DIR"], "/home/lucas/analysis")
|
||||
}
|
||||
if dag.Env["PYTHON"] != "/home/lucas/.venv/bin/python" {
|
||||
t.Errorf("Env[PYTHON]: got %q, want %q", dag.Env["PYTHON"], "/home/lucas/.venv/bin/python")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("parsea handler_on y handlers como alias", func(t *testing.T) {
|
||||
// handler_on with single step object.
|
||||
dataHandlerOn := []byte(`
|
||||
name: dag-handler-on
|
||||
handler_on:
|
||||
failure:
|
||||
command: echo "FALLO"
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo hello
|
||||
`)
|
||||
dagHO, err := DagParse(dataHandlerOn)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error (handler_on): %v", err)
|
||||
}
|
||||
if len(dagHO.HandlerOn.Failure) != 1 {
|
||||
t.Fatalf("HandlerOn.Failure: got %d steps, want 1", len(dagHO.HandlerOn.Failure))
|
||||
}
|
||||
if dagHO.HandlerOn.Failure[0].Command != `echo "FALLO"` {
|
||||
t.Errorf("HandlerOn.Failure[0].Command: got %q", dagHO.HandlerOn.Failure[0].Command)
|
||||
}
|
||||
|
||||
// handlers alias with list of steps.
|
||||
dataHandlers := []byte(`
|
||||
name: dag-handlers
|
||||
handlers:
|
||||
failure:
|
||||
- name: mark_as_failed
|
||||
command: echo "FAILED"
|
||||
success:
|
||||
- name: notify
|
||||
command: echo "OK"
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo hello
|
||||
`)
|
||||
dagH, err := DagParse(dataHandlers)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error (handlers): %v", err)
|
||||
}
|
||||
if len(dagH.HandlerOn.Failure) != 1 || dagH.HandlerOn.Failure[0].Name != "mark_as_failed" {
|
||||
t.Errorf("HandlerOn.Failure: got %v", dagH.HandlerOn.Failure)
|
||||
}
|
||||
if len(dagH.HandlerOn.Success) != 1 || dagH.HandlerOn.Success[0].Name != "notify" {
|
||||
t.Errorf("HandlerOn.Success: got %v", dagH.HandlerOn.Success)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("parsea continue_on y working_dir a nivel step", func(t *testing.T) {
|
||||
data := []byte(`
|
||||
name: dag-step-options
|
||||
steps:
|
||||
- id: ingest
|
||||
description: Procesar archivos
|
||||
working_dir: /home/lucas/project
|
||||
command: ./bin/ingest
|
||||
continue_on:
|
||||
failure: true
|
||||
- id: informe
|
||||
command: python /tmp/script.py
|
||||
depends: [ingest]
|
||||
`)
|
||||
dag, err := DagParse(data)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(dag.Steps) != 2 {
|
||||
t.Fatalf("Steps: got %d, want 2", len(dag.Steps))
|
||||
}
|
||||
step0 := dag.Steps[0]
|
||||
if step0.ID != "ingest" {
|
||||
t.Errorf("Steps[0].ID: got %q, want %q", step0.ID, "ingest")
|
||||
}
|
||||
if step0.Dir != "/home/lucas/project" {
|
||||
t.Errorf("Steps[0].Dir: got %q, want %q", step0.Dir, "/home/lucas/project")
|
||||
}
|
||||
if !step0.ContinueOn.Failure {
|
||||
t.Errorf("Steps[0].ContinueOn.Failure: got false, want true")
|
||||
}
|
||||
step1 := dag.Steps[1]
|
||||
if len(step1.Depends) != 1 || step1.Depends[0] != "ingest" {
|
||||
t.Errorf("Steps[1].Depends: got %v, want [ingest]", step1.Depends)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("parsea type graph", func(t *testing.T) {
|
||||
data := []byte(`
|
||||
name: dag-graph
|
||||
type: graph
|
||||
tags: [finanzas, semanal]
|
||||
steps:
|
||||
- name: step1
|
||||
command: echo a
|
||||
- name: step2
|
||||
command: echo b
|
||||
depends: [step1]
|
||||
`)
|
||||
dag, err := DagParse(data)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if dag.Type != "graph" {
|
||||
t.Errorf("Type: got %q, want %q", dag.Type, "graph")
|
||||
}
|
||||
if len(dag.Tags) != 2 {
|
||||
t.Errorf("Tags: got %v, want [finanzas semanal]", dag.Tags)
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,157 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
// DagResolveEnv resolves environment variable references in a DagDefinition.
|
||||
// It parses environ (KEY=VALUE pairs from os.Environ()), merges with dag.Env
|
||||
// (dag.Env takes precedence), and substitutes ${VAR} and $VAR in Command,
|
||||
// Script, Dir, Args, and Env values of each step. Also substitutes in
|
||||
// DagDefinition.WorkingDir. Returns a new DagDefinition without mutating the original.
|
||||
func DagResolveEnv(dag DagDefinition, environ []string) DagDefinition {
|
||||
// Parse environ into a base map.
|
||||
base := parseEnviron(environ)
|
||||
|
||||
// Merge dag.Env over base (dag.Env has precedence).
|
||||
dagEnv := mergeMaps(base, dag.Env)
|
||||
|
||||
// Build new DagDefinition (shallow copy, then replace fields).
|
||||
result := dag
|
||||
|
||||
// Substitute in WorkingDir.
|
||||
result.WorkingDir = substitute(dag.WorkingDir, dagEnv)
|
||||
|
||||
// Substitute in Env values of the DAG itself.
|
||||
result.Env = substituteMap(dag.Env, dagEnv)
|
||||
|
||||
// Resolve steps.
|
||||
resolvedSteps := make([]DagStep, len(dag.Steps))
|
||||
for i, step := range dag.Steps {
|
||||
// Merge dag env with step env (step env has precedence).
|
||||
stepEnv := mergeMaps(dagEnv, step.Env)
|
||||
|
||||
rs := step
|
||||
rs.Command = substitute(step.Command, stepEnv)
|
||||
rs.Script = substitute(step.Script, stepEnv)
|
||||
rs.Dir = substitute(step.Dir, stepEnv)
|
||||
|
||||
if len(step.Args) > 0 {
|
||||
args := make([]string, len(step.Args))
|
||||
for j, arg := range step.Args {
|
||||
args[j] = substitute(arg, stepEnv)
|
||||
}
|
||||
rs.Args = args
|
||||
}
|
||||
|
||||
rs.Env = substituteMap(step.Env, stepEnv)
|
||||
resolvedSteps[i] = rs
|
||||
}
|
||||
result.Steps = resolvedSteps
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// parseEnviron parses a slice of "KEY=VALUE" strings into a map.
|
||||
func parseEnviron(environ []string) map[string]string {
|
||||
m := make(map[string]string, len(environ))
|
||||
for _, kv := range environ {
|
||||
idx := strings.IndexByte(kv, '=')
|
||||
if idx < 0 {
|
||||
continue
|
||||
}
|
||||
m[kv[:idx]] = kv[idx+1:]
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// mergeMaps returns a new map with all entries from base, then overlaid with overlay.
|
||||
func mergeMaps(base, overlay map[string]string) map[string]string {
|
||||
result := make(map[string]string, len(base)+len(overlay))
|
||||
for k, v := range base {
|
||||
result[k] = v
|
||||
}
|
||||
for k, v := range overlay {
|
||||
result[k] = v
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// substituteMap applies substitute to all values in a map.
|
||||
func substituteMap(m map[string]string, env map[string]string) map[string]string {
|
||||
if len(m) == 0 {
|
||||
return m
|
||||
}
|
||||
result := make(map[string]string, len(m))
|
||||
for k, v := range m {
|
||||
result[k] = substitute(v, env)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// substitute replaces ${VAR} and $VAR occurrences in s using the env map.
|
||||
func substitute(s string, env map[string]string) string {
|
||||
if s == "" || !strings.Contains(s, "$") {
|
||||
return s
|
||||
}
|
||||
|
||||
var b strings.Builder
|
||||
i := 0
|
||||
for i < len(s) {
|
||||
if s[i] != '$' {
|
||||
b.WriteByte(s[i])
|
||||
i++
|
||||
continue
|
||||
}
|
||||
|
||||
// Found '$'
|
||||
i++
|
||||
if i >= len(s) {
|
||||
b.WriteByte('$')
|
||||
break
|
||||
}
|
||||
|
||||
if s[i] == '{' {
|
||||
// ${VAR} form.
|
||||
i++
|
||||
end := strings.IndexByte(s[i:], '}')
|
||||
if end < 0 {
|
||||
// No closing brace — write as-is.
|
||||
b.WriteString("${")
|
||||
continue
|
||||
}
|
||||
varName := s[i : i+end]
|
||||
i += end + 1
|
||||
if val, ok := env[varName]; ok {
|
||||
b.WriteString(val)
|
||||
} else {
|
||||
b.WriteString("${")
|
||||
b.WriteString(varName)
|
||||
b.WriteByte('}')
|
||||
}
|
||||
} else if isEnvVarChar(s[i]) {
|
||||
// $VAR form (bare variable name).
|
||||
start := i
|
||||
for i < len(s) && isEnvVarChar(s[i]) {
|
||||
i++
|
||||
}
|
||||
varName := s[start:i]
|
||||
if val, ok := env[varName]; ok {
|
||||
b.WriteString(val)
|
||||
} else {
|
||||
b.WriteByte('$')
|
||||
b.WriteString(varName)
|
||||
}
|
||||
} else {
|
||||
// Not a valid variable start — keep '$' and continue.
|
||||
b.WriteByte('$')
|
||||
}
|
||||
}
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// isEnvVarChar returns true for characters valid in environment variable names.
|
||||
func isEnvVarChar(c byte) bool {
|
||||
return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '_'
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
---
|
||||
name: dag_resolve_env
|
||||
kind: function
|
||||
lang: go
|
||||
domain: core
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "func DagResolveEnv(dag DagDefinition, environ []string) DagDefinition"
|
||||
description: "Resuelve referencias a variables de entorno (${VAR} y $VAR) en un DagDefinition. Mergea environ del sistema con dag.Env (dag.Env tiene precedencia), y sustituye en Command, Script, Dir, Args y Env values de cada step. No muta el DagDefinition original."
|
||||
tags: [dag, env, substitution, variable-expansion, pure]
|
||||
uses_functions: []
|
||||
uses_types: [dag_definition_go_core, dag_step_go_core]
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [strings]
|
||||
params:
|
||||
- name: dag
|
||||
desc: "DagDefinition parseado con posibles referencias ${VAR} en sus campos"
|
||||
- name: environ
|
||||
desc: "lista de strings KEY=VALUE del entorno del sistema (tipicamente os.Environ())"
|
||||
output: "nuevo DagDefinition con todas las referencias de variables sustituidas por sus valores"
|
||||
tested: false
|
||||
tests: []
|
||||
test_file_path: ""
|
||||
file_path: "functions/core/dag_resolve_env.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```go
|
||||
dag, _ := DagParse(yamlData)
|
||||
resolved := DagResolveEnv(dag, os.Environ())
|
||||
// resolved.Steps[0].Command tiene ${PROJECT_DIR} sustituido
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Funcion pura. No muta el DagDefinition de entrada — retorna una copia con los campos resueltos. Precedencia de variables: environ base < dag.Env < step.Env. Sustituye tanto ${VAR} (con llaves) como $VAR (bare). Si una variable no existe en el entorno, se deja sin sustituir. Los campos sustituidos son: DagDefinition.WorkingDir, Env values del DAG, y por step: Command, Script, Dir, Args, Env values.
|
||||
@@ -0,0 +1,78 @@
|
||||
package core
|
||||
|
||||
import "fmt"
|
||||
|
||||
// DagTopoSort performs a topological sort of DAG steps using Kahn's algorithm.
|
||||
// Returns levels where each level contains steps that can run in parallel.
|
||||
// Returns an error if a dependency cycle is detected.
|
||||
func DagTopoSort(steps []DagStep) ([][]DagStep, error) {
|
||||
if len(steps) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Build index: ref -> DagStep.
|
||||
index := make(map[string]DagStep, len(steps))
|
||||
for _, s := range steps {
|
||||
index[stepRef(s)] = s
|
||||
}
|
||||
|
||||
// Compute in-degree for each step.
|
||||
inDegree := make(map[string]int, len(steps))
|
||||
for _, s := range steps {
|
||||
ref := stepRef(s)
|
||||
if _, ok := inDegree[ref]; !ok {
|
||||
inDegree[ref] = 0
|
||||
}
|
||||
for range s.Depends {
|
||||
inDegree[ref]++ // ref depends on a dep, so ref gets +1
|
||||
}
|
||||
}
|
||||
|
||||
// Build adjacency list: dep -> list of steps that depend on dep.
|
||||
adj := make(map[string][]string, len(steps))
|
||||
for _, s := range steps {
|
||||
ref := stepRef(s)
|
||||
for _, dep := range s.Depends {
|
||||
adj[dep] = append(adj[dep], ref)
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize queue with steps that have in-degree 0.
|
||||
var queue []string
|
||||
for _, s := range steps {
|
||||
ref := stepRef(s)
|
||||
if inDegree[ref] == 0 {
|
||||
queue = append(queue, ref)
|
||||
}
|
||||
}
|
||||
|
||||
var levels [][]DagStep
|
||||
processed := 0
|
||||
|
||||
for len(queue) > 0 {
|
||||
// The current queue forms a parallel level.
|
||||
level := make([]DagStep, 0, len(queue))
|
||||
nextQueue := []string{}
|
||||
|
||||
for _, ref := range queue {
|
||||
level = append(level, index[ref])
|
||||
processed++
|
||||
// Reduce in-degree for dependents.
|
||||
for _, dependent := range adj[ref] {
|
||||
inDegree[dependent]--
|
||||
if inDegree[dependent] == 0 {
|
||||
nextQueue = append(nextQueue, dependent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
levels = append(levels, level)
|
||||
queue = nextQueue
|
||||
}
|
||||
|
||||
if processed != len(steps) {
|
||||
return nil, fmt.Errorf("dag_topo_sort: cycle detected (%d of %d steps processed)", processed, len(steps))
|
||||
}
|
||||
|
||||
return levels, nil
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
---
|
||||
name: dag_topo_sort
|
||||
kind: function
|
||||
lang: go
|
||||
domain: core
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "func DagTopoSort(steps []DagStep) ([][]DagStep, error)"
|
||||
description: "Ordenamiento topologico de steps de un DAG usando el algoritmo de Kahn. Retorna niveles donde cada nivel contiene steps que pueden ejecutarse en paralelo. Detecta ciclos y retorna error si los hay."
|
||||
tags: [dag, topological-sort, graph, kahn, pure]
|
||||
uses_functions: []
|
||||
uses_types: [dag_step_go_core]
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [fmt]
|
||||
params:
|
||||
- name: steps
|
||||
desc: "lista de DagStep con sus dependencias definidas en el campo Depends"
|
||||
output: "slice de niveles donde cada nivel es un slice de DagStep que pueden ejecutarse en paralelo; error si hay ciclo"
|
||||
tested: true
|
||||
tests:
|
||||
- "cadena lineal A->B->C produce tres niveles"
|
||||
- "diamond A->B A->C B->D C->D produce cuatro niveles"
|
||||
- "steps paralelos sin depends produce un solo nivel"
|
||||
- "ciclo A->B->A retorna error"
|
||||
test_file_path: "functions/core/dag_topo_sort_test.go"
|
||||
file_path: "functions/core/dag_topo_sort.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```go
|
||||
steps := []DagStep{
|
||||
{Name: "a"},
|
||||
{Name: "b", Depends: []string{"a"}},
|
||||
{Name: "c", Depends: []string{"a"}},
|
||||
{Name: "d", Depends: []string{"b", "c"}},
|
||||
}
|
||||
levels, err := DagTopoSort(steps)
|
||||
// levels[0] = [a]
|
||||
// levels[1] = [b, c] (paralelo)
|
||||
// levels[2] = [d]
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Funcion pura. Implementa Kahn's algorithm: calcula in-degree inicial, pone en cola los steps con in-degree 0, extrae un nivel completo por iteracion y reduce el in-degree de los dependientes. Si al terminar hay steps sin procesar, hay un ciclo. El orden dentro de cada nivel no esta garantizado — depende del orden del slice de entrada.
|
||||
@@ -0,0 +1,95 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestDagTopoSort(t *testing.T) {
|
||||
t.Run("cadena lineal A->B->C produce tres niveles", func(t *testing.T) {
|
||||
steps := []DagStep{
|
||||
{Name: "a"},
|
||||
{Name: "b", Depends: []string{"a"}},
|
||||
{Name: "c", Depends: []string{"b"}},
|
||||
}
|
||||
levels, err := DagTopoSort(steps)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(levels) != 3 {
|
||||
t.Fatalf("levels: got %d, want 3", len(levels))
|
||||
}
|
||||
if len(levels[0]) != 1 || levels[0][0].Name != "a" {
|
||||
t.Errorf("levels[0]: got %v, want [a]", stepNames(levels[0]))
|
||||
}
|
||||
if len(levels[1]) != 1 || levels[1][0].Name != "b" {
|
||||
t.Errorf("levels[1]: got %v, want [b]", stepNames(levels[1]))
|
||||
}
|
||||
if len(levels[2]) != 1 || levels[2][0].Name != "c" {
|
||||
t.Errorf("levels[2]: got %v, want [c]", stepNames(levels[2]))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("diamond A->B A->C B->D C->D produce cuatro niveles", func(t *testing.T) {
|
||||
steps := []DagStep{
|
||||
{Name: "a"},
|
||||
{Name: "b", Depends: []string{"a"}},
|
||||
{Name: "c", Depends: []string{"a"}},
|
||||
{Name: "d", Depends: []string{"b", "c"}},
|
||||
}
|
||||
levels, err := DagTopoSort(steps)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
// Expected: level0=[a], level1=[b,c], level2=[d]
|
||||
if len(levels) != 3 {
|
||||
t.Fatalf("levels: got %d, want 3", len(levels))
|
||||
}
|
||||
if len(levels[0]) != 1 || levels[0][0].Name != "a" {
|
||||
t.Errorf("levels[0]: got %v, want [a]", stepNames(levels[0]))
|
||||
}
|
||||
if len(levels[1]) != 2 {
|
||||
t.Errorf("levels[1]: got %v, want 2 steps", stepNames(levels[1]))
|
||||
}
|
||||
if len(levels[2]) != 1 || levels[2][0].Name != "d" {
|
||||
t.Errorf("levels[2]: got %v, want [d]", stepNames(levels[2]))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("steps paralelos sin depends produce un solo nivel", func(t *testing.T) {
|
||||
steps := []DagStep{
|
||||
{Name: "x"},
|
||||
{Name: "y"},
|
||||
{Name: "z"},
|
||||
}
|
||||
levels, err := DagTopoSort(steps)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(levels) != 1 {
|
||||
t.Fatalf("levels: got %d, want 1", len(levels))
|
||||
}
|
||||
if len(levels[0]) != 3 {
|
||||
t.Errorf("levels[0]: got %d steps, want 3", len(levels[0]))
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ciclo A->B->A retorna error", func(t *testing.T) {
|
||||
steps := []DagStep{
|
||||
{Name: "a", Depends: []string{"b"}},
|
||||
{Name: "b", Depends: []string{"a"}},
|
||||
}
|
||||
_, err := DagTopoSort(steps)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for cycle, got nil")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// stepNames extracts step names for readable test output.
|
||||
func stepNames(steps []DagStep) []string {
|
||||
names := make([]string, len(steps))
|
||||
for i, s := range steps {
|
||||
names[i] = stepRef(s)
|
||||
}
|
||||
return names
|
||||
}
|
||||
@@ -0,0 +1,83 @@
|
||||
package core
|
||||
|
||||
import "fmt"
|
||||
|
||||
// DagValidate validates a DagDefinition for structural correctness.
|
||||
// Checks: steps have name/ID, no duplicate names/IDs, all depends reference
|
||||
// existing steps, no dependency cycles. On success, computes topological levels.
|
||||
// Returns warnings for steps with both command and script set.
|
||||
func DagValidate(dag DagDefinition) DagValidationResult {
|
||||
result := DagValidationResult{Valid: true}
|
||||
|
||||
// Build name/ID sets and check for missing identifiers and duplicates.
|
||||
seen := make(map[string]bool)
|
||||
for i, step := range dag.Steps {
|
||||
ref := stepRef(step)
|
||||
if ref == "" {
|
||||
result.Errors = append(result.Errors,
|
||||
fmt.Sprintf("step[%d]: must have name or id", i))
|
||||
result.Valid = false
|
||||
continue
|
||||
}
|
||||
if seen[ref] {
|
||||
result.Errors = append(result.Errors,
|
||||
fmt.Sprintf("step[%d]: duplicate name/id %q", i, ref))
|
||||
result.Valid = false
|
||||
}
|
||||
seen[ref] = true
|
||||
|
||||
// Warning: command and script both set.
|
||||
if step.Command != "" && step.Script != "" {
|
||||
result.Warnings = append(result.Warnings,
|
||||
fmt.Sprintf("step %q: has both command and script", ref))
|
||||
}
|
||||
}
|
||||
|
||||
if !result.Valid {
|
||||
return result
|
||||
}
|
||||
|
||||
// Check that all depends reference existing steps.
|
||||
for _, step := range dag.Steps {
|
||||
for _, dep := range step.Depends {
|
||||
if !seen[dep] {
|
||||
result.Errors = append(result.Errors,
|
||||
fmt.Sprintf("step %q: depends on unknown step %q", stepRef(step), dep))
|
||||
result.Valid = false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !result.Valid {
|
||||
return result
|
||||
}
|
||||
|
||||
// Topological sort with Kahn's — detects cycles and computes levels.
|
||||
levels, err := DagTopoSort(dag.Steps)
|
||||
if err != nil {
|
||||
result.Errors = append(result.Errors, fmt.Sprintf("cycle detected: %v", err))
|
||||
result.Valid = false
|
||||
return result
|
||||
}
|
||||
|
||||
// Convert [][]DagStep to [][]string for the result.
|
||||
strLevels := make([][]string, len(levels))
|
||||
for i, level := range levels {
|
||||
refs := make([]string, len(level))
|
||||
for j, s := range level {
|
||||
refs[j] = stepRef(s)
|
||||
}
|
||||
strLevels[i] = refs
|
||||
}
|
||||
result.Levels = strLevels
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// stepRef returns the canonical reference for a step (ID preferred, then Name).
|
||||
func stepRef(s DagStep) string {
|
||||
if s.ID != "" {
|
||||
return s.ID
|
||||
}
|
||||
return s.Name
|
||||
}
|
||||
@@ -0,0 +1,46 @@
|
||||
---
|
||||
name: dag_validate
|
||||
kind: function
|
||||
lang: go
|
||||
domain: core
|
||||
version: "1.0.0"
|
||||
purity: pure
|
||||
signature: "func DagValidate(dag DagDefinition) DagValidationResult"
|
||||
description: "Valida un DagDefinition para correcto uso estructural. Verifica que cada step tenga nombre o ID, que no haya duplicados, que todos los depends referencien steps existentes y que no haya ciclos (algoritmo de Kahn). Si el DAG es valido, calcula los niveles topologicos."
|
||||
tags: [dag, validation, workflow, graph, pure]
|
||||
uses_functions: [dag_topo_sort_go_core]
|
||||
uses_types: [dag_definition_go_core, dag_validation_result_go_core]
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: ""
|
||||
imports: [fmt]
|
||||
params:
|
||||
- name: dag
|
||||
desc: "DagDefinition parseado y a validar"
|
||||
output: "DagValidationResult con Valid=true si no hay errores, Errors con los problemas encontrados, Warnings con avisos no fatales, y Levels con los niveles topologicos si el DAG es valido"
|
||||
tested: true
|
||||
tests:
|
||||
- "DAG valido retorna Valid true y levels calculados"
|
||||
- "step sin nombre retorna error"
|
||||
- "depends a step inexistente retorna error"
|
||||
- "ciclo en dependencias retorna error"
|
||||
test_file_path: "functions/core/dag_validate_test.go"
|
||||
file_path: "functions/core/dag_validate.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```go
|
||||
dag, _ := DagParse(yamlData)
|
||||
res := DagValidate(dag)
|
||||
if !res.Valid {
|
||||
for _, e := range res.Errors {
|
||||
fmt.Println("ERROR:", e)
|
||||
}
|
||||
}
|
||||
// res.Levels = [["step-a"], ["step-b", "step-c"], ["step-d"]]
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Funcion pura. No modifica el DagDefinition de entrada. El calculo de niveles topologicos se delega a DagTopoSort para mantener la separacion de responsabilidades. Un warning (command+script simultaneos) no invalida el DAG.
|
||||
@@ -0,0 +1,113 @@
|
||||
package core
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestDagValidate(t *testing.T) {
|
||||
t.Run("DAG valido retorna Valid true y levels calculados", func(t *testing.T) {
|
||||
dag := DagDefinition{
|
||||
Name: "valid-dag",
|
||||
Steps: []DagStep{
|
||||
{Name: "a", Command: "echo a"},
|
||||
{Name: "b", Command: "echo b", Depends: []string{"a"}},
|
||||
{Name: "c", Command: "echo c", Depends: []string{"a"}},
|
||||
{Name: "d", Command: "echo d", Depends: []string{"b", "c"}},
|
||||
},
|
||||
}
|
||||
res := DagValidate(dag)
|
||||
if !res.Valid {
|
||||
t.Errorf("Valid: got false, want true. Errors: %v", res.Errors)
|
||||
}
|
||||
if len(res.Errors) != 0 {
|
||||
t.Errorf("Errors: got %v, want empty", res.Errors)
|
||||
}
|
||||
// Should have 3 levels: [a], [b,c], [d]
|
||||
if len(res.Levels) != 3 {
|
||||
t.Errorf("Levels: got %d, want 3. Levels: %v", len(res.Levels), res.Levels)
|
||||
}
|
||||
if len(res.Levels[0]) != 1 || res.Levels[0][0] != "a" {
|
||||
t.Errorf("Levels[0]: got %v, want [a]", res.Levels[0])
|
||||
}
|
||||
if len(res.Levels[2]) != 1 || res.Levels[2][0] != "d" {
|
||||
t.Errorf("Levels[2]: got %v, want [d]", res.Levels[2])
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("step sin nombre retorna error", func(t *testing.T) {
|
||||
dag := DagDefinition{
|
||||
Name: "bad-dag",
|
||||
Steps: []DagStep{
|
||||
{Command: "echo hello"}, // no name, no ID
|
||||
},
|
||||
}
|
||||
res := DagValidate(dag)
|
||||
if res.Valid {
|
||||
t.Error("Valid: got true, want false")
|
||||
}
|
||||
if len(res.Errors) == 0 {
|
||||
t.Error("Errors: got empty, want at least one error")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("depends a step inexistente retorna error", func(t *testing.T) {
|
||||
dag := DagDefinition{
|
||||
Name: "bad-deps-dag",
|
||||
Steps: []DagStep{
|
||||
{Name: "step1", Command: "echo ok"},
|
||||
{Name: "step2", Command: "echo ok", Depends: []string{"nonexistent"}},
|
||||
},
|
||||
}
|
||||
res := DagValidate(dag)
|
||||
if res.Valid {
|
||||
t.Error("Valid: got true, want false")
|
||||
}
|
||||
found := false
|
||||
for _, e := range res.Errors {
|
||||
if containsStr(e, "nonexistent") {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("Errors should mention 'nonexistent', got: %v", res.Errors)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ciclo en dependencias retorna error", func(t *testing.T) {
|
||||
dag := DagDefinition{
|
||||
Name: "cyclic-dag",
|
||||
Steps: []DagStep{
|
||||
{Name: "a", Command: "echo a", Depends: []string{"b"}},
|
||||
{Name: "b", Command: "echo b", Depends: []string{"a"}},
|
||||
},
|
||||
}
|
||||
res := DagValidate(dag)
|
||||
if res.Valid {
|
||||
t.Error("Valid: got true, want false")
|
||||
}
|
||||
found := false
|
||||
for _, e := range res.Errors {
|
||||
if containsStr(e, "cycle") {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("Errors should mention 'cycle', got: %v", res.Errors)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// containsStr returns true if s contains substr.
|
||||
func containsStr(s, substr string) bool {
|
||||
return len(s) >= len(substr) && (s == substr || len(substr) == 0 ||
|
||||
func() bool {
|
||||
for i := 0; i <= len(s)-len(substr); i++ {
|
||||
if s[i:i+len(substr)] == substr {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}())
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package infra
|
||||
|
||||
import "time"
|
||||
|
||||
// DagRun represents one execution of a DAG workflow.
|
||||
type DagRun struct {
|
||||
ID string
|
||||
DagName string
|
||||
DagPath string
|
||||
Status string // pending, running, success, failed, cancelled
|
||||
StartedAt time.Time
|
||||
FinishedAt time.Time
|
||||
Trigger string // manual, cron, api
|
||||
Error string
|
||||
}
|
||||
|
||||
// DagStepResult represents the outcome of one step within a DagRun.
|
||||
type DagStepResult struct {
|
||||
ID string
|
||||
RunID string
|
||||
StepName string
|
||||
Status string // pending, running, success, failed, skipped
|
||||
ExitCode int
|
||||
Stdout string
|
||||
Stderr string
|
||||
StartedAt time.Time
|
||||
FinishedAt time.Time
|
||||
DurationMs int64
|
||||
Error string
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
package infra
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"os/exec"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ProcessHandle represents a running subprocess with output buffers.
|
||||
type ProcessHandle struct {
|
||||
Cmd *exec.Cmd
|
||||
Pid int
|
||||
StartTime time.Time
|
||||
Dir string
|
||||
stdout *bytes.Buffer
|
||||
stderr *bytes.Buffer
|
||||
}
|
||||
|
||||
// ProcessResult contains the outcome of a completed subprocess.
|
||||
type ProcessResult struct {
|
||||
ExitCode int
|
||||
Stdout string
|
||||
Stderr string
|
||||
DurationMs int64
|
||||
Killed bool
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package infra
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ProcessKill sends SIGTERM to the process group of handle, then waits up to
|
||||
// graceSec seconds for the process to exit. If it is still alive after the
|
||||
// grace period, SIGKILL is sent. Returns an error only if the signal could not
|
||||
// be delivered (e.g. the process group does not exist).
|
||||
func ProcessKill(handle *ProcessHandle, graceSec int) error {
|
||||
// Send SIGTERM to the process group (negative pid targets the group).
|
||||
if err := syscall.Kill(-handle.Pid, syscall.SIGTERM); err != nil {
|
||||
// ESRCH means the process is already gone — not an error from our view.
|
||||
if err != syscall.ESRCH {
|
||||
return fmt.Errorf("process_kill: sigterm: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Poll until the process exits or the grace period expires.
|
||||
deadline := time.Now().Add(time.Duration(graceSec) * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
// Check if process has exited by sending signal 0 (no-op).
|
||||
err := syscall.Kill(-handle.Pid, 0)
|
||||
if err == syscall.ESRCH {
|
||||
// Process group is gone.
|
||||
return nil
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Still alive after grace period — escalate to SIGKILL.
|
||||
if err := syscall.Kill(-handle.Pid, syscall.SIGKILL); err != nil {
|
||||
if err != syscall.ESRCH {
|
||||
return fmt.Errorf("process_kill: sigkill: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
---
|
||||
name: process_kill
|
||||
kind: function
|
||||
lang: go
|
||||
domain: infra
|
||||
version: "1.0.0"
|
||||
purity: impure
|
||||
signature: "func ProcessKill(handle *ProcessHandle, graceSec int) error"
|
||||
description: "Termina un subproceso enviando SIGTERM al process group. Espera hasta graceSec segundos a que el proceso muera voluntariamente. Si sigue vivo, envia SIGKILL. Retorna error solo si la senal no pudo entregarse."
|
||||
tags: [process, subprocess, kill, signal, sigterm, sigkill, infra]
|
||||
uses_functions: []
|
||||
uses_types: [process_handle_go_infra]
|
||||
returns: []
|
||||
returns_optional: false
|
||||
error_type: "error_go_core"
|
||||
imports: [fmt, syscall, time]
|
||||
params:
|
||||
- name: handle
|
||||
desc: "handle del proceso lanzado por ProcessSpawn"
|
||||
- name: graceSec
|
||||
desc: "segundos de gracia entre SIGTERM y SIGKILL; 0 envia SIGKILL inmediatamente"
|
||||
output: "nil si el proceso fue terminado correctamente; error si la senal no pudo entregarse"
|
||||
tested: true
|
||||
tests:
|
||||
- "kill process"
|
||||
test_file_path: "functions/infra/process_spawn_test.go"
|
||||
file_path: "functions/infra/process_kill.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```go
|
||||
h, err := ProcessSpawn("sleep 300", "", nil, "")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// Dar 3 segundos de gracia antes de SIGKILL
|
||||
if err := ProcessKill(h, 3); err != nil {
|
||||
log.Printf("kill failed: %v", err)
|
||||
}
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Funcion impura: envia senales al sistema operativo. Usa -handle.Pid (negativo) para direccionar el process group completo, matando tanto al proceso principal como a sus hijos. ESRCH se ignora porque significa que el proceso ya murio, lo cual es el objetivo deseado. Comprueba si el proceso sigue vivo con signal 0 (kill -0) cada 100ms durante el grace period.
|
||||
@@ -0,0 +1,74 @@
|
||||
package infra
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ProcessSpawn launches a subprocess using the given shell.
|
||||
// If shell is empty, "sh" is used. If command contains newlines it is treated
|
||||
// as a multi-line script: the content is written to a temp file and executed
|
||||
// with `shell <tempfile>`. Otherwise it is executed with `shell -c <command>`.
|
||||
// dir sets the working directory (empty = inherit). env sets the environment
|
||||
// (nil = inherit parent env). The process group is created with Setpgid so
|
||||
// that ProcessKill can target the whole group.
|
||||
func ProcessSpawn(command string, dir string, env []string, shell string) (*ProcessHandle, error) {
|
||||
if shell == "" {
|
||||
shell = "sh"
|
||||
}
|
||||
|
||||
var cmd *exec.Cmd
|
||||
|
||||
if strings.Contains(command, "\n") {
|
||||
// Multi-line script: write to a temp file and execute it.
|
||||
tmp, err := os.CreateTemp("", "fn-proc-*.sh")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("process_spawn: create temp file: %w", err)
|
||||
}
|
||||
if _, err := tmp.WriteString(command); err != nil {
|
||||
_ = os.Remove(tmp.Name())
|
||||
return nil, fmt.Errorf("process_spawn: write temp file: %w", err)
|
||||
}
|
||||
if err := tmp.Close(); err != nil {
|
||||
_ = os.Remove(tmp.Name())
|
||||
return nil, fmt.Errorf("process_spawn: close temp file: %w", err)
|
||||
}
|
||||
cmd = exec.Command(shell, tmp.Name())
|
||||
} else {
|
||||
cmd = exec.Command(shell, "-c", command)
|
||||
}
|
||||
|
||||
if dir != "" {
|
||||
cmd.Dir = dir
|
||||
}
|
||||
if len(env) > 0 {
|
||||
cmd.Env = env
|
||||
}
|
||||
|
||||
// New process group so we can kill all children as a group.
|
||||
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
||||
|
||||
// Use buffers instead of pipes to avoid race between Wait() and ReadAll().
|
||||
var stdoutBuf, stderrBuf bytes.Buffer
|
||||
cmd.Stdout = &stdoutBuf
|
||||
cmd.Stderr = &stderrBuf
|
||||
|
||||
start := time.Now()
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("process_spawn: start: %w", err)
|
||||
}
|
||||
|
||||
return &ProcessHandle{
|
||||
Cmd: cmd,
|
||||
Pid: cmd.Process.Pid,
|
||||
StartTime: start,
|
||||
Dir: dir,
|
||||
stdout: &stdoutBuf,
|
||||
stderr: &stderrBuf,
|
||||
}, nil
|
||||
}
|
||||
@@ -0,0 +1,52 @@
|
||||
---
|
||||
name: process_spawn
|
||||
kind: function
|
||||
lang: go
|
||||
domain: infra
|
||||
version: "1.0.0"
|
||||
purity: impure
|
||||
signature: "func ProcessSpawn(command string, dir string, env []string, shell string) (*ProcessHandle, error)"
|
||||
description: "Lanza un subproceso usando el shell indicado. Si shell esta vacio usa 'sh'. Comandos con newlines se tratan como scripts multilinea (se escriben a un archivo temporal). Configura un process group propio (Setpgid) para poder matar todos los hijos con ProcessKill. Captura stdout y stderr via pipes."
|
||||
tags: [process, subprocess, spawn, exec, shell, infra]
|
||||
uses_functions: []
|
||||
uses_types: [process_handle_go_infra]
|
||||
returns: [process_handle_go_infra]
|
||||
returns_optional: false
|
||||
error_type: "error_go_core"
|
||||
imports: [fmt, os, os/exec, strings, syscall, time]
|
||||
params:
|
||||
- name: command
|
||||
desc: "comando shell a ejecutar; si contiene newlines se trata como script multilinea"
|
||||
- name: dir
|
||||
desc: "directorio de trabajo del proceso hijo; vacio hereda el del proceso padre"
|
||||
- name: env
|
||||
desc: "variables de entorno en formato KEY=VALUE; nil hereda el entorno del proceso padre"
|
||||
- name: shell
|
||||
desc: "interprete shell a usar (sh, bash, zsh); vacio usa 'sh'"
|
||||
output: "handle del proceso lanzado con Cmd, Pid, StartTime, Dir y los pipes de I/O"
|
||||
tested: true
|
||||
tests:
|
||||
- "spawn and wait echo"
|
||||
- "spawn with timeout kills"
|
||||
- "spawn with env"
|
||||
- "spawn script"
|
||||
- "spawn with working dir"
|
||||
- "kill process"
|
||||
test_file_path: "functions/infra/process_spawn_test.go"
|
||||
file_path: "functions/infra/process_spawn.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```go
|
||||
h, err := ProcessSpawn("echo hello", "", nil, "")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
res, err := ProcessWait(h, 10)
|
||||
fmt.Println(res.Stdout) // "hello\n"
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Funcion impura: hace I/O (crea archivo temporal para scripts, lanza proceso). El process group (Setpgid=true) permite a ProcessKill enviar senales al grupo completo con -Pid, afectando a todos los hijos del proceso lanzado. Para scripts multilinea el archivo temporal queda en el directorio temporal del OS y no se limpia automaticamente.
|
||||
@@ -0,0 +1,107 @@
|
||||
package infra
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestProcessSpawn(t *testing.T) {
|
||||
t.Run("spawn and wait echo", func(t *testing.T) {
|
||||
h, err := ProcessSpawn("echo hello", "", nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("spawn: %v", err)
|
||||
}
|
||||
res, err := ProcessWait(h, 10)
|
||||
if err != nil {
|
||||
t.Fatalf("wait: %v", err)
|
||||
}
|
||||
if res.ExitCode != 0 {
|
||||
t.Errorf("exit code: got %d, want 0", res.ExitCode)
|
||||
}
|
||||
if !strings.Contains(res.Stdout, "hello") {
|
||||
t.Errorf("stdout: got %q, want it to contain 'hello'", res.Stdout)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("spawn with timeout kills", func(t *testing.T) {
|
||||
h, err := ProcessSpawn("sleep 60", "", nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("spawn: %v", err)
|
||||
}
|
||||
res, err := ProcessWait(h, 2)
|
||||
if err != nil {
|
||||
t.Fatalf("wait: %v", err)
|
||||
}
|
||||
if !res.Killed {
|
||||
t.Errorf("killed: got false, want true")
|
||||
}
|
||||
if res.ExitCode == 0 {
|
||||
t.Errorf("exit code: got 0, want != 0 after kill")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("spawn with env", func(t *testing.T) {
|
||||
h, err := ProcessSpawn("echo $TEST_VAR", "", []string{"PATH=/usr/bin:/bin", "TEST_VAR=hello123"}, "")
|
||||
if err != nil {
|
||||
t.Fatalf("spawn: %v", err)
|
||||
}
|
||||
res, err := ProcessWait(h, 10)
|
||||
if err != nil {
|
||||
t.Fatalf("wait: %v", err)
|
||||
}
|
||||
if !strings.Contains(res.Stdout, "hello123") {
|
||||
t.Errorf("stdout: got %q, want it to contain 'hello123'", res.Stdout)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("spawn script", func(t *testing.T) {
|
||||
script := "#!/bin/sh\necho line1\necho line2"
|
||||
h, err := ProcessSpawn(script, "", nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("spawn: %v", err)
|
||||
}
|
||||
res, err := ProcessWait(h, 10)
|
||||
if err != nil {
|
||||
t.Fatalf("wait: %v", err)
|
||||
}
|
||||
if !strings.Contains(res.Stdout, "line1") {
|
||||
t.Errorf("stdout: got %q, want it to contain 'line1'", res.Stdout)
|
||||
}
|
||||
if !strings.Contains(res.Stdout, "line2") {
|
||||
t.Errorf("stdout: got %q, want it to contain 'line2'", res.Stdout)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("spawn with working dir", func(t *testing.T) {
|
||||
h, err := ProcessSpawn("pwd", "/tmp", nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("spawn: %v", err)
|
||||
}
|
||||
res, err := ProcessWait(h, 10)
|
||||
if err != nil {
|
||||
t.Fatalf("wait: %v", err)
|
||||
}
|
||||
if !strings.Contains(res.Stdout, "/tmp") {
|
||||
t.Errorf("stdout: got %q, want it to contain '/tmp'", res.Stdout)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("kill process", func(t *testing.T) {
|
||||
h, err := ProcessSpawn("sleep 60", "", nil, "")
|
||||
if err != nil {
|
||||
t.Fatalf("spawn: %v", err)
|
||||
}
|
||||
if err := ProcessKill(h, 1); err != nil {
|
||||
t.Fatalf("kill: %v", err)
|
||||
}
|
||||
// After kill, Wait should unblock quickly.
|
||||
_ = h.Cmd.Wait()
|
||||
state := h.Cmd.ProcessState
|
||||
if state == nil {
|
||||
t.Fatal("process state is nil after kill+wait")
|
||||
}
|
||||
if state.ExitCode() == 0 {
|
||||
t.Errorf("exit code: got 0 after kill, want non-zero")
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package infra
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// ProcessWait waits for a subprocess to finish and collects its output.
|
||||
// If timeoutSec > 0 and the process has not exited by then, ProcessKill is
|
||||
// called with graceSec=5 and the result is marked Killed=true.
|
||||
func ProcessWait(handle *ProcessHandle, timeoutSec int) (ProcessResult, error) {
|
||||
// Wait for the process in a goroutine.
|
||||
waitCh := make(chan error, 1)
|
||||
go func() {
|
||||
waitCh <- handle.Cmd.Wait()
|
||||
}()
|
||||
|
||||
killed := false
|
||||
|
||||
if timeoutSec > 0 {
|
||||
timer := time.NewTimer(time.Duration(timeoutSec) * time.Second)
|
||||
defer timer.Stop()
|
||||
select {
|
||||
case <-timer.C:
|
||||
// Timeout exceeded — kill the process group.
|
||||
_ = ProcessKill(handle, 5)
|
||||
killed = true
|
||||
<-waitCh
|
||||
case <-waitCh:
|
||||
}
|
||||
} else {
|
||||
<-waitCh
|
||||
}
|
||||
|
||||
// After Wait() returns, buffers are safe to read.
|
||||
exitCode := 0
|
||||
if handle.Cmd.ProcessState != nil {
|
||||
exitCode = handle.Cmd.ProcessState.ExitCode()
|
||||
} else if killed {
|
||||
exitCode = -1
|
||||
}
|
||||
|
||||
duration := time.Since(handle.StartTime)
|
||||
|
||||
return ProcessResult{
|
||||
ExitCode: exitCode,
|
||||
Stdout: handle.stdout.String(),
|
||||
Stderr: handle.stderr.String(),
|
||||
DurationMs: duration.Milliseconds(),
|
||||
Killed: killed,
|
||||
}, nil
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
---
|
||||
name: process_wait
|
||||
kind: function
|
||||
lang: go
|
||||
domain: infra
|
||||
version: "1.0.0"
|
||||
purity: impure
|
||||
signature: "func ProcessWait(handle *ProcessHandle, timeoutSec int) (ProcessResult, error)"
|
||||
description: "Espera a que un subproceso termine y recopila su salida. Lee stdout y stderr completos en goroutines para evitar deadlocks en pipes. Si timeoutSec > 0 y el proceso no termina en ese tiempo, llama a ProcessKill y marca el resultado con Killed=true. Retorna el exit code, salida completa y duracion total."
|
||||
tags: [process, subprocess, wait, timeout, exec, infra]
|
||||
uses_functions: [process_kill_go_infra]
|
||||
uses_types: [process_handle_go_infra, process_result_go_infra]
|
||||
returns: [process_result_go_infra]
|
||||
returns_optional: false
|
||||
error_type: "error_go_core"
|
||||
imports: [fmt, io, time]
|
||||
params:
|
||||
- name: handle
|
||||
desc: "handle del proceso lanzado por ProcessSpawn"
|
||||
- name: timeoutSec
|
||||
desc: "segundos maximos de espera; 0 o negativo espera indefinidamente"
|
||||
output: "resultado con exit code, stdout, stderr, duracion en ms y flag de killed"
|
||||
tested: true
|
||||
tests:
|
||||
- "spawn and wait echo"
|
||||
- "spawn with timeout kills"
|
||||
- "spawn with env"
|
||||
- "spawn script"
|
||||
- "spawn with working dir"
|
||||
test_file_path: "functions/infra/process_spawn_test.go"
|
||||
file_path: "functions/infra/process_wait.go"
|
||||
---
|
||||
|
||||
## Ejemplo
|
||||
|
||||
```go
|
||||
h, err := ProcessSpawn("sleep 60", "", nil, "")
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
res, err := ProcessWait(h, 5) // timeout de 5 segundos
|
||||
if res.Killed {
|
||||
fmt.Println("proceso terminado por timeout")
|
||||
}
|
||||
```
|
||||
|
||||
## Notas
|
||||
|
||||
Funcion impura: bloquea esperando I/O y posiblemente llama a ProcessKill. Lee stdout y stderr en goroutines separadas antes de llamar a cmd.Wait() para evitar el deadlock clasico donde cmd.Wait() bloquea porque los pipes estan llenos y nadie los lee. El exit code -1 indica que ProcessState no estaba disponible (proceso matado antes de registrar estado).
|
||||
@@ -234,3 +234,172 @@ repos:
|
||||
- id: wails_bind_crud_go_infra
|
||||
source_file: ""
|
||||
date: 2026-04-01
|
||||
|
||||
- repo: https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/egutierrez/DevLauncher.git
|
||||
license: MIT
|
||||
cloned_dir: DevLauncher
|
||||
extracted:
|
||||
# Phase 1: Go Pure — TUI (5)
|
||||
- id: apply_gradient_go_tui
|
||||
source_file: launcher/core/gradient.go
|
||||
date: 2026-04-08
|
||||
- id: strip_ansi_go_tui
|
||||
source_file: launcher/core/commands.go
|
||||
date: 2026-04-08
|
||||
- id: normalize_terminal_output_go_tui
|
||||
source_file: launcher/core/commands.go
|
||||
date: 2026-04-08
|
||||
- id: draw_box_go_tui
|
||||
source_file: launcher/ui/styles.go
|
||||
date: 2026-04-08
|
||||
- id: draw_separator_go_tui
|
||||
source_file: launcher/ui/styles.go
|
||||
date: 2026-04-08
|
||||
# Phase 2: Go Pure — Core (5)
|
||||
- id: longest_common_prefix_go_core
|
||||
source_file: launcher/core/commands.go
|
||||
date: 2026-04-08
|
||||
- id: split_command_and_arg_go_core
|
||||
source_file: launcher/core/commands.go
|
||||
date: 2026-04-08
|
||||
- id: compare_versions_go_core
|
||||
source_file: installer/core/version.go
|
||||
date: 2026-04-08
|
||||
- id: parse_version_go_core
|
||||
source_file: installer/core/version.go
|
||||
date: 2026-04-08
|
||||
- id: rel_or_full_go_core
|
||||
source_file: launcher/core/script_query.go
|
||||
date: 2026-04-08
|
||||
# Phase 3: Go Impure (3)
|
||||
- id: load_ascii_art_go_tui
|
||||
source_file: launcher/middleware/assets.go
|
||||
date: 2026-04-08
|
||||
- id: read_dir_autocomplete_go_tui
|
||||
source_file: launcher/middleware/command_fs.go
|
||||
date: 2026-04-08
|
||||
- id: extract_script_description_go_shell
|
||||
source_file: launcher/middleware/reader.go
|
||||
date: 2026-04-08
|
||||
# Phase 4: Bash Library (6)
|
||||
- id: bash_colors_bash_shell
|
||||
source_file: scripts/lib/common.sh
|
||||
date: 2026-04-08
|
||||
- id: bash_log_bash_shell
|
||||
source_file: scripts/lib/common.sh
|
||||
date: 2026-04-08
|
||||
- id: bash_check_deps_bash_shell
|
||||
source_file: scripts/lib/common.sh
|
||||
date: 2026-04-08
|
||||
- id: bash_confirm_bash_shell
|
||||
source_file: scripts/lib/common.sh
|
||||
date: 2026-04-08
|
||||
- id: bash_safe_run_bash_shell
|
||||
source_file: scripts/lib/common.sh
|
||||
date: 2026-04-08
|
||||
- id: bash_handle_error_bash_shell
|
||||
source_file: scripts/lib/common.sh
|
||||
date: 2026-04-08
|
||||
# Phase 5: Bash Cybersecurity (12)
|
||||
- id: analyze_dns_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/redes/analisis_dns.sh
|
||||
date: 2026-04-08
|
||||
- id: list_active_connections_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/redes/conexiones_activas.sh
|
||||
date: 2026-04-08
|
||||
- id: geolocate_ip_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/redes/geoip.sh
|
||||
date: 2026-04-08
|
||||
- id: audit_ssh_config_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/sistema/auditar_ssh.sh
|
||||
date: 2026-04-08
|
||||
- id: check_firewall_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/sistema/firewall_status.sh
|
||||
date: 2026-04-08
|
||||
- id: detect_suspicious_users_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/sistema/usuarios_sospechosos.sh
|
||||
date: 2026-04-08
|
||||
- id: encrypt_file_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/utilidades/cifrar_archivo.sh
|
||||
date: 2026-04-08
|
||||
- id: generate_password_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/utilidades/generar_password.sh
|
||||
date: 2026-04-08
|
||||
- id: verify_file_hash_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/utilidades/verificar_hash.sh
|
||||
date: 2026-04-08
|
||||
- id: audit_http_headers_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/web/cabeceras_http.sh
|
||||
date: 2026-04-08
|
||||
- id: inspect_ssl_cert_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/web/ssl_cert_info.sh
|
||||
date: 2026-04-08
|
||||
- id: enumerate_subdomains_bash_cybersecurity
|
||||
source_file: scripts/linux/ciberseguridad/web/subdominios.sh
|
||||
date: 2026-04-08
|
||||
# Phase 6: Git Utils (4)
|
||||
- id: git_repo_status_bash_shell
|
||||
source_file: scripts/linux/git_utils/estado_repo.sh
|
||||
date: 2026-04-08
|
||||
- id: git_clean_branches_bash_shell
|
||||
source_file: scripts/linux/git_utils/limpiar_ramas.sh
|
||||
date: 2026-04-08
|
||||
- id: git_push_all_remotes_bash_shell
|
||||
source_file: scripts/linux/git_utils/push_todos_remotes.sh
|
||||
date: 2026-04-08
|
||||
- id: git_log_visual_bash_shell
|
||||
source_file: scripts/linux/git_utils/historial_commits.sh
|
||||
date: 2026-04-08
|
||||
# Phase 7: Infra — System (3)
|
||||
- id: analyze_disk_space_bash_infra
|
||||
source_file: scripts/linux/gestion_linux/espacio_disponible.sh
|
||||
date: 2026-04-08
|
||||
- id: list_listening_ports_bash_infra
|
||||
source_file: scripts/linux/gestion_linux/puertos_activos.sh
|
||||
date: 2026-04-08
|
||||
- id: detect_wsl_bash_infra
|
||||
source_file: scripts/linux/gestion_linux/wsl_host.sh
|
||||
date: 2026-04-08
|
||||
# Phase 7: Infra — Installers (7)
|
||||
- id: install_go_bash_infra
|
||||
source_file: scripts/linux/instaladores/instalar_go.sh
|
||||
date: 2026-04-08
|
||||
- id: install_nodejs_bash_infra
|
||||
source_file: scripts/linux/instaladores/instalar_nodejs.sh
|
||||
date: 2026-04-08
|
||||
- id: install_pnpm_bash_infra
|
||||
source_file: scripts/linux/instaladores/instalar_pnpm.sh
|
||||
date: 2026-04-08
|
||||
- id: install_python312_bash_infra
|
||||
source_file: scripts/linux/instaladores/instalar_python312.sh
|
||||
date: 2026-04-08
|
||||
- id: install_uv_bash_infra
|
||||
source_file: scripts/linux/instaladores/instalar_uv.sh
|
||||
date: 2026-04-08
|
||||
- id: install_volta_bash_infra
|
||||
source_file: scripts/linux/instaladores/instalar_volta.sh
|
||||
date: 2026-04-08
|
||||
- id: install_wails_bash_infra
|
||||
source_file: scripts/linux/instaladores/instalar_wails.sh
|
||||
date: 2026-04-08
|
||||
# Phase 8: Shell Utils + Init (4)
|
||||
- id: convert_text_case_bash_shell
|
||||
source_file: scripts/linux/conversores/conversor_texto.sh
|
||||
date: 2026-04-08
|
||||
- id: create_project_structure_bash_shell
|
||||
source_file: scripts/linux/inicializar_repos/functional_structure.sh
|
||||
date: 2026-04-08
|
||||
- id: init_go_module_bash_pipelines
|
||||
source_file: scripts/linux/inicializar_repos/go/init_go_module.sh
|
||||
date: 2026-04-08
|
||||
- id: init_go_project_bash_pipelines
|
||||
source_file: scripts/linux/inicializar_repos/go/init_go_proyect.sh
|
||||
date: 2026-04-08
|
||||
|
||||
- repo: https://github.com/daguflow/dagu
|
||||
license: GPL-3.0
|
||||
cloned_dir: dagu
|
||||
analyzed: true
|
||||
extracted: []
|
||||
# GPL-3.0: no code extracted. YAML format studied for compatibility in issue 0007 (dag_engine).
|
||||
# Our implementation is written from scratch with no dagu code copied.
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
---
|
||||
name: dag_continue_on
|
||||
lang: go
|
||||
domain: core
|
||||
version: "1.0.0"
|
||||
algebraic: product
|
||||
definition: |
|
||||
type DagContinueOn struct {
|
||||
Failure bool
|
||||
Skipped bool
|
||||
}
|
||||
description: "Politica de continuacion de un step DAG ante fallos o saltos. Cuando Failure=true el DAG no se detiene si el step falla. Cuando Skipped=true tampoco se detiene si el step es saltado."
|
||||
tags: [dag, workflow, policy, error-handling]
|
||||
uses_types: []
|
||||
file_path: "functions/core/dag_definition.go"
|
||||
---
|
||||
|
||||
## Notas
|
||||
|
||||
Tipo producto. Embebido en DagStep. Corresponde al campo `continue_on` del YAML de Dagu.
|
||||
@@ -0,0 +1,31 @@
|
||||
---
|
||||
name: dag_definition
|
||||
lang: go
|
||||
domain: core
|
||||
version: "1.0.0"
|
||||
algebraic: product
|
||||
definition: |
|
||||
type DagDefinition struct {
|
||||
Name string
|
||||
Description string
|
||||
Group string
|
||||
Type string
|
||||
WorkingDir string
|
||||
Shell string
|
||||
Env map[string]string
|
||||
Schedule []string
|
||||
Steps []DagStep
|
||||
HandlerOn DagHandlers
|
||||
Tags []string
|
||||
TimeoutSec int
|
||||
FilePath string
|
||||
}
|
||||
description: "Definicion completa de un workflow DAG parseada desde YAML compatible con Dagu. Contiene steps, handlers de ciclo de vida, variables de entorno, schedule y metadatos del flujo."
|
||||
tags: [dag, workflow, yaml, dagu, definition]
|
||||
uses_types: [dag_step_go_core, dag_handlers_go_core]
|
||||
file_path: "functions/core/dag_definition.go"
|
||||
---
|
||||
|
||||
## Notas
|
||||
|
||||
Tipo producto. Type puede ser "graph" (ejecucion paralela por niveles topologicos) o vacio (cadena secuencial). Schedule es siempre una lista de strings (normalizado desde string o lista en el YAML). FilePath se rellena opcionalmente por el caller para saber el origen del archivo.
|
||||
@@ -0,0 +1,22 @@
|
||||
---
|
||||
name: dag_handlers
|
||||
lang: go
|
||||
domain: core
|
||||
version: "1.0.0"
|
||||
algebraic: product
|
||||
definition: |
|
||||
type DagHandlers struct {
|
||||
Init []DagStep
|
||||
Success []DagStep
|
||||
Failure []DagStep
|
||||
Exit []DagStep
|
||||
}
|
||||
description: "Handlers de ciclo de vida de un DAG. Cada campo contiene steps que se ejecutan en el evento correspondiente: inicializacion, exito, fallo o salida (siempre). Corresponde a handler_on o handlers en el YAML de Dagu."
|
||||
tags: [dag, workflow, handlers, lifecycle]
|
||||
uses_types: [dag_step_go_core]
|
||||
file_path: "functions/core/dag_definition.go"
|
||||
---
|
||||
|
||||
## Notas
|
||||
|
||||
Tipo producto. Embebido en DagDefinition. Los campos handler_on y handlers son aliases en el YAML de Dagu — ambos se normalizan a este tipo. Cada handler puede ser un step unico o una lista de steps en el YAML.
|
||||
@@ -0,0 +1,20 @@
|
||||
---
|
||||
name: dag_retry_policy
|
||||
lang: go
|
||||
domain: core
|
||||
version: "1.0.0"
|
||||
algebraic: product
|
||||
definition: |
|
||||
type DagRetryPolicy struct {
|
||||
Limit int
|
||||
IntervalSec int
|
||||
}
|
||||
description: "Politica de reintentos automaticos para un step DAG. Limit es el numero maximo de reintentos e IntervalSec es el tiempo de espera entre intentos en segundos."
|
||||
tags: [dag, workflow, retry, policy]
|
||||
uses_types: []
|
||||
file_path: "functions/core/dag_definition.go"
|
||||
---
|
||||
|
||||
## Notas
|
||||
|
||||
Tipo producto. Embebido en DagStep. Corresponde al campo `retry_policy` del YAML de Dagu. Limit=0 significa sin reintentos.
|
||||
@@ -0,0 +1,33 @@
|
||||
---
|
||||
name: dag_step
|
||||
lang: go
|
||||
domain: core
|
||||
version: "1.0.0"
|
||||
algebraic: product
|
||||
definition: |
|
||||
type DagStep struct {
|
||||
Name string
|
||||
ID string
|
||||
Description string
|
||||
Command string
|
||||
Script string
|
||||
Args []string
|
||||
Shell string
|
||||
Dir string
|
||||
Depends []string
|
||||
Env map[string]string
|
||||
ContinueOn DagContinueOn
|
||||
RetryPolicy DagRetryPolicy
|
||||
TimeoutSec int
|
||||
Output string
|
||||
Tags []string
|
||||
}
|
||||
description: "Un paso individual en un workflow DAG con command/script, dependencias y configuracion de reintentos. Soporta variables de entorno por step, directorio de trabajo propio y politica continue_on."
|
||||
tags: [dag, workflow, step, yaml, dagu]
|
||||
uses_types: [dag_continue_on_go_core, dag_retry_policy_go_core]
|
||||
file_path: "functions/core/dag_definition.go"
|
||||
---
|
||||
|
||||
## Notas
|
||||
|
||||
Tipo producto. El campo ID es opcional y se usa como referencia en otros steps (ej: `${id.stdout}`). Si Name e ID estan ambos vacios, el step es invalido. Env del step se mergea sobre el Env del DAG padre durante la resolucion.
|
||||
@@ -0,0 +1,22 @@
|
||||
---
|
||||
name: dag_validation_result
|
||||
lang: go
|
||||
domain: core
|
||||
version: "1.0.0"
|
||||
algebraic: product
|
||||
definition: |
|
||||
type DagValidationResult struct {
|
||||
Valid bool
|
||||
Errors []string
|
||||
Warnings []string
|
||||
Levels [][]string
|
||||
}
|
||||
description: "Resultado de validar un DagDefinition. Contiene errores estructurales (ciclos, depends invalidos, nombres duplicados), warnings (command+script simultaneos) y los niveles topologicos calculados si el DAG es valido."
|
||||
tags: [dag, validation, workflow, result]
|
||||
uses_types: []
|
||||
file_path: "functions/core/dag_definition.go"
|
||||
---
|
||||
|
||||
## Notas
|
||||
|
||||
Tipo producto. Levels solo se rellena cuando Valid=true. Cada sub-slice de Levels contiene los nombres/IDs de steps que pueden ejecutarse en paralelo. El orden de Levels refleja el orden topologico del grafo de dependencias.
|
||||
@@ -0,0 +1,27 @@
|
||||
---
|
||||
name: DagRun
|
||||
lang: go
|
||||
domain: infra
|
||||
version: "1.0.0"
|
||||
algebraic: product
|
||||
definition: |
|
||||
type DagRun struct {
|
||||
ID string
|
||||
DagName string
|
||||
DagPath string
|
||||
Status string
|
||||
StartedAt time.Time
|
||||
FinishedAt time.Time
|
||||
Trigger string
|
||||
Error string
|
||||
}
|
||||
description: "Representa una ejecucion de un workflow DAG. Almacenado en SQLite con estado, timestamps y trigger."
|
||||
tags: [dag, execution, run, workflow]
|
||||
uses_types: []
|
||||
file_path: "functions/infra/dag_run.go"
|
||||
---
|
||||
|
||||
## Notas
|
||||
|
||||
Status puede ser: pending, running, success, failed, cancelled.
|
||||
Trigger puede ser: manual, cron, api.
|
||||
@@ -0,0 +1,29 @@
|
||||
---
|
||||
name: DagStepResult
|
||||
lang: go
|
||||
domain: infra
|
||||
version: "1.0.0"
|
||||
algebraic: product
|
||||
definition: |
|
||||
type DagStepResult struct {
|
||||
ID string
|
||||
RunID string
|
||||
StepName string
|
||||
Status string
|
||||
ExitCode int
|
||||
Stdout string
|
||||
Stderr string
|
||||
StartedAt time.Time
|
||||
FinishedAt time.Time
|
||||
DurationMs int64
|
||||
Error string
|
||||
}
|
||||
description: "Resultado de la ejecucion de un step individual dentro de un DagRun. Captura exit code, stdout, stderr y duracion."
|
||||
tags: [dag, execution, step, result]
|
||||
uses_types: [DagRun_go_infra]
|
||||
file_path: "functions/infra/dag_run.go"
|
||||
---
|
||||
|
||||
## Notas
|
||||
|
||||
Status puede ser: pending, running, success, failed, skipped.
|
||||
@@ -0,0 +1,24 @@
|
||||
---
|
||||
name: process_handle
|
||||
lang: go
|
||||
domain: infra
|
||||
version: "1.0.0"
|
||||
algebraic: product
|
||||
definition: |
|
||||
type ProcessHandle struct {
|
||||
Cmd *exec.Cmd
|
||||
Pid int
|
||||
StartTime time.Time
|
||||
Dir string
|
||||
stdout io.ReadCloser
|
||||
stderr io.ReadCloser
|
||||
}
|
||||
description: "Handle de un subproceso en ejecucion. Contiene el comando, PID, tiempo de inicio, directorio de trabajo y los pipes de stdout/stderr (privados, leidos internamente por ProcessWait)."
|
||||
tags: [process, subprocess, handle, infra, exec]
|
||||
uses_types: []
|
||||
file_path: "functions/infra/process_handle.go"
|
||||
---
|
||||
|
||||
## Notas
|
||||
|
||||
Tipo producto. Los campos stdout y stderr son privados para evitar lecturas concurrentes externas — ProcessWait los consume internamente. Cmd.SysProcAttr.Setpgid=true garantiza que ProcessKill puede matar el process group completo usando -Pid.
|
||||
@@ -0,0 +1,23 @@
|
||||
---
|
||||
name: process_result
|
||||
lang: go
|
||||
domain: infra
|
||||
version: "1.0.0"
|
||||
algebraic: product
|
||||
definition: |
|
||||
type ProcessResult struct {
|
||||
ExitCode int
|
||||
Stdout string
|
||||
Stderr string
|
||||
DurationMs int64
|
||||
Killed bool
|
||||
}
|
||||
description: "Resultado de un subproceso completado. Contiene codigo de salida, salida estandar y de error, duracion en milisegundos, y un flag que indica si fue terminado por timeout."
|
||||
tags: [process, subprocess, result, exit, infra, exec]
|
||||
uses_types: []
|
||||
file_path: "functions/infra/process_handle.go"
|
||||
---
|
||||
|
||||
## Notas
|
||||
|
||||
Tipo producto — todos los campos siempre presentes. Killed=true indica que ProcessWait agoto el timeout y llamo a ProcessKill; en ese caso ExitCode suele ser -1 o el codigo de SIGKILL segun el OS. DurationMs incluye el tiempo total desde ProcessSpawn hasta que Wait() retorno.
|
||||
Reference in New Issue
Block a user