diff --git a/example/README.md b/example/README.md index a9e4b31..b7991d8 100644 --- a/example/README.md +++ b/example/README.md @@ -10,14 +10,12 @@ The application consumes messages from the `input_channel` in the form: } ``` -This example includes mult - And sends messages to `output_channel` in the form: ```json { - "state": "processed", - "id": "1dde5e4e-c690-49ac-a6c0-d63cf26edef0" + "state": "processed", + "id": "1dde5e4e-c690-49ac-a6c0-d63cf26edef0" } ``` @@ -28,7 +26,7 @@ This example includes multiple docker compose files for different protocols, whi Run the Kafka compose file with: ```commandline -docker compose --file kafka/docker-compose.yml up --detach +docker compose --file kafka/docker-compose.yaml up --detach ``` This includes: @@ -47,7 +45,7 @@ asyncfast run amgi-aiokafka main:app input_channel Run the MQTT compose file with: ```commandline -docker compose --file mqtt/docker-compose.yml up --detach +docker compose --file mqtt/docker-compose.yaml up --detach ``` This includes: @@ -100,5 +98,24 @@ This includes: Connect the app via Redis with: ```commandline -asyncfast run amgi-redis main:app input_channel +asyncfast run amgi-redis main:app input_channel +``` + +## AMQP + +Run the AMQP compose file with: + +```commandline +docker compose --file amqp/docker-compose.yaml up --detach +``` + +This includes: + +- RabbitMQ `amqp://guest:guest@localhost:5672/` +- RabbitMQ Management UI ([`http://localhost:15672/`](http://localhost:15672/)) (username: `guest`, password: `guest`) + +Connect the app via AMQP with: + +```commandline +asyncfast run amgi-aiopika-amqp main:app input_channel ``` diff --git a/example/amqp/docker-compose.yaml b/example/amqp/docker-compose.yaml new file mode 100644 index 0000000..5b7285b --- /dev/null +++ b/example/amqp/docker-compose.yaml @@ -0,0 +1,9 @@ +services: + rabbitmq: + image: rabbitmq:4.2.1-management + ports: + - "5672:5672" + - "15672:15672" + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest diff --git a/example/kafka/docker-compose.yml b/example/kafka/docker-compose.yaml similarity index 100% rename from example/kafka/docker-compose.yml rename to example/kafka/docker-compose.yaml diff --git a/packages/amgi-aiopika-amqp/README.md b/packages/amgi-aiopika-amqp/README.md new file mode 100644 index 0000000..2ff168c --- /dev/null +++ b/packages/amgi-aiopika-amqp/README.md @@ -0,0 +1,37 @@ +# amgi-aiopika-amqp + +:construction: This package is currently under development :construction: + +AMGI server for AMQP using aio-pika. + +## Installation + +``` +pip install amgi-aiopika-amqp==0.21.0 +``` + +## Usage + +```python +from asyncfast import AsyncFast +from amgi_aiopika_amqp import run + +app = AsyncFast() + + +@app.channel("my_queue") +async def handle_message(payload: str) -> None: + print(f"Received: {payload}") + + +if __name__ == "__main__": + run(app, "my_queue", url="amqp://guest:guest@localhost/") +``` + +## Contact + +For questions or suggestions, please contact [jack.burridge@mail.com](mailto:jack.burridge@mail.com). + +## License + +Copyright 2025 AMGI diff --git a/packages/amgi-aiopika-amqp/pyproject.toml b/packages/amgi-aiopika-amqp/pyproject.toml new file mode 100644 index 0000000..490c09b --- /dev/null +++ b/packages/amgi-aiopika-amqp/pyproject.toml @@ -0,0 +1,47 @@ +[build-system] +build-backend = "hatchling.build" +requires = [ "hatchling" ] + +[project] +name = "amgi-aiopika-amqp" +version = "0.21.0" +description = "AMQP AMGI server implementation using aio-pika" +readme = "README.md" +license = { text = "MIT" } +authors = [ { name = "Jack Burridge", email = "jack.burridge@mail.com" } ] +requires-python = ">=3.10" +classifiers = [ + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", +] +dependencies = [ + "aio-pika>=9.0.0", + "amgi-common==0.21.0", + "amgi-types==0.21.0", +] + +entry-points.amgi_server.amgi-aiopika-amqp = "amgi_aiopika_amqp:_run_cli" + +[tool.hatch.build.targets.wheel] +packages = [ "src/amgi_aiopika_amqp" ] + +[tool.uv] +dev-dependencies = [ + "pytest>=8.3.4", + "pytest-asyncio>=0.24.0", + "testcontainers>=4.9.0", + "pika>=1.3.0", +] + +[tool.uv.sources] +amgi-common = { workspace = true } +amgi-types = { workspace = true } + +[tool.pytest.ini_options] +testpaths = [ "tests_amgi_aiopika_amqp" ] +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" diff --git a/packages/amgi-aiopika-amqp/src/amgi_aiopika_amqp/__init__.py b/packages/amgi-aiopika-amqp/src/amgi_aiopika_amqp/__init__.py new file mode 100644 index 0000000..26dd6af --- /dev/null +++ b/packages/amgi-aiopika-amqp/src/amgi_aiopika_amqp/__init__.py @@ -0,0 +1,129 @@ +import asyncio +from asyncio import Event +from asyncio import Task +from typing import Optional + +import aio_pika +from aio_pika import connect_robust +from aio_pika import IncomingMessage +from aio_pika.abc import AbstractRobustChannel +from aio_pika.abc import AbstractRobustConnection +from amgi_common import Lifespan +from amgi_types import AMGIApplication +from amgi_types import AMGISendEvent +from amgi_types import MessageReceiveEvent +from amgi_types import MessageScope + + +def run( + app: AMGIApplication, + queue: str, + url: str = "amqp://guest:guest@localhost/", + durable: bool = True, +) -> None: + asyncio.run(_run(app, queue, url, durable)) + + +async def _run(app: AMGIApplication, queue: str, url: str, durable: bool) -> None: + server = Server(app, queue, url, durable) + await server.serve() + + +def _run_cli( + app: AMGIApplication, + queues: list[str], + url: str = "amqp://guest:guest@localhost/", + durable: bool = True, +) -> None: + run(app, queues[0], url, durable) + + +class _MessageReceive: + def __init__(self, message: IncomingMessage) -> None: + self._message = message + + async def __call__(self) -> MessageReceiveEvent: + return { + "type": "message.receive", + "id": str(self._message.delivery_tag), + "headers": [ + (key.encode(), value.encode()) + for key, value in (self._message.headers or {}).items() + ], + "payload": self._message.body, + } + + +class _MessageSend: + def __init__(self, channel: AbstractRobustChannel) -> None: + self._channel = channel + + async def __call__(self, event: AMGISendEvent) -> None: + if event["type"] == "message.send": + await self._channel.default_exchange.publish( + aio_pika.Message( + body=event.get("payload", b""), + headers={ + key.decode(): value.decode() + for key, value in event.get("headers", []) + }, + ), + routing_key=event["address"], + ) + + +class Server: + def __init__( + self, + app: AMGIApplication, + queue: str, + url: str, + durable: bool = True, + ) -> None: + self._app = app + self._queue = queue + self._url = url + self._durable = durable + self._stop_event = Event() + self._tasks: set[Task[None]] = set() + self._connection: Optional[AbstractRobustConnection] = None + self._channel: Optional[AbstractRobustChannel] = None + + async def _handle_message(self, message: IncomingMessage) -> None: + scope: MessageScope = { + "type": "message", + "amgi": {"version": "1.0", "spec_version": "1.0"}, + "address": self._queue, + } + + try: + await self._app( + scope, _MessageReceive(message), _MessageSend(self._channel) + ) + await message.ack() + except Exception: + await message.nack(requeue=True) + + async def serve(self) -> None: + self._connection = await connect_robust(self._url) + self._channel = await self._connection.channel() + + queue = await self._channel.declare_queue(self._queue, durable=self._durable) + + async with Lifespan(self._app) as state: + async with queue.iterator() as queue_iter: + async for message in queue_iter: + if self._stop_event.is_set(): + break + + task = asyncio.create_task(self._handle_message(message)) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + + await asyncio.gather(*self._tasks) + + await self._channel.close() + await self._connection.close() + + def stop(self) -> None: + self._stop_event.set() diff --git a/packages/amgi-aiopika-amqp/src/amgi_aiopika_amqp/py.typed b/packages/amgi-aiopika-amqp/src/amgi_aiopika_amqp/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/packages/amgi-aiopika-amqp/tests_amgi_aiopika_amqp/__init__.py b/packages/amgi-aiopika-amqp/tests_amgi_aiopika_amqp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/amgi-aiopika-amqp/tests_amgi_aiopika_amqp/test_amqp_message_integration.py b/packages/amgi-aiopika-amqp/tests_amgi_aiopika_amqp/test_amqp_message_integration.py new file mode 100644 index 0000000..18c6a5d --- /dev/null +++ b/packages/amgi-aiopika-amqp/tests_amgi_aiopika_amqp/test_amqp_message_integration.py @@ -0,0 +1,213 @@ +import asyncio +from collections.abc import AsyncGenerator +from collections.abc import Generator +from typing import Any +from typing import cast +from uuid import uuid4 + +import pytest +from aio_pika import connect_robust +from aio_pika import Message +from amgi_aiopika_amqp import Server +from test_utils import MockApp +from testcontainers.rabbitmq import RabbitMqContainer + + +@pytest.fixture(scope="session") +def rabbitmq_container() -> Generator[RabbitMqContainer, None, None]: + with RabbitMqContainer("rabbitmq:4.2.1-management") as rabbitmq: + yield rabbitmq + + +@pytest.fixture +def amqp_url(rabbitmq_container: RabbitMqContainer) -> str: + host = rabbitmq_container.get_container_host_ip() + port = rabbitmq_container.get_exposed_port(5672) + return f"amqp://guest:guest@{host}:{port}/" + + +@pytest.fixture +def queue_name() -> str: + return f"receive-{uuid4()}" + + +@pytest.fixture() +async def app(amqp_url: str, queue_name: str) -> AsyncGenerator[MockApp, None]: + app = MockApp() + server = Server(app, queue_name, url=amqp_url) + loop = asyncio.get_running_loop() + serve_task = loop.create_task(server.serve()) + async with app.call() as (scope, receive, send): + assert scope == { + "amgi": {"spec_version": "1.0", "version": "1.0"}, + "type": "lifespan", + "state": {}, + } + lifespan_startup = await receive() + assert lifespan_startup == {"type": "lifespan.startup"} + await send(cast(Any, {"type": "lifespan.startup.complete"})) + yield app + server.stop() + for name in dir(server): + try: + attr = getattr(server, name) + except Exception: + continue + if attr is None: + continue + close = getattr(attr, "close", None) + if callable(close): + try: + maybe_coro = close() + except TypeError: + continue + if asyncio.iscoroutine(maybe_coro): + try: + await maybe_coro + except Exception: + pass + lifespan_shutdown = await receive() + assert lifespan_shutdown == {"type": "lifespan.shutdown"} + await send(cast(Any, {"type": "lifespan.shutdown.complete"})) + serve_task.cancel() + try: + await serve_task + except asyncio.CancelledError: + pass + + +async def test_message(app: MockApp, amqp_url: str, queue_name: str) -> None: + """Test receiving messages through AMQP.""" + for attempt in range(5): + try: + connection = await connect_robust(amqp_url) + break + except ConnectionResetError: + if attempt == 4: + raise + await asyncio.sleep(0.5 * (2**attempt)) + async with connection: + channel = await connection.channel() + queue = await channel.declare_queue(queue_name, durable=True) + await channel.default_exchange.publish( + Message(body=b"value", headers={"test": "test"}), + routing_key=queue_name, + ) + + async with app.call() as (scope, receive, send): + assert scope == { + "address": queue_name, + "amgi": {"spec_version": "1.0", "version": "1.0"}, + "type": "message", + } + + message_receive = await receive() + assert message_receive["type"] == "message.receive" + assert message_receive["payload"] == b"value" + assert message_receive["headers"] == [(b"test", b"test")] + + await send(cast(Any, {"type": "message.ack", "id": message_receive["id"]})) + + +async def test_message_send(app: MockApp, amqp_url: str, queue_name: str) -> None: + """Test sending messages through AMQP.""" + send_queue_name = f"send-{uuid4()}" + + for attempt in range(5): + try: + connection = await connect_robust(amqp_url) + break + except ConnectionResetError: + if attempt == 4: + raise + await asyncio.sleep(0.5 * (2**attempt)) + + async with connection: + channel = await connection.channel() + await channel.declare_queue(queue_name, durable=True) + send_queue = await channel.declare_queue(send_queue_name, durable=True) + + await channel.default_exchange.publish( + Message(body=b"trigger"), + routing_key=queue_name, + ) + + async with app.call() as (scope, receive, send): + await send( + cast( + Any, + { + "type": "message.send", + "address": send_queue_name, + "headers": [(b"test", b"test")], + "payload": b"test", + }, + ) + ) + + incoming_message = await send_queue.get(timeout=5) + assert incoming_message.body == b"test" + assert incoming_message.headers == {"test": "test"} + await incoming_message.ack() + + +async def test_message_nack(app: MockApp, amqp_url: str, queue_name: str) -> None: + """Test negative acknowledgement of messages.""" + for attempt in range(5): + try: + connection = await connect_robust(amqp_url) + break + except ConnectionResetError: + if attempt == 4: + raise + await asyncio.sleep(0.5 * (2**attempt)) + async with connection: + channel = await connection.channel() + queue = await channel.declare_queue(queue_name, durable=True) + await channel.default_exchange.publish( + Message(body=b"value", headers={"test": "test"}), + routing_key=queue_name, + ) + + async with app.call() as (scope, receive, send): + assert scope == { + "address": queue_name, + "amgi": {"spec_version": "1.0", "version": "1.0"}, + "type": "message", + } + + message_receive = await receive() + assert message_receive["type"] == "message.receive" + assert message_receive["payload"] == b"value" + assert message_receive["headers"] == [(b"test", b"test")] + + await send( + cast( + Any, + { + "type": "message.nack", + "id": message_receive["id"], + "message": "", + "requeue": True, + }, + ) + ) + + message_receive_requeued = None + for attempt in range(10): + try: + message_receive_requeued = await asyncio.wait_for(receive(), timeout=1) + break + except asyncio.TimeoutError: + await asyncio.sleep(0.2) + + assert ( + message_receive_requeued is not None + ), "Message was not re-delivered to the application after NACK/requeue" + assert message_receive_requeued["type"] == "message.receive" + assert message_receive_requeued["payload"] == b"value" + assert message_receive_requeued["headers"] == [(b"test", b"test")] + + await send( + cast(Any, {"type": "message.ack", "id": message_receive_requeued["id"]}) + ) diff --git a/uv.lock b/uv.lock index 15c8601..4ec32da 100644 --- a/uv.lock +++ b/uv.lock @@ -7,6 +7,7 @@ members = [ "amgi", "amgi-aiobotocore", "amgi-aiokafka", + "amgi-aiopika-amqp", "amgi-common", "amgi-paho-mqtt", "amgi-redis", @@ -29,6 +30,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8d/3f/95338030883d8c8b91223b4e21744b04d11b161a3ef117295d8241f50ab4/accessible_pygments-0.0.5-py3-none-any.whl", hash = "sha256:88ae3211e68a1d0b011504b2ffc1691feafce124b845bd072ab6f9f66f34d4b7", size = 1395903, upload-time = "2024-05-10T11:23:08.421Z" }, ] +[[package]] +name = "aio-pika" +version = "9.5.8" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiormq" }, + { name = "exceptiongroup", marker = "python_full_version < '3.11'" }, + { name = "yarl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c5/73/8d1020683970de5532b3b01732d75c8bf922a6505fcdad1a9c7c6405242a/aio_pika-9.5.8.tar.gz", hash = "sha256:7c36874115f522bbe7486c46d8dd711a4dbedd67c4e8a8c47efe593d01862c62", size = 47408, upload-time = "2025-11-12T10:37:10.215Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7c/91/513971861d845d28160ecb205ae2cfaf618b16918a9cd4e0b832b5360ce7/aio_pika-9.5.8-py3-none-any.whl", hash = "sha256:f4c6cb8a6c5176d00f39fd7431e9702e638449bc6e86d1769ad7548b2a506a8d", size = 54397, upload-time = "2025-11-12T10:37:08.374Z" }, +] + [[package]] name = "aiobotocore" version = "2.25.2" @@ -222,6 +237,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bf/0d/4cb57231ff650a01123a09075bf098d8fdaf94b15a1a58465066b2251e8b/aiokafka-0.12.0-cp313-cp313-win_amd64.whl", hash = "sha256:bdc0a83eb386d2384325d6571f8ef65b4cfa205f8d1c16d7863e8d10cacd995a", size = 363194, upload-time = "2024-10-26T20:52:59.434Z" }, ] +[[package]] +name = "aiormq" +version = "6.9.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pamqp" }, + { name = "yarl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8e/f6/01bc850db6d9b46ae825e3c373f610b0544e725a1159745a6de99ad0d9f1/aiormq-6.9.2.tar.gz", hash = "sha256:d051d46086079934d3a7157f4d8dcb856b77683c2a94aee9faa165efa6a785d3", size = 30554, upload-time = "2025-10-20T10:49:59.763Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/52/ec/763b13f148f3760c1562cedb593feaffbae177eeece61af5d0ace7b72a3e/aiormq-6.9.2-py3-none-any.whl", hash = "sha256:ab0f4e88e70f874b0ea344b3c41634d2484b5dc8b17cb6ae0ae7892a172ad003", size = 31829, upload-time = "2025-10-20T10:49:58.547Z" }, +] + [[package]] name = "aiosignal" version = "1.4.0" @@ -343,6 +371,39 @@ dev = [ { name = "testcontainers", extras = ["kafka"], specifier = ">=4.13.0" }, ] +[[package]] +name = "amgi-aiopika-amqp" +version = "0.21.0" +source = { editable = "packages/amgi-aiopika-amqp" } +dependencies = [ + { name = "aio-pika" }, + { name = "amgi-common" }, + { name = "amgi-types" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pika" }, + { name = "pytest" }, + { name = "pytest-asyncio" }, + { name = "testcontainers" }, +] + +[package.metadata] +requires-dist = [ + { name = "aio-pika", specifier = ">=9.0.0" }, + { name = "amgi-common", editable = "packages/amgi-common" }, + { name = "amgi-types", editable = "packages/amgi-types" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "pika", specifier = ">=1.3.0" }, + { name = "pytest", specifier = ">=8.3.4" }, + { name = "pytest-asyncio", specifier = ">=0.24.0" }, + { name = "testcontainers", specifier = ">=4.9.0" }, +] + [[package]] name = "amgi-common" version = "0.21.0" @@ -1333,6 +1394,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c4/cb/00451c3cf31790287768bb12c6bec834f5d292eaf3022afc88e14b8afc94/paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee", size = 67219, upload-time = "2024-04-29T19:52:48.345Z" }, ] +[[package]] +name = "pamqp" +version = "3.3.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/fb/62/35bbd3d3021e008606cd0a9532db7850c65741bbf69ac8a3a0d8cfeb7934/pamqp-3.3.0.tar.gz", hash = "sha256:40b8795bd4efcf2b0f8821c1de83d12ca16d5760f4507836267fd7a02b06763b", size = 30993, upload-time = "2024-01-12T20:37:25.085Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ac/8d/c1e93296e109a320e508e38118cf7d1fc2a4d1c2ec64de78565b3c445eb5/pamqp-3.3.0-py2.py3-none-any.whl", hash = "sha256:c901a684794157ae39b52cbf700db8c9aae7a470f13528b9d7b4e5f7202f8eb0", size = 33848, upload-time = "2024-01-12T20:37:21.359Z" }, +] + [[package]] name = "pathspec" version = "0.12.1" @@ -1342,6 +1412,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cc/20/ff623b09d963f88bfde16306a54e12ee5ea43e9b597108672ff3a408aad6/pathspec-0.12.1-py3-none-any.whl", hash = "sha256:a0d503e138a4c123b27490a4f7beda6a01c6f288df0e4a8b79c7eb0dc7b4cc08", size = 31191, upload-time = "2023-12-10T22:30:43.14Z" }, ] +[[package]] +name = "pika" +version = "1.3.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/db/db/d4102f356af18f316c67f2cead8ece307f731dd63140e2c71f170ddacf9b/pika-1.3.2.tar.gz", hash = "sha256:b2a327ddddf8570b4965b3576ac77091b850262d34ce8c1d8cb4e4146aa4145f", size = 145029, upload-time = "2023-05-05T14:25:43.368Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f9/f3/f412836ec714d36f0f4ab581b84c491e3f42c6b5b97a6c6ed1817f3c16d0/pika-1.3.2-py3-none-any.whl", hash = "sha256:0779a7c1fafd805672796085560d290213a465e4f6f76a6fb19e378d8041a14f", size = 155415, upload-time = "2023-05-05T14:25:41.484Z" }, +] + [[package]] name = "platformdirs" version = "4.4.0"