
IoT Telemetry & Device Control

AI cameras posting detection events, fleet vehicles streaming telematics, smart-home devices receiving commands. MQTT-style pub/sub semantics over WebSocket, with the simplicity of a single HTTPS-style endpoint and JWT auth.

The shape

┌──────────────┐                           ┌──────────────┐
│ DEVICE       │ publish telemetry ─────→  │ Metered      │
│ (camera /    │ on device's own           │ Realtime     │
│ vehicle /    │ channel                   │ Messaging    │
│ thermostat)  │                           └──────────────┘
└──────────────┘                                  ▲
       ▲                                          │ subscribe
       │ receive commands on                      │ (or REST publish)
       │ fleet/group channel                      │
       │                                          │
       │                                   ┌──────────────┐
       └───────────────────────────────────│ YOUR         │
                                           │ BACKEND      │
                                           │ (control     │
                                           │ plane)       │
                                           └──────────────┘

Per device:

  • Telemetry out → publish to devices/${deviceId}/events
  • Commands in → subscribe to devices/${deviceId}/commands (or a fleet-wide fleets/${fleetId}/commands)

Per backend:

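  • Aggregate by subscribing to device event channels with a token minted from an sk_ whose channelPatterns permit devices/*/events (subscribe itself is exact-match today, so channels are enumerated per device)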
  • Command via REST publish to devices/${deviceId}/commands — no WS connection needed in the control-plane service
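
The channel layout above can be captured in two small helpers so that devices, the provisioning backend, and the ingester all derive names the same way. This is a minimal sketch; the function names `device_channels` and `device_id_from_channel` are illustrative and not part of any Metered SDK.

```python
def device_channels(device_id: str, fleet_id: str) -> dict:
    """Return the channels one device publishes to and subscribes to."""
    return {
        "publish": [f"devices/{device_id}/events"],
        "subscribe": [
            f"devices/{device_id}/commands",
            f"fleets/{fleet_id}/commands",
        ],
    }


def device_id_from_channel(channel: str):
    """Return the device ID from 'devices/<id>/events' or 'devices/<id>/commands'.

    Returns None for any other channel shape (for example fleet channels).
    """
    parts = channel.split("/")
    if len(parts) == 3 and parts[0] == "devices" and parts[2] in ("events", "commands"):
        return parts[1] or None
    return None
```

Keeping the naming in one place means a later change to the layout touches one function instead of every string literal in the fleet code.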

Step 1 — Device-side JWT with narrow scope

Each device gets its own JWT, scoped to its own channels only:

device-token-server.py — your provisioning backend
import time

import jwt  # PyJWT

METERED_REALTIME_SK = "sk_live_..."

def mint_device_token(device_id, fleet_id):
    # Channels:
    #   devices/<id>/events      : device PUBLISHES telemetry here
    #   devices/<id>/commands    : device SUBSCRIBES for direct commands
    #   fleets/<fleet>/commands  : device SUBSCRIBES for fleet broadcasts
    channels = [
        f"devices/{device_id}/events",
        f"devices/{device_id}/commands",
        f"fleets/{fleet_id}/commands",
    ]
    # Devices only need publish (events) + subscribe (commands).
    permissions = ["publish", "subscribe"]

    payload = {
        "sub": f"device-{device_id}",
        "exp": int(time.time()) + 3600,  # valid for one hour
        "channels": channels,
        "permissions": permissions,
        "peerMetadata": {
            "deviceId": device_id,
            "fleetId": fleet_id,
            "firmware": "v3.2.1",
            "location": "us-east-1a",
        },
    }
    # This snippet assumes the key has the form
    # "<key id>_secret_<signing secret>": the first part goes into the JWT
    # "kid" header and the second part signs the token.
    key_id, signing_secret = METERED_REALTIME_SK.split("_secret_", 1)
    return jwt.encode(
        payload,
        signing_secret,
        algorithm="HS256",
        headers={"kid": key_id},
    )

In production you'd:

  • Use POST /v1/tokens instead of self-signing (offloads JWT signing and key handling to Metered)
  • Cache the token on the device for as close to exp as you're comfortable with
  • Refresh proactively before exp (don't wait for a 4002 token_expired close)
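
Proactive refresh can be scheduled from the token's own exp claim. The sketch below reads exp from the JWT payload without verifying the signature, which is acceptable only for timing a refresh on the device. The helper names and the 300-second safety margin are assumptions for illustration.

```python
import base64
import json
import time
from typing import Optional


def jwt_expiry(token: str) -> int:
    """Read the exp claim from a JWT WITHOUT verifying its signature.

    Fine for scheduling a refresh; never base an authorization decision
    on an unverified payload.
    """
    payload_b64 = token.split(".")[1]
    # JWT segments are base64url without padding; restore it before decoding.
    payload_b64 += "=" * (-len(payload_b64) % 4)
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    return int(payload["exp"])


def seconds_until_refresh(token: str, margin_sec: int = 300,
                          now: Optional[float] = None) -> float:
    """Seconds to wait before fetching a new token (0 means refresh now)."""
    now = time.time() if now is None else now
    return max(0.0, jwt_expiry(token) - margin_sec - now)
```

A device loop could sleep for `seconds_until_refresh(token)`, fetch a fresh token from the provisioning backend, and reconnect before the server ever sends the token_expired close.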

The sk_ key's channelPatterns should cover devices/* and fleets/* so this mint succeeds.
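
For the POST /v1/tokens route, a provisioning backend written in Python might build the request as sketched below. The body fields (peerId, channels, permissions, expiresInSec) and the `token` response field mirror the Node.js ingester in Step 3. Whether the endpoint accepts further fields such as peerMetadata is not covered here, so the sketch leaves them out. The function names are illustrative.

```python
import json
import urllib.request

TOKENS_URL = "https://rms.metered.ca/v1/tokens"


def build_device_token_request(sk: str, device_id: str, fleet_id: str,
                               expires_in_sec: int = 3600) -> urllib.request.Request:
    """Build (but do not send) a token request scoped to one device."""
    body = {
        "peerId": f"device-{device_id}",
        "channels": [
            f"devices/{device_id}/events",
            f"devices/{device_id}/commands",
            f"fleets/{fleet_id}/commands",
        ],
        "permissions": ["publish", "subscribe"],
        "expiresInSec": expires_in_sec,
    }
    return urllib.request.Request(
        TOKENS_URL,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {sk}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def request_device_token(sk: str, device_id: str, fleet_id: str) -> str:
    """Send the request and return the token string from the JSON response."""
    req = build_device_token_request(sk, device_id, fleet_id)
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)["token"]
```

Splitting request construction from sending keeps the scoping logic unit-testable without network access.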


Step 2 — Device publishes telemetry

device.py — running on the camera / vehicle / thermostat
import json
import os
import time

import websocket  # websocket-client

# The token is minted by your provisioning backend (device-token-server.py)
# and delivered to the device; the sk_ key itself never ships on the device.
# Reading it from an environment variable is one illustrative option.
token = os.environ["DEVICE_TOKEN"]

def on_open(ws):
    # Subscribe to direct and fleet-wide command channels.
    ws.send(json.dumps({
        "type": "subscribe",
        "channel": "devices/camera-001/commands",
        "requestId": "sub-cmd-direct",
    }))
    ws.send(json.dumps({
        "type": "subscribe",
        "channel": "fleets/fleet-northeast/commands",
        "requestId": "sub-cmd-fleet",
    }))

def publish_telemetry(event):
    ws.send(json.dumps({
        "type": "publish",
        "channel": "devices/camera-001/events",
        "data": event,
    }))

def handle_command(cmd):
    if cmd["cmd"] == "reboot":
        os.system("reboot")
    elif cmd["cmd"] == "set-frame-rate":
        # `camera` stands for your device's own camera driver object.
        camera.set_frame_rate(cmd["fps"])

def on_message(ws, raw):
    msg = json.loads(raw)
    if msg.get("type") == "message" and msg.get("channel", "").endswith("/commands"):
        handle_command(msg["data"])
    if msg.get("type") == "error" and msg.get("code") == "over_message_quota":
        # Plan exhausted; back off telemetry until next period
        pass

# Trigger on motion detection
def on_motion_detected(box):
    publish_telemetry({
        "kind": "motion",
        "box": box,
        "timestamp": time.time(),
    })

# Create the app only after the callbacks exist, so the names resolve.
ws = websocket.WebSocketApp(
    f"wss://rms.metered.ca/v1?token={token}",
    on_message=on_message,
    on_open=on_open,
)
ws.run_forever()
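
The over_message_quota branch in the device code is left as a stub. One way to fill it is a small gate that pauses telemetry for a growing cool-off each time the error arrives. This is a sketch under assumptions: the class name, the 60-second base delay, and the one-hour cap are illustrative, and the real length of a quota period is not stated here.

```python
import time


class TelemetryGate:
    """Pauses telemetry after over_message_quota, doubling the cool-off each time."""

    def __init__(self, base_sec: float = 60.0, max_sec: float = 3600.0):
        self.base_sec = base_sec
        self.max_sec = max_sec
        self._strikes = 0       # consecutive quota errors seen
        self._resume_at = 0.0   # epoch seconds when publishing may resume

    def on_quota_error(self, now=None) -> float:
        """Record a quota error and return the cool-off that was applied."""
        now = time.time() if now is None else now
        delay = min(self.max_sec, self.base_sec * (2 ** self._strikes))
        self._strikes += 1
        self._resume_at = now + delay
        return delay

    def allow(self, now=None) -> bool:
        """True when telemetry may be published again."""
        now = time.time() if now is None else now
        return now >= self._resume_at

    def reset(self) -> None:
        """Clear the back-off, e.g. once the quota period is known to have rolled over."""
        self._strikes = 0
        self._resume_at = 0.0
```

In the device code, `on_message` would call `gate.on_quota_error()` in the quota branch and `publish_telemetry` would return early while `gate.allow()` is false. Command handling is untouched, so the device still reacts to reboot and frame-rate commands while telemetry is paused.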

Step 3 — Backend aggregates telemetry

A small Node.js service subscribed to all devices:

ingester.js — Node.js backend
const WebSocket = require("ws");
// node-fetch v2 supports require(); on Node 18+ you can drop this line
// and use the built-in fetch instead.
const fetch = require("node-fetch");

async function startIngester() {
  // Mint a backend token with WILDCARD scope.
  // NB: for `channels: ["devices/*/events"]` to be accepted, the sk_'s
  // channelPatterns must include "devices/*/events" or a wider pattern.
  const resp = await fetch("https://rms.metered.ca/v1/tokens", {
    method: "POST",
    headers: {
      Authorization: `Bearer ${process.env.METERED_REALTIME_SK}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      peerId: "ingester-001",
      channels: ["devices/*/events"],
      permissions: ["subscribe"],
      expiresInSec: 86400,
    }),
  });
  if (!resp.ok) throw new Error(`token request failed: ${resp.status}`);
  const { token } = await resp.json();

  const ws = new WebSocket(`wss://rms.metered.ca/v1?token=${token}`);

  ws.on("message", (raw) => {
    const msg = JSON.parse(raw.toString());
    if (msg.type === "welcome") {
      // Subscribe is exact-match today, so a single wildcard subscribe
      // will not work here. Send one subscribe per device channel
      // (enumerated from your device registry), or subscribe to a shared
      // aggregator channel that devices publish to. Every channel must be
      // covered by the JWT's channels claim.
    }
    if (msg.type === "message") {
      const deviceId = msg.channel.match(/^devices\/([^/]+)\/events$/)?.[1];
      // persistEvent is your own storage function.
      persistEvent(deviceId, msg.from, msg.data);
    }
  });
}

startIngester().catch(console.error);

Wildcard subscribe — current limitations

Channel-level subscribe today is exact-match; wildcard subscribe is on the roadmap. Until then, the typical ingester pattern is:

  • Subscribe to per-device channels enumerated from your device registry
  • Or have devices publish to a single shared aggregator channel (fleets/${fleetId}/events) and have the ingester subscribe there
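
The first option, enumerating per-device channels, amounts to sending one subscribe frame per device once the connection is up. The sketch below builds those frames in the same shape the device uses in Step 2 (type, channel, requestId). The function name and the requestId scheme are illustrative, and the device ID list is assumed to come from your own registry.

```python
import json


def build_subscribe_frames(device_ids):
    """Return one JSON subscribe frame per device event channel."""
    frames = []
    for device_id in device_ids:
        frames.append(json.dumps({
            "type": "subscribe",
            "channel": f"devices/{device_id}/events",
            # requestId lets you match any acknowledgement or error to its device.
            "requestId": f"sub-{device_id}",
        }))
    return frames
```

An ingester would send each frame over its WebSocket after the welcome message, and send further frames as devices are added to the registry.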

Step 4 — Backend issues a command

control-plane.js — issuing a command, no WS required
async function rebootDevice(deviceId) {
  await fetch(
    `https://rms.metered.ca/v1/channels/${encodeURIComponent(`devices/${deviceId}/commands`)}/publish`,
    {
      method: "POST",
      headers: {
        Authorization: `Bearer ${process.env.METERED_REALTIME_SK}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        data: { cmd: "reboot", reason: "operator-initiated" },
        from: "control-plane",
      }),
    },
  );
}

async function adjustFleetFrameRate(fleetId, fps) {
  // Broadcast to every device in the fleet via the shared command channel.
  await fetch(
    `https://rms.metered.ca/v1/channels/${encodeURIComponent(`fleets/${fleetId}/commands`)}/publish`,
    {
      method: "POST",
      headers: {
        Authorization: `Bearer ${process.env.METERED_REALTIME_SK}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        data: { cmd: "set-frame-rate", fps },
        from: "control-plane",
      }),
    },
  );
}

The REST publish reaches every device subscribed to the channel. For a 1000-device fleet, that's one HTTPS request and 1000 deliveries.
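
If the control plane is written in Python rather than Node.js, the same REST publish can be sketched with the standard library. The one detail that is easy to miss is that the channel name contains slashes and has to be percent-encoded as a single path segment, which is what `encodeURIComponent` does in the JavaScript version. The function name here is illustrative.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "https://rms.metered.ca/v1"


def build_publish_request(sk: str, channel: str, data: dict,
                          sender: str = "control-plane") -> urllib.request.Request:
    """Build (but do not send) a REST publish request for one channel."""
    # safe="" makes quote() encode "/" as %2F, matching encodeURIComponent.
    encoded_channel = urllib.parse.quote(channel, safe="")
    return urllib.request.Request(
        f"{BASE_URL}/channels/{encoded_channel}/publish",
        data=json.dumps({"data": data, "from": sender}).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {sk}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the result to `urllib.request.urlopen(req, timeout=10)` sends it. A fleet-wide frame-rate change is then one call with the channel `fleets/<fleetId>/commands`.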


Why this fits the IoT use case

| Problem | How Realtime Messaging solves it |
| --- | --- |
| Devices behind NAT need to receive commands without polling | Long-lived WebSocket; commands arrive as message events instantly |
| Authentication per device | JWT per device with a narrow channels claim; a leaked credential exposes only that one device's scope |
| Backplane fan-out from control plane | POST /v1/channels/.../publish from your backend; the control-plane service does not need to maintain its own WS |
| Cost-correlation per device | Devices publish on per-device channels; you know exactly which device's telemetry costs what |
| Soft-fail behavior when a device is over its plan limits | Soft drops via over_message_quota error; device keeps receiving commands even when it can't publish telemetry |

See also