diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index f34b74866575..24cf24af3e2d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1299,6 +1299,17 @@ public DataflowPipelineJob run(Pipeline pipeline) { if (shouldActAsStreaming(pipeline)) { options.setStreaming(true); + { + List experiments = + options.getExperiments() == null + ? new ArrayList<>() + : new ArrayList<>(options.getExperiments()); + // Experiment marking that the harness supports tag encoding v2 + // Backend will enable tag encoding v2 only if the harness supports it. + experiments.add("streaming_engine_state_tag_encoding_v2_supported"); + options.setExperiments(ImmutableList.copyOf(experiments)); + } + if (useUnifiedWorker(options)) { options.setEnableStreamingEngine(true); List experiments = diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index ee5a7e1d26c3..8c33123be6d5 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.hasProperty; import static org.hamcrest.Matchers.instanceOf; @@ -1791,7 +1792,7 @@ public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception assertFalse(options.isEnableStreamingEngine()); assertThat( options.getExperiments(), - containsInAnyOrder( + hasItems( "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")); } @@ -1807,7 +1808,7 @@ public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception assertTrue(options.isEnableStreamingEngine()); assertThat( options.getExperiments(), - containsInAnyOrder( + hasItems( "beam_fn_api", "use_runner_v2", "use_unified_worker",