From d32616b5d9e2f96b0731f0d83fc15aa13abf9126 Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Tue, 24 Feb 2026 18:21:31 +0000 Subject: [PATCH 1/4] feat: Added rst_stream exception handling for ReadRows. --- .../cloud/bigtable/data/_async/_read_rows.py | 4 +- google/cloud/bigtable/data/_helpers.py | 39 ++++++++++++++++++- .../bigtable/data/_sync_autogen/_read_rows.py | 4 +- tests/unit/data/_async/test_client.py | 29 +++++++++++--- tests/unit/data/_sync_autogen/test_client.py | 27 +++++++------ tests/unit/data/test__helpers.py | 16 ++++++++ 6 files changed, 98 insertions(+), 21 deletions(-) diff --git a/google/cloud/bigtable/data/_async/_read_rows.py b/google/cloud/bigtable/data/_async/_read_rows.py index 8787bfa71..05d2c436c 100644 --- a/google/cloud/bigtable/data/_async/_read_rows.py +++ b/google/cloud/bigtable/data/_async/_read_rows.py @@ -29,8 +29,10 @@ from google.cloud.bigtable.data.exceptions import _ResetRow from google.cloud.bigtable.data._helpers import _attempt_timeout_generator from google.cloud.bigtable.data._helpers import _retry_exception_factory +from google.cloud.bigtable.data._helpers import _read_rows_predicate_with_exceptions from google.api_core import retry as retries +from google.api_core import exceptions as core_exceptions from google.api_core.retry import exponential_sleep_generator from google.cloud.bigtable.data._cross_sync import CrossSync @@ -98,7 +100,7 @@ def __init__( else: self.request = query._to_pb(target) self.target = target - self._predicate = retries.if_exception_type(*retryable_exceptions) + self._predicate = _read_rows_predicate_with_exceptions(*retryable_exceptions) self._last_yielded_row_key: bytes | None = None self._remaining_count: int | None = self.request.rows_limit or None diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index db17b0e42..def90ce8a 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -16,13 +16,14 @@ """ from __future__ import annotations -from typing import Sequence, List, Tuple, TYPE_CHECKING, Union +from typing import Callable, Sequence, List, Tuple, TYPE_CHECKING, Union import time import enum from collections import namedtuple from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery from google.api_core import exceptions as core_exceptions +from google.api_core import retry as retries from google.api_core.retry import RetryFailureReason from google.cloud.bigtable.data.exceptions import RetryExceptionGroup @@ -47,6 +48,15 @@ # used by every data client as a default project name for testing on Bigtable emulator. _DEFAULT_BIGTABLE_EMULATOR_CLIENT = "google-cloud-bigtable-emulator" +# Internal error messages that can be retried during ReadRows. Internal error messages with this error +# text should be streated as Unavailable error messages with the same error text, and will therefore be +# treated as Unavailable errors rather than Internal errors. +_RETRYABLE_INTERNAL_ERROR_MESSAGES = ( + "rst_stream", + "rst stream", + "received unexpected eos on data frame from server", +) + # used to identify an active bigtable resource that needs to be warmed through PingAndWarm # each instance/app_profile_id pair needs to be individually tracked _WarmedInstanceKey = namedtuple( @@ -122,6 +132,33 @@ def _retry_exception_factory( return source_exc, cause_exc +def _read_rows_predicate_with_exceptions(*exception_types: type[Exception]) -> Callable[[Exception], bool]: + """A custom retry predicate for ReadRows. + + This predicate treats Internal error messages with RST_STREAM errors as + ServiceUnavailable errors and will retry them if the Unavailable exception is retryable. + + Args: + retryable_exceptions: tuple of Exception types to be retried during operation + + Returns: + Callable[[Exception], bool]: A retry predicate that takes in an exception and + returns whether or not that exception is retryable + """ + is_exception_type = retries.if_exception_type(*exception_types) + + def predicate(exception: Exception) -> bool: + return (isinstance(exception, core_exceptions.InternalServerError) and exception.message.lower() in _RETRYABLE_INTERNAL_ERROR_MESSAGES) or is_exception_type(exception) + + # Treating RST_STREAM internal errors as unavailable errors is only done if ServiceUnavailable is one of the + # given exception types. If InternalServerError is also a retryable exception, we don't necessarily need the + # custom predicate either. + if core_exceptions.ServiceUnavailable in exception_types and core_exceptions.InternalServerError not in exception_types: + return predicate + + return is_exception_type + + def _get_timeouts( operation: float | TABLE_DEFAULT, attempt: float | None | TABLE_DEFAULT, diff --git a/google/cloud/bigtable/data/_sync_autogen/_read_rows.py b/google/cloud/bigtable/data/_sync_autogen/_read_rows.py index 3593475a9..752e0a3a4 100644 --- a/google/cloud/bigtable/data/_sync_autogen/_read_rows.py +++ b/google/cloud/bigtable/data/_sync_autogen/_read_rows.py @@ -29,7 +29,7 @@ from google.cloud.bigtable.data.exceptions import _ResetRow from google.cloud.bigtable.data._helpers import _attempt_timeout_generator from google.cloud.bigtable.data._helpers import _retry_exception_factory -from google.api_core import retry as retries +from google.cloud.bigtable.data._helpers import _read_rows_predicate_with_exceptions from google.api_core.retry import exponential_sleep_generator from google.cloud.bigtable.data._cross_sync import CrossSync @@ -88,7 +88,7 @@ def __init__( else: self.request = query._to_pb(target) self.target = target - self._predicate = retries.if_exception_type(*retryable_exceptions) + self._predicate = _read_rows_predicate_with_exceptions(*retryable_exceptions) self._last_yielded_row_key: bytes | None = None self._remaining_count: int | None = self.request.rows_limit or None diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 0bc8b921d..2b77028bd 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -1349,49 +1349,56 @@ def test_table_ctor_sync(self): @CrossSync.pytest # iterate over all retryable rpcs @pytest.mark.parametrize( - "fn_name,fn_args,is_stream,extra_retryables", + "fn_name,fn_args,is_read_rows_fn,is_stream,extra_retryables", [ ( "read_rows_stream", (ReadRowsQuery(),), True, + True, (), ), ( "read_rows", (ReadRowsQuery(),), True, + True, (), ), ( "read_row", (b"row_key",), True, + True, (), ), ( "read_rows_sharded", ([ReadRowsQuery()],), True, + True, (), ), ( "row_exists", (b"row_key",), True, + True, (), ), - ("sample_row_keys", (), False, ()), + ("sample_row_keys", (), False, False, ()), ( "mutate_row", (b"row_key", [DeleteAllFromRow()]), False, + False, (), ), ( "bulk_mutate_rows", ([mutations.RowMutationEntry(b"key", [DeleteAllFromRow()])],), False, + False, (_MutateRowsIncomplete,), ), ], @@ -1426,6 +1433,7 @@ async def test_customizable_retryable_errors( expected_retryables, fn_name, fn_args, + is_read_rows_fn, is_stream, extra_retryables, ): @@ -1438,8 +1446,18 @@ async def test_customizable_retryable_errors( retry_fn += "_stream" if CrossSync.is_async: retry_fn = f"CrossSync.{retry_fn}" + subpackage = "_async" else: retry_fn = f"CrossSync._Sync_Impl.{retry_fn}" + subpackage = "_sync_autogen" + + # Read Rows has its own custom predicate builder that also takes in + # a list of exceptions + if is_read_rows_fn: + predicate_builder = f"google.cloud.bigtable.data.{subpackage}._read_rows._read_rows_predicate_with_exceptions" + else: + predicate_builder = "google.api_core.retry.if_exception_type" + with mock.patch( f"google.cloud.bigtable.data._cross_sync.{retry_fn}" ) as retry_fn_mock: @@ -1447,9 +1465,7 @@ async def test_customizable_retryable_errors( table = client.get_table("instance-id", "table-id") expected_predicate = expected_retryables.__contains__ retry_fn_mock.side_effect = RuntimeError("stop early") - with mock.patch( - "google.api_core.retry.if_exception_type" - ) as predicate_builder_mock: + with mock.patch(predicate_builder) as predicate_builder_mock: predicate_builder_mock.return_value = expected_predicate with pytest.raises(Exception): # we expect an exception from attempting to call the mock @@ -1460,7 +1476,8 @@ async def test_customizable_retryable_errors( *expected_retryables, *extra_retryables ) retry_call_args = retry_fn_mock.call_args_list[0].args - # output of if_exception_type should be sent in to retry constructor + + # output of the predicate builder should be sent in to retry constructor assert retry_call_args[1] is expected_predicate @pytest.mark.parametrize( diff --git a/tests/unit/data/_sync_autogen/test_client.py b/tests/unit/data/_sync_autogen/test_client.py index c2a15a991..64dcebe3f 100644 --- a/tests/unit/data/_sync_autogen/test_client.py +++ b/tests/unit/data/_sync_autogen/test_client.py @@ -1101,19 +1101,20 @@ def test_ctor_invalid_timeout_values(self): client.close() @pytest.mark.parametrize( - "fn_name,fn_args,is_stream,extra_retryables", + "fn_name,fn_args,is_read_rows_fn,is_stream,extra_retryables", [ - ("read_rows_stream", (ReadRowsQuery(),), True, ()), - ("read_rows", (ReadRowsQuery(),), True, ()), - ("read_row", (b"row_key",), True, ()), - ("read_rows_sharded", ([ReadRowsQuery()],), True, ()), - ("row_exists", (b"row_key",), True, ()), - ("sample_row_keys", (), False, ()), - ("mutate_row", (b"row_key", [DeleteAllFromRow()]), False, ()), + ("read_rows_stream", (ReadRowsQuery(),), True, True, ()), + ("read_rows", (ReadRowsQuery(),), True, True, ()), + ("read_row", (b"row_key",), True, True, ()), + ("read_rows_sharded", ([ReadRowsQuery()],), True, True, ()), + ("row_exists", (b"row_key",), True, True, ()), + ("sample_row_keys", (), False, False, ()), + ("mutate_row", (b"row_key", [DeleteAllFromRow()]), False, False, ()), ( "bulk_mutate_rows", ([mutations.RowMutationEntry(b"key", [DeleteAllFromRow()])],), False, + False, (_MutateRowsIncomplete,), ), ], @@ -1147,6 +1148,7 @@ def test_customizable_retryable_errors( expected_retryables, fn_name, fn_args, + is_read_rows_fn, is_stream, extra_retryables, ): @@ -1156,6 +1158,11 @@ def test_customizable_retryable_errors( if is_stream: retry_fn += "_stream" retry_fn = f"CrossSync._Sync_Impl.{retry_fn}" + subpackage = "_sync_autogen" + if is_read_rows_fn: + predicate_builder = f"google.cloud.bigtable.data.{subpackage}._read_rows._read_rows_predicate_with_exceptions" + else: + predicate_builder = "google.api_core.retry.if_exception_type" with mock.patch( f"google.cloud.bigtable.data._cross_sync.{retry_fn}" ) as retry_fn_mock: @@ -1163,9 +1170,7 @@ def test_customizable_retryable_errors( table = client.get_table("instance-id", "table-id") expected_predicate = expected_retryables.__contains__ retry_fn_mock.side_effect = RuntimeError("stop early") - with mock.patch( - "google.api_core.retry.if_exception_type" - ) as predicate_builder_mock: + with mock.patch(predicate_builder) as predicate_builder_mock: predicate_builder_mock.return_value = expected_predicate with pytest.raises(Exception): test_fn = table.__getattribute__(fn_name) diff --git a/tests/unit/data/test__helpers.py b/tests/unit/data/test__helpers.py index 96c726a20..79631924a 100644 --- a/tests/unit/data/test__helpers.py +++ b/tests/unit/data/test__helpers.py @@ -222,6 +222,22 @@ def test_get_timeouts_invalid(self, input_times): _helpers._align_timeouts(input_times[0], input_times[1]) +class TestReadRowsPredicateWithException: + @pytest.mark.parametrize( + "retryable_exceptions,exception,expected_is_retryable", + [ + ([core_exceptions.Aborted, core_exceptions.InternalServerError], core_exceptions.InternalServerError("Sorry"), True), + ([core_exceptions.Aborted, core_exceptions.InternalServerError], core_exceptions.DataLoss("Sorry"), False), + ([core_exceptions.ServiceUnavailable, core_exceptions.Aborted], core_exceptions.InternalServerError("Sorry"), False), + ([core_exceptions.ServiceUnavailable, core_exceptions.Aborted], core_exceptions.InternalServerError(_helpers._RETRYABLE_INTERNAL_ERROR_MESSAGES[0]), True), + ([core_exceptions.InternalServerError, core_exceptions.Aborted], core_exceptions.InternalServerError(_helpers._RETRYABLE_INTERNAL_ERROR_MESSAGES[0]), True), + ] + ) + def test_ctor_retryable_exceptions_predicate(self, retryable_exceptions, exception, expected_is_retryable): + predicate = _helpers._read_rows_predicate_with_exceptions(*retryable_exceptions) + assert predicate(exception) is expected_is_retryable + + class TestGetRetryableErrors: @pytest.mark.parametrize( "input_codes,input_table,expected", From e915411f981d1a115ed35b41ea0e5d9cea0d55a5 Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Tue, 24 Feb 2026 22:44:19 +0000 Subject: [PATCH 2/4] addressed Gemini comments --- google/cloud/bigtable/data/_helpers.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index def90ce8a..00ed3a978 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -49,7 +49,7 @@ _DEFAULT_BIGTABLE_EMULATOR_CLIENT = "google-cloud-bigtable-emulator" # Internal error messages that can be retried during ReadRows. Internal error messages with this error -# text should be streated as Unavailable error messages with the same error text, and will therefore be +# text should be treated as Unavailable error messages with the same error text, and will therefore be # treated as Unavailable errors rather than Internal errors. _RETRYABLE_INTERNAL_ERROR_MESSAGES = ( "rst_stream", @@ -139,7 +139,7 @@ def _read_rows_predicate_with_exceptions(*exception_types: type[Exception]) -> C ServiceUnavailable errors and will retry them if the Unavailable exception is retryable. Args: - retryable_exceptions: tuple of Exception types to be retried during operation + exception_types: Exception types to be retried during operation Returns: Callable[[Exception], bool]: A retry predicate that takes in an exception and @@ -148,7 +148,7 @@ def _read_rows_predicate_with_exceptions(*exception_types: type[Exception]) -> C is_exception_type = retries.if_exception_type(*exception_types) def predicate(exception: Exception) -> bool: - return (isinstance(exception, core_exceptions.InternalServerError) and exception.message.lower() in _RETRYABLE_INTERNAL_ERROR_MESSAGES) or is_exception_type(exception) + return (isinstance(exception, core_exceptions.InternalServerError) and any(m in exception.message.lower() for m in _RETRYABLE_INTERNAL_ERROR_MESSAGES)) or is_exception_type(exception) # Treating RST_STREAM internal errors as unavailable errors is only done if ServiceUnavailable is one of the # given exception types. If InternalServerError is also a retryable exception, we don't necessarily need the From b266a2920d69c41403b56d75a884883593095820 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Thu, 12 Feb 2026 10:30:56 -0800 Subject: [PATCH 3/4] chore: remove 3.9 tests from kokoro (#1293) 3.9 has been removed from the kokoro base image, so we should remove the tests from our configs 3.9 is still tested in GitHub Actions --- .kokoro/presubmit/system.cfg | 4 ++-- noxfile.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/.kokoro/presubmit/system.cfg b/.kokoro/presubmit/system.cfg index b8ae66b37..30956a3ab 100644 --- a/.kokoro/presubmit/system.cfg +++ b/.kokoro/presubmit/system.cfg @@ -3,5 +3,5 @@ # Only run this nox session. env_vars: { key: "NOX_SESSION" - value: "system-3.9" -} \ No newline at end of file + value: "system-3.10" +} diff --git a/noxfile.py b/noxfile.py index a182bafba..b2696ed2b 100644 --- a/noxfile.py +++ b/noxfile.py @@ -59,7 +59,7 @@ UNIT_TEST_EXTRAS: List[str] = [] UNIT_TEST_EXTRAS_BY_PYTHON: Dict[str, List[str]] = {} -SYSTEM_TEST_PYTHON_VERSIONS: List[str] = ["3.9", "3.14"] +SYSTEM_TEST_PYTHON_VERSIONS: List[str] = ["3.10", "3.14"] SYSTEM_TEST_STANDARD_DEPENDENCIES: List[str] = [ "mock", "pytest", @@ -79,7 +79,6 @@ # 'docfx' is excluded since it only needs to run in 'docs-presubmit' nox.options.sessions = [ - "unit-3.9", "unit-3.10", "unit-3.11", "unit-3.12", From 6c716b24f34a5f80bd3ca4e1b56dc4ab44658fc7 Mon Sep 17 00:00:00 2001 From: Kevin Zheng Date: Wed, 25 Feb 2026 16:56:51 +0000 Subject: [PATCH 4/4] Renamed and cleaned up predicate --- .../cloud/bigtable/data/_async/_read_rows.py | 6 +-- google/cloud/bigtable/data/_helpers.py | 32 +++++++------- .../bigtable/data/_sync_autogen/_read_rows.py | 4 +- google/cloud/bigtable/data/exceptions.py | 2 +- tests/unit/data/_async/test_client.py | 2 +- tests/unit/data/_sync_autogen/test_client.py | 2 +- tests/unit/data/test__helpers.py | 44 +++++++++++++++---- 7 files changed, 58 insertions(+), 34 deletions(-) diff --git a/google/cloud/bigtable/data/_async/_read_rows.py b/google/cloud/bigtable/data/_async/_read_rows.py index 05d2c436c..16c0bbf95 100644 --- a/google/cloud/bigtable/data/_async/_read_rows.py +++ b/google/cloud/bigtable/data/_async/_read_rows.py @@ -29,10 +29,8 @@ from google.cloud.bigtable.data.exceptions import _ResetRow from google.cloud.bigtable.data._helpers import _attempt_timeout_generator from google.cloud.bigtable.data._helpers import _retry_exception_factory -from google.cloud.bigtable.data._helpers import _read_rows_predicate_with_exceptions +from google.cloud.bigtable.data._helpers import _rst_stream_aware_predicate -from google.api_core import retry as retries -from google.api_core import exceptions as core_exceptions from google.api_core.retry import exponential_sleep_generator from google.cloud.bigtable.data._cross_sync import CrossSync @@ -100,7 +98,7 @@ def __init__( else: self.request = query._to_pb(target) self.target = target - self._predicate = _read_rows_predicate_with_exceptions(*retryable_exceptions) + self._predicate = _rst_stream_aware_predicate(*retryable_exceptions) self._last_yielded_row_key: bytes | None = None self._remaining_count: int | None = self.request.rows_limit or None diff --git a/google/cloud/bigtable/data/_helpers.py b/google/cloud/bigtable/data/_helpers.py index 00ed3a978..f88fa3078 100644 --- a/google/cloud/bigtable/data/_helpers.py +++ b/google/cloud/bigtable/data/_helpers.py @@ -132,31 +132,31 @@ def _retry_exception_factory( return source_exc, cause_exc -def _read_rows_predicate_with_exceptions(*exception_types: type[Exception]) -> Callable[[Exception], bool]: - """A custom retry predicate for ReadRows. - +def _rst_stream_aware_predicate( + *exception_types: type[Exception], +) -> Callable[[Exception], bool]: + """A custom retry predicate. + This predicate treats Internal error messages with RST_STREAM errors as ServiceUnavailable errors and will retry them if the Unavailable exception is retryable. Args: exception_types: Exception types to be retried during operation - + Returns: Callable[[Exception], bool]: A retry predicate that takes in an exception and returns whether or not that exception is retryable """ - is_exception_type = retries.if_exception_type(*exception_types) - - def predicate(exception: Exception) -> bool: - return (isinstance(exception, core_exceptions.InternalServerError) and any(m in exception.message.lower() for m in _RETRYABLE_INTERNAL_ERROR_MESSAGES)) or is_exception_type(exception) - - # Treating RST_STREAM internal errors as unavailable errors is only done if ServiceUnavailable is one of the - # given exception types. If InternalServerError is also a retryable exception, we don't necessarily need the - # custom predicate either. - if core_exceptions.ServiceUnavailable in exception_types and core_exceptions.InternalServerError not in exception_types: - return predicate - - return is_exception_type + # predicate to check for retryable error types + if_exception_type = retries.if_exception_type(*exception_types) + # special case: treat InternalServerError with rst_stream error message as ServiceUnavailable + rst_check = ( + lambda e: core_exceptions.ServiceUnavailable in exception_types + and isinstance(e, core_exceptions.InternalServerError) + and any(m in e.message.lower() for m in _RETRYABLE_INTERNAL_ERROR_MESSAGES) + ) + + return lambda e: if_exception_type(e) or rst_check(e) def _get_timeouts( diff --git a/google/cloud/bigtable/data/_sync_autogen/_read_rows.py b/google/cloud/bigtable/data/_sync_autogen/_read_rows.py index 752e0a3a4..f17a7e446 100644 --- a/google/cloud/bigtable/data/_sync_autogen/_read_rows.py +++ b/google/cloud/bigtable/data/_sync_autogen/_read_rows.py @@ -29,7 +29,7 @@ from google.cloud.bigtable.data.exceptions import _ResetRow from google.cloud.bigtable.data._helpers import _attempt_timeout_generator from google.cloud.bigtable.data._helpers import _retry_exception_factory -from google.cloud.bigtable.data._helpers import _read_rows_predicate_with_exceptions +from google.cloud.bigtable.data._helpers import _rst_stream_aware_predicate from google.api_core.retry import exponential_sleep_generator from google.cloud.bigtable.data._cross_sync import CrossSync @@ -88,7 +88,7 @@ def __init__( else: self.request = query._to_pb(target) self.target = target - self._predicate = _read_rows_predicate_with_exceptions(*retryable_exceptions) + self._predicate = _rst_stream_aware_predicate(*retryable_exceptions) self._last_yielded_row_key: bytes | None = None self._remaining_count: int | None = self.request.rows_limit or None diff --git a/google/cloud/bigtable/data/exceptions.py b/google/cloud/bigtable/data/exceptions.py index a5e6d31ba..4597a8d1d 100644 --- a/google/cloud/bigtable/data/exceptions.py +++ b/google/cloud/bigtable/data/exceptions.py @@ -142,7 +142,7 @@ def __repr__(self): # TODO: When working on mutations batcher, rework exception handling to guarantee that -# MutationsExceptionGroup only stores FailedMutationEntryErrors. +# MutationsExceptionGroup only stores FailedMutationEntryErrors. class MutationsExceptionGroup(_BigtableExceptionGroup): """ Represents one or more exceptions that occur during a bulk mutation operation diff --git a/tests/unit/data/_async/test_client.py b/tests/unit/data/_async/test_client.py index 2b77028bd..55cbd169d 100644 --- a/tests/unit/data/_async/test_client.py +++ b/tests/unit/data/_async/test_client.py @@ -1454,7 +1454,7 @@ async def test_customizable_retryable_errors( # Read Rows has its own custom predicate builder that also takes in # a list of exceptions if is_read_rows_fn: - predicate_builder = f"google.cloud.bigtable.data.{subpackage}._read_rows._read_rows_predicate_with_exceptions" + predicate_builder = f"google.cloud.bigtable.data.{subpackage}._read_rows._rst_stream_aware_predicate" else: predicate_builder = "google.api_core.retry.if_exception_type" diff --git a/tests/unit/data/_sync_autogen/test_client.py b/tests/unit/data/_sync_autogen/test_client.py index 64dcebe3f..c844d04b3 100644 --- a/tests/unit/data/_sync_autogen/test_client.py +++ b/tests/unit/data/_sync_autogen/test_client.py @@ -1160,7 +1160,7 @@ def test_customizable_retryable_errors( retry_fn = f"CrossSync._Sync_Impl.{retry_fn}" subpackage = "_sync_autogen" if is_read_rows_fn: - predicate_builder = f"google.cloud.bigtable.data.{subpackage}._read_rows._read_rows_predicate_with_exceptions" + predicate_builder = f"google.cloud.bigtable.data.{subpackage}._read_rows._rst_stream_aware_predicate" else: predicate_builder = "google.api_core.retry.if_exception_type" with mock.patch( diff --git a/tests/unit/data/test__helpers.py b/tests/unit/data/test__helpers.py index 79631924a..20ad972d7 100644 --- a/tests/unit/data/test__helpers.py +++ b/tests/unit/data/test__helpers.py @@ -222,19 +222,45 @@ def test_get_timeouts_invalid(self, input_times): _helpers._align_timeouts(input_times[0], input_times[1]) -class TestReadRowsPredicateWithException: +class TestRstStreamAwarePredicate: @pytest.mark.parametrize( "retryable_exceptions,exception,expected_is_retryable", [ - ([core_exceptions.Aborted, core_exceptions.InternalServerError], core_exceptions.InternalServerError("Sorry"), True), - ([core_exceptions.Aborted, core_exceptions.InternalServerError], core_exceptions.DataLoss("Sorry"), False), - ([core_exceptions.ServiceUnavailable, core_exceptions.Aborted], core_exceptions.InternalServerError("Sorry"), False), - ([core_exceptions.ServiceUnavailable, core_exceptions.Aborted], core_exceptions.InternalServerError(_helpers._RETRYABLE_INTERNAL_ERROR_MESSAGES[0]), True), - ([core_exceptions.InternalServerError, core_exceptions.Aborted], core_exceptions.InternalServerError(_helpers._RETRYABLE_INTERNAL_ERROR_MESSAGES[0]), True), - ] + ( + [core_exceptions.Aborted, core_exceptions.InternalServerError], + core_exceptions.InternalServerError("Sorry"), + True, + ), + ( + [core_exceptions.Aborted, core_exceptions.InternalServerError], + core_exceptions.DataLoss("Sorry"), + False, + ), + ( + [core_exceptions.ServiceUnavailable, core_exceptions.Aborted], + core_exceptions.InternalServerError("Sorry"), + False, + ), + ( + [core_exceptions.ServiceUnavailable, core_exceptions.Aborted], + core_exceptions.InternalServerError( + _helpers._RETRYABLE_INTERNAL_ERROR_MESSAGES[0] + ), + True, + ), + ( + [core_exceptions.InternalServerError, core_exceptions.Aborted], + core_exceptions.InternalServerError( + _helpers._RETRYABLE_INTERNAL_ERROR_MESSAGES[0] + ), + True, + ), + ], ) - def test_ctor_retryable_exceptions_predicate(self, retryable_exceptions, exception, expected_is_retryable): - predicate = _helpers._read_rows_predicate_with_exceptions(*retryable_exceptions) + def test_rst_stream_aware_predicate( + self, retryable_exceptions, exception, expected_is_retryable + ): + predicate = _helpers._rst_stream_aware_predicate(*retryable_exceptions) assert predicate(exception) is expected_is_retryable