
AI Agent Communication

The headline Python use case: an AI agent that joins a room and talks. A human speaks; your agent hears them (speech-to-text), thinks (an LLM), and answers out loud (text-to-speech) — all over a live WebRTC connection. The canonical deployment is a Python agent on one side and a human in a browser on the other, but the agent doesn't care what's on the far end; it just joins a channel and exchanges audio with whoever's there.

The SDK is the transport. It carries the human's audio into your process and carries your synthesized speech back out, in real time, with reconnects handled for you. Speech-to-text, the LLM, and text-to-speech are your components — the SDK has no opinion about which you use. Everywhere below they show up as clearly-marked hooks (your_stt(...), your_llm(...), your_tts(...)); they are not SDK APIs.

This guide needs the webrtc extra:

pip install "metered-realtime[webrtc]"

The loop

human (browser)       your Python agent
───────────────       ─────────────────────────────────────────────────
mic ──audio──►        Track ──► iter_frames ──► your STT ──► text
                                                              │
                                                              ▼
                                                           your LLM
                                                              │
                                                              ▼
speaker ◄─audio─      AudioSource ◄── push() ◄── your TTS ◄─ reply text

Two halves, both async:

  • Inbound (STT feed). The human's microphone arrives as a Track. You read it frame-by-frame with iter_frames, convert each av.AudioFrame to PCM, and feed your speech-to-text.
  • Outbound (TTS out). You attach an AudioSource once, then push() your text-to-speech PCM as it's produced. The SDK paces it out to the room in real time.

Full skeleton

This mirrors examples/audio_agent.py, reframed as the real STT → LLM → TTS loop with your components stubbed as hooks. One MeteredPeer, joined to a room, doing both halves.

agent.py
import asyncio
from metered_realtime import (
    AudioSource, MediaStream, MeteredPeer, PeerJoined, Track, iter_frames,
)

ROOM = "support-call-42"

async def main() -> None:
    agent = MeteredPeer(api_key="pk_live_…")

    # --- outbound: one AudioSource for the agent's voice (your TTS lands here) ---
    voice = AudioSource(input_rate=16_000)  # 16 kHz mono PCM in
    agent.add_track(voice, MediaStream(id="agent-voice"))

    # --- inbound: when a human joins, consume their mic and run the loop ---
    @agent.on(PeerJoined)
    def _(ev: PeerJoined) -> None:
        # An async Track handler is scheduled + tracked by the SDK, so you can
        # consume the whole stream right here.
        @ev.peer.on(Track)
        async def _(t: Track) -> None:
            if t.track.kind != "audio":
                return  # ignore video/screen tracks
            await run_conversation(t.track, voice)

    async with agent.joined(ROOM):
        await asyncio.Future()  # run until cancelled

async def run_conversation(track, voice: AudioSource) -> None:
    """One human's mic -> STT -> LLM -> TTS -> the agent's voice track."""
    async for frame in iter_frames(track):  # av.AudioFrame per ~20 ms
        pcm = frame.to_ndarray().tobytes()  # convert however your STT wants
        transcript = your_stt(pcm)  # ← YOUR speech-to-text
        if not transcript:
            continue  # mid-utterance, keep listening
        reply = your_llm(transcript)  # ← YOUR LLM
        async for chunk in your_tts(reply):  # ← YOUR text-to-speech (s16 PCM)
            await voice.push(chunk)  # resampled + paced to 48 kHz

asyncio.run(main())

The hooks (your_stt, your_llm, your_tts) are placeholders for whatever libraries or services you wire in — a streaming STT client, an LLM call, a TTS client that yields PCM. Swap them for the real thing; nothing about the SDK changes.

iter_frames exits cleanly when the track ends (the human hung up or dropped) — the async for just stops, no exception to catch. See the Media reference.

Sample-rate handling

WebRTC audio on the wire is 48 kHz stereo. Most STT/TTS stacks work at 16 kHz mono. AudioSource bridges the output side for you: construct it for your PCM format and it resamples to 48 kHz internally.

voice = AudioSource(input_rate=16_000)                  # push 16 kHz mono; SDK -> 48 kHz
voice = AudioSource(input_rate=24_000, input_layout="mono") # e.g. a 24 kHz TTS voice

Only "mono" and "stereo" are accepted for input_layout; anything else raises ValueError. If your TTS emits a format the helper doesn't cover, build an av.AudioFrame yourself and push() that — it carries its own rate/layout.
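As an alternative to building an av.AudioFrame, some formats can be converted to s16 PCM before push(). The following is a minimal sketch, assuming a hypothetical TTS that emits native-endian float32 mono samples in the range [-1.0, 1.0]. The helper name float32_to_s16 is illustrative and not part of the SDK.

```python
from array import array

def float32_to_s16(pcm_f32: bytes) -> bytes:
    """Convert native-endian float32 samples to signed 16-bit PCM bytes."""
    floats = array("f")
    floats.frombytes(pcm_f32)
    # Clamp to [-1.0, 1.0] so out-of-range samples cannot overflow int16.
    ints = array("h", (int(max(-1.0, min(1.0, x)) * 32767) for x in floats))
    return ints.tobytes()

# Example input: silence, full scale both ways, a clipped sample, half scale.
example = array("f", [0.0, 1.0, -1.0, 2.0, 0.5]).tobytes()
converted = array("h")
converted.frombytes(float32_to_s16(example))
```

The resulting bytes can then be pushed to an AudioSource constructed with the TTS voice's sample rate.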

The inbound side is the reverse: frames from iter_frames arrive at 48 kHz. Downsample to whatever your STT expects (most STT clients accept a target rate, or resample with av / numpy before feeding). Don't assume 16 kHz on the way in.
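A minimal sketch of the inbound conversion, assuming the frame bytes are interleaved signed 16-bit stereo at 48 kHz (check frame.format, frame.layout, and frame.sample_rate on real frames; planar formats need different handling). The function name is hypothetical, and the group-of-three averaging is a crude stand-in for a proper resampler such as PyAV's av.AudioResampler.

```python
from array import array

def downsample_48k_stereo_to_16k_mono(pcm: bytes) -> bytes:
    """Crude 48 kHz interleaved-stereo s16 -> 16 kHz mono s16 conversion."""
    samples = array("h")
    samples.frombytes(pcm)  # native-endian signed 16-bit samples
    # Downmix: average each left/right pair into one mono sample.
    mono = [(samples[i] + samples[i + 1]) // 2
            for i in range(0, len(samples) - 1, 2)]
    # Decimate by 3 (48 kHz / 16 kHz), averaging each group as a rough low-pass.
    out = array("h", (sum(mono[i:i + 3]) // 3
                      for i in range(0, len(mono) - 2, 3)))
    return out.tobytes()

# One 20 ms frame at 48 kHz stereo is 960 sample pairs (1920 int16 values).
frame_bytes = array("h", [100] * 1920).tobytes()
stt_bytes = downsample_48k_stereo_to_16k_mono(frame_bytes)
```

For production audio quality, a filter-based resampler is the safer choice; the sketch only shows the shape of the conversion (downmix, then rate reduction).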

Barge-in (let the human interrupt)

Natural conversation means the human can talk over the agent. When your STT detects fresh speech mid-reply, stop the current utterance immediately. AudioSource.end() terminates the track only after draining what's buffered, so end() alone does not silence the agent right away. For barge-in, the clean pattern is one AudioSource per utterance: end the current one, remove its track so the cut-off audio stops being sent, and attach a fresh one for the next reply.

async def speak(agent: MeteredPeer, reply: str) -> AudioSource:
    voice = AudioSource(input_rate=16_000)
    agent.add_track(voice, MediaStream(id="agent-voice"))
    async for chunk in your_tts(reply):
        await voice.push(chunk)
    voice.end()  # drain + stop this utterance
    return voice

# On barge-in: stop the in-flight utterance and remove its track.
current = await speak(agent, reply)
# … STT detects the human interrupting …
current.end()
agent.remove_track(current)  # stop sending the cut-off voice

end() is idempotent and push() is a no-op afterward, so a late TTS chunk racing the interrupt can't resurrect the stopped track. To keep latency tight (less buffered audio to abandon on a barge-in), construct with a smaller cap, e.g. AudioSource(input_rate=16_000, max_buffered_seconds=1.0).
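One way to make an utterance interruptible while TTS is still producing audio is to run it as an asyncio task and cancel that task on barge-in. The sketch below runs without the SDK: StubSource is a hypothetical stand-in that only mimics the documented end()/push() behavior, and fake_tts is an invented streaming TTS. With the real SDK you would also call agent.remove_track(...) after cancelling, as in the snippet above.

```python
import asyncio

class StubSource:
    """Stand-in for AudioSource: records pushes, ignores them after end()."""
    def __init__(self) -> None:
        self.chunks: list[bytes] = []
        self.ended = False

    async def push(self, chunk: bytes) -> None:
        if not self.ended:
            self.chunks.append(chunk)

    def end(self) -> None:
        self.ended = True  # calling it twice is harmless

async def fake_tts(text: str):
    """Hypothetical streaming TTS: yields one chunk per word, slowly."""
    for word in text.split():
        await asyncio.sleep(0.05)
        yield word.encode()

async def speak(voice: StubSource, reply: str) -> None:
    try:
        async for chunk in fake_tts(reply):
            await voice.push(chunk)
    finally:
        voice.end()  # runs on normal completion and on cancellation

async def demo() -> StubSource:
    voice = StubSource()
    utterance = asyncio.create_task(speak(voice, "one two three four five six"))
    await asyncio.sleep(0.12)    # the human starts talking mid-reply
    utterance.cancel()           # barge-in: stop feeding TTS audio
    try:
        await utterance
    except asyncio.CancelledError:
        pass
    await voice.push(b"late")    # a straggler chunk is dropped
    return voice

voice = asyncio.run(demo())
```

The try/finally keeps the source's lifecycle in one place: whether the reply finishes or is cancelled, the utterance's source ends exactly once from the caller's point of view.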

Transcripts and control over a side channel

Audio is the media path; everything else (live transcripts, tool-call status, "agent is thinking…", a hang-up signal) rides a data path. Two options:

Server-routed messaging — simplest, works the instant a peer is known. Broadcast with peer.send(...) or target one peer with peer.send_to(peer_id, ...). The far side receives a Data event.

from metered_realtime import Data

@agent.on(Data)
def _(ev: Data) -> None:
    if ev.data.get("type") == "hangup":
        ...  # human ended the call

# stream the running transcript to everyone in the room
await agent.send({"type": "transcript", "role": "agent", "text": partial})

These go Peer → Metered server → Peer and count against your message quota, so they're ideal for low-rate control and transcript updates, not per-frame data.
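Because each message counts against the quota, it can help to coalesce partial transcripts before sending. The following is a sketch of one possible throttle, not an SDK feature: the TranscriptThrottle class, the 0.25-second default interval, and the "final" field are all assumptions made for illustration.

```python
import time
from typing import Callable, Optional

class TranscriptThrottle:
    """Coalesce partial transcripts so at most one goes out per interval."""

    def __init__(self, min_interval: float = 0.25,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._min_interval = min_interval
        self._clock = clock
        self._last_sent = float("-inf")

    def update(self, text: str, final: bool = False) -> Optional[dict]:
        """Return a message to send now, or None to skip this partial."""
        now = self._clock()
        if not final and now - self._last_sent < self._min_interval:
            return None  # too soon; a later partial will supersede this one
        self._last_sent = now
        return {"type": "transcript", "role": "agent",
                "text": text, "final": final}
```

In the agent, the result would be passed to agent.send(...) only when it is not None. Final transcripts always go through, so the far side never ends up with a stale partial as the last word.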

A real data channel — peer-to-peer, lower latency, off the message quota. Use it for chatty, high-rate control. The browser human opens it; your agent receives it as a DataChannelOpened event and wraps the raw channel:

from metered_realtime import DataChannel, DCMessage, DataChannelOpened

@agent.on(PeerJoined)
def _(ev: PeerJoined) -> None:
    @ev.peer.on(DataChannelOpened)
    def _(opened: DataChannelOpened) -> None:
        dc = DataChannel(opened.channel)  # backpressure-aware wrapper

        @dc.on(DCMessage)
        def _(m: DCMessage) -> None:
            handle_control(m.data)

Your agent can also open one toward a peer with ev.peer.create_data_channel("control"). See Data Channels for the wrapper's backpressure handling and the reconnect gotcha.

Rule of thumb: transcripts and occasional control → peer.send / peer.send_to; high-rate or latency-sensitive control → a data channel.

Interoperating with a browser human

The canonical setup is a Python agent ↔ a browser human, and it Just Works — the agent joins the same channel name the browser joins, and the SDK negotiates the connection. A few Python-specific notes:

  • You supply the agent's track. There's no microphone on a server; the AudioSource is the agent's "mic". The human's browser supplies theirs via getUserMedia on its side.
  • Group the agent's voice under a named stream (MediaStream(id="agent-voice")) so the browser can label it in its UI. The stream.id is stable across reconnects.
  • The human's audio is just a Track. Check t.track.kind == "audio" and ignore video/screen tracks if the human also shares those.

Many humans, one agent

add_track sends the agent's voice to every current and future peer in the room automatically. Drop a second human into the channel and they hear the agent too, with no extra code. On the inbound side you get one Track event per peer, so start one run_conversation (one STT loop) per human. If you want a single mixed transcript, tag each transcript line with the speaker's ev.peer.id.

For a true multi-party agent (a meeting bot), this is the whole pattern: one outbound voice fanned out to all, one inbound STT loop per participant.
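The per-participant loops can feed one shared transcript through a queue. This sketch runs without the SDK: fake_stt_loop is a hypothetical stand-in for one run_conversation loop, and the peer ids are invented. In a real agent the tag would come from ev.peer.id.

```python
import asyncio

async def fake_stt_loop(peer_id: str, lines: list[str],
                        out: asyncio.Queue) -> None:
    """Stands in for one per-human STT loop; tags each line with its peer."""
    for text in lines:
        await asyncio.sleep(0)  # yield, as a real loop would between frames
        await out.put({"peer": peer_id, "text": text})

async def demo() -> list[dict]:
    merged: asyncio.Queue = asyncio.Queue()
    # One loop per participant, all writing into the same queue.
    await asyncio.gather(
        fake_stt_loop("peer-a", ["hi", "can you hear me?"], merged),
        fake_stt_loop("peer-b", ["hello"], merged),
    )
    return [merged.get_nowait() for _ in range(merged.qsize())]

transcript = asyncio.run(demo())
```

Each speaker's lines keep their own order, while lines from different speakers interleave in arrival order.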

Long-running agents: reconnects

A voice agent is a long-lived process, not a page that a user can refresh. Configure reconnect for daemon-style operation:

from metered_realtime import MeteredPeer, ReconnectOptions

agent = MeteredPeer(
    api_key="pk_live_…",
    reconnect=ReconnectOptions(
        max_attempts=float("inf"),  # never give up: no one's there to restart it
        max_delay=60.0,             # cap backoff at 1 minute
    ),
)

The default max_attempts=100 is tuned for browsers; an unattended agent wants float("inf") so a transient cloud blip doesn't take it offline permanently. Across a reconnect the RemotePeer objects you hold stay valid — but inbound media arrives on a fresh track, so re-arm your iter_frames loop from the peer's Track event each time it fires (it re-fires after recovery). Listen for FatalError to catch terminal conditions (token revoked, account suspended) and alert your ops path rather than spinning silently.
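The effect of a max_delay cap can be illustrated with a generic capped exponential backoff. This is a sketch of the general technique, not the SDK's actual schedule: the 1-second base delay and the doubling factor are assumptions chosen for illustration.

```python
from itertools import islice

def backoff_delays(base: float = 1.0, factor: float = 2.0,
                   max_delay: float = 60.0):
    """Yield an unbounded series of retry delays, capped at max_delay."""
    delay = base
    while True:
        yield min(delay, max_delay)
        # Keep the running value bounded so it never grows without limit.
        delay = min(delay * factor, max_delay)

first_eight = list(islice(backoff_delays(), 8))
```

Under these assumptions the delays grow for the first few attempts and then stay at the cap, which is why an unbounded attempt count does not turn into a tight retry loop.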

For the full reconnect playbook, see Reconnect Best Practices.

Pitfalls

  1. Treating STT/LLM/TTS as part of the SDK. They aren't. The SDK moves audio and data; you bring the intelligence. Pick libraries that stream (incremental STT, token-streaming LLM, chunked TTS) so the loop stays low-latency.

  2. Blocking the event loop. A synchronous LLM or TTS call inside the iter_frames loop stalls all the agent's audio (the AudioSource will emit silence while you're blocked). Run blocking work in asyncio.to_thread(...) or use async clients.

  3. Forgetting end(). An AudioSource you never end() keeps the track live, emitting silence forever. End each utterance (or the whole source on shutdown) so the track terminates cleanly.

  4. Assuming 16 kHz inbound. Frames from iter_frames are 48 kHz. Downsample before feeding STT.

  5. Pushing TTS faster than real time without bounding latency. push() applies backpressure at max_buffered_seconds, so memory is safe — but a big buffer means a long delay before a barge-in actually goes quiet. Keep the cap small for conversational latency.

  6. Re-using one AudioSource after end(). It's terminal. Construct a fresh one for the next utterance (this is also what makes barge-in clean).

  7. Caching the inbound track across reconnects. Media re-arrives on a new track after recovery. Re-take it from the Track event each time rather than holding the old object.
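Pitfall 2 can be illustrated with asyncio.to_thread from the standard library. In this sketch, blocking_llm is a hypothetical synchronous client call and heartbeat stands in for the audio path that must keep running. Neither is part of the SDK.

```python
import asyncio
import contextlib
import time

def blocking_llm(prompt: str) -> str:
    """Hypothetical synchronous LLM client call."""
    time.sleep(0.2)
    return f"echo: {prompt}"

async def heartbeat(ticks: list[float]) -> None:
    """Stands in for the audio path, which must keep ticking every 20 ms."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)

async def demo() -> tuple[str, int]:
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    # The blocking call runs in a worker thread; the event loop stays free.
    reply = await asyncio.to_thread(blocking_llm, "hello")
    beat.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await beat
    return reply, len(ticks)

reply, tick_count = asyncio.run(demo())
```

Calling blocking_llm("hello") directly inside the coroutine would return the same reply, but the heartbeat would not tick at all during the 0.2 seconds the call takes.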

See also