Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 27 additions & 15 deletions sdks/python/apache_beam/runners/worker/data_sampler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,10 @@ def test_samples_all_with_both_experiments(self):
MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)

# Get the samples for the two outputs.
a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0)
b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1)
# N.B. the order of the samplers is not guaranteed due to Protobuf not
# guaranteeing map iteration order.
first_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0)
second_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1)

# Sample an exception for the output 'a', this will show up in the final
# samples response.
Expand All @@ -393,19 +395,25 @@ def test_samples_all_with_both_experiments(self):
raise Exception('test')
except Exception:
exc_info = sys.exc_info()
a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid')
first_sampler.sample_exception(
'first', exc_info, MAIN_TRANSFORM_ID, 'instid')

# Sample a normal element for the output 'b', this will not show up in the
# final samples response.
b_sampler.element_sampler.el = 'b'
b_sampler.element_sampler.has_element = True
second_sampler.element_sampler.el = 'second'
second_sampler.element_sampler.has_element = True

samples = self.data_sampler.wait_for_samples(['a', 'b'])
self.assertEqual(len(samples.element_samples), 2)
self.assertTrue(
samples.element_samples['a'].elements[0].HasField('exception'))
self.assertFalse(
samples.element_samples['b'].elements[0].HasField('exception'))
sample_elements = list(
s.elements[0] for s in samples.element_samples.values())
num_exceptions = sum(
1 for element in sample_elements if element.HasField('exception'))
self.assertEqual(
num_exceptions,
1,
"Only one of the samples should have an exception, found: {}".format(
sample_elements))

def test_only_sample_exceptions(self):
"""Tests that the exception sampling experiment only samples exceptions."""
Expand All @@ -420,8 +428,10 @@ def test_only_sample_exceptions(self):
MAIN_TRANSFORM_ID, descriptor, self.primitives_coder_factory)

# Get the samples for the two outputs.
a_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0)
b_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1)
# N.B. the order of the samplers is not guaranteed due to Protobuf not
# guaranteeing map iteration order.
first_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 0)
second_sampler = self.data_sampler.sampler_for_output(MAIN_TRANSFORM_ID, 1)

# Sample an exception for the output 'a', this will show up in the final
# samples response.
Expand All @@ -430,16 +440,18 @@ def test_only_sample_exceptions(self):
raise Exception('test')
except Exception:
exc_info = sys.exc_info()
a_sampler.sample_exception('a', exc_info, MAIN_TRANSFORM_ID, 'instid')
first_sampler.sample_exception(
'first', exc_info, MAIN_TRANSFORM_ID, 'instid')

# Sample a normal element for the output 'b', this will not show up in the
# final samples response.
b_sampler.element_sampler.el = 'b'
b_sampler.element_sampler.has_element = True
second_sampler.element_sampler.el = 'second'
second_sampler.element_sampler.has_element = True

samples = self.data_sampler.wait_for_samples([])
self.assertEqual(len(samples.element_samples), 1)
self.assertIsNotNone(samples.element_samples['a'].elements[0].exception)
value = list(samples.element_samples.values())[0]
self.assertIsNotNone(value.elements[0].exception)


class OutputSamplerTest(unittest.TestCase):
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ def get_portability_package_data():
#
# 3. Exclude protobuf 4 versions that leak memory, see:
# https://github.com/apache/beam/issues/28246
'protobuf>=3.20.3,<6.0.0.dev0,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long
'protobuf>=3.20.3,<7.0.0.dev0,!=4.0.*,!=4.21.*,!=4.22.0,!=4.23.*,!=4.24.*', # pylint: disable=line-too-long
'pydot>=1.2.0,<2',
'python-dateutil>=2.8.0,<3',
'pytz>=2018.3',
Expand Down
Loading