13 Commits

Author SHA1 Message Date
egutierrez 0dde60a05e merge: f0 room discovery — GET /members/{endpoint}/rooms + ListMyRooms
Base para Matrix-out de agents_and_robots: un bot descubre por polling las rooms
cifradas a las que lo invitaron. Aditivo, tests verdes. Bump 0.4.0.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 03:08:07 +02:00
egutierrez 7fab473bc3 docs(app): bump to 0.4.0 — room discovery endpoint growth log
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 03:08:07 +02:00
egutierrez 92d4e4cb97 feat(membership): room discovery — GET /members/{endpoint}/rooms + ListMyRooms
A peer invited to an encrypted room needs to find it: the control plane is
pull-based (no server push of invitations), so add a discovery endpoint that
lists every room an endpoint belongs to, with the room's metadata and the
endpoint's role.

- store.ListRoomsForEndpoint: JOIN members+rooms, ordered by room id, empty
  slice (not error) for an endpoint in no rooms.
- membershipd: GET /members/{endpoint}/rooms returns {room_id, subject, epoch,
  policy, role}[].
- client.ListMyRooms + RoomRef: a bot polls this to discover and then Join +
  Subscribe rooms it was invited to.

Tests: store-level (owner in N rooms, member in one, unknown endpoint → []) and
client-level e2e through the embedded harness (B discovers a room A invited it
to, without prior knowledge of the room id; owner sees role=owner).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-07 03:07:34 +02:00
egutierrez ab4b099ab1 merge: cliente web + móvil de unibus
SPA de chat (web/, React+Vite+Mantine v9) contra el gateway, y app Android
nativa (android/, Kotlin+Compose) sobre el binding gomobile, con E2E en el
dispositivo. Amplía el binding (Card/Invite/Kick) y el gateway (rooms/members
+ CORS). Verificado end-to-end: chat cifrado en vivo entre dos pestañas web y
envío/recepción en el AVD Pixel_API34. Ver reports/0002.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:43:31 +02:00
egutierrez a11d67cf70 feat(android): app Kotlin/Compose sobre el binding gomobile
Cliente móvil nativo: embebe un peer real del bus (unibus.aar), de modo que
el cifrado E2E y el transporte NATS corren en el dispositivo.

- Conexión: Host (control plane) + NATS (data plane) + identidad; defaults
  10.0.2.2 para el emulador, configurables (sin IPs hardcodeadas).
- BusViewModel: llamadas de red del binding en Dispatchers.IO; los frames
  entrantes (FrameListener.onFrame, hilo NATS) se publican en un StateFlow
  thread-safe que Compose recolecta en el hilo principal.
- Chat: crear/unir room (toggle cifrado), enviar, recibir.
- El .aar es artefacto (gitignored); se regenera con gomobile bind (README).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:43:10 +02:00
egutierrez d33ca6278a feat(web): SPA de chat (React + Vite + Mantine v9)
Cliente web sobre el gateway (REST + SSE). El navegador no habla NATS ni
cripto: el peer Go del gateway lo hace.

- Pantalla de conexión: gateway URL + identidad (persistidas en localStorage).
- Navbar: crear room (con toggle de cifrado E2E), unirse por id, lista de rooms.
- Centro: mensajes en vivo por SSE, burbujas con autor y hora, composer.
- Lateral: miembros (rol owner), invitar por peer conectado, expulsar (owner).
- Mantine v9 (createTheme + MantineProvider), @tabler/icons-react, layout con
  AppShell/Stack/Group; sin Tailwind ni CSS manual. React 19 (peer dep de v9).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:43:10 +02:00
egutierrez 915f926136 feat(playground): endpoints rooms/members + CORS para la SPA
Madura el gateway web para servir a una SPA en otro origen:
- GET /api/rooms?peer=: rooms que conoce un peer (creadas o unidas).
- GET /api/members?room_id=: proxy al control plane (endpoint + rol).
- withCORS: middleware con preflight OPTIONS y headers permisivos para el
  dev server de Vite (mismo modelo de confianza de red que el control plane).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:42:56 +02:00
egutierrez 12fc77f25a feat(mobile): Card/Invite/Kick en el binding gomobile
Añade al binding plano sobre pkg/client:
- Card(): exporta la identidad pública del peer (id + sign_pub + kex_pub)
  como JSON portable, para intercambio peer-a-peer (paste/QR) sin gateway.
- Invite(roomID, peerCard): parsea una Card y sella la clave de room al
  invitado (delega en client.Invite).
- Kick(roomID, endpointID): expulsa y rota la clave (forward secrecy).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:42:56 +02:00
egutierrez 69079d17d5 docs(app): bump to 0.3.0 — service hardening + frame threading growth log
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:30:55 +02:00
egutierrez 22092834bd feat(frame): additive threading — ThreadID, ReplyTo + REACT type
Chat bots need replies, threads and reactions. Add two optional, omitempty
envelope fields (ThreadID, ReplyTo) plus a REACT frame type. The fields ride the
cleartext envelope (message-id references, not secret content) and are omitted
when unset, so non-threaded frames are byte-for-byte identical on the wire and
their signatures unchanged — a non-breaking, additive change.

Client gains PublishReply (threaded reply) and React (emoji reaction). The
reaction content travels in the payload, so it is sealed like any message and
stays confidential in E2E rooms; receivers dispatch on Frame.Type == REACT and
read Frame.ReplyTo for the target. Publish is refactored to share one
publishFrame path with the new helpers; its behavior is unchanged.

Tests: frame round-trip of a threaded REACT frame (golden), non-threaded
wire/sig back-compat asserting thr/re keys are absent (edge), Unmarshal of
garbage errors (error path), and an end-to-end reply+reaction round-trip in an
encrypted ModeMatrix room.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:10:44 +02:00
egutierrez b2e6712dd2 docs(app): fill service: block — systemd-user, Restart=always, LAN-reachable
membershipd now ships as a systemd user service (unit unibus-membershipd.service,
restart_policy always, runtime systemd-user). is_local_only flips to false since
--bind 0.0.0.0 makes both planes LAN-reachable. fn doctor services-spec: OK, no
drift.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:06:33 +02:00
egutierrez f6b53620e9 feat(deploy): systemd user unit + install script for membershipd
Add deploy/unibus-membershipd.service (Restart=always, binds both planes to
0.0.0.0 for LAN reachability), an idempotent deploy/install.sh that builds the
binary, symlinks the unit, and enables+starts it, plus deploy/README.md with
operate/health instructions.

Restart=always is deliberate: a clean SIGTERM exits 0 and Restart=on-failure
would not restart it, leaving the service silently dead (the sqlite_api gotcha).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:05:53 +02:00
egutierrez 01f8988cc3 feat(membershipd): --bind flag governs HTTP + embedded NATS interface
Add a --bind flag (default 127.0.0.1) to membershipd that controls which
network interface both the control-plane HTTP API and the embedded NATS data
plane listen on. Use 0.0.0.0 to expose the stack to the LAN so remote peers
(phones, other PCs) can connect; keep the default for a loopback-only dev stack.

embeddednats gains StartHost(storeDir, host, port) for explicit interface
control; Start stays a backward-compatible wrapper (host "" = nats default
0.0.0.0) so the playground and tests are untouched.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-06 18:05:05 +02:00
51 changed files with 4422 additions and 30 deletions
+12
View File
@@ -0,0 +1,12 @@
.gradle/
build/
local.properties
*.iml
.idea/
captures/
.cxx/
# The gomobile binding is a build artifact (~24 MB). Regenerate it from ../mobile
# with `gomobile bind` (see README.md); it is not versioned.
app/libs/*.aar
app/libs/*.jar
+83
View File
@@ -0,0 +1,83 @@
# unibus · app Android
Cliente móvil nativo de unibus. La app no habla con un gateway: embebe un **peer
real** del bus a través del binding gomobile `mobile/unibus.go`, de modo que el
cifrado extremo a extremo corre **en el dispositivo**. Cada teléfono es un peer
de primera clase del bus, igual que cualquier peer Go.
## Arquitectura
```
Kotlin/Compose UI ──> BusViewModel ──> com.unibus.core.mobile.Session (.aar)
│ (NATS data plane + E2E crypto, en Go)
membershipd (control plane HTTP :8470)
NATS (data plane :4250)
```
- `BusViewModel` traduce intents de UI en llamadas al binding. Las llamadas de red
(`newSession`, `createRoom`, `join`, `publish`) corren en `Dispatchers.IO`.
- Los frames entrantes llegan por `FrameListener.onFrame` en una goroutine NATS
(hilo JNI); se publican en un `StateFlow` (thread-safe) que Compose recolecta en
el hilo principal.
## Requisitos
- Android SDK (compileSdk 34), NDK (para regenerar el `.aar`), JDK 17.
- El binding `app/libs/unibus.aar` (no versionado: es un artefacto de ~24 MB).
## 1. Generar el binding (.aar)
Desde la raíz del repo de la app (`projects/message_bus/apps/unibus`):
```bash
export ANDROID_HOME=$HOME/android-sdk
export ANDROID_NDK_HOME=$HOME/android-sdk/ndk/26.3.11579264
mkdir -p android/app/libs
gomobile bind -target=android -androidapi 21 -javapkg com.unibus.core \
-o android/app/libs/unibus.aar ./mobile
```
Esto produce `unibus.aar` con la clase estática `com.unibus.core.mobile.Mobile`
(`generateIdentity`, `newSession`) y los tipos `Session` y `FrameListener`.
## 2. Compilar el APK
```bash
cd android
export JAVA_HOME=$HOME/android-sdk/jdk-17/jdk-17.0.19+10
export ANDROID_HOME=$HOME/android-sdk
./gradlew assembleDebug
# APK: app/build/outputs/apk/debug/app-debug.apk
```
`local.properties` apunta a `sdk.dir`; ajústalo si tu SDK está en otra ruta.
## 3. Arrancar el bus y probar en el emulador
```bash
# 1. En el PC: control plane + NATS embebido (HTTP :8470, NATS :4250)
cd projects/message_bus/apps/unibus && go run ./cmd/membershipd
# 2. Emulador Pixel_API34
$ANDROID_HOME/emulator/emulator -avd Pixel_API34 &
# 3. Instalar + lanzar
adb install -r app/build/outputs/apk/debug/app-debug.apk
adb shell am start -n com.unibus.app/.MainActivity
```
En la pantalla de conexión, desde el emulador el host del PC es `10.0.2.2`:
- **Host (control plane):** `http://10.0.2.2:8470`
- **NATS (data plane):** `nats://10.0.2.2:4250`
Para un teléfono físico en la misma LAN, usa la IP LAN del PC en lugar de
`10.0.2.2`.
## Notas
- La identidad del peer se guarda en `filesDir/peer.id` (claves privadas
Ed25519 + X25519). No se sincroniza ni se respalda.
- Una room creada en modo "cifrar (E2E)" usa la política Matrix (cifrada,
persistida, firmada); en modo normal usa NATS cleartext.
+66
View File
@@ -0,0 +1,66 @@
plugins {
id("com.android.application")
id("org.jetbrains.kotlin.android")
id("org.jetbrains.kotlin.plugin.compose")
}
android {
namespace = "com.unibus.app"
compileSdk = 34
defaultConfig {
applicationId = "com.unibus.app"
minSdk = 21
targetSdk = 34
versionCode = 1
versionName = "0.1.0"
}
buildFeatures {
compose = true
}
compileOptions {
sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
}
kotlinOptions {
jvmTarget = "17"
}
buildTypes {
getByName("release") {
isMinifyEnabled = false
proguardFiles(
getDefaultProguardFile("proguard-android-optimize.txt"),
"proguard-rules.pro",
)
}
}
packaging {
resources {
excludes += "/META-INF/{AL2.0,LGPL2.1}"
}
}
}
dependencies {
// The unibus gomobile binding: a real bus peer that does NATS + E2E crypto
// on the device. All protocol logic lives here, shared with every other peer.
implementation(files("libs/unibus.aar"))
val composeBom = platform("androidx.compose:compose-bom:2024.09.03")
implementation(composeBom)
implementation("androidx.compose.ui:ui")
implementation("androidx.compose.ui:ui-graphics")
implementation("androidx.compose.ui:ui-tooling-preview")
implementation("androidx.compose.material3:material3")
implementation("androidx.compose.material:material-icons-extended")
implementation("androidx.activity:activity-compose:1.9.2")
implementation("androidx.lifecycle:lifecycle-viewmodel-compose:2.8.6")
implementation("androidx.lifecycle:lifecycle-runtime-ktx:2.8.6")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.8.1")
debugImplementation("androidx.compose.ui:ui-tooling")
}
+4
View File
@@ -0,0 +1,4 @@
# gomobile generates JNI-bound classes under com.unibus.core.mobile and go.*.
# They are reached from native code, so keep them intact even when minifying.
-keep class com.unibus.core.mobile.** { *; }
-keep class go.** { *; }
+25
View File
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android">
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<application
android:allowBackup="true"
android:label="@string/app_name"
android:supportsRtl="true"
android:usesCleartextTraffic="true"
android:theme="@style/Theme.Unibus">
<activity
android:name=".MainActivity"
android:exported="true"
android:label="@string/app_name"
android:windowSoftInputMode="adjustResize">
<intent-filter>
<action android:name="android.intent.action.MAIN" />
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
</application>
</manifest>
@@ -0,0 +1,162 @@
package com.unibus.app
import android.app.Application
import androidx.lifecycle.AndroidViewModel
import androidx.lifecycle.viewModelScope
import com.unibus.core.mobile.FrameListener
import com.unibus.core.mobile.Mobile
import com.unibus.core.mobile.Session
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import java.io.File
/** One chat message shown in the UI. */
data class ChatMessage(
val sender: String,
val text: String,
val mine: Boolean,
val ts: Long,
)
/** The whole observable UI state of the app. */
data class BusState(
val connecting: Boolean = false,
val connected: Boolean = false,
val endpointId: String = "",
val roomId: String = "",
val roomSubject: String = "",
val status: String = "",
val error: String? = null,
val messages: List<ChatMessage> = emptyList(),
)
/**
* BusViewModel drives a real unibus peer on the device through the gomobile
* binding. The binding performs NATS transport and end-to-end crypto natively;
* this class only translates UI intents into binding calls and exposes the
* incoming frames as observable state.
*
* Threading: every binding call that touches the network (newSession, createRoom,
* join, publish) runs off the main thread on Dispatchers.IO to avoid
* NetworkOnMainThreadException. Incoming frames arrive on a JNI-attached NATS
* goroutine via [onFrame]; we only append to a thread-safe StateFlow there, and
* Compose collects that flow on the main thread.
*/
class BusViewModel(app: Application) : AndroidViewModel(app), FrameListener {
private val _state = MutableStateFlow(BusState())
val state: StateFlow<BusState> = _state.asStateFlow()
private var session: Session? = null
private var myEndpoint: String = ""
private val idPath: String
get() = File(getApplication<Application>().filesDir, "peer.id").absolutePath
override fun onFrame(roomID: String, sender: String, msgID: String, text: String) {
_state.update {
it.copy(
messages = it.messages + ChatMessage(
sender = sender,
text = text,
mine = sender == myEndpoint,
ts = System.currentTimeMillis(),
),
)
}
}
fun connect(host: String, nats: String, peerName: String) {
if (_state.value.connecting) return
_state.update { it.copy(connecting = true, error = null, status = "Conectando…") }
viewModelScope.launch(Dispatchers.IO) {
try {
val s = Mobile.newSession(idPath, nats.trim(), host.trim())
session = s
myEndpoint = s.endpointID()
_state.update {
it.copy(
connecting = false,
connected = true,
endpointId = myEndpoint,
status = "Conectado como $peerName",
)
}
} catch (e: Exception) {
_state.update {
it.copy(connecting = false, connected = false, error = e.message ?: "error desconocido")
}
}
}
}
fun createRoom(subject: String, encrypted: Boolean) {
val s = session ?: return
viewModelScope.launch(Dispatchers.IO) {
try {
val mode = if (encrypted) "matrix" else "nats"
val roomId = s.createRoom(subject.trim(), mode)
s.subscribe(roomId, this@BusViewModel)
_state.update {
it.copy(
roomId = roomId,
roomSubject = subject.trim(),
messages = emptyList(),
status = "Room creada",
)
}
} catch (e: Exception) {
_state.update { it.copy(error = e.message ?: "error al crear room") }
}
}
}
fun joinRoom(roomId: String) {
val s = session ?: return
viewModelScope.launch(Dispatchers.IO) {
try {
val rid = roomId.trim()
s.join(rid)
s.subscribe(rid, this@BusViewModel)
_state.update {
it.copy(roomId = rid, roomSubject = "(unida)", messages = emptyList(), status = "Unido a la room")
}
} catch (e: Exception) {
_state.update { it.copy(error = e.message ?: "error al unirse") }
}
}
}
fun publish(text: String) {
val s = session ?: return
val room = _state.value.roomId
if (room.isEmpty() || text.isBlank()) return
viewModelScope.launch(Dispatchers.IO) {
try {
s.publish(room, text)
} catch (e: Exception) {
_state.update { it.copy(error = e.message ?: "error al publicar") }
}
}
}
/** card returns this peer's shareable public identity (no secret). */
fun card(): String = try {
session?.card() ?: ""
} catch (_: Exception) {
""
}
fun clearError() = _state.update { it.copy(error = null) }
override fun onCleared() {
try {
session?.close()
} catch (_: Exception) {
}
session = null
}
}
@@ -0,0 +1,307 @@
package com.unibus.app
import android.os.Bundle
import androidx.activity.ComponentActivity
import androidx.activity.compose.setContent
import androidx.activity.viewModels
import androidx.compose.foundation.layout.Arrangement
import androidx.compose.foundation.layout.Box
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.Row
import androidx.compose.foundation.layout.Spacer
import androidx.compose.foundation.layout.fillMaxSize
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.height
import androidx.compose.foundation.layout.padding
import androidx.compose.foundation.layout.width
import androidx.compose.foundation.lazy.LazyColumn
import androidx.compose.foundation.lazy.itemsIndexed
import androidx.compose.foundation.lazy.rememberLazyListState
import androidx.compose.material.icons.Icons
import androidx.compose.material.icons.automirrored.filled.Send
import androidx.compose.material.icons.filled.Add
import androidx.compose.material.icons.filled.Lock
import androidx.compose.material3.Button
import androidx.compose.material3.Card
import androidx.compose.material3.CircularProgressIndicator
import androidx.compose.material3.ExperimentalMaterial3Api
import androidx.compose.material3.Icon
import androidx.compose.material3.IconButton
import androidx.compose.material3.MaterialTheme
import androidx.compose.material3.OutlinedButton
import androidx.compose.material3.OutlinedTextField
import androidx.compose.material3.Scaffold
import androidx.compose.material3.Surface
import androidx.compose.material3.Switch
import androidx.compose.material3.Text
import androidx.compose.material3.TopAppBar
import androidx.compose.material3.darkColorScheme
import androidx.compose.runtime.Composable
import androidx.compose.runtime.LaunchedEffect
import androidx.compose.runtime.collectAsState
import androidx.compose.runtime.getValue
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.remember
import androidx.compose.runtime.saveable.rememberSaveable
import androidx.compose.runtime.setValue
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.text.style.TextOverflow
import androidx.compose.ui.unit.dp
import java.text.SimpleDateFormat
import java.util.Date
import java.util.Locale
class MainActivity : ComponentActivity() {
private val vm: BusViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContent {
MaterialTheme(colorScheme = darkColorScheme()) {
Surface(modifier = Modifier.fillMaxSize()) {
UnibusApp(vm)
}
}
}
}
}
@Composable
fun UnibusApp(vm: BusViewModel) {
val state by vm.state.collectAsState()
if (!state.connected) {
ConnectScreen(
connecting = state.connecting,
error = state.error,
onConnect = { host, nats, name -> vm.connect(host, nats, name) },
)
} else {
ChatScreen(state = state, vm = vm)
}
}
@Composable
fun ConnectScreen(
connecting: Boolean,
error: String?,
onConnect: (String, String, String) -> Unit,
) {
var host by rememberSaveable { mutableStateOf("http://10.0.2.2:8470") }
var nats by rememberSaveable { mutableStateOf("nats://10.0.2.2:4250") }
var name by rememberSaveable { mutableStateOf("android") }
Column(
modifier = Modifier
.fillMaxSize()
.padding(24.dp),
verticalArrangement = Arrangement.Center,
) {
Text("unibus", style = MaterialTheme.typography.headlineMedium)
Text(
"chat cifrado extremo a extremo sobre NATS",
style = MaterialTheme.typography.bodyMedium,
color = MaterialTheme.colorScheme.onSurfaceVariant,
)
Spacer(Modifier.height(24.dp))
OutlinedTextField(
value = host,
onValueChange = { host = it },
label = { Text("Host (control plane)") },
singleLine = true,
modifier = Modifier.fillMaxWidth(),
)
Spacer(Modifier.height(12.dp))
OutlinedTextField(
value = nats,
onValueChange = { nats = it },
label = { Text("NATS (data plane)") },
singleLine = true,
modifier = Modifier.fillMaxWidth(),
)
Spacer(Modifier.height(12.dp))
OutlinedTextField(
value = name,
onValueChange = { name = it },
label = { Text("Identidad") },
singleLine = true,
modifier = Modifier.fillMaxWidth(),
)
if (error != null) {
Spacer(Modifier.height(12.dp))
Text(error, color = MaterialTheme.colorScheme.error)
}
Spacer(Modifier.height(24.dp))
Button(
onClick = { onConnect(host, nats, name) },
enabled = !connecting,
modifier = Modifier.fillMaxWidth(),
) {
if (connecting) {
CircularProgressIndicator(modifier = Modifier.height(18.dp).width(18.dp), strokeWidth = 2.dp)
Spacer(Modifier.width(8.dp))
}
Text(if (connecting) "Conectando…" else "Conectar")
}
}
}
@OptIn(ExperimentalMaterial3Api::class)
@Composable
fun ChatScreen(state: BusState, vm: BusViewModel) {
var subject by rememberSaveable { mutableStateOf("room.general") }
var encrypt by rememberSaveable { mutableStateOf(false) }
var joinId by rememberSaveable { mutableStateOf("") }
var draft by rememberSaveable { mutableStateOf("") }
val listState = rememberLazyListState()
LaunchedEffect(state.messages.size) {
if (state.messages.isNotEmpty()) listState.animateScrollToItem(state.messages.size - 1)
}
Scaffold(
topBar = {
TopAppBar(
title = {
Column {
Text("unibus", style = MaterialTheme.typography.titleMedium)
Text(
state.status.ifEmpty { state.endpointId.take(12) + "" },
style = MaterialTheme.typography.bodySmall,
maxLines = 1,
overflow = TextOverflow.Ellipsis,
)
}
},
)
},
) { inner ->
Column(
modifier = Modifier
.fillMaxSize()
.padding(inner)
.padding(horizontal = 12.dp),
) {
// Room controls.
Card(modifier = Modifier.fillMaxWidth().padding(vertical = 8.dp)) {
Column(Modifier.padding(12.dp)) {
Row(verticalAlignment = Alignment.CenterVertically) {
OutlinedTextField(
value = subject,
onValueChange = { subject = it },
label = { Text("subject") },
singleLine = true,
modifier = Modifier.weight(1f),
)
Spacer(Modifier.width(8.dp))
Button(onClick = { vm.createRoom(subject, encrypt) }) {
Icon(Icons.Filled.Add, contentDescription = "crear")
}
}
Row(verticalAlignment = Alignment.CenterVertically) {
Switch(checked = encrypt, onCheckedChange = { encrypt = it })
Spacer(Modifier.width(8.dp))
Icon(Icons.Filled.Lock, contentDescription = null, modifier = Modifier.height(16.dp))
Text("cifrar (E2E)", style = MaterialTheme.typography.bodySmall)
}
Spacer(Modifier.height(4.dp))
Row(verticalAlignment = Alignment.CenterVertically) {
OutlinedTextField(
value = joinId,
onValueChange = { joinId = it },
label = { Text("unirse por room id") },
singleLine = true,
modifier = Modifier.weight(1f),
)
Spacer(Modifier.width(8.dp))
OutlinedButton(onClick = { if (joinId.isNotBlank()) vm.joinRoom(joinId) }) {
Text("Unir")
}
}
if (state.roomId.isNotEmpty()) {
Spacer(Modifier.height(4.dp))
Text(
"room: ${state.roomSubject} · ${state.roomId}",
style = MaterialTheme.typography.bodySmall,
color = MaterialTheme.colorScheme.onSurfaceVariant,
maxLines = 1,
overflow = TextOverflow.Ellipsis,
)
}
}
}
if (state.error != null) {
Text(
state.error,
color = MaterialTheme.colorScheme.error,
modifier = Modifier.fillMaxWidth().padding(vertical = 4.dp),
)
}
// Messages.
LazyColumn(
state = listState,
modifier = Modifier.weight(1f).fillMaxWidth(),
verticalArrangement = Arrangement.spacedBy(6.dp),
) {
itemsIndexed(state.messages, key = { i, m -> "${m.ts}-$i" }) { _, m ->
MessageBubble(m)
}
}
// Composer.
Row(
modifier = Modifier.fillMaxWidth().padding(vertical = 8.dp),
verticalAlignment = Alignment.CenterVertically,
) {
OutlinedTextField(
value = draft,
onValueChange = { draft = it },
placeholder = { Text("Mensaje…") },
singleLine = true,
enabled = state.roomId.isNotEmpty(),
modifier = Modifier.weight(1f),
)
Spacer(Modifier.width(8.dp))
IconButton(
onClick = {
vm.publish(draft)
draft = ""
},
enabled = state.roomId.isNotEmpty() && draft.isNotBlank(),
) {
Icon(Icons.AutoMirrored.Filled.Send, contentDescription = "enviar")
}
}
}
}
}
private val timeFmt = SimpleDateFormat("HH:mm:ss", Locale.getDefault())
@Composable
fun MessageBubble(m: ChatMessage) {
val align = if (m.mine) Alignment.End else Alignment.Start
Column(modifier = Modifier.fillMaxWidth(), horizontalAlignment = align) {
Card(
modifier = Modifier.fillMaxWidth(0.8f),
) {
Column(Modifier.padding(8.dp)) {
if (!m.mine) {
Text(
m.sender.take(12) + "",
style = MaterialTheme.typography.labelSmall,
color = MaterialTheme.colorScheme.primary,
)
}
Text(m.text, style = MaterialTheme.typography.bodyMedium)
Text(
timeFmt.format(Date(m.ts)),
style = MaterialTheme.typography.labelSmall,
color = MaterialTheme.colorScheme.onSurfaceVariant,
)
}
}
}
}
@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<resources>
<string name="app_name">unibus</string>
</resources>
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<resources>
<!-- A minimal Material3 base theme; the real UI styling is driven by Compose
Material3 (MaterialTheme) at runtime. -->
<style name="Theme.Unibus" parent="android:Theme.Material.NoActionBar" />
</resources>
+8
View File
@@ -0,0 +1,8 @@
// Top-level build file. Plugin versions are declared here and applied in the
// module build scripts. AGP 8.5 + Kotlin 2.0 (with the dedicated Compose
// compiler plugin) target the locally installed SDK (compileSdk 34).
plugins {
id("com.android.application") version "8.5.2" apply false
id("org.jetbrains.kotlin.android") version "2.0.21" apply false
id("org.jetbrains.kotlin.plugin.compose") version "2.0.21" apply false
}
+5
View File
@@ -0,0 +1,5 @@
org.gradle.jvmargs=-Xmx2048m -Dfile.encoding=UTF-8
org.gradle.caching=true
android.useAndroidX=true
android.nonTransitiveRClass=true
kotlin.code.style=official
Binary file not shown.
+7
View File
@@ -0,0 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.9-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Vendored Executable
+252
View File
@@ -0,0 +1,252 @@
#!/bin/sh
#
# Copyright © 2015-2021 the original authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#
##############################################################################
#
# Gradle start up script for POSIX generated by Gradle.
#
# Important for running:
#
# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
# noncompliant, but you have some other compliant shell such as ksh or
# bash, then to run this script, type that shell name before the whole
# command line, like:
#
# ksh Gradle
#
# Busybox and similar reduced shells will NOT work, because this script
# requires all of these POSIX shell features:
# * functions;
# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
# * compound commands having a testable exit status, especially «case»;
# * various built-in commands including «command», «set», and «ulimit».
#
# Important for patching:
#
# (2) This script targets any POSIX shell, so it avoids extensions provided
# by Bash, Ksh, etc; in particular arrays are avoided.
#
# The "traditional" practice of packing multiple parameters into a
# space-separated string is a well documented source of bugs and security
# problems, so this is (mostly) avoided, by progressively accumulating
# options in "$@", and eventually passing that to Java.
#
# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
# see the in-line comments for details.
#
# There are tweaks for specific operating systems such as AIX, CygWin,
# Darwin, MinGW, and NonStop.
#
# (3) This script is generated from the Groovy template
# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
# within the Gradle project.
#
# You can find Gradle at https://github.com/gradle/gradle/.
#
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
app_path=$0
# Need this for daisy-chained symlinks.
while
APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
[ -h "$app_path" ]
do
ls=$( ls -ld "$app_path" )
link=${ls#*' -> '}
case $link in #(
/*) app_path=$link ;; #(
*) app_path=$APP_HOME$link ;;
esac
done
# This is normally unused
# shellcheck disable=SC2034
APP_BASE_NAME=${0##*/}
# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
APP_HOME=$( cd -P "${APP_HOME:-./}" > /dev/null && printf '%s
' "$PWD" ) || exit
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD=maximum
warn () {
echo "$*"
} >&2
die () {
echo
echo "$*"
echo
exit 1
} >&2
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "$( uname )" in #(
CYGWIN* ) cygwin=true ;; #(
Darwin* ) darwin=true ;; #(
MSYS* | MINGW* ) msys=true ;; #(
NONSTOP* ) nonstop=true ;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD=$JAVA_HOME/jre/sh/java
else
JAVACMD=$JAVA_HOME/bin/java
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD=java
if ! command -v java >/dev/null 2>&1
then
die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
fi
# Increase the maximum file descriptors if we can.
if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
case $MAX_FD in #(
max*)
# In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC2039,SC3045
MAX_FD=$( ulimit -H -n ) ||
warn "Could not query maximum file descriptor limit"
esac
case $MAX_FD in #(
'' | soft) :;; #(
*)
# In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
# shellcheck disable=SC2039,SC3045
ulimit -n "$MAX_FD" ||
warn "Could not set maximum file descriptor limit to $MAX_FD"
esac
fi
# Collect all arguments for the java command, stacking in reverse order:
# * args from the command line
# * the main class name
# * -classpath
# * -D...appname settings
# * --module-path (only if needed)
# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
# For Cygwin or MSYS, switch paths to Windows format before running java
if "$cygwin" || "$msys" ; then
APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
JAVACMD=$( cygpath --unix "$JAVACMD" )
# Now convert the arguments - kludge to limit ourselves to /bin/sh
for arg do
if
case $arg in #(
-*) false ;; # don't mess with options #(
/?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath
[ -e "$t" ] ;; #(
*) false ;;
esac
then
arg=$( cygpath --path --ignore --mixed "$arg" )
fi
# Roll the args list around exactly as many times as the number of
# args, so each arg winds up back in the position where it started, but
# possibly modified.
#
# NB: a `for` loop captures its iteration list before it begins, so
# changing the positional parameters here affects neither the number of
# iterations, nor the values presented in `arg`.
shift # remove old arg
set -- "$@" "$arg" # push replacement arg
done
fi
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
# Collect all arguments for the java command:
# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
# and any embedded shellness will be escaped.
# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
# treated as '${Hostname}' itself on the command line.
set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
-classpath "$CLASSPATH" \
org.gradle.wrapper.GradleWrapperMain \
"$@"
# Stop when "xargs" is not available.
if ! command -v xargs >/dev/null 2>&1
then
die "xargs is not available"
fi
# Use "xargs" to parse quoted args.
#
# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
#
# In Bash we could simply go:
#
# readarray ARGS < <( xargs -n1 <<<"$var" ) &&
# set -- "${ARGS[@]}" "$@"
#
# but POSIX shell has neither arrays nor command substitution, so instead we
# post-process each arg (as a line of input to sed) to backslash-escape any
# character that might be a shell metacharacter, then use eval to reverse
# that process (while maintaining the separation between arguments), and wrap
# the whole thing up as a single "set" statement.
#
# This will of course break if any of these variables contains a newline or
# an unmatched quote.
#
eval "set -- $(
printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
xargs -n1 |
sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
tr '\n' ' '
)" '"$@"'
exec "$JAVACMD" "$@"
+94
View File
@@ -0,0 +1,94 @@
@rem
@rem Copyright 2015 the original author or authors.
@rem
@rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
@rem
@rem SPDX-License-Identifier: Apache-2.0
@rem
@if "%DEBUG%"=="" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%"=="" set DIRNAME=.
@rem This is normally unused
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if %ERRORLEVEL% equ 0 goto execute
echo. 1>&2
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2
echo. 1>&2
echo Please set the JAVA_HOME variable in your environment to match the 1>&2
echo location of your Java installation. 1>&2
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto execute
echo. 1>&2
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2
echo. 1>&2
echo Please set the JAVA_HOME variable in your environment to match the 1>&2
echo location of your Java installation. 1>&2
goto fail
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end
@rem End local scope for the variables with windows NT shell
if %ERRORLEVEL% equ 0 goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
set EXIT_CODE=%ERRORLEVEL%
if %EXIT_CODE% equ 0 set EXIT_CODE=1
if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
exit /b %EXIT_CODE%
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega
+23
View File
@@ -0,0 +1,23 @@
pluginManagement {
repositories {
google {
content {
includeGroupByRegex("com\\.android.*")
includeGroupByRegex("com\\.google.*")
includeGroupByRegex("androidx.*")
}
}
mavenCentral()
gradlePluginPortal()
}
}
dependencyResolutionManagement {
repositoriesMode.set(RepositoriesMode.FAIL_ON_PROJECT_REPOS)
repositories {
google()
mavenCentral()
}
}
rootProject.name = "unibus"
include(":app")
+23 -6
View File
@@ -2,7 +2,7 @@
name: unibus
lang: go
domain: infra
version: 0.2.0
version: 0.4.0
description: "Bus de mensajería unificado sobre NATS+JetStream con cifrado E2E por room (megolm/olm reducido): service de membresía/claves, librería cliente y peers demo."
tags: [service, messaging, nats, e2e]
uses_functions:
@@ -22,13 +22,13 @@ service:
port: 8470
health_endpoint: /healthz
health_timeout_s: 3
systemd_unit: null
systemd_scope: null
restart_policy: none
runtime: manual
systemd_unit: unibus-membershipd.service
systemd_scope: user
restart_policy: always
runtime: systemd-user
pc_targets:
- lucas-linux
is_local_only: true
is_local_only: false
e2e_checks:
- id: build
cmd: "CGO_ENABLED=0 go build ./..."
@@ -154,6 +154,23 @@ agent.<nombre>.{in,out} inbox/outbox de agente LLM (agent.scout.in)
## Capability growth log
- v0.4.0 (2026-06-07) — descubrimiento de rooms: `GET /members/{endpoint}/rooms`
lista las rooms de un endpoint con su metadata y rol, y `client.ListMyRooms()`
lo consume. El control plane es pull (no hay push de invitaciones), así que un
peer recién invitado a una room cifrada la descubre por polling y luego hace
`Join` + `Subscribe`. Pieza base para que los bots de `agents_and_robots`
hablen por el bus en vez de Matrix (modelo "todo son rooms", E2E).
- v0.3.0 (2026-06-06) — `membershipd` se convierte en service de verdad: flag
`--bind` (default 127.0.0.1) que gobierna a la vez el HTTP de control y el NATS
embebido (`embeddednats.StartHost`), de modo que con `--bind 0.0.0.0` un
teléfono o PC de la LAN conecta a ambos planos. Se añade un systemd-user unit
(`deploy/unibus-membershipd.service`, `Restart=always`) + `deploy/install.sh`
idempotente, y el bloque `service:` queda completo (systemd-user, restart
always, health `/healthz`). El `Frame` (pkg/frame) gana threading aditivo
(`ThreadID`, `ReplyTo`) y un tipo `REACT`, con `PublishReply`/`React` en el
cliente — la base para que bots de chat hablen por el bus (fase 2). Cambios
100% aditivos: el wire de los frames no-threaded es idéntico y los tests
existentes siguen verdes.
- v0.2.0 (2026-06-03) — el playground gana un benchmark de rendimiento
(`GET /api/bench`, SSE): un publisher inunda una room con miles de mensajes a
N subscribers y una gráfica en vivo anima el throughput. Expone las dos
+6 -2
View File
@@ -23,6 +23,7 @@ import (
func main() {
var (
bind = flag.String("bind", "127.0.0.1", "network interface to bind the HTTP API and the embedded NATS to; use 0.0.0.0 to accept LAN/remote peers")
natsURL = flag.String("nats-url", "", "external NATS url; empty starts an embedded server")
httpPort = flag.String("http-port", "8470", "HTTP port for the control-plane API")
dbPath = flag.String("db", "./local_files/unibus.db", "SQLite database path")
@@ -40,7 +41,10 @@ func main() {
natsClientURL := *natsURL
if natsClientURL == "" {
var err error
ns, err = embeddednats.Start(*natsStore, *natsPort)
// Bind the embedded NATS to the same interface as the HTTP API so a single
// --bind flag governs reachability: 127.0.0.1 keeps the whole stack
// loopback-only; 0.0.0.0 exposes both planes to the LAN.
ns, err = embeddednats.StartHost(*natsStore, *bind, *natsPort)
if err != nil {
log.Fatalf("start embedded nats: %v", err)
}
@@ -65,7 +69,7 @@ func main() {
log.Printf("blob store: %s", *storeDir)
srv := membership.NewServer(store, blobs)
addr := "127.0.0.1:" + *httpPort
addr := *bind + ":" + *httpPort
httpSrv := &http.Server{Addr: addr, Handler: srv}
go func() {
+67
View File
@@ -0,0 +1,67 @@
# Running membershipd as a systemd user service
`membershipd` is the unibus control plane (rooms, members, sealed keys, blob
store) and, unless you point it at an external NATS with `--nats-url`, it also
runs the embedded NATS + JetStream data plane. Running it as a **systemd user
service** keeps it alive across logout/reboot and restarts it if it crashes.
The unit (`unibus-membershipd.service`) binds both planes to `0.0.0.0`:
| Plane | Port | Reachable from |
|--------------|-------|----------------|
| HTTP control | 8470 | LAN (`http://<host-ip>:8470/healthz`) |
| NATS data | 4250 | LAN (`nats://<host-ip>:4250`) |
## Install (idempotent)
```bash
cd ~/fn_registry/projects/message_bus/apps/unibus
./deploy/install.sh
```
This builds the binary, symlinks the unit into `~/.config/systemd/user/`,
reloads systemd, and enables + starts the service.
## Manual steps (what install.sh does)
```bash
cd ~/fn_registry/projects/message_bus/apps/unibus
# 1. Build the pure-Go binary (no CGO).
CGO_ENABLED=0 go build -o membershipd ./cmd/membershipd
# 2. Link the unit into the systemd user directory.
mkdir -p ~/.config/systemd/user
ln -sf "$PWD/deploy/unibus-membershipd.service" ~/.config/systemd/user/unibus-membershipd.service
# 3. Reload, enable (start on login) and start now.
systemctl --user daemon-reload
systemctl --user enable --now unibus-membershipd.service
# (optional) survive logout without an active session:
# sudo loginctl enable-linger "$USER"
```
## Operate
```bash
systemctl --user status unibus-membershipd.service # is it active?
systemctl --user restart unibus-membershipd.service # after a rebuild
systemctl --user stop unibus-membershipd.service
systemctl --user disable unibus-membershipd.service # stop starting on login
journalctl --user -u unibus-membershipd.service -f # follow logs
# Health (local and from another LAN host):
curl -fsS http://127.0.0.1:8470/healthz
curl -fsS http://<host-lan-ip>:8470/healthz
```
## Notes
- Writable state (SQLite DB, JetStream store, blobs) lives under `local_files/`
relative to `WorkingDirectory`, which the unit sets to the app directory.
- After editing the app code, rebuild (`CGO_ENABLED=0 go build -o membershipd
./cmd/membershipd`) and `systemctl --user restart unibus-membershipd.service`.
- To run against an external NATS instead of the embedded one, append
`--nats-url nats://<host>:4222` to `ExecStart` and re-run `daemon-reload` +
`restart`.
+31
View File
@@ -0,0 +1,31 @@
#!/usr/bin/env bash
# Build membershipd and install/enable/start it as a systemd user service.
# Idempotent: safe to re-run after a code change to rebuild and restart.
set -euo pipefail
APP_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
UNIT="unibus-membershipd.service"
USER_UNIT_DIR="${XDG_CONFIG_HOME:-$HOME/.config}/systemd/user"
cd "$APP_DIR"
echo "==> building membershipd (CGO_ENABLED=0)"
CGO_ENABLED=0 go build -o membershipd ./cmd/membershipd
echo "==> linking unit into $USER_UNIT_DIR"
mkdir -p "$USER_UNIT_DIR"
ln -sf "$APP_DIR/deploy/$UNIT" "$USER_UNIT_DIR/$UNIT"
echo "==> reloading systemd and (re)starting the service"
systemctl --user daemon-reload
systemctl --user enable --now "$UNIT"
# If the service was already running, enable --now does not restart it; do so to
# pick up the freshly built binary.
systemctl --user restart "$UNIT"
echo "==> status"
systemctl --user --no-pager status "$UNIT" || true
echo
echo "Health check:"
echo " curl -fsS http://127.0.0.1:8470/healthz"
+22
View File
@@ -0,0 +1,22 @@
[Unit]
Description=unibus membershipd — control plane (rooms, keys, blobs) + embedded NATS/JetStream
Documentation=https://gitea-dgg044oo04woo4ggcsws4gk0.organic-machine.com/dataforge/unibus
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
WorkingDirectory=%h/fn_registry/projects/message_bus/apps/unibus
# --bind 0.0.0.0 exposes BOTH the HTTP control plane (:8470) and the embedded
# NATS data plane (:4250) to the LAN so phones / other PCs can connect.
ExecStart=%h/fn_registry/projects/message_bus/apps/unibus/membershipd --bind 0.0.0.0
# Restart=always (NOT on-failure): a clean SIGTERM shutdown exits 0, and
# on-failure would then NOT restart, leaving the service silently dead. always
# brings it back regardless of exit code. See .claude/rules/function_tags.md.
Restart=always
RestartSec=2
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=default.target
+53
View File
@@ -11,6 +11,9 @@
package mobile
import (
"encoding/base64"
"encoding/json"
"fmt"
"time"
"github.com/enmanuel/unibus/pkg/client"
@@ -92,6 +95,56 @@ func (s *Session) Subscribe(roomID string, l FrameListener) error {
return err
}
// cardJSON is the portable, copy-pasteable public identity a peer shares so a
// room owner can invite it to an encrypted room. It carries no secret: only the
// endpoint id and the two public keys (signing + key-exchange), base64-encoded
// for transport over text or a QR code.
type cardJSON struct {
ID string `json:"id"`
SignPub string `json:"sign_pub"` // base64 std of the Ed25519 public key
KexPub string `json:"kex_pub"` // base64 std of the X25519 public key
}
// Card returns this peer's public identity as a portable JSON string. Share it
// (paste, QR) with a room owner so they can Invite you to an encrypted room. It
// contains no private key and is safe to transmit in the clear.
func (s *Session) Card() string {
ep := s.c.Endpoint()
b, _ := json.Marshal(cardJSON{
ID: ep.ID,
SignPub: base64.StdEncoding.EncodeToString(ep.SignPub),
KexPub: base64.StdEncoding.EncodeToString(ep.KexPub),
})
return string(b)
}
// Invite adds the holder of peerCard to roomID. peerCard is the JSON string the
// invitee produced with Card(). For encrypted rooms this seals the current room
// key to the invitee's X25519 public key and signs the request; the caller must
// be the room owner.
func (s *Session) Invite(roomID, peerCard string) error {
var card cardJSON
if err := json.Unmarshal([]byte(peerCard), &card); err != nil {
return fmt.Errorf("mobile: bad peer card: %w", err)
}
signPub, err := base64.StdEncoding.DecodeString(card.SignPub)
if err != nil {
return fmt.Errorf("mobile: bad sign_pub in card: %w", err)
}
kexPub, err := base64.StdEncoding.DecodeString(card.KexPub)
if err != nil {
return fmt.Errorf("mobile: bad kex_pub in card: %w", err)
}
return s.c.Invite(roomID, client.Endpoint{ID: card.ID, SignPub: signPub, KexPub: kexPub})
}
// Kick removes endpointID from roomID and, for encrypted rooms, rotates the room
// key to a new epoch so the removed peer cannot decrypt messages published after
// the kick (forward secrecy). The caller must be the room owner.
func (s *Session) Kick(roomID, endpointID string) error {
return s.c.Kick(roomID, endpointID)
}
// Request performs an RPC request/reply against subject and returns the reply
// payload as text. timeoutMs bounds the wait in milliseconds.
func (s *Session) Request(subject, text string, timeoutMs int) (string, error) {
+85 -9
View File
@@ -231,8 +231,48 @@ type blobResp struct {
Hash string `json:"hash"`
}
type memberRoomJSON struct {
RoomID string `json:"room_id"`
Subject string `json:"subject"`
Epoch int `json:"epoch"`
Policy policyJSON `json:"policy"`
Role string `json:"role"`
}
// ---- room operations ------------------------------------------------------
// RoomRef is a room this peer belongs to, returned by ListMyRooms. It is the
// unit of room discovery: a peer that was invited to a new room finds it here
// and can then Join (fetch the sealed key) and Subscribe.
type RoomRef struct {
RoomID string
Subject string
Epoch int
Policy room.Policy
Role string
}
// ListMyRooms returns every room this peer is currently a member of. A peer
// polls this to discover rooms it has been invited to (the control plane is
// pull-based: there is no server push of invitations).
func (c *Client) ListMyRooms() ([]RoomRef, error) {
var resp []memberRoomJSON
if err := c.doJSON("GET", "/members/"+c.endpoint+"/rooms", nil, &resp); err != nil {
return nil, err
}
out := make([]RoomRef, 0, len(resp))
for _, r := range resp {
out = append(out, RoomRef{
RoomID: r.RoomID,
Subject: r.Subject,
Epoch: r.Epoch,
Policy: room.Policy{Encrypt: r.Policy.Encrypt, Persist: r.Policy.Persist, SignMsgs: r.Policy.SignMsgs},
Role: r.Role,
})
}
return out, nil
}
// newRoomKey returns 32 random bytes for a symmetric room key.
func newRoomKey() ([]byte, error) {
k := make([]byte, 32)
@@ -392,20 +432,31 @@ func (c *Client) signerPub(roomID, sender string) ([]byte, error) {
// ---- data plane: publish/subscribe ---------------------------------------
// Publish sends plaintext to a room. For encrypted rooms it seals the payload
// with the current K using the subject as AEAD additional-authenticated-data;
// for signed rooms it attaches an Ed25519 signature.
func (c *Client) Publish(roomID string, plaintext []byte) error {
// threadMeta carries the optional threading/reaction routing of a published
// frame. The zero value yields a plain top-level message whose wire bytes are
// identical to a pre-threading frame (the fields are omitempty).
type threadMeta struct {
threadID string // thread root message id
replyTo string // message id being replied to / reacted to
}
// publishFrame is the single publish path shared by Publish, PublishReply and
// React. It builds the envelope, seals+signs per the room policy, and routes
// through JetStream (persisted rooms) or core NATS (ephemeral rooms). The only
// thing the callers vary is the frame type and the threading metadata.
func (c *Client) publishFrame(roomID string, ftype frame.FrameType, plaintext []byte, tm threadMeta) error {
info, err := c.fetchRoom(roomID)
if err != nil {
return err
}
f := frame.Frame{
Type: frame.PUB,
Subject: info.Subject,
Sender: c.endpoint,
MsgID: newULID(),
Epoch: info.Epoch,
Type: ftype,
Subject: info.Subject,
Sender: c.endpoint,
MsgID: newULID(),
Epoch: info.Epoch,
ThreadID: tm.threadID,
ReplyTo: tm.replyTo,
}
if info.Policy.Encrypt {
k, ep, err := c.fetchKey(roomID, info.Epoch)
@@ -435,6 +486,31 @@ func (c *Client) Publish(roomID string, plaintext []byte) error {
return c.nc.Publish(info.Subject, b)
}
// Publish sends plaintext to a room. For encrypted rooms it seals the payload
// with the current K using the subject as AEAD additional-authenticated-data;
// for signed rooms it attaches an Ed25519 signature.
func (c *Client) Publish(roomID string, plaintext []byte) error {
return c.publishFrame(roomID, frame.PUB, plaintext, threadMeta{})
}
// PublishReply sends plaintext as a reply inside a thread. replyTo is the id of
// the message being replied to; threadID is the thread root — pass replyTo when
// you are starting a new thread off a top-level message, or the existing
// ThreadID to keep replying within one. Encryption and signing are identical to
// Publish; the threading metadata rides the cleartext envelope. Receivers read
// Frame.ReplyTo / Frame.ThreadID to render the conversation tree.
func (c *Client) PublishReply(roomID string, plaintext []byte, replyTo, threadID string) error {
return c.publishFrame(roomID, frame.PUB, plaintext, threadMeta{threadID: threadID, replyTo: replyTo})
}
// React publishes a reaction (emoji/shortcode) to a target message. The reaction
// content travels in the payload, so it is sealed exactly like a normal message
// and stays confidential in E2E rooms. Receivers dispatch on Frame.Type ==
// frame.REACT and read Frame.ReplyTo for the message being reacted to.
func (c *Client) React(roomID, targetMsgID, emoji string) error {
return c.publishFrame(roomID, frame.REACT, []byte(emoji), threadMeta{replyTo: targetMsgID})
}
// Sub is a transport-agnostic handle to an active room subscription. It wraps
// either a core NATS subscription (ephemeral rooms) or a JetStream durable
// consumer (persisted rooms) behind a single Unsubscribe() method, so callers
+153
View File
@@ -302,6 +302,159 @@ func TestMediaBlobRoundTrip(t *testing.T) {
}
}
// TestThreadedReplyAndReaction exercises the additive threading API end to end
// in an encrypted, persisted, signed room (ModeMatrix): A publishes a root
// message, replies to it within a thread, and reacts to it with an emoji. The
// loopback subscriber must observe the reply carrying ReplyTo/ThreadID and the
// reaction as a frame.REACT whose (decrypted) payload is the emoji — proving the
// reaction stays sealed like any message in an E2E room.
func TestThreadedReplyAndReaction(t *testing.T) {
h := newHarness(t)
waitHealth(t, h.ctrlURL)
a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect A: %v", err)
}
defer a.Close()
roomID, err := a.CreateRoom("room.thread", room.ModeMatrix)
if err != nil {
t.Fatalf("create room: %v", err)
}
type rec struct {
f frame.Frame
pt string
}
var mu sync.Mutex
var got []rec
sub, err := a.Subscribe(roomID, func(f frame.Frame, pt []byte) {
mu.Lock()
got = append(got, rec{f: f, pt: string(pt)})
mu.Unlock()
})
if err != nil {
t.Fatalf("subscribe: %v", err)
}
defer sub.Unsubscribe()
time.Sleep(150 * time.Millisecond)
find := func(pred func(rec) bool) (rec, bool) {
mu.Lock()
defer mu.Unlock()
for _, r := range got {
if pred(r) {
return r, true
}
}
return rec{}, false
}
waitRec := func(pred func(rec) bool) (rec, bool) {
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if r, ok := find(pred); ok {
return r, true
}
time.Sleep(25 * time.Millisecond)
}
return rec{}, false
}
// 1. Root message.
if err := a.Publish(roomID, []byte("root")); err != nil {
t.Fatalf("publish root: %v", err)
}
rootRec, ok := waitRec(func(r rec) bool { return r.pt == "root" })
if !ok {
t.Fatalf("never observed the root message")
}
rootID := rootRec.f.MsgID
if rootID == "" {
t.Fatalf("root frame has empty MsgID")
}
// 2. Threaded reply to the root.
if err := a.PublishReply(roomID, []byte("child"), rootID, rootID); err != nil {
t.Fatalf("publish reply: %v", err)
}
reply, ok := waitRec(func(r rec) bool { return r.pt == "child" })
if !ok {
t.Fatalf("never observed the threaded reply")
}
if reply.f.ReplyTo != rootID || reply.f.ThreadID != rootID {
t.Fatalf("reply threading lost: ReplyTo=%q ThreadID=%q want %q", reply.f.ReplyTo, reply.f.ThreadID, rootID)
}
// 3. Reaction to the root (emoji rides the encrypted payload).
if err := a.React(roomID, rootID, "👍"); err != nil {
t.Fatalf("react: %v", err)
}
reaction, ok := waitRec(func(r rec) bool { return r.f.Type == frame.REACT })
if !ok {
t.Fatalf("never observed the reaction frame")
}
if reaction.f.ReplyTo != rootID {
t.Fatalf("reaction target lost: ReplyTo=%q want %q", reaction.f.ReplyTo, rootID)
}
if reaction.pt != "👍" {
t.Fatalf("reaction payload mismatch: got %q want 👍 (decryption in E2E room)", reaction.pt)
}
}
// TestListMyRoomsDiscovery verifies room discovery: an invited peer finds the
// room via ListMyRooms (without being told its id), and a peer in no rooms gets
// an empty list. This is what lets a bot discover rooms it was invited to.
func TestListMyRoomsDiscovery(t *testing.T) {
h := newHarness(t)
waitHealth(t, h.ctrlURL)
a, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect A: %v", err)
}
defer a.Close()
b, err := client.New(h.natsURL, h.ctrlURL, mustIdentity(t))
if err != nil {
t.Fatalf("connect B: %v", err)
}
defer b.Close()
// B is in no rooms yet.
if rooms, err := b.ListMyRooms(); err != nil || len(rooms) != 0 {
t.Fatalf("B should start in no rooms, got %v err=%v", rooms, err)
}
roomID, err := a.CreateRoom("room.discovery", room.ModeMatrix)
if err != nil {
t.Fatalf("A create room: %v", err)
}
if err := a.Invite(roomID, b.Endpoint()); err != nil {
t.Fatalf("A invite B: %v", err)
}
// B discovers the room it was invited to, with its policy, without prior knowledge of the id.
rooms, err := b.ListMyRooms()
if err != nil {
t.Fatalf("B ListMyRooms: %v", err)
}
if len(rooms) != 1 || rooms[0].RoomID != roomID {
t.Fatalf("B should discover exactly room %s, got %+v", roomID, rooms)
}
if rooms[0].Subject != "room.discovery" || !rooms[0].Policy.Encrypt || rooms[0].Role != "member" {
t.Fatalf("discovered room metadata wrong: %+v", rooms[0])
}
// A sees the same room as its owner.
aRooms, err := a.ListMyRooms()
if err != nil {
t.Fatalf("A ListMyRooms: %v", err)
}
if len(aRooms) != 1 || aRooms[0].Role != "owner" {
t.Fatalf("A should own exactly one room, got %+v", aRooms)
}
}
// ---- test helpers ---------------------------------------------------------
type collector struct {
+17 -3
View File
@@ -13,13 +13,27 @@ import (
)
// Start launches an embedded nats-server with JetStream enabled, listening on
// the given port and persisting JetStream state under storeDir. It blocks until
// the server is ready to accept connections (up to 5s) and returns the running
// server. The caller is responsible for calling Shutdown on it.
// the given port and persisting JetStream state under storeDir. The listen host
// is left at the nats-server default ("0.0.0.0", all interfaces). It blocks
// until the server is ready to accept connections (up to 5s) and returns the
// running server. The caller is responsible for calling Shutdown on it.
//
// Start is a thin backward-compatible wrapper over StartHost; callers that need
// to control the bind interface (loopback vs LAN) should use StartHost directly.
func Start(storeDir string, port int) (*server.Server, error) {
return StartHost(storeDir, "", port)
}
// StartHost is Start with explicit control over the bind interface. host selects
// which network interface the data plane listens on: pass "127.0.0.1" to keep
// NATS loopback-only (the safe default for a single-host dev stack) or "0.0.0.0"
// to expose it to the LAN so remote peers (phones, other PCs) can connect. An
// empty host falls back to the nats-server default ("0.0.0.0", all interfaces).
func StartHost(storeDir, host string, port int) (*server.Server, error) {
opts := &server.Options{
JetStream: true,
StoreDir: storeDir,
Host: host,
Port: port,
DontListen: false,
// Keep the embedded server quiet by default; the host app logs the URLs.
+20 -9
View File
@@ -36,6 +36,10 @@ const (
KICK
// ACK acknowledges receipt of a previous frame.
ACK
// REACT is a reaction to a previous message (an emoji/shortcode). The target
// message id travels in ReplyTo; the reaction content rides Payload, so in
// encrypted rooms the reaction is sealed exactly like any other message.
REACT
)
// BlobRef references an out-of-band encrypted blob stored in the object store.
@@ -47,16 +51,23 @@ type BlobRef struct {
}
// Frame is the unit of transport on the unibus message bus.
//
// Threading metadata (ThreadID, ReplyTo) is additive and optional: it travels in
// the cleartext envelope (these are message-id references, not secret content)
// and is omitted entirely when unset, so the wire format and signatures of
// non-threaded frames are byte-for-byte identical to before this field existed.
type Frame struct {
Type FrameType `json:"t"`
Subject string `json:"s"`
Sender string `json:"from"` // endpoint id = EndpointID(signPub)
MsgID string `json:"id"` // ULID
Epoch int `json:"e"` // epoch of the room key K used to encrypt
Nonce []byte `json:"n,omitempty"` // AEAD nonce (encrypted rooms only)
Payload []byte `json:"p,omitempty"` // AEAD ciphertext (or cleartext if the room does not encrypt)
Blob *BlobRef `json:"b,omitempty"`
Sig []byte `json:"sig,omitempty"` // Ed25519 signature over SigningBytes()
Type FrameType `json:"t"`
Subject string `json:"s"`
Sender string `json:"from"` // endpoint id = EndpointID(signPub)
MsgID string `json:"id"` // ULID
Epoch int `json:"e"` // epoch of the room key K used to encrypt
ThreadID string `json:"thr,omitempty"` // root message id of the thread this frame belongs to
ReplyTo string `json:"re,omitempty"` // message id this frame replies to / reacts to
Nonce []byte `json:"n,omitempty"` // AEAD nonce (encrypted rooms only)
Payload []byte `json:"p,omitempty"` // AEAD ciphertext (or cleartext if the room does not encrypt)
Blob *BlobRef `json:"b,omitempty"`
Sig []byte `json:"sig,omitempty"` // Ed25519 signature over SigningBytes()
}
// Marshal serializes the frame to its wire representation (JSON in v1).
+62
View File
@@ -2,6 +2,7 @@ package frame
import (
"bytes"
"strings"
"testing"
)
@@ -40,6 +41,67 @@ func TestMarshalUnmarshalRoundTrip(t *testing.T) {
}
}
// TestThreadingRoundTrip (golden) verifies that the additive threading fields
// survive a marshal/unmarshal cycle and that a REACT frame keeps its target.
func TestThreadingRoundTrip(t *testing.T) {
orig := Frame{
Type: REACT,
Subject: "room.general",
Sender: "abc123",
MsgID: "01J000000000000000000002",
Epoch: 1,
ThreadID: "01J000000000000000000000",
ReplyTo: "01J000000000000000000001",
Payload: []byte("👍"),
}
b, err := orig.Marshal()
if err != nil {
t.Fatalf("Marshal: %v", err)
}
got, err := Unmarshal(b)
if err != nil {
t.Fatalf("Unmarshal: %v", err)
}
if got.Type != REACT {
t.Fatalf("type mismatch: got %d want REACT(%d)", got.Type, REACT)
}
if got.ThreadID != orig.ThreadID || got.ReplyTo != orig.ReplyTo {
t.Fatalf("threading fields lost: got thr=%q re=%q", got.ThreadID, got.ReplyTo)
}
if !bytes.Equal(got.Payload, orig.Payload) {
t.Fatalf("reaction payload mismatch: got %q", got.Payload)
}
}
// TestNonThreadedWireBackCompat (edge) asserts that a frame without threading
// metadata serializes with NO thr/re keys at all, so its bytes — and therefore
// its signature — are identical to a pre-threading frame. This is the
// guarantee that makes the new fields a non-breaking, additive change.
func TestNonThreadedWireBackCompat(t *testing.T) {
f := Frame{Type: PUB, Subject: "room.general", Sender: "x", MsgID: "id", Epoch: 2, Payload: []byte("hi")}
b, err := f.Marshal()
if err != nil {
t.Fatalf("Marshal: %v", err)
}
s := string(b)
if strings.Contains(s, "\"thr\"") || strings.Contains(s, "\"re\"") {
t.Fatalf("threading keys leaked into a non-threaded frame: %s", s)
}
// SigningBytes of a non-threaded frame must also be free of the keys, so old
// signatures over equivalent frames still verify.
if sb := f.SigningBytes(); strings.Contains(string(sb), "\"thr\"") || strings.Contains(string(sb), "\"re\"") {
t.Fatalf("threading keys leaked into SigningBytes: %s", sb)
}
}
// TestUnmarshalRejectsGarbage (error path) verifies that malformed wire bytes
// surface as an error rather than a silently zero-valued frame.
func TestUnmarshalRejectsGarbage(t *testing.T) {
if _, err := Unmarshal([]byte("{not valid json")); err == nil {
t.Fatalf("expected error unmarshaling garbage, got nil")
}
}
func TestEndpointIDDeterministic(t *testing.T) {
pub := []byte("some-ed25519-public-key-bytes-32")
a := EndpointID(pub)
+33
View File
@@ -45,6 +45,7 @@ func (s *Server) routes() {
s.mux.HandleFunc("POST /rooms/{id}/invite", s.handleInvite)
s.mux.HandleFunc("GET /rooms/{id}/key", s.handleGetKey)
s.mux.HandleFunc("GET /rooms/{id}/members", s.handleListMembers)
s.mux.HandleFunc("GET /members/{endpoint}/rooms", s.handleListMemberRooms)
s.mux.HandleFunc("POST /rooms/{id}/rekey", s.handleRekey)
s.mux.HandleFunc("GET /rooms/{id}", s.handleGetRoom)
s.mux.HandleFunc("POST /blobs", s.handlePutBlob)
@@ -101,6 +102,14 @@ type roomResp struct {
Policy policyJSON `json:"policy"`
}
type memberRoomJSON struct {
RoomID string `json:"room_id"`
Subject string `json:"subject"`
Epoch int `json:"epoch"`
Policy policyJSON `json:"policy"`
Role string `json:"role"`
}
type rekeyKey struct {
Endpoint string `json:"endpoint"`
SealedKey []byte `json:"sealed_key"`
@@ -262,6 +271,30 @@ func (s *Server) handleListMembers(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, out)
}
func (s *Server) handleListMemberRooms(w http.ResponseWriter, r *http.Request) {
endpoint := r.PathValue("endpoint")
if endpoint == "" {
writeErr(w, http.StatusBadRequest, "endpoint required")
return
}
rooms, err := s.store.ListRoomsForEndpoint(endpoint)
if err != nil {
writeErr(w, http.StatusInternalServerError, err.Error())
return
}
out := make([]memberRoomJSON, 0, len(rooms))
for _, rm := range rooms {
out = append(out, memberRoomJSON{
RoomID: rm.RoomID,
Subject: rm.Subject,
Epoch: rm.Epoch,
Policy: policyJSON{Encrypt: rm.Encrypt, Persist: rm.Persist, SignMsgs: rm.SignMsgs},
Role: rm.Role,
})
}
writeJSON(w, http.StatusOK, out)
}
func (s *Server) handleGetRoom(w http.ResponseWriter, r *http.Request) {
roomID := r.PathValue("id")
info, err := s.store.GetRoom(roomID)
+36
View File
@@ -219,6 +219,42 @@ func (s *Store) ListMembers(roomID string) ([]Member, error) {
return out, rows.Err()
}
// RoomMembership is a room an endpoint belongs to, with that endpoint's role.
// It is the per-endpoint view used for room discovery (a peer asking "which
// rooms am I in?") so a freshly-invited member can find and join its rooms.
type RoomMembership struct {
RoomInfo
Role string
}
// ListRoomsForEndpoint returns every room the given endpoint is a member of,
// with the room's current metadata and the endpoint's role, ordered by room id.
// An endpoint that is in no rooms yields an empty slice (not an error).
func (s *Store) ListRoomsForEndpoint(endpoint string) ([]RoomMembership, error) {
rows, err := s.db.Query(
`SELECT r.room_id, r.subject, r.key_epoch, r.encrypt, r.persist, r.sign_msgs, r.owner_endpoint, m.role
FROM members m JOIN rooms r ON r.room_id = m.room_id
WHERE m.endpoint = ? ORDER BY r.room_id`,
endpoint,
)
if err != nil {
return nil, fmt.Errorf("membership: list rooms for endpoint %q: %w", endpoint, err)
}
defer rows.Close()
var out []RoomMembership
for rows.Next() {
var rm RoomMembership
var enc, per, sgn int
if err := rows.Scan(&rm.RoomID, &rm.Subject, &rm.Epoch, &enc, &per, &sgn, &rm.OwnerEndpoint, &rm.Role); err != nil {
return nil, fmt.Errorf("membership: scan room membership: %w", err)
}
rm.Encrypt, rm.Persist, rm.SignMsgs = enc != 0, per != 0, sgn != 0
out = append(out, rm)
}
return out, rows.Err()
}
// GetSealedKey returns the sealed room key for an endpoint at a given epoch.
// If epoch <= 0, the latest epoch for that endpoint is returned.
func (s *Store) GetSealedKey(roomID, endpoint string, epoch int) (int, []byte, error) {
+52
View File
@@ -35,6 +35,58 @@ func TestMigrationsCreateSchema(t *testing.T) {
}
}
func TestListRoomsForEndpoint(t *testing.T) {
s := openTestStore(t)
// Owner of two rooms; a member in only the first.
owner, member := "owner-ep", "member-ep"
mk := func(id, subj string) RoomInfo {
return RoomInfo{RoomID: id, Subject: subj, Encrypt: true, Persist: true, SignMsgs: true, OwnerEndpoint: owner}
}
if err := s.CreateRoom(mk("room-a", "room.a"), []byte("os"), []byte("ok"), []byte("k")); err != nil {
t.Fatalf("CreateRoom a: %v", err)
}
if err := s.CreateRoom(mk("room-b", "room.b"), []byte("os"), []byte("ok"), []byte("k")); err != nil {
t.Fatalf("CreateRoom b: %v", err)
}
if err := s.AddMember("room-a", Member{Endpoint: member, Role: "member", SignPub: []byte("s"), KexPub: []byte("k")}, 1, []byte("mk")); err != nil {
t.Fatalf("AddMember: %v", err)
}
// Owner is in both rooms, as owner, ordered by room id.
ownerRooms, err := s.ListRoomsForEndpoint(owner)
if err != nil {
t.Fatalf("ListRoomsForEndpoint owner: %v", err)
}
if len(ownerRooms) != 2 {
t.Fatalf("owner: expected 2 rooms, got %d", len(ownerRooms))
}
if ownerRooms[0].RoomID != "room-a" || ownerRooms[1].RoomID != "room-b" {
t.Fatalf("owner rooms not ordered: %+v", ownerRooms)
}
if ownerRooms[0].Role != "owner" || !ownerRooms[0].Encrypt || ownerRooms[0].Subject != "room.a" {
t.Fatalf("owner room metadata wrong: %+v", ownerRooms[0])
}
// Member is in exactly one room, as member.
memberRooms, err := s.ListRoomsForEndpoint(member)
if err != nil {
t.Fatalf("ListRoomsForEndpoint member: %v", err)
}
if len(memberRooms) != 1 || memberRooms[0].RoomID != "room-a" || memberRooms[0].Role != "member" {
t.Fatalf("member rooms wrong: %+v", memberRooms)
}
// An unknown endpoint yields an empty slice, not an error.
none, err := s.ListRoomsForEndpoint("nobody")
if err != nil {
t.Fatalf("ListRoomsForEndpoint nobody: %v", err)
}
if len(none) != 0 {
t.Fatalf("expected no rooms for unknown endpoint, got %+v", none)
}
}
func TestRoomMemberKeyRoundTrip(t *testing.T) {
s := openTestStore(t)
+78 -1
View File
@@ -24,6 +24,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
@@ -124,6 +125,22 @@ func (p *peerState) setRoom(roomID string, info roomInfo) {
p.mu.Unlock()
}
// roomList returns a snapshot of the rooms this peer knows (created or joined),
// so the SPA can render the peer's room list without re-deriving it client-side.
func (p *peerState) roomList() []map[string]any {
p.mu.Lock()
defer p.mu.Unlock()
out := make([]map[string]any, 0, len(p.rooms))
for id, info := range p.rooms {
out = append(out, map[string]any{
"room_id": id,
"subject": info.subject,
"encrypt": info.encrypt,
})
}
return out
}
// ---------------------------------------------------------------------------
// Hub: the registry of peers, protected by a single mutex.
// ---------------------------------------------------------------------------
@@ -449,6 +466,64 @@ func (h *Hub) handleKick(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "kicked", "target": req.Target})
}
// handleRooms returns the rooms a peer knows (created or joined). The SPA polls
// or calls this after create/join to refresh its room list.
//
// GET /api/rooms?peer=ana
func (h *Hub) handleRooms(w http.ResponseWriter, r *http.Request) {
name := r.URL.Query().Get("peer")
if name == "" {
writeErr(w, http.StatusBadRequest, "peer query param required")
return
}
p, ok := h.lookup(name)
if !ok {
writeErr(w, http.StatusBadRequest, "unknown peer "+name)
return
}
writeJSON(w, http.StatusOK, p.roomList())
}
// handleMembers lists the members of a room (endpoint id + role) so the SPA can
// render a members panel and drive invite/kick. It proxies the control plane's
// unauthenticated read endpoint; the public keys it returns are not secret.
//
// GET /api/members?room_id=<id>
func (h *Hub) handleMembers(w http.ResponseWriter, r *http.Request) {
roomID := r.URL.Query().Get("room_id")
if roomID == "" {
writeErr(w, http.StatusBadRequest, "room_id query param required")
return
}
resp, err := http.Get(ctrlURL + "/rooms/" + roomID + "/members")
if err != nil {
writeErr(w, http.StatusInternalServerError, "fetch members: "+err.Error())
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(resp.StatusCode)
_, _ = w.Write(body)
}
// withCORS allows the SPA running under the Vite dev server (a different origin)
// to call the gateway. It answers preflight OPTIONS and tags every response with
// permissive CORS headers. v1 trusts the local network, mirroring the control
// plane's auth model.
func withCORS(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
next.ServeHTTP(w, r)
})
}
// handleStream is the SSE endpoint. The browser opens one EventSource per peer;
// each received Event is emitted as a `data: <json>\n\n` block. The listener is
// cleaned up when the HTTP request context is cancelled (tab closed / reload).
@@ -807,9 +882,11 @@ func main() {
mux.HandleFunc("POST /api/invite", hub.handleInvite)
mux.HandleFunc("POST /api/publish", hub.handlePublish)
mux.HandleFunc("POST /api/kick", hub.handleKick)
mux.HandleFunc("GET /api/rooms", hub.handleRooms)
mux.HandleFunc("GET /api/members", hub.handleMembers)
mux.HandleFunc("GET /api/stream", hub.handleStream)
mux.HandleFunc("GET /api/bench", hub.handleBench)
webSrv := &http.Server{Addr: webAddr, Handler: mux}
webSrv := &http.Server{Addr: webAddr, Handler: withCORS(mux)}
go func() {
if err := webSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("web server: %v", err)
+5
View File
@@ -0,0 +1,5 @@
node_modules/
dist/
*.local
.vite/
*.tsbuildinfo
+12
View File
@@ -0,0 +1,12 @@
<!doctype html>
<html lang="es">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>unibus · chat</title>
</head>
<body>
<div id="root"></div>
<script type="module" src="/src/main.tsx"></script>
</body>
</html>
+29
View File
@@ -0,0 +1,29 @@
{
"name": "unibus-web",
"private": true,
"version": "0.1.0",
"type": "module",
"description": "SPA de chat para el bus unibus (rooms cifradas E2E, mensajes en vivo por SSE).",
"scripts": {
"dev": "vite",
"build": "tsc -b && vite build",
"preview": "vite preview"
},
"dependencies": {
"@mantine/core": "^9.3.0",
"@mantine/hooks": "^9.3.0",
"@tabler/icons-react": "^3.36.0",
"react": "^19.2.0",
"react-dom": "^19.2.0"
},
"devDependencies": {
"@types/react": "^19.2.0",
"@types/react-dom": "^19.2.0",
"@vitejs/plugin-react": "^4.3.4",
"postcss": "^8.4.49",
"postcss-preset-mantine": "^1.17.0",
"postcss-simple-vars": "^7.0.1",
"typescript": "^5.6.3",
"vite": "^6.0.3"
}
}
+1481
View File
File diff suppressed because it is too large Load Diff
+2
View File
@@ -0,0 +1,2 @@
allowBuilds:
esbuild: true
+14
View File
@@ -0,0 +1,14 @@
module.exports = {
plugins: {
"postcss-preset-mantine": {},
"postcss-simple-vars": {
variables: {
"mantine-breakpoint-xs": "36em",
"mantine-breakpoint-sm": "48em",
"mantine-breakpoint-md": "62em",
"mantine-breakpoint-lg": "75em",
"mantine-breakpoint-xl": "88em",
},
},
},
};
+29
View File
@@ -0,0 +1,29 @@
import { useState } from "react";
import { GatewayClient } from "./api";
import type { Peer } from "./types";
import { ConnectScreen } from "./components/ConnectScreen";
import { ChatLayout } from "./components/ChatLayout";
// Connection holds the live gateway client plus the identity it connected as.
interface Connection {
client: GatewayClient;
peer: Peer;
}
// App is the root: it shows the connect screen until the user picks a gateway
// URL and a peer name, then swaps to the full chat layout. Disconnecting drops
// back to the connect screen.
export function App() {
const [conn, setConn] = useState<Connection | null>(null);
if (!conn) {
return <ConnectScreen onConnect={(client, peer) => setConn({ client, peer })} />;
}
return (
<ChatLayout
client={conn.client}
peer={conn.peer}
onDisconnect={() => setConn(null)}
/>
);
}
+99
View File
@@ -0,0 +1,99 @@
// GatewayClient is the SPA's typed wrapper over the unibus gateway HTTP API.
// Every method is a thin fetch against the gateway, which hosts one real Go bus
// peer per name and performs all NATS + end-to-end crypto on the browser's
// behalf. The base URL is chosen at runtime on the connect screen.
import type { BusEvent, Member, Peer, Room } from "./types";
export class GatewayClient {
constructor(public readonly baseURL: string) {
// Normalize: drop a trailing slash so `${base}/api/...` never doubles up.
this.baseURL = baseURL.replace(/\/+$/, "");
}
private async req<T>(method: string, path: string, body?: unknown): Promise<T> {
const res = await fetch(this.baseURL + path, {
method,
headers: body !== undefined ? { "Content-Type": "application/json" } : undefined,
body: body !== undefined ? JSON.stringify(body) : undefined,
});
const text = await res.text();
if (!res.ok) {
let msg = text;
try {
const j = JSON.parse(text);
if (j && typeof j.error === "string") msg = j.error;
} catch {
// not JSON: keep the raw text
}
throw new Error(msg || `HTTP ${res.status}`);
}
return (text ? JSON.parse(text) : {}) as T;
}
// connect creates (or recovers) the named peer on the gateway and returns its
// public identity. The identity persists across gateway restarts.
connect(name: string): Promise<Peer> {
return this.req<Peer>("POST", "/api/peer", { name });
}
// peers lists every peer currently hosted by the gateway (for the invite picker
// and to label senders by name).
peers(): Promise<Peer[]> {
return this.req<Peer[]>("GET", "/api/peers");
}
// rooms lists the rooms the named peer knows (created or joined).
rooms(peer: string): Promise<Room[]> {
return this.req<Room[]>("GET", `/api/rooms?peer=${encodeURIComponent(peer)}`);
}
// members lists the participants of a room.
members(roomID: string): Promise<Member[]> {
return this.req<Member[]>("GET", `/api/members?room_id=${encodeURIComponent(roomID)}`);
}
// createRoom opens a room on the given subject. encrypt drives both E2E
// encryption and per-message signing; the peer is auto-subscribed.
createRoom(peer: string, subject: string, encrypt: boolean): Promise<Room & { persist: boolean }> {
return this.req("POST", "/api/room", { peer, subject, encrypt, persist: false });
}
// join subscribes the peer to an existing room (must have been invited first
// when the room is encrypted).
join(peer: string, roomID: string): Promise<{ subject: string; encrypt: boolean }> {
return this.req("POST", "/api/join", { peer, room_id: roomID });
}
// invite adds another connected peer (by name) to a room, sealing the room key
// to it. Caller must be the room owner.
invite(peer: string, roomID: string, target: string): Promise<{ status: string }> {
return this.req("POST", "/api/invite", { peer, room_id: roomID, target });
}
// publish sends a text message to a room.
publish(peer: string, roomID: string, text: string): Promise<{ status: string }> {
return this.req("POST", "/api/publish", { peer, room_id: roomID, text });
}
// kick removes a peer (by name) from a room and rotates the key (forward
// secrecy). Caller must be the room owner.
kick(peer: string, roomID: string, target: string): Promise<{ status: string }> {
return this.req("POST", "/api/kick", { peer, room_id: roomID, target });
}
// stream opens the SSE channel for a peer. onEvent fires for each received bus
// message; onError fires if the stream drops. Returns the EventSource so the
// caller can close it.
stream(peer: string, onEvent: (ev: BusEvent) => void, onError?: () => void): EventSource {
const es = new EventSource(`${this.baseURL}/api/stream?peer=${encodeURIComponent(peer)}`);
es.onmessage = (e) => {
try {
onEvent(JSON.parse(e.data) as BusEvent);
} catch {
// ignore malformed frames (keepalive comments never reach onmessage)
}
};
if (onError) es.onerror = onError;
return es;
}
}
+285
View File
@@ -0,0 +1,285 @@
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import {
AppShell,
Group,
Title,
Badge,
Button,
CopyButton,
Tooltip,
ActionIcon,
ThemeIcon,
Alert,
Transition,
} from "@mantine/core";
import {
IconBolt,
IconLogout,
IconCopy,
IconCheck,
IconAlertTriangle,
} from "@tabler/icons-react";
import { GatewayClient } from "../api";
import type { Member, Message, Peer, Room } from "../types";
import { RoomList } from "./RoomList";
import { MessagePane } from "./MessagePane";
import { MembersPane } from "./MembersPane";
interface Props {
client: GatewayClient;
peer: Peer;
onDisconnect: () => void;
}
// short renders the first 10 chars of an endpoint id, enough to disambiguate.
export function short(endpoint: string): string {
return endpoint.length > 12 ? endpoint.slice(0, 10) + "…" : endpoint;
}
// ChatLayout owns all chat state: the peer's rooms, the active room, the
// per-room message log fed by the SSE stream, the directory of connected peers
// (to label senders and pick invitees), and the active room's member list. Every
// bus action goes through the gateway client.
export function ChatLayout({ client, peer, onDisconnect }: Props) {
const [rooms, setRooms] = useState<Room[]>([]);
const [activeRoom, setActiveRoom] = useState<string | null>(null);
const [messages, setMessages] = useState<Record<string, Message[]>>({});
const [peers, setPeers] = useState<Peer[]>([]);
const [members, setMembers] = useState<Member[]>([]);
const [error, setError] = useState<string | null>(null);
const seq = useRef(0);
const fail = useCallback((e: unknown) => {
setError(e instanceof Error ? e.message : String(e));
}, []);
// ---- data refreshers ----------------------------------------------------
const refreshRooms = useCallback(async () => {
try {
setRooms(await client.rooms(peer.name));
} catch (e) {
fail(e);
}
}, [client, peer.name, fail]);
const refreshPeers = useCallback(async () => {
try {
setPeers(await client.peers());
} catch (e) {
fail(e);
}
}, [client, fail]);
const refreshMembers = useCallback(
async (roomID: string) => {
try {
setMembers(await client.members(roomID));
} catch (e) {
fail(e);
}
},
[client, fail],
);
// ---- live stream (SSE) --------------------------------------------------
useEffect(() => {
const es = client.stream(
peer.name,
(ev) => {
seq.current += 1;
const msg: Message = { ...ev, id: `${ev.ts}-${seq.current}` };
setMessages((prev) => {
const list = prev[ev.room_id] ?? [];
return { ...prev, [ev.room_id]: [...list, msg] };
});
},
() => setError("Se perdió la conexión con el gateway (stream SSE)"),
);
return () => es.close();
}, [client, peer.name]);
// Initial load.
useEffect(() => {
refreshRooms();
refreshPeers();
}, [refreshRooms, refreshPeers]);
// Refresh members whenever the active room changes.
useEffect(() => {
if (activeRoom) refreshMembers(activeRoom);
else setMembers([]);
}, [activeRoom, refreshMembers]);
// ---- actions ------------------------------------------------------------
const onCreateRoom = useCallback(
async (subject: string, encrypt: boolean) => {
try {
const r = await client.createRoom(peer.name, subject, encrypt);
await refreshRooms();
setActiveRoom(r.room_id);
} catch (e) {
fail(e);
}
},
[client, peer.name, refreshRooms, fail],
);
const onJoinRoom = useCallback(
async (roomID: string) => {
try {
await client.join(peer.name, roomID);
await refreshRooms();
setActiveRoom(roomID);
} catch (e) {
fail(e);
}
},
[client, peer.name, refreshRooms, fail],
);
const onInvite = useCallback(
async (target: string) => {
if (!activeRoom) return;
try {
await client.invite(peer.name, activeRoom, target);
await refreshMembers(activeRoom);
} catch (e) {
fail(e);
}
},
[client, peer.name, activeRoom, refreshMembers, fail],
);
const onKick = useCallback(
async (target: string) => {
if (!activeRoom) return;
try {
await client.kick(peer.name, activeRoom, target);
await refreshMembers(activeRoom);
} catch (e) {
fail(e);
}
},
[client, peer.name, activeRoom, refreshMembers, fail],
);
const onPublish = useCallback(
async (text: string) => {
if (!activeRoom) return;
try {
await client.publish(peer.name, activeRoom, text);
} catch (e) {
fail(e);
}
},
[client, peer.name, activeRoom, fail],
);
// endpoint -> display name, using the peer directory; falls back to a short id.
const nameFor = useMemo(() => {
const byEndpoint = new Map(peers.map((p) => [p.endpoint_id, p.name]));
return (endpoint: string) =>
endpoint === peer.endpoint_id ? peer.name : byEndpoint.get(endpoint) ?? short(endpoint);
}, [peers, peer]);
const activeRoomObj = rooms.find((r) => r.room_id === activeRoom) ?? null;
const iAmOwner = members.some((m) => m.endpoint === peer.endpoint_id && m.role === "owner");
return (
<AppShell
header={{ height: 60 }}
navbar={{ width: 300, breakpoint: "sm" }}
aside={{ width: 300, breakpoint: "md", collapsed: { desktop: !activeRoom, mobile: true } }}
padding={0}
>
<AppShell.Header>
<Group h="100%" px="md" justify="space-between" wrap="nowrap">
<Group gap="xs" wrap="nowrap">
<ThemeIcon variant="light" color="violet" radius="md">
<IconBolt size={18} />
</ThemeIcon>
<Title order={4}>unibus</Title>
</Group>
<Group gap="xs" wrap="nowrap">
<Badge variant="light" color="violet" size="lg">
{peer.name}
</Badge>
<CopyButton value={peer.endpoint_id}>
{({ copied, copy }) => (
<Tooltip label={copied ? "¡copiado!" : peer.endpoint_id} withArrow>
<ActionIcon variant="subtle" color="gray" onClick={copy}>
{copied ? <IconCheck size={16} /> : <IconCopy size={16} />}
</ActionIcon>
</Tooltip>
)}
</CopyButton>
<Button
variant="subtle"
color="gray"
leftSection={<IconLogout size={16} />}
onClick={onDisconnect}
>
Salir
</Button>
</Group>
</Group>
</AppShell.Header>
<AppShell.Navbar>
<RoomList
rooms={rooms}
activeRoom={activeRoom}
onSelect={setActiveRoom}
onCreateRoom={onCreateRoom}
onJoinRoom={onJoinRoom}
/>
</AppShell.Navbar>
<AppShell.Main h="100vh">
{error && (
<Transition mounted={!!error} transition="slide-down">
{(styles) => (
<Alert
style={{ ...styles, position: "absolute", top: 70, left: "50%", transform: "translateX(-50%)", zIndex: 200, minWidth: 360 }}
color="red"
variant="filled"
icon={<IconAlertTriangle size={18} />}
withCloseButton
onClose={() => setError(null)}
title="Error"
>
{error}
</Alert>
)}
</Transition>
)}
<MessagePane
room={activeRoomObj}
messages={activeRoom ? messages[activeRoom] ?? [] : []}
myEndpoint={peer.endpoint_id}
nameFor={nameFor}
onPublish={onPublish}
/>
</AppShell.Main>
<AppShell.Aside>
{activeRoomObj && (
<MembersPane
room={activeRoomObj}
members={members}
peers={peers}
myEndpoint={peer.endpoint_id}
iAmOwner={iAmOwner}
nameFor={nameFor}
onInvite={onInvite}
onKick={onKick}
onRefresh={() => activeRoom && refreshMembers(activeRoom)}
/>
)}
</AppShell.Aside>
</AppShell>
);
}
+116
View File
@@ -0,0 +1,116 @@
import { useState } from "react";
import {
Button,
Card,
Center,
Group,
Stack,
Text,
TextInput,
Title,
Alert,
ThemeIcon,
} from "@mantine/core";
import { IconBolt, IconPlugConnected, IconAlertTriangle } from "@tabler/icons-react";
import { GatewayClient } from "../api";
import type { Peer } from "../types";
const LS_GATEWAY = "unibus.gateway";
const LS_PEER = "unibus.peer";
interface Props {
onConnect: (client: GatewayClient, peer: Peer) => void;
}
// ConnectScreen asks for the gateway URL and the identity (peer name) to connect
// as. Both persist in localStorage so a reload reconnects with one click. The
// gateway hosts the real Go bus peer; the browser only drives it.
export function ConnectScreen({ onConnect }: Props) {
const [gateway, setGateway] = useState(
() => localStorage.getItem(LS_GATEWAY) ?? "http://localhost:7700",
);
const [name, setName] = useState(() => localStorage.getItem(LS_PEER) ?? "");
const [busy, setBusy] = useState(false);
const [error, setError] = useState<string | null>(null);
const connect = async () => {
const trimmed = name.trim();
if (!trimmed) {
setError("Elige un nombre de identidad");
return;
}
setBusy(true);
setError(null);
try {
const client = new GatewayClient(gateway.trim());
const peer = await client.connect(trimmed);
localStorage.setItem(LS_GATEWAY, client.baseURL);
localStorage.setItem(LS_PEER, trimmed);
onConnect(client, peer);
} catch (e) {
setError(e instanceof Error ? e.message : String(e));
} finally {
setBusy(false);
}
};
return (
<Center h="100vh" p="md">
<Card withBorder shadow="md" radius="lg" p="xl" w={420} maw="100%">
<Stack gap="lg">
<Group gap="sm">
<ThemeIcon size="xl" radius="md" variant="light" color="violet">
<IconBolt size={26} />
</ThemeIcon>
<div>
<Title order={3}>unibus</Title>
<Text size="sm" c="dimmed">
chat cifrado extremo a extremo sobre NATS
</Text>
</div>
</Group>
<TextInput
label="Gateway"
description="URL del gateway web de unibus"
placeholder="http://localhost:7700"
value={gateway}
onChange={(e) => setGateway(e.currentTarget.value)}
disabled={busy}
/>
<TextInput
label="Identidad"
description="Tu nombre de peer en el bus (persistente)"
placeholder="ana"
value={name}
onChange={(e) => setName(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && connect()}
disabled={busy}
data-autofocus
/>
{error && (
<Alert
color="red"
variant="light"
icon={<IconAlertTriangle size={18} />}
title="No se pudo conectar"
>
{error}
</Alert>
)}
<Button
leftSection={<IconPlugConnected size={18} />}
onClick={connect}
loading={busy}
fullWidth
size="md"
>
Conectar
</Button>
</Stack>
</Card>
</Center>
);
}
+153
View File
@@ -0,0 +1,153 @@
import { useState } from "react";
import {
Stack,
Group,
Text,
Badge,
Select,
Button,
ActionIcon,
Divider,
Box,
Avatar,
Tooltip,
ScrollArea,
} from "@mantine/core";
import { IconUserPlus, IconUserMinus, IconRefresh, IconUsers } from "@tabler/icons-react";
import type { Member, Peer, Room } from "../types";
interface Props {
room: Room;
members: Member[];
peers: Peer[];
myEndpoint: string;
iAmOwner: boolean;
nameFor: (endpoint: string) => string;
onInvite: (target: string) => void;
onKick: (target: string) => void;
onRefresh: () => void;
}
// MembersPane is the right column: who is in the active room, plus invite (pick a
// connected peer) and kick (owner only). Invite/kick address peers by name; the
// gateway resolves the name to its bus endpoint.
export function MembersPane({
room,
members,
peers,
myEndpoint,
iAmOwner,
nameFor,
onInvite,
onKick,
onRefresh,
}: Props) {
const [target, setTarget] = useState<string | null>(null);
const memberEndpoints = new Set(members.map((m) => m.endpoint));
// Candidates to invite: connected peers not already in the room.
const candidates = peers
.filter((p) => !memberEndpoints.has(p.endpoint_id))
.map((p) => ({ value: p.name, label: p.name }));
const invite = () => {
if (target) {
onInvite(target);
setTarget(null);
}
};
return (
<Stack gap={0} h="100%">
<Group justify="space-between" px="md" py="sm" wrap="nowrap" style={{ borderBottom: "1px solid var(--mantine-color-default-border)" }}>
<Group gap="xs">
<IconUsers size={18} />
<Text fw={600}>Miembros</Text>
<Badge size="sm" variant="light">
{members.length}
</Badge>
</Group>
<Tooltip label="Recargar" withArrow>
<ActionIcon variant="subtle" color="gray" onClick={onRefresh}>
<IconRefresh size={16} />
</ActionIcon>
</Tooltip>
</Group>
<Box p="md">
<Text size="xs" fw={700} c="dimmed" tt="uppercase" mb="xs">
Invitar {room.encrypt && "(reparte la clave)"}
</Text>
<Group gap="xs" wrap="nowrap" align="flex-end">
<Select
style={{ flex: 1 }}
size="xs"
placeholder="peer conectado"
data={candidates}
value={target}
onChange={setTarget}
searchable
nothingFoundMessage="sin peers libres"
comboboxProps={{ withinPortal: true }}
/>
<Button
size="xs"
leftSection={<IconUserPlus size={14} />}
onClick={invite}
disabled={!target}
>
Invitar
</Button>
</Group>
</Box>
<Divider />
<ScrollArea style={{ flex: 1 }}>
<Stack gap={4} p="md">
{members.map((m) => {
const isMe = m.endpoint === myEndpoint;
const name = nameFor(m.endpoint);
const canKick = iAmOwner && !isMe && m.role !== "owner";
return (
<Group key={m.endpoint} justify="space-between" wrap="nowrap" gap="xs">
<Group gap="xs" wrap="nowrap" style={{ minWidth: 0 }}>
<Avatar size="sm" radius="xl" color="violet">
{name.slice(0, 2).toUpperCase()}
</Avatar>
<Box style={{ minWidth: 0 }}>
<Text size="sm" fw={isMe ? 700 : 500} truncate>
{name} {isMe && "(tú)"}
</Text>
<Text size="9px" c="dimmed" truncate>
{m.endpoint}
</Text>
</Box>
</Group>
<Group gap={4} wrap="nowrap">
{m.role === "owner" && (
<Badge size="xs" color="yellow" variant="light">
owner
</Badge>
)}
{canKick && (
<Tooltip label="Expulsar (rota la clave)" withArrow>
<ActionIcon
variant="subtle"
color="red"
size="sm"
onClick={() => onKick(name)}
>
<IconUserMinus size={15} />
</ActionIcon>
</Tooltip>
)}
</Group>
</Group>
);
})}
</Stack>
</ScrollArea>
</Stack>
);
}
+153
View File
@@ -0,0 +1,153 @@
import { useEffect, useRef, useState } from "react";
import {
Stack,
Group,
Text,
Badge,
Paper,
ScrollArea,
TextInput,
ActionIcon,
Center,
ThemeIcon,
Box,
CopyButton,
Tooltip,
} from "@mantine/core";
import {
IconLock,
IconHash,
IconSend,
IconMessages,
IconCopy,
IconCheck,
} from "@tabler/icons-react";
import type { Message, Room } from "../types";
interface Props {
room: Room | null;
messages: Message[];
myEndpoint: string;
nameFor: (endpoint: string) => string;
onPublish: (text: string) => void;
}
// formatTime renders a message timestamp as HH:mm:ss in 24h European style.
function formatTime(ts: number): string {
return new Date(ts).toLocaleTimeString("es-ES", {
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
});
}
// MessagePane is the center column: the active room's live message log plus the
// composer. Own messages align right; others align left and show the sender.
export function MessagePane({ room, messages, myEndpoint, nameFor, onPublish }: Props) {
const [text, setText] = useState("");
const viewport = useRef<HTMLDivElement>(null);
// Auto-scroll to the newest message.
useEffect(() => {
viewport.current?.scrollTo({ top: viewport.current.scrollHeight, behavior: "smooth" });
}, [messages.length]);
if (!room) {
return (
<Center h="100%">
<Stack align="center" gap="xs">
<ThemeIcon size={64} radius="xl" variant="light" color="gray">
<IconMessages size={34} />
</ThemeIcon>
<Text c="dimmed">Elige o crea una room para empezar a chatear</Text>
</Stack>
</Center>
);
}
const send = () => {
const t = text.trim();
if (t) {
onPublish(t);
setText("");
}
};
return (
<Stack gap={0} h="100%">
<Group justify="space-between" px="md" py="sm" wrap="nowrap" style={{ borderBottom: "1px solid var(--mantine-color-default-border)" }}>
<Group gap="xs" wrap="nowrap">
{room.encrypt ? <IconLock size={18} /> : <IconHash size={18} />}
<Text fw={600}>{room.subject}</Text>
{room.encrypt && (
<Badge size="sm" color="teal" variant="light">
cifrada E2E
</Badge>
)}
</Group>
<CopyButton value={room.room_id}>
{({ copied, copy }) => (
<Tooltip label={copied ? "¡copiado!" : "copiar room id"} withArrow>
<ActionIcon variant="subtle" color="gray" onClick={copy}>
{copied ? <IconCheck size={16} /> : <IconCopy size={16} />}
</ActionIcon>
</Tooltip>
)}
</CopyButton>
</Group>
<ScrollArea style={{ flex: 1 }} viewportRef={viewport} p="md">
<Stack gap="sm">
{messages.length === 0 && (
<Text c="dimmed" ta="center" py="xl" size="sm">
No hay mensajes todavía.
</Text>
)}
{messages.map((m) => {
const mine = m.sender === myEndpoint;
return (
<Box
key={m.id}
style={{ display: "flex", justifyContent: mine ? "flex-end" : "flex-start" }}
>
<Paper
withBorder
shadow="xs"
radius="md"
p="xs"
bg={mine ? "violet.9" : undefined}
maw="75%"
>
{!mine && (
<Text size="xs" fw={700} c="violet.4">
{nameFor(m.sender)}
</Text>
)}
<Text size="sm" style={{ wordBreak: "break-word", whiteSpace: "pre-wrap" }}>
{m.text}
</Text>
<Text size="9px" c="dimmed" ta="right" mt={2}>
{formatTime(m.ts)}
</Text>
</Paper>
</Box>
);
})}
</Stack>
</ScrollArea>
<Group p="md" gap="xs" wrap="nowrap" style={{ borderTop: "1px solid var(--mantine-color-default-border)" }}>
<TextInput
style={{ flex: 1 }}
placeholder={`Mensaje a ${room.subject}`}
value={text}
onChange={(e) => setText(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && send()}
/>
<ActionIcon size="lg" onClick={send} disabled={!text.trim()}>
<IconSend size={18} />
</ActionIcon>
</Group>
</Stack>
);
}
+119
View File
@@ -0,0 +1,119 @@
import { useState } from "react";
import {
Stack,
TextInput,
Checkbox,
Button,
Divider,
Text,
NavLink,
ScrollArea,
Group,
Box,
} from "@mantine/core";
import { IconLock, IconHash, IconPlus, IconDoorEnter } from "@tabler/icons-react";
import type { Room } from "../types";
interface Props {
rooms: Room[];
activeRoom: string | null;
onSelect: (roomID: string) => void;
onCreateRoom: (subject: string, encrypt: boolean) => void;
onJoinRoom: (roomID: string) => void;
}
// RoomList is the navbar: create a room, join one by id, and pick the active
// room from the peer's known rooms.
export function RoomList({ rooms, activeRoom, onSelect, onCreateRoom, onJoinRoom }: Props) {
const [subject, setSubject] = useState("room.general");
const [encrypt, setEncrypt] = useState(true);
const [joinID, setJoinID] = useState("");
const create = () => {
if (subject.trim()) onCreateRoom(subject.trim(), encrypt);
};
const join = () => {
if (joinID.trim()) {
onJoinRoom(joinID.trim());
setJoinID("");
}
};
return (
<Stack gap={0} h="100%">
<Box p="md">
<Text size="xs" fw={700} c="dimmed" tt="uppercase" mb="xs">
Crear room
</Text>
<Stack gap="xs">
<TextInput
size="xs"
placeholder="subject (room.general)"
leftSection={<IconHash size={14} />}
value={subject}
onChange={(e) => setSubject(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && create()}
/>
<Checkbox
size="xs"
label="Cifrado extremo a extremo"
checked={encrypt}
onChange={(e) => setEncrypt(e.currentTarget.checked)}
/>
<Button size="xs" leftSection={<IconPlus size={14} />} onClick={create}>
Crear
</Button>
</Stack>
</Box>
<Divider />
<Box p="md">
<Text size="xs" fw={700} c="dimmed" tt="uppercase" mb="xs">
Unirse por id
</Text>
<Group gap="xs" wrap="nowrap">
<TextInput
size="xs"
placeholder="room id"
value={joinID}
onChange={(e) => setJoinID(e.currentTarget.value)}
onKeyDown={(e) => e.key === "Enter" && join()}
style={{ flex: 1 }}
/>
<Button size="xs" variant="light" onClick={join} px="sm">
<IconDoorEnter size={16} />
</Button>
</Group>
</Box>
<Divider />
<Text size="xs" fw={700} c="dimmed" tt="uppercase" px="md" pt="md" pb="xs">
Rooms ({rooms.length})
</Text>
<ScrollArea style={{ flex: 1 }}>
<Stack gap={2} px="xs" pb="md">
{rooms.length === 0 && (
<Text size="sm" c="dimmed" px="sm" py="lg" ta="center">
Aún no hay rooms. Crea o únete a una.
</Text>
)}
{rooms.map((r) => (
<NavLink
key={r.room_id}
active={r.room_id === activeRoom}
onClick={() => onSelect(r.room_id)}
label={r.subject}
description={r.room_id.slice(0, 14) + "…"}
leftSection={
r.encrypt ? <IconLock size={16} /> : <IconHash size={16} />
}
variant="filled"
/>
))}
</Stack>
</ScrollArea>
</Stack>
);
}
+14
View File
@@ -0,0 +1,14 @@
import React from "react";
import ReactDOM from "react-dom/client";
import { MantineProvider } from "@mantine/core";
import "@mantine/core/styles.css";
import { theme } from "./theme";
import { App } from "./App";
ReactDOM.createRoot(document.getElementById("root")!).render(
<React.StrictMode>
<MantineProvider theme={theme} defaultColorScheme="dark">
<App />
</MantineProvider>
</React.StrictMode>,
);
+14
View File
@@ -0,0 +1,14 @@
import { createTheme } from "@mantine/core";
// The unibus theme: a single accent color and a slightly tighter default radius.
// Mantine generates all its CSS variables from this; the SPA never hand-writes
// CSS or color literals.
export const theme = createTheme({
primaryColor: "violet",
defaultRadius: "md",
fontFamily:
"Inter, ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, sans-serif",
headings: {
fontWeight: "650",
},
});
+41
View File
@@ -0,0 +1,41 @@
// Domain types shared across the SPA. They mirror the JSON the unibus gateway
// (playground/server.go) returns; the browser never speaks NATS or crypto
// directly — the Go peer behind the gateway does, so every type here is a plain
// view of a gateway response.
// Peer is a named identity hosted by the gateway. endpoint_id is the stable bus
// endpoint (base64url of sha256(signPub)).
export interface Peer {
name: string;
endpoint_id: string;
}
// Room is a channel the connected peer created or joined. encrypt true means the
// payloads are sealed end-to-end with the room key.
export interface Room {
room_id: string;
subject: string;
encrypt: boolean;
}
// Member is one participant of a room as reported by the control plane.
export interface Member {
endpoint: string;
role: string;
}
// BusEvent is one Server-Sent Event delivered on /api/stream: a message a peer
// received on one of its subscribed rooms, already decrypted by the Go peer.
export interface BusEvent {
room_id: string;
subject: string;
sender: string;
text: string;
encrypted: boolean;
ts: number; // unix millis
}
// Message is a BusEvent enriched with a stable local id for React keys.
export interface Message extends BusEvent {
id: string;
}
+22
View File
@@ -0,0 +1,22 @@
{
"compilerOptions": {
"target": "ES2022",
"useDefineForClassFields": true,
"lib": ["ES2022", "DOM", "DOM.Iterable"],
"module": "ESNext",
"skipLibCheck": true,
"moduleResolution": "bundler",
"allowImportingTsExtensions": true,
"isolatedModules": true,
"moduleDetection": "force",
"noEmit": true,
"jsx": "react-jsx",
"strict": true,
"noUnusedLocals": true,
"noUnusedParameters": true,
"noFallthroughCasesInSwitch": true
},
"include": ["src"]
}
+7
View File
@@ -0,0 +1,7 @@
{
"files": [],
"references": [
{ "path": "./tsconfig.app.json" },
{ "path": "./tsconfig.node.json" }
]
}
+17
View File
@@ -0,0 +1,17 @@
{
"compilerOptions": {
"target": "ES2022",
"lib": ["ES2023"],
"module": "ESNext",
"skipLibCheck": true,
"moduleResolution": "bundler",
"allowImportingTsExtensions": true,
"isolatedModules": true,
"moduleDetection": "force",
"noEmit": true,
"strict": true,
"noUnusedLocals": true,
"noUnusedParameters": true
},
"include": ["vite.config.ts"]
}
+14
View File
@@ -0,0 +1,14 @@
import { defineConfig } from "vite";
import react from "@vitejs/plugin-react";
// The SPA talks to the unibus gateway over plain fetch + EventSource; the
// gateway URL is chosen at runtime on the connect screen, so nothing is proxied
// here. The dev server runs on a fixed port so the gateway's permissive CORS is
// predictable.
export default defineConfig({
plugins: [react()],
server: {
port: 5173,
host: true,
},
});