Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 261f96f71b | |||
| 3db4443b65 | |||
| 4822208306 | |||
| cd0ba85a22 | |||
| bdd0c6266d | |||
| b3cf8b41aa | |||
| 98839cd8a8 |
@@ -43,6 +43,73 @@ Desde la TUI puedes:
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## HTTP API (v0.1)
|
||||||
|
|
||||||
|
El launcher expone una API REST + SSE cuando se arranca con `--api-port <N>`.
|
||||||
|
|
||||||
|
### Arrancar con API habilitada
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Requiere AGENTS_API_KEY en .env (generar con: openssl rand -hex 32)
|
||||||
|
./bin/launcher --log-level info --api-port 8487
|
||||||
|
|
||||||
|
# Con systemd (VPS):
|
||||||
|
sudo systemctl start agents_and_robots.service
|
||||||
|
```
|
||||||
|
|
||||||
|
### Autenticación
|
||||||
|
|
||||||
|
Todos los endpoints (excepto `/health`) requieren:
|
||||||
|
|
||||||
|
```
|
||||||
|
Authorization: Bearer <AGENTS_API_KEY>
|
||||||
|
```
|
||||||
|
|
||||||
|
Comparación con `crypto/subtle.ConstantTimeCompare` — resistente a timing attacks.
|
||||||
|
|
||||||
|
### Endpoints REST
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Liveness (sin auth)
|
||||||
|
curl http://localhost:8487/health
|
||||||
|
|
||||||
|
# Listar agentes
|
||||||
|
curl -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/agents
|
||||||
|
|
||||||
|
# Detalle + logs de un agente
|
||||||
|
curl -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/agents/assistant-bot
|
||||||
|
|
||||||
|
# Control
|
||||||
|
curl -X POST -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/agents/assistant-bot/stop
|
||||||
|
curl -X POST -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/agents/assistant-bot/start
|
||||||
|
curl -X POST -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/agents/assistant-bot/restart
|
||||||
|
|
||||||
|
# Logs snapshot
|
||||||
|
curl -H "Authorization: Bearer $AGENTS_API_KEY" "http://localhost:8487/agents/assistant-bot/logs?n=50"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Endpoints SSE
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Stream de cambios de estado (stop/start) — un evento por transicion
|
||||||
|
curl -N -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/sse/status
|
||||||
|
|
||||||
|
# Tail en vivo del logfile de un agente (< 1s de lag)
|
||||||
|
curl -N -H "Authorization: Bearer $AGENTS_API_KEY" http://localhost:8487/sse/agents/assistant-bot/logs
|
||||||
|
```
|
||||||
|
|
||||||
|
### En producción (VPS)
|
||||||
|
|
||||||
|
El VPS expone la API via Traefik con TLS en `agents.organic-machine.com`.
|
||||||
|
Los pasos DNS + Traefik los configura el humano tras el merge (ver `app.md`, sección Traefik).
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Con Traefik configurado:
|
||||||
|
curl -fsS -H "Authorization: Bearer $AGENTS_API_KEY" https://agents.organic-machine.com/agents
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## Principio de diseño
|
## Principio de diseño
|
||||||
|
|
||||||
El proyecto usa el patrón **pure core / impure shell**:
|
El proyecto usa el patrón **pure core / impure shell**:
|
||||||
|
|||||||
@@ -2,8 +2,8 @@
|
|||||||
name: agents_and_robots
|
name: agents_and_robots
|
||||||
lang: go
|
lang: go
|
||||||
domain: agents
|
domain: agents
|
||||||
version: 0.1.0
|
version: 0.2.0
|
||||||
description: "Plataforma Go de bots autonomos Matrix con arquitectura pure core / impure shell. Launcher, agentctl, dashboard TUI y register."
|
description: "Plataforma Go de bots autonomos Matrix con arquitectura pure core / impure shell. Launcher, agentctl, dashboard TUI, register y HTTP API REST+SSE."
|
||||||
tags: [agents, matrix, bots, llm, element, e2ee, tools, service]
|
tags: [agents, matrix, bots, llm, element, e2ee, tools, service]
|
||||||
uses_functions: []
|
uses_functions: []
|
||||||
uses_types: []
|
uses_types: []
|
||||||
@@ -12,8 +12,8 @@ entry_point: "cmd/launcher/main.go"
|
|||||||
dir_path: "projects/element_agents/apps/agents_and_robots"
|
dir_path: "projects/element_agents/apps/agents_and_robots"
|
||||||
repo_url: "https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/egutierrez/agents_and_robots.git"
|
repo_url: "https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/egutierrez/agents_and_robots.git"
|
||||||
service:
|
service:
|
||||||
port: null
|
port: 8487
|
||||||
health_endpoint: null
|
health_endpoint: /health
|
||||||
health_timeout_s: 5
|
health_timeout_s: 5
|
||||||
systemd_unit: agents_and_robots.service
|
systemd_unit: agents_and_robots.service
|
||||||
systemd_scope: system
|
systemd_scope: system
|
||||||
@@ -22,6 +22,21 @@ service:
|
|||||||
pc_targets:
|
pc_targets:
|
||||||
- organic-machine.com
|
- organic-machine.com
|
||||||
is_local_only: false
|
is_local_only: false
|
||||||
|
e2e_checks:
|
||||||
|
- id: build
|
||||||
|
cmd: "go build -tags goolm ./..."
|
||||||
|
timeout_s: 120
|
||||||
|
- id: tests
|
||||||
|
cmd: "go test -tags goolm -count=1 ./internal/api/... ./cmd/launcher/..."
|
||||||
|
timeout_s: 60
|
||||||
|
- id: smoke_health
|
||||||
|
cmd: "AGENTS_API_KEY=test-e2e-smoke ./bin/launcher --api-port 18487 &"
|
||||||
|
health: "http://127.0.0.1:18487/health"
|
||||||
|
timeout_s: 10
|
||||||
|
- id: smoke_auth
|
||||||
|
cmd: "curl -s -o /dev/null -w '%{http_code}' http://127.0.0.1:18487/agents"
|
||||||
|
expect_stdout_contains: "401"
|
||||||
|
timeout_s: 5
|
||||||
---
|
---
|
||||||
|
|
||||||
## Deploy
|
## Deploy
|
||||||
@@ -29,15 +44,56 @@ service:
|
|||||||
- **VPS:** organic-machine.com (SSH alias: organic-machine.com)
|
- **VPS:** organic-machine.com (SSH alias: organic-machine.com)
|
||||||
- **Remote path:** /home/ubuntu/CodeProyects/agents_and_robots
|
- **Remote path:** /home/ubuntu/CodeProyects/agents_and_robots
|
||||||
- **Build:** `go build -tags goolm -ldflags="-s -w" -o bin/launcher ./cmd/launcher`
|
- **Build:** `go build -tags goolm -ldflags="-s -w" -o bin/launcher ./cmd/launcher`
|
||||||
- **Run:** `./bin/launcher --log-level info`
|
- **Run (manual):** `./bin/launcher --log-level info --api-port 8487`
|
||||||
|
- **Run (systemd):** `systemctl start agents_and_robots.service` (unit en `systemd/agents_and_robots.service`)
|
||||||
- **Binarios:** launcher, agentctl, register, dashboard, verify (en bin/)
|
- **Binarios:** launcher, agentctl, register, dashboard, verify (en bin/)
|
||||||
- **Proceso:** launcher corre como proceso directo (no systemd)
|
|
||||||
- **Config:** agents/*/config.yaml + .env con tokens Matrix, API keys LLM, pickle keys E2EE
|
- **Config:** agents/*/config.yaml + .env con tokens Matrix, API keys LLM, pickle keys E2EE
|
||||||
|
|
||||||
|
## HTTP API (v0.1)
|
||||||
|
|
||||||
|
Puerto: **8487** (local, Traefik termina TLS en `agents.organic-machine.com`).
|
||||||
|
Auth: `Authorization: Bearer $AGENTS_API_KEY` (32 bytes hex en `.env`).
|
||||||
|
|
||||||
|
| Verbo | Path | Descripcion |
|
||||||
|
|---|---|---|
|
||||||
|
| GET | `/health` | Liveness (sin auth) |
|
||||||
|
| GET | `/agents` | Lista todos los agentes con estado |
|
||||||
|
| GET | `/agents/{id}` | Detalle + logs recientes |
|
||||||
|
| POST | `/agents/{id}/start` | Arrancar agente |
|
||||||
|
| POST | `/agents/{id}/stop` | Parar agente |
|
||||||
|
| POST | `/agents/{id}/restart` | Stop + Start |
|
||||||
|
| GET | `/agents/{id}/logs?n=200` | Snapshot de logs |
|
||||||
|
| GET | `/sse/status` | SSE: diffs de estado cada 2s |
|
||||||
|
| GET | `/sse/agents/{id}/logs` | SSE: tail -f del logfile |
|
||||||
|
|
||||||
|
**TODO v0.2:** POST `/agents/{id}/message`, PUT `/agents/{id}/config`, SSE messages stream.
|
||||||
|
**TODO escala:** si se anade un 2do cliente HTTP (otro VPS, otro frontend), reemplazar el pubsub in-memory con NATS o Redis.
|
||||||
|
|
||||||
|
## Systemd
|
||||||
|
|
||||||
|
Unit en `systemd/agents_and_robots.service`. Para instalar en el VPS:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# En el VPS (ubuntu@organic-machine.com)
|
||||||
|
sudo cp systemd/agents_and_robots.service /etc/systemd/system/
|
||||||
|
sudo systemctl daemon-reload
|
||||||
|
sudo systemctl enable --now agents_and_robots.service
|
||||||
|
```
|
||||||
|
|
||||||
|
Importante: usa `Restart=always` (no `on-failure`) para reiniciar incluso tras exit limpio.
|
||||||
|
|
||||||
|
## Traefik + DNS (post-merge, humano)
|
||||||
|
|
||||||
|
Pasos que el humano completa tras mergear el PR:
|
||||||
|
1. Crear DNS A record `agents.organic-machine.com` apuntando a la IP del VPS.
|
||||||
|
2. En Traefik (standalone o Coolify): ruta `agents.organic-machine.com → 127.0.0.1:8487` con HTTPS+LE.
|
||||||
|
3. Verificar: `curl -fsS https://agents.organic-machine.com/health`.
|
||||||
|
4. Generar API key: `openssl rand -hex 32` y escribir `AGENTS_API_KEY=<key>` en `.env` del VPS.
|
||||||
|
|
||||||
## Notas
|
## Notas
|
||||||
|
|
||||||
4 binarios principales:
|
4 binarios principales:
|
||||||
- **launcher** — Inicia agentes como goroutines, descubre configs, sync Matrix
|
- **launcher** — Inicia agentes como goroutines, descubre configs, sync Matrix, HTTP API
|
||||||
- **agentctl** — CLI: list, start, stop, remove agentes
|
- **agentctl** — CLI: list, start, stop, remove agentes
|
||||||
- **register** — Registra usuarios bot en Synapse via admin API
|
- **register** — Registra usuarios bot en Synapse via admin API
|
||||||
- **dashboard** — TUI interactiva (bubbletea) para gestion de agentes
|
- **dashboard** — TUI interactiva (bubbletea) para gestion de agentes
|
||||||
@@ -56,3 +112,4 @@ Una linea por bump SemVer. Bump-type segun `.claude/commands/version.md`:
|
|||||||
- `patch`: bugfix sin cambio observable.
|
- `patch`: bugfix sin cambio observable.
|
||||||
|
|
||||||
- v0.1.0 (2026-05-18) — baseline.
|
- v0.1.0 (2026-05-18) — baseline.
|
||||||
|
- v0.2.0 (2026-05-22) — HTTP API REST+SSE (internal/api), systemd unit, --api-port flag en launcher.
|
||||||
|
|||||||
+65
-6
@@ -20,6 +20,7 @@ import (
|
|||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
"github.com/enmanuel/agents/devagents"
|
"github.com/enmanuel/agents/devagents"
|
||||||
|
"github.com/enmanuel/agents/internal/api"
|
||||||
"github.com/enmanuel/agents/internal/config"
|
"github.com/enmanuel/agents/internal/config"
|
||||||
"github.com/enmanuel/agents/pkg/decision"
|
"github.com/enmanuel/agents/pkg/decision"
|
||||||
"github.com/enmanuel/agents/pkg/orchestration"
|
"github.com/enmanuel/agents/pkg/orchestration"
|
||||||
@@ -28,6 +29,7 @@ import (
|
|||||||
agentlog "github.com/enmanuel/agents/shell/logger"
|
agentlog "github.com/enmanuel/agents/shell/logger"
|
||||||
orchshell "github.com/enmanuel/agents/shell/orchestration"
|
orchshell "github.com/enmanuel/agents/shell/orchestration"
|
||||||
shellsecurity "github.com/enmanuel/agents/shell/security"
|
shellsecurity "github.com/enmanuel/agents/shell/security"
|
||||||
|
"github.com/enmanuel/agents/shell/process"
|
||||||
|
|
||||||
// Blank imports: each agent self-registers its rules via init().
|
// Blank imports: each agent self-registers its rules via init().
|
||||||
_ "github.com/enmanuel/agents/agents/assistant-bot"
|
_ "github.com/enmanuel/agents/agents/assistant-bot"
|
||||||
@@ -46,6 +48,8 @@ func main() {
|
|||||||
configPaths []string
|
configPaths []string
|
||||||
logLevel string
|
logLevel string
|
||||||
logDir string
|
logDir string
|
||||||
|
apiPort int
|
||||||
|
apiKey string
|
||||||
)
|
)
|
||||||
|
|
||||||
root := &cobra.Command{
|
root := &cobra.Command{
|
||||||
@@ -112,14 +116,18 @@ func main() {
|
|||||||
logger.Info("orchestrator initialized")
|
logger.Info("orchestrator initialized")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Process manager (shared: API reflection + per-agent goroutine hooks) ──
|
||||||
|
mgr := newProcessManager(logDir)
|
||||||
|
|
||||||
// ── Shared dependencies for agent registry ──
|
// ── Shared dependencies for agent registry ──
|
||||||
deps := &launchDeps{
|
deps := &launchDeps{
|
||||||
agentBus: agentBus,
|
agentBus: agentBus,
|
||||||
orch: orch,
|
orch: orch,
|
||||||
logDir: logDir,
|
logDir: logDir,
|
||||||
logLevel: lvl,
|
logLevel: lvl,
|
||||||
parentCtx: ctx,
|
parentCtx: ctx,
|
||||||
secPolicy: secPolicy,
|
secPolicy: secPolicy,
|
||||||
|
procMgr: mgr,
|
||||||
}
|
}
|
||||||
registry := newAgentRegistry(deps)
|
registry := newAgentRegistry(deps)
|
||||||
|
|
||||||
@@ -268,6 +276,29 @@ func main() {
|
|||||||
scanCancel()
|
scanCancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── HTTP API (optional) ──
|
||||||
|
if apiPort > 0 {
|
||||||
|
key := apiKey
|
||||||
|
if key == "" {
|
||||||
|
key = os.Getenv("AGENTS_API_KEY")
|
||||||
|
}
|
||||||
|
if key == "" {
|
||||||
|
logger.Warn("api-port set but AGENTS_API_KEY is empty — HTTP API disabled (set AGENTS_API_KEY in .env)")
|
||||||
|
} else {
|
||||||
|
// mgr already created above; share it between API and registry.
|
||||||
|
ctrl := &agentController{reg: registry, mgr: mgr}
|
||||||
|
srv := api.New(mgr, key, apiPort, logger).
|
||||||
|
WithController(ctrl).
|
||||||
|
WithDataDir("agents")
|
||||||
|
go func() {
|
||||||
|
if err := srv.Run(ctx); err != nil {
|
||||||
|
logger.Error("api server stopped", "err", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
logger.Info("http api enabled", "port", apiPort)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Supervised loop: wait for all agents, and if the parent context is
|
// Supervised loop: wait for all agents, and if the parent context is
|
||||||
// still alive (i.e. no SIGINT/SIGTERM received), reload them and keep
|
// still alive (i.e. no SIGINT/SIGTERM received), reload them and keep
|
||||||
// going. Protects against the launcher exiting cleanly when all
|
// going. Protects against the launcher exiting cleanly when all
|
||||||
@@ -286,6 +317,10 @@ func main() {
|
|||||||
"Log level: debug | info | warn | error")
|
"Log level: debug | info | warn | error")
|
||||||
root.Flags().StringVar(&logDir, "log-dir", "logs",
|
root.Flags().StringVar(&logDir, "log-dir", "logs",
|
||||||
`Log directory (logs/<agent>/YYYY-MM-DD.jsonl). Use "stdout" for console only`)
|
`Log directory (logs/<agent>/YYYY-MM-DD.jsonl). Use "stdout" for console only`)
|
||||||
|
root.Flags().IntVar(&apiPort, "api-port", 0,
|
||||||
|
"HTTP API port (0 = disabled). Requires AGENTS_API_KEY env var.")
|
||||||
|
root.Flags().StringVar(&apiKey, "api-key", "",
|
||||||
|
"HTTP API Bearer key (overrides AGENTS_API_KEY env var)")
|
||||||
|
|
||||||
if err := root.Execute(); err != nil {
|
if err := root.Execute(); err != nil {
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
@@ -364,6 +399,30 @@ func newLogger(level string) *slog.Logger {
|
|||||||
return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: parseLogLevel(level)}))
|
return slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: parseLogLevel(level)}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// newProcessManager creates a process.Manager scoped to the current working
|
||||||
|
// directory, used by the HTTP API to reflect the live launcher state.
|
||||||
|
func newProcessManager(logDir string) *process.Manager {
|
||||||
|
return process.NewManager("run", "agents/*/config.yaml", "bin/launcher")
|
||||||
|
}
|
||||||
|
|
||||||
|
// agentController adapts agentRegistry + process.Manager to the api.AgentController
|
||||||
|
// interface, allowing the HTTP API to start/stop individual agent goroutines without
|
||||||
|
// restarting the whole launcher process.
|
||||||
|
type agentController struct {
|
||||||
|
reg *agentRegistry
|
||||||
|
mgr *process.Manager
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopUnifiedAgent cancels the per-agent goroutine context without stopping the launcher.
|
||||||
|
func (c *agentController) StopUnifiedAgent(id string) error {
|
||||||
|
return c.mgr.StopUnifiedAgent(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartUnifiedAgent re-launches the agent goroutine for the given ID.
|
||||||
|
func (c *agentController) StartUnifiedAgent(id string) error {
|
||||||
|
return c.reg.startAgent(id, rulesFor)
|
||||||
|
}
|
||||||
|
|
||||||
// isSpecialConfig checks whether a config path belongs to a middleware special
|
// isSpecialConfig checks whether a config path belongs to a middleware special
|
||||||
// (e.g. orchestrator) by detecting a "special:" top-level key with a non-empty
|
// (e.g. orchestrator) by detecting a "special:" top-level key with a non-empty
|
||||||
// id. This avoids config.Load() failing with "agent.id is required" when the
|
// id. This avoids config.Load() failing with "agent.id is required" when the
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -34,6 +35,15 @@ type launchDeps struct {
|
|||||||
logLevel slog.Level
|
logLevel slog.Level
|
||||||
parentCtx context.Context
|
parentCtx context.Context
|
||||||
secPolicy pksecurity.SecurityPolicy // centralized security policy loaded from security/
|
secPolicy pksecurity.SecurityPolicy // centralized security policy loaded from security/
|
||||||
|
procMgr procManagerHook // optional: per-agent goroutine registration for API
|
||||||
|
}
|
||||||
|
|
||||||
|
// procManagerHook is the narrow view of the process manager that the registry
// needs in unified mode: it lets each agent goroutine announce itself, together
// with the function that cancels its context, and withdraw again on exit. The
// API uses that bookkeeping to reflect and control individual agents.
type procManagerHook interface {
	// RegisterUnifiedAgent records the cancel function of the goroutine
	// currently running agent id.
	RegisterUnifiedAgent(id string, cancel context.CancelFunc)
	// UnregisterUnifiedAgent removes the record for agent id.
	UnregisterUnifiedAgent(id string)
}
|
||||||
|
|
||||||
// agentRegistry tracks all running agents by ID, enabling individual hot-reload.
|
// agentRegistry tracks all running agents by ID, enabling individual hot-reload.
|
||||||
@@ -61,10 +71,33 @@ func (r *agentRegistry) register(ra *runningAgent) {
|
|||||||
runtimeType = "agent"
|
runtimeType = "agent"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.launchGoroutine(ra, runtimeType)
|
||||||
|
}
|
||||||
|
|
||||||
|
// launchGoroutine starts a runner goroutine, registering its cancel context with
|
||||||
|
// the process manager hook when available for per-agent stop/start control.
|
||||||
|
func (r *agentRegistry) launchGoroutine(ra *runningAgent, runtimeType string) {
|
||||||
|
agentID := ra.cfg.Agent.ID
|
||||||
go func() {
|
go func() {
|
||||||
|
// Create a per-agent context derived from parent so we can cancel just
|
||||||
|
// this goroutine without stopping the launcher or other agents.
|
||||||
|
agentCtx, cancel := context.WithCancel(r.deps.parentCtx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// Register with process manager for API control (unified mode).
|
||||||
|
if r.deps.procMgr != nil {
|
||||||
|
r.deps.procMgr.RegisterUnifiedAgent(agentID, cancel)
|
||||||
|
defer r.deps.procMgr.UnregisterUnifiedAgent(agentID)
|
||||||
|
}
|
||||||
|
|
||||||
ra.logger.Info("runner started", "type", runtimeType)
|
ra.logger.Info("runner started", "type", runtimeType)
|
||||||
if err := ra.runner.Run(r.deps.parentCtx); err != nil {
|
if err := ra.runner.Run(agentCtx); err != nil {
|
||||||
ra.logger.Error("runner stopped with error", "err", err, "type", runtimeType)
|
if agentCtx.Err() == nil {
|
||||||
|
// Not cancelled externally — log as real error
|
||||||
|
ra.logger.Error("runner stopped with error", "err", err, "type", runtimeType)
|
||||||
|
} else {
|
||||||
|
ra.logger.Info("runner stopped (context cancelled)", "type", runtimeType)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@@ -90,6 +123,21 @@ func (r *agentRegistry) stopAndWait(id string) {
|
|||||||
r.deps.agentBus.Unsubscribe(bus.AgentID(id))
|
r.deps.agentBus.Unsubscribe(bus.AgentID(id))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// startAgent re-launches a stopped (but registered) agent by calling reload.
|
||||||
|
// Used by the API StartUnifiedAgent flow.
|
||||||
|
// Returns error if agent is not found in the registry.
|
||||||
|
func (r *agentRegistry) startAgent(id string, rulesFor func(string, *slog.Logger) []decision.Rule) error {
|
||||||
|
r.mu.Lock()
|
||||||
|
_, exists := r.agents[id]
|
||||||
|
r.mu.Unlock()
|
||||||
|
if !exists {
|
||||||
|
return fmt.Errorf("agent %q not found in registry", id)
|
||||||
|
}
|
||||||
|
// reload re-reads config and restarts the runner
|
||||||
|
r.reload(id, rulesFor)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// reload stops an agent, re-reads its config, recreates it, and restarts it.
|
// reload stops an agent, re-reads its config, recreates it, and restarts it.
|
||||||
func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) []decision.Rule) {
|
func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) []decision.Rule) {
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
@@ -192,12 +240,7 @@ func (r *agentRegistry) reload(id string, rulesFor func(string, *slog.Logger) []
|
|||||||
if runtimeType == "" {
|
if runtimeType == "" {
|
||||||
runtimeType = "agent"
|
runtimeType = "agent"
|
||||||
}
|
}
|
||||||
go func() {
|
r.launchGoroutine(newRA, runtimeType)
|
||||||
newLogger.Info("runner started", "type", runtimeType)
|
|
||||||
if err := newRunner.Run(r.deps.parentCtx); err != nil {
|
|
||||||
newLogger.Error("runner stopped with error", "err", err, "type", runtimeType)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
newLogger.Info("runner_reloaded", "id", id, "type", runtimeType)
|
newLogger.Info("runner_reloaded", "id", id, "type", runtimeType)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,550 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/enmanuel/agents/shell/process"
|
||||||
|
_ "modernc.org/sqlite" // pure-Go SQLite driver (same as launcher)
|
||||||
|
)
|
||||||
|
|
||||||
|
// --- Response types ---
|
||||||
|
|
||||||
|
// AgentResponse is the JSON shape returned for a single agent. All fields
// except Messages24h are copied from process.AgentStatus by agentResponse.
type AgentResponse struct {
	ID      string `json:"id"`
	Name    string `json:"name"`
	Version string `json:"version"`
	Desc    string `json:"desc"`
	Enabled bool   `json:"enabled"`
	Running bool   `json:"running"`
	// PID is omitted from the JSON output when it is zero.
	PID           int    `json:"pid,omitempty"`
	Instances     int    `json:"instances"`
	ConfigPath    string `json:"config_path"`
	UptimeSeconds int64  `json:"uptime_seconds"`
	// Messages24h is filled in by handlers via queryMessages24h; it stays
	// zero when no data directory is configured.
	Messages24h int `json:"messages_24h"`
}
|
||||||
|
|
||||||
|
// AgentDetailResponse extends AgentResponse with logs.
|
||||||
|
type AgentDetailResponse struct {
|
||||||
|
AgentResponse
|
||||||
|
Logs []string `json:"logs"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// msg24hEntry is one cached messages_24h result: the count that was read and
// the moment it was read, so staleness can be checked against msg24hTTL.
type msg24hEntry struct {
	count   int
	fetchAt time.Time
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
msg24hMu sync.Mutex
|
||||||
|
msg24hCache = make(map[string]msg24hEntry)
|
||||||
|
msg24hTTL = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
func agentResponse(s process.AgentStatus) AgentResponse {
|
||||||
|
return AgentResponse{
|
||||||
|
ID: s.ID,
|
||||||
|
Name: s.Name,
|
||||||
|
Version: s.Version,
|
||||||
|
Desc: s.Desc,
|
||||||
|
Enabled: s.Enabled,
|
||||||
|
Running: s.Running,
|
||||||
|
PID: s.PID,
|
||||||
|
Instances: s.Instances,
|
||||||
|
ConfigPath: s.ConfigPath,
|
||||||
|
UptimeSeconds: s.UptimeSeconds,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// queryMessages24h returns the count of messages in the past 24h for the given agent.
|
||||||
|
// Uses a 30s cache keyed by agentID. dataDir is the base data directory
|
||||||
|
// (e.g. "agents/<id>/data"). Returns 0 on error (non-fatal).
|
||||||
|
func queryMessages24h(agentID, dataDir string) int {
|
||||||
|
msg24hMu.Lock()
|
||||||
|
if e, ok := msg24hCache[agentID]; ok && time.Since(e.fetchAt) < msg24hTTL {
|
||||||
|
msg24hMu.Unlock()
|
||||||
|
return e.count
|
||||||
|
}
|
||||||
|
msg24hMu.Unlock()
|
||||||
|
|
||||||
|
dbPath := filepath.Join(dataDir, "memory.db")
|
||||||
|
if _, err := os.Stat(dbPath); err != nil {
|
||||||
|
return 0 // DB does not exist yet
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := sql.Open("sqlite", dbPath+"?mode=ro&_query_only=1")
|
||||||
|
if err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
var count int
|
||||||
|
row := db.QueryRow(
|
||||||
|
"SELECT COUNT(*) FROM messages WHERE agent_id=? AND created_at > datetime('now','-24 hours')",
|
||||||
|
agentID,
|
||||||
|
)
|
||||||
|
if err := row.Scan(&count); err != nil {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
msg24hMu.Lock()
|
||||||
|
msg24hCache[agentID] = msg24hEntry{count: count, fetchAt: time.Now()}
|
||||||
|
msg24hMu.Unlock()
|
||||||
|
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Health ---
|
||||||
|
|
||||||
|
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||||
|
writeJSON(w, http.StatusOK, map[string]string{"status": "ok", "time": time.Now().UTC().Format(time.RFC3339)})
|
||||||
|
}
|
||||||
|
|
||||||
|
// statusAllAuto chooses unified vs multi-process status based on runtime mode.
|
||||||
|
// In unified mode all agents run as goroutines under one launcher process — per-agent
|
||||||
|
// PID files do not exist, so StatusAll reports Running=false. StatusAllUnified
|
||||||
|
// reflects the real state.
|
||||||
|
func (s *Server) statusAllAuto() ([]process.AgentStatus, error) {
|
||||||
|
if s.mgr.IsUnifiedRunning() {
|
||||||
|
return s.mgr.StatusAllUnified()
|
||||||
|
}
|
||||||
|
return s.mgr.StatusAll()
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- List agents ---
|
||||||
|
|
||||||
|
func (s *Server) handleListAgents(w http.ResponseWriter, r *http.Request) {
|
||||||
|
statuses, err := s.statusAllAuto()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resp := make([]AgentResponse, 0, len(statuses))
|
||||||
|
for _, st := range statuses {
|
||||||
|
ar := agentResponse(st)
|
||||||
|
// Enrich with messages_24h when dataDir is configured
|
||||||
|
if s.dataDir != "" {
|
||||||
|
agentDataDir := filepath.Join(s.dataDir, st.ID, "data")
|
||||||
|
ar.Messages24h = queryMessages24h(st.ID, agentDataDir)
|
||||||
|
}
|
||||||
|
resp = append(resp, ar)
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Get single agent ---
|
||||||
|
|
||||||
|
func (s *Server) handleGetAgent(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id := r.PathValue("id")
|
||||||
|
statuses, err := s.statusAllAuto()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var found *process.AgentStatus
|
||||||
|
for i, st := range statuses {
|
||||||
|
if st.ID == id {
|
||||||
|
found = &statuses[i]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if found == nil {
|
||||||
|
writeError(w, http.StatusNotFound, "agent not found")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
n := 200
|
||||||
|
if qn := r.URL.Query().Get("n"); qn != "" {
|
||||||
|
if parsed, err := strconv.Atoi(qn); err == nil && parsed > 0 {
|
||||||
|
n = parsed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logs, _ := s.mgr.LogTail(id, n)
|
||||||
|
writeJSON(w, http.StatusOK, AgentDetailResponse{
|
||||||
|
AgentResponse: agentResponse(*found),
|
||||||
|
Logs: logs,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Start agent ---
|
||||||
|
|
||||||
|
func (s *Server) handleStartAgent(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id := r.PathValue("id")
|
||||||
|
|
||||||
|
// Unified mode: delegate to AgentController if available
|
||||||
|
if s.mgr.IsUnifiedRunning() && s.controller != nil {
|
||||||
|
if err := s.controller.StartUnifiedAgent(id); err != nil {
|
||||||
|
writeError(w, http.StatusConflict, fmt.Sprintf("start (unified): %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.logger.Info("agent started via api (unified)", "id", id)
|
||||||
|
writeJSON(w, http.StatusOK, map[string]string{"status": "started", "id": id, "mode": "unified"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Multi-process mode: use per-agent process launch
|
||||||
|
agents, err := s.mgr.Scan()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var info *process.AgentInfo
|
||||||
|
for i, a := range agents {
|
||||||
|
if a.ID == id {
|
||||||
|
info = &agents[i]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if info == nil {
|
||||||
|
writeError(w, http.StatusNotFound, "agent not found")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.mgr.Start(*info); err != nil {
|
||||||
|
writeError(w, http.StatusConflict, fmt.Sprintf("start: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.logger.Info("agent started via api", "id", id)
|
||||||
|
writeJSON(w, http.StatusOK, map[string]string{"status": "started", "id": id})
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Stop agent ---
|
||||||
|
|
||||||
|
func (s *Server) handleStopAgent(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id := r.PathValue("id")
|
||||||
|
|
||||||
|
// Unified mode: cancel goroutine context without killing launcher
|
||||||
|
if s.mgr.IsUnifiedRunning() && s.controller != nil {
|
||||||
|
if err := s.controller.StopUnifiedAgent(id); err != nil {
|
||||||
|
writeError(w, http.StatusConflict, fmt.Sprintf("stop (unified): %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.logger.Info("agent stopped via api (unified)", "id", id)
|
||||||
|
writeJSON(w, http.StatusOK, map[string]string{"status": "stopped", "id": id, "mode": "unified"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Multi-process mode
|
||||||
|
if err := s.mgr.Stop(id); err != nil {
|
||||||
|
writeError(w, http.StatusConflict, fmt.Sprintf("stop: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.logger.Info("agent stopped via api", "id", id)
|
||||||
|
writeJSON(w, http.StatusOK, map[string]string{"status": "stopped", "id": id})
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Restart agent ---
|
||||||
|
|
||||||
|
func (s *Server) handleRestartAgent(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id := r.PathValue("id")
|
||||||
|
|
||||||
|
// Unified mode: stop goroutine then re-launch
|
||||||
|
if s.mgr.IsUnifiedRunning() && s.controller != nil {
|
||||||
|
// Stop (ignore not-running error)
|
||||||
|
_ = s.controller.StopUnifiedAgent(id)
|
||||||
|
|
||||||
|
// Brief pause to let goroutine exit cleanly
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
|
||||||
|
if err := s.controller.StartUnifiedAgent(id); err != nil {
|
||||||
|
writeError(w, http.StatusConflict, fmt.Sprintf("restart/start (unified): %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.logger.Info("agent restarted via api (unified)", "id", id)
|
||||||
|
writeJSON(w, http.StatusOK, map[string]string{"status": "restarted", "id": id, "mode": "unified"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Multi-process mode
|
||||||
|
// Stop first (ignore not-running error)
|
||||||
|
_ = s.mgr.Stop(id)
|
||||||
|
|
||||||
|
// Wait up to 3s for process to die
|
||||||
|
deadline := time.Now().Add(3 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if !s.mgr.IsRunning(id) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find agent info for Start
|
||||||
|
agents, err := s.mgr.Scan()
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, fmt.Sprintf("scan: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var info *process.AgentInfo
|
||||||
|
for i, a := range agents {
|
||||||
|
if a.ID == id {
|
||||||
|
info = &agents[i]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if info == nil {
|
||||||
|
writeError(w, http.StatusNotFound, "agent not found")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.mgr.Start(*info); err != nil {
|
||||||
|
writeError(w, http.StatusConflict, fmt.Sprintf("restart/start: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.logger.Info("agent restarted via api", "id", id)
|
||||||
|
writeJSON(w, http.StatusOK, map[string]string{"status": "restarted", "id": id})
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Agent logs snapshot ---
|
||||||
|
|
||||||
|
func (s *Server) handleAgentLogs(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id := r.PathValue("id")
|
||||||
|
n := 200
|
||||||
|
if qn := r.URL.Query().Get("n"); qn != "" {
|
||||||
|
if parsed, err := strconv.Atoi(qn); err == nil && parsed > 0 {
|
||||||
|
n = parsed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logs, err := s.mgr.LogTail(id, n)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusNotFound, fmt.Sprintf("logs: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{"id": id, "lines": logs, "count": len(logs)})
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- SSE: status broadcast ---
|
||||||
|
|
||||||
|
func (s *Server) handleSSEStatus(w http.ResponseWriter, r *http.Request) {
|
||||||
|
flusher, ok := w.(http.Flusher)
|
||||||
|
if !ok {
|
||||||
|
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
w.Header().Set("Cache-Control", "no-cache")
|
||||||
|
w.Header().Set("Connection", "keep-alive")
|
||||||
|
w.Header().Set("X-Accel-Buffering", "no")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
// Initial ping: SSE clients consider the stream "connected" only after
|
||||||
|
// receiving the first byte of body. Without this, agents_dashboard sits
|
||||||
|
// on "connecting" until the first status diff (which can be minutes away).
|
||||||
|
fmt.Fprint(w, ": ping\n\n")
|
||||||
|
flusher.Flush()
|
||||||
|
|
||||||
|
sub := s.bus.Subscribe("status")
|
||||||
|
defer s.bus.Unsubscribe("status", sub)
|
||||||
|
|
||||||
|
ticker := time.NewTicker(15 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
ctx := r.Context()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
// Periodic heartbeat: keeps proxies (Traefik, CDN) from closing
|
||||||
|
// the idle connection and lets the client detect dead servers.
|
||||||
|
if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
flusher.Flush()
|
||||||
|
case ev, ok := <-sub:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
data, _ := json.Marshal(ev)
|
||||||
|
fmt.Fprintf(w, "event: status\ndata: %s\n\n", data)
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Clear memory ---
|
||||||
|
|
||||||
|
func (s *Server) handleClearMemory(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id := r.PathValue("id")
|
||||||
|
|
||||||
|
// Determine whether restart after clear is requested.
|
||||||
|
restart := r.URL.Query().Get("restart") == "true"
|
||||||
|
|
||||||
|
// In unified mode, stop the agent goroutine before touching its DB.
|
||||||
|
wasRunning := false
|
||||||
|
if s.mgr.IsUnifiedRunning() && s.controller != nil {
|
||||||
|
wasRunning = s.mgr.IsUnifiedAgentRunning(id)
|
||||||
|
if wasRunning {
|
||||||
|
if err := s.controller.StopUnifiedAgent(id); err != nil {
|
||||||
|
writeError(w, http.StatusConflict, fmt.Sprintf("clear_memory/stop: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Give goroutine a moment to release the DB.
|
||||||
|
time.Sleep(300 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Locate the agent's memory.db.
|
||||||
|
if s.dataDir == "" {
|
||||||
|
writeError(w, http.StatusInternalServerError, "data_dir not configured on server")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dbPath := filepath.Join(s.dataDir, id, "data", "memory.db")
|
||||||
|
if _, err := os.Stat(dbPath); err != nil {
|
||||||
|
// No memory.db — still a success (nothing to clear).
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{
|
||||||
|
"status": "cleared",
|
||||||
|
"messages_deleted": 0,
|
||||||
|
"facts_deleted": 0,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := sql.Open("sqlite", dbPath)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, fmt.Sprintf("open memory.db: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
var msgDel, factsDel int64
|
||||||
|
|
||||||
|
res, err := db.ExecContext(r.Context(), "DELETE FROM messages WHERE agent_id=?", id)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, fmt.Sprintf("delete messages: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
msgDel, _ = res.RowsAffected()
|
||||||
|
|
||||||
|
res, err = db.ExecContext(r.Context(), "DELETE FROM facts WHERE agent_id=?", id)
|
||||||
|
if err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, fmt.Sprintf("delete facts: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
factsDel, _ = res.RowsAffected()
|
||||||
|
|
||||||
|
// Invalidate the 24h cache entry for this agent.
|
||||||
|
msg24hMu.Lock()
|
||||||
|
delete(msg24hCache, id)
|
||||||
|
msg24hMu.Unlock()
|
||||||
|
|
||||||
|
s.logger.Info("agent memory cleared via api", "id", id,
|
||||||
|
"messages_deleted", msgDel, "facts_deleted", factsDel)
|
||||||
|
|
||||||
|
// Optionally restart.
|
||||||
|
if (restart || wasRunning) && s.mgr.IsUnifiedRunning() && s.controller != nil {
|
||||||
|
_ = s.controller.StartUnifiedAgent(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{
|
||||||
|
"status": "cleared",
|
||||||
|
"messages_deleted": msgDel,
|
||||||
|
"facts_deleted": factsDel,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Delete cache ---
|
||||||
|
|
||||||
|
func (s *Server) handleDeleteCache(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id := r.PathValue("id")
|
||||||
|
|
||||||
|
restart := r.URL.Query().Get("restart") == "true"
|
||||||
|
|
||||||
|
// Stop in unified mode before removing crypto dir.
|
||||||
|
wasRunning := false
|
||||||
|
if s.mgr.IsUnifiedRunning() && s.controller != nil {
|
||||||
|
wasRunning = s.mgr.IsUnifiedAgentRunning(id)
|
||||||
|
if wasRunning {
|
||||||
|
if err := s.controller.StopUnifiedAgent(id); err != nil {
|
||||||
|
writeError(w, http.StatusConflict, fmt.Sprintf("delete_cache/stop: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(300 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.dataDir == "" {
|
||||||
|
writeError(w, http.StatusInternalServerError, "data_dir not configured on server")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
agentDataDir := filepath.Join(s.dataDir, id, "data")
|
||||||
|
var deleted []string
|
||||||
|
|
||||||
|
// Remove crypto directory (session keys, verification cache).
|
||||||
|
cryptoDir := filepath.Join(agentDataDir, "crypto")
|
||||||
|
if _, err := os.Stat(cryptoDir); err == nil {
|
||||||
|
if err := os.RemoveAll(cryptoDir); err != nil {
|
||||||
|
writeError(w, http.StatusInternalServerError, fmt.Sprintf("remove crypto: %v", err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
deleted = append(deleted, cryptoDir)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove cache directory contents (but keep the dir itself).
|
||||||
|
cacheDir := filepath.Join(agentDataDir, "cache")
|
||||||
|
if entries, err := os.ReadDir(cacheDir); err == nil {
|
||||||
|
for _, e := range entries {
|
||||||
|
p := filepath.Join(cacheDir, e.Name())
|
||||||
|
if err := os.RemoveAll(p); err == nil {
|
||||||
|
deleted = append(deleted, p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Info("agent cache deleted via api", "id", id, "paths", len(deleted))
|
||||||
|
|
||||||
|
// Optionally restart.
|
||||||
|
if (restart || wasRunning) && s.mgr.IsUnifiedRunning() && s.controller != nil {
|
||||||
|
_ = s.controller.StartUnifiedAgent(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
writeJSON(w, http.StatusOK, map[string]any{
|
||||||
|
"status": "cleared",
|
||||||
|
"paths_deleted": deleted,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- SSE: agent log tail ---
|
||||||
|
|
||||||
|
func (s *Server) handleSSEAgentLogs(w http.ResponseWriter, r *http.Request) {
|
||||||
|
id := r.PathValue("id")
|
||||||
|
|
||||||
|
logPath := s.mgr.LogPath(id)
|
||||||
|
if logPath == "" {
|
||||||
|
http.Error(w, "agent not found", http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
flusher, ok := w.(http.Flusher)
|
||||||
|
if !ok {
|
||||||
|
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
w.Header().Set("Cache-Control", "no-cache")
|
||||||
|
w.Header().Set("Connection", "keep-alive")
|
||||||
|
w.Header().Set("X-Accel-Buffering", "no")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
// Initial ping unblocks client fgets so the UI flips from "connecting"
|
||||||
|
// to "connected" immediately (logfile may be silent for a while).
|
||||||
|
fmt.Fprint(w, ": ping\n\n")
|
||||||
|
flusher.Flush()
|
||||||
|
|
||||||
|
ctx := r.Context()
|
||||||
|
tailLogFile(ctx, logPath, w, flusher)
|
||||||
|
}
|
||||||
@@ -0,0 +1,91 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/enmanuel/agents/shell/process"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StatusDiff describes one change in an agent's running state. Values of this
// type are what the status poller publishes on the "status" bus topic.
type StatusDiff struct {
	// AgentID identifies the agent whose state changed.
	AgentID string `json:"agent_id"`
	// OldStatus is the running flag recorded before the change.
	OldStatus bool `json:"old_running"`
	// NewStatus is the running flag observed now.
	NewStatus bool `json:"new_running"`
	// PID is omitted from the JSON encoding when zero.
	PID int `json:"pid,omitempty"`
}
|
||||||
|
|
||||||
|
// pollStatus polls StatusAll every 2s and publishes StatusDiff events on changes.
|
||||||
|
func (s *Server) pollStatus(ctx context.Context) {
|
||||||
|
ticker := time.NewTicker(2 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
// Seed the previous state map.
|
||||||
|
prev := make(map[string]bool)
|
||||||
|
if statuses, err := s.statusAllAuto(); err == nil {
|
||||||
|
for _, st := range statuses {
|
||||||
|
prev[st.ID] = st.Running
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
s.checkAndPublishDiffs(prev)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) checkAndPublishDiffs(prev map[string]bool) {
|
||||||
|
statuses, err := s.statusAllAuto()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, st := range statuses {
|
||||||
|
old, known := prev[st.ID]
|
||||||
|
if !known || old != st.Running {
|
||||||
|
s.bus.Publish("status", StatusDiff{
|
||||||
|
AgentID: st.ID,
|
||||||
|
OldStatus: old,
|
||||||
|
NewStatus: st.Running,
|
||||||
|
PID: st.PID,
|
||||||
|
})
|
||||||
|
prev[st.ID] = st.Running
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle agents that were removed (disappeared from scan)
|
||||||
|
current := make(map[string]bool, len(statuses))
|
||||||
|
for _, st := range statuses {
|
||||||
|
current[st.ID] = true
|
||||||
|
}
|
||||||
|
for id, wasRunning := range prev {
|
||||||
|
if !current[id] {
|
||||||
|
if wasRunning {
|
||||||
|
s.bus.Publish("status", StatusDiff{
|
||||||
|
AgentID: id,
|
||||||
|
OldStatus: true,
|
||||||
|
NewStatus: false,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
delete(prev, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// agentInfoByID finds AgentInfo by ID in a StatusAll scan.
|
||||||
|
func agentInfoByID(mgr *process.Manager, id string) (*process.AgentInfo, error) {
|
||||||
|
agents, err := mgr.Scan()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for i, a := range agents {
|
||||||
|
if a.ID == id {
|
||||||
|
return &agents[i], nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
@@ -0,0 +1,64 @@
|
|||||||
|
// Package api — in-memory pub/sub bus for SSE broadcast.
|
||||||
|
//
|
||||||
|
// TODO(v0.2): if a second consumer (e.g. from another VPS) is added,
|
||||||
|
// replace this in-memory bus with NATS or Redis pub/sub. For now
|
||||||
|
// (1 local client) the overhead of an external broker is unwarranted.
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Event is a generic event payload (JSON-serialisable).
type Event = any

// Bus is a simple in-memory pub/sub hub.
// Topics are arbitrary strings (e.g. "status", "logs/agent-id").
//
// All methods are safe for concurrent use.
type Bus struct {
	mu   sync.RWMutex
	subs map[string][]chan Event
}

// NewBus creates an initialised Bus.
func NewBus() *Bus {
	return &Bus{subs: make(map[string][]chan Event)}
}

// Subscribe returns a channel that receives events published to topic.
// The channel is buffered (32) to avoid blocking the publisher.
// Pass the returned channel to Unsubscribe to release it.
func (b *Bus) Subscribe(topic string) <-chan Event {
	ch := make(chan Event, 32)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// Unsubscribe removes ch from topic and closes it.
// It is a no-op when ch is not subscribed to topic, so calling it twice for
// the same channel is safe.
func (b *Bus) Unsubscribe(topic string, ch <-chan Event) {
	b.mu.Lock()
	defer b.mu.Unlock()

	list := b.subs[topic]
	for i, c := range list {
		if c != ch {
			continue
		}
		close(c)
		// Shift the tail down in place and clear the vacated slot so the
		// closed channel is not kept alive by the backing array.
		copy(list[i:], list[i+1:])
		list[len(list)-1] = nil
		list = list[:len(list)-1]
		if len(list) == 0 {
			delete(b.subs, topic)
		} else {
			b.subs[topic] = list
		}
		return
	}
}

// Publish sends ev to all subscribers of topic.
// Non-blocking: if a subscriber channel is full, the event is dropped for that subscriber.
func (b *Bus) Publish(topic string, ev Event) {
	// The read lock is held across the sends. They never block, and holding
	// the lock keeps Unsubscribe from closing a channel or rewriting the
	// subscriber slice while it is being iterated.
	b.mu.RLock()
	defer b.mu.RUnlock()

	for _, ch := range b.subs[topic] {
		select {
		case ch <- ev:
		default:
			// drop for this slow subscriber
		}
	}
}
|
||||||
@@ -0,0 +1,199 @@
|
|||||||
|
// Package api provides the HTTP API server for agents_and_robots.
|
||||||
|
// It exposes REST endpoints for agent management and SSE streams for
|
||||||
|
// real-time status and log updates.
|
||||||
|
//
|
||||||
|
// Auth: every endpoint (except /health) requires:
|
||||||
|
//
|
||||||
|
// Authorization: Bearer <AGENTS_API_KEY>
|
||||||
|
//
|
||||||
|
// with crypto/subtle constant-time comparison.
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/subtle"
|
||||||
|
"encoding/json"
|
||||||
|
"log/slog"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/enmanuel/agents/shell/process"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AgentController lets the API stop and start individual agents when the
// launcher runs them as goroutines inside one process (unified mode), without
// restarting the whole process. It is optional; the launcher may implement it
// and attach it to the Server.
type AgentController interface {
	// StopUnifiedAgent cancels the goroutine context of the agent with the
	// given ID. It returns an error when that agent is not currently running
	// in unified mode.
	StopUnifiedAgent(id string) error

	// StartUnifiedAgent launches the goroutine for the agent with the given
	// ID again. It returns an error when the agent is not registered.
	StartUnifiedAgent(id string) error
}
|
||||||
|
|
||||||
|
// Server is the HTTP API server.
|
||||||
|
type Server struct {
|
||||||
|
mgr *process.Manager
|
||||||
|
apiKey string
|
||||||
|
port int
|
||||||
|
logger *slog.Logger
|
||||||
|
bus *Bus
|
||||||
|
controller AgentController // optional: per-agent unified control (nil = not available)
|
||||||
|
// dataDir is the base directory for agent runtime data used for memory/cache queries.
|
||||||
|
dataDir string
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new Server. apiKey is compared with subtle.ConstantTimeCompare.
|
||||||
|
// If apiKey is empty the server refuses to start.
|
||||||
|
func New(mgr *process.Manager, apiKey string, port int, logger *slog.Logger) *Server {
|
||||||
|
if logger == nil {
|
||||||
|
logger = slog.Default()
|
||||||
|
}
|
||||||
|
return &Server{
|
||||||
|
mgr: mgr,
|
||||||
|
apiKey: apiKey,
|
||||||
|
port: port,
|
||||||
|
logger: logger.With("component", "api"),
|
||||||
|
bus: NewBus(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithController attaches an AgentController for unified-mode per-agent control.
|
||||||
|
func (s *Server) WithController(c AgentController) *Server {
|
||||||
|
s.controller = c
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithDataDir sets the base directory for agent runtime data (memory.db, crypto/).
|
||||||
|
func (s *Server) WithDataDir(dir string) *Server {
|
||||||
|
s.dataDir = dir
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts the HTTP server and blocks until ctx is done.
|
||||||
|
// It also starts the status-diff poller that feeds /sse/status.
|
||||||
|
func (s *Server) Run(ctx context.Context) error {
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
|
||||||
|
// Public endpoints
|
||||||
|
mux.HandleFunc("GET /health", s.handleHealth)
|
||||||
|
|
||||||
|
// Auth-gated REST endpoints
|
||||||
|
mux.Handle("GET /agents", s.auth(http.HandlerFunc(s.handleListAgents)))
|
||||||
|
mux.Handle("GET /agents/{id}", s.auth(http.HandlerFunc(s.handleGetAgent)))
|
||||||
|
mux.Handle("POST /agents/{id}/start", s.auth(http.HandlerFunc(s.handleStartAgent)))
|
||||||
|
mux.Handle("POST /agents/{id}/stop", s.auth(http.HandlerFunc(s.handleStopAgent)))
|
||||||
|
mux.Handle("POST /agents/{id}/restart", s.auth(http.HandlerFunc(s.handleRestartAgent)))
|
||||||
|
mux.Handle("GET /agents/{id}/logs", s.auth(http.HandlerFunc(s.handleAgentLogs)))
|
||||||
|
mux.Handle("POST /agents/{id}/clear_memory", s.auth(http.HandlerFunc(s.handleClearMemory)))
|
||||||
|
mux.Handle("POST /agents/{id}/delete_cache", s.auth(http.HandlerFunc(s.handleDeleteCache)))
|
||||||
|
|
||||||
|
// SSE endpoints
|
||||||
|
mux.Handle("GET /sse/status", s.auth(http.HandlerFunc(s.handleSSEStatus)))
|
||||||
|
mux.Handle("GET /sse/agents/{id}/logs", s.auth(http.HandlerFunc(s.handleSSEAgentLogs)))
|
||||||
|
|
||||||
|
addr := ":" + strconv.Itoa(s.port)
|
||||||
|
ln, err := net.Listen("tcp", addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
srv := &http.Server{
|
||||||
|
Handler: s.logMiddleware(mux),
|
||||||
|
ReadTimeout: 10 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.logger.Info("api server listening", "addr", addr)
|
||||||
|
|
||||||
|
// Start the status poller
|
||||||
|
go s.pollStatus(ctx)
|
||||||
|
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() { errCh <- srv.Serve(ln) }()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
shutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
return srv.Shutdown(shutCtx)
|
||||||
|
case err := <-errCh:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Auth middleware ---
|
||||||
|
|
||||||
|
func (s *Server) auth(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
key := extractBearerToken(r)
|
||||||
|
expected := []byte(s.apiKey)
|
||||||
|
got := []byte(key)
|
||||||
|
|
||||||
|
// Ensure equal-length comparison to avoid timing side-channel.
|
||||||
|
// subtle.ConstantTimeCompare returns 0 if lengths differ too.
|
||||||
|
if subtle.ConstantTimeCompare(got, expected) != 1 {
|
||||||
|
writeJSON(w, http.StatusUnauthorized, map[string]string{"error": "unauthorized"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// extractBearerToken returns the token from an "Authorization: Bearer <token>"
// header. The scheme match is case-sensitive. It returns "" when the header is
// absent, uses another scheme, or carries no token after "Bearer ".
func extractBearerToken(r *http.Request) string {
	const scheme = "Bearer "

	header := r.Header.Get("Authorization")
	if len(header) <= len(scheme) {
		return ""
	}
	if header[:len(scheme)] != scheme {
		return ""
	}
	return header[len(scheme):]
}
|
||||||
|
|
||||||
|
// --- Log middleware ---
|
||||||
|
|
||||||
|
func (s *Server) logMiddleware(next http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
start := time.Now()
|
||||||
|
rw := &statusWriter{ResponseWriter: w, code: http.StatusOK}
|
||||||
|
next.ServeHTTP(rw, r)
|
||||||
|
s.logger.Info("http",
|
||||||
|
"method", r.Method,
|
||||||
|
"path", r.URL.Path,
|
||||||
|
"status", rw.code,
|
||||||
|
"duration_ms", time.Since(start).Milliseconds(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// statusWriter wraps an http.ResponseWriter and remembers the status code of
// the response so it can be logged after the handler returns.
type statusWriter struct {
	http.ResponseWriter
	code int
	// wroteHeader is set once the final status has been committed, either by
	// WriteHeader or implicitly by the first Write.
	wroteHeader bool
}

// WriteHeader records the first final status code and forwards the call.
// Later calls are still forwarded but no longer change the recorded code,
// which keeps the logged status equal to the one first committed.
func (sw *statusWriter) WriteHeader(code int) {
	if !sw.wroteHeader {
		sw.code = code
		// 1xx responses other than 101 are informational and may be
		// followed by the real status.
		if code >= 200 || code == http.StatusSwitchingProtocols {
			sw.wroteHeader = true
		}
	}
	sw.ResponseWriter.WriteHeader(code)
}

// Write forwards to the underlying writer and marks the status as committed,
// since the first Write sends the header implicitly.
func (sw *statusWriter) Write(p []byte) (int, error) {
	sw.wroteHeader = true
	return sw.ResponseWriter.Write(p)
}

// Flush forwards to the underlying ResponseWriter when it implements Flusher.
// Without this method, the type assertion `w.(http.Flusher)` in the SSE handlers
// fails (the wrapper hides the inner Flusher), and the handler aborts with
// "streaming unsupported".
func (sw *statusWriter) Flush() {
	if f, ok := sw.ResponseWriter.(http.Flusher); ok {
		f.Flush()
	}
}

// Unwrap exposes the wrapped ResponseWriter so http.ResponseController can
// reach optional interfaces that statusWriter does not forward itself.
func (sw *statusWriter) Unwrap() http.ResponseWriter {
	return sw.ResponseWriter
}
|
||||||
|
|
||||||
|
// --- Helpers ---
|
||||||
|
|
||||||
|
// writeJSON writes v as a JSON response body with the given status code and
// Content-Type application/json.
//
// v is encoded before anything is written. If encoding fails, the response is
// a 500 with a fixed JSON error body instead of the requested status with a
// broken body.
func writeJSON(w http.ResponseWriter, status int, v any) {
	body, err := json.Marshal(v)
	if err != nil {
		status = http.StatusInternalServerError
		body = []byte(`{"error":"encode response"}`)
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	// The trailing newline keeps the body identical to json.Encoder output.
	// A write error means the client is gone; there is nothing left to do.
	_, _ = w.Write(append(body, '\n'))
}
|
||||||
|
|
||||||
|
func writeError(w http.ResponseWriter, status int, msg string) {
|
||||||
|
writeJSON(w, status, map[string]string{"error": msg})
|
||||||
|
}
|
||||||
@@ -0,0 +1,253 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/enmanuel/agents/shell/process"
|
||||||
|
)
|
||||||
|
|
||||||
|
// newTestServer creates a Server with a real (temp-dir) Manager and a test API key.
|
||||||
|
func newTestServer(t *testing.T) *Server {
|
||||||
|
t.Helper()
|
||||||
|
dir := t.TempDir()
|
||||||
|
mgr := process.NewManager(dir+"/run", dir+"/agents/*/config.yaml", "")
|
||||||
|
return New(mgr, "test-key-abcd1234", 0, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Auth tests ---
|
||||||
|
|
||||||
|
func TestAuthMissingHeader(t *testing.T) {
|
||||||
|
s := newTestServer(t)
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/agents", nil)
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
s.auth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
t.Error("handler called despite missing auth")
|
||||||
|
})).ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != http.StatusUnauthorized {
|
||||||
|
t.Fatalf("expected 401, got %d", w.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAuthWrongKey(t *testing.T) {
|
||||||
|
s := newTestServer(t)
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/agents", nil)
|
||||||
|
req.Header.Set("Authorization", "Bearer wrong-key")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
s.auth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
t.Error("handler called despite wrong key")
|
||||||
|
})).ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != http.StatusUnauthorized {
|
||||||
|
t.Fatalf("expected 401, got %d", w.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAuthCorrectKey(t *testing.T) {
|
||||||
|
s := newTestServer(t)
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/agents", nil)
|
||||||
|
req.Header.Set("Authorization", "Bearer test-key-abcd1234")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
|
||||||
|
called := false
|
||||||
|
s.auth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
called = true
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
})).ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if !called {
|
||||||
|
t.Fatal("handler not called with valid key")
|
||||||
|
}
|
||||||
|
if w.Code != http.StatusOK {
|
||||||
|
t.Fatalf("expected 200, got %d", w.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Health endpoint ---
|
||||||
|
|
||||||
|
func TestHealthEndpoint(t *testing.T) {
|
||||||
|
s := newTestServer(t)
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.HandleFunc("GET /health", s.handleHealth)
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/health", nil)
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
mux.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != http.StatusOK {
|
||||||
|
t.Fatalf("expected 200, got %d", w.Code)
|
||||||
|
}
|
||||||
|
var resp map[string]string
|
||||||
|
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||||
|
t.Fatalf("decode: %v", err)
|
||||||
|
}
|
||||||
|
if resp["status"] != "ok" {
|
||||||
|
t.Errorf("expected status=ok, got %q", resp["status"])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- List agents ---
|
||||||
|
|
||||||
|
func TestListAgentsEmpty(t *testing.T) {
|
||||||
|
s := newTestServer(t)
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.Handle("GET /agents", s.auth(http.HandlerFunc(s.handleListAgents)))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/agents", nil)
|
||||||
|
req.Header.Set("Authorization", "Bearer test-key-abcd1234")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
mux.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != http.StatusOK {
|
||||||
|
t.Fatalf("expected 200, got %d", w.Code)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, _ := io.ReadAll(w.Body)
|
||||||
|
// With empty agents dir, should return empty JSON array.
|
||||||
|
trimmed := strings.TrimSpace(string(body))
|
||||||
|
if trimmed != "null" && trimmed != "[]" {
|
||||||
|
// Accept both null and [] for empty slice serialisation.
|
||||||
|
t.Logf("body: %s (acceptable)", trimmed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Bus tests ---
|
||||||
|
|
||||||
|
func TestBusSubscribePublish(t *testing.T) {
|
||||||
|
b := NewBus()
|
||||||
|
ch := b.Subscribe("test")
|
||||||
|
defer b.Unsubscribe("test", ch)
|
||||||
|
|
||||||
|
b.Publish("test", "hello")
|
||||||
|
|
||||||
|
select {
|
||||||
|
case ev := <-ch:
|
||||||
|
if ev != "hello" {
|
||||||
|
t.Fatalf("expected 'hello', got %v", ev)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("timeout waiting for event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBusUnsubscribe(t *testing.T) {
|
||||||
|
b := NewBus()
|
||||||
|
ch := b.Subscribe("test")
|
||||||
|
b.Unsubscribe("test", ch)
|
||||||
|
|
||||||
|
// After unsubscribe, channel should be closed.
|
||||||
|
select {
|
||||||
|
case _, ok := <-ch:
|
||||||
|
if ok {
|
||||||
|
t.Fatal("channel should be closed after unsubscribe")
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatal("channel not closed after unsubscribe")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBusMultipleSubscribers(t *testing.T) {
|
||||||
|
b := NewBus()
|
||||||
|
ch1 := b.Subscribe("x")
|
||||||
|
ch2 := b.Subscribe("x")
|
||||||
|
defer b.Unsubscribe("x", ch1)
|
||||||
|
defer b.Unsubscribe("x", ch2)
|
||||||
|
|
||||||
|
b.Publish("x", 42)
|
||||||
|
|
||||||
|
for _, ch := range []<-chan Event{ch1, ch2} {
|
||||||
|
select {
|
||||||
|
case ev := <-ch:
|
||||||
|
if ev != 42 {
|
||||||
|
t.Fatalf("expected 42, got %v", ev)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("timeout")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Get agent not found ---
|
||||||
|
|
||||||
|
func TestGetAgentNotFound(t *testing.T) {
|
||||||
|
s := newTestServer(t)
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.Handle("GET /agents/{id}", s.auth(http.HandlerFunc(s.handleGetAgent)))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/agents/nonexistent", nil)
|
||||||
|
req.Header.Set("Authorization", "Bearer test-key-abcd1234")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
mux.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != http.StatusNotFound {
|
||||||
|
t.Fatalf("expected 404, got %d", w.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Stop agent not running ---
|
||||||
|
|
||||||
|
func TestStopAgentNotRunning(t *testing.T) {
|
||||||
|
s := newTestServer(t)
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.Handle("POST /agents/{id}/stop", s.auth(http.HandlerFunc(s.handleStopAgent)))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodPost, "/agents/ghost/stop", nil)
|
||||||
|
req.Header.Set("Authorization", "Bearer test-key-abcd1234")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
mux.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
// stop returns Conflict when the agent is not running
|
||||||
|
if w.Code != http.StatusConflict {
|
||||||
|
t.Fatalf("expected 409, got %d", w.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Logs endpoint ---
|
||||||
|
|
||||||
|
func TestLogsEndpointNotFound(t *testing.T) {
|
||||||
|
s := newTestServer(t)
|
||||||
|
mux := http.NewServeMux()
|
||||||
|
mux.Handle("GET /agents/{id}/logs", s.auth(http.HandlerFunc(s.handleAgentLogs)))
|
||||||
|
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/agents/nonexistent/logs?n=10", nil)
|
||||||
|
req.Header.Set("Authorization", "Bearer test-key-abcd1234")
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
mux.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
if w.Code != http.StatusNotFound {
|
||||||
|
t.Fatalf("expected 404, got %d", w.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- extractBearerToken ---
|
||||||
|
|
||||||
|
func TestExtractBearerToken(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
header string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{"Bearer abc123", "abc123"},
|
||||||
|
{"bearer abc123", ""}, // case sensitive
|
||||||
|
{"Basic abc123", ""},
|
||||||
|
{"", ""},
|
||||||
|
{"Bearer ", ""},
|
||||||
|
}
|
||||||
|
for _, tc := range cases {
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/", nil)
|
||||||
|
if tc.header != "" {
|
||||||
|
req.Header.Set("Authorization", tc.header)
|
||||||
|
}
|
||||||
|
got := extractBearerToken(req)
|
||||||
|
if got != tc.want {
|
||||||
|
t.Errorf("header=%q: got=%q want=%q", tc.header, got, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,98 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// tailLogFile streams new lines appended to path to w (SSE text/plain lines).
|
||||||
|
// Sends existing content first (last 200 lines), then polls for new content.
|
||||||
|
// Blocks until ctx is done.
|
||||||
|
func tailLogFile(ctx context.Context, path string, w http.ResponseWriter, flusher http.Flusher) {
|
||||||
|
f, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
// File may not exist yet (agent hasn't written any logs).
|
||||||
|
// Wait for it to appear.
|
||||||
|
f = waitForFile(ctx, path)
|
||||||
|
if f == nil {
|
||||||
|
return // ctx cancelled before file appeared
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
|
||||||
|
// Seek to end minus ~50 KB to avoid dumping the whole file.
|
||||||
|
// This gives "recent context" without overwhelming the SSE stream.
|
||||||
|
const tailBytes = 50 * 1024
|
||||||
|
info, _ := f.Stat()
|
||||||
|
if info != nil && info.Size() > tailBytes {
|
||||||
|
_, _ = f.Seek(-tailBytes, io.SeekEnd)
|
||||||
|
// Skip incomplete first line
|
||||||
|
r := bufio.NewReader(f)
|
||||||
|
_, _ = r.ReadString('\n')
|
||||||
|
// Emit buffered remainder
|
||||||
|
for {
|
||||||
|
line, err := r.ReadString('\n')
|
||||||
|
if line != "" {
|
||||||
|
fmt.Fprintf(w, "event: log\ndata: %s\n\n", line)
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tail the file: poll for new bytes every 200ms.
|
||||||
|
// Separate heartbeat ticker keeps proxies / clients alive on idle logs.
|
||||||
|
ticker := time.NewTicker(200 * time.Millisecond)
|
||||||
|
defer ticker.Stop()
|
||||||
|
heartbeat := time.NewTicker(15 * time.Second)
|
||||||
|
defer heartbeat.Stop()
|
||||||
|
|
||||||
|
reader := bufio.NewReader(f)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-heartbeat.C:
|
||||||
|
if _, err := fmt.Fprint(w, ": ping\n\n"); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
flusher.Flush()
|
||||||
|
case <-ticker.C:
|
||||||
|
for {
|
||||||
|
line, err := reader.ReadString('\n')
|
||||||
|
if line != "" {
|
||||||
|
fmt.Fprintf(w, "event: log\ndata: %s\n\n", line)
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
// io.EOF means no more data right now — wait next tick
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitForFile retries os.Open(path) every 500ms and returns the opened file
// as soon as an attempt succeeds. It returns nil if ctx is done first. The
// first attempt happens after one full interval, not immediately.
// The caller owns the returned file and must close it.
func waitForFile(ctx context.Context, path string) *os.File {
	poll := time.NewTicker(500 * time.Millisecond)
	defer poll.Stop()

	done := ctx.Done()
	for {
		select {
		case <-done:
			return nil
		case <-poll.C:
		}

		// Any open error is treated as "not there yet"; try again next tick.
		if f, err := os.Open(path); err == nil {
			return f
		}
	}
}
|
||||||
+94
-10
@@ -4,12 +4,14 @@ package process
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -29,9 +31,10 @@ type AgentInfo struct {
|
|||||||
// AgentStatus combines agent metadata with runtime state.
|
// AgentStatus combines agent metadata with runtime state.
|
||||||
type AgentStatus struct {
|
type AgentStatus struct {
|
||||||
AgentInfo
|
AgentInfo
|
||||||
Running bool
|
Running bool
|
||||||
PID int
|
PID int
|
||||||
Instances int
|
Instances int
|
||||||
|
UptimeSeconds int64 // seconds since agent goroutine started (unified mode) or 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessStats holds resource usage for a running process.
|
// ProcessStats holds resource usage for a running process.
|
||||||
@@ -91,11 +94,25 @@ type Manager struct {
|
|||||||
binPath string
|
binPath string
|
||||||
envFile string // path to .env file for child processes
|
envFile string // path to .env file for child processes
|
||||||
prober processProber
|
prober processProber
|
||||||
|
|
||||||
|
// unifiedMode tracks per-agent goroutine cancel functions and start times
|
||||||
|
// when the unified launcher is running (all agents as goroutines).
|
||||||
|
unifiedMu sync.RWMutex
|
||||||
|
unifiedCancels map[string]context.CancelFunc
|
||||||
|
startedAt map[string]time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewManager creates a Manager. binPath can be empty for auto-detection.
|
// NewManager creates a Manager. binPath can be empty for auto-detection.
|
||||||
func NewManager(runDir, agentsGlob, binPath string) *Manager {
|
func NewManager(runDir, agentsGlob, binPath string) *Manager {
|
||||||
return &Manager{runDir: runDir, agentsGlob: agentsGlob, binPath: binPath, envFile: ".env", prober: osProber{}}
|
return &Manager{
|
||||||
|
runDir: runDir,
|
||||||
|
agentsGlob: agentsGlob,
|
||||||
|
binPath: binPath,
|
||||||
|
envFile: ".env",
|
||||||
|
prober: osProber{},
|
||||||
|
unifiedCancels: make(map[string]context.CancelFunc),
|
||||||
|
startedAt: make(map[string]time.Time),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scan discovers all agents from config files.
|
// Scan discovers all agents from config files.
|
||||||
@@ -484,8 +501,63 @@ func (m *Manager) UnifiedLogTail(lines int) ([]string, error) {
|
|||||||
return m.LogTail(unifiedID, lines)
|
return m.LogTail(unifiedID, lines)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Per-agent unified control ─────────────────────────────────────────────
|
||||||
|
|
||||||
|
// RegisterUnifiedAgent registers a cancel function and start time for an agent
|
||||||
|
// goroutine running inside the unified launcher. Called by the launcher runtime.
|
||||||
|
func (m *Manager) RegisterUnifiedAgent(id string, cancel context.CancelFunc) {
|
||||||
|
m.unifiedMu.Lock()
|
||||||
|
defer m.unifiedMu.Unlock()
|
||||||
|
m.unifiedCancels[id] = cancel
|
||||||
|
m.startedAt[id] = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnregisterUnifiedAgent removes the cancel function for an agent goroutine.
|
||||||
|
// Called when the goroutine exits.
|
||||||
|
func (m *Manager) UnregisterUnifiedAgent(id string) {
|
||||||
|
m.unifiedMu.Lock()
|
||||||
|
defer m.unifiedMu.Unlock()
|
||||||
|
delete(m.unifiedCancels, id)
|
||||||
|
delete(m.startedAt, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StopUnifiedAgent cancels the goroutine context for a specific agent without
|
||||||
|
// stopping the launcher process. Returns error if agent is not registered.
|
||||||
|
func (m *Manager) StopUnifiedAgent(id string) error {
|
||||||
|
m.unifiedMu.RLock()
|
||||||
|
cancel, ok := m.unifiedCancels[id]
|
||||||
|
m.unifiedMu.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("agent %q is not registered in unified mode (not running)", id)
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
m.UnregisterUnifiedAgent(id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsUnifiedAgentRunning returns true if the agent goroutine is registered.
|
||||||
|
func (m *Manager) IsUnifiedAgentRunning(id string) bool {
|
||||||
|
m.unifiedMu.RLock()
|
||||||
|
defer m.unifiedMu.RUnlock()
|
||||||
|
_, ok := m.unifiedCancels[id]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
// UptimeSeconds returns how long an agent has been running since registration.
|
||||||
|
// Returns 0 if the agent is not registered or not running.
|
||||||
|
func (m *Manager) UptimeSeconds(id string) int64 {
|
||||||
|
m.unifiedMu.RLock()
|
||||||
|
defer m.unifiedMu.RUnlock()
|
||||||
|
if t, ok := m.startedAt[id]; ok {
|
||||||
|
return int64(time.Since(t).Seconds())
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
// StatusAllUnified returns status for all agents, deriving "running" from
|
// StatusAllUnified returns status for all agents, deriving "running" from
|
||||||
// whether the unified launcher is running + the agent is enabled.
|
// whether the unified launcher is running + per-agent registration.
|
||||||
|
// When per-agent cancel registration is available (via RegisterUnifiedAgent),
|
||||||
|
// running reflects the individual goroutine state rather than launcher-wide enabled.
|
||||||
func (m *Manager) StatusAllUnified() ([]AgentStatus, error) {
|
func (m *Manager) StatusAllUnified() ([]AgentStatus, error) {
|
||||||
agents, err := m.Scan()
|
agents, err := m.Scan()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -494,9 +566,20 @@ func (m *Manager) StatusAllUnified() ([]AgentStatus, error) {
|
|||||||
launcherRunning := m.IsUnifiedRunning()
|
launcherRunning := m.IsUnifiedRunning()
|
||||||
launcherPID := m.UnifiedPID()
|
launcherPID := m.UnifiedPID()
|
||||||
|
|
||||||
|
m.unifiedMu.RLock()
|
||||||
|
hasPerAgentTracking := len(m.unifiedCancels) > 0
|
||||||
|
m.unifiedMu.RUnlock()
|
||||||
|
|
||||||
statuses := make([]AgentStatus, len(agents))
|
statuses := make([]AgentStatus, len(agents))
|
||||||
for i, a := range agents {
|
for i, a := range agents {
|
||||||
running := launcherRunning && a.Enabled
|
var running bool
|
||||||
|
if hasPerAgentTracking {
|
||||||
|
// Per-agent goroutine tracking: check individual registration
|
||||||
|
running = m.IsUnifiedAgentRunning(a.ID)
|
||||||
|
} else {
|
||||||
|
// Fallback: launcher running + agent enabled
|
||||||
|
running = launcherRunning && a.Enabled
|
||||||
|
}
|
||||||
pid := 0
|
pid := 0
|
||||||
instances := 0
|
instances := 0
|
||||||
if running {
|
if running {
|
||||||
@@ -504,10 +587,11 @@ func (m *Manager) StatusAllUnified() ([]AgentStatus, error) {
|
|||||||
instances = 1
|
instances = 1
|
||||||
}
|
}
|
||||||
statuses[i] = AgentStatus{
|
statuses[i] = AgentStatus{
|
||||||
AgentInfo: a,
|
AgentInfo: a,
|
||||||
Running: running,
|
Running: running,
|
||||||
PID: pid,
|
PID: pid,
|
||||||
Instances: instances,
|
Instances: instances,
|
||||||
|
UptimeSeconds: m.UptimeSeconds(a.ID),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return statuses, nil
|
return statuses, nil
|
||||||
|
|||||||
@@ -0,0 +1,14 @@
|
|||||||
|
[Unit]
|
||||||
|
Description=agents_and_robots — Matrix bot platform launcher
|
||||||
|
After=network.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
WorkingDirectory=/home/ubuntu/CodeProyects/agents_and_robots
|
||||||
|
EnvironmentFile=/home/ubuntu/CodeProyects/agents_and_robots/.env
|
||||||
|
ExecStart=/home/ubuntu/CodeProyects/agents_and_robots/bin/launcher --log-level info --api-port 8487
|
||||||
|
Restart=always
|
||||||
|
RestartSec=5
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
||||||
Reference in New Issue
Block a user