From 27a698560da20772e02b8572843c12d546e65659 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 21 Jan 2026 18:03:22 +0000 Subject: [PATCH 1/4] feat(amgi-types): add support for extensions to the amgi message scope --- packages/amgi-types/src/amgi_types/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/amgi-types/src/amgi_types/__init__.py b/packages/amgi-types/src/amgi_types/__init__.py index aeaa45f..dce6dc8 100644 --- a/packages/amgi-types/src/amgi_types/__init__.py +++ b/packages/amgi-types/src/amgi_types/__init__.py @@ -23,6 +23,7 @@ class MessageScope(TypedDict): amgi: AMGIVersions address: str state: NotRequired[dict[str, Any]] + extensions: NotRequired[dict[str, dict[str, Any]]] class LifespanScope(TypedDict): From 17e113d4d5f4be74a3fbad295266e8efbdad8cc2 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 21 Jan 2026 18:04:38 +0000 Subject: [PATCH 2/4] feat(asyncfast): add support for the message.ack.out_of_order to allow for out of order handling --- packages/asyncfast/src/asyncfast/__init__.py | 83 ++++++++++++------- .../asyncfast/tests_asyncfast/test_message.py | 40 +++++++++ 2 files changed, 93 insertions(+), 30 deletions(-) diff --git a/packages/asyncfast/src/asyncfast/__init__.py b/packages/asyncfast/src/asyncfast/__init__.py index e7a8ea8..91be293 100644 --- a/packages/asyncfast/src/asyncfast/__init__.py +++ b/packages/asyncfast/src/asyncfast/__init__.py @@ -486,6 +486,17 @@ async def _handle_generator( exception = None +async def _receive_messages( + receive: AMGIReceiveCallable, +) -> AsyncGenerator[MessageReceiveEvent, None]: + more_messages = True + while more_messages: + message = await receive() + assert message["type"] == "message.receive" + yield message + more_messages = message.get("more_messages", False) + + class Channel: def __init__( @@ -566,36 +577,48 @@ async def __call__( send: AMGISendCallable, parameters: dict[str, str], ) -> None: - more_messages = True - while more_messages: - message = await receive() - if message["type"] != "message.receive": - continue - more_messages = message.get("more_messages", False) - try: - arguments = dict(self._generate_arguments(message, parameters, send)) - - if inspect.isasyncgenfunction(self._handler): - await _handle_async_generator(self._handler, arguments, send) - elif inspect.isgeneratorfunction(self._handler): - await _handle_generator(self._handler, arguments, send) - elif inspect.iscoroutinefunction(self._handler): - await self._handler(**arguments) - else: - await asyncio.to_thread(self._handler, **arguments) - - message_ack_event: MessageAckEvent = { - "type": "message.ack", - "id": message["id"], - } - await send(message_ack_event) - except Exception as e: - message_nack_event: MessageNackEvent = { - "type": "message.nack", - "id": message["id"], - "message": str(e), - } - await send(message_nack_event) + ack_out_of_order = "message.ack.out_of_order" in scope.get("extensions", {}) + if ack_out_of_order: + await asyncio.gather( + *[ + self._handle_message(message, parameters, send) + async for message in _receive_messages(receive) + ] + ) + else: + async for message in _receive_messages(receive): + await self._handle_message(message, parameters, send) + + async def _handle_message( + self, + message: MessageReceiveEvent, + parameters: dict[str, str], + send: AMGISendCallable, + ) -> None: + try: + arguments = dict(self._generate_arguments(message, parameters, send)) + + if inspect.isasyncgenfunction(self._handler): + await _handle_async_generator(self._handler, arguments, send) + elif inspect.isgeneratorfunction(self._handler): + await _handle_generator(self._handler, arguments, send) + elif inspect.iscoroutinefunction(self._handler): + await self._handler(**arguments) + else: + await asyncio.to_thread(self._handler, **arguments) + + message_ack_event: MessageAckEvent = { + "type": "message.ack", + "id": message["id"], + } + await send(message_ack_event) + except Exception as e: + message_nack_event: MessageNackEvent = { + "type": "message.nack", + "id": message["id"], + "message": str(e), + } + await send(message_nack_event) def _generate_arguments( self, diff --git a/packages/asyncfast/tests_asyncfast/test_message.py b/packages/asyncfast/tests_asyncfast/test_message.py index 8511d7e..e36e05d 100644 --- a/packages/asyncfast/tests_asyncfast/test_message.py +++ b/packages/asyncfast/tests_asyncfast/test_message.py @@ -1,3 +1,4 @@ +from asyncio import Event from collections.abc import AsyncGenerator from collections.abc import Generator from collections.abc import Iterable @@ -884,3 +885,42 @@ async def topic_handler(message_sender: MessageSender[SendMessage]) -> None: ) ] ) + + +async def test_message_ack_out_of_order() -> None: + app = AsyncFast() + + received = set() + block_event = Event() + + @app.channel("topic") + async def topic_handler(i: int) -> None: + received.add(i) + if received == {1, 2}: + block_event.set() + await block_event.wait() + + message_scope: MessageScope = { + "type": "message", + "amgi": {"version": "1.0", "spec_version": "1.0"}, + "address": "topic", + "extensions": {"message.ack.out_of_order": {}}, + } + message_receive_event1: MessageReceiveEvent = { + "type": "message.receive", + "id": "id-1", + "headers": [], + "payload": b"1", + "more_messages": True, + } + message_receive_event2: MessageReceiveEvent = { + "type": "message.receive", + "id": "id-2", + "payload": b"2", + "headers": [], + } + await app( + message_scope, + AsyncMock(side_effect=[message_receive_event1, message_receive_event2]), + AsyncMock(), + ) From f1254288e073dc0b9dda4a1d85fa0693edf4c2ae Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 21 Jan 2026 18:05:52 +0000 Subject: [PATCH 3/4] feat(amgi-sqs-event-source-mapping): add the message.ack.out_of_order extension to message scope so messages can be handled out of order --- .../src/amgi_sqs_event_source_mapping/__init__.py | 1 + .../tests_amgi_sqs_event_source_mapping/test_sqs_handler.py | 5 +++++ .../test_sqs_handler_integration.py | 4 ++-- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py b/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py index 7cf55d4..06fd338 100644 --- a/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py +++ b/packages/amgi-sqs-event-source-mapping/src/amgi_sqs_event_source_mapping/__init__.py @@ -293,6 +293,7 @@ async def _call_source_batch( "amgi": {"version": "1.0", "spec_version": "1.0"}, "address": event_source_arn_match["queue"], "state": self._state.copy(), + "extensions": {"message.ack.out_of_order": {}}, } records_send = _Send(self._queue_url_cache, self._send_batcher, message_ids) diff --git a/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_handler.py b/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_handler.py index 491c0cb..958c899 100644 --- a/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_handler.py +++ b/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_handler.py @@ -107,6 +107,7 @@ async def test_sqs_handler_records(app: MockApp, sqs_handler: SqsHandler) -> Non "amgi": {"version": "1.0", "spec_version": "1.0"}, "address": "my-queue", "state": {}, + "extensions": {"message.ack.out_of_order": {}}, } assert await receive() == { @@ -178,6 +179,7 @@ async def test_sqs_handler_record_nack(app: MockApp, sqs_handler: SqsHandler) -> "amgi": {"version": "1.0", "spec_version": "1.0"}, "address": "my-queue", "state": {}, + "extensions": {"message.ack.out_of_order": {}}, } assert await receive() == { @@ -243,6 +245,7 @@ async def test_sqs_handler_record_unacked( "amgi": {"version": "1.0", "spec_version": "1.0"}, "address": "my-queue", "state": {}, + "extensions": {"message.ack.out_of_order": {}}, } assert await receive() == { @@ -300,6 +303,7 @@ async def test_sqs_handler_record_message_attribute_binary_value( "amgi": {"version": "1.0", "spec_version": "1.0"}, "address": "my-queue", "state": {}, + "extensions": {"message.ack.out_of_order": {}}, } assert await receive() == { @@ -415,6 +419,7 @@ async def test_lifespan() -> None: "amgi": {"version": "1.0", "spec_version": "1.0"}, "address": "my-queue", "state": {"item": state_item}, + "extensions": {"message.ack.out_of_order": {}}, } await call_task diff --git a/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_handler_integration.py b/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_handler_integration.py index 2684569..1917f1f 100644 --- a/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_handler_integration.py +++ b/packages/amgi-sqs-event-source-mapping/tests_amgi_sqs_event_source_mapping/test_sqs_handler_integration.py @@ -11,8 +11,8 @@ @pytest.fixture(scope="module") async def localstack_container() -> AsyncGenerator[LocalStackContainer, None]: - with LocalStackContainer( - image="localstack/localstack:4.9.2" + with LocalStackContainer(image="localstack/localstack:4.9.2").with_services( + "sqs" ) as localstack_container: yield localstack_container From c5db5ac36086a61f6635692874234e03082e6a4b Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Wed, 21 Jan 2026 18:06:38 +0000 Subject: [PATCH 4/4] feat(amgi-aiobotocore): add the message.ack.out_of_order extension to message scope so messages can be handled out of order --- packages/amgi-aiobotocore/src/amgi_aiobotocore/sqs.py | 1 + .../tests_amgi_aiobotocore/test_sqs_message_integration.py | 7 +++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/amgi-aiobotocore/src/amgi_aiobotocore/sqs.py b/packages/amgi-aiobotocore/src/amgi_aiobotocore/sqs.py index 2d47a9d..605defc 100644 --- a/packages/amgi-aiobotocore/src/amgi_aiobotocore/sqs.py +++ b/packages/amgi-aiobotocore/src/amgi_aiobotocore/sqs.py @@ -305,6 +305,7 @@ async def _queue_loop( "amgi": {"version": "1.0", "spec_version": "1.0"}, "address": queue_name, "state": state.copy(), + "extensions": {"message.ack.out_of_order": {}}, } await self._app( scope, diff --git a/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py b/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py index c5d2187..e1de592 100644 --- a/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py +++ b/packages/amgi-aiobotocore/tests_amgi_aiobotocore/test_sqs_message_integration.py @@ -22,8 +22,8 @@ def __eq__(self, other: Any) -> bool: @pytest.fixture(scope="module") async def localstack_container() -> AsyncGenerator[LocalStackContainer, None]: - with LocalStackContainer( - image="localstack/localstack:4.9.2" + with LocalStackContainer(image="localstack/localstack:4.9.2").with_services( + "sqs" ) as localstack_container: yield localstack_container @@ -89,6 +89,7 @@ async def test_message( "amgi": {"version": "1.0", "spec_version": "1.0"}, "address": queue_name, "state": {}, + "extensions": {"message.ack.out_of_order": {}}, } message_receive = await receive() @@ -131,6 +132,7 @@ async def test_message_nack( "amgi": {"version": "1.0", "spec_version": "1.0"}, "address": queue_name, "state": {}, + "extensions": {"message.ack.out_of_order": {}}, } message_receive = await receive() @@ -290,6 +292,7 @@ async def test_lifespan( "amgi": {"spec_version": "1.0", "version": "1.0"}, "type": "message", "state": {"item": state_item}, + "extensions": {"message.ack.out_of_order": {}}, }