Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,18 @@ <T> void addIncomingTimerEndpoint(
* instant provides the timeout on how long the finalization callback is valid for.
*/
DoFn.BundleFinalizer getBundleFinalizer();

/**
* Returns true if the runner has no state for the keys in the ProcessBundleRequest. If true,
* the SDK can begin stateful processing with an initial empty state.
*/
boolean getHasNoState();

/**
* Returns true if the runner will never process another bundle for the keys it contains.
* Therefore, the generated state need not be included in the bundle commit.
*/
boolean getOnlyBundleForKeys();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,9 @@ private void addRunnerAndConsumersForPTransformRecursively(
BundleFinalizer bundleFinalizer,
Collection<BeamFnDataReadRunner<?>> channelRoots,
Map<ApiServiceDescriptor, BeamFnDataOutboundAggregator> outboundAggregatorMap,
Set<String> runnerCapabilities)
Set<String> runnerCapabilities,
boolean hasNoState,
boolean onlyBundleForKeys)
throws IOException {

// Recursively ensure that all consumers of the output PCollection have been created.
Expand Down Expand Up @@ -279,7 +281,9 @@ private void addRunnerAndConsumersForPTransformRecursively(
bundleFinalizer,
channelRoots,
outboundAggregatorMap,
runnerCapabilities);
runnerCapabilities,
hasNoState,
onlyBundleForKeys);
}
}

Expand Down Expand Up @@ -488,6 +492,16 @@ public BundleSplitListener getSplitListener() {
public BundleFinalizer getBundleFinalizer() {
return bundleFinalizer;
}

@Override
public boolean getHasNoState() {
return hasNoState;
}

@Override
public boolean getOnlyBundleForKeys() {
return onlyBundleForKeys;
}
});
processedPTransformIds.add(pTransformId);
}
Expand Down Expand Up @@ -913,7 +927,9 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
bundleFinalizer,
bundleProcessor.getChannelRoots(),
bundleProcessor.getOutboundAggregators(),
bundleProcessor.getRunnerCapabilities());
bundleProcessor.getRunnerCapabilities(),
processBundleRequest.getHasNoState(),
processBundleRequest.getOnlyBundleForKeys());
}
bundleProcessor.finish();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,33 @@ public class BagUserState<T> {
private List<T> newValues;
private boolean isCleared;
private boolean isClosed;
private final boolean hasNoState;
private final boolean onlyBundleForKeys;

static final int BAG_APPEND_BATCHING_LIMIT = 10 * 1024 * 1024;

/** The cache must be namespaced for this state object accordingly. */
public BagUserState(
Cache<?, ?> cache,
BeamFnStateClient beamFnStateClient,
String instructionId,
StateKey stateKey,
Coder<T> valueCoder) {
Cache<?, ?> cache,
BeamFnStateClient beamFnStateClient,
String instructionId,
StateKey stateKey,
Coder<T> valueCoder,
boolean hasNoState,
boolean onlyBundleForKeys) {
checkArgument(
stateKey.hasBagUserState(), "Expected BagUserState StateKey but received %s.", stateKey);
this.cache = cache;
this.beamFnStateClient = beamFnStateClient;
this.valueCoder = valueCoder;
this.hasNoState = hasNoState;
this.onlyBundleForKeys = onlyBundleForKeys;
this.request =
StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();

this.oldValues =
StateFetchingIterators.readAllAndDecodeStartingFrom(
this.cache, beamFnStateClient, request, valueCoder);
this.cache, beamFnStateClient, request, valueCoder, hasNoState);
this.newValues = new ArrayList<>();
}

Expand Down Expand Up @@ -127,7 +133,7 @@ public void asyncClose() throws Exception {
beamFnStateClient.handle(
request.toBuilder().setClear(StateClearRequest.getDefaultInstance()));
}
if (!newValues.isEmpty()) {
if (!onlyBundleForKeys && !newValues.isEmpty()) {
// Batch values up to a arbitrary limit to reduce overhead of write
// requests. We treat this limit as strict to ensure that large elements
// are not batched as they may otherwise exceed runner limits.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ public static class Factory<K> {
private final Map<TupleTag<?>, SideInputSpec> sideInputSpecMap;
private final Coder<K> keyCoder;
private final Coder<BoundedWindow> windowCoder;
private final boolean hasNoState;
private final boolean onlyBundleForKeys;

public Factory(
PipelineOptions pipelineOptions,
Expand All @@ -126,7 +128,9 @@ public Factory(
Map<TupleTag<?>, SideInputSpec> sideInputSpecMap,
BeamFnStateClient beamFnStateClient,
Coder<K> keyCoder,
Coder<BoundedWindow> windowCoder) {
Coder<BoundedWindow> windowCoder,
boolean hasNoState,
boolean onlyBundleForKeys) {
this.pipelineOptions = pipelineOptions;
this.runnerCapabilities = runnerCapabilities;
this.ptransformId = ptransformId;
Expand All @@ -138,6 +142,8 @@ public Factory(
this.beamFnStateClient = beamFnStateClient;
this.keyCoder = keyCoder;
this.windowCoder = windowCoder;
this.hasNoState = hasNoState;
this.onlyBundleForKeys = onlyBundleForKeys;
}

public static <K> Factory<K> factoryForPTransformContext(
Expand Down Expand Up @@ -220,7 +226,9 @@ public static <K> Factory<K> factoryForPTransformContext(
tagToSideInputSpecMap,
context.getBeamFnStateClient(),
keyCoder,
windowCoder);
windowCoder,
context.getHasNoState(),
context.getOnlyBundleForKeys());
}

public FnApiStateAccessor<K> create() {
Expand All @@ -235,7 +243,9 @@ public FnApiStateAccessor<K> create() {
sideInputSpecMap,
beamFnStateClient,
keyCoder,
windowCoder);
windowCoder,
hasNoState,
onlyBundleForKeys);
}
}

Expand All @@ -252,6 +262,8 @@ public FnApiStateAccessor<K> create() {
private final Collection<ThrowingRunnable> stateFinalizers;
private final Coder<K> keyCoder;
private final Coder<BoundedWindow> windowCoder;
private final boolean hasNoState;
private final boolean onlyBundleForKeys;

private @Nullable Supplier<BoundedWindow> currentWindowSupplier;
private @Nullable Supplier<ByteString> encodedCurrentKeySupplier;
Expand All @@ -268,7 +280,9 @@ public FnApiStateAccessor(
Map<TupleTag<?>, SideInputSpec> sideInputSpecMap,
BeamFnStateClient beamFnStateClient,
Coder<K> keyCoder,
Coder<BoundedWindow> windowCoder) {
Coder<BoundedWindow> windowCoder,
boolean hasNoState,
boolean onlyBundleForKeys) {
this.pipelineOptions = pipelineOptions;
this.runnerCapabilities = runnerCapabilities;
this.stateKeyObjectCache = Maps.newHashMap();
Expand All @@ -282,6 +296,8 @@ public FnApiStateAccessor(
this.keyCoder = keyCoder;
this.windowCoder = windowCoder;
this.stateFinalizers = new ArrayList<>();
this.hasNoState = hasNoState;
this.onlyBundleForKeys = onlyBundleForKeys;
}

public void setKeyAndWindowContext(MutatingStateContext<K, BoundedWindow> keyAndWindowContext) {
Expand Down Expand Up @@ -417,7 +433,8 @@ public <T> T get(PCollectionView<T> view, BoundedWindow window) {
runnerCapabilities.contains(
BeamUrns.getUrn(
RunnerApi.StandardRunnerProtocols.Enum
.MULTIMAP_KEYS_VALUES_SIDE_INPUT))));
.MULTIMAP_KEYS_VALUES_SIDE_INPUT)),
hasNoState));
default:
throw new IllegalStateException(
String.format(
Expand Down Expand Up @@ -1201,7 +1218,9 @@ private <T> BagUserState<T> createBagUserState(StateKey stateKey, Coder<T> value
beamFnStateClient,
processBundleInstructionId.get(),
stateKey,
valueCoder);
valueCoder,
hasNoState,
onlyBundleForKeys);
stateFinalizers.add(rval::asyncClose);
return rval;
}
Expand Down Expand Up @@ -1283,7 +1302,9 @@ private <KeyT, ValueT> MultimapUserState<KeyT, ValueT> createMultimapUserState(
processBundleInstructionId.get(),
stateKey,
keyCoder,
valueCoder);
valueCoder,
hasNoState,
onlyBundleForKeys);
stateFinalizers.add(rval::asyncClose);
return rval;
}
Expand Down Expand Up @@ -1318,7 +1339,9 @@ private <T> OrderedListUserState<T> createOrderedListUserState(
beamFnStateClient,
processBundleInstructionId.get(),
stateKey,
valueCoder);
valueCoder,
hasNoState,
onlyBundleForKeys);
stateFinalizers.add(rval::asyncClose);
return rval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public IterableSideInput(
BeamFnStateClient beamFnStateClient,
String instructionId,
StateKey stateKey,
Coder<T> valueCoder) {
Coder<T> valueCoder,
boolean hasNoState) {
checkArgument(
stateKey.hasIterableSideInput(),
"Expected IterableSideInput StateKey but received %s.",
Expand All @@ -50,7 +51,8 @@ public IterableSideInput(
cache,
beamFnStateClient,
StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build(),
valueCoder);
valueCoder,
hasNoState);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class MultimapSideInput<K, V> implements MultimapView<K, V> {
private final Coder<V> valueCoder;
private volatile Function<ByteString, Iterable<V>> bulkReadResult;
private final boolean useBulkRead;
private final boolean hasNoState;

public MultimapSideInput(
Cache<?, ?> cache,
Expand All @@ -63,7 +64,8 @@ public MultimapSideInput(
StateKey stateKey,
Coder<K> keyCoder,
Coder<V> valueCoder,
boolean useBulkRead) {
boolean useBulkRead,
boolean hasNoState) {
checkArgument(
stateKey.hasMultimapKeysSideInput(),
"Expected MultimapKeysSideInput StateKey but received %s.",
Expand All @@ -75,12 +77,13 @@ public MultimapSideInput(
this.keyCoder = keyCoder;
this.valueCoder = valueCoder;
this.useBulkRead = useBulkRead;
this.hasNoState = hasNoState;
}

@Override
public Iterable<K> get() {
return StateFetchingIterators.readAllAndDecodeStartingFrom(
cache, beamFnStateClient, keysRequest, keyCoder);
cache, beamFnStateClient, keysRequest, keyCoder, hasNoState);
}

@Override
Expand Down Expand Up @@ -120,7 +123,8 @@ public Iterable<V> get(K k) {
Caches.noop(),
beamFnStateClient,
bulkReadRequest,
KvCoder.of(keyCoder, IterableCoder.of(valueCoder)))
KvCoder.of(keyCoder, IterableCoder.of(valueCoder)),
hasNoState)
.iterator();
while (bulkRead.size() < BULK_READ_SIZE && entries.hasNext()) {
KV<K, Iterable<V>> entry = entries.next();
Expand Down Expand Up @@ -169,7 +173,11 @@ public Iterable<V> get(K k) {

StateRequest request = keysRequest.toBuilder().setStateKey(stateKey).build();
return StateFetchingIterators.readAllAndDecodeStartingFrom(
Caches.subCache(cache, "ValuesForKey", encodedKey), beamFnStateClient, request, valueCoder);
Caches.subCache(cache, "ValuesForKey", encodedKey),
beamFnStateClient,
request,
valueCoder,
hasNoState);
}

private ByteString encodeKey(K k) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class MultimapUserState<K, V> {

private boolean isClosed;
private boolean isCleared;
private final boolean hasNoState;
private final boolean onlyBundleForKeys;
// Pending updates to persistent storage
private HashMap<Object, K> pendingRemoves = Maps.newHashMap();
private HashMap<Object, KV<K, List<V>>> pendingAdds = Maps.newHashMap();
Expand All @@ -84,7 +86,9 @@ public MultimapUserState(
String instructionId,
StateKey stateKey,
Coder<K> mapKeyCoder,
Coder<V> valueCoder) {
Coder<V> valueCoder,
boolean hasNoState,
boolean onlyBundleForKeys) {
checkArgument(
stateKey.hasMultimapKeysUserState(),
"Expected MultimapKeysUserState StateKey but received %s.",
Expand All @@ -93,14 +97,16 @@ public MultimapUserState(
this.beamFnStateClient = beamFnStateClient;
this.mapKeyCoder = mapKeyCoder;
this.valueCoder = valueCoder;
this.hasNoState = hasNoState;
this.onlyBundleForKeys = onlyBundleForKeys;

// Note: These StateRequest protos are constructed even if we never try to read the
// corresponding state type. Consider constructing them lazily, as needed.
this.keysStateRequest =
StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
this.persistedKeys =
StateFetchingIterators.readAllAndDecodeStartingFrom(
cache, beamFnStateClient, keysStateRequest, mapKeyCoder);
cache, beamFnStateClient, keysStateRequest, mapKeyCoder, hasNoState);

StateRequest.Builder userStateRequestBuilder = StateRequest.newBuilder();
userStateRequestBuilder
Expand Down Expand Up @@ -128,7 +134,8 @@ public MultimapUserState(
Caches.subCache(this.cache, "AllEntries"),
beamFnStateClient,
entriesStateRequest,
KvCoder.of(mapKeyCoder, IterableCoder.of(valueCoder)));
KvCoder.of(mapKeyCoder, IterableCoder.of(valueCoder)),
hasNoState);
}

public void clear() {
Expand Down Expand Up @@ -438,7 +445,7 @@ private void startStateApiWrites() {
}

// Persist pending key-values
if (!pendingAdds.isEmpty()) {
if (!pendingAdds.isEmpty() && !onlyBundleForKeys) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The StateClearRequests sent when isCleared is true or pendingRemoves is not empty are not guarded by !onlyBundleForKeys. To be consistent with skipping appends, should these clear operations also be skipped when onlyBundleForKeys is true?

for (KV<K, List<V>> entry : pendingAdds.values()) {
StateRequest request = createUserStateRequest(entry.getKey());
beamFnStateClient.handle(
Expand Down Expand Up @@ -542,7 +549,8 @@ private CachingStateIterable<V> getPersistedValues(Object structuralKey, K key)
request.getStateKey().getMultimapUserState().getMapKey()),
beamFnStateClient,
request,
valueCoder));
valueCoder,
hasNoState));
})
.getValue();
}
Expand Down
Loading
Loading