DataChannel
A backpressure-aware wrapper around an aiortc data channel, exposing an async send() and typed events. Use it when you produce data faster than the channel can flush it (telemetry, file transfer, high-frequency game state) and could otherwise outpace the send queue.
You always wrap a raw channel: either one you opened with remote.create_data_channel(...) (outbound), or one the remote opened, delivered on the DataChannelOpened event (inbound).
from metered_realtime import DataChannel, DataChannelOpened

# Outbound: you open the channel.
raw = remote.create_data_channel("file-transfer")
dc = DataChannel(raw)

# Inbound: the remote opened it.
@remote.on(DataChannelOpened)
def on_dc(ev: DataChannelOpened) -> None:
    dc = DataChannel(ev.channel)  # ev.channel is the raw channel
Why backpressure matters
Calling send() faster than the channel can flush would otherwise grow an unbounded buffer, climbing memory until the process runs out. DataChannel throttles sends against the channel's buffered byte count: send() suspends while the buffer is over max_buffered_amount and resumes as it drains, with FIFO ordering across concurrent senders. Past a hard queue cap it raises instead of buffering forever, so a producer on a stalled link gets a real back-off signal.
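To make the mechanism concrete, here is a toy model of the throttle described above. It is a sketch under stated assumptions, not the library's implementation: `ToyThrottledSender`, its polling loop, and the simulated `drain()` are all hypothetical, and it omits the queue cap. It relies on `asyncio.Lock` being fair, which gives FIFO ordering across concurrent senders.

```python
import asyncio


class ToyThrottledSender:
    """Toy model of send-side backpressure (hypothetical, not the SDK's code)."""

    def __init__(self, max_buffered_amount, poll_interval=0.001):
        self.max_buffered_amount = max_buffered_amount
        self.poll_interval = poll_interval
        self.buffered_amount = 0     # bytes handed over but not yet flushed
        self.sent = []               # payloads in the order they were handed over
        self._lock = asyncio.Lock()  # fair lock: waiters proceed in FIFO order

    async def send(self, data):
        if len(data) > self.max_buffered_amount:
            raise ValueError("payload larger than max_buffered_amount")
        async with self._lock:
            # Suspend while this payload would push the buffer over the cap.
            while self.buffered_amount + len(data) > self.max_buffered_amount:
                await asyncio.sleep(self.poll_interval)
            self.buffered_amount += len(data)
            self.sent.append(data)

    async def drain(self, bytes_per_tick):
        """Simulated network: flush a fixed number of bytes per tick."""
        while True:
            await asyncio.sleep(self.poll_interval)
            self.buffered_amount = max(0, self.buffered_amount - bytes_per_tick)


async def demo():
    sender = ToyThrottledSender(max_buffered_amount=100)
    drainer = asyncio.create_task(sender.drain(bytes_per_tick=50))
    peak = 0

    async def produce(tag):
        nonlocal peak
        await sender.send(tag * 40)
        peak = max(peak, sender.buffered_amount)

    # Six concurrent senders, 40 bytes each, against a 100-byte cap.
    await asyncio.gather(*(produce(bytes([65 + i])) for i in range(6)))
    drainer.cancel()
    return sender.sent, peak
```

Running `asyncio.run(demo())` hands the six payloads over in the order the senders started, and the simulated buffer never exceeds the 100-byte cap.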
Construct
from metered_realtime import DataChannel, StateChange

@remote.on(StateChange)
def on_state(sc: StateChange) -> None:
    if sc.to != "connected":
        return
    raw = remote.create_data_channel("file-transfer")
    dc = DataChannel(
        raw,
        max_buffered_amount=1_048_576,  # suspend sending while buffer > 1 MiB (default)
        max_queued_sends=256,           # raise if 256 sends are already queued (default)
    )
    # …use dc.send below
Constructor
DataChannel(
    channel,                             # the raw aiortc data channel (positional)
    *,
    max_buffered_amount=1_048_576,       # 1 MiB
    buffered_amount_low_threshold=None,  # defaults to max_buffered_amount // 2
    max_queued_sends=256,
    buffer_poll_interval=0.1,            # seconds
    logger=None,
)
| Parameter | Type | Default | Notes |
|---|---|---|---|
| channel | raw aiortc data channel | (required) | From remote.create_data_channel(...) or the DataChannelOpened event. |
| max_buffered_amount | int | 1_048_576 (1 MiB) | When the buffered byte count would exceed this, send() suspends until the buffer drains. |
| buffered_amount_low_threshold | int \| None | max_buffered_amount // 2 | Resume threshold. A non-zero value is what makes the underlying drain event fire at all. |
| max_queued_sends | int | 256 | Hard cap on concurrently queued sends. A send() issued while this many are already queued raises DataChannelOverflowError. |
| buffer_poll_interval | float | 0.1 | Seconds between defensive buffer re-checks while suspended, so a send can't wait forever if the drain event never re-fires on a settled buffer. |
| logger | Logger \| None | None (NoopLogger) | Optional structured logger. When omitted, a no-op logger is used. |
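One way to read max_buffered_amount and buffered_amount_low_threshold together is as a hysteresis band: start suspending once the buffer is over the cap, and stay suspended until it has drained to the low mark. The sketch below illustrates that reading only. Both function names are hypothetical, and the exact resume condition inside the wrapper is an assumption, not something the table confirms.

```python
from typing import Optional


def resolve_low_threshold(max_buffered_amount: int, low: Optional[int] = None) -> int:
    """Apply the documented default: half of max_buffered_amount."""
    return max_buffered_amount // 2 if low is None else low


def should_suspend(suspended: bool, buffered: int, max_buffered_amount: int, low: int) -> bool:
    """Return whether a sender should be (or stay) suspended at this buffer level.

    Assumed behavior: hysteresis between the cap and the low threshold.
    """
    if suspended:
        # Already parked: stay parked until the buffer drains to the low mark.
        return buffered > low
    # Not parked: start parking only once the buffer is over the cap.
    return buffered > max_buffered_amount
```

The gap between the two thresholds keeps a sender from flapping between suspended and running when the buffer hovers near the cap.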
Methods
await dc.send(data) → None
data is str or bytes. Queues a send; resolves once the payload is handed to the underlying channel. Suspends while the buffer is over max_buffered_amount and resumes as it drains. Concurrent calls are chain-sequenced (FIFO) so a single drain wake-up can't stampede the buffer.
for chunk in file_chunks:  # chunk: bytes
    await dc.send(chunk)   # backpressure-aware
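The loop above takes file_chunks as given. A minimal sketch of producing such chunks from an in-memory payload follows. `iter_chunks` is a hypothetical helper, not part of the SDK, and the 16 KiB default is an assumption chosen as a conservative message size; any size at or below max_buffered_amount keeps a single payload under the cap.

```python
from typing import Iterator


def iter_chunks(payload: bytes, chunk_size: int = 16_384) -> Iterator[bytes]:
    """Yield consecutive slices of payload, each at most chunk_size bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(payload), chunk_size):
        yield payload[start:start + chunk_size]
```

With this helper, the loop could iterate over `iter_chunks(data)` instead of a precomputed list.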
Raises:
- DataChannelOverflowError: max_queued_sends sends are already waiting. Your producer is faster than the network, so throttle upstream or raise the cap. Carries .queued and .cap.
- MeteredRealtimeError: the channel is closed (or closes mid-send), or a single payload is larger than max_buffered_amount (it could never drain below the cap and would otherwise suspend forever; split it or raise the cap).
from metered_realtime import DataChannelOverflowError

try:
    await dc.send(chunk)
except DataChannelOverflowError as e:
    # e.queued = current pending count, e.cap = your max_queued_sends
    pause_producer()
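If pausing the producer outright is not practical, one alternative is to retry the send with exponential backoff. The sketch below is an illustration, not SDK functionality: `send_with_backoff` and `FakeOverflowError` are hypothetical names, and the stand-in exception exists only so the example runs without the SDK installed.

```python
import asyncio
from typing import Awaitable, Callable


class FakeOverflowError(Exception):
    """Stand-in for DataChannelOverflowError so the sketch runs without the SDK."""


async def send_with_backoff(
    send: Callable[[bytes], Awaitable[None]],
    data: bytes,
    *,
    overflow_exc: type = FakeOverflowError,
    attempts: int = 5,
    base_delay: float = 0.05,
) -> int:
    """Retry send(data) on overflow; return the number of attempts used."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            await send(data)
            return attempt
        except overflow_exc:
            if attempt == attempts:
                raise  # out of retries: surface the overflow to the caller
            # Wait 1x, 2x, 4x, ... the base delay before trying again.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

Against the real wrapper this would presumably be called as `await send_with_backoff(dc.send, chunk, overflow_exc=DataChannelOverflowError)`.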
dc.close() → None
Synchronous. Closes the underlying channel and detaches listeners. Idempotent. Any suspended send() is released and fails fast. A wrapped channel that the remote closes (or whose connection drops) fires DCClose on its own; close() is what you call when you are done with it.
Read-only state
dc.label # str — the channel label (e.g. "file-transfer")
dc.ready_state # "connecting" | "open" | "closing" | "closed"
dc.buffered_amount # int — current bytes pending in the underlying buffer
Events
Register typed event classes with the @dc.on(EventType) decorator (or dc.on(EventType, handler)); dc.once(...), dc.off(...), and async for ev in dc.events(EventType): ... also work. Handlers may be sync or async.
| Event | Fields | When |
|---|---|---|
| DCOpen | (none) | The channel transitioned to open. |
| DCClose | (none) | The channel closed (either side, or its connection dropped). |
| DCMessage | data (str \| bytes) | An inbound payload arrived. |
| DCError | err (Exception) | The underlying channel reported an error. This is distinct from a send() failure, which raises from the await instead. |
from metered_realtime import DCOpen, DCClose, DCMessage, DCError

@dc.on(DCOpen)
def _open(ev: DCOpen) -> None:
    print("channel open")

@dc.on(DCMessage)
def _msg(ev: DCMessage) -> None:
    handle(ev.data)  # str or bytes

@dc.on(DCError)
def _err(ev: DCError) -> None:
    log.warning("dc error: %r", ev.err)

@dc.on(DCClose)
def _close(ev: DCClose) -> None:
    print("channel closed")
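For readers curious how registration keyed on an event class can work, here is a toy emitter that supports both the decorator form and the two-argument form of on(). It is an illustration only: `ToyEmitter` and `Message` are hypothetical, the sketch handles sync handlers only, and the SDK's actual dispatch may differ.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical stand-in for DCMessage."""
    data: bytes


class ToyEmitter:
    """Dispatches events to handlers registered for the event's exact class."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event_type, handler=None):
        def register(fn):
            self._handlers[event_type].append(fn)
            return fn
        # Without a handler, act as a decorator; with one, register it directly.
        return register if handler is None else register(handler)

    def off(self, event_type, handler):
        if handler in self._handlers[event_type]:
            self._handlers[event_type].remove(handler)

    def emit(self, event):
        # Iterate over a copy so a handler may call off() while being dispatched.
        for fn in list(self._handlers[type(event)]):
            fn(event)
```

Because on() returns the handler in both forms, the returned value can be kept and passed to off() later.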
What survives a reconnect
Nothing. A DataChannel wraps a channel that is tied to the RTCPeerConnection it was opened on. When the SDK re-establishes a survivor's connection on reconnect, every channel on the old connection closes (you'll see DCClose).
The right pattern: re-open the raw channel and re-construct the wrapper on each StateChange → "connected" event. That fires once on the initial connect and once per reconnect cycle:
from metered_realtime import DataChannel, StateChange, DCMessage, PeerJoined, PeerLeft

channels = {}  # peer id -> DataChannel

@peer.on(PeerJoined)
def on_join(ev: PeerJoined) -> None:
    remote = ev.peer

    @remote.on(StateChange)
    def on_state(sc: StateChange) -> None:
        if sc.to != "connected":
            return
        old = channels.pop(remote.id, None)
        if old is not None:
            old.close()
        raw = remote.create_data_channel("file-transfer")
        dc = DataChannel(raw, max_buffered_amount=1_048_576)
        dc.on(DCMessage, lambda m: handle(m.data))
        channels[remote.id] = dc

@peer.on(PeerLeft)
def on_left(ev: PeerLeft) -> None:
    dc = channels.pop(ev.peer.id, None)
    if dc is not None:
        dc.close()
See Reconnect Best Practices for the full pattern including reconnect-aware producer pause/resume.
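As a rough idea of what a reconnect-aware producer pause can look like, the sketch below gates the producer on an `asyncio.Event` that is set while a usable channel exists. This is an assumption about one possible shape, not the pattern from the linked guide. The list append stands in for `await dc.send(...)`, and `gate.set()` stands in for handling a StateChange to "connected".

```python
import asyncio


async def demo():
    gate = asyncio.Event()  # set while a usable channel exists
    sent = []

    async def producer(items):
        for item in items:
            await gate.wait()       # parks here while disconnected
            sent.append(item)       # stand-in for: await dc.send(item)
            await asyncio.sleep(0)  # yield so other tasks can run

    task = asyncio.create_task(producer([1, 2, 3, 4]))
    await asyncio.sleep(0.01)
    before = list(sent)  # snapshot taken while the gate is still closed
    gate.set()           # stand-in for StateChange -> "connected"
    await task
    return before, sent
```

A matching `gate.clear()` in the handler for a non-connected state would park the producer again until the next reconnect.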
When NOT to use this wrapper
- Server-routed messages. peer.send(data) / remote.send(data) already have their own queueing at the wire layer. DataChannel is specifically for P2P channels.
- Reliable file transfer with chunk-level retry. This wrapper handles backpressure, not retry-on-failure. If the channel closes mid-transfer, you need application-level resume logic.
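The application-level resume logic mentioned in the last bullet can be as simple as tracking how many bytes the receiver has confirmed. The sketch below shows sender-side bookkeeping only. `ResumableTransfer` is hypothetical, and it assumes your own protocol has the receiver acknowledge each chunk by offset and length; nothing in the wrapper provides that.

```python
from typing import Iterator, Tuple


class ResumableTransfer:
    """Sender-side bookkeeping for a transfer that can resume after a close."""

    def __init__(self, payload: bytes, chunk_size: int = 16_384) -> None:
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        self.payload = payload
        self.chunk_size = chunk_size
        self.acked = 0  # bytes the receiver has confirmed, as a contiguous prefix

    def pending_chunks(self) -> Iterator[Tuple[int, bytes]]:
        """Yield (offset, chunk) pairs for everything not yet acknowledged."""
        for offset in range(self.acked, len(self.payload), self.chunk_size):
            yield offset, self.payload[offset:offset + self.chunk_size]

    def ack(self, offset: int, length: int) -> None:
        """Record an acknowledgement; acks that skip ahead are ignored."""
        if offset == self.acked:
            self.acked += length

    @property
    def done(self) -> bool:
        return self.acked >= len(self.payload)
```

After a reconnect, iterating `pending_chunks()` again on the new channel resends only the unacknowledged tail.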
See also
- RemotePeer: create_data_channel(...) opens channels; the DataChannelOpened event delivers inbound ones
- Errors & Codes: DataChannelOverflowError, MeteredRealtimeError
- Reconnect Best Practices