Concurrency patterns for Senxor: multi-device streaming and blocking I/O¶
In many real-world applications you rarely have just one sensor. It is common to run:
- multiple Senxor thermal cameras,
- Senxor plus one or more RGB cameras (for example `cv2.VideoCapture`),
- Senxor combined with other sensors such as serial devices, LiDAR, PLCs, and so on,
- blocking IO operations such as file writes, database operations, network uploads, etc.
If you access every device with a simple blocking read() loop, one slow device can easily stall the whole system.
This guide helps you pick the simplest pattern that works for your use case, extending to more advanced patterns only when necessary.
1. Why multi-device parallelism or blocking IO is a problem¶
Let us start from a straightforward but problematic approach: a single blocking loop for multiple devices.
```python
thermal_dev = connect(devices[0])
thermal_dev.start_stream()
cam = cv2.VideoCapture(0)

while True:
    header, thermal_frame = thermal_dev.read()  # blocks until a new frame is ready
    ret, rgb_frame = cam.read()                 # blocking read from the RGB camera
    if thermal_frame is not None:
        thermal_image = normalize(thermal_frame, dtype=np.uint8)
        cv2.imshow("thermal", thermal_image)
    if ret and rgb_frame is not None:
        cv2.imshow("rgb", rgb_frame)
```
This code looks simple, but it has an important problem:
- both `thermal_dev.read()` and `cam.read()` are blocking calls,
- if any device is slow (low frame rate, IO delay, intermittent connection), the whole loop stalls,
- when you add more devices, the total blocking time keeps growing.
To handle multiple devices reliably, you need to make sure that no single blocking call can hold the entire system hostage.
The rest of this guide shows four patterns to achieve this, starting from the simplest (single-thread, non-blocking reads) and moving towards more advanced models (threads and async).
2. Mode 1: single-thread non-blocking polling¶
For Senxor devices, the most straightforward parallel pattern is to use non-blocking reads:
```python
header, frame = dev.read(block=False)
```
When `block=False`:
- the call returns immediately,
- if a new frame is available, you get `(header, frame)`,
- if there is no new data yet, you get `(None, None)`,
- the call never blocks waiting for a new frame.
This allows you to poll multiple Senxor devices in a single thread without any device blocking the others.
2.1 Example: polling two Senxor devices in one loop¶
```python
dev_0 = connect(devices[0])
dev_0.start_stream()
dev_1 = connect(devices[1])
dev_1.start_stream()

while True:
    _, frame_0 = dev_0.read(block=False)
    _, frame_1 = dev_1.read(block=False)
    if frame_0 is not None:
        image_0 = normalize(frame_0, dtype=np.uint8)
        cv2.imshow("thermal_0", image_0)
    if frame_1 is not None:
        image_1 = normalize(frame_1, dtype=np.uint8)
        cv2.imshow("thermal_1", image_1)
    time.sleep(0.01)
    key = cv2.waitKey(1)
    if key == 27:  # ESC to exit
        break
```
Characteristics:
- the loop visits each device once per iteration,
- all Senxor reads are non-blocking,
- as long as your CPU can keep up, you can handle multiple Senxor devices in one thread.
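This loop structure generalizes to any number of devices. As an illustration of just the polling structure, here is a self-contained sketch using a hypothetical `FakeDevice` stand-in (the real `connect()` and `read()` come from the Senxor library); the stand-in mimics the `(header, frame)` / `(None, None)` contract described above:

```python
import time

# Stand-in for a Senxor device: read(block=False) returns (header, frame)
# when a new frame is ready, or (None, None) otherwise. Purely illustrative.
class FakeDevice:
    def __init__(self, name, period):
        self.name = name
        self.period = period      # seconds between new frames
        self._last = 0.0
        self._count = 0

    def read(self, block=True):
        now = time.monotonic()
        if now - self._last >= self.period:
            self._last = now
            self._count += 1
            return {"seq": self._count}, f"{self.name}-frame-{self._count}"
        return None, None

def poll_once(devices, handle):
    """Visit every device once; call handle() only when a frame arrived."""
    for dev in devices:
        header, frame = dev.read(block=False)
        if frame is not None:
            handle(dev, header, frame)

devices = [FakeDevice("dev_0", 0.01), FakeDevice("dev_1", 0.02)]
frames = []
deadline = time.monotonic() + 0.1
while time.monotonic() < deadline:
    poll_once(devices, lambda d, h, f: frames.append(f))
    time.sleep(0.005)  # yield a little CPU between polling rounds
```

Because every `read(block=False)` returns immediately, a slow device only means fewer frames from that device, never a stalled loop.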
2.2 Example: Senxor + one RGB camera¶
In many applications you have a single RGB camera in addition to a Senxor device. OpenCV’s `VideoCapture.read()` is a blocking function, but for a single camera it is usually fast enough to keep the loop responsive.
```python
thermal_dev = connect(devices[0])
thermal_dev.start_stream()
cam = cv2.VideoCapture(0)

while True:
    _, thermal_frame = thermal_dev.read(block=False)  # non-blocking Senxor read
    ret, rgb_frame = cam.read()                       # blocking RGB read
    if thermal_frame is not None:
        thermal_image = normalize(thermal_frame, dtype=np.uint8)
        cv2.imshow("thermal", thermal_image)
    if ret and rgb_frame is not None:
        cv2.imshow("rgb", rgb_frame)
    key = cv2.waitKey(1)
    if key == 27:
        break
```
See the complete runnable example: Thermal + RGB.
In most scenarios this combination already works well because:
- `cam.read()` is typically very fast for a single camera,
- `thermal_dev.read(block=False)` never blocks, so the Senxor device does not slow down the loop.
2.3 When mode 1 is enough (and when it is not)¶
Mode 1 is usually sufficient when:
- you only have Senxor devices, or just one extra fast RGB camera,
- your per-frame processing is lightweight (simple display or basic processing),
- the overall CPU usage is moderate.
However, you should consider other modes when:
- you start to add more blocking devices (multiple RGB cameras, slow serial sensors, etc.),
- your processing includes heavy IO (writing video to disk, database operations, network uploads),
- any single `read()` or processing step can pause the entire main loop for a noticeable time.
3. Mode 2: event-driven processing with Senxor.on()¶
Senxor devices also support an event-based model via the on() method:
```python
dev = connect(devices[0])
dev.on("data", lambda header, data: print("data received"))
dev.on("error", lambda error: print("device error:", error))
dev.on("open", lambda: print("device opened"))
dev.on("close", lambda: print("device closed"))
```
`Senxor.on()` supports the following events:
- `"data"`: called when a new frame or data packet is available,
- `"error"`: called when a device error occurs,
- `"open"`: called when the device is successfully opened,
- `"close"`: called when the device is closed.
The method returns a function that you can call later to unregister the callback.
Callbacks are executed in background threads. This makes it safe to perform slow operations (disk writes, database inserts, network calls) inside the callbacks without blocking the main thread.
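To make the register/unregister shape concrete, here is a minimal, illustrative event-emitter sketch. This is not the Senxor library's implementation, only a stand-in showing how `on()` can hand back a function that removes the callback:

```python
# Illustrative stand-in for the on()/unregister pattern described above.
class Emitter:
    def __init__(self):
        self._handlers = {}

    def on(self, event, callback):
        """Register a callback and return a function that unregisters it."""
        self._handlers.setdefault(event, []).append(callback)

        def unregister():
            self._handlers[event].remove(callback)

        return unregister

    def emit(self, event, *args):
        # Iterate over a copy so callbacks may unregister themselves safely.
        for cb in list(self._handlers.get(event, [])):
            cb(*args)

seen = []
emitter = Emitter()
off = emitter.on("data", lambda header, data: seen.append(data))
emitter.emit("data", {"seq": 1}, "frame-1")   # delivered
off()                                          # unregister the callback
emitter.emit("data", {"seq": 2}, "frame-2")   # no longer delivered
```

Holding on to the returned function is the idiomatic way to clean up callbacks when a component shuts down.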
3.1 Example: saving frames to disk or database in a callback¶
Here is a simplified example where we do heavier work in the "data" callback:
```python
def on_data(header, data):
    # This runs in a background thread.
    save_frame_to_disk(data)
    insert_frame_to_database(header, data)

def on_error(error):
    print("device error:", error)

dev = connect(devices[0])
dev.on("data", on_data)
dev.on("error", on_error)
dev.start_stream()

try:
    # The main thread can focus on light UI / monitoring logic.
    while True:
        if should_exit():
            break
finally:
    dev.close()
```
This pattern is useful when:
- you have heavy IO or processing per frame,
- you want to keep the main thread free for UI, coordination, or additional device polling,
- you prefer a “push” model (callbacks) instead of explicitly polling `read(block=False)` for every frame.
See the complete runnable example: Multi-Senxor Alarm.
4. Mode 3: multithreading – when to use threads¶
So far we have stayed inside a single main loop (plus background threads created by the Senxor library for callbacks). In many real-world systems you will also have other blocking devices and blocking libraries that do not offer a non-blocking API.
Typical examples:
- multiple RGB cameras using `cv2.VideoCapture`,
- serial devices or network APIs that block on `read()` or `recv()`,
- long-running IO operations (file writes, database operations) that are hard to make non-blocking.
In these cases, the purpose of using threads is not primarily to “go faster”, but to make sure that:
A single blocking call cannot block all other devices.
The general pattern is:
- move blocking operations into separate threads,
- let those threads push data to the main thread via a queue or a shared variable,
- keep the main thread busy only with lightweight operations (UI, coordination, non-blocking reads).
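The three steps above can be sketched in plain standard-library Python. In this sketch, `blocking_read()` is a stand-in for any blocking device call; a worker thread absorbs the blocking cost and hands results to the main loop through a `queue.Queue`:

```python
import queue
import threading
import time

def blocking_read():
    """Stand-in for a blocking device read (simulated with a 10 ms sleep)."""
    time.sleep(0.01)
    return "frame"

def reader(q, stop):
    # Only this thread pays the blocking cost; results go into the queue.
    while not stop.is_set():
        q.put(blocking_read())

q = queue.Queue()
stop = threading.Event()
t = threading.Thread(target=reader, args=(q, stop), daemon=True)
t.start()

frames = []
deadline = time.monotonic() + 0.1
while time.monotonic() < deadline:
    try:
        frames.append(q.get_nowait())  # non-blocking drain in the main loop
    except queue.Empty:
        pass                           # no new data yet; keep the loop moving
    time.sleep(0.005)

stop.set()
t.join(timeout=1)
```

The main loop never calls the blocking function directly, so a stalled device can never freeze it; it simply sees an empty queue.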
4.1 Thermal + RGB: using cv_utils.CVCamThread to wrap cv2.VideoCapture¶
For the most common combination of Senxor thermal cameras and RGB cameras, the library provides a helper in senxor.cv_utils:
- class `CVCamThread`: runs a blocking `VideoCapture.read()` loop in a background thread and exposes a simple non-blocking `read()` and/or data callback interface.
4.1.1 Non-blocking read() from a background thread¶
```python
import time

import cv2
from senxor.cv_utils import CVCamThread

cam_0 = CVCamThread(cv2.VideoCapture(0))
cam_0.start()

while True:
    frame_0 = cam_0.read()  # non-blocking, returns the latest frame or None
    if frame_0 is not None:
        cv2.imshow("cam_0", frame_0)
    time.sleep(0.01)
    key = cv2.waitKey(1)
    if key == 27:
        break

cam_0.stop()
```
The threading model here is:
- the internal thread continuously calls the blocking `VideoCapture.read()` and stores the latest frame,
- the main thread calls `cam_0.read()` to fetch the most recent frame without blocking.
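This “latest frame” model can be sketched in a few lines of standard-library Python. The sketch below is an illustration of the idea, not `CVCamThread` itself; `capture_loop()` stands in for the blocking `VideoCapture.read()` loop:

```python
import threading
import time

class LatestFrame:
    """One overwritable slot: writers replace it, readers never block."""
    def __init__(self):
        self._lock = threading.Lock()
        self._frame = None

    def write(self, frame):
        with self._lock:
            self._frame = frame

    def read(self):
        with self._lock:
            return self._frame  # returns immediately; may be None at start

slot = LatestFrame()
stop = threading.Event()

def capture_loop():
    # Stand-in for the blocking capture loop inside the camera thread.
    n = 0
    while not stop.is_set():
        time.sleep(0.005)      # simulated blocking capture
        n += 1
        slot.write(f"frame-{n}")

t = threading.Thread(target=capture_loop, daemon=True)
t.start()
time.sleep(0.05)
latest = slot.read()           # most recent frame, fetched without blocking
stop.set()
t.join(timeout=1)
```

Unlike a queue, the slot deliberately drops stale frames: the main thread always sees the newest data, which is usually what you want for live display.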
4.1.2 Using a data callback in the camera thread¶
You can also provide a callback function to process frames directly in the camera thread:
```python
import time

import cv2
import numpy as np
from senxor.cv_utils import CVCamThread

def on_data(frame: np.ndarray):
    print("frame received:", frame.shape)
    # Heavy processing, encoding, or uploading can happen here.

cam_0 = CVCamThread(cv2.VideoCapture(0), on_data=on_data)
cam_0.start()
time.sleep(10)
cam_0.stop()
```
This is useful when you want the camera to “push” frames into some background processing pipeline.
See the complete runnable example: Thermal + RGB Recorder.
4.2 Other blocking devices and IO¶
CVCamThread solves the most common “thermal + RGB” combination, but you may have many other blocking operations in your system:
- serial devices or custom sensors with blocking `read()` calls,
- network APIs that block while waiting for data,
- heavy file writes, database commits, and similar IO.
For these, the general guidelines are:
- Prefer non-blocking APIs when available. Many libraries provide a way to configure non-blocking mode or poll for readiness. If your device supports this, the model becomes similar to `read(block=False)`.
- If the API is strictly blocking, move it to a separate thread. Create a dedicated thread that loops on the blocking call and uses thread-safe mechanisms (queues, shared variables) to pass results to the rest of your application.
- For many small blocking tasks, consider a thread pool. Python’s `concurrent.futures.ThreadPoolExecutor` can manage a pool of threads for you. This is useful when IO operations are frequent but relatively short-lived.
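As a small, self-contained sketch of the thread-pool guideline, the example below fans out several short blocking tasks (here `save_frame()` is a hypothetical stand-in that just sleeps) across a shared pool instead of spawning one thread per task:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def save_frame(index):
    """Stand-in for a short blocking IO task (disk or network write)."""
    time.sleep(0.01)
    return f"saved-{index}"

# Four workers absorb eight blocking tasks; the submitting thread stays free.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(save_frame, i) for i in range(8)]
    results = [f.result() for f in futures]  # collected in submission order
```

A pool caps the number of threads regardless of how many tasks you submit, which keeps resource usage predictable under bursty IO.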
Threads are a general tool: Senxor itself does not enforce a specific threading model for your other devices. The important part is to avoid letting a slow or blocking device run directly in your main loop.
5. Mode 4: asyncio integration¶
In some applications you need to expose Senxor data through network services, such as HTTP APIs or WebSocket streams. Frameworks like FastAPI are built on top of asyncio, so it is natural to integrate Senxor into an async environment.
The Senxor API does not provide an async_read method. However, because dev.read(block=False) is fully non-blocking, it can be combined with asyncio very easily.
5.1 Basic idea¶
The pattern is:
- Start a background async task that repeatedly calls `dev.read(block=False)`.
- Store the latest frame in a shared variable or queue.
- Other async endpoints (for example a WebSocket handler) can read the latest frame and send it to clients.
5.2 Example: FastAPI with a background Senxor worker (simplified)¶
```python
import asyncio

from fastapi import FastAPI, WebSocket

app = FastAPI()
latest_frame = None  # shared state for the latest frame

async def senxor_worker(dev):
    global latest_frame
    while True:
        _, frame = dev.read(block=False)
        if frame is not None:
            latest_frame = frame
        # Yield control so other tasks can run.
        await asyncio.sleep(0)

@app.on_event("startup")
async def startup_event():
    dev = connect(devices[0])
    dev.start_stream()
    asyncio.create_task(senxor_worker(dev))

@app.websocket("/ws/thermal")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    while True:
        if latest_frame is not None:
            await ws.send_bytes(encode_frame(latest_frame))
        await asyncio.sleep(0.05)
```
Key points:
- the `senxor_worker` coroutine runs in the background, pulling frames from the device without blocking the event loop,
- the WebSocket handler periodically sends out the latest available frame,
- data is shared via a simple global or another concurrency primitive (such as an `asyncio.Queue`).
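If you prefer an `asyncio.Queue` over a global, a bounded queue with a “drop the oldest on overflow” policy keeps consumers seeing fresh frames even when they fall behind. The sketch below is self-contained and uses a sleep-driven producer as a stand-in for the device worker:

```python
import asyncio

async def producer(q):
    # Stand-in for the device worker: emit five frames, then a sentinel.
    for n in range(5):
        if q.full():
            q.get_nowait()          # drop the oldest frame instead of blocking
        q.put_nowait(f"frame-{n}")
        await asyncio.sleep(0.01)
    await q.put(None)               # sentinel: stream finished

async def consumer(q, out):
    # Stand-in for a WebSocket handler draining the queue.
    while True:
        item = await q.get()
        if item is None:
            break
        out.append(item)

async def main():
    q = asyncio.Queue(maxsize=2)    # small bound keeps latency low
    out = []
    await asyncio.gather(producer(q), consumer(q, out))
    return out

out = asyncio.run(main())
```

The bound on the queue is the key design choice: an unbounded queue would let a slow consumer accumulate ever-staler frames.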
See the complete runnable example: FastAPI MJPEG Streamer.
5.3 Choosing between threads and asyncio¶
Very roughly:
- if your application is already built around an async framework (FastAPI, aiohttp, websockets), mode 4 is a natural fit;
- if you primarily deal with blocking libraries (legacy SDKs, drivers, or simple scripts), threads (mode 3) are often simpler to reason about;
- you can also combine both: for example, use threads to wrap blocking devices and `asyncio` to serve data over the network.
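The standard bridge between the two models is `loop.run_in_executor()`, which runs a blocking call on a thread-pool worker while the event loop stays responsive. In this self-contained sketch, `blocking_read()` is a stand-in for any strictly blocking device API:

```python
import asyncio
import time

def blocking_read():
    """Stand-in for a strictly blocking device read (10 ms sleep)."""
    time.sleep(0.01)
    return "frame"

async def main():
    loop = asyncio.get_running_loop()
    frames = []
    for _ in range(3):
        # Each blocking call runs on a worker thread; awaiting it yields
        # the event loop so other coroutines keep running meanwhile.
        frames.append(await loop.run_in_executor(None, blocking_read))
    return frames

frames = asyncio.run(main())
```

Passing `None` as the executor uses the loop's default thread pool; you can also pass your own `ThreadPoolExecutor` to control the worker count.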
6. Summary and decision checklist¶
In this guide we went through four complementary patterns for multi-device parallel streaming with Senxor:
1. Single-thread non-blocking polling (`read(block=False)`): ideal when you have a few Senxor devices and lightweight processing. Simple, easy to debug, and usually fast enough.
2. Event-driven processing with `Senxor.on(...)`: useful when you want to move heavy IO (file writes, database operations, network calls) into background threads without blocking the main loop.
3. Multithreading for blocking devices (`CVCamThread` and custom threads): necessary when you combine Senxor with blocking APIs such as multiple `cv2.VideoCapture` instances, serial sensors, or other slow IO sources. Move blocking calls into threads, and keep the main thread responsive.
4. Async integration with `asyncio` and frameworks like FastAPI: a good choice when you need to expose Senxor data over HTTP or WebSocket in an async application. `read(block=False)` makes it simple to integrate without blocking the event loop.
As a quick checklist for your project:
- only Senxor, light processing → start with mode 1;
- heavy IO per frame (file/DB/network) → add mode 2;
- multiple blocking RGB or other devices → design threads as in mode 3;
- need HTTP or WebSocket streaming → integrate mode 4 on top of the previous patterns.
You can start from the simplest mode that works and gradually adopt callbacks, threads, and async as your system and performance requirements grow.