Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pubsub/gcloud/aio/pubsub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async def handler(message):
'projects/<my_project>/subscriptions/<my_subscription>',
handler,
subscriber_client,
ack_deadline=10,
num_producers=1,
max_messages_per_producer=100,
ack_window=0.3,
Expand All @@ -70,6 +71,11 @@ async def handler(message):
return ``None`` if the message should be acked. An exception raised within
the handler will result in the message being left to expire, and thus it will
be redelivered according to your subscription's ack deadline.
- ``ack_deadline``: An optional parameter which should match the ack deadline
(in seconds) of your subscription. If omitted, the library will fetch the
current value from the subscription. Hard-coding it here is a way to avoid an
extra call to the Pub/Sub REST API which counts as an Administrator
Operation.
- ``num_producers``: Number of workers that will be making ``pull`` requests to
pubsub. Please note that a worker will only fetch new batch once the
``handler`` was called for each message from the previous batch. This means
Expand Down
5 changes: 4 additions & 1 deletion pubsub/gcloud/aio/pubsub/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,13 @@ class AckDeadlineCache:
def __init__(
self, subscriber_client: SubscriberClient,
subscription: str, cache_timeout: float,
ack_deadline: Optional[float] = None,
):
self.subscriber_client = subscriber_client
self.subscription = subscription
self.cache_timeout = cache_timeout
self.ack_deadline: float = float('inf')
self.last_refresh: float = float('-inf')
self.ack_deadline = ack_deadline or float('inf')

async def get(self) -> float:
if self.cache_outdated():
Expand Down Expand Up @@ -469,6 +470,7 @@ async def subscribe(
num_producers: int = 1,
max_messages_per_producer: int = 100,
ack_window: float = 0.3,
ack_deadline: Optional[float] = None,
ack_deadline_cache_timeout: float = float('inf'),
num_tasks_per_consumer: int = 1,
enable_nack: bool = True,
Expand All @@ -484,6 +486,7 @@ async def subscribe(
subscriber_client,
subscription,
ack_deadline_cache_timeout,
ack_deadline,
)

if metrics_client is not None:
Expand Down
2 changes: 1 addition & 1 deletion pubsub/pyproject.rest.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gcloud-rest-pubsub"
version = "6.1.0"
version = "6.2.0"
description = "Python Client for Google Cloud Pub/Sub"
readme = "README.rst"

Expand Down
2 changes: 1 addition & 1 deletion pubsub/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gcloud-aio-pubsub"
version = "6.1.0"
version = "6.2.0"
description = "Python Client for Google Cloud Pub/Sub"
readme = "README.rst"

Expand Down
11 changes: 11 additions & 0 deletions pubsub/tests/unit/subscriber_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-lines
from gcloud.aio.auth import BUILD_GCLOUD_REST

# pylint: disable=too-complex
Expand Down Expand Up @@ -148,6 +149,16 @@ async def test_ack_deadline_cache_get_calls_refresh_first_time(
assert await cache.get() == 42
assert cache.last_refresh

@pytest.mark.asyncio
async def test_ack_deadline_cache_no_refresh_if_specified(
subscriber_client,
):
cache = AckDeadlineCache(
subscriber_client, 'fake_subscription', float('inf'), 100
)
assert await cache.get() == 100
subscriber_client.get_subscription.assert_not_called()

@pytest.mark.asyncio
async def test_ack_deadline_cache_get_no_call_if_not_outdated(
subscriber_client,
Expand Down