feat(0003d): replicated blob store on NATS Object Store

Branch-by-abstraction for the blob store (issue 0003d): media ciphertext
can live in a replicated JetStream Object Store instead of local disk, so
a blob uploaded to one node survives a node loss and is reachable from
any node.

pkg/blobstore:
- Store is now an interface (Put/Get/Has). The filesystem backend is
  renamed diskStore and stays the default: New(dir) returns it.
- objectStore (new) implements Store over a NATS Object Store bucket with
  a configurable replication factor (R1..R5), matching the KV store's
  R1->R3 rollout. Content-addressing (sha256-hex) is identical, so the
  wire contract is unchanged.

pkg/membership:
- Server.blobs and NewServer take the blobstore.Store interface instead
  of the concrete type; no behavior change with the disk default.

Tests (DoD: golden + edge + contract):
- TestObjectStoreRoundTrip: put/get/has + content-addressed dedup.
- TestObjectStoreMissing: unknown hash is absent and unreadable.
- TestObjectStoreAddressMatchesDisk: the Object Store and disk backends
  address identical bytes to the IDENTICAL hash (portable blob refs).

Like the KV store (0003b), wiring membershipd to select the Object Store
is deferred to the decentralized boot path (flag off); disk stays default.
This commit is contained in:
agent
2026-06-07 15:12:45 +02:00
parent 94e7ced1ef
commit d6e668b984
4 changed files with 263 additions and 12 deletions
+27 -10
View File
@@ -1,9 +1,15 @@
// Package blobstore is a content-addressed object store on local disk.
// Package blobstore is a content-addressed object store for media ciphertext.
//
// The bus transports messages, not blobs. Media (images, files, large payloads)
// is encrypted by the client BEFORE being stored here, so the store only ever
// sees ciphertext. Objects are addressed by the sha256 hex of their (encrypted)
// bytes, which makes Put idempotent and deduplicating.
//
// Store is an interface (branch-by-abstraction, issue 0003d) with two backends:
// diskStore (the default, local filesystem) and objectStore (NATS Object Store
// on JetStream, replicated across the cluster so blobs survive a node loss and
// are reachable from any node). The wire contract (sha256-hex addressing) is
// identical, so a client cannot tell which backend a membershipd uses.
package blobstore
import (
@@ -14,27 +20,38 @@ import (
"path/filepath"
)
// Store is a directory-backed content-addressed blob store.
type Store struct {
// Store is a content-addressed blob store: Put returns the sha256-hex address of
// the stored bytes, Get fetches by that address, Has reports presence.
type Store interface {
	// Put stores data and returns the hex-encoded sha256 of those bytes, which
	// is the address to pass to Get and Has.
	Put(data []byte) (string, error)
	// Get returns the bytes stored under hash, or an error if they cannot be
	// read.
	Get(hash string) ([]byte, error)
	// Has reports whether a blob is stored under hash. It has no error result,
	// so a backend failure while checking is reported as false.
	Has(hash string) bool
}
// diskStore is a directory-backed content-addressed blob store (the default,
// single-node backend).
type diskStore struct {
	// dir is the root directory; a blob's file path is dir joined with its hash.
	dir string
}
// New creates a Store rooted at dir, creating the directory if needed.
func New(dir string) (*Store, error) {
// New creates a disk-backed Store rooted at dir, creating the directory if
// needed. It remains the default backend; the replicated NATS Object Store is
// constructed separately (NewObjectStore) when decentralization is enabled.
func New(dir string) (Store, error) {
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("blobstore: mkdir %q: %w", dir, err)
}
return &Store{dir: dir}, nil
return &diskStore{dir: dir}, nil
}
// path returns the on-disk path for a given content hash.
func (s *Store) path(hash string) string {
func (s *diskStore) path(hash string) string {
return filepath.Join(s.dir, hash)
}
// Put writes data to the store and returns its sha256 hex hash. If an object
// with the same content already exists, Put is a no-op and returns the hash.
func (s *Store) Put(data []byte) (string, error) {
func (s *diskStore) Put(data []byte) (string, error) {
sum := sha256.Sum256(data)
hash := hex.EncodeToString(sum[:])
p := s.path(hash)
@@ -66,7 +83,7 @@ func (s *Store) Put(data []byte) (string, error) {
}
// Get reads the object with the given hash.
func (s *Store) Get(hash string) ([]byte, error) {
func (s *diskStore) Get(hash string) ([]byte, error) {
data, err := os.ReadFile(s.path(hash))
if err != nil {
return nil, fmt.Errorf("blobstore: get %q: %w", hash, err)
@@ -75,7 +92,7 @@ func (s *Store) Get(hash string) ([]byte, error) {
}
// Has reports whether an object with the given hash exists.
func (s *Store) Has(hash string) bool {
func (s *diskStore) Has(hash string) bool {
_, err := os.Stat(s.path(hash))
return err == nil
}
+102
View File
@@ -0,0 +1,102 @@
package blobstore
// objectStore is the NATS Object Store implementation of Store (issue 0003d):
// media ciphertext lives in a JetStream Object Store bucket replicated across
// the cluster, so a blob uploaded to one node is durable against the loss of a
// node and readable from any node. It is selected when decentralization is on;
// diskStore stays the single-node default. The content-addressing (sha256-hex)
// is identical to the disk backend, so the wire contract does not change.
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"time"
"github.com/nats-io/nats.go/jetstream"
)
// Defaults applied by NewObjectStore when the matching ObjectStoreConfig field
// is left unset.
const (
	// defaultObjectBucket is the bucket name used when ObjectStoreConfig.Bucket
	// is empty.
	defaultObjectBucket = "UNIBUS_blobs"
	// defaultObjOpTime is the per-operation timeout used when
	// ObjectStoreConfig.OpTimeout is zero or negative.
	defaultObjOpTime = 10 * time.Second
)
// ObjectStoreConfig configures the replicated Object Store backend. The zero
// value is usable: NewObjectStore fills in a default for every unset field.
type ObjectStoreConfig struct {
	// Bucket is the object store bucket name; empty uses UNIBUS_blobs.
	Bucket string
	// Replicas is the replication factor (R1..R5), matching the KV store's
	// R1->R3 rollout. Zero or a negative value is treated as 1.
	Replicas int
	// OpTimeout bounds each object operation; zero (or a negative value) uses
	// defaultObjOpTime.
	OpTimeout time.Duration
}
// objectStore is the Store implementation backed by a JetStream Object Store
// bucket. Instances are built by NewObjectStore.
type objectStore struct {
	// os is the bucket handle every Put/Get/Has call goes through. (It is a
	// field, not the standard library "os" package.)
	os jetstream.ObjectStore
	// opTimeout is the deadline applied to each individual bucket operation.
	opTimeout time.Duration
}
// NewObjectStore creates (or opens) the replicated Object Store bucket on js and
// returns it as a Store. The JetStream context belongs to the caller, which
// remains responsible for the underlying connection's lifetime.
//
// Unset configuration is defaulted: an empty Bucket becomes
// defaultObjectBucket, Replicas <= 0 becomes 1, and OpTimeout <= 0 becomes
// defaultObjOpTime. The bucket always uses file storage.
//
// It returns an error (never panics) when js is nil, when Replicas exceeds the
// supported R1..R5 range, or when the bucket cannot be created or updated
// within the open timeout.
func NewObjectStore(js jetstream.JetStream, cfg ObjectStoreConfig) (Store, error) {
	const (
		// maxReplicas is the top of the documented R1..R5 range; rejecting
		// larger values here yields a clear error without a server round trip.
		maxReplicas = 5
		// openTimeout bounds the one-off bucket create/update call.
		openTimeout = 15 * time.Second
	)

	if cfg.Bucket == "" {
		cfg.Bucket = defaultObjectBucket
	}
	if js == nil {
		// A nil interface would otherwise panic on the first method call.
		return nil, fmt.Errorf("blobstore: open object store %q: nil JetStream context", cfg.Bucket)
	}
	if cfg.Replicas <= 0 {
		cfg.Replicas = 1
	}
	if cfg.Replicas > maxReplicas {
		return nil, fmt.Errorf("blobstore: open object store %q: replicas=%d exceeds maximum %d",
			cfg.Bucket, cfg.Replicas, maxReplicas)
	}
	opTimeout := cfg.OpTimeout
	if opTimeout <= 0 {
		opTimeout = defaultObjOpTime
	}

	ctx, cancel := context.WithTimeout(context.Background(), openTimeout)
	defer cancel()
	obj, err := js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{
		Bucket:   cfg.Bucket,
		Replicas: cfg.Replicas,
		Storage:  jetstream.FileStorage,
	})
	if err != nil {
		return nil, fmt.Errorf("blobstore: open object store %q (replicas=%d): %w", cfg.Bucket, cfg.Replicas, err)
	}
	return &objectStore{os: obj, opTimeout: opTimeout}, nil
}
// ctx returns a fresh context that expires after the store's per-operation
// timeout, together with its cancel function. Callers must invoke the cancel
// function once the operation finishes.
func (s *objectStore) ctx() (context.Context, context.CancelFunc) {
	parent := context.Background()
	return context.WithTimeout(parent, s.opTimeout)
}
// Put stores data under its sha256-hex address and returns that address.
//
// Put is idempotent and deduplicating, like the disk backend: when an object
// with the same address is already present the upload is skipped entirely.
// Because the address is the hash of the content, a present object already
// holds exactly these bytes, so there is nothing to rewrite. Skipping avoids
// re-sending and re-replicating the whole blob, and avoids replacing an object
// that another node may be reading at that moment (NATS Object Store handles a
// Put on an existing name as a full rewrite; see the nats.go ObjectStore docs).
//
// If the presence check itself fails (Has reports false on any error), Put
// falls through to a normal upload, so a transient lookup failure never causes
// data to be dropped. An error is returned only when the upload fails.
func (s *objectStore) Put(data []byte) (string, error) {
	sum := sha256.Sum256(data)
	hash := hex.EncodeToString(sum[:])

	// Already stored: same address implies same content.
	if s.Has(hash) {
		return hash, nil
	}

	ctx, cancel := s.ctx()
	defer cancel()
	if _, err := s.os.PutBytes(ctx, hash, data); err != nil {
		return "", fmt.Errorf("blobstore: put object %q: %w", hash, err)
	}
	return hash, nil
}
// Get returns the bytes stored under the given hash address. Any failure from
// the bucket (including an unknown address) is returned wrapped, so callers can
// still inspect the underlying cause with errors.Is / errors.As.
func (s *objectStore) Get(hash string) ([]byte, error) {
	opCtx, done := s.ctx()
	defer done()

	blob, getErr := s.os.GetBytes(opCtx, hash)
	if getErr != nil {
		return nil, fmt.Errorf("blobstore: get object %q: %w", hash, getErr)
	}
	return blob, nil
}
// Has reports whether the bucket holds an object under the given hash. It looks
// up the object's info only, without fetching its contents. Any lookup error,
// not just "not found", yields false, because the Store interface gives Has no
// way to report an error.
func (s *objectStore) Has(hash string) bool {
	opCtx, done := s.ctx()
	defer done()

	if _, lookupErr := s.os.GetInfo(opCtx, hash); lookupErr != nil {
		return false
	}
	return true
}
+132
View File
@@ -0,0 +1,132 @@
package blobstore_test
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"net"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// objFreePort asks the kernel for an unused loopback TCP port by binding to
// port 0, then releases the listener and returns the port number. The port is
// free at the moment of return only; nothing reserves it afterwards.
func objFreePort(t *testing.T) int {
	t.Helper()

	listener, listenErr := net.Listen("tcp", "127.0.0.1:0")
	if listenErr != nil {
		t.Fatalf("free port: %v", listenErr)
	}
	defer listener.Close()

	tcpAddr := listener.Addr().(*net.TCPAddr)
	return tcpAddr.Port
}
// newObjectStore boots a single-node embedded NATS with JetStream and returns a
// replicated (R1) Object Store backend over it.
//
// Teardown is registered with t.Cleanup as soon as each resource exists, so
// every exit path (including a t.Fatalf part-way through setup) closes the
// client connection and then shuts the server down AND waits for it to stop.
// Cleanups run last-registered-first, so the order is: close connection, stop
// server, and finally removal of the t.TempDir store directory, which was
// registered before the server started.
func newObjectStore(t *testing.T) blobstore.Store {
	t.Helper()

	ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
		StoreDir: t.TempDir(),
		Host:     "127.0.0.1",
		Port:     objFreePort(t),
	})
	if err != nil {
		t.Fatalf("embedded nats: %v", err)
	}
	t.Cleanup(func() {
		ns.Shutdown()
		// Wait so the store directory is not removed under a live server.
		ns.WaitForShutdown()
	})

	nc, err := nats.Connect(ns.ClientURL())
	if err != nil {
		t.Fatalf("nats connect: %v", err)
	}
	t.Cleanup(nc.Close)

	js, err := jetstream.New(nc)
	if err != nil {
		t.Fatalf("jetstream: %v", err)
	}

	st, err := blobstore.NewObjectStore(js, blobstore.ObjectStoreConfig{Replicas: 1, OpTimeout: 5 * time.Second})
	if err != nil {
		t.Fatalf("new object store: %v", err)
	}
	return st
}
// TestObjectStoreRoundTrip covers the golden path: a stored payload is
// addressed by its sha256 hex, can be read back unchanged, is reported present
// by Has, and storing the same bytes again yields the same address without an
// error (content-addressed dedup).
func TestObjectStoreRoundTrip(t *testing.T) {
	store := newObjectStore(t)
	payload := []byte("encrypted-media-ciphertext-payload")

	addr, err := store.Put(payload)
	if err != nil {
		t.Fatalf("Put: %v", err)
	}
	if expected := hex.EncodeToString(sha256Sum(payload)); addr != expected {
		t.Fatalf("hash = %q, want sha256 hex %q", addr, expected)
	}

	fetched, err := store.Get(addr)
	switch {
	case err != nil:
		t.Fatalf("Get: %v", err)
	case !bytes.Equal(fetched, payload):
		t.Fatalf("Get returned %q, want %q", fetched, payload)
	}

	if present := store.Has(addr); !present {
		t.Fatalf("Has should be true for a stored blob")
	}

	// Storing identical bytes a second time must succeed with the same address.
	again, err := store.Put(payload)
	if err != nil || again != addr {
		t.Fatalf("re-Put: hash2=%q err=%v (want %q)", again, err, addr)
	}
}
// TestObjectStoreMissing covers the edge/error path: for an address that was
// never stored, Has must report false and Get must return an error.
func TestObjectStoreMissing(t *testing.T) {
	store := newObjectStore(t)
	unknown := hex.EncodeToString(sha256Sum([]byte("never stored")))

	if found := store.Has(unknown); found {
		t.Fatalf("Has should be false for an unknown hash")
	}

	_, err := store.Get(unknown)
	if err == nil {
		t.Fatalf("Get of an unknown hash should error")
	}
}
// TestObjectStoreAddressMatchesDisk is the contract test: for the same bytes
// the Object Store backend and the disk backend must return the same address,
// so blob references are portable between backends. The payloads include a
// single byte, a blob with embedded control bytes, and the empty blob.
func TestObjectStoreAddressMatchesDisk(t *testing.T) {
	objBackend := newObjectStore(t)
	diskBackend, err := blobstore.New(t.TempDir())
	if err != nil {
		t.Fatalf("disk store: %v", err)
	}

	payloads := [][]byte{
		[]byte("a"),
		[]byte("longer ciphertext blob \x00\x01\x02"),
		{},
	}
	for i := range payloads {
		blob := payloads[i]

		objAddr, err := objBackend.Put(blob)
		if err != nil {
			t.Fatalf("object Put: %v", err)
		}
		diskAddr, err := diskBackend.Put(blob)
		if err != nil {
			t.Fatalf("disk Put: %v", err)
		}
		if objAddr != diskAddr {
			t.Fatalf("address mismatch for %q: object=%q disk=%q", blob, objAddr, diskAddr)
		}
	}
}
// sha256Sum returns the 32-byte SHA-256 digest of b as a slice, giving the
// tests an address computation that is independent of the store under test.
func sha256Sum(b []byte) []byte {
	h := sha256.New()
	h.Write(b) // hash.Hash.Write is documented to never return an error.
	return h.Sum(nil)
}
+2 -2
View File
@@ -56,7 +56,7 @@ const (
// (mTLS, capabilities, rate limits) is a later phase.
type Server struct {
store Store
blobs *blobstore.Store
blobs blobstore.Store
mux *http.ServeMux
authMode AuthMode
nonces *nonceCache
@@ -78,7 +78,7 @@ type Server struct {
// tests that have not migrated to signed requests yet). It installs a per-IP
// rate limiter with the package defaults; loopback dev behavior is unchanged
// because the burst comfortably exceeds any single client's request rate.
func NewServer(store Store, blobs *blobstore.Store, authMode AuthMode) *Server {
func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
s := &Server{
store: store,
blobs: blobs,