diff --git a/pubsub/gcloud/aio/pubsub/__init__.py b/pubsub/gcloud/aio/pubsub/__init__.py index 4b4d730c8..f0da93b39 100644 --- a/pubsub/gcloud/aio/pubsub/__init__.py +++ b/pubsub/gcloud/aio/pubsub/__init__.py @@ -53,6 +53,7 @@ async def handler(message): 'projects//subscriptions/', handler, subscriber_client, + ack_deadline=10, num_producers=1, max_messages_per_producer=100, ack_window=0.3, @@ -70,6 +71,11 @@ async def handler(message): return ``None`` if the message should be acked. An exception raised within the handler will result in the message being left to expire, and thus it will be redelivered according to your subscription's ack deadline. +- ``ack_deadline``: An optional parameter which should match the ack deadline + (in seconds) of your subscription. If omitted, the library will fetch the + current value from the subscription. Hard-coding it here is a way to avoid an + extra call to the Pub/Sub REST API which counts as an Administrator + Operation. - ``num_producers``: Number of workers that will be making ``pull`` requests to pubsub. Please note that a worker will only fetch new batch once the ``handler`` was called for each message from the previous batch. This means diff --git a/pubsub/gcloud/aio/pubsub/subscriber.py b/pubsub/gcloud/aio/pubsub/subscriber.py index 41c5dd2ae..8edec3184 100644 --- a/pubsub/gcloud/aio/pubsub/subscriber.py +++ b/pubsub/gcloud/aio/pubsub/subscriber.py @@ -41,12 +41,13 @@ class AckDeadlineCache: def __init__( self, subscriber_client: SubscriberClient, subscription: str, cache_timeout: float, + ack_deadline: Optional[float] = None, ): self.subscriber_client = subscriber_client self.subscription = subscription self.cache_timeout = cache_timeout - self.ack_deadline: float = float('inf') self.last_refresh: float = float('-inf') + self.ack_deadline = ack_deadline or float('inf') async def get(self) -> float: if self.cache_outdated(): @@ -469,6 +470,7 @@ async def subscribe( num_producers: int = 1, max_messages_per_producer: int = 100, ack_window: float = 0.3, + ack_deadline: Optional[float] = None, ack_deadline_cache_timeout: float = float('inf'), num_tasks_per_consumer: int = 1, enable_nack: bool = True, @@ -484,6 +486,7 @@ async def subscribe( subscriber_client, subscription, ack_deadline_cache_timeout, + ack_deadline, ) if metrics_client is not None: diff --git a/pubsub/pyproject.rest.toml b/pubsub/pyproject.rest.toml index b1c68ffdd..cb3d79f89 100644 --- a/pubsub/pyproject.rest.toml +++ b/pubsub/pyproject.rest.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcloud-rest-pubsub" -version = "6.1.0" +version = "6.2.0" description = "Python Client for Google Cloud Pub/Sub" readme = "README.rst" diff --git a/pubsub/pyproject.toml b/pubsub/pyproject.toml index f48eff69d..3c7a223ea 100644 --- a/pubsub/pyproject.toml +++ b/pubsub/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "gcloud-aio-pubsub" -version = "6.1.0" +version = "6.2.0" description = "Python Client for Google Cloud Pub/Sub" readme = "README.rst" diff --git a/pubsub/tests/unit/subscriber_test.py b/pubsub/tests/unit/subscriber_test.py index 0e2225666..b21d37f1e 100644 --- a/pubsub/tests/unit/subscriber_test.py +++ b/pubsub/tests/unit/subscriber_test.py @@ -1,4 +1,5 @@ # pylint: disable=redefined-outer-name +# pylint: disable=too-many-lines from gcloud.aio.auth import BUILD_GCLOUD_REST # pylint: disable=too-complex @@ -148,6 +149,16 @@ async def test_ack_deadline_cache_get_calls_refresh_first_time( assert await cache.get() == 42 assert cache.last_refresh + @pytest.mark.asyncio + async def test_ack_deadline_cache_no_refresh_if_specified( + subscriber_client, + ): + cache = AckDeadlineCache( + subscriber_client, 'fake_subscription', float('inf'), 100 + ) + assert await cache.get() == 100 + subscriber_client.get_subscription.assert_not_called() + @pytest.mark.asyncio async def test_ack_deadline_cache_get_no_call_if_not_outdated( subscriber_client,