From 339c9b99289d2a504e6bffcf163d99c867a13ed9 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 25 Feb 2026 19:06:37 -0800 Subject: [PATCH 1/4] disable combiner lifting only for count triggers --- .../runners/dataflow/DataflowPipelineTranslator.java | 5 ++--- .../beam/sdk/transforms/windowing/AfterAll.java | 5 +++++ .../beam/sdk/transforms/windowing/AfterEach.java | 5 +++++ .../beam/sdk/transforms/windowing/AfterFirst.java | 5 +++++ .../beam/sdk/transforms/windowing/AfterPane.java | 6 ++++++ .../sdk/transforms/windowing/AfterProcessingTime.java | 5 +++++ .../windowing/AfterSynchronizedProcessingTime.java | 5 +++++ .../beam/sdk/transforms/windowing/AfterWatermark.java | 11 +++++++++++ .../beam/sdk/transforms/windowing/DefaultTrigger.java | 5 +++++ .../apache/beam/sdk/transforms/windowing/Never.java | 6 ++++++ .../sdk/transforms/windowing/OrFinallyTrigger.java | 6 ++++++ .../beam/sdk/transforms/windowing/Repeatedly.java | 5 +++++ .../sdk/transforms/windowing/ReshuffleTrigger.java | 5 +++++ .../apache/beam/sdk/transforms/windowing/Trigger.java | 2 ++ .../beam/sdk/transforms/windowing/StubTrigger.java | 5 +++++ .../beam/sdk/transforms/windowing/TriggerTest.java | 10 ++++++++++ 16 files changed, 88 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 08d84705c5c7..4e4572318787 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -88,7 +88,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.resourcehints.ResourceHint; import org.apache.beam.sdk.transforms.resourcehints.ResourceHints; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.CoderUtils; @@ -970,8 +969,8 @@ private void groupByKeyHelper( && windowingStrategy.getWindowFn().assignsToOneWindow(); if (isStreaming) { allowCombinerLifting &= transform.fewKeys(); - // TODO: Allow combiner lifting on the non-default trigger, as appropriate. - allowCombinerLifting &= (windowingStrategy.getTrigger() instanceof DefaultTrigger); + allowCombinerLifting &= + windowingStrategy.getTrigger().isCompatibleWithCombinerLifting(); } stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, !allowCombinerLifting); stepContext.addInput( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java index b5838b775211..fd77c6411cc3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -58,6 +58,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return deadline; } + @Override + public boolean isCompatibleWithCombinerLifting() { + return subTriggers.stream().allMatch(Trigger::isCompatibleWithCombinerLifting); + } + @Override protected OnceTrigger getContinuationTrigger(List continuationTriggers) { return new AfterAll(continuationTriggers); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index 654c7a466478..8310df16a429 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -75,6 +75,11 @@ protected Trigger getContinuationTrigger(List continuationTriggers) { return Repeatedly.forever(new AfterFirst(continuationTriggers)); } + @Override + public boolean isCompatibleWithCombinerLifting() { + return subTriggers.stream().allMatch(trigger -> trigger.isCompatibleWithCombinerLifting()); + } + @Override public String toString() { StringBuilder builder = new StringBuilder("AfterEach.inOrder("); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java index b8118848c96e..0e8b2b9bc285 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -58,6 +58,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return deadline; } + @Override + public boolean isCompatibleWithCombinerLifting() { + return subTriggers.stream().allMatch(Trigger::isCompatibleWithCombinerLifting); + } + @Override protected OnceTrigger getContinuationTrigger(List continuationTriggers) { return new AfterFirst(continuationTriggers); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java index 3544a38b69a0..f93c422ad668 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java @@ -59,6 +59,12 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } + @Override + public boolean isCompatibleWithCombinerLifting() { + // Combiner lifting not supported for count triggers. + return false; + } + @Override protected OnceTrigger getContinuationTrigger(List continuationTriggers) { return AfterPane.elementCountAtLeast(1); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java index c9612c069207..342f0a0be0e9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java @@ -107,6 +107,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } + @Override + public boolean isCompatibleWithCombinerLifting() { + return true; + } + @Override protected Trigger getContinuationTrigger(List continuationTriggers) { return AfterSynchronizedProcessingTime.ofFirstElement(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java index b3f25c6b14c3..efbf1344c18f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java @@ -49,6 +49,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } + @Override + public boolean isCompatibleWithCombinerLifting() { + return true; + } + @Override protected Trigger getContinuationTrigger(List continuationTriggers) { return this; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index 295b277267ce..c4b8a6480044 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -126,6 +126,12 @@ public boolean mayFinish() { return lateTrigger == null; } + @Override + public boolean isCompatibleWithCombinerLifting() { + return earlyTrigger.isCompatibleWithCombinerLifting() + && lateTrigger.isCompatibleWithCombinerLifting(); + } + @Override public String toString() { StringBuilder builder = new StringBuilder(TO_STRING); @@ -177,6 +183,11 @@ protected FromEndOfWindow getContinuationTrigger(List continuationTrigg return this; } + @Override + public boolean isCompatibleWithCombinerLifting() { + return true; + } + @Override public String toString() { return TO_STRING; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java index 7060936c2fb9..49cb2bc7ec0c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java @@ -54,6 +54,11 @@ public boolean isCompatible(Trigger other) { return other instanceof DefaultTrigger; } + @Override + public boolean isCompatibleWithCombinerLifting() { + return true; + } + @Override protected Trigger getContinuationTrigger(List continuationTriggers) { return this; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index 5f810430cdf6..eaba6a5e5f68 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -56,5 +56,11 @@ protected Trigger getContinuationTrigger(List continuationTriggers) { public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return BoundedWindow.TIMESTAMP_MAX_VALUE; } + + @Override + public boolean isCompatibleWithCombinerLifting() { + // Should we allow this here? + return true; + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java index 5a16c818da74..919042c3b363 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java @@ -64,6 +64,12 @@ public boolean mayFinish() { return subTriggers.get(ACTUAL).mayFinish() || subTriggers.get(UNTIL).mayFinish(); } + @Override + public boolean isCompatibleWithCombinerLifting() { + return subTriggers.get(ACTUAL).isCompatibleWithCombinerLifting() + && subTriggers.get(UNTIL).isCompatibleWithCombinerLifting(); + } + @Override protected Trigger getContinuationTrigger(List continuationTriggers) { // Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java index 9c54d75da3e0..0d66d90851d8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java @@ -73,6 +73,11 @@ public boolean mayFinish() { return false; } + @Override + public boolean isCompatibleWithCombinerLifting() { + return subTriggers.get(REPEATED).isCompatibleWithCombinerLifting(); + } + @Override protected Trigger getContinuationTrigger(List continuationTriggers) { return new Repeatedly(continuationTriggers.get(REPEATED)); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java index bf84466ccdbc..fdd9f62f4c8a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/ReshuffleTrigger.java @@ -53,6 +53,11 @@ public boolean mayFinish() { return false; } + @Override + public boolean isCompatibleWithCombinerLifting() { + return false; + } + @Override public String toString() { return "ReshuffleTrigger()"; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java index 16daa453021a..88743bbb2014 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java @@ -109,6 +109,8 @@ public Trigger getContinuationTrigger() { return getContinuationTrigger(subTriggerContinuations); } + public abstract boolean isCompatibleWithCombinerLifting(); + /** * Subclasses should override this to return the {@link #getContinuationTrigger} of this {@link * Trigger}. For convenience, this is provided the continuation trigger of each of the diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java index bdcb82a38631..f925150748ed 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java @@ -48,4 +48,9 @@ protected Trigger getContinuationTrigger(List continuationTriggers) { public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { return null; } + + @Override + public boolean isCompatibleWithCombinerLifting() { + return false; + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java index 335d9670625c..ac343df07997 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java @@ -73,6 +73,11 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { public boolean mayFinish() { return false; } + + @Override + public boolean isCompatibleWithCombinerLifting() { + return false; + } } private static class Trigger2 extends Trigger { @@ -95,5 +100,10 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { public boolean mayFinish() { return false; } + + @Override + public boolean isCompatibleWithCombinerLifting() { + return false; + } } } From a234fe7ed264f533cbb20f55c10d6a1e94b3cf78 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 25 Feb 2026 19:09:45 -0800 Subject: [PATCH 2/4] add comments --- .../java/org/apache/beam/sdk/transforms/windowing/Never.java | 1 - .../java/org/apache/beam/sdk/transforms/windowing/Trigger.java | 3 +++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index eaba6a5e5f68..57bee1752a57 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -59,7 +59,6 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { @Override public boolean isCompatibleWithCombinerLifting() { - // Should we allow this here? return true; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java index 88743bbb2014..363b1adb0919 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java @@ -109,6 +109,9 @@ public Trigger getContinuationTrigger() { return getContinuationTrigger(subTriggerContinuations); } + /** + * Returns whether this trigger is compatible with combiner lifting. + */ public abstract boolean isCompatibleWithCombinerLifting(); /** From 451f00e84c4d8d9042b5df04bcce9cb56fb489f9 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 25 Feb 2026 19:12:10 -0800 Subject: [PATCH 3/4] simplify code --- .../org/apache/beam/sdk/transforms/windowing/AfterEach.java | 4 ++-- .../org/apache/beam/sdk/transforms/windowing/Trigger.java | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index 8310df16a429..586f38e30d0e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -67,7 +67,7 @@ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { @Override public boolean mayFinish() { - return subTriggers.stream().allMatch(trigger -> trigger.mayFinish()); + return subTriggers.stream().allMatch(Trigger::mayFinish); } @Override @@ -77,7 +77,7 @@ protected Trigger getContinuationTrigger(List continuationTriggers) { @Override public boolean isCompatibleWithCombinerLifting() { - return subTriggers.stream().allMatch(trigger -> trigger.isCompatibleWithCombinerLifting()); + return subTriggers.stream().allMatch(Trigger::isCompatibleWithCombinerLifting); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java index 363b1adb0919..6dd75b25569d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java @@ -109,9 +109,7 @@ public Trigger getContinuationTrigger() { return getContinuationTrigger(subTriggerContinuations); } - /** - * Returns whether this trigger is compatible with combiner lifting. - */ + /** Returns whether this trigger is compatible with combiner lifting. */ public abstract boolean isCompatibleWithCombinerLifting(); /** From 8c5eff41987c01c5e230016e363ca1ab603a23c0 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 25 Feb 2026 19:28:15 -0800 Subject: [PATCH 4/4] fix nullpointerexception --- .../apache/beam/sdk/transforms/windowing/AfterWatermark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index c4b8a6480044..9a14a910eb60 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -129,7 +129,7 @@ public boolean mayFinish() { @Override public boolean isCompatibleWithCombinerLifting() { return earlyTrigger.isCompatibleWithCombinerLifting() - && lateTrigger.isCompatibleWithCombinerLifting(); + && (lateTrigger == null || lateTrigger.isCompatibleWithCombinerLifting()); } @Override