Media
Media input is where the Python SDK shines. There's no getUserMedia server-side, so you supply the track — synthesized speech from an AI agent, an IP-camera feed, a file, a local device. This page covers the grouping type (MediaStream), the push-based audio track (AudioSource), the received-track consumer (iter_frames), and the file/device source helpers (from_file and friends).
Everything here requires the webrtc extra:
pip install "metered-realtime[webrtc]"
The source helpers (AudioSource, iter_frames, from_file, …) are exported lazily — importing them pulls in the WebRTC backend (aiortc + av), so a pub/sub-only install never loads it. MediaStream and StreamMetadata are always available.
The media-input helpers — AudioSource, iter_frames, and the from_* source helpers — are Python-specific and may change in a minor release. The core media API on MeteredPeer (add_stream / add_track / replace_track) and the MediaStream grouping type are stable.
from metered_realtime import (
    MediaStream,
    AudioSource,
    iter_frames,
    from_file,
    from_rtsp,
    from_camera,
    from_microphone,
    screen_share,
)
Any aiortc-compatible MediaStreamTrack works too — pass it straight to peer.add_track(...).
MediaStream
A lightweight grouping of tracks under one stream id — the Python counterpart of the browser's MediaStream. On the sending side, pass one to peer.add_stream(...) (or as the stream argument of add_track) and a receiving peer sees the same stream.id. On the receiving side the SDK constructs one per remote stream and surfaces it on the Track / StreamAdded events.
from metered_realtime import MediaStream
stream = MediaStream(id="agent-voice") # explicit, stable id
stream.add_track(source) # group a track under it
peer.add_stream(stream) # send every track to all peers
Construct
MediaStream(tracks=(), *, id=None)
| Argument | Type | Default | Notes |
|---|---|---|---|
| tracks | iterable of tracks | () | Tracks to seed the grouping with. |
| id | str | random hex | The stream id seen by receivers. Pass an explicit one when you want a meaningful, stable name ("agent-voice", "front-door-cam"). |
Methods
| Method | Returns | Notes |
|---|---|---|
| get_tracks() | list | Snapshot of the stream's tracks. |
| get_audio_tracks() | list | Tracks whose kind == "audio". |
| get_video_tracks() | list | Tracks whose kind == "video". |
| add_track(track) | None | Add a track to the grouping. No effect on already-negotiated senders; call peer.add_track with this stream to actually send it. |
| remove_track(track) | None | Remove a track from the grouping. |
stream.id is stable; the object identity isn't
stream.id is stable from sender to receiver and across reconnects — it's how you correlate a remote stream over time (the agent's voice is always "agent-voice"). But the MediaStream object a receiver gets is a fresh one per remote stream, and a new object after each reconnect. Re-take your reference on every StreamAdded rather than caching the object:
streams = {}  # stream.id -> latest MediaStream object

@remote.on(StreamAdded)
def _(ev: StreamAdded) -> None:
    streams[ev.stream.id] = ev.stream  # re-take, keyed by the stable id
StreamMetadata
StreamMetadata is a plain dict[str, Any] of customer-defined fields you attach to tracks at add_stream / add_track time. It's delivered to receivers ahead of the media and surfaced on the Track / StreamAdded events' metadata field. Convention: role ("camera" | "screen" | "file" | …) and label (human-readable), plus anything your app needs.
peer.add_stream(
    MediaStream([cam.video], id="front-door"),
    {"role": "camera", "label": "Front Door"},
)
It travels over the messaging service and counts against the per-message size cap, so keep it small. Treat inbound metadata as untrusted peer input.
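Because metadata shares the message size budget on the way out and is untrusted on the way in, it can help to funnel both directions through small helpers. The following is a minimal sketch using only the standard library. The 1024-byte budget, the JSON encoding, the role set, the label limit, and both function names are assumptions for illustration, not SDK values.

```python
import json
from typing import Any, Dict

# Assumed values for illustration only; the real per-message cap is not stated here.
MAX_METADATA_BYTES = 1024
KNOWN_ROLES = {"camera", "screen", "file"}
MAX_LABEL_CHARS = 64


def check_outbound(metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Raise ValueError if metadata can't be JSON-encoded or exceeds the assumed budget."""
    try:
        encoded = json.dumps(metadata, separators=(",", ":")).encode("utf-8")
    except (TypeError, ValueError) as exc:
        raise ValueError(f"metadata is not JSON-serializable: {exc}") from exc
    if len(encoded) > MAX_METADATA_BYTES:
        raise ValueError(
            f"metadata is {len(encoded)} bytes; assumed budget is {MAX_METADATA_BYTES}"
        )
    return metadata


def sanitize_inbound(metadata: Any) -> Dict[str, str]:
    """Keep only the conventional fields, with types and lengths checked."""
    clean: Dict[str, str] = {}
    if not isinstance(metadata, dict):
        return clean
    role = metadata.get("role")
    if isinstance(role, str) and role in KNOWN_ROLES:
        clean["role"] = role
    label = metadata.get("label")
    if isinstance(label, str):
        clean["label"] = label[:MAX_LABEL_CHARS]  # truncate overly long labels
    return clean
```

In this sketch, unknown keys are dropped rather than passed through, so anything your app adds beyond role and label needs its own explicit check.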
AudioSource
A push-based audio track for audio you generate or stream in — an AI agent's text-to-speech output being the canonical case. You construct it for your input format, attach it as a track, then push audio as it's produced. The track emits real-time-paced 48 kHz stereo audio to the room, inserting silence on underrun and applying backpressure when you outrun the link.
AudioSource is a MediaStreamTrack, so you send it like any other track:
from metered_realtime import AudioSource, MediaStream
source = AudioSource(input_rate=16_000) # 16 kHz mono PCM in
peer.add_track(source, MediaStream(id="agent-voice"))
async for chunk in tts_stream():  # bytes of s16 PCM
    await source.push(chunk)
source.end() # stop the track when done
Construct
AudioSource(*, input_rate=16_000, input_layout="mono", max_buffered_seconds=5.0, logger=None)
| Argument | Type | Default | Notes |
|---|---|---|---|
| input_rate | int | 16_000 | Sample rate of the raw PCM you'll push(). Audio is resampled to 48 kHz internally. |
| input_layout | str | "mono" | "mono" or "stereo" only; anything else raises ValueError. For other layouts, build an av.AudioFrame yourself and push that. |
| max_buffered_seconds | float | 5.0 | Backpressure cap. push() suspends once this many seconds are buffered. Raise it to tolerate burstier producers, lower it to cap latency/memory. |
| logger | Logger | NoopLogger | Optional. |
All arguments are keyword-only.
await push(data) → None
Queue audio for emission. data is either:
- raw s16 PCM bytes in the configured input_rate / input_layout, or
- an av.AudioFrame, which carries its own format (so input_rate / input_layout apply only to raw bytes).
The audio is resampled to 48 kHz stereo and emitted in real-time 20 ms frames.
await source.push(pcm_bytes) # s16 PCM in the configured input format
await source.push(av_audio_frame) # an av.AudioFrame (carries its own format)
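When no TTS engine is at hand, raw s16 PCM is easy to synthesize for testing. The sketch below builds a sine tone with the standard library and shows the byte arithmetic for a given duration. The function names and the 440 Hz default are illustrative, and little-endian byte order is an assumption about what push() expects for raw bytes.

```python
import math
import struct

BYTES_PER_SAMPLE = 2  # s16 = signed 16-bit integer


def pcm_byte_count(seconds: float, rate: int = 16_000, channels: int = 1) -> int:
    """Number of bytes of s16 PCM needed to hold `seconds` of audio."""
    return round(seconds * rate) * channels * BYTES_PER_SAMPLE


def sine_pcm(
    seconds: float,
    rate: int = 16_000,
    channels: int = 1,
    freq: float = 440.0,
    amplitude: float = 0.3,
) -> bytes:
    """Interleaved little-endian s16 PCM of a sine tone (same signal on every channel)."""
    frame_count = round(seconds * rate)
    peak = int(amplitude * 32767)
    samples = []
    for i in range(frame_count):
        value = int(peak * math.sin(2 * math.pi * freq * i / rate))
        samples.extend([value] * channels)  # duplicate the value across channels
    return struct.pack(f"<{len(samples)}h", *samples)
```

With the default 16 kHz mono format, 20 ms of audio is 320 samples, or 640 bytes. A chunk built this way could be handed to source.push(...) in place of TTS output.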
Key behaviours:
- Backpressure. push() suspends once the buffer reaches max_buffered_seconds, even within a single oversized push, so a faster-than-real-time producer can't grow memory without bound. It resumes as the track drains below the cap.
- Underrun → silence. If the buffer runs dry the track emits silence so it stays live; push again and audio resumes.
- Partial samples are buffered. Empty or sub-sample byte chunks are held until a whole sample arrives, so a producer that splits a chunk mid-sample never desyncs the stream.
- No-op after end() (or once the track has stopped).
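The partial-sample rule is easiest to see in isolation. The following is a standalone illustration of the idea, not the SDK's internal code. A hypothetical SampleAligner holds back trailing bytes until they complete a whole sample, taken here to be 2 bytes per channel for s16.

```python
class SampleAligner:
    """Hypothetical helper: releases only whole s16 samples, keeps the remainder."""

    def __init__(self, channels: int = 1) -> None:
        self._sample_bytes = 2 * channels  # one s16 value per channel
        self._pending = b""

    def feed(self, chunk: bytes) -> bytes:
        """Return the sample-aligned prefix of everything received so far."""
        data = self._pending + chunk
        usable = len(data) - (len(data) % self._sample_bytes)
        self._pending = data[usable:]  # held until the next chunk completes it
        return data[:usable]
```

Feeding a single byte returns nothing. Once the second byte arrives, the completed sample comes out, so the byte stream never shifts by half a sample.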
end() → None
Signal end-of-stream. The track emits any remaining buffered audio, then stops (recv raises and the track ends). Idempotent. Call it when your producer is done so the track terminates cleanly rather than emitting silence forever.
source.end()
buffered_seconds → float
Read-only. Seconds of audio currently buffered, for introspection (e.g. logging how far ahead of real time your producer is running).
if source.buffered_seconds > 2.0:
    ...  # producer is well ahead of the link
The "agent TTS out" use case
AudioSource is the output half of a voice agent: as your text-to-speech produces PCM, you push() it and the SDK paces it out to every peer in real time. You don't manage timing or framing — push chunks as they arrive, end() when the utterance is done. Pair it with iter_frames on the inbound side for the full speech-to-text → LLM → text-to-speech loop.
iter_frames
async def iter_frames(track) -> AsyncIterator # yields av.AudioFrame / av.VideoFrame
An async iterator over a received track's frames. The loop exits cleanly when the track stops — the peer stopped sending, or the connection dropped — with no error handling needed on your side.
from metered_realtime import iter_frames
async for frame in iter_frames(track):
    ...  # av.AudioFrame / av.VideoFrame
You get the track from a remote peer's Track event (t.track):
@remote.on(Track)
async def _(t: Track) -> None:
    async for frame in iter_frames(t.track):
        feed_stt(frame)
You own the track's lifecycle. If you stop iterating early, stop the track yourself (or, for a file/device source, stop the source's player) to release the decoder / device / socket — iter_frames won't do it for you.
The "agent STT in" use case
iter_frames is the input half of a voice agent: it turns a peer's audio track into an async stream of av.AudioFrames you can convert to PCM and feed to speech-to-text. One iter_frames loop per inbound track you care about, started from that peer's Track handler.
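Speech-to-text engines commonly expect mono PCM, while the audio described on this page is stereo. Assuming you have already extracted interleaved little-endian s16 stereo bytes from a frame (that step depends on the frame's actual format and is not shown), a downmix can be sketched with the standard library alone. The function name is illustrative.

```python
import struct


def downmix_s16_stereo_to_mono(pcm: bytes) -> bytes:
    """Average left/right pairs of interleaved little-endian s16 into mono."""
    if len(pcm) % 4 != 0:
        raise ValueError("expected whole stereo s16 frames (4 bytes each)")
    interleaved = struct.unpack(f"<{len(pcm) // 2}h", pcm)
    # Even indices are left samples, odd indices are right samples.
    mono = [
        (left + right) // 2
        for left, right in zip(interleaved[0::2], interleaved[1::2])
    ]
    return struct.pack(f"<{len(mono)}h", *mono)
```

This changes only the channel count. Sample-rate conversion to whatever your speech-to-text engine expects is better left to a proper resampler than to naive decimation.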
Source helpers
These build a track-bearing source from a file, a network stream, or a local device. Each returns a player object whose .audio and .video attributes are the contained tracks — either may be None if the source lacks that media, so check before add_track:
from metered_realtime import from_file

src = from_file("clip.mp4")
if src.video is not None:
    peer.add_track(src.video, MediaStream(id="clip"))
if src.audio is not None:
    peer.add_track(src.audio, MediaStream(id="clip"))
You own the player. Stop its tracks when you're done so the underlying decoder / device / socket is released.
Real device capture is platform- and FFmpeg-dependent (V4L2 / ALSA / x11grab on Linux, AVFoundation on macOS, DirectShow / gdigrab on Windows). Device naming and availability vary by OS and FFmpeg build; from_file / from_rtsp are the portable options.
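When a device source fails to open, it helps to know which FFmpeg input device applies on the current OS. The sketch below maps platform.system() values to the camera backends named above, using FFmpeg's own input-device names (v4l2, avfoundation, dshow). Which backend the SDK helpers actually select is not documented here, so treat this as a diagnostic aid with an illustrative function name.

```python
import platform
from typing import Optional

# FFmpeg input-device names for camera capture, keyed by platform.system() value.
CAMERA_INPUT_FORMATS = {
    "Linux": "v4l2",
    "Darwin": "avfoundation",
    "Windows": "dshow",
}


def camera_input_format(system: Optional[str] = None) -> str:
    """Return the FFmpeg camera input-device name for `system` (default: this OS)."""
    name = system or platform.system()
    try:
        return CAMERA_INPUT_FORMATS[name]
    except KeyError:
        raise RuntimeError(f"no known FFmpeg camera input for {name!r}") from None
```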
from_file
from_file(file, *, loop=False)
A source backed by a media file (or file-like / HTTP URL). loop=True repeats a seekable file indefinitely.
peer.add_track(from_file("clip.mp4").audio, MediaStream(id="clip"))
Do not pass untrusted / externally-controlled file values. FFmpeg selects the protocol/demuxer from the string (a path, an http(s):// URL, or pseudo-protocols like concat: / data:), so a hostile string can read arbitrary local files or fetch arbitrary URLs (SSRF). Validate before forwarding.
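One way to validate, if users may only pick files from a directory you control, is to resolve the name against that directory and reject anything that escapes it or looks like a protocol. This is a minimal sketch under that assumption. The function name and the media_root parameter are hypothetical, and the blanket rejection of ":" is deliberately strict.

```python
from pathlib import Path


def resolve_media_path(user_value: str, media_root: str) -> str:
    """Return an absolute path to an existing file inside media_root, else raise ValueError."""
    # A colon could introduce an FFmpeg protocol prefix (http:, concat:, data:, ...).
    if ":" in user_value or "\x00" in user_value:
        raise ValueError("protocol prefixes and NUL bytes are not allowed")
    root = Path(media_root).resolve()
    candidate = (root / user_value).resolve()  # collapses ".." and follows symlinks
    try:
        candidate.relative_to(root)
    except ValueError:
        raise ValueError("path escapes the media directory") from None
    if not candidate.is_file():
        raise ValueError("no such media file")
    return str(candidate)
```

Only the returned path, never the raw user string, would then be passed on to from_file.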
from_rtsp
from_rtsp(url, *, transport="tcp")
A source backed by an RTSP (or HTTP) stream — most IP cameras. transport is the RTSP lower transport, one of {"tcp", "udp", "udp_multicast", "http"} (anything else raises ValueError); tcp is the most firewall-friendly.
cam = from_rtsp("rtsp://camera.local:554/stream")
if cam.video is not None:
    peer.add_track(cam.video, MediaStream(id="front-door"), {"role": "camera"})
Same FFmpeg protocol caveat as from_file: don't pass an untrusted url.
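If camera URLs come from configuration or user input, an allowlist check before calling from_rtsp narrows what FFmpeg can be pointed at. The sketch below uses urllib.parse from the standard library. The function name, the scheme set, and the idea of a host allowlist are assumptions for illustration; widen the schemes if you deliberately accept HTTP streams.

```python
from typing import Set
from urllib.parse import urlsplit

ALLOWED_SCHEMES = {"rtsp", "rtsps"}  # assumption: RTSP-only deployment


def check_rtsp_url(url: str, allowed_hosts: Set[str]) -> str:
    """Return url unchanged if its scheme and host are allowed, else raise ValueError."""
    parts = urlsplit(url)  # may itself raise ValueError on malformed input
    if parts.scheme not in ALLOWED_SCHEMES:
        raise ValueError(f"scheme {parts.scheme!r} is not allowed")
    host = parts.hostname  # lower-cased, without port or credentials
    if host is None or host not in allowed_hosts:
        raise ValueError(f"host {host!r} is not on the allowlist")
    return url
```

A call such as check_rtsp_url("rtsp://camera.local:554/stream", {"camera.local"}) returns the URL unchanged, while file:, concat:, and unknown hosts raise ValueError.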
from_camera
from_camera(device=None, *, width=640, height=480, fps=30)
A source backed by a local camera. device defaults to the platform's first camera. Returns a source whose .video is the camera track (.audio is typically None).
from_microphone
from_microphone(device=None)
A source backed by a local microphone. device defaults to the platform's default input. Returns a source whose .audio is the microphone track.
screen_share
screen_share(device=None, *, width=0, height=0, fps=30)
A source backed by screen capture. device defaults to the primary display. width / height of 0 keep the capture's native size. Returns a source whose .video is the screen track.
End-to-end: an audio agent in a room
The "AI agent in the call" shape — one peer pushes synthesized audio, another consumes a peer's track frame-by-frame. (Adapted from examples/audio_agent.py.)
import asyncio

from metered_realtime import (
    AudioSource, MediaStream, MeteredPeer, PeerJoined, Track, iter_frames,
)

CHANNEL = "my-voice-agent"


async def main() -> None:
    agent = MeteredPeer(api_key="pk_live_…")
    listener = MeteredPeer(api_key="pk_live_…")

    # --- agent: stream synthesized audio to everyone in the room (TTS out) ---
    source = AudioSource(input_rate=16_000)  # 16 kHz mono PCM in
    agent.add_track(source, MediaStream(id="agent-voice"))

    # --- listener: consume the agent's audio (this is your STT feed) ---
    @listener.on(PeerJoined)
    def _(ev: PeerJoined) -> None:
        @ev.peer.on(Track)
        async def _(t: Track) -> None:
            async for _frame in iter_frames(t.track):  # av.AudioFrame per 20 ms
                ...  # feed speech-to-text

    try:
        await listener.join(CHANNEL)
        await agent.join(CHANNEL)

        # tts_stream() is a placeholder for your TTS client's async generator.
        async for chunk in tts_stream():  # bytes of s16 PCM
            await source.push(chunk)  # resampled + paced to 48 kHz
        source.end()
        await asyncio.sleep(2)  # let the tail drain
    finally:
        await agent.close()
        await listener.close()


asyncio.run(main())
A real agent runs the inbound iter_frames loop through speech-to-text → an LLM → text-to-speech, and pushes each TTS chunk back out through the AudioSource as it's produced.
See also
- MeteredPeer: add_track / add_stream send your tracks; the Track / StreamAdded events deliver peers' media
- Guide: AI Agent Communication (the full speech-to-text → LLM → text-to-speech loop)
- Guide: IoT Telemetry (bridging an edge camera or sensor feed into a room)