Media

Media input is where the Python SDK shines. There's no getUserMedia server-side, so you supply the track — synthesized speech from an AI agent, an IP-camera feed, a file, a local device. This page covers the grouping type (MediaStream), the push-based audio track (AudioSource), the received-track consumer (iter_frames), and the file/device source helpers (from_file and friends).

Everything here requires the webrtc extra:

pip install "metered-realtime[webrtc]"

The source helpers (AudioSource, iter_frames, from_file, …) are exported lazily — importing them pulls in the WebRTC backend (aiortc + av), so a pub/sub-only install never loads it. MediaStream and StreamMetadata are always available.

Provisional API

The media-input helpers — AudioSource, iter_frames, and the from_* source helpers — are Python-specific and may change in a minor release. The core media API on MeteredPeer (add_stream / add_track / replace_track) and the MediaStream grouping type are stable.

from metered_realtime import (
    MediaStream,
    AudioSource,
    iter_frames,
    from_file,
    from_rtsp,
    from_camera,
    from_microphone,
    screen_share,
)

Any aiortc-compatible MediaStreamTrack works too — pass it straight to peer.add_track(...).

MediaStream

A lightweight grouping of tracks under one stream id — the Python counterpart of the browser's MediaStream. On the sending side, pass one to peer.add_stream(...) (or as the stream argument of add_track) and a receiving peer sees the same stream.id. On the receiving side the SDK constructs one per remote stream and surfaces it on the Track / StreamAdded events.

from metered_realtime import MediaStream

stream = MediaStream(id="agent-voice") # explicit, stable id
stream.add_track(source) # group a track under it
peer.add_stream(stream) # send every track to all peers

Construct

MediaStream(tracks=(), *, id=None)
| Argument | Type | Default | Notes |
| --- | --- | --- | --- |
| tracks | iterable of tracks | () | Tracks to seed the grouping with. |
| id | str | random hex | The stream id seen by receivers. Pass an explicit one when you want a meaningful, stable name ("agent-voice", "front-door-cam"). |

Methods

| Method | Returns | Notes |
| --- | --- | --- |
| get_tracks() | list | Snapshot of the stream's tracks. |
| get_audio_tracks() | list | Tracks whose kind == "audio". |
| get_video_tracks() | list | Tracks whose kind == "video". |
| add_track(track) | None | Add a track to the grouping. No effect on already-negotiated senders; call peer.add_track with this stream to actually send it. |
| remove_track(track) | None | Remove a track from the grouping. |

stream.id is stable; the object identity isn't

stream.id is stable from sender to receiver and across reconnects — it's how you correlate a remote stream over time (the agent's voice is always "agent-voice"). But the MediaStream object a receiver gets is a fresh one per remote stream, and a new object after each reconnect. Re-take your reference on every StreamAdded rather than caching the object:

streams = {}  # stream.id -> latest MediaStream object

@remote.on(StreamAdded)
def _(ev: StreamAdded) -> None:
    streams[ev.stream.id] = ev.stream  # re-take, keyed by the stable id

StreamMetadata

StreamMetadata is a plain dict[str, Any] of customer-defined fields you attach to tracks at add_stream / add_track time. It's delivered to receivers ahead of the media and surfaced on the Track / StreamAdded events' metadata field. Convention: role ("camera" | "screen" | "file" | …) and label (human-readable), plus anything your app needs.

peer.add_stream(
    MediaStream([cam.video], id="front-door"),
    {"role": "camera", "label": "Front Door"},
)

It travels over the messaging service and counts against the per-message size cap, so keep it small. Treat inbound metadata as untrusted peer input.
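
Because inbound metadata is untrusted, it can help to filter it before it reaches your UI or logic. The following is a minimal sketch under stated assumptions: `sanitize_metadata`, `MAX_METADATA_BYTES`, and `KNOWN_ROLES` are hypothetical names, and the 1 KiB budget is an illustrative value, not the SDK's actual per-message cap. Only the `role` / `label` convention comes from this page.

```python
import json
from typing import Any

MAX_METADATA_BYTES = 1024  # hypothetical budget, not the SDK's real cap
KNOWN_ROLES = {"camera", "screen", "file"}  # example roles from the convention


def sanitize_metadata(meta: Any) -> dict[str, Any]:
    """Return a safe subset of peer-supplied metadata (illustrative helper)."""
    if not isinstance(meta, dict):
        return {}
    try:
        encoded = json.dumps(meta).encode("utf-8")
    except (TypeError, ValueError):
        return {}  # not JSON-serializable: ignore it entirely
    if len(encoded) > MAX_METADATA_BYTES:
        return {}  # oversized: ignore it entirely
    clean: dict[str, Any] = {}
    role = meta.get("role")
    if isinstance(role, str) and role in KNOWN_ROLES:
        clean["role"] = role
    label = meta.get("label")
    if isinstance(label, str):
        clean["label"] = label[:64]  # cap what can end up in a UI
    return clean
```

Dropping unknown keys rather than passing them through keeps the handler's behaviour independent of whatever a remote peer chooses to send.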

AudioSource

A push-based audio track for audio you generate or stream in — an AI agent's text-to-speech output being the canonical case. You construct it for your input format, attach it as a track, then push audio as it's produced. The track emits real-time-paced 48 kHz stereo audio to the room, inserting silence on underrun and applying backpressure when you outrun the link.

AudioSource is a MediaStreamTrack, so you send it like any other track:

from metered_realtime import AudioSource, MediaStream

source = AudioSource(input_rate=16_000) # 16 kHz mono PCM in
peer.add_track(source, MediaStream(id="agent-voice"))

async for chunk in tts_stream():  # bytes of s16 PCM
    await source.push(chunk)
source.end()  # stop the track when done

Construct

AudioSource(*, input_rate=16_000, input_layout="mono", max_buffered_seconds=5.0, logger=None)
| Argument | Type | Default | Notes |
| --- | --- | --- | --- |
| input_rate | int | 16_000 | Sample rate of the raw PCM you'll push(). Audio is resampled to 48 kHz internally. |
| input_layout | str | "mono" | "mono" or "stereo" only; anything else raises ValueError. For other layouts, build an av.AudioFrame yourself and push that. |
| max_buffered_seconds | float | 5.0 | Backpressure cap. push() suspends once this many seconds are buffered. Raise it to tolerate burstier producers, lower it to cap latency/memory. |
| logger | Logger | NoopLogger | Optional. |

All arguments are keyword-only.

await push(data) -> None

Queue audio for emission. data is either:

  • raw s16 PCM bytes in the configured input_rate / input_layout, or
  • an av.AudioFrame, which carries its own format (so input_rate / input_layout apply only to raw bytes).

The audio is resampled to 48 kHz stereo and emitted in real-time 20 ms frames.

await source.push(pcm_bytes)        # s16 PCM in the configured input format
await source.push(av_audio_frame) # an av.AudioFrame (carries its own format)
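
To make the raw-bytes format concrete: s16 PCM is two bytes per sample per channel, so a 20 ms chunk of 16 kHz mono audio is 320 samples, or 640 bytes. The sketch below builds a test tone in that format with only the standard library. `sine_pcm` and `pcm_seconds` are hypothetical helper names, and the code assumes native byte order (little-endian on common platforms) is what the backend expects.

```python
import math
from array import array

BYTES_PER_SAMPLE = 2  # s16 = signed 16-bit integers


def sine_pcm(seconds: float, rate: int = 16_000, freq: float = 440.0) -> bytes:
    """Mono s16 PCM bytes for a sine tone at half of full scale."""
    n = round(seconds * rate)
    samples = array("h", (
        int(0.5 * 32767 * math.sin(2 * math.pi * freq * i / rate))
        for i in range(n)
    ))
    return samples.tobytes()


def pcm_seconds(data: bytes, rate: int = 16_000, channels: int = 1) -> float:
    """Duration represented by a chunk of raw s16 PCM."""
    return len(data) / (rate * channels * BYTES_PER_SAMPLE)
```

With an `AudioSource(input_rate=16_000)`, `await source.push(sine_pcm(1.0))` would then queue one second of tone, which is a convenient smoke test before wiring in a real text-to-speech stream.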

Key behaviours:

  • Backpressure. push() suspends once the buffer reaches max_buffered_seconds, even within a single oversized push, so a faster-than-real-time producer can't grow memory without bound. It resumes as the track drains below the cap.
  • Underrun → silence. If the buffer runs dry the track emits silence so it stays live; push again and audio resumes.
  • Partial samples are buffered. Empty or sub-sample byte chunks are held until a whole sample arrives, so a producer that splits a chunk mid-sample never desyncs the stream.
  • No-op after end() (or once the track has stopped).
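
The backpressure behaviour can be pictured as a bounded buffer whose producer waits at the cap. What follows is an illustration of that general pattern with `asyncio.Condition`, not the SDK's implementation; `BoundedAudioBuffer`, `pull`, and `demo` are hypothetical names. The toy model admits each push whole, so it does not reproduce suspension inside a single oversized push.

```python
import asyncio


class BoundedAudioBuffer:
    """Toy model: push() waits while the buffer is at its cap."""

    def __init__(self, max_seconds: float) -> None:
        self.max_seconds = max_seconds
        self.buffered = 0.0
        self._cond = asyncio.Condition()

    async def push(self, seconds: float) -> None:
        async with self._cond:
            # Suspend the producer until the consumer drains below the cap.
            await self._cond.wait_for(lambda: self.buffered < self.max_seconds)
            self.buffered += seconds

    async def pull(self, seconds: float) -> float:
        """Take up to `seconds` of audio; any shortfall would be silence."""
        async with self._cond:
            taken = min(seconds, self.buffered)
            self.buffered -= taken
            self._cond.notify_all()  # wake a producer waiting at the cap
            return taken


async def demo() -> tuple[bool, list[float]]:
    buf = BoundedAudioBuffer(max_seconds=1.0)
    levels: list[float] = []

    async def producer() -> None:
        for _ in range(4):
            await buf.push(0.5)  # pushes as fast as it is allowed to
            levels.append(buf.buffered)

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.05)  # give the producer time to hit the cap
    stalled = not task.done()  # still parked in push()
    while not task.done():
        await buf.pull(0.5)  # the consumer drains the buffer
        await asyncio.sleep(0)
    return stalled, levels
```

Running `asyncio.run(demo())` shows the producer parked after two pushes and the buffer level never exceeding the cap, which is the property that keeps a fast producer from growing memory without bound.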

end() -> None

Signal end-of-stream. The track emits any remaining buffered audio, then stops (recv raises and the track ends). Idempotent. Call it when your producer is done so the track terminates cleanly rather than emitting silence forever.

source.end()

buffered_seconds: float

Read-only. Seconds of audio currently buffered, for introspection (e.g. logging how far ahead of real time your producer is running).

if source.buffered_seconds > 2.0:
    ...  # producer is well ahead of the link

The "agent TTS out" use case

AudioSource is the output half of a voice agent: as your text-to-speech produces PCM, you push() it and the SDK paces it out to every peer in real time. You don't manage timing or framing — push chunks as they arrive, end() when the utterance is done. Pair it with iter_frames on the inbound side for the full speech-to-text → LLM → text-to-speech loop.

iter_frames

async def iter_frames(track) -> AsyncIterator   # yields av.AudioFrame / av.VideoFrame

An async iterator over a received track's frames. The loop exits cleanly when the track stops — the peer stopped sending, or the connection dropped — with no error handling needed on your side.

from metered_realtime import iter_frames

async for frame in iter_frames(track):
    ...  # av.AudioFrame / av.VideoFrame

You get the track from a remote peer's Track event (t.track):

@remote.on(Track)
async def _(t: Track) -> None:
    async for frame in iter_frames(t.track):
        feed_stt(frame)

You own the track's lifecycle. If you stop iterating early, stop the track yourself (or, for a file/device source, stop the source's player) to release the decoder / device / socket — iter_frames won't do it for you.
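
One way to make that cleanup hard to forget is a try/finally around the loop. The sketch below uses stand-ins so it runs on its own: `FakeTrack`, `fake_iter_frames`, and `first_frames` are hypothetical, substituting for a received track and `iter_frames`, and the code assumes the real track exposes a synchronous `stop()` as aiortc's MediaStreamTrack does.

```python
import asyncio
from typing import AsyncIterator


class FakeTrack:
    """Stand-in for a received track; counts up instead of decoding media."""

    def __init__(self) -> None:
        self.stopped = False
        self._n = 0

    async def recv(self) -> int:
        await asyncio.sleep(0)
        self._n += 1
        return self._n

    def stop(self) -> None:
        self.stopped = True


async def fake_iter_frames(track: FakeTrack) -> AsyncIterator[int]:
    """Stand-in for iter_frames: yield frames until the track stops."""
    while not track.stopped:
        yield await track.recv()


async def first_frames(track: FakeTrack, limit: int) -> list[int]:
    """Read at most `limit` frames, then release the track."""
    frames: list[int] = []
    try:
        async for frame in fake_iter_frames(track):
            frames.append(frame)
            if len(frames) >= limit:
                break  # leaving early: the iterator will not stop the track
    finally:
        track.stop()  # release decoder / device / socket
    return frames
```

The `finally` clause also covers the case where your frame handler raises, so the track is released on both the early-exit and the error path.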

The "agent STT in" use case

iter_frames is the input half of a voice agent: it turns a peer's audio track into an async stream of av.AudioFrames you can convert to PCM and feed to speech-to-text. One iter_frames loop per inbound track you care about, started from that peer's Track handler.

Source helpers

These build a track-bearing source from a file, a network stream, or a local device. Each returns a player object whose .audio and .video attributes are the contained tracks — either may be None if the source lacks that media, so check before add_track:

from metered_realtime import from_file

src = from_file("clip.mp4")
if src.video is not None:
    peer.add_track(src.video, MediaStream(id="clip"))
if src.audio is not None:
    peer.add_track(src.audio, MediaStream(id="clip"))

You own the player. Stop its tracks when you're done so the underlying decoder / device / socket is released.

Real device capture is platform- and FFmpeg-dependent (V4L2 / ALSA on Linux, AVFoundation on macOS, DirectShow / gdigrab / x11grab on Windows). Device naming and availability vary by OS and FFmpeg build; from_file / from_rtsp are the portable options.

from_file

from_file(file, *, loop=False)

A source backed by a media file (or file-like / HTTP URL). loop=True repeats a seekable file indefinitely.

peer.add_track(from_file("clip.mp4").audio, MediaStream(id="clip"))

Do not pass untrusted / externally-controlled file values. FFmpeg selects the protocol/demuxer from the string (a path, an http(s):// URL, or pseudo-protocols like concat: / data:), so a hostile string can read arbitrary local files or fetch arbitrary URLs (SSRF). Validate before forwarding.
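
A minimal validation sketch, assuming user input should only ever name a file inside one media directory. `resolve_media_path` and `MEDIA_ROOT` are hypothetical names, and the policy (reject anything containing a colon, confine paths to one root) is one possible choice to adapt to your deployment.

```python
from pathlib import Path

MEDIA_ROOT = Path("/srv/media")  # hypothetical directory of allowed files


def resolve_media_path(user_value: str, root: Path = MEDIA_ROOT) -> Path:
    """Map untrusted input to a path under `root`, or raise ValueError."""
    if ":" in user_value or "\x00" in user_value:
        # Rejects URLs and FFmpeg pseudo-protocols such as concat: / data:
        raise ValueError("protocol-style values are not allowed")
    root = root.resolve()
    candidate = (root / user_value).resolve()
    if not candidate.is_relative_to(root):  # Path.is_relative_to: Python 3.9+
        raise ValueError("path escapes the media directory")
    return candidate
```

The returned path, converted with `str()`, is what you would then hand to `from_file`; the raw user string never reaches FFmpeg.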

from_rtsp

from_rtsp(url, *, transport="tcp")

A source backed by an RTSP (or HTTP) stream — most IP cameras. transport is the RTSP lower transport, one of {"tcp", "udp", "udp_multicast", "http"} (anything else raises ValueError); tcp is the most firewall-friendly.

cam = from_rtsp("rtsp://camera.local:554/stream")
if cam.video is not None:
    peer.add_track(cam.video, MediaStream(id="front-door"), {"role": "camera"})

Same FFmpeg protocol caveat as from_file — don't pass an untrusted url.
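
For camera URLs that come from configuration or user input, a host allowlist is one way to apply that caveat. This is a sketch under assumptions: `check_rtsp_url` and `ALLOWED_CAMERA_HOSTS` are hypothetical, and restricting the scheme to `rtsp` is a policy choice you would widen if your cameras serve HTTP streams.

```python
from urllib.parse import urlsplit

ALLOWED_CAMERA_HOSTS = {"camera.local"}  # hypothetical allowlist
RTSP_TRANSPORTS = {"tcp", "udp", "udp_multicast", "http"}  # values from_rtsp accepts


def check_rtsp_url(url: str, transport: str = "tcp") -> str:
    """Return `url` unchanged if it passes the checks, else raise ValueError."""
    if transport not in RTSP_TRANSPORTS:
        raise ValueError(f"unsupported transport: {transport!r}")
    parts = urlsplit(url)
    if parts.scheme != "rtsp":
        raise ValueError(f"unexpected scheme: {parts.scheme!r}")
    if parts.hostname not in ALLOWED_CAMERA_HOSTS:
        raise ValueError(f"host not allowed: {parts.hostname!r}")
    return url
```

Checking the transport up front mirrors the ValueError that `from_rtsp` raises for unknown values, so a bad configuration fails before any connection is attempted.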

from_camera

from_camera(device=None, *, width=640, height=480, fps=30)

A source backed by a local camera. device defaults to the platform's first camera. Returns a source whose .video is the camera track (.audio is typically None).

from_microphone

from_microphone(device=None)

A source backed by a local microphone. device defaults to the platform's default input. Returns a source whose .audio is the microphone track.

screen_share

screen_share(device=None, *, width=0, height=0, fps=30)

A source backed by screen capture. device defaults to the primary display. width / height of 0 keep the capture's native size. Returns a source whose .video is the screen track.

End-to-end: an audio agent in a room

The "AI agent in the call" shape — one peer pushes synthesized audio, another consumes a peer's track frame-by-frame. (Adapted from examples/audio_agent.py.)

import asyncio
from metered_realtime import (
    AudioSource, MediaStream, MeteredPeer, PeerJoined, Track, iter_frames,
)

CHANNEL = "my-voice-agent"

async def main() -> None:
    agent = MeteredPeer(api_key="pk_live_…")
    listener = MeteredPeer(api_key="pk_live_…")

    # --- agent: stream synthesized audio to everyone in the room (TTS out) ---
    source = AudioSource(input_rate=16_000)  # 16 kHz mono PCM in
    agent.add_track(source, MediaStream(id="agent-voice"))

    # --- listener: consume the agent's audio (this is your STT feed) ---
    @listener.on(PeerJoined)
    def _(ev: PeerJoined) -> None:
        @ev.peer.on(Track)
        async def _(t: Track) -> None:
            async for _frame in iter_frames(t.track):  # av.AudioFrame per 20 ms
                ...  # feed speech-to-text

    try:
        await listener.join(CHANNEL)
        await agent.join(CHANNEL)
        async for chunk in tts_stream():  # bytes of s16 PCM
            await source.push(chunk)  # resampled + paced to 48 kHz
        source.end()
        await asyncio.sleep(2)  # let the tail drain
    finally:
        await agent.close()
        await listener.close()

asyncio.run(main())

A real agent runs the inbound iter_frames loop through speech-to-text → an LLM → text-to-speech, and pushes each TTS chunk back out through the AudioSource as it's produced.
