IoT Telemetry & Device Control
AI cameras posting detection events, fleet vehicles streaming telematics, smart-home devices receiving commands. MQTT-style pub/sub semantics over WebSocket, with the simplicity of a single HTTPS-style endpoint and JWT auth.
The shape
┌──────────────┐ ┌──────────────┐
│ DEVICE │ publish telemetry ─────→ │ Metered │
│ (camera / │ on device's own │ Realtime │
│ vehicle / │ channel │ Messaging │
│ thermostat)│ └──────────────┘
└──────────────┘ ▲
▲ │ subscribe
│ receive commands on │ (or REST publish)
│ fleet/group channel │
│ │
│ ┌──────────────┐
└──────────────────────────────────│ YOUR │
│ BACKEND │
│ (control │
│ plane) │
└──────────────┘
Per device:
- Telemetry out → publish to
devices/${deviceId}/events - Commands in → subscribe to
devices/${deviceId}/commands(or a fleet-widefleets/${fleetId}/commands)
Per backend:
- Aggregate by subscribing to
devices/*/eventswith a wildcard-permitted sk_ - Command via REST publish to
devices/${deviceId}/commands— no WS connection needed in the control-plane service
Step 1 — Device-side JWT with narrow scope
Each device gets its own JWT, scoped to its own channels only:
device-token-server.py — your provisioning backend
import jwt
import time
import requests
METERED_REALTIME_SK = "sk_live_..."
def mint_device_token(device_id, fleet_id):
# Channels:
# devices/<id>/events — device PUBLISHES telemetry here
# devices/<id>/commands — device SUBSCRIBES for direct commands
# fleets/<fleet>/commands — device SUBSCRIBES for fleet broadcasts
channels = [
f"devices/{device_id}/events",
f"devices/{device_id}/commands",
f"fleets/{fleet_id}/commands",
]
# Devices only need publish (events) + subscribe (commands).
permissions = ["publish", "subscribe"]
payload = {
"sub": f"device-{device_id}",
"exp": int(time.time()) + 3600,
"channels": channels,
"permissions": permissions,
"peerMetadata": {
"deviceId": device_id,
"fleetId": fleet_id,
"firmware": "v3.2.1",
"location": "us-east-1a",
},
}
return jwt.encode(
payload, METERED_REALTIME_SK.split("_secret_")[1],
algorithm="HS256",
headers={"kid": METERED_REALTIME_SK.split("_secret_")[0].replace("sk_id", "sk_id")},
)
In production you'd:
- Use
POST /v1/tokensinstead of self-signing (offloads JWT cert management to Metered) - Cache the token on the device for as close to
expas you're comfortable with - Refresh proactively before exp (don't wait for a
4002 token_expiredclose)
The sk_ key's channelPatterns should cover devices/* and fleets/* so this mint succeeds.
Step 2 — Device publishes telemetry
device.py — running on the camera / vehicle / thermostat
import websocket
import json
import threading
token = mint_device_token("camera-001", "fleet-northeast")
ws = websocket.WebSocketApp(
f"wss://rms.metered.ca/v1?token={token}",
on_message=on_message,
on_open=on_open,
)
def on_open(ws):
# Subscribe to direct and fleet-wide command channels.
ws.send(json.dumps({
"type": "subscribe",
"channel": "devices/camera-001/commands",
"requestId": "sub-cmd-direct",
}))
ws.send(json.dumps({
"type": "subscribe",
"channel": "fleets/fleet-northeast/commands",
"requestId": "sub-cmd-fleet",
}))
def publish_telemetry(event):
ws.send(json.dumps({
"type": "publish",
"channel": "devices/camera-001/events",
"data": event,
}))
def on_message(ws, raw):
msg = json.loads(raw)
if msg["type"] == "message" and msg["channel"].endswith("/commands"):
handle_command(msg["data"])
if msg["type"] == "error" and msg["code"] == "over_message_quota":
# Plan exhausted; back off telemetry until next period
pass
def handle_command(cmd):
if cmd["cmd"] == "reboot":
os.system("reboot")
elif cmd["cmd"] == "set-frame-rate":
camera.set_frame_rate(cmd["fps"])
# Trigger on motion detection
def on_motion_detected(box):
publish_telemetry({
"kind": "motion",
"box": box,
"timestamp": time.time(),
})
ws.run_forever()
Step 3 — Backend aggregates telemetry
A small Node.js service subscribed to all devices:
ingester.js — Node.js backend
const WebSocket = require("ws");
const fetch = require("node-fetch");
async function startIngester() {
// Mint a backend token with WILDCARD scope.
const resp = await fetch("https://rms.metered.ca/v1/tokens", {
method: "POST",
headers: {
Authorization: `Bearer ${process.env.METERED_REALTIME_SK}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
peerId: "ingester-001",
channels: ["devices/*/events"],
permissions: ["subscribe"],
expiresInSec: 86400,
}),
});
const { token } = await resp.json();
// NB: for `channels: ["devices/*/events"]` to be accepted, the sk_'s
// channelPatterns must include "devices/*/events" or a wider pattern.
const ws = new WebSocket(`wss://rms.metered.ca/v1?token=${token}`);
ws.on("message", (raw) => {
const msg = JSON.parse(raw.toString());
if (msg.type === "welcome") {
// Subscribe to all device-event channels with a wildcard.
// The wildcard MUST be authorized in the JWT's channels claim.
// Here we'd need to subscribe to each device's channel one
// by one, OR subscribe to a single channel-aggregator
// pattern via your sk_'s channelPatterns config.
}
if (msg.type === "message") {
const deviceId = msg.channel.match(/^devices\/([^/]+)\/events$/)?.[1];
persistEvent(deviceId, msg.from, msg.data);
}
});
}
Wildcard subscribe — current limitations
Channel-level subscribe today is exact-match; wildcard subscribe is on the roadmap. Until then, the typical ingester pattern is:
- Subscribe to per-device channels enumerated from your device registry
- Or have devices publish to a single shared aggregator channel (
fleets/${fleetId}/events) and have the ingester subscribe there
Step 4 — Backend issues a command
control-plane.js — issuing a command, no WS required
async function rebootDevice(deviceId) {
await fetch(
`https://rms.metered.ca/v1/channels/${encodeURIComponent(`devices/${deviceId}/commands`)}/publish`,
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.METERED_REALTIME_SK}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
data: { cmd: "reboot", reason: "operator-initiated" },
from: "control-plane",
}),
},
);
}
async function adjustFleetFrameRate(fleetId, fps) {
// Broadcast to every device in the fleet via the shared command channel.
await fetch(
`https://rms.metered.ca/v1/channels/${encodeURIComponent(`fleets/${fleetId}/commands`)}/publish`,
{
method: "POST",
headers: {
Authorization: `Bearer ${process.env.METERED_REALTIME_SK}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
data: { cmd: "set-frame-rate", fps },
from: "control-plane",
}),
},
);
}
The REST publish reaches every device subscribed to the channel. For a 1000-device fleet, that's one HTTPS request and 1000 deliveries.
Why this fits the IoT use case
| Problem | How Realtime Messaging solves it |
|---|---|
| Devices behind NAT need to receive commands without polling | Long-lived WebSocket; commands arrive as message events instantly |
| Authentication per device | JWT per device with narrow channelPatterns — leaked credential leaks only that one device's scope |
| Backplane fan-out from control plane | POST /v1/channels/.../publish from your backend — no need for the control-plane service to maintain its own WS |
| Cost-correlation per device | Devices publish on per-device channels; you know exactly which device's telemetry costs what |
| Soft-fail behavior when a device is over its plan limits | Soft drops via over_message_quota error; device keeps receiving commands even when it can't publish telemetry |
See also
- Authentication — narrow
channelPatternsfor per-device tokens - REST API → Channels — server-side publish for commands
- Rate Limits — abuse bounds for chatty devices