From 81745571cd8b413188979971ca5ba76539b18e26 Mon Sep 17 00:00:00 2001 From: "jack.burridge" Date: Thu, 22 Jan 2026 22:59:17 +0000 Subject: [PATCH] docs(amgi): update amgi specification docs to include extensions and more details on lifespan also add typeddict directive to automatically generate message and scope documentation --- .pre-commit-config.yaml | 2 +- conf.py | 5 + docs/_ext/typeddict.py | 314 ++++++++++++++++++ docs/extensions.rst | 29 ++ docs/index.rst | 2 +- docs/introduction.rst | 3 - docs/specifications/lifespan.rst | 110 +++++- docs/specifications/message.rst | 82 +++-- .../amgi-types/src/amgi_types/__init__.py | 55 +++ 9 files changed, 553 insertions(+), 49 deletions(-) create mode 100644 docs/_ext/typeddict.py create mode 100644 docs/extensions.rst delete mode 100644 docs/introduction.rst diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 2c19fdd..f560d13 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,7 +11,7 @@ repos: rev: v1.19.0 hooks: - id: mypy - additional_dependencies: ["pydantic==2.11.7", "types-docutils"] + additional_dependencies: ["pydantic==2.11.7", "types-docutils", "sphinx>=7.4.7"] - repo: https://github.com/PyCQA/autoflake rev: v2.3.1 hooks: diff --git a/conf.py b/conf.py index 79bd029..3940f61 100644 --- a/conf.py +++ b/conf.py @@ -47,5 +47,10 @@ ) extensions += ["async_fast_example"] +if current_project == "amgi": + sys.path.append(str((Path(".") / "docs" / "_ext").resolve())) + + extensions += ["typeddict"] + html_theme = "furo" diff --git a/docs/_ext/typeddict.py b/docs/_ext/typeddict.py new file mode 100644 index 0000000..8910bb9 --- /dev/null +++ b/docs/_ext/typeddict.py @@ -0,0 +1,314 @@ +from __future__ import annotations + +import importlib +from collections.abc import Iterable +from collections.abc import Iterator +from types import NoneType +from types import UnionType +from typing import Any +from typing import get_args +from typing import get_origin +from typing import 
get_type_hints +from typing import Literal +from typing import Optional +from typing import TypeGuard +from typing import Union + +from docutils import nodes +from docutils.parsers.rst import Directive +from docutils.statemachine import StringList +from sphinx.addnodes import pending_xref +from sphinx.application import Sphinx +from sphinx.util.typing import ExtensionMetadata + +try: + from typing import is_typeddict +except ImportError: + is_typeddict = None # type: ignore[assignment] + + +def _import_python_object(dotted_path: str) -> Any: + module_path, _, object_name = dotted_path.rpartition(".") + + if not module_path or not object_name: + raise ValueError(f"Invalid dotted path: {dotted_path!r}") + + imported_module = importlib.import_module(module_path) + return getattr(imported_module, object_name) + + +def _is_typed_dict_class(candidate_object: Any) -> TypeGuard[type]: + if is_typeddict is not None: + return bool(is_typeddict(candidate_object)) + return ( + isinstance(candidate_object, type) + and hasattr(candidate_object, "__annotations__") + and hasattr(candidate_object, "__required_keys__") + and hasattr(candidate_object, "__optional_keys__") + ) + + +def _resolve_type_hints(typed_dict_class: type) -> dict[str, Any]: + try: + return get_type_hints(typed_dict_class, include_extras=True) + except TypeError: + return get_type_hints(typed_dict_class) + + +def _definition_order(typed_dict_class: type) -> Iterable[str]: + return (typed_dict_class.__annotations__ or {}).keys() + + +def _find_nested_typed_dict(type_hint: Any) -> type | None: + if _is_typed_dict_class(type_hint): + return type_hint + + for arg in get_args(type_hint) or (): + found = _find_nested_typed_dict(arg) + if found: + return found + + return None + + +def _is_literal_origin(origin_object: Any) -> bool: + if Literal is not None and origin_object is Literal: + return True + + module_name = getattr(origin_object, "__module__", None) + qual_name = getattr(origin_object, "__qualname__", None) 
or getattr( + origin_object, "__name__", None + ) + + return (module_name, qual_name) in { + ("typing", "Literal"), + ("typing_extensions", "Literal"), + } + + +def _is_optional_type(origin: Any, args: tuple[Any, ...]) -> bool: + if origin in (Union, UnionType): + return NoneType in args + return False + + +def _python_obj_ref(obj: Any, label: str) -> nodes.Node: + if obj.__module__ == "builtins": + target = obj.__name__ + elif obj.__module__ == "typing_extensions": + target = f"typing.{obj.__qualname__}" + else: + target = f"{obj.__module__}.{obj.__qualname__}" + + return pending_xref( + "", + nodes.literal(text=label), + refdomain="py", + reftype="obj", + reftarget=target, + refexplicit=True, + ) + + +def _yield_type_nodes(type_hint: Any) -> Iterator[nodes.Node]: + origin = get_origin(type_hint) + args = get_args(type_hint) + + if origin is None: + display_name = getattr(type_hint, "__qualname__", None) or getattr( + type_hint, "__name__", None + ) + if display_name: + yield _python_obj_ref(type_hint, display_name) + else: + yield nodes.literal(text=repr(type_hint)) + return + + origin_display_name = ( + getattr(origin, "__qualname__", None) + or getattr(origin, "__name__", None) + or str(origin) + ) + if _is_optional_type(origin, args): + yield _python_obj_ref(Optional, "Optional") + else: + yield _python_obj_ref(origin, origin_display_name) + yield nodes.Text("[") + + if _is_literal_origin(origin): + for index, literal_value in enumerate(args): + if index: + yield nodes.Text(", ") + yield nodes.literal(text=repr(literal_value)) + yield nodes.Text("]") + return + + filtered_args = ( + filter(lambda arg: arg is not NoneType, args) + if _is_optional_type(origin, args) + else args + ) + + for index, argument in enumerate(filtered_args): + if index: + yield nodes.Text(", ") + yield from _yield_type_nodes(argument) + + yield nodes.Text("]") + + +def _format_accessor(type_name: str, keys: Iterable[str]) -> str: + return type_name + "".join(f'["{k}"]' for k in keys) + 
+ +def _nested_parse_rst(directive: Directive, text: str, source: str) -> list[nodes.Node]: + if not text.strip(): + return [] + + string_list = StringList() + for line in text.splitlines(): + string_list.append(line, source) + + container = nodes.container() + directive.state.nested_parse(string_list, directive.content_offset, container) + return list(container.children) + + +def _extract_docstring_var_nodes( + directive: Directive, typed_dict_class: type +) -> dict[str, list[nodes.Node]]: + doc = typed_dict_class.__doc__ or "" + if not doc.strip(): + return {} + + source = ( + f"" + ) + top_nodes = _nested_parse_rst(directive, doc, source) + + vars_nodes: dict[str, list[nodes.Node]] = {} + + for node in top_nodes: + for field in node.findall(nodes.field): + field_name_node = field[0] + field_body_node = field[1] + + parts = field_name_node.astext().strip().split(None, 1) + if len(parts) != 2: + continue + + tag, name = parts[0], parts[1].strip() + if tag != "var" or not name: + continue + + vars_nodes[name] = list(field_body_node.children) + + return vars_nodes + + +def _append_doc_nodes_to_list_item( + list_item: nodes.list_item, + paragraph: nodes.paragraph, + doc_nodes: list[nodes.Node], +) -> None: + if not doc_nodes: + return + + first, *rest = doc_nodes + + if isinstance(first, nodes.paragraph): + paragraph += nodes.Text(" – ") + paragraph.extend(child for child in first.children) + else: + rest = doc_nodes + + for extra in rest: + list_item += extra + + +def _yield_leaf_items( + typed_dict_class: type, + type_name: str, + parent_keys: tuple[str, ...], + directive: Directive, +) -> Iterator[nodes.list_item]: + resolved_hints = _resolve_type_hints(typed_dict_class) + + docstring_var_nodes = _extract_docstring_var_nodes(directive, typed_dict_class) + + for key in _definition_order(typed_dict_class): + raw_type_hint = resolved_hints.get(key, Any) + + full_path = parent_keys + (key,) + nested_typed_dict = _find_nested_typed_dict(raw_type_hint) + + if 
nested_typed_dict is not None: + yield from _yield_leaf_items( + typed_dict_class=nested_typed_dict, + type_name=type_name, + parent_keys=full_path, + directive=directive, + ) + continue + + list_item = nodes.list_item() + paragraph = nodes.paragraph() + + paragraph += nodes.literal(text=_format_accessor(type_name, full_path)) + paragraph += nodes.Text(" (") + + for type_node in _yield_type_nodes(raw_type_hint): + paragraph += type_node + + paragraph += nodes.Text(")") + + doc_nodes = docstring_var_nodes.get(key, []) + + list_item += paragraph + + _append_doc_nodes_to_list_item( + list_item, + paragraph, + doc_nodes, + ) + + yield list_item + + +class TypedDictDirective(Directive): + has_content = False + required_arguments = 1 + option_spec = {"type": str} + + def run(self) -> list[nodes.Node]: + dotted_path = self.arguments[0].strip() + typed_dict_class = _import_python_object(dotted_path) + + if not _is_typed_dict_class(typed_dict_class): + return [ + self.state_machine.reporter.error( + f"{dotted_path!r} is not a TypedDict", + line=self.lineno, + ) + ] + + type_name = self.options.get("type", "message") + + unordered_list = nodes.bullet_list() + for list_item in _yield_leaf_items( + typed_dict_class, + type_name, + (), + self, + ): + unordered_list += list_item + + return [unordered_list] + + +def setup(app: Sphinx) -> ExtensionMetadata: + app.add_directive("typeddict", TypedDictDirective) + return { + "parallel_read_safe": True, + "parallel_write_safe": True, + } diff --git a/docs/extensions.rst b/docs/extensions.rst new file mode 100644 index 0000000..0d6bf69 --- /dev/null +++ b/docs/extensions.rst @@ -0,0 +1,29 @@ +############ + Extensions +############ + +The AMGI specification provides for server-specific extensions to be used outside of the core AMGI specification. This +document specifies some common extensions. 
+ +****************************** + Acknowledgement Out Of Order +****************************** + +This is sent by the server to indicate that acknowledgements can be sent out of order. + +.. code:: + + "scope": { + ... + "extensions": { + "message.ack.out_of_order": {} + } + } + +When this extension is present, the application MAY send ``message.ack`` and ``message.nack`` events for received +messages in any order. The server MUST accept out-of-order acknowledgements and MUST NOT treat them as a protocol error. + +When this extension is absent, applications SHOULD assume that acknowledgement ordering is constrained by the server and +MAY be required to follow message delivery order. + +This extension does not change the semantics of message delivery or batching, and only affects acknowledgement ordering. diff --git a/docs/index.rst b/docs/index.rst index ebcc3a8..54dfcf8 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -8,7 +8,7 @@ the focus here is event-based applications. .. toctree:: :hidden: - introduction specifications + extensions .. _asgi: https://asgi.readthedocs.io/en/latest/ diff --git a/docs/introduction.rst b/docs/introduction.rst deleted file mode 100644 index 83639a9..0000000 --- a/docs/introduction.rst +++ /dev/null @@ -1,3 +0,0 @@ -############## - Introduction -############## diff --git a/docs/specifications/lifespan.rst b/docs/specifications/lifespan.rst index 0823429..38b7343 100644 --- a/docs/specifications/lifespan.rst +++ b/docs/specifications/lifespan.rst @@ -2,6 +2,112 @@ Lifespan Protocol ################### -The lifespan protocol follows that of ASGI_. +The Message AMGI lifespan sub-specification outlines how applications are started up and shut down within AMGI. -.. _asgi: https://asgi.readthedocs.io/en/latest/ +It follows the same semantics and event flow as the ASGI lifespan specification. This allows servers and applications to +coordinate resource allocation, background tasks, and graceful shutdown. 
+ +A simple implementation would be: + +.. code:: python + + async def app(scope, receive, send): + if scope["type"] == "lifespan": + while True: + message = await receive() + if message["type"] == "lifespan.startup": + ... # Do some startup here! + await send({"type": "lifespan.startup.complete"}) + elif message["type"] == "lifespan.shutdown": + ... # Do some shutdown here! + await send({"type": "lifespan.shutdown.complete"}) + return + else: + pass # Handle other types + + +********** + Lifespan +********** + +A lifespan session has a single lifespan scope. Your application will be called exactly once for the duration of the +process lifetime. + +The lifespan scope information passed in ``scope`` contains: + +.. typeddict:: amgi_types.LifespanScope + :type: scope + +********************************************* + Lifespan startup - :py:func:`receive` event +********************************************* + +Sent to the application to indicate that the server is starting up. + +Applications should perform any required initialization at this point (for example: opening connections, warming caches, +or starting background tasks). + +Keys: + +.. typeddict:: amgi_types.LifespanStartupEvent + +*************************************************** + Lifespan startup complete - :py:func:`send` event +*************************************************** + +Sent by the application to signify that startup has completed successfully. + +Once this event is sent, the server may begin delivering messages to the application. + +Keys: + +.. typeddict:: amgi_types.LifespanStartupCompleteEvent + +************************************************* + Lifespan startup failed - :py:func:`send` event +************************************************* + +Sent by the application to signify that startup has failed. + +If this event is sent, the server must not start message processing and should terminate the application. + +Keys: + +.. 
typeddict:: amgi_types.LifespanStartupFailedEvent + +********************************************** + Lifespan shutdown - :py:func:`receive` event +********************************************** + +Sent to the application to indicate that the server is shutting down. + +Applications should begin graceful shutdown at this point (for example: stopping background tasks, closing connections, +and flushing buffers). + +Keys: + +.. typeddict:: amgi_types.LifespanShutdownEvent + +**************************************************** + Lifespan shutdown complete - :py:func:`send` event +**************************************************** + +Sent by the application to signify that shutdown has completed successfully. + +Once this event is sent, the server may safely terminate the application. + +Keys: + +.. typeddict:: amgi_types.LifespanShutdownCompleteEvent + +************************************************** + Lifespan shutdown failed - :py:func:`send` event +************************************************** + +Sent by the application to signify that shutdown has failed. + +The server should still terminate the application, but may log or surface the failure as appropriate. + +Keys: + +.. typeddict:: amgi_types.LifespanShutdownFailedEvent diff --git a/docs/specifications/message.rst b/docs/specifications/message.rst index 3eb4a42..7c4b418 100644 --- a/docs/specifications/message.rst +++ b/docs/specifications/message.rst @@ -7,6 +7,40 @@ The Message AMGI sub-specification outlines how messages are sent, and received It is deliberately designed to be agnostic where possible. Terminology is taken from AsyncAPI_ so as to follow their agnosticism. +A simple implementation would be: + +.. 
code:: python + + async def app(scope, receive, send): + if scope["type"] == "message": + more_messages = True + while more_messages: + message = await receive() + message_id = message["id"] + try: + headers = message["headers"] + payload = message.get("payload") + bindings = message.get("bindings", {}) + ... # Do some message handling here! + await send( + { + "type": "message.ack", + "id": message_id, + } + ) + except Exception as e: + await send( + { + "type": "message.nack", + "id": message_id, + "message": str(e), + } + ) + more_messages = message.get("more_messages") + else: + pass # Handle other types + + ********* Message ********* @@ -16,11 +50,8 @@ support batched consumption a batch of one message should be sent to the applica The message scope information passed in scope contains: -- ``scope["type"]`` (:py:obj:`str`) - ``"message"`` -- ``scope["amgi"]["version"]`` (:py:obj:`str`) - Version of the AMGI spec. -- ``scope["amgi"]["spec_version"]`` (:py:obj:`str`) - Version of the AMGI message spec this server understands. -- ``scope["address"]`` (:py:obj:`str`) - The address of the batch of messages, for example, in Kafka this would be the - topic. +.. typeddict:: amgi_types.MessageScope + :type: scope ******************************************** Receive message - :py:func:`receive` event @@ -30,23 +61,7 @@ Sent to the application to indicate an incoming message in the batch. Keys: -- ``message["type"]`` (:py:obj:`str`) – ``"message.receive"`` - -- ``message["id"]`` (:py:obj:`str`) - A unique id for the message, used to ack, or nack the message - -- ``message["headers"]`` (:py:obj:`~collections.abc.Iterable` [:py:obj:`tuple` [:py:obj:`bytes`, :py:obj:`bytes`]]) - - Includes the headers of the message. - -- ``message["payload"]`` (:py:obj:`~typing.NotRequired` [ :py:obj:`~typing.Optional` [:py:obj:`bytes`]]) - Payload of - the message, which can be :py:obj:`None` or :py:obj:`bytes`. If missing, it defaults to :py:obj:`None`. 
- -- ``message["bindings"]`` (:py:obj:`~typing.NotRequired` [:py:obj:`dict` [:py:obj:`str`, :py:obj:`dict` [:py:obj:`str`, - :py:obj:`~typing.Any`]]]) - Protocol specific bindings, for example, when receiving a Kafka message the bindings - could include the key: ``{"kafka": {"key": b"key"}}``. - -- ``message["more_messages"]`` (:py:obj:`~typing.NotRequired` [:py:obj:`bool`]) - Indicates there are more messages to - process in the batch. The application should keep receiving until it receives :py:obj:`False`. If missing it defaults - to :py:obj:`False`. +.. typeddict:: amgi_types.MessageReceiveEvent ********************************************** Response message ack - :py:func:`send` event @@ -56,8 +71,7 @@ Sent by the application to signify that it has successfully acknowledged a messa Keys: -- ``message["type"]`` (:py:obj:`str`) – ``"message.ack"`` -- ``message["id"]`` (:py:obj:`str`) - The unique id of the message +.. typeddict:: amgi_types.MessageAckEvent *********************************************** Response message nack - :py:func:`send` event @@ -67,9 +81,7 @@ Sent by the application to signify that it could not process a message. Keys: -- ``message["type"]`` (:py:obj:`str`) – ``"message.nack"`` -- ``message["id"]`` (:py:obj:`str`) - The unique id of the message -- ``message["message"]`` (:py:obj:`str`) - A message indicating why the message could not be processed +.. typeddict:: amgi_types.MessageNackEvent *********************************************** Response message send - :py:func:`send` event @@ -80,21 +92,7 @@ server-specific subclass of :py:obj:`OSError`. 
Keys: -- ``message["type"]`` (:py:obj:`str`) – ``"message.send"`` - -- ``message["address"]`` (:py:obj:`str`) – Address to send the message to - -- ``message["address"]`` (:py:obj:`str`) – Address to send the message to - -- ``message["headers"]`` (:py:obj:`~collections.abc.Iterable` [:py:obj:`tuple` [:py:obj:`bytes`, :py:obj:`bytes`]]) - - Headers of the message - -- ``message["payload"]`` (:py:obj:`~typing.NotRequired` [ :py:obj:`~typing.Optional` [:py:obj:`bytes`]]) - Payload of - the message, which can be :py:obj:`None`, or :py:obj:`bytes`. If missing, it defaults to :py:obj:`None`. - -- ``message["bindings"]`` (:py:obj:`~typing.NotRequired` [:py:obj:`dict` [:py:obj:`str`, :py:obj:`dict` [:py:obj:`str`, - :py:obj:`~typing.Any`]]]) - Protocol specific bindings to send. This can be bindings for multiple protocols, allowing - the server to decide to handle them, or ignore them. +.. typeddict:: amgi_types.MessageSendEvent ***************** Bindings Object diff --git a/packages/amgi-types/src/amgi_types/__init__.py b/packages/amgi-types/src/amgi_types/__init__.py index dce6dc8..56b43f1 100644 --- a/packages/amgi-types/src/amgi_types/__init__.py +++ b/packages/amgi-types/src/amgi_types/__init__.py @@ -14,11 +14,26 @@ class AMGIVersions(TypedDict): + """ + :var version: Version of the AMGI spec + :var spec_version: Version of the AMGI message spec this server understands + """ + spec_version: str version: Literal["1.0"] class MessageScope(TypedDict): + """ + :var address: The address of the batch of messages, for example, in Kafka this would be the topic + :var state: + A copy of the namespace passed into the lifespan corresponding to this batch. Optional; if missing the server + does not support this feature. + :var extensions: + Extensions allow AMGI servers to advertise optional capabilities to applications. Extensions are provided via + scope and are opt-in: applications MUST assume an extension is unsupported unless it is explicitly present. 
+ """ + type: Literal["message"] amgi: AMGIVersions address: str @@ -27,6 +42,12 @@ class MessageScope(TypedDict): class LifespanScope(TypedDict): + """ + :var state: + An empty namespace where the application can persist state to be used when handling subsequent requests. + Optional; if missing the server does not support this feature. + """ + type: Literal["lifespan"] amgi: AMGIVersions state: NotRequired[dict[str, Any]] @@ -59,6 +80,20 @@ class LifespanShutdownFailedEvent(TypedDict): class MessageReceiveEvent(TypedDict): + """ + :var id: A unique id for the message, used to ack, or nack the message + :var headers: Includes the headers of the message + :var payload: + Payload of the message, which can be :py:obj:`None` or :py:obj:`bytes`. If missing, it defaults to + :py:obj:`None` + :var bindings: + Protocol specific bindings, for example, when receiving a Kafka message the bindings could include the key: + ``{"kafka": {"key": b"key"}}`` + :var more_messages: + Indicates there are more messages to process in the batch. The application should keep receiving until it + receives :py:obj:`False`. If missing it defaults to :py:obj:`False` + """ + type: Literal["message.receive"] id: str headers: Iterable[tuple[bytes, bytes]] @@ -68,17 +103,37 @@ class MessageReceiveEvent(TypedDict): class MessageAckEvent(TypedDict): + """ + :var id: The unique id of the message + """ + type: Literal["message.ack"] id: str class MessageNackEvent(TypedDict): + """ + :var id: The unique id of the message + :var message: A message indicating why the message could not be processed + """ + type: Literal["message.nack"] id: str message: str class MessageSendEvent(TypedDict): + """ + :var address: Address to send the message to + :var headers: Headers of the message + :var payload: + Payload of the message, which can be :py:obj:`None`, or :py:obj:`bytes`. If missing, it defaults to + :py:obj:`None`. + :var bindings: + Protocol specific bindings to send. 
This can be bindings for multiple protocols, allowing the server to decide + to handle them, or ignore them. + """ + type: Literal["message.send"] address: str headers: Iterable[tuple[bytes, bytes]]