Skip to content

Concurrency patterns for Senxor: multi-device streaming and blocking I/O

In many real-world applications you rarely have just one sensor. It is common to run:

  • multiple Senxor thermal cameras,
  • Senxor plus one or more RGB cameras (for example cv2.VideoCapture),
  • Senxor combined with other sensors such as serial devices, LiDAR, PLCs, and so on.
  • Blocking IO operations such as file writes, database operations, network uploads, etc.

If you access every device with a simple blocking read() loop, one slow device can easily stall the whole system.

This guide is to help you pick the simplest pattern that works for your use case, and extend to more advanced patterns only when necessary.


1. Why multi-device parallelism or blocking IO is a problem

Let us start from a straightforward but problematic approach: a single blocking loop for multiple devices.

thermal_dev = connect(devices[0])
thermal_dev.start_stream()

cam = cv2.VideoCapture(0)

while True:
    header, thermal_frame = thermal_dev.read()  # blocks until a new frame is ready
    ret, rgb_frame = cam.read()                  # blocking read from RGB camera

    if thermal_frame is not None:
        thermal_image = normalize(thermal_frame, dtype=np.uint8)
        cv2.imshow("thermal", thermal_image)

    if ret and rgb_frame is not None:
        cv2.imshow("rgb", rgb_frame)

This code looks simple, but it has an important problem:

  • both thermal_dev.read() and cam.read() are blocking calls,
  • if any device is slow (low frame rate, IO delay, intermittent connection), the whole loop is stalled,
  • when you add more devices, the total blocking time just grows and grows.

To handle multiple devices reliably, you need to make sure that no single blocking call can hold the entire system hostage.

The rest of this guide shows four patterns to achieve this, starting from the simplest (single-thread, non-blocking reads) and moving towards more advanced models (threads and async).


2. Mode 1: single-thread non-blocking polling

For Senxor devices, the most straightforward parallel pattern is to use non-blocking reads:

header, frame = dev.read(block=False)

When block=False:

  • the call returns immediately,
  • if there is a new frame available, you get (header, frame),
  • if there is no new data yet, you get (None, None).
  • the call never blocks waiting for a new frame.

This allows you to poll multiple Senxor devices in a single thread without any device blocking the others.

2.1 Example: polling two Senxor devices in one loop

dev_0 = connect(devices[0])
dev_0.start_stream()

dev_1 = connect(devices[1])
dev_1.start_stream()

while True:
    _, frame_0 = dev_0.read(block=False)
    _, frame_1 = dev_1.read(block=False)

    if frame_0 is not None:
        image_0 = normalize(frame_0, dtype=np.uint8)
        cv2.imshow("thermal_0", image_0)

    if frame_1 is not None:
        image_1 = normalize(frame_1, dtype=np.uint8)
        cv2.imshow("thermal_1", image_1)

    time.sleep(0.01)

    key = cv2.waitKey(1)
    if key == 27:  # ESC to exit
        break

Characteristics:

  • the loop visits each device once per iteration,
  • all Senxor reads are non-blocking,
  • as long as your CPU can keep up, you can handle multiple Senxor devices in one thread.

2.2 Example: Senxor + one RGB camera

In many applications you have a single RGB camera in addition to a Senxor device. OpenCV’s VideoCapture.read() is a blocking function, but for a single camera it is usually fast enough to keep the loop responsive.

thermal_dev = connect(devices[0])
thermal_dev.start_stream()

cam = cv2.VideoCapture(0)

while True:
    _, thermal_frame = thermal_dev.read(block=False)  # non-blocking Senxor read
    ret, rgb_frame = cam.read()                        # blocking RGB read

    if thermal_frame is not None:
        thermal_image = normalize(thermal_frame, dtype=np.uint8)
        cv2.imshow("thermal", thermal_image)

    if ret and rgb_frame is not None:
        cv2.imshow("rgb", rgb_frame)

    key = cv2.waitKey(1)
    if key == 27:
        break

See the complete runnable example: Thermal + RGB.

In most scenarios this combination already works well because:

  • cam.read() is typically very fast for a single camera,
  • thermal_dev.read(block=False) never blocks, so the Senxor device does not slow down the loop.

2.3 When mode 1 is enough (and when it is not)

Mode 1 is usually sufficient when:

  • you only have Senxor devices, or just one extra fast RGB camera,
  • your per-frame processing is lightweight (simple display or basic processing),
  • the overall CPU usage is moderate.

However, you should consider other modes when:

  • you start to add more blocking devices (multiple RGB cameras, slow serial sensors, etc.),
  • your processing includes heavy IO (writing video to disk, database operations, network uploads),
  • any single read() or processing step can pause the entire main loop for noticeable time.

3. Mode 2: event-driven processing with Senxor.on()

Senxor devices also support an event-based model via the on() method:

dev = connect(devices[0])

dev.on("data",  lambda header, data: print("data received"))
dev.on("error", lambda error: print("device error:", error))
dev.on("open",  lambda: print("device opened"))
dev.on("close", lambda: print("device closed"))

Senxor.on() supports the following events:

  • "data": called when a new frame or data packet is available,
  • "error": called when a device error occurs,
  • "open": called when the device is successfully opened,
  • "close": called when the device is closed.

The method returns a function that you can call later to unregister the callback.

Callbacks are executed in background threads. This makes it safe to perform slow operations (disk writes, database inserts, network calls) inside the callbacks without blocking the main thread.

3.1 Example: saving frames to disk or database in a callback

Here is a simplified example where we do heavier work in the "data" callback:

def on_data(header, data):
    # This runs in a background thread.
    save_frame_to_disk(data)
    insert_frame_to_database(header, data)

def on_error(error):
    print("device error:", error)

dev = connect(devices[0])
dev.on("data", on_data)
dev.on("error", on_error)

dev.start_stream()

try:
    # The main thread can focus on light UI / monitoring logic.
    while True:
        if should_exit():
            break
finally:
    def.close()

This pattern is useful when:

  • you have heavy IO or processing per frame,
  • you want to keep the main thread free for UI, coordination, or additional device polling,
  • you prefer a “push” model (callbacks) instead of explicitly polling read(block=False) for every frame.

See the complete runnable example: Multi-Senxor Alarm.


4. Mode 3: multithreading – when to use threads

So far we have stayed inside a single main loop (plus background threads created by the Senxor library for callbacks). In many real-world systems you will also have other blocking devices and blocking libraries that do not offer a non-blocking API.

Typical examples:

  • multiple RGB cameras using cv2.VideoCapture,
  • serial devices or network APIs that block on read() or recv(),
  • long-running IO operations (file writes, database operations) that are hard to make non-blocking.

In these cases, the purpose of using threads is not primarily to “go faster”, but to make sure that:

A single blocking call cannot block all other devices.

The general pattern is:

  • move blocking operations into separate threads,
  • let those threads push data to the main thread via a queue or a shared variable,
  • keep the main thread busy only with lightweight operations (UI, coordination, non-blocking reads).

4.1 Thermal + RGB: using cv_utils.CVCamThread to wrap cv2.VideoCapture

For the most common combination of Senxor thermal cameras and RGB cameras, the library provides a helper in senxor.cv_utils:

  • class CVCamThread: runs a blocking VideoCapture.read() loop in a background thread and exposes a simple non-blocking read() and/or data callback interface.

4.1.1 Non-blocking read() from a background thread

import cv2
from senxor.cv_utils import CVCamThread

cam_0 = CVCamThread(cv2.VideoCapture(0))
cam_0.start()

while True:
    frame_0 = cam_0.read()  # non-blocking, returns latest frame or None
    if frame_0 is not None:
        cv2.imshow("cam_0", frame_0)

    time.sleep(0.01)

    key = cv2.waitKey(1)
    if key == 27:
        break

cam_0.stop()

The threading model here is:

  • the internal thread continuously calls the blocking VideoCapture.read() and stores the latest frame,
  • the main thread calls cam_0.read() to fetch the most recent frame without blocking.

4.1.2 Using a data callback in the camera thread

You can also provide a callback function to process frames directly in the camera thread:

import time
import numpy as np
import cv2
from senxor.cv_utils import CVCamThread

def on_data(frame: np.ndarray):
    print("frame received:", frame.shape)
    # Heavy processing, encoding or uploading can happen here.

cam_0 = CVCamThread(cv2.VideoCapture(0), on_data=on_data)
cam_0.start()

time.sleep(10)

cam_0.stop()

This is useful when you want the camera to “push” frames into some background processing pipeline.

See the complete runnable example: Thermal + RGB Recorder.

4.2 Other blocking devices and IO

CVCamThread solves the most common “thermal + RGB” combination, but you may have many other blocking operations in your system:

  • serial devices or custom sensors with blocking read() calls,
  • network APIs that block while waiting for data,
  • heavy file writes, database commits, and similar IO.

For these, the general guidelines are:

  • Prefer non-blocking APIs when available. Many libraries provide a way to configure non-blocking mode or poll for readiness. If your device supports this, the model becomes similar to read(block=False).

  • If the API is strictly blocking, move it to a separate thread. Create a dedicated thread that loops on the blocking call and uses thread-safe mechanisms (queues, shared variables) to pass results to the rest of your application.

  • For many small blocking tasks, consider a thread pool. Python’s concurrent.futures.ThreadPoolExecutor can manage a pool of threads for you. This is useful when IO operations are frequent but relatively short-lived.

Threads are a general tool: Senxor itself does not enforce a specific threading model for your other devices. The important part is to avoid letting a slow or blocking device run directly in your main loop.


5. Mode 4: asyncio integration

In some applications you need to expose Senxor data through network services, such as HTTP APIs or WebSocket streams. Frameworks like FastAPI are built on top of asyncio, so it is natural to integrate Senxor into an async environment.

The Senxor API does not provide an async_read method. However, because dev.read(block=False) is fully non-blocking, it can be combined with asyncio very easily.

5.1 Basic idea

The pattern is:

  1. Start a background async task that repeatedly calls dev.read(block=False).
  2. Store the latest frame in a shared variable or queue.
  3. Other async endpoints (for example a WebSocket handler) can read the latest frame and send it to clients.

5.2 Example: FastAPI with a background Senxor worker (simplified)

import asyncio
from fastapi import FastAPI, WebSocket

app = FastAPI()
latest_frame = None  # shared state for the latest frame

async def senxor_worker(dev):
    global latest_frame
    while True:
        _, frame = dev.read(block=False)
        if frame is not None:
            latest_frame = frame
        # Yield control so other tasks can run.
        await asyncio.sleep(0)

@app.on_event("startup")
async def startup_event():
    dev = connect(devices[0])
    dev.start_stream()
    asyncio.create_task(senxor_worker(dev))

@app.websocket("/ws/thermal")
async def websocket_endpoint(ws: WebSocket):
    await ws.accept()
    while True:
        if latest_frame is not None:
            await ws.send_bytes(encode_frame(latest_frame))
        await asyncio.sleep(0.05)

Key points:

  • the senxor_worker coroutine runs in the background, pulling frames from the device without blocking the event loop,
  • the WebSocket handler periodically sends out the latest available frame,
  • data is shared via a simple global or another concurrency primitive (such as an asyncio.Queue).

See the complete runnable example: FastAPI MJPEG Streamer.

5.3 Choosing between threads and asyncio

Very roughly:

  • if your application is already built around an async framework (FastAPI, aiohttp, websockets), mode 4 is a natural fit;
  • if you primarily deal with blocking libraries (legacy SDKs, drivers, or simple scripts), threads (mode 3) are often simpler to reason about;
  • you can also combine both: for example, use threads to wrap blocking devices and asyncio to serve data over the network.

6. Summary and decision checklist

In this guide we went through four complementary patterns for multi-device parallel streaming with Senxor:

  1. Single-thread non-blocking polling (read(block=False)) Ideal when you have a few Senxor devices and lightweight processing. Simple, easy to debug, and usually fast enough.

  2. Event-driven processing with Senxor.on(...) Useful when you want to move heavy IO (file writes, database operations, network calls) into background threads without blocking the main loop.

  3. Multithreading for blocking devices (CVCamThread and custom threads) Necessary when you combine Senxor with blocking APIs such as multiple cv2.VideoCapture instances, serial sensors, or other slow IO sources. Move blocking calls into threads, and keep the main thread responsive.

  4. Async integration with asyncio and frameworks like FastAPI A good choice when you need to expose Senxor data over HTTP or WebSocket in an async application. read(block=False) makes it simple to integrate without blocking the event loop.

As a quick checklist for your project:

  • only Senxor, light processing → start with mode 1;
  • heavy IO per frame (file/DB/network) → add mode 2;
  • multiple blocking RGB or other devices → design threads as in mode 3;
  • need HTTP or WebSocket streaming → integrate mode 4 on top of the previous patterns.

You can start from the simplest mode that works and gradually adopt callbacks, threads, and async as your system and performance requirements grow.