
AI Agent Communication

Typical uses include multi-agent systems, agent ↔ user streams, tool-call relays, and coordinator-worker patterns. The SDK is a good fit because agents are often long-lived processes that need to discover each other, broadcast intermediate state, and recover from disconnects without losing context.

This guide walks through a few common topologies. They compose — you can mix and match.

Pattern 1 — Coordinator dispatches to workers

One coordinator delegates tasks to a pool of worker agents. Each worker subscribes to its own task channel; the coordinator publishes to that channel to dispatch.

worker.js (one per worker process)
import { SignallingClient } from "@metered-ca/peer";

const WORKER_ID = `worker_${process.env.WORKER_INDEX}`;

const client = new SignallingClient({
  tokenProvider: async () => mintJwtForWorker(WORKER_ID),
});

await client.connect();
await client.subscribe(`tasks/${WORKER_ID}`);

client.on("message", async ({ data }) => {
  if (data.type === "task") {
    const result = await runTask(data.prompt);
    // Send result back via direct send to the coordinator
    await client.send(data.from, { type: "task-result", taskId: data.taskId, result });
  }
});
coordinator.js
import { SignallingClient } from "@metered-ca/peer";

const client = new SignallingClient({
  tokenProvider: async () => mintJwtForCoordinator(),
});

await client.connect();
await client.subscribe("results");

// Dispatch a task to one worker's channel and return its id
async function dispatch(workerId, prompt) {
  const taskId = crypto.randomUUID();
  await client.publish(`tasks/${workerId}`, {
    type: "task",
    taskId,
    prompt,
    // Workers reply with client.send(data.from, ...), so this must be the coordinator's peerId
    from: "coordinator",
  });
  return taskId;
}

// Workers send results back as direct messages
client.on("direct", ({ from, data }) => {
  if (data.type === "task-result") {
    handleResult(data.taskId, data.result, from);
  }
});

await dispatch("worker_3", "summarize this document: …");

JWT setup: workers have channels: ["tasks/${WORKER_ID}"] (subscribe to their own queue); the coordinator has channels: ["tasks/**"] (can publish to any worker).
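
The coordinator's dispatch returns a taskId, but the example does not show how a caller waits for the matching task-result. The sketch below is one possible way to pair the two, assuming only that the client exposes publish as used in coordinator.js. The names createDispatcher, dispatchAndWait, timeoutMs, and newId are hypothetical and not part of the SDK.

```javascript
// Hypothetical helper (not part of the SDK): tracks in-flight tasks so a
// caller can await the result that a worker sends back later.
// `client` is any object with publish(channel, payload), as in coordinator.js.
function createDispatcher(client, options = {}) {
  const {
    timeoutMs = 30_000,
    // Global `crypto` is available in browsers and recent Node.js versions.
    newId = () => crypto.randomUUID(),
  } = options;
  const pending = new Map(); // taskId -> { resolve, timer }

  async function dispatchAndWait(workerId, prompt) {
    const taskId = newId();
    const settled = new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        pending.delete(taskId);
        reject(new Error(`task ${taskId} timed out on ${workerId}`));
      }, timeoutMs);
      pending.set(taskId, { resolve, timer });
    });
    // Avoid an unhandled rejection if the timeout fires while publish is pending.
    settled.catch(() => {});

    try {
      await client.publish(`tasks/${workerId}`, {
        type: "task",
        taskId,
        prompt,
        from: "coordinator",
      });
    } catch (err) {
      // The task never left the coordinator: forget it and surface the error.
      const entry = pending.get(taskId);
      if (entry) {
        clearTimeout(entry.timer);
        pending.delete(taskId);
      }
      throw err;
    }
    return settled;
  }

  // Call this from the "direct" handler when a task-result arrives.
  // Returns false for late, duplicate, or unknown results.
  function handleResult(taskId, result) {
    const entry = pending.get(taskId);
    if (!entry) return false;
    clearTimeout(entry.timer);
    pending.delete(taskId);
    entry.resolve(result);
    return true;
  }

  return { dispatchAndWait, handleResult, pendingCount: () => pending.size };
}
```

In the coordinator, the "direct" handler would call handleResult(data.taskId, data.result), and callers would write const result = await dispatcher.dispatchAndWait("worker_3", prompt). The timeout only bounds how long the coordinator waits; it does not cancel work on the worker.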

Pattern 2 — Streaming tokens to the user (LLM-style)

An agent streams partial generations to one user while it's still running. Use client.send(userPeerId, chunk) (SignallingClient's wire-faithful directed send) — server-routed, works before the agent and user are mutually aware.

agent.js
import { SignallingClient } from "@metered-ca/peer";

const agent = new SignallingClient({ tokenProvider });
await agent.connect();

async function streamCompletionTo(userPeerId, prompt) {
  for await (const chunk of llmStream(prompt)) {
    await agent.send(userPeerId, {
      type: "completion-chunk",
      content: chunk,
      done: false,
    });
  }
  await agent.send(userPeerId, { type: "completion-chunk", content: "", done: true });
}
user.js
import { SignallingClient } from "@metered-ca/peer";

const client = new SignallingClient({ tokenProvider });
await client.connect();

let buffer = "";
client.on("direct", ({ from, data }) => {
  if (data.type === "completion-chunk") {
    if (data.done) {
      finalize(buffer);
      buffer = "";
    } else {
      buffer += data.content;
      renderPartial(buffer);
    }
  }
});

// Ask an agent to start streaming
await client.send("agent_42", { type: "ask", prompt: "..." });

The user doesn't need to subscribe to anything: direct messages arrive on the user's connection, addressed by peerId. The agent can learn the user's peerId from a registry, from the sub claim of the user's JWT, or from the from field of an incoming request such as the ask message above.
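
The single buffer in user.js works only while one stream is active at a time. Pitfall 3 below recommends tagging each chunk with a streamId and a seq. The sketch below is one possible receiver for such chunks. It assumes a chunk shape of { streamId, seq, content, done } with seq starting at 0 and the final done chunk carrying its own seq. Those field conventions and the createStreamAssembler name are illustrative assumptions, not part of the SDK.

```javascript
// Hypothetical receiver for chunks shaped like
//   { type: "completion-chunk", streamId, seq, content, done }
// Keeps one buffer per stream, drops duplicates, and holds chunks that
// arrive ahead of a gap until the missing ones show up.
function createStreamAssembler({ onPartial, onDone }) {
  const streams = new Map();  // streamId -> { nextSeq, text, held }
  const finished = new Set(); // ids of completed streams (grows unbounded in this sketch)

  return function accept(chunk) {
    const { streamId, seq } = chunk;
    if (finished.has(streamId)) return; // late chunk for a finished stream

    let state = streams.get(streamId);
    if (!state) {
      state = { nextSeq: 0, text: "", held: new Map() };
      streams.set(streamId, state);
    }
    if (seq < state.nextSeq || state.held.has(seq)) return; // duplicate
    state.held.set(seq, chunk);

    // Drain every chunk that is now contiguous with what was already rendered.
    while (state.held.has(state.nextSeq)) {
      const next = state.held.get(state.nextSeq);
      state.held.delete(state.nextSeq);
      state.nextSeq += 1;

      if (next.done) {
        streams.delete(streamId);
        finished.add(streamId);
        onDone(streamId, state.text);
        return;
      }
      state.text += next.content;
      onPartial(streamId, state.text);
    }
  };
}
```

On the user side this would replace the buffer variable: create the assembler once with onPartial and onDone callbacks, then call accept(data) from the "direct" handler for every completion-chunk. On the agent side, streamCompletionTo would need to add a fresh streamId per call and an incrementing seq per chunk.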

Pattern 3 — Shared agent state via pub/sub

Multiple agents in the same "team" subscribe to a shared channel and broadcast intermediate state. Useful for parallel agents that coordinate without a central manager.

const client = new SignallingClient({ tokenProvider });
await client.connect();
await client.subscribe("team_42/state");

client.on("message", ({ from, data }) => {
  if (data.type === "claim") {
    if (claimedItems.has(data.itemId)) {
      // Another agent already took it. Tell them.
      client.publish("team_42/state", {
        type: "reject",
        itemId: data.itemId,
        from: ourId,
      });
    } else {
      claimedItems.add(data.itemId);
    }
  }
});

async function tryClaim(itemId) {
  await client.publish("team_42/state", { type: "claim", itemId, from: ourId });
  // Wait for rejects, settle, decide
}

This is best-effort coordination — for strong consistency, use a separate store (Redis, Firestore). The signalling layer is for fast, ephemeral coordination.
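
The tryClaim stub above leaves the "wait for rejects, settle, decide" step open. The sketch below shows one way it could be filled in. It adds two things that are not in the original snippet and should be read as assumptions: a to field on reject messages, so an agent knows a reject is meant for it, and a tie-break for simultaneous claims in which the lexicographically lower agent id wins. createClaimer, settleMs, and onMessage are hypothetical names. The result is still best-effort: a claim that arrives after the settle window can be missed.

```javascript
// Hypothetical sketch of claim / reject coordination over a shared channel.
// `client` is any object with publish(channel, payload).
function createClaimer(client, { channel, ourId, settleMs = 200 }) {
  const holders = new Map();  // itemId -> agent id believed to hold the item
  const inFlight = new Map(); // itemId -> { lost } while our own claim settles

  // Call this from client.on("message", ...) with the message payload.
  function onMessage(data) {
    if (data.from === ourId) return; // skip our own messages if the server echoes them

    if (data.type === "claim") {
      const ours = inFlight.get(data.itemId);
      if (ours && data.from < ourId) {
        // Simultaneous claims: the lower id wins, so we yield.
        ours.lost = true;
        holders.set(data.itemId, data.from);
      } else if (holders.has(data.itemId) && holders.get(data.itemId) !== data.from) {
        // Someone else (possibly us) already holds it. Tell the claimant.
        Promise.resolve(
          client.publish(channel, {
            type: "reject",
            itemId: data.itemId,
            to: data.from, // assumption: rejects are addressed to the claimant
            from: ourId,
          })
        ).catch(() => {}); // best-effort notification
      } else {
        holders.set(data.itemId, data.from);
      }
    } else if (data.type === "reject" && data.to === ourId) {
      const ours = inFlight.get(data.itemId);
      if (ours) ours.lost = true;
    }
  }

  // Resolves to true if we hold the item once the settle window has passed.
  async function tryClaim(itemId) {
    if (holders.has(itemId)) return holders.get(itemId) === ourId;

    const ours = { lost: false };
    inFlight.set(itemId, ours);
    holders.set(itemId, ourId); // tentative until the window closes

    await client.publish(channel, { type: "claim", itemId, from: ourId });
    await new Promise((resolve) => setTimeout(resolve, settleMs));

    inFlight.delete(itemId);
    if (ours.lost && holders.get(itemId) === ourId) holders.delete(itemId);
    return !ours.lost;
  }

  return { onMessage, tryClaim };
}
```

The settle window trades latency for safety. It should comfortably exceed the round-trip time between agents, and any work whose duplication would be costly still belongs behind a store with real atomic operations.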

Pattern 4 — User ↔ agent presence

Show the user which agents are online and responsive. MeteredPeer with peerMetadata gives you exactly this.

// User side
const peer = new MeteredPeer({ tokenProvider });

peer.on("peer-joined", ({ peer: remote }) => {
  if (remote.metadata?.kind === "agent") {
    addAgentToSidebar(remote.id, remote.metadata.name);
  }
});

peer.on("peer-left", ({ peer: remote }) => {
  removeAgentFromSidebar(remote.id);
});

await peer.join("agents-lobby");

Agents mint JWTs with peerMetadata: { kind: "agent", name: "Code Reviewer Agent", capabilities: ["typescript", "react"] }. The user's sidebar populates automatically as agents come online.
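
The capabilities array in that metadata can also drive routing, not just display. The sketch below keeps a small in-memory roster from the peer-joined and peer-left events and looks agents up by capability. It assumes the remote peer object exposes id and metadata as in the snippet above; createAgentRoster and withCapability are hypothetical names.

```javascript
// Hypothetical roster built from presence events. Expects remote peers shaped
// like { id, metadata: { kind, name, capabilities } }.
function createAgentRoster() {
  const agents = new Map(); // peer id -> metadata

  return {
    // Wire to: peer.on("peer-joined", ({ peer: remote }) => roster.onJoined(remote))
    onJoined(remote) {
      if (remote.metadata?.kind === "agent") {
        agents.set(remote.id, remote.metadata);
      }
    },
    // Wire to: peer.on("peer-left", ({ peer: remote }) => roster.onLeft(remote))
    onLeft(remote) {
      agents.delete(remote.id);
    },
    // Online agents that advertise the given capability.
    withCapability(capability) {
      return [...agents]
        .filter(([, meta]) => (meta.capabilities ?? []).includes(capability))
        .map(([id, meta]) => ({ id, name: meta.name }));
    },
  };
}
```

A user-side client could then pick a target for a request with roster.withCapability("typescript") instead of hard-coding a peer id such as "agent_42".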

Choosing between SignallingClient and MeteredPeer

| Pattern | Use |
| --- | --- |
| One-to-many dispatch (coordinator → workers) | SignallingClient (multiple channels per connection) |
| Bidirectional streaming between known peers | SignallingClient (cleaner) or MeteredPeer (if presence matters) |
| Shared coordination (multiple agents in a team channel) | SignallingClient |
| Showing "which agents are online" in a UI | MeteredPeer (uses peer-joined / peer-left) |

For pure pub/sub agent infrastructure (no UI presence), SignallingClient is enough. Add MeteredPeer when you need per-peer state.

Reconnect handling for long-running agents

Agents run for hours or days — far longer than a typical browser session. A few production considerations:

const client = new SignallingClient({
  tokenProvider,
  reconnect: {
    maxAttempts: Infinity, // never give up: agents are daemon-style
    maxDelayMs: 60_000, // cap backoff at 1 minute
  },
});

Set maxAttempts: Infinity so a cloud-provider blip doesn't take an agent offline permanently. The default of 100 attempts works for browsers, where a user can refresh the page; a server-side agent has no one to refresh it.

Listen for disconnected with willReconnect: false and log it loudly — that's the SDK telling you something terminal happened (token revoked, account suspended). Don't silently keep running.

client.on("disconnected", ({ code, willReconnect }) => {
  if (!willReconnect) {
    console.error("agent disconnected terminally:", code);
    sendAlertToOpsTeam();
    process.exit(1); // let your orchestrator restart it
  }
});

For the full reconnect playbook, see Reconnect Best Practices.
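
Because the SDK calls tokenProvider on every reconnect (see Pitfall 5), a long-running agent benefits from a provider that reuses a token while it is still valid and fetches a new one shortly before expiry. The sketch below is one possible shape. It assumes a standard JWT with an exp claim in seconds, a Node.js runtime (for Buffer), and a hypothetical fetchToken function that asks your own backend for a token. createTokenProvider and minValidityMs are illustrative names.

```javascript
// Read the `exp` claim (seconds since epoch) from a JWT without verifying it.
// Verification is the server's job; this is only used to decide when to refresh.
function jwtExpiryMs(token) {
  const payloadPart = token.split(".")[1];
  const payload = JSON.parse(Buffer.from(payloadPart, "base64url").toString("utf8"));
  return typeof payload.exp === "number" ? payload.exp * 1000 : 0;
}

// `fetchToken` is a hypothetical async function that returns a fresh JWT string.
function createTokenProvider(fetchToken, { minValidityMs = 60_000, now = Date.now } = {}) {
  let cached = null;

  return async function tokenProvider() {
    // Reuse the cached token only if it stays valid for a comfortable margin.
    if (cached && jwtExpiryMs(cached) - now() > minValidityMs) {
      return cached;
    }
    cached = await fetchToken();
    return cached;
  };
}
```

The returned function can be passed as the tokenProvider option shown above. A token without an exp claim is treated as already expired here, so it is fetched again on every call.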

Pitfalls

  1. Treating peer.send as guaranteed delivery. It's at-most-once — the server acks delivery to the channel, but doesn't guarantee the target peer is online to receive. If the target is offline, you get peer_not_found for direct sends or just no listener for channel publishes. For guaranteed delivery between agents, persist tasks in a queue (Redis Streams, BullMQ, RabbitMQ) and use messaging only for "wake up, there's work" notifications.

  2. Not handling peer_not_found. If you dispatch to an agent that's offline, the direct send rejects. Don't silently drop — either retry, queue, or surface to your orchestrator.

  3. Streaming partial completions without sequence numbers. Server-routed messages preserve order per (sender, receiver) pair, but if several agent processes stream to the same user at once, the user can't tell which chunk belongs to which stream. Add a streamId and seq to each chunk.

  4. Authentication shared across agents. Each agent process should have its own JWT with its own sub (peerId) so the server can route directs to it. Don't reuse one token across multiple agent processes — they'd race for direct messages.

  5. Token expiry on long-lived agents. If your agent runs for days, the JWT must refresh. Use tokenProvider (the SDK calls it on every reconnect) — apiKey doesn't refresh.

  6. Subscribing to wildcard channels you don't need. ** subscribers see every message, which counts as ingress traffic. Subscribe to the narrowest pattern that covers your needs.

  7. Building a queue out of broadcast channels. If you publish("tasks", task) and 10 worker agents are subscribed, all 10 see the task. That's broadcast, not queue. For "one worker picks up the task," use directed sends with explicit dispatch logic (Pattern 1 above) or back the work queue with Redis Streams / similar.
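
Pitfalls 1 and 2 suggest a simple rule: try the direct send, and if the target is offline, hand the message to something durable. The sketch below illustrates that rule. How the SDK surfaces peer_not_found on a rejected send is an assumption here (the code checks err.code and err.message); check the SDK's error reference before relying on it. sendOrEnqueue and enqueue are hypothetical names, with enqueue standing in for your own durable queue.

```javascript
// ASSUMPTION: a rejected direct send exposes "peer_not_found" either as
// err.code or inside err.message. Adjust to the SDK's real error shape.
function isPeerNotFound(err) {
  return (
    err?.code === "peer_not_found" ||
    String(err?.message ?? "").includes("peer_not_found")
  );
}

// Try a direct send; if the peer is offline, persist the message instead.
// `enqueue(peerId, payload)` is a hypothetical async function backed by a
// durable queue (Redis Streams, BullMQ, RabbitMQ, ...).
async function sendOrEnqueue(client, peerId, payload, enqueue) {
  try {
    await client.send(peerId, payload);
    return "sent";
  } catch (err) {
    if (!isPeerNotFound(err)) throw err; // unrelated failure: let the caller decide
    await enqueue(peerId, payload);
    return "queued";
  }
}
```

A "sent" result still only means the server accepted the message for routing, so tasks that must not be lost should be written to the queue first and announced over messaging second.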

See also