diff --git a/pkg/blobstore/blobstore.go b/pkg/blobstore/blobstore.go index 54c82e0..48d32a2 100644 --- a/pkg/blobstore/blobstore.go +++ b/pkg/blobstore/blobstore.go @@ -1,9 +1,15 @@ -// Package blobstore is a content-addressed object store on local disk. +// Package blobstore is a content-addressed object store for media ciphertext. // // The bus transports messages, not blobs. Media (images, files, large payloads) // is encrypted by the client BEFORE being stored here, so the store only ever // sees ciphertext. Objects are addressed by the sha256 hex of their (encrypted) // bytes, which makes Put idempotent and deduplicating. +// +// Store is an interface (branch-by-abstraction, issue 0003d) with two backends: +// diskStore (the default, local filesystem) and objectStore (NATS Object Store +// on JetStream, replicated across the cluster so blobs survive a node loss and +// are reachable from any node). The wire contract (sha256-hex addressing) is +// identical, so a client cannot tell which backend a membershipd uses. package blobstore import ( @@ -14,27 +20,38 @@ import ( "path/filepath" ) -// Store is a directory-backed content-addressed blob store. -type Store struct { +// Store is a content-addressed blob store: Put returns the sha256-hex address of +// the stored bytes, Get fetches by that address, Has reports presence. +type Store interface { + Put(data []byte) (string, error) + Get(hash string) ([]byte, error) + Has(hash string) bool +} + +// diskStore is a directory-backed content-addressed blob store (the default, +// single-node backend). +type diskStore struct { dir string } -// New creates a Store rooted at dir, creating the directory if needed. -func New(dir string) (*Store, error) { +// New creates a disk-backed Store rooted at dir, creating the directory if +// needed. It remains the default backend; the replicated NATS Object Store is +// constructed separately (NewObjectStore) when decentralization is enabled. +func New(dir string) (Store, error) { if err := os.MkdirAll(dir, 0o755); err != nil { return nil, fmt.Errorf("blobstore: mkdir %q: %w", dir, err) } - return &Store{dir: dir}, nil + return &diskStore{dir: dir}, nil } // path returns the on-disk path for a given content hash. -func (s *Store) path(hash string) string { +func (s *diskStore) path(hash string) string { return filepath.Join(s.dir, hash) } // Put writes data to the store and returns its sha256 hex hash. If an object // with the same content already exists, Put is a no-op and returns the hash. -func (s *Store) Put(data []byte) (string, error) { +func (s *diskStore) Put(data []byte) (string, error) { sum := sha256.Sum256(data) hash := hex.EncodeToString(sum[:]) p := s.path(hash) @@ -66,7 +83,7 @@ func (s *Store) Put(data []byte) (string, error) { } // Get reads the object with the given hash. -func (s *Store) Get(hash string) ([]byte, error) { +func (s *diskStore) Get(hash string) ([]byte, error) { data, err := os.ReadFile(s.path(hash)) if err != nil { return nil, fmt.Errorf("blobstore: get %q: %w", hash, err) @@ -75,7 +92,7 @@ func (s *Store) Get(hash string) ([]byte, error) { } // Has reports whether an object with the given hash exists. -func (s *Store) Has(hash string) bool { +func (s *diskStore) Has(hash string) bool { _, err := os.Stat(s.path(hash)) return err == nil } diff --git a/pkg/blobstore/objectstore.go b/pkg/blobstore/objectstore.go new file mode 100644 index 0000000..cba88d3 --- /dev/null +++ b/pkg/blobstore/objectstore.go @@ -0,0 +1,102 @@ +package blobstore + +// objectStore is the NATS Object Store implementation of Store (issue 0003d): +// media ciphertext lives in a JetStream Object Store bucket replicated across +// the cluster, so a blob uploaded to one node is durable against the loss of a +// node and readable from any node. It is selected when decentralization is on; +// diskStore stays the single-node default. The content-addressing (sha256-hex) +// is identical to the disk backend, so the wire contract does not change. + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +const ( + defaultObjectBucket = "UNIBUS_blobs" + defaultObjOpTime = 10 * time.Second +) + +// ObjectStoreConfig configures the replicated Object Store backend. +type ObjectStoreConfig struct { + // Bucket is the object store bucket name; empty uses UNIBUS_blobs. + Bucket string + // Replicas is the replication factor (R1..R5), matching the KV store's + // R1->R3 rollout. + Replicas int + // OpTimeout bounds each object operation; zero uses defaultObjOpTime. + OpTimeout time.Duration +} + +type objectStore struct { + os jetstream.ObjectStore + opTimeout time.Duration +} + +// NewObjectStore creates (or opens) the replicated Object Store bucket on js and +// returns it as a Store. The JetStream context belongs to the caller. +func NewObjectStore(js jetstream.JetStream, cfg ObjectStoreConfig) (Store, error) { + if cfg.Bucket == "" { + cfg.Bucket = defaultObjectBucket + } + if cfg.Replicas <= 0 { + cfg.Replicas = 1 + } + opTimeout := cfg.OpTimeout + if opTimeout <= 0 { + opTimeout = defaultObjOpTime + } + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + obj, err := js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{ + Bucket: cfg.Bucket, + Replicas: cfg.Replicas, + Storage: jetstream.FileStorage, + }) + if err != nil { + return nil, fmt.Errorf("blobstore: open object store %q (replicas=%d): %w", cfg.Bucket, cfg.Replicas, err) + } + return &objectStore{os: obj, opTimeout: opTimeout}, nil +} + +func (s *objectStore) ctx() (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), s.opTimeout) +} + +// Put stores data under its sha256-hex address. Re-putting identical bytes is a +// harmless overwrite (same address, same content), preserving the idempotent, +// deduplicating semantics of the disk backend. +func (s *objectStore) Put(data []byte) (string, error) { + sum := sha256.Sum256(data) + hash := hex.EncodeToString(sum[:]) + ctx, cancel := s.ctx() + defer cancel() + if _, err := s.os.PutBytes(ctx, hash, data); err != nil { + return "", fmt.Errorf("blobstore: put object %q: %w", hash, err) + } + return hash, nil +} + +// Get fetches the object by its hash address. +func (s *objectStore) Get(hash string) ([]byte, error) { + ctx, cancel := s.ctx() + defer cancel() + data, err := s.os.GetBytes(ctx, hash) + if err != nil { + return nil, fmt.Errorf("blobstore: get object %q: %w", hash, err) + } + return data, nil +} + +// Has reports whether an object with the given hash exists. +func (s *objectStore) Has(hash string) bool { + ctx, cancel := s.ctx() + defer cancel() + _, err := s.os.GetInfo(ctx, hash) + return err == nil +} diff --git a/pkg/blobstore/objectstore_test.go b/pkg/blobstore/objectstore_test.go new file mode 100644 index 0000000..0544fd7 --- /dev/null +++ b/pkg/blobstore/objectstore_test.go @@ -0,0 +1,132 @@ +package blobstore_test + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "net" + "testing" + "time" + + "github.com/enmanuel/unibus/pkg/blobstore" + "github.com/enmanuel/unibus/pkg/embeddednats" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +func objFreePort(t *testing.T) int { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("free port: %v", err) + } + defer l.Close() + return l.Addr().(*net.TCPAddr).Port +} + +// newObjectStore boots a single-node embedded NATS with JetStream and returns a +// replicated (R1) Object Store backend over it. +func newObjectStore(t *testing.T) blobstore.Store { + t.Helper() + ns, err := embeddednats.StartServer(embeddednats.ServerConfig{ + StoreDir: t.TempDir(), + Host: "127.0.0.1", + Port: objFreePort(t), + }) + if err != nil { + t.Fatalf("embedded nats: %v", err) + } + nc, err := nats.Connect(ns.ClientURL()) + if err != nil { + ns.Shutdown() + t.Fatalf("nats connect: %v", err) + } + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + ns.Shutdown() + t.Fatalf("jetstream: %v", err) + } + st, err := blobstore.NewObjectStore(js, blobstore.ObjectStoreConfig{Replicas: 1, OpTimeout: 5 * time.Second}) + if err != nil { + nc.Close() + ns.Shutdown() + t.Fatalf("new object store: %v", err) + } + t.Cleanup(func() { nc.Close(); ns.Shutdown(); ns.WaitForShutdown() }) + return st +} + +// TestObjectStoreRoundTrip is the golden path: put ciphertext, get it back by +// its hash, Has reports presence, and re-putting identical bytes returns the +// same address (content-addressed dedup). +func TestObjectStoreRoundTrip(t *testing.T) { + s := newObjectStore(t) + data := []byte("encrypted-media-ciphertext-payload") + + hash, err := s.Put(data) + if err != nil { + t.Fatalf("Put: %v", err) + } + want := hex.EncodeToString(sha256Sum(data)) + if hash != want { + t.Fatalf("hash = %q, want sha256 hex %q", hash, want) + } + got, err := s.Get(hash) + if err != nil { + t.Fatalf("Get: %v", err) + } + if !bytes.Equal(got, data) { + t.Fatalf("Get returned %q, want %q", got, data) + } + if !s.Has(hash) { + t.Fatalf("Has should be true for a stored blob") + } + // Re-put identical bytes: same address, no error. + hash2, err := s.Put(data) + if err != nil || hash2 != hash { + t.Fatalf("re-Put: hash2=%q err=%v (want %q)", hash2, err, hash) + } +} + +// TestObjectStoreMissing is the edge/error path: a hash that was never stored +// is absent and unreadable. +func TestObjectStoreMissing(t *testing.T) { + s := newObjectStore(t) + missing := hex.EncodeToString(sha256Sum([]byte("never stored"))) + if s.Has(missing) { + t.Fatalf("Has should be false for an unknown hash") + } + if _, err := s.Get(missing); err == nil { + t.Fatalf("Get of an unknown hash should error") + } +} + +// TestObjectStoreAddressMatchesDisk is the contract test: the Object Store and +// the disk backend address identical bytes to the IDENTICAL hash, so a client +// cannot tell which backend a node uses and a blob ref is portable across them. +func TestObjectStoreAddressMatchesDisk(t *testing.T) { + obj := newObjectStore(t) + disk, err := blobstore.New(t.TempDir()) + if err != nil { + t.Fatalf("disk store: %v", err) + } + for _, payload := range [][]byte{[]byte("a"), []byte("longer ciphertext blob \x00\x01\x02"), {}} { + oh, err := obj.Put(payload) + if err != nil { + t.Fatalf("object Put: %v", err) + } + dh, err := disk.Put(payload) + if err != nil { + t.Fatalf("disk Put: %v", err) + } + if oh != dh { + t.Fatalf("address mismatch for %q: object=%q disk=%q", payload, oh, dh) + } + } +} + +func sha256Sum(b []byte) []byte { + sum := sha256.Sum256(b) + return sum[:] +} diff --git a/pkg/membership/server.go b/pkg/membership/server.go index 75136cf..1b5758b 100644 --- a/pkg/membership/server.go +++ b/pkg/membership/server.go @@ -56,7 +56,7 @@ const ( // (mTLS, capabilities, rate limits) is a later phase. type Server struct { store Store - blobs *blobstore.Store + blobs blobstore.Store mux *http.ServeMux authMode AuthMode nonces *nonceCache @@ -78,7 +78,7 @@ type Server struct { // tests that have not migrated to signed requests yet). It installs a per-IP // rate limiter with the package defaults; loopback dev behavior is unchanged // because the burst comfortably exceeds any single client's request rate. -func NewServer(store Store, blobs *blobstore.Store, authMode AuthMode) *Server { +func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server { s := &Server{ store: store, blobs: blobs,