Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
Expand Down Expand Up @@ -122,7 +123,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
*/
private final Map<TupleTag<?>, Map<BoundedWindow, SideInput<?>>> sideInputCache;

private final WindmillTagEncoding windmillTagEncoding;
private WindmillTagEncoding windmillTagEncoding;
/**
* The current user-facing key for this execution context.
*
Expand Down Expand Up @@ -162,8 +163,7 @@ public StreamingModeExecutionContext(
StreamingModeExecutionStateRegistry executionStateRegistry,
StreamingGlobalConfigHandle globalConfigHandle,
long sinkByteLimit,
boolean throwExceptionOnLargeOutput,
boolean enableWindmillTagEncodingV2) {
boolean throwExceptionOnLargeOutput) {
super(
counterFactory,
metricsContainerRegistry,
Expand All @@ -174,10 +174,6 @@ public StreamingModeExecutionContext(
this.readerCache = readerCache;
this.globalConfigHandle = globalConfigHandle;
this.sideInputCache = new HashMap<>();
this.windmillTagEncoding =
enableWindmillTagEncodingV2
? WindmillTagEncodingV2.instance()
: WindmillTagEncodingV1.instance();
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
Expand Down Expand Up @@ -244,8 +240,13 @@ public void start(
this.work = work;
this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey());
this.sideInputStateFetcher = sideInputStateFetcher;
StreamingGlobalConfig config = globalConfigHandle.getConfig();
// Snapshot the limits for entire bundle processing.
this.operationalLimits = globalConfigHandle.getConfig().operationalLimits();
this.operationalLimits = config.operationalLimits();
this.windmillTagEncoding =
config.enableStateTagEncodingV2()
? WindmillTagEncodingV2.instance()
: WindmillTagEncodingV1.instance();
this.outputBuilder = outputBuilder;
this.sideInputCache.clear();
clearSinkFullHint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.services.dataflow.model.StreamingConfigTask;
import com.google.api.services.dataflow.model.WorkItem;
import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -61,6 +62,7 @@
@Internal
@ThreadSafe
public final class StreamingEngineComputationConfigFetcher implements ComputationConfig.Fetcher {

private static final Logger LOG =
LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
private static final String CONFIG_REFRESHER_THREAD_NAME = "GlobalPipelineConfigRefresher";
Expand Down Expand Up @@ -209,6 +211,14 @@ private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask config) {
pipelineConfig.setUserWorkerJobSettings(settings);
}

// The tag encoding version may be absent from the config task; in that case the builder's
// current setting for state tag encoding V2 is left untouched.
Integer tagEncodingVersion = config.getStreamingEngineStateTagEncodingVersion();
if (tagEncodingVersion != null) {
  // Fail with the offending value in the message so an unsupported version reported by the
  // service is diagnosable from the exception alone.
  Preconditions.checkState(
      tagEncodingVersion <= 2,
      "Unsupported streaming engine state tag encoding version: %s",
      tagEncodingVersion);
}
// Only version 2 switches the worker to the V2 encoding.
if (Objects.equals(2, tagEncodingVersion)) {
  pipelineConfig.setEnableStateTagEncodingV2(true);
}

return pipelineConfig.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ public static StreamingGlobalConfig.Builder builder() {
return new AutoValue_StreamingGlobalConfig.Builder()
.setWindmillServiceEndpoints(ImmutableSet.of())
.setUserWorkerJobSettings(UserWorkerRunnerV1Settings.newBuilder().build())
.setOperationalLimits(OperationalLimits.builder().build());
.setOperationalLimits(OperationalLimits.builder().build())
.setEnableStateTagEncodingV2(false);
}

public abstract OperationalLimits operationalLimits();

public abstract boolean enableStateTagEncodingV2();

public abstract ImmutableSet<HostAndPort> windmillServiceEndpoints();

public abstract UserWorkerRunnerV1Settings userWorkerJobSettings();
Expand All @@ -51,6 +54,8 @@ public abstract static class Builder {

public abstract Builder setUserWorkerJobSettings(UserWorkerRunnerV1Settings settings);

public abstract Builder setEnableStateTagEncodingV2(boolean enable);

public abstract StreamingGlobalConfig build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ final class ComputationWorkExecutorFactory {
private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT =
"throw_exceptions_on_large_output";

// Experiment to enable tag encoding v2.
// Experiment is for testing by dataflow runner developers.
// Related logic could change anytime without notice.
// **DO NOT USE** on real workloads.
// Enabling the experiment could lead to state incompatibilities and broken jobs.
private static final String UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT =
"unstable_windmill_tag_encoding_v2";

private final DataflowWorkerHarnessOptions options;
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
private final ReaderCache readerCache;
Expand All @@ -105,7 +97,6 @@ final class ComputationWorkExecutorFactory {
private final IdGenerator idGenerator;
private final StreamingGlobalConfigHandle globalConfigHandle;
private final boolean throwExceptionOnLargeOutput;
private final boolean enableWindmillTagEncodingV2;

ComputationWorkExecutorFactory(
DataflowWorkerHarnessOptions options,
Expand Down Expand Up @@ -133,8 +124,6 @@ final class ComputationWorkExecutorFactory {
: StreamingDataflowWorker.MAX_SINK_BYTES;
this.throwExceptionOnLargeOutput =
hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT);
this.enableWindmillTagEncodingV2 =
hasExperiment(options, UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT);
}

private static Nodes.ParallelInstructionNode extractReadNode(
Expand Down Expand Up @@ -279,8 +268,7 @@ private StreamingModeExecutionContext createExecutionContext(
stageInfo.executionStateRegistry(),
globalConfigHandle,
maxSinkBytes,
throwExceptionOnLargeOutput,
enableWindmillTagEncodingV2);
throwExceptionOnLargeOutput);
}

private DataflowMapTaskExecutor createMapTaskExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -58,14 +59,15 @@
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2;
import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.metrics.MetricsContainer;
Expand All @@ -92,6 +94,7 @@
/** Tests for {@link StreamingModeExecutionContext}. */
@RunWith(JUnit4.class)
public class StreamingModeExecutionContextTest {

@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
@Mock private SideInputStateFetcher sideInputStateFetcher;
@Mock private WindmillStateReader stateReader;
Expand All @@ -102,15 +105,15 @@ public class StreamingModeExecutionContextTest {
new StreamingModeExecutionStateRegistry();
private StreamingModeExecutionContext executionContext;
DataflowWorkerHarnessOptions options;
private FakeGlobalConfigHandle globalConfigHandle;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
CounterSet counterSet = new CounterSet();
ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
StreamingGlobalConfigHandle globalConfigHandle =
new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
globalConfigHandle = new FakeGlobalConfigHandle(StreamingGlobalConfig.builder().build());
stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily");
executionContext =
new StreamingModeExecutionContext(
Expand All @@ -133,8 +136,7 @@ public void setUp() {
executionStateRegistry,
globalConfigHandle,
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false,
/*enableWindmillTagEncodingV2=*/ false);
/*throwExceptionOnLargeOutput=*/ false);
}

private static Work createMockWork(Windmill.WorkItem workItem, Watermarks watermarks) {
Expand Down Expand Up @@ -406,4 +408,25 @@ public void stateSamplingInStreaming() {
sampler.stop();
}
}

/**
 * Publishes a global config with state tag encoding V2 enabled and then disabled, starts the
 * execution context after each change, and checks that the context reports the matching
 * {@code WindmillTagEncoding} implementation.
 */
@Test
public void testStateTagEncodingBasedOnConfig() {
  for (boolean useV2Encoding : new boolean[] {true, false}) {
    // Publish the config before start() so the context reads it while starting.
    StreamingGlobalConfig config =
        StreamingGlobalConfig.builder().setEnableStateTagEncodingV2(useV2Encoding).build();
    globalConfigHandle.setConfig(config);

    Windmill.WorkItem workItem =
        Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(17L).build();
    Watermarks watermarks =
        Watermarks.builder().setInputDataWatermark(new Instant(1000)).build();
    executionContext.start(
        "key",
        createMockWork(workItem, watermarks),
        stateReader,
        sideInputStateFetcher,
        Windmill.WorkItemCommitRequest.newBuilder());

    Class<?> expectedEncoding;
    if (useV2Encoding) {
      expectedEncoding = WindmillTagEncodingV2.class;
    } else {
      expectedEncoding = WindmillTagEncodingV1.class;
    }
    assertEquals(expectedEncoding, executionContext.getWindmillTagEncoding().getClass());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -618,8 +618,7 @@ public void testReadUnboundedReader() throws Exception {
executionStateRegistry,
globalConfigHandle,
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false,
/*enableWindmillTagEncodingV2=*/ false);
/*throwExceptionOnLargeOutput=*/ false);

options.setNumWorkers(5);
int maxElements = 10;
Expand Down Expand Up @@ -990,8 +989,7 @@ public void testFailedWorkItemsAbort() throws Exception {
executionStateRegistry,
globalConfigHandle,
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false,
/*enableWindmillTagEncodingV2=*/ false);
/*throwExceptionOnLargeOutput=*/ false);

options.setNumWorkers(5);
int maxElements = 100;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker.streaming.config;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
Expand All @@ -29,7 +30,9 @@
import com.google.api.services.dataflow.model.StreamingConfigTask;
import com.google.api.services.dataflow.model.WorkItem;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand All @@ -47,6 +50,7 @@

@RunWith(JUnit4.class)
public class StreamingEngineComputationConfigFetcherTest {

private final WorkUnitClient mockDataflowServiceClient =
mock(WorkUnitClient.class, new Returns(Optional.empty()));
private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
Expand Down Expand Up @@ -231,4 +235,38 @@ public void testGetComputationConfig_fetchConfigFromDataflowError() throws IOExc
() -> streamingEngineConfigFetcher.fetchConfig("someComputationId"));
assertThat(fetchConfigError).isSameInstanceAs(e);
}

/**
 * For an unset tag encoding version and for versions 1 and 2, starts a config fetcher against a
 * mocked service and checks that the single config delivered to the observer enables state tag
 * encoding V2 exactly when the config task carries version 2.
 */
@Test
public void test_streamingEngineStateTagEncodingVersion()
    throws IOException, InterruptedException {
  for (Optional<Integer> maybeVersion :
      Arrays.<Optional<Integer>>asList(Optional.empty(), Optional.of(1), Optional.of(2))) {
    // Build the config task the mocked service hands back; the version is set only if present.
    StreamingConfigTask configTask = new StreamingConfigTask().setMaxWorkItemCommitBytes(100L);
    maybeVersion.ifPresent(configTask::setStreamingEngineStateTagEncodingVersion);
    WorkItem configWorkItem = new WorkItem().setJobId("job").setStreamingConfigTask(configTask);

    CountDownLatch initialConfigReceived = new CountDownLatch(1);
    Set<StreamingGlobalConfig> observedConfigs = new HashSet<>();
    when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
        .thenReturn(Optional.of(configWorkItem));

    // Record every config pushed to the handle and release the latch once one arrives.
    StreamingGlobalConfigHandleImpl configHandle = new StreamingGlobalConfigHandleImpl();
    configHandle.registerConfigObserver(
        observed -> {
          observedConfigs.add(observed);
          initialConfigReceived.countDown();
        });

    streamingEngineConfigFetcher =
        createConfigFetcher(/* waitForInitialConfig= */ true, 0, configHandle);
    // start() runs on its own thread; the test waits for the observer and then for the thread.
    Thread starter = new Thread(streamingEngineConfigFetcher::start);
    starter.start();
    initialConfigReceived.await();
    starter.join();

    assertEquals(1, observedConfigs.size());
    boolean expectV2 =
        Objects.equals(2, configTask.getStreamingEngineStateTagEncodingVersion());
    StreamingGlobalConfig deliveredConfig = observedConfigs.iterator().next();
    assertEquals(expectV2, deliveredConfig.enableStateTagEncodingV2());
  }
}
}
Loading