Merge issue/0003d-objectstore: replicated blobs on NATS Object Store

This commit is contained in:
2026-06-07 15:12:45 +02:00
4 changed files with 263 additions and 12 deletions
+27 -10
View File
@@ -1,9 +1,15 @@
// Package blobstore is a content-addressed object store on local disk.
// Package blobstore is a content-addressed object store for media ciphertext.
//
// The bus transports messages, not blobs. Media (images, files, large payloads)
// is encrypted by the client BEFORE being stored here, so the store only ever
// sees ciphertext. Objects are addressed by the sha256 hex of their (encrypted)
// bytes, which makes Put idempotent and deduplicating.
//
// Store is an interface (branch-by-abstraction, issue 0003d) with two backends:
// diskStore (the default, local filesystem) and objectStore (NATS Object Store
// on JetStream, replicated across the cluster so blobs survive a node loss and
// are reachable from any node). The wire contract (sha256-hex addressing) is
// identical, so a client cannot tell which backend a membershipd uses.
package blobstore
import (
@@ -14,27 +20,38 @@ import (
"path/filepath"
)
// Store is a directory-backed content-addressed blob store.
type Store struct {
// Store is a content-addressed blob store: Put returns the sha256-hex address of
// the stored bytes, Get fetches by that address, Has reports presence.
type Store interface {
Put(data []byte) (string, error)
Get(hash string) ([]byte, error)
Has(hash string) bool
}
// diskStore is a directory-backed content-addressed blob store (the default,
// single-node backend).
type diskStore struct {
dir string
}
// New creates a Store rooted at dir, creating the directory if needed.
func New(dir string) (*Store, error) {
// New creates a disk-backed Store rooted at dir, creating the directory if
// needed. It remains the default backend; the replicated NATS Object Store is
// constructed separately (NewObjectStore) when decentralization is enabled.
func New(dir string) (Store, error) {
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, fmt.Errorf("blobstore: mkdir %q: %w", dir, err)
}
return &Store{dir: dir}, nil
return &diskStore{dir: dir}, nil
}
// path returns the on-disk path for a given content hash.
func (s *Store) path(hash string) string {
func (s *diskStore) path(hash string) string {
return filepath.Join(s.dir, hash)
}
// Put writes data to the store and returns its sha256 hex hash. If an object
// with the same content already exists, Put is a no-op and returns the hash.
func (s *Store) Put(data []byte) (string, error) {
func (s *diskStore) Put(data []byte) (string, error) {
sum := sha256.Sum256(data)
hash := hex.EncodeToString(sum[:])
p := s.path(hash)
@@ -66,7 +83,7 @@ func (s *Store) Put(data []byte) (string, error) {
}
// Get reads the object with the given hash.
func (s *Store) Get(hash string) ([]byte, error) {
func (s *diskStore) Get(hash string) ([]byte, error) {
data, err := os.ReadFile(s.path(hash))
if err != nil {
return nil, fmt.Errorf("blobstore: get %q: %w", hash, err)
@@ -75,7 +92,7 @@ func (s *Store) Get(hash string) ([]byte, error) {
}
// Has reports whether an object with the given hash exists.
func (s *Store) Has(hash string) bool {
func (s *diskStore) Has(hash string) bool {
_, err := os.Stat(s.path(hash))
return err == nil
}
+102
View File
@@ -0,0 +1,102 @@
package blobstore
// objectStore is the NATS Object Store implementation of Store (issue 0003d):
// media ciphertext lives in a JetStream Object Store bucket replicated across
// the cluster, so a blob uploaded to one node is durable against the loss of a
// node and readable from any node. It is selected when decentralization is on;
// diskStore stays the single-node default. The content-addressing (sha256-hex)
// is identical to the disk backend, so the wire contract does not change.
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"time"
"github.com/nats-io/nats.go/jetstream"
)
const (
defaultObjectBucket = "UNIBUS_blobs"
defaultObjOpTime = 10 * time.Second
)
// ObjectStoreConfig configures the replicated Object Store backend.
type ObjectStoreConfig struct {
// Bucket is the object store bucket name; empty uses UNIBUS_blobs.
Bucket string
// Replicas is the replication factor (R1..R5), matching the KV store's
// R1->R3 rollout.
Replicas int
// OpTimeout bounds each object operation; zero uses defaultObjOpTime.
OpTimeout time.Duration
}
type objectStore struct {
os jetstream.ObjectStore
opTimeout time.Duration
}
// NewObjectStore creates (or opens) the replicated Object Store bucket on js and
// returns it as a Store. The JetStream context belongs to the caller.
func NewObjectStore(js jetstream.JetStream, cfg ObjectStoreConfig) (Store, error) {
if cfg.Bucket == "" {
cfg.Bucket = defaultObjectBucket
}
if cfg.Replicas <= 0 {
cfg.Replicas = 1
}
opTimeout := cfg.OpTimeout
if opTimeout <= 0 {
opTimeout = defaultObjOpTime
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
obj, err := js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{
Bucket: cfg.Bucket,
Replicas: cfg.Replicas,
Storage: jetstream.FileStorage,
})
if err != nil {
return nil, fmt.Errorf("blobstore: open object store %q (replicas=%d): %w", cfg.Bucket, cfg.Replicas, err)
}
return &objectStore{os: obj, opTimeout: opTimeout}, nil
}
func (s *objectStore) ctx() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), s.opTimeout)
}
// Put stores data under its sha256-hex address. Re-putting identical bytes is a
// harmless overwrite (same address, same content), preserving the idempotent,
// deduplicating semantics of the disk backend.
func (s *objectStore) Put(data []byte) (string, error) {
sum := sha256.Sum256(data)
hash := hex.EncodeToString(sum[:])
ctx, cancel := s.ctx()
defer cancel()
if _, err := s.os.PutBytes(ctx, hash, data); err != nil {
return "", fmt.Errorf("blobstore: put object %q: %w", hash, err)
}
return hash, nil
}
// Get fetches the object by its hash address.
func (s *objectStore) Get(hash string) ([]byte, error) {
ctx, cancel := s.ctx()
defer cancel()
data, err := s.os.GetBytes(ctx, hash)
if err != nil {
return nil, fmt.Errorf("blobstore: get object %q: %w", hash, err)
}
return data, nil
}
// Has reports whether an object with the given hash exists.
func (s *objectStore) Has(hash string) bool {
ctx, cancel := s.ctx()
defer cancel()
_, err := s.os.GetInfo(ctx, hash)
return err == nil
}
+132
View File
@@ -0,0 +1,132 @@
package blobstore_test
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"net"
"testing"
"time"
"github.com/enmanuel/unibus/pkg/blobstore"
"github.com/enmanuel/unibus/pkg/embeddednats"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
func objFreePort(t *testing.T) int {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("free port: %v", err)
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port
}
// newObjectStore boots a single-node embedded NATS with JetStream and returns a
// replicated (R1) Object Store backend over it.
func newObjectStore(t *testing.T) blobstore.Store {
t.Helper()
ns, err := embeddednats.StartServer(embeddednats.ServerConfig{
StoreDir: t.TempDir(),
Host: "127.0.0.1",
Port: objFreePort(t),
})
if err != nil {
t.Fatalf("embedded nats: %v", err)
}
nc, err := nats.Connect(ns.ClientURL())
if err != nil {
ns.Shutdown()
t.Fatalf("nats connect: %v", err)
}
js, err := jetstream.New(nc)
if err != nil {
nc.Close()
ns.Shutdown()
t.Fatalf("jetstream: %v", err)
}
st, err := blobstore.NewObjectStore(js, blobstore.ObjectStoreConfig{Replicas: 1, OpTimeout: 5 * time.Second})
if err != nil {
nc.Close()
ns.Shutdown()
t.Fatalf("new object store: %v", err)
}
t.Cleanup(func() { nc.Close(); ns.Shutdown(); ns.WaitForShutdown() })
return st
}
// TestObjectStoreRoundTrip is the golden path: put ciphertext, get it back by
// its hash, Has reports presence, and re-putting identical bytes returns the
// same address (content-addressed dedup).
func TestObjectStoreRoundTrip(t *testing.T) {
s := newObjectStore(t)
data := []byte("encrypted-media-ciphertext-payload")
hash, err := s.Put(data)
if err != nil {
t.Fatalf("Put: %v", err)
}
want := hex.EncodeToString(sha256Sum(data))
if hash != want {
t.Fatalf("hash = %q, want sha256 hex %q", hash, want)
}
got, err := s.Get(hash)
if err != nil {
t.Fatalf("Get: %v", err)
}
if !bytes.Equal(got, data) {
t.Fatalf("Get returned %q, want %q", got, data)
}
if !s.Has(hash) {
t.Fatalf("Has should be true for a stored blob")
}
// Re-put identical bytes: same address, no error.
hash2, err := s.Put(data)
if err != nil || hash2 != hash {
t.Fatalf("re-Put: hash2=%q err=%v (want %q)", hash2, err, hash)
}
}
// TestObjectStoreMissing is the edge/error path: a hash that was never stored
// is absent and unreadable.
func TestObjectStoreMissing(t *testing.T) {
s := newObjectStore(t)
missing := hex.EncodeToString(sha256Sum([]byte("never stored")))
if s.Has(missing) {
t.Fatalf("Has should be false for an unknown hash")
}
if _, err := s.Get(missing); err == nil {
t.Fatalf("Get of an unknown hash should error")
}
}
// TestObjectStoreAddressMatchesDisk is the contract test: the Object Store and
// the disk backend address identical bytes to the IDENTICAL hash, so a client
// cannot tell which backend a node uses and a blob ref is portable across them.
func TestObjectStoreAddressMatchesDisk(t *testing.T) {
obj := newObjectStore(t)
disk, err := blobstore.New(t.TempDir())
if err != nil {
t.Fatalf("disk store: %v", err)
}
for _, payload := range [][]byte{[]byte("a"), []byte("longer ciphertext blob \x00\x01\x02"), {}} {
oh, err := obj.Put(payload)
if err != nil {
t.Fatalf("object Put: %v", err)
}
dh, err := disk.Put(payload)
if err != nil {
t.Fatalf("disk Put: %v", err)
}
if oh != dh {
t.Fatalf("address mismatch for %q: object=%q disk=%q", payload, oh, dh)
}
}
}
func sha256Sum(b []byte) []byte {
sum := sha256.Sum256(b)
return sum[:]
}
+2 -2
View File
@@ -56,7 +56,7 @@ const (
// (mTLS, capabilities, rate limits) is a later phase.
type Server struct {
store Store
blobs *blobstore.Store
blobs blobstore.Store
mux *http.ServeMux
authMode AuthMode
nonces *nonceCache
@@ -78,7 +78,7 @@ type Server struct {
// tests that have not migrated to signed requests yet). It installs a per-IP
// rate limiter with the package defaults; loopback dev behavior is unchanged
// because the burst comfortably exceeds any single client's request rate.
func NewServer(store Store, blobs *blobstore.Store, authMode AuthMode) *Server {
func NewServer(store Store, blobs blobstore.Store, authMode AuthMode) *Server {
s := &Server{
store: store,
blobs: blobs,