Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .kokoro/presubmit/system.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# Only run this nox session.
env_vars: {
key: "NOX_SESSION"
value: "system-3.9"
}
value: "system-3.10"
}
4 changes: 2 additions & 2 deletions google/cloud/bigtable/data/_async/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
from google.cloud.bigtable.data.exceptions import _ResetRow
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _retry_exception_factory
from google.cloud.bigtable.data._helpers import _rst_stream_aware_predicate

from google.api_core import retry as retries
from google.api_core.retry import exponential_sleep_generator

from google.cloud.bigtable.data._cross_sync import CrossSync
Expand Down Expand Up @@ -98,7 +98,7 @@ def __init__(
else:
self.request = query._to_pb(target)
self.target = target
self._predicate = retries.if_exception_type(*retryable_exceptions)
self._predicate = _rst_stream_aware_predicate(*retryable_exceptions)
self._last_yielded_row_key: bytes | None = None
self._remaining_count: int | None = self.request.rows_limit or None

Expand Down
39 changes: 38 additions & 1 deletion google/cloud/bigtable/data/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
"""
from __future__ import annotations

from typing import Sequence, List, Tuple, TYPE_CHECKING, Union
from typing import Callable, Sequence, List, Tuple, TYPE_CHECKING, Union
import time
import enum
from collections import namedtuple
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries
from google.api_core.retry import RetryFailureReason
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup

Expand All @@ -47,6 +48,15 @@
# used by every data client as a default project name for testing on Bigtable emulator.
_DEFAULT_BIGTABLE_EMULATOR_CLIENT = "google-cloud-bigtable-emulator"

# Internal error messages that can be retried during ReadRows. Internal error messages with this error
# text should be treated as Unavailable error messages with the same error text, and will therefore be
# treated as Unavailable errors rather than Internal errors.
_RETRYABLE_INTERNAL_ERROR_MESSAGES = (
"rst_stream",
"rst stream",
"received unexpected eos on data frame from server",
)

# used to identify an active bigtable resource that needs to be warmed through PingAndWarm
# each instance/app_profile_id pair needs to be individually tracked
_WarmedInstanceKey = namedtuple(
Expand Down Expand Up @@ -122,6 +132,33 @@ def _retry_exception_factory(
return source_exc, cause_exc


def _rst_stream_aware_predicate(
*exception_types: type[Exception],
) -> Callable[[Exception], bool]:
"""A custom retry predicate.

This predicate treats Internal error messages with RST_STREAM errors as
ServiceUnavailable errors and will retry them if the Unavailable exception is retryable.

Args:
exception_types: Exception types to be retried during operation

Returns:
Callable[[Exception], bool]: A retry predicate that takes in an exception and
returns whether or not that exception is retryable
"""
# predicate to check for retryable error types
if_exception_type = retries.if_exception_type(*exception_types)
# special case: treat InternalServerError with rst_stream error message as ServiceUnavailable
rst_check = (
lambda e: core_exceptions.ServiceUnavailable in exception_types
and isinstance(e, core_exceptions.InternalServerError)
and any(m in e.message.lower() for m in _RETRYABLE_INTERNAL_ERROR_MESSAGES)
)

return lambda e: if_exception_type(e) or rst_check(e)


def _get_timeouts(
operation: float | TABLE_DEFAULT,
attempt: float | None | TABLE_DEFAULT,
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/bigtable/data/_sync_autogen/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from google.cloud.bigtable.data.exceptions import _ResetRow
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _retry_exception_factory
from google.api_core import retry as retries
from google.cloud.bigtable.data._helpers import _rst_stream_aware_predicate
from google.api_core.retry import exponential_sleep_generator
from google.cloud.bigtable.data._cross_sync import CrossSync

Expand Down Expand Up @@ -88,7 +88,7 @@ def __init__(
else:
self.request = query._to_pb(target)
self.target = target
self._predicate = retries.if_exception_type(*retryable_exceptions)
self._predicate = _rst_stream_aware_predicate(*retryable_exceptions)
self._last_yielded_row_key: bytes | None = None
self._remaining_count: int | None = self.request.rows_limit or None

Expand Down
2 changes: 1 addition & 1 deletion google/cloud/bigtable/data/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def __repr__(self):


# TODO: When working on mutations batcher, rework exception handling to guarantee that
# MutationsExceptionGroup only stores FailedMutationEntryErrors.
# MutationsExceptionGroup only stores FailedMutationEntryErrors.
class MutationsExceptionGroup(_BigtableExceptionGroup):
"""
Represents one or more exceptions that occur during a bulk mutation operation
Expand Down
3 changes: 1 addition & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
UNIT_TEST_EXTRAS: List[str] = []
UNIT_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = {}

SYSTEM_TEST_PYTHON_VERSIONS: List[str] = ["3.9", "3.14"]
SYSTEM_TEST_PYTHON_VERSIONS: List[str] = ["3.10", "3.14"]
SYSTEM_TEST_STANDARD_DEPENDENCIES: List[str] = [
"mock",
"pytest",
Expand All @@ -79,7 +79,6 @@

# 'docfx' is excluded since it only needs to run in 'docs-presubmit'
nox.options.sessions = [
"unit-3.9",
"unit-3.10",
"unit-3.11",
"unit-3.12",
Expand Down
29 changes: 23 additions & 6 deletions tests/unit/data/_async/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1349,49 +1349,56 @@ def test_table_ctor_sync(self):
@CrossSync.pytest
# iterate over all retryable rpcs
@pytest.mark.parametrize(
"fn_name,fn_args,is_stream,extra_retryables",
"fn_name,fn_args,is_read_rows_fn,is_stream,extra_retryables",
[
(
"read_rows_stream",
(ReadRowsQuery(),),
True,
True,
(),
),
(
"read_rows",
(ReadRowsQuery(),),
True,
True,
(),
),
(
"read_row",
(b"row_key",),
True,
True,
(),
),
(
"read_rows_sharded",
([ReadRowsQuery()],),
True,
True,
(),
),
(
"row_exists",
(b"row_key",),
True,
True,
(),
),
("sample_row_keys", (), False, ()),
("sample_row_keys", (), False, False, ()),
(
"mutate_row",
(b"row_key", [DeleteAllFromRow()]),
False,
False,
(),
),
(
"bulk_mutate_rows",
([mutations.RowMutationEntry(b"key", [DeleteAllFromRow()])],),
False,
False,
(_MutateRowsIncomplete,),
),
],
Expand Down Expand Up @@ -1426,6 +1433,7 @@ async def test_customizable_retryable_errors(
expected_retryables,
fn_name,
fn_args,
is_read_rows_fn,
is_stream,
extra_retryables,
):
Expand All @@ -1438,18 +1446,26 @@ async def test_customizable_retryable_errors(
retry_fn += "_stream"
if CrossSync.is_async:
retry_fn = f"CrossSync.{retry_fn}"
subpackage = "_async"
else:
retry_fn = f"CrossSync._Sync_Impl.{retry_fn}"
subpackage = "_sync_autogen"

# Read Rows has its own custom predicate builder that also takes in
# a list of exceptions
if is_read_rows_fn:
predicate_builder = f"google.cloud.bigtable.data.{subpackage}._read_rows._rst_stream_aware_predicate"
else:
predicate_builder = "google.api_core.retry.if_exception_type"

with mock.patch(
f"google.cloud.bigtable.data._cross_sync.{retry_fn}"
) as retry_fn_mock:
async with self._make_client() as client:
table = client.get_table("instance-id", "table-id")
expected_predicate = expected_retryables.__contains__
retry_fn_mock.side_effect = RuntimeError("stop early")
with mock.patch(
"google.api_core.retry.if_exception_type"
) as predicate_builder_mock:
with mock.patch(predicate_builder) as predicate_builder_mock:
predicate_builder_mock.return_value = expected_predicate
with pytest.raises(Exception):
# we expect an exception from attempting to call the mock
Expand All @@ -1460,7 +1476,8 @@ async def test_customizable_retryable_errors(
*expected_retryables, *extra_retryables
)
retry_call_args = retry_fn_mock.call_args_list[0].args
# output of if_exception_type should be sent in to retry constructor

# output of the predicate builder should be sent in to retry constructor
assert retry_call_args[1] is expected_predicate

@pytest.mark.parametrize(
Expand Down
27 changes: 16 additions & 11 deletions tests/unit/data/_sync_autogen/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,19 +1101,20 @@ def test_ctor_invalid_timeout_values(self):
client.close()

@pytest.mark.parametrize(
"fn_name,fn_args,is_stream,extra_retryables",
"fn_name,fn_args,is_read_rows_fn,is_stream,extra_retryables",
[
("read_rows_stream", (ReadRowsQuery(),), True, ()),
("read_rows", (ReadRowsQuery(),), True, ()),
("read_row", (b"row_key",), True, ()),
("read_rows_sharded", ([ReadRowsQuery()],), True, ()),
("row_exists", (b"row_key",), True, ()),
("sample_row_keys", (), False, ()),
("mutate_row", (b"row_key", [DeleteAllFromRow()]), False, ()),
("read_rows_stream", (ReadRowsQuery(),), True, True, ()),
("read_rows", (ReadRowsQuery(),), True, True, ()),
("read_row", (b"row_key",), True, True, ()),
("read_rows_sharded", ([ReadRowsQuery()],), True, True, ()),
("row_exists", (b"row_key",), True, True, ()),
("sample_row_keys", (), False, False, ()),
("mutate_row", (b"row_key", [DeleteAllFromRow()]), False, False, ()),
(
"bulk_mutate_rows",
([mutations.RowMutationEntry(b"key", [DeleteAllFromRow()])],),
False,
False,
(_MutateRowsIncomplete,),
),
],
Expand Down Expand Up @@ -1147,6 +1148,7 @@ def test_customizable_retryable_errors(
expected_retryables,
fn_name,
fn_args,
is_read_rows_fn,
is_stream,
extra_retryables,
):
Expand All @@ -1156,16 +1158,19 @@ def test_customizable_retryable_errors(
if is_stream:
retry_fn += "_stream"
retry_fn = f"CrossSync._Sync_Impl.{retry_fn}"
subpackage = "_sync_autogen"
if is_read_rows_fn:
predicate_builder = f"google.cloud.bigtable.data.{subpackage}._read_rows._rst_stream_aware_predicate"
else:
predicate_builder = "google.api_core.retry.if_exception_type"
with mock.patch(
f"google.cloud.bigtable.data._cross_sync.{retry_fn}"
) as retry_fn_mock:
with self._make_client() as client:
table = client.get_table("instance-id", "table-id")
expected_predicate = expected_retryables.__contains__
retry_fn_mock.side_effect = RuntimeError("stop early")
with mock.patch(
"google.api_core.retry.if_exception_type"
) as predicate_builder_mock:
with mock.patch(predicate_builder) as predicate_builder_mock:
predicate_builder_mock.return_value = expected_predicate
with pytest.raises(Exception):
test_fn = table.__getattribute__(fn_name)
Expand Down
42 changes: 42 additions & 0 deletions tests/unit/data/test__helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,48 @@ def test_get_timeouts_invalid(self, input_times):
_helpers._align_timeouts(input_times[0], input_times[1])


class TestRstStreamAwarePredicate:
@pytest.mark.parametrize(
"retryable_exceptions,exception,expected_is_retryable",
[
(
[core_exceptions.Aborted, core_exceptions.InternalServerError],
core_exceptions.InternalServerError("Sorry"),
True,
),
(
[core_exceptions.Aborted, core_exceptions.InternalServerError],
core_exceptions.DataLoss("Sorry"),
False,
),
(
[core_exceptions.ServiceUnavailable, core_exceptions.Aborted],
core_exceptions.InternalServerError("Sorry"),
False,
),
(
[core_exceptions.ServiceUnavailable, core_exceptions.Aborted],
core_exceptions.InternalServerError(
_helpers._RETRYABLE_INTERNAL_ERROR_MESSAGES[0]
),
True,
),
(
[core_exceptions.InternalServerError, core_exceptions.Aborted],
core_exceptions.InternalServerError(
_helpers._RETRYABLE_INTERNAL_ERROR_MESSAGES[0]
),
True,
),
],
)
def test_rst_stream_aware_predicate(
self, retryable_exceptions, exception, expected_is_retryable
):
predicate = _helpers._rst_stream_aware_predicate(*retryable_exceptions)
assert predicate(exception) is expected_is_retryable


class TestGetRetryableErrors:
@pytest.mark.parametrize(
"input_codes,input_table,expected",
Expand Down
Loading