Files
device_agent/capability_fs_test.go
2026-05-30 17:28:38 +02:00

173 lines
4.6 KiB
Go

package main
import (
"encoding/base64"
"os"
"path/filepath"
"strings"
"testing"
)
func tempCap(t *testing.T, allowed ...string) *Capability {
t.Helper()
return &Capability{
Name: "fs.test",
PathsAllowed: allowed,
}
}
func TestFsRead_PathNotAllowed(t *testing.T) {
tmp := t.TempDir()
cap := tempCap(t, tmp+"/**")
_, _, err := runFsRead(cap, map[string]any{"path": "/etc/passwd"})
if err == nil || !strings.Contains(err.Error(), "not allowed") {
t.Fatalf("expected not allowed, got: %v", err)
}
}
func TestFsRead_PathAllowed(t *testing.T) {
tmp := t.TempDir()
target := filepath.Join(tmp, "test.txt")
if err := os.WriteFile(target, []byte("hello world"), 0644); err != nil {
t.Fatal(err)
}
cap := tempCap(t, tmp+"/**")
res, code, err := runFsRead(cap, map[string]any{"path": target})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if code != 0 {
t.Fatalf("expected exit=0 got %d", code)
}
m := res.(map[string]any)
b, _ := base64.StdEncoding.DecodeString(m["content_b64"].(string))
if string(b) != "hello world" {
t.Fatalf("content mismatch: %q", string(b))
}
}
func TestFsRead_MaxBytesTrunca(t *testing.T) {
tmp := t.TempDir()
target := filepath.Join(tmp, "big.bin")
if err := os.WriteFile(target, make([]byte, 1000), 0644); err != nil {
t.Fatal(err)
}
cap := tempCap(t, tmp+"/**")
res, _, err := runFsRead(cap, map[string]any{"path": target, "max_bytes": 100})
if err != nil {
t.Fatal(err)
}
m := res.(map[string]any)
if !m["truncated"].(bool) {
t.Fatalf("expected truncated=true")
}
if m["bytes_read"].(int) != 100 {
t.Fatalf("expected 100 bytes, got %v", m["bytes_read"])
}
}
func TestFsWrite_CreaParent(t *testing.T) {
tmp := t.TempDir()
target := filepath.Join(tmp, "subdir", "nested", "x.txt")
cap := tempCap(t, tmp+"/**")
body := base64.StdEncoding.EncodeToString([]byte("payload"))
_, _, err := runFsWrite(cap, map[string]any{"path": target, "content_b64": body})
if err != nil {
t.Fatalf("write: %v", err)
}
got, err := os.ReadFile(target)
if err != nil {
t.Fatal(err)
}
if string(got) != "payload" {
t.Fatalf("content mismatch: %q", string(got))
}
}
func TestFsList_NoRecursive(t *testing.T) {
tmp := t.TempDir()
for _, n := range []string{"a.txt", "b.txt", "c.txt"} {
os.WriteFile(filepath.Join(tmp, n), []byte("x"), 0644)
}
os.Mkdir(filepath.Join(tmp, "subdir"), 0755)
// archivo en subdir NO debe aparecer (no recursivo)
os.WriteFile(filepath.Join(tmp, "subdir", "hidden.txt"), []byte("y"), 0644)
cap := tempCap(t, tmp+"/**")
res, _, err := runFsList(cap, map[string]any{"dir": tmp})
if err != nil {
t.Fatal(err)
}
m := res.(map[string]any)
entries := m["entries"].([]map[string]any)
if len(entries) != 4 {
t.Fatalf("expected 4 entries, got %d: %+v", len(entries), entries)
}
// Check kind correcto: subdir es dir
for _, e := range entries {
if e["name"] == "subdir" && e["kind"] != "dir" {
t.Fatalf("subdir kind != dir: %v", e)
}
}
}
func TestFsStat_OK(t *testing.T) {
tmp := t.TempDir()
target := filepath.Join(tmp, "z.txt")
os.WriteFile(target, []byte("abc"), 0644)
cap := tempCap(t, tmp+"/**")
res, _, err := runFsStat(cap, map[string]any{"path": target})
if err != nil {
t.Fatal(err)
}
m := res.(map[string]any)
if m["kind"] != "file" {
t.Fatalf("kind: %v", m["kind"])
}
if m["size"].(int64) != 3 {
t.Fatalf("size: %v", m["size"])
}
}
func TestPathTraversal_DotDot(t *testing.T) {
tmp := t.TempDir()
cap := tempCap(t, tmp+"/**")
traversal := filepath.Join(tmp, "..", "..", "..", "etc", "passwd")
_, _, err := runFsRead(cap, map[string]any{"path": traversal})
if err == nil || !strings.Contains(err.Error(), "not allowed") {
t.Fatalf("expected blocked traversal, got err=%v", err)
}
}
func TestPathTraversal_Symlink(t *testing.T) {
tmp := t.TempDir()
// crear symlink dentro de tmp apuntando a /etc/passwd
linkPath := filepath.Join(tmp, "leak")
if err := os.Symlink("/etc/passwd", linkPath); err != nil {
t.Skipf("symlink failed: %v", err)
}
cap := tempCap(t, tmp+"/**")
_, _, err := runFsRead(cap, map[string]any{"path": linkPath})
// EvalSymlinks lo resuelve a /etc/passwd que NO esta en allowed -> reject
if err == nil {
t.Fatalf("expected symlink leak rejected")
}
}
func TestFsList_Glob(t *testing.T) {
tmp := t.TempDir()
for _, n := range []string{"a.log", "b.log", "c.txt"} {
os.WriteFile(filepath.Join(tmp, n), []byte("x"), 0644)
}
cap := tempCap(t, tmp+"/**")
res, _, err := runFsList(cap, map[string]any{"dir": tmp, "glob": "*.log"})
if err != nil {
t.Fatal(err)
}
m := res.(map[string]any)
entries := m["entries"].([]map[string]any)
if len(entries) != 2 {
t.Fatalf("expected 2 .log, got %d", len(entries))
}
}