From e26023e7d862fcf7867979d62186ad5924ac6c51 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 23 Feb 2026 12:26:11 +0000 Subject: [PATCH 1/3] [Dataflow Streaming] Mark support for windmill state tag encoding v2 The added experiment informs backend of the support and make jobs having this change eligible for state tag encoding v2. Post submit tests with v2 encoding passes and internal test suites pass with encoding v2. --- .../beam/runners/dataflow/DataflowRunner.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index f34b74866575..85d2ce0cc544 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1299,6 +1299,17 @@ public DataflowPipelineJob run(Pipeline pipeline) { if (shouldActAsStreaming(pipeline)) { options.setStreaming(true); + { + List experiments = + options.getExperiments() == null + ? new ArrayList<>() + : new ArrayList<>(options.getExperiments()); + // Experiment marking that the harness supports tag encoding v2 + // Backend will enable tag encoding v2 only if the harness supports it. + experiments.add("streaming_engine_state_tag_encoding_v2_supported"); + options.setExperiments(ImmutableList.copyOf(experiments)); + } + if (useUnifiedWorker(options)) { options.setEnableStreamingEngine(true); List experiments = @@ -1309,6 +1320,8 @@ public DataflowPipelineJob run(Pipeline pipeline) { if (!experiments.contains("enable_windmill_service")) { experiments.add("enable_windmill_service"); } + // TODO: The experiments added above are not included in the list of experiments sent to the + // backend, an options.setExperiments call is missing. } } From 65e26f98ec4366a1d19fc54c02fbe5045ff96b1a Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 23 Feb 2026 13:31:32 +0000 Subject: [PATCH 2/3] remove todo --- .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 85d2ce0cc544..24cf24af3e2d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1320,8 +1320,6 @@ public DataflowPipelineJob run(Pipeline pipeline) { if (!experiments.contains("enable_windmill_service")) { experiments.add("enable_windmill_service"); } - // TODO: The experiments added above are not included in the list of experiments sent to the - // backend, an options.setExperiments call is missing. } } From f5a3186ea475ddbd4c509baaaa0eae7aa5458e5a Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 23 Feb 2026 14:32:41 +0000 Subject: [PATCH 3/3] fix test --- .../org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index ee5a7e1d26c3..8c33123be6d5 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasProperty; import static org.hamcrest.Matchers.instanceOf; @@ -1791,7 +1792,7 @@ public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception assertFalse(options.isEnableStreamingEngine()); assertThat( options.getExperiments(), - containsInAnyOrder( + hasItems( "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")); } @@ -1807,7 +1808,7 @@ public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception assertTrue(options.isEnableStreamingEngine()); assertThat( options.getExperiments(), - containsInAnyOrder( + hasItems( "beam_fn_api", "use_runner_v2", "use_unified_worker",