From b23848b368d7e2cea303ed574a7e4ebdf158dfe4 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Tue, 17 Feb 2026 23:19:05 +0000 Subject: [PATCH 1/6] Adds a new coder translator for Java SchemaCoder. Adds PipelineOptions to translation context so we can disable the new translator based on pipeline compatibility version. --- CHANGES.md | 6 +- .../dataflow/DataflowPipelineTranslator.java | 2 +- .../beam/runners/dataflow/DataflowRunner.java | 8 +- .../DataflowPipelineTranslatorTest.java | 57 ++++++++- .../util/construction/CoderTranslation.java | 6 +- .../util/construction/CoderTranslator.java | 9 +- .../util/construction/CoderTranslators.java | 112 +++++++++++++++++- .../construction/ModelCoderRegistrar.java | 3 + .../sdk/util/construction/ModelCoders.java | 2 + .../construction/RehydratedComponents.java | 3 +- .../sdk/util/construction/SdkComponents.java | 34 +++--- .../construction/CoderTranslationTest.java | 36 ++++++ .../expansion/service/ExpansionService.java | 4 +- .../avro/AvroGenericCoderTranslator.java | 2 +- 14 files changed, 250 insertions(+), 34 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 1bd807abe1e4..0078c2e8e2db 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -77,7 +77,9 @@ ## Breaking Changes -* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)). +* Portable Java SDK now encodes SchemaCoders in a portable way ([34672](https://github.com/apache/beam/issues/34672)). + * Original custom Java coder encoding can still be obtained using [StreamingOptions.setUpdateCompatibilityVersion("2.71")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47). + * Fixes [36496](https://github.com/apache/beam/issues/36496), [30276](https://github.com/apache/beam/issues/30276), [29245](https://github.com/apache/beam/issues/29245). ## Deprecations @@ -2356,4 +2358,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss ## Highlights -- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). \ No newline at end of file +- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 08d84705c5c7..6a35204007e7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -137,7 +137,7 @@ public class DataflowPipelineTranslator { private static byte[] serializeWindowingStrategy( WindowingStrategy windowingStrategy, PipelineOptions options) { try { - SdkComponents sdkComponents = SdkComponents.create(); + SdkComponents sdkComponents = SdkComponents.create(options); String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class)); diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index f34b74866575..93963922ccb8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1327,13 +1327,13 @@ public DataflowPipelineJob run(Pipeline pipeline) { // with the SDK harness image (which implements Fn API). // // The same Environment is used in different and contradictory ways, depending on whether - // it is a v1 or v2 job submission. + // it is a portable or non-portable job submission. RunnerApi.Environment defaultEnvironmentForDataflow = Environments.createDockerEnvironment(workerHarnessContainerImageURL); - // The SdkComponents for portable an non-portable job submission must be kept distinct. Both + // The SdkComponents for portable and non-portable job submission must be kept distinct. Both // need the default environment. - SdkComponents portableComponents = SdkComponents.create(); + SdkComponents portableComponents = SdkComponents.create(options); portableComponents.registerEnvironment( defaultEnvironmentForDataflow .toBuilder() @@ -1372,7 +1372,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { replaceV1Transforms(pipeline); } // Capture the SdkComponents for look up during step translations - SdkComponents dataflowV1Components = SdkComponents.create(); + SdkComponents dataflowV1Components = SdkComponents.create(options); dataflowV1Components.registerEnvironment( defaultEnvironmentForDataflow .toBuilder() diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 8226dc2c7274..5293ea00c71d 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -46,6 +46,7 @@ import com.google.api.services.dataflow.model.Job; import com.google.api.services.dataflow.model.Step; import com.google.api.services.dataflow.model.WorkerPool; +import com.google.auto.value.AutoValue; import java.io.File; import java.io.IOException; import java.io.Serializable; @@ -89,7 +90,10 @@ import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; import org.apache.beam.sdk.state.ValueState; @@ -153,7 +157,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); private SdkComponents createSdkComponents(PipelineOptions options) { - SdkComponents sdkComponents = SdkComponents.create(); + SdkComponents sdkComponents = SdkComponents.create(options); String containerImageURL = DataflowRunner.getContainerImageForJob(options.as(DataflowPipelineOptions.class)); @@ -1125,7 +1129,7 @@ public String apply(byte[] input) { file1.deleteOnExit(); File file2 = File.createTempFile("file2-", ".txt"); file2.deleteOnExit(); - SdkComponents sdkComponents = SdkComponents.create(); + SdkComponents sdkComponents = SdkComponents.create(options); sdkComponents.registerEnvironment( Environments.createDockerEnvironment(DataflowRunner.getContainerImageForJob(options)) .toBuilder() @@ -1699,4 +1703,53 @@ public OffsetRange getInitialRange(@Element String element) { return null; } } + + @AutoValue + @DefaultSchema(AutoValueSchema.class) + public abstract static class SimpleAutoValue { + public abstract String getString(); + + public abstract Integer getInt32(); + + public abstract Long getInt64(); + + public static DataflowPipelineTranslatorTest.SimpleAutoValue of( + String string, Integer int32, Long int64) { + return new AutoValue_DataflowPipelineTranslatorTest_SimpleAutoValue(string, int32, int64); + } + } + + @Test + public void testSchemaCoderTranslation() throws Exception { + DataflowPipelineOptions options = buildPipelineOptions(); + Pipeline pipeline = Pipeline.create(options); + pipeline + .apply(Impulse.create()) + .apply( + MapElements.via( + new SimpleFunction() { + @Override + public SimpleAutoValue apply(byte[] input) { + return SimpleAutoValue.of("foo", 5, 10L); + } + })) + .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1)))); + { + SdkComponents sdkComponents = createSdkComponents(options); + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true); + Map coders = pipelineProto.getComponents().getCodersMap(); + assertTrue(coders.containsKey("SchemaCoder")); + assertEquals("beam:coder:schema:v1", coders.get("SchemaCoder").getSpec().getUrn()); + } + + // Prior to version 2.72, SchemaCoders are translated as custom java coders. + { + options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.71"); + SdkComponents sdkComponents = createSdkComponents(options); + RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true); + Map coders = pipelineProto.getComponents().getCodersMap(); + assertTrue(coders.containsKey("SchemaCoder")); + assertEquals("beam:coders:javasdk:0.1", coders.get("SchemaCoder").getSpec().getUrn()); + } + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java index 22859dc68b93..d0f477e3a6b8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslation.java @@ -26,6 +26,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.construction.CoderTranslators.TranslationContextWithOptions; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap; @@ -131,12 +132,13 @@ private static RunnerApi.Coder toKnownCoder(Coder coder, SdkComponents compon throws IOException { CoderTranslator translator = getKnownTranslators().get(coder.getClass()); List componentIds = registerComponents(coder, translator, components); + TranslationContextWithOptions context = () -> components::getPipelineOptions; return RunnerApi.Coder.newBuilder() .addAllComponentCoderIds(componentIds) .setSpec( FunctionSpec.newBuilder() - .setUrn(getKnownCoderUrns().get(coder.getClass())) - .setPayload(ByteString.copyFrom(translator.getPayload(coder)))) + .setUrn(translator.getUrn(coder, context)) + .setPayload(ByteString.copyFrom(translator.getPayload(coder, context)))) .build(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java index 3d89c4c7ff4a..1a2ed9e1714c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslator.java @@ -28,6 +28,13 @@ * additional payload, which is not currently supported. This exists as a temporary measure. */ public interface CoderTranslator> { + + /** Returns the String to use for the coder URN. */ + default String getUrn(T coder, TranslationContext context) { + return CoderTranslation.getKnownCoderUrns() + .getOrDefault(coder.getClass(), CoderTranslation.JAVA_SERIALIZED_CODER_URN); + } + /** Extract all component {@link Coder coders} within a coder. */ List> getComponents(T from); @@ -37,7 +44,7 @@ public interface CoderTranslator> { * *

The default implementation returns a byte array of length zero. */ - default byte[] getPayload(T from) { + default byte[] getPayload(T from, TranslationContext context) { return new byte[0]; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java index 84a90721a983..488ce58b6f0c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java @@ -19,8 +19,10 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.function.Supplier; import org.apache.beam.model.pipeline.v1.SchemaApi; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; @@ -29,13 +31,22 @@ import org.apache.beam.sdk.coders.NullableCoder; import org.apache.beam.sdk.coders.RowCoder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaTranslation; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.InstanceBuilder; +import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; +import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -43,6 +54,10 @@ class CoderTranslators { private CoderTranslators() {} + public interface TranslationContextWithOptions extends TranslationContext { + Supplier pipelineOptions(); + } + static > CoderTranslator atomic(final Class clazz) { return new SimpleStructuredCoderTranslator() { @Override @@ -137,7 +152,9 @@ public List> getComponents( } @Override - public byte[] getPayload(WindowedValues.ParamWindowedValueCoder from) { + public byte[] getPayload( + WindowedValues.ParamWindowedValueCoder from, + CoderTranslation.TranslationContext context) { return WindowedValues.ParamWindowedValueCoder.getPayload(from); } @@ -157,7 +174,7 @@ public List> getComponents(RowCoder from) { } @Override - public byte[] getPayload(RowCoder from) { + public byte[] getPayload(RowCoder from, CoderTranslation.TranslationContext context) { return SchemaTranslation.schemaToProto(from.getSchema(), true).toByteArray(); } @@ -177,6 +194,97 @@ public RowCoder fromComponents( }; } + static CoderTranslator> schema() { + return new CoderTranslator>() { + private static final String TO_ROW_FUNCTION_URN = "beam:torowfn:javasdk:v1"; + private static final String FROM_ROW_FUNCTION_URN = "beam:fromrowfn:javasdk:v1"; + private static final String TYPE_DESCRIPTOR_URN = "beam:typedescriptor:javasdk:v1"; + + @Override + public ImmutableList> getComponents(SchemaCoder from) { + return ImmutableList.of(); + } + + @Override + public String getUrn(SchemaCoder from, TranslationContext context) { + if (context instanceof TranslationContextWithOptions) { + PipelineOptions options = + ((TranslationContextWithOptions) context).pipelineOptions().get(); + if (StreamingOptions.updateCompatibilityVersionLessThan(options, "2.72")) { + return CoderTranslation.JAVA_SERIALIZED_CODER_URN; + } + } + return CoderTranslation.getKnownCoderUrns() + .getOrDefault(from.getClass(), CoderTranslation.JAVA_SERIALIZED_CODER_URN); + } + + @Override + public byte[] getPayload(SchemaCoder from, TranslationContext context) { + if (context instanceof TranslationContextWithOptions) { + PipelineOptions options = + ((TranslationContextWithOptions) context).pipelineOptions().get(); + if (StreamingOptions.updateCompatibilityVersionLessThan(options, "2.72")) { + return SerializableUtils.serializeToByteArray(from); + } + } + SchemaApi.SchemaCoderPayload.Builder payload = SchemaApi.SchemaCoderPayload.newBuilder(); + payload.setSchema(SchemaTranslation.schemaToProto(from.getSchema(), true)); + payload + .getToRowFnBuilder() + .setUrn(TO_ROW_FUNCTION_URN) + .setPayload( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(from.getToRowFunction()))); + payload + .getFromRowFnBuilder() + .setUrn(FROM_ROW_FUNCTION_URN) + .setPayload( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(from.getFromRowFunction()))); + payload + .addAdditionalCoderInfosBuilder() + .setUrn(TYPE_DESCRIPTOR_URN) + .setPayload( + ByteString.copyFrom( + SerializableUtils.serializeToByteArray(from.getEncodedTypeDescriptor()))); + return payload.build().toByteArray(); + } + + @Override + public SchemaCoder fromComponents( + List> components, byte[] payload, CoderTranslation.TranslationContext context) { + checkArgument( + components.isEmpty(), "Expected empty component list, but received: %s", components); + try { + SchemaApi.SchemaCoderPayload schemaCoderPayload = + SchemaApi.SchemaCoderPayload.parseFrom(payload); + if (schemaCoderPayload.getAdditionalCoderInfosCount() == 0) { + throw new IllegalArgumentException("Missing serialized typeDescriptor"); + } + TypeDescriptor typeDescriptor = + (TypeDescriptor) + SerializableUtils.deserializeFromByteArray( + schemaCoderPayload.getAdditionalCoderInfos(0).getPayload().toByteArray(), + "typeDescriptor"); + SerializableFunction toRowFunction = + (SerializableFunction) + SerializableUtils.deserializeFromByteArray( + schemaCoderPayload.getToRowFn().getPayload().toByteArray(), "toRowFunction"); + SerializableFunction fromRowFunction = + (SerializableFunction) + SerializableUtils.deserializeFromByteArray( + schemaCoderPayload.getFromRowFn().getPayload().toByteArray(), + "fromRowFunction"); + + Schema schema = SchemaTranslation.schemaFromProto(schemaCoderPayload.getSchema()); + return SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction); + } catch (IOException | IllegalArgumentException e) { + throw new RuntimeException(e); + } + } + }; + } + static CoderTranslator> shardedKey() { return new SimpleStructuredCoderTranslator>() { @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java index 5b0d5aedd619..6d8b7d08916a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoderRegistrar.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; import org.apache.beam.sdk.util.ShardedKey; @@ -71,6 +72,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { ModelCoders.PARAM_WINDOWED_VALUE_CODER_URN) .put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN) .put(RowCoder.class, ModelCoders.ROW_CODER_URN) + .put(SchemaCoder.class, ModelCoders.SCHEMA_CODER_URN) .put(ShardedKey.Coder.class, ModelCoders.SHARDED_KEY_CODER_URN) .put(TimestampPrefixingWindowCoder.class, ModelCoders.CUSTOM_WINDOW_CODER_URN) .put(NullableCoder.class, ModelCoders.NULLABLE_CODER_URN) @@ -96,6 +98,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { CoderTranslators.paramWindowedValue()) .put(DoubleCoder.class, CoderTranslators.atomic(DoubleCoder.class)) .put(RowCoder.class, CoderTranslators.row()) + .put(SchemaCoder.class, CoderTranslators.schema()) .put(ShardedKey.Coder.class, CoderTranslators.shardedKey()) .put(TimestampPrefixingWindowCoder.class, CoderTranslators.timestampPrefixingWindow()) .put(NullableCoder.class, CoderTranslators.nullable()) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java index 7b7546aceb61..5059cc1c6b83 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/ModelCoders.java @@ -61,6 +61,7 @@ private ModelCoders() {} getUrn(StandardCoders.Enum.PARAM_WINDOWED_VALUE); public static final String ROW_CODER_URN = getUrn(StandardCoders.Enum.ROW); + public static final String SCHEMA_CODER_URN = getUrn(StandardCoders.Enum.SCHEMA); public static final String STATE_BACKED_ITERABLE_CODER_URN = "beam:coder:state_backed_iterable:v1"; @@ -90,6 +91,7 @@ private ModelCoders() {} WINDOWED_VALUE_CODER_URN, DOUBLE_CODER_URN, ROW_CODER_URN, + SCHEMA_CODER_URN, PARAM_WINDOWED_VALUE_CODER_URN, STATE_BACKED_ITERABLE_CODER_URN, SHARDED_KEY_CODER_URN, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java index f79696214368..64c7898a37b9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/RehydratedComponents.java @@ -189,6 +189,7 @@ public SdkComponents getSdkComponents(Collection requirements) { windowingStrategies.asMap(), coders.asMap(), Collections.emptyMap(), - requirements); + requirements, + pipeline.getOptions()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java index 446697f24a81..58d10a712c40 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SdkComponents.java @@ -63,6 +63,7 @@ public class SdkComponents { private final BiMap environmentIds = HashBiMap.create(); private final BiMap coderProtoToId = HashBiMap.create(); private final Set requirements; + private PipelineOptions pipelineOptions; private final Set reservedIds = new HashSet<>(); @@ -71,17 +72,7 @@ public class SdkComponents { /** Create a new {@link SdkComponents} with no components. */ public static SdkComponents create() { - return new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, ""); - } - - /** - * Create new {@link SdkComponents} importing all items from provided {@link Components} object. - * - *

WARNING: This action might cause some of duplicate items created. - */ - public static SdkComponents create( - RunnerApi.Components components, Collection requirements) { - return new SdkComponents(components, requirements, ""); + return new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "", null); } /*package*/ static SdkComponents create( @@ -91,31 +82,36 @@ public static SdkComponents create( Map> windowingStrategies, Map> coders, Map environments, - Collection requirements) { - SdkComponents sdkComponents = SdkComponents.create(components, requirements); + Collection requirements, + PipelineOptions pipelineOptions) { + SdkComponents sdkComponents = new SdkComponents(components, requirements, "", pipelineOptions); sdkComponents.transformIds.inverse().putAll(transforms); sdkComponents.pCollectionIds.inverse().putAll(pCollections); sdkComponents.windowingStrategyIds.inverse().putAll(windowingStrategies); sdkComponents.coderIds.inverse().putAll(coders); sdkComponents.environmentIds.inverse().putAll(environments); + sdkComponents.pipelineOptions = pipelineOptions; return sdkComponents; } public static SdkComponents create(PipelineOptions options) { SdkComponents sdkComponents = - new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, ""); + new SdkComponents(RunnerApi.Components.getDefaultInstance(), null, "", options); PortablePipelineOptions portablePipelineOptions = options.as(PortablePipelineOptions.class); sdkComponents.registerEnvironment( Environments.createOrGetDefaultEnvironment(portablePipelineOptions)); + sdkComponents.pipelineOptions = options; return sdkComponents; } private SdkComponents( @Nullable Components components, @Nullable Collection requirements, - String newIdPrefix) { + String newIdPrefix, + @Nullable PipelineOptions pipelineOptions) { this.newIdPrefix = newIdPrefix; this.requirements = new HashSet<>(); + this.pipelineOptions = pipelineOptions; if (components == null) { if (requirements != null) { @@ -153,7 +149,7 @@ public void mergeFrom( */ public SdkComponents withNewIdPrefix(String newIdPrefix) { SdkComponents sdkComponents = - new SdkComponents(componentsBuilder.build(), requirements, newIdPrefix); + new SdkComponents(componentsBuilder.build(), requirements, newIdPrefix, pipelineOptions); sdkComponents.transformIds.putAll(transformIds); sdkComponents.pCollectionIds.putAll(pCollectionIds); sdkComponents.windowingStrategyIds.putAll(windowingStrategyIds); @@ -174,7 +170,7 @@ public String registerPTransform( throws IOException { String name = getApplicationName(appliedPTransform); // If this transform is present in the components, nothing to do. return the existing name. - // Otherwise the transform must be translated and added to the components. + // Otherwise, the transform must be translated and added to the components. if (componentsBuilder.getTransformsOrDefault(name, null) != null) { return name; } @@ -375,4 +371,8 @@ public RunnerApi.Components toComponents() { public Collection requirements() { return ImmutableSet.copyOf(requirements); } + + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java index b8f92ff0053e..e2ed399dacf2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/CoderTranslationTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.not; +import com.google.auto.value.AutoValue; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -45,14 +46,20 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext; +import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowedValues.FullWindowedValueCoder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -70,6 +77,34 @@ "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) }) public class CoderTranslationTest { + @AutoValue + @DefaultSchema(AutoValueSchema.class) + public abstract static class SimpleAutoValue { + public abstract String getString(); + + public abstract Integer getInt32(); + + public abstract Long getInt64(); + + public static SimpleAutoValue of(String string, Integer int32, Long int64) { + return new AutoValue_CoderTranslationTest_SimpleAutoValue(string, int32, int64); + } + } + + private static final SchemaRegistry REGISTRY = SchemaRegistry.createDefault(); + + private static SchemaCoder schemaCoderFrom(TypeDescriptor typeDescriptor) { + try { + return SchemaCoder.of( + REGISTRY.getSchema(typeDescriptor), + typeDescriptor, + REGISTRY.getToRowFunction(typeDescriptor), + REGISTRY.getFromRowFunction(typeDescriptor)); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + private static final Set> KNOWN_CODERS = ImmutableSet.>builder() .add(ByteArrayCoder.of()) @@ -94,6 +129,7 @@ public class CoderTranslationTest { Field.of("array", FieldType.array(FieldType.STRING)), Field.of("map", FieldType.map(FieldType.STRING, FieldType.INT32)), Field.of("bar", FieldType.logicalType(FixedBytes.of(123)))))) + .add(schemaCoderFrom(TypeDescriptor.of(SimpleAutoValue.class))) .add(ShardedKey.Coder.of(StringUtf8Coder.of())) .add(TimestampPrefixingWindowCoder.of(IntervalWindowCoder.of())) .add(NullableCoder.of(ByteArrayCoder.of())) diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index c3c3ccfd3266..e0e7283dea73 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -602,7 +602,7 @@ private Map loadRegisteredTransforms() { pipeline.getOptions().as(ExperimentalOptions.class), "use_sdf_read"); } else { LOG.warn( - "Using use_depreacted_read in portable runners is runner-dependent. The " + "Using use_deprecated_read in portable runners is runner-dependent. The " + "ExpansionService will respect that, but if your runner does not have support for " + "native Read transform, your Pipeline will fail during Pipeline submission."); } @@ -675,6 +675,8 @@ private Map loadRegisteredTransforms() { inputs); // Needed to find which transform was new... + // This SdkComponents comes from rehydratedComponents, but doesn't take into account any + // additional translation options specified in PipelineOptions. SdkComponents sdkComponents = rehydratedComponents .getSdkComponents(request.getRequirementsList()) diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderTranslator.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderTranslator.java index e56b95d7f8a6..f7fa0c85954b 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderTranslator.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/AvroGenericCoderTranslator.java @@ -34,7 +34,7 @@ public List> getComponents(AvroGenericCoder from) { } @Override - public byte[] getPayload(AvroGenericCoder from) { + public byte[] getPayload(AvroGenericCoder from, TranslationContext context) { return from.getSchema().toString().getBytes(StandardCharsets.UTF_8); } From 2a637d93a01b4a5cb0349e9e77582abaf68bf394 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Wed, 18 Feb 2026 00:06:27 +0000 Subject: [PATCH 2/6] Fixes whitespace in CHANGES file --- CHANGES.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0078c2e8e2db..2e88e7ceb5dd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,7 +64,6 @@ * Flink 2.0 support for Java Classic and Portable Flink Runners ([#36947](https://github.com/apache/beam/issues/36947)), experimental support for other SDK languages including Python. - ## I/Os * Add Datadog IO support (Java) ([#37318](https://github.com/apache/beam/issues/37318)). @@ -2358,4 +2357,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss ## Highlights -- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). +- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). \ No newline at end of file From c3fc500fe01386823462607eda3ad7728fab97e7 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Thu, 19 Feb 2026 16:00:04 -0800 Subject: [PATCH 3/6] Addresses review comments. Renames v1/v2 to non-portable/portable, refactors common code into helper method, and removes an incorrect comment. --- .../beam/runners/dataflow/DataflowRunner.java | 24 ++++++++-------- .../util/construction/CoderTranslators.java | 28 +++++++++++-------- .../expansion/service/ExpansionService.java | 2 -- 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 93963922ccb8..bc1022e52a92 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1365,28 +1365,28 @@ public DataflowPipelineJob run(Pipeline pipeline) { dataflowOptions.setPipelineUrl(stagedPipeline.getLocation()); if (useUnifiedWorker(options)) { - LOG.info("Skipping v1 transform replacements since job will run on v2."); + LOG.info("Skipping non-portable transform replacements since job will run on portable worker."); } else { - // Now rewrite things to be as needed for v1 (mutates the pipeline) - // This way the job submitted is valid for v1 and v2, simultaneously + // Now rewrite things to be as needed for non-portable (mutates the pipeline). + // This way the job submitted is valid for portable and non-portable, simultaneously. replaceV1Transforms(pipeline); } - // Capture the SdkComponents for look up during step translations - SdkComponents dataflowV1Components = SdkComponents.create(options); - dataflowV1Components.registerEnvironment( + // Capture the SdkComponents for look up during step translations. + SdkComponents dataflowNonPortableComponents = SdkComponents.create(options); + dataflowNonPortableComponents.registerEnvironment( defaultEnvironmentForDataflow .toBuilder() .addAllDependencies(getDefaultArtifacts()) .addAllCapabilities(Environments.getJavaCapabilities()) .build()); - // No need to perform transform upgrading for the Runner v1 proto. - RunnerApi.Pipeline dataflowV1PipelineProto = - PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false); + // No need to perform transform upgrading for the non-portable runner proto. + RunnerApi.Pipeline dataflowNonPortablePipelineProto = + PipelineTranslation.toProto(pipeline, dataflowNonPortableComponents, true, false); if (LOG.isDebugEnabled()) { LOG.debug( - "Dataflow v1 pipeline proto:\n{}", - TextFormat.printer().printToString(dataflowV1PipelineProto)); + "Dataflow non-portable worker pipeline proto:\n{}", + TextFormat.printer().printToString(dataflowNonPortablePipelineProto)); } // Set a unique client_request_id in the CreateJob request. @@ -1406,7 +1406,7 @@ public DataflowPipelineJob run(Pipeline pipeline) { JobSpecification jobSpecification = translator.translate( - pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages); + pipeline, dataflowNonPortablePipelineProto, dataflowNonPortableComponents, this, packages); if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) { List experiments = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java index 488ce58b6f0c..c0ad444857c3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java @@ -205,27 +205,32 @@ public ImmutableList> getComponents(SchemaCoder from) { return ImmutableList.of(); } - @Override - public String getUrn(SchemaCoder from, TranslationContext context) { + // Used for backwards compatibility with older versions of the SDK. + private boolean encodeAsJavaSerializedCoder(TranslationContext context) { if (context instanceof TranslationContextWithOptions) { PipelineOptions options = ((TranslationContextWithOptions) context).pipelineOptions().get(); if (StreamingOptions.updateCompatibilityVersionLessThan(options, "2.72")) { - return CoderTranslation.JAVA_SERIALIZED_CODER_URN; + return true; } } - return CoderTranslation.getKnownCoderUrns() + return false; + } + + @Override + public String getUrn(SchemaCoder from, TranslationContext context) { + if (encodeAsJavaSerializedCoder(context)) { + return CoderTranslation.JAVA_SERIALIZED_CODER_URN; + } else { + return CoderTranslation.getKnownCoderUrns() .getOrDefault(from.getClass(), CoderTranslation.JAVA_SERIALIZED_CODER_URN); + } } @Override public byte[] getPayload(SchemaCoder from, TranslationContext context) { - if (context instanceof TranslationContextWithOptions) { - PipelineOptions options = - ((TranslationContextWithOptions) context).pipelineOptions().get(); - if (StreamingOptions.updateCompatibilityVersionLessThan(options, "2.72")) { - return SerializableUtils.serializeToByteArray(from); - } + if (encodeAsJavaSerializedCoder(context)) { + return SerializableUtils.serializeToByteArray(from); } SchemaApi.SchemaCoderPayload.Builder payload = SchemaApi.SchemaCoderPayload.newBuilder(); payload.setSchema(SchemaTranslation.schemaToProto(from.getSchema(), true)); @@ -252,9 +257,10 @@ public byte[] getPayload(SchemaCoder from, TranslationContext context) { @Override public SchemaCoder fromComponents( - List> components, byte[] payload, CoderTranslation.TranslationContext context) { + List> components, byte[] payload, TranslationContext context) { checkArgument( components.isEmpty(), "Expected empty component list, but received: %s", components); + checkArgument(!encodeAsJavaSerializedCoder(context), "This translator should not be used for Java serialized coders."); try { SchemaApi.SchemaCoderPayload schemaCoderPayload = SchemaApi.SchemaCoderPayload.parseFrom(payload); diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java index e0e7283dea73..9c4c0dfa4113 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java @@ -675,8 +675,6 @@ private Map loadRegisteredTransforms() { inputs); // Needed to find which transform was new... - // This SdkComponents comes from rehydratedComponents, but doesn't take into account any - // additional translation options specified in PipelineOptions. SdkComponents sdkComponents = rehydratedComponents .getSdkComponents(request.getRequirementsList()) From 0e5aa2899e2070e4d739169e1b6fa87d85d7139b Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Mon, 23 Feb 2026 14:14:58 -0800 Subject: [PATCH 4/6] Fixed formatting. --- .../org/apache/beam/runners/dataflow/DataflowRunner.java | 9 +++++++-- .../beam/sdk/util/construction/CoderTranslators.java | 6 ++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index bc1022e52a92..969ea158befb 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1365,7 +1365,8 @@ public DataflowPipelineJob run(Pipeline pipeline) { dataflowOptions.setPipelineUrl(stagedPipeline.getLocation()); if (useUnifiedWorker(options)) { - LOG.info("Skipping non-portable transform replacements since job will run on portable worker."); + LOG.info( + "Skipping non-portable transform replacements since job will run on portable worker."); } else { // Now rewrite things to be as needed for non-portable (mutates the pipeline). // This way the job submitted is valid for portable and non-portable, simultaneously. @@ -1406,7 +1407,11 @@ public DataflowPipelineJob run(Pipeline pipeline) { JobSpecification jobSpecification = translator.translate( - pipeline, dataflowNonPortablePipelineProto, dataflowNonPortableComponents, this, packages); + pipeline, + dataflowNonPortablePipelineProto, + dataflowNonPortableComponents, + this, + packages); if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) { List experiments = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java index c0ad444857c3..7f1997898998 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java @@ -223,7 +223,7 @@ public String getUrn(SchemaCoder from, TranslationContext context) { return CoderTranslation.JAVA_SERIALIZED_CODER_URN; } else { return CoderTranslation.getKnownCoderUrns() - .getOrDefault(from.getClass(), CoderTranslation.JAVA_SERIALIZED_CODER_URN); + .getOrDefault(from.getClass(), CoderTranslation.JAVA_SERIALIZED_CODER_URN); } } @@ -260,7 +260,9 @@ public SchemaCoder fromComponents( List> components, byte[] payload, TranslationContext context) { checkArgument( components.isEmpty(), "Expected empty component list, but received: %s", components); - checkArgument(!encodeAsJavaSerializedCoder(context), "This translator should not be used for Java serialized coders."); + checkArgument( + !encodeAsJavaSerializedCoder(context), + "This translator should not be used for Java serialized coders."); try { SchemaApi.SchemaCoderPayload schemaCoderPayload = SchemaApi.SchemaCoderPayload.parseFrom(payload); From 98ceb213411d5bd3b96a5e6f7b70b9f2c9e1acc7 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Mon, 23 Feb 2026 15:07:31 -0800 Subject: [PATCH 5/6] Updates formatting for CHANGES.md. --- CHANGES.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 2e88e7ceb5dd..a83722bc28b5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -77,8 +77,8 @@ ## Breaking Changes * Portable Java SDK now encodes SchemaCoders in a portable way ([34672](https://github.com/apache/beam/issues/34672)). - * Original custom Java coder encoding can still be obtained using [StreamingOptions.setUpdateCompatibilityVersion("2.71")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47). - * Fixes [36496](https://github.com/apache/beam/issues/36496), [30276](https://github.com/apache/beam/issues/30276), [29245](https://github.com/apache/beam/issues/29245). + - Original custom Java coder encoding can still be obtained using [StreamingOptions.setUpdateCompatibilityVersion("2.71")](https://github.com/apache/beam/blob/2cf0930e7ae1aa389c26ce6639b584877a3e31d9/sdks/java/core/src/main/java/org/apache/beam/sdk/options/StreamingOptions.java#L47). + - Fixes [36496](https://github.com/apache/beam/issues/36496), [30276](https://github.com/apache/beam/issues/30276), [29245](https://github.com/apache/beam/issues/29245). ## Deprecations @@ -2357,4 +2357,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss ## Highlights -- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). \ No newline at end of file +- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/). From e0437a890a3f4a84c18f69c09760d747f6ba171d Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Tue, 24 Feb 2026 09:50:47 -0800 Subject: [PATCH 6/6] Cleans up CHANGES.md and changes new coder to take effect in 2.73 release since 2.72 is already cut. --- CHANGES.md | 2 -- .../beam/runners/dataflow/DataflowPipelineTranslatorTest.java | 4 ++-- .../apache/beam/sdk/util/construction/CoderTranslators.java | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0e26c1bded34..5bd04eea3981 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -80,8 +80,6 @@ - Fixes [36496](https://github.com/apache/beam/issues/36496), [30276](https://github.com/apache/beam/issues/30276), [29245](https://github.com/apache/beam/issues/29245). * The Python SDK container's `boot.go` now passes pipeline options through a file instead of the `PIPELINE_OPTIONS` environment variable. If a user pairs a new Python SDK container with an older SDK version (which does not support the file-based approach), the pipeline options will not be recognized and the pipeline will fail. Users must ensure their SDK and container versions are synchronized ([#37370](https://github.com/apache/beam/issues/37370)). ->>>>>>> fd08df28b21a5b5e6d8a519ca000b963c53e46a2 - ## Deprecations * X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)). diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 5293ea00c71d..e3396687410b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -1742,9 +1742,9 @@ public SimpleAutoValue apply(byte[] input) { assertEquals("beam:coder:schema:v1", coders.get("SchemaCoder").getSpec().getUrn()); } - // Prior to version 2.72, SchemaCoders are translated as custom java coders. + // Prior to version 2.73, SchemaCoders are translated as custom java coders. { - options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.71"); + options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.72"); SdkComponents sdkComponents = createSdkComponents(options); RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents, true); Map coders = pipelineProto.getComponents().getCodersMap(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java index 7f1997898998..768ce57997f9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CoderTranslators.java @@ -210,7 +210,7 @@ private boolean encodeAsJavaSerializedCoder(TranslationContext context) { if (context instanceof TranslationContextWithOptions) { PipelineOptions options = ((TranslationContextWithOptions) context).pipelineOptions().get(); - if (StreamingOptions.updateCompatibilityVersionLessThan(options, "2.72")) { + if (StreamingOptions.updateCompatibilityVersionLessThan(options, "2.73")) { return true; } }