
Adds bundleFinalizer support to Dataflow non-portable worker. #37723

Open
acrites wants to merge 2 commits into apache:master from acrites:bundle-finalizer

@acrites
Contributor

@acrites acrites commented Feb 26, 2026

This PR modifies the existing StreamingCommitFinalizer cache, which is used for source checkpoint callbacks, and adds the ability to specify a callback expiration. The implementation mimics the portable runner's sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/FinalizeBundleHandler.java.

We needed to expose more already-existing fields in the Windmill proto for bundle finalization ids. Windmill is already reading from and populating these fields for the portable runner.
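
To make the round trip of finalization ids concrete, here is a minimal, stdlib-only sketch of the worker side: callbacks receive ids when a bundle is committed, the ids travel with the commit, and the callbacks run once the backend reports those ids as applied. The class FinalizeIdRoundTrip and its methods are hypothetical names for illustration, not code from this PR, and the id assignment scheme is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the worker side of the finalize-id round trip. */
class FinalizeIdRoundTrip {
  private final Map<Long, Runnable> pending = new HashMap<>();
  private long nextId = 1; // Assumption: ids are simple increasing longs.

  /** Stores the callbacks and returns the ids to attach to the commit request. */
  synchronized List<Long> registerForCommit(List<Runnable> callbacks) {
    List<Long> ids = new ArrayList<>();
    for (Runnable callback : callbacks) {
      long id = nextId++;
      pending.put(id, callback);
      ids.add(id);
    }
    return ids;
  }

  /** Runs the callbacks whose ids were reported as applied; returns how many ran. */
  int onAppliedFinalizeIds(List<Long> appliedIds) {
    List<Runnable> toRun = new ArrayList<>();
    synchronized (this) {
      for (Long id : appliedIds) {
        Runnable callback = pending.remove(id);
        if (callback != null) {
          toRun.add(callback); // Unknown ids are silently ignored.
        }
      }
    }
    toRun.forEach(Runnable::run);
    return toRun.size();
  }

  synchronized int pendingCount() {
    return pending.size();
  }
}
```

In this sketch the list returned by registerForCommit plays the role of finalize_ids on the commit, and the argument to onAppliedFinalizeIds plays the role of applied_finalize_ids on a later work item.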


@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Dataflow non-portable worker by introducing robust support for bundle finalization. The changes enable the worker to register and manage callbacks that execute after a bundle's state has been successfully committed, complete with an expiration mechanism. This brings the non-portable runner closer in functionality to its portable counterpart, improving consistency and reliability in how bundle-level finalization tasks are handled in streaming pipelines.

Highlights

  • Bundle Finalizer Support: Introduced BundleFinalizer support within the SimpleDoFnRunner for StartBundle and FinishBundle contexts, allowing callbacks to be registered for execution upon bundle commit.
  • Enhanced StreamingCommitFinalizer: Refactored StreamingCommitFinalizer to manage bundle finalization callbacks with an expiration mechanism, mimicking the portable runner's FinalizeBundleHandler. This new implementation uses a PriorityQueue and ReentrantLock for efficient, thread-safe management and cleanup of expired callbacks.
  • Windmill Proto Updates: Extended the Windmill protocol buffers to include applied_finalize_ids in WorkItem and StreamingGetWorkResponseChunk messages, and finalize_ids in WorkItemCommitRequest, enabling the Dataflow worker to communicate bundle finalization IDs with the Windmill service.
  • Dataflow Worker Integration: Integrated the new bundle finalization logic into StreamingModeExecutionContext to collect and manage BundleFinalizer callbacks, and updated StreamingWorkScheduler to process these applied_finalize_ids received from Windmill.
  • Comprehensive Testing: Added new unit tests for SimpleParDoFn to verify BundleFinalizer functionality and introduced a dedicated test suite for the refactored StreamingCommitFinalizer to ensure correct behavior, including expiration and duplicate ID handling.
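
The expiration mechanism described in the highlights can be sketched roughly as follows: a map gives lookup by id, a priority queue ordered by expiry lets cleanup inspect only the head, and a ReentrantLock guards both. This is an illustrative sketch, not the PR's StreamingCommitFinalizer. The class name ExpiringCallbackRegistry is hypothetical, cleanup is exposed as an explicit removeExpired(now) call instead of a background thread, and keeping the first registration on a duplicate id is an assumption.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of an id-keyed callback store with expiration. */
class ExpiringCallbackRegistry {
  private static final class Entry {
    final long id;
    final Instant expiry;
    final Runnable callback;

    Entry(long id, Instant expiry, Runnable callback) {
      this.id = id;
      this.expiry = expiry;
      this.callback = callback;
    }
  }

  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Long, Entry> byId = new HashMap<>();
  // Earliest expiry first, so cleanup only ever inspects the head.
  private final PriorityQueue<Entry> byExpiry =
      new PriorityQueue<>(Comparator.comparing((Entry e) -> e.expiry));

  /** Stores a callback. Assumption: a duplicate id keeps the first registration. */
  boolean register(long id, Instant expiry, Runnable callback) {
    lock.lock();
    try {
      if (byId.containsKey(id)) {
        return false;
      }
      Entry entry = new Entry(id, expiry, callback);
      byId.put(id, entry);
      byExpiry.add(entry);
      return true;
    } finally {
      lock.unlock();
    }
  }

  /** Runs callbacks for committed ids; unknown or already expired ids are skipped. */
  int finalizeCommits(List<Long> ids) {
    List<Runnable> toRun = new ArrayList<>();
    lock.lock();
    try {
      for (Long id : ids) {
        Entry entry = byId.remove(id);
        if (entry != null) {
          byExpiry.remove(entry); // Remove from both structures.
          toRun.add(entry.callback);
        }
      }
    } finally {
      lock.unlock();
    }
    // This sketch runs callbacks outside the lock so a slow one cannot block registration.
    toRun.forEach(Runnable::run);
    return toRun.size();
  }

  /** Drops, without running, every callback whose expiry is at or before now. */
  int removeExpired(Instant now) {
    lock.lock();
    try {
      int removed = 0;
      while (!byExpiry.isEmpty() && !byExpiry.peek().expiry.isAfter(now)) {
        byId.remove(byExpiry.poll().id);
        removed++;
      }
      return removed;
    } finally {
      lock.unlock();
    }
  }
}
```

Passing the current time into removeExpired keeps the sketch deterministic; a background cleanup thread would call the same logic after waiting until the earliest expiry.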

Changelog
  • runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
    • Added bundleFinalizer() method to StartBundleContext and FinishBundleContext.
    • Updated UnsupportedOperationException messages for bundleFinalizer() in OnTimer and OnWindowExpiration contexts for clarity.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
    • Added necessary imports for ArrayList, BundleFinalizer, and Pair.
    • Modified flushState() to return Map<Long, Pair<Instant, Runnable>> and incorporate BundleFinalizer callbacks with expiration.
    • Updated flushState() to apply an expiration to checkpointMark.finalizeCheckpoint() callbacks.
    • Implemented bundleFinalizer() in DataflowStepContext.
    • Introduced WindmillBundleFinalizer as an inner class to manage BundleFinalizer.Callback instances with associated expiration times.
    • Added getBundleFinalizerCallbacks() and clearBundleFinalizerCallbacks() methods to StepContext.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
    • Added a new field appliedFinalizeIds to track bundle finalization IDs.
    • Updated the append method to collect appliedFinalizeIds from incoming chunks.
    • Corrected a minor typo in a Javadoc comment.
    • Modified the flushToWorkItem and create methods to pass and process appliedFinalizeIds when assembling WorkItem objects, and ensured appliedFinalizeIds are cleared after use.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
    • Completely refactored the class to use ReentrantLock, Condition, HashMap, and PriorityQueue for managing callbacks with expiration.
    • Removed the Guava Cache dependency and its associated default expiry mechanism.
    • Introduced an AutoValue class FinalizationInfo to encapsulate callback details and expiration.
    • Implemented a cleanupThreadBody() method for background processing and removal of expired callbacks.
    • Updated cacheCommitFinalizers to store callbacks with their expiration times and add them to the priority queue.
    • Modified finalizeCommits to safely remove callbacks from both the map and priority queue.
    • Added cleanupQueueSize() for testing purposes.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
    • Added a call to commitFinalizer.finalizeCommits(workItem.getAppliedFinalizeIdsList()) to process bundle finalization IDs.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
    • Added imports for AtomicInteger, DataflowStepContext, VarLongCoder, BundleFinalizer, Duration, Instant, and Mockito.
    • Introduced testBundleFinalizer() to validate the functionality of BundleFinalizer within SimpleParDoFn.
    • Added helper methods increaseBundleSuccessCount() and getBundleSuccessCount() for test assertions.
    • Defined WithBundleFinalizerDoFn as a test DoFn to demonstrate BundleFinalizer usage in StartBundle, ProcessElement, and FinishBundle.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java
    • Added a new test file StreamingCommitFinalizerTest.java to provide comprehensive unit tests for the refactored StreamingCommitFinalizer.
    • Included tests for initialization, caching callbacks, handling duplicate IDs, successful finalization, ignoring unknown IDs, and verifying cleanup of expired callbacks.
  • runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
    • Added repeated int64 applied_finalize_ids = 16; to the WorkItem message.
    • Added repeated int64 finalize_ids = 19 [packed = true]; to the WorkItemCommitRequest message, along with a description.
    • Added repeated int64 applied_finalize_ids = 6 [packed = true]; to the StreamingGetWorkResponseChunk message, along with a description.
    • Updated reserved field ranges in WorkItem, WorkItemCommitRequest, and StreamingGetWorkResponseChunk to accommodate new fields.
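
The GetWorkResponseChunkAssembler entries above describe collecting finalize ids across chunks and clearing them once a work item is assembled. A minimal sketch of that accumulate-then-reset pattern is shown below, using only the standard library. FinalizeIdChunkAssembler, AssembledItem, and the lastChunk flag are hypothetical simplifications; the real assembler's chunk format and method signatures are not shown in this summary.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of accumulating per-chunk finalize ids into one work item. */
class FinalizeIdChunkAssembler {
  static final class AssembledItem {
    final byte[] payload;
    final List<Long> appliedFinalizeIds;

    AssembledItem(byte[] payload, List<Long> appliedFinalizeIds) {
      this.payload = payload;
      this.appliedFinalizeIds = appliedFinalizeIds;
    }
  }

  private final ByteArrayOutputStream payload = new ByteArrayOutputStream();
  private final List<Long> appliedFinalizeIds = new ArrayList<>();

  /** Appends one chunk; returns the assembled item only when the last chunk arrives. */
  Optional<AssembledItem> append(byte[] chunkBytes, List<Long> chunkFinalizeIds, boolean lastChunk) {
    payload.write(chunkBytes, 0, chunkBytes.length);
    appliedFinalizeIds.addAll(chunkFinalizeIds);
    if (!lastChunk) {
      return Optional.empty();
    }
    // Copy the ids before clearing so the returned item is not affected by the reset.
    AssembledItem item =
        new AssembledItem(payload.toByteArray(), new ArrayList<>(appliedFinalizeIds));
    payload.reset();
    appliedFinalizeIds.clear();
    return Optional.of(item);
  }
}
```

Clearing the accumulated ids after each assembled item matters because otherwise ids from one work item would leak into the next and trigger finalization twice.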

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

…running on Dataflow streaming non-portable worker.
@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment "assign set of reviewers".
