diff --git a/sdks/python/apache_beam/runners/worker/data_sampler_test.py b/sdks/python/apache_beam/runners/worker/data_sampler_test.py index 8c47315b7a9e..47b6cca880d3 100644 --- a/sdks/python/apache_beam/runners/worker/data_sampler_test.py +++ b/sdks/python/apache_beam/runners/worker/data_sampler_test.py @@ -383,8 +383,10 @@ def test_samples_all_with_both_experiments(self): MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory) # Get the samples for the two outputs. - a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0) - b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1) + # N.B. the order of the samplers is not guaranteed due to Protobuf not + # guaranteeing map iteration order. + first_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0) + second_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1) # Sample an exception for the output 'a', this will show up in the final # samples response. @@ -393,19 +395,25 @@ def test_samples_all_with_both_experiments(self): raise Exception('test') except Exception: exc_info = sys.exc_info() - a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid') + first_sampler.sample_exception( + 'first', exc_info, MAIN_TRANSFORM_ID, 'instid') # Sample a normal element for the output 'b', this will not show up in the # final samples response. - b_sampler.element_sampler.el = 'b' - b_sampler.element_sampler.has_element = True + second_sampler.element_sampler.el = 'second' + second_sampler.element_sampler.has_element = True samples = self.data_sampler.wait_for_samples(['a', 'b']) self.assertEqual(len(samples.element_samples), 2) - self.assertTrue( - samples.element_samples['a'].elements[0].HasField('exception')) - self.assertFalse( - samples.element_samples['b'].elements[0].HasField('exception')) + sample_elements = list( + s.elements[0] for s in samples.element_samples.values()) + num_exceptions = sum( + 1 for element in sample_elements if element.HasField('exception')) + self.assertEqual( + num_exceptions, + 1, + "Only one of the samples should have an exception, found: {}".format( + sample_elements)) def test_only_sample_exceptions(self): """Tests that the exception sampling experiment only samples exceptions.""" @@ -420,8 +428,10 @@ def test_only_sample_exceptions(self): MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory) # Get the samples for the two outputs. - a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0) - b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1) + # N.B. the order of the samplers is not guaranteed due to Protobuf not + # guaranteeing map iteration order. + first_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0) + second_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1) # Sample an exception for the output 'a', this will show up in the final # samples response. @@ -430,16 +440,18 @@ def test_only_sample_exceptions(self): raise Exception('test') except Exception: exc_info = sys.exc_info() - a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid') + first_sampler.sample_exception( + 'first', exc_info, MAIN_TRANSFORM_ID, 'instid') # Sample a normal element for the output 'b', this will not show up in the # final samples response. - b_sampler.element_sampler.el = 'b' - b_sampler.element_sampler.has_element = True + second_sampler.element_sampler.el = 'second' + second_sampler.element_sampler.has_element = True samples = self.data_sampler.wait_for_samples([]) self.assertEqual(len(samples.element_samples), 1) - self.assertIsNotNone(samples.element_samples['a'].elements[0].exception) + value = list(samples.element_samples.values())[0] + self.assertIsNotNone(value.elements[0].exception) class OutputSamplerTest(unittest.TestCase): diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 87765f3fce16..fd16ac7accdd 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -390,7 +390,7 @@ def get_portability_package_data(): # # 3. Exclude protobuf 4 versions that leak memory, see: # https://github.com/apache/beam/issues/28246 - 'protobuf>=3.20.3,<6.0.0.dev0,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long + 'protobuf>=3.20.3,<7.0.0.dev0,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long 'pydot>=1.2.0,<2', 'python-dateutil>=2.8.0,<3', 'pytz>=2018.3',