commit b394d27e9fb9eea357ee6fe917a6b533c4c0c886 Author: agent Date: Mon May 18 18:46:13 2026 +0200 feat: initial scaffold of agent_runner_api service Go :8486 diff --git a/agent_runner_api.service b/agent_runner_api.service new file mode 100644 index 0000000..0b2cdbf --- /dev/null +++ b/agent_runner_api.service @@ -0,0 +1,16 @@ +[Unit] +Description=agent_runner_api — orquestador de agentes Claude headless con worktrees + DoD +After=network.target + +[Service] +Type=simple +WorkingDirectory=%h/fn_registry/apps/agent_runner_api +ExecStart=%h/fn_registry/apps/agent_runner_api/agent_runner_api --port 8486 --db %h/fn_registry/apps/agent_runner_api/agent_runs.db --repo-root %h/fn_registry --worktrees-root /tmp +Restart=always +RestartSec=3 +Environment=PATH=%h/.local/bin:/usr/local/bin:/usr/bin:/bin +StandardOutput=journal +StandardError=journal + +[Install] +WantedBy=default.target diff --git a/agent_spawn.go b/agent_spawn.go new file mode 100644 index 0000000..7993af1 --- /dev/null +++ b/agent_spawn.go @@ -0,0 +1,143 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +type SpawnConfig struct { + RepoRoot string + Branch string + WorktreePath string + Prompt string + LogPath string +} + +type SpawnResult struct { + PID int `json:"pid"` + Branch string `json:"branch"` + WorktreePath string `json:"worktree_path"` + LogPath string `json:"log_path"` + StartedAt int64 `json:"started_at"` + Error string `json:"error,omitempty"` +} + +// Spawn creates a git worktree on a fresh branch (reset if exists) and starts +// the claude headless subprocess. If claude is not in PATH or +// AGENT_RUNNER_STUB=1, runs `echo STUB: ` as a placeholder. +func Spawn(cfg SpawnConfig) SpawnResult { + res := SpawnResult{ + Branch: cfg.Branch, + WorktreePath: cfg.WorktreePath, + LogPath: cfg.LogPath, + StartedAt: time.Now().Unix(), + } + + if cfg.RepoRoot == "" || cfg.Branch == "" || cfg.WorktreePath == "" { + res.Error = "missing required fields (repo_root/branch/worktree_path)" + return res + } + + // Ensure parent dir for worktree exists + if err := os.MkdirAll(filepath.Dir(cfg.WorktreePath), 0o755); err != nil { + res.Error = "mkdir worktree parent: " + err.Error() + return res + } + + // If worktree path already exists, remove forcibly first + if _, err := os.Stat(cfg.WorktreePath); err == nil { + _ = exec.Command("git", "-C", cfg.RepoRoot, "worktree", "remove", "--force", cfg.WorktreePath).Run() + _ = os.RemoveAll(cfg.WorktreePath) + } + + // Delete branch if exists (best-effort) + _ = exec.Command("git", "-C", cfg.RepoRoot, "branch", "-D", cfg.Branch).Run() + + // Create worktree on new branch from master (fallback to current HEAD if master missing) + base := "master" + if err := exec.Command("git", "-C", cfg.RepoRoot, "rev-parse", "--verify", base).Run(); err != nil { + base = "HEAD" + } + cmd := exec.Command("git", "-C", cfg.RepoRoot, "worktree", "add", "-b", cfg.Branch, cfg.WorktreePath, base) + out, err := cmd.CombinedOutput() + if err != nil { + res.Error = fmt.Sprintf("worktree add: %s: %s", err.Error(), string(out)) + return res + } + + // Open log + if cfg.LogPath == "" { + cfg.LogPath = filepath.Join(cfg.WorktreePath, "agent.log") + res.LogPath = cfg.LogPath + } + if err := os.MkdirAll(filepath.Dir(cfg.LogPath), 0o755); err != nil { + res.Error = "mkdir log: " + err.Error() + return res + } + logFile, err := os.OpenFile(cfg.LogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + res.Error = "open log: " + err.Error() + return res + } + + // Decide command: claude or stub + useStub := os.Getenv("AGENT_RUNNER_STUB") == "1" + if !useStub { + if _, err := exec.LookPath("claude"); err != nil { + useStub = true + } + } + + var sub *exec.Cmd + if useStub { + sub = exec.Command("echo", "STUB:", cfg.Prompt) + } else { + sub = exec.Command("claude", "--headless", "--dangerously-skip-permissions", "-p", cfg.Prompt) + sub.Dir = cfg.WorktreePath + } + if sub.Dir == "" { + sub.Dir = cfg.WorktreePath + } + sub.Stdout = logFile + sub.Stderr = logFile + + if err := sub.Start(); err != nil { + logFile.Close() + res.Error = "spawn: " + err.Error() + return res + } + res.PID = sub.Process.Pid + + // Reap async — closes log when subprocess exits + go func() { + _ = sub.Wait() + _ = logFile.Close() + }() + + return res +} + +// Cleanup kills the PID (best-effort), removes the worktree and deletes the branch. +func Cleanup(repoRoot string, pid int, worktreePath, branch string) error { + var firstErr error + if pid > 0 { + if p, err := os.FindProcess(pid); err == nil { + _ = p.Kill() + } + } + if worktreePath != "" { + out, err := exec.Command("git", "-C", repoRoot, "worktree", "remove", "--force", worktreePath).CombinedOutput() + if err != nil && !strings.Contains(string(out), "is not a working tree") { + firstErr = fmt.Errorf("worktree remove: %s: %s", err.Error(), string(out)) + } + _ = os.RemoveAll(worktreePath) + } + if branch != "" { + _ = exec.Command("git", "-C", repoRoot, "branch", "-D", branch).Run() + } + return firstErr +} diff --git a/app.md b/app.md new file mode 100644 index 0000000..0355f8c --- /dev/null +++ b/app.md @@ -0,0 +1,86 @@ +--- +name: agent_runner_api +lang: go +domain: agents +version: 0.1.0 +description: "Service Go que orquesta agentes Claude headless en git worktrees con DoD" +tags: [service, agents, go, workflows, dod] +icon: + phosphor: "robot" + accent: "#3b82f6" +framework: "stdlib-http" +entry_point: "main.go" +dir_path: "apps/agent_runner_api" +repo_url: "https://gitea.organic-machine.com/dataforge/agent_runner_api" +uses_functions: [] +uses_types: [] +service: + port: 8486 + health_endpoint: /api/health + health_timeout_s: 3 + systemd_unit: agent_runner_api.service + systemd_scope: user + restart_policy: always + runtime: systemd-user + pc_targets: + - aurgi-pc + - home-wsl + is_local_only: true +e2e_checks: + - id: build + cmd: "CGO_ENABLED=1 go build -o agent_runner_api ." + timeout_s: 120 + - id: smoke + cmd: "./agent_runner_api --port 8486 --db /tmp/agent_runner_api_e2e.db &" + health: "http://127.0.0.1:8486/api/health" + - id: tests + cmd: "go test -count=1 ./..." +--- + +## Visual + +Backend puro, sin UI. Consume por skill_tree v2 + kanban_cpp. + +## Endpoints + +- `GET /api/health` — `{status, port, db}`. +- `POST /api/runs` — body `{issue_id?, card_id?, kanban_app?, mode, prompt?}` -> `{run_id, branch, worktree_path, sse_url}`. Crea worktree + lanza subprocess Claude (o `echo STUB:` si `AGENT_RUNNER_STUB=1`). +- `GET /api/runs?status=&app=&since=` — lista runs filtrada. +- `GET /api/runs/:id` — detalle + dod_items. +- `GET /api/runs/:id/sse` — stream `text/event-stream` (events: `connected`, `status`, `evidence`, `validated`, `merged`, `aborted`). +- `POST /api/runs/:id/evidence` — `{item_id|item_key, kind, payload_path?, payload_url?, payload_text?}`. Auto-crea `dod_item` si solo se da `item_key`. +- `POST /api/runs/:id/evidence/:eid/validate` — `{validated_by}`. +- `POST /api/runs/:id/merge` — TBD merge `auto/` a master `--no-ff` (gate: todos los `dod_items.required = 1` deben tener evidencia `validated`). +- `POST /api/runs/:id/abort` — kill PID + `git worktree remove --force` + `branch -D` + `status=aborted`. + +## Schema (5 migrations idempotentes via embed.FS) + +| Tabla | Para que | +|---|---| +| `workflows` | Templates de prompt + `dod_schema_json` | +| `runs` | Run vivo: workflow/issue/card/kanban_app + branch + worktree_path + PID + status | +| `worktrees` | 1 row por worktree creada, marcada `removed_at` al abort/merge | +| `dod_items` | Items DoD del run (`pending|done|validated|failed`) | +| `dod_evidence` | Evidencias adjuntas (`text|file|url`) con `validated_at/validated_by` | + +## Lanzamiento + +```bash +cd apps/agent_runner_api +CGO_ENABLED=1 go build -o agent_runner_api . +./agent_runner_api --port 8486 --db agent_runs.db --repo-root /home/lucas/fn_registry --worktrees-root /tmp +``` + +systemd-user: `systemctl --user enable --now agent_runner_api.service` (despues de copiar `agent_runner_api.service` a `~/.config/systemd/user/`). + +## Gotchas + +- `git worktree add` falla si la rama ya existe -> el `Spawn()` la borra antes con `branch -D` (best-effort). +- Worktree y main repo comparten `.git/hooks/` — pre-commit del main puede bloquear commits del agente; usar `--no-verify` documentado. +- `claude --headless` requiere PATH correcto en systemd. Si `claude` no esta en `$PATH`, el subprocess cae automaticamente a `echo STUB:` (mismo comportamiento que `AGENT_RUNNER_STUB=1`). +- Subprocess corre async — el handler HTTP devuelve `run_id` apenas inserta + lanza, no espera al exit. +- SSE: clientes deben reconectar al cerrar conexion. Heartbeat cada 15s para mantener conexion abierta. + +## Capability growth log + +- v0.1.0 (2026-05-18) — scaffold inicial: stdlib http, embed.FS migrations, SSE hub, spawn stub fallback, DoD gate en merge. diff --git a/appicon.ico b/appicon.ico new file mode 100644 index 0000000..1594ea3 Binary files /dev/null and b/appicon.ico differ diff --git a/db.go b/db.go new file mode 100644 index 0000000..81ab2c4 --- /dev/null +++ b/db.go @@ -0,0 +1,56 @@ +package main + +import ( + "database/sql" + "embed" + "fmt" + "io/fs" + "sort" + "strings" + + _ "github.com/mattn/go-sqlite3" +) + +//go:embed migrations/*.sql +var migrationsFS embed.FS + +// openDB opens (or creates) the SQLite database and applies migrations. +func openDB(path string) (*sql.DB, error) { + dsn := fmt.Sprintf("file:%s?_journal=WAL&_foreign_keys=on&_busy_timeout=5000", path) + conn, err := sql.Open("sqlite3", dsn) + if err != nil { + return nil, fmt.Errorf("open: %w", err) + } + if err := conn.Ping(); err != nil { + return nil, fmt.Errorf("ping: %w", err) + } + if err := applyMigrations(conn); err != nil { + conn.Close() + return nil, fmt.Errorf("migrations: %w", err) + } + return conn, nil +} + +func applyMigrations(conn *sql.DB) error { + files, err := fs.Glob(migrationsFS, "migrations/*.sql") + if err != nil { + return err + } + sort.Strings(files) + for _, f := range files { + b, err := migrationsFS.ReadFile(f) + if err != nil { + return err + } + if _, err := conn.Exec(string(b)); err != nil { + msg := err.Error() + // Idempotent ignores for ADD COLUMN re-runs etc. + if strings.Contains(msg, "duplicate column") || + strings.Contains(msg, "already exists") { + continue + } + return fmt.Errorf("%s: %w", f, err) + } + } + return nil +} diff --git a/dod.go b/dod.go new file mode 100644 index 0000000..a7a6c92 --- /dev/null +++ b/dod.go @@ -0,0 +1,169 @@ +package main + +import ( + "database/sql" + "time" + + "github.com/google/uuid" +) + +type DodItem struct { + ID string `json:"id"` + RunID string `json:"run_id"` + ItemKey string `json:"item_key"` + Kind string `json:"kind"` + Expected string `json:"expected"` + Required bool `json:"required"` + Status string `json:"status"` + CreatedAt int64 `json:"created_at"` +} + +type DodEvidence struct { + ID string `json:"id"` + DodItemID string `json:"dod_item_id"` + Kind string `json:"kind"` + PayloadPath *string `json:"payload_path,omitempty"` + PayloadURL *string `json:"payload_url,omitempty"` + PayloadText *string `json:"payload_text,omitempty"` + AttachedAt int64 `json:"attached_at"` + ValidatedAt *int64 `json:"validated_at,omitempty"` + ValidatedBy *string `json:"validated_by,omitempty"` +} + +func createDodItem(db *sql.DB, runID, key, kind, expected string, required bool) (DodItem, error) { + id := "dod_" + uuid.New().String()[:12] + now := time.Now().Unix() + reqInt := 0 + if required { + reqInt = 1 + } + _, err := db.Exec(`INSERT INTO dod_items + (id, run_id, item_key, kind, expected, required, status, created_at) + VALUES (?, ?, ?, ?, ?, ?, 'pending', ?)`, + id, runID, key, kind, expected, reqInt, now) + if err != nil { + return DodItem{}, err + } + return DodItem{ + ID: id, RunID: runID, ItemKey: key, Kind: kind, Expected: expected, + Required: required, Status: "pending", CreatedAt: now, + }, nil +} + +func listDodItems(db *sql.DB, runID string) ([]DodItem, error) { + rows, err := db.Query(`SELECT id, run_id, item_key, kind, expected, required, status, created_at + FROM dod_items WHERE run_id = ? ORDER BY created_at`, runID) + if err != nil { + return nil, err + } + defer rows.Close() + out := []DodItem{} + for rows.Next() { + var it DodItem + var reqInt int + if err := rows.Scan(&it.ID, &it.RunID, &it.ItemKey, &it.Kind, &it.Expected, &reqInt, &it.Status, &it.CreatedAt); err != nil { + return nil, err + } + it.Required = reqInt != 0 + out = append(out, it) + } + return out, nil +} + +func attachEvidence(db *sql.DB, itemID, kind string, path, url, text *string) (DodEvidence, error) { + id := "ev_" + uuid.New().String()[:12] + now := time.Now().Unix() + _, err := db.Exec(`INSERT INTO dod_evidence + (id, dod_item_id, kind, payload_path, payload_url, payload_text, attached_at) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + id, itemID, kind, nullStr(path), nullStr(url), nullStr(text), now) + if err != nil { + return DodEvidence{}, err + } + // Auto-bump item status to 'done' on first evidence + _, _ = db.Exec(`UPDATE dod_items SET status = 'done' WHERE id = ? AND status = 'pending'`, itemID) + return DodEvidence{ + ID: id, DodItemID: itemID, Kind: kind, + PayloadPath: path, PayloadURL: url, PayloadText: text, + AttachedAt: now, + }, nil +} + +func validateEvidence(db *sql.DB, evID, validatedBy string) error { + now := time.Now().Unix() + res, err := db.Exec(`UPDATE dod_evidence + SET validated_at = ?, validated_by = ? WHERE id = ?`, now, validatedBy, evID) + if err != nil { + return err + } + n, _ := res.RowsAffected() + if n == 0 { + return sql.ErrNoRows + } + // Bump item status to validated + _, _ = db.Exec(`UPDATE dod_items SET status = 'validated' + WHERE id = (SELECT dod_item_id FROM dod_evidence WHERE id = ?)`, evID) + return nil +} + +func listEvidence(db *sql.DB, itemID string) ([]DodEvidence, error) { + rows, err := db.Query(`SELECT id, dod_item_id, kind, payload_path, payload_url, payload_text, + attached_at, validated_at, validated_by + FROM dod_evidence WHERE dod_item_id = ? ORDER BY attached_at`, itemID) + if err != nil { + return nil, err + } + defer rows.Close() + out := []DodEvidence{} + for rows.Next() { + var ev DodEvidence + var path, url, text, valBy sql.NullString + var valAt sql.NullInt64 + if err := rows.Scan(&ev.ID, &ev.DodItemID, &ev.Kind, &path, &url, &text, + &ev.AttachedAt, &valAt, &valBy); err != nil { + return nil, err + } + if path.Valid { + s := path.String + ev.PayloadPath = &s + } + if url.Valid { + s := url.String + ev.PayloadURL = &s + } + if text.Valid { + s := text.String + ev.PayloadText = &s + } + if valAt.Valid { + v := valAt.Int64 + ev.ValidatedAt = &v + } + if valBy.Valid { + s := valBy.String + ev.ValidatedBy = &s + } + out = append(out, ev) + } + return out, nil +} + +// dodGateOpen returns true when every required item has at least one validated evidence. +func dodGateOpen(db *sql.DB, runID string) (bool, error) { + row := db.QueryRow(` + SELECT COUNT(*) FROM dod_items + WHERE run_id = ? AND required = 1 + AND status != 'validated'`, runID) + var n int + if err := row.Scan(&n); err != nil { + return false, err + } + return n == 0, nil +} + +func nullStr(p *string) interface{} { + if p == nil { + return nil + } + return *p +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..5508a6d --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module agent_runner_api + +go 1.22 + +require ( + github.com/google/uuid v1.6.0 + github.com/mattn/go-sqlite3 v1.14.22 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e056088 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= diff --git a/handlers.go b/handlers.go new file mode 100644 index 0000000..4e0d130 --- /dev/null +++ b/handlers.go @@ -0,0 +1,388 @@ +package main + +import ( + "database/sql" + "encoding/json" + "errors" + "fmt" + "net/http" + "os/exec" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/google/uuid" +) + +type Run struct { + ID string `json:"id"` + WorkflowID *string `json:"workflow_id,omitempty"` + IssueID *string `json:"issue_id,omitempty"` + CardID *string `json:"card_id,omitempty"` + KanbanApp *string `json:"kanban_app,omitempty"` + Mode string `json:"mode"` + Branch string `json:"branch"` + WorktreePath string `json:"worktree_path"` + Status string `json:"status"` + StartedAt int64 `json:"started_at"` + FinishedAt *int64 `json:"finished_at,omitempty"` + AgentPID *int `json:"agent_pid,omitempty"` + AgentLogPath *string `json:"agent_log_path,omitempty"` + Error *string `json:"error,omitempty"` +} + +type createRunRequest struct { + WorkflowID string `json:"workflow_id"` + IssueID string `json:"issue_id"` + CardID string `json:"card_id"` + KanbanApp string `json:"kanban_app"` + Mode string `json:"mode"` + Prompt string `json:"prompt"` +} + +type createRunResponse struct { + RunID string `json:"run_id"` + Branch string `json:"branch"` + WorktreePath string `json:"worktree_path"` + SSEURL string `json:"sse_url"` +} + +func writeJSON(w http.ResponseWriter, status int, body interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(body) +} + +func writeErr(w http.ResponseWriter, status int, msg string) { + writeJSON(w, status, map[string]string{"error": msg}) +} + +func (a *App) handleHealth(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]interface{}{ + "status": "ok", + "port": a.cfg.Port, + "db": a.cfg.DBPath, + }) +} + +// POST /api/runs +func (a *App) handleCreateRun(w http.ResponseWriter, r *http.Request) { + var req createRunRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + if req.Mode == "" { + req.Mode = "agent" + } + + runID := "run_" + uuid.New().String()[:12] + slug := req.IssueID + if slug == "" { + slug = req.CardID + } + if slug == "" { + slug = runID + } + branch := fmt.Sprintf("auto/%s", slug) + worktreePath := filepath.Join(a.cfg.WorktreesRoot, "wt-"+slug+"-"+runID[4:]) + now := time.Now().Unix() + + // Insert pending row + _, err := a.db.Exec(`INSERT INTO runs + (id, workflow_id, issue_id, card_id, kanban_app, mode, branch, worktree_path, status, started_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?)`, + runID, + nullStrFromS(req.WorkflowID), + nullStrFromS(req.IssueID), + nullStrFromS(req.CardID), + nullStrFromS(req.KanbanApp), + req.Mode, branch, worktreePath, now) + if err != nil { + writeErr(w, http.StatusInternalServerError, "insert run: "+err.Error()) + return + } + + // Spawn async (worktree create blocks briefly but is short) + prompt := req.Prompt + if prompt == "" { + prompt = fmt.Sprintf("Resolve %s in branch %s", slug, branch) + } + logPath := filepath.Join(worktreePath, "agent.log") + + res := Spawn(SpawnConfig{ + RepoRoot: a.cfg.RepoRoot, + Branch: branch, + WorktreePath: worktreePath, + Prompt: prompt, + LogPath: logPath, + }) + + if res.Error != "" { + _, _ = a.db.Exec(`UPDATE runs SET status = 'failed', error = ?, finished_at = ? + WHERE id = ?`, res.Error, time.Now().Unix(), runID) + writeErr(w, http.StatusInternalServerError, "spawn: "+res.Error) + return + } + + // Update row with PID + log + worktree entry + _, _ = a.db.Exec(`UPDATE runs SET agent_pid = ?, agent_log_path = ?, status = 'running' + WHERE id = ?`, res.PID, res.LogPath, runID) + wtID := "wt_" + uuid.New().String()[:12] + _, _ = a.db.Exec(`INSERT INTO worktrees (id, run_id, path, branch, created_at) + VALUES (?, ?, ?, ?, ?)`, wtID, runID, worktreePath, branch, now) + + a.sse.Publish(runID, sseEvent{Event: "status", Data: `{"status":"running","pid":` + strconv.Itoa(res.PID) + `}`}) + + writeJSON(w, http.StatusCreated, createRunResponse{ + RunID: runID, + Branch: branch, + WorktreePath: worktreePath, + SSEURL: fmt.Sprintf("/api/runs/%s/sse", runID), + }) +} + +// GET /api/runs?status=&app=&since= +func (a *App) handleListRuns(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + where := []string{} + args := []interface{}{} + if s := q.Get("status"); s != "" { + where = append(where, "status = ?") + args = append(args, s) + } + if app := q.Get("app"); app != "" { + where = append(where, "kanban_app = ?") + args = append(args, app) + } + if since := q.Get("since"); since != "" { + if ts, err := strconv.ParseInt(since, 10, 64); err == nil { + where = append(where, "started_at >= ?") + args = append(args, ts) + } + } + sqlStr := `SELECT id, workflow_id, issue_id, card_id, kanban_app, mode, branch, worktree_path, + status, started_at, finished_at, agent_pid, agent_log_path, error + FROM runs` + if len(where) > 0 { + sqlStr += " WHERE " + strings.Join(where, " AND ") + } + sqlStr += " ORDER BY started_at DESC LIMIT 200" + rows, err := a.db.Query(sqlStr, args...) + if err != nil { + writeErr(w, http.StatusInternalServerError, "query: "+err.Error()) + return + } + defer rows.Close() + out := []Run{} + for rows.Next() { + run, err := scanRun(rows) + if err != nil { + writeErr(w, http.StatusInternalServerError, "scan: "+err.Error()) + return + } + out = append(out, run) + } + writeJSON(w, http.StatusOK, out) +} + +// GET /api/runs/:id +func (a *App) handleGetRun(w http.ResponseWriter, r *http.Request, id string) { + row := a.db.QueryRow(`SELECT id, workflow_id, issue_id, card_id, kanban_app, mode, branch, worktree_path, + status, started_at, finished_at, agent_pid, agent_log_path, error + FROM runs WHERE id = ?`, id) + run, err := scanRun(row) + if errors.Is(err, sql.ErrNoRows) { + writeErr(w, http.StatusNotFound, "run not found") + return + } + if err != nil { + writeErr(w, http.StatusInternalServerError, "scan: "+err.Error()) + return + } + // Include dod items + items, _ := listDodItems(a.db, id) + writeJSON(w, http.StatusOK, map[string]interface{}{ + "run": run, + "dod_items": items, + }) +} + +// POST /api/runs/:id/evidence +type evidenceRequest struct { + ItemID string `json:"item_id"` + ItemKey string `json:"item_key"` + Kind string `json:"kind"` + PayloadPath *string `json:"payload_path,omitempty"` + PayloadURL *string `json:"payload_url,omitempty"` + PayloadText *string `json:"payload_text,omitempty"` +} + +func (a *App) handleAttachEvidence(w http.ResponseWriter, r *http.Request, runID string) { + var req evidenceRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + itemID := req.ItemID + if itemID == "" && req.ItemKey != "" { + // auto-create item if key provided + it, err := createDodItem(a.db, runID, req.ItemKey, "manual", "", true) + if err != nil { + writeErr(w, http.StatusInternalServerError, "auto-create item: "+err.Error()) + return + } + itemID = it.ID + } + if itemID == "" { + writeErr(w, http.StatusBadRequest, "item_id or item_key required") + return + } + if req.Kind == "" { + req.Kind = "text" + } + ev, err := attachEvidence(a.db, itemID, req.Kind, req.PayloadPath, req.PayloadURL, req.PayloadText) + if err != nil { + writeErr(w, http.StatusInternalServerError, "attach: "+err.Error()) + return + } + a.sse.Publish(runID, sseEvent{Event: "evidence", Data: `{"item_id":"` + itemID + `","evidence_id":"` + ev.ID + `"}`}) + writeJSON(w, http.StatusCreated, ev) +} + +// POST /api/runs/:id/evidence/:eid/validate +type validateRequest struct { + ValidatedBy string `json:"validated_by"` +} + +func (a *App) handleValidateEvidence(w http.ResponseWriter, r *http.Request, runID, evID string) { + var req validateRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + if req.ValidatedBy == "" { + req.ValidatedBy = "human" + } + if err := validateEvidence(a.db, evID, req.ValidatedBy); err != nil { + if errors.Is(err, sql.ErrNoRows) { + writeErr(w, http.StatusNotFound, "evidence not found") + return + } + writeErr(w, http.StatusInternalServerError, "validate: "+err.Error()) + return + } + a.sse.Publish(runID, sseEvent{Event: "validated", Data: `{"evidence_id":"` + evID + `","validated_by":"` + req.ValidatedBy + `"}`}) + writeJSON(w, http.StatusOK, map[string]string{"status": "validated"}) +} + +// POST /api/runs/:id/merge +func (a *App) handleMergeRun(w http.ResponseWriter, r *http.Request, runID string) { + open, err := dodGateOpen(a.db, runID) + if err != nil { + writeErr(w, http.StatusInternalServerError, "gate check: "+err.Error()) + return + } + if !open { + writeErr(w, http.StatusPreconditionFailed, "dod gate closed — required items not validated") + return + } + row := a.db.QueryRow(`SELECT branch FROM runs WHERE id = ?`, runID) + var branch string + if err := row.Scan(&branch); err != nil { + writeErr(w, http.StatusNotFound, "run not found") + return + } + out, err := exec.Command("git", "-C", a.cfg.RepoRoot, "merge", "--no-ff", branch).CombinedOutput() + if err != nil { + writeErr(w, http.StatusInternalServerError, "merge: "+err.Error()+": "+string(out)) + return + } + _, _ = a.db.Exec(`UPDATE runs SET status = 'merged', finished_at = ? WHERE id = ?`, + time.Now().Unix(), runID) + a.sse.Publish(runID, sseEvent{Event: "merged", Data: `{"branch":"` + branch + `"}`}) + writeJSON(w, http.StatusOK, map[string]string{"status": "merged", "branch": branch}) +} + +// POST /api/runs/:id/abort +func (a *App) handleAbortRun(w http.ResponseWriter, r *http.Request, runID string) { + row := a.db.QueryRow(`SELECT agent_pid, worktree_path, branch FROM runs WHERE id = ?`, runID) + var pidNS sql.NullInt64 + var wt, branch string + if err := row.Scan(&pidNS, &wt, &branch); err != nil { + writeErr(w, http.StatusNotFound, "run not found") + return + } + pid := 0 + if pidNS.Valid { + pid = int(pidNS.Int64) + } + if err := Cleanup(a.cfg.RepoRoot, pid, wt, branch); err != nil { + // non-fatal; record but continue + fmt.Println("cleanup warning:", err) + } + now := time.Now().Unix() + _, _ = a.db.Exec(`UPDATE runs SET status = 'aborted', finished_at = ? WHERE id = ?`, now, runID) + _, _ = a.db.Exec(`UPDATE worktrees SET removed_at = ? WHERE run_id = ? AND removed_at IS NULL`, now, runID) + a.sse.Publish(runID, sseEvent{Event: "aborted", Data: `{"run_id":"` + runID + `"}`}) + writeJSON(w, http.StatusOK, map[string]string{"status": "aborted"}) +} + +// --- helpers --- + +type rowScanner interface { + Scan(dest ...interface{}) error +} + +func scanRun(s rowScanner) (Run, error) { + var r Run + var workflowID, issueID, cardID, kanbanApp, logPath, errStr sql.NullString + var finishedAt sql.NullInt64 + var agentPID sql.NullInt64 + err := s.Scan(&r.ID, &workflowID, &issueID, &cardID, &kanbanApp, &r.Mode, &r.Branch, &r.WorktreePath, + &r.Status, &r.StartedAt, &finishedAt, &agentPID, &logPath, &errStr) + if err != nil { + return r, err + } + if workflowID.Valid { + s := workflowID.String + r.WorkflowID = &s + } + if issueID.Valid { + s := issueID.String + r.IssueID = &s + } + if cardID.Valid { + s := cardID.String + r.CardID = &s + } + if kanbanApp.Valid { + s := kanbanApp.String + r.KanbanApp = &s + } + if finishedAt.Valid { + v := finishedAt.Int64 + r.FinishedAt = &v + } + if agentPID.Valid { + v := int(agentPID.Int64) + r.AgentPID = &v + } + if logPath.Valid { + s := logPath.String + r.AgentLogPath = &s + } + if errStr.Valid { + s := errStr.String + r.Error = &s + } + return r, nil +} + +func nullStrFromS(s string) interface{} { + if s == "" { + return nil + } + return s +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..735c3a5 --- /dev/null +++ b/main.go @@ -0,0 +1,156 @@ +package main + +import ( + "context" + "database/sql" + "flag" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "strings" + "syscall" + "time" +) + +type Config struct { + Port int + DBPath string + RepoRoot string + WorktreesRoot string +} + +type App struct { + cfg Config + db *sql.DB + sse *sseHub +} + +func main() { + var ( + port = flag.Int("port", 8486, "HTTP port") + dbPath = flag.String("db", "agent_runs.db", "SQLite database path") + repoRoot = flag.String("repo-root", "", "Git repo root (defaults to $PWD)") + wtRoot = flag.String("worktrees-root", "/tmp", "Parent dir for worktrees") + ) + flag.Parse() + + root := *repoRoot + if root == "" { + root, _ = os.Getwd() + } + + db, err := openDB(*dbPath) + if err != nil { + log.Fatalf("openDB: %v", err) + } + defer db.Close() + + app := &App{ + cfg: Config{ + Port: *port, + DBPath: *dbPath, + RepoRoot: root, + WorktreesRoot: *wtRoot, + }, + db: db, + sse: newSSEHub(), + } + + mux := http.NewServeMux() + mux.HandleFunc("/api/health", app.handleHealth) + mux.HandleFunc("/api/runs", func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPost: + app.handleCreateRun(w, r) + case http.MethodGet: + app.handleListRuns(w, r) + default: + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + } + }) + mux.HandleFunc("/api/runs/", func(w http.ResponseWriter, r *http.Request) { + app.routeRun(w, r) + }) + + srv := &http.Server{ + Addr: fmt.Sprintf(":%d", *port), + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + + // Graceful shutdown + go func() { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + <-sigs + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = srv.Shutdown(ctx) + }() + + log.Printf("agent_runner_api listening :%d db=%s repo=%s", *port, *dbPath, root) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("listen: %v", err) + } +} + +// routeRun parses /api/runs/:id[/...] subroutes. +func (a *App) routeRun(w http.ResponseWriter, r *http.Request) { + path := strings.TrimPrefix(r.URL.Path, "/api/runs/") + parts := strings.Split(path, "/") + if len(parts) == 0 || parts[0] == "" { + http.NotFound(w, r) + return + } + runID := parts[0] + + if len(parts) == 1 { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + a.handleGetRun(w, r, runID) + return + } + + switch parts[1] { + case "sse": + a.handleRunSSE(w, r, runID) + case "merge": + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + a.handleMergeRun(w, r, runID) + case "abort": + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + a.handleAbortRun(w, r, runID) + case "evidence": + // /api/runs/:id/evidence (POST) — attach + // /api/runs/:id/evidence/:eid/validate (POST) — validate + if len(parts) == 2 { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + a.handleAttachEvidence(w, r, runID) + return + } + if len(parts) == 4 && parts[3] == "validate" { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + a.handleValidateEvidence(w, r, runID, parts[2]) + return + } + http.NotFound(w, r) + default: + http.NotFound(w, r) + } +} diff --git a/main_test.go b/main_test.go new file mode 100644 index 0000000..a3bdffa --- /dev/null +++ b/main_test.go @@ -0,0 +1,245 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" +) + +func setupApp(t *testing.T) (*App, func()) { + t.Helper() + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + + // Init a throwaway git repo so worktree commands work + repoRoot := filepath.Join(dir, "repo") + if err := os.MkdirAll(repoRoot, 0o755); err != nil { + t.Fatalf("mkdir repo: %v", err) + } + mustRun(t, repoRoot, "git", "init", "-b", "master") + mustRun(t, repoRoot, "git", "config", "user.email", "test@local") + mustRun(t, repoRoot, "git", "config", "user.name", "test") + // commit something + readme := filepath.Join(repoRoot, "README.md") + _ = os.WriteFile(readme, []byte("hi\n"), 0o644) + mustRun(t, repoRoot, "git", "add", "-A") + mustRun(t, repoRoot, "git", "commit", "-m", "initial") + + db, err := openDB(dbPath) + if err != nil { + t.Fatalf("openDB: %v", err) + } + app := &App{ + cfg: Config{ + Port: 0, + DBPath: dbPath, + RepoRoot: repoRoot, + WorktreesRoot: filepath.Join(dir, "worktrees"), + }, + db: db, + sse: newSSEHub(), + } + t.Setenv("AGENT_RUNNER_STUB", "1") + cleanup := func() { + db.Close() + } + return app, cleanup +} + +func mustRun(t *testing.T, dir, name string, args ...string) { + t.Helper() + cmd := exec.Command(name, args...) + cmd.Dir = dir + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("%s %v: %v: %s", name, args, err, string(out)) + } +} + +func TestHealth(t *testing.T) { + app, cleanup := setupApp(t) + defer cleanup() + + req := httptest.NewRequest(http.MethodGet, "/api/health", nil) + rec := httptest.NewRecorder() + app.handleHealth(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status=%d", rec.Code) + } + var body map[string]interface{} + _ = json.Unmarshal(rec.Body.Bytes(), &body) + if body["status"] != "ok" { + t.Fatalf("unexpected body: %v", body) + } +} + +func TestCreateRun(t *testing.T) { + app, cleanup := setupApp(t) + defer cleanup() + + payload := map[string]string{ + "issue_id": "0999", + "mode": "agent", + "prompt": "test prompt", + } + b, _ := json.Marshal(payload) + req := httptest.NewRequest(http.MethodPost, "/api/runs", bytes.NewReader(b)) + rec := httptest.NewRecorder() + app.handleCreateRun(rec, req) + + if rec.Code != http.StatusCreated { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + var res createRunResponse + if err := json.Unmarshal(rec.Body.Bytes(), &res); err != nil { + t.Fatalf("json: %v", err) + } + if !strings.HasPrefix(res.RunID, "run_") { + t.Fatalf("bad run_id: %s", res.RunID) + } + if !strings.HasPrefix(res.Branch, "auto/") { + t.Fatalf("bad branch: %s", res.Branch) + } + // Verify row inserted + var status string + if err := app.db.QueryRow(`SELECT status FROM runs WHERE id = ?`, res.RunID).Scan(&status); err != nil { + t.Fatalf("query: %v", err) + } + if status != "running" && status != "pending" { + t.Fatalf("expected running/pending, got %s", status) + } + // Verify worktree row + var count int + _ = app.db.QueryRow(`SELECT COUNT(*) FROM worktrees WHERE run_id = ?`, res.RunID).Scan(&count) + if count != 1 { + t.Fatalf("expected 1 worktree row, got %d", count) + } +} + +func TestAbortRun(t *testing.T) { + app, cleanup := setupApp(t) + defer cleanup() + + // Create a run + payload := map[string]string{"issue_id": "abort_test", "prompt": "x"} + b, _ := json.Marshal(payload) + rec := httptest.NewRecorder() + app.handleCreateRun(rec, httptest.NewRequest(http.MethodPost, "/api/runs", bytes.NewReader(b))) + if rec.Code != http.StatusCreated { + t.Fatalf("create failed: %d %s", rec.Code, rec.Body.String()) + } + var res createRunResponse + _ = json.Unmarshal(rec.Body.Bytes(), &res) + + // Abort + rec2 := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/runs/%s/abort", res.RunID), nil) + app.handleAbortRun(rec2, req, res.RunID) + + if rec2.Code != http.StatusOK { + t.Fatalf("abort status=%d body=%s", rec2.Code, rec2.Body.String()) + } + var status string + _ = app.db.QueryRow(`SELECT status FROM runs WHERE id = ?`, res.RunID).Scan(&status) + if status != "aborted" { + t.Fatalf("expected aborted, got %s", status) + } + // Worktree row marked removed + var removed bool + _ = app.db.QueryRow(`SELECT removed_at IS NOT NULL FROM worktrees WHERE run_id = ?`, res.RunID).Scan(&removed) + if !removed { + t.Fatalf("expected worktree removed_at populated") + } +} + +func TestEvidencePersist(t *testing.T) { + app, cleanup := setupApp(t) + defer cleanup() + + // Create run + b, _ := json.Marshal(map[string]string{"issue_id": "ev_test"}) + rec := httptest.NewRecorder() + app.handleCreateRun(rec, httptest.NewRequest(http.MethodPost, "/api/runs", bytes.NewReader(b))) + var run createRunResponse + _ = json.Unmarshal(rec.Body.Bytes(), &run) + + // Attach evidence with auto-create item + text := "tests pass" + evReq := evidenceRequest{ + ItemKey: "tests_green", + Kind: "text", + PayloadText: &text, + } + body, _ := json.Marshal(evReq) + rec2 := httptest.NewRecorder() + app.handleAttachEvidence(rec2, httptest.NewRequest(http.MethodPost, + fmt.Sprintf("/api/runs/%s/evidence", run.RunID), bytes.NewReader(body)), run.RunID) + if rec2.Code != http.StatusCreated { + t.Fatalf("evidence status=%d body=%s", rec2.Code, rec2.Body.String()) + } + + // Verify rows + var itemCount, evCount int + _ = app.db.QueryRow(`SELECT COUNT(*) FROM dod_items WHERE run_id = ?`, run.RunID).Scan(&itemCount) + _ = app.db.QueryRow(`SELECT COUNT(*) FROM dod_evidence`).Scan(&evCount) + if itemCount != 1 || evCount != 1 { + t.Fatalf("expected 1+1, got items=%d evidence=%d", itemCount, evCount) + } + + // Status should have bumped to 'done' + var status string + _ = app.db.QueryRow(`SELECT status FROM dod_items WHERE run_id = ?`, run.RunID).Scan(&status) + if status != "done" { + t.Fatalf("expected done, got %s", status) + } +} + +func TestListFilter(t *testing.T) { + app, cleanup := setupApp(t) + defer cleanup() + + // Insert two runs with different kanban_app (unique issue_ids to avoid branch collision) + for i, kapp := range []string{"kanban_a", "kanban_b", "kanban_a"} { + k := kapp + b, _ := json.Marshal(map[string]string{ + "kanban_app": k, + "issue_id": fmt.Sprintf("x_%s_%d", k, i), + }) + rec := httptest.NewRecorder() + app.handleCreateRun(rec, httptest.NewRequest(http.MethodPost, "/api/runs", bytes.NewReader(b))) + if rec.Code != http.StatusCreated { + t.Fatalf("setup row failed: %s", rec.Body.String()) + } + } + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/runs?app=kanban_a", nil) + app.handleListRuns(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + var runs []Run + _ = json.Unmarshal(rec.Body.Bytes(), &runs) + if len(runs) != 2 { + t.Fatalf("expected 2 runs for kanban_a, got %d", len(runs)) + } + for _, r := range runs { + if r.KanbanApp == nil || *r.KanbanApp != "kanban_a" { + t.Fatalf("unexpected kanban_app: %v", r.KanbanApp) + } + } +} + +// drain reads to EOF (used to discard test response bodies). Not strictly needed +// for httptest but kept for future use. +func drain(rc io.ReadCloser) { _, _ = io.Copy(io.Discard, rc); rc.Close() } diff --git a/migrations/001_workflows.sql b/migrations/001_workflows.sql new file mode 100644 index 0000000..54c08ba --- /dev/null +++ b/migrations/001_workflows.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS workflows ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + prompt_template TEXT NOT NULL DEFAULT '', + dod_schema_json TEXT NOT NULL DEFAULT '[]', + created_at INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_workflows_name ON workflows(name); diff --git a/migrations/002_runs.sql b/migrations/002_runs.sql new file mode 100644 index 0000000..4f8667f --- /dev/null +++ b/migrations/002_runs.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS runs ( + id TEXT PRIMARY KEY, + workflow_id TEXT, + issue_id TEXT, + card_id TEXT, + kanban_app TEXT, + mode TEXT NOT NULL DEFAULT 'agent', + branch TEXT NOT NULL DEFAULT '', + worktree_path TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'pending', + started_at INTEGER NOT NULL, + finished_at INTEGER, + agent_pid INTEGER, + agent_log_path TEXT, + error TEXT, + FOREIGN KEY (workflow_id) REFERENCES workflows(id) +); + +CREATE INDEX IF NOT EXISTS idx_runs_status ON runs(status); +CREATE INDEX IF NOT EXISTS idx_runs_issue ON runs(issue_id); +CREATE INDEX IF NOT EXISTS idx_runs_card ON runs(card_id); +CREATE INDEX IF NOT EXISTS idx_runs_kanban_app ON runs(kanban_app); +CREATE INDEX IF NOT EXISTS idx_runs_started ON runs(started_at DESC); diff --git a/migrations/003_worktrees.sql b/migrations/003_worktrees.sql new file mode 100644 index 0000000..dc784d1 --- /dev/null +++ b/migrations/003_worktrees.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS worktrees ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + path TEXT NOT NULL, + branch TEXT NOT NULL, + created_at INTEGER NOT NULL, + removed_at INTEGER, + FOREIGN KEY (run_id) REFERENCES runs(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_worktrees_run ON worktrees(run_id); diff --git a/migrations/004_dod_items.sql b/migrations/004_dod_items.sql new file mode 100644 index 0000000..27d04aa --- /dev/null +++ b/migrations/004_dod_items.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS dod_items ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + item_key TEXT NOT NULL, + kind TEXT NOT NULL DEFAULT 'manual', + expected TEXT NOT NULL DEFAULT '', + required INTEGER NOT NULL DEFAULT 1, + status TEXT NOT NULL DEFAULT 'pending', + created_at INTEGER NOT NULL, + FOREIGN KEY (run_id) REFERENCES runs(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_dod_items_run ON dod_items(run_id); +CREATE INDEX IF NOT EXISTS idx_dod_items_status ON dod_items(status); diff --git a/migrations/005_dod_evidence.sql b/migrations/005_dod_evidence.sql new file mode 100644 index 0000000..1677303 --- /dev/null +++ b/migrations/005_dod_evidence.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS dod_evidence ( + id TEXT PRIMARY KEY, + dod_item_id TEXT NOT NULL, + kind TEXT NOT NULL DEFAULT 'text', + payload_path TEXT, + payload_url TEXT, + payload_text TEXT, + attached_at INTEGER NOT NULL, + validated_at INTEGER, + validated_by TEXT, + FOREIGN KEY (dod_item_id) REFERENCES dod_items(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_dod_evidence_item ON dod_evidence(dod_item_id); diff --git a/sse.go b/sse.go new file mode 100644 index 0000000..ebfcd5d --- /dev/null +++ b/sse.go @@ -0,0 +1,107 @@ +package main + +import ( + "fmt" + "net/http" + "sync" + "time" +) + +// sseHub broadcasts events keyed by run_id to N subscribers. +type sseHub struct { + mu sync.Mutex + subscribers map[string]map[chan sseEvent]struct{} +} + +type sseEvent struct { + Event string + Data string +} + +func newSSEHub() *sseHub { + return &sseHub{subscribers: make(map[string]map[chan sseEvent]struct{})} +} + +func (h *sseHub) subscribe(runID string) chan sseEvent { + ch := make(chan sseEvent, 16) + h.mu.Lock() + if _, ok := h.subscribers[runID]; !ok { + h.subscribers[runID] = make(map[chan sseEvent]struct{}) + } + h.subscribers[runID][ch] = struct{}{} + h.mu.Unlock() + return ch +} + +func (h *sseHub) unsubscribe(runID string, ch chan sseEvent) { + h.mu.Lock() + if subs, ok := h.subscribers[runID]; ok { + delete(subs, ch) + if len(subs) == 0 { + delete(h.subscribers, runID) + } + } + h.mu.Unlock() + close(ch) +} + +func (h *sseHub) Publish(runID string, ev sseEvent) { + h.mu.Lock() + subs := h.subscribers[runID] + chans := make([]chan sseEvent, 0, len(subs)) + for c := range subs { + chans = append(chans, c) + } + h.mu.Unlock() + for _, c := range chans { + select { + case c <- ev: + default: + // drop on slow subscriber + } + } +} + +func (a *App) handleRunSSE(w http.ResponseWriter, r *http.Request, runID string) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + + // Subscribe + ch := a.sse.subscribe(runID) + defer a.sse.unsubscribe(runID, ch) + + // Send initial connected event + fmt.Fprintf(w, "event: connected\ndata: {\"run_id\":\"%s\"}\n\n", runID) + flusher.Flush() + + // Heartbeat + events + heartbeat := time.NewTicker(15 * time.Second) + defer heartbeat.Stop() + + for { + select { + case <-r.Context().Done(): + return + case <-heartbeat.C: + fmt.Fprintf(w, ": heartbeat\n\n") + flusher.Flush() + case ev, ok := <-ch: + if !ok { + return + } + if ev.Event != "" { + fmt.Fprintf(w, "event: %s\n", ev.Event) + } + fmt.Fprintf(w, "data: %s\n\n", ev.Data) + flusher.Flush() + } + } +}