AI Agent Communication
The headline Python use case: an AI agent that joins a room and talks. A human speaks; your agent hears them (speech-to-text), thinks (an LLM), and answers out loud (text-to-speech) — all over a live WebRTC connection. The canonical deployment is a Python agent on one side and a human in a browser on the other, but the agent doesn't care what's on the far end; it just joins a channel and exchanges audio with whoever's there.
The SDK is the transport. It carries the human's audio into your process and carries your synthesized speech back out, in real time, with reconnects handled for you. Speech-to-text, the LLM, and text-to-speech are your components — the SDK has no opinion about which you use. Everywhere below they show up as clearly-marked hooks (your_stt(...), your_llm(...), your_tts(...)); they are not SDK APIs.
This guide needs the webrtc extra:
pip install "metered-realtime[webrtc]"
The loop
human (browser)                 your Python agent
───────────────                 ─────────────────
mic ──audio──►  Track ──► iter_frames ──► your STT ──► text
                                                         │
                                                      your LLM
                                                         │
speaker ◄─audio─ AudioSource ◄── push() ◄── your TTS ◄─ reply text
Two halves, both async:
- Inbound (STT feed). The human's microphone arrives as a Track. You read it frame-by-frame with iter_frames, convert each av.AudioFrame to PCM, and feed your speech-to-text.
- Outbound (TTS out). You attach an AudioSource once, then push() your text-to-speech PCM as it's produced. The SDK paces it out to the room in real time.
Full skeleton
This mirrors examples/audio_agent.py, reframed as the real STT → LLM → TTS loop with your components stubbed as hooks. One MeteredPeer, joined to a room, doing both halves.
import asyncio

from metered_realtime import (
    AudioSource, MediaStream, MeteredPeer, PeerJoined, Track, iter_frames,
)

ROOM = "support-call-42"


async def main() -> None:
    agent = MeteredPeer(api_key="pk_live_…")

    # --- outbound: one AudioSource for the agent's voice (your TTS lands here) ---
    voice = AudioSource(input_rate=16_000)          # 16 kHz mono PCM in
    agent.add_track(voice, MediaStream(id="agent-voice"))

    # --- inbound: when a human joins, consume their mic and run the loop ---
    @agent.on(PeerJoined)
    def _(ev: PeerJoined) -> None:
        # An async Track handler is scheduled + tracked by the SDK, so you can
        # consume the whole stream right here.
        @ev.peer.on(Track)
        async def _(t: Track) -> None:
            if t.track.kind != "audio":
                return                              # ignore video/screen tracks
            await run_conversation(t.track, voice)

    async with agent.joined(ROOM):
        await asyncio.Future()                      # run until cancelled


async def run_conversation(track, voice: AudioSource) -> None:
    """One human's mic -> STT -> LLM -> TTS -> the agent's voice track."""
    async for frame in iter_frames(track):          # av.AudioFrame per ~20 ms
        pcm = frame.to_ndarray().tobytes()          # convert however your STT wants
        transcript = your_stt(pcm)                  # ← YOUR speech-to-text
        if not transcript:
            continue                                # mid-utterance, keep listening
        reply = your_llm(transcript)                # ← YOUR LLM
        async for chunk in your_tts(reply):         # ← YOUR text-to-speech (s16 PCM)
            await voice.push(chunk)                 # resampled + paced to 48 kHz


asyncio.run(main())
The hooks (your_stt, your_llm, your_tts) are placeholders for whatever libraries or services you wire in — a streaming STT client, an LLM call, a TTS client that yields PCM. Swap them for the real thing; nothing about the SDK changes.
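To make the hook contract concrete, here is a minimal sketch that drives the same STT → LLM → TTS loop with fake components and no SDK at all. Everything in it (FakeSTT, fake_llm, fake_tts, FRAMES_PER_UTTERANCE) is hypothetical and exists only to show the shapes the skeleton assumes: the STT hook returns nothing while an utterance is still in progress, the LLM hook maps text to text, and the TTS hook is an async iterator of s16 PCM chunks.

```python
import asyncio
from typing import AsyncIterator, List, Optional, Tuple

FRAMES_PER_UTTERANCE = 5  # hypothetical: pretend every 5th frame completes an utterance


class FakeSTT:
    """Stand-in for your_stt: stateful, returns None until an utterance is complete."""

    def __init__(self) -> None:
        self._seen = 0

    def __call__(self, pcm: bytes) -> Optional[str]:
        self._seen += 1
        if self._seen % FRAMES_PER_UTTERANCE:
            return None  # mid-utterance, nothing to report yet
        return f"utterance {self._seen // FRAMES_PER_UTTERANCE}"


def fake_llm(transcript: str) -> str:
    """Stand-in for your_llm: text in, text out."""
    return f"you said: {transcript}"


async def fake_tts(reply: str) -> AsyncIterator[bytes]:
    """Stand-in for your_tts: yields one 20 ms chunk of 16 kHz mono s16 silence per word."""
    for _ in reply.split():
        yield b"\x00\x00" * 320  # 320 samples * 2 bytes = 20 ms at 16 kHz mono


async def demo(n_frames: int = 10) -> List[Tuple[str, int]]:
    stt = FakeSTT()
    replies = []
    for _ in range(n_frames):
        frame_pcm = b"\x00" * 3840  # 20 ms of 48 kHz stereo s16
        transcript = stt(frame_pcm)
        if not transcript:
            continue
        reply = fake_llm(transcript)
        chunks = [chunk async for chunk in fake_tts(reply)]  # voice.push(chunk) in the real loop
        replies.append((reply, len(chunks)))
    return replies
```

Running asyncio.run(demo()) produces one reply per completed utterance. In a real agent the same three shapes are filled by whatever streaming STT, LLM, and TTS clients you choose.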
iter_frames exits cleanly when the track ends (the human hung up or dropped): the async for just stops, with no exception to catch. See the Media reference.
Sample-rate handling
WebRTC audio on the wire is 48 kHz stereo. Most STT/TTS stacks work at 16 kHz mono. AudioSource bridges the output side for you: construct it for your PCM format and it resamples to 48 kHz internally.
voice = AudioSource(input_rate=16_000) # push 16 kHz mono; SDK -> 48 kHz
voice = AudioSource(input_rate=24_000, input_layout="mono") # e.g. a 24 kHz TTS voice
Only "mono" and "stereo" are accepted for input_layout; anything else raises ValueError. If your TTS emits a format the helper doesn't cover, build an av.AudioFrame yourself and push() that — it carries its own rate/layout.
The inbound side is the reverse: frames from iter_frames arrive at 48 kHz. Downsample to whatever your STT expects (most STT clients accept a target rate, or resample with av / numpy before feeding). Don't assume 16 kHz on the way in.
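As an illustration of the inbound conversion, the sketch below downsamples 48 kHz stereo to 16 kHz mono using only the standard library. It assumes the PCM is interleaved signed 16-bit (L, R, L, R, ...), which you should confirm for the frames you actually receive. The function name is hypothetical, and the box-filter averaging is a crude stand-in for a proper resampler (for example PyAV's AudioResampler or your STT client's own rate handling), so treat it as a way to see the arithmetic rather than production audio code.

```python
from array import array


def downsample_48k_stereo_to_16k_mono(pcm: bytes) -> bytes:
    """Crude 48 kHz stereo s16 -> 16 kHz mono s16. Input must be interleaved L/R."""
    samples = array("h")          # native-endian signed 16-bit
    samples.frombytes(pcm)

    # Step 1: mix each L/R pair down to one mono sample.
    mono = [(samples[i] + samples[i + 1]) // 2 for i in range(0, len(samples) - 1, 2)]

    # Step 2: 48 kHz / 16 kHz = 3, so average every group of three mono samples.
    # Averaging is a weak low-pass filter; a real resampler filters far better.
    out = array("h", (sum(mono[i:i + 3]) // 3 for i in range(0, len(mono) - 2, 3)))
    return out.tobytes()
```

A 20 ms inbound frame (960 stereo sample pairs, 3840 bytes) comes out as 320 mono samples (640 bytes), which is the 20 ms chunk size most 16 kHz STT clients expect.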
Barge-in (let the human interrupt)
Natural conversation means the human can talk over the agent. When your STT detects fresh speech mid-reply, stop the current utterance immediately. AudioSource.end() terminates the track after draining what's buffered — so for barge-in, the clean pattern is one AudioSource per utterance: end the current one and attach a fresh one for the next reply.
async def speak(voice: AudioSource, reply: str) -> None:
    async for chunk in your_tts(reply):
        await voice.push(chunk)
    voice.end()                                  # drain + stop this utterance

# One AudioSource per utterance. Feed it from a task so the reply can be
# interrupted while it is still being spoken.
current = AudioSource(input_rate=16_000)
agent.add_track(current, MediaStream(id="agent-voice"))
speaking = asyncio.create_task(speak(current, reply))
# … STT detects the human interrupting …
speaking.cancel()                                # stop feeding TTS chunks
current.end()                                    # idempotent: safe if speak() already ended it
agent.remove_track(current)                      # stop sending the cut-off voice
end() is idempotent and push() is a no-op afterward, so a late TTS chunk racing the interrupt can't resurrect the stopped track. To keep latency tight (less buffered audio to abandon on a barge-in), construct with a smaller cap, e.g. AudioSource(input_rate=16_000, max_buffered_seconds=1.0).
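To get a feel for what a given cap means, the arithmetic can be sketched as below. The helper names are hypothetical, and the sketch assumes the cap is measured in seconds of audio in the format you push (s16 PCM at input_rate), which the text above implies but does not spell out.

```python
def buffered_bytes(seconds: float, rate: int = 16_000, channels: int = 1,
                   bytes_per_sample: int = 2) -> int:
    """Bytes of PCM that `seconds` of audio occupies (default: 16 kHz mono s16)."""
    return int(seconds * rate * channels * bytes_per_sample)


def chunks_in_flight(seconds: float, chunk_ms: int = 20) -> int:
    """How many fixed-size chunks (default 20 ms) fit in `seconds` of buffered audio."""
    return int(seconds * 1000 // chunk_ms)
```

With a 1.0 second cap at 16 kHz mono, up to 32,000 bytes (fifty 20 ms chunks) of already-synthesized speech can be queued when the human interrupts. A 0.25 second cap cuts that to 8,000 bytes.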
Transcripts and control over a side channel
Audio is the media path; everything else (live transcripts, tool-call status, "agent is thinking…", a hang-up signal) rides a data path. Two options:
Server-routed messaging — simplest, works the instant a peer is known. Broadcast with peer.send(...) or target one peer with peer.send_to(peer_id, ...). The far side receives a Data event.
from metered_realtime import Data

@agent.on(Data)
def _(ev: Data) -> None:
    if ev.data.get("type") == "hangup":
        ...                                      # human ended the call

# stream the running transcript to everyone in the room
# (call this from inside an async function)
await agent.send({"type": "transcript", "role": "agent", "text": partial})
These go Peer → Metered server → Peer and count against your message quota, so they're ideal for low-rate control and transcript updates, not per-frame data.
A real data channel — peer-to-peer, lower latency, off the message quota. Use it for chatty, high-rate control. The browser human opens it; your agent receives it as a DataChannelOpened event and wraps the raw channel:
from metered_realtime import DataChannel, DCMessage, DataChannelOpened

@agent.on(PeerJoined)
def _(ev: PeerJoined) -> None:
    @ev.peer.on(DataChannelOpened)
    def _(opened: DataChannelOpened) -> None:
        dc = DataChannel(opened.channel)         # backpressure-aware wrapper

        @dc.on(DCMessage)
        def _(m: DCMessage) -> None:
            handle_control(m.data)
Your agent can also open one toward a peer with ev.peer.create_data_channel("control"). See Data Channels for the wrapper's backpressure handling and the reconnect gotcha.
Rule of thumb: transcripts and occasional control → peer.send / peer.send_to; high-rate or latency-sensitive control → a data channel.
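The handle_control call in the data-channel snippet is left to you. One way to write it is a small dispatcher keyed on the message's "type" field, sketched below. This assumes your control messages are JSON objects shaped like the {"type": ...} examples above and that a payload may arrive as a dict, a string, or bytes; the actual payload type your handler receives is not specified here, and make_control_handler is a hypothetical helper, not an SDK function.

```python
import json
from typing import Any, Callable, Dict, Union

Handler = Callable[[Dict[str, Any]], None]


def make_control_handler(handlers: Dict[str, Handler]) -> Callable[[Union[str, bytes, dict]], bool]:
    """Build a handle_control(raw) that routes messages by their "type" field.

    Returns True if a handler ran, False if the message was malformed or unknown.
    """

    def handle_control(raw: Union[str, bytes, dict]) -> bool:
        if isinstance(raw, (bytes, bytearray, str)):
            try:
                if not isinstance(raw, str):
                    raw = raw.decode("utf-8")
                raw = json.loads(raw)
            except ValueError:  # covers both bad UTF-8 and bad JSON
                return False
        if not isinstance(raw, dict):
            return False
        handler = handlers.get(raw.get("type"))
        if handler is None:
            return False  # unknown message type: ignore rather than crash the call
        handler(raw)
        return True

    return handle_control
```

The same dispatcher can serve both paths: pass it ev.data from a Data event or m.data from a DCMessage, so switching a message from the server-routed path to a data channel does not change your handling code.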
Interoperating with a browser human
The canonical setup is a Python agent ↔ a browser human, and it Just Works — the agent joins the same channel name the browser joins, and the SDK negotiates the connection. A few Python-specific notes:
- You supply the agent's track. There's no microphone on a server; the AudioSource is the agent's "mic". The human's browser supplies theirs via getUserMedia on its side.
- Group the agent's voice under a named stream (MediaStream(id="agent-voice")) so the browser can label it in its UI. The stream.id is stable across reconnects.
- The human's audio is just a Track. Check t.track.kind == "audio" and ignore video/screen tracks if the human also shares those.
Many humans, one agent
add_track sends the agent's voice to every current and future peer in the room automatically — drop a second human into the channel and they hear the agent too, no extra code. On the inbound side you get one Track event per peer, so start one run_conversation (one STT loop) per human. If you want a single mixed transcript, tag each by ev.peer.id.
For a true multi-party agent (a meeting bot), this is the whole pattern: one outbound voice fanned out to all, one inbound STT loop per participant.
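The "one STT loop per participant, one mixed transcript" idea can be sketched without the SDK: each per-human loop puts (peer_id, text) pairs onto a shared asyncio.Queue, and a single consumer reads the merged stream. The fake_stt_loop function and the peer ids are hypothetical; in a real agent each loop would be a run_conversation fed by that peer's Track and tagged with ev.peer.id.

```python
import asyncio
from typing import List, Tuple


async def fake_stt_loop(peer_id: str, lines: List[str], out: asyncio.Queue) -> None:
    """Stand-in for one human's STT loop: emits that peer's utterances, tagged."""
    for text in lines:
        await asyncio.sleep(0)           # yield, as a real loop does while awaiting frames
        await out.put((peer_id, text))   # tag every line with who said it


async def demo() -> List[Tuple[str, str]]:
    merged: asyncio.Queue = asyncio.Queue()
    # One loop per participant, all writing into the same queue.
    await asyncio.gather(
        fake_stt_loop("peer-a", ["hello", "can you hear me?"], merged),
        fake_stt_loop("peer-b", ["yes", "loud and clear"], merged),
    )
    transcript = []
    while not merged.empty():
        transcript.append(merged.get_nowait())
    return transcript
```

Each speaker's lines keep their own order in the merged list, and the tag lets downstream code (an LLM prompt, a transcript UI) attribute every line.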
Long-running agents: reconnects
A voice agent is a long-lived process, not a page that a user can refresh. Configure reconnect for daemon-style operation:
from metered_realtime import MeteredPeer, ReconnectOptions

agent = MeteredPeer(
    api_key="pk_live_…",
    reconnect=ReconnectOptions(
        max_attempts=float("inf"),   # never give up: no one's there to restart it
        max_delay=60.0,              # cap backoff at 1 minute
    ),
)
The default max_attempts=100 is tuned for browsers; an unattended agent wants float("inf") so a transient cloud blip doesn't take it offline permanently. Across a reconnect the RemotePeer objects you hold stay valid — but inbound media arrives on a fresh track, so re-arm your iter_frames loop from the peer's Track event each time it fires (it re-fires after recovery). Listen for FatalError to catch terminal conditions (token revoked, account suspended) and alert your ops path rather than spinning silently.
For the full reconnect playbook, see Reconnect Best Practices.
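To see what a delay cap buys you with unlimited attempts, here is a sketch of a capped exponential backoff schedule. This is an illustration only: the 1 second base and the doubling factor are assumptions, and the SDK's real schedule (including any jitter) is not described in this guide. The function name is hypothetical.

```python
from typing import List


def backoff_schedule(attempts: int, base: float = 1.0, factor: float = 2.0,
                     max_delay: float = 60.0) -> List[float]:
    """Delays (seconds) before each retry: base * factor**n, never above max_delay."""
    return [min(base * factor ** n, max_delay) for n in range(attempts)]
```

Under these assumptions the delays run 1, 2, 4, 8, 16, 32 and then stay at 60 seconds, so an agent that never gives up still retries about once a minute during a long outage instead of backing off indefinitely.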
Pitfalls
Treating STT/LLM/TTS as part of the SDK. They aren't. The SDK moves audio and data; you bring the intelligence. Pick libraries that stream (incremental STT, token-streaming LLM, chunked TTS) so the loop stays low-latency.
Blocking the event loop. A synchronous LLM or TTS call inside the iter_frames loop stalls all the agent's audio (the AudioSource will emit silence while you're blocked). Run blocking work in asyncio.to_thread(...) or use async clients.
Forgetting end(). An AudioSource you never end() keeps the track live, emitting silence forever. End each utterance (or the whole source on shutdown) so the track terminates cleanly.
Assuming 16 kHz inbound. Frames from iter_frames are 48 kHz. Downsample before feeding STT.
Pushing TTS faster than real time without bounding latency. push() applies backpressure at max_buffered_seconds, so memory is safe, but a big buffer means a long delay before a barge-in actually goes quiet. Keep the cap small for conversational latency.
Re-using one AudioSource after end(). It's terminal. Construct a fresh one for the next utterance (this is also what makes barge-in clean).
Caching the inbound track across reconnects. Media re-arrives on a new track after recovery. Re-take it from the Track event each time rather than holding the old object.
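The event-loop pitfall is the easiest to hit, so here is a small standalone sketch of the asyncio.to_thread fix (Python 3.9+). The blocking_llm function is a hypothetical stand-in for any synchronous client, and the heartbeat task stands in for the audio work that must keep running while the call is in flight.

```python
import asyncio
import contextlib
import time
from typing import List, Tuple


def blocking_llm(prompt: str) -> str:
    """Stand-in for a synchronous LLM client: blocks its thread for 200 ms."""
    time.sleep(0.2)
    return prompt.upper()


async def heartbeat(ticks: List[float]) -> None:
    """Stand-in for the audio path: must keep ticking while the LLM call runs."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def demo() -> Tuple[str, int]:
    ticks: List[float] = []
    hb = asyncio.create_task(heartbeat(ticks))
    # The blocking call runs in a worker thread; the event loop stays free.
    reply = await asyncio.to_thread(blocking_llm, "hello")
    hb.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await hb
    return reply, len(ticks)
```

If you call blocking_llm("hello") directly inside demo instead, the heartbeat gets no chance to run until the call returns, which is the stall described above.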
See also
- Media reference: AudioSource, iter_frames, and the source helpers
- MeteredPeer reference: add_track, send, send_to, the Track/PeerJoined events
- Data Channels: low-latency P2P control/transcript path
- Reconnect Best Practices: daemon-style settings for long-lived agents