Skip to content

Multi-Senxor Alarm

This example scans for all available Senxor devices, connects to each one, and registers a data callback on it. When the maximum temperature in a frame exceeds a configured threshold, the callback triggers an alarm, logs the event as JSON, and asynchronously saves an image without blocking the other sensors.

"""Multi-device alarm system using event-driven callbacks.

This example scans all available Senxor devices, connects to them,
and registers a data callback using `dev.on("data")`.
The callback calculates the maximum value in the frame. If it exceeds
a threshold, it logs the event as JSON and saves an image to the disk.
These I/O operations happen in background threads, so they do not block
the main loop or other devices.

External dependencies:
- opencv-python
"""

from __future__ import annotations

import time
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING

import cv2
import numpy as np

from senxor import connect, list_senxor
from senxor.log import get_logger, setup_file_logger
from senxor.proc import normalize, parse_header

if TYPE_CHECKING:
    from senxor.core import Senxor

# A frame whose maximum value is strictly greater than this triggers an alarm in on_data.
ALARM_THRESHOLD = 50  # Example threshold for temperature in Celsius

# Output directory for the alarm log and the saved alarm images.
# It is created at import time (including parents) if it does not exist yet.
DATA_DIR = Path("data")
DATA_DIR.mkdir(parents=True, exist_ok=True)

# Setup JSON file logging for structured alarm records.
# The logger name passed to get_logger matches logger_name below, so records
# emitted through app_log are the ones routed to DATA_DIR / "alarm_log.json".
app_log = get_logger("alarm_system")
setup_file_logger(DATA_DIR / "alarm_log.json", log_level="INFO", json_format=True, logger_name="alarm_system")


def on_data(
    header: np.ndarray | None,
    frame: np.ndarray,
    senxor: Senxor,
):
    """Log an incoming frame and raise an alarm if it exceeds the threshold.

    Intended to be registered as a device "data" callback with ``senxor``
    pre-bound (e.g. via ``functools.partial``).

    Args:
        header: Raw frame header, or ``None`` when the frame arrived without
            one. A missing header is logged as an error but does not stop
            the alarm check.
        frame: Frame data. Its maximum value is compared against
            ``ALARM_THRESHOLD``.
        senxor: The device that produced the frame. Its ``name`` tags every
            log record and the saved image file.
    """
    if header is None:
        app_log.error("header_is_none", device=senxor.name)
    else:
        parsed_header = parse_header(header)
        app_log.info(
            "frame_received",
            device=senxor.name,
            frame_counter=parsed_header.frame_counter,
            vdd=parsed_header.vdd,
            die_temp=parsed_header.die_temp,
        )

    # Convert once to a plain float so the value is JSON-serializable in the log.
    max_val = float(np.max(frame))
    if max_val > ALARM_THRESHOLD:
        # Log the device name (not the device object) so the JSON record has
        # the same `device` key as every other event.
        app_log.info("alarm_triggered", device=senxor.name, max_val=max_val)

        image = normalize(frame, dtype=np.uint8)
        color_image = cv2.applyColorMap(image, cv2.COLORMAP_INFERNO)

        # Millisecond timestamp keeps file names from successive alarms distinct.
        timestamp = int(time.time() * 1000)

        # Blocking I/O: save image to disk.
        # NOTE: this example relies on the callback being invoked off the main
        # thread, so the write does not stall the main loop or other sensors.
        filename = str(DATA_DIR / f"alarm_{senxor.name}_{timestamp}.png")

        # cv2.imwrite signals most failures by returning False instead of
        # raising, so the result must be checked before claiming success.
        if cv2.imwrite(filename, color_image):
            app_log.info("image_saved", filename=filename)
        else:
            app_log.error("image_save_failed", device=senxor.name, filename=filename)


def main():
    """Connect to every detected serial Senxor device and stream until Ctrl+C.

    Each device gets ``on_data`` registered as its "data" callback (with the
    device bound as ``senxor``) and is then started. The main thread idles
    until interrupted; on exit every connected device is closed.

    Raises:
        ValueError: If no devices are found.
    """
    devices = list_senxor("serial")
    if not devices:
        raise ValueError("No devices found")

    app_log.info("devices_found", count=len(devices))

    print("Devices found:")
    for dev in devices:
        print(f"- {dev.name}")

    connected_devices = []

    try:
        for port in devices:
            dev = connect(port)
            # Track the device before configuring it so the cleanup below
            # still closes it if registering the callback or starting fails.
            connected_devices.append(dev)
            callback = partial(on_data, senxor=dev)
            dev.on("data", callback)
            dev.start_stream()
            app_log.info("device_started", device=dev.name)

        app_log.info("system_running")

        print("System running, press Ctrl+C to exit")

        # The main thread can remain completely idle or do other lightweight tasks
        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        app_log.info("system_shutting_down")
    finally:
        for dev in connected_devices:
            name = dev.name
            # Best-effort shutdown: a failure to close one device must not
            # leave the remaining devices open.
            try:
                dev.close()
            except Exception as exc:
                app_log.error("device_close_failed", device=name, error=str(exc))
            else:
                app_log.info("device_closed", device=name)


# Script entry point: frame the run with start/stop banners on stdout.
if __name__ == "__main__":
    banner = "=" * 80

    print(banner)
    print("Start senxor alarm system")
    print(banner)

    main()

    print(banner)
    print("Senxor alarm system stopped")
    print(banner)