From 586b7c18886f2b6016bfc5ebc4ecb3031a051172 Mon Sep 17 00:00:00 2001 From: Alek Mitrevski Date: Sat, 6 Dec 2025 15:13:31 -0700 Subject: [PATCH 1/5] Add PrimitiveUnboundedRead and PrimitiveBoundedRead translators to Flink streaming runner This PR adds explicit translators for SplittableParDo.PrimitiveUnboundedRead and SplittableParDo.PrimitiveBoundedRead to the Flink streaming runner, enabling unbounded sources like KinesisIO.read() to work with the classic Flink runner on AWS Managed Flink. Problem: - Read.Unbounded.expand() wraps sources with UnboundedSourceAsSDFWrapperFn - FlinkRunner converts SDF-based reads to PrimitiveUnboundedRead - FlinkStreamingTransformTranslators.getTranslator() had no translator for PrimitiveUnboundedRead, causing 'No translator known' errors Solution: - Add PrimitiveUnboundedReadTranslator that extracts UnboundedSource directly from transform.getSource() instead of using ReadTranslation - Add PrimitiveBoundedReadTranslator for bounded sources - Modify getTranslator() to check for these transform types explicitly Testing: - Added 4 new unit tests - All 383 Flink runner tests pass --- .../FlinkStreamingTransformTranslators.java | 131 ++++++++++++++++++ ...linkStreamingTransformTranslatorsTest.java | 117 ++++++++++++++++ 2 files changed, 248 insertions(+) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 79a90c554027..305c719f90c1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -174,6 +174,14 @@ class FlinkStreamingTransformTranslators { public static FlinkStreamingPipelineTranslator.StreamTransformTranslator getTranslator( PTransform transform) { + // Handle PrimitiveUnboundedRead explicitly (created by SplittableParDo conversion) + if (transform instanceof SplittableParDo.PrimitiveUnboundedRead) { + return new PrimitiveUnboundedReadTranslator<>(); + } + // Handle PrimitiveBoundedRead explicitly + if (transform instanceof SplittableParDo.PrimitiveBoundedRead) { + return new PrimitiveBoundedReadTranslator<>(); + } @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform); return urn == null ? null : TRANSLATORS.get(urn); } @@ -263,6 +271,129 @@ public void translateNode( } } + /** + * Translator for {@link SplittableParDo.PrimitiveUnboundedRead}. + * + *

This handles the case where Read.Unbounded is converted to PrimitiveUnboundedRead by {@link + * SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}. + */ + private static class PrimitiveUnboundedReadTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + SplittableParDo.PrimitiveUnboundedRead> { + + @Override + public void translateNode( + SplittableParDo.PrimitiveUnboundedRead transform, + FlinkStreamingTranslationContext context) { + + PCollection output = context.getOutput(transform); + + DataStream> source; + DataStream>> nonDedupSource; + TypeInformation> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + Coder coder = context.getOutput(transform).getCoder(); + + TypeInformation>> withIdTypeInfo = + new CoderTypeInformation<>( + WindowedValues.getFullCoder( + ValueWithRecordId.ValueWithRecordIdCoder.of(coder), + output.getWindowingStrategy().getWindowFn().windowCoder()), + context.getPipelineOptions()); + + // Get source directly from PrimitiveUnboundedRead (not via ReadTranslation) + UnboundedSource rawSource = transform.getSource(); + + String fullName = getCurrentTransformName(context); + try { + int parallelism = + context.getExecutionEnvironment().getMaxParallelism() > 0 + ? context.getExecutionEnvironment().getMaxParallelism() + : context.getExecutionEnvironment().getParallelism(); + + FlinkUnboundedSource unboundedSource = + FlinkSource.unbounded( + transform.getName(), + rawSource, + new SerializablePipelineOptions(context.getPipelineOptions()), + parallelism); + nonDedupSource = + context + .getExecutionEnvironment() + .fromSource( + unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo) + .uid(fullName); + + if (rawSource.requiresDeduping()) { + source = + nonDedupSource + .keyBy(new ValueWithRecordIdKeySelector<>()) + .transform( + "deduping", + outputTypeInfo, + new DedupingOperator<>(context.getPipelineOptions())) + .uid(format("%s/__deduplicated__", fullName)); + } else { + source = + nonDedupSource + .flatMap(new StripIdsMap<>(context.getPipelineOptions())) + .returns(outputTypeInfo); + } + } catch (Exception e) { + throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e); + } + + context.setOutputDataStream(output, source); + } + } + + /** + * Translator for {@link SplittableParDo.PrimitiveBoundedRead}. + * + *

This handles the case where Read.Bounded is converted to PrimitiveBoundedRead by {@link + * SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}. + */ + private static class PrimitiveBoundedReadTranslator + extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< + SplittableParDo.PrimitiveBoundedRead> { + + @Override + public void translateNode( + SplittableParDo.PrimitiveBoundedRead transform, + FlinkStreamingTranslationContext context) { + + PCollection output = context.getOutput(transform); + TypeInformation> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + // Get source directly from PrimitiveBoundedRead (not via ReadTranslation) + BoundedSource rawSource = transform.getSource(); + + String fullName = getCurrentTransformName(context); + int parallelism = + context.getExecutionEnvironment().getMaxParallelism() > 0 + ? context.getExecutionEnvironment().getMaxParallelism() + : context.getExecutionEnvironment().getParallelism(); + + FlinkBoundedSource flinkBoundedSource = + FlinkSource.bounded( + fullName, + rawSource, + new SerializablePipelineOptions(context.getPipelineOptions()), + parallelism); + + DataStream> source = + context + .getExecutionEnvironment() + .fromSource( + flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo) + .uid(fullName); + + context.setOutputDataStream(output, source); + } + } + static class ValueWithRecordIdKeySelector implements KeySelector>, FlinkKey>, ResultTypeQueryable { diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java index 733bf536634c..57eac183fd0b 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.flink; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -147,6 +149,86 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() { assertEquals(parallelism, source.getNumSplits()); } + @Test + public void getTranslatorReturnsPrimitiveUnboundedReadTranslator() { + SplittableParDo.PrimitiveUnboundedRead transform = + new SplittableParDo.PrimitiveUnboundedRead<>(Read.from(new TestUnboundedSource())); + + FlinkStreamingPipelineTranslator.StreamTransformTranslator translator = + FlinkStreamingTransformTranslators.getTranslator(transform); + + assertNotNull("Translator should not be null for PrimitiveUnboundedRead", translator); + } + + @Test + public void getTranslatorReturnsPrimitiveBoundedReadTranslator() { + SplittableParDo.PrimitiveBoundedRead transform = + new SplittableParDo.PrimitiveBoundedRead<>(Read.from(new TestBoundedSource(100))); + + FlinkStreamingPipelineTranslator.StreamTransformTranslator translator = + FlinkStreamingTransformTranslators.getTranslator(transform); + + assertNotNull("Translator should not be null for PrimitiveBoundedRead", translator); + } + + @Test + public void primitiveUnboundedReadTranslatorProducesCorrectSource() { + final int maxParallelism = 4; + final int parallelism = 2; + + SplittableParDo.PrimitiveUnboundedRead transform = + new SplittableParDo.PrimitiveUnboundedRead<>(Read.from(new TestUnboundedSource())); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(parallelism); + env.setMaxParallelism(maxParallelism); + + // Use getTranslator directly to verify our new translator is used + FlinkStreamingPipelineTranslator.StreamTransformTranslator translator = + FlinkStreamingTransformTranslators.getTranslator(transform); + assertNotNull(translator); + + Object sourceTransform = + applyReadSourceTransformWithTranslator( + transform, translator, PCollection.IsBounded.UNBOUNDED, env); + + assertTrue(sourceTransform instanceof OneInputTransformation); + OneInputTransformation oneInputTransform = (OneInputTransformation) sourceTransform; + + FlinkSource source = + (FlinkSource) + ((SourceTransformation) Iterables.getOnlyElement(oneInputTransform.getInputs())) + .getSource(); + + assertEquals(maxParallelism, source.getNumSplits()); + } + + @Test + public void primitiveBoundedReadTranslatorProducesCorrectSource() { + final int maxParallelism = 4; + final int parallelism = 2; + + SplittableParDo.PrimitiveBoundedRead transform = + new SplittableParDo.PrimitiveBoundedRead<>(Read.from(new TestBoundedSource(maxParallelism))); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(parallelism); + env.setMaxParallelism(maxParallelism); + + // Use getTranslator directly to verify our new translator is used + FlinkStreamingPipelineTranslator.StreamTransformTranslator translator = + FlinkStreamingTransformTranslators.getTranslator(transform); + assertNotNull(translator); + + Object sourceTransform = + applyReadSourceTransformWithTranslator( + transform, translator, PCollection.IsBounded.BOUNDED, env); + + assertTrue(sourceTransform instanceof SourceTransformation); + FlinkBoundedSource source = + (FlinkBoundedSource) ((SourceTransformation) sourceTransform).getSource(); + + assertEquals(maxParallelism, source.getNumSplits()); + } + private Object applyReadSourceTransform( PTransform transform, PCollection.IsBounded isBounded, StreamExecutionEnvironment env) { @@ -178,6 +260,41 @@ private Object applyReadSourceTransform( return ctx.getInputDataStream(pc).getTransformation(); } + @SuppressWarnings("unchecked") + private Object applyReadSourceTransformWithTranslator( + PTransform transform, + FlinkStreamingPipelineTranslator.StreamTransformTranslator translator, + PCollection.IsBounded isBounded, + StreamExecutionEnvironment env) { + + FlinkStreamingTranslationContext ctx = + new FlinkStreamingTranslationContext(env, PipelineOptionsFactory.create(), true); + + Pipeline pipeline = Pipeline.create(); + PCollection pc = + PCollection.createPrimitiveOutputInternal( + pipeline, WindowingStrategy.globalDefault(), isBounded, StringUtf8Coder.of()); + pc.setName("output"); + + Map, PValue> outputs = new HashMap<>(); + outputs.put(new TupleTag<>(), pc); + AppliedPTransform appliedTransform = + AppliedPTransform.of( + "test-transform", + Collections.emptyMap(), + PValues.fullyExpand(outputs), + transform, + ResourceHints.create(), + Pipeline.create()); + + ctx.setCurrentTransform(appliedTransform); + ((FlinkStreamingPipelineTranslator.StreamTransformTranslator>) + translator) + .translateNode(transform, ctx); + + return ctx.getInputDataStream(pc).getTransformation(); + } + @SuppressWarnings("unchecked") private FlinkStreamingPipelineTranslator.StreamTransformTranslator> getReadSourceTranslator() { From f3eed3dcfe5cb9a05473ab3e66c81bded4ca291a Mon Sep 17 00:00:00 2001 From: Alek Mitrevski Date: Sat, 6 Dec 2025 16:07:22 -0700 Subject: [PATCH 2/5] [Flink Runner] Add translators for PrimitiveUnboundedRead and PrimitiveBoundedRead This commit adds explicit translators for SplittableParDo.PrimitiveUnboundedRead and SplittableParDo.PrimitiveBoundedRead to the Flink streaming transform translators. The Flink classic runner calls convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() when NOT using beam_fn_api experiment, which converts SDF-wrapped reads to PrimitiveUnboundedRead and PrimitiveBoundedRead. Without translators for these transforms, pipelines using unbounded sources like KinesisIO.read() fail with: 'No translator known for PrimitiveUnboundedRead' Changes: - Add PrimitiveUnboundedReadTranslator class - Add PrimitiveBoundedReadTranslator class - Modify getTranslator() to handle these transforms before URN lookup - Add unit tests for the new translators - Update CHANGES.md with bugfix entry Related to #20530 --- CHANGES.md | 1 + .../flink/FlinkStreamingTransformTranslatorsTest.java | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index dfad320a694d..80ef5d3cc83b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -84,6 +84,7 @@ ## Bugfixes +* Fixed Flink classic runner failing with "No translator known for PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO after SDF-to-primitive-read conversion (Java) ([#XXXXX](https://github.com/apache/beam/issues/XXXXX)). * Fixed FirestoreV1 Beam connectors allow configuring inconsistent project/database IDs between RPC requests and routing headers #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)). Logical type and coder registry are saved for pipelines in the case of default pickler. This fixes a side effect of switching to cloudpickle as default pickler in Beam 2.65.0 (Python) ([#35738](https://github.com/apache/beam/issues/35738)). diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java index 57eac183fd0b..0d2b547d82ea 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java @@ -196,7 +196,8 @@ public void primitiveUnboundedReadTranslatorProducesCorrectSource() { FlinkSource source = (FlinkSource) - ((SourceTransformation) Iterables.getOnlyElement(oneInputTransform.getInputs())) + ((SourceTransformation) + Iterables.getOnlyElement(oneInputTransform.getInputs())) .getSource(); assertEquals(maxParallelism, source.getNumSplits()); @@ -208,7 +209,8 @@ public void primitiveBoundedReadTranslatorProducesCorrectSource() { final int parallelism = 2; SplittableParDo.PrimitiveBoundedRead transform = - new SplittableParDo.PrimitiveBoundedRead<>(Read.from(new TestBoundedSource(maxParallelism))); + new SplittableParDo.PrimitiveBoundedRead<>( + Read.from(new TestBoundedSource(maxParallelism))); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); env.setMaxParallelism(maxParallelism); @@ -288,8 +290,7 @@ private Object applyReadSourceTransformWithTranslator( Pipeline.create()); ctx.setCurrentTransform(appliedTransform); - ((FlinkStreamingPipelineTranslator.StreamTransformTranslator>) - translator) + ((FlinkStreamingPipelineTranslator.StreamTransformTranslator>) translator) .translateNode(transform, ctx); return ctx.getInputDataStream(pc).getTransformation(); From 282f84a276a63a26671d9395e05554eabd40a394 Mon Sep 17 00:00:00 2001 From: Alek Date: Mon, 8 Dec 2025 16:55:40 -0700 Subject: [PATCH 3/5] Update CHANGES.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 80ef5d3cc83b..5292c147368f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -84,7 +84,7 @@ ## Bugfixes -* Fixed Flink classic runner failing with "No translator known for PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO after SDF-to-primitive-read conversion (Java) ([#XXXXX](https://github.com/apache/beam/issues/XXXXX)). +* Fixed Flink classic runner failing with "No translator known for PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO after SDF-to-primitive-read conversion (Java) ([#37035](https://github.com/apache/beam/issues/37035)). * Fixed FirestoreV1 Beam connectors allow configuring inconsistent project/database IDs between RPC requests and routing headers #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)). Logical type and coder registry are saved for pipelines in the case of default pickler. This fixes a side effect of switching to cloudpickle as default pickler in Beam 2.65.0 (Python) ([#35738](https://github.com/apache/beam/issues/35738)). From b46e69b6dd77ce00b6736eb44416b6d629bd8671 Mon Sep 17 00:00:00 2001 From: Alek Mitrevski Date: Mon, 8 Dec 2025 17:20:06 -0700 Subject: [PATCH 4/5] Refactor primitive read translators to reduce code duplication - Add translateUnboundedSource() helper method for common unbounded source translation logic - Add translateBoundedSource() helper method for common bounded source translation logic - Simplify UnboundedReadSourceTranslator to use helper method - Simplify PrimitiveUnboundedReadTranslator to use helper method - Simplify BoundedReadSourceTranslator to use helper method - Simplify PrimitiveBoundedReadTranslator to use helper method - Add missing try-catch, .returns(), and batch mode slot sharing to PrimitiveBoundedReadTranslator This refactoring reduces ~100 lines of duplicated code and ensures consistent behavior across all read translators. --- .../FlinkStreamingTransformTranslators.java | 307 +++++++----------- 1 file changed, 119 insertions(+), 188 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 305c719f90c1..d836d9bd8f52 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -97,6 +97,7 @@ import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -195,6 +196,120 @@ public static String getCurrentTransformName(FlinkStreamingTranslationContext co // Transformation Implementations // -------------------------------------------------------------------------------------------- + /** Common translation logic for unbounded sources. */ + @SuppressWarnings("unchecked") + private static void translateUnboundedSource( + UnboundedSource rawSource, + String transformName, + FlinkStreamingTranslationContext context) { + + PCollection output = + (PCollection) + Iterables.getOnlyElement(context.getCurrentTransform().getOutputs().values()); + + DataStream> source; + DataStream>> nonDedupSource; + TypeInformation> outputTypeInfo = context.getTypeInfo(output); + + Coder coder = output.getCoder(); + + TypeInformation>> withIdTypeInfo = + new CoderTypeInformation<>( + WindowedValues.getFullCoder( + ValueWithRecordId.ValueWithRecordIdCoder.of(coder), + output.getWindowingStrategy().getWindowFn().windowCoder()), + context.getPipelineOptions()); + + String fullName = getCurrentTransformName(context); + try { + int parallelism = + context.getExecutionEnvironment().getMaxParallelism() > 0 + ? context.getExecutionEnvironment().getMaxParallelism() + : context.getExecutionEnvironment().getParallelism(); + + FlinkUnboundedSource unboundedSource = + FlinkSource.unbounded( + transformName, + rawSource, + new SerializablePipelineOptions(context.getPipelineOptions()), + parallelism); + nonDedupSource = + context + .getExecutionEnvironment() + .fromSource( + unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo) + .uid(fullName); + + if (rawSource.requiresDeduping()) { + source = + nonDedupSource + .keyBy(new ValueWithRecordIdKeySelector<>()) + .transform( + "deduping", + outputTypeInfo, + new DedupingOperator<>(context.getPipelineOptions())) + .uid(format("%s/__deduplicated__", fullName)); + } else { + source = + nonDedupSource + .flatMap(new StripIdsMap<>(context.getPipelineOptions())) + .returns(outputTypeInfo); + } + } catch (Exception e) { + throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e); + } + + context.setOutputDataStream(output, source); + } + + /** Common translation logic for bounded sources. */ + @SuppressWarnings("unchecked") + private static void translateBoundedSource( + BoundedSource rawSource, String transformName, FlinkStreamingTranslationContext context) { + + PCollection output = + (PCollection) + Iterables.getOnlyElement(context.getCurrentTransform().getOutputs().values()); + + TypeInformation> outputTypeInfo = context.getTypeInfo(output); + + String fullName = getCurrentTransformName(context); + int parallelism = + context.getExecutionEnvironment().getMaxParallelism() > 0 + ? context.getExecutionEnvironment().getMaxParallelism() + : context.getExecutionEnvironment().getParallelism(); + + FlinkBoundedSource flinkBoundedSource = + FlinkSource.bounded( + transformName, + rawSource, + new SerializablePipelineOptions(context.getPipelineOptions()), + parallelism); + + SingleOutputStreamOperator> source; + try { + source = + context + .getExecutionEnvironment() + .fromSource( + flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo) + .uid(fullName) + .returns(outputTypeInfo); + + if (!context.isStreaming() + && context + .getPipelineOptions() + .as(FlinkPipelineOptions.class) + .getForceSlotSharingGroup()) { + source = source.slotSharingGroup(FORCED_SLOT_GROUP); + } + } catch (Exception e) { + throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e); + } + + context.setOutputDataStream(output, source); + } + private static class UnboundedReadSourceTranslator extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< PTransform>> { @@ -202,22 +317,6 @@ private static class UnboundedReadSourceTranslator @Override public void translateNode( PTransform> transform, FlinkStreamingTranslationContext context) { - PCollection output = context.getOutput(transform); - - DataStream> source; - DataStream>> nonDedupSource; - TypeInformation> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - Coder coder = context.getOutput(transform).getCoder(); - - TypeInformation>> withIdTypeInfo = - new CoderTypeInformation<>( - WindowedValues.getFullCoder( - ValueWithRecordId.ValueWithRecordIdCoder.of(coder), - output.getWindowingStrategy().getWindowFn().windowCoder()), - context.getPipelineOptions()); - UnboundedSource rawSource; try { rawSource = @@ -227,47 +326,7 @@ public void translateNode( } catch (IOException e) { throw new RuntimeException(e); } - - String fullName = getCurrentTransformName(context); - try { - int parallelism = - context.getExecutionEnvironment().getMaxParallelism() > 0 - ? context.getExecutionEnvironment().getMaxParallelism() - : context.getExecutionEnvironment().getParallelism(); - - FlinkUnboundedSource unboundedSource = - FlinkSource.unbounded( - transform.getName(), - rawSource, - new SerializablePipelineOptions(context.getPipelineOptions()), - parallelism); - nonDedupSource = - context - .getExecutionEnvironment() - .fromSource( - unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo) - .uid(fullName); - - if (rawSource.requiresDeduping()) { - source = - nonDedupSource - .keyBy(new ValueWithRecordIdKeySelector<>()) - .transform( - "deduping", - outputTypeInfo, - new DedupingOperator<>(context.getPipelineOptions())) - .uid(format("%s/__deduplicated__", fullName)); - } else { - source = - nonDedupSource - .flatMap(new StripIdsMap<>(context.getPipelineOptions())) - .returns(outputTypeInfo); - } - } catch (Exception e) { - throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e); - } - - context.setOutputDataStream(output, source); + translateUnboundedSource(rawSource, transform.getName(), context); } } @@ -285,66 +344,7 @@ private static class PrimitiveUnboundedReadTranslator public void translateNode( SplittableParDo.PrimitiveUnboundedRead transform, FlinkStreamingTranslationContext context) { - - PCollection output = context.getOutput(transform); - - DataStream> source; - DataStream>> nonDedupSource; - TypeInformation> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - Coder coder = context.getOutput(transform).getCoder(); - - TypeInformation>> withIdTypeInfo = - new CoderTypeInformation<>( - WindowedValues.getFullCoder( - ValueWithRecordId.ValueWithRecordIdCoder.of(coder), - output.getWindowingStrategy().getWindowFn().windowCoder()), - context.getPipelineOptions()); - - // Get source directly from PrimitiveUnboundedRead (not via ReadTranslation) - UnboundedSource rawSource = transform.getSource(); - - String fullName = getCurrentTransformName(context); - try { - int parallelism = - context.getExecutionEnvironment().getMaxParallelism() > 0 - ? context.getExecutionEnvironment().getMaxParallelism() - : context.getExecutionEnvironment().getParallelism(); - - FlinkUnboundedSource unboundedSource = - FlinkSource.unbounded( - transform.getName(), - rawSource, - new SerializablePipelineOptions(context.getPipelineOptions()), - parallelism); - nonDedupSource = - context - .getExecutionEnvironment() - .fromSource( - unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo) - .uid(fullName); - - if (rawSource.requiresDeduping()) { - source = - nonDedupSource - .keyBy(new ValueWithRecordIdKeySelector<>()) - .transform( - "deduping", - outputTypeInfo, - new DedupingOperator<>(context.getPipelineOptions())) - .uid(format("%s/__deduplicated__", fullName)); - } else { - source = - nonDedupSource - .flatMap(new StripIdsMap<>(context.getPipelineOptions())) - .returns(outputTypeInfo); - } - } catch (Exception e) { - throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e); - } - - context.setOutputDataStream(output, source); + translateUnboundedSource(transform.getSource(), transform.getName(), context); } } @@ -362,35 +362,7 @@ private static class PrimitiveBoundedReadTranslator public void translateNode( SplittableParDo.PrimitiveBoundedRead transform, FlinkStreamingTranslationContext context) { - - PCollection output = context.getOutput(transform); - TypeInformation> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - - // Get source directly from PrimitiveBoundedRead (not via ReadTranslation) - BoundedSource rawSource = transform.getSource(); - - String fullName = getCurrentTransformName(context); - int parallelism = - context.getExecutionEnvironment().getMaxParallelism() > 0 - ? context.getExecutionEnvironment().getMaxParallelism() - : context.getExecutionEnvironment().getParallelism(); - - FlinkBoundedSource flinkBoundedSource = - FlinkSource.bounded( - fullName, - rawSource, - new SerializablePipelineOptions(context.getPipelineOptions()), - parallelism); - - DataStream> source = - context - .getExecutionEnvironment() - .fromSource( - flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo) - .uid(fullName); - - context.setOutputDataStream(output, source); + translateBoundedSource(transform.getSource(), transform.getName(), context); } } @@ -503,11 +475,6 @@ private static class BoundedReadSourceTranslator @Override public void translateNode( PTransform> transform, FlinkStreamingTranslationContext context) { - PCollection output = context.getOutput(transform); - - TypeInformation> outputTypeInfo = - context.getTypeInfo(context.getOutput(transform)); - BoundedSource rawSource; try { rawSource = @@ -517,43 +484,7 @@ public void translateNode( } catch (IOException e) { throw new RuntimeException(e); } - - String fullName = getCurrentTransformName(context); - int parallelism = - context.getExecutionEnvironment().getMaxParallelism() > 0 - ? context.getExecutionEnvironment().getMaxParallelism() - : context.getExecutionEnvironment().getParallelism(); - - FlinkBoundedSource flinkBoundedSource = - FlinkSource.bounded( - transform.getName(), - rawSource, - new SerializablePipelineOptions(context.getPipelineOptions()), - parallelism); - - TypeInformation> typeInfo = context.getTypeInfo(output); - - SingleOutputStreamOperator> source; - try { - source = - context - .getExecutionEnvironment() - .fromSource( - flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo) - .uid(fullName) - .returns(typeInfo); - - if (!context.isStreaming() - && context - .getPipelineOptions() - .as(FlinkPipelineOptions.class) - .getForceSlotSharingGroup()) { - source = source.slotSharingGroup(FORCED_SLOT_GROUP); - } - } catch (Exception e) { - throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e); - } - context.setOutputDataStream(output, source); + translateBoundedSource(rawSource, transform.getName(), context); } } From 5b58c20a6d6957ca32272340e6c1113a5dab0fdd Mon Sep 17 00:00:00 2001 From: Alek Mitrevski Date: Mon, 8 Dec 2025 17:27:22 -0700 Subject: [PATCH 5/5] Extract getSourceParallelism() helper method Extract the duplicated parallelism calculation logic from translateUnboundedSource and translateBoundedSource into a shared private helper method. --- .../FlinkStreamingTransformTranslators.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index d836d9bd8f52..c148aa16005e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -192,6 +192,12 @@ public static String getCurrentTransformName(FlinkStreamingTranslationContext co return context.getCurrentTransform().getFullName(); } + /** Returns the parallelism to use for source operators. */ + private static int getSourceParallelism(FlinkStreamingTranslationContext context) { + int maxParallelism = context.getExecutionEnvironment().getMaxParallelism(); + return maxParallelism > 0 ? maxParallelism : context.getExecutionEnvironment().getParallelism(); + } + // -------------------------------------------------------------------------------------------- // Transformation Implementations // -------------------------------------------------------------------------------------------- @@ -222,10 +228,7 @@ private static void translateUnboundedSource( String fullName = getCurrentTransformName(context); try { - int parallelism = - context.getExecutionEnvironment().getMaxParallelism() > 0 - ? context.getExecutionEnvironment().getMaxParallelism() - : context.getExecutionEnvironment().getParallelism(); + int parallelism = getSourceParallelism(context); FlinkUnboundedSource unboundedSource = FlinkSource.unbounded( @@ -274,10 +277,7 @@ private static void translateBoundedSource( TypeInformation> outputTypeInfo = context.getTypeInfo(output); String fullName = getCurrentTransformName(context); - int parallelism = - context.getExecutionEnvironment().getMaxParallelism() > 0 - ? context.getExecutionEnvironment().getMaxParallelism() - : context.getExecutionEnvironment().getParallelism(); + int parallelism = getSourceParallelism(context); FlinkBoundedSource flinkBoundedSource = FlinkSource.bounded(