Skip to main content

MeteredPeer

The high-level class. One MeteredPeer instance = one signalling connection + one channel + N peer connections (one per remote peer in the channel).

Use this for WebRTC audio/video, screen share, peer-to-peer games, multiplayer presence + chat — including server-side Python peers (an AI agent that joins a room, an RTSP camera bridge, a recorder). A Python peer interoperates with browser peers in the same channel. If you only need pub/sub (no media, no per-peer state), SignallingClient is smaller and simpler.

Every method that does I/O is async. Names are snake_case. Events are typed dataclasses, not strings.

Construct

from metered_realtime import MeteredPeer

peer = MeteredPeer(api_key="pk_live_…")

# — OR — mint a short-lived JWT from your backend:
async def mint_token() -> str:
return await fetch_jwt_from_your_backend()

peer = MeteredPeer(token_provider=mint_token)

Pass exactly one of api_key or token_provider. The constructor doesn't connect — call join() for that.

Options

Options are keyword arguments on the constructor. (You may also build a SignallingClientOptions dataclass and pass it positionally as the first argument; the keyword form below is the common case.)

OptionTypeDefaultNotes
api_keystrNonepk_live_…. Mutually exclusive with token_provider.
token_providerCallable[[], Awaitable[str]]NoneAsync callable returning an HS256 JWT. Awaited on first connect AND every reconnect (auto-refresh).
urlstr"wss://rms.metered.ca"Server URL. The SDK appends /v1. Raises ValueError on construction if the URL has a path/query/fragment or a non-wss: scheme (ws: is allowed only for localhost).
loggerLoggerNoopLogger()Use StdlibLogger during development, your own Logger implementation in prod.
reconnectReconnectOptions \| boolTrueSee Reconnect Best Practices. False disables — rarely what you want.
inactivity_timeoutfloat (seconds)60.0If no frame arrives in this window, the SDK closes-and-reconnects. Aligned with the server's 30 s ping + 10 s grace.
token_provider_timeoutfloat (seconds)10.0Cap on how long the SDK waits for token_provider() to resolve.
auto_resubscribeboolTrueAfter reconnect, re-issue the subscribe for the channel that was active before the drop. MeteredPeer's channel recovery depends on it — leave it True. The opt-out exists on SignallingClient for manual subscription control.

All durations are floats in seconds (the asyncio idiom) — there is no *_ms option.

MeteredPeer shares SignallingClient's options — anything documented there applies here too.

Methods

await join(channel, *, include_sender_metadata=False)None

Connects (if not connected) and subscribes to channel. Resolves when the server acks the subscribe.

await peer.join("room-42")
print(peer.peer_id) # server-assigned, or your JWT's sub claim

Important — peers don't arrive synchronously with join. The server's initial presence snapshot for this channel arrives on the network after the subscribe ack, then the SDK translates it into PeerJoined events for each existing peer. By the time await peer.join(...) returns, peer.remote_peers is typically still empty; populate your state from the PeerJoined handler, not from a snapshot taken right after join.

from metered_realtime import PeerJoined

@peer.on(PeerJoined)
def _(ev: PeerJoined) -> None:
add_participant(ev.peer)

await peer.join("room-42")
# peer.remote_peers is likely [] here; existing peers arrive as PeerJoined
# events over the next ~50–200 ms.
OptionTypeDefaultNotes
include_sender_metadataboolFalseOpt in to receiving each broadcast sender's token-asserted metadata on Data events (Data.sender_metadata). Off by default since most use cases only need it via presence.

Raises:

  • MeteredPeerSendError with code == "reserved_channel" if channel starts with _metered/, _internal/, or _system/ (server-reserved prefixes, matched case-insensitively).
  • MeteredPeerStateError with code == "invalid_state" if the state isn't "idle" (already joining, already joined, leaving, or closed) — construct a new MeteredPeer if you've called close().

Reconnect note: if the signalling WebSocket drops and reconnects, the SDK re-subscribes to this channel for you (auto_resubscribe=True). You don't call join() again.

peer.joined(channel, *, include_sender_metadata=False) — async context manager

Sugar over join() / close(): joins on enter, closes on exit.

async with peer.joined("room-42") as peer:
await peer.send({"type": "hello"})
...
# peer.close() ran on exit — this instance is now terminal.

MeteredPeer is itself an async context manager (__aenter__ returns the peer, __aexit__ calls close()), so you can also write async with MeteredPeer(api_key=...) as peer: and call join() yourself inside.

await close(reason=None)None

Closes every RemotePeer, closes the WebSocket, transitions to "closed". Terminal — once closed, the same instance can't rejoin. Construct a fresh MeteredPeer.

await peer.close("user logged out")
peer = MeteredPeer(api_key="pk_live_…")
await peer.join(channel)

Calling close() more than once is safe (idempotent). It also removes the instance's listeners and closes any open events() streams after emitting Left.

Why close and not disconnect? The method is one-way terminal — it closes the WS and tears down all per-peer state. The name mirrors the underlying socket / connection close and leaves naming space for a future channel-scoped leave() (not implemented today).

await send(data)None

Broadcasts data to every other peer in the joined channel. Server-routed. data is anything JSON-serializable. Not echoed back to your own Data listeners.

await peer.send({"type": "chat", "text": "hi everyone"})

Use this for any payload that should reach everyone in the room — chat, presence pings, "user typing" indicators, control signals.

await send_to(peer_id, data)None

Sends data directly to one peer in the channel. Server-routed.

await peer.send_to(other.id, {"whisper": "private"})

Use this for one-to-one messages — private DMs, per-peer game state, direct queries. (Each RemotePeer also exposes await remote.send(data), which is shorthand for peer.send_to(remote.id, data).)

send and send_to are distinct methods. Two names make the routing intent loud — a single overloaded send(maybe_peer_id, payload) would look identical at the call site even though it routes differently.

Routing trade-offs (applies to both send and send_to)

Both are server-routed, not P2P. Same trade-off as the protocol's broadcast / direct frames:

Server-routed (send / send_to)P2P data channel (remote.create_data_channel)
Works before ICE completesYesNo — wait for remote.state == "connected"
Counts against signalling quotaYesNo
LatencyServer hop (~10–50 ms)P2P (~5–30 ms typical)
Survives WebRTC failureYes (signalling and media are separate)No
Max payloadServer cap (default 64 KB)Backend caps (~16 KB practical for ordered channels)

For low-latency P2P data, see DataChannel.

Both methods raise:

  • MeteredPeerSendError with code == "not_joined" if you call before join() resolves (state isn't "joined").
  • MeteredPeerSendError with code == "self_send" (from send_to only) if peer_id == peer.peer_id.
  • MeteredPeerSendError with code == "invalid_args" (from send_to only) if peer_id is not a non-empty string.
  • MeteredPeerOversizedError if the JSON-serialized payload exceeds the server cap. Branch on isinstance to show a "message too big" path:
from metered_realtime import MeteredPeerOversizedError

try:
await peer.send(huge)
except MeteredPeerOversizedError as e:
log.warning("message too big: %d bytes, max %d", e.size, e.cap)

add_stream(stream, metadata=None)None

Attaches every track in stream to every current peer and every peer that joins later. Sugar over add_track(track, stream, metadata) per track in the stream.

Idempotency is per track, not per stream — calling add_stream with a stream whose tracks are already tracked is a no-op for those tracks.

from metered_realtime import MediaStream, from_file

player = from_file("intro.mp4")
stream = MediaStream([t for t in (player.audio, player.video) if t is not None])
peer.add_stream(stream, {"role": "file", "label": "intro clip"})

The optional metadata (a copy is taken per track) rides over the signalling channel and arrives on the receiver's StreamAdded event and Track event. Use it to label streams so peers know which is the camera vs the screen share vs a file (see StreamMetadata below).

Tip — attach before join() if you can. Streams added before join() ride along in the first SDP offer to each peer (one round trip). Streams added after join() trigger a renegotiation cycle per peer (two extra round trips).

Reconnect note: the SDK remembers what you added (tracks + their metadata). On reconcile, both your media AND its metadata reattach to each survivor's refreshed connection automatically — you don't re-add_stream after a reconnect.

add_track(track, stream=None, metadata=None)None

Lower-level primitive. add_stream is sugar over this. Use directly when you want fine-grained control — e.g. attaching an AudioSource that isn't grouped into a MediaStream, or grouping specific tracks under one stream id.

from metered_realtime import AudioSource

source = AudioSource(input_rate=16_000) # push-based TTS track
peer.add_track(source, metadata={"role": "voice"})

Pass stream to group the track under a stream id at receivers — their Track event carries the stream, and the first track of a stream id fires StreamAdded. Omit stream for an unaffiliated track: the receiver still gets it on Track, but no StreamAdded fires (there's no stream to surface).

Idempotent on the same track object with the same stream — calling twice is a no-op. To re-tag an already-tracked track, call remove_track(t) then add_track(t, …) with new metadata.

State gates (raise MeteredPeerStateError):

  • "leaving" or "closed" — raises MeteredPeerStateError with code == "invalid_state" (and .method == "add_track", .current_state). The instance is terminal; construct a fresh MeteredPeer.
  • "reconnecting", "idle", or "joining" — no error. The track is added to the SDK's tracked set and fan-out is deferred: it lands on each survivor's refreshed connection (reconnect) or on every peer that joins (idle/joining).

Track sharing across streams is rejected: re-adding the same track with a different stream raises MeteredPeerStateError with code == "track_already_attached". Use distinct tracks per stream, or remove_track() first. (Internally each peer gets its own relayed copy of your track, so one track can fan out to many peers — but it belongs to one stream grouping.)

Metadata size: if the metadata bag would exceed the server cap when serialized for the wire, add_track raises MeteredPeerOversizedError before tracking the track (so over-cap metadata can't loop through reconcile re-sends). Keep metadata small (a few KB at most).

remove_stream(stream)None

Detaches the stream's tracks from every peer and stops tracking them for future joins. No-op if you never added it. Sugar over remove_track(t) per track in the stream. Doesn't stop the underlying tracks — stop them yourself (e.g. the source's player) if you want to release the device/decoder. Raises MeteredPeerStateError in "leaving" / "closed".

remove_track(track)None

Lower-level counterpart to add_track. Detaches one track from every peer and stops tracking it for future joins. No-op if not tracked. Doesn't stop the underlying track. Raises MeteredPeerStateError in "leaving" / "closed".

get_stream_metadata(stream)StreamMetadata | None

Returns the metadata you passed for a local stream (the one you add_stream'd) — specifically, the metadata of its first tracked track. Returns None if the stream wasn't added, or if you passed it without metadata. Useful for round-tripping your own metadata in your app state.

meta = peer.get_stream_metadata(local_screen)
print(meta.get("label") if meta else None)

This is a local lookup — it tells you what you attached. To get a remote peer's metadata for a stream they sent you, read it off the StreamAdded event payload (ev.metadata), or remote.metadata for peer-level (token-asserted) metadata.

get_track_metadata(track)StreamMetadata | None

Same as get_stream_metadata but for an individual track. Returns the metadata you passed for that specific track, or None. If you used add_stream (sugar), every track in the stream shares the same metadata bag.

await replace_track(old_track, new_track)None

Swap a track without renegotiating SDP when the formats are compatible (camera → screen share, mic → file). Fans the swap out to every peer.

screen = from_file("demo.mp4")
await peer.replace_track(camera_track, screen.video)

new_track may be None to stop sending that track across all peers. old_track must not be None — passing None raises TypeError (use remove_stream() / remove_track() to stop sending).

Tracking-map sync: replace_track updates the SDK's internal tracking map so future newcomers and reconnect cycles use new_track, not the stale old_track. Metadata is re-keyed under the new track's id and re-sent. replace_track(old_track, None) drops the entry entirely (the silenced track is no longer tracked and won't fan out to newcomers).

The map update happens when at least one peer succeeded, OR when no peer was carrying old_track (zero-target case — you're alone, or only newer peers exist). On total fan-out failure (every peer that carried the track failed), the map stays under old_track to match the live sender state — so the tracking map never lies about what's actually being sent.

Partial-failure handling. If the swap succeeds on some peers and fails on others (rare, but possible during reconnects or codec mismatches), it raises MeteredPeerReplaceTrackError carrying both lists:

from metered_realtime import MeteredPeerReplaceTrackError

try:
await peer.replace_track(old_cam, new_cam)
except MeteredPeerReplaceTrackError as e:
# e.succeeded: tuple[str, ...] — peer ids already on new_cam; leave them alone
# e.failed: tuple[ReplaceTrackFailure, ...] — retry just these
for failure in e.failed:
log.warning("replace_track failed for %s: %r", failure.peer_id, failure.err)

Each ReplaceTrackFailure has .peer_id and .err (the underlying cause, scrubbed of any credential-bearing fields since failures are routinely logged).

Raises MeteredPeerStateError (code == "invalid_state") if called during "reconnecting", "leaving", or "closed". Wait for peer.state == "joined" (listen for StateChange) before retrying.

on(EventType, handler) · off(EventType, handler) · once(EventType, handler)

Register / remove handlers, keyed by the event's class (not a string). on and once also work as decorators. Handlers may be sync or async — a sync handler runs inline during emit; an async handler is scheduled as a tracked task so a slow handler can't stall emission. A throwing handler is isolated (logged via your logger, never propagated to siblings).

from metered_realtime import Data

# decorator form
@peer.on(Data)
async def _(ev: Data) -> None:
await handle(ev)

# explicit form
def on_data(ev: Data) -> None: ...
peer.on(Data, on_data)
peer.off(Data, on_data)

events(EventType, *, maxsize=4096) → async iterator

An alternative to callbacks: an async-iterable stream of one event type. Bounded by maxsize (drops the oldest event when full so a slow consumer can't grow memory unbounded; pass maxsize=0 for unbounded). It's also an async context manager that unsubscribes on exit.

from metered_realtime import PeerJoined

async with peer.events(PeerJoined) as joins:
async for ev in joins:
add_participant(ev.peer)

StreamMetadata

StreamMetadata is a plain dict (dict[str, Any]) — the bag you pass to add_stream / add_track, surfaced at the receiver on the StreamAdded and Track events.

StreamMetadata = dict[str, Any]   # type alias

meta: StreamMetadata = {
"role": "camera", # convention: "camera" | "screen" | "file" | custom
"label": "front cam", # human-readable
# …any keys your app needs
}

Convention only — the SDK doesn't validate role / label. Use any keys your app needs. Remember this is server-routed payload over the signalling channel — the bag is sent as a direct message and so counts against the server message-size cap (default 64 KB) like any other directed send. Keep it small for low call-setup latency.

Trust model for StreamMetadata

Per-track / per-stream metadata is sender-stamped and server-routed verbatim — NOT token-signed. A peer in your channel can put whatever it wants in the metadata bag it sends, and your Track / StreamAdded handlers will see those exact values.

Use it for hints that help your app lay out streams (camera vs screen, label text, custom flags). Do NOT use it for authoritative identity, authorization decisions, or anything security-relevant. For server-verified identity, use the JWT's metadata claim — it surfaces on presence (remote.metadata) and on Data events as sender_metadata.

The SDK also validates inbound metadata defensively: a value that isn't a plain dict is dropped (a hostile peer can't slip a list/None/primitive into a customer-visible metadata field), and inbound metadata over the size cap is dropped with a warning rather than stored.

Receiver-side flood defense: the SDK caps each remote peer's track-metadata cache at 512 entries; on overflow, the oldest entry is evicted (FIFO). This bounds the memory cost when an adversarial peer floods unique track ids.

Wire transport (good to know, not required reading)

Per-track metadata travels over the same signalling channel as direct messages, under a reserved key ("__meteredTrackMeta"). The SDK intercepts these on the receive side and stashes them; they never surface on the Data event. Metadata is sent BEFORE the matching SDP offer for the track so ordering is preserved — receivers have the bag stashed by the time their Track event fires.

This means "__meteredTrackMeta" is a reserved key inside the data payload of send_to — if you use it for your own purposes, the SDK warns via your logger and the receiver will interpret it as a metadata update. Pick a different key in your own payloads.

Feeding media from Python

There's no getUserMedia here. Build media with MediaStream plus a source:

  • AudioSource — a push-based audio track for synthesized/streamed audio (an AI agent's TTS, say). Construct it for your input format, add_track it, then await source.push(pcm_bytes) as audio is produced and source.end() when done. It paces output to real time and applies backpressure.
  • from_file(path_or_url, loop=False) — a player backed by a file or HTTP URL; player.audio / player.video are the tracks (either may be None — check before add_track).
  • from_rtsp(url, transport="tcp") — an RTSP/HTTP stream (most IP cameras).
  • from_camera() / from_microphone() / screen_share() — local device capture (platform/FFmpeg-dependent).
  • iter_frames(track) — an async iterator over a received track's frames that ends when the track does (e.g. feeding inbound audio to speech-to-text).
from metered_realtime import MeteredPeer, AudioSource, from_file

peer = MeteredPeer(api_key="pk_live_…")
source = AudioSource(input_rate=16_000)
peer.add_track(source, metadata={"role": "voice"})

cam = from_camera()
if cam.video is not None:
peer.add_track(cam.video, metadata={"role": "camera"})

await peer.join("room-42")
async for chunk in tts_stream(): # bytes of s16 PCM
await source.push(chunk)
source.end()

The from_* helpers require the webrtc extra and interpret their argument via FFmpeg. Do not pass untrusted/externally-controlled file paths or URLs — a hostile string can read arbitrary local files or fetch arbitrary URLs (SSRF). Validate before forwarding. See Media for the full source reference.

Simultaneous camera + screen share — the canonical pattern

Instead of swapping the video track when switching to screen share, send both as separate streams:

from metered_realtime import MediaStream, PeerJoined, StreamAdded, from_camera, screen_share

cam = from_camera()
peer.add_stream(MediaStream([cam.video]), {"role": "camera", "label": "front cam"})

screen = screen_share()
peer.add_stream(MediaStream([screen.video]), {"role": "screen", "label": "shared window"})

# Receiver:
@peer.on(PeerJoined)
def _(ev: PeerJoined) -> None:
@ev.peer.on(StreamAdded)
def _(s: StreamAdded) -> None:
role = (s.metadata or {}).get("role")
if role == "camera":
attach_to_face_tile(s.stream)
elif role == "screen":
attach_to_screen_tile(s.stream)

Peers see them as two separate streams and lay them out accordingly.

Read-only state

peer.state          # "idle" | "joining" | "joined" | "reconnecting" | "leaving" | "closed"
peer.peer_id # str | None (None until connected)
peer.channel # str | None (None until joined)
peer.remote_peers # list[RemotePeer] — snapshot; mutating the list does nothing

State transitions

            join()                                  close()
idle ──────────► joining ──► joined ──────────────────────► leaving ──► closed
│ ▲
│ │ reconnect succeeded
▼ │
reconnecting

│ terminal close code

closed

What each state means for your app:

StateShow your userSend / receive?
idleNo
joining"connecting…"No
joinednormal app UIYes
reconnecting"reconnecting…" banner (best practices)No (sends raise; the SDK recovers the channel and peers on success)
leaving"disconnecting…"No
closed"disconnected" + retryNo (terminal; construct a fresh peer)

Events

Register by event class:

peer.on(EventType, handler)        # or @peer.on(EventType), or peer.events(EventType)
EventFieldsFires
Joinedpeer_id: str, channel: strOnce, after join() resolves. Read your final peer_id here if you didn't claim one in your JWT.
Leftpeer_id: str \| None, channel: str \| None, reason: str \| NoneOnce, when you reach "closed". peer_id / channel snapshot what your peer was using before close (both None if you closed before connect/subscribe).
StateChangefrom_: str, to: strEvery state transition. Drives your reconnect UI. (from is a Python keyword, so the field is from_.)
PeerJoinedpeer: RemotePeerAnother peer joined the channel. Wire up Track / StateChange listeners on ev.peer here.
PeerLeftpeer: RemotePeerPeer unsubscribed or disconnected. Clean up their media here.
Datasender_peer_id: str, data: Any, kind: "broadcast" \| "direct", sender_metadata: dict \| NoneA customer payload arrived. See below.
FatalErrorerr: ExceptionUnified surface for unrecoverable conditions you must react to — see "What fires FatalError". RTC signals are NEVER surfaced as Data or FatalError.

RemotePeer emits its own events (StateChange, Track, StreamAdded, StreamRemoved, DataChannelOpened, NegotiationError, IceCandidateError) — see RemotePeer.

The Data event — channel scope

Data only fires for senders the SDK has seen via presence on your current channel. If another tenant peer in a different channel sends you a direct message (the wire protocol allows it within a tenant), the SDK drops it — your Data handler won't fire.

This keeps Data consistent with PeerJoined / PeerLeft: every Data event is from a peer you've also seen join. The SDK also skips echoing your own broadcasts back to you. Customers who want unscoped direct messaging (e.g. a cross-channel "system" peer that DMs you) should use SignallingClient directly, where the wire protocol's permissive semantics are intentional.

The Data event — branching on kind

kind discriminates channel-wide send(data) ("broadcast") from one-to-one send_to(my_id, data) ("direct"):

from metered_realtime import Data

@peer.on(Data)
def _(ev: Data) -> None:
if ev.kind == "broadcast":
append_to_room_chat(ev.sender_peer_id, ev.data)
else:
append_to_private_thread(ev.sender_peer_id, ev.data)

The discriminator is server-stamped and trustworthy.

The Data event — sender_metadata

If you joined with include_sender_metadata=True, broadcasts arrive with the sender's token-asserted metadata on sender_metadata. Direct sends carry it whenever the sender's JWT had metadata (you don't opt in for directs).

await peer.join("room-42", include_sender_metadata=True)

@peer.on(Data)
def _(ev: Data) -> None:
name = (ev.sender_metadata or {}).get("username", "Anonymous")
append_message(name, ev.data["text"])

sender_metadata is untrusted peer-supplied input — your backend signed it for the sender, but a peer with a leaked JWT keeps using it. Use the JWT's server-enforced channel patterns for access decisions, not sender_metadata fields.

The Data event — security note

sender_peer_id is the server-stamped envelope sender. Trust it.

Anything inside data is whatever the sender chose to put there. If a peer publishes {"from": "alice", "message": "…"}, the SDK doesn't validate the inner from. Use sender_peer_id, not data["from"], to identify who sent what.

# ❌ Don't trust data["from"] — peers can lie about it
@peer.on(Data)
def _(ev: Data) -> None:
add_message(ev.data["from"], ev.data["text"])

# ✅ Trust sender_peer_id — it's stamped by the server
@peer.on(Data)
def _(ev: Data) -> None:
add_message(ev.sender_peer_id, ev.data["text"])

Python dicts don't have JavaScript's __proto__ prototype-pollution hazard, but blindly merging untrusted peer data into your own structures is still unwise — a hostile peer can set any keys it likes. Copy specific known keys ({"text": ev.data.get("text")}) instead of my_state.update(ev.data), and validate types before use.

What fires FatalError

FatalError consolidates unrecoverable conditions that would otherwise require wiring separate listeners on the underlying SignallingClient. ev.err is a MeteredRealtimeError whose message identifies the cause. It fires for:

SourceWhenMessage identifies
Terminal WS close code 4001Invalid JWTinvalid_token
Terminal WS close code 4002Token expired (reconnect ladder won't run)token_expired
Terminal WS close code 4003JWT doesn't authorize the channelchannel_not_authorized
Terminal WS close code 4012Account suspended (billing)account_suspended
Terminal WS close code 4020Admin disconnect via REST APIadmin_disconnect
Fatal server-error frameServer rejected a request with account_suspended / channel_not_authorized / action_not_permitted / invalid_token / token_expiredthe server's code string
token_provider failed past its consecutive-failure thresholdYour mint endpoint kept rejecting"token provider failed N consecutive attempts…"

Non-fatal server errors (rate limits, quota, transient failures correlated to a specific request) are raised on the calling request instead and do NOT fire FatalError. The connection stays open.

Unlike the JS SDK, there's no err.name attribute to switch on — the symbolic name is embedded in the message string. Treat FatalError as terminal: log/report ev.err, surface a re-login / billing / kicked UI as appropriate, and recover by constructing a fresh MeteredPeer.

from metered_realtime import FatalError

@peer.on(FatalError)
def _(ev: FatalError) -> None:
log.error("fatal: %r", ev.err)
report_to_sentry(ev.err)
show_disconnected_ui() # offer re-login / retry; construct a new MeteredPeer

What survives a reconnect

Reference you holdSurvives signalling-WS reconnect?Notes
peer instanceYesSame object across the drop.
peer.remote_peers contentsYes for survivorsPeerJoined / PeerLeft fires for newcomers / leavers.
RemotePeer reference you heldYesSame object (is). The SDK swaps the underlying connection silently — no PeerJoined/PeerLeft.
Local media added via add_stream / add_trackYesSDK reattaches your tracks + metadata to each survivor's refreshed connection.
remote.pc reference you held in a variableNoAfter reconcile, remote.pc returns a fresh connection; your old variable points at a closed one. Re-read it.
MediaStream object from a StreamAdded eventNoA fresh object (same stream.id) arrives on a new StreamAdded. Re-take it.
Data channel from remote.create_data_channel(...)NoTied to the old connection. Re-open on StateChange to "connected". See DataChannel.

Recovery is signalled by the remote peer's StateChange back to "connected". Reconnect Best Practices has the full playbook — read it before going to production.

Common pitfalls

  1. Calling peer.send before join() resolves. Raises MeteredPeerSendError (code == "not_joined"). Either await peer.join(...) first or queue your sends behind the Joined event.

  2. Assuming send / send_to is P2P. Both are server-routed. If you're moving thousands of messages per second between peers (game state, telemetry), open a data channel instead — see the routing trade-offs table.

  3. Holding remote.pc (or a StreamAdded MediaStream) across a reconnect. Both go stale. Re-read remote.pc and re-take the stream on each StateChange"connected".

  4. add_stream after join without expecting renegotiation. Each post-join add_stream adds two SDP round-trips per peer — a ~200–600 ms delay before peers see the new track. Attach before join when possible.

  5. Forgetting close() is terminal. A MeteredPeer can't rejoin after close(). Construct a new one.

  6. Trusting data["from"] instead of sender_peer_id. Sender-spoofed messages bypass your access checks. Always use sender_peer_id.

  7. Passing an untrusted path/URL to a from_* media helper. FFmpeg will happily open local files and remote URLs (SSRF). Validate first.

See also