package blobstore // objectStore is the NATS Object Store implementation of Store (issue 0003d): // media ciphertext lives in a JetStream Object Store bucket replicated across // the cluster, so a blob uploaded to one node is durable against the loss of a // node and readable from any node. It is selected when decentralization is on; // diskStore stays the single-node default. The content-addressing (sha256-hex) // is identical to the disk backend, so the wire contract does not change. import ( "context" "crypto/sha256" "encoding/hex" "fmt" "time" "github.com/nats-io/nats.go/jetstream" ) const ( defaultObjectBucket = "UNIBUS_blobs" defaultObjOpTime = 10 * time.Second ) // ObjectStoreConfig configures the replicated Object Store backend. type ObjectStoreConfig struct { // Bucket is the object store bucket name; empty uses UNIBUS_blobs. Bucket string // Replicas is the replication factor (R1..R5), matching the KV store's // R1->R3 rollout. Replicas int // OpTimeout bounds each object operation; zero uses defaultObjOpTime. OpTimeout time.Duration } type objectStore struct { os jetstream.ObjectStore opTimeout time.Duration } // NewObjectStore creates (or opens) the replicated Object Store bucket on js and // returns it as a Store. The JetStream context belongs to the caller. func NewObjectStore(js jetstream.JetStream, cfg ObjectStoreConfig) (Store, error) { if cfg.Bucket == "" { cfg.Bucket = defaultObjectBucket } if cfg.Replicas <= 0 { cfg.Replicas = 1 } opTimeout := cfg.OpTimeout if opTimeout <= 0 { opTimeout = defaultObjOpTime } ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() obj, err := js.CreateOrUpdateObjectStore(ctx, jetstream.ObjectStoreConfig{ Bucket: cfg.Bucket, Replicas: cfg.Replicas, Storage: jetstream.FileStorage, }) if err != nil { return nil, fmt.Errorf("blobstore: open object store %q (replicas=%d): %w", cfg.Bucket, cfg.Replicas, err) } return &objectStore{os: obj, opTimeout: opTimeout}, nil } func (s *objectStore) ctx() (context.Context, context.CancelFunc) { return context.WithTimeout(context.Background(), s.opTimeout) } // Put stores data under its sha256-hex address. Re-putting identical bytes is a // harmless overwrite (same address, same content), preserving the idempotent, // deduplicating semantics of the disk backend. func (s *objectStore) Put(data []byte) (string, error) { sum := sha256.Sum256(data) hash := hex.EncodeToString(sum[:]) ctx, cancel := s.ctx() defer cancel() if _, err := s.os.PutBytes(ctx, hash, data); err != nil { return "", fmt.Errorf("blobstore: put object %q: %w", hash, err) } return hash, nil } // Get fetches the object by its hash address. func (s *objectStore) Get(hash string) ([]byte, error) { ctx, cancel := s.ctx() defer cancel() data, err := s.os.GetBytes(ctx, hash) if err != nil { return nil, fmt.Errorf("blobstore: get object %q: %w", hash, err) } return data, nil } // Has reports whether an object with the given hash exists. func (s *objectStore) Has(hash string) bool { ctx, cancel := s.ctx() defer cancel() _, err := s.os.GetInfo(ctx, hash) return err == nil }