feat: add testing support for crypto initialization and process management, including auto-recovery and filtering of go wrapper processes
This commit is contained in:
@@ -1,25 +1,34 @@
|
|||||||
BIN := bin
|
BIN := bin
|
||||||
|
TAGS := -tags goolm
|
||||||
LDFLAGS := -ldflags="-s -w"
|
LDFLAGS := -ldflags="-s -w"
|
||||||
|
|
||||||
.PHONY: build build-launcher build-agentctl build-register \
|
.PHONY: build build-launcher build-agentctl build-register \
|
||||||
|
test ci \
|
||||||
list start stop remove register \
|
list start stop remove register \
|
||||||
clean tidy
|
clean tidy
|
||||||
|
|
||||||
|
# ── Test ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
test:
|
||||||
|
go test $(TAGS) ./...
|
||||||
|
|
||||||
# ── Build ──────────────────────────────────────────────────────────────────
|
# ── Build ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
ci: test build
|
||||||
|
|
||||||
build: build-launcher build-agentctl build-register
|
build: build-launcher build-agentctl build-register
|
||||||
|
|
||||||
build-launcher:
|
build-launcher:
|
||||||
@mkdir -p $(BIN)
|
@mkdir -p $(BIN)
|
||||||
go build $(LDFLAGS) -o $(BIN)/launcher ./cmd/launcher
|
go build $(TAGS) $(LDFLAGS) -o $(BIN)/launcher ./cmd/launcher
|
||||||
|
|
||||||
build-agentctl:
|
build-agentctl:
|
||||||
@mkdir -p $(BIN)
|
@mkdir -p $(BIN)
|
||||||
go build $(LDFLAGS) -o $(BIN)/agentctl ./cmd/agentctl
|
go build $(TAGS) $(LDFLAGS) -o $(BIN)/agentctl ./cmd/agentctl
|
||||||
|
|
||||||
build-register:
|
build-register:
|
||||||
@mkdir -p $(BIN)
|
@mkdir -p $(BIN)
|
||||||
go build $(LDFLAGS) -o $(BIN)/register ./cmd/register
|
go build $(TAGS) $(LDFLAGS) -o $(BIN)/register ./cmd/register
|
||||||
|
|
||||||
# ── Agent management (shortcuts via agentctl) ──────────────────────────────
|
# ── Agent management (shortcuts via agentctl) ──────────────────────────────
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ import (
|
|||||||
"log/slog"
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"maunium.net/go/mautrix/event"
|
"maunium.net/go/mautrix/event"
|
||||||
|
|
||||||
@@ -55,12 +54,6 @@ func New(cfg *config.AgentConfig, rules []decision.Rule, logger *slog.Logger) (*
|
|||||||
logger.Info("initializing e2ee", "store", storePath)
|
logger.Info("initializing e2ee", "store", storePath)
|
||||||
cryptoStore, err = matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID)
|
cryptoStore, err = matrixClient.InitCrypto(context.Background(), storePath, pickleKey, cfg.Agent.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if strings.Contains(err.Error(), "not marked as shared") {
|
|
||||||
logger.Error("crypto store is inconsistent with server — need a fresh device",
|
|
||||||
"store", storePath,
|
|
||||||
"fix", "delete crypto.db, login with password to get new token+device, update .env, restart",
|
|
||||||
)
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("e2ee init: %w", err)
|
return nil, fmt.Errorf("e2ee init: %w", err)
|
||||||
}
|
}
|
||||||
logger.Info("e2ee ready")
|
logger.Info("e2ee ready")
|
||||||
|
|||||||
@@ -9,6 +9,10 @@ LDFLAGS="-ldflags=-s -w"
|
|||||||
|
|
||||||
mkdir -p "$BIN"
|
mkdir -p "$BIN"
|
||||||
|
|
||||||
|
echo "==> Ejecutando tests..."
|
||||||
|
go test $TAGS ./...
|
||||||
|
echo ""
|
||||||
|
|
||||||
echo "==> Compilando todos los binarios en $BIN/ ..."
|
echo "==> Compilando todos los binarios en $BIN/ ..."
|
||||||
|
|
||||||
targets=(
|
targets=(
|
||||||
|
|||||||
@@ -20,6 +20,11 @@ start_agent() {
|
|||||||
|
|
||||||
# Build the binary first to avoid go run wrapper PID issues
|
# Build the binary first to avoid go run wrapper PID issues
|
||||||
if [[ ! -x "$bin" ]] || [[ "$(find ./cmd/launcher -newer "$bin" 2>/dev/null | head -1)" ]]; then
|
if [[ ! -x "$bin" ]] || [[ "$(find ./cmd/launcher -newer "$bin" 2>/dev/null | head -1)" ]]; then
|
||||||
|
info "Ejecutando tests..."
|
||||||
|
"$GO" test -tags goolm ./... || {
|
||||||
|
fail "$id tests fallaron — corrige antes de compilar"
|
||||||
|
return 1
|
||||||
|
}
|
||||||
info "Compilando launcher..."
|
info "Compilando launcher..."
|
||||||
mkdir -p "$(dirname "$bin")"
|
mkdir -p "$(dirname "$bin")"
|
||||||
"$GO" build -tags goolm -o "$bin" ./cmd/launcher || {
|
"$GO" build -tags goolm -o "$bin" ./cmd/launcher || {
|
||||||
|
|||||||
+89
-19
@@ -7,8 +7,10 @@ import (
|
|||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"maunium.net/go/mautrix"
|
"maunium.net/go/mautrix"
|
||||||
"maunium.net/go/mautrix/crypto/cryptohelper"
|
"maunium.net/go/mautrix/crypto/cryptohelper"
|
||||||
@@ -44,6 +46,37 @@ func New(cfg config.MatrixCfg) (*Client, error) {
|
|||||||
return &Client{raw: raw, cfg: cfg}, nil
|
return &Client{raw: raw, cfg: cfg}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cryptoIniter abstracts crypto helper creation for testing.
|
||||||
|
type cryptoIniter interface {
|
||||||
|
newHelper(pickleKey []byte, storePath string) (cryptoHelper, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// cryptoHelper abstracts the mautrix CryptoHelper for testing.
|
||||||
|
type cryptoHelper interface {
|
||||||
|
io.Closer
|
||||||
|
Init(ctx context.Context) error
|
||||||
|
SetAccountID(id string)
|
||||||
|
}
|
||||||
|
|
||||||
|
// mautrixCryptoIniter is the real implementation using mautrix.
|
||||||
|
type mautrixCryptoIniter struct {
|
||||||
|
raw *mautrix.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mautrixCryptoIniter) newHelper(pickleKey []byte, storePath string) (cryptoHelper, error) {
|
||||||
|
h, err := cryptohelper.NewCryptoHelper(m.raw, pickleKey, storePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &mautrixCryptoWrapper{h}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type mautrixCryptoWrapper struct {
|
||||||
|
*cryptohelper.CryptoHelper
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *mautrixCryptoWrapper) SetAccountID(id string) { w.DBAccountID = id }
|
||||||
|
|
||||||
// InitCrypto sets up end-to-end encryption using the mautrix cryptohelper.
|
// InitCrypto sets up end-to-end encryption using the mautrix cryptohelper.
|
||||||
// storePath is the SQLite file path for crypto material (e.g. "./agents/<id>/data/crypto/crypto.db").
|
// storePath is the SQLite file path for crypto material (e.g. "./agents/<id>/data/crypto/crypto.db").
|
||||||
// pickleKeyHex is a hex-encoded key for encrypting crypto material at rest. If empty,
|
// pickleKeyHex is a hex-encoded key for encrypting crypto material at rest. If empty,
|
||||||
@@ -51,44 +84,81 @@ func New(cfg config.MatrixCfg) (*Client, error) {
|
|||||||
// agentID namespaces the crypto state within the database.
|
// agentID namespaces the crypto state within the database.
|
||||||
// Returns an io.Closer that must be called on agent shutdown to flush the crypto store.
|
// Returns an io.Closer that must be called on agent shutdown to flush the crypto store.
|
||||||
func (c *Client) InitCrypto(ctx context.Context, storePath, pickleKeyHex, agentID string) (io.Closer, error) {
|
func (c *Client) InitCrypto(ctx context.Context, storePath, pickleKeyHex, agentID string) (io.Closer, error) {
|
||||||
// Resolve the actual device ID from the server — the value in config may differ
|
// Resolve the actual device ID from the server.
|
||||||
// from what the registration process assigned.
|
|
||||||
whoami, err := c.raw.Whoami(ctx)
|
whoami, err := c.raw.Whoami(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("whoami for crypto init: %w", err)
|
return nil, fmt.Errorf("whoami for crypto init: %w", err)
|
||||||
}
|
}
|
||||||
c.raw.DeviceID = whoami.DeviceID
|
c.raw.DeviceID = whoami.DeviceID
|
||||||
|
|
||||||
// Use explicit pickle key if provided, otherwise derive from access token.
|
initer := &mautrixCryptoIniter{raw: c.raw}
|
||||||
var pickleKey []byte
|
closer, helper, err := initCryptoCore(ctx, storePath, pickleKeyHex, c.raw.AccessToken, agentID, initer, slog.Default())
|
||||||
if pickleKeyHex != "" {
|
if err != nil {
|
||||||
pickleKey, err = hex.DecodeString(pickleKeyHex)
|
return nil, err
|
||||||
if err != nil {
|
}
|
||||||
return nil, fmt.Errorf("decode pickle_key_env: %w", err)
|
|
||||||
}
|
// Assign the real mautrix crypto helper — this satisfies mautrix.CryptoHelper.
|
||||||
} else {
|
c.raw.Crypto = helper.(*mautrixCryptoWrapper)
|
||||||
sum := sha256.Sum256([]byte(c.raw.AccessToken))
|
return closer, nil
|
||||||
pickleKey = sum[:]
|
}
|
||||||
|
|
||||||
|
// initCryptoCore contains the testable logic: pickle key resolution, store
|
||||||
|
// creation, and auto-recovery on stale crypto.db. Returns (closer, helper, err).
|
||||||
|
func initCryptoCore(ctx context.Context, storePath, pickleKeyHex, accessToken, agentID string, initer cryptoIniter, logger *slog.Logger) (io.Closer, cryptoHelper, error) {
|
||||||
|
pickleKey, err := resolvePickleKey(pickleKeyHex, accessToken)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := os.MkdirAll(filepath.Dir(storePath), 0700); err != nil {
|
if err := os.MkdirAll(filepath.Dir(storePath), 0700); err != nil {
|
||||||
return nil, fmt.Errorf("create crypto store dir: %w", err)
|
return nil, nil, fmt.Errorf("create crypto store dir: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
helper, err := cryptohelper.NewCryptoHelper(c.raw, pickleKey, storePath)
|
helper, err := initHelper(ctx, initer, pickleKey, storePath, agentID)
|
||||||
|
if err != nil && strings.Contains(err.Error(), "not marked as shared") {
|
||||||
|
logger.Warn("crypto store inconsistent, attempting auto-recovery",
|
||||||
|
"store", storePath,
|
||||||
|
)
|
||||||
|
if removeErr := os.Remove(storePath); removeErr != nil && !os.IsNotExist(removeErr) {
|
||||||
|
return nil, nil, fmt.Errorf("auto-recovery: remove stale crypto.db: %w (original: %w)", removeErr, err)
|
||||||
|
}
|
||||||
|
helper, err = initHelper(ctx, initer, pickleKey, storePath, agentID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("e2ee init after auto-recovery: %w", err)
|
||||||
|
}
|
||||||
|
logger.Info("e2ee auto-recovery succeeded")
|
||||||
|
} else if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("init e2ee: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return helper, helper, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func initHelper(ctx context.Context, initer cryptoIniter, pickleKey []byte, storePath, agentID string) (cryptoHelper, error) {
|
||||||
|
helper, err := initer.newHelper(pickleKey, storePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("create crypto helper: %w", err)
|
return nil, fmt.Errorf("create crypto helper: %w", err)
|
||||||
}
|
}
|
||||||
helper.DBAccountID = agentID
|
helper.SetAccountID(agentID)
|
||||||
|
|
||||||
if err := helper.Init(ctx); err != nil {
|
if err := helper.Init(ctx); err != nil {
|
||||||
return nil, fmt.Errorf("init e2ee: %w", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
c.raw.Crypto = helper
|
|
||||||
return helper, nil
|
return helper, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resolvePickleKey returns the key used to encrypt crypto material at rest.
//
// When pickleKeyHex is non-empty it is hex-decoded and returned unchanged; a
// decoding failure is reported as an error. When pickleKeyHex is empty the key
// is the SHA-256 digest of accessToken (always 32 bytes).
func resolvePickleKey(pickleKeyHex, accessToken string) ([]byte, error) {
	// No explicit key configured: derive one from the access token.
	if pickleKeyHex == "" {
		digest := sha256.Sum256([]byte(accessToken))
		return digest[:], nil
	}

	decoded, decodeErr := hex.DecodeString(pickleKeyHex)
	if decodeErr != nil {
		return nil, fmt.Errorf("decode pickle_key_env: %w", decodeErr)
	}
	return decoded, nil
}
|
||||||
|
|
||||||
// SendText sends a plain-text message to a room.
|
// SendText sends a plain-text message to a room.
|
||||||
// If the room has E2EE enabled and crypto is initialized, the message is encrypted automatically.
|
// If the room has E2EE enabled and crypto is initialized, the message is encrypted automatically.
|
||||||
func (c *Client) SendText(ctx context.Context, roomID, text string) error {
|
func (c *Client) SendText(ctx context.Context, roomID, text string) error {
|
||||||
|
|||||||
@@ -0,0 +1,172 @@
|
|||||||
|
package matrix
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"log/slog"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// fakeCryptoHelper implements cryptoHelper for testing.
|
||||||
|
type fakeCryptoHelper struct {
|
||||||
|
initErr error
|
||||||
|
closed bool
|
||||||
|
accountID string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeCryptoHelper) Init(ctx context.Context) error { return f.initErr }
|
||||||
|
func (f *fakeCryptoHelper) Close() error { f.closed = true; return nil }
|
||||||
|
func (f *fakeCryptoHelper) SetAccountID(id string) { f.accountID = id }
|
||||||
|
|
||||||
|
// fakeCryptoIniter implements cryptoIniter for testing.
|
||||||
|
type fakeCryptoIniter struct {
|
||||||
|
calls int
|
||||||
|
helpers []*fakeCryptoHelper // one per call
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeCryptoIniter) newHelper(pickleKey []byte, storePath string) (cryptoHelper, error) {
|
||||||
|
idx := f.calls
|
||||||
|
f.calls++
|
||||||
|
if idx < len(f.helpers) {
|
||||||
|
return f.helpers[idx], nil
|
||||||
|
}
|
||||||
|
return &fakeCryptoHelper{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInitCryptoCore_Success(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
storePath := filepath.Join(dir, "crypto", "crypto.db")
|
||||||
|
|
||||||
|
initer := &fakeCryptoIniter{
|
||||||
|
helpers: []*fakeCryptoHelper{{initErr: nil}},
|
||||||
|
}
|
||||||
|
|
||||||
|
closer, _, err := initCryptoCore(context.Background(), storePath, "", "fake-token", "test-agent", initer, slog.Default())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if closer == nil {
|
||||||
|
t.Error("expected non-nil closer")
|
||||||
|
}
|
||||||
|
if initer.calls != 1 {
|
||||||
|
t.Errorf("expected 1 call to newHelper, got %d", initer.calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInitCryptoCore_AutoRecoveryOnStaleStore(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
storePath := filepath.Join(dir, "crypto", "crypto.db")
|
||||||
|
|
||||||
|
// Create a stale crypto.db file.
|
||||||
|
_ = os.MkdirAll(filepath.Dir(storePath), 0700)
|
||||||
|
_ = os.WriteFile(storePath, []byte("stale"), 0o644)
|
||||||
|
|
||||||
|
// First call fails with "not marked as shared", second succeeds.
|
||||||
|
initer := &fakeCryptoIniter{
|
||||||
|
helpers: []*fakeCryptoHelper{
|
||||||
|
{initErr: errors.New("device keys not marked as shared")},
|
||||||
|
{initErr: nil},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, err := initCryptoCore(context.Background(), storePath, "", "fake-token", "test-agent", initer, slog.Default())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected auto-recovery to succeed, got: %v", err)
|
||||||
|
}
|
||||||
|
if initer.calls != 2 {
|
||||||
|
t.Errorf("expected 2 calls (fail + retry), got %d", initer.calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInitCryptoCore_AutoRecoveryFailsTwice(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
storePath := filepath.Join(dir, "crypto", "crypto.db")
|
||||||
|
|
||||||
|
_ = os.MkdirAll(filepath.Dir(storePath), 0700)
|
||||||
|
_ = os.WriteFile(storePath, []byte("stale"), 0o644)
|
||||||
|
|
||||||
|
initer := &fakeCryptoIniter{
|
||||||
|
helpers: []*fakeCryptoHelper{
|
||||||
|
{initErr: errors.New("not marked as shared")},
|
||||||
|
{initErr: errors.New("still broken after recovery")},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, err := initCryptoCore(context.Background(), storePath, "", "fake-token", "test-agent", initer, slog.Default())
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error when recovery also fails")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "after auto-recovery") {
|
||||||
|
t.Errorf("expected 'after auto-recovery' in error, got: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInitCryptoCore_NonRecoverableError(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
storePath := filepath.Join(dir, "crypto", "crypto.db")
|
||||||
|
|
||||||
|
initer := &fakeCryptoIniter{
|
||||||
|
helpers: []*fakeCryptoHelper{
|
||||||
|
{initErr: errors.New("connection refused")},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, err := initCryptoCore(context.Background(), storePath, "", "fake-token", "test-agent", initer, slog.Default())
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error for non-recoverable failure")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "init e2ee") {
|
||||||
|
t.Errorf("expected 'init e2ee' in error, got: %v", err)
|
||||||
|
}
|
||||||
|
// Should NOT have retried.
|
||||||
|
if initer.calls != 1 {
|
||||||
|
t.Errorf("expected 1 call (no retry for non-stale error), got %d", initer.calls)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolvePickleKey_BadHex(t *testing.T) {
|
||||||
|
_, err := resolvePickleKey("not-hex!", "token")
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error for invalid hex pickle key")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "decode pickle_key_env") {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolvePickleKey_DeriveFromToken(t *testing.T) {
|
||||||
|
key, err := resolvePickleKey("", "my-access-token")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(key) != 32 {
|
||||||
|
t.Errorf("expected 32-byte sha256 key, got %d bytes", len(key))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolvePickleKey_Explicit(t *testing.T) {
|
||||||
|
hexKey := "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||||
|
key, err := resolvePickleKey(hexKey, "ignored")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if len(key) != 32 {
|
||||||
|
t.Errorf("expected 32 bytes, got %d", len(key))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInitHelper_SetsAccountID(t *testing.T) {
|
||||||
|
helper := &fakeCryptoHelper{}
|
||||||
|
initer := &fakeCryptoIniter{helpers: []*fakeCryptoHelper{helper}}
|
||||||
|
|
||||||
|
_, err := initHelper(context.Background(), initer, []byte("key"), "/fake", "my-agent")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
if helper.accountID != "my-agent" {
|
||||||
|
t.Errorf("expected accountID='my-agent', got '%s'", helper.accountID)
|
||||||
|
}
|
||||||
|
}
|
||||||
+55
-12
@@ -43,17 +43,57 @@ type ProcessStats struct {
|
|||||||
LogBytes int64
|
LogBytes int64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// processProber abstracts process detection for testing.
|
||||||
|
type processProber interface {
|
||||||
|
// pgrepPIDs runs pgrep -f with the given pattern and returns matching PIDs.
|
||||||
|
pgrepPIDs(pattern string) []int
|
||||||
|
// processComm returns the comm name for a PID (e.g. "launcher", "go").
|
||||||
|
processComm(pid int) string
|
||||||
|
// isAlive checks if a PID is running.
|
||||||
|
isAlive(pid int) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// osProber is the production processProber. It inspects real processes by
// running pgrep, reading /proc/<pid>/comm and sending signal 0.
type osProber struct{}

// pgrepPIDs runs `pgrep -f pattern` and returns every positive PID printed on
// its standard output, one per line. Any error from running pgrep yields nil;
// pgrep exits non-zero when nothing matches, so "no match" also returns nil.
func (osProber) pgrepPIDs(pattern string) []int {
	out, err := exec.Command("pgrep", "-f", pattern).Output()
	if err != nil {
		return nil
	}

	var pids []int
	for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
		// Skip anything that is not a positive integer (e.g. the single empty
		// element produced by splitting empty output).
		pid, convErr := strconv.Atoi(strings.TrimSpace(line))
		if convErr != nil || pid <= 0 {
			continue
		}
		pids = append(pids, pid)
	}
	return pids
}

// processComm returns the trimmed contents of /proc/<pid>/comm, or "" when the
// file cannot be read (process gone, or no procfs on this system).
func (osProber) processComm(pid int) string {
	data, err := os.ReadFile(fmt.Sprintf("/proc/%d/comm", pid))
	if err != nil {
		return ""
	}
	return strings.TrimSpace(string(data))
}

// isAlive reports whether a process with the given PID exists, probing it with
// signal 0 (no signal is delivered).
//
// Non-positive PIDs are rejected up front: kill(2) interprets 0 and negative
// values as process groups, so probing them would succeed and falsely report a
// live process (e.g. when a PID file is missing and the caller passes 0).
// EPERM counts as alive because it means the process exists but belongs to
// someone we are not allowed to signal.
func (osProber) isAlive(pid int) bool {
	if pid <= 0 {
		return false
	}
	err := syscall.Kill(pid, 0)
	return err == nil || err == syscall.EPERM
}
|
||||||
|
|
||||||
// Manager handles agent process lifecycle.
|
// Manager handles agent process lifecycle.
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
runDir string
|
runDir string
|
||||||
agentsGlob string
|
agentsGlob string
|
||||||
binPath string
|
binPath string
|
||||||
envFile string // path to .env file for child processes
|
envFile string // path to .env file for child processes
|
||||||
|
prober processProber
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewManager creates a Manager. binPath can be empty for auto-detection.
|
// NewManager creates a Manager. binPath can be empty for auto-detection.
|
||||||
func NewManager(runDir, agentsGlob, binPath string) *Manager {
|
func NewManager(runDir, agentsGlob, binPath string) *Manager {
|
||||||
return &Manager{runDir: runDir, agentsGlob: agentsGlob, binPath: binPath, envFile: ".env"}
|
return &Manager{runDir: runDir, agentsGlob: agentsGlob, binPath: binPath, envFile: ".env", prober: osProber{}}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Scan discovers all agents from config files.
|
// Scan discovers all agents from config files.
|
||||||
@@ -110,8 +150,11 @@ func (m *Manager) StatusAll() ([]AgentStatus, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Start launches an agent process in the background.
|
// Start launches an agent process in the background.
|
||||||
// Multiple instances of the same agent are allowed.
|
// Returns an error if the agent is already running.
|
||||||
func (m *Manager) Start(info AgentInfo) error {
|
func (m *Manager) Start(info AgentInfo) error {
|
||||||
|
if pids := m.findProcessPIDs(info.ID); len(pids) > 0 {
|
||||||
|
return fmt.Errorf("agent %q is already running (PID %d)", info.ID, pids[0])
|
||||||
|
}
|
||||||
if err := os.MkdirAll(m.runDir, 0o755); err != nil {
|
if err := os.MkdirAll(m.runDir, 0o755); err != nil {
|
||||||
return fmt.Errorf("create run dir: %w", err)
|
return fmt.Errorf("create run dir: %w", err)
|
||||||
}
|
}
|
||||||
@@ -351,23 +394,23 @@ func (m *Manager) readPID(id string) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// findProcessPIDs searches for running launcher processes for a given agent ID
|
// findProcessPIDs searches for running launcher processes for a given agent ID
|
||||||
// using pgrep. Returns all matching PIDs.
|
// using pgrep. Filters out "go run" wrapper PIDs to avoid double-counting.
|
||||||
func (m *Manager) findProcessPIDs(id string) []int {
|
func (m *Manager) findProcessPIDs(id string) []int {
|
||||||
// First try to find the config path for this agent
|
|
||||||
configPath := m.configPathFor(id)
|
configPath := m.configPathFor(id)
|
||||||
if configPath == "" {
|
if configPath == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
pattern := fmt.Sprintf("launcher.*-c.*%s", configPath)
|
pattern := fmt.Sprintf("launcher.*-c.*%s", configPath)
|
||||||
out, err := exec.Command("pgrep", "-f", pattern).Output()
|
raw := m.prober.pgrepPIDs(pattern)
|
||||||
if err != nil {
|
|
||||||
return nil
|
// Filter out the "go" wrapper process that appears when using "go run".
|
||||||
}
|
|
||||||
var pids []int
|
var pids []int
|
||||||
for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
|
for _, p := range raw {
|
||||||
if p, err := strconv.Atoi(strings.TrimSpace(line)); err == nil && p > 0 {
|
comm := m.prober.processComm(p)
|
||||||
pids = append(pids, p)
|
if comm == "go" {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
pids = append(pids, p)
|
||||||
}
|
}
|
||||||
return pids
|
return pids
|
||||||
}
|
}
|
||||||
@@ -415,7 +458,7 @@ func (m *Manager) resolveRunningPID(id string) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) isAlive(pid int) bool {
|
func (m *Manager) isAlive(pid int) bool {
|
||||||
return syscall.Kill(pid, 0) == nil
|
return m.prober.isAlive(pid)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) removePID(id string) {
|
func (m *Manager) removePID(id string) {
|
||||||
|
|||||||
@@ -0,0 +1,190 @@
|
|||||||
|
package process
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// fakeProber is a test double for processProber.
|
||||||
|
type fakeProber struct {
|
||||||
|
pids map[string][]int // pattern → PIDs
|
||||||
|
comms map[int]string // PID → comm name
|
||||||
|
alive map[int]bool // PID → is alive
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFakeProber() *fakeProber {
|
||||||
|
return &fakeProber{
|
||||||
|
pids: make(map[string][]int),
|
||||||
|
comms: make(map[int]string),
|
||||||
|
alive: make(map[int]bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *fakeProber) pgrepPIDs(pattern string) []int { return f.pids[pattern] }
|
||||||
|
func (f *fakeProber) processComm(pid int) string { return f.comms[pid] }
|
||||||
|
func (f *fakeProber) isAlive(pid int) bool { return f.alive[pid] }
|
||||||
|
|
||||||
|
// testManager creates a Manager with a temp dir, fake prober, and a config file.
|
||||||
|
func testManager(t *testing.T, fp *fakeProber) (*Manager, string) {
|
||||||
|
t.Helper()
|
||||||
|
dir := t.TempDir()
|
||||||
|
runDir := filepath.Join(dir, "run")
|
||||||
|
agentsDir := filepath.Join(dir, "agents", "test-bot")
|
||||||
|
_ = os.MkdirAll(runDir, 0o755)
|
||||||
|
_ = os.MkdirAll(agentsDir, 0o755)
|
||||||
|
|
||||||
|
// Minimal config.yaml so Scan() and configPathFor() work.
|
||||||
|
cfgPath := filepath.Join(agentsDir, "config.yaml")
|
||||||
|
_ = os.WriteFile(cfgPath, []byte(`agent:
|
||||||
|
id: test-bot
|
||||||
|
name: Test Bot
|
||||||
|
version: "0.1"
|
||||||
|
enabled: true
|
||||||
|
`), 0o644)
|
||||||
|
|
||||||
|
glob := filepath.Join(dir, "agents", "*", "config.yaml")
|
||||||
|
m := &Manager{
|
||||||
|
runDir: runDir,
|
||||||
|
agentsGlob: glob,
|
||||||
|
binPath: "/bin/true", // won't actually run
|
||||||
|
envFile: "",
|
||||||
|
prober: fp,
|
||||||
|
}
|
||||||
|
return m, cfgPath
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFindProcessPIDs_FiltersGoWrapper(t *testing.T) {
|
||||||
|
fp := newFakeProber()
|
||||||
|
m, cfgPath := testManager(t, fp)
|
||||||
|
|
||||||
|
// Simulate pgrep returning 2 PIDs: go wrapper (100) + real launcher (200).
|
||||||
|
pattern := "launcher.*-c.*" + cfgPath
|
||||||
|
fp.pids[pattern] = []int{100, 200}
|
||||||
|
fp.comms[100] = "go"
|
||||||
|
fp.comms[200] = "launcher"
|
||||||
|
|
||||||
|
pids := m.findProcessPIDs("test-bot")
|
||||||
|
|
||||||
|
if len(pids) != 1 {
|
||||||
|
t.Fatalf("expected 1 PID, got %d: %v", len(pids), pids)
|
||||||
|
}
|
||||||
|
if pids[0] != 200 {
|
||||||
|
t.Errorf("expected PID 200, got %d", pids[0])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFindProcessPIDs_NoPIDs(t *testing.T) {
|
||||||
|
fp := newFakeProber()
|
||||||
|
m, _ := testManager(t, fp)
|
||||||
|
|
||||||
|
pids := m.findProcessPIDs("test-bot")
|
||||||
|
if len(pids) != 0 {
|
||||||
|
t.Fatalf("expected 0 PIDs, got %d", len(pids))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatus_SingleInstance(t *testing.T) {
|
||||||
|
fp := newFakeProber()
|
||||||
|
m, cfgPath := testManager(t, fp)
|
||||||
|
|
||||||
|
pattern := "launcher.*-c.*" + cfgPath
|
||||||
|
fp.pids[pattern] = []int{42}
|
||||||
|
fp.comms[42] = "launcher"
|
||||||
|
|
||||||
|
info := AgentInfo{ID: "test-bot", Name: "Test", ConfigPath: cfgPath, Enabled: true}
|
||||||
|
st := m.Status(info)
|
||||||
|
|
||||||
|
if !st.Running {
|
||||||
|
t.Error("expected Running=true")
|
||||||
|
}
|
||||||
|
if st.PID != 42 {
|
||||||
|
t.Errorf("expected PID=42, got %d", st.PID)
|
||||||
|
}
|
||||||
|
if st.Instances != 1 {
|
||||||
|
t.Errorf("expected Instances=1, got %d", st.Instances)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatus_NoInstances(t *testing.T) {
|
||||||
|
fp := newFakeProber()
|
||||||
|
m, cfgPath := testManager(t, fp)
|
||||||
|
|
||||||
|
info := AgentInfo{ID: "test-bot", Name: "Test", ConfigPath: cfgPath, Enabled: true}
|
||||||
|
st := m.Status(info)
|
||||||
|
|
||||||
|
if st.Running {
|
||||||
|
t.Error("expected Running=false")
|
||||||
|
}
|
||||||
|
if st.Instances != 0 {
|
||||||
|
t.Errorf("expected Instances=0, got %d", st.Instances)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStart_RejectsWhenAlreadyRunning(t *testing.T) {
|
||||||
|
fp := newFakeProber()
|
||||||
|
m, cfgPath := testManager(t, fp)
|
||||||
|
|
||||||
|
pattern := "launcher.*-c.*" + cfgPath
|
||||||
|
fp.pids[pattern] = []int{99}
|
||||||
|
fp.comms[99] = "launcher"
|
||||||
|
|
||||||
|
info := AgentInfo{ID: "test-bot", Name: "Test", ConfigPath: cfgPath, Enabled: true}
|
||||||
|
err := m.Start(info)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("expected error when agent already running")
|
||||||
|
}
|
||||||
|
if got := err.Error(); got != `agent "test-bot" is already running (PID 99)` {
|
||||||
|
t.Errorf("unexpected error: %s", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolveRunningPID_RepairsStale(t *testing.T) {
|
||||||
|
fp := newFakeProber()
|
||||||
|
m, cfgPath := testManager(t, fp)
|
||||||
|
|
||||||
|
// Write a stale PID file (PID 999 is dead).
|
||||||
|
_ = os.MkdirAll(m.runDir, 0o755)
|
||||||
|
_ = os.WriteFile(m.pidPath("test-bot"), []byte("999"), 0o644)
|
||||||
|
fp.alive[999] = false
|
||||||
|
|
||||||
|
// But the real process is at PID 42.
|
||||||
|
pattern := "launcher.*-c.*" + cfgPath
|
||||||
|
fp.pids[pattern] = []int{42}
|
||||||
|
fp.comms[42] = "launcher"
|
||||||
|
|
||||||
|
pid := m.resolveRunningPID("test-bot")
|
||||||
|
if pid != 42 {
|
||||||
|
t.Errorf("expected repaired PID=42, got %d", pid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify PID file was repaired.
|
||||||
|
data, err := os.ReadFile(m.pidPath("test-bot"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("read pid file: %v", err)
|
||||||
|
}
|
||||||
|
if got, _ := strconv.Atoi(string(data)); got != 42 {
|
||||||
|
t.Errorf("expected PID file to contain 42, got %d", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResolveRunningPID_CleansUpStalePIDFile(t *testing.T) {
|
||||||
|
fp := newFakeProber()
|
||||||
|
m, _ := testManager(t, fp)
|
||||||
|
|
||||||
|
// Write a stale PID file, no real process running.
|
||||||
|
_ = os.MkdirAll(m.runDir, 0o755)
|
||||||
|
_ = os.WriteFile(m.pidPath("test-bot"), []byte("999"), 0o644)
|
||||||
|
fp.alive[999] = false
|
||||||
|
|
||||||
|
pid := m.resolveRunningPID("test-bot")
|
||||||
|
if pid != 0 {
|
||||||
|
t.Errorf("expected 0 for dead process, got %d", pid)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PID file should be removed.
|
||||||
|
if _, err := os.Stat(m.pidPath("test-bot")); !os.IsNotExist(err) {
|
||||||
|
t.Error("expected stale PID file to be removed")
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user