From 8f09ff4a65cf01d58758dddaeebf15c59c24c2d4 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Sat, 22 Nov 2025 15:02:46 +0000 Subject: [PATCH 01/11] feat: add nats support --- packages/amgi-nats-py/LICENSE | 21 ++++ packages/amgi-nats-py/README.md | 0 packages/amgi-nats-py/pyproject.toml | 39 ++++++++ .../amgi-nats-py/src/amgi_nats_py/__init__.py | 0 .../amgi-nats-py/src/amgi_nats_py/_base.py | 87 +++++++++++++++++ .../amgi-nats-py/src/amgi_nats_py/core.py | 34 +++++++ .../amgi-nats-py/src/amgi_nats_py/py.typed | 0 .../amgi-nats-py/tests_amgi_nats/__init__.py | 0 .../amgi-nats-py/tests_amgi_nats/conftest.py | 18 ++++ .../test_nats_core_message_integration.py | 97 +++++++++++++++++++ uv.lock | 45 ++++++++- 11 files changed, 338 insertions(+), 3 deletions(-) create mode 100644 packages/amgi-nats-py/LICENSE create mode 100644 packages/amgi-nats-py/README.md create mode 100644 packages/amgi-nats-py/pyproject.toml create mode 100644 packages/amgi-nats-py/src/amgi_nats_py/__init__.py create mode 100644 packages/amgi-nats-py/src/amgi_nats_py/_base.py create mode 100644 packages/amgi-nats-py/src/amgi_nats_py/core.py create mode 100644 packages/amgi-nats-py/src/amgi_nats_py/py.typed create mode 100644 packages/amgi-nats-py/tests_amgi_nats/__init__.py create mode 100644 packages/amgi-nats-py/tests_amgi_nats/conftest.py create mode 100644 packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message_integration.py diff --git a/packages/amgi-nats-py/LICENSE b/packages/amgi-nats-py/LICENSE new file mode 100644 index 0000000..5ae4694 --- /dev/null +++ b/packages/amgi-nats-py/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Jack Burridge + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/packages/amgi-nats-py/README.md b/packages/amgi-nats-py/README.md new file mode 100644 index 0000000..e69de29 diff --git a/packages/amgi-nats-py/pyproject.toml b/packages/amgi-nats-py/pyproject.toml new file mode 100644 index 0000000..70dc7de --- /dev/null +++ b/packages/amgi-nats-py/pyproject.toml @@ -0,0 +1,39 @@ +[build-system] +build-backend = "uv_build" +requires = [ "uv-build>=0.8.14,<0.9.0" ] + +[project] +name = "amgi-nats-py" +version = "0.25.1" +description = "Add your description here" +readme = "README.md" +license = "MIT" +license-files = [ "LICENSE" ] +authors = [ + { name = "jack.burridge", email = "jack.burridge@mail.com" }, +] +requires-python = ">=3.10" +classifiers = [ + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", +] +dependencies = [ + "amgi-common==0.25.1", + "amgi-types==0.25.1", + "nats-py>=2.12.0", +] + +[dependency-groups] +dev = [ + "test-utils", + "testcontainers[nats]>=4.13.0", +] + +[tool.uv.sources] +amgi-common = { workspace = true } +amgi-types = { workspace = true } +test-utils = { workspace = true } diff --git a/packages/amgi-nats-py/src/amgi_nats_py/__init__.py b/packages/amgi-nats-py/src/amgi_nats_py/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/amgi-nats-py/src/amgi_nats_py/_base.py b/packages/amgi-nats-py/src/amgi_nats_py/_base.py new file mode 100644 index 0000000..5b1e23d --- /dev/null +++ b/packages/amgi-nats-py/src/amgi_nats_py/_base.py @@ -0,0 +1,87 @@ +import asyncio +from abc import ABC +from abc import abstractmethod +from asyncio import Event +from functools import partial +from typing import Any +from typing import Union + +from amgi_common import Lifespan +from amgi_types import AMGIApplication +from amgi_types import AMGISendCallable +from amgi_types import MessageReceiveEvent +from amgi_types import MessageScope +from amgi_types import MessageSendEvent +from nats.aio.client import Client +from nats.aio.msg import Msg +from nats.js import JetStreamContext + + +NatsClient = Union[Client, JetStreamContext] + + +class _Receive: + def __init__(self, msg: Msg) -> None: + self._msg = msg + + async def __call__(self) -> MessageReceiveEvent: + return { + "type": "message.receive", + "id": self._msg.reply or "", + "headers": [ + (key.encode(), value.encode()) + for key, value in (self._msg.headers or {}).items() + ], + "payload": self._msg.data, + } + + +async def _message_send(client: NatsClient, event: MessageSendEvent) -> None: + await client.publish( + event["address"], + event.get("payload") or b"", + headers={key.decode(): value.decode() for key, value in event["headers"]}, + ) + + +class _BaseServer(ABC): + def __init__( + self, app: AMGIApplication, *subjects: str, servers: str | list[str] + ) -> None: + self._app = app + self._subjects = subjects + self._servers = servers + + self._stop_event = Event() + self._state: dict[str, Any] = {} + + async def _client_serve(self, client: NatsClient) -> None: + client_subscribe_cb = partial(self._subscribe_cb, client) + subscriptions = await asyncio.gather( + *( + client.subscribe(subject, cb=client_subscribe_cb) + for subject in self._subjects + ) + ) + + async with Lifespan(self._app, self._state): + await self._stop_event.wait() + + for subscription in subscriptions: + await subscription.unsubscribe() + + async def _subscribe_cb(self, client: NatsClient, msg: Msg) -> None: + scope: MessageScope = { + "type": "message", + "amgi": {"version": "1.0", "spec_version": "1.0"}, + "address": msg.subject, + "state": self._state.copy(), + } + await self._app(scope, _Receive(msg), self._send(client, msg)) + + def stop(self) -> None: + self._stop_event.set() + + @abstractmethod + def _send(self, client: NatsClient, msg: Msg) -> AMGISendCallable: + raise NotImplementedError # pragma: no cover diff --git a/packages/amgi-nats-py/src/amgi_nats_py/core.py b/packages/amgi-nats-py/src/amgi_nats_py/core.py new file mode 100644 index 0000000..7665049 --- /dev/null +++ b/packages/amgi-nats-py/src/amgi_nats_py/core.py @@ -0,0 +1,34 @@ +from amgi_nats_py._base import _BaseServer +from amgi_nats_py._base import _message_send +from amgi_nats_py._base import NatsClient +from amgi_types import AMGIApplication +from amgi_types import AMGISendCallable +from amgi_types import AMGISendEvent +from nats.aio.client import Client +from nats.aio.msg import Msg + + +class _Send: + def __init__(self, client: Client, msg: Msg) -> None: + self._client = client + self._msg = msg + + async def __call__(self, event: AMGISendEvent) -> None: + if event["type"] == "message.send": + await _message_send(self._client, event) + + +class Server(_BaseServer): + def __init__( + self, app: AMGIApplication, *subjects: str, servers: str | list[str] + ) -> None: + super().__init__(app, *subjects, servers=servers) + + async def serve(self) -> None: + client = Client() + await client.connect(self._servers) + + await self._client_serve(client) + + def _send(self, client: NatsClient, msg: Msg) -> AMGISendCallable: + return _Send(client, msg) diff --git a/packages/amgi-nats-py/src/amgi_nats_py/py.typed b/packages/amgi-nats-py/src/amgi_nats_py/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/packages/amgi-nats-py/tests_amgi_nats/__init__.py b/packages/amgi-nats-py/tests_amgi_nats/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/amgi-nats-py/tests_amgi_nats/conftest.py b/packages/amgi-nats-py/tests_amgi_nats/conftest.py new file mode 100644 index 0000000..d9fc7fe --- /dev/null +++ b/packages/amgi-nats-py/tests_amgi_nats/conftest.py @@ -0,0 +1,18 @@ +from collections.abc import Generator + +import pytest +from nats.aio.client import Client +from testcontainers.nats import NatsContainer + + +@pytest.fixture(scope="package") +def nats_container() -> Generator[NatsContainer, None, None]: + with NatsContainer() as nats_container: + yield nats_container + + +@pytest.fixture +async def client(nats_container: NatsContainer) -> Client: + client = Client() + await client.connect(nats_container.nats_uri()) + return client diff --git a/packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message_integration.py b/packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message_integration.py new file mode 100644 index 0000000..606a938 --- /dev/null +++ b/packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message_integration.py @@ -0,0 +1,97 @@ +from collections.abc import AsyncGenerator +from uuid import uuid4 + +import pytest +from amgi_nats_py.core import Server +from nats.aio.client import Client +from test_utils import MockApp +from testcontainers.nats import NatsContainer + + +@pytest.fixture +def subject() -> str: + return f"receive-{uuid4()}" + + +@pytest.fixture +async def app( + nats_container: NatsContainer, subject: str +) -> AsyncGenerator[MockApp, None]: + app = MockApp() + server = Server(app, subject, servers=nats_container.nats_uri()) + + async with app.lifespan(server=server): + yield app + + +async def test_message(app: MockApp, subject: str, client: Client) -> None: + await client.publish(subject, b"value", headers={"test": "test"}) + + async with app.call() as (scope, receive, send): + assert scope == { + "address": subject, + "amgi": {"spec_version": "1.0", "version": "1.0"}, + "type": "message", + "state": {}, + } + + message_receive = await receive() + assert message_receive["type"] == "message.receive" + assert message_receive == { + "headers": [(b"test", b"test")], + "id": f"", + "payload": b"value", + "type": "message.receive", + } + + await send( + { + "type": "message.ack", + "id": "", + } + ) + + +async def test_message_send(app: MockApp, subject: str, client: Client) -> None: + send_subject = f"send-{uuid4()}" + await client.publish(subject, b"") + subscription = await client.subscribe(send_subject) + + async with app.call() as (scope, receive, send): + await send( + { + "type": "message.send", + "address": send_subject, + "headers": [(b"test", b"test")], + "payload": b"test", + } + ) + + msg = await anext(aiter(subscription.messages)) + + assert msg.subject == send_subject + assert msg.data == b"test" + assert msg.headers == {"test": "test"} + + await subscription.unsubscribe() + + +async def test_lifespan( + nats_container: NatsContainer, subject: str, client: Client +) -> None: + app = MockApp() + + server = Server(app, subject, servers=nats_container.nats_uri()) + + state_item = uuid4() + + async with app.lifespan({"item": state_item}, server): + await client.publish(subject, b"") + + async with app.call() as (scope, receive, send): + assert scope == { + "address": subject, + "amgi": {"spec_version": "1.0", "version": "1.0"}, + "type": "message", + "state": {"item": state_item}, + } diff --git a/uv.lock b/uv.lock index fc69509..abe4762 100644 --- a/uv.lock +++ b/uv.lock @@ -8,6 +8,7 @@ members = [ "amgi-aiobotocore", "amgi-aiokafka", "amgi-common", + "amgi-nats-py", "amgi-paho-mqtt", "amgi-redis", "amgi-sqs-event-source-mapping", @@ -353,6 +354,35 @@ name = "amgi-common" version = "0.25.1" source = { editable = "packages/amgi-common" } +[[package]] +name = "amgi-nats-py" +version = "0.25.1" +source = { editable = "packages/amgi-nats-py" } +dependencies = [ + { name = "amgi-common" }, + { name = "amgi-types" }, + { name = "nats-py" }, +] + +[package.dev-dependencies] +dev = [ + { name = "test-utils" }, + { name = "testcontainers", extra = ["nats"] }, +] + +[package.metadata] +requires-dist = [ + { name = "amgi-common", editable = "packages/amgi-common" }, + { name = "amgi-types", editable = "packages/amgi-types" }, + { name = "nats-py", specifier = ">=2.12.0" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "test-utils", editable = "packages/test-utils" }, + { name = "testcontainers", extras = ["nats"], specifier = ">=4.13.0" }, +] + [[package]] name = "amgi-paho-mqtt" version = "0.25.1" @@ -1435,6 +1465,12 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e2/de/21aa8394f16add8f7427f0a1326ccd2b3a2a8a3245c9252bc5ac034c6155/myst_parser-3.0.1-py3-none-any.whl", hash = "sha256:6457aaa33a5d474aca678b8ead9b3dc298e89c68e67012e73146ea6fd54babf1", size = 83163, upload-time = "2024-04-28T20:22:39.985Z" }, ] +[[package]] +name = "nats-py" +version = "2.12.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/71/c5/2564d917503fe8d68fe630c74bf6b678fbc15c01b58f2565894761010f57/nats_py-2.12.0.tar.gz", hash = "sha256:2981ca4b63b8266c855573fa7871b1be741f1889fd429ee657e5ffc0971a38a1", size = 119821, upload-time = "2025-10-31T05:27:31.247Z" } + [[package]] name = "nodeenv" version = "1.9.1" @@ -2155,7 +2191,7 @@ source = { editable = "packages/test-utils" } [[package]] name = "testcontainers" -version = "4.13.2" +version = "4.13.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "docker" }, @@ -2164,15 +2200,18 @@ dependencies = [ { name = "urllib3" }, { name = "wrapt" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/18/51/edac83edab339d8b4dce9a7b659163afb1ea7e011bfed1d5573d495a4485/testcontainers-4.13.2.tar.gz", hash = "sha256:2315f1e21b059427a9d11e8921f85fef322fbe0d50749bcca4eaa11271708ba4", size = 78692, upload-time = "2025-10-07T21:53:07.531Z" } +sdist = { url = "https://files.pythonhosted.org/packages/fc/b3/c272537f3ea2f312555efeb86398cc382cd07b740d5f3c730918c36e64e1/testcontainers-4.13.3.tar.gz", hash = "sha256:9d82a7052c9a53c58b69e1dc31da8e7a715e8b3ec1c4df5027561b47e2efe646", size = 79064, upload-time = "2025-11-14T05:08:47.584Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/2a/5e/73aa94770f1df0595364aed526f31d54440db5492911e2857318ed326e51/testcontainers-4.13.2-py3-none-any.whl", hash = "sha256:0209baf8f4274b568cde95bef2cadf7b1d33b375321f793790462e235cd684ee", size = 124771, upload-time = "2025-10-07T21:53:05.937Z" }, + { url = "https://files.pythonhosted.org/packages/73/27/c2f24b19dafa197c514abe70eda69bc031c5152c6b1f1e5b20099e2ceedd/testcontainers-4.13.3-py3-none-any.whl", hash = "sha256:063278c4805ffa6dd85e56648a9da3036939e6c0ac1001e851c9276b19b05970", size = 124784, upload-time = "2025-11-14T05:08:46.053Z" }, ] [package.optional-dependencies] localstack = [ { name = "boto3" }, ] +nats = [ + { name = "nats-py" }, +] redis = [ { name = "redis" }, ] From 3a175b8a91504784298e5b637e3a8bb61c7de25c Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 3 Dec 2025 18:06:24 +0000 Subject: [PATCH 02/11] feat(amgi-nats-py): add nats entrypoint and example --- example/README.md | 19 +++++++++++++++++++ example/nats/docker-compose.yaml | 12 ++++++++++++ packages/amgi-nats-py/pyproject.toml | 1 + .../amgi-nats-py/src/amgi_nats_py/core.py | 16 ++++++++++++++++ 4 files changed, 48 insertions(+) create mode 100644 example/nats/docker-compose.yaml diff --git a/example/README.md b/example/README.md index a9e4b31..4e5a825 100644 --- a/example/README.md +++ b/example/README.md @@ -102,3 +102,22 @@ Connect the app via Redis with: ```commandline asyncfast run amgi-redis main:app input_channel ``` + +## NATS + +Run the NATS compose file with: + +```commandline +docker compose --file nats/docker-compose.yaml up --detach +``` + +This includes: + +- Redis `nats://localhost:4222/` +- NUI NATS management GUI ([`http://localhost:31311/`](http://localhost:31311/)) + +Connect the app via Redis with: + +```commandline +asyncfast run amgi-nats-py-core main:app input_channel +``` diff --git a/example/nats/docker-compose.yaml b/example/nats/docker-compose.yaml new file mode 100644 index 0000000..8255048 --- /dev/null +++ b/example/nats/docker-compose.yaml @@ -0,0 +1,12 @@ +services: + nats: + image: nats:2.12.2 + ports: + - "4222:4222" + - "8222:8222" + nui: + image: ghcr.io/nats-nui/nui:0.8.1 + ports: + - 31311:31311 + depends_on: + - nats diff --git a/packages/amgi-nats-py/pyproject.toml b/packages/amgi-nats-py/pyproject.toml index 70dc7de..7a68a77 100644 --- a/packages/amgi-nats-py/pyproject.toml +++ b/packages/amgi-nats-py/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "amgi-types==0.25.1", "nats-py>=2.12.0", ] +entry-points.amgi_server.amgi-nats-py-core = "amgi_nats_py.core:_run_cli" [dependency-groups] dev = [ diff --git a/packages/amgi-nats-py/src/amgi_nats_py/core.py b/packages/amgi-nats-py/src/amgi_nats_py/core.py index 7665049..ed4a6fb 100644 --- a/packages/amgi-nats-py/src/amgi_nats_py/core.py +++ b/packages/amgi-nats-py/src/amgi_nats_py/core.py @@ -1,3 +1,4 @@ +from amgi_common import server_serve from amgi_nats_py._base import _BaseServer from amgi_nats_py._base import _message_send from amgi_nats_py._base import NatsClient @@ -8,6 +9,21 @@ from nats.aio.msg import Msg +def run(app: AMGIApplication, *subjects: str, servers: str | list[str]) -> None: + server = Server(app, *subjects, servers=servers) + server_serve(server) + + +def _run_cli( + app: AMGIApplication, + subjects: list[str], + servers: list[str] | None = None, +) -> None: + if servers is None: + servers = ["nats://localhost:4222"] + run(app, *subjects, servers=servers) + + class _Send: def __init__(self, client: Client, msg: Msg) -> None: self._client = client From 04f2342c25dfb8937927565e591b4567bdac29c0 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 3 Dec 2025 19:59:25 +0000 Subject: [PATCH 03/11] feat(amgi-common): run servers using a new event loop, this adds multiprocess support --- packages/amgi-common/src/amgi_common/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/amgi-common/src/amgi_common/__init__.py b/packages/amgi-common/src/amgi_common/__init__.py index 73224a4..4caa723 100644 --- a/packages/amgi-common/src/amgi_common/__init__.py +++ b/packages/amgi-common/src/amgi_common/__init__.py @@ -274,7 +274,7 @@ def stop(self) -> None: def server_serve(server: _Server) -> None: - loop = asyncio.get_event_loop() + loop = asyncio.new_event_loop() loop.run_until_complete(_server_serve_async(server, loop)) From 0d1b1e679af0d1fa7a2c1ca133b9676ee712b613 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 3 Dec 2025 20:01:34 +0000 Subject: [PATCH 04/11] test(test-utils): add helper to test for termination --- .../test-utils/src/test_utils/__init__.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/packages/test-utils/src/test_utils/__init__.py b/packages/test-utils/src/test_utils/__init__.py index 5b4e8f9..963571f 100644 --- a/packages/test-utils/src/test_utils/__init__.py +++ b/packages/test-utils/src/test_utils/__init__.py @@ -1,9 +1,11 @@ import asyncio +import multiprocessing from asyncio import Event from asyncio import Queue from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from typing import Any +from typing import Callable from typing import Optional from typing import Protocol @@ -70,3 +72,30 @@ async def __call__( return_event = Event() self._call_queue.put_nowait((scope, receive, send, return_event)) await return_event.wait() + + +def assert_run_can_terminate( + run: Callable[..., None], *args: Any, **kwargs: Any +) -> None: + lifespan_event = multiprocessing.Event() + + async def _app( + scope: Scope, receive: AMGIReceiveCallable, send: AMGISendCallable + ) -> None: + if scope["type"] == "lifespan": + lifespan_event.set() + raise Exception + + process = multiprocessing.Process( + target=run, + args=(_app, *args), + kwargs=kwargs, + ) + process.start() + + lifespan_event.wait() + assert lifespan_event.is_set() + + process.terminate() + process.join() + assert not process.is_alive() From 36e4ba0ab1b48f2b79cb83a8999a02deb5c1ccf6 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 3 Dec 2025 20:03:07 +0000 Subject: [PATCH 05/11] test: add coverage settings to allow for multiprocessing --- .coveragerc | 4 ++++ tox.ini | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 .coveragerc diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..a12d14f --- /dev/null +++ b/.coveragerc @@ -0,0 +1,4 @@ +[run] +concurrency = multiprocessing +parallel = true +sigterm = true \ No newline at end of file diff --git a/tox.ini b/tox.ini index 6aa884e..87d3b8e 100644 --- a/tox.ini +++ b/tox.ini @@ -20,7 +20,7 @@ pass_env = TESTCONTAINERS_RYUK_DISABLED TESTCONTAINERS_RYUK_PRIVILEGED commands = - pytest {tty:--color=yes} {posargs:-v --cov --cov-append --cov-branch --cov-report=xml} + pytest {tty:--color=yes} {posargs:-v --cov --cov-append --cov-report=xml} depends = {py313, py312, py311, py310}: clean uv_sync_flags = --all-packages From 645fb7679ba1811fcbab096f65f334accb18e1ee Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 3 Dec 2025 20:03:45 +0000 Subject: [PATCH 06/11] test(amgi-nats-py): add tests for run functions --- .../tests_amgi_nats/test_nats_core_message.py | 14 ++++++++++++++ .../test_nats_core_message_integration.py | 15 +++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message.py diff --git a/packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message.py b/packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message.py new file mode 100644 index 0000000..e708fc9 --- /dev/null +++ b/packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message.py @@ -0,0 +1,14 @@ +from unittest.mock import Mock +from unittest.mock import patch + +from amgi_nats_py import core +from amgi_nats_py.core import _run_cli + + +def test_run_cli_defaults() -> None: + with patch.object(core, "run") as mock_run: + mock_app = Mock() + _run_cli(mock_app, ["subject"]) + mock_run.assert_called_once_with( + mock_app, "subject", servers=["nats://localhost:4222"] + ) diff --git a/packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message_integration.py b/packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message_integration.py index 606a938..2d4a0e5 100644 --- a/packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message_integration.py +++ b/packages/amgi-nats-py/tests_amgi_nats/test_nats_core_message_integration.py @@ -2,8 +2,11 @@ from uuid import uuid4 import pytest +from amgi_nats_py.core import _run_cli +from amgi_nats_py.core import run from amgi_nats_py.core import Server from nats.aio.client import Client +from test_utils import assert_run_can_terminate from test_utils import MockApp from testcontainers.nats import NatsContainer @@ -95,3 +98,15 @@ async def test_lifespan( "type": "message", "state": {"item": state_item}, } + + +def test_run(client: Client, nats_container: NatsContainer) -> None: + assert_run_can_terminate( + run, f"receive-{uuid4()}", servers=nats_container.nats_uri() + ) + + +def test_run_cli(client: Client, nats_container: NatsContainer) -> None: + assert_run_can_terminate( + _run_cli, [f"receive-{uuid4()}"], servers=[nats_container.nats_uri()] + ) From b98ac8a6a7d1b5ca21eb9a40422468e54e70b61f Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 3 Dec 2025 21:14:34 +0000 Subject: [PATCH 07/11] test(amgi-aiobotocore): add tests for sqs run functions --- .../test_sqs_message_integration.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py b/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py index ac21d8e..c5d2187 100644 --- a/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py +++ b/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py @@ -4,10 +4,13 @@ from uuid import uuid4 import pytest +from amgi_aiobotocore.sqs import _run_cli +from amgi_aiobotocore.sqs import run from amgi_aiobotocore.sqs import Server from amgi_aiobotocore.sqs import SqsBatchFailureError from amgi_types import MessageAckEvent from amgi_types import MessageNackEvent +from test_utils import assert_run_can_terminate from test_utils import MockApp from testcontainers.localstack import LocalStackContainer @@ -288,3 +291,25 @@ async def test_lifespan( "type": "message", "state": {"item": state_item}, } + + +def test_run(queue_name: str, localstack_container: LocalStackContainer) -> None: + assert_run_can_terminate( + run, + queue_name, + region_name=localstack_container.region_name, + endpoint_url=localstack_container.get_url(), + aws_access_key_id="testcontainers-localstack", + aws_secret_access_key="testcontainers-localstack", + ) + + +def test_run_cli(queue_name: str, localstack_container: LocalStackContainer) -> None: + assert_run_can_terminate( + _run_cli, + [queue_name], + region_name=localstack_container.region_name, + endpoint_url=localstack_container.get_url(), + aws_access_key_id="testcontainers-localstack", + aws_secret_access_key="testcontainers-localstack", + ) From f66c2fbcd6d054144ec5120fc7f2c23d672b53d0 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 3 Dec 2025 21:18:34 +0000 Subject: [PATCH 08/11] test(amgi-aiokafka): add tests for run functions --- .../test_kafka_message_integration.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py b/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py index 7891e32..315291c 100644 --- a/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py +++ b/packages/amgi-aiokafka/tests_amgi_aiokafka/test_kafka_message_integration.py @@ -4,8 +4,11 @@ import pytest from aiokafka import AIOKafkaConsumer from aiokafka import AIOKafkaProducer +from amgi_aiokafka import _run_cli +from amgi_aiokafka import run from amgi_aiokafka import Server from amgi_types import MessageAckEvent +from test_utils import assert_run_can_terminate from test_utils import MockApp from testcontainers.kafka import KafkaContainer @@ -156,3 +159,11 @@ async def test_lifespan(bootstrap_server: str, topic: str) -> None: "type": "message", "state": {"item": state_item}, } + + +def test_run(bootstrap_server: str, topic: str) -> None: + assert_run_can_terminate(run, topic, bootstrap_servers=bootstrap_server) + + +def test_run_cli(bootstrap_server: str, topic: str) -> None: + assert_run_can_terminate(_run_cli, [topic], bootstrap_servers=bootstrap_server) From ad370254bda187b6ffa2cffbaa85239062eb24ca Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 3 Dec 2025 21:21:58 +0000 Subject: [PATCH 09/11] style(amgi-common): use ellipsis as an indication that the protocol requires future implementation --- packages/amgi-common/src/amgi_common/__init__.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/amgi-common/src/amgi_common/__init__.py b/packages/amgi-common/src/amgi_common/__init__.py index 4caa723..1f87150 100644 --- a/packages/amgi-common/src/amgi_common/__init__.py +++ b/packages/amgi-common/src/amgi_common/__init__.py @@ -266,11 +266,9 @@ def _remove_on_exception(self, key: H, task: Task[R]) -> None: class _Server(Protocol): - async def serve(self) -> None: - pass + async def serve(self) -> None: ... - def stop(self) -> None: - pass + def stop(self) -> None: ... def server_serve(server: _Server) -> None: From 90c6fcab1f77806eb79e189fdc86173cfb8e28f2 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 3 Dec 2025 21:28:50 +0000 Subject: [PATCH 10/11] test(amgi-redis): add tests for run functions --- .../test_redis_message_integration.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py b/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py index adb3d3a..691799f 100644 --- a/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py +++ b/packages/amgi-redis/tests_amgi_redis/test_redis_message_integration.py @@ -3,8 +3,11 @@ from uuid import uuid4 import pytest +from amgi_redis import _run_cli +from amgi_redis import run from amgi_redis import Server from redis.asyncio.client import PubSub +from test_utils import assert_run_can_terminate from test_utils import MockApp from testcontainers.redis import AsyncRedisContainer @@ -118,3 +121,17 @@ async def test_lifespan(redis_container: AsyncRedisContainer, channel: str) -> N "type": "message", "state": {"item": state_item}, } + + +def test_run(redis_container: AsyncRedisContainer, channel: str) -> None: + host = redis_container.get_container_host_ip() + port = redis_container.get_exposed_port(redis_container.port) + + assert_run_can_terminate(run, channel, url=f"redis://{host}:{port}") + + +def test_run_cli(redis_container: AsyncRedisContainer, channel: str) -> None: + host = redis_container.get_container_host_ip() + port = redis_container.get_exposed_port(redis_container.port) + + assert_run_can_terminate(_run_cli, [channel], url=f"redis://{host}:{port}") From 1f722b3163e44145446b8024bedc817f8c09f3e2 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 3 Dec 2025 21:30:28 +0000 Subject: [PATCH 11/11] fix(amgi-paho-mqtt): fix server so it does not attempt to get event loop at initialisation --- .../amgi-paho-mqtt/src/amgi_paho_mqtt/__init__.py | 7 ++++++- .../test_mqtt_message_integration.py | 11 +++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/packages/amgi-paho-mqtt/src/amgi_paho_mqtt/__init__.py b/packages/amgi-paho-mqtt/src/amgi_paho_mqtt/__init__.py index c4c3420..ea85127 100644 --- a/packages/amgi-paho-mqtt/src/amgi_paho_mqtt/__init__.py +++ b/packages/amgi-paho-mqtt/src/amgi_paho_mqtt/__init__.py @@ -1,7 +1,9 @@ import asyncio +from asyncio import AbstractEventLoop from asyncio import Event from asyncio import Future from asyncio import Task +from functools import cached_property from socket import SO_SNDBUF from socket import SOL_SOCKET from typing import Any @@ -71,7 +73,6 @@ def __init__( self._topic = topic self._host = host self._port = port - self._loop = asyncio.get_running_loop() self._client = Client( CallbackAPIVersion.VERSION2, client_id=client_id, protocol=protocol @@ -93,6 +94,10 @@ def __init__( self._publish_futures = WeakValueDictionary[int, Future[None]]() self._state: dict[str, Any] = {} + @cached_property + def _loop(self) -> AbstractEventLoop: + return asyncio.get_running_loop() + def _on_connect( self, client: Client, diff --git a/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py b/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py index c1a0d0d..77ab4a2 100644 --- a/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py +++ b/packages/amgi-paho-mqtt/tests_amgi_paho_mqtt/test_mqtt_message_integration.py @@ -7,11 +7,13 @@ import pytest from amgi_paho_mqtt import PublishError +from amgi_paho_mqtt import run from amgi_paho_mqtt import Server from paho.mqtt.client import Client from paho.mqtt.client import MQTTMessage from paho.mqtt.client import MQTTv5 from paho.mqtt.enums import CallbackAPIVersion +from test_utils import assert_run_can_terminate from test_utils import MockApp from testcontainers.mqtt import MosquittoContainer @@ -164,3 +166,12 @@ async def test_message_send_deny( "bindings": {"mqtt": {"qos": 1}}, } ) + + +def test_run(topic: str, mosquitto_container: MosquittoContainer) -> None: + assert_run_can_terminate( + run, + topic, + host=mosquitto_container.get_container_host_ip(), + port=mosquitto_container.get_exposed_port(mosquitto_container.MQTT_PORT), + )