diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index 09afcadc3002..2c936f88e28c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -50,6 +50,7 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState; @@ -122,7 +123,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext, Map>> sideInputCache; - private final WindmillTagEncoding windmillTagEncoding; + private WindmillTagEncoding windmillTagEncoding; /** * The current user-facing key for this execution context. * @@ -162,8 +163,7 @@ public StreamingModeExecutionContext( StreamingModeExecutionStateRegistry executionStateRegistry, StreamingGlobalConfigHandle globalConfigHandle, long sinkByteLimit, - boolean throwExceptionOnLargeOutput, - boolean enableWindmillTagEncodingV2) { + boolean throwExceptionOnLargeOutput) { super( counterFactory, metricsContainerRegistry, @@ -174,10 +174,6 @@ public StreamingModeExecutionContext( this.readerCache = readerCache; this.globalConfigHandle = globalConfigHandle; this.sideInputCache = new HashMap<>(); - this.windmillTagEncoding = - enableWindmillTagEncodingV2 - ? WindmillTagEncodingV2.instance() - : WindmillTagEncodingV1.instance(); this.stateNameMap = ImmutableMap.copyOf(stateNameMap); this.stateCache = stateCache; this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN; @@ -244,8 +240,13 @@ public void start( this.work = work; this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey()); this.sideInputStateFetcher = sideInputStateFetcher; + StreamingGlobalConfig config = globalConfigHandle.getConfig(); // Snapshot the limits for entire bundle processing. - this.operationalLimits = globalConfigHandle.getConfig().operationalLimits(); + this.operationalLimits = config.operationalLimits(); + this.windmillTagEncoding = + config.enableStateTagEncodingV2() + ? WindmillTagEncodingV2.instance() + : WindmillTagEncodingV1.instance(); this.outputBuilder = outputBuilder; this.sideInputCache.clear(); clearSinkFullHint(); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 73f2afc04223..0f50c04d1f0b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -25,6 +25,7 @@ import com.google.api.services.dataflow.model.StreamingConfigTask; import com.google.api.services.dataflow.model.WorkItem; import java.io.IOException; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -61,6 +62,7 @@ @Internal @ThreadSafe public final class StreamingEngineComputationConfigFetcher implements ComputationConfig.Fetcher { + private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class); private static final String CONFIG_REFRESHER_THREAD_NAME = "GlobalPipelineConfigRefresher"; @@ -209,6 +211,14 @@ private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) { pipelineConfig.setUserWorkerJobSettings(settings); } + Integer tagEncodingVersion = config.getStreamingEngineStateTagEncodingVersion(); + if (tagEncodingVersion != null) { + Preconditions.checkState(tagEncodingVersion <= 2); + } + if (Objects.equals(2, tagEncodingVersion)) { + pipelineConfig.setEnableStateTagEncodingV2(true); + } + return pipelineConfig.build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java index 8f76f5ec27af..c1347e338c73 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java @@ -33,11 +33,14 @@ public static StreamingGlobalConfig.Builder builder() { return new AutoValue_StreamingGlobalConfig.Builder() .setWindmillServiceEndpoints(ImmutableSet.of()) .setUserWorkerJobSettings(UserWorkerRunnerV1Settings.newBuilder().build()) - .setOperationalLimits(OperationalLimits.builder().build()); + .setOperationalLimits(OperationalLimits.builder().build()) + .setEnableStateTagEncodingV2(false); } public abstract OperationalLimits operationalLimits(); + public abstract boolean enableStateTagEncodingV2(); + public abstract ImmutableSet windmillServiceEndpoints(); public abstract UserWorkerRunnerV1Settings userWorkerJobSettings(); @@ -51,6 +54,8 @@ public abstract static class Builder { public abstract Builder setUserWorkerJobSettings(UserWorkerRunnerV1Settings settings); + public abstract Builder setEnableStateTagEncodingV2(boolean enable); + public abstract StreamingGlobalConfig build(); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java index 097da87fb015..269799903300 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java @@ -74,14 +74,6 @@ final class ComputationWorkExecutorFactory { private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT = "throw_exceptions_on_large_output"; - // Experiment to enable tag encoding v2. - // Experiment is for testing by dataflow runner developers. - // Related logic could change anytime without notice. - // **DO NOT USE** on real workloads. - // Enabling the experiment could lead to state incompatibilities and broken jobs. - private static final String UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT = - "unstable_windmill_tag_encoding_v2"; - private final DataflowWorkerHarnessOptions options; private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory; private final ReaderCache readerCache; @@ -105,7 +97,6 @@ final class ComputationWorkExecutorFactory { private final IdGenerator idGenerator; private final StreamingGlobalConfigHandle globalConfigHandle; private final boolean throwExceptionOnLargeOutput; - private final boolean enableWindmillTagEncodingV2; ComputationWorkExecutorFactory( DataflowWorkerHarnessOptions options, @@ -133,8 +124,6 @@ final class ComputationWorkExecutorFactory { : StreamingDataflowWorker.MAX_SINK_BYTES; this.throwExceptionOnLargeOutput = hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT); - this.enableWindmillTagEncodingV2 = - hasExperiment(options, UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT); } private static Nodes.ParallelInstructionNode extractReadNode( @@ -279,8 +268,7 @@ private StreamingModeExecutionContext createExecutionContext( stageInfo.executionStateRegistry(), globalConfigHandle, maxSinkBytes, - throwExceptionOnLargeOutput, - enableWindmillTagEncodingV2); + throwExceptionOnLargeOutput); } private DataflowMapTaskExecutor createMapTaskExecutor( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 8372b33d81c8..449d1cc27cc7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -58,14 +59,15 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; import org.apache.beam.runners.dataflow.worker.streaming.Watermarks; import org.apache.beam.runners.dataflow.worker.streaming.Work; -import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; +import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.metrics.MetricsContainer; @@ -92,6 +94,7 @@ /** Tests for {@link StreamingModeExecutionContext}. */ @RunWith(JUnit4.class) public class StreamingModeExecutionContextTest { + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); @Mock private SideInputStateFetcher sideInputStateFetcher; @Mock private WindmillStateReader stateReader; @@ -102,6 +105,7 @@ public class StreamingModeExecutionContextTest { new StreamingModeExecutionStateRegistry(); private StreamingModeExecutionContext executionContext; DataflowWorkerHarnessOptions options; + private FakeGlobalConfigHandle globalConfigHandle; @Before public void setUp() { @@ -109,8 +113,7 @@ public void setUp() { options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); CounterSet counterSet = new CounterSet(); ConcurrentHashMap stateNameMap = new ConcurrentHashMap<>(); - StreamingGlobalConfigHandle globalConfigHandle = - new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()); + globalConfigHandle = new FakeGlobalConfigHandle(StreamingGlobalConfig.builder().build()); stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily"); executionContext = new StreamingModeExecutionContext( @@ -133,8 +136,7 @@ public void setUp() { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false, - /*enableWindmillTagEncodingV2=*/ false); + /*throwExceptionOnLargeOutput=*/ false); } private static Work createMockWork(Windmill.WorkItem workItem, Watermarks watermarks) { @@ -406,4 +408,25 @@ public void stateSamplingInStreaming() { sampler.stop(); } } + + @Test + public void testStateTagEncodingBasedOnConfig() { + for (Boolean isV2Encoding : Lists.newArrayList(Boolean.TRUE, Boolean.FALSE)) { + Class expectedEncoding = + isV2Encoding ? WindmillTagEncodingV2.class : WindmillTagEncodingV1.class; + Windmill.WorkItemCommitRequest.Builder outputBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + globalConfigHandle.setConfig( + StreamingGlobalConfig.builder().setEnableStateTagEncodingV2(isV2Encoding).build()); + executionContext.start( + "key", + createMockWork( + Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(17L).build(), + Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), + stateReader, + sideInputStateFetcher, + outputBuilder); + assertEquals(expectedEncoding, executionContext.getWindmillTagEncoding().getClass()); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index f7364104f5db..334b9414b26b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -618,8 +618,7 @@ public void testReadUnboundedReader() throws Exception { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false, - /*enableWindmillTagEncodingV2=*/ false); + /*throwExceptionOnLargeOutput=*/ false); options.setNumWorkers(5); int maxElements = 10; @@ -990,8 +989,7 @@ public void testFailedWorkItemsAbort() throws Exception { executionStateRegistry, globalConfigHandle, Long.MAX_VALUE, - /*throwExceptionOnLargeOutput=*/ false, - /*enableWindmillTagEncodingV2=*/ false); + /*throwExceptionOnLargeOutput=*/ false); options.setNumWorkers(5); int maxElements = 100; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java index 3a0ae7bb2084..8b181a7abda5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.worker.streaming.config; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -29,7 +30,9 @@ import com.google.api.services.dataflow.model.StreamingConfigTask; import com.google.api.services.dataflow.model.WorkItem; import java.io.IOException; +import java.util.Arrays; import java.util.HashSet; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -47,6 +50,7 @@ @RunWith(JUnit4.class) public class StreamingEngineComputationConfigFetcherTest { + private final WorkUnitClient mockDataflowServiceClient = mock(WorkUnitClient.class, new Returns(Optional.empty())); private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher; @@ -231,4 +235,38 @@ public void testGetComputationConfig_fetchConfigFromDataflowError() throws IOExc () -> streamingEngineConfigFetcher.fetchConfig("someComputationId")); assertThat(fetchConfigError).isSameInstanceAs(e); } + + @Test + public void test_streamingEngineStateTagEncodingVersion() + throws IOException, InterruptedException { + for (Optional tagEncoding : + Arrays.>asList(Optional.empty(), Optional.of(1), Optional.of(2))) { + StreamingConfigTask streamingConfigTask = + new StreamingConfigTask().setMaxWorkItemCommitBytes(100L); + tagEncoding.ifPresent( + version -> streamingConfigTask.setStreamingEngineStateTagEncodingVersion(version)); + WorkItem initialConfig = + new WorkItem().setJobId("job").setStreamingConfigTask(streamingConfigTask); + CountDownLatch waitForInitialConfig = new CountDownLatch(1); + Set receivedPipelineConfig = new HashSet<>(); + when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem()) + .thenReturn(Optional.of(initialConfig)); + StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl(); + globalConfigHandle.registerConfigObserver( + config -> { + receivedPipelineConfig.add(config); + waitForInitialConfig.countDown(); + }); + streamingEngineConfigFetcher = + createConfigFetcher(/* waitForInitialConfig= */ true, 0, globalConfigHandle); + Thread asyncStartConfigLoader = new Thread(streamingEngineConfigFetcher::start); + asyncStartConfigLoader.start(); + waitForInitialConfig.await(); + asyncStartConfigLoader.join(); + assertEquals(1, receivedPipelineConfig.size()); + assertEquals( + Objects.equals(2, streamingConfigTask.getStreamingEngineStateTagEncodingVersion()), + receivedPipelineConfig.iterator().next().enableStateTagEncodingV2()); + } + } }