Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PreCommit_Python_Dill.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"revision": 2
"revision": 3
}
32 changes: 10 additions & 22 deletions sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,16 +927,16 @@ def _create_impl(self):

class DeterministicFastPrimitivesCoderV2(FastCoder):
"""Throws runtime errors when encoding non-deterministic values."""
def __init__(self, coder, step_label, update_compatibility_version=None):
def __init__(self, coder, step_label):
self._underlying_coder = coder
self._step_label = step_label
self._use_relative_filepaths = True
self._version_tag = "v2_69"
from apache_beam.transforms.util import is_v1_prior_to_v2

# Versions prior to 2.69.0 did not use relative filepaths.
if update_compatibility_version and is_v1_prior_to_v2(
v1=update_compatibility_version, v2="2.69.0"):
from apache_beam.options.pipeline_options_context import get_pipeline_options
opts = get_pipeline_options()
if opts and opts.is_compat_version_prior_to("2.69.0"):
self._version_tag = ""
self._use_relative_filepaths = False

Expand Down Expand Up @@ -1005,20 +1005,11 @@ def to_type_hint(self):
return Any


def _should_force_use_dill(registry):
# force_dill_deterministic_coders is for testing purposes. If there is a
# DeterministicFastPrimitivesCoder in the pipeline graph but the dill
# encoding path is not really triggered, dill does not have to be installed
# and this check can be skipped.
if getattr(registry, 'force_dill_deterministic_coders', False):
return True
def _should_force_use_dill():
from apache_beam.options.pipeline_options_context import get_pipeline_options

from apache_beam.transforms.util import is_v1_prior_to_v2
update_compat_version = registry.update_compatibility_version
if not update_compat_version:
return False

if not is_v1_prior_to_v2(v1=update_compat_version, v2="2.68.0"):
opts = get_pipeline_options()
if opts is None or not opts.is_compat_version_prior_to("2.68.0"):
return False

try:
Expand All @@ -1043,12 +1034,9 @@ def _update_compatible_deterministic_fast_primitives_coder(coder, step_label):
- In SDK version 2.69.0 cloudpickle is used to encode "special types" with
relative filepaths in code objects and dynamic functions.
"""
from apache_beam.coders import typecoders

if _should_force_use_dill(typecoders.registry):
if _should_force_use_dill():
return DeterministicFastPrimitivesCoder(coder, step_label)
return DeterministicFastPrimitivesCoderV2(
coder, step_label, typecoders.registry.update_compatibility_version)
return DeterministicFastPrimitivesCoderV2(coder, step_label)


class FastPrimitivesCoder(FastCoder):
Expand Down
194 changes: 98 additions & 96 deletions sdks/python/apache_beam/coders/coders_test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
from apache_beam.coders import typecoders
from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options_context import scoped_pipeline_options
from apache_beam.runners import pipeline_context
from apache_beam.transforms import userstate
from apache_beam.transforms import window
Expand Down Expand Up @@ -202,9 +204,6 @@ def tearDownClass(cls):
assert not standard - cls.seen, str(standard - cls.seen)
assert not cls.seen_nested - standard, str(cls.seen_nested - standard)

def tearDown(self):
typecoders.registry.update_compatibility_version = None

@classmethod
def _observe(cls, coder):
cls.seen.add(type(coder))
Expand Down Expand Up @@ -274,80 +273,82 @@ def test_deterministic_coder(self, compat_version):
- In SDK version >=2.69.0 cloudpickle is used to encode "special types"
with relative filepaths in code objects and dynamic functions.
"""
with scoped_pipeline_options(
PipelineOptions(update_compatibility_version=compat_version)):
coder = coders.FastPrimitivesCoder()
if not dill and compat_version == "2.67.0":
with self.assertRaises(RuntimeError):
coder.as_deterministic_coder(step_label="step")
self.skipTest('Dill not installed')
deterministic_coder = coder.as_deterministic_coder(step_label="step")

self.check_coder(deterministic_coder, *self.test_values_deterministic)
for v in self.test_values_deterministic:
self.check_coder(coders.TupleCoder((deterministic_coder, )), (v, ))
self.check_coder(
coders.TupleCoder(
(deterministic_coder, ) * len(self.test_values_deterministic)),
tuple(self.test_values_deterministic))

typecoders.registry.update_compatibility_version = compat_version
coder = coders.FastPrimitivesCoder()
if not dill and compat_version == "2.67.0":
with self.assertRaises(RuntimeError):
coder.as_deterministic_coder(step_label="step")
self.skipTest('Dill not installed')
deterministic_coder = coder.as_deterministic_coder(step_label="step")

self.check_coder(deterministic_coder, *self.test_values_deterministic)
for v in self.test_values_deterministic:
self.check_coder(coders.TupleCoder((deterministic_coder, )), (v, ))
self.check_coder(
coders.TupleCoder(
(deterministic_coder, ) * len(self.test_values_deterministic)),
tuple(self.test_values_deterministic))

self.check_coder(deterministic_coder, {})
self.check_coder(deterministic_coder, {2: 'x', 1: 'y'})
with self.assertRaises(TypeError):
self.check_coder(deterministic_coder, {1: 'x', 'y': 2})
self.check_coder(deterministic_coder, [1, {}])
with self.assertRaises(TypeError):
self.check_coder(deterministic_coder, [1, {1: 'x', 'y': 2}])

self.check_coder(
coders.TupleCoder((deterministic_coder, coder)), (1, {}), ('a', [{}]))
self.check_coder(deterministic_coder, {})
self.check_coder(deterministic_coder, {2: 'x', 1: 'y'})
with self.assertRaises(TypeError):
self.check_coder(deterministic_coder, {1: 'x', 'y': 2})
self.check_coder(deterministic_coder, [1, {}])
with self.assertRaises(TypeError):
self.check_coder(deterministic_coder, [1, {1: 'x', 'y': 2}])

self.check_coder(deterministic_coder, test_message.MessageA(field1='value'))
self.check_coder(
coders.TupleCoder((deterministic_coder, coder)), (1, {}), ('a', [{}]))

# Skip this test during cloudpickle. Dill monkey patches the __reduce__
# method for anonymous named tuples (MyNamedTuple) which is not pickleable.
# Since the test is parameterized the type gets clobbered.
if compat_version == "2.67.0":
self.check_coder(
deterministic_coder, [MyNamedTuple(1, 2), MyTypedNamedTuple(1, 'a')])
deterministic_coder, test_message.MessageA(field1='value'))

self.check_coder(
deterministic_coder,
[AnotherNamedTuple(1, 2), MyTypedNamedTuple(1, 'a')])
# Skip this test during cloudpickle. Dill monkey patches the __reduce__
# method for anonymous named tuples (MyNamedTuple) which is not
# pickleable. Since the test is parameterized the type gets clobbered.
if compat_version == "2.67.0":
self.check_coder(
deterministic_coder,
[MyNamedTuple(1, 2), MyTypedNamedTuple(1, 'a')])

if dataclasses is not None:
self.check_coder(deterministic_coder, FrozenDataClass(1, 2))
self.check_coder(deterministic_coder, FrozenKwOnlyDataClass(c=1, d=2))
self.check_coder(
deterministic_coder, FrozenUnInitKwOnlyDataClass(side=11))
deterministic_coder,
[AnotherNamedTuple(1, 2), MyTypedNamedTuple(1, 'a')])

with self.assertRaises(TypeError):
self.check_coder(deterministic_coder, UnFrozenDataClass(1, 2))

with self.assertRaises(TypeError):
if dataclasses is not None:
self.check_coder(deterministic_coder, FrozenDataClass(1, 2))
self.check_coder(deterministic_coder, FrozenKwOnlyDataClass(c=1, d=2))
self.check_coder(
deterministic_coder, FrozenDataClass(UnFrozenDataClass(1, 2), 3))
deterministic_coder, FrozenUnInitKwOnlyDataClass(side=11))

with self.assertRaises(TypeError):
self.check_coder(
deterministic_coder,
AnotherNamedTuple(UnFrozenDataClass(1, 2), 3))
self.check_coder(deterministic_coder, UnFrozenDataClass(1, 2))

self.check_coder(deterministic_coder, list(MyEnum))
self.check_coder(deterministic_coder, list(MyIntEnum))
self.check_coder(deterministic_coder, list(MyIntFlag))
self.check_coder(deterministic_coder, list(MyFlag))
with self.assertRaises(TypeError):
self.check_coder(
deterministic_coder, FrozenDataClass(UnFrozenDataClass(1, 2), 3))
with self.assertRaises(TypeError):
self.check_coder(
deterministic_coder,
AnotherNamedTuple(UnFrozenDataClass(1, 2), 3))

self.check_coder(
deterministic_coder,
[DefinesGetAndSetState(1), DefinesGetAndSetState((1, 2, 3))])
self.check_coder(deterministic_coder, list(MyEnum))
self.check_coder(deterministic_coder, list(MyIntEnum))
self.check_coder(deterministic_coder, list(MyIntFlag))
self.check_coder(deterministic_coder, list(MyFlag))

with self.assertRaises(TypeError):
self.check_coder(deterministic_coder, DefinesGetState(1))
with self.assertRaises(TypeError):
self.check_coder(
deterministic_coder, DefinesGetAndSetState({
1: 'x', 'y': 2
}))
deterministic_coder,
[DefinesGetAndSetState(1), DefinesGetAndSetState((1, 2, 3))])

with self.assertRaises(TypeError):
self.check_coder(deterministic_coder, DefinesGetState(1))
with self.assertRaises(TypeError):
self.check_coder(
deterministic_coder, DefinesGetAndSetState({
1: 'x', 'y': 2
}))

@parameterized.expand([
param(compat_version=None),
Expand All @@ -364,28 +365,29 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version):
- In SDK version >=2.69.0 cloudpickle is used to encode "special types"
with relative filepaths in code objects and dynamic functions.
"""
typecoders.registry.update_compatibility_version = compat_version
values = [{
MyTypedNamedTuple(i, 'a'): MyTypedNamedTuple('a', i)
for i in range(10)
}]
with scoped_pipeline_options(
PipelineOptions(update_compatibility_version=compat_version)):
values = [{
MyTypedNamedTuple(i, 'a'): MyTypedNamedTuple('a', i)
for i in range(10)
}]

coder = coders.MapCoder(
coders.FastPrimitivesCoder(), coders.FastPrimitivesCoder())
coder = coders.MapCoder(
coders.FastPrimitivesCoder(), coders.FastPrimitivesCoder())

if not dill and compat_version == "2.67.0":
with self.assertRaises(RuntimeError):
coder.as_deterministic_coder(step_label="step")
self.skipTest('Dill not installed')
if not dill and compat_version == "2.67.0":
with self.assertRaises(RuntimeError):
coder.as_deterministic_coder(step_label="step")
self.skipTest('Dill not installed')

deterministic_coder = coder.as_deterministic_coder(step_label="step")
deterministic_coder = coder.as_deterministic_coder(step_label="step")

assert isinstance(
deterministic_coder._key_coder,
coders.DeterministicFastPrimitivesCoderV2 if compat_version
in (None, "2.68.0") else coders.DeterministicFastPrimitivesCoder)
assert isinstance(
deterministic_coder._key_coder,
coders.DeterministicFastPrimitivesCoderV2 if compat_version
in (None, "2.68.0") else coders.DeterministicFastPrimitivesCoder)

self.check_coder(deterministic_coder, *values)
self.check_coder(deterministic_coder, *values)

def test_dill_coder(self):
if not dill:
Expand Down Expand Up @@ -738,7 +740,6 @@ def test_cross_process_encoding_of_special_types_is_deterministic(

if sys.executable is None:
self.skipTest('No Python interpreter found')
typecoders.registry.update_compatibility_version = compat_version

# pylint: disable=line-too-long
script = textwrap.dedent(
Expand All @@ -750,7 +751,8 @@ def test_cross_process_encoding_of_special_types_is_deterministic(
import logging

from apache_beam.coders import coders
from apache_beam.coders import typecoders
from apache_beam.options.pipeline_options_context import scoped_pipeline_options
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.coders.coders_test_common import MyNamedTuple
from apache_beam.coders.coders_test_common import MyTypedNamedTuple
from apache_beam.coders.coders_test_common import MyEnum
Expand Down Expand Up @@ -802,20 +804,20 @@ def test_cross_process_encoding_of_special_types_is_deterministic(
])

compat_version = {'"'+ compat_version +'"' if compat_version else None}
typecoders.registry.update_compatibility_version = compat_version
coder = coders.FastPrimitivesCoder()
deterministic_coder = coder.as_deterministic_coder("step")

results = dict()
for test_name, value in test_cases:
try:
encoded = deterministic_coder.encode(value)
results[test_name] = encoded
except Exception as e:
logging.warning("Encoding failed with %s", e)
sys.exit(1)

sys.stdout.buffer.write(pickle.dumps(results))
with scoped_pipeline_options(PipelineOptions(update_compatibility_version=compat_version)):
coder = coders.FastPrimitivesCoder()
deterministic_coder = coder.as_deterministic_coder("step")
results = dict()
for test_name, value in test_cases:
try:
encoded = deterministic_coder.encode(value)
results[test_name] = encoded
except Exception as e:
logging.warning("Encoding failed with %s", e)
sys.exit(1)
sys.stdout.buffer.write(pickle.dumps(results))


''')
Expand Down
1 change: 0 additions & 1 deletion sdks/python/apache_beam/coders/typecoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ def __init__(self, fallback_coder=None):
self._coders: Dict[Any, Type[coders.Coder]] = {}
self.custom_types: List[Any] = []
self.register_standard_coders(fallback_coder)
self.update_compatibility_version = None

def register_standard_coders(self, fallback_coder):
"""Register coders for all basic and composite types."""
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Original file line number Diff line number Diff line change
Expand Up @@ -1120,8 +1120,8 @@ def _load_data(
of the load jobs would fail but not other. If any of them fails, then
copy jobs are not triggered.
"""
self.reshuffle_before_load = not util.is_compat_version_prior_to(
p.options, "2.65.0")
self.reshuffle_before_load = not p.options.is_compat_version_prior_to(
"2.65.0")
if self.reshuffle_before_load:
# Ensure that TriggerLoadJob retry inputs are deterministic by breaking
# fusion for inputs.
Expand Down
15 changes: 6 additions & 9 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,9 +485,9 @@ def test_records_traverse_transform_with_mocks(self):
param(compat_version=None),
param(compat_version="2.64.0"),
])
def test_reshuffle_before_load(self, compat_version):
from apache_beam.coders import typecoders
typecoders.registry.force_dill_deterministic_coders = True
@mock.patch(
'apache_beam.coders.coders._should_force_use_dill', return_value=True)
def test_reshuffle_before_load(self, mock_force_dill, compat_version):
destination = 'project1:dataset1.table1'

job_reference = bigquery_api.JobReference()
Expand Down Expand Up @@ -523,7 +523,6 @@ def test_reshuffle_before_load(self, compat_version):

reshuffle_before_load = compat_version is None
assert transform.reshuffle_before_load == reshuffle_before_load
typecoders.registry.force_dill_deterministic_coders = False

def test_load_job_id_used(self):
job_reference = bigquery_api.JobReference()
Expand Down Expand Up @@ -998,10 +997,10 @@ def dynamic_destination_resolver(element, *side_inputs):
param(
is_streaming=True, with_auto_sharding=True, compat_version="2.64.0"),
])
@mock.patch(
'apache_beam.coders.coders._should_force_use_dill', return_value=True)
def test_triggering_frequency(
self, is_streaming, with_auto_sharding, compat_version):
from apache_beam.coders import typecoders
typecoders.registry.force_dill_deterministic_coders = True
self, mock_force_dill, is_streaming, with_auto_sharding, compat_version):

destination = 'project1:dataset1.table1'

Expand Down Expand Up @@ -1108,8 +1107,6 @@ def __call__(self):
label='CheckDestinations')
assert_that(jobs, equal_to(expected_jobs), label='CheckJobs')

typecoders.registry.force_dill_deterministic_coders = False


class BigQueryFileLoadsIT(unittest.TestCase):

Expand Down
Loading
Loading