Omit state commits in stateful Dofn #37691

Open
LEEKYE wants to merge 9 commits into apache:master from LEEKYE:stateful_dofn

LEEKYE commented Feb 23, 2026


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Contributor

Summary of Changes

Hello @LEEKYE, I'm Gemini Code Assist[1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the Apache Beam Fn Harness by introducing mechanisms to optimize state management for stateful DoFns. It allows the runner to signal whether a bundle has no initial state or if it's the final bundle for specific keys, meaning no further state needs to be committed. By propagating these flags, the system can intelligently skip unnecessary state read and write operations, improving efficiency for certain stateful processing scenarios.

Highlights

  • State Optimization Flags: Introduced hasNoState and onlyBundleForKeys flags to optimize stateful DoFn processing within the Fn Harness.
  • Conditional State Reads: Implemented logic to skip StateGetRequest operations when hasNoState is true, preventing unnecessary state fetches.
  • Conditional State Writes: Added conditions to prevent StateAppendRequest operations when onlyBundleForKeys is true, avoiding state commits for completed key processing.
  • Flag Propagation: Propagated these state optimization flags through the ProcessBundleHandler and various state accessor and user state classes to ensure consistent behavior.
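
The two flags described above can be illustrated with a minimal sketch. It assumes a bag-like state cell backed by a counting fake client; `StateFlagsSketch`, `FakeStateClient`, and `BagSketch` are hypothetical names invented for illustration and are not Beam's actual classes or signatures.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; not Beam's real classes.
class StateFlagsSketch {

  /** Counts the get and append requests that would be sent to the runner. */
  static final class FakeStateClient {
    int gets = 0;
    int appends = 0;
    final List<String> stored = new ArrayList<>();

    List<String> get() {
      gets++;
      return new ArrayList<>(stored);
    }

    void append(List<String> values) {
      appends++;
      stored.addAll(values);
    }
  }

  /** A bag-like state cell that honors the two runner hints. */
  static final class BagSketch {
    private final FakeStateClient client;
    private final boolean hasNoState;
    private final boolean onlyBundleForKeys;
    private final List<String> pendingAdds = new ArrayList<>();

    BagSketch(FakeStateClient client, boolean hasNoState, boolean onlyBundleForKeys) {
      this.client = client;
      this.hasNoState = hasNoState;
      this.onlyBundleForKeys = onlyBundleForKeys;
    }

    void add(String value) {
      pendingAdds.add(value);
    }

    List<String> read() {
      // With hasNoState set, nothing is persisted yet, so the get is skipped.
      List<String> result = hasNoState ? new ArrayList<>() : client.get();
      result.addAll(pendingAdds);
      return result;
    }

    void close() {
      // With onlyBundleForKeys set, no later bundle reads this key,
      // so the append is dropped instead of being sent.
      if (!pendingAdds.isEmpty() && !onlyBundleForKeys) {
        client.append(pendingAdds);
      }
      pendingAdds.clear();
    }
  }

  public static void main(String[] args) {
    FakeStateClient client = new FakeStateClient();
    BagSketch bag = new BagSketch(client, true, true);
    bag.add("a");
    System.out.println(bag.read()); // prints [a]
    bag.close();
    System.out.println("gets=" + client.gets + ", appends=" + client.appends); // gets=0, appends=0
  }
}
```

With both flags false, the same sequence of calls would issue one get and one append, which is the behavior the optimization is meant to avoid when the runner's hints allow it.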

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | /gemini review | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments. |
| Help | /gemini help | Displays a list of available commands. |

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

github-actions bot added the java label Feb 23, 2026

LEEKYE commented Feb 23, 2026

/gemini review

gemini-code-assist bot left a comment

Code Review

This pull request introduces an optimization to skip state get and append operations based on hints from the runner, has_no_state and only_bundle_for_keys. The changes correctly propagate these flags through various layers of the state management code. However, I've identified a few areas where these new flags are not fully utilized, potentially leading to missed optimizations. Specifically, MultimapUserState and OrderedListUserState do not seem to use onlyBundleForKeys to skip append operations, and CachingStateIterable doesn't propagate hasNoState to its underlying iterator, which would prevent skipping state fetches. Addressing these points will ensure the optimization is fully effective.

Comment on lines +166 to +167
boolean hasNoState,
boolean onlyBundleForKeys) {

high

The onlyBundleForKeys parameter is stored but appears to be unused in asyncClose to skip StateAppendRequests. This seems to be an omission, as BagUserState uses this flag to optimize writes. Please ensure this flag is used to skip appending state when it's true.

@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment "assign set of reviewers".


LEEKYE commented Feb 23, 2026

/gemini review

gemini-code-assist bot left a comment

Code Review

The pull request correctly introduces optimizations to skip state requests when the runner indicates that no state exists or that the current bundle is the only one for the given keys. However, there is a critical bug in StateFetchingIterators.java where has_no_state can lead to a NullPointerException. Additionally, the only_bundle_for_keys optimization is missing in OrderedListUserState.java and can be made more efficient in BagUserState.java.

Comment on lines 688 to 695
  LazyBlockingStateFetchingIterator(
      BeamFnStateClient beamFnStateClient, StateRequest stateRequestForFirstChunk,
      boolean hasNoState) {
    this.beamFnStateClient = beamFnStateClient;
    this.stateRequestForFirstChunk = stateRequestForFirstChunk;
    this.continuationToken = stateRequestForFirstChunk.getGet().getContinuationToken();
    this.hasNoState = hasNoState;
  }

critical

If hasNoState is true, the continuationToken should be initialized to null. This ensures that hasNext() returns false and prevents next() from being called. Currently, if hasNoState is true, continuationToken is still initialized from the request, hasNext() returns true, and next() eventually throws a NullPointerException because prefetchedResponse is never initialized in prefetch().

Suggested change
-  LazyBlockingStateFetchingIterator(
-      BeamFnStateClient beamFnStateClient, StateRequest stateRequestForFirstChunk,
-      boolean hasNoState) {
-    this.beamFnStateClient = beamFnStateClient;
-    this.stateRequestForFirstChunk = stateRequestForFirstChunk;
-    this.continuationToken = stateRequestForFirstChunk.getGet().getContinuationToken();
-    this.hasNoState = hasNoState;
-  }
+  LazyBlockingStateFetchingIterator(
+      BeamFnStateClient beamFnStateClient, StateRequest stateRequestForFirstChunk,
+      boolean hasNoState) {
+    this.beamFnStateClient = beamFnStateClient;
+    this.stateRequestForFirstChunk = stateRequestForFirstChunk;
+    this.continuationToken =
+        hasNoState ? null : stateRequestForFirstChunk.getGet().getContinuationToken();
+    this.hasNoState = hasNoState;
+  }
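
The reasoning behind this suggestion can be shown with a simplified, self-contained sketch. It assumes a paged iterator in which a null continuation token means "exhausted"; `LazyChunkIteratorSketch`, `Page`, and `fakeFetch` are hypothetical names, tokens are plain strings rather than Beam's types, and the real iterator's prefetching is left out.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Simplified illustration; not the real LazyBlockingStateFetchingIterator.
class LazyChunkIteratorSketch implements Iterator<String> {

  /** One fetched value plus the token for the next page (null when done). */
  static final class Page {
    final String value;
    final String nextToken;

    Page(String value, String nextToken) {
      this.value = value;
      this.nextToken = nextToken;
    }
  }

  private final Function<String, Page> fetch;
  private String continuationToken; // null means there is nothing left to fetch
  int fetchCount = 0;

  LazyChunkIteratorSketch(Function<String, Page> fetch, String firstToken, boolean hasNoState) {
    this.fetch = fetch;
    // Starting with a null token makes hasNext() false right away,
    // so next() is never reached and no request is issued.
    this.continuationToken = hasNoState ? null : firstToken;
  }

  @Override
  public boolean hasNext() {
    return continuationToken != null;
  }

  @Override
  public String next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    fetchCount++;
    Page page = fetch.apply(continuationToken);
    continuationToken = page.nextToken;
    return page.value;
  }

  /** Fake two-page backend: the empty token yields "a", any other token yields "b". */
  static Page fakeFetch(String token) {
    return token.isEmpty() ? new Page("a", "t1") : new Page("b", null);
  }

  public static void main(String[] args) {
    LazyChunkIteratorSketch empty =
        new LazyChunkIteratorSketch(LazyChunkIteratorSketch::fakeFetch, "", true);
    System.out.println(empty.hasNext() + " " + empty.fetchCount); // prints false 0

    LazyChunkIteratorSketch full =
        new LazyChunkIteratorSketch(LazyChunkIteratorSketch::fakeFetch, "", false);
    while (full.hasNext()) {
      System.out.println(full.next());
    }
  }
}
```

If the constructor copied the first token even when `hasNoState` is true, `hasNext()` would report true and `next()` would have to produce a value that was never fetched, which is the failure mode the review comment describes.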


LEEKYE commented Feb 23, 2026

/gemini review

gemini-code-assist bot left a comment

Code Review

This pull request introduces an optimization to omit state commits for stateful DoFns by propagating has_no_state and only_bundle_for_keys flags. The changes are well-propagated through the state management classes.

My main concern is a potential NullPointerException in LazyBlockingStateFetchingIterator when has_no_state is true, which I've detailed in a comment. I've also suggested a minor simplification that can be applied after fixing the bug.

Otherwise, the logic to skip state reads and writes based on these new flags seems correct across the different state implementations (BagUserState, MultimapUserState, OrderedListUserState).

@LEEKYE
Copy link
Contributor Author

LEEKYE commented Feb 24, 2026

/gemini review

gemini-code-assist bot left a comment

Code Review

This pull request introduces an optimization to omit state commits for stateful DoFns when the runner indicates it's not necessary, using two new flags: has_no_state and only_bundle_for_keys. The changes correctly propagate these flags from ProcessBundleHandler down to the various state management classes.

The implementation for skipping state fetches (has_no_state) and state appends (only_bundle_for_keys) looks good. However, I've found a few inconsistencies where state clears are not being skipped when only_bundle_for_keys is true. This could lead to unnecessary RPCs to the runner. I've left comments in BagUserState, MultimapUserState, and OrderedListUserState to address this. I also found a minor performance improvement opportunity in BagUserState to avoid unnecessary encoding.

Overall, this is a valuable optimization, and with the suggested fixes, it will be more robust and consistent.
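
The review does not show the encoding change it has in mind. One plausible reading, sketched here under that assumption, is to test the flag before encoding the pending values so that no bytes are built for an append that will be skipped. `EncodeGuardSketch`, `encode`, and `payloadForClose` are hypothetical names, and the newline-joined UTF-8 encoding stands in for a real coder.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of checking the flag before doing encoding work.
class EncodeGuardSketch {

  static int encodeCalls = 0;

  /** Stand-in for a coder: joins values with newlines and counts invocations. */
  static byte[] encode(List<String> values) {
    encodeCalls++;
    return String.join("\n", values).getBytes(StandardCharsets.UTF_8);
  }

  /** Returns the bytes to append, or null when no append should be sent. */
  static byte[] payloadForClose(List<String> pendingAdds, boolean onlyBundleForKeys) {
    if (onlyBundleForKeys || pendingAdds.isEmpty()) {
      return null; // decided before encoding, so no bytes are produced
    }
    return encode(pendingAdds);
  }

  public static void main(String[] args) {
    List<String> pending = new ArrayList<>(List.of("a", "b"));
    System.out.println(payloadForClose(pending, true) == null);
    System.out.println("encode calls so far: " + encodeCalls);
    System.out.println(payloadForClose(pending, false).length);
    System.out.println("encode calls so far: " + encodeCalls);
  }
}
```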


   // Persist pending key-values
-  if (!pendingAdds.isEmpty()) {
+  if (!pendingAdds.isEmpty() && !onlyBundleForKeys) {

high

The StateClearRequests sent when isCleared is true or pendingRemoves is not empty are not guarded by !onlyBundleForKeys. To be consistent with skipping appends, should these clear operations also be skipped when onlyBundleForKeys is true?

Comment on lines +312 to +315
  if (onlyBundleForKeys) {
    pendingAdds.clear();
    return;
  }

high

Similar to my other comments, the logic for pendingRemoves which results in StateClearRequests is not guarded by onlyBundleForKeys. To be consistent, we should probably skip sending those requests as well when onlyBundleForKeys is true. The current implementation only skips pendingAdds.
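
A minimal sketch of the consistent behavior these comments ask about: when the flag is set, the close step drops pending clears and removes together with pending appends. `CloseGuardSketch`, `Recorder`, and `MultimapSketch` are hypothetical names and requests are recorded as plain strings. Whether skipping clears is actually safe depends on what the runner guarantees for only_bundle_for_keys, which the review raises as an open question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not Beam's MultimapUserState.
class CloseGuardSketch {

  /** Records every request that would be sent to the runner. */
  static final class Recorder {
    final List<String> sent = new ArrayList<>();
  }

  static final class MultimapSketch {
    private final Recorder runner;
    private final boolean onlyBundleForKeys;
    private boolean isCleared = false;
    private final Set<String> pendingRemoves = new LinkedHashSet<>();
    private final Map<String, List<String>> pendingAdds = new LinkedHashMap<>();

    MultimapSketch(Recorder runner, boolean onlyBundleForKeys) {
      this.runner = runner;
      this.onlyBundleForKeys = onlyBundleForKeys;
    }

    void clear() {
      isCleared = true;
      pendingRemoves.clear();
      pendingAdds.clear();
    }

    void remove(String key) {
      pendingAdds.remove(key);
      pendingRemoves.add(key);
    }

    void put(String key, String value) {
      pendingAdds.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    void close() {
      if (!onlyBundleForKeys) {
        // Clears go first so that appends land on the cleared state.
        if (isCleared) {
          runner.sent.add("CLEAR_ALL");
        }
        for (String key : pendingRemoves) {
          runner.sent.add("CLEAR " + key);
        }
        for (Map.Entry<String, List<String>> e : pendingAdds.entrySet()) {
          runner.sent.add("APPEND " + e.getKey() + "=" + e.getValue());
        }
      }
      // Pending work is discarded either way once the bundle is done.
      isCleared = false;
      pendingRemoves.clear();
      pendingAdds.clear();
    }
  }

  public static void main(String[] args) {
    Recorder runner = new Recorder();
    MultimapSketch state = new MultimapSketch(runner, true);
    state.remove("k1");
    state.put("k2", "v");
    state.close();
    System.out.println(runner.sent); // prints []
  }
}
```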

@LEEKYE
Copy link
Contributor Author

LEEKYE commented Feb 24, 2026

R: @apanich
R: @ryan-mbuashundip
R: @acrites

@github-actions
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment "assign set of reviewers".

@LEEKYE
Copy link
Contributor Author

LEEKYE commented Feb 24, 2026

assign set of reviewers

@github-actions
Contributor

Assigning reviewers:

R: @Abacn for label java.

Note: If you would like to opt out of this review, comment "assign to next reviewer".

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@LEEKYE
Copy link
Contributor Author

LEEKYE commented Feb 24, 2026

/gemini review

gemini-code-assist bot left a comment

Code Review

This pull request introduces an optimization to omit state get/append operations in stateful DoFns by propagating has_no_state and only_bundle_for_keys flags. The changes are well-contained and consistently applied across various state management classes. The logic to skip state operations based on these flags appears correct. As noted in the description, adding comprehensive unit tests will be essential to validate these optimizations and prevent any potential regressions.

…te/StateFetchingIterators.java

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>