Example — Data Channel Echo
Two MeteredPeers join a room in one process: the client opens a P2P DataChannel to the bot, and the bot echoes every message back. Demonstrates create_data_channel, the DataChannelOpened event, and the backpressure-aware DataChannel wrapper.
PyPI: metered-realtime
This is the runnable examples/data_channel_echo.py from the SDK, walked through end to end.
What it demonstrates
- Opening a P2P
DataChannelwithremote.create_data_channel(...)(it's aRemotePeermethod) for true peer-to-peer data - Accepting an inbound channel via the
DataChannelOpenedevent - The
DataChannelwrapper for backpressure-awareawait dc.send(...) - Async event handlers (
@dc.on(DCMessage)), which the SDK schedules and tracks for you — no manual task bookkeeping
Install
pip install "metered-realtime[webrtc]"
The data channel needs the webrtc extra — it pulls in the WebRTC backend (aiortc). A pub/sub-only install can't open one.
Running it locally
The example reads your publishable key from METERED_KEY:
METERED_KEY=pk_live_… python examples/data_channel_echo.py
To point at a local or self-hosted server, set the optional METERED_URL:
METERED_KEY=pk_live_… METERED_URL=ws://localhost:9292 python examples/data_channel_echo.py
It prints OK — bot echoed [...] and exits 0 once the bot has echoed every message back over the channel.
Why this isn't just peer.send
peer.send(data) works for most app-level data, but it's server-routed: every message goes up to the messaging service and back down, and counts against your signalling message quota.
For high-frequency or latency-sensitive data (game ticks, telemetry streams, file transfers), a true P2P DataChannel is the better fit — the bytes flow directly between the two peers. The trade-off is that you're responsible for opening it, handling backpressure, and re-opening it after reconnects.
This example shows the minimum complete pattern.
Source walkthrough
Setup — two peers, one process
Both peers read the same options. METERED_KEY is required; METERED_URL is an optional server override.
import asyncio
import os
import sys
from metered_realtime import (
DataChannel,
DataChannelOpened,
DCMessage,
DCOpen,
MeteredPeer,
PeerJoined,
)
CHANNEL = "metered-realtime-dc-echo"
MESSAGES = ["ping", "hello", "metered"]
def _opts() -> dict[str, str]:
key = os.environ.get("METERED_KEY")
if not key:
print("Set METERED_KEY=pk_live_... (a publishable key).", file=sys.stderr)
raise SystemExit(2)
opts = {"api_key": key}
url = os.environ.get("METERED_URL")
if url:
opts["url"] = url
return opts
The bot — accept the channel and echo each message
The bot doesn't open anything. When a peer opens a channel to it, the SDK fires DataChannelOpened on that remote peer, carrying the raw channel as opened.channel. Wrap it in a DataChannel and echo every inbound DCMessage straight back:
@bot.on(PeerJoined)
def _(ev: PeerJoined) -> None:
@ev.peer.on(DataChannelOpened)
def _(opened: DataChannelOpened) -> None:
dc = DataChannel(opened.channel)
# Async handlers are scheduled + tracked by the SDK, so the echo's
# backpressure-aware send runs without manual task bookkeeping.
@dc.on(DCMessage)
async def _(msg: DCMessage) -> None:
await dc.send(msg.data)
The DCMessage handler is async, and await dc.send(...) is backpressure-aware — if the producer ever outran the link, send() would suspend until the channel's buffer drains rather than growing memory without bound. The SDK schedules and tracks the async handler, so you don't manage the task yourself.
The client — open the channel, send, collect the echoes
The client opens the channel from its PeerJoined handler with create_data_channel, wraps the raw channel, and listens for DCOpen (the channel is ready) and DCMessage (an echo arrived):
echoes: list[str | bytes] = []
opened = asyncio.Event()
client_dc: list[DataChannel] = []
@client.on(PeerJoined)
def _(ev: PeerJoined) -> None:
raw = ev.peer.create_data_channel("chat")
dc = DataChannel(raw)
client_dc.append(dc)
dc.on(DCOpen, lambda _e: opened.set())
dc.on(DCMessage, lambda m: echoes.append(m.data))
create_data_channel returns a raw aiortc channel; DataChannel(raw) wraps it. Note both registration styles are used: the decorator form (@dc.on(DCMessage) on the bot) and the call form (dc.on(DCOpen, handler) here) — they're equivalent, and handlers may be sync or async.
Driving it
Join both peers, wait for the channel to come up (DCOpen sets the opened event), send the messages, and wait for the echoes:
try:
await bot.join(CHANNEL)
await client.join(CHANNEL)
await asyncio.wait_for(opened.wait(), 30) # wait for ICE/DTLS/SCTP to come up
for m in MESSAGES:
await client_dc[0].send(m)
await _wait(lambda: len(echoes) == len(MESSAGES), 10)
print(f"OK — bot echoed {echoes!r}")
return 0 if echoes == MESSAGES else 1
except TimeoutError:
print(f"FAILED — timed out (echoes so far: {echoes!r})", file=sys.stderr)
return 1
finally:
await bot.close()
await client.close()
The await asyncio.wait_for(opened.wait(), 30) is doing real work: opening a P2P channel means ICE candidate gathering, DTLS handshake, and SCTP setup, which take a moment after join returns. Don't send() before DCOpen fires.
await client_dc[0].send(m) is the backpressure-aware send on the client side; here the payloads are tiny, so it returns immediately, but the same call would suspend under load.
Always close() both peers in a finally — that tears down the connection and the channel cleanly.
The reconnect gotcha
A DataChannel does not survive a reconnect. It wraps a channel tied to the underlying RTCPeerConnection; when the SDK re-establishes a peer's connection, every channel on the old connection closes (you'll see DCClose).
This example is short-lived, so it opens the channel once on PeerJoined and is done. A long-running app must re-open the channel on every reconnect. The right hook is the remote peer's StateChange → "connected", which fires on the initial connect and on every reconnect cycle:
from metered_realtime import DataChannel, StateChange, DCMessage, PeerJoined, PeerLeft
channels = {} # peer id -> DataChannel
@peer.on(PeerJoined)
def on_join(ev: PeerJoined) -> None:
remote = ev.peer
@remote.on(StateChange)
def on_state(sc: StateChange) -> None:
if sc.to != "connected":
return
old = channels.pop(remote.id, None)
if old is not None:
old.close() # discard the channel from the dead connection
raw = remote.create_data_channel("chat")
dc = DataChannel(raw)
dc.on(DCMessage, lambda m: handle(m.data))
channels[remote.id] = dc
@peer.on(PeerLeft)
def on_left(ev: PeerLeft) -> None:
dc = channels.pop(ev.peer.id, None)
if dc is not None:
dc.close()
If you skip this and only open the channel on PeerJoined, your channel silently stops working after the first reconnect.
See also
DataChannelreference — the full wrapper API, backpressure, andDataChannelOverflowErrorMeteredPeerreference —create_data_channel, theDataChannelOpenedevent, andsendvssend_torouting- AI Agent Communication guide — using a data channel as the control plane alongside media