SignallingClient

The lower-level class. Pub/sub + directed messages over WebSocket, ack-correlated, auto-reconnecting. No WebRTC, no per-peer state, no channel-driven discovery.

Use this for chat, presence, IoT telemetry, AI agent message bus, collaborative cursors, multiplayer lobbies — anything pub/sub at heart — or when you want to subscribe to multiple channels from one connection (MeteredPeer is tied to one channel). If you're building WebRTC and the channel + peer model fits, use MeteredPeer; it wraps this class.

Construct

import 'package:metered_realtime/metered_realtime.dart';

final client = SignallingClient(SignallingClientOptions(
  apiKey: 'pk_live_…',
  // OR, instead of apiKey (never both):
  // tokenProvider: () async => fetchJwtFromYourBackend(),
));

Provide exactly one of apiKey or tokenProvider. The constructor doesn't connect — call connect().

SignallingClientOptions

| Option | Type | Default | Notes |
| --- | --- | --- | --- |
| apiKey | String? | | pk_live_…. Mutually exclusive with tokenProvider. |
| tokenProvider | TokenProvider? | | Future<String> Function() returning an HS256 JWT. Called on first connect AND every reconnect. |
| url | String? | wss://rms.metered.ca | Validated at construction: wss: (or ws: for localhost), no path/query/fragment/userinfo. |
| logger | Logger? | NoopLogger | ConsoleLogger() while debugging. |
| autoReconnect | bool | true | Master switch. |
| reconnect | ReconnectOptions | const ReconnectOptions() | Backoff knobs, see below. |
| inactivityTimeoutMs | int | 60000 | If no frame arrives in the window, close-and-reconnect. 0 disables it + the keepalive ping. |
| tokenProviderTimeoutMs | int | 10000 | Cap on tokenProvider() resolution. Must be > 0. |
| autoResubscribe | bool | true | After reconnect, re-subscribe to every channel active before the drop. |
| webSocketFactory | WebSocketFactory? | web_socket_channel adapter | Test injection. |
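
The url rule in the table can be approximated with dart:core's Uri. This is a sketch of the stated rule only, not the SDK's validator: the helper name validateSignallingUrl is hypothetical, and treating 127.0.0.1 as localhost and tolerating a bare trailing slash are both assumptions.

```dart
/// Returns null when [input] satisfies the documented rule, otherwise a
/// short reason. Hypothetical helper; the SDK's own check may differ.
String? validateSignallingUrl(String input) {
  final uri = Uri.tryParse(input);
  if (uri == null || uri.host.isEmpty) return 'not an absolute URL';

  // Assumption: "localhost" covers the literal name and the IPv4 loopback.
  final isLocalhost = uri.host == 'localhost' || uri.host == '127.0.0.1';
  final schemeOk = uri.scheme == 'wss' || (uri.scheme == 'ws' && isLocalhost);
  if (!schemeOk) return 'scheme must be wss: (ws: only for localhost)';

  if (uri.userInfo.isNotEmpty) return 'userinfo is not allowed';
  // Assumption: a bare trailing "/" is tolerated.
  if (uri.path.isNotEmpty && uri.path != '/') return 'path is not allowed';
  if (uri.hasQuery) return 'query is not allowed';
  if (uri.hasFragment) return 'fragment is not allowed';
  return null;
}
```

Running a check like this in your own config loader surfaces a bad URL before the options object is ever built.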

ReconnectOptions

const ReconnectOptions({
  int initialDelayMs = 500,
  double multiplier = 2,
  int maxDelayMs = 30000,
  double jitterRatio = 0.2, // ±20%
  int maxAttempts = 100, // pass a very large number to retry effectively forever
});

Backoff: delay = min(initialDelayMs * multiplier^(attempt-1), maxDelayMs), then jittered by ±jitterRatio. Each successful welcome resets the attempt counter. The constructor asserts sane values (multiplier >= 1, non-negative delays, jitterRatio in [0,1]). To disable reconnection entirely, set autoReconnect: false rather than passing a sentinel value here.
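
To make the formula concrete, here is a standalone sketch that reproduces it with the default values. The function names are illustrative, not SDK API, and the jitter model (scaling the delay by a uniform factor within ±jitterRatio) is an assumption about how the ratio is applied.

```dart
import 'dart:math';

/// Un-jittered delay for a 1-based [attempt]:
/// min(initialDelayMs * multiplier^(attempt-1), maxDelayMs).
double baseDelayMs(
  int attempt, {
  int initialDelayMs = 500,
  double multiplier = 2,
  int maxDelayMs = 30000,
}) {
  final raw = initialDelayMs * pow(multiplier, attempt - 1).toDouble();
  return min(raw, maxDelayMs.toDouble());
}

/// Assumption: jitter scales the delay by a uniform factor in
/// [1 - jitterRatio, 1 + jitterRatio).
int jitteredDelayMs(int attempt, Random rng, {double jitterRatio = 0.2}) {
  final factor = 1 + jitterRatio * (rng.nextDouble() * 2 - 1);
  return (baseDelayMs(attempt) * factor).round();
}
```

With the defaults, the base delays run 500, 1000, 2000, 4000, 8000, 16000 ms, and every attempt from the seventh onward is capped at 30000 ms.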

Close-code-aware behaviour — the SDK doesn't blindly retry:

| Close code | Behaviour |
| --- | --- |
| 1001, 1006, 1008, 1009, 4000, 4002, 4011 | Normal backoff |
| 4001 (invalidToken), 4003 (channelNotAuthorized), 4012 (accountSuspended), 4020 (adminDisconnect) | No retry; caller action required |
| 4010 (overConcurrentLimit) | Forced ≥30 s backoff floor |

See Errors & Codes.
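
The close-code table translates into a small classifier, which is handy for mirroring the SDK's decision in logs or tests. The enum and function names below are hypothetical; only the code groupings come from the table, and unlisted codes return null because their handling isn't specified here.

```dart
import 'dart:math';

enum RetryPolicy { normalBackoff, noRetry, flooredBackoff }

const _normal = {1001, 1006, 1008, 1009, 4000, 4002, 4011};
const _noRetry = {4001, 4003, 4012, 4020};
const _overConcurrentLimit = 4010;

/// Maps a close code to the documented behaviour; null for codes the
/// table does not list.
RetryPolicy? policyFor(int closeCode) {
  if (_noRetry.contains(closeCode)) return RetryPolicy.noRetry;
  if (closeCode == _overConcurrentLimit) return RetryPolicy.flooredBackoff;
  if (_normal.contains(closeCode)) return RetryPolicy.normalBackoff;
  return null;
}

/// Applies the 30 s floor for 4010; other codes keep the computed delay.
int effectiveDelayMs(int closeCode, int backoffMs) =>
    policyFor(closeCode) == RetryPolicy.flooredBackoff
        ? max(backoffMs, 30000)
        : backoffMs;
```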

For a daemon / agent / IoT process, raise the budget:

SignallingClient(SignallingClientOptions(
  tokenProvider: mint,
  reconnect: const ReconnectOptions(maxAttempts: 1 << 30, maxDelayMs: 60000),
));
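
To see why the default budget can be too small for a long-running process, it helps to add up the delays. The sketch below sums the un-jittered backoff across all attempts; totalBackoffMs is an illustrative helper, and it ignores jitter and the time each connection attempt itself takes.

```dart
import 'dart:math';

/// Sum of un-jittered backoff delays across [maxAttempts] attempts.
int totalBackoffMs(
  int maxAttempts, {
  int initialDelayMs = 500,
  double multiplier = 2,
  int maxDelayMs = 30000,
}) {
  var total = 0;
  var delay = initialDelayMs.toDouble();
  for (var attempt = 1; attempt <= maxAttempts; attempt++) {
    total += min(delay, maxDelayMs.toDouble()).round();
    // Stop growing once capped so the value cannot overflow on long runs.
    if (delay < maxDelayMs) delay *= multiplier;
  }
  return total;
}
```

With the defaults, totalBackoffMs(100) is 2,851,500 ms, roughly 47.5 minutes. Under these assumptions, an outage longer than that would exhaust maxAttempts and leave the client closed.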

Methods

connect() → Future<void>

Opens the WebSocket, sends the auth handshake, waits for the server welcome.

await client.connect();
print(client.state); // SignallingClientState.connected

Rejects with SignallingConnectError if the socket closes before the welcome (invalid token, quota rejection). Branch on closeCode:

try {
  await client.connect();
} on SignallingConnectError catch (e) {
  if (e.closeCode == WsCloseCode.invalidToken) {
    showLoginUi();
  } else if (e.closeCode == WsCloseCode.overConcurrentLimit) {
    showQuotaUi();
  } else {
    rethrow;
  }
}

close([code, reason]) → Future<void>

Closes the WebSocket. Terminal — no auto-reconnect after this. Defaults: code = 1000, reason = 'client requested close'.

await client.close();

dispose() → Future<void>

Closes the connection (if open) and releases the event-stream controllers. Call this from your widget's dispose() so the streams and their listeners don't leak. After dispose() the client is terminal — its streams are done and connect() throws.

@override
void dispose() {
  // Returns a Future; this override is synchronous, so it is not awaited.
  client.dispose();
  super.dispose();
}

Prefer dispose() over close() for owner-teardown; use close() when you intend to keep the object but end the connection.

subscribe(channel, [opts]) → Future<void>

Subscribes to channel. Completes on server ack.

await client.subscribe('alerts/critical');
await client.subscribe('rooms/lobby', const SubscribeOptions(includeSenderMetadata: true));

SubscribeOptions:

| Field | Type | Default | Notes |
| --- | --- | --- | --- |
| includeSenderMetadata | bool | false | If true, onMessage events on this channel carry the sender's peerMetadata in fromMetadata. |

The channel is tracked for autoResubscribe and is re-subscribed automatically on reconnect. Rejects with SignallingServerError whose code is ErrorCode.channelNotAuthorized, channelReserved, or channelLimitExceeded.

unsubscribe(channel) → Future<void>

Idempotent. Removes the channel from the autoResubscribe set.

publish(channel, data) → Future<void>

Broadcasts data (JSON-serialisable) to every subscriber on channel.

await client.publish('alerts/critical', {'sensor': 'T-12', 'value': 92});

Rejects with SignallingServerError(ErrorCode.overMessageQuota) if the period quota is exhausted (the connection stays open). There's no client-side size gate at this layer — an oversized payload results in a server-side 1009 (messageTooBig) close. Use MeteredPeer.send / sendTo if you want a client-side MeteredPeerOversizedError guard before the frame leaves.
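
If you stay on SignallingClient and want to avoid a 1009 close, you can measure the payload yourself before publishing. This is a minimal sketch with hypothetical helper names. It assumes the cap is counted in bytes of the JSON-encoded payload; whether the frame envelope counts toward the limit isn't stated here, so leave some headroom.

```dart
import 'dart:convert';

/// UTF-8 byte length of [data] once JSON-encoded.
int encodedSizeBytes(Object? data) => utf8.encode(jsonEncode(data)).length;

/// True when the encoded payload fits under [maxMessageSize], e.g. the
/// value reported on ConnectedEvent.maxMessageSize.
bool fitsWithin(Object? data, int maxMessageSize) =>
    encodedSizeBytes(data) <= maxMessageSize;
```

For instance, the payload {'sensor': 'T-12', 'value': 92} from the example above encodes to 28 bytes.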

send(toPeerId, data) → Future<void>

Direct point-to-point message, server-routed (no P2P).

await client.send('peer-xyz', {'type': 'challenge', 'nonce': 'abc'});

Rejects with SignallingServerError(ErrorCode.peerNotFound) if the target peer isn't online.

Naming vs MeteredPeer

SignallingClient keeps send(toPeerId, data) (no sendTo rename) and surfaces inbound directs on onDirect (a DirectMessageEvent with .from) — both wire-faithful to the JSON send / direct frames. MeteredPeer earns the more verbose sendTo / senderPeerId because it merges sources; this lower-level class stays close to the wire.

Read-only state

client.state // SignallingClientState
enum SignallingClientState { idle, connecting, connected, reconnecting, closed }

       connect()                                close()
idle ──────────► connecting ──► connected ──────────────► closed
                                  │   ▲
                   transient drop │   │ reconnect succeeded
                                  ▼   │
                              reconnecting
                                  │ terminal close OR maxAttempts
                                  ▼
                                closed
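
The edges in the diagram can be written down as a lookup table, which is useful when asserting on stateChanges in tests. The sketch uses a local mirror of the enum so it runs on its own, and it encodes only the edges the diagram draws. The diagram shows no edge for a failed first connect, so none is included.

```dart
/// Local mirror of the documented states, so the sketch runs standalone.
enum ClientState { idle, connecting, connected, reconnecting, closed }

/// Edges drawn in the state diagram; closed has no outgoing edge.
const Map<ClientState, Set<ClientState>> transitions = {
  ClientState.idle: {ClientState.connecting},
  ClientState.connecting: {ClientState.connected},
  ClientState.connected: {ClientState.reconnecting, ClientState.closed},
  ClientState.reconnecting: {ClientState.connected, ClientState.closed},
  ClientState.closed: {},
};

/// True when the diagram contains an edge from [from] to [to].
bool canTransition(ClientState from, ClientState to) =>
    transitions[from]?.contains(to) ?? false;
```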

Streams

| Stream | Emits | When |
| --- | --- | --- |
| onConnected | ConnectedEvent | After every successful welcome (initial + reconnects). Use isReconnect to skip "connected!" toasts. |
| onDisconnected | DisconnectedEvent { code, reason, willReconnect } | Every WS close. willReconnect tells you if the SDK is retrying. |
| stateChanges | StateChange<SignallingClientState> | Every state transition. |
| onMessage | ChannelMessage { channel, from, fromMetadata?, data } | A peer published to a channel you're subscribed to. |
| onDirect | DirectMessageEvent { from, fromMetadata?, data } | Someone called send(yourPeerId, data). |
| onPresence | PresenceEvent { channel, joined, left } | Roster change on a subscribed channel. joined / left are List<PresencePeer> { peerId, metadata? }. |
| onServerError | ServerErrorEvent { code, message?, requestId? } | Server-emitted error not correlated to a pending request (correlated ones reject the related Future instead). |
| onGoingAway | GoingAwayEvent { retryAfterMs } | Server shutting down for deploy. Informational: the SDK uses normal backoff, not retryAfterMs. |
| onTokenProviderError | TokenProviderError { consecutiveFailures, error } | Your tokenProvider() has failed N (default 3) times in a row. Informational: the SDK keeps retrying. |

ConnectedEvent

client.onConnected.listen((e) {
  // e.peerId          server-assigned UUID, or your JWT's `sub` claim
  // e.serverTime      Unix seconds the server stamped the welcome (useful for clock-skew checks)
  // e.expiresAt       int?; the JWT `exp`; null for pk_ keys (no expiry)
  // e.isReconnect     false on first connect, true on reconnects
  // e.maxMessageSize  server-advertised payload cap; the SDK's client-side guard (MeteredPeer) checks against this
  // e.iceServers      List<IceServerConfig>?; TURN creds (JWT metadata.iceServers); null for pk_ keys
});

onDisconnected — what to do

client.onDisconnected.listen((e) {
  if (e.willReconnect) {
    // SDK will retry: show a "reconnecting…" banner.
  } else {
    // Terminal: a caller-action close code (4001/4003/4012/4020) or maxAttempts exhausted.
    // Show "disconnected" + a retry button.
  }
});

onPresence — diffing the roster

client.onPresence.listen((e) {
  for (final p in e.joined) addToRoster(p.peerId, p.metadata);
  for (final p in e.left) removeFromRoster(p.peerId);
});

The first presence after a subscribe is the authoritative roster snapshot of everyone already in the channel — always sent, even when the channel is empty (joined: []). Subsequent events are deltas. For names/avatars, mint peerMetadata: { username, avatarUrl } in your JWT — that's what shows up in p.metadata. See Presence & Chat.
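
The snapshot-then-deltas model means one small map can hold the roster. The sketch below is standalone: Peer is a stand-in for the SDK's PresencePeer, and Roster is a hypothetical helper. Because the snapshot arrives as an ordinary joined list, the same apply() call handles both the first event and later deltas.

```dart
/// Stand-in for the SDK's PresencePeer, so the sketch runs standalone.
class Peer {
  const Peer(this.peerId, [this.metadata]);
  final String peerId;
  final Map<String, Object?>? metadata;
}

/// Per-channel roster built from a snapshot followed by deltas.
class Roster {
  final Map<String, Map<String, Object?>?> _byId = {};

  /// Applies one presence event: add joiners, then drop leavers.
  void apply({List<Peer> joined = const [], List<Peer> left = const []}) {
    for (final p in joined) {
      _byId[p.peerId] = p.metadata;
    }
    for (final p in left) {
      _byId.remove(p.peerId);
    }
  }

  /// Drops all entries, e.g. before a fresh snapshot.
  void clear() => _byId.clear();

  /// Peer IDs currently present, sorted for stable display.
  List<String> get peerIds => _byId.keys.toList()..sort();
}
```

One design question is when to call clear(). If a reconnect with autoResubscribe produces a new snapshot (a reasonable reading of the paragraph above, but an assumption), clearing the roster when onConnected fires with isReconnect set would keep stale peers from lingering.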

Common pitfalls

  1. Publishing before connect() completes → the request rejects (e.g. SignallingDisconnectedError); the frame never landed. await connect() first, or queue behind onConnected.
  2. Fire-and-forgetting subscribe() then immediately publishing → subscribe resolves on ack. Await it before relying on the subscription being live.
  3. autoResubscribe: false + forgetting to re-subscribe → after the first reconnect the SDK has no record of your subscribes and you silently miss messages. Leave it true unless you have a specific scoped-subscription pattern.
  4. Treating onServerError as fatal → most are per-request and the connection stays open. Don't tear down on them.
  5. Leaking on dispose → in a widget, call client.dispose(), not close(), so the stream controllers are released.
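
For pitfall 1, one way to queue behind onConnected is a small FIFO outbox that is drained once the connection is up. Outbox and Pending are hypothetical names, not SDK types. The sketch is unbounded and doesn't retry failed publishes, so treat it as a starting point only.

```dart
import 'dart:collection';

/// One publish that has not been sent yet.
class Pending {
  const Pending(this.channel, this.data);
  final String channel;
  final Object? data;
}

/// FIFO buffer for publishes requested while disconnected.
class Outbox {
  final Queue<Pending> _queue = Queue<Pending>();

  int get length => _queue.length;

  /// Buffers a publish for later delivery.
  void add(String channel, Object? data) => _queue.add(Pending(channel, data));

  /// Removes and returns everything queued so far, oldest first.
  List<Pending> drain() {
    final items = _queue.toList();
    _queue.clear();
    return items;
  }
}
```

In an onConnected listener you would loop over outbox.drain() and await client.publish(m.channel, m.data) for each entry, which preserves the order in which messages were queued.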

See also