SignallingClient
The lower-level class. Pub/sub + directed messages over WebSocket, ack-correlated, auto-reconnecting. No WebRTC, no per-peer state, no channel-driven discovery.
Use this when:
- You're building chat, presence, IoT telemetry, AI agent message bus, collaborative cursors, multiplayer lobbies, etc. — anything that's pub/sub at heart.
- You want to subscribe to multiple channels from one connection (MeteredPeer is tied to one channel).
- You want to use the SDK as the signalling layer for a custom WebRTC stack that doesn't fit MeteredPeer's 1:N channel model.
If you're building WebRTC and the channel + peer model fits (MeteredPeer), use that instead — it wraps SignallingClient and handles the per-peer plumbing.
SignallingClient is fully asyncio-based: every network method is a coroutine you await, and events are typed dataclasses dispatched to handlers you register with client.on(...).
Construct
from metered_realtime import SignallingClient
client = SignallingClient(api_key="pk_live_…")
# — OR —
async def mint_token() -> str:
    return await fetch_jwt_from_your_backend()
client = SignallingClient(token_provider=mint_token)
Exactly one of api_key or token_provider. Passing both, or neither, raises ValueError at construction. The constructor doesn't connect — call await connect() (or use async with).
token_provider is any zero-argument callable returning a str token (or an awaitable of one) — a plain async def is the common form. It's called on the first connect AND on every reconnect.
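As a rough illustration of what "a str token or an awaitable of one" means in practice, the sketch below resolves either kind of provider and bounds the wait, in the spirit of the token_provider_timeout option listed in the options table. The names `TokenProvider` and `resolve_token` are hypothetical and are not part of the SDK; this is not the SDK's internal code.

```python
import asyncio
import inspect
from typing import Awaitable, Callable, Union

# Hypothetical alias for this sketch only; the SDK's own type is TokenProviderFn.
TokenProvider = Callable[[], Union[str, Awaitable[str]]]


async def resolve_token(provider: TokenProvider, timeout: float = 10.0) -> str:
    """Call provider(), await the result if it is awaitable, bounded by `timeout` seconds."""

    async def _call() -> str:
        result = provider()
        if inspect.isawaitable(result):
            result = await result
        return result

    # wait_for cancels the pending await and raises TimeoutError when the cap expires.
    # A provider that blocks synchronously cannot be interrupted this way.
    return await asyncio.wait_for(_call(), timeout=timeout)
```

Both `lambda: "token"` and an `async def` provider resolve to a string here, while a provider that never returns is cut off after `timeout` seconds.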
Options
Pass options as keyword arguments, or build a SignallingClientOptions and pass it positionally — both are equivalent.
from metered_realtime import SignallingClient, SignallingClientOptions, ReconnectOptions
client = SignallingClient(SignallingClientOptions(
    api_key="pk_live_…",
    reconnect=ReconnectOptions(max_attempts=float("inf")),
    inactivity_timeout=30.0,
))
| Option | Type | Default | Notes |
|---|---|---|---|
| api_key | str \| None | None | pk_live_…. Mutually exclusive with token_provider. |
| token_provider | TokenProviderFn \| None | None | Async zero-arg callable returning an HS256 JWT (Callable[[], Awaitable[str]]). Called on first connect AND every reconnect. |
| url | str \| None | "wss://rms.metered.ca" | Validated at construction: must be wss://… (or ws://localhost for dev). |
| token_provider_timeout | float | 10.0 | Seconds. Cap on token_provider() resolution. Defends against a hanging mint endpoint. |
| logger | Logger \| None | NoopLogger() | Use StdlibLogger() to route logs to the stdlib logging module while debugging. |
| reconnect | ReconnectOptions \| bool | True | False disables reconnect entirely. See below + Reconnect Best Practices. |
| inactivity_timeout | float | 60.0 | Seconds. If no frame arrives in the window, the SDK closes-and-reconnects (close code 4000). |
| auto_resubscribe | bool | True | After reconnect, re-subscribe to every channel that was active before the drop. |
All durations are float seconds (the asyncio idiom) — note this differs from the JavaScript SDK, which uses milliseconds.
reconnect options
from metered_realtime import ReconnectOptions
ReconnectOptions(
    initial_delay=0.5,   # seconds
    multiplier=2.0,
    max_delay=30.0,      # seconds
    jitter_ratio=0.2,    # ±20%
    max_attempts=100,    # float("inf") for daemon-style
)
| Field | Type | Default |
|---|---|---|
| initial_delay | float | 0.5 |
| multiplier | float | 2.0 |
| max_delay | float | 30.0 |
| jitter_ratio | float | 0.2 |
| max_attempts | float | 100 |
Backoff formula: delay = min(initial_delay * multiplier ** (attempt - 1), max_delay), then jittered by ±jitter_ratio as a fraction of that delay (±20% by default). Each successful welcome resets the attempt counter.
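To make the formula concrete, here is a minimal sketch that computes the delay for a given attempt number. It assumes the jitter is drawn uniformly from ±jitter_ratio of the capped delay; the SDK's actual distribution is not stated here, and `backoff_delay` is a hypothetical helper, not an SDK function.

```python
import random
from typing import Callable


def backoff_delay(
    attempt: int,
    initial_delay: float = 0.5,
    multiplier: float = 2.0,
    max_delay: float = 30.0,
    jitter_ratio: float = 0.2,
    rng: Callable[[], float] = random.random,
) -> float:
    """Delay in seconds before reconnect attempt number `attempt` (1-based)."""
    try:
        base = min(initial_delay * multiplier ** (attempt - 1), max_delay)
    except OverflowError:
        # Very large attempt counts overflow float exponentiation; the cap applies.
        base = max_delay
    # Assumption: jitter is uniform in [-jitter_ratio, +jitter_ratio] of the capped delay.
    return base * (1.0 + jitter_ratio * (2.0 * rng() - 1.0))
```

With the defaults and jitter switched off, attempts 1 through 8 wait 0.5, 1, 2, 4, 8, 16, 30 and 30 seconds: the cap takes over from the seventh attempt.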
Validated at construction: ReconnectOptions.__post_init__ raises ValueError for knobs that would produce a zero-delay reconnect loop — initial_delay <= 0, multiplier < 1, max_delay < initial_delay, jitter_ratio outside [0, 1], or a negative max_attempts. max_attempts=float("inf") is allowed.
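The checks listed above could be expressed roughly as follows. `ReconnectOptionsSketch` is a hypothetical stand-in written for illustration; the error messages and the order of the checks are assumptions, not the SDK's source.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReconnectOptionsSketch:
    """Hypothetical stand-in that mirrors the documented fields and checks."""

    initial_delay: float = 0.5
    multiplier: float = 2.0
    max_delay: float = 30.0
    jitter_ratio: float = 0.2
    max_attempts: float = 100  # float so that float("inf") is accepted

    def __post_init__(self) -> None:
        if self.initial_delay <= 0:
            raise ValueError("initial_delay must be > 0")
        if self.multiplier < 1:
            raise ValueError("multiplier must be >= 1")
        if self.max_delay < self.initial_delay:
            raise ValueError("max_delay must be >= initial_delay")
        if not 0 <= self.jitter_ratio <= 1:
            raise ValueError("jitter_ratio must be within [0, 1]")
        if self.max_attempts < 0:
            raise ValueError("max_attempts must be >= 0")
```

Failing at construction keeps a misconfigured client from ever reaching the reconnect loop.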
Close-code-aware behaviour — the SDK doesn't blindly retry on every close:
| Close code | Behaviour |
|---|---|
| 1001, 1006, 1008, 1009, 4000 (ClientInactivity), 4002 (TokenExpired), 4011 (OverMessageRate) | Normal backoff |
| 4001 (InvalidToken), 4003 (ChannelNotAuthorized), 4012 (AccountSuspended), 4020 (AdminDisconnect) | No retry — caller action required |
| 4010 (OverConcurrentLimit) | Forced ≥30 s backoff floor — retrying every 500 ms against a plan-cap rejection just hammers the server |
See Errors & Codes for what each code means + what to do.
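The close-code table can be read as a small decision function. The sketch below encodes only the codes listed in the table; `RetryPolicy`, `classify_close` and `delay_for` are hypothetical names, and the handling of codes that the table does not mention is deliberately left as an error rather than guessed.

```python
from enum import Enum
from typing import Optional


class RetryPolicy(Enum):
    BACKOFF = "normal backoff"
    NO_RETRY = "no retry, caller action required"
    BACKOFF_FLOOR = "backoff with a 30 s floor"


_BACKOFF = frozenset({1001, 1006, 1008, 1009, 4000, 4002, 4011})
_NO_RETRY = frozenset({4001, 4003, 4012, 4020})
_FLOOR = frozenset({4010})
FLOOR_SECONDS = 30.0


def classify_close(code: int) -> RetryPolicy:
    """Map a WebSocket close code to the behaviour listed in the table."""
    if code in _BACKOFF:
        return RetryPolicy.BACKOFF
    if code in _NO_RETRY:
        return RetryPolicy.NO_RETRY
    if code in _FLOOR:
        return RetryPolicy.BACKOFF_FLOOR
    # The table does not cover this code, so the sketch refuses to guess.
    raise LookupError(f"close code {code} is not covered by the table")


def delay_for(code: int, computed_backoff: float) -> Optional[float]:
    """Seconds to wait before retrying, or None when no retry should happen."""
    policy = classify_close(code)
    if policy is RetryPolicy.NO_RETRY:
        return None
    if policy is RetryPolicy.BACKOFF_FLOOR:
        return max(computed_backoff, FLOOR_SECONDS)
    return computed_backoff
```

For example, `delay_for(4010, 0.5)` yields 30.0 while `delay_for(4001, 0.5)` yields None.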
After max_attempts consecutive failures, the client transitions to "closed" and emits a terminal Disconnected event. Call connect() again manually if you want to keep trying.
Methods
await connect() → None
Opens the WebSocket, sends the auth handshake, waits for the server welcome.
await client.connect()
print(client.state) # "connected"
Calling connect() from any state other than "idle" or "closed" raises SignallingError — there is one supervisor at a time.
Raises SignallingConnectError if the WS closes before welcome (invalid token, quota rejection). Branch on close_code:
from metered_realtime import SignallingConnectError, WsCloseCode
try:
    await client.connect()
except SignallingConnectError as e:
    if e.close_code == WsCloseCode.INVALID_TOKEN:
        show_login_ui()
    elif e.close_code == WsCloseCode.OVER_CONCURRENT_LIMIT:
        show_quota_ui()
    else:
        raise
await close(code=1000, reason="client requested close") → None
Closes the WebSocket. Terminal — no auto-reconnect after this. Cancels the supervisor and all background tasks, fails any in-flight request, drains pending event handlers, then drops all listeners.
await client.close()
Idempotent — calling it when already "closed" returns immediately. It resolves after the underlying socket close (or after a 1 s flush timeout). await-ing it before process exit lets the TCP FIN flush so the server records a clean disconnect.
await subscribe(channel, opts=None) → None
Subscribes to channel. Resolves on server ack.
from metered_realtime import SubscribeOptions
await client.subscribe("alerts/critical")
await client.subscribe("rooms/lobby", SubscribeOptions(include_sender_metadata=True))
| SubscribeOptions field | Type | Default | Notes |
|---|---|---|---|
| include_sender_metadata | bool | False | If true, broadcast Message events on this channel carry the sender's token-asserted metadata in sender_metadata. Off by default: metadata can be large and most subscribers don't need per-message sender identity. |
Tracked for auto_resubscribe. The channel is recorded before the frame goes on the wire (so a drop between send and ack can't lose it), and re-subscribed automatically when the WS reconnects.
Raises ServerRequestError with code == "channel_not_authorized" (the JWT's channels claim doesn't allow it), "channel_reserved" (the channel starts with _metered/, _internal/, or _system/), or "channel_limit_exceeded" (you're at the per-connection channel cap).
await unsubscribe(channel) → None
Removes the channel from the auto_resubscribe set, then sends the unsubscribe frame and waits for ack.
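The ordering described for subscribe and unsubscribe (update the tracked set first, then talk to the server) can be sketched as below. `TrackedSubscriptions`, the `send_frame` callable and the frame dictionaries are all hypothetical; the real wire format and the SDK's behaviour after a server rejection are not described here.

```python
import asyncio
from typing import Awaitable, Callable, Set

# Hypothetical transport hook: sends one frame and resolves on server ack.
SendFrame = Callable[[dict], Awaitable[None]]


class TrackedSubscriptions:
    """Keeps the set of channels to restore after a reconnect."""

    def __init__(self, send_frame: SendFrame) -> None:
        self._send_frame = send_frame
        self.channels: Set[str] = set()

    async def subscribe(self, channel: str) -> None:
        # Record first, so a drop between send and ack cannot lose the channel.
        self.channels.add(channel)
        await self._send_frame({"type": "subscribe", "channel": channel})

    async def unsubscribe(self, channel: str) -> None:
        # Forget first, so a reconnect during the request will not re-subscribe.
        self.channels.discard(channel)
        await self._send_frame({"type": "unsubscribe", "channel": channel})

    async def resubscribe_all(self) -> None:
        # Called after a reconnect: replay every tracked channel.
        for channel in sorted(self.channels):
            await self._send_frame({"type": "subscribe", "channel": channel})
```

If the connection drops while `subscribe()` is waiting for its ack, the channel is already in `channels`, so the replay after reconnect still covers it.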
await publish(channel, data) → None
Broadcasts data (any JSON-serializable value) to every subscriber on channel. Resolves on server ack.
await client.publish("alerts/critical", {"sensor": "T-12", "value": 92})
Raises ServerRequestError with code == "over_message_quota" (period quota exhausted; the connection stays open) or "action_not_permitted" (the key's permissions don't grant publish). An over-cap payload is rejected server-side and surfaces as a 1009 (MessageTooBig) close rather than a per-request error.
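Because an over-cap payload ends in a 1009 close rather than a per-request error, it can be worth estimating the payload size before publishing. The sketch below is only an approximation: it assumes the cap is compared against the UTF-8 bytes of the JSON-encoded data, and the `envelope_overhead` parameter is a hypothetical allowance for framing whose real size is not documented here.

```python
import json
from typing import Any


def encoded_size(data: Any) -> int:
    """Bytes of the compact, UTF-8 encoded JSON form of `data`."""
    text = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
    return len(text.encode("utf-8"))


def fits_cap(data: Any, max_message_size: int, envelope_overhead: int = 0) -> bool:
    """True when the estimated frame size stays within the advertised cap."""
    # Assumption: envelope_overhead stands in for the unknown framing bytes.
    return encoded_size(data) + envelope_overhead <= max_message_size
```

The cap itself would come from `max_message_size` on the Connected event, so a check like `fits_cap(payload, ev.max_message_size)` is one way to reject oversized data locally.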
await send(to_peer_id, data) → None
Direct point-to-point message. Routes via the server (no P2P). Resolves on server ack.
await client.send("peer-xyz", {"type": "challenge", "nonce": "abc"})
Raises ServerRequestError with code == "peer_not_found" if the target peer isn't online (no current connection on this app).
client.on(EventType, handler) · client.off(EventType, handler) · client.once(EventType, handler)
Register / remove / one-shot event handlers, keyed by event type (not a string name). on and once also work as decorators.
from metered_realtime import Connected
@client.on(Connected)
def _(ev: Connected) -> None:
    print("connected as", ev.peer_id)
# or imperatively
client.on(Connected, lambda ev: print(ev.peer_id))
Handlers may be sync or async. Sync handlers run inline; async handlers are scheduled as tracked tasks so a slow handler can't stall emission. A handler that raises is isolated and logged — it never propagates to the emitter or to sibling handlers.
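The dispatch rules in the paragraph above (sync handlers inline, async handlers as tracked tasks, failures isolated and logged) can be sketched with a tiny emitter. `MiniEmitter` is a hypothetical illustration of those semantics, not the SDK's implementation.

```python
import asyncio
import inspect
import logging
from typing import Any, Callable, Dict, List, Set

log = logging.getLogger("emitter_sketch")


class MiniEmitter:
    """Type-keyed emitter: sync handlers run inline, async ones become tasks."""

    def __init__(self) -> None:
        self._handlers: Dict[type, List[Callable[[Any], Any]]] = {}
        self._tasks: Set["asyncio.Future[None]"] = set()

    def on(self, event_type: type, handler: Callable[[Any], Any]) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def emit(self, event: Any) -> None:
        """Dispatch one event; must be called from inside a running event loop."""
        for handler in list(self._handlers.get(type(event), [])):
            try:
                result = handler(event)
            except Exception:
                # A failing sync handler is logged and does not stop its siblings.
                log.exception("sync handler failed")
                continue
            if inspect.isawaitable(result):
                task = asyncio.ensure_future(self._guard(result))
                self._tasks.add(task)  # keep a reference until the task finishes
                task.add_done_callback(self._tasks.discard)

    async def _guard(self, awaitable: Any) -> None:
        try:
            await awaitable
        except Exception:
            log.exception("async handler failed")

    async def drain(self) -> None:
        """Wait for all in-flight async handlers, as a close() might."""
        if self._tasks:
            await asyncio.gather(*list(self._tasks))
```

Holding each task in `_tasks` matters: the event loop keeps only weak references to tasks, so an untracked task can be garbage-collected before it finishes.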
client.events(EventType, *, maxsize=4096) → async iterator
An alternative to callbacks: an async-iterable stream of one event type, usable as an async with context manager that unsubscribes on exit. The stream is bounded (default 4096), dropping the oldest event when full; pass maxsize=0 for unbounded.
from metered_realtime import Message
async with client.events(Message) as stream:
    async for ev in stream:
        handle(ev.channel, ev.data)
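The "bounded, dropping the oldest event when full" policy of the stream can be pictured with a plain deque. `DropOldestBuffer` is a hypothetical model of that policy only; it says nothing about how the SDK wakes up a waiting consumer.

```python
from collections import deque
from typing import Deque, Generic, List, TypeVar

T = TypeVar("T")


class DropOldestBuffer(Generic[T]):
    """Bounded FIFO that evicts the oldest item when full; maxsize=0 is unbounded."""

    def __init__(self, maxsize: int = 4096) -> None:
        self._items: Deque[T] = deque(maxlen=maxsize if maxsize > 0 else None)
        self.dropped = 0  # how many items were evicted so far

    def push(self, item: T) -> None:
        if self._items.maxlen is not None and len(self._items) == self._items.maxlen:
            self.dropped += 1  # deque.append evicts from the left when full
        self._items.append(item)

    def drain(self) -> List[T]:
        items = list(self._items)
        self._items.clear()
        return items
```

A consumer that falls behind therefore sees the most recent events rather than a stale backlog, at the cost of silently missing the evicted ones.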
async with support
SignallingClient is an async context manager: __aenter__ calls connect(), __aexit__ calls close().
async with SignallingClient(api_key="pk_live_…") as client:
    await client.subscribe("rooms/lobby")
    await client.publish("rooms/lobby", {"hi": "there"})
# socket is closed cleanly here
Read-only state
client.state # "idle" | "connecting" | "connected" | "reconnecting" | "closed"
State transitions
         connect()                                  close()
idle ─────────────────► connecting ──► connected ──────────────► closed
                                         │    ▲
                         transient drop  │    │ reconnect succeeded
                                         ▼    │
                                      reconnecting
                                           │
                                           │ terminal close OR max_attempts
                                           ▼
                                         closed
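The diagram can also be written down as a transition table, which is handy for asserting on StateChange events in tests. The sketch encodes only the edges drawn above, plus closed to connecting, which follows from connect() being callable from "closed". Edges the diagram does not draw (for example a failed first connect) are intentionally absent, and `ALLOWED`, `can_transition` and `can_call_connect` are hypothetical names.

```python
from typing import Dict, FrozenSet

# Edges taken from the diagram; "closed" -> "connecting" comes from the connect() rules.
ALLOWED: Dict[str, FrozenSet[str]] = {
    "idle": frozenset({"connecting"}),
    "connecting": frozenset({"connected"}),
    "connected": frozenset({"reconnecting", "closed"}),
    "reconnecting": frozenset({"connected", "closed"}),
    "closed": frozenset({"connecting"}),
}


def can_transition(src: str, dst: str) -> bool:
    """True when the diagram shows an edge from `src` to `dst`."""
    return dst in ALLOWED.get(src, frozenset())


def can_call_connect(state: str) -> bool:
    """connect() is only valid from "idle" or "closed"."""
    return state in ("idle", "closed")
```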
Events
All events are frozen dataclasses subclassing Event. Register with client.on(EventType).
| Event | Fields | When it fires |
|---|---|---|
| Connected | peer_id, server_time, expires_at, is_reconnect, max_message_size, ice_servers | After every successful welcome (initial + reconnects). Use is_reconnect to skip "connected!" toasts on reconnects. |
| Disconnected | code, reason, will_reconnect | Every WS close (after the first successful connect). will_reconnect tells you if the SDK is going to retry. |
| StateChange | from_, to | Every state transition. (from is a Python keyword, so the field is from_.) |
| Message | channel, sender_peer_id, data, sender_metadata | A peer published to a channel you're subscribed to. |
| Direct | sender_peer_id, data, sender_metadata | Someone called send(your_peer_id, data). |
| Presence | channel, joined, left | Roster change on a subscribed channel. joined / left are tuples of PresencePeer. |
| ServerError | code, message, request_id | Server-emitted error not tied to a pending request. Per-request rejections are raised as ServerRequestError from the awaited call instead; they do not surface here. |
| GoingAway | retry_after_ms | Server is shutting down for deploy. Informational: the SDK uses its normal reconnect backoff (it does not delay by retry_after_ms automatically). To honor the hint, listen for this event yourself and gate your own delay. |
| TokenProviderError | consecutive_failures, err | Your token_provider() has failed 3 times in a row. Informational: the SDK keeps retrying. Surface a "please log in again" prompt if appropriate. |
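Since the SDK does not apply retry_after_ms itself, honoring it is left to your code. One possible shape is a small gate that remembers the earliest time at which your own retries should resume. `GoingAwayGate` is a hypothetical helper; note that it converts the hint from milliseconds to the float seconds used elsewhere in this SDK.

```python
import time
from typing import Callable


class GoingAwayGate:
    """Tracks how long to hold off after a GoingAway hint."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._not_before = clock()

    def note_going_away(self, retry_after_ms: float) -> None:
        # The hint is in milliseconds; asyncio delays are in seconds.
        candidate = self._clock() + retry_after_ms / 1000.0
        self._not_before = max(self._not_before, candidate)

    def remaining(self) -> float:
        """Seconds left before work driven by your own code should resume."""
        return max(0.0, self._not_before - self._clock())
```

A GoingAway handler would call `gate.note_going_away(ev.retry_after_ms)`, and any retry driven by your own code could `await asyncio.sleep(gate.remaining())` first.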
Connected — what's in the fields
@client.on(Connected)
def _(ev: Connected) -> None:
    ev.peer_id           # server-assigned UUID, or your JWT's `sub` claim
    ev.server_time       # Unix seconds when the server stamped the welcome (clock skew)
    ev.expires_at        # JWT's `exp`: Unix seconds, or None for pk_ keys (no expiry)
    ev.is_reconnect      # False on first connect, True on every reconnect (useful for UI)
    ev.max_message_size  # server-advertised payload cap (bytes)
    ev.ice_servers       # tuple of IceServerConfig from the JWT's metadata.iceServers
                         # (TURN creds), or None for pk_ keys
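Two small calculations follow naturally from server_time and expires_at: how long the token is still valid at the moment of the welcome, and roughly how far the local clock is from the server's. Both helpers below are hypothetical, and the skew figure includes network latency, so treat it as an estimate.

```python
from typing import Optional


def token_lifetime_at_welcome(server_time: float, expires_at: Optional[float]) -> Optional[float]:
    """Seconds of token validity left at welcome, or None when there is no expiry."""
    if expires_at is None:
        return None  # pk_ keys carry no expiry
    return max(0.0, expires_at - server_time)


def estimated_clock_skew(server_time: float, local_time: float) -> float:
    """Positive when the local clock is ahead of the server (includes network latency)."""
    return local_time - server_time
```

Using the server's own timestamp for the lifetime avoids depending on the local clock being correct.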
Disconnected — what to do
@client.on(Disconnected)
def _(ev: Disconnected) -> None:
    if ev.will_reconnect:
        ...  # SDK will retry. Show a "reconnecting…" banner.
    else:
        ...  # Terminal. Either a caller-action close code (4001 / 4003 / 4012 / 4020)
             # or max_attempts exhausted. Show a "disconnected" error + retry button.
will_reconnect == False means the SDK gave up retrying. Map close codes to UX in Errors & Codes.
Disconnected fires only for closes after the first successful connect. A failure on the very first connect() surfaces as the SignallingConnectError raised from connect() instead.
Presence — diffing the roster
@client.on(Presence)
def _(ev: Presence) -> None:
    for p in ev.joined:
        add_to_roster(p.peer_id, p.metadata)
    for p in ev.left:
        remove_from_roster(p.peer_id)
joined and left are tuples of PresencePeer, each with peer_id: str and metadata: dict | None. The first Presence event after a subscribe lists everyone already in the channel — use it to populate your initial roster. It is always sent, even when the channel is empty (joined == ()), so you can treat it as the authoritative roster snapshot at subscribe time. Subsequent events are deltas.
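The snapshot-then-deltas behaviour described above suggests a roster object that treats the first event after a subscribe as authoritative and applies later events incrementally. `Roster` and `PeerSketch` are hypothetical stand-ins (`PeerSketch` mirrors the two PresencePeer fields), and calling `reset()` before a re-subscribe is an assumption about how you would use it.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional


@dataclass(frozen=True)
class PeerSketch:
    """Stand-in for PresencePeer in this sketch."""

    peer_id: str
    metadata: Optional[dict] = None


class Roster:
    """Per-channel roster built from a snapshot followed by deltas."""

    def __init__(self) -> None:
        self._peers: Dict[str, Optional[dict]] = {}
        self._have_snapshot = False

    def reset(self) -> None:
        # Assumption: call this before re-subscribing so the next event is a snapshot.
        self._have_snapshot = False

    def apply(self, joined: Iterable[PeerSketch], left: Iterable[PeerSketch]) -> None:
        if not self._have_snapshot:
            self._peers.clear()  # the first event replaces whatever was there
            self._have_snapshot = True
        for peer in joined:
            self._peers[peer.peer_id] = peer.metadata
        for peer in left:
            self._peers.pop(peer.peer_id, None)

    def peer_ids(self) -> List[str]:
        return sorted(self._peers)
```

In a Presence handler this would be `roster.apply(ev.joined, ev.left)`, with one `Roster` per subscribed channel.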
If you're showing a "who's here" list with avatars / names, your JWT should mint per-peer metadata (e.g. {"username": ..., "avatarUrl": ...}) — that's what shows up in p.metadata. See Authentication for minting peer_metadata.
Common pitfalls
- Calling publish / send / subscribe before connect() resolves. Each raises SignallingError synchronously: the frame never left the client. (Contrast with DisconnectedError: the frame was sent but the connection dropped before the server confirmed it.) Either await connect() first or queue calls behind the Connected event.
- Assuming state == "connected" means the latest subscribe is active. subscribe() resolves on ack, so await that coroutine; don't fire-and-forget then immediately publish to the same channel from another task expecting your subscription to be live.
- auto_resubscribe=False + forgetting to re-subscribe. A common bug is to subscribe once at startup. After the first reconnect, the SDK has no record of your subscribes (because you turned off tracking) and you silently miss every message. Leave auto_resubscribe=True unless you have a specific scoped-subscription pattern.
- Treating a ServerError event as fatal. ServerError is for server-emitted errors not correlated to a pending request. It doesn't kill the connection, and per-request failures don't even arrive here (they're raised as ServerRequestError from the awaited call). The SDK won't auto-disconnect on a ServerError.
- Trusting data contents vs the envelope. sender_peer_id (on Message / Direct) is server-stamped and trustworthy; anything inside data is whatever the sender chose to put there. sender_metadata is token-asserted but still untrusted peer input, so validate it.
See also
- MeteredPeer: the higher-level class with WebRTC + channel-driven peer discovery
- Errors & Codes: every error / close code + what to do
- Reconnect Best Practices
- Authentication: api_key vs token_provider, JWT claims, peer metadata