From 50f7eaff05379f8d154df72ba41341669178027d Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 30 Jan 2026 23:48:41 +0000 Subject: [PATCH 1/5] Upgrade google-api-services-dataflow --- .../main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index e64d7b1ebee3..a357e6e9256b 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -739,7 +739,7 @@ class BeamModulePlugin implements Plugin { google_api_common : "com.google.api:api-common", // google_cloud_platform_libraries_bom sets version google_api_services_bigquery : "com.google.apis:google-api-services-bigquery:v2-rev20251012-2.0.0", // [bomupgrader] sets version google_api_services_cloudresourcemanager : "com.google.apis:google-api-services-cloudresourcemanager:v1-rev20250606-2.0.0", // [bomupgrader] sets version - google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20250519-$google_clients_version", + google_api_services_dataflow : "com.google.apis:google-api-services-dataflow:v1b3-rev20260118-$google_clients_version", google_api_services_healthcare : "com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version", google_api_services_pubsub : "com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version", google_api_services_storage : "com.google.apis:google-api-services-storage:v1-rev20251118-2.0.0", // [bomupgrader] sets version From 099bee8f6a91e54a52bf4d67c1330cf20d631f64 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Sat, 31 Jan 2026 00:15:29 +0000 Subject: [PATCH 2/5] [Dataflow Streaming] Enable state tag encoding v2 based on backend flag --- .../worker/StreamingModeExecutionContext.java | 13 ++-- ...reamingEngineComputationConfigFetcher.java | 5 ++ .../config/StreamingGlobalConfig.java | 7 ++- .../ComputationWorkExecutorFactory.java | 14 +---- .../StreamingModeExecutionContextTest.java | 61 ++++++++++++++++++- .../worker/WorkerCustomSourcesTest.java | 6 +- ...ingEngineComputationConfigFetcherTest.java | 39 ++++++++++++ 7 files changed, 118 insertions(+), 27 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 09afcadc3002..83727acc93ec 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -122,7 +122,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext, Map>> sideInputCache; - private final WindmillTagEncoding windmillTagEncoding; + private WindmillTagEncoding windmillTagEncoding; /** * The current user-facing key for this execution context. * @@ -162,8 +162,7 @@ public StreamingModeExecutionContext( StreamingModeExecutionStateRegistry executionStateRegistry, StreamingGlobalConfigHandle globalConfigHandle, long sinkByteLimit, - boolean throwExceptionOnLargeOutput, - boolean enableWindmillTagEncodingV2) { + boolean throwExceptionOnLargeOutput) { super( counterFactory, metricsContainerRegistry, @@ -174,10 +173,6 @@ public StreamingModeExecutionContext( this.readerCache = readerCache; this.globalConfigHandle = globalConfigHandle; this.sideInputCache = new HashMap<>(); - this.windmillTagEncoding = - enableWindmillTagEncodingV2 - ? WindmillTagEncodingV2.instance() - : WindmillTagEncodingV1.instance(); this.stateNameMap = ImmutableMap.copyOf(stateNameMap); this.stateCache = stateCache; this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN; @@ -246,6 +241,10 @@ public void start( this.sideInputStateFetcher = sideInputStateFetcher; // Snapshot the limits for entire bundle processing. this.operationalLimits = globalConfigHandle.getConfig().operationalLimits(); + this.windmillTagEncoding = + globalConfigHandle.getConfig().enableStateTagEncodingV2() + ? WindmillTagEncodingV2.instance() + : WindmillTagEncodingV1.instance(); this.outputBuilder = outputBuilder; this.sideInputCache.clear(); clearSinkFullHint(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 73f2afc04223..9a815c6982fd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -25,6 +25,7 @@ import com.google.api.services.dataflow.model.StreamingConfigTask; import com.google.api.services.dataflow.model.WorkItem; import java.io.IOException; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -209,6 +210,10 @@ private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) { pipelineConfig.setUserWorkerJobSettings(settings); } + if (Objects.equals(2, config.getStreamingEngineStateTagEncodingVersion())) { + pipelineConfig.setEnableStateTagEncodingV2(true); + } + return pipelineConfig.build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java index 8f76f5ec27af..c1347e338c73 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java @@ -33,11 +33,14 @@ public static StreamingGlobalConfig.Builder builder() { return new AutoValue_StreamingGlobalConfig.Builder() .setWindmillServiceEndpoints(ImmutableSet.of()) .setUserWorkerJobSettings(UserWorkerRunnerV1Settings.newBuilder().build()) - .setOperationalLimits(OperationalLimits.builder().build()); + .setOperationalLimits(OperationalLimits.builder().build()) + .setEnableStateTagEncodingV2(false); } public abstract OperationalLimits operationalLimits(); + public abstract boolean enableStateTagEncodingV2(); + public abstract ImmutableSet windmillServiceEndpoints(); public abstract UserWorkerRunnerV1Settings userWorkerJobSettings(); @@ -51,6 +54,8 @@ public abstract static class Builder { public abstract Builder setUserWorkerJobSettings(UserWorkerRunnerV1Settings settings); + public abstract Builder setEnableStateTagEncodingV2(boolean enable); + public abstract StreamingGlobalConfig build(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java index 097da87fb015..269799903300 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java @@ -74,14 +74,6 @@ final class ComputationWorkExecutorFactory { private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT = "throw_exceptions_on_large_output"; - // Experiment to enable tag encoding v2. - // Experiment is for testing by dataflow runner developers. - // Related logic could change anytime without notice. - // **DO NOT USE** on real workloads. - // Enabling the experiment could lead to state incompatibilities and broken jobs. - private static final String UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT = - "unstable_windmill_tag_encoding_v2"; - private final DataflowWorkerHarnessOptions options; private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory; private final ReaderCache readerCache; @@ -105,7 +97,6 @@ final class ComputationWorkExecutorFactory { private final IdGenerator idGenerator; private final StreamingGlobalConfigHandle globalConfigHandle; private final boolean throwExceptionOnLargeOutput; - private final boolean enableWindmillTagEncodingV2; ComputationWorkExecutorFactory( DataflowWorkerHarnessOptions options, @@ -133,8 +124,6 @@ final class ComputationWorkExecutorFactory { : StreamingDataflowWorker.MAX_SINK_BYTES; this.throwExceptionOnLargeOutput = hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT); - this.enableWindmillTagEncodingV2 = - hasExperiment(options, UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT); } private static Nodes.ParallelInstructionNode extractReadNode( @@ -279,8 +268,7 @@ private StreamingModeExecutionContext createExecutionContext( stageInfo.executionStateRegistry(), globalConfigHandle, maxSinkBytes, - throwExceptionOnLargeOutput, - enableWindmillTagEncodingV2); + throwExceptionOnLargeOutput); } private DataflowMapTaskExecutor createMapTaskExecutor( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 8372b33d81c8..cb2cfa9129ae 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -66,6 +67,8 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.metrics.MetricsContainer; @@ -92,6 +95,7 @@ /** Tests for {@link StreamingModeExecutionContext}. */ @RunWith(JUnit4.class) public class StreamingModeExecutionContextTest { + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); @Mock private SideInputStateFetcher sideInputStateFetcher; @Mock private WindmillStateReader stateReader; @@ -133,8 +137,7 @@ public void setUp() { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false, - /*enableWindmillTagEncodingV2=*/ false); + /*throwExceptionOnLargeOutput=*/ false); } private static Work createMockWork(Windmill.WorkItem workItem, Watermarks watermarks) { @@ -406,4 +409,58 @@ public void stateSamplingInStreaming() { sampler.stop(); } } + + @Test + public void testStateTagEncodingBasedOnConfig() { + for (Class expectedEncoding : + Lists.newArrayList(WindmillTagEncodingV1.class, WindmillTagEncodingV2.class)) { + CounterSet counterSet = new CounterSet(); + ConcurrentHashMap stateNameMap = new ConcurrentHashMap<>(); + StreamingGlobalConfigHandle globalConfigHandle = + new FixedGlobalConfigHandle( + StreamingGlobalConfig.builder() + .setEnableStateTagEncodingV2(WindmillTagEncodingV2.class.equals(expectedEncoding)) + .build()); + stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily"); + executionContext = + new StreamingModeExecutionContext( + counterSet, + COMPUTATION_ID, + new ReaderCache(Duration.standardMinutes(1), Executors.newCachedThreadPool()), + stateNameMap, + WindmillStateCache.builder() + .setSizeMb(options.getWorkerCacheMb()) + .build() + .forComputation("comp"), + StreamingStepMetricsContainer.createRegistry(), + new DataflowExecutionStateTracker( + ExecutionStateSampler.newForTest(), + executionStateRegistry.getState( + NameContext.forStage("stage"), "other", null, NoopProfileScope.NOOP), + counterSet, + PipelineOptionsFactory.create(), + "test-work-item-id"), + executionStateRegistry, + globalConfigHandle, + Long.MAX_VALUE, + /*throwExceptionOnLargeOutput=*/ false); + Windmill.WorkItemCommitRequest.Builder outputBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + NameContext nameContext = NameContextsForTests.nameContextForTest(); + DataflowOperationContext operationContext = + executionContext.createOperationContext(nameContext); + StreamingModeExecutionContext.StepContext stepContext = + executionContext.getStepContext(operationContext); + + executionContext.start( + "key", + createMockWork( + Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(17L).build(), + Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), + stateReader, + sideInputStateFetcher, + outputBuilder); + assertEquals(expectedEncoding, executionContext.getWindmillTagEncoding().getClass()); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index f7364104f5db..334b9414b26b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -618,8 +618,7 @@ public void testReadUnboundedReader() throws Exception { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false, - /*enableWindmillTagEncodingV2=*/ false); + /*throwExceptionOnLargeOutput=*/ false); options.setNumWorkers(5); int maxElements = 10; @@ -990,8 +989,7 @@ public void testFailedWorkItemsAbort() throws Exception { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false, - /*enableWindmillTagEncodingV2=*/ false); + /*throwExceptionOnLargeOutput=*/ false); options.setNumWorkers(5); int maxElements = 100; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java index 3a0ae7bb2084..f074a3c52515 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker.streaming.config; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -29,7 +30,9 @@ import com.google.api.services.dataflow.model.StreamingConfigTask; import com.google.api.services.dataflow.model.WorkItem; import java.io.IOException; +import java.util.Arrays; import java.util.HashSet; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -47,6 +50,7 @@ @RunWith(JUnit4.class) public class StreamingEngineComputationConfigFetcherTest { + private final WorkUnitClient mockDataflowServiceClient = mock(WorkUnitClient.class, new Returns(Optional.empty())); private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher; @@ -231,4 +235,39 @@ public void testGetComputationConfig_fetchConfigFromDataflowError() throws IOExc () -> streamingEngineConfigFetcher.fetchConfig("someComputationId")); assertThat(fetchConfigError).isSameInstanceAs(e); } + + @Test + public void test_streamingEngineStateTagEncodingVersion() + throws IOException, InterruptedException { + for (Optional tagEncoding : + Arrays.>asList( + Optional.empty(), Optional.of(1), Optional.of(2), Optional.of(3))) { + StreamingConfigTask streamingConfigTask = + new StreamingConfigTask().setMaxWorkItemCommitBytes(100L); + tagEncoding.ifPresent( + version -> streamingConfigTask.setStreamingEngineStateTagEncodingVersion(version)); + WorkItem initialConfig = + new WorkItem().setJobId("job").setStreamingConfigTask(streamingConfigTask); + CountDownLatch waitForInitialConfig = new CountDownLatch(1); + Set receivedPipelineConfig = new HashSet<>(); + when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem()) + .thenReturn(Optional.of(initialConfig)); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + globalConfigHandle.registerConfigObserver( + config -> { + receivedPipelineConfig.add(config); + waitForInitialConfig.countDown(); + }); + streamingEngineConfigFetcher = + createConfigFetcher(/* waitForInitialConfig= */ true, 0, globalConfigHandle); + Thread asyncStartConfigLoader = new Thread(streamingEngineConfigFetcher::start); + asyncStartConfigLoader.start(); + waitForInitialConfig.await(); + asyncStartConfigLoader.join(); + assertEquals(1, receivedPipelineConfig.size()); + assertEquals( + Objects.equals(2, streamingConfigTask.getStreamingEngineStateTagEncodingVersion()), + receivedPipelineConfig.iterator().next().enableStateTagEncodingV2()); + } + } } From 8164e5bdf1eebf8b2734ae788b32d65bc7fedf9f Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 3 Feb 2026 05:54:52 +0000 Subject: [PATCH 3/5] Address commits --- .../worker/StreamingModeExecutionContext.java | 6 ++- .../StreamingModeExecutionContextTest.java | 50 +++---------------- 2 files changed, 12 insertions(+), 44 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 83727acc93ec..2c936f88e28c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -50,6 +50,7 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState; @@ -239,10 +240,11 @@ public void start( this.work = work; this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey()); this.sideInputStateFetcher = sideInputStateFetcher; + StreamingGlobalConfig config = globalConfigHandle.getConfig(); // Snapshot the limits for entire bundle processing. - this.operationalLimits = globalConfigHandle.getConfig().operationalLimits(); + this.operationalLimits = config.operationalLimits(); this.windmillTagEncoding = - globalConfigHandle.getConfig().enableStateTagEncodingV2() + config.enableStateTagEncodingV2() ? WindmillTagEncodingV2.instance() : WindmillTagEncodingV1.instance(); this.outputBuilder = outputBuilder; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index cb2cfa9129ae..449d1cc27cc7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -59,9 +59,8 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; +import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; @@ -106,6 +105,7 @@ public class StreamingModeExecutionContextTest { new StreamingModeExecutionStateRegistry(); private StreamingModeExecutionContext executionContext; DataflowWorkerHarnessOptions options; + private FakeGlobalConfigHandle globalConfigHandle; @Before public void setUp() { @@ -113,8 +113,7 @@ public void setUp() { options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); CounterSet counterSet = new CounterSet(); ConcurrentHashMap stateNameMap = new ConcurrentHashMap<>(); - StreamingGlobalConfigHandle globalConfigHandle = - new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()); + globalConfigHandle = new FakeGlobalConfigHandle(StreamingGlobalConfig.builder().build()); stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily"); executionContext = new StreamingModeExecutionContext( @@ -412,46 +411,13 @@ public void stateSamplingInStreaming() { @Test public void testStateTagEncodingBasedOnConfig() { - for (Class expectedEncoding : - Lists.newArrayList(WindmillTagEncodingV1.class, WindmillTagEncodingV2.class)) { - CounterSet counterSet = new CounterSet(); - ConcurrentHashMap stateNameMap = new ConcurrentHashMap<>(); - StreamingGlobalConfigHandle globalConfigHandle = - new FixedGlobalConfigHandle( - StreamingGlobalConfig.builder() - .setEnableStateTagEncodingV2(WindmillTagEncodingV2.class.equals(expectedEncoding)) - .build()); - stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily"); - executionContext = - new StreamingModeExecutionContext( - counterSet, - COMPUTATION_ID, - new ReaderCache(Duration.standardMinutes(1), Executors.newCachedThreadPool()), - stateNameMap, - WindmillStateCache.builder() - .setSizeMb(options.getWorkerCacheMb()) - .build() - .forComputation("comp"), - StreamingStepMetricsContainer.createRegistry(), - new DataflowExecutionStateTracker( - ExecutionStateSampler.newForTest(), - executionStateRegistry.getState( - NameContext.forStage("stage"), "other", null, NoopProfileScope.NOOP), - counterSet, - PipelineOptionsFactory.create(), - "test-work-item-id"), - executionStateRegistry, - globalConfigHandle, - Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false); + for (Boolean isV2Encoding : Lists.newArrayList(Boolean.TRUE, Boolean.FALSE)) { + Class expectedEncoding = + isV2Encoding ? WindmillTagEncodingV2.class : WindmillTagEncodingV1.class; Windmill.WorkItemCommitRequest.Builder outputBuilder = Windmill.WorkItemCommitRequest.newBuilder(); - NameContext nameContext = NameContextsForTests.nameContextForTest(); - DataflowOperationContext operationContext = - executionContext.createOperationContext(nameContext); - StreamingModeExecutionContext.StepContext stepContext = - executionContext.getStepContext(operationContext); - + globalConfigHandle.setConfig( + StreamingGlobalConfig.builder().setEnableStateTagEncodingV2(isV2Encoding).build()); executionContext.start( "key", createMockWork( From 1160c4a839c873a4246e7a2fe804cfd711093576 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Tue, 3 Feb 2026 06:01:21 +0000 Subject: [PATCH 4/5] Address commits --- .../config/StreamingEngineComputationConfigFetcher.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 9a815c6982fd..0f50c04d1f0b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -62,6 +62,7 @@ @Internal @ThreadSafe public final class StreamingEngineComputationConfigFetcher implements ComputationConfig.Fetcher { + private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class); private static final String CONFIG_REFRESHER_THREAD_NAME = "GlobalPipelineConfigRefresher"; @@ -210,7 +211,11 @@ private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) { pipelineConfig.setUserWorkerJobSettings(settings); } - if (Objects.equals(2, config.getStreamingEngineStateTagEncodingVersion())) { + Integer tagEncodingVersion = config.getStreamingEngineStateTagEncodingVersion(); + if (tagEncodingVersion != null) { + Preconditions.checkState(tagEncodingVersion <= 2); + } + if (Objects.equals(2, tagEncodingVersion)) { pipelineConfig.setEnableStateTagEncodingV2(true); } From 4e967fe1737f38bffcc9da2c9e754326e87c540b Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Wed, 4 Feb 2026 00:00:22 +0000 Subject: [PATCH 5/5] Fix test --- .../config/StreamingEngineComputationConfigFetcherTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java index f074a3c52515..8b181a7abda5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java @@ -240,8 +240,7 @@ public void testGetComputationConfig_fetchConfigFromDataflowError() throws IOExc public void test_streamingEngineStateTagEncodingVersion() throws IOException, InterruptedException { for (Optional tagEncoding : - Arrays.>asList( - Optional.empty(), Optional.of(1), Optional.of(2), Optional.of(3))) { + Arrays.>asList(Optional.empty(), Optional.of(1), Optional.of(2))) { StreamingConfigTask streamingConfigTask = new StreamingConfigTask().setMaxWorkItemCommitBytes(100L); tagEncoding.ifPresent(