Skip to content

Commit da4750e

Browse files
authored
feat(pubsub): allow disabling ack deadline fetch (talkiq#898)
1 parent d8b17c9 commit da4750e

File tree

5 files changed

+23
-3
lines changed

5 files changed

+23
-3
lines changed

pubsub/gcloud/aio/pubsub/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ async def handler(message):
5353
'projects/<my_project>/subscriptions/<my_subscription>',
5454
handler,
5555
subscriber_client,
56+
ack_deadline=10,
5657
num_producers=1,
5758
max_messages_per_producer=100,
5859
ack_window=0.3,
@@ -70,6 +71,11 @@ async def handler(message):
7071
return ``None`` if the message should be acked. An exception raised within
7172
the handler will result in the message being left to expire, and thus it will
7273
be redelivered according to your subscription's ack deadline.
74+
- ``ack_deadline``: An optional parameter which should match the ack deadline
75+
(in seconds) of your subscription. If omitted, the library will fetch the
76+
current value from the subscription. Hard-coding it here is a way to avoid an
77+
extra call to the Pub/Sub REST API which counts as an Administrator
78+
Operation.
7379
- ``num_producers``: Number of workers that will be making ``pull`` requests to
7480
pubsub. Please note that a worker will only fetch new batch once the
7581
``handler`` was called for each message from the previous batch. This means

pubsub/gcloud/aio/pubsub/subscriber.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,13 @@ class AckDeadlineCache:
4141
def __init__(
4242
self, subscriber_client: SubscriberClient,
4343
subscription: str, cache_timeout: float,
44+
ack_deadline: Optional[float] = None,
4445
):
4546
self.subscriber_client = subscriber_client
4647
self.subscription = subscription
4748
self.cache_timeout = cache_timeout
48-
self.ack_deadline: float = float('inf')
4949
self.last_refresh: float = float('-inf')
50+
self.ack_deadline = ack_deadline or float('inf')
5051

5152
async def get(self) -> float:
5253
if self.cache_outdated():
@@ -469,6 +470,7 @@ async def subscribe(
469470
num_producers: int = 1,
470471
max_messages_per_producer: int = 100,
471472
ack_window: float = 0.3,
473+
ack_deadline: Optional[float] = None,
472474
ack_deadline_cache_timeout: float = float('inf'),
473475
num_tasks_per_consumer: int = 1,
474476
enable_nack: bool = True,
@@ -484,6 +486,7 @@ async def subscribe(
484486
subscriber_client,
485487
subscription,
486488
ack_deadline_cache_timeout,
489+
ack_deadline,
487490
)
488491

489492
if metrics_client is not None:

pubsub/pyproject.rest.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "gcloud-rest-pubsub"
3-
version = "6.1.0"
3+
version = "6.2.0"
44
description = "Python Client for Google Cloud Pub/Sub"
55
readme = "README.rst"
66

pubsub/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "gcloud-aio-pubsub"
3-
version = "6.1.0"
3+
version = "6.2.0"
44
description = "Python Client for Google Cloud Pub/Sub"
55
readme = "README.rst"
66

pubsub/tests/unit/subscriber_test.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# pylint: disable=redefined-outer-name
2+
# pylint: disable=too-many-lines
23
from gcloud.aio.auth import BUILD_GCLOUD_REST
34

45
# pylint: disable=too-complex
@@ -148,6 +149,16 @@ async def test_ack_deadline_cache_get_calls_refresh_first_time(
148149
assert await cache.get() == 42
149150
assert cache.last_refresh
150151

152+
@pytest.mark.asyncio
153+
async def test_ack_deadline_cache_no_refresh_if_specified(
154+
subscriber_client,
155+
):
156+
cache = AckDeadlineCache(
157+
subscriber_client, 'fake_subscription', float('inf'), 100
158+
)
159+
assert await cache.get() == 100
160+
subscriber_client.get_subscription.assert_not_called()
161+
151162
@pytest.mark.asyncio
152163
async def test_ack_deadline_cache_get_no_call_if_not_outdated(
153164
subscriber_client,

0 commit comments

Comments
 (0)