Skip to main content

IoT Telemetry

Python runs on everything from a Raspberry Pi at the edge to a fleet-monitoring backend in the cloud, which makes it a natural fit for device telemetry. This guide covers the two shapes that matter:

  1. Pure pub/sub telemetry — devices publish sensor JSON to channels; backends and dashboards subscribe. Just SignallingClient; no WebRTC, no per-peer state.
  2. An edge camera bridge — a device with a camera streams live video to peers using MeteredPeer and from_rtsp / from_camera.

Pub/sub telemetry needs only the base install; the camera bridge needs the webrtc extra:

pip install metered-realtime              # pub/sub telemetry
pip install "metered-realtime[webrtc]" # + the camera bridge

Shape 1 — pub/sub telemetry

MQTT-style fan-in/fan-out. Devices publish telemetry to per-device channels; backend services subscribe to a wildcard for the firehose. To command one device, the backend publishes to that device's command channel, which the device subscribes to.

Devices                              Backend / dashboards
───────── ─────────────────────
[ Sensor 1 ]──publish──┐
[ Sensor 2 ]──publish──┼──► fleet/sensor-{id}/telemetry ──subscribe──► [ Dashboard ]
[ Sensor 3 ]──publish──┘ [ Alerter ]
[ Sensor 1 ]◄─subscribe── fleet/sensor-1/cmd ──publish──────────────── [ Control plane ]

Device side

A device publishes readings on a schedule and listens for commands — both on one SignallingClient connection.

device.py
import asyncio, time
from metered_realtime import SignallingClient, Connected, Message, ReconnectOptions

DEVICE_ID = "sensor-42"

async def main() -> None:
client = SignallingClient(
api_key="pk_live_…",
reconnect=ReconnectOptions(
max_attempts=float("inf"), # field devices never give up
max_delay=60.0, # cap backoff at 1 min
),
)
async with client: # connect on enter, close on exit
# Listen for commands directed at this device.
@client.on(Message)
def _(ev: Message) -> None:
if ev.channel == f"fleet/{DEVICE_ID}/cmd":
cmd = ev.data
if cmd.get("type") == "reboot":
reboot_device()
elif cmd.get("type") == "calibrate":
calibrate(cmd.get("params"))

await client.subscribe(f"fleet/{DEVICE_ID}/cmd")

# Publish telemetry on a schedule.
while True:
reading = read_sensor() # ← your sensor read
try:
await client.publish(f"fleet/{DEVICE_ID}/telemetry", {
"ts": int(time.time() * 1000),
"temp": reading["temperature"],
"humidity": reading["humidity"],
})
except Exception:
buffer_locally(reading) # reconnecting — buffer, flush later
await asyncio.sleep(5)

asyncio.run(main())

publish raises if the connection isn't connected (e.g. mid-reconnect), so wrap it and buffer locally — see Reliability below.

Backend / dashboard side

The backend subscribes to a wildcard for the whole fleet and reads the device id off ev.channel.

dashboard.py
import re
from metered_realtime import SignallingClient, Message

_CHAN = re.compile(r"^fleet/(sensor-[^/]+)/telemetry$")

async def main() -> None:
client = SignallingClient(api_key="pk_live_…")
async with client:
@client.on(Message)
def _(ev: Message) -> None:
m = _CHAN.match(ev.channel) # ev.channel == "fleet/sensor-42/telemetry"
if m:
update_dashboard(m.group(1), ev.data)

await client.subscribe("fleet/sensor-*/telemetry") # firehose

# Send a command to one device by publishing to its command channel.
await client.publish("fleet/sensor-42/cmd", {"type": "reboot"})

Every inbound channel broadcast arrives as a Message with channel, sender_peer_id, and data. There's no separate per-channel callback — match on ev.channel to route.

Channels per connection

One SignallingClient connection can subscribe to many channels — a backend monitoring 10,000 devices typically uses a few wildcard subscriptions on a single connection, not one connection per device. A device, by contrast, usually needs just its own command channel. Most setups run one connection per process and split only if a burst of fast publishes starts delaying command receipt.

Directed messages with send

Channel publish is broadcast — every subscriber sees it. When the backend wants to reply to exactly one device process (not everyone on a channel), use a directed send to that peer's id:

@client.on(Message)
def _(ev: Message) -> None:
# ev.sender_peer_id is the server-stamped sender — reply straight to it.
asyncio.create_task(
client.send(ev.sender_peer_id, {"type": "ack", "for": ev.data.get("seq")})
)

client.send(peer_id, data) is server-routed point-to-point and surfaces on the recipient as a Direct event (subscribe with @client.on(Direct)). The peer id comes from the server envelope (ev.sender_peer_id) — never trust a from field inside the payload. (If you've modelled the fleet with MeteredPeer instead, the equivalent is peer.send_to(peer_id, data).)

Shape 2 — edge camera bridge

A device with a camera can stream live video into a room. The bridge ingests the camera (RTSP from an IP camera, or a locally-attached camera) and feeds the resulting video track to every peer with MeteredPeer. A dashboard joins the same channel and receives the video.

From an IP camera (RTSP)

from_rtsp handles most IP cameras. It returns a player whose .video / .audio are the contained tracks — either may be None, so check before add_track.

camera_bridge.py
import asyncio
from metered_realtime import MeteredPeer, MediaStream, from_rtsp

ROOM = "front-door"

async def main() -> None:
peer = MeteredPeer(api_key="pk_live_…")

cam = from_rtsp("rtsp://camera.local:554/stream", transport="tcp")
if cam.video is not None:
peer.add_track(
cam.video,
MediaStream(id="front-door"),
{"role": "camera", "label": "Front Door"}, # metadata for the viewer's UI
)
if cam.audio is not None:
peer.add_track(cam.audio, MediaStream(id="front-door"))

try:
async with peer.joined(ROOM):
await asyncio.Future() # stream until cancelled
finally:
cam.video and cam.video.stop() # you own the player
cam.audio and cam.audio.stop()

asyncio.run(main())

add_track sends the feed to every current and future peer in the room — a dashboard that joins later starts receiving it automatically. The transport is the RTSP lower transport, one of {"tcp", "udp", "udp_multicast", "http"}; tcp is the most firewall-friendly. You own the player — stop its tracks on shutdown so the decoder/socket is released.

Don't pass an untrusted url. FFmpeg picks the protocol/demuxer from the string, so a hostile value can read local files or reach internal URLs (SSRF). Validate before forwarding — see the Media reference.

From a locally-attached camera

If the camera is wired to the device itself, from_camera captures it directly:

from metered_realtime import from_camera, MediaStream

cam = from_camera(width=640, height=480, fps=15)
if cam.video is not None:
peer.add_track(cam.video, MediaStream(id="edge-cam"), {"role": "camera"})

Real device capture is platform- and FFmpeg-dependent (V4L2 on Linux, AVFoundation on macOS, DirectShow on Windows); device naming and availability vary by OS and build. from_rtsp / from_file are the portable options.

The encode CPU ceiling on small boards

This is the one to watch on edge hardware. The video the SDK sends is software-encoded (the camera's frames are decoded, then re-encoded for the WebRTC link). Software encode is CPU-heavy, and small SoCs — a Raspberry Pi, a low-power gateway — hit a ceiling fast. A 1080p30 stream that's trivial on a server can peg a Pi's CPU and stall the whole event loop.

Three ways to stay under the ceiling:

  1. Lower the resolution and frame rate. from_camera(width=640, height=480, fps=15) encodes a fraction of the pixels of 1080p30. Start low and raise it only if the board has headroom.
  2. Prefer from_rtsp and let the camera encode. Most IP cameras already produce an encoded stream. Pulling it over RTSP still means the SDK re-encodes for WebRTC, but ingesting RTSP on a more capable edge box (a small x86 mini-PC, a Jetson) rather than the camera's own SoC moves the encode cost off the constrained device.
  3. Run the bridge on a gateway, not the sensor. Put the MeteredPeer + from_rtsp bridge on the most capable machine on the local network and have it ingest the camera's RTSP feed. The tiny sensors stay on pub/sub (Shape 1); only the gateway does video.

See the Media reference capture notes for the platform specifics.

Telemetry + video together

A camera device often sends both: a video feed and a stream of sensor readings (motion events, temperature, battery). Run both on the same MeteredPeeradd_track for the video, and peer.send(...) / peer.send_to(...) for the JSON telemetry, which lands on viewers as a Data event.

# alongside the camera track on the same peer
await peer.send({"type": "motion", "ts": int(time.time() * 1000), "zone": "porch"})

If a device only ever sends telemetry (no video), use Shape 1 — SignallingClient is smaller and has no WebRTC dependency.

Reliability — what to do when the network drops

Field devices lose connectivity constantly. The SDK's auto-reconnect handles re-establishing the socket and replaying your subscriptions; your code owns the data produced while disconnected.

Local buffering on disconnect

A publish while the client isn't connected raises. Buffer locally and flush on reconnect (the Connected event fires again with is_reconnect=True):

from metered_realtime import SignallingClient, Connected

buffer: list[dict] = []

async def safe_publish(client: SignallingClient, channel: str, data: dict) -> None:
if client.state != "connected":
buffer.append({"channel": channel, "data": data})
if len(buffer) > 10_000:
buffer.pop(0) # bound the buffer; drop oldest
return
try:
await client.publish(channel, data)
except Exception:
buffer.append({"channel": channel, "data": data})

@client.on(Connected)
def _(ev: Connected) -> None:
if ev.is_reconnect:
asyncio.create_task(flush(client))

async def flush(client: SignallingClient) -> None:
while buffer and client.state == "connected":
item = buffer[0]
try:
await client.publish(item["channel"], item["data"])
buffer.pop(0)
except Exception:
return # still flaky — try again next reconnect

This gives at-most-once delivery across transient drops. For at-least-once you'd dedup server-side on a messageId you include in each reading.

Daemon-style reconnect

reconnect=ReconnectOptions(max_attempts=float("inf"), max_delay=60.0)

The default max_attempts=100 exhausts on a long outage and takes the device offline permanently. Field devices want float("inf"); the 60-second max_delay cap keeps the device polite when the problem is server-side.

For the full reconnect playbook, see Reconnect Best Practices.

High-frequency telemetry

If you publish more than ~10 messages/sec/device:

  1. Message quota. Every publish counts against your account's per-period message limit. A device publishing every 100 ms is 36,000 messages/hour. Make sure your plan covers fleet × frequency.

  2. Batch readings. Accumulate several readings client-side and publish them in one message — saves both quota and server CPU.

batch: list[dict] = []
# producer: batch.append(read_sensor()) every 100 ms
# flusher, every 1 s:
if batch:
await client.publish(f"fleet/{DEVICE_ID}/telemetry", {"readings": batch})
batch.clear()
  1. Outbound size cap. A message can't exceed the server's max_message_size (surfaced on the Connected event, default 64 KB). A large batch can overflow — split or compress.

  2. The SDK is not your storage layer. Messages are best-effort and ephemeral — a subscriber that's offline when a message is published misses it. Persist anything you can't lose to a real store (TimescaleDB, ClickHouse, …); the signalling layer is for live coordination, not history.

Pitfalls

  1. max_attempts=100 on a device. Defaults are tuned for browsers. Field devices need float("inf"), or a multi-hour outage takes them offline for good.

  2. No local buffering. Every publish during a disconnect raises and the reading is lost. If completeness matters, buffer and bound it.

  3. Treating subscribers as durable. A subscriber coming online after a message was published doesn't see it. Persist what you can't afford to miss.

  4. Streaming 1080p30 off a Raspberry Pi. Software encode pegs a small SoC's CPU and stalls the event loop. Lower resolution/fps, or run the bridge on a more capable edge box ingesting the camera's RTSP.

  5. Not checking .video / .audio for None. A source missing that media returns None for the track; passing None to add_track fails. Guard before adding.

  6. Leaking the player. A from_rtsp / from_camera player you never stop holds the decoder/socket open. Stop its tracks on shutdown.

  7. Trusting a from field in the payload for device identity. Use the server-stamped ev.sender_peer_id (and the device's token-asserted peer id), not a self-reported field inside data.

See also