AI Agent Communication

Multi-agent coordination, agent-to-user streaming, tool-call relay, and human-in-the-loop interventions — all over the same WebSocket protocol.

Why WebSocket over HTTP?

HTTP is request-response. AI agents need to:

  • Receive without polling — a researcher agent waiting for the next sub-task shouldn't poll a queue every second.
  • Stream long-running work — tool calls, intermediate findings, partial reasoning steps need to flow back to the user in real time, not after the agent's done.
  • Coordinate without a central queue — agents talk to each other directly via channels (planner ↔ researchers ↔ writer), without staging through a backend message bus.
  • Tolerate long idle periods — agents are connected for the lifetime of a workflow run, often minutes to hours.

WebSocket plus channels handles all four. Plain request-response HTTP does not.

Topologies

Pattern A — Planner fans out to specialists

             ┌──────────┐
             │ Planner  │
             └─────┬────┘
                   │ publish: subtask
                   ▼
┌─────────────────────────────────────┐
│     workflows/run-xyz/subtasks      │ ← channel
└─────┬────────────┬────────────┬─────┘
      │            │            │
      ▼            ▼            ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│Researcher│ │  Writer  │ │ Reviewer │
└─────┬────┘ └─────┬────┘ └─────┬────┘
      │            │            │
      └────────────┼────────────┘
                   │ send (direct): finding
                   ▼
             ┌──────────┐
             │ Planner  │ ← same instance
             └──────────┘
  • Subtasks are broadcast on workflows/run-xyz/subtasks so any subscribed specialist can pick one up.
  • Findings return via send (direct) to the planner — 1:1, no fan-out cost.
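
The two legs of Pattern A can be sketched as a pair of message builders. The send shape mirrors the direct-message examples elsewhere on this page. The publish shape (type "publish" with a channel field) is an assumption made for illustration and should be checked against the protocol reference. The helper names subtasksChannel, buildSubtaskPublish, and buildFinding are hypothetical.

```javascript
// Hypothetical helpers. The "publish" message shape is an assumption.
function subtasksChannel(runId) {
  return `workflows/${runId}/subtasks`;
}

// Planner -> channel: every subscribed specialist receives the subtask.
function buildSubtaskPublish(runId, subtask) {
  return {
    type: "publish", // assumed message type for channel publishes
    channel: subtasksChannel(runId),
    data: { kind: "subtask", subtask },
  };
}

// Specialist -> planner: a 1:1 direct message, so no fan-out.
function buildFinding(plannerPeerId, subtaskId, finding) {
  return {
    type: "send",
    to: plannerPeerId,
    data: { kind: "finding", subtaskId, finding },
    requestId: `finding-${subtaskId}`,
  };
}
```

Each builder returns a plain object, so the caller would pass it through JSON.stringify before handing it to ws.send.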

Pattern B — Agent streams to user

┌──────────┐  subscribe:                ┌──────────┐
│   USER   │  user-uid-123/inbox ────→  │ Realtime │
│ BROWSER  │                            │ Messaging│
└──────────┘                            └──────────┘
                                              ▲
                                              │ publish (or send)
                                              │
┌──────────┐  agent-run-uid-123               │
│  AGENT   │ ─────────────────────────────────┘
│ (server) │
└──────────┘
  • The agent runs in your backend; its peerId is something like agent-${runId}.
  • The user's browser subscribes to user-uid-123/inbox.
  • The agent publishes progress and partial results to the user's inbox channel, or sends them directly to the user's peerId if it is known.

peerMetadata for agents

Agents have richer "identity" than users. Stamp it via peerMetadata so other agents and humans see what they're dealing with:

{
  "peerId": "agent-researcher-001",
  "peerMetadata": {
    "agentName": "researcher-001",
    "model": "claude-opus-4-7",
    "capabilities": ["web-search", "arxiv-lookup", "summarize"],
    "ownerUserId": "u_alice_123",
    "workflowRunId": "run-xyz",
    "role": "researcher"
  }
}

When the planner receives a presence event, the joined list tells it immediately which capabilities are available, with no backend round trip:

ws.on("message", (raw) => {
  const msg = JSON.parse(raw.toString());
  if (msg.type === "presence" && msg.channel === "workflows/run-xyz/agents") {
    // Guard against an event that carries only one of the two lists.
    for (const peer of msg.joined ?? []) {
      const caps = peer.metadata?.capabilities ?? [];
      registerAgent(peer.peerId, caps);
    }
    for (const peer of msg.left ?? []) {
      unregisterAgent(peer.peerId);
    }
  }
});
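
The handler above calls registerAgent and unregisterAgent without defining them. One possible in-memory version is sketched below. It is not part of any SDK, and findAgentsWith is a hypothetical lookup added so the planner can route a subtask by capability.

```javascript
// In-memory capability registry: peerId -> Set of capability names.
const agents = new Map();

function registerAgent(peerId, capabilities) {
  // Re-registering the same peerId replaces its previous capability set.
  agents.set(peerId, new Set(capabilities));
}

function unregisterAgent(peerId) {
  agents.delete(peerId);
}

// Hypothetical helper: peerIds of known agents advertising a capability.
function findAgentsWith(capability) {
  return [...agents]
    .filter(([, caps]) => caps.has(capability))
    .map(([peerId]) => peerId);
}
```

With this in place, the planner could call findAgentsWith("web-search") and send a subtask directly to one of the returned peerIds.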

Stream a long-running tool call back to the user

agent.js
// Node.js with the "ws" package, whose ws.on / ws.off API the examples on this page use.
import WebSocket from "ws";

const ws = new WebSocket(`wss://rms.metered.ca/v1?token=${agentToken}`);

// streamFromTool is your own async iterable over the tool's output.
async function runTool(toolName, args, userPeerId) {
  // Tell the user we started.
  ws.send(JSON.stringify({
    type: "send",
    to: userPeerId,
    data: { kind: "tool-start", tool: toolName, args },
    requestId: `tool-${toolName}-start`,
  }));

  let chunkIdx = 0;
  for await (const chunk of streamFromTool(toolName, args)) {
    // Capture the index once so data.idx and requestId always agree.
    const idx = chunkIdx++;
    // Each chunk: a finding, a search result, a partial answer.
    ws.send(JSON.stringify({
      type: "send",
      to: userPeerId,
      data: { kind: "tool-chunk", tool: toolName, idx, chunk },
      requestId: `tool-${toolName}-chunk-${idx}`,
    }));
  }

  ws.send(JSON.stringify({
    type: "send",
    to: userPeerId,
    data: { kind: "tool-end", tool: toolName, chunksEmitted: chunkIdx },
    requestId: `tool-${toolName}-end`,
  }));
}

The user's browser, listening for direct events, renders progress live. No polling, no buffering.
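
On the receiving side, the browser can fold these events into per-tool state before rendering. The sketch below assumes each incoming direct event exposes the sender's payload as msg.data, as in the agent-side examples. The function name applyToolEvent and the state shape are hypothetical.

```javascript
// Folds tool-start / tool-chunk / tool-end events into UI state.
// state shape: { [toolName]: { status, args, chunks } }
function applyToolEvent(state, msg) {
  if (msg.type !== "direct" || !msg.data?.tool) return state;
  const { kind, tool, args, idx, chunk } = msg.data;
  const prev = state[tool] ?? { status: "idle", args: undefined, chunks: [] };

  switch (kind) {
    case "tool-start":
      return { ...state, [tool]: { status: "running", args, chunks: [] } };
    case "tool-chunk": {
      // Place by idx so a repeated chunk overwrites instead of duplicating.
      const chunks = prev.chunks.slice();
      chunks[idx] = chunk;
      return { ...state, [tool]: { ...prev, chunks } };
    }
    case "tool-end":
      return { ...state, [tool]: { ...prev, status: "done" } };
    default:
      return state;
  }
}
```

Because the function returns a new state object rather than mutating the old one, it can be dropped into a reducer-style UI store.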


Human-in-the-loop interventions

When an agent needs a human decision mid-run, it sends a direct message to the user requesting intervention. The user's reply (also a send) unblocks the agent.

agent waiting for intervention
async function askUserPermission(userPeerId, action) {
  const requestId = `intervention-${Date.now()}`;
  ws.send(JSON.stringify({
    type: "send",
    to: userPeerId,
    // requestId is repeated inside data so the recipient can echo it back
    // as inReplyTo without relying on the envelope being forwarded.
    data: { kind: "intervention-request", action, requestId },
    requestId,
  }));

  // Wait for the user's reply, identified by an `inReplyTo` field on
  // the direct message.
  return new Promise((resolve) => {
    const onMsg = (raw) => {
      const msg = JSON.parse(raw.toString());
      if (msg.type === "direct" && msg.from === userPeerId
          && msg.data?.kind === "intervention-reply"
          && msg.data?.inReplyTo === requestId) {
        // Stop listening once the matching reply arrives.
        ws.off("message", onMsg);
        resolve(msg.data.decision);
      }
    };
    ws.on("message", onMsg);
  });
}

This works because send is bidirectional: the user's browser sees a direct event, renders an "Approve / Deny" UI, and sends a reply back to the agent's peerId.
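
The browser half of that exchange might look like the sketch below. It assumes the request's id reaches the browser either inside data or as a top-level requestId field on the direct event. Which of the two applies depends on the protocol, so the code checks both. The name buildInterventionReply and the decision value are hypothetical.

```javascript
// Builds the reply that the agent's askUserPermission is waiting for.
// Returns null when the message is not an intervention request.
function buildInterventionReply(msg, decision) {
  if (msg.type !== "direct" || msg.data?.kind !== "intervention-request") {
    return null;
  }
  // Assumption: the request id arrives in data or as a top-level field.
  const inReplyTo = msg.data.requestId ?? msg.requestId;
  return {
    type: "send",
    to: msg.from, // the reply goes to the agent's peerId
    data: { kind: "intervention-reply", inReplyTo, decision },
    requestId: `reply-${inReplyTo}`,
  };
}
```

When the user clicks a button, the page would serialize the result with JSON.stringify and pass it to its own socket's send method.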


Why server-side publish via REST is helpful for workflow services

Your workflow orchestrator probably runs in a backend HTTP service (Inngest, Trigger.dev, Durable Objects, etc.). It doesn't need a WebSocket connection of its own — it can publish to channels via REST:

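workflow orchestrator (HTTP service, no WS)
async function notifyAgentsOfNewTask(runId, task) {
  // Channel names contain slashes, so encode the name as one path segment.
  const channel = encodeURIComponent(`workflows/${runId}/subtasks`);
  const res = await fetch(
    `https://rms.metered.ca/v1/channels/${channel}/publish`,
    {
      method: "POST",
      headers: {
        Authorization: `Bearer ${process.env.METERED_REALTIME_SK}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        data: { kind: "new-subtask", task },
        from: "orchestrator",
      }),
    },
  );
  // fetch resolves on HTTP error statuses, so surface them explicitly.
  if (!res.ok) {
    throw new Error(`publish failed with HTTP ${res.status}`);
  }
}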

Every agent subscribed to workflows/run-xyz/subtasks sees it. The orchestrator doesn't maintain its own WebSocket connection — fewer moving parts in your backend.
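
One detail in the request above is easy to miss: the channel name contains slashes, so it has to be percent-encoded to travel as a single path segment. A small sketch, using a hypothetical channelPublishUrl helper built on the URL pattern from the example:

```javascript
const BASE_URL = "https://rms.metered.ca/v1";

// Hypothetical helper: builds the publish URL for a channel name.
function channelPublishUrl(channel) {
  // encodeURIComponent turns "/" into "%2F", keeping the name in one segment.
  return `${BASE_URL}/channels/${encodeURIComponent(channel)}/publish`;
}

console.log(channelPublishUrl("workflows/run-xyz/subtasks"));
// prints "https://rms.metered.ca/v1/channels/workflows%2Frun-xyz%2Fsubtasks/publish"
```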


Token-bucket considerations for agents

A chatty agent making many tool calls per second can exhaust the per-connection token bucket (100 msg/sec sustained). For agents that need to emit at a very high rate (e.g., streaming dozens of intermediate-reasoning chunks per second), consider:

  • Batching chunks before sending: 20 chunks arriving at 50 ms intervals become 1 batched message per second.
  • Per-plan rate limit tuning — contact sales if your sustained rate genuinely needs to be higher.
  • Multiple agent connections — split work across several agents each with their own connection (and their own token bucket).
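
The batching option can be sketched as a small buffer that is flushed on a timer. The helper name createChunkBatcher and the "tool-chunk-batch" kind are hypothetical, so the receiving side would need to understand that kind. The send envelope follows the direct-message examples on this page.

```javascript
// Buffers chunks and emits them as one message per flush.
// sendFn receives a plain object; the caller serializes and sends it.
function createChunkBatcher(sendFn, to, tool) {
  let buffer = [];
  let batchIdx = 0;

  return {
    add(chunk) {
      buffer.push(chunk);
    },
    // Sends nothing when the buffer is empty, so idle periods cost no messages.
    flush() {
      if (buffer.length === 0) return false;
      sendFn({
        type: "send",
        to,
        data: { kind: "tool-chunk-batch", tool, idx: batchIdx, chunks: buffer },
        requestId: `tool-${tool}-batch-${batchIdx}`,
      });
      batchIdx += 1;
      buffer = []; // start a fresh array; the sent one is left untouched
      return true;
    },
  };
}
```

An agent would typically call flush from a setInterval running once per second, then call it one last time and clear the interval when the tool finishes.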

See also