From 85b3c82b3c203a02ee5ade99bbe9ba5fba39796b Mon Sep 17 00:00:00 2001 From: Yilei Yang Date: Sun, 29 Jun 2025 10:10:48 -0700 Subject: [PATCH 1/3] Support Protobuf 6.x. --- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 87765f3fce16..fd16ac7accdd 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -390,7 +390,7 @@ def get_portability_package_data(): # # 3. Exclude protobuf 4 versions that leak memory, see: # https://github.com/apache/beam/issues/28246 - 'protobuf>=3.20.3,<6.0.0.dev0,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long + 'protobuf>=3.20.3,<7.0.0.dev0,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long 'pydot>=1.2.0,<2', 'python-dateutil>=2.8.0,<3', 'pytz>=2018.3', From 2eed5fa5bda01cacbbbafd504a2104c55b285d10 Mon Sep 17 00:00:00 2001 From: Yilei Yang Date: Sun, 17 Aug 2025 02:06:14 +0800 Subject: [PATCH 2/3] Fix these flaky tests due to Protobuf not guaranteeing map iteration order. --- .../runners/worker/data_sampler_test.py | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/data_sampler_test.py b/sdks/python/apache_beam/runners/worker/data_sampler_test.py index 8c47315b7a9e..a708194faccd 100644 --- a/sdks/python/apache_beam/runners/worker/data_sampler_test.py +++ b/sdks/python/apache_beam/runners/worker/data_sampler_test.py @@ -383,8 +383,10 @@ def test_samples_all_with_both_experiments(self): MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory) # Get the samples for the two outputs. - a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0) - b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1) + # N.B. the order of the samplers is not guaranteed due to Protobuf not + # guaranteeing map iteration order. + first_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0) + second_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1) # Sample an exception for the output 'a', this will show up in the final # samples response. @@ -393,19 +395,22 @@ def test_samples_all_with_both_experiments(self): raise Exception('test') except Exception: exc_info = sys.exc_info() - a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid') + first_sampler.sample_exception('first', exc_info, MAIN_TRANSFORM_ID, 'instid') # Sample a normal element for the output 'b', this will not show up in the # final samples response. - b_sampler.element_sampler.el = 'b' - b_sampler.element_sampler.has_element = True + second_sampler.element_sampler.el = 'second' + second_sampler.element_sampler.has_element = True samples = self.data_sampler.wait_for_samples(['a', 'b']) self.assertEqual(len(samples.element_samples), 2) - self.assertTrue( - samples.element_samples['a'].elements[0].HasField('exception')) - self.assertFalse( - samples.element_samples['b'].elements[0].HasField('exception')) + sample_elements = list(s.elements[0] for s in samples.element_samples.values()) + num_exceptions = sum( + 1 for element in sample_elements if element.HasField('exception')) + self.assertEqual( + num_exceptions, 1, + "Only one of the samples should have an exception, found: {}".format( + sample_elements)) def test_only_sample_exceptions(self): """Tests that the exception sampling experiment only samples exceptions.""" @@ -420,8 +425,10 @@ def test_only_sample_exceptions(self): MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory) # Get the samples for the two outputs. - a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0) - b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1) + # N.B. the order of the samplers is not guaranteed due to Protobuf not + # guaranteeing map iteration order. + first_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0) + second_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1) # Sample an exception for the output 'a', this will show up in the final # samples response. @@ -430,16 +437,17 @@ def test_only_sample_exceptions(self): raise Exception('test') except Exception: exc_info = sys.exc_info() - a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid') + first_sampler.sample_exception('first', exc_info, MAIN_TRANSFORM_ID, 'instid') # Sample a normal element for the output 'b', this will not show up in the # final samples response. - b_sampler.element_sampler.el = 'b' - b_sampler.element_sampler.has_element = True + second_sampler.element_sampler.el = 'second' + second_sampler.element_sampler.has_element = True samples = self.data_sampler.wait_for_samples([]) self.assertEqual(len(samples.element_samples), 1) - self.assertIsNotNone(samples.element_samples['a'].elements[0].exception) + value = list(samples.element_samples.values())[0] + self.assertIsNotNone(value.elements[0].exception) class OutputSamplerTest(unittest.TestCase): From 78ef74e7beae7a200568c73472fb487f6650153f Mon Sep 17 00:00:00 2001 From: Yilei Yang Date: Sun, 17 Aug 2025 03:01:09 +0800 Subject: [PATCH 3/3] Run YAPF. --- .../apache_beam/runners/worker/data_sampler_test.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/data_sampler_test.py b/sdks/python/apache_beam/runners/worker/data_sampler_test.py index a708194faccd..47b6cca880d3 100644 --- a/sdks/python/apache_beam/runners/worker/data_sampler_test.py +++ b/sdks/python/apache_beam/runners/worker/data_sampler_test.py @@ -395,7 +395,8 @@ def test_samples_all_with_both_experiments(self): raise Exception('test') except Exception: exc_info = sys.exc_info() - first_sampler.sample_exception('first', exc_info, MAIN_TRANSFORM_ID, 'instid') + first_sampler.sample_exception( + 'first', exc_info, MAIN_TRANSFORM_ID, 'instid') # Sample a normal element for the output 'b', this will not show up in the # final samples response. @@ -404,11 +405,13 @@ def test_samples_all_with_both_experiments(self): samples = self.data_sampler.wait_for_samples(['a', 'b']) self.assertEqual(len(samples.element_samples), 2) - sample_elements = list(s.elements[0] for s in samples.element_samples.values()) + sample_elements = list( + s.elements[0] for s in samples.element_samples.values()) num_exceptions = sum( 1 for element in sample_elements if element.HasField('exception')) self.assertEqual( - num_exceptions, 1, + num_exceptions, + 1, "Only one of the samples should have an exception, found: {}".format( sample_elements)) @@ -437,7 +440,8 @@ def test_only_sample_exceptions(self): raise Exception('test') except Exception: exc_info = sys.exc_info() - first_sampler.sample_exception('first', exc_info, MAIN_TRANSFORM_ID, 'instid') + first_sampler.sample_exception( + 'first', exc_info, MAIN_TRANSFORM_ID, 'instid') # Sample a normal element for the output 'b', this will not show up in the # final samples response.