Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ orjson==3.10.15
# via fastapi
packaging==24.2
# via pytest
pillow==11.3.0
# via labthings-fastapi (pyproject.toml)
pluggy==1.5.0
# via pytest
pydantic==2.10.6
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dev = [
"mypy>=1.6.1, <2",
"ruff>=0.1.3",
"types-jsonschema",
"Pillow",
]

[project.urls]
Expand Down
59 changes: 52 additions & 7 deletions src/labthings_fastapi/outputs/mjpeg_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@

This response is initialised with an async generator that yields `bytes`
objects, each of which is a JPEG file. We add the --frame markers and mime
types that enable it to work in an `img` tag.
types that mark it as an MJPEG stream. This is sufficient to enable it to
work in an `img` tag, with the `src` set to the MJPEG stream's endpoint.

It expects an async generator that supplies individual JPEGs to be streamed,
such as the one provided by `.MJPEGStream`.

NB the ``status_code`` argument is used by FastAPI to set the status code of
the response in OpenAPI.
Expand All @@ -63,6 +67,24 @@


class MJPEGStream:
"""Manage streaming images over HTTP as an MJPEG stream

An MJPEGStream object handles accepting images (already in
JPEG format) and streaming them to HTTP clients as a multipart
response.

The minimum needed to make the stream work is to periodically
call `add_frame` with JPEG image data.

To add a stream to a `.Thing`, use the `.MJPEGStreamDescriptor`
which will handle creating an `MJPEGStream` object on first access,
and will also add it to the HTTP API.

The MJPEG stream buffers the last few frames (10 by default) and
also has a hook to notify the size of each frame as it is added.
The latter is used by OpenFlexure's autofocus routine.
"""

def __init__(self, ringbuffer_size: int = 10):
self._lock = threading.Lock()
self.condition = anyio.Condition()
Expand All @@ -85,10 +107,11 @@
]
self.last_frame_i = -1

def stop(self):
def stop(self, portal: BlockingPortal):
"""Stop the stream"""
with self._lock:
self._streaming = False
portal.start_task_soon(self.notify_stream_stopped)

async def ringbuffer_entry(self, i: int) -> RingbufferEntry:
"""Return the ith frame acquired by the camera
Expand All @@ -96,15 +119,15 @@
:param i: The index of the frame to read
"""
if i < 0:
raise ValueError("i must be >= 0")

Check warning on line 122 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

122 line is not covered with tests
if i < self.last_frame_i - len(self._ringbuffer) + 2:
raise ValueError("the ith frame has been overwritten")

Check warning on line 124 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

124 line is not covered with tests
if i > self.last_frame_i:
# TODO: await the ith frame
raise ValueError("the ith frame has not yet been acquired")

Check warning on line 127 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

127 line is not covered with tests
entry = self._ringbuffer[i % len(self._ringbuffer)]
if entry.index != i:
raise ValueError("the ith frame has been overwritten")

Check warning on line 130 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

130 line is not covered with tests
return entry

@asynccontextmanager
Expand All @@ -117,9 +140,13 @@
yield entry.frame

async def next_frame(self) -> int:
"""Wait for the next frame, and return its index"""
"""Wait for the next frame, and return its index

:raises StopAsyncIteration: if the stream has stopped."""
async with self.condition:
await self.condition.wait()
if not self._streaming:
raise StopAsyncIteration()
return self.last_frame_i

async def grab_frame(self) -> bytes:
Expand All @@ -128,18 +155,18 @@
This copies the frame for safety, so we can release the
read lock on the buffer.
"""
i = await self.next_frame()
async with self.buffer_for_reading(i) as frame:
return copy(frame)

Check warning on line 160 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

158-160 lines are not covered with tests

async def next_frame_size(self) -> int:
"""Wait for the next frame and return its size

This is useful if you want to use JPEG size as a sharpness metric.
"""
i = await self.next_frame()
async with self.buffer_for_reading(i) as frame:
return len(frame)

Check warning on line 169 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

167-169 lines are not covered with tests

async def frame_async_generator(self) -> AsyncGenerator[bytes, None]:
"""A generator that yields frames as bytes"""
Expand All @@ -148,15 +175,17 @@
i = await self.next_frame()
async with self.buffer_for_reading(i) as frame:
yield frame
except StopAsyncIteration:
break
except Exception as e:
logging.error(f"Error in stream: {e}, stream stopped")
return

Check warning on line 182 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

180-182 lines are not covered with tests

async def mjpeg_stream_response(self) -> MJPEGStreamResponse:
"""Return a StreamingResponse that streams an MJPEG stream"""
return MJPEGStreamResponse(self.frame_async_generator())

def add_frame(self, frame: bytes, portal: BlockingPortal):
def add_frame(self, frame: bytes, portal: BlockingPortal) -> None:
"""Return the next buffer in the ringbuffer to write to

:param frame: The frame to add
Expand All @@ -174,15 +203,31 @@
entry.index = self.last_frame_i + 1
portal.start_task_soon(self.notify_new_frame, entry.index)

async def notify_new_frame(self, i):
"""Notify any waiting tasks that a new frame is available"""
async def notify_new_frame(self, i: int) -> None:
"""Notify any waiting tasks that a new frame is available.

:param i: The number of the frame (which counts up since the server starts)
"""
async with self.condition:
self.last_frame_i = i
self.condition.notify_all()

async def notify_stream_stopped(self) -> None:
"""Raise an exception in any waiting tasks to signal the stream has stopped."""
assert self._streaming is False
async with self.condition:
self.condition.notify_all()


class MJPEGStreamDescriptor:
"""A descriptor that returns a MJPEGStream object when accessed"""
"""A descriptor that returns a MJPEGStream object when accessed

If this descriptor is added to a `.Thing`, it will create an `.MJPEGStream`
object when it is first accessed. It will also add two HTTP endpoints,
one with the name of the descriptor serving the MJPEG stream, and another
with `/viewer` appended, which serves a basic HTML page that views the stream.

"""

def __init__(self, **kwargs):
self._kwargs = kwargs
Expand Down Expand Up @@ -217,7 +262,7 @@
return obj.__dict__[self.name]

async def viewer_page(self, url: str) -> HTMLResponse:
return HTMLResponse(f"<html><body><img src='{url}'></body></html>")

Check warning on line 265 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

265 line is not covered with tests

def add_to_fastapi(self, app: FastAPI, thing: Thing):
"""Add the stream to the FastAPI app"""
Expand All @@ -231,4 +276,4 @@
response_class=HTMLResponse,
)
async def viewer_page():
return await self.viewer_page(f"{thing.path}{self.name}")

Check warning on line 279 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

279 line is not covered with tests
78 changes: 78 additions & 0 deletions tests/test_mjpeg_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import io
import threading
import time
from PIL import Image
from fastapi.testclient import TestClient
import labthings_fastapi as lt


class Telly(lt.Thing):
_stream_thread: threading.Thread
_streaming: bool = False
framerate: float = 1000
frame_limit: int = 3

stream = lt.outputs.MJPEGStreamDescriptor()

def __enter__(self):
self._streaming = True
self._stream_thread = threading.Thread(target=self._make_images)
self._stream_thread.start()

def __exit__(self, exc_t, exc_v, exc_tb):
self._streaming = False
self._stream_thread.join()

def _make_images(self):
"""Stream a series of solid colours"""
colours = ["#F00", "#0F0", "#00F"]
jpegs = []
for c in colours:
image = Image.new("RGB", (10, 10), c)
dest = io.BytesIO()
image.save(dest, "jpeg")
jpegs.append(dest.getvalue())

i = 0
while self._streaming and (i < self.frame_limit or self.frame_limit < 0):
self.stream.add_frame(
jpegs[i % len(jpegs)], self._labthings_blocking_portal
)
time.sleep(1 / self.framerate)
i = i + 1
self.stream.stop(self._labthings_blocking_portal)
self._streaming = False


def test_mjpeg_stream():
"""Verify the MJPEG stream contains at least one frame marker.

A limitation of the TestClient is that it can't actually stream.
This means that all of the frames sent by our test Thing will
arrive in a single packet.

For now, we just check it starts with the frame separator,
but it might be possible in the future to check there are three
images there.
"""
server = lt.ThingServer()
telly = Telly()
server.add_thing(telly, "telly")
with TestClient(server.app) as client:
with client.stream("GET", "/telly/stream") as stream:
stream.raise_for_status()
received = 0
for b in stream.iter_bytes():
received += 1
assert b.startswith(b"--frame")


if __name__ == "__main__":
import uvicorn

server = lt.ThingServer()
telly = Telly()
telly.framerate = 6
telly.frame_limit = -1
server.add_thing(telly, "telly")
uvicorn.run(server.app, port=5000)