package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/enmanuel/unibus/pkg/busauth"
	"github.com/enmanuel/unibus/pkg/client"
	"github.com/enmanuel/unibus/pkg/membership"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

// users_kv.go is the `--store kv` half of the user administration CLI (issue 0011
// gaps, GAP A): adding and listing bus users directly against the RUNNING
// cluster's replicated JetStream KV allowlist, with no need to stop the cluster,
// seed a standalone node, and restart (the procedure the 0011 deploy required).
//
// The mechanism is the cluster's own privileged internal connection. Under
// enforce every bus user is confined by the per-subject ACL to the JetStream API
// of its own rooms, so no ordinary identity may touch the control-plane buckets
// (KV_UNIBUS_*). The ONLY identity the authenticator grants full JetStream
// permissions to is membershipd's internal service identity. By persisting that
// identity to a file (membershipd --internal-id-file) the same key becomes
// available to this CLI, which presents it as its NATS nkey and is therefore
// recognized as the privileged internal client and allowed to read/write the KV.
//
// Intended invocation is over loopback on a cluster node (SSH): the data-plane
// TLS certificate's SAN covers 127.0.0.1/localhost and the internal identity file
// lives 0600 next to the node's TLS keys. Using the file requires root on the
// node, which already implies full control of that node — so co-locating it adds
// no practical exposure beyond what the TLS server key and cluster password
// already represent.

// defaultClusterNatsURL is the node-local NATS listener. The CLI is meant to run
// on a cluster node over SSH, talking to that node's own embedded server.
const defaultClusterNatsURL = "nats://127.0.0.1:4250"

// Deploy-default paths for the privileged identity and the data-plane CA, so an
// on-node invocation needs only --handle/--sign-pub/--role.
Override for other // layouts. const ( defaultInternalIDFile = "/opt/unibus/secrets/internal.id" defaultClusterCAFile = "/opt/unibus/tls/ca.crt" ) // kvConn bundles the privileged NATS connection to a live cluster and the // KV-backed control-plane store opened over it. Close releases both. type kvConn struct { nc *nats.Conn js jetstream.JetStream store membership.Store } func (k *kvConn) Close() { if k == nil { return } if k.store != nil { _ = k.store.Close() } if k.nc != nil { k.nc.Close() } } // connectKVStore opens the privileged internal connection to the cluster's NATS // and the JetStream KV control-plane store on top of it. internalIDFile is the // membershipd-persisted internal service identity whose nkey the authenticator // grants full permissions; caPath pins the data-plane TLS (empty only for a // non-TLS dev cluster). A non-loopback target without --ca is refused, mirroring // migrate-to-kv (audit 0008 N6): the allowlist write must not travel in cleartext. func connectKVStore(natsURL, internalIDFile, caPath string, replicas int) (*kvConn, error) { if internalIDFile == "" { return nil, fmt.Errorf("--internal-id-file is required for --store kv (the privileged identity membershipd persists with --internal-id-file)") } // Confidentiality guard: a remote NATS without TLS would expose the allowlist // (handles/roles/sign-pubs) and the privileged nkey handshake in cleartext. 
if !isLoopbackURL(natsURL) && caPath == "" { return nil, fmt.Errorf("refusing to connect to remote %q without --ca: the allowlist write would travel in cleartext — pin TLS with --ca, or run over a loopback --nats-url on a node", natsURL) } id, err := client.LoadIdentity(internalIDFile) if err != nil { return nil, fmt.Errorf("load internal identity: %w", err) } nkeyPub, nkeySign, err := busauth.ClientNkey(id.SignPriv) if err != nil { return nil, fmt.Errorf("derive nkey from internal identity: %w", err) } opts := []nats.Option{ nats.Name("membershipd-user-cli"), nats.Nkey(nkeyPub, nkeySign), } if caPath != "" { tlsCfg, err := busauth.LoadCATLSConfig(caPath) if err != nil { return nil, fmt.Errorf("load CA %q: %w", caPath, err) } opts = append(opts, nats.Secure(tlsCfg)) } nc, err := nats.Connect(natsURL, opts...) if err != nil { return nil, fmt.Errorf("connect cluster NATS %q: %w", natsURL, err) } js, err := jetstream.New(nc) if err != nil { nc.Close() return nil, fmt.Errorf("jetstream: %w", err) } store, err := membership.OpenJetStream(js, membership.JetStreamConfig{Replicas: replicas}) if err != nil { nc.Close() return nil, fmt.Errorf("open KV control-plane store: %w", err) } return &kvConn{nc: nc, js: js, store: store}, nil } // reportKVReplication prints the replication status of the allowlist bucket // stream (KV_UNIBUS_users) right after a write, so the operator sees the add // landed on a quorum and replicated to the followers — executable evidence that // the live-cluster add is HA, not single-node. Best-effort: a read failure is a // note, not an error (the write itself already succeeded). 
func reportKVReplication(js jetstream.JetStream) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() st, err := js.Stream(ctx, "KV_UNIBUS_users") if err != nil { fmt.Fprintf(os.Stderr, "note: could not read KV_UNIBUS_users stream info: %v\n", err) return } info, err := st.Info(ctx) if err != nil { fmt.Fprintf(os.Stderr, "note: could not read KV_UNIBUS_users stream info: %v\n", err) return } if info.Cluster == nil { fmt.Printf("KV_UNIBUS_users: standalone (R1, no cluster replication); msgs=%d\n", info.State.Msgs) return } current := 0 for _, r := range info.Cluster.Replicas { if r.Current { current++ } } fmt.Printf("KV_UNIBUS_users: leader=%s followers_current=%d/%d msgs=%d\n", info.Cluster.Leader, current, len(info.Cluster.Replicas), info.State.Msgs) }