IoT Telemetry
Python runs on everything from a Raspberry Pi at the edge to a fleet-monitoring backend in the cloud, which makes it a natural fit for device telemetry. This guide covers the two shapes that matter:
- Pure pub/sub telemetry: devices publish sensor JSON to channels; backends and dashboards subscribe. Just SignallingClient; no WebRTC, no per-peer state.
- An edge camera bridge: a device with a camera streams live video to peers using MeteredPeer and from_rtsp / from_camera.
Pub/sub telemetry needs only the base install; the camera bridge needs the webrtc extra:
pip install metered-realtime # pub/sub telemetry
pip install "metered-realtime[webrtc]" # + the camera bridge
Shape 1 — pub/sub telemetry
MQTT-style fan-in/fan-out. Devices publish telemetry to per-device channels; backend services subscribe to a wildcard for the firehose. To command one device, the backend publishes to that device's command channel, which the device subscribes to.
Devices                                                                Backend / dashboards
─────────                                                              ─────────────────────
[ Sensor 1 ]──publish──┐
[ Sensor 2 ]──publish──┼──► fleet/sensor-{id}/telemetry ──subscribe──► [ Dashboard ]
[ Sensor 3 ]──publish──┘                                               [ Alerter ]

[ Sensor 1 ]◄─subscribe── fleet/sensor-1/cmd ──publish──────────────── [ Control plane ]
Device side
A device publishes readings on a schedule and listens for commands — both on one SignallingClient connection.
import asyncio, time
from metered_realtime import SignallingClient, Connected, Message, ReconnectOptions

DEVICE_ID = "sensor-42"

async def main() -> None:
    client = SignallingClient(
        api_key="pk_live_…",
        reconnect=ReconnectOptions(
            max_attempts=float("inf"),  # field devices never give up
            max_delay=60.0,             # cap backoff at 1 min
        ),
    )
    async with client:  # connect on enter, close on exit
        # Listen for commands directed at this device.
        @client.on(Message)
        def _(ev: Message) -> None:
            if ev.channel == f"fleet/{DEVICE_ID}/cmd":
                cmd = ev.data
                if cmd.get("type") == "reboot":
                    reboot_device()
                elif cmd.get("type") == "calibrate":
                    calibrate(cmd.get("params"))

        await client.subscribe(f"fleet/{DEVICE_ID}/cmd")

        # Publish telemetry on a schedule.
        while True:
            reading = read_sensor()  # ← your sensor read
            try:
                await client.publish(f"fleet/{DEVICE_ID}/telemetry", {
                    "ts": int(time.time() * 1000),
                    "temp": reading["temperature"],
                    "humidity": reading["humidity"],
                })
            except Exception:
                buffer_locally(reading)  # reconnecting: buffer, flush later
            await asyncio.sleep(5)

asyncio.run(main())
publish raises if the connection isn't connected (e.g. mid-reconnect), so wrap it and buffer locally — see Reliability below.
Backend / dashboard side
The backend subscribes to a wildcard for the whole fleet and reads the device id off ev.channel.
import asyncio
import re
from metered_realtime import SignallingClient, Message

_CHAN = re.compile(r"^fleet/(sensor-[^/]+)/telemetry$")

async def main() -> None:
    client = SignallingClient(api_key="pk_live_…")
    async with client:
        @client.on(Message)
        def _(ev: Message) -> None:
            m = _CHAN.match(ev.channel)  # ev.channel == "fleet/sensor-42/telemetry"
            if m:
                update_dashboard(m.group(1), ev.data)

        await client.subscribe("fleet/sensor-*/telemetry")  # firehose

        # Send a command to one device by publishing to its command channel.
        await client.publish("fleet/sensor-42/cmd", {"type": "reboot"})

        await asyncio.Future()  # keep receiving until cancelled

asyncio.run(main())
Every inbound channel broadcast arrives as a Message with channel, sender_peer_id, and data. There's no separate per-channel callback — match on ev.channel to route.
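As the number of channel families grows, a chain of regex checks inside one handler can get unwieldy. One possible way to organise the routing is a small pattern table. The sketch below is plain Python and independent of the SDK: `ChannelRouter`, `route`, `dispatch`, and `on_telemetry` are hypothetical names, and the assumption is that you would call `router.dispatch(ev.channel, ev.data)` from your own Message handler.

```python
import re
from typing import Any, Callable

Handler = Callable[[re.Match, Any], None]


class ChannelRouter:
    """Dispatch a (channel, data) pair to the first handler whose pattern matches."""

    def __init__(self) -> None:
        self._routes: list[tuple[re.Pattern, Handler]] = []

    def route(self, pattern: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a handler for channels fully matching `pattern`."""
        compiled = re.compile(pattern)

        def register(fn: Handler) -> Handler:
            self._routes.append((compiled, fn))
            return fn

        return register

    def dispatch(self, channel: str, data: Any) -> bool:
        """Return True if some handler accepted the channel, False otherwise."""
        for pattern, fn in self._routes:
            m = pattern.fullmatch(channel)
            if m:
                fn(m, data)
                return True
        return False


router = ChannelRouter()
latest: dict[str, Any] = {}  # device id -> most recent telemetry payload


@router.route(r"fleet/(sensor-[^/]+)/telemetry")
def on_telemetry(m: re.Match, data: Any) -> None:
    latest[m.group(1)] = data


handled = router.dispatch("fleet/sensor-42/telemetry", {"temp": 21.5})
unhandled = router.dispatch("fleet/sensor-42/unknown", {})
```

Returning a boolean from `dispatch` makes it easy to log channels nobody handles, which tends to surface typos in channel names early.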
Channels per connection
One SignallingClient connection can subscribe to many channels — a backend monitoring 10,000 devices typically uses a few wildcard subscriptions on a single connection, not one connection per device. A device, by contrast, usually needs just its own command channel. Most setups run one connection per process and split only if a burst of fast publishes starts delaying command receipt.
Directed messages with send
Channel publish is broadcast — every subscriber sees it. When the backend wants to reply to exactly one device process (not everyone on a channel), use a directed send to that peer's id:
@client.on(Message)
def _(ev: Message) -> None:
    # ev.sender_peer_id is the server-stamped sender; reply straight to it.
    asyncio.create_task(
        client.send(ev.sender_peer_id, {"type": "ack", "for": ev.data.get("seq")})
    )
client.send(peer_id, data) is server-routed point-to-point and surfaces on the recipient as a Direct event (subscribe with @client.on(Direct)). The peer id comes from the server envelope (ev.sender_peer_id) — never trust a from field inside the payload. (If you've modelled the fleet with MeteredPeer instead, the equivalent is peer.send_to(peer_id, data).)
Shape 2 — edge camera bridge
A device with a camera can stream live video into a room. The bridge ingests the camera (RTSP from an IP camera, or a locally-attached camera) and feeds the resulting video track to every peer with MeteredPeer. A dashboard joins the same channel and receives the video.
From an IP camera (RTSP)
from_rtsp handles most IP cameras. It returns a player whose .video / .audio are the contained tracks — either may be None, so check before add_track.
import asyncio
from metered_realtime import MeteredPeer, MediaStream, from_rtsp

ROOM = "front-door"

async def main() -> None:
    peer = MeteredPeer(api_key="pk_live_…")
    cam = from_rtsp("rtsp://camera.local:554/stream", transport="tcp")
    if cam.video is not None:
        peer.add_track(
            cam.video,
            MediaStream(id="front-door"),
            {"role": "camera", "label": "Front Door"},  # metadata for the viewer's UI
        )
    if cam.audio is not None:
        peer.add_track(cam.audio, MediaStream(id="front-door"))
    try:
        async with peer.joined(ROOM):
            await asyncio.Future()  # stream until cancelled
    finally:
        # You own the player: stop its tracks on shutdown.
        if cam.video is not None:
            cam.video.stop()
        if cam.audio is not None:
            cam.audio.stop()

asyncio.run(main())
add_track sends the feed to every current and future peer in the room — a dashboard that joins later starts receiving it automatically. The transport is the RTSP lower transport, one of {"tcp", "udp", "udp_multicast", "http"}; tcp is the most firewall-friendly. You own the player — stop its tracks on shutdown so the decoder/socket is released.
Don't pass an untrusted url. FFmpeg picks the protocol/demuxer from the string, so a hostile value can read local files or reach internal URLs (SSRF). Validate before forwarding; see the Media reference.
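A minimal sketch of what that validation might look like, using only the standard library. It assumes the bridge should reach a fixed set of cameras: `ALLOWED_HOSTS`, `ALLOWED_SCHEMES`, and `is_allowed_rtsp_url` are hypothetical names, and the host values are placeholders. It checks the URL string only, so it is a first line of defence rather than a complete SSRF mitigation.

```python
from urllib.parse import urlsplit

# Hypothetical allowlist: the camera hosts this bridge is permitted to reach.
ALLOWED_HOSTS = {"camera.local", "10.0.0.20"}
ALLOWED_SCHEMES = {"rtsp", "rtsps"}


def is_allowed_rtsp_url(url: str) -> bool:
    """Accept only rtsp(s):// URLs whose host is on the allowlist."""
    try:
        parts = urlsplit(url)
        host = parts.hostname  # lowercased; excludes any userinfo and port
    except ValueError:
        return False  # malformed URL
    return parts.scheme in ALLOWED_SCHEMES and host in ALLOWED_HOSTS


ok = is_allowed_rtsp_url("rtsp://camera.local:554/stream")
local_file = is_allowed_rtsp_url("file:///etc/passwd")
internal = is_allowed_rtsp_url("rtsp://169.254.169.254/latest")
userinfo_trick = is_allowed_rtsp_url("rtsp://camera.local@evil.example/stream")
```

Checking `hostname` rather than searching the raw string matters: the last example contains `camera.local` as userinfo but actually points at a different host.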
From a locally-attached camera
If the camera is wired to the device itself, from_camera captures it directly:
from metered_realtime import from_camera, MediaStream
cam = from_camera(width=640, height=480, fps=15)
if cam.video is not None:
    peer.add_track(cam.video, MediaStream(id="edge-cam"), {"role": "camera"})
Real device capture is platform- and FFmpeg-dependent (V4L2 on Linux, AVFoundation on macOS, DirectShow on Windows); device naming and availability vary by OS and build. from_rtsp / from_file are the portable options.
The encode CPU ceiling on small boards
This is the one to watch on edge hardware. The video the SDK sends is software-encoded (the camera's frames are decoded, then re-encoded for the WebRTC link). Software encode is CPU-heavy, and small SoCs — a Raspberry Pi, a low-power gateway — hit a ceiling fast. A 1080p30 stream that's trivial on a server can peg a Pi's CPU and stall the whole event loop.
Three ways to stay under the ceiling:
- Lower the resolution and frame rate. from_camera(width=640, height=480, fps=15) encodes a fraction of the pixels of 1080p30. Start low and raise it only if the board has headroom.
- Prefer from_rtsp and let the camera encode. Most IP cameras already produce an encoded stream. Pulling it over RTSP still means the SDK re-encodes for WebRTC, but ingesting RTSP on a more capable edge box (a small x86 mini-PC, a Jetson) rather than the camera's own SoC moves the encode cost off the constrained device.
- Run the bridge on a gateway, not the sensor. Put the MeteredPeer + from_rtsp bridge on the most capable machine on the local network and have it ingest the camera's RTSP feed. The tiny sensors stay on pub/sub (Shape 1); only the gateway does video.
See the Media reference capture notes for the platform specifics.
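To put a number on "a fraction of the pixels", the arithmetic can be sketched as below. `pixel_rate` is a hypothetical helper, and raw pixel throughput is only a rough proxy for encode cost, since the actual CPU load also depends on codec, bitrate, and scene content.

```python
def pixel_rate(width: int, height: int, fps: int) -> int:
    """Raw pixels the encoder has to process per second."""
    return width * height * fps


full_hd = pixel_rate(1920, 1080, 30)  # 62,208,000 pixels/s
low = pixel_rate(640, 480, 15)        # 4,608,000 pixels/s
ratio = full_hd / low                 # 13.5

print(f"1080p30 pushes {ratio:.1f}x the pixels of 640x480 at 15 fps")
```

On that measure, 640x480 at 15 fps is roughly 7% of the work of 1080p30, which is why it makes a sensible starting point on a small board.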
Telemetry + video together
A camera device often sends both: a video feed and a stream of sensor readings (motion events, temperature, battery). Run both on the same MeteredPeer — add_track for the video, and peer.send(...) / peer.send_to(...) for the JSON telemetry, which lands on viewers as a Data event.
# alongside the camera track on the same peer
await peer.send({"type": "motion", "ts": int(time.time() * 1000), "zone": "porch"})
If a device only ever sends telemetry (no video), use Shape 1 — SignallingClient is smaller and has no WebRTC dependency.
Reliability — what to do when the network drops
Field devices lose connectivity constantly. The SDK's auto-reconnect handles re-establishing the socket and replaying your subscriptions; your code owns the data produced while disconnected.
Local buffering on disconnect
A publish while the client isn't connected raises. Buffer locally and flush on reconnect (the Connected event fires again with is_reconnect=True):
import asyncio
from metered_realtime import SignallingClient, Connected

buffer: list[dict] = []

async def safe_publish(client: SignallingClient, channel: str, data: dict) -> None:
    if client.state != "connected":
        buffer.append({"channel": channel, "data": data})
        if len(buffer) > 10_000:
            buffer.pop(0)  # bound the buffer; drop oldest
        return
    try:
        await client.publish(channel, data)
    except Exception:
        buffer.append({"channel": channel, "data": data})

# `client` is your already-constructed SignallingClient.
@client.on(Connected)
def _(ev: Connected) -> None:
    if ev.is_reconnect:
        asyncio.create_task(flush(client))

async def flush(client: SignallingClient) -> None:
    while buffer and client.state == "connected":
        item = buffer[0]
        try:
            await client.publish(item["channel"], item["data"])
            buffer.pop(0)
        except Exception:
            return  # still flaky; try again next reconnect
This gives at-most-once delivery across transient drops. For at-least-once you'd dedup server-side on a messageId you include in each reading.
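One way the id-based dedup could look on the receiving side is sketched below. It is not part of the SDK: `stamp`, `Deduplicator`, and the `messageId` field name are assumptions for illustration, and the seen-set is bounded so that it cannot grow without limit on a long-running consumer.

```python
import uuid
from collections import OrderedDict


def stamp(reading: dict) -> dict:
    """Return a copy of the reading carrying a unique id (field name is an assumption)."""
    return {**reading, "messageId": str(uuid.uuid4())}


class Deduplicator:
    """Remember the most recent ids and report whether an id is new."""

    def __init__(self, capacity: int = 10_000) -> None:
        self._seen: OrderedDict = OrderedDict()
        self._capacity = capacity

    def is_new(self, message_id: str) -> bool:
        if message_id in self._seen:
            return False
        self._seen[message_id] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # evict the oldest id
        return True


dedup = Deduplicator()
msg = stamp({"temp": 21.5})
first = dedup.is_new(msg["messageId"])   # first delivery: process it
replay = dedup.is_new(msg["messageId"])  # re-sent after a reconnect: skip it
```

The device stamps each reading once, before buffering, so a reading that is flushed twice carries the same id both times.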
Daemon-style reconnect
reconnect=ReconnectOptions(max_attempts=float("inf"), max_delay=60.0)
The default max_attempts=100 exhausts on a long outage and takes the device offline permanently. Field devices want float("inf"); the 60-second max_delay cap keeps the device polite when the problem is server-side.
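A rough feel for how long a finite attempt budget lasts can be worked out as follows. This guide does not document the SDK's backoff schedule, so the doubling schedule and 1-second base delay below are assumptions chosen purely for illustration; only max_attempts=100 and max_delay=60.0 come from the text above, and `total_retry_window` is a hypothetical helper.

```python
def total_retry_window(max_attempts: int, max_delay: float, base: float = 1.0) -> float:
    """Total seconds spent waiting, assuming a doubling backoff capped at max_delay."""
    total = 0.0
    delay = base
    for _ in range(max_attempts):
        total += min(delay, max_delay)
        delay = min(delay * 2, max_delay)
    return total


window = total_retry_window(max_attempts=100, max_delay=60.0)
print(f"{window:.0f} s, about {window / 3600:.2f} hours")
```

Under those assumptions, 100 attempts are used up in roughly an hour and a half, so an outage that lasts longer leaves the device stopped until something restarts it.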
For the full reconnect playbook, see Reconnect Best Practices.
High-frequency telemetry
If you publish more than ~10 messages/sec/device:
- Message quota. Every publish counts against your account's per-period message limit. A device publishing every 100 ms is 36,000 messages/hour. Make sure your plan covers fleet × frequency.
- Batch readings. Accumulate several readings client-side and publish them in one message; this saves both quota and server CPU.
batch: list[dict] = []
# producer: batch.append(read_sensor()) every 100 ms
# flusher, every 1 s:
if batch:
    await client.publish(f"fleet/{DEVICE_ID}/telemetry", {"readings": batch})
    batch.clear()
- Outbound size cap. A message can't exceed the server's max_message_size (surfaced on the Connected event, default 64 KB). A large batch can overflow, so split or compress it.
- The SDK is not your storage layer. Messages are best-effort and ephemeral: a subscriber that's offline when a message is published misses it. Persist anything you can't lose to a real store (TimescaleDB, ClickHouse, …); the signalling layer is for live coordination, not history.
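Splitting an oversized batch could be sketched like this. `split_batch` and `MAX_MESSAGE_SIZE` are hypothetical names, and the 64 KB figure is hard-coded here only as a placeholder for the value the server reports. The size check measures the JSON body alone, so it is worth leaving some headroom for whatever envelope the wire format adds.

```python
import json

MAX_MESSAGE_SIZE = 64 * 1024  # assumed cap in bytes; prefer the value the server reports


def split_batch(readings: list[dict], max_bytes: int = MAX_MESSAGE_SIZE) -> list[list[dict]]:
    """Greedily pack readings into chunks whose JSON encoding stays within max_bytes."""
    chunks: list[list[dict]] = []
    current: list[dict] = []
    for reading in readings:
        candidate = current + [reading]
        size = len(json.dumps({"readings": candidate}).encode("utf-8"))
        if size > max_bytes and current:
            chunks.append(current)  # close the chunk that still fits
            current = [reading]     # start a new one with this reading
        else:
            current = candidate     # a lone oversized reading keeps its own chunk
    if current:
        chunks.append(current)
    return chunks


readings = [{"i": i, "pad": "x" * 100} for i in range(50)]
chunks = split_batch(readings, max_bytes=1000)
```

Each chunk would then be published as its own message. Re-encoding the candidate on every step is quadratic in the chunk size, which is fine for small batches; a running byte count would be the obvious refinement for large ones.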
Pitfalls
- max_attempts=100 on a device. Defaults are tuned for browsers. Field devices need float("inf"), or a multi-hour outage takes them offline for good.
- No local buffering. Every publish during a disconnect raises and the reading is lost. If completeness matters, buffer and bound it.
- Treating subscribers as durable. A subscriber coming online after a message was published doesn't see it. Persist what you can't afford to miss.
- Streaming 1080p30 off a Raspberry Pi. Software encode pegs a small SoC's CPU and stalls the event loop. Lower resolution/fps, or run the bridge on a more capable edge box ingesting the camera's RTSP.
- Not checking .video / .audio for None. A source missing that media returns None for the track; passing None to add_track fails. Guard before adding.
- Leaking the player. A from_rtsp / from_camera player you never stop holds the decoder/socket open. Stop its tracks on shutdown.
- Trusting a from field in the payload for device identity. Use the server-stamped ev.sender_peer_id (and the device's token-asserted peer id), not a self-reported field inside data.
See also
- SignallingClient reference: subscribe, publish, send, the Message / Direct events
- Media reference: from_rtsp, from_camera, and the capture notes
- MeteredPeer reference: add_track / send_to for the camera bridge
- Reconnect Best Practices: daemon-style settings for field devices