Disable combiner lifting only for count triggers #37715

Open
reuvenlax wants to merge 4 commits into apache:master from reuvenlax:triggered_combiner_lifting

@reuvenlax
Contributor

The Dataflow runner currently allows combiner lifting only for DefaultTrigger. This restriction is overly strict: in reality, combiner lifting is supported for everything except count triggers (since element-count information is lost when combining). This PR relaxes the restriction so that it is less conservative.
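
To illustrate why element counts are lost when combining, here is a minimal sketch in plain Java. It is not Beam code: the class name CombinerLiftingCountDemo, its helper methods, and the modeling of bundles as a list of lists are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; it only simulates the effect of combiner lifting.
public class CombinerLiftingCountDemo {

  /** Sums one bundle into a single partial result, as a lifted combiner would. */
  static long preCombine(List<Long> bundle) {
    long sum = 0;
    for (long value : bundle) {
      sum += value;
    }
    return sum;
  }

  /** Returns what the grouping step receives, with or without lifting. */
  static List<Long> shuffleInput(List<List<Long>> bundles, boolean lifted) {
    List<Long> out = new ArrayList<>();
    for (List<Long> bundle : bundles) {
      if (lifted) {
        out.add(preCombine(bundle)); // one partial sum per bundle
      } else {
        out.addAll(bundle); // every original element
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<Long>> bundles =
        Arrays.asList(Arrays.asList(1L, 2L, 3L), Arrays.asList(4L, 5L));

    List<Long> plain = shuffleInput(bundles, false);
    List<Long> lifted = shuffleInput(bundles, true);

    // The combined result is the same either way.
    System.out.println(preCombine(plain));  // 15
    System.out.println(preCombine(lifted)); // 15

    // The number of elements a count trigger could observe is not.
    System.out.println(plain.size());  // 5
    System.out.println(lifted.size()); // 2
  }
}
```

In this model a trigger that fires after five elements would fire without lifting but not with it, while a time-based trigger is unaffected because the final sum is identical.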

@gemini-code-assist
Contributor

Summary of Changes

Hello @reuvenlax, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refines the conditions under which combiner lifting is applied in the Dataflow runner. By introducing a dedicated method for triggers to declare their compatibility, the system can now more accurately enable this optimization for a broader range of triggers, while correctly disabling it for count-based triggers where it is not appropriate due to loss of element-count information. This change improves the flexibility and efficiency of pipeline execution without compromising correctness.

Highlights

  • Refined Combiner Lifting Logic: The restriction for combiner lifting has been relaxed. Previously, lifting was only allowed for DefaultTrigger; now it is permitted for all triggers except count triggers, which depend on element-count information that is lost when combining.
  • New isCompatibleWithCombinerLifting Method: An abstract method isCompatibleWithCombinerLifting() was added to the Trigger class. This method is now implemented by various trigger subclasses to explicitly declare their compatibility with combiner lifting.
  • Updated Dataflow Pipeline Translator: The DataflowPipelineTranslator now uses the new isCompatibleWithCombinerLifting() method on the WindowingStrategy's trigger to determine if combiner lifting is allowed, replacing the previous check for DefaultTrigger.
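
The pattern described in these highlights can be sketched roughly as follows. This is a simplified stand-in, not the PR's code: the nested Trigger, TimeTrigger, CountTrigger and CompositeTrigger classes and the allowCombinerLifting helper are hypothetical names that only mirror the roles of the real Beam classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; the nested classes are not Beam's real trigger classes.
public class CombinerLiftingSketch {

  /** Stand-in for the Trigger base class with the new abstract method. */
  abstract static class Trigger {
    abstract boolean isCompatibleWithCombinerLifting();
  }

  /** Time-based leaf trigger (in the spirit of AfterProcessingTime). */
  static class TimeTrigger extends Trigger {
    @Override
    boolean isCompatibleWithCombinerLifting() {
      return true;
    }
  }

  /** Count-based leaf trigger (in the spirit of AfterPane). */
  static class CountTrigger extends Trigger {
    @Override
    boolean isCompatibleWithCombinerLifting() {
      return false; // needs element counts, which lifting discards
    }
  }

  /** Composite trigger: compatible only if every sub-trigger is. */
  static class CompositeTrigger extends Trigger {
    private final List<Trigger> subTriggers;

    CompositeTrigger(Trigger... subTriggers) {
      this.subTriggers = Arrays.asList(subTriggers);
    }

    @Override
    boolean isCompatibleWithCombinerLifting() {
      return subTriggers.stream().allMatch(Trigger::isCompatibleWithCombinerLifting);
    }
  }

  /** Mirrors the translator-side check: lifting stays allowed only if the trigger agrees. */
  static boolean allowCombinerLifting(boolean allowedSoFar, Trigger trigger) {
    return allowedSoFar && trigger.isCompatibleWithCombinerLifting();
  }

  public static void main(String[] args) {
    Trigger timeOnly = new CompositeTrigger(new TimeTrigger(), new TimeTrigger());
    Trigger withCount = new CompositeTrigger(new TimeTrigger(), new CountTrigger());

    System.out.println(allowCombinerLifting(true, timeOnly));  // true
    System.out.println(allowCombinerLifting(true, withCount)); // false
  }
}
```

A single count-based sub-trigger anywhere in the tree is enough to disable lifting for the whole windowing strategy.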

Changelog
  • runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
    • Removed import for DefaultTrigger.
    • Updated the logic for allowCombinerLifting to use the new isCompatibleWithCombinerLifting() method on the trigger instead of checking for DefaultTrigger instance.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
    • Implemented isCompatibleWithCombinerLifting() to return true if all sub-triggers are compatible.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
    • Implemented isCompatibleWithCombinerLifting() to return true if all sub-triggers are compatible.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
    • Implemented isCompatibleWithCombinerLifting() to return true if all sub-triggers are compatible.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
    • Implemented isCompatibleWithCombinerLifting() to return false, as combiner lifting is not supported for count triggers.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
    • Implemented isCompatibleWithCombinerLifting() to return true.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
    • Implemented isCompatibleWithCombinerLifting() to return true.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
    • Implemented isCompatibleWithCombinerLifting() for AfterWatermarkEarlyAndLate to check compatibility of both early and late triggers.
    • Implemented isCompatibleWithCombinerLifting() for FromEndOfWindow to return true.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
    • Implemented isCompatibleWithCombinerLifting() to return true.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
    • Implemented isCompatibleWithCombinerLifting() to return true.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
    • Implemented isCompatibleWithCombinerLifting() to return true if both actual and until sub-triggers are compatible.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
    • Implemented isCompatibleWithCombinerLifting() to return true if its sub-trigger is compatible.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java
    • Implemented isCompatibleWithCombinerLifting() to return false.
  • sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
    • Added abstract method isCompatibleWithCombinerLifting().
  • sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java
    • Implemented isCompatibleWithCombinerLifting() to return false.
  • sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java
    • Implemented isCompatibleWithCombinerLifting() for inner test triggers to return false.
Activity
  • No specific review comments or activity have been recorded for this pull request yet.

@reuvenlax
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request refactors the logic for enabling combiner lifting in streaming Dataflow jobs. It replaces a hardcoded check for DefaultTrigger with a more flexible approach by introducing a new abstract method, isCompatibleWithCombinerLifting(), on the Trigger class. This allows each trigger implementation to declare its compatibility. The changes are well-structured and correctly implement this for most trigger types. However, I've identified a potential NullPointerException in the implementation for AfterWatermark.AfterWatermarkEarlyAndLate that should be addressed.

Comment on lines 130 to 133
    public boolean isCompatibleWithCombinerLifting() {
      return earlyTrigger.isCompatibleWithCombinerLifting()
          && lateTrigger.isCompatibleWithCombinerLifting();
    }
Contributor

high

This implementation can cause a NullPointerException because lateTrigger can be null. A more robust and concise approach, consistent with other composite triggers, is to check all subTriggers.

    public boolean isCompatibleWithCombinerLifting() {
      return subTriggers.stream().allMatch(Trigger::isCompatibleWithCombinerLifting);
    }

@reuvenlax
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request correctly relaxes the restrictions on combiner lifting for the Dataflow runner by introducing a new method, isCompatibleWithCombinerLifting(), into the Trigger hierarchy. This is a clean and effective solution. The implementations across the various trigger types are logical, properly identifying count-based triggers as incompatible. The change in DataflowPipelineTranslator to adopt this new method is also well-executed. I have a couple of minor suggestions to enhance consistency in the new method's implementation across different composite triggers.

Comment on lines +129 to +133
    @Override
    public boolean isCompatibleWithCombinerLifting() {
      return earlyTrigger.isCompatibleWithCombinerLifting()
          && (lateTrigger == null || lateTrigger.isCompatibleWithCombinerLifting());
    }
Contributor

medium

For consistency with other composite triggers like AfterAll and AfterFirst, consider using a stream over subTriggers to check for compatibility. This would make the implementation more uniform across different trigger types.

Suggested change
    - @Override
    - public boolean isCompatibleWithCombinerLifting() {
    -   return earlyTrigger.isCompatibleWithCombinerLifting()
    -       && (lateTrigger == null || lateTrigger.isCompatibleWithCombinerLifting());
    - }
    + @Override
    + public boolean isCompatibleWithCombinerLifting() {
    +   return subTriggers.stream().allMatch(Trigger::isCompatibleWithCombinerLifting);
    + }

Comment on lines +67 to +71
    @Override
    public boolean isCompatibleWithCombinerLifting() {
      return subTriggers.get(ACTUAL).isCompatibleWithCombinerLifting()
          && subTriggers.get(UNTIL).isCompatibleWithCombinerLifting();
    }
Contributor

medium

For consistency with other composite triggers, you could use a stream over subTriggers here as well. This would make the code more uniform and align with the pattern used in AfterAll, AfterFirst, and AfterEach.

Suggested change
    - @Override
    - public boolean isCompatibleWithCombinerLifting() {
    -   return subTriggers.get(ACTUAL).isCompatibleWithCombinerLifting()
    -       && subTriggers.get(UNTIL).isCompatibleWithCombinerLifting();
    - }
    + @Override
    + public boolean isCompatibleWithCombinerLifting() {
    +   return subTriggers.stream().allMatch(Trigger::isCompatibleWithCombinerLifting);
    + }

@kennknowles
Member

Let's be sure to touch the files in .github/trigger_files/ for the Dataflow ValidatesRunner tests. They should all already exist, so you can easily see which ones they are.

@kennknowles
Member

Actually, I realize that we have very few true end-to-end integration tests for triggers. They are mostly tested in ReduceFnRunnerTest, at a granularity that even TestStream doesn't really reach. Still, it's a good idea to run a big suite.

// TODO: Allow combiner lifting on the non-default trigger, as appropriate.
- allowCombinerLifting &= (windowingStrategy.getTrigger() instanceof DefaultTrigger);
+ allowCombinerLifting &=
+     windowingStrategy.getTrigger().isCompatibleWithCombinerLifting();
Member

I think this method should be part of the DataflowRunner, not the trigger. Whether the needed metadata is available is a property of how the runner executes the trigger.

Contributor Author

Making it part of the Trigger does ensure that we don't accidentally miss a trigger (in case one gets added). I guess we could check this at runtime or we could create a TriggerVisitor (which will ensure no cases are missed at compile time)?
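
For reference, the TriggerVisitor idea might look roughly like the sketch below. Everything in it is hypothetical: Beam does not necessarily have a TriggerVisitor with this shape, and the trigger classes are simplified stand-ins. The point is only that the compatibility policy lives outside the trigger classes (so a runner could own it), while adding a new visit method to the interface forces every visitor to handle the new case at compile time.

```java
// Hypothetical sketch of a visitor-based compatibility check; not Beam's API.
public class TriggerVisitorSketch {

  /** One method per trigger kind; a new kind means a new method every visitor must implement. */
  interface TriggerVisitor<T> {
    T visitDefault(DefaultLikeTrigger trigger);

    T visitCount(CountLikeTrigger trigger);

    T visitRepeatedly(RepeatedlyLikeTrigger trigger);
  }

  /** Simplified stand-in for the Trigger base class. */
  abstract static class Trigger {
    abstract <T> T accept(TriggerVisitor<T> visitor);
  }

  static class DefaultLikeTrigger extends Trigger {
    @Override
    <T> T accept(TriggerVisitor<T> visitor) {
      return visitor.visitDefault(this);
    }
  }

  static class CountLikeTrigger extends Trigger {
    @Override
    <T> T accept(TriggerVisitor<T> visitor) {
      return visitor.visitCount(this);
    }
  }

  static class RepeatedlyLikeTrigger extends Trigger {
    final Trigger repeated;

    RepeatedlyLikeTrigger(Trigger repeated) {
      this.repeated = repeated;
    }

    @Override
    <T> T accept(TriggerVisitor<T> visitor) {
      return visitor.visitRepeatedly(this);
    }
  }

  /** Runner-side policy: the trigger classes themselves know nothing about lifting. */
  static class CombinerLiftingCompatibility implements TriggerVisitor<Boolean> {
    @Override
    public Boolean visitDefault(DefaultLikeTrigger trigger) {
      return true;
    }

    @Override
    public Boolean visitCount(CountLikeTrigger trigger) {
      return false;
    }

    @Override
    public Boolean visitRepeatedly(RepeatedlyLikeTrigger trigger) {
      return trigger.repeated.accept(this); // defer to the wrapped trigger
    }
  }

  static boolean allowsCombinerLifting(Trigger trigger) {
    return trigger.accept(new CombinerLiftingCompatibility());
  }

  public static void main(String[] args) {
    System.out.println(allowsCombinerLifting(new DefaultLikeTrigger())); // true
    System.out.println(
        allowsCombinerLifting(new RepeatedlyLikeTrigger(new CountLikeTrigger()))); // false
  }
}
```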

@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment "assign set of reviewers".
