diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index ddcb80630487..b1c607457f9f 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -1512,7 +1512,7 @@ def _create_impl(self): def is_deterministic(self) -> bool: return ( - all(c.is_deterministic for _, c in self._coder_types) and ( + all(c.is_deterministic() for _, c in self._coder_types) and ( self._fallback_coder is None or self._fallback_coder.is_deterministic())) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index fcc5e6ac58bf..ad742665fb8a 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -1082,6 +1082,20 @@ def test_OrderedUnionCoder(self): self.check_coder(test_coder, 123) self.check_coder(test_coder, 1.5) + def test_OrderedUnionCoderDeterministic(self): + # CustomCoder is not deterministic therefore test_coder is not + # deterministic + test_coder = coders._OrderedUnionCoder((str, coders.StrUtf8Coder()), + (int, CustomCoder()), + fallback_coder=coders.FloatCoder()) + + self.assertFalse(test_coder.is_deterministic()) + + test_coder = coders._OrderedUnionCoder((str, coders.StrUtf8Coder()), + (int, coders.VarIntCoder()), + fallback_coder=coders.FloatCoder()) + self.assertTrue(test_coder.is_deterministic()) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO)