From b64295ca6c6fc9909bb4c5c63cc228f15e07c0a0 Mon Sep 17 00:00:00 2001 From: Domenico Nappo Date: Thu, 1 Jan 2026 15:23:37 +0100 Subject: [PATCH] Add docstrings; prepare release 0.1.2 --- README.md | 5 +- docs/source/api.rst | 35 ++++++-- docs/source/quick_start.md | 26 +++--- protobunny/__init__.py | 134 ++++++++++++++++++++++++------ protobunny/__init__.py.j2 | 132 +++++++++++++++++++++++------ protobunny/asyncio/__init__.py | 133 +++++++++++++++++++++-------- protobunny/asyncio/__init__.py.j2 | 134 ++++++++++++++++++++++-------- protobunny/wrapper.py | 4 +- pyproject.toml | 14 +++- setup.py | 2 +- uv.lock | 2 +- 11 files changed, 474 insertions(+), 147 deletions(-) diff --git a/README.md b/README.md index 2a28490..8782370 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ # Protobunny > [!WARNING] -> The project is in early development. +> The project is in early development. +> The core functionality for the available backends is implemented and tested, +> but the public API may undergo breaking changes before the 1.0 release. Protobunny is the open-source evolution of [AM-Flow](https://am-flow.com)'s internal messaging library. @@ -95,6 +97,7 @@ Documentation home page: [https://am-flow.github.io/protobunny/](https://am-flow - [x] **Result workflow**: Subscribe to results topics and receive protobunny `Result` messages produced by your callbacks. - [x] **Cloud-Native**: NATS (Core & JetStream) integration. - [ ] **Cloud Providers**: AWS (SQS/SNS) and GCP Pub/Sub. +- [ ] **OpenTelemetry** Integration (Planned) - [ ] **More backends**: Kafka support. - [ ] **gRPC** Direct Call support diff --git a/docs/source/api.rst b/docs/source/api.rst index c8fe636..61c9087 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -1,13 +1,17 @@ API Reference ============= -Core Package ------------- +Core Package Async +------------------ - .. automodule:: protobunny - :members: .. 
automodule:: protobunny.asyncio :members: + +Core Package Sync +----------------- + + .. automodule:: protobunny + :members: :no-index: Models @@ -48,6 +52,17 @@ Redis aio backend :members: :no-index: + +NATS aio backend +----------------- + .. automodule:: protobunny.asyncio.backends.nats + :members: + .. automodule:: protobunny.asyncio.backends.nats.connection + :members: + .. automodule:: protobunny.asyncio.backends.nats.queues + :members: + :no-index: + Python aio backend ------------------ .. automodule:: protobunny.asyncio.backends.python @@ -91,6 +106,17 @@ Redis backend :members: :no-index: +NATS backend +---------------- + .. automodule:: protobunny.backends.nats + :members: + .. automodule:: protobunny.backends.nats.connection + :members: + .. automodule:: protobunny.backends.nats.queues + :members: + :no-index: + + Python backend ---------------- .. automodule:: protobunny.backends.python @@ -113,4 +139,3 @@ protobunny utility ------------------ .. automodule:: protobunny.wrapper :members: - :undoc-members: diff --git a/docs/source/quick_start.md b/docs/source/quick_start.md index 8b083b8..982259c 100644 --- a/docs/source/quick_start.md +++ b/docs/source/quick_start.md @@ -9,7 +9,7 @@ For a full example, using FastAPI, Redis, and protobunny, see [this repo](https: ## Setup ### pyproject.toml -Add `protobunny` to your `pyproject.toml` dependencies: +Add `protobunny` to your `pyproject.toml` dependencies (add the backend you need as extra dependency): ```shell uv add protobunny[rabbitmq, numpy] @@ -325,7 +325,7 @@ messages-directory = "messages" messages-prefix = "acme" generated-package-name = "mymessagelib" generated-package-root = "codegen" -backend = "rabbitmq" +backend = "rabbitmq" # configure here the backend (choose between rabbitmq, redis, mosquitto, nats, python) mode = "async" ``` @@ -376,22 +376,19 @@ You should find the generated classes under `codegen/mymessagelib`. 
### Python code to test the library ```python - import asyncio import logging -import sys +import time import protobunny as pb from protobunny import asyncio as pb_asyncio - -# sys.path.append(pb.default_configuration.generated_package_root) -# this is needed when the python classes for your lib are generated in a subfolder -# that is not in the namespace and it must not be treated as a package -# It's just a sys.path.append("./codegen") -pb.config_lib() +# # sys.path.append(pb.default_configuration.generated_package_root) +# sys.path.append("./codegen") +pb.config_lib() # this is needed when the python classes for your lib are generated in a subfolder import mymessagelib as ml + logging.basicConfig( level=logging.INFO, format="[%(asctime)s %(levelname)s] %(name)s - %(message)s" ) @@ -411,13 +408,16 @@ class TestLibAsync: async def worker1(self, task: ml.main.tasks.TaskMessage) -> None: log.info("1- Working on: %s", task) + await asyncio.sleep(0.1) async def worker2(self, task: ml.main.tasks.TaskMessage) -> None: log.info("2- Working on: %s", task) + await asyncio.sleep(0.1) async def on_message_mymessage(self, message: ml.main.MyMessage) -> None: log.info("Got main message: %s", message) + def run_forever(self): asyncio.run(self.main()) @@ -425,6 +425,7 @@ class TestLibAsync: log.info(f"LOG {incoming.routing_key}: {body}") async def main(self): + await pb_asyncio.subscribe_logger(self.log_callback) await pb_asyncio.subscribe(ml.main.tasks.TaskMessage, self.worker1) await pb_asyncio.subscribe(ml.main.tasks.TaskMessage, self.worker2) @@ -454,9 +455,11 @@ class TestLib: def worker1(self, task: ml.main.tasks.TaskMessage) -> None: log.info("1- Working on: %s", task) + time.sleep(0.1) def worker2(self, task: ml.main.tasks.TaskMessage) -> None: log.info("2- Working on: %s", task) + time.sleep(0.1) def log_callback(self, incoming, body) -> None: log.info(f"LOG {incoming.routing_key}: {body}") @@ -478,7 +481,8 @@ class TestLib: if __name__ == "__main__": - if 
conf.use_async: + config = pb.config + if config.use_async: log.info("Using async") testlib = TestLibAsync() pb_asyncio.run_forever(testlib.main) diff --git a/protobunny/__init__.py b/protobunny/__init__.py index 44d5827..c4107e9 100644 --- a/protobunny/__init__.py +++ b/protobunny/__init__.py @@ -66,8 +66,18 @@ def connect(**kwargs: tp.Any) -> "BaseSyncConnection": - """Connect teh backend and get the singleton async connection. - You can pass the specific keyword arguments that you would pass to the `connect` function of the configured backend. + """Establishes a connection to the configured messaging broker. + + This method initializes and returns a singleton connection. Subsequent calls + return the existing connection instance unless it has been explicitly + disconnected. + + Args: + **kwargs: Backend-specific connection arguments (e.g., host, port, + credentials, or protocol-specific tuning parameters). + + Returns: + BaseSyncConnection: The active connection singleton. """ connection_module = get_backend().connection conn = connection_module.Connection.get_connection(vhost=connection_module.VHOST, **kwargs) @@ -75,13 +85,18 @@ def connect(**kwargs: tp.Any) -> "BaseSyncConnection": def disconnect() -> None: + """Closes the active connection to the broker. + + Gracefully terminates heartbeats and background networking tasks. Safe to + call if no connection is active. 
+ """ connection_module = get_backend().connection conn = connection_module.Connection.get_connection(vhost=connection_module.VHOST) conn.disconnect() def reset_connection(**kwargs: tp.Any) -> "BaseSyncConnection": - """Reset the singleton connection.""" + """Resets the singleton connection and returns it.""" connection_module = get_backend().connection conn = connection_module.Connection.get_connection(vhost=connection_module.VHOST) conn.disconnect() @@ -89,13 +104,15 @@ def reset_connection(**kwargs: tp.Any) -> "BaseSyncConnection": def publish(message: "PBM") -> None: - """Synchronously publish a message to its corresponding queue. + """Publishes a Protobuf message to its corresponding topic. - This method automatically determines the correct topic based on the - protobuf message type. + The destination topic is automatically derived from the message class and + package structure. Messages within a '.tasks' package are automatically + treated as persistent tasks requiring reliable delivery and queuing logic + across all supported backends. Args: - message: The Protobuf message instance to be published. + message: An instance of a class derived from ProtoBunnyMessage. """ queue = get_queue(message) queue.publish(message) @@ -104,13 +121,13 @@ def publish(message: "PBM") -> None: def publish_result( result: "Result", topic: str | None = None, correlation_id: str | None = None ) -> None: - """Publish the result message to the result topic of the source message + """Publishes a processing result to be consumed by results subscribers (See subscribe_results). Args: - result: a Result instance. - topic: The topic to send the message to. - Default to the source message result topic (e.g. "pb.vision.ExtractFeature.result") - correlation_id: + result: The Result object containing the response payload and source message from which the Result was generated. + topic: Optional override for the result topic. 
Defaults to the + automatically generated '.result' topic associated with the source message. + correlation_id: Optional ID used to link this result to a specific request. """ queue = get_queue(result.source) queue.publish_result(result, topic, correlation_id) @@ -120,14 +137,19 @@ def subscribe( pkg_or_msg: "type[PBM] | ModuleType", callback: "SyncCallback", ) -> "BaseSyncQueue": - """Subscribe a callback function to the topic. + """Registers a callback to consume messages from a specific topic or package. + + If a message class is provided, subscribes to that specific topic. If a + module is provided, subscribes to all message types defined within that module. + For shared tasks (identified by the '.tasks' convention), Protobunny + automatically manages shared consumer groups and load balancing. Args: - pkg_or_msg: The topic to subscribe to as message class or module. - callback: The callback function that consumes the received message. + pkg_or_msg: The message class (type[PBM]) or module to subscribe to. + callback: The function to execute when a message is received. Returns: - The Queue object. You can access the subscription via its `subscription` attribute. + BaseSyncQueue: The queue object managing the active subscription. """ register_key = str(pkg_or_msg) @@ -149,11 +171,17 @@ def subscribe_results( pkg: "type[PBM] | ModuleType", callback: "SyncCallback", ) -> "BaseSyncQueue": - """Subscribe a callback function to the result topic. + """Subscribes to result topics for a specific message type or package. + + Used by services that need to listen for completion signals or data + returned by workers processing specific message types. Args: - pkg: - callback: + pkg: The message class or module whose results should be monitored. + callback: The function to execute when a result message is received. + + Returns: + BaseSyncQueue: The queue object managing the result subscription. 
""" queue = get_queue(pkg) queue.subscribe_results(callback) @@ -168,7 +196,13 @@ def unsubscribe( if_unused: bool = True, if_empty: bool = True, ) -> None: - """Remove a subscription for a message/package""" + """Cancels an active subscription for a specific message type or package. + + Args: + pkg: The message class or module to unsubscribe from. + if_unused: If True, only unsubscribes if no other callbacks are attached. + if_empty: If True, only unsubscribes if the buffer is empty. + """ module_name = pkg.__module__ if hasattr(pkg, "__module__") else pkg.__name__ registry_key = registry.get_key(pkg) @@ -188,7 +222,11 @@ def unsubscribe( def unsubscribe_results( pkg: "type[PBM] | ModuleType", ) -> None: - """Remove all in-process subscriptions for a message/package result topic""" + """Remove all in-process subscriptions for a message results topic + + Args: + pkg: The message class or module to unsubscribe from its results topic. + """ with registry.sync_lock: queue = registry.unregister_results(pkg) if queue: @@ -196,11 +234,14 @@ def unsubscribe_results( def unsubscribe_all(if_unused: bool = True, if_empty: bool = True) -> None: - """ - Remove all active in-process subscriptions. + """Stops all message consumption by canceling every in-process subscription. - This clears standard subscriptions, result subscriptions, and task - subscriptions, effectively stopping all message consumption for this process. + Clears standard subscriptions, result listeners, and task workers. Typically + invoked during graceful application shutdown. + + Args: + if_unused: Policy for evaluating unused standard queues. + if_empty: Policy for evaluating empty standard queues. 
""" with registry.sync_lock: queues = registry.get_all_subscriptions() @@ -218,16 +259,34 @@ def unsubscribe_all(if_unused: bool = True, if_empty: bool = True) -> None: def get_message_count( - msg_type: "PBM | type[PBM] | ModuleType", + msg_type: "PBM | type[PBM]", ) -> int | None: + """Retrieves the current number of pending messages in a queue. + + Args: + msg_type: The message instance or class + + Returns: + int | None: The count of messages waiting to be processed, or None + if the backend does not support count retrieval for this queue type. + """ q = get_queue(msg_type) count = q.get_message_count() return count def get_consumer_count( - msg_type: "PBM | type[PBM] | ModuleType", + msg_type: "PBM | type[PBM]", ) -> int | None: + """Retrieves the number of active consumers currently attached to a shared (aka tasks) queue. + + Args: + msg_type: The message instance or class representing the queue. + + Returns: + int | None: The number of active subscribers/workers, or None if + unsupported by the current backend. + """ q = get_queue(msg_type) count = q.get_consumer_count() return count @@ -260,6 +319,20 @@ def _prepare_logger_queue( def subscribe_logger( log_callback: "LoggerCallback | None" = None, prefix: str | None = None ) -> "LoggingSyncQueue": + """Subscribes a specialized logging callback to monitor message traffic. + + This creates a logging-specific queue that captures and logs metadata + for messages matching the optional prefix. + + Args: + log_callback: A custom function to handle log messages. Defaults + to `default_log_callback` (logs routing key, cid, and content). + prefix: An optional subject/topic prefix to filter which messages + are logged. + + Returns: + LoggingSyncQueue: The specialized queue object for logging. 
+ """ resolved_callback = log_callback or default_log_callback queue, cb = LoggingSyncQueue(prefix), resolved_callback queue.subscribe(cb) @@ -267,6 +340,13 @@ def subscribe_logger( def run_forever() -> None: + """Blocks the main thread to maintain active message consumption. + + Installs signal handlers for SIGINT and SIGTERM to trigger an orderly + shutdown, ensuring all subscriptions are canceled and the connection + is closed before the process exits. + """ + def shutdown(signum: int, _: FrameType | None) -> None: log.info("Shutting down protobunny connections %s", signal.Signals(signum).name) unsubscribe_all() diff --git a/protobunny/__init__.py.j2 b/protobunny/__init__.py.j2 index 2654cda..055c997 100644 --- a/protobunny/__init__.py.j2 +++ b/protobunny/__init__.py.j2 @@ -70,8 +70,18 @@ log = logging.getLogger(PACKAGE_NAME) def connect(**kwargs: tp.Any) -> "BaseSyncConnection": - """Connect teh backend and get the singleton async connection. - You can pass the specific keyword arguments that you would pass to the `connect` function of the configured backend. + """Establishes a connection to the configured messaging broker. + + This method initializes and returns a singleton connection. Subsequent calls + return the existing connection instance unless it has been explicitly + disconnected. + + Args: + **kwargs: Backend-specific connection arguments (e.g., host, port, + credentials, or protocol-specific tuning parameters). + + Returns: + BaseSyncConnection: The active connection singleton. """ connection_module = get_backend().connection conn = connection_module.Connection.get_connection(vhost=connection_module.VHOST, **kwargs) @@ -79,13 +89,18 @@ def connect(**kwargs: tp.Any) -> "BaseSyncConnection": def disconnect() -> None: + """Closes the active connection to the broker. + + Gracefully terminates heartbeats and background networking tasks. Safe to + call if no connection is active. 
+ """ connection_module = get_backend().connection conn = connection_module.Connection.get_connection(vhost=connection_module.VHOST) conn.disconnect() def reset_connection(**kwargs: tp.Any) -> "BaseSyncConnection": - """Reset the singleton connection.""" + """Resets the singleton connection and returns it.""" connection_module = get_backend().connection conn = connection_module.Connection.get_connection(vhost=connection_module.VHOST) conn.disconnect() @@ -93,13 +108,15 @@ def reset_connection(**kwargs: tp.Any) -> "BaseSyncConnection": def publish(message: "PBM") -> None: - """Synchronously publish a message to its corresponding queue. + """Publishes a Protobuf message to its corresponding topic. - This method automatically determines the correct topic based on the - protobuf message type. + The destination topic is automatically derived from the message class and + package structure. Messages within a '.tasks' package are automatically + treated as persistent tasks requiring reliable delivery and queuing logic + across all supported backends. Args: - message: The Protobuf message instance to be published. + message: An instance of a class derived from ProtoBunnyMessage. """ queue = get_queue(message) queue.publish(message) @@ -108,13 +125,13 @@ def publish(message: "PBM") -> None: def publish_result( result: "Result", topic: str | None = None, correlation_id: str | None = None ) -> None: - """Publish the result message to the result topic of the source message + """Publishes a processing result to be consumed by results subscribers (See subscribe_results). Args: - result: a Result instance. - topic: The topic to send the message to. - Default to the source message result topic (e.g. "pb.vision.ExtractFeature.result") - correlation_id: + result: The Result object containing the response payload and source message from which the Result was generated. + topic: Optional override for the result topic. 
Defaults to the + automatically generated '.result' topic associated with the source message. + correlation_id: Optional ID used to link this result to a specific request. """ queue = get_queue(result.source) queue.publish_result(result, topic, correlation_id) @@ -124,14 +141,19 @@ def subscribe( pkg_or_msg: "type[PBM] | ModuleType", callback: "SyncCallback", ) -> "BaseSyncQueue": - """Subscribe a callback function to the topic. + """Registers a callback to consume messages from a specific topic or package. + + If a message class is provided, subscribes to that specific topic. If a + module is provided, subscribes to all message types defined within that module. + For shared tasks (identified by the '.tasks' convention), Protobunny + automatically manages shared consumer groups and load balancing. Args: - pkg_or_msg: The topic to subscribe to as message class or module. - callback: The callback function that consumes the received message. + pkg_or_msg: The message class (type[PBM]) or module to subscribe to. + callback: The function to execute when a message is received. Returns: - The Queue object. You can access the subscription via its `subscription` attribute. + BaseSyncQueue: The queue object managing the active subscription. """ register_key = str(pkg_or_msg) @@ -153,11 +175,17 @@ def subscribe_results( pkg: "type[PBM] | ModuleType", callback: "SyncCallback", ) -> "BaseSyncQueue": - """Subscribe a callback function to the result topic. + """Subscribes to result topics for a specific message type or package. + + Used by services that need to listen for completion signals or data + returned by workers processing specific message types. Args: - pkg: - callback: + pkg: The message class or module whose results should be monitored. + callback: The function to execute when a result message is received. + + Returns: + BaseSyncQueue: The queue object managing the result subscription. 
""" queue = get_queue(pkg) queue.subscribe_results(callback) @@ -172,7 +200,13 @@ def unsubscribe( if_unused: bool = True, if_empty: bool = True, ) -> None: - """Remove a subscription for a message/package""" + """Cancels an active subscription for a specific message type or package. + + Args: + pkg: The message class or module to unsubscribe from. + if_unused: If True, only unsubscribes if no other callbacks are attached. + if_empty: If True, only unsubscribes if the buffer is empty. + """ module_name = pkg.__module__ if hasattr(pkg, "__module__") else pkg.__name__ registry_key = registry.get_key(pkg) @@ -192,7 +226,11 @@ def unsubscribe( def unsubscribe_results( pkg: "type[PBM] | ModuleType", ) -> None: - """Remove all in-process subscriptions for a message/package result topic""" + """Remove all in-process subscriptions for a message results topic + + Args: + pkg: The message class or module to unsubscribe from its results topic. + """ with registry.sync_lock: queue = registry.unregister_results(pkg) if queue: @@ -200,11 +238,14 @@ def unsubscribe_results( def unsubscribe_all(if_unused: bool = True, if_empty: bool = True) -> None: - """ - Remove all active in-process subscriptions. + """Stops all message consumption by canceling every in-process subscription. + + Clears standard subscriptions, result listeners, and task workers. Typically + invoked during graceful application shutdown. - This clears standard subscriptions, result subscriptions, and task - subscriptions, effectively stopping all message consumption for this process. + Args: + if_unused: Policy for evaluating unused standard queues. + if_empty: Policy for evaluating empty standard queues. 
""" with registry.sync_lock: queues = registry.get_all_subscriptions() @@ -222,16 +263,34 @@ def unsubscribe_all(if_unused: bool = True, if_empty: bool = True) -> None: def get_message_count( - msg_type: "PBM | type[PBM] | ModuleType", + msg_type: "PBM | type[PBM]", ) -> int | None: + """Retrieves the current number of pending messages in a queue. + + Args: + msg_type: The message instance or class + + Returns: + int | None: The count of messages waiting to be processed, or None + if the backend does not support count retrieval for this queue type. + """ q = get_queue(msg_type) count = q.get_message_count() return count def get_consumer_count( - msg_type: "PBM | type[PBM] | ModuleType", + msg_type: "PBM | type[PBM]", ) -> int | None: + """Retrieves the number of active consumers currently attached to a shared (aka tasks) queue. + + Args: + msg_type: The message instance or class representing the queue. + + Returns: + int | None: The number of active subscribers/workers, or None if + unsupported by the current backend. + """ q = get_queue(msg_type) count = q.get_consumer_count() return count @@ -262,6 +321,20 @@ def _prepare_logger_queue( def subscribe_logger( log_callback: "LoggerCallback | None" = None, prefix: str | None = None ) -> "LoggingSyncQueue": + """Subscribes a specialized logging callback to monitor message traffic. + + This creates a logging-specific queue that captures and logs metadata + for messages matching the optional prefix. + + Args: + log_callback: A custom function to handle log messages. Defaults + to `default_log_callback` (logs routing key, cid, and content). + prefix: An optional subject/topic prefix to filter which messages + are logged. + + Returns: + LoggingSyncQueue: The specialized queue object for logging. 
+ """ resolved_callback = log_callback or default_log_callback queue, cb = LoggingSyncQueue(prefix), resolved_callback queue.subscribe(cb) @@ -269,7 +342,12 @@ def subscribe_logger( def run_forever() -> None: + """Blocks the main thread to maintain active message consumption. + Installs signal handlers for SIGINT and SIGTERM to trigger an orderly + shutdown, ensuring all subscriptions are canceled and the connection + is closed before the process exits. + """ def shutdown(signum: int, _: FrameType | None) -> None: log.info("Shutting down protobunny connections %s", signal.Signals(signum).name) unsubscribe_all() diff --git a/protobunny/asyncio/__init__.py b/protobunny/asyncio/__init__.py index 0d5d879..c22b1d0 100644 --- a/protobunny/asyncio/__init__.py +++ b/protobunny/asyncio/__init__.py @@ -1,15 +1,8 @@ """ -A module providing support for messaging and communication using RabbitMQ as the backend. +A module providing async support for messaging and communication using the configured broker as the backend. This module includes functionality for publishing, subscribing, and managing message queues, -as well as dynamically managing imports and configurations for RabbitMQ-based communication -logics. It enables both synchronous and asynchronous operations, while also supporting -connection resetting and management. - -Modules and functionality are primarily imported from the core RabbitMQ backend, dynamically -generated package-specific configurations, and other base utilities. Exports are adjusted -as per the backend configuration. - +as well as dynamically managing imports and configurations for the backend. """ __all__ = [ @@ -82,7 +75,18 @@ async def connect(**kwargs) -> "BaseAsyncConnection": - """Get the singleton async connection.""" + """Establishes an asynchronous connection to the configured messaging broker. + + This method initializes and returns a singleton async connection. 
Subsequent + calls return the existing connection instance unless it has been disconnected. + + Args: + **kwargs: Backend-specific connection arguments (e.g., host, port, + credentials, or protocol-specific tuning parameters). + + Returns: + BaseAsyncConnection: The active asynchronous connection singleton. + """ connection_module = get_backend().connection conn = await connection_module.Connection.get_connection( vhost=connection_module.VHOST, **kwargs @@ -91,23 +95,32 @@ async def connect(**kwargs) -> "BaseAsyncConnection": async def disconnect() -> None: + """Closes the active asynchronous connection to the broker. + + Gracefully terminates heartbeats and background networking tasks. Safe to + call even if no connection is active. + """ connection_module = get_backend().connection conn = await connection_module.Connection.get_connection(vhost=connection_module.VHOST) await conn.disconnect() async def reset_connection() -> "BaseAsyncConnection": - """Reset the singleton connection.""" + """Resets the singleton connection and returns it.""" connection = await connect() await connection.disconnect() return await connect() async def publish(message: "PBM") -> None: - """Asynchronously publish a message to its corresponding queue. + """Asynchronously publishes a Protobuf message to its corresponding topic. + + The destination topic is automatically derived from the message class and + package structure. Messages within a '.tasks' package are automatically + treated as persistent tasks requiring reliable delivery and queuing logic. Args: - message: The Protobuf message instance to be published. + message: An instance of a class derived from ProtoBunnyMessage. 
""" queue = get_queue(message) await queue.publish(message) @@ -116,14 +129,13 @@ async def publish(message: "PBM") -> None: async def publish_result( result: "Result", topic: str | None = None, correlation_id: str | None = None ) -> None: - """ - Asynchronously publish a result message to a specific result topic. + """Asynchronously publishes a processing result to the results topic of the source message. Args: - result: The Result object to publish. - topic: Optional override for the destination topic. Defaults to the - source message's result topic (e.g., "namespace.Message.result"). - correlation_id: Optional ID to link the result to the original request. + result: The Result object containing the response payload and source message. + topic: Optional override for the result topic. Defaults to the + automatically generated '.result' topic associated with the source message. + correlation_id: Optional ID used to link this result to a specific request. """ queue = get_queue(result.source) await queue.publish_result(result, topic, correlation_id) @@ -133,21 +145,20 @@ async def subscribe( pkg: "type[PBM] | ModuleType", callback: "AsyncCallback", ) -> "BaseAsyncQueue": - """ - Subscribe an asynchronous callback to a specific topic or namespace. + """Registers an async callback to consume messages from a specific topic or package. - If the module name contains '.tasks', it is treated as a shared task queue - allowing multiple subscribers. Otherwise, it is treated as a standard - subscription (exclusive queue). + If a message class is provided, subscribes to that specific topic. If a + module is provided, subscribes to all message types defined within that module. + For shared tasks (identified by the '.tasks' convention), Protobunny + automatically manages shared consumer groups and load balancing. Args: - pkg: The message class, instance, or module to subscribe to. + pkg: The message class (type[PBM]) or module to subscribe to. 
callback: An async callable that accepts the received message. Returns: - AsyncQueue: The queue object managing the subscription. + BaseAsyncQueue: The queue object managing the active subscription. """ - # obj = type(pkg) if isinstance(pkg, betterproto.Message) else pkg module_name = pkg.__name__ if inspect.ismodule(pkg) else pkg.__module__ registry_key = str(pkg) async with registry.lock: @@ -170,7 +181,13 @@ async def unsubscribe( if_unused: bool = True, if_empty: bool = True, ) -> None: - """Remove a subscription for a message/package""" + """Asynchronously removes a subscription for a specific message or package. + + Args: + pkg: The message class or module to unsubscribe from. + if_unused: If True, only unsubscribes if no other callbacks are attached. + if_empty: If True, only unsubscribes if the local message buffer is empty. + """ module_name = pkg.__name__ if inspect.ismodule(pkg) else pkg.__module__ registry_key = registry.get_key(pkg) @@ -199,11 +216,14 @@ async def unsubscribe_results( async def unsubscribe_all(if_unused: bool = True, if_empty: bool = True) -> None: - """ - Asynchronously remove all active in-process subscriptions. + """Asynchronously stops all message consumption by canceling every subscription. + + Clears standard subscriptions, result listeners, and task workers. Typically + invoked during graceful application shutdown. - This clears standard subscriptions, result subscriptions, and task - subscriptions, effectively stopping all message consumption for this process. + Args: + if_unused: Policy for evaluating unused standard queues. + if_empty: Policy for evaluating empty standard queues. """ async with registry.lock: queues = itertools.chain( @@ -223,11 +243,17 @@ async def subscribe_results( pkg: "type[PBM] | ModuleType", callback: "AsyncCallback", ) -> "BaseAsyncQueue": - """Subscribe a callback function to the result topic. + """Asynchronously subscribes to result topics for a message type or package. 
+ + Used by services that need to listen for completion signals or data + returned by workers processing specific message types. Args: - pkg: - callback: + pkg: The message class or module whose results should be monitored. + callback: The async function to execute when a result is received. + + Returns: + BaseAsyncQueue: The queue object managing the result subscription. """ queue = get_queue(pkg) await queue.subscribe_results(callback) @@ -240,6 +266,15 @@ async def subscribe_results( async def get_message_count( msg_type: "PBM | type[PBM] | ModuleType", ) -> int | None: + """Asynchronously retrieves the current number of pending messages in a queue. + + Args: + msg_type: The message instance, class, or module representing the queue. + + Returns: + int | None: The count of messages waiting to be processed, or None + if the backend does not support count retrieval for this type. + """ q = get_queue(msg_type) count = await q.get_message_count() return count @@ -248,6 +283,15 @@ async def get_message_count( async def get_consumer_count( msg_type: "PBM | type[PBM] | ModuleType", ) -> int | None: + """Retrieves the number of active consumers currently attached to a shared (aka tasks) queue. + + Args: + msg_type: The message instance, class representing the queue. + + Returns: + int | None: The number of active subscribers/workers, or None if + unsupported by the current backend. + """ q = get_queue(msg_type) count = await q.get_consumer_count() return count @@ -266,6 +310,16 @@ def default_log_callback(message: "IncomingMessageProtocol", msg_content: str) - async def subscribe_logger( log_callback: "LoggerCallback | None" = None, prefix: str | None = None ) -> "LoggingAsyncQueue": + """Asynchronously subscribes a logging callback to monitor message traffic. + + Args: + log_callback: A custom function to handle log messages. Defaults + to `default_log_callback`. + prefix: An optional subject/topic prefix to filter logged messages. 
+ + Returns: + LoggingAsyncQueue: The specialized async queue object for logging. + """ resolved_callback = log_callback or default_log_callback queue, cb = LoggingAsyncQueue(prefix), resolved_callback await queue.subscribe(cb) @@ -277,6 +331,15 @@ def is_module_tasks(module_name: str) -> bool: def run_forever(main: tp.Callable[..., tp.Awaitable[None]]) -> None: + """Starts the event loop and keeps the process alive to consume messages. + + Installs signal handlers for SIGINT and SIGTERM to trigger an orderly + async shutdown. + + Args: + main: The entry point async function to run before entering the + permanent wait state. + """ asyncio.run(_run_forever(main)) diff --git a/protobunny/asyncio/__init__.py.j2 b/protobunny/asyncio/__init__.py.j2 index de61bf3..dbd7323 100644 --- a/protobunny/asyncio/__init__.py.j2 +++ b/protobunny/asyncio/__init__.py.j2 @@ -1,16 +1,10 @@ """ -A module providing support for messaging and communication using RabbitMQ as the backend. +A module providing async support for messaging and communication using the configured broker as the backend. This module includes functionality for publishing, subscribing, and managing message queues, -as well as dynamically managing imports and configurations for RabbitMQ-based communication -logics. It enables both synchronous and asynchronous operations, while also supporting -connection resetting and management. - -Modules and functionality are primarily imported from the core RabbitMQ backend, dynamically -generated package-specific configurations, and other base utilities. Exports are adjusted -as per the backend configuration. - +as well as dynamically managing imports and configurations for the backend. 
""" + __all__ = [ "get_message_count", "get_queue", @@ -81,30 +75,50 @@ log = logging.getLogger(PACKAGE_NAME) ############################ async def connect(**kwargs) -> "BaseAsyncConnection": - """Get the singleton async connection.""" + """Establishes an asynchronous connection to the configured messaging broker. + + This method initializes and returns a singleton async connection. Subsequent + calls return the existing connection instance unless it has been disconnected. + + Args: + **kwargs: Backend-specific connection arguments (e.g., host, port, + credentials, or protocol-specific tuning parameters). + + Returns: + BaseAsyncConnection: The active asynchronous connection singleton. + """ connection_module = get_backend().connection conn = await connection_module.Connection.get_connection(vhost=connection_module.VHOST, **kwargs) return conn async def disconnect() -> None: + """Closes the active asynchronous connection to the broker. + + Gracefully terminates heartbeats and background networking tasks. Safe to + call even if no connection is active. + """ connection_module = get_backend().connection conn = await connection_module.Connection.get_connection(vhost=connection_module.VHOST) await conn.disconnect() async def reset_connection() -> "BaseAsyncConnection": - """Reset the singleton connection.""" + """Resets the singleton connection and returns it.""" connection = await connect() await connection.disconnect() return await connect() async def publish(message: "PBM") -> None: - """Asynchronously publish a message to its corresponding queue. + """Asynchronously publishes a Protobuf message to its corresponding topic. + + The destination topic is automatically derived from the message class and + package structure. Messages within a '.tasks' package are automatically + treated as persistent tasks requiring reliable delivery and queuing logic. Args: - message: The Protobuf message instance to be published. 
+ message: An instance of a class derived from ProtoBunnyMessage. """ queue = get_queue(message) await queue.publish(message) @@ -113,14 +127,13 @@ async def publish(message: "PBM") -> None: async def publish_result( result: "Result", topic: str | None = None, correlation_id: str | None = None ) -> None: - """ - Asynchronously publish a result message to a specific result topic. + """Asynchronously publishes a processing result to the results topic of the source message. Args: - result: The Result object to publish. - topic: Optional override for the destination topic. Defaults to the - source message's result topic (e.g., "namespace.Message.result"). - correlation_id: Optional ID to link the result to the original request. + result: The Result object containing the response payload and source message. + topic: Optional override for the result topic. Defaults to the + automatically generated '.result' topic associated with the source message. + correlation_id: Optional ID used to link this result to a specific request. """ queue = get_queue(result.source) await queue.publish_result(result, topic, correlation_id) @@ -130,21 +143,20 @@ async def subscribe( pkg: "type[PBM] | ModuleType", callback: "AsyncCallback", ) -> "BaseAsyncQueue": - """ - Subscribe an asynchronous callback to a specific topic or namespace. + """Registers an async callback to consume messages from a specific topic or package. - If the module name contains '.tasks', it is treated as a shared task queue - allowing multiple subscribers. Otherwise, it is treated as a standard - subscription (exclusive queue). + If a message class is provided, subscribes to that specific topic. If a + module is provided, subscribes to all message types defined within that module. + For shared tasks (identified by the '.tasks' convention), Protobunny + automatically manages shared consumer groups and load balancing. Args: - pkg: The message class, instance, or module to subscribe to. 
+ pkg: The message class (type[PBM]) or module to subscribe to. callback: An async callable that accepts the received message. Returns: - AsyncQueue: The queue object managing the subscription. + BaseAsyncQueue: The queue object managing the active subscription. """ - # obj = type(pkg) if isinstance(pkg, betterproto.Message) else pkg module_name = pkg.__name__ if inspect.ismodule(pkg) else pkg.__module__ registry_key = str(pkg) async with registry.lock: @@ -167,7 +179,13 @@ async def unsubscribe( if_unused: bool = True, if_empty: bool = True, ) -> None: - """Remove a subscription for a message/package""" + """Asynchronously removes a subscription for a specific message or package. + + Args: + pkg: The message class or module to unsubscribe from. + if_unused: If True, only unsubscribes if no other callbacks are attached. + if_empty: If True, only unsubscribes if the local message buffer is empty. + """ module_name = pkg.__name__ if inspect.ismodule(pkg) else pkg.__module__ registry_key = registry.get_key(pkg) @@ -196,11 +214,14 @@ async def unsubscribe_results( async def unsubscribe_all(if_unused: bool = True, if_empty: bool = True) -> None: - """ - Asynchronously remove all active in-process subscriptions. + """Asynchronously stops all message consumption by canceling every subscription. + + Clears standard subscriptions, result listeners, and task workers. Typically + invoked during graceful application shutdown. - This clears standard subscriptions, result subscriptions, and task - subscriptions, effectively stopping all message consumption for this process. + Args: + if_unused: Policy for evaluating unused standard queues. + if_empty: Policy for evaluating empty standard queues. """ async with registry.lock: queues = itertools.chain( @@ -220,11 +241,17 @@ async def subscribe_results( pkg: "type[PBM] | ModuleType", callback: "AsyncCallback", ) -> "BaseAsyncQueue": - """Subscribe a callback function to the result topic. 
+    """Asynchronously subscribes to result topics for a message type or package.
+
+    Used by services that need to listen for completion signals or data
+    returned by workers processing specific message types.
 
     Args:
-        pkg:
-        callback:
+        pkg: The message class or module whose results should be monitored.
+        callback: The async function to execute when a result is received.
+
+    Returns:
+        BaseAsyncQueue: The queue object managing the result subscription.
     """
     queue = get_queue(pkg)
     await queue.subscribe_results(callback)
@@ -237,6 +264,15 @@ async def subscribe_results(
 async def get_message_count(
     msg_type: "PBM | type[PBM] | ModuleType",
 ) -> int | None:
+    """Asynchronously retrieves the current number of pending messages in a queue.
+
+    Args:
+        msg_type: The message instance, class, or module representing the queue.
+
+    Returns:
+        int | None: The count of messages waiting to be processed, or None
+            if the backend does not support count retrieval for this type.
+    """
     q = get_queue(msg_type)
     count = await q.get_message_count()
     return count
@@ -245,6 +281,15 @@ async def get_consumer_count(
     msg_type: "PBM | type[PBM] | ModuleType",
 ) -> int | None:
+    """Retrieves the number of active consumers currently attached to a shared (aka tasks) queue.
+
+    Args:
+        msg_type: The message instance or class representing the queue.
+
+    Returns:
+        int | None: The number of active subscribers/workers, or None if
+            unsupported by the current backend.
+    """
     q = get_queue(msg_type)
     count = await q.get_consumer_count()
     return count
@@ -263,6 +308,16 @@ def default_log_callback(message: "IncomingMessageProtocol", msg_content: str) -
 async def subscribe_logger(
     log_callback: "LoggerCallback | None" = None, prefix: str | None = None
 ) -> "LoggingAsyncQueue":
+    """Asynchronously subscribes a logging callback to monitor message traffic.
+
+    Args:
+        log_callback: A custom function to handle log messages. Defaults
+            to `default_log_callback`.
+ prefix: An optional subject/topic prefix to filter logged messages. + + Returns: + LoggingAsyncQueue: The specialized async queue object for logging. + """ resolved_callback = log_callback or default_log_callback queue, cb = LoggingAsyncQueue(prefix), resolved_callback await queue.subscribe(cb) @@ -274,6 +329,15 @@ def is_module_tasks(module_name: str) -> bool: def run_forever(main: tp.Callable[..., tp.Awaitable[None]]) -> None: + """Starts the event loop and keeps the process alive to consume messages. + + Installs signal handlers for SIGINT and SIGTERM to trigger an orderly + async shutdown. + + Args: + main: The entry point async function to run before entering the + permanent wait state. + """ asyncio.run(_run_forever(main)) diff --git a/protobunny/wrapper.py b/protobunny/wrapper.py index f8ea798..e5fe0c3 100644 --- a/protobunny/wrapper.py +++ b/protobunny/wrapper.py @@ -8,14 +8,14 @@ Generate betterproto classes and automatically includes the path to the custom proto types and add the ProtoBunny mixin for the configured package (i.e. ``generated-package-name``). -See protobunny generate --help for more options. +See ``protobunny generate --help`` for more options. .. code-block:: shell protobunny log -Start a logger in console. See protobunny log --help for more options. +Start a logger in console. See ``protobunny log --help`` for more options. 
Full configuration for pyproject.toml diff --git a/pyproject.toml b/pyproject.toml index a26ea89..75b0a4c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,9 +1,19 @@ [project] name = "protobunny" classifiers = [ - "Development Status :: 3 - Alpha" + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: Communications", + "Topic :: Software Development :: Libraries :: Application Frameworks", + "Topic :: Software Development :: Object Messaging", + "Topic :: System :: Distributed Computing" ] -version = "0.1.2a2" +version = "0.1.2" description = "A type-safe, sync/async Python messaging library." authors = [ {name = "Domenico Nappo", email = "domenico.nappo@am-flow.com"}, diff --git a/setup.py b/setup.py index 6331cf7..fbd1694 100644 --- a/setup.py +++ b/setup.py @@ -98,7 +98,7 @@ def run(self) -> None: "install": GenerateProtoCommand, }, python_requires=">=3.10,<3.14", - description="Protobuf messages and python mqtt messaging toolkit", + description="A type-safe, sync/async Python messaging library.", entry_points={ "console_scripts": [ "protobunny=protobunny.wrapper:main", diff --git a/uv.lock b/uv.lock index cc5b48d..fa23b20 100644 --- a/uv.lock +++ b/uv.lock @@ -1274,7 +1274,7 @@ wheels = [ [[package]] name = "protobunny" -version = "0.1.2a2" +version = "0.1.2" source = { editable = "." } dependencies = [ { name = "betterproto", extra = ["compiler"] },