Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ The application consumes messages from the `input_channel` in the form:
}
```

This example includes mult

And sends messages to `output_channel` in the form:

```json
{
"state": "processed",
"id": "1dde5e4e-c690-49ac-a6c0-d63cf26edef0"
"state": "processed",
"id": "1dde5e4e-c690-49ac-a6c0-d63cf26edef0"
}
```

Expand All @@ -28,7 +26,7 @@ This example includes multiple docker compose files for different protocols, whi
Run the Kafka compose file with:

```commandline
docker compose --file kafka/docker-compose.yml up --detach
docker compose --file kafka/docker-compose.yaml up --detach
```

This includes:
Expand All @@ -47,7 +45,7 @@ asyncfast run amgi-aiokafka main:app input_channel
Run the MQTT compose file with:

```commandline
docker compose --file mqtt/docker-compose.yml up --detach
docker compose --file mqtt/docker-compose.yaml up --detach
```

This includes:
Expand Down Expand Up @@ -100,5 +98,24 @@ This includes:
Connect the app via Redis with:

```commandline
asyncfast run amgi-redis main:app input_channel
asyncfast run amgi-redis main:app input_channel
```

## AMQP

Run the AMQP compose file with:

```commandline
docker compose --file amqp/docker-compose.yaml up --detach
```

This includes:

- RabbitMQ `amqp://guest:guest@localhost:5672/`
- RabbitMQ Management UI ([`http://localhost:15672/`](http://localhost:15672/)) (username: `guest`, password: `guest`)

Connect the app via AMQP with:

```commandline
asyncfast run amgi-aiopika-amqp main:app input_channel
```
9 changes: 9 additions & 0 deletions example/amqp/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
services:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its possible to load definitions on rabbitmq start using the load_definitions option, this can fully setup the queues so they do not need to be declared elsewhere

https://www.rabbitmq.com/docs/definitions

rabbitmq:
image: rabbitmq:4.2.1-management
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
File renamed without changes.
37 changes: 37 additions & 0 deletions packages/amgi-aiopika-amqp/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# amgi-aiopika-amqp

:construction: This package is currently under development :construction:

AMGI server for AMQP using aio-pika.

## Installation

```
pip install amgi-aiopika-amqp==0.21.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should match the global project version, it will be bumped automatically

```

## Usage

```python
from asyncfast import AsyncFast
from amgi_aiopika_amqp import run

app = AsyncFast()


@app.channel("my_queue")
async def handle_message(payload: str) -> None:
print(f"Received: {payload}")


if __name__ == "__main__":
run(app, "my_queue", url="amqp://guest:guest@localhost/")
```

## Contact

For questions or suggestions, please contact [jack.burridge@mail.com](mailto:jack.burridge@mail.com).

## License

Copyright 2025 AMGI
47 changes: 47 additions & 0 deletions packages/amgi-aiopika-amqp/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[build-system]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build system should be uv_build

build-backend = "hatchling.build"
requires = [ "hatchling" ]

[project]
name = "amgi-aiopika-amqp"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
name = "amgi-aiopika-amqp"
name = "amgi-aio-pika"

The other packages are named after the core dependency. Not to be overly pedantic but once a package is published we probably want to stick with the name (Though I have changed them later on... so who really cares)

version = "0.21.0"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Versions should match the current version of the overall project, they will be bumped automatically

description = "AMQP AMGI server implementation using aio-pika"
readme = "README.md"
license = { text = "MIT" }
authors = [ { name = "Jack Burridge", email = "jack.burridge@mail.com" } ]
requires-python = ">=3.10"
classifiers = [
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
]
dependencies = [
"aio-pika>=9.0.0",
"amgi-common==0.21.0",
"amgi-types==0.21.0",
]

entry-points.amgi_server.amgi-aiopika-amqp = "amgi_aiopika_amqp:_run_cli"

[tool.hatch.build.targets.wheel]
packages = [ "src/amgi_aiopika_amqp" ]

[tool.uv]
dev-dependencies = [
"pytest>=8.3.4",
"pytest-asyncio>=0.24.0",
"testcontainers>=4.9.0",
"pika>=1.3.0",
]

[tool.uv.sources]
amgi-common = { workspace = true }
amgi-types = { workspace = true }

[tool.pytest.ini_options]
testpaths = [ "tests_amgi_aiopika_amqp" ]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
129 changes: 129 additions & 0 deletions packages/amgi-aiopika-amqp/src/amgi_aiopika_amqp/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import asyncio
from asyncio import Event
from asyncio import Task
from typing import Optional

import aio_pika
from aio_pika import connect_robust
from aio_pika import IncomingMessage
from aio_pika.abc import AbstractRobustChannel
from aio_pika.abc import AbstractRobustConnection
from amgi_common import Lifespan
from amgi_types import AMGIApplication
from amgi_types import AMGISendEvent
from amgi_types import MessageReceiveEvent
from amgi_types import MessageScope


def run(
app: AMGIApplication,
queue: str,
url: str = "amqp://guest:guest@localhost/",
durable: bool = True,
) -> None:
asyncio.run(_run(app, queue, url, durable))


async def _run(app: AMGIApplication, queue: str, url: str, durable: bool) -> None:
server = Server(app, queue, url, durable)
await server.serve()


def _run_cli(
app: AMGIApplication,
queues: list[str],
url: str = "amqp://guest:guest@localhost/",
durable: bool = True,
) -> None:
run(app, queues[0], url, durable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it should be possible to support multiple queues by simultaneously running multiple queue loops, look at the aiobotocore SQS implementation



class _MessageReceive:
def __init__(self, message: IncomingMessage) -> None:
self._message = message

async def __call__(self) -> MessageReceiveEvent:
return {
"type": "message.receive",
"id": str(self._message.delivery_tag),
"headers": [
(key.encode(), value.encode())
for key, value in (self._message.headers or {}).items()
],
"payload": self._message.body,
}


class _MessageSend:
def __init__(self, channel: AbstractRobustChannel) -> None:
self._channel = channel

async def __call__(self, event: AMGISendEvent) -> None:
if event["type"] == "message.send":
await self._channel.default_exchange.publish(
aio_pika.Message(
body=event.get("payload", b""),
headers={
key.decode(): value.decode()
for key, value in event.get("headers", [])
},
),
routing_key=event["address"],
)


class Server:
def __init__(
self,
app: AMGIApplication,
queue: str,
url: str,
durable: bool = True,
) -> None:
self._app = app
self._queue = queue
self._url = url
self._durable = durable
self._stop_event = Event()
self._tasks: set[Task[None]] = set()
self._connection: Optional[AbstractRobustConnection] = None
self._channel: Optional[AbstractRobustChannel] = None
Comment on lines +89 to +90
Copy link
Contributor

@jackburridge jackburridge Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably doesn't need to be properties of the class, they can be instantiated in serve() and be passed around


async def _handle_message(self, message: IncomingMessage) -> None:
scope: MessageScope = {
"type": "message",
"amgi": {"version": "1.0", "spec_version": "1.0"},
"address": self._queue,
}

try:
await self._app(
scope, _MessageReceive(message), _MessageSend(self._channel)
)
await message.ack()
except Exception:
await message.nack(requeue=True)
Comment on lines +103 to +105
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a little confusion here based on the AMGI spec. The n/ack should be sent via the application, the _MessageSend class should handle an n/ack

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


async def serve(self) -> None:
self._connection = await connect_robust(self._url)
self._channel = await self._connection.channel()

queue = await self._channel.declare_queue(self._queue, durable=self._durable)

async with Lifespan(self._app) as state:
async with queue.iterator() as queue_iter:
async for message in queue_iter:
if self._stop_event.is_set():
break
Comment on lines +114 to +117
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
async with queue.iterator() as queue_iter:
async for message in queue_iter:
if self._stop_event.is_set():
break
async with queue.iterator() as queue_iterator:
queue_aiter = aiter(queue_iterator)
async for message in self._stoppable.call(anext, queue_aiter):

I think Stoppable can be used here. I believe the coroutine waiting for the next message, and this is why the serve task is having to be cancelled in the tests. Stoppable should allow you to handle this cleanly

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


task = asyncio.create_task(self._handle_message(message))
self._tasks.add(task)
task.add_done_callback(self._tasks.discard)

await asyncio.gather(*self._tasks)

await self._channel.close()
await self._connection.close()

def stop(self) -> None:
self._stop_event.set()
Empty file.
Empty file.
Loading