# metered-realtime — Python SDK reference

For developers using the `metered-realtime` PyPI package — this file documents the SDK surface end-to-end. The SDK wraps the Realtime Messaging wire protocol with framing, ack correlation, auto-reconnect, perfect-negotiation WebRTC (on aiortc), multi-stream + per-track metadata, and TURN credential injection.

**You should not need to read the wire-protocol layer to build with the SDK** — but if you do, see the raw-WebSocket reference (https://www.metered.ca/docs/llms-realtime-messaging-raw-websocket.txt), which is also useful when interoperating with non-Python clients. A Python peer is indistinguishable on the wire from a browser/JS peer — the canonical case is a Python AI agent in a call with a browser human.

- PyPI: https://pypi.org/project/metered-realtime/
- Docs: https://www.metered.ca/docs/realtime-messaging/sdk-python/
- JS SDK (interops with this one): https://www.metered.ca/docs/llms-realtime-messaging-sdk.txt
- Dashboard: https://dashboard.metered.ca | Pricing: https://www.metered.ca/pricing

---

## Install

```
pip install "metered-realtime[webrtc]"   # MeteredPeer + media (aiortc + av)
pip install metered-realtime             # pub/sub only (SignallingClient; just websockets)
```

Python 3.10+. Async throughout (`asyncio`). The `webrtc` extra adds the native media stack; a pub/sub-only install never imports it (the media helpers are lazily loaded).
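Because the `webrtc` extra is optional, a process can check up front which kind of install it is running under. The sketch below uses only the standard library and assumes the extra provides the importable modules `aiortc` and `av` (the names given in the install comment above). `has_modules` and `has_webrtc_extra` are hypothetical helpers, not part of the SDK.

```python
from importlib.util import find_spec


def has_modules(*names: str) -> bool:
    """Return True only if every named top-level module can be located."""
    return all(find_spec(name) is not None for name in names)


def has_webrtc_extra() -> bool:
    # Assumption: the `webrtc` extra installs the `aiortc` and `av` modules.
    return has_modules("aiortc", "av")


if __name__ == "__main__":
    # Locating a spec does not import the module, so this stays cheap.
    print("media stack available:", has_webrtc_extra())
```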

---

## Two classes

Pick by what you're building:

| You're building… | Use | Why |
|---|---|---|
| **AI agent in a call**, media bridge, recording bot — anything exchanging audio/video/data with peers | `MeteredPeer` | Joins a channel, discovers peers via presence, manages each peer connection, fans your media out, recovers ICE on network change |
| **Presence + chat / a coordination bus** | `MeteredPeer` | `PeerJoined` / `PeerLeft` events drive your logic; `peer.send` broadcasts, `peer.send_to` directs |
| **IoT telemetry / pub-sub fan-out** without media | `SignallingClient` | Pub/sub + directed messages, no WebRTC overhead. Smaller, multiple channels per connection |

Not sure? Start with `MeteredPeer`.

---

## Event model — one mechanism, three surfaces

Every emitter (`MeteredPeer`, `RemotePeer`, `DataChannel`, `SignallingClient`) carries typed, frozen-dataclass events. Events are keyed by TYPE, not by string name.

```python
from metered_realtime import MeteredPeer, PeerJoined, Data

async with MeteredPeer(api_key="pk_live_…") as peer:   # 3. async-context lifecycle
    @peer.on(PeerJoined)                                # 1. decorator (sync OR async def)
    def _(ev: PeerJoined) -> None:
        print("joined:", ev.peer.id)

    await peer.join("room-42")

    async for msg in peer.events(Data):                 # 2. buffered async iterator
        if msg.data == "stop":
            break
```

- `obj.on(EventType, handler)` / `@obj.on(EventType)` — register; `obj.off(EventType, handler)`; `obj.once(EventType, handler)`. Handlers may be sync or `async def` (async handlers are scheduled and tracked by the SDK; a throwing handler is isolated, never breaks emission).
- `obj.events(EventType, *, maxsize=4096)` — async iterator (drops oldest when full).
- `async with MeteredPeer(...) as peer:` closes on exit. `async with peer.joined("room") as peer:` joins on enter, closes on exit.
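To make "keyed by TYPE" concrete, here is a minimal, SDK-independent sketch of a type-keyed emitter. It is not the SDK's implementation: `TinyEmitter` and `Ping` are hypothetical names, only sync handlers are supported, and the buffered `events()` iterator is left out. It illustrates three behaviours described above: registration by event class (plain call or decorator), `once`, and isolation of a throwing handler.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Callable, Optional

Handler = Callable[[Any], None]


@dataclass(frozen=True)
class Ping:
    """Hypothetical event type; frozen so handlers cannot mutate it."""
    seq: int


class TinyEmitter:
    def __init__(self) -> None:
        # Handlers are stored per event class, not per string name.
        self._handlers: dict[type, list[Handler]] = defaultdict(list)

    def on(self, event_type: type, handler: Optional[Handler] = None):
        if handler is None:  # decorator form: @emitter.on(Ping)
            return lambda fn: self.on(event_type, fn)
        self._handlers[event_type].append(handler)
        return handler

    def off(self, event_type: type, handler: Handler) -> None:
        if handler in self._handlers[event_type]:
            self._handlers[event_type].remove(handler)

    def once(self, event_type: type, handler: Handler) -> Handler:
        def wrapper(ev: Any) -> None:
            self.off(event_type, wrapper)  # unregister before running
            handler(ev)
        return self.on(event_type, wrapper)

    def emit(self, event: Any) -> None:
        # Iterate over a copy so handlers may unregister during emission.
        for handler in list(self._handlers[type(event)]):
            try:
                handler(event)
            except Exception:
                pass  # a throwing handler must not break the other handlers


emitter = TinyEmitter()
seen: list[int] = []


@emitter.on(Ping)
def record(ev: Ping) -> None:
    seen.append(ev.seq)


emitter.on(Ping, lambda ev: 1 / 0)                    # always raises; isolated
emitter.once(Ping, lambda ev: seen.append(-ev.seq))   # runs for the first Ping only

emitter.emit(Ping(1))
emitter.emit(Ping(2))
print(seen)
```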

---

## Quick start — `MeteredPeer` (WebRTC + presence + chat)

```python
import asyncio
from metered_realtime import (
    MeteredPeer, PeerJoined, PeerLeft, Track, StreamAdded, Data,
    AudioSource, MediaStream,
)

# Placeholders you supply: attach_or_record, cleanup, other_peer_id, pcm_bytes.

async def main() -> None:
    # exactly one of api_key / token_provider:
    #   api_key="pk_live_…"   pk_ keys must have `Send` ticked for WebRTC (see "pk_ gotcha")
    #   token_provider=async_callable_returning_a_jwt   (see "Auth modes", Path 2)
    async with MeteredPeer(api_key="pk_live_…") as peer:

        @peer.on(PeerJoined)
        def _(ev: PeerJoined) -> None:
            @ev.peer.on(StreamAdded)      # fires once per remote stream
            def _(s: StreamAdded) -> None:
                attach_or_record(s.stream, s.metadata)

            @ev.peer.on(Track)            # or per individual track
            def _(t: Track) -> None:
                ...                       # t.track is an aiortc MediaStreamTrack

        @peer.on(PeerLeft)
        def _(ev: PeerLeft) -> None:
            cleanup(ev.peer.id)

        @peer.on(Data)
        def _(ev: Data) -> None:
            # ev.sender_peer_id is server-stamped — trust it.
            # ev.kind is "broadcast" | "direct"; ev.sender_metadata needs the JWT path
            # + include_sender_metadata=True on join (for broadcasts).
            ...

        # No getUserMedia server-side — you supply the track.
        source = AudioSource(input_rate=16_000)
        peer.add_track(source, MediaStream(id="agent-voice"), {"role": "voice"})

        await peer.join("room-42")
        await peer.send({"chat": "hi everyone"})            # broadcast to channel
        await peer.send_to(other_peer_id, {"hi": "you"})    # directed to one peer
        await source.push(pcm_bytes)                        # stream synthesized audio
        await asyncio.sleep(30)

asyncio.run(main())
```

**Public methods on `MeteredPeer`** (all `async` unless noted):

- `join(channel, *, include_sender_metadata=False)`
- `joined(channel, *, include_sender_metadata=False)` — returns an async context manager (sync call)
- `close(reason=None)` — terminal; construct a new `MeteredPeer` to rejoin
- `send(data)` — broadcast to channel
- `send_to(peer_id, data)` — directed to one peer
- `add_stream(stream, metadata=None)` / `remove_stream(stream)` — sync
- `add_track(track, stream=None, metadata=None)` / `remove_track(track)` — sync
- `replace_track(old_track, new_track)` — swap without renegotiation; `new_track=None` mutes
- `get_stream_metadata(stream)` / `get_track_metadata(track)` — sync local lookups
- `on` / `off` / `once` / `events` — sync

**Read-only properties:** `peer.state`, `peer.peer_id`, `peer.channel`, `peer.remote_peers` (snapshot list).

**Peers don't arrive synchronously with `join`.** The first `presence` lands shortly after the subscribe ack, so `peer.remote_peers` is typically empty right after `await peer.join(...)`. Populate from the `PeerJoined` handler, not a snapshot.

**pk_ key gotcha — `Send` permission is required for WebRTC.** New publishable keys have `Subscribe`/`Publish`/`Presence` on but `Send` OFF. MeteredPeer's WebRTC layer uses the wire `send` op to exchange SDP/ICE — without `Send`, `join()`/`add_track()` complete and presence fires, but `PeerJoined → Track` never fires (no connection negotiates). For pk_ + WebRTC, tick `Send` when creating the key.
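Since `peer.remote_peers` is typically empty right after `join()`, code that needs "at least one peer present" has to wait for `PeerJoined` events rather than read a snapshot. The following is a minimal sketch of that pattern with a timeout. `PeerRoster` is a hypothetical helper, not an SDK class; it works on plain peer ids so it runs without the SDK, and the comments mark where `PeerJoined` / `PeerLeft` handlers would call into it.

```python
import asyncio


class PeerRoster:
    """Tracks peer ids reported by join/leave callbacks."""

    def __init__(self) -> None:
        self._ids: set[str] = set()
        self._changed = asyncio.Event()

    def joined(self, peer_id: str) -> None:
        # Call from a PeerJoined handler, e.g. roster.joined(ev.peer.id)
        self._ids.add(peer_id)
        self._changed.set()

    def left(self, peer_id: str) -> None:
        # Call from a PeerLeft handler, e.g. roster.left(ev.peer.id)
        self._ids.discard(peer_id)

    @property
    def ids(self) -> frozenset[str]:
        return frozenset(self._ids)

    async def wait_for_peers(self, count: int = 1, timeout: float = 10.0) -> bool:
        """Return True once at least `count` peers are present, False on timeout."""

        async def _wait() -> None:
            while len(self._ids) < count:
                self._changed.clear()
                await self._changed.wait()

        try:
            await asyncio.wait_for(_wait(), timeout)
            return True
        except asyncio.TimeoutError:
            return False


async def demo() -> tuple[bool, bool, frozenset[str]]:
    roster = PeerRoster()
    # Simulate a presence update that lands shortly after join() returns.
    asyncio.get_running_loop().call_later(0.05, roster.joined, "peer-a")
    first = await roster.wait_for_peers(1, timeout=1.0)    # satisfied by peer-a
    second = await roster.wait_for_peers(2, timeout=0.1)   # only one peer: times out
    return first, second, roster.ids


result = asyncio.run(demo())
print(result)
```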

---

## Quick start — `SignallingClient` (pub/sub only)

```python
import asyncio
from metered_realtime import SignallingClient, Connected, Message

async def main() -> None:
    async with SignallingClient(api_key="pk_live_…") as client:

        @client.on(Connected)
        def _(ev: Connected) -> None:
            print("connected as", ev.peer_id)

        @client.on(Message)
        def _(ev: Message) -> None:
            print(ev.sender_peer_id, ev.channel, ev.data)

        await client.subscribe("alerts/critical")
        await client.publish("alerts/critical", {"sensor": "T-12", "value": 92})
        await client.send(other_peer_id, {"type": "challenge"})   # directed P2P-via-server
        await asyncio.sleep(10)

asyncio.run(main())
```

**Public methods on `SignallingClient`** (all `async` unless noted):

- `connect()` / `close(code=1000, reason="client requested close")`
- `subscribe(channel, opts=None)` / `unsubscribe(channel)` (`opts` is a `SubscribeOptions`)
- `publish(channel, data)` / `send(to_peer_id, data)`
- `on` / `off` / `once` / `events` — sync

**Read-only property:** `client.state`.

**Naming note vs `MeteredPeer`:** `SignallingClient` keeps `client.send(to_peer_id, data)` (no `send_to` rename) and surfaces directed messages as the `Direct` event (`sender_peer_id`, `data`) — wire-faithful. `MeteredPeer` earns the more specific names because it merges sources.

---

## Auth modes

Two paths. Pick by where your code runs.

### Path 1 — `api_key` (publishable key)

Fixed scope from the dashboard, no backend. `MeteredPeer(api_key="pk_live_…")`. Use for trusted server-side processes and prototypes. **For WebRTC, tick `Send` when creating the key** (off by default — see the pk_ gotcha above). With "Auto-inject TURN" enabled (default), `iceServers` arrive on the welcome automatically.

### Path 2 — `token_provider` (sk_-minted JWT)

An **async** callable returning an HS256 JWT string (`Callable[[], Awaitable[str]]` — the SDK `await`s it; wrap a sync minter in an `async def` or `asyncio.to_thread`).
Called on first connect AND every reconnect (auto-refresh). Gives a stable `peer_id` (the JWT `sub`), channel permissions, `peerMetadata`, and embedded TURN creds.

```python
import httpx
from metered_realtime import MeteredPeer

async def mint() -> str:
    # call YOUR backend's token endpoint; never sign with the secret in client code
    async with httpx.AsyncClient() as h:
        r = await h.post("https://your-backend/api/realtime-token")
        r.raise_for_status()
        return r.json()["token"]

peer = MeteredPeer(token_provider=mint)
```

### Minting JWTs server-side (Python, PyJWT)

`pip install "metered-realtime[auth]"` (or just `PyJWT`). The transport never signs — minting is your backend's job.

```python
import time, jwt   # PyJWT

# SK_SECRET and SK_ID are placeholders for your sk_ secret and its key id;
# load them from server-side configuration.

def mint_realtime_token(user_id: str, app_id: str, turn_creds: dict) -> str:
    return jwt.encode(
        {
            "sub": user_id,                                   # becomes peer_id
            "channels": [f"app_{app_id}/call-*"],             # wildcard patterns
            "permissions": ["publish", "subscribe", "presence", "send"],
            "metadata": {"iceServers": turn_creds},           # welcome-only (TURN)
            "peerMetadata": {"username": "Ada"},              # visible to other peers
            "exp": int(time.time()) + 3600,
        },
        SK_SECRET,
        algorithm="HS256",
        headers={"kid": SK_ID},
    )
```

**Critical:** `token_provider` is called on every reconnect — return a FRESH JWT each call (or cache with a TTL well under `exp`). A stale JWT triggers a 4002 → re-mint → stale loop.

### JWT claims reference

| Claim | Required | What it does |
|---|---|---|
| `sub` | yes | Becomes `peer_id`. ≤ 128 chars. |
| `exp` | yes | Unix seconds. ≤ 24h. |
| `channels` | yes | Wildcard patterns (`*` = one segment, `**` = multi-segment). |
| `permissions` | yes | Subset of `["publish", "subscribe", "presence", "send"]`. |
| `metadata` | no | ≤ 8 KB. Returned on `welcome`. WebRTC: put `iceServers` here. |
| `peerMetadata` | no | ≤ 4 KB. Stamped onto presence + direct + opt-in channel messages. |

You can also mint via the REST API (`POST https://rms.metered.ca/v1/tokens` with `Authorization: Bearer `) if you'd rather not sign in-process.

---

## Media — you supply the track

No `getUserMedia` server-side. Built-in helpers (require the `webrtc` extra):

```python
from metered_realtime import AudioSource, MediaStream, iter_frames, from_file, from_rtsp

# Push synthesized / streamed audio (e.g. an agent's TTS) into the room:
source = AudioSource(input_rate=16_000, input_layout="mono")   # 16 kHz mono s16 PCM in
peer.add_track(source, MediaStream(id="agent-voice"))
await source.push(pcm_bytes)   # resampled to 48 kHz stereo, paced in real time; suspends on backpressure
source.end()                   # end-of-stream: drains, then the track stops

# Consume a peer's audio (e.g. feed speech-to-text):
async for frame in iter_frames(remote_track):   # av.AudioFrame per 20 ms; ends when the track stops
    ...

# Sources backed by a file / IP camera / device (.audio / .video is None if absent):
peer.add_track(from_file("clip.mp4").audio)
peer.add_track(from_rtsp("rtsp://cam:554/stream", transport="tcp").video)
# also: from_camera(device=None, *, width, height, fps), from_microphone(device=None), screen_share(...)
```

- `AudioSource(*, input_rate=16_000, input_layout="mono", max_buffered_seconds=5.0, logger=None)` — `input_layout` is `"mono"` or `"stereo"` (others raise). `push(data: bytes | av.AudioFrame)`, `end()`, `buffered_seconds`.
- `MediaStream(tracks=(), *, id=None)` — `id`, `get_tracks()`, `get_audio_tracks()`, `get_video_tracks()`, `add_track()`, `remove_track()`. `stream.id` is stable sender→receiver; the object differs across reconnects (re-take on `StreamAdded`).
- `StreamMetadata` is a plain `dict` (convention: `role`, `label`). Sender-stamped, untrusted — do not use for auth.
- `from_rtsp` `transport` allowlist: `{tcp, udp, udp_multicast, http}`. Do NOT pass untrusted file/url strings to `from_file`/`from_rtsp` — they reach FFmpeg's protocol layer.
Device capture is platform/FFmpeg-dependent (V4L2 / AVFoundation / DirectShow); software encode has a CPU ceiling on small SoCs.
- Any aiortc-compatible `MediaStreamTrack` works directly with `add_track`.

---

## Reconnect — three layers (handled automatically)

1. **Signalling WS:** exponential backoff + jitter, close-code-aware, token refresh on every reconnect, inactivity watchdog, auto-resubscribe.
2. **Per-peer WebRTC:** ICE recovery on network change / TURN failover (surfaces as a peer `StateChange` to `"reconnecting"` then back to `"connected"`).
3. **Channel reconcile:** when the WS drops, your `RemotePeer` references survive — only the underlying connection is swapped.

**What survives a reconnect:**

| Reference | Survives? | Note |
|---|---|---|
| `peer` instance | yes | same object |
| `RemotePeer` you held | yes | same object; SDK swaps its connection |
| local media added via `add_track`/`add_stream` | yes | reattached automatically |
| `remote.pc` you stored | **no** | re-read on `StateChange` → `"connected"` |
| `MediaStream` from a `Track`/`StreamAdded` event | **no** (new object, same `id`) | re-take on the next `StreamAdded` |
| a `DataChannel` over `remote.create_data_channel` | **no** | re-open on `StateChange` → `"connected"` |

Tune via `ReconnectOptions(initial_delay=0.5, multiplier=2.0, max_delay=30.0, jitter_ratio=0.2, max_attempts=100)` (delays in seconds) passed as `reconnect=`. Pass `reconnect=False` to disable.

---

## Routing model — `send` / `send_to` are SERVER-routed (not P2P)

`peer.send(data)` (broadcast) and `peer.send_to(peer_id, data)` (directed) go Peer → Metered → Peer over the signalling WS. They work before ICE completes and survive WebRTC failure, but count against your signalling-message quota and are capped at `welcome.max_message_size` (server default 64 KB). For low-latency P2P bulk data, open a `DataChannel`.
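Because server-routed messages are capped at `max_message_size`, it can help to measure a payload before sending and route anything larger over a `DataChannel`. The sketch below assumes payloads are serialized as UTF-8 JSON on the wire and that "64 KB" means 65,536 bytes; both are assumptions, and the `headroom` value standing in for envelope overhead is a guess. `encoded_size`, `fits_signalling` and `DEFAULT_CAP` are hypothetical names, not SDK API. In a real client, the `max_message_size` reported on the `Connected` event is a better source for the cap than a hard-coded default.

```python
import json

DEFAULT_CAP = 64 * 1024  # assumption: the documented 64 KB default, read as 65,536 bytes


def encoded_size(data: object) -> int:
    """Size in bytes of `data` when serialized as compact UTF-8 JSON."""
    text = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
    return len(text.encode("utf-8"))


def fits_signalling(data: object, cap: int = DEFAULT_CAP, headroom: int = 1024) -> bool:
    """True if the payload plus `headroom` bytes of assumed envelope fits under `cap`."""
    return encoded_size(data) + headroom <= cap


small = {"chat": "hi everyone"}
large = {"blob": "x" * 70_000}

for name, payload in (("small", small), ("large", large)):
    route = "signalling send" if fits_signalling(payload) else "DataChannel"
    print(f"{name}: {encoded_size(payload)} bytes -> {route}")
```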

---

## MeteredPeer events

`@peer.on(EventType)` — payload fields in parentheses:

- `Joined(peer_id, channel)` — once, after `join()` resolves.
- `Left(peer_id, channel, reason)` — once, on reaching `"closed"` (`peer_id`/`channel` may be `None`).
- `StateChange(from_, to)` — every transition. (`from_` — `from` is a keyword.)
- `PeerJoined(peer)` — another peer joined; `peer` is a `RemotePeer`.
- `PeerLeft(peer)` — peer left/dropped.
- `Data(sender_peer_id, data, kind, sender_metadata)` — customer payload. `kind` is `"broadcast"`|`"direct"`. Only fires for senders seen via presence on your channel. RTC signals + track-metadata control messages are never surfaced here.
- `FatalError(err)` — a condition you must act on: a terminal auth/admin close code, a fatal server-error, or the token provider failing past its threshold. (`err` is an exception; branch on the message / pair it with the lower-level `Disconnected.code`.)

State: `"idle" | "joining" | "joined" | "reconnecting" | "leaving" | "closed"`.

---

## RemotePeer events (subscribe on each `PeerJoined`'s `ev.peer`)

- `StateChange(from_, to)` — per-peer connection state: `"idle" | "connecting" | "connected" | "reconnecting" | "closed"`. The `"connecting"` hop is filtered during a reconnect so you see `connected → reconnecting → connected`.
- `Track(track, streams, metadata)` — remote added a track. `streams` is a tuple of `MediaStream`; `metadata` is the sender's per-track `StreamMetadata`.
- `StreamAdded(stream, metadata)` — first time a stream id appears; re-fires after a reconnect with a fresh `MediaStream` object (same id).
- `StreamRemoved(stream)` — every track of a stream ended (not fired during reconnects).
- `DataChannelOpened(channel)` — remote opened a data channel; `channel` is the raw channel — wrap it in `DataChannel`.
- `NegotiationError(err)` / `IceCandidateError(err)` — negotiation / inbound-candidate failures (usually recovery noise).

Properties: `remote.id`, `remote.metadata`, `remote.state`, `remote.polite`, `remote.pc` (escape hatch — goes stale across reconnects).

Methods: `await remote.send(data)`, `remote.create_data_channel(label, **kwargs)`.

---

## SignallingClient events

- `Connected(peer_id, server_time, expires_at, is_reconnect, max_message_size, ice_servers)`
- `Disconnected(code, reason, will_reconnect)`
- `StateChange(from_, to)` — `"idle" | "connecting" | "connected" | "reconnecting" | "closed"`
- `Message(channel, sender_peer_id, data, sender_metadata)` — a channel broadcast
- `Direct(sender_peer_id, data, sender_metadata)` — a directed message
- `Presence(channel, joined, left)` — `joined`/`left` are tuples of `PresencePeer(peer_id, metadata)`
- `ServerError(code, message, request_id)` — uncorrelated server error (correlated ones raise on the awaited call)
- `GoingAway(retry_after_ms)`
- `TokenProviderError(consecutive_failures, err)` — fired at the failure threshold

---

## DataChannel

Backpressure-aware wrapper over a raw channel (from `remote.create_data_channel(...)` outbound, or the `DataChannelOpened` event inbound).

```python
from metered_realtime import DataChannel, DCOpen, DCMessage

dc = DataChannel(raw_channel)   # max_buffered_amount=1_048_576, max_queued_sends=256, …

@dc.on(DCMessage)
async def _(m: DCMessage) -> None:   # m.data is str | bytes
    await dc.send(m.data)            # suspends while over the buffer cap (backpressure)
```

- `await dc.send(data: str | bytes)` — raises `DataChannelOverflowError` past the queue cap, `MeteredRealtimeError` if closed or if a single payload exceeds `max_buffered_amount`.
- `dc.close()` (sync). Properties: `dc.label`, `dc.ready_state`, `dc.buffered_amount`.
- Events: `DCOpen`, `DCClose`, `DCError(err)`, `DCMessage(data)`.
- A channel dies with its connection — re-open on `StateChange` → `"connected"` after a reconnect.

---

## Error classes

Branch on `isinstance` / stable `.code` attrs — never the message string. Base: `MeteredRealtimeError`.
- `SignallingError` → `SignallingConnectError(close_code, close_reason)`, `ServerRequestError(code, request_id)`, `AckTimeoutError(request_id, timeout)`, `DisconnectedError`
- `MeteredPeerSendError(code)` — `code` ∈ `"reserved_channel" | "not_joined" | "invalid_args" | "self_send"`
- `MeteredPeerOversizedError(size, cap)`
- `MeteredPeerStateError(code, method, current_state)` — `code` ∈ `"invalid_state" | "track_already_attached"`
- `MeteredPeerReplaceTrackError(succeeded, failed)` — `failed` is a tuple of `ReplaceTrackFailure(peer_id, err)` (errors scrubbed of SDP secrets)
- `DataChannelOverflowError(queued, cap)`

---

## Close codes (`WsCloseCode` IntEnum)

| Code | Name | Reconnect? |
|---|---|---|
| 1001 | GOING_AWAY | yes |
| 1008 | POLICY_VIOLATION | yes |
| 1009 | MESSAGE_TOO_BIG | yes |
| 4000 | CLIENT_INACTIVITY (client-emitted watchdog) | yes |
| 4001 | INVALID_TOKEN | **no (terminal)** → `FatalError` |
| 4002 | TOKEN_EXPIRED | yes — refreshes the token first |
| 4003 | CHANNEL_NOT_AUTHORIZED | **no (terminal)** → `FatalError` |
| 4010 | OVER_CONCURRENT_LIMIT | yes — with a ≥ 30 s backoff floor |
| 4011 | OVER_MESSAGE_RATE | yes |
| 4012 | ACCOUNT_SUSPENDED | **no (terminal)** → `FatalError` |
| 4020 | ADMIN_DISCONNECT | **no (terminal)** → `FatalError` |

Terminal codes (`{4001, 4003, 4012, 4020}`) surface as a `FatalError` event; recover with `close()` + a fresh `MeteredPeer`.

---

## Reserved channel prefixes

`join()` rejects channels starting with `_metered/`, `_internal/`, `_system/` (server-reserved) synchronously with `MeteredPeerSendError(code="reserved_channel")`.

---

## Interop

A Python `MeteredPeer`, a browser `@metered-ca/realtime` peer, a React Native peer, and a raw-WebSocket client can all share a channel — the SDK speaks the protocol byte-for-byte.
WebRTC negotiation uses non-trickle ICE outbound + trickle-accept inbound and a polite/impolite tie-break, all handled for you, so a Python peer answers a browser peer's offer (and vice versa) transparently.

---

## Examples (in the package repo's `examples/`)

- `data_channel_echo.py` — open a P2P data channel and echo messages
- `audio_agent.py` — stream synthesized audio via `AudioSource` + consume a peer's audio via `iter_frames`
- `signalling_smoke.py` — pub/sub round-trip with `SignallingClient`

Each is env-driven: `METERED_KEY=pk_live_… python examples/.py`.

---

## Documentation pages

- Overview: https://www.metered.ca/docs/realtime-messaging/sdk-python/
- Getting Started: https://www.metered.ca/docs/realtime-messaging/sdk-python/getting-started
- API Reference: MeteredPeer, RemotePeer, SignallingClient, DataChannel, Media, Errors & Codes
- Guides: Authentication, Reconnect Best Practices, AI Agent Communication, IoT Telemetry