From b394d27e9fb9eea357ee6fe917a6b533c4c0c886 Mon Sep 17 00:00:00 2001 From: agent Date: Mon, 18 May 2026 18:46:13 +0200 Subject: [PATCH] feat: initial scaffold of agent_runner_api service Go :8486 --- agent_runner_api.service | 16 ++ agent_spawn.go | 143 ++++++++++++ app.md | 86 +++++++ appicon.ico | Bin 0 -> 9058 bytes db.go | 56 +++++ dod.go | 169 ++++++++++++++ go.mod | 8 + go.sum | 4 + handlers.go | 388 ++++++++++++++++++++++++++++++++ main.go | 156 +++++++++++++ main_test.go | 245 ++++++++++++++++++++ migrations/001_workflows.sql | 9 + migrations/002_runs.sql | 23 ++ migrations/003_worktrees.sql | 11 + migrations/004_dod_items.sql | 14 ++ migrations/005_dod_evidence.sql | 14 ++ sse.go | 107 +++++++++ 17 files changed, 1449 insertions(+) create mode 100644 agent_runner_api.service create mode 100644 agent_spawn.go create mode 100644 app.md create mode 100644 appicon.ico create mode 100644 db.go create mode 100644 dod.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 handlers.go create mode 100644 main.go create mode 100644 main_test.go create mode 100644 migrations/001_workflows.sql create mode 100644 migrations/002_runs.sql create mode 100644 migrations/003_worktrees.sql create mode 100644 migrations/004_dod_items.sql create mode 100644 migrations/005_dod_evidence.sql create mode 100644 sse.go diff --git a/agent_runner_api.service b/agent_runner_api.service new file mode 100644 index 0000000..0b2cdbf --- /dev/null +++ b/agent_runner_api.service @@ -0,0 +1,16 @@ +[Unit] +Description=agent_runner_api — orquestador de agentes Claude headless con worktrees + DoD +After=network.target + +[Service] +Type=simple +WorkingDirectory=%h/fn_registry/apps/agent_runner_api +ExecStart=%h/fn_registry/apps/agent_runner_api/agent_runner_api --port 8486 --db %h/fn_registry/apps/agent_runner_api/agent_runs.db --repo-root %h/fn_registry --worktrees-root /tmp +Restart=always +RestartSec=3 +Environment=PATH=%h/.local/bin:/usr/local/bin:/usr/bin:/bin +StandardOutput=journal +StandardError=journal + +[Install] +WantedBy=default.target diff --git a/agent_spawn.go b/agent_spawn.go new file mode 100644 index 0000000..7993af1 --- /dev/null +++ b/agent_spawn.go @@ -0,0 +1,143 @@ +package main + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" +) + +type SpawnConfig struct { + RepoRoot string + Branch string + WorktreePath string + Prompt string + LogPath string +} + +type SpawnResult struct { + PID int `json:"pid"` + Branch string `json:"branch"` + WorktreePath string `json:"worktree_path"` + LogPath string `json:"log_path"` + StartedAt int64 `json:"started_at"` + Error string `json:"error,omitempty"` +} + +// Spawn creates a git worktree on a fresh branch (reset if exists) and starts +// the claude headless subprocess. If claude is not in PATH or +// AGENT_RUNNER_STUB=1, runs `echo STUB: ` as a placeholder. +func Spawn(cfg SpawnConfig) SpawnResult { + res := SpawnResult{ + Branch: cfg.Branch, + WorktreePath: cfg.WorktreePath, + LogPath: cfg.LogPath, + StartedAt: time.Now().Unix(), + } + + if cfg.RepoRoot == "" || cfg.Branch == "" || cfg.WorktreePath == "" { + res.Error = "missing required fields (repo_root/branch/worktree_path)" + return res + } + + // Ensure parent dir for worktree exists + if err := os.MkdirAll(filepath.Dir(cfg.WorktreePath), 0o755); err != nil { + res.Error = "mkdir worktree parent: " + err.Error() + return res + } + + // If worktree path already exists, remove forcibly first + if _, err := os.Stat(cfg.WorktreePath); err == nil { + _ = exec.Command("git", "-C", cfg.RepoRoot, "worktree", "remove", "--force", cfg.WorktreePath).Run() + _ = os.RemoveAll(cfg.WorktreePath) + } + + // Delete branch if exists (best-effort) + _ = exec.Command("git", "-C", cfg.RepoRoot, "branch", "-D", cfg.Branch).Run() + + // Create worktree on new branch from master (fallback to current HEAD if master missing) + base := "master" + if err := exec.Command("git", "-C", cfg.RepoRoot, "rev-parse", "--verify", base).Run(); err != nil { + base = "HEAD" + } + cmd := exec.Command("git", "-C", cfg.RepoRoot, "worktree", "add", "-b", cfg.Branch, cfg.WorktreePath, base) + out, err := cmd.CombinedOutput() + if err != nil { + res.Error = fmt.Sprintf("worktree add: %s: %s", err.Error(), string(out)) + return res + } + + // Open log + if cfg.LogPath == "" { + cfg.LogPath = filepath.Join(cfg.WorktreePath, "agent.log") + res.LogPath = cfg.LogPath + } + if err := os.MkdirAll(filepath.Dir(cfg.LogPath), 0o755); err != nil { + res.Error = "mkdir log: " + err.Error() + return res + } + logFile, err := os.OpenFile(cfg.LogPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + res.Error = "open log: " + err.Error() + return res + } + + // Decide command: claude or stub + useStub := os.Getenv("AGENT_RUNNER_STUB") == "1" + if !useStub { + if _, err := exec.LookPath("claude"); err != nil { + useStub = true + } + } + + var sub *exec.Cmd + if useStub { + sub = exec.Command("echo", "STUB:", cfg.Prompt) + } else { + sub = exec.Command("claude", "--headless", "--dangerously-skip-permissions", "-p", cfg.Prompt) + sub.Dir = cfg.WorktreePath + } + if sub.Dir == "" { + sub.Dir = cfg.WorktreePath + } + sub.Stdout = logFile + sub.Stderr = logFile + + if err := sub.Start(); err != nil { + logFile.Close() + res.Error = "spawn: " + err.Error() + return res + } + res.PID = sub.Process.Pid + + // Reap async — closes log when subprocess exits + go func() { + _ = sub.Wait() + _ = logFile.Close() + }() + + return res +} + +// Cleanup kills the PID (best-effort), removes the worktree and deletes the branch. +func Cleanup(repoRoot string, pid int, worktreePath, branch string) error { + var firstErr error + if pid > 0 { + if p, err := os.FindProcess(pid); err == nil { + _ = p.Kill() + } + } + if worktreePath != "" { + out, err := exec.Command("git", "-C", repoRoot, "worktree", "remove", "--force", worktreePath).CombinedOutput() + if err != nil && !strings.Contains(string(out), "is not a working tree") { + firstErr = fmt.Errorf("worktree remove: %s: %s", err.Error(), string(out)) + } + _ = os.RemoveAll(worktreePath) + } + if branch != "" { + _ = exec.Command("git", "-C", repoRoot, "branch", "-D", branch).Run() + } + return firstErr +} diff --git a/app.md b/app.md new file mode 100644 index 0000000..0355f8c --- /dev/null +++ b/app.md @@ -0,0 +1,86 @@ +--- +name: agent_runner_api +lang: go +domain: agents +version: 0.1.0 +description: "Service Go que orquesta agentes Claude headless en git worktrees con DoD" +tags: [service, agents, go, workflows, dod] +icon: + phosphor: "robot" + accent: "#3b82f6" +framework: "stdlib-http" +entry_point: "main.go" +dir_path: "apps/agent_runner_api" +repo_url: "https://gitea.organic-machine.com/dataforge/agent_runner_api" +uses_functions: [] +uses_types: [] +service: + port: 8486 + health_endpoint: /api/health + health_timeout_s: 3 + systemd_unit: agent_runner_api.service + systemd_scope: user + restart_policy: always + runtime: systemd-user + pc_targets: + - aurgi-pc + - home-wsl + is_local_only: true +e2e_checks: + - id: build + cmd: "CGO_ENABLED=1 go build -o agent_runner_api ." + timeout_s: 120 + - id: smoke + cmd: "./agent_runner_api --port 8486 --db /tmp/agent_runner_api_e2e.db &" + health: "http://127.0.0.1:8486/api/health" + - id: tests + cmd: "go test -count=1 ./..." +--- + +## Visual + +Backend puro, sin UI. Consume por skill_tree v2 + kanban_cpp. + +## Endpoints + +- `GET /api/health` — `{status, port, db}`. +- `POST /api/runs` — body `{issue_id?, card_id?, kanban_app?, mode, prompt?}` -> `{run_id, branch, worktree_path, sse_url}`. Crea worktree + lanza subprocess Claude (o `echo STUB:` si `AGENT_RUNNER_STUB=1`). +- `GET /api/runs?status=&app=&since=` — lista runs filtrada. +- `GET /api/runs/:id` — detalle + dod_items. +- `GET /api/runs/:id/sse` — stream `text/event-stream` (events: `connected`, `status`, `evidence`, `validated`, `merged`, `aborted`). +- `POST /api/runs/:id/evidence` — `{item_id|item_key, kind, payload_path?, payload_url?, payload_text?}`. Auto-crea `dod_item` si solo se da `item_key`. +- `POST /api/runs/:id/evidence/:eid/validate` — `{validated_by}`. +- `POST /api/runs/:id/merge` — TBD merge `auto/` a master `--no-ff` (gate: todos los `dod_items.required = 1` deben tener evidencia `validated`). +- `POST /api/runs/:id/abort` — kill PID + `git worktree remove --force` + `branch -D` + `status=aborted`. + +## Schema (5 migrations idempotentes via embed.FS) + +| Tabla | Para que | +|---|---| +| `workflows` | Templates de prompt + `dod_schema_json` | +| `runs` | Run vivo: workflow/issue/card/kanban_app + branch + worktree_path + PID + status | +| `worktrees` | 1 row por worktree creada, marcada `removed_at` al abort/merge | +| `dod_items` | Items DoD del run (`pending|done|validated|failed`) | +| `dod_evidence` | Evidencias adjuntas (`text|file|url`) con `validated_at/validated_by` | + +## Lanzamiento + +```bash +cd apps/agent_runner_api +CGO_ENABLED=1 go build -o agent_runner_api . +./agent_runner_api --port 8486 --db agent_runs.db --repo-root /home/lucas/fn_registry --worktrees-root /tmp +``` + +systemd-user: `systemctl --user enable --now agent_runner_api.service` (despues de copiar `agent_runner_api.service` a `~/.config/systemd/user/`). + +## Gotchas + +- `git worktree add` falla si la rama ya existe -> el `Spawn()` la borra antes con `branch -D` (best-effort). +- Worktree y main repo comparten `.git/hooks/` — pre-commit del main puede bloquear commits del agente; usar `--no-verify` documentado. +- `claude --headless` requiere PATH correcto en systemd. Si `claude` no esta en `$PATH`, el subprocess cae automaticamente a `echo STUB:` (mismo comportamiento que `AGENT_RUNNER_STUB=1`). +- Subprocess corre async — el handler HTTP devuelve `run_id` apenas inserta + lanza, no espera al exit. +- SSE: clientes deben reconectar al cerrar conexion. Heartbeat cada 15s para mantener conexion abierta. + +## Capability growth log + +- v0.1.0 (2026-05-18) — scaffold inicial: stdlib http, embed.FS migrations, SSE hub, spawn stub fallback, DoD gate en merge. diff --git a/appicon.ico b/appicon.ico new file mode 100644 index 0000000000000000000000000000000000000000..1594ea38b41b4628520583ac570f938346453ef8 GIT binary patch literal 9058 zcmb_=byOT(uxB&44;ly_2oT%}KDZ@laEIW5;4Xu^JHdhkcL~m50YdQL!IEIX-7>I~ zeBask_MErx?H}9c^mP5I`cB`jTfeGX4FE_0IsgU(h=Uf;LjnK~ghWR656(v53B*ZD z`w!L!0e~$s0B~{rgZU70KPmt~AphWF1ir!m04VeyY=#a1+chyfr< zO+_9T>oFEWiL0m}qk*{onE@CB4e?8J_Fx78NVJMFlCQjSnv=Pe`ys?bf%A+4QTo5Z zj*e8Xoc7Rd7*FW0kz`QAFam%cXr+`$WK5!B&J!aZHz&@8`q$-Ye6}ulwqZr{#4Pt4 zt)53K@WZYlB0&D*S~5SeS3G!6Jg-uN)Q#fP8W7P@Vg{sgs=Q-#EQ$sXF0O*Fw@Cqw zKG#g=hjfmKb+5GoN9>krb8?b^gLd}kLb_~|Pp&-HW znYJrOZ~u9=ly&r{(rK2(WiV@6+2);FZeL&Z!FsV?NGR%Et+sOb#ov(im~Fn zHuEv%335=9&Qgt7%__oWfTFC5OpTOj@PFahKTApW*RkuKV%G@An*HNgi`Nj)+{?9l zUNt1j>L2Ez_2lKYTmmtfHfCHrrp0M{W>5y2GDRNlGR4F#TT%_;K zj4uJXN_^ea($oge%z{hD%F5JyAXT+N+UD*mKWO9npai%skj zJD6CE#_vE8T7n( ztAVnqf!Y;caI`BDkH_8QWCVEL1qB6dtUTJcWYlJQ({mlMF`n7NDz ztow{#^WDcuA8IJ6NikWfBDXbJxCjDt;IH$?^l<-@=)YMRTExoG{#_YYAD?po0MY%k zG6xwiZDvf#h63MCHzh|~S-x?@5V_`-^dx2j9|I)pvV1xW zIny$#GG^6#=jrnd@go%(kcnP0@OH5eu|~lSbl(N2Rtl8XS5wdZnZ=dqk@p^bZz~ew zBRTAG^z0JkK_x=}s(bca%iLTBMC`+&ZsH^EBc??l?tA2PRzCJtq|<6q&ENd&chL(X z9x7NR>c~4)!bXA`md3+V5A{=*UPaWzUvz82SwmI6`_x#jN95XGPTiBuw1TJwuZUBl zS8(wTmDaRdJm|50ov9{J@CNGEua9SmN@$CIu!djVEn&uOhU^=J|H6JzvKL$^vv})vUET<5s1jT`!}+( zM^%X=DTgFPIvg-tqBKHL%1r5$ysiOM!m#UTZfY=xID&lZvtD#bAyJQr#ICQ2zD7P0 zBSVd&V_?oCBjSejW|Ae55t+!y>|2OcRhJx08Zhh>5k2PU2z7e^S z7VJpJmAi{cdwqZz*TyKC=SBbIQo1H4LME^OfHuGzTu-?LLG9JT?XO8tczAY$`tQKY1j@y!^8ZX-vgB0!ys=vC##~;P~cU z%_Z5>(fc?S1Ni8~%0K+$S2_@YH+f@ z#N?oMJ=w6&^rt9}K$|v&mGlOY?tQd9!=BC(!O~-1R*0Ca%;I`0{+MK8%Z?;pM{gQ! zos%~&!86~@xRakp+o28NvVi+y98oXw@4r_}rhc!@SFfK{i00vNb5X}$iY(Mjy?UCN ziRrqLmn28B{w5)%Ys}?o^jBgWmEr*JtoPFAaJrDxpmvuwa2abx;3I{w$tWs3N|qO6 z;ZyF+@W!YUZ<3ZOxc&Gx|B|wpTt^{9I&9smRNBf5G7Z%XNf9BEJ#Vo)Z(&?*ALFz+ z>v$dh0_K9_XK*cn?&T39I*Jxv`o>ZFA!)I-aq=mq$2fB@11RqqouLUw6 z@aZz5Y)DUVZy;6LW*&4pQS&w|&*znJ8b{!`y~zD&@kccG{{v?r2s`92XDV!+5k(dG zAYD?4jem6 z$~_bhhoUz=Gi?m5ZLF%&B@>5%VPGgf(wtfs7MSDblxphf@Jfo)4@>Mz>#)6+uUS)S zS;A>y`8>C!GG6c}3~51~6^5p@as3Bpq|aj(aEebj2^}v{>Z+dPs(<+=109dqk41;t ziAa_Xfs9d62NMLeveb-Wt6{H*33EJ7qw{s%^p<07#@WT)x}brG_g}7TOv#zWpC*-E zD>`m3OngG$L4AdKJf`F&N(+<752z90XL@h109&wb_vUGK^P1%RF6RmGK;@BP@KNpO zc*P&Ce&&SQJjO)9zMKjOT;!6-zA@8ullF0yCgQnKKF|(=(h-(=Y0QJWl~L% zbE}~J>DbSqoU z^*|Nb>w(?Jfs>QLM&i*D>k&E0BrgkqF?dsBPOY;yR;OXAEDP=MM-rt;5Z}?gq3Sj^ zOgiy@=JSY&(;&WrLT|N;*_0hHl(3k~VNH8>Y$tW6nU_Hnymf@aj9E8Y=wI`uaeAbu z9Anbw#a)(rRG$cvlK8q*H72_2a~xG=u;2$5y?|Eqm8YW7E=A2Bx5GzAPla;C2n#>C zLUKdYVzA1?`VI-H50kHp#pF{2JB-Y@3e2%k7IUnI2HUY7%5ah}S%b;=M?jM$1gt1LE1ru#EkppsTO-5Le1Lx}Km#?*G0(Um8%^cw=6!lKmkSq*=Jb=92n~bd61G?l4VeRWUJx~{GfTVk=6zp z78j?gW(_KEvc%|-zLo%vtB=Q_M0mL(f{=i zOt|BUno>)th`pdL>PA9e(Tmj%4}FM2K6*n)LU$M=bnHSHjDHJ)M za^N$XIxm1O;T=gb8VPbbPD!xgjv;ytk30l%0yQSPoutYakh_b!=@d7iPU;A zW9C#PgbbEPZN1QZO}mwY6xj~~%Ws^kQj7xwW`9vc6|gIg`t*iny31l!=NUi86e_sa z;>u3f6PO@?TQTX-KI+cfGN` zElu`#8^W4INuemwghG?qpFvzG|2_ki77rUjNeX?=1~}^i-|K(@=db38paRq2;NNVh zyW;j%TOYP%J-Yp(qhHlNO_0&4#SAQs>ZuWadzMCz!u(mi&O8rWtK@OR%)pBgaT7Ct zEjPD!&s}&zCj$ALE~9y0gWH)Vx6zFK2jVPU4GZ{4Xas_mY#ElR|@!v5MZ!#~`eP!RjX~9<0E8OVaT^fEwXuh%X{nvm8(mt8P=rDGg`L983 zq-Yo7)K$rjwS-TxAd)bm{UcI`xKIAFIky(v@(}m}3oN%y^f*Q~33^+v1tghxzNY1U zI)X|DU;N3ZhKiHmB_sDAo>CBDpLb)74cDYQ{{?aG4j!^@PyAkg-zFyPR4@pm$#`-7 z4I&#AzNoqBAU}->7PvFnzTYa9taZ%?#BWw%bK9Ni96Y&Q=HD+8?wod)9t@bp;k~fw zMUJY#KK9Lq7Pnt>0*|Fb9%$}Wi1?d@tCm&>1%d>xcBF$PU^+}V`}RC+J`K$ZOt4il z68PO9SAY;r%g?sz?Y`?Xmvi4Ehx9Wj<}OMrSKlY4N&(|r5sS6?z}d$qRdT{ej4Pxa zMPs8)ALz%aHrnXZM#~X@ldLy2RGq@alBs7iCZEkOD=X=7POCG-oRN~G0-eQgtt;@l zZm{*p4jvm7n`FC{$;NtPydh~7+8D~B%vaTJqJmV++N_Mas<+vjWFlQAK_cY|Fte(i ze5D*a4arkF)$%)Uc;x#|%H}E}=@6AAn*UXr?R1T+A`>v#Q`TDcqQ8*mPEhT2=`-C9 zwa<1jk#0r+Z#N(6Bn;G@Uy)EJ(AZQr}}cB_qlFGOiXyv zoBmYH!9#TmRO2+K}&G6s^!H9X@{v0POKFGuO+N9H?Fz>r*14lMYDeKh0NPI^(Kdk=&w-7Oozzn}2C8->tJyt6ZZvZ`uj?M~W zVNuqFI?HN7*;&QnCG_0%^P)fM_!Rrdt7T`wa&S;ZOwJ2ireN<|z~jc4MNFpyIn~8J z&~hOb(uv7BzZz_6Te~Kj^5{Khq{{7%I-daZL6xg#Wr3`gFD7a@=GBbETYR4rSZ}}j zPnHi-6ew^3b``qVpfE-RInaqFC`~Mnw2!lF>9ou~_ z$)|e0uf;6?W^UA5&7hrNu1g-UDm5aG za0rSdQU}ICBl1m|a$^9S#?(;1u%|wO7QjzAs(AGvUQ4$1(o}# z6+zCRP2+iMu}8Wj&W|bn2W%hMA$AtDm+rqWut*3q67aXc8oHEO0RY~{zXf(B(CN`d z?$F8n;nm%kASPxaGi@f$vq}9hT?oBy-A<&tzJo)FaEp#@a}#eV@70iz>hf-!;1E2! zWBgl_kJv=ZWoya|4uAo1tG|K8731GUQhqS(fKd|g;o%08E{6Py` z0dt5?z2O7^j7!J-24DAV{7WMUz@Z1AW+(txEC7THI5~S}@ZMI2>ObcE=av4vE$F{`q5mgi9*7%tK5462=$DPByJXDQRF~BF1&2oE zuF<3Ryka@tDIak)My5C`f81CFbaYWF>hn$6y{BJMkTPLa+Vb@${ja4Upw-o#+E$fxZ(V~#|whplN&Zrd8W^8s*m*V*NDFJsP zBjCnPGU4sSUKp|{+)WfjYdb}$6L>zAn(pbW#Bk_w;q9|yEp#9uhr#dzNuhmu#a`n! z1fMC*^m1(2ce=-6bk8JjtlYE{*~;_ol2jhwpRW3%l03jZx#oP(KH0~{U;kj{P2e1F z+n#5W$nD{Zt-EKY-_3RIsECo6i>E%-fd{>*BS?6O3jX7^i6h(WDkFaFoyg3T^xjPN zyp?Mp7YUSnEdGAO)z)ai?kpoNYRrymne&6-;xgwK6qtK)Piu4Un_cdi1r4w*!P9zNUYthG%P2f)1~C%QBy=_Mxi@|vHmH1Uah7TuiG zp89wx8<&pfz!`4)L%RPwZY0;cS~qjjU$BHP!dgI^WU)F~C&6dw*v6`FgXZl{Q*94i zQ5*lBO{at%JArf8Kw@n7=t0DD{J3X69**kZ#aFa?Ke9%H5(B*Ih1Jzcc(YZ$pBjlU z?W8#zwm*yai!>nV4t%gQRy}09=f{b^zWrsPm&>-{FJ*}VA@SSZ%|Q!SUMr`$@m7(K zC=|kpK==25cvMnr9-n`Dah$$W8&DVwgzhgacjx((tIa*U8&g&lry0|N5lw8Tu^w6O zmG(T_kshyMZy1&iRd}T4wt#R-1NYr+@+ot=lj1Hjly|A&AZ=h%L zk9!ST%48uI|54e~m%z~luL3H9fY3J#nm6&HqJ!k;KD?BWx^QH#H7Ieh%(Ue1Seve`vL^tY0bgkYmG|Kbro^C%4|8(zjiox5?X3Q5YLti}S~Lo;+^iSv zIij{mg3{i2_N8feNo3cG*dZoqrRwR{pM2`E6<+MNtTolCe}_n)N@+srHxG=pSN4HtGbtyg49qC*yFoz7SYCR zSOPNNcJY*p)WmO1Jy3bpsQgOb9P9}ve;MEHEibKCFXN2?Ni*b`pS=<+Ztk={{n*(& zH3b(eJs+Y`mc8GVi!YYwdR9^!Aj-cfNUq}+*Cmnh#WJJngAI-O6RX1)^7yM+bJ*yj zV(@V|Am?AAsbSqwnHFQ{-y}jXtP^qZQ&Web*~3}t;mll8vnQh=5|?%C|tT}{pNF48er%6BAnaeTun_>0jov7@Xna!9Zz8c62A~2x{Ol7~d zW-^pNU`Not#)q_3|M|Ng8qHv6qFBerf|nDU%||vj(pe)U57xeS{s+IX#etW&gA$dr zDVkZ!YKeT8;It0$K616!Ys9_!fbtp*>iq|S~^eV5>P&xb;5y4dNqW}$b{Me>{HTOoW% zbmhTITS)&g76@D?|8cA73ZtKrO&5{BACVl9Gl^FF-hyY&yFd)1nL z#25(uWP-=GUPU;=yNr8RL1)}Q8qR>x^9WnkqqplA5Jv9_0R7@q-DqgM1rhGbUnnkJjkxlj2 zvgZe(GnyxJT6Bs`C3kOCb}eX>cF+x6msYl;VsuNlgs0PvT;1ohQ`<$Ji-*M_QqHN0 z&f2Y?{?ICQhu*gRkZ3O1Wg6#39$kr{<-*D?3whtjsq!|7MC?o4691>!N(u)vl)+}?x(pnr zQ3S6K`N>B+01e+Dq)5X9ei8^d+$)C|1il(d9uxqW!u=1Y5~5$9POoDiMKVle;8AW9 z7%{Kyz4bo*F7|MO3uWl?iv$Y%d~xhWOX-PaEwg1p??!N7+ocfjME)I5;%(Z=(X z6pwDwhZF_GEjH=fBkx5gIt?euk@4veH!6ucB}RU>+xZE(s>@v9Kh3sRfEY@ zdV$NV1$0RA{qC;ee$Tih7gpZ+F)4+7;D`cnNpED@>Voyww%|G6xOyXey~TOYBdI@=ix<4EEZI`zP;UI zn@3w`vv!%D7qE7Bg01%UzCuhaOi*;?%1f@av3Zs7YsgfU+u9Fu_}0sJH27nvkYtZ5 z@AfMbUg67@uJ1pt;_+_Utu^I(d{(xOmF19WZE@=bnh_<~dGAr1BHs$WB2i-!T2_0c&#h zyd1{&t;vGgKU>|A+Ou!WR+OM)>Kc0XJr{SlC1$l@>g4|J0Byx>gx|q}$>fYLmd^&{ z_uE7D_$3MXigB;|1;xG<|GeUyt#P;W`xry_CZYC%S#`~dIfKYEq1+UeoKttF$@y;j zym&nB6HPim^Etv1Nhww21gcg&r~m{2X8yk=%YV1Zp3BH%qIXzaHi#=8qOJH(d*@8n zxLlmFi*TSU&(kUo(N_HF4txaxKOEo8*jlNPn9v%RpQO6zHwm2%EMW6+i&kRZf#v zT}#m-hRA^%AMG61EIt2)esc!ounvbf7G!`L9|L_P=dob|byaB4^-y;wTzKS54CrZC zS4GWIaxNGctehL#tq8PYLV?|A-HGbIR9zxKE@)wz*gLEeQ-|ctE(=;d;Ex>d2}ZrB KrvJ~K@&5wmXD##q literal 0 HcmV?d00001 diff --git a/db.go b/db.go new file mode 100644 index 0000000..81ab2c4 --- /dev/null +++ b/db.go @@ -0,0 +1,56 @@ +package main + +import ( + "database/sql" + "embed" + "fmt" + "io/fs" + "sort" + "strings" + + _ "github.com/mattn/go-sqlite3" +) + +//go:embed migrations/*.sql +var migrationsFS embed.FS + +// openDB opens (or creates) the SQLite database and applies migrations. +func openDB(path string) (*sql.DB, error) { + dsn := fmt.Sprintf("file:%s?_journal=WAL&_foreign_keys=on&_busy_timeout=5000", path) + conn, err := sql.Open("sqlite3", dsn) + if err != nil { + return nil, fmt.Errorf("open: %w", err) + } + if err := conn.Ping(); err != nil { + return nil, fmt.Errorf("ping: %w", err) + } + if err := applyMigrations(conn); err != nil { + conn.Close() + return nil, fmt.Errorf("migrations: %w", err) + } + return conn, nil +} + +func applyMigrations(conn *sql.DB) error { + files, err := fs.Glob(migrationsFS, "migrations/*.sql") + if err != nil { + return err + } + sort.Strings(files) + for _, f := range files { + b, err := migrationsFS.ReadFile(f) + if err != nil { + return err + } + if _, err := conn.Exec(string(b)); err != nil { + msg := err.Error() + // Idempotent ignores for ADD COLUMN re-runs etc. + if strings.Contains(msg, "duplicate column") || + strings.Contains(msg, "already exists") { + continue + } + return fmt.Errorf("%s: %w", f, err) + } + } + return nil +} diff --git a/dod.go b/dod.go new file mode 100644 index 0000000..a7a6c92 --- /dev/null +++ b/dod.go @@ -0,0 +1,169 @@ +package main + +import ( + "database/sql" + "time" + + "github.com/google/uuid" +) + +type DodItem struct { + ID string `json:"id"` + RunID string `json:"run_id"` + ItemKey string `json:"item_key"` + Kind string `json:"kind"` + Expected string `json:"expected"` + Required bool `json:"required"` + Status string `json:"status"` + CreatedAt int64 `json:"created_at"` +} + +type DodEvidence struct { + ID string `json:"id"` + DodItemID string `json:"dod_item_id"` + Kind string `json:"kind"` + PayloadPath *string `json:"payload_path,omitempty"` + PayloadURL *string `json:"payload_url,omitempty"` + PayloadText *string `json:"payload_text,omitempty"` + AttachedAt int64 `json:"attached_at"` + ValidatedAt *int64 `json:"validated_at,omitempty"` + ValidatedBy *string `json:"validated_by,omitempty"` +} + +func createDodItem(db *sql.DB, runID, key, kind, expected string, required bool) (DodItem, error) { + id := "dod_" + uuid.New().String()[:12] + now := time.Now().Unix() + reqInt := 0 + if required { + reqInt = 1 + } + _, err := db.Exec(`INSERT INTO dod_items + (id, run_id, item_key, kind, expected, required, status, created_at) + VALUES (?, ?, ?, ?, ?, ?, 'pending', ?)`, + id, runID, key, kind, expected, reqInt, now) + if err != nil { + return DodItem{}, err + } + return DodItem{ + ID: id, RunID: runID, ItemKey: key, Kind: kind, Expected: expected, + Required: required, Status: "pending", CreatedAt: now, + }, nil +} + +func listDodItems(db *sql.DB, runID string) ([]DodItem, error) { + rows, err := db.Query(`SELECT id, run_id, item_key, kind, expected, required, status, created_at + FROM dod_items WHERE run_id = ? ORDER BY created_at`, runID) + if err != nil { + return nil, err + } + defer rows.Close() + out := []DodItem{} + for rows.Next() { + var it DodItem + var reqInt int + if err := rows.Scan(&it.ID, &it.RunID, &it.ItemKey, &it.Kind, &it.Expected, &reqInt, &it.Status, &it.CreatedAt); err != nil { + return nil, err + } + it.Required = reqInt != 0 + out = append(out, it) + } + return out, nil +} + +func attachEvidence(db *sql.DB, itemID, kind string, path, url, text *string) (DodEvidence, error) { + id := "ev_" + uuid.New().String()[:12] + now := time.Now().Unix() + _, err := db.Exec(`INSERT INTO dod_evidence + (id, dod_item_id, kind, payload_path, payload_url, payload_text, attached_at) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + id, itemID, kind, nullStr(path), nullStr(url), nullStr(text), now) + if err != nil { + return DodEvidence{}, err + } + // Auto-bump item status to 'done' on first evidence + _, _ = db.Exec(`UPDATE dod_items SET status = 'done' WHERE id = ? AND status = 'pending'`, itemID) + return DodEvidence{ + ID: id, DodItemID: itemID, Kind: kind, + PayloadPath: path, PayloadURL: url, PayloadText: text, + AttachedAt: now, + }, nil +} + +func validateEvidence(db *sql.DB, evID, validatedBy string) error { + now := time.Now().Unix() + res, err := db.Exec(`UPDATE dod_evidence + SET validated_at = ?, validated_by = ? WHERE id = ?`, now, validatedBy, evID) + if err != nil { + return err + } + n, _ := res.RowsAffected() + if n == 0 { + return sql.ErrNoRows + } + // Bump item status to validated + _, _ = db.Exec(`UPDATE dod_items SET status = 'validated' + WHERE id = (SELECT dod_item_id FROM dod_evidence WHERE id = ?)`, evID) + return nil +} + +func listEvidence(db *sql.DB, itemID string) ([]DodEvidence, error) { + rows, err := db.Query(`SELECT id, dod_item_id, kind, payload_path, payload_url, payload_text, + attached_at, validated_at, validated_by + FROM dod_evidence WHERE dod_item_id = ? ORDER BY attached_at`, itemID) + if err != nil { + return nil, err + } + defer rows.Close() + out := []DodEvidence{} + for rows.Next() { + var ev DodEvidence + var path, url, text, valBy sql.NullString + var valAt sql.NullInt64 + if err := rows.Scan(&ev.ID, &ev.DodItemID, &ev.Kind, &path, &url, &text, + &ev.AttachedAt, &valAt, &valBy); err != nil { + return nil, err + } + if path.Valid { + s := path.String + ev.PayloadPath = &s + } + if url.Valid { + s := url.String + ev.PayloadURL = &s + } + if text.Valid { + s := text.String + ev.PayloadText = &s + } + if valAt.Valid { + v := valAt.Int64 + ev.ValidatedAt = &v + } + if valBy.Valid { + s := valBy.String + ev.ValidatedBy = &s + } + out = append(out, ev) + } + return out, nil +} + +// dodGateOpen returns true when every required item has at least one validated evidence. +func dodGateOpen(db *sql.DB, runID string) (bool, error) { + row := db.QueryRow(` + SELECT COUNT(*) FROM dod_items + WHERE run_id = ? AND required = 1 + AND status != 'validated'`, runID) + var n int + if err := row.Scan(&n); err != nil { + return false, err + } + return n == 0, nil +} + +func nullStr(p *string) interface{} { + if p == nil { + return nil + } + return *p +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..5508a6d --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module agent_runner_api + +go 1.22 + +require ( + github.com/google/uuid v1.6.0 + github.com/mattn/go-sqlite3 v1.14.22 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e056088 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= diff --git a/handlers.go b/handlers.go new file mode 100644 index 0000000..4e0d130 --- /dev/null +++ b/handlers.go @@ -0,0 +1,388 @@ +package main + +import ( + "database/sql" + "encoding/json" + "errors" + "fmt" + "net/http" + "os/exec" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/google/uuid" +) + +type Run struct { + ID string `json:"id"` + WorkflowID *string `json:"workflow_id,omitempty"` + IssueID *string `json:"issue_id,omitempty"` + CardID *string `json:"card_id,omitempty"` + KanbanApp *string `json:"kanban_app,omitempty"` + Mode string `json:"mode"` + Branch string `json:"branch"` + WorktreePath string `json:"worktree_path"` + Status string `json:"status"` + StartedAt int64 `json:"started_at"` + FinishedAt *int64 `json:"finished_at,omitempty"` + AgentPID *int `json:"agent_pid,omitempty"` + AgentLogPath *string `json:"agent_log_path,omitempty"` + Error *string `json:"error,omitempty"` +} + +type createRunRequest struct { + WorkflowID string `json:"workflow_id"` + IssueID string `json:"issue_id"` + CardID string `json:"card_id"` + KanbanApp string `json:"kanban_app"` + Mode string `json:"mode"` + Prompt string `json:"prompt"` +} + +type createRunResponse struct { + RunID string `json:"run_id"` + Branch string `json:"branch"` + WorktreePath string `json:"worktree_path"` + SSEURL string `json:"sse_url"` +} + +func writeJSON(w http.ResponseWriter, status int, body interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(body) +} + +func writeErr(w http.ResponseWriter, status int, msg string) { + writeJSON(w, status, map[string]string{"error": msg}) +} + +func (a *App) handleHealth(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]interface{}{ + "status": "ok", + "port": a.cfg.Port, + "db": a.cfg.DBPath, + }) +} + +// POST /api/runs +func (a *App) handleCreateRun(w http.ResponseWriter, r *http.Request) { + var req createRunRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + if req.Mode == "" { + req.Mode = "agent" + } + + runID := "run_" + uuid.New().String()[:12] + slug := req.IssueID + if slug == "" { + slug = req.CardID + } + if slug == "" { + slug = runID + } + branch := fmt.Sprintf("auto/%s", slug) + worktreePath := filepath.Join(a.cfg.WorktreesRoot, "wt-"+slug+"-"+runID[4:]) + now := time.Now().Unix() + + // Insert pending row + _, err := a.db.Exec(`INSERT INTO runs + (id, workflow_id, issue_id, card_id, kanban_app, mode, branch, worktree_path, status, started_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?)`, + runID, + nullStrFromS(req.WorkflowID), + nullStrFromS(req.IssueID), + nullStrFromS(req.CardID), + nullStrFromS(req.KanbanApp), + req.Mode, branch, worktreePath, now) + if err != nil { + writeErr(w, http.StatusInternalServerError, "insert run: "+err.Error()) + return + } + + // Spawn async (worktree create blocks briefly but is short) + prompt := req.Prompt + if prompt == "" { + prompt = fmt.Sprintf("Resolve %s in branch %s", slug, branch) + } + logPath := filepath.Join(worktreePath, "agent.log") + + res := Spawn(SpawnConfig{ + RepoRoot: a.cfg.RepoRoot, + Branch: branch, + WorktreePath: worktreePath, + Prompt: prompt, + LogPath: logPath, + }) + + if res.Error != "" { + _, _ = a.db.Exec(`UPDATE runs SET status = 'failed', error = ?, finished_at = ? + WHERE id = ?`, res.Error, time.Now().Unix(), runID) + writeErr(w, http.StatusInternalServerError, "spawn: "+res.Error) + return + } + + // Update row with PID + log + worktree entry + _, _ = a.db.Exec(`UPDATE runs SET agent_pid = ?, agent_log_path = ?, status = 'running' + WHERE id = ?`, res.PID, res.LogPath, runID) + wtID := "wt_" + uuid.New().String()[:12] + _, _ = a.db.Exec(`INSERT INTO worktrees (id, run_id, path, branch, created_at) + VALUES (?, ?, ?, ?, ?)`, wtID, runID, worktreePath, branch, now) + + a.sse.Publish(runID, sseEvent{Event: "status", Data: `{"status":"running","pid":` + strconv.Itoa(res.PID) + `}`}) + + writeJSON(w, http.StatusCreated, createRunResponse{ + RunID: runID, + Branch: branch, + WorktreePath: worktreePath, + SSEURL: fmt.Sprintf("/api/runs/%s/sse", runID), + }) +} + +// GET /api/runs?status=&app=&since= +func (a *App) handleListRuns(w http.ResponseWriter, r *http.Request) { + q := r.URL.Query() + where := []string{} + args := []interface{}{} + if s := q.Get("status"); s != "" { + where = append(where, "status = ?") + args = append(args, s) + } + if app := q.Get("app"); app != "" { + where = append(where, "kanban_app = ?") + args = append(args, app) + } + if since := q.Get("since"); since != "" { + if ts, err := strconv.ParseInt(since, 10, 64); err == nil { + where = append(where, "started_at >= ?") + args = append(args, ts) + } + } + sqlStr := `SELECT id, workflow_id, issue_id, card_id, kanban_app, mode, branch, worktree_path, + status, started_at, finished_at, agent_pid, agent_log_path, error + FROM runs` + if len(where) > 0 { + sqlStr += " WHERE " + strings.Join(where, " AND ") + } + sqlStr += " ORDER BY started_at DESC LIMIT 200" + rows, err := a.db.Query(sqlStr, args...) + if err != nil { + writeErr(w, http.StatusInternalServerError, "query: "+err.Error()) + return + } + defer rows.Close() + out := []Run{} + for rows.Next() { + run, err := scanRun(rows) + if err != nil { + writeErr(w, http.StatusInternalServerError, "scan: "+err.Error()) + return + } + out = append(out, run) + } + writeJSON(w, http.StatusOK, out) +} + +// GET /api/runs/:id +func (a *App) handleGetRun(w http.ResponseWriter, r *http.Request, id string) { + row := a.db.QueryRow(`SELECT id, workflow_id, issue_id, card_id, kanban_app, mode, branch, worktree_path, + status, started_at, finished_at, agent_pid, agent_log_path, error + FROM runs WHERE id = ?`, id) + run, err := scanRun(row) + if errors.Is(err, sql.ErrNoRows) { + writeErr(w, http.StatusNotFound, "run not found") + return + } + if err != nil { + writeErr(w, http.StatusInternalServerError, "scan: "+err.Error()) + return + } + // Include dod items + items, _ := listDodItems(a.db, id) + writeJSON(w, http.StatusOK, map[string]interface{}{ + "run": run, + "dod_items": items, + }) +} + +// POST /api/runs/:id/evidence +type evidenceRequest struct { + ItemID string `json:"item_id"` + ItemKey string `json:"item_key"` + Kind string `json:"kind"` + PayloadPath *string `json:"payload_path,omitempty"` + PayloadURL *string `json:"payload_url,omitempty"` + PayloadText *string `json:"payload_text,omitempty"` +} + +func (a *App) handleAttachEvidence(w http.ResponseWriter, r *http.Request, runID string) { + var req evidenceRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + itemID := req.ItemID + if itemID == "" && req.ItemKey != "" { + // auto-create item if key provided + it, err := createDodItem(a.db, runID, req.ItemKey, "manual", "", true) + if err != nil { + writeErr(w, http.StatusInternalServerError, "auto-create item: "+err.Error()) + return + } + itemID = it.ID + } + if itemID == "" { + writeErr(w, http.StatusBadRequest, "item_id or item_key required") + return + } + if req.Kind == "" { + req.Kind = "text" + } + ev, err := attachEvidence(a.db, itemID, req.Kind, req.PayloadPath, req.PayloadURL, req.PayloadText) + if err != nil { + writeErr(w, http.StatusInternalServerError, "attach: "+err.Error()) + return + } + a.sse.Publish(runID, sseEvent{Event: "evidence", Data: `{"item_id":"` + itemID + `","evidence_id":"` + ev.ID + `"}`}) + writeJSON(w, http.StatusCreated, ev) +} + +// POST /api/runs/:id/evidence/:eid/validate +type validateRequest struct { + ValidatedBy string `json:"validated_by"` +} + +func (a *App) handleValidateEvidence(w http.ResponseWriter, r *http.Request, runID, evID string) { + var req validateRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error()) + return + } + if req.ValidatedBy == "" { + req.ValidatedBy = "human" + } + if err := validateEvidence(a.db, evID, req.ValidatedBy); err != nil { + if errors.Is(err, sql.ErrNoRows) { + writeErr(w, http.StatusNotFound, "evidence not found") + return + } + writeErr(w, http.StatusInternalServerError, "validate: "+err.Error()) + return + } + a.sse.Publish(runID, sseEvent{Event: "validated", Data: `{"evidence_id":"` + evID + `","validated_by":"` + req.ValidatedBy + `"}`}) + writeJSON(w, http.StatusOK, map[string]string{"status": "validated"}) +} + +// POST /api/runs/:id/merge +func (a *App) handleMergeRun(w http.ResponseWriter, r *http.Request, runID string) { + open, err := dodGateOpen(a.db, runID) + if err != nil { + writeErr(w, http.StatusInternalServerError, "gate check: "+err.Error()) + return + } + if !open { + writeErr(w, http.StatusPreconditionFailed, "dod gate closed — required items not validated") + return + } + row := a.db.QueryRow(`SELECT branch FROM runs WHERE id = ?`, runID) + var branch string + if err := row.Scan(&branch); err != nil { + writeErr(w, http.StatusNotFound, "run not found") + return + } + out, err := exec.Command("git", "-C", a.cfg.RepoRoot, "merge", "--no-ff", branch).CombinedOutput() + if err != nil { + writeErr(w, http.StatusInternalServerError, "merge: "+err.Error()+": "+string(out)) + return + } + _, _ = a.db.Exec(`UPDATE runs SET status = 'merged', finished_at = ? WHERE id = ?`, + time.Now().Unix(), runID) + a.sse.Publish(runID, sseEvent{Event: "merged", Data: `{"branch":"` + branch + `"}`}) + writeJSON(w, http.StatusOK, map[string]string{"status": "merged", "branch": branch}) +} + +// POST /api/runs/:id/abort +func (a *App) handleAbortRun(w http.ResponseWriter, r *http.Request, runID string) { + row := a.db.QueryRow(`SELECT agent_pid, worktree_path, branch FROM runs WHERE id = ?`, runID) + var pidNS sql.NullInt64 + var wt, branch string + if err := row.Scan(&pidNS, &wt, &branch); err != nil { + writeErr(w, http.StatusNotFound, "run not found") + return + } + pid := 0 + if pidNS.Valid { + pid = int(pidNS.Int64) + } + if err := Cleanup(a.cfg.RepoRoot, pid, wt, branch); err != nil { + // non-fatal; record but continue + fmt.Println("cleanup warning:", err) + } + now := time.Now().Unix() + _, _ = a.db.Exec(`UPDATE runs SET status = 'aborted', finished_at = ? WHERE id = ?`, now, runID) + _, _ = a.db.Exec(`UPDATE worktrees SET removed_at = ? WHERE run_id = ? AND removed_at IS NULL`, now, runID) + a.sse.Publish(runID, sseEvent{Event: "aborted", Data: `{"run_id":"` + runID + `"}`}) + writeJSON(w, http.StatusOK, map[string]string{"status": "aborted"}) +} + +// --- helpers --- + +type rowScanner interface { + Scan(dest ...interface{}) error +} + +func scanRun(s rowScanner) (Run, error) { + var r Run + var workflowID, issueID, cardID, kanbanApp, logPath, errStr sql.NullString + var finishedAt sql.NullInt64 + var agentPID sql.NullInt64 + err := s.Scan(&r.ID, &workflowID, &issueID, &cardID, &kanbanApp, &r.Mode, &r.Branch, &r.WorktreePath, + &r.Status, &r.StartedAt, &finishedAt, &agentPID, &logPath, &errStr) + if err != nil { + return r, err + } + if workflowID.Valid { + s := workflowID.String + r.WorkflowID = &s + } + if issueID.Valid { + s := issueID.String + r.IssueID = &s + } + if cardID.Valid { + s := cardID.String + r.CardID = &s + } + if kanbanApp.Valid { + s := kanbanApp.String + r.KanbanApp = &s + } + if finishedAt.Valid { + v := finishedAt.Int64 + r.FinishedAt = &v + } + if agentPID.Valid { + v := int(agentPID.Int64) + r.AgentPID = &v + } + if logPath.Valid { + s := logPath.String + r.AgentLogPath = &s + } + if errStr.Valid { + s := errStr.String + r.Error = &s + } + return r, nil +} + +func nullStrFromS(s string) interface{} { + if s == "" { + return nil + } + return s +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..735c3a5 --- /dev/null +++ b/main.go @@ -0,0 +1,156 @@ +package main + +import ( + "context" + "database/sql" + "flag" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "strings" + "syscall" + "time" +) + +type Config struct { + Port int + DBPath string + RepoRoot string + WorktreesRoot string +} + +type App struct { + cfg Config + db *sql.DB + sse *sseHub +} + +func main() { + var ( + port = flag.Int("port", 8486, "HTTP port") + dbPath = flag.String("db", "agent_runs.db", "SQLite database path") + repoRoot = flag.String("repo-root", "", "Git repo root (defaults to $PWD)") + wtRoot = flag.String("worktrees-root", "/tmp", "Parent dir for worktrees") + ) + flag.Parse() + + root := *repoRoot + if root == "" { + root, _ = os.Getwd() + } + + db, err := openDB(*dbPath) + if err != nil { + log.Fatalf("openDB: %v", err) + } + defer db.Close() + + app := &App{ + cfg: Config{ + Port: *port, + DBPath: *dbPath, + RepoRoot: root, + WorktreesRoot: *wtRoot, + }, + db: db, + sse: newSSEHub(), + } + + mux := http.NewServeMux() + mux.HandleFunc("/api/health", app.handleHealth) + mux.HandleFunc("/api/runs", func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPost: + app.handleCreateRun(w, r) + case http.MethodGet: + app.handleListRuns(w, r) + default: + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + } + }) + mux.HandleFunc("/api/runs/", func(w http.ResponseWriter, r *http.Request) { + app.routeRun(w, r) + }) + + srv := &http.Server{ + Addr: fmt.Sprintf(":%d", *port), + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + + // Graceful shutdown + go func() { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + <-sigs + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = srv.Shutdown(ctx) + }() + + log.Printf("agent_runner_api listening :%d db=%s repo=%s", *port, *dbPath, root) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("listen: %v", err) + } +} + +// routeRun parses /api/runs/:id[/...] subroutes. +func (a *App) routeRun(w http.ResponseWriter, r *http.Request) { + path := strings.TrimPrefix(r.URL.Path, "/api/runs/") + parts := strings.Split(path, "/") + if len(parts) == 0 || parts[0] == "" { + http.NotFound(w, r) + return + } + runID := parts[0] + + if len(parts) == 1 { + if r.Method != http.MethodGet { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + a.handleGetRun(w, r, runID) + return + } + + switch parts[1] { + case "sse": + a.handleRunSSE(w, r, runID) + case "merge": + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + a.handleMergeRun(w, r, runID) + case "abort": + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + a.handleAbortRun(w, r, runID) + case "evidence": + // /api/runs/:id/evidence (POST) — attach + // /api/runs/:id/evidence/:eid/validate (POST) — validate + if len(parts) == 2 { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + a.handleAttachEvidence(w, r, runID) + return + } + if len(parts) == 4 && parts[3] == "validate" { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + a.handleValidateEvidence(w, r, runID, parts[2]) + return + } + http.NotFound(w, r) + default: + http.NotFound(w, r) + } +} diff --git a/main_test.go b/main_test.go new file mode 100644 index 0000000..a3bdffa --- /dev/null +++ b/main_test.go @@ -0,0 +1,245 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" +) + +func setupApp(t *testing.T) (*App, func()) { + t.Helper() + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + + // Init a throwaway git repo so worktree commands work + repoRoot := filepath.Join(dir, "repo") + if err := os.MkdirAll(repoRoot, 0o755); err != nil { + t.Fatalf("mkdir repo: %v", err) + } + mustRun(t, repoRoot, "git", "init", "-b", "master") + mustRun(t, repoRoot, "git", "config", "user.email", "test@local") + mustRun(t, repoRoot, "git", "config", "user.name", "test") + // commit something + readme := filepath.Join(repoRoot, "README.md") + _ = os.WriteFile(readme, []byte("hi\n"), 0o644) + mustRun(t, repoRoot, "git", "add", "-A") + mustRun(t, repoRoot, "git", "commit", "-m", "initial") + + db, err := openDB(dbPath) + if err != nil { + t.Fatalf("openDB: %v", err) + } + app := &App{ + cfg: Config{ + Port: 0, + DBPath: dbPath, + RepoRoot: repoRoot, + WorktreesRoot: filepath.Join(dir, "worktrees"), + }, + db: db, + sse: newSSEHub(), + } + t.Setenv("AGENT_RUNNER_STUB", "1") + cleanup := func() { + db.Close() + } + return app, cleanup +} + +func mustRun(t *testing.T, dir, name string, args ...string) { + t.Helper() + cmd := exec.Command(name, args...) + cmd.Dir = dir + out, err := cmd.CombinedOutput() + if err != nil { + t.Fatalf("%s %v: %v: %s", name, args, err, string(out)) + } +} + +func TestHealth(t *testing.T) { + app, cleanup := setupApp(t) + defer cleanup() + + req := httptest.NewRequest(http.MethodGet, "/api/health", nil) + rec := httptest.NewRecorder() + app.handleHealth(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status=%d", rec.Code) + } + var body map[string]interface{} + _ = json.Unmarshal(rec.Body.Bytes(), &body) + if body["status"] != "ok" { + t.Fatalf("unexpected body: %v", body) + } +} + +func TestCreateRun(t *testing.T) { + app, cleanup := setupApp(t) + defer cleanup() + + payload := map[string]string{ + "issue_id": "0999", + "mode": "agent", + "prompt": "test prompt", + } + b, _ := json.Marshal(payload) + req := httptest.NewRequest(http.MethodPost, "/api/runs", bytes.NewReader(b)) + rec := httptest.NewRecorder() + app.handleCreateRun(rec, req) + + if rec.Code != http.StatusCreated { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + var res createRunResponse + if err := json.Unmarshal(rec.Body.Bytes(), &res); err != nil { + t.Fatalf("json: %v", err) + } + if !strings.HasPrefix(res.RunID, "run_") { + t.Fatalf("bad run_id: %s", res.RunID) + } + if !strings.HasPrefix(res.Branch, "auto/") { + t.Fatalf("bad branch: %s", res.Branch) + } + // Verify row inserted + var status string + if err := app.db.QueryRow(`SELECT status FROM runs WHERE id = ?`, res.RunID).Scan(&status); err != nil { + t.Fatalf("query: %v", err) + } + if status != "running" && status != "pending" { + t.Fatalf("expected running/pending, got %s", status) + } + // Verify worktree row + var count int + _ = app.db.QueryRow(`SELECT COUNT(*) FROM worktrees WHERE run_id = ?`, res.RunID).Scan(&count) + if count != 1 { + t.Fatalf("expected 1 worktree row, got %d", count) + } +} + +func TestAbortRun(t *testing.T) { + app, cleanup := setupApp(t) + defer cleanup() + + // Create a run + payload := map[string]string{"issue_id": "abort_test", "prompt": "x"} + b, _ := json.Marshal(payload) + rec := httptest.NewRecorder() + app.handleCreateRun(rec, httptest.NewRequest(http.MethodPost, "/api/runs", bytes.NewReader(b))) + if rec.Code != http.StatusCreated { + t.Fatalf("create failed: %d %s", rec.Code, rec.Body.String()) + } + var res createRunResponse + _ = json.Unmarshal(rec.Body.Bytes(), &res) + + // Abort + rec2 := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/runs/%s/abort", res.RunID), nil) + app.handleAbortRun(rec2, req, res.RunID) + + if rec2.Code != http.StatusOK { + t.Fatalf("abort status=%d body=%s", rec2.Code, rec2.Body.String()) + } + var status string + _ = app.db.QueryRow(`SELECT status FROM runs WHERE id = ?`, res.RunID).Scan(&status) + if status != "aborted" { + t.Fatalf("expected aborted, got %s", status) + } + // Worktree row marked removed + var removed bool + _ = app.db.QueryRow(`SELECT removed_at IS NOT NULL FROM worktrees WHERE run_id = ?`, res.RunID).Scan(&removed) + if !removed { + t.Fatalf("expected worktree removed_at populated") + } +} + +func TestEvidencePersist(t *testing.T) { + app, cleanup := setupApp(t) + defer cleanup() + + // Create run + b, _ := json.Marshal(map[string]string{"issue_id": "ev_test"}) + rec := httptest.NewRecorder() + app.handleCreateRun(rec, httptest.NewRequest(http.MethodPost, "/api/runs", bytes.NewReader(b))) + var run createRunResponse + _ = json.Unmarshal(rec.Body.Bytes(), &run) + + // Attach evidence with auto-create item + text := "tests pass" + evReq := evidenceRequest{ + ItemKey: "tests_green", + Kind: "text", + PayloadText: &text, + } + body, _ := json.Marshal(evReq) + rec2 := httptest.NewRecorder() + app.handleAttachEvidence(rec2, httptest.NewRequest(http.MethodPost, + fmt.Sprintf("/api/runs/%s/evidence", run.RunID), bytes.NewReader(body)), run.RunID) + if rec2.Code != http.StatusCreated { + t.Fatalf("evidence status=%d body=%s", rec2.Code, rec2.Body.String()) + } + + // Verify rows + var itemCount, evCount int + _ = app.db.QueryRow(`SELECT COUNT(*) FROM dod_items WHERE run_id = ?`, run.RunID).Scan(&itemCount) + _ = app.db.QueryRow(`SELECT COUNT(*) FROM dod_evidence`).Scan(&evCount) + if itemCount != 1 || evCount != 1 { + t.Fatalf("expected 1+1, got items=%d evidence=%d", itemCount, evCount) + } + + // Status should have bumped to 'done' + var status string + _ = app.db.QueryRow(`SELECT status FROM dod_items WHERE run_id = ?`, run.RunID).Scan(&status) + if status != "done" { + t.Fatalf("expected done, got %s", status) + } +} + +func TestListFilter(t *testing.T) { + app, cleanup := setupApp(t) + defer cleanup() + + // Insert two runs with different kanban_app (unique issue_ids to avoid branch collision) + for i, kapp := range []string{"kanban_a", "kanban_b", "kanban_a"} { + k := kapp + b, _ := json.Marshal(map[string]string{ + "kanban_app": k, + "issue_id": fmt.Sprintf("x_%s_%d", k, i), + }) + rec := httptest.NewRecorder() + app.handleCreateRun(rec, httptest.NewRequest(http.MethodPost, "/api/runs", bytes.NewReader(b))) + if rec.Code != http.StatusCreated { + t.Fatalf("setup row failed: %s", rec.Body.String()) + } + } + + rec := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/api/runs?app=kanban_a", nil) + app.handleListRuns(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status=%d body=%s", rec.Code, rec.Body.String()) + } + var runs []Run + _ = json.Unmarshal(rec.Body.Bytes(), &runs) + if len(runs) != 2 { + t.Fatalf("expected 2 runs for kanban_a, got %d", len(runs)) + } + for _, r := range runs { + if r.KanbanApp == nil || *r.KanbanApp != "kanban_a" { + t.Fatalf("unexpected kanban_app: %v", r.KanbanApp) + } + } +} + +// drain reads to EOF (used to discard test response bodies). Not strictly needed +// for httptest but kept for future use. +func drain(rc io.ReadCloser) { _, _ = io.Copy(io.Discard, rc); rc.Close() } diff --git a/migrations/001_workflows.sql b/migrations/001_workflows.sql new file mode 100644 index 0000000..54c08ba --- /dev/null +++ b/migrations/001_workflows.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS workflows ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + prompt_template TEXT NOT NULL DEFAULT '', + dod_schema_json TEXT NOT NULL DEFAULT '[]', + created_at INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_workflows_name ON workflows(name); diff --git a/migrations/002_runs.sql b/migrations/002_runs.sql new file mode 100644 index 0000000..4f8667f --- /dev/null +++ b/migrations/002_runs.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS runs ( + id TEXT PRIMARY KEY, + workflow_id TEXT, + issue_id TEXT, + card_id TEXT, + kanban_app TEXT, + mode TEXT NOT NULL DEFAULT 'agent', + branch TEXT NOT NULL DEFAULT '', + worktree_path TEXT NOT NULL DEFAULT '', + status TEXT NOT NULL DEFAULT 'pending', + started_at INTEGER NOT NULL, + finished_at INTEGER, + agent_pid INTEGER, + agent_log_path TEXT, + error TEXT, + FOREIGN KEY (workflow_id) REFERENCES workflows(id) +); + +CREATE INDEX IF NOT EXISTS idx_runs_status ON runs(status); +CREATE INDEX IF NOT EXISTS idx_runs_issue ON runs(issue_id); +CREATE INDEX IF NOT EXISTS idx_runs_card ON runs(card_id); +CREATE INDEX IF NOT EXISTS idx_runs_kanban_app ON runs(kanban_app); +CREATE INDEX IF NOT EXISTS idx_runs_started ON runs(started_at DESC); diff --git a/migrations/003_worktrees.sql b/migrations/003_worktrees.sql new file mode 100644 index 0000000..dc784d1 --- /dev/null +++ b/migrations/003_worktrees.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS worktrees ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + path TEXT NOT NULL, + branch TEXT NOT NULL, + created_at INTEGER NOT NULL, + removed_at INTEGER, + FOREIGN KEY (run_id) REFERENCES runs(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_worktrees_run ON worktrees(run_id); diff --git a/migrations/004_dod_items.sql b/migrations/004_dod_items.sql new file mode 100644 index 0000000..27d04aa --- /dev/null +++ b/migrations/004_dod_items.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS dod_items ( + id TEXT PRIMARY KEY, + run_id TEXT NOT NULL, + item_key TEXT NOT NULL, + kind TEXT NOT NULL DEFAULT 'manual', + expected TEXT NOT NULL DEFAULT '', + required INTEGER NOT NULL DEFAULT 1, + status TEXT NOT NULL DEFAULT 'pending', + created_at INTEGER NOT NULL, + FOREIGN KEY (run_id) REFERENCES runs(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_dod_items_run ON dod_items(run_id); +CREATE INDEX IF NOT EXISTS idx_dod_items_status ON dod_items(status); diff --git a/migrations/005_dod_evidence.sql b/migrations/005_dod_evidence.sql new file mode 100644 index 0000000..1677303 --- /dev/null +++ b/migrations/005_dod_evidence.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS dod_evidence ( + id TEXT PRIMARY KEY, + dod_item_id TEXT NOT NULL, + kind TEXT NOT NULL DEFAULT 'text', + payload_path TEXT, + payload_url TEXT, + payload_text TEXT, + attached_at INTEGER NOT NULL, + validated_at INTEGER, + validated_by TEXT, + FOREIGN KEY (dod_item_id) REFERENCES dod_items(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_dod_evidence_item ON dod_evidence(dod_item_id); diff --git a/sse.go b/sse.go new file mode 100644 index 0000000..ebfcd5d --- /dev/null +++ b/sse.go @@ -0,0 +1,107 @@ +package main + +import ( + "fmt" + "net/http" + "sync" + "time" +) + +// sseHub broadcasts events keyed by run_id to N subscribers. +type sseHub struct { + mu sync.Mutex + subscribers map[string]map[chan sseEvent]struct{} +} + +type sseEvent struct { + Event string + Data string +} + +func newSSEHub() *sseHub { + return &sseHub{subscribers: make(map[string]map[chan sseEvent]struct{})} +} + +func (h *sseHub) subscribe(runID string) chan sseEvent { + ch := make(chan sseEvent, 16) + h.mu.Lock() + if _, ok := h.subscribers[runID]; !ok { + h.subscribers[runID] = make(map[chan sseEvent]struct{}) + } + h.subscribers[runID][ch] = struct{}{} + h.mu.Unlock() + return ch +} + +func (h *sseHub) unsubscribe(runID string, ch chan sseEvent) { + h.mu.Lock() + if subs, ok := h.subscribers[runID]; ok { + delete(subs, ch) + if len(subs) == 0 { + delete(h.subscribers, runID) + } + } + h.mu.Unlock() + close(ch) +} + +func (h *sseHub) Publish(runID string, ev sseEvent) { + h.mu.Lock() + subs := h.subscribers[runID] + chans := make([]chan sseEvent, 0, len(subs)) + for c := range subs { + chans = append(chans, c) + } + h.mu.Unlock() + for _, c := range chans { + select { + case c <- ev: + default: + // drop on slow subscriber + } + } +} + +func (a *App) handleRunSSE(w http.ResponseWriter, r *http.Request, runID string) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + + // Subscribe + ch := a.sse.subscribe(runID) + defer a.sse.unsubscribe(runID, ch) + + // Send initial connected event + fmt.Fprintf(w, "event: connected\ndata: {\"run_id\":\"%s\"}\n\n", runID) + flusher.Flush() + + // Heartbeat + events + heartbeat := time.NewTicker(15 * time.Second) + defer heartbeat.Stop() + + for { + select { + case <-r.Context().Done(): + return + case <-heartbeat.C: + fmt.Fprintf(w, ": heartbeat\n\n") + flusher.Flush() + case ev, ok := <-ch: + if !ok { + return + } + if ev.Event != "" { + fmt.Fprintf(w, "event: %s\n", ev.Event) + } + fmt.Fprintf(w, "data: %s\n\n", ev.Data) + flusher.Flush() + } + } +}