From 62b120a988c8a34229bffd17113f8a2bb34a8116 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 12 Feb 2026 15:37:17 +0000 Subject: [PATCH 1/5] fix mixed types --- sdks/python/apache_beam/yaml/yaml_provider.py | 17 +++++++++++++++++ .../apache_beam/yaml/yaml_provider_unit_test.py | 14 ++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index e9882602d100..080b6e8755d0 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -878,6 +878,23 @@ def create(elements: Iterable[Any], reshuffle: Optional[bool] = True): if not isinstance(elements, Iterable) or isinstance(elements, (dict, str)): raise TypeError('elements must be a list of elements') + # Validated that we have some elements. + if elements: + # Normalize elements to be all dicts or all primitives. + # If we have a mix, we want to treat them all as dicts for the purpose + # of schema inference (so we can have a schema like + # Row(element=..., other_field=...)). + # Note that we don't want to change the elements themselves if they + # are already all dicts or all primitives, as that would change the + # resulting schema (e.g. from int to Row(element=int)). 
+ is_dict = [isinstance(e, dict) for e in elements] + if not all(is_dict) and any(is_dict): + elements = [ + e if isinstance(e, dict) else { + 'element': e + } for e in elements + ] + # Check if elements have different keys updated_elements = elements if elements and all(isinstance(e, dict) for e in elements): diff --git a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py index 1ebae9a3b446..fc3ea0c14967 100644 --- a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py +++ b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py @@ -364,3 +364,17 @@ def test_empty_base(self): if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() + + +class YamlProvidersCreateTest(unittest.TestCase): + def test_create_mixed_types(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + # A mix of a primitive (Row(element=1)) and a dict (Row(a=2)) + result = p | YamlProviders.create([1, {"a": 2}]) + assert_that( + result | beam.Map(lambda x: sorted(x._asdict().items())), + equal_to([ + [('a', None), ('element', 1)], + [('a', 2), ('element', None)], + ])) From 7e73ef0e968c895a2ea645c919b74786eaae96dc Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 12 Feb 2026 20:02:40 +0000 Subject: [PATCH 2/5] add another create pipeline --- sdks/python/apache_beam/yaml/tests/create.yaml | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/sdks/python/apache_beam/yaml/tests/create.yaml b/sdks/python/apache_beam/yaml/tests/create.yaml index bf346f7667c8..6cd7807681c0 100644 --- a/sdks/python/apache_beam/yaml/tests/create.yaml +++ b/sdks/python/apache_beam/yaml/tests/create.yaml @@ -138,3 +138,21 @@ pipelines: - {sdk: MapReduce, year: 2004} - {sdk: MillWheel, year: 2008} + + # Simple Create with mixed types + - pipeline: + type: chain + transforms: + - type: Create + config: + elements: + - 1 + - {a: 2, c: 
"hello"} + - 3 + - type: AssertEqual + config: + elements: + - {element: 1, a: null, c: null} + - {element: null, a: 2, c: "hello"} + - {element: 3, a: null, c: null} + From 0be36e698d906e4ad3c705720e986d5d620f1180 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 12 Feb 2026 20:19:18 +0000 Subject: [PATCH 3/5] remove comment --- sdks/python/apache_beam/yaml/yaml_provider.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index 080b6e8755d0..1da6f4539751 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -878,7 +878,6 @@ def create(elements: Iterable[Any], reshuffle: Optional[bool] = True): if not isinstance(elements, Iterable) or isinstance(elements, (dict, str)): raise TypeError('elements must be a list of elements') - # Validated that we have some elements. if elements: # Normalize elements to be all dicts or all primitives. # If we have a mix, we want to treat them all as dicts for the purpose From 2fc9dd721ff08ef5e6a4a7af57fbdfc47035c567 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Thu, 12 Feb 2026 23:35:28 +0000 Subject: [PATCH 4/5] address gemini comment about performance --- sdks/python/apache_beam/yaml/yaml_provider.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index 1da6f4539751..c8c63aaba6d3 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -886,8 +886,17 @@ def create(elements: Iterable[Any], reshuffle: Optional[bool] = True): # Note that we don't want to change the elements themselves if they # are already all dicts or all primitives, as that would change the # resulting schema (e.g. from int to Row(element=int)). 
- is_dict = [isinstance(e, dict) for e in elements] - if not all(is_dict) and any(is_dict): + has_dict = False + has_non_dict = False + for e in elements: + if isinstance(e, dict): + has_dict = True + else: + has_non_dict = True + if has_dict and has_non_dict: + break + + if has_dict and has_non_dict: elements = [ e if isinstance(e, dict) else { 'element': e From 0cbdff443d968a8e0c5d0235f1c5cee03bbdc9a6 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 13 Feb 2026 02:53:46 +0000 Subject: [PATCH 5/5] address doc string and cloud pickle comments --- sdks/python/apache_beam/yaml/yaml_provider.py | 19 +++++++++++++------ .../yaml/yaml_provider_unit_test.py | 3 +-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py index c8c63aaba6d3..5a3ccf6b0c2e 100755 --- a/sdks/python/apache_beam/yaml/yaml_provider.py +++ b/sdks/python/apache_beam/yaml/yaml_provider.py @@ -864,6 +864,19 @@ def create(elements: Iterable[Any], reshuffle: Optional[bool] = True): str: "bar" values: [4, 5, 6] + If the elements are a mix of dicts and non-dicts, the non-dict elements + will be wrapped in a Row with a single field "element". For example:: + + type: Create + config: + elements: [1, {"a": 2}] + + will result in an output with two elements with a schema of + Row(element=int, a=int) looking like:: + + Row(element=1, a=None) + Row(element=None, a=2) + Args: elements: The set of elements that should belong to the PCollection. YAML/JSON-style mappings will be interpreted as Beam rows. @@ -880,12 +893,6 @@ def create(elements: Iterable[Any], reshuffle: Optional[bool] = True): if elements: # Normalize elements to be all dicts or all primitives. - # If we have a mix, we want to treat them all as dicts for the purpose - # of schema inference (so we can have a schema like - # Row(element=..., other_field=...)). 
- # Note that we don't want to change the elements themselves if they - # are already all dicts or all primitives, as that would change the - # resulting schema (e.g. from int to Row(element=int)). has_dict = False has_non_dict = False for e in elements: diff --git a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py index fc3ea0c14967..e1e3ee847d96 100644 --- a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py +++ b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py @@ -368,8 +368,7 @@ def test_empty_base(self): class YamlProvidersCreateTest(unittest.TestCase): def test_create_mixed_types(self): - with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( - pickle_library='cloudpickle')) as p: + with beam.Pipeline() as p: # A mix of a primitive (Row(element=1)) and a dict (Row(a=2)) result = p | YamlProviders.create([1, {"a": 2}]) assert_that(