Skip to content

FastAPI MJPEG Streamer

This example integrates Senxor non-blocking reads into an asyncio-based web framework (FastAPI). A background worker coroutine fetches and encodes frames to JPEG, and serves them as a continuous Motion JPEG (MJPEG) HTTP stream viewable in any browser.

"""FastAPI MJPEG Streamer for Senxor thermal cameras.

This example demonstrates how to integrate Senxor non-blocking reads into an
asyncio-based web framework like FastAPI. It runs a background worker coroutine
to continuously grab frames from the thermal camera, encodes them to JPEG, and
serves them as a continuous Motion JPEG (MJPEG) HTTP stream.

You can view the stream in any web browser by navigating to:
http://127.0.0.1:8000

External dependencies:
- fastapi
- uvicorn
- opencv-python
"""

import asyncio
from contextlib import asynccontextmanager

import cv2
import numpy as np
import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse

from senxor import connect, list_senxor
from senxor.core import Senxor
from senxor.log import get_logger, setup_console_logger
from senxor.proc import enlarge, normalize

setup_console_logger("INFO")
logger = get_logger("fastapi_streamer")


class SenxorWorker:
    def __init__(self, senxor_device: Senxor):
        self.latest_jpeg_frame = None
        self.senxor_device = senxor_device

    async def senxor_worker(self):
        """Background coroutine to fetch and encode thermal frames."""
        logger.info("senxor_worker_started")
        try:
            while True:
                if self.senxor_device is not None:
                    # Non-blocking read is safe to run inside the asyncio event loop
                    _, thermal_raw = self.senxor_device.read(block=False)
                    if thermal_raw is not None:
                        # Process and colorize the thermal data
                        norm_img = normalize(thermal_raw, dtype=np.uint8)
                        norm_img = enlarge(norm_img, 3)
                        color_img = cv2.applyColorMap(norm_img, cv2.COLORMAP_INFERNO)

                        # Encode as JPEG
                        ret, buffer = cv2.imencode(".jpg", color_img)
                        if ret:
                            self.latest_jpeg_frame = buffer.tobytes()

                # Yield control back to the event loop so API requests can be served
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            logger.info("senxor_worker_cancelled")

    async def frame_generator(self):
        """Generator for the MJPEG stream."""
        while True:
            if self.latest_jpeg_frame is not None:
                latest_jpeg_frame = self.latest_jpeg_frame
                self.latest_jpeg_frame = None
                yield (b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + latest_jpeg_frame + b"\r\n")

            # Sleep briefly to control the framerate of the stream over the network
            await asyncio.sleep(0.03)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context manager to handle startup and shutdown events."""
    devices = list_senxor("serial")
    if not devices:
        logger.error("no_senxor_devices_found")
        yield
        return

    senxor_device = connect(devices[0])
    senxor_device.fields.FRAME_RATE_DIVIDER.set(0)
    senxor_device.start_stream()
    logger.info("senxor_device_connected", device=devices[0])

    worker_instance = SenxorWorker(senxor_device)
    # Start the worker coroutine
    worker_task = asyncio.create_task(worker_instance.senxor_worker())

    # Store worker in app state to avoid global variables
    app.state.worker = worker_instance

    yield  # Hand control back to the FastAPI application

    # Shutdown cleanup
    if worker_instance.senxor_device is not None:
        worker_instance.senxor_device.close()
        logger.info("senxor_device_closed")
    if worker_task is not None:
        worker_task.cancel()


app = FastAPI(title="Senxor Thermal Streamer", lifespan=lifespan)


@app.get("/")
async def index():
    """Simple HTML page to display the MJPEG stream."""
    html_content = """
    <html>
        <head>
            <title>Senxor Thermal Stream</title>
            <style>
                body { background-color: #222; color: white; text-align: center; font-family: sans-serif; }
                img { border: 2px solid #555; margin-top: 20px; box-shadow: 0 0 15px rgba(0,0,0,0.5); }
            </style>
        </head>
        <body>
            <h1>Senxor Thermal Live Stream</h1>
            <img src="/stream" alt="Thermal Stream" />
        </body>
    </html>
    """
    return HTMLResponse(content=html_content)


@app.get("/stream")
async def video_stream(request: Request):
    """Endpoint serving the MJPEG continuous stream."""
    worker_instance: SenxorWorker | None = getattr(request.app.state, "worker", None)
    if worker_instance is None:
        return HTMLResponse("Worker not initialized", status_code=500)

    return StreamingResponse(
        worker_instance.frame_generator(),
        media_type="multipart/x-mixed-replace; boundary=frame",
    )


if __name__ == "__main__":
    logger.info("starting_server", url="http://127.0.0.1:8000")
    # Run the uvicorn server
    uvicorn.run(app, host="127.0.0.1", port=8000, log_level="info")