From f0f3839fe3b9f7b03a1f1d2ecda8ee82e15e3951 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Thu, 19 Feb 2026 14:00:39 +0500 Subject: [PATCH 1/3] Fix type checking failure with Python 3.10+ union pipe syntax (int | None) The normalize() function did not handle types.UnionType, causing is_consistent_with() to fall through to issubclass() with a non-class argument. Route types.UnionType through convert_to_beam_type() which already knows how to convert it to Beam's UnionConstraint. Fixes #36592 --- .../python/apache_beam/typehints/typehints.py | 6 ++++ .../apache_beam/typehints/typehints_test.py | 29 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index d0dfaec23afc..26040fd17047 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -1462,6 +1462,12 @@ def normalize(x, none_as_type=False): # Convert bare builtin types to correct type hints directly elif x in _KNOWN_PRIMITIVE_TYPES: return _KNOWN_PRIMITIVE_TYPES[x] + elif isinstance(x, types.UnionType): + beam_type = native_type_compatibility.convert_to_beam_type(x) + if beam_type != x: + return beam_type + else: + return Any elif getattr(x, '__module__', None) in ('typing', 'collections', 'collections.abc') or getattr( x, '__origin__', None) in _KNOWN_PRIMITIVE_TYPES: diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py index cec830380087..3eb20147348c 100644 --- a/sdks/python/apache_beam/typehints/typehints_test.py +++ b/sdks/python/apache_beam/typehints/typehints_test.py @@ -1596,6 +1596,25 @@ def test_hint_helper(self): self.assertFalse(is_consistent_with(Union[str, int], str)) self.assertFalse(is_consistent_with(str, NonBuiltInGeneric[str])) + def test_hint_helper_pipe_union(self): + # GH issue: https://github.com/apache/beam/issues/36592 + # Python 3.10+ pipe operator union types (types.UnionType) should work + # in is_consistent_with just like typing.Union. + pipe_union = int | None # pylint: disable=unsupported-binary-operation + typing_union = Union[int, None] + self.assertTrue(is_consistent_with(int, pipe_union)) + self.assertTrue(is_consistent_with(type(None), pipe_union)) + self.assertFalse(is_consistent_with(str, pipe_union)) + self.assertTrue( + is_consistent_with(int, pipe_union) == + is_consistent_with(int, typing_union)) + self.assertTrue( + is_consistent_with(str, pipe_union) == + is_consistent_with(str, typing_union)) + pipe_union_2 = int | float # pylint: disable=unsupported-binary-operation + self.assertTrue(is_consistent_with(int, pipe_union_2)) + self.assertTrue(is_consistent_with(float, pipe_union_2)) + def test_positional_arg_hints(self): self.assertEqual(typehints.Any, _positional_arg_hints('x', {})) self.assertEqual(int, _positional_arg_hints('x', {'x': int})) @@ -1934,6 +1953,16 @@ def test_pipe_operator_as_union(self): native_type_compatibility.convert_to_beam_type(type_a), native_type_compatibility.convert_to_beam_type(type_b)) + def test_normalize_pipe_union(self): + # GH issue: https://github.com/apache/beam/issues/36592 + # normalize() should convert types.UnionType to Beam's UnionConstraint. + pipe_union = int | None # pylint: disable=unsupported-binary-operation + normalized = typehints.normalize(pipe_union) + self.assertIsInstance(normalized, typehints.UnionConstraint) + pipe_union_2 = int | float # pylint: disable=unsupported-binary-operation + normalized_2 = typehints.normalize(pipe_union_2) + self.assertIsInstance(normalized_2, typehints.UnionConstraint) + class TestNonBuiltInGenerics(unittest.TestCase): def test_no_error_thrown(self): From d0c5302d2054e59ad549cad473bfdea7107d95cd Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Thu, 19 Feb 2026 14:50:07 +0500 Subject: [PATCH 2/3] Fix yapf formatting in test file --- sdks/python/apache_beam/typehints/typehints_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py index 3eb20147348c..24f2cc94a909 100644 --- a/sdks/python/apache_beam/typehints/typehints_test.py +++ b/sdks/python/apache_beam/typehints/typehints_test.py @@ -1606,11 +1606,11 @@ def test_hint_helper_pipe_union(self): self.assertTrue(is_consistent_with(type(None), pipe_union)) self.assertFalse(is_consistent_with(str, pipe_union)) self.assertTrue( - is_consistent_with(int, pipe_union) == - is_consistent_with(int, typing_union)) + is_consistent_with(int, pipe_union) == is_consistent_with( + int, typing_union)) self.assertTrue( - is_consistent_with(str, pipe_union) == - is_consistent_with(str, typing_union)) + is_consistent_with(str, pipe_union) == is_consistent_with( + str, typing_union)) pipe_union_2 = int | float # pylint: disable=unsupported-binary-operation self.assertTrue(is_consistent_with(int, pipe_union_2)) self.assertTrue(is_consistent_with(float, pipe_union_2)) From 701dbd8cf17d534422aa53b0f4ce9e079c806994 Mon Sep 17 00:00:00 2001 From: junaiddshaukat Date: Thu, 19 Feb 2026 20:37:22 +0500 Subject: [PATCH 3/3] Address review: merge UnionType check into existing elif, remove issue link comments --- sdks/python/apache_beam/typehints/typehints.py | 13 ++++--------- sdks/python/apache_beam/typehints/typehints_test.py | 5 ----- 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py index 26040fd17047..f429935c3a0e 100644 --- a/sdks/python/apache_beam/typehints/typehints.py +++ b/sdks/python/apache_beam/typehints/typehints.py @@ -1462,15 +1462,10 @@ def normalize(x, none_as_type=False): # Convert bare builtin types to correct type hints directly elif x in _KNOWN_PRIMITIVE_TYPES: return _KNOWN_PRIMITIVE_TYPES[x] - elif isinstance(x, types.UnionType): - beam_type = native_type_compatibility.convert_to_beam_type(x) - if beam_type != x: - return beam_type - else: - return Any - elif getattr(x, '__module__', - None) in ('typing', 'collections', 'collections.abc') or getattr( - x, '__origin__', None) in _KNOWN_PRIMITIVE_TYPES: + elif isinstance(x, types.UnionType) or getattr( + x, '__module__', + None) in ('typing', 'collections', 'collections.abc') or getattr( + x, '__origin__', None) in _KNOWN_PRIMITIVE_TYPES: beam_type = native_type_compatibility.convert_to_beam_type(x) if beam_type != x: # We were able to do the conversion. diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py index 24f2cc94a909..1377bea6d56d 100644 --- a/sdks/python/apache_beam/typehints/typehints_test.py +++ b/sdks/python/apache_beam/typehints/typehints_test.py @@ -1597,9 +1597,6 @@ def test_hint_helper(self): self.assertFalse(is_consistent_with(str, NonBuiltInGeneric[str])) def test_hint_helper_pipe_union(self): - # GH issue: https://github.com/apache/beam/issues/36592 - # Python 3.10+ pipe operator union types (types.UnionType) should work - # in is_consistent_with just like typing.Union. pipe_union = int | None # pylint: disable=unsupported-binary-operation typing_union = Union[int, None] self.assertTrue(is_consistent_with(int, pipe_union)) @@ -1954,8 +1951,6 @@ def test_pipe_operator_as_union(self): native_type_compatibility.convert_to_beam_type(type_b)) def test_normalize_pipe_union(self): - # GH issue: https://github.com/apache/beam/issues/36592 - # normalize() should convert types.UnionType to Beam's UnionConstraint. pipe_union = int | None # pylint: disable=unsupported-binary-operation normalized = typehints.normalize(pipe_union) self.assertIsInstance(normalized, typehints.UnionConstraint)