
SignallingClient

The lower-level class. Pub/sub + directed messages over WebSocket, ack-correlated, auto-reconnecting. No WebRTC, no per-peer state, no channel-driven discovery.

Use this when:

  • You're building chat, presence, IoT telemetry, AI agent message bus, collaborative cursors, multiplayer lobbies, etc. — anything that's pub/sub at heart.
  • You want to subscribe to multiple channels from one connection (MeteredPeer is tied to one channel).
  • You want to use the SDK as the signalling layer for a custom WebRTC stack that doesn't fit MeteredPeer's 1:N channel model.

If you're building WebRTC and the channel + peer model fits (MeteredPeer), use that instead — it wraps SignallingClient and handles the per-peer plumbing.

SignallingClient is fully asyncio-based: every network method is a coroutine you await, and events are typed dataclasses dispatched to handlers you register with client.on(...).

Construct

from metered_realtime import SignallingClient

client = SignallingClient(api_key="pk_live_…")

# — OR —

async def mint_token() -> str:
    return await fetch_jwt_from_your_backend()

client = SignallingClient(token_provider=mint_token)

Pass exactly one of api_key or token_provider; passing both, or neither, raises ValueError at construction. The constructor doesn't connect: call await connect() (or use async with).

token_provider is any zero-argument callable returning a str token (or an awaitable of one) — a plain async def is the common form. It's called on the first connect AND on every reconnect.
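To make that contract concrete, here is a minimal sketch of how a sync-or-async provider could be resolved under a timeout cap. The helper name resolve_token and its timeout parameter are hypothetical and not part of the SDK; the SDK's internal handling may differ.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Union

TokenProvider = Callable[[], Union[str, Awaitable[str]]]


async def resolve_token(provider: TokenProvider, timeout: float = 10.0) -> str:
    """Call a zero-argument provider (sync or async) and return its token.

    Hypothetical helper for illustration; not part of metered_realtime.
    """
    result = provider()
    if inspect.isawaitable(result):
        # Cap how long a hanging mint endpoint can stall a connect attempt.
        result = await asyncio.wait_for(result, timeout)
    if not isinstance(result, str):
        raise TypeError("token provider must return a str")
    return result
```

Because the provider runs again on every reconnect, a provider that fetches a fresh token each time avoids reconnecting with a token that expired while the socket was down.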

Options

Pass options as keyword arguments, or build a SignallingClientOptions and pass it positionally — both are equivalent.

from metered_realtime import SignallingClient, SignallingClientOptions, ReconnectOptions

client = SignallingClient(SignallingClientOptions(
    api_key="pk_live_…",
    reconnect=ReconnectOptions(max_attempts=float("inf")),
    inactivity_timeout=30.0,
))

| Option | Type | Default | Notes |
| --- | --- | --- | --- |
| api_key | str \| None | None | pk_live_…. Mutually exclusive with token_provider. |
| token_provider | TokenProviderFn \| None | None | Async zero-arg callable returning an HS256 JWT (Callable[[], Awaitable[str]]). Called on first connect AND every reconnect. |
| url | str \| None | "wss://rms.metered.ca" | Validated at construction: must be wss://… (or ws://localhost for dev). |
| token_provider_timeout | float | 10.0 | Seconds. Cap on token_provider() resolution. Defends against a hanging mint endpoint. |
| logger | Logger \| None | NoopLogger() | Use StdlibLogger() to route logs to the stdlib logging module while debugging. |
| reconnect | ReconnectOptions \| bool | True | False disables reconnect entirely. See below + Reconnect Best Practices. |
| inactivity_timeout | float | 60.0 | Seconds. If no frame arrives in the window, the SDK closes-and-reconnects (close code 4000). |
| auto_resubscribe | bool | True | After reconnect, re-subscribe to every channel that was active before the drop. |

All durations are float seconds (the asyncio idiom) — note this differs from the JavaScript SDK, which uses milliseconds.

reconnect options

from metered_realtime import ReconnectOptions

ReconnectOptions(
    initial_delay=0.5,   # seconds
    multiplier=2.0,
    max_delay=30.0,      # seconds
    jitter_ratio=0.2,    # ±20%
    max_attempts=100,    # float("inf") for daemon-style
)

| Field | Type | Default |
| --- | --- | --- |
| initial_delay | float | 0.5 |
| multiplier | float | 2.0 |
| max_delay | float | 30.0 |
| jitter_ratio | float | 0.2 |
| max_attempts | float | 100 |

Backoff formula: delay = min(initial_delay * multiplier ** (attempt - 1), max_delay), then jittered by ±jitter_ratio as a fraction of that delay (0.2 means ±20%). Each successful welcome resets the attempt counter.
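The formula can be sketched as a small pure function. The function name backoff_delay and the rng parameter are illustrative, and the uniform jitter distribution is an assumption; the source only states the ± range.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    initial_delay: float = 0.5,
    multiplier: float = 2.0,
    max_delay: float = 30.0,
    jitter_ratio: float = 0.2,
    rng: Callable[[], float] = random.random,
) -> float:
    """Seconds to wait before reconnect attempt number `attempt` (1-based)."""
    try:
        grown = initial_delay * multiplier ** (attempt - 1)
    except OverflowError:
        # Very large attempt counts overflow a float; the cap applies anyway.
        grown = max_delay
    base = min(grown, max_delay)
    # Assumption: jitter is uniform in [-jitter_ratio, +jitter_ratio] and
    # scales the base delay (0.2 means up to 20% shorter or longer).
    return base * (1.0 + (2.0 * rng() - 1.0) * jitter_ratio)


# Raw curve with jitter disabled.
print([backoff_delay(n, jitter_ratio=0.0) for n in range(1, 9)])
# prints [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

With the defaults, the delay reaches the 30 s cap on the seventh consecutive attempt, so the remaining attempts up to max_attempts are spaced roughly 30 s apart.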

Validated at construction: ReconnectOptions.__post_init__ raises ValueError for knobs that would produce a zero-delay reconnect loop — initial_delay <= 0, multiplier < 1, max_delay < initial_delay, jitter_ratio outside [0, 1], or a negative max_attempts. max_attempts=float("inf") is allowed.
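For readers who want to see those rules in one place, the following stand-in dataclass mirrors them. ReconnectOptionsSketch is a hypothetical name and the error messages are invented; only the five conditions come from the text above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReconnectOptionsSketch:
    """Stand-in that mirrors the documented checks; not the SDK class."""

    initial_delay: float = 0.5
    multiplier: float = 2.0
    max_delay: float = 30.0
    jitter_ratio: float = 0.2
    max_attempts: float = 100

    def __post_init__(self) -> None:
        if self.initial_delay <= 0:
            raise ValueError("initial_delay must be > 0")
        if self.multiplier < 1:
            raise ValueError("multiplier must be >= 1")
        if self.max_delay < self.initial_delay:
            raise ValueError("max_delay must be >= initial_delay")
        if not 0 <= self.jitter_ratio <= 1:
            raise ValueError("jitter_ratio must be within [0, 1]")
        if self.max_attempts < 0:
            raise ValueError("max_attempts must be >= 0")


# float("inf") passes the last check, matching the daemon-style setting.
ReconnectOptionsSketch(max_attempts=float("inf"))
```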

Close-code-aware behaviour — the SDK doesn't blindly retry on every close:

| Close code | Behaviour |
| --- | --- |
| 1001, 1006, 1008, 1009, 4000 (ClientInactivity), 4002 (TokenExpired), 4011 (OverMessageRate) | Normal backoff |
| 4001 (InvalidToken), 4003 (ChannelNotAuthorized), 4012 (AccountSuspended), 4020 (AdminDisconnect) | No retry; caller action required |
| 4010 (OverConcurrentLimit) | Forced ≥30 s backoff floor; retrying every 500 ms against a plan-cap rejection just hammers the server |

See Errors & Codes for what each code means + what to do.
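The close-code policy can be read as a small decision function. This is a sketch only: decide_retry and RetryDecision are hypothetical names, and treating unlisted codes as "normal backoff" is an assumption the source does not confirm.

```python
from typing import NamedTuple

# Codes taken from the close-code list above.
NO_RETRY_CODES = frozenset({4001, 4003, 4012, 4020})
BACKOFF_FLOOR_SECONDS = {4010: 30.0}


class RetryDecision(NamedTuple):
    retry: bool
    delay: float  # seconds; meaningful only when retry is True


def decide_retry(close_code: int, backoff: float) -> RetryDecision:
    """Combine a close code with the computed backoff delay (seconds)."""
    if close_code in NO_RETRY_CODES:
        return RetryDecision(False, 0.0)
    # Assumption: codes not listed anywhere fall through to normal backoff.
    floor = BACKOFF_FLOOR_SECONDS.get(close_code, 0.0)
    return RetryDecision(True, max(backoff, floor))


print(decide_retry(4010, 0.5))
# prints RetryDecision(retry=True, delay=30.0)
```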

After max_attempts consecutive failures, the client transitions to "closed" and emits a terminal Disconnected event. Call connect() again manually if you want to keep trying.

Methods

await connect() → None

Opens the WebSocket, sends the auth handshake, waits for the server welcome.

await client.connect()
print(client.state) # "connected"

Calling connect() from any state other than "idle" or "closed" raises SignallingError — there is one supervisor at a time.

Raises SignallingConnectError if the WS closes before welcome (invalid token, quota rejection). Branch on close_code:

from metered_realtime import SignallingConnectError, WsCloseCode

try:
    await client.connect()
except SignallingConnectError as e:
    if e.close_code == WsCloseCode.INVALID_TOKEN:
        show_login_ui()
    elif e.close_code == WsCloseCode.OVER_CONCURRENT_LIMIT:
        show_quota_ui()
    else:
        raise

await close(code=1000, reason="client requested close") → None

Closes the WebSocket. Terminal — no auto-reconnect after this. Cancels the supervisor and all background tasks, fails any in-flight request, drains pending event handlers, then drops all listeners.

await client.close()

Idempotent — calling it when already "closed" returns immediately. It resolves after the underlying socket close (or after a 1 s flush timeout). await-ing it before process exit lets the TCP FIN flush so the server records a clean disconnect.

await subscribe(channel, opts=None) → None

Subscribes to channel. Resolves on server ack.

from metered_realtime import SubscribeOptions

await client.subscribe("alerts/critical")
await client.subscribe("rooms/lobby", SubscribeOptions(include_sender_metadata=True))

| SubscribeOptions field | Type | Default | Notes |
| --- | --- | --- | --- |
| include_sender_metadata | bool | False | If true, broadcast Message events on this channel carry the sender's token-asserted metadata in sender_metadata. Off by default: metadata can be large and most subscribers don't need per-message sender identity. |

Tracked for auto_resubscribe. The channel is recorded before the frame goes on the wire (so a drop between send and ack can't lose it), and re-subscribed automatically when the WS reconnects.
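The record-before-send ordering is easier to see in code. The sketch below uses a hypothetical tracker class and an invented frame shape ({"type": ..., "channel": ...}); neither reflects the SDK's real wire format or internals.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional

SendFrame = Callable[[dict], Awaitable[None]]


class SubscriptionTrackerSketch:
    """Hypothetical stand-in for the tracking behaviour; not SDK code."""

    def __init__(self, send_frame: SendFrame) -> None:
        self._send_frame = send_frame
        self.active: Dict[str, Optional[dict]] = {}

    async def subscribe(self, channel: str, opts: Optional[dict] = None) -> None:
        # Record first, so a drop between send and ack cannot lose the channel.
        self.active[channel] = opts
        await self._send_frame({"type": "subscribe", "channel": channel})

    async def unsubscribe(self, channel: str) -> None:
        # Stop tracking first, so a reconnect will not revive the subscription.
        self.active.pop(channel, None)
        await self._send_frame({"type": "unsubscribe", "channel": channel})

    async def resubscribe_all(self) -> None:
        # Replay every tracked channel after a reconnect.
        for channel in list(self.active):
            await self._send_frame({"type": "subscribe", "channel": channel})


async def demo() -> list:
    sent = []

    async def send_frame(frame: dict) -> None:
        sent.append(frame)

    tracker = SubscriptionTrackerSketch(send_frame)
    await tracker.subscribe("alerts/critical")
    await tracker.resubscribe_all()  # pretend the socket just reconnected
    return sent


print(asyncio.run(demo()))
```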

Raises ServerRequestError with code == "channel_not_authorized" (the JWT's channels claim doesn't allow it), "channel_reserved" (the channel starts with _metered/, _internal/, or _system/), or "channel_limit_exceeded" (you're at the per-connection channel cap).

await unsubscribe(channel) → None

Removes the channel from the auto_resubscribe set, then sends the unsubscribe frame and waits for ack.

await publish(channel, data) → None

Broadcasts data (any JSON-serializable value) to every subscriber on channel. Resolves on server ack.

await client.publish("alerts/critical", {"sensor": "T-12", "value": 92})

Raises ServerRequestError with code == "over_message_quota" (period quota exhausted; the connection stays open) or "action_not_permitted" (the key's permissions don't grant publish). An over-cap payload is rejected server-side and surfaces as a 1009 (MessageTooBig) close rather than a per-request error.

await send(to_peer_id, data) → None

Direct point-to-point message. Routes via the server (no P2P). Resolves on server ack.

await client.send("peer-xyz", {"type": "challenge", "nonce": "abc"})

Raises ServerRequestError with code == "peer_not_found" if the target peer isn't online (no current connection on this app).

client.on(EventType, handler) · client.off(EventType, handler) · client.once(EventType, handler)

Register / remove / one-shot event handlers, keyed by event type (not a string name). on and once also work as decorators.

from metered_realtime import Connected

@client.on(Connected)
def _(ev: Connected) -> None:
    print("connected as", ev.peer_id)

# or imperatively
client.on(Connected, lambda ev: print(ev.peer_id))

Handlers may be sync or async. Sync handlers run inline; async handlers are scheduled as tracked tasks so a slow handler can't stall emission. A handler that raises is isolated and logged — it never propagates to the emitter or to sibling handlers.
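The dispatch rules described above (sync inline, async as tracked tasks, failures isolated) can be sketched with a toy emitter. EmitterSketch and its drain() method are hypothetical and simplified; for instance, its on() is not a decorator factory like the SDK's.

```python
import asyncio
import inspect
import logging

log = logging.getLogger("emitter-sketch")


class EmitterSketch:
    """Toy dispatcher for illustration; not the SDK's implementation."""

    def __init__(self) -> None:
        self._handlers = {}
        self._tasks = set()

    def on(self, event_type, handler):
        self._handlers.setdefault(event_type, []).append(handler)
        return handler

    def emit(self, event) -> None:
        # Must be called from inside a running event loop.
        for handler in list(self._handlers.get(type(event), [])):
            try:
                result = handler(event)  # sync handlers finish right here
            except Exception:
                log.exception("handler raised; siblings still run")
                continue
            if inspect.isawaitable(result):
                # Async handlers become tracked tasks so emit() never waits.
                task = asyncio.ensure_future(self._guard(result))
                self._tasks.add(task)
                task.add_done_callback(self._tasks.discard)

    async def _guard(self, awaitable) -> None:
        try:
            await awaitable
        except Exception:
            log.exception("async handler raised")

    async def drain(self) -> None:
        # Wait for in-flight async handlers, e.g. before shutting down.
        if self._tasks:
            await asyncio.gather(*list(self._tasks))
```

Keeping a strong reference to each task (the _tasks set) matters in asyncio: the event loop only holds weak references, so an untracked task can be garbage-collected before it finishes.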

client.events(EventType, *, maxsize=4096) → async iterator

An alternative to callbacks: an async-iterable stream of one event type, usable as an async with context manager that unsubscribes on exit. The stream is bounded (default 4096), dropping the oldest event when full; pass maxsize=0 for unbounded.

from metered_realtime import Message

async with client.events(Message) as stream:
    async for ev in stream:
        handle(ev.channel, ev.data)
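The drop-oldest behaviour of the bounded stream matches what the standard library's collections.deque does with maxlen. The sketch below shows only that buffering policy, using a hypothetical make_buffer helper; it says nothing about how the SDK actually stores events.

```python
from collections import deque


def make_buffer(maxsize: int = 4096) -> deque:
    """Drop-oldest buffer; maxsize=0 means unbounded, as described above."""
    return deque(maxlen=maxsize or None)


buf = make_buffer(3)
for n in range(5):
    buf.append(n)  # once full, each append silently evicts the oldest item

print(list(buf))
# prints [2, 3, 4]
```

The practical consequence is that a slow consumer sees the most recent events rather than blocking the producer, at the cost of silently missing older ones.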

async with support

SignallingClient is an async context manager: __aenter__ calls connect(), __aexit__ calls close().

async with SignallingClient(api_key="pk_live_…") as client:
    await client.subscribe("rooms/lobby")
    await client.publish("rooms/lobby", {"hi": "there"})
# socket is closed cleanly here

Read-only state

client.state  # "idle" | "connecting" | "connected" | "reconnecting" | "closed"

State transitions

        connect()                                close()
idle ─────────────► connecting ──► connected ──────────────► closed
                                    │    ▲
                     transient drop │    │ reconnect succeeded
                                    ▼    │
                                  reconnecting
                                       │
                                       │ terminal close OR max_attempts
                                       ▼
                                     closed

Events

All events are frozen dataclasses subclassing Event. Register with client.on(EventType).

| Event | Fields | When it fires |
| --- | --- | --- |
| Connected | peer_id, server_time, expires_at, is_reconnect, max_message_size, ice_servers | After every successful welcome, both initial and reconnects. Use is_reconnect to skip "connected!" toasts on reconnects. |
| Disconnected | code, reason, will_reconnect | Every WS close (after the first successful connect). will_reconnect tells you if the SDK is going to retry. |
| StateChange | from_, to | Every state transition. (from is a Python keyword, so the field is from_.) |
| Message | channel, sender_peer_id, data, sender_metadata | A peer published to a channel you're subscribed to. |
| Direct | sender_peer_id, data, sender_metadata | Someone called send(your_peer_id, data). |
| Presence | channel, joined, left | Roster change on a subscribed channel. joined / left are tuples of PresencePeer. |
| ServerError | code, message, request_id | Server-emitted error not tied to a pending request. Per-request rejections are raised as ServerRequestError from the awaited call instead; they do not surface here. |
| GoingAway | retry_after_ms | Server is shutting down for deploy. Informational: the SDK uses its normal reconnect backoff (it does not delay by retry_after_ms automatically). To honor the hint, listen for this event yourself and gate your own delay. |
| TokenProviderError | consecutive_failures, err | Your token_provider() has failed 3 times in a row. Informational: the SDK keeps retrying. Surface a "please log in again" prompt if appropriate. |
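Since GoingAway carries its hint in milliseconds while the rest of the SDK uses float seconds, one way to gate your own delay is a small helper like the following. RetryGateSketch is a hypothetical class; how you wire it into your own reconnect logic is up to your application.

```python
import asyncio
import time
from typing import Callable


class RetryGateSketch:
    """Remembers a server retry hint and exposes the time left to wait."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._not_before = 0.0

    def note_going_away(self, retry_after_ms: float) -> None:
        # The hint is in milliseconds; asyncio sleeps take seconds.
        self._not_before = self._clock() + retry_after_ms / 1000.0

    def remaining(self) -> float:
        """Seconds still to wait, never negative."""
        return max(0.0, self._not_before - self._clock())

    async def wait(self) -> None:
        await asyncio.sleep(self.remaining())
```

A GoingAway handler would call gate.note_going_away(ev.retry_after_ms), and any code path where you trigger work yourself (for example a manual connect() after a terminal Disconnected) could await gate.wait() first.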

Connected — what's in the fields

@client.on(Connected)
def _(ev: Connected) -> None:
    ev.peer_id           # server-assigned UUID, or your JWT's `sub` claim
    ev.server_time       # Unix seconds when the server stamped the welcome (clock skew)
    ev.expires_at        # JWT's `exp` in Unix seconds, or None for pk_ keys (no expiry)
    ev.is_reconnect      # False on first connect, True on every reconnect; useful for UI
    ev.max_message_size  # server-advertised payload cap (bytes)
    ev.ice_servers       # tuple of IceServerConfig from the JWT's metadata.iceServers
                         # (TURN creds), or None for pk_ keys

Disconnected — what to do

from metered_realtime import Disconnected

@client.on(Disconnected)
def _(ev: Disconnected) -> None:
    if ev.will_reconnect:
        ...  # SDK will retry. Show a "reconnecting…" banner.
    else:
        ...  # Terminal. Either a caller-action close code (4001 / 4003 / 4012 / 4020)
             # or max_attempts exhausted. Show a "disconnected" error + retry button.

will_reconnect == False means the SDK gave up retrying. Map close codes to UX in Errors & Codes.

Disconnected fires only for closes after the first successful connect. A failure on the very first connect() surfaces as the SignallingConnectError raised from connect() instead.

Presence — diffing the roster

from metered_realtime import Presence

@client.on(Presence)
def _(ev: Presence) -> None:
    for p in ev.joined:
        add_to_roster(p.peer_id, p.metadata)
    for p in ev.left:
        remove_from_roster(p.peer_id)

joined and left are tuples of PresencePeer, each with peer_id: str and metadata: dict | None. The first Presence event after a subscribe lists everyone already in the channel — use it to populate your initial roster. It is always sent, even when the channel is empty (joined == ()), so you can treat it as the authoritative roster snapshot at subscribe time. Subsequent events are deltas.
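The snapshot-then-deltas pattern can be sketched with a per-channel roster. The PresencePeer and Presence classes below are local stand-ins that copy only the documented field names so the example runs without the SDK; RosterSketch is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class PresencePeer:  # stand-in with the documented fields
    peer_id: str
    metadata: Optional[dict] = None


@dataclass(frozen=True)
class Presence:  # stand-in with the documented fields
    channel: str
    joined: Tuple[PresencePeer, ...] = ()
    left: Tuple[PresencePeer, ...] = ()


class RosterSketch:
    """Keeps a peer_id -> metadata mapping per channel."""

    def __init__(self) -> None:
        self._channels: Dict[str, Dict[str, Optional[dict]]] = {}

    def apply(self, ev: Presence) -> None:
        # The first event after subscribe is the snapshot; later ones are
        # deltas. Both are applied the same way.
        peers = self._channels.setdefault(ev.channel, {})
        for p in ev.joined:
            peers[p.peer_id] = p.metadata
        for p in ev.left:
            peers.pop(p.peer_id, None)

    def peers(self, channel: str) -> Dict[str, Optional[dict]]:
        return dict(self._channels.get(channel, {}))


roster = RosterSketch()
roster.apply(Presence("rooms/lobby", joined=(PresencePeer("a"), PresencePeer("b"))))
roster.apply(Presence("rooms/lobby", joined=(PresencePeer("c"),), left=(PresencePeer("a"),)))
print(sorted(roster.peers("rooms/lobby")))
# prints ['b', 'c']
```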

If you're showing a "who's here" list with avatars / names, your JWT should mint per-peer metadata (e.g. {"username": ..., "avatarUrl": ...}) — that's what shows up in p.metadata. See Authentication for minting peer_metadata.

Common pitfalls

  1. Calling publish / send / subscribe before connect() resolves. Each raises SignallingError synchronously — the frame never left the client. (Contrast with DisconnectedError: the frame was sent but the connection dropped before the server confirmed it.) Either await connect() first or queue calls behind the Connected event.

  2. Assuming state == "connected" means the latest subscribe is active. subscribe() resolves on ack — await that coroutine, don't fire-and-forget then immediately publish to the same channel from another task expecting your subscription to be live.

  3. auto_resubscribe=False + forgetting to re-subscribe. A common bug is to subscribe once at startup. After the first reconnect, the SDK has no record of your subscribes (because you turned off tracking) and you silently miss every message. Leave auto_resubscribe=True unless you have a specific scoped-subscription pattern.

  4. Treating a ServerError event as fatal. ServerError is for server-emitted errors not correlated to a pending request — it doesn't kill the connection, and per-request failures don't even arrive here (they're raised as ServerRequestError from the awaited call). The SDK won't auto-disconnect on a ServerError.

  5. Trusting data contents vs the envelope. sender_peer_id (on Message / Direct) is server-stamped and trustworthy; anything inside data is whatever the sender chose to put there. sender_metadata is token-asserted but still untrusted peer input — validate it.
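As an illustration of the last pitfall, here is a minimal validation sketch for an incoming data payload. The {"type": "chat", "text": ...} schema, the parse_chat_payload name, and the length limit are all hypothetical application choices, not anything the SDK defines.

```python
from typing import Any, Optional


def parse_chat_payload(data: Any, max_len: int = 2000) -> Optional[str]:
    """Return the text of a well-formed chat payload, else None.

    `data` is whatever the sender chose to publish, so every field is
    checked before use.
    """
    if not isinstance(data, dict):
        return None
    if data.get("type") != "chat":
        return None
    text = data.get("text")
    if not isinstance(text, str) or not text or len(text) > max_len:
        return None
    return text


print(parse_chat_payload({"type": "chat", "text": "hello"}))
# prints hello
print(parse_chat_payload({"type": "chat", "text": 42}))
# prints None
```

Identity decisions should key off the server-stamped sender_peer_id on the event, not off any "from" or "user" field a sender places inside data.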

See also