
DataChannel

A backpressure-aware wrapper around an aiortc data channel that exposes an async send() and typed events. Use it when you send data faster than the channel can flush it (telemetry, file transfer, high-frequency game state) and risk outpacing the send queue.

You always wrap a raw channel: either one you opened with remote.create_data_channel(...) (outbound), or one the remote opened, delivered on the DataChannelOpened event (inbound).

from metered_realtime import DataChannel, DataChannelOpened

# Outbound — you open the channel:
raw = remote.create_data_channel("file-transfer")
dc = DataChannel(raw)

# Inbound — the remote opened it:
@remote.on(DataChannelOpened)
def on_dc(ev: DataChannelOpened) -> None:
    dc = DataChannel(ev.channel)  # ev.channel is the raw channel

Why backpressure matters

Without throttling, calling send() faster than the channel can flush grows the buffer without bound, and memory climbs until the process runs out. DataChannel throttles sends against the channel's buffered byte count: send() suspends while the buffer is over max_buffered_amount and resumes as it drains, with FIFO ordering across concurrent senders. Past a hard cap on queued sends (max_queued_sends) it raises instead of buffering forever, so a producer on a stalled link gets a real back-off signal.
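
The suspend-and-resume behavior described above can be pictured with a small simulation. This is only a sketch of the general technique, not the wrapper's implementation: ToyChannel, flush_forever, and throttled_send are hypothetical names invented for illustration, and the drain rate is made up.

```python
import asyncio


class ToyChannel:
    """Hypothetical stand-in for a raw data channel; not the aiortc API."""

    def __init__(self) -> None:
        self.buffered_amount = 0  # bytes accepted but not yet flushed
        self.peak = 0             # highest buffered_amount observed
        self.drained = asyncio.Event()

    def send(self, data: bytes) -> None:
        self.buffered_amount += len(data)
        self.peak = max(self.peak, self.buffered_amount)

    async def flush_forever(self, bytes_per_tick: int) -> None:
        """Simulate the network draining the buffer at a fixed rate."""
        while True:
            await asyncio.sleep(0.001)
            self.buffered_amount = max(0, self.buffered_amount - bytes_per_tick)
            self.drained.set()


async def throttled_send(ch: ToyChannel, data: bytes, max_buffered: int) -> None:
    """Suspend while the buffer is over the cap, then hand the payload over."""
    while ch.buffered_amount > max_buffered:
        ch.drained.clear()
        await ch.drained.wait()
    ch.send(data)


async def demo() -> int:
    ch = ToyChannel()
    flusher = asyncio.create_task(ch.flush_forever(bytes_per_tick=4_000))
    try:
        # The producer never sleeps, yet the buffer stays bounded.
        for _ in range(200):
            await throttled_send(ch, b"x" * 1_000, max_buffered=8_000)
    finally:
        flusher.cancel()
    return ch.peak


peak = asyncio.run(demo())
```

In this toy setup the buffer can exceed the cap by at most one payload, because a send is only admitted while the buffer is at or under the cap.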

Construct

from metered_realtime import DataChannel, StateChange

@remote.on(StateChange)
def on_state(sc: StateChange) -> None:
    if sc.to != "connected":
        return
    raw = remote.create_data_channel("file-transfer")
    dc = DataChannel(
        raw,
        max_buffered_amount=1_048_576,  # suspend sending while buffer > 1 MB (default)
        max_queued_sends=256,           # raise if 256 sends are already queued (default)
    )
    # …use dc.send below

Constructor

DataChannel(
channel, # the raw aiortc data channel (positional)
*,
max_buffered_amount=1_048_576, # 1 MB
buffered_amount_low_threshold=None, # defaults to max_buffered_amount // 2
max_queued_sends=256,
buffer_poll_interval=0.1, # seconds
logger=None,
)

| Parameter | Type | Default | Notes |
| --- | --- | --- | --- |
| channel | raw aiortc data channel | | From remote.create_data_channel(...) or the DataChannelOpened event. |
| max_buffered_amount | int | 1_048_576 (1 MB) | When the buffered byte count would exceed this, send() suspends until the buffer drains. |
| buffered_amount_low_threshold | int \| None | max_buffered_amount // 2 | Resume threshold. A non-zero value is what makes the underlying drain event fire at all. |
| max_queued_sends | int | 256 | Hard cap on concurrently queued sends. The (256+1)th raises DataChannelOverflowError. |
| buffer_poll_interval | float | 0.1 | Seconds between defensive buffer re-checks while suspended, so a send can't wait forever if the drain event never re-fires on a settled buffer. |
| logger | Logger \| None | NoopLogger | Optional structured logger. |
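
To make the interplay between the drain event and buffer_poll_interval concrete, here is a rough sketch of a wait loop that wakes on an event but also re-checks on a timer. The helper names (resolve_low_threshold, wait_for_room) are hypothetical and the wrapper's real internals may differ; only the default of max_buffered_amount // 2 comes from the table above.

```python
import asyncio
from typing import Callable, Optional


def resolve_low_threshold(max_buffered_amount: int, low: Optional[int]) -> int:
    """Apply the documented default: half the cap when no value is given."""
    return max_buffered_amount // 2 if low is None else low


async def wait_for_room(get_buffered: Callable[[], int], drained: asyncio.Event,
                        max_buffered: int, poll_interval: float) -> int:
    """Wait until the buffer is at or under the cap; return the re-check count."""
    checks = 0
    while get_buffered() > max_buffered:
        checks += 1
        drained.clear()
        try:
            # Wake on the drain event, or after poll_interval at the latest.
            await asyncio.wait_for(drained.wait(), timeout=poll_interval)
        except asyncio.TimeoutError:
            pass  # the event never fired; loop around and re-check the buffer
    return checks


async def demo() -> int:
    state = {"buffered": 2_000}
    drained = asyncio.Event()  # never set here: simulates a missed drain event

    async def settle() -> None:
        await asyncio.sleep(0.03)
        state["buffered"] = 0  # the buffer empties without any event firing

    task = asyncio.create_task(settle())
    checks = await wait_for_room(lambda: state["buffered"], drained,
                                 max_buffered=1_000, poll_interval=0.01)
    await task
    return checks


checks = asyncio.run(demo())
```

Without the timeout, the waiter in this demo would hang forever, which is the failure mode the poll interval is described as guarding against.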

Methods

await dc.send(data) -> None

data is str or bytes. Queues a send; resolves once the payload is handed to the underlying channel. Suspends while the buffer is over max_buffered_amount and resumes as it drains. Concurrent calls are chain-sequenced (FIFO) so a single drain wake-up can't stampede the buffer.

for chunk in file_chunks:  # chunk: bytes
    await dc.send(chunk)   # backpressure-aware
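
One common way to get FIFO ordering across concurrent callers is to funnel them through an asyncio.Lock, whose waiters are documented to be woken in the order they started waiting. The sketch below illustrates that idea only; FifoSender is a hypothetical class, and the wrapper's own chain-sequencing may be implemented differently.

```python
import asyncio


class FifoSender:
    """Hypothetical sketch: serialize concurrent sends in arrival order."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()  # fair: first waiter is woken first
        self.sent: list[int] = []

    async def send(self, seq: int) -> None:
        async with self._lock:
            await asyncio.sleep(0)   # stand-in for waiting on the buffer
            self.sent.append(seq)


async def demo() -> list[int]:
    sender = FifoSender()
    # Ten concurrent callers; gather schedules them in argument order.
    await asyncio.gather(*(sender.send(i) for i in range(10)))
    return sender.sent


order = asyncio.run(demo())
```

Because only the lock holder ever waits on the buffer, a single drain wake-up releases one sender at a time instead of all of them at once.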

Raises:

  • DataChannelOverflowError: raised when max_queued_sends sends are already waiting. Your producer is faster than the network, so throttle upstream or raise the cap. Carries .queued and .cap.
  • MeteredRealtimeError: raised when the channel is closed (or closes mid-send), or when a single payload is larger than max_buffered_amount. Such a payload could never drain below the cap and would otherwise suspend forever, so split it or raise the cap.

from metered_realtime import DataChannelOverflowError

try:
    await dc.send(chunk)
except DataChannelOverflowError as e:
    # e.queued = current pending count, e.cap = your max_queued_sends
    pause_producer()
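
For the oversized-payload case, splitting can be as simple as slicing the bytes before sending. The helper below is a hypothetical illustration, and the 16 KiB chunk size is an assumption chosen for the example, not a value taken from this SDK.

```python
def split_payload(data: bytes, chunk_size: int) -> list[bytes]:
    """Cut data into pieces of at most chunk_size bytes, preserving order."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]


MAX_BUFFERED = 1_048_576           # the documented default cap
CHUNK_SIZE = 16 * 1024             # assumption: a conservative per-message size

payload = bytes(3 * MAX_BUFFERED)  # larger than the cap, so one send() would fail
chunks = split_payload(payload, CHUNK_SIZE)
```

Each chunk can then be passed to await dc.send(chunk) in a loop, and the receiver reassembles them in arrival order.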

dc.close() -> None

Synchronous. Closes the underlying channel and detaches listeners. Idempotent. Any suspended send() is released and fails fast. A wrapped channel that the remote closes (or whose connection drops) fires DCClose on its own; close() is what you call when you are done with the channel yourself.
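
The "released and fails fast" behavior can be modeled as a close flag plus a wake-up shared with suspended senders. This is a sketch under assumptions: ClosableGate and ChannelClosed are hypothetical names, and the real wrapper raises its own error types.

```python
import asyncio


class ChannelClosed(Exception):
    """Hypothetical error type for this sketch."""


class ClosableGate:
    def __init__(self) -> None:
        self.closed = False
        self.full = True              # pretend the buffer is over the cap
        self._wake = asyncio.Event()

    async def send(self, data: bytes) -> None:
        while self.full and not self.closed:
            self._wake.clear()
            await self._wake.wait()   # suspended on backpressure
        if self.closed:
            raise ChannelClosed("channel closed while send was pending")

    def close(self) -> None:
        if self.closed:               # idempotent: a second call is a no-op
            return
        self.closed = True
        self._wake.set()              # release every suspended sender


async def demo() -> str:
    gate = ClosableGate()
    pending = asyncio.create_task(gate.send(b"payload"))
    await asyncio.sleep(0)            # let the send reach its wait
    gate.close()
    gate.close()                      # safe to call twice
    try:
        await pending
    except ChannelClosed:
        return "failed fast"
    return "sent"


result = asyncio.run(demo())
```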

Read-only state

dc.label             # str — the channel label (e.g. "file-transfer")
dc.ready_state # "connecting" | "open" | "closing" | "closed"
dc.buffered_amount # int — current bytes pending in the underlying buffer

Events

Register typed event classes with the @dc.on(EventType) decorator (or dc.on(EventType, handler)); dc.once(...), dc.off(...), and async for ev in dc.events(EventType): ... also work. Handlers may be sync or async.

| Event | Fields | When |
| --- | --- | --- |
| DCOpen | | The channel transitioned to open. |
| DCClose | | The channel closed (either side, or its connection dropped). |
| DCMessage | data (str \| bytes) | An inbound payload arrived. |
| DCError | err (Exception) | The underlying channel reported an error. This is distinct from a send() failure, which raises from the await instead. |

from metered_realtime import DCOpen, DCClose, DCMessage, DCError

@dc.on(DCOpen)
def _open(ev: DCOpen) -> None:
    print("channel open")

@dc.on(DCMessage)
def _msg(ev: DCMessage) -> None:
    handle(ev.data)  # str or bytes

@dc.on(DCError)
def _err(ev: DCError) -> None:
    log.warning("dc error: %r", ev.err)

@dc.on(DCClose)
def _close(ev: DCClose) -> None:
    print("channel closed")

What survives a reconnect

Nothing. A DataChannel wraps a channel that is tied to the RTCPeerConnection it was opened on. When the SDK re-establishes a survivor's connection on reconnect, every channel on the old connection closes (you'll see DCClose).

The right pattern: re-open the raw channel and re-construct the wrapper each time a StateChange event reports to == "connected". That fires once on the initial connect and once per reconnect cycle:

from metered_realtime import DataChannel, StateChange, DCMessage, PeerJoined, PeerLeft

channels = {} # peer id -> DataChannel

@peer.on(PeerJoined)
def on_join(ev: PeerJoined) -> None:
    remote = ev.peer

    @remote.on(StateChange)
    def on_state(sc: StateChange) -> None:
        if sc.to != "connected":
            return
        # Drop the wrapper from the previous connection, if any.
        old = channels.pop(remote.id, None)
        if old is not None:
            old.close()
        raw = remote.create_data_channel("file-transfer")
        dc = DataChannel(raw, max_buffered_amount=1_048_576)
        dc.on(DCMessage, lambda m: handle(m.data))
        channels[remote.id] = dc

@peer.on(PeerLeft)
def on_left(ev: PeerLeft) -> None:
    dc = channels.pop(ev.peer.id, None)
    if dc is not None:
        dc.close()

See Reconnect Best Practices for the full pattern including reconnect-aware producer pause/resume.
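
As a rough idea of what producer pause/resume could look like, the sketch below gates a producer on an asyncio.Event that is set while the link is up. SendGate is a hypothetical helper, not part of the SDK, and the list append stands in for a real await dc.send(...) call.

```python
import asyncio


class SendGate:
    """Hypothetical helper: blocks producers while the link is down."""

    def __init__(self) -> None:
        self._open = asyncio.Event()

    def resume(self) -> None:  # call when StateChange reports "connected"
        self._open.set()

    def pause(self) -> None:   # call on DCClose or any other state
        self._open.clear()

    async def wait_open(self) -> None:
        await self._open.wait()


async def producer(gate: SendGate, items: list[bytes], out: list[bytes]) -> None:
    for item in items:
        await gate.wait_open()  # suspends here during a reconnect
        out.append(item)        # stand-in for: await dc.send(item)
        await asyncio.sleep(0)  # yield so pause/resume can interleave


async def demo() -> list[bytes]:
    gate, out = SendGate(), []
    task = asyncio.create_task(producer(gate, [b"a", b"b", b"c"], out))
    await asyncio.sleep(0.01)
    assert out == []            # nothing is sent before the first connect
    gate.resume()
    await task
    return out


sent = asyncio.run(demo())
```

In the reconnect handler shown earlier, resume() would sit next to the line that stores the new wrapper, and pause() in a DCClose handler.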

When NOT to use this wrapper

  • Server-routed messages. peer.send(data) / remote.send(data) already have their own queueing at the wire layer. DataChannel is specifically for P2P channels.
  • Reliable file transfer with chunk-level retry. This wrapper handles backpressure, not retry-on-failure. If the channel closes mid-transfer, you need application-level resume logic.
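
Application-level resume usually means tracking how much the receiver has confirmed and restarting from that offset on the new channel. The sketch below assumes a hypothetical acknowledgement scheme in which the receiver reports a byte count; neither the scheme nor remaining_chunks is provided by this wrapper.

```python
from typing import Iterator, Tuple


def remaining_chunks(data: bytes, acked_bytes: int,
                     chunk_size: int) -> Iterator[Tuple[int, bytes]]:
    """Yield (offset, chunk) pairs starting at the last acknowledged byte."""
    for offset in range(acked_bytes, len(data), chunk_size):
        yield offset, data[offset:offset + chunk_size]


data = bytes(range(10))

# First attempt: the receiver acknowledged 4 bytes before the channel closed.
acked = 4

# After reconnecting, resend only what the receiver has not confirmed.
resumed = list(remaining_chunks(data, acked, chunk_size=4))
```

Sending the offset alongside each chunk lets the receiver discard duplicates if an acknowledgement was lost in the drop.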

See also