// parallel-executor — Orquestador de ejecución paralela de issues. // // Crea git worktrees, ejecuta claude en cada uno, y mergea los resultados. // Usa DevFactory para Result[T], MapSlice, y operaciones I/O. // // Uso: // // parallel-executor [flags] // --plan Plan markdown (default: PARALLEL_EXECUTION_ORDER.md) // --group Ejecutar solo grupo N // --sequential Sin paralelismo // --dry-run Solo mostrar plan sin ejecutar // --timeout Timeout por issue en minutos (default: 30) // --cleanup Solo limpiar worktrees existentes // --sort Analizar issues y generar plan (no ejecutar) package main import ( "flag" "fmt" "os" "sync" "time" dfshell "github.com/lucasdataproyects/devfactory/shell" pcore "github.com/lucasdataproyects/parallel-executor/core" "github.com/lucasdataproyects/parallel-executor/shell" ) // CLI colors const ( red = "\033[0;31m" green = "\033[0;32m" yellow = "\033[1;33m" blue = "\033[0;34m" cyan = "\033[0;36m" nc = "\033[0m" ) func main() { planFile := flag.String("plan", "PARALLEL_EXECUTION_ORDER.md", "plan markdown file") groupNum := flag.Int("group", 0, "execute only this group number") sequential := flag.Bool("sequential", false, "disable parallel execution") dryRun := flag.Bool("dry-run", false, "show plan without executing") timeoutMin := flag.Int("timeout", 30, "timeout per issue in minutes") cleanup := flag.Bool("cleanup", false, "only cleanup existing worktrees") sortMode := flag.Bool("sort", false, "analyze issues and generate plan") flag.Parse() repoRoot, err := os.Getwd() if err != nil { fatal("cannot get working directory: %v", err) } // --- Modo cleanup --- if *cleanup { info("Cleaning up worktrees...") result := shell.CleanupAllWorktrees(repoRoot) if result.IsErr() { fatal("cleanup failed: %v", result.Error()) } ok("Cleaned %d worktrees", result.Unwrap()) return } // --- Modo sort: analizar issues y generar plan --- if *sortMode { generatePlan(repoRoot) return } // --- Ejecutar plan --- executePlan(repoRoot, *planFile, *groupNum, *sequential, *dryRun, *timeoutMin) } func generatePlan(repoRoot string) { issuesDir := repoRoot + "/dev/issues" if !dfshell.DirExists(issuesDir) { fatal("issues directory not found: %s", issuesDir) } info("Reading issues from %s...", issuesDir) filesResult := shell.ReadIssueFiles(issuesDir) if filesResult.IsErr() { fatal("cannot read issues: %v", filesResult.Error()) } issues := pcore.ParseIssueFiles(filesResult.Unwrap()) if len(issues) == 0 { warn("No pending issues found") return } info("Found %d pending issues", len(issues)) // Topological sort groupsResult := pcore.TopologicalSort(issues) if groupsResult.IsErr() { fatal("dependency analysis failed: %v", groupsResult.Error()) } groups := groupsResult.Unwrap() // Generate markdown markdown := pcore.GeneratePlanMarkdown(groups, nil) outFile := repoRoot + "/PARALLEL_EXECUTION_ORDER.md" writeResult := dfshell.WriteString(outFile, markdown) if writeResult.IsErr() { fatal("cannot write plan: %v", writeResult.Error()) } ok("Plan generated: %s", outFile) fmt.Printf("\n%sIssues:%s %d\n", cyan, nc, len(issues)) fmt.Printf("%sGroups:%s %d\n", cyan, nc, len(groups)) for _, g := range groups { fmt.Printf(" %sGrupo %d:%s %d issues\n", blue, g.Number, nc, len(g.Issues)) for _, issue := range g.Issues { fmt.Printf(" - #%04d %s\n", issue.Number, issue.Title) } } } func executePlan(repoRoot string, planFile string, groupNum int, sequential bool, dryRun bool, timeoutMin int) { // Leer plan planContent := dfshell.ReadString(planFile) if planContent.IsErr() { // Intentar generar plan automáticamente warn("Plan not found, generating from issues...") generatePlan(repoRoot) planContent = dfshell.ReadString(planFile) if planContent.IsErr() { fatal("cannot read plan: %v", planContent.Error()) } } planResult := pcore.ParsePlan(planContent.Unwrap()) if planResult.IsErr() { fatal("cannot parse plan: %v", planResult.Error()) } plan := planResult.Unwrap() // Filtrar por grupo si se especificó if groupNum > 0 { filtered := pcore.FilterGroup(plan, groupNum) if filtered.IsErr() { fatal("group filter: %v", filtered.Error()) } plan = filtered.Unwrap() } info("Plan: %d issues in %d groups", plan.Total, len(plan.Groups)) // --- Dry run: solo mostrar --- if dryRun { for _, group := range plan.Groups { fmt.Printf("\n%s%s%s (%d issues)\n", cyan, group.Name, nc, len(group.Issues)) specs := pcore.BuildWorktreeSpecs(group.Issues, repoRoot) for _, spec := range specs { fmt.Printf(" #%04d → %s\n", spec.Issue.Number, spec.BranchName) fmt.Printf(" %s\n", spec.WorkDir) } } fmt.Printf("\n%sDry run complete. No worktrees created.%s\n", yellow, nc) return } // --- Logger --- loggerResult := shell.NewLogger(repoRoot) if loggerResult.IsErr() { fatal("cannot create logger: %v", loggerResult.Error()) } logger := loggerResult.Unwrap() info("Logs: %s", logger.SessionLogFile()) // --- Ejecutar grupos secuencialmente, issues dentro de cada grupo en paralelo --- timeout := time.Duration(timeoutMin) * time.Minute var allResults []pcore.ExecutionResult for _, group := range plan.Groups { fmt.Printf("\n%s══════════════════════════════════════%s\n", cyan, nc) fmt.Printf("%s %s — %d issues%s\n", cyan, group.Name, len(group.Issues), nc) fmt.Printf("%s══════════════════════════════════════%s\n", cyan, nc) specs := pcore.BuildWorktreeSpecs(group.Issues, repoRoot) // Crear worktrees info("Creating %d worktrees...", len(specs)) var validSpecs []pcore.WorktreeSpec for _, spec := range specs { result := shell.CreateWorktree(spec, repoRoot) if result.IsErr() { warn("Failed to create worktree for #%04d: %v", spec.Issue.Number, result.Error()) allResults = append(allResults, pcore.ExecutionResult{ Issue: spec.Issue, Success: false, Error: result.Error().Error(), }) continue } ok("Worktree: %s → %s", spec.BranchName, result.Unwrap()) validSpecs = append(validSpecs, spec) } // Ejecutar var groupResults []pcore.ExecutionResult if sequential || len(validSpecs) == 1 { groupResults = executeSequential(validSpecs, timeout, logger) } else { groupResults = executeParallel(validSpecs, timeout, logger) } allResults = append(allResults, groupResults...) // Mergear las exitosas for _, r := range groupResults { if !r.Success { continue } spec := findSpec(validSpecs, r.Issue.Number) if spec == nil { continue } info("Merging %s...", spec.BranchName) mergeResult := shell.MergeBranchToMaster(spec.BranchName, repoRoot) if mergeResult.IsErr() { warn("Merge failed for %s: %v", spec.BranchName, mergeResult.Error()) } else { ok("Merged %s", spec.BranchName) } } // Limpiar worktrees del grupo for _, spec := range validSpecs { shell.RemoveWorktree(spec, repoRoot) } } // --- Resumen --- summaryResult := logger.WriteSummary(allResults) summaryFile := "" if summaryResult.IsOk() { summaryFile = summaryResult.Unwrap() } fmt.Printf("\n%s══════════════════════════════════════%s\n", green, nc) fmt.Printf("%s Execution Complete%s\n", green, nc) fmt.Printf("%s══════════════════════════════════════%s\n\n", green, nc) succeeded := 0 failed := 0 for _, r := range allResults { if r.Success { succeeded++ fmt.Printf(" %s✓%s #%04d %s (%s)\n", green, nc, r.Issue.Number, r.Issue.Title, r.Duration) } else { failed++ fmt.Printf(" %s✗%s #%04d %s — %s\n", red, nc, r.Issue.Number, r.Issue.Title, r.Error) } } fmt.Printf("\n Total: %d | Succeeded: %s%d%s | Failed: %s%d%s\n", len(allResults), green, succeeded, nc, red, failed, nc) if summaryFile != "" { fmt.Printf(" Summary: %s\n", summaryFile) } fmt.Printf(" Log: %s\n", logger.SessionLogFile()) // Cleanup final shell.CleanupAllWorktrees(repoRoot) if failed > 0 { os.Exit(1) } } func executeSequential(specs []pcore.WorktreeSpec, timeout time.Duration, logger *shell.Logger) []pcore.ExecutionResult { results := make([]pcore.ExecutionResult, 0, len(specs)) for _, spec := range specs { info("Executing #%04d %s...", spec.Issue.Number, spec.Issue.Title) logger.LogIssueStart(spec.Issue) result := shell.ExecuteIssue(spec, timeout) logger.LogIssueResult(result) if result.Success { ok("#%04d completed in %s", spec.Issue.Number, result.Duration) } else { warn("#%04d failed: %s", spec.Issue.Number, result.Error) } results = append(results, result) } return results } func executeParallel(specs []pcore.WorktreeSpec, timeout time.Duration, logger *shell.Logger) []pcore.ExecutionResult { results := make([]pcore.ExecutionResult, len(specs)) var wg sync.WaitGroup for i, spec := range specs { wg.Add(1) go func(idx int, s pcore.WorktreeSpec) { defer wg.Done() info("[goroutine] Executing #%04d %s...", s.Issue.Number, s.Issue.Title) logger.LogIssueStart(s.Issue) result := shell.ExecuteIssue(s, timeout) logger.LogIssueResult(result) results[idx] = result if result.Success { ok("[goroutine] #%04d completed in %s", s.Issue.Number, result.Duration) } else { warn("[goroutine] #%04d failed: %s", s.Issue.Number, result.Error) } }(i, spec) } wg.Wait() return results } func findSpec(specs []pcore.WorktreeSpec, issueNum int) *pcore.WorktreeSpec { for _, s := range specs { if s.Issue.Number == issueNum { return &s } } return nil } func info(format string, args ...any) { fmt.Printf("%s[INFO]%s %s\n", blue, nc, fmt.Sprintf(format, args...)) } func ok(format string, args ...any) { fmt.Printf("%s[OK]%s %s\n", green, nc, fmt.Sprintf(format, args...)) } func warn(format string, args ...any) { fmt.Printf("%s[WARN]%s %s\n", yellow, nc, fmt.Sprintf(format, args...)) } func fatal(format string, args ...any) { fmt.Fprintf(os.Stderr, "%s[ERROR]%s %s\n", red, nc, fmt.Sprintf(format, args...)) os.Exit(1) }