SignallingClient
The lower-level class. Pub/sub + directed messages over WebSocket, ack-correlated, auto-reconnecting. No WebRTC, no per-peer state, no channel-driven discovery.
Use this for chat, presence, IoT telemetry, AI agent message bus, collaborative cursors, multiplayer lobbies — anything pub/sub at heart — or when you want to subscribe to multiple channels from one connection (MeteredPeer is tied to one channel). If you're building WebRTC and the channel + peer model fits, use MeteredPeer; it wraps this class.
Construct
import 'package:metered_realtime/metered_realtime.dart';
final client = SignallingClient(SignallingClientOptions(
apiKey: 'pk_live_…',
// — OR —
tokenProvider: () async => fetchJwtFromYourBackend(),
));
Provide exactly one of apiKey or tokenProvider. The constructor doesn't connect — call connect().
SignallingClientOptions
| Option | Type | Default | Notes |
|---|---|---|---|
apiKey | String? | — | pk_live_…. Mutually exclusive with tokenProvider. |
tokenProvider | TokenProvider? | — | Future<String> Function() returning an HS256 JWT. Called on first connect AND every reconnect. |
url | String? | wss://rms.metered.ca | Validated at construction — wss: (or ws: for localhost), no path/query/fragment/userinfo. |
logger | Logger? | NoopLogger | ConsoleLogger() while debugging. |
autoReconnect | bool | true | Master switch. |
reconnect | ReconnectOptions | const ReconnectOptions() | Backoff knobs — see below. |
inactivityTimeoutMs | int | 60000 | If no frame arrives in the window, close-and-reconnect. 0 disables it + the keepalive ping. |
tokenProviderTimeoutMs | int | 10000 | Cap on tokenProvider() resolution. Must be > 0. |
autoResubscribe | bool | true | After reconnect, re-subscribe to every channel active before the drop. |
webSocketFactory | WebSocketFactory? | web_socket_channel adapter | Test injection. |
ReconnectOptions
const ReconnectOptions({
int initialDelayMs = 500,
double multiplier = 2,
int maxDelayMs = 30000,
double jitterRatio = 0.2, // ±20%
int maxAttempts = 100, // a very large number ≈ retry forever
});
Backoff: min(initialDelayMs * multiplier^(attempt-1), maxDelayMs) ± jitterRatio. Each successful welcome resets the attempt counter. The constructor asserts sane values (multiplier >= 1, non-negative delays, jitterRatio in [0,1]). autoReconnect: false disables reconnection entirely (rather than passing a sentinel).
Close-code-aware behaviour — the SDK doesn't blindly retry:
| Close code | Behaviour |
|---|---|
| 1001, 1006, 1008, 1009, 4000, 4002, 4011 | Normal backoff |
4001 (invalidToken), 4003 (channelNotAuthorized), 4012 (accountSuspended), 4020 (adminDisconnect) | No retry — caller action required |
4010 (overConcurrentLimit) | Forced ≥30 s backoff floor |
See Errors & Codes.
For a daemon / agent / IoT process, raise the budget:
SignallingClient(SignallingClientOptions(
tokenProvider: mint,
reconnect: const ReconnectOptions(maxAttempts: 1 << 30, maxDelayMs: 60000),
));
Methods
connect() → Future<void>
Opens the WebSocket, sends the auth handshake, waits for the server welcome.
await client.connect();
print(client.state); // SignallingClientState.connected
Rejects with SignallingConnectError if the socket closes before the welcome (invalid token, quota rejection). Branch on closeCode:
try {
await client.connect();
} on SignallingConnectError catch (e) {
if (e.closeCode == WsCloseCode.invalidToken) showLoginUi();
else if (e.closeCode == WsCloseCode.overConcurrentLimit) showQuotaUi();
else rethrow;
}
close([code, reason]) → Future<void>
Closes the WebSocket. Terminal — no auto-reconnect after this. Defaults: code = 1000, reason = 'client requested close'.
await client.close();
dispose() → Future<void>
Closes the connection (if open) and releases the event-stream controllers. Call this from your widget's dispose() so the streams and their listeners don't leak. After dispose() the client is terminal — its streams are done and connect() throws.
@override
void dispose() {
client.dispose();
super.dispose();
}
Prefer dispose() over close() for owner-teardown; use close() when you intend to keep the object but end the connection.
subscribe(channel, [opts]) → Future<void>
Subscribes to channel. Completes on server ack.
await client.subscribe('alerts/critical');
await client.subscribe('rooms/lobby', const SubscribeOptions(includeSenderMetadata: true));
SubscribeOptions:
| Field | Type | Default | Notes |
|---|---|---|---|
includeSenderMetadata | bool | false | If true, onMessage events on this channel carry the sender's peerMetadata in fromMetadata. |
Tracked for autoResubscribe — re-subscribed automatically on reconnect. Rejects with SignallingServerError (code of ErrorCode.channelNotAuthorized, channelReserved, or channelLimitExceeded).
unsubscribe(channel) → Future<void>
Idempotent. Removes the channel from the autoResubscribe set.
publish(channel, data) → Future<void>
Broadcasts data (JSON-serialisable) to every subscriber on channel.
await client.publish('alerts/critical', {'sensor': 'T-12', 'value': 92});
Rejects with SignallingServerError(ErrorCode.overMessageQuota) if the period quota is exhausted (the connection stays open). There's no client-side size gate at this layer — an oversized payload results in a server-side 1009 (messageTooBig) close. Use MeteredPeer.send / sendTo if you want a client-side MeteredPeerOversizedError guard before the frame leaves.
send(toPeerId, data) → Future<void>
Direct point-to-point message, server-routed (no P2P).
await client.send('peer-xyz', {'type': 'challenge', 'nonce': 'abc'});
Rejects with SignallingServerError(ErrorCode.peerNotFound) if the target peer isn't online.
MeteredPeerSignallingClient keeps send(toPeerId, data) (no sendTo rename) and surfaces inbound directs on onDirect (a DirectMessageEvent with .from) — both wire-faithful to the JSON send / direct frames. MeteredPeer earns the more verbose sendTo / senderPeerId because it merges sources; this lower-level class stays close to the wire.
Read-only state
client.state // SignallingClientState
enum SignallingClientState { idle, connecting, connected, reconnecting, closed }
connect() close()
idle ──────────► connecting ──► connected ──────────────► closed
│ ▲
transient drop │ │ reconnect succeeded
▼ │
reconnecting
│ terminal close OR maxAttempts
▼
closed
Streams
| Stream | Emits | When |
|---|---|---|
onConnected | ConnectedEvent | After every successful welcome — initial + reconnects. Use isReconnect to skip "connected!" toasts. |
onDisconnected | DisconnectedEvent { code, reason, willReconnect } | Every WS close. willReconnect tells you if the SDK is retrying. |
stateChanges | StateChange<SignallingClientState> | Every state transition. |
onMessage | ChannelMessage { channel, from, fromMetadata?, data } | A peer published to a channel you're subscribed to. |
onDirect | DirectMessageEvent { from, fromMetadata?, data } | Someone called send(yourPeerId, data). |
onPresence | PresenceEvent { channel, joined, left } | Roster change on a subscribed channel. joined / left are List<PresencePeer> { peerId, metadata? }. |
onServerError | ServerErrorEvent { code, message?, requestId? } | Server-emitted error not correlated to a pending request (correlated ones reject the related Future instead). |
onGoingAway | GoingAwayEvent { retryAfterMs } | Server shutting down for deploy. Informational — the SDK uses normal backoff, not retryAfterMs. |
onTokenProviderError | TokenProviderError { consecutiveFailures, error } | Your tokenProvider() has failed N (default 3) times in a row. Informational — the SDK keeps retrying. |
ConnectedEvent
client.onConnected.listen((e) {
// e.peerId server-assigned UUID, or your JWT's `sub` claim
// e.serverTime Unix seconds the server stamped the welcome (clock skew)
// e.expiresAt int? — JWT `exp`; null for pk_ keys (no expiry)
// e.isReconnect false on first connect, true on reconnects
// e.maxMessageSize server-advertised payload cap; the SDK checks against this
// e.iceServers List<IceServerConfig>? — TURN creds (JWT metadata.iceServers); null for pk_ keys
});
onDisconnected — what to do
client.onDisconnected.listen((e) {
if (e.willReconnect) {
// SDK will retry — show a "reconnecting…" banner.
} else {
// Terminal: a caller-action close code (4001/4003/4012/4020) or maxAttempts exhausted.
// Show "disconnected" + a retry button.
}
});
onPresence — diffing the roster
client.onPresence.listen((e) {
for (final p in e.joined) addToRoster(p.peerId, p.metadata);
for (final p in e.left) removeFromRoster(p.peerId);
});
The first presence after a subscribe is the authoritative roster snapshot of everyone already in the channel — always sent, even when the channel is empty (joined: []). Subsequent events are deltas. For names/avatars, mint peerMetadata: { username, avatarUrl } in your JWT — that's what shows up in p.metadata. See Presence & Chat.
Common pitfalls
- Publishing before
connect()completes → the request rejects (e.g.SignallingDisconnectedError); the frame never landed.await connect()first, or queue behindonConnected. - Fire-and-forgetting
subscribe()then immediately publishing →subscriberesolves on ack. Await it before relying on the subscription being live. autoResubscribe: false+ forgetting to re-subscribe → after the first reconnect the SDK has no record of your subscribes and you silently miss messages. Leave ittrueunless you have a specific scoped-subscription pattern.- Treating
onServerErroras fatal → most are per-request and the connection stays open. Don't tear down on them. - Leaking on
dispose→ in a widget, callclient.dispose(), notclose(), so the stream controllers are released.
See also
MeteredPeer— the higher-level class with WebRTC + channel-driven discovery- Errors & Codes — every error / close code
- Reconnect Best Practices
- Authentication —
apiKeyvstokenProvider, JWT claims,peerMetadata