diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index dce4fcaa68bd..b07b7eca7d8e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -46,7 +47,8 @@ public interface DoFnRunner void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { - doFnRunner.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain); + TimeDomain timeDomain, + CausedByDrain causedByDrain) { + doFnRunner.onTimer( + timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain, causedByDrain); } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 6f9f15b13589..5d71112f0d46 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.OutputBuilderSuppliers; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; @@ -396,6 +397,11 @@ public PaneInfo pane() { return element.getRecordOffset(); } + @Override + public CausedByDrain causedByDrain() { + return CausedByDrain.NORMAL; + } + @Override public PipelineOptions getPipelineOptions() { return pipelineOptions; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 0fd63556b9c7..05186ba5adb7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; @@ -200,11 +201,13 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { Preconditions.checkNotNull(outputTimestamp, "outputTimestamp"); OnTimerArgumentProvider argumentProvider = - new OnTimerArgumentProvider<>(timerId, key, window, timestamp, outputTimestamp, timeDomain); + new OnTimerArgumentProvider<>( + timerId, key, window, timestamp, outputTimestamp, timeDomain, causedByDrain); invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider); } @@ -399,6 +402,11 @@ public InputT element() { return elem.getValue(); } + @Override + public CausedByDrain causedByDrain() { + return elem.causedByDrain(); + } + @Override public T sideInput(PCollectionView view) { checkNotNull(view, "View passed to sideInput cannot be null"); @@ -702,6 +710,7 @@ private class OnTimerArgumentProvider extends DoFn.OnTime private final TimeDomain timeDomain; private final String timerId; private final KeyT key; + private final CausedByDrain causedByDrain; private final OutputBuilderSupplier builderSupplier; /** Lazily initialized; should only be accessed via {@link #getNamespace()}. */ @@ -727,7 +736,8 @@ private OnTimerArgumentProvider( BoundedWindow window, Instant fireTimestamp, Instant timestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { fn.super(); this.timerId = timerId; this.window = window; @@ -735,13 +745,15 @@ private OnTimerArgumentProvider( this.timestamp = timestamp; this.timeDomain = timeDomain; this.key = key; + this.causedByDrain = causedByDrain; this.builderSupplier = OutputBuilderSuppliers.supplierForElement( WindowedValues.builder() .setValue(null) .setTimestamp(timestamp) .setWindow(window) - .setPaneInfo(PaneInfo.NO_FIRING)); + .setPaneInfo(PaneInfo.NO_FIRING) + .setCausedByDrain(causedByDrain)); } @Override @@ -749,6 +761,11 @@ public Instant timestamp() { return timestamp; } + @Override + public CausedByDrain causedByDrain() { + return causedByDrain; + } + @Override public Instant fireTimestamp() { return fireTimestamp; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java index 2be8071f9835..70cb58e66f3d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -116,7 +117,15 @@ public void onTimer( Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) { - underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain); + underlying.onTimer( + timerId, + timerFamilyId, + key, + window, + timestamp, + outputTimestamp, + timeDomain, + CausedByDrain.NORMAL); } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java index 52bbb3a306a0..e562a4067d22 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; @@ -48,7 +49,8 @@ /** * A customized {@link DoFnRunner} that handles late data dropping and garbage collection for * stateful {@link DoFn DoFns}. It registers a GC timer in {@link #processElement(WindowedValue)} - * and does cleanup in {@link #onTimer(String, String, BoundedWindow, Instant, Instant, TimeDomain)} + * and does cleanup in {@link #onTimer(String, String, BoundedWindow, Instant, Instant, TimeDomain, + * boolean)} * * @param the type of the {@link DoFn} (main) input elements * @param the type of the {@link DoFn} (main) output elements @@ -208,7 +210,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { if (timerId.equals(SORT_FLUSH_TIMER)) { onSortFlushTimer(window, stepContext.timerInternals().currentInputWatermarkTime()); @@ -232,7 +235,14 @@ public void onTimer( stepContext.timerInternals().currentInputWatermarkTime()); } else { doFnRunner.onTimer( - timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain); + timerId, + timerFamilyId, + key, + window, + timestamp, + outputTimestamp, + timeDomain, + causedByDrain); } } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index 92385b1c8220..f8964c458f76 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -141,7 +141,8 @@ public void testOnTimerExceptionsWrappedAsUserCodeException() { GlobalWindow.INSTANCE, new Instant(0), new Instant(0), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + CausedByDrain.NORMAL); } /** @@ -266,7 +267,8 @@ public void testOnTimerCalled() { GlobalWindow.INSTANCE, currentTime.plus(offset), currentTime.plus(offset), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + CausedByDrain.NORMAL); assertThat( fn.onTimerInvocations, @@ -277,7 +279,8 @@ public void testOnTimerCalled() { StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE), currentTime.plus(offset), currentTime.plus(offset), - TimeDomain.EVENT_TIME))); + TimeDomain.EVENT_TIME, + CausedByDrain.NORMAL))); } /** @@ -593,7 +596,8 @@ public void testOnTimerAllowedSkew() { GlobalWindow.INSTANCE, new Instant(0), new Instant(0), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + CausedByDrain.NORMAL); } @Test @@ -625,7 +629,8 @@ public void testOnTimerNoSkew() { GlobalWindow.INSTANCE, new Instant(0), new Instant(0), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + CausedByDrain.NORMAL); }); assertThat(exception.getCause(), isA(IllegalArgumentException.class)); @@ -703,7 +708,7 @@ public void onTimer(OnTimerContext context) { context.fireTimestamp(), context.timestamp(), context.timeDomain(), - CausedByDrain.NORMAL)); + context.causedByDrain())); } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java index e32065f6ef56..f207dd3bc488 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -353,7 +353,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { firedTimers.add( TimerData.of( timerId, diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java index 4ed4f09fe62b..df3aceff4a92 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -468,7 +468,8 @@ private static void advanceInputWatermark( window, timer.getTimestamp(), timer.getOutputTimestamp(), - timer.getDomain()); + timer.getDomain(), + timer.causedByDrain()); } } diff --git a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index 2a208d30a87e..7e0b0be28d43 100644 --- a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -250,7 +250,8 @@ private void fireTimer( window, timer.getTimestamp(), timer.getOutputTimestamp(), - timer.getDomain()); + timer.getDomain(), + timer.causedByDrain()); } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java index f1ec36564a4b..56e077253ae6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.joda.time.Instant; @@ -73,10 +74,19 @@ public void onTimer( final BoundedWindow window, final Instant timestamp, final Instant outputTimestamp, - final TimeDomain timeDomain) { + final TimeDomain timeDomain, + CausedByDrain causedByDrain) { try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { - delegate.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain); + delegate.onTimer( + timerId, + timerFamilyId, + key, + window, + timestamp, + outputTimestamp, + timeDomain, + causedByDrain); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index 3aa5a0802b38..12c59569f435 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -250,7 +250,8 @@ private void fireTimer( window, timer.getTimestamp(), timer.getOutputTimestamp(), - timer.getDomain()); + timer.getDomain(), + timer.causedByDrain()); } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java index d5a7ff035efc..2134fa869b47 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java @@ -101,6 +101,7 @@ import org.apache.beam.sdk.util.construction.Timer; import org.apache.beam.sdk.util.construction.graph.ExecutableStage; import org.apache.beam.sdk.util.construction.graph.UserStateReference; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -1001,7 +1002,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { Object timerKey = keyForTimer.get(); Preconditions.checkNotNull(timerKey, "Key for timer needs to be set before calling onTimer"); Preconditions.checkNotNull(remoteBundle, "Call to onTimer outside of a bundle"); @@ -1034,7 +1036,8 @@ public void onTimer( timestamp, outputTimestamp, // TODO: Support propagating the PaneInfo through. - PaneInfo.NO_FIRING); + PaneInfo.NO_FIRING, + causedByDrain); try { timerReceiver.accept(timerValue); } catch (Exception e) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java index 0c2ba87c4baa..d0a1b12e01e2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.nullness.qual.Nullable; @@ -104,7 +105,14 @@ static final class Timer implements BufferedElement { @Override public void processWith(DoFnRunner doFnRunner) { doFnRunner.onTimer( - timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain); + timerId, + timerFamilyId, + key, + window, + timestamp, + outputTimestamp, + timeDomain, + CausedByDrain.NORMAL); } @Override diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java index 10a3182f90f4..73b20238ef05 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; @@ -234,7 +235,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { minBufferedElementTimestamp = Math.min(outputTimestamp.getMillis(), minBufferedElementTimestamp); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java index c2556d7229ba..5c4975ffab01 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java @@ -81,6 +81,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -507,7 +508,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { if ("cleanup".equals(timerId)) { holdState.clear(); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java index 982871e59f14..01c9d25f1bf1 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java @@ -98,6 +98,7 @@ import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.NoopLock; import org.apache.beam.sdk.util.construction.Timer; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; @@ -534,14 +535,16 @@ public void testWatermarkHandling() throws Exception { windowedValue.getWindows(), timestamp, timestamp, - PaneInfo.NO_FIRING), + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL), TimerInternals.TimerData.of( "", TimerReceiverFactory.encodeToTimerDataTimerId("transform", timerId), StateNamespaces.window(IntervalWindow.getCoder(), intervalWindow), timestamp, timestamp, - TimeDomain.EVENT_TIME)); + TimeDomain.EVENT_TIME, + CausedByDrain.NORMAL)); timerConsumer.accept("timer", timerTarget); timerConsumer.accept("timer2", timerTarget2); @@ -867,7 +870,8 @@ private void testEnsureDeferredStateCleanupTimerFiring(boolean withCheckpointing stateNamespace, window.maxTimestamp(), window.maxTimestamp(), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + CausedByDrain.NORMAL); operator.setTimer( Timer.of( windowedValue.getValue().getKey(), @@ -875,7 +879,8 @@ private void testEnsureDeferredStateCleanupTimerFiring(boolean withCheckpointing windowedValue.getWindows(), window.maxTimestamp(), window.maxTimestamp(), - PaneInfo.NO_FIRING), + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL), userTimer2); assertThat(testHarness.numEventTimeTimers(), is(1)); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java index 741716b82848..ec1fcd6c8432 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; @@ -117,7 +118,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { throw new UnsupportedOperationException("Unsupported for ProcessFn"); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java index 1909a73dc8b9..37e04316e237 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValueReceiver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.joda.time.Instant; @@ -81,7 +82,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { throw new UnsupportedOperationException( String.format("Timers are not supported by %s", GroupAlsoByWindowFn.class.getSimpleName())); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java index 00c8192b1e40..434d46c20a5b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java @@ -374,7 +374,8 @@ private void processUserTimer(TimerData timer) throws Exception { window, timer.getTimestamp(), timer.getOutputTimestamp(), - timer.getDomain()); + timer.getDomain(), + timer.causedByDrain()); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java index e5d8a18be76f..0b9ccd1f37c6 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; @@ -142,7 +143,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { throw new UnsupportedOperationException( "Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow."); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java index b5b723adb2b5..3b7891c5378d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.joda.time.Instant; @@ -84,7 +85,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { throw new UnsupportedOperationException( "Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow."); } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java index d99a733a4eb4..952431411b84 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java @@ -94,7 +94,8 @@ public FnDataReceiver> create(String transformId, String timerFamil namespace, timer.getClearBit() ? BoundedWindow.TIMESTAMP_MAX_VALUE : timer.getFireTimestamp(), timer.getClearBit() ? BoundedWindow.TIMESTAMP_MAX_VALUE : timer.getHoldTimestamp(), - timerSpec.getTimerSpec().getTimeDomain()); + timerSpec.getTimerSpec().getTimeDomain(), + timer.causedByDrain()); timerDataConsumer.accept(timer, timerData); } }; diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java index 5b7880ad56ee..d7d4c8cac449 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java @@ -156,7 +156,8 @@ private static void fireTimer( Collections.singletonList(window), timestamp, outputTimestamp, - PaneInfo.NO_FIRING); + PaneInfo.NO_FIRING, + timer.causedByDrain()); KV transformAndTimerId = TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerFamilyId()); FnDataReceiver fnTimerReceiver = timerReceivers.get(transformAndTimerId); diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java index 556cc7993ae1..996c206a7e97 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java @@ -137,6 +137,7 @@ import org.apache.beam.sdk.util.construction.graph.PipelineNode.PTransformNode; import org.apache.beam.sdk.util.construction.graph.ProtoOverrides; import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -2254,6 +2255,7 @@ private org.apache.beam.sdk.util.construction.Timer timerForTest( Collections.singletonList(GlobalWindow.INSTANCE), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(fireTimestamp)), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(holdTimestamp)), - PaneInfo.NO_FIRING); + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL); } } diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java index eccf1e66434e..f9575994c464 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java @@ -84,6 +84,7 @@ import org.apache.beam.sdk.util.construction.CoderTranslator; import org.apache.beam.sdk.util.construction.ModelCoderRegistrar; import org.apache.beam.sdk.util.construction.Timer; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.WindowedValues; @@ -326,7 +327,8 @@ private static Object convertValue(Object value, CommonCoder coderSpec, Coder co windows, new Instant(((Number) kvMap.get("fireTimestamp")).longValue()), new Instant(((Number) kvMap.get("holdTimestamp")).longValue()), - paneInfo); + paneInfo, + CausedByDrain.NORMAL); } else if (s.equals(getUrn(StandardCoders.Enum.INTERVAL_WINDOW))) { Map kvMap = (Map) value; Instant end = new Instant(((Number) kvMap.get("end")).longValue()); diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java index f696873ec5c0..09c94dac1190 100644 --- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java +++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -105,7 +106,8 @@ private static void fireTimer( window, timer.getTimestamp(), timer.getOutputTimestamp(), - timer.getDomain()); + timer.getDomain(), + CausedByDrain.NORMAL); } @Override diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java index b815649a7659..7bec91abb34d 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.joda.time.Instant; @@ -61,11 +62,19 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { withMetrics( () -> underlying.onTimer( - timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain), + timerId, + timerFamilyId, + key, + window, + timestamp, + outputTimestamp, + timeDomain, + causedByDrain), false); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java index d07a9bda78c6..2485ac2d5522 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowedValue; import org.checkerframework.checker.nullness.qual.Nullable; @@ -154,8 +155,10 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { - underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain); + TimeDomain timeDomain, + CausedByDrain causedByDrain) { + underlying.onTimer( + timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain, causedByDrain); } @Override diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java index 2f27e31d05dd..bc87e2460ec4 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java @@ -437,7 +437,8 @@ private void fireTimer(KeyedTimerData keyedTimerData) { window, timer.getTimestamp(), timer.getOutputTimestamp(), - timer.getDomain()); + timer.getDomain(), + timer.causedByDrain()); } // todo: should this go through bundle manager to start and finish the bundle? diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java index 34e3405660c2..84cf5b26c505 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java @@ -22,6 +22,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.WindowedValue; import org.joda.time.Instant; @@ -66,14 +67,22 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { // Note: wrap with KV.of(key, null) as a special use case of setKeyedInternals() to set key // directly. setKeyedInternals(KV.of(key, null)); try { underlying.onTimer( - timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain); + timerId, + timerFamilyId, + key, + window, + timestamp, + outputTimestamp, + timeDomain, + causedByDrain); } finally { clearKeyedInternals(); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java index 743a42d14791..d867a3c85764 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.construction.graph.ExecutableStage; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -425,7 +426,8 @@ private void fireTimer(KeyedTimerData keyedTimerData) { window, timer.getTimestamp(), timer.getOutputTimestamp(), - timer.getDomain()); + timer.getDomain(), + CausedByDrain.NORMAL); } // todo: should this go through bundle manager to start and finish the bundle? diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java index 7129ba9145eb..a2ec88a43415 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java @@ -61,6 +61,7 @@ import org.apache.beam.sdk.util.construction.Timer; import org.apache.beam.sdk.util.construction.graph.ExecutableStage; import org.apache.beam.sdk.util.construction.graph.PipelineNode; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -430,7 +431,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { final KV timerReceiverKey = TimerReceiverFactory.decodeTimerDataTimerId(timerFamilyId); final FnDataReceiver timerReceiver = @@ -443,7 +445,8 @@ public void onTimer( timestamp, outputTimestamp, // TODO: Support propagating the PaneInfo through. - PaneInfo.NO_FIRING); + PaneInfo.NO_FIRING, + causedByDrain); try { timerReceiver.accept(timerValue); } catch (Exception e) { diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java index 350f7daa56cd..15ec818dba74 100644 --- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java +++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.util.WindowedValueMultiReceiver; import org.apache.beam.sdk.util.WindowedValueReceiver; import org.apache.beam.sdk.util.construction.ParDoTranslation; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -266,7 +267,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) { + TimeDomain timeDomain, + CausedByDrain causedByDrain) { throw new UnsupportedOperationException(); } diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java index db4ee5be5786..28dbf44cb8fe 100644 --- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java +++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.joda.time.Instant; @@ -77,9 +78,18 @@ public void onTimer( final BoundedWindow window, final Instant timestamp, final Instant outputTimestamp, - final TimeDomain timeDomain) { + final TimeDomain timeDomain, + CausedByDrain causedByDrain) { try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metrics)) { - delegate.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain); + delegate.onTimer( + timerId, + timerFamilyId, + key, + window, + timestamp, + outputTimestamp, + timeDomain, + causedByDrain); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java index 9d73a605b3b5..4ac56c2550b5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java @@ -75,7 +75,8 @@ public void fireTimer(TimerInternals.TimerData timer) { window, timer.getTimestamp(), timer.getOutputTimestamp(), - timer.getDomain()); + timer.getDomain(), + timer.causedByDrain()); } finally { if (this.ctx.getTimerDataIterator() instanceof ParDoStateUpdateFn.SparkTimerInternalsIterator) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java index a6b1f65571dc..c8cd7eb5f262 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.WindowedValue; import org.joda.time.Instant; @@ -76,9 +77,18 @@ public void onTimer( final BoundedWindow window, final Instant timestamp, final Instant outputTimestamp, - final TimeDomain timeDomain) { + final TimeDomain timeDomain, + CausedByDrain causedByDrain) { try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer())) { - delegate.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain); + delegate.onTimer( + timerId, + timerFamilyId, + key, + window, + timestamp, + outputTimestamp, + timeDomain, + causedByDrain); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java index b1e0e3a39808..cfda84c1981f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; import org.joda.time.Instant; @@ -56,6 +57,7 @@ public class AbstractInOutIteratorTest { private static final Instant TEST_TIMESTAMP = new Instant(42L); private static final Instant TEST_OUTPUT_TIMESTAMP = new Instant(84L); private static final TimeDomain TEST_TIME_DOMAIN = TimeDomain.EVENT_TIME; + private static final CausedByDrain TEST_CAUSED_BY_DRAIN = CausedByDrain.CAUSED_BY_DRAIN; private StateNamespace testNamespace; @@ -88,6 +90,7 @@ public void setUp() { when(mockTimer.getTimestamp()).thenReturn(TEST_TIMESTAMP); when(mockTimer.getOutputTimestamp()).thenReturn(TEST_OUTPUT_TIMESTAMP); when(mockTimer.getDomain()).thenReturn(TEST_TIME_DOMAIN); + when(mockTimer.causedByDrain()).thenReturn(TEST_CAUSED_BY_DRAIN); } @Test @@ -107,7 +110,8 @@ public void testFireTimer() { mockWindow, TEST_TIMESTAMP, TEST_OUTPUT_TIMESTAMP, - TEST_TIME_DOMAIN); + TEST_TIME_DOMAIN, + CausedByDrain.CAUSED_BY_DRAIN); // Verify that timer data iterator deletion was not called (no timer iterator was set in this // test) @@ -133,7 +137,8 @@ public void testFireTimerWithTimerDataIterator() { mockWindow, TEST_TIMESTAMP, TEST_OUTPUT_TIMESTAMP, - TEST_TIME_DOMAIN); + TEST_TIME_DOMAIN, + CausedByDrain.CAUSED_BY_DRAIN); // Verify that the timer data iterator's deleteTimer method was called verify(mockTimerDataIterator).deleteTimer(mockTimer); diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java index 2ff06b59f8eb..2dc428d5a6b2 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; @@ -252,7 +253,8 @@ public void onTimer( BoundedWindow window, Instant timestamp, Instant outputTimestamp, - TimeDomain timeDomain) {} + TimeDomain timeDomain, + CausedByDrain causedByDrain) {} @Override public void finishBundle() {} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 125408108c07..c1b4f2ec7d82 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -330,6 +331,9 @@ public abstract class ProcessContext extends WindowedContext { @Pure public abstract @Nullable Long currentRecordOffset(); + + @Pure + public abstract CausedByDrain causedByDrain(); } /** Information accessible when running a {@link DoFn.OnTimer} method. */ @@ -346,6 +350,9 @@ public abstract class OnTimerContext extends WindowedContext { /** Returns the time domain of the current timer. */ public abstract TimeDomain timeDomain(); + + @Pure + public abstract CausedByDrain causedByDrain(); } public abstract class OnWindowExpirationContext extends WindowedContext { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 3bdeb57ed888..1558bb74e0d9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.util.OutputBuilderSuppliers; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; @@ -589,6 +590,11 @@ public Long currentRecordOffset() { return element.getCurrentRecordOffset(); } + @Override + public CausedByDrain causedByDrain() { + return CausedByDrain.NORMAL; + } + @Override public PipelineOptions getPipelineOptions() { return options; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java index 9f5322fb5116..a22d3378cfdf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.OutputBuilderSupplier; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.PCollection; @@ -513,6 +514,11 @@ public Object key() { throw new UnsupportedOperationException(); } + @Override + public CausedByDrain causedByDrain() { + return outerContext.causedByDrain(); + } + @Override public Object sideInput(String tagId) { PCollectionView view = sideInputMapping.get(tagId); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java index d443f008b7d2..6e5bb3303c39 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; +import org.apache.beam.sdk.values.CausedByDrain; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; @@ -65,9 +66,17 @@ public static Timer of( Collection windows, Instant fireTimestamp, Instant holdTimestamp, - PaneInfo paneInfo) { + PaneInfo paneInfo, + CausedByDrain causedByDrain) { return new AutoValue_Timer( - userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, paneInfo); + userKey, + dynamicTimerTag, + windows, + false, + fireTimestamp, + holdTimestamp, + paneInfo, + causedByDrain); } /** @@ -76,7 +85,8 @@ public static Timer of( */ public static Timer cleared( K userKey, String dynamicTimerTag, Collection windows) { - return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null); + return new AutoValue_Timer( + userKey, dynamicTimerTag, windows, true, null, null, null, CausedByDrain.NORMAL); } /** Returns the key that the timer is set on. */ @@ -116,6 +126,8 @@ public static Timer cleared( */ public abstract @Nullable PaneInfo getPaneInfo(); + public abstract @Nullable CausedByDrain causedByDrain(); + @Override public final boolean equals(@Nullable Object other) { if (!(other instanceof Timer)) { @@ -186,6 +198,7 @@ public void encode(Timer timer, OutputStream outStream) throws IOException { InstantCoder.of().encode(timer.getFireTimestamp(), outStream); InstantCoder.of().encode(timer.getHoldTimestamp(), outStream); PaneInfoCoder.INSTANCE.encode(timer.getPaneInfo(), outStream); + // todo maybe similarly to windowedValue, should we propagate metadata with paneinfo bit } } @@ -201,7 +214,15 @@ public Timer decode(InputStream inStream) throws IOException { Instant fireTimestamp = InstantCoder.of().decode(inStream); Instant holdTimestamp = InstantCoder.of().decode(inStream); PaneInfo paneInfo = PaneInfoCoder.INSTANCE.decode(inStream); - return Timer.of(userKey, dynamicTimerTag, windows, fireTimestamp, holdTimestamp, paneInfo); + // todo maybe similarly to windowedValue, should we propagate metadata with paneinfo bit + return Timer.of( + userKey, + dynamicTimerTag, + windows, + fireTimestamp, + holdTimestamp, + paneInfo, + CausedByDrain.NORMAL); } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java index e96bf27c6ed9..06f43350f5ae 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.CausedByDrain; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; @@ -57,7 +58,8 @@ public void testTimer() { Collections.singleton(GlobalWindow.INSTANCE), FIRE_TIME, HOLD_TIME, - PaneInfo.NO_FIRING); + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL); assertEquals("key", timer.getUserKey()); assertEquals("tag", timer.getDynamicTimerTag()); assertEquals(FIRE_TIME, timer.getFireTimestamp()); @@ -79,7 +81,8 @@ public void testTimerCoderWithInconsistentWithEqualsComponentCoders() throws Exc Collections.singleton(GlobalWindow.INSTANCE), FIRE_TIME, HOLD_TIME, - PaneInfo.NO_FIRING)); + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL)); CoderProperties.structuralValueDecodeEncodeEqual( coder, Timer.cleared("key", "tag", Collections.singleton(GlobalWindow.INSTANCE))); CoderProperties.structuralValueConsistentWithEquals( @@ -90,14 +93,16 @@ public void testTimerCoderWithInconsistentWithEqualsComponentCoders() throws Exc Collections.singleton(GlobalWindow.INSTANCE), FIRE_TIME, HOLD_TIME, - PaneInfo.NO_FIRING), + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL), Timer.of( "key", "tag", Collections.singleton(GlobalWindow.INSTANCE), FIRE_TIME, HOLD_TIME, - PaneInfo.NO_FIRING)); + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL)); CoderProperties.structuralValueConsistentWithEquals( coder, Timer.cleared("key", "tag", Collections.singleton(GlobalWindow.INSTANCE)), @@ -115,7 +120,8 @@ public void testTimerCoderWithConsistentWithEqualsComponentCoders() throws Excep Collections.singletonList(GlobalWindow.INSTANCE), FIRE_TIME, HOLD_TIME, - PaneInfo.NO_FIRING)); + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL)); CoderProperties.coderDecodeEncodeEqual( coder, Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE))); CoderProperties.coderConsistentWithEquals( @@ -126,14 +132,16 @@ public void testTimerCoderWithConsistentWithEqualsComponentCoders() throws Excep Collections.singletonList(GlobalWindow.INSTANCE), FIRE_TIME, HOLD_TIME, - PaneInfo.NO_FIRING), + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL), Timer.of( "key", "tag", Collections.singletonList(GlobalWindow.INSTANCE), FIRE_TIME, HOLD_TIME, - PaneInfo.NO_FIRING)); + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL)); CoderProperties.coderConsistentWithEquals( coder, Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)), @@ -146,14 +154,16 @@ public void testTimerCoderWithConsistentWithEqualsComponentCoders() throws Excep Collections.singletonList(GlobalWindow.INSTANCE), FIRE_TIME, HOLD_TIME, - PaneInfo.NO_FIRING), + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL), Timer.of( "key", "tag", Collections.singletonList(GlobalWindow.INSTANCE), FIRE_TIME, HOLD_TIME, - PaneInfo.NO_FIRING)); + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL)); CoderProperties.coderDeterministic( coder, Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)), diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 1b7d75f6ec32..78aceeaab198 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -102,6 +102,7 @@ import org.apache.beam.sdk.util.construction.ParDoTranslation; import org.apache.beam.sdk.util.construction.RehydratedComponents; import org.apache.beam.sdk.util.construction.Timer; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.OutputBuilder; import org.apache.beam.sdk.values.PCollectionView; @@ -247,6 +248,7 @@ public final void addRunnerForPTransform(Context context) throws IOException { private WindowedValue currentElement; private Object currentKey; + private CausedByDrain causedByDrain; /** * Only valid during {@link @@ -1200,6 +1202,8 @@ private void processTimer( checkNotNull(timerBundleTracker); try { currentKey = timer.getUserKey(); + causedByDrain = timer.causedByDrain(); + // add drain Iterator windowIterator = (Iterator) timer.getWindows().iterator(); while (windowIterator.hasNext()) { @@ -1531,7 +1535,8 @@ private Timer getTimerForTime(Instant scheduledTime) { Collections.singletonList(boundedWindow), scheduledTime, outputTimestamp, - paneInfo); + paneInfo, + causedByDrain); } } @@ -1848,6 +1853,11 @@ public TimerMap timerFamily(String timerFamilyId) { currentElement.getTimestamp(), currentElement.getPaneInfo()); } + + @Override + public CausedByDrain causedByDrain() { + return currentElement.causedByDrain(); + } } /** Provides arguments for a {@link DoFnInvoker} for a non-window observing method. */ @@ -1929,6 +1939,11 @@ public void outputWindowedValue( } outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo)); } + + @Override + public CausedByDrain causedByDrain() { + return currentElement.causedByDrain(); + } } /** Provides base arguments for a {@link DoFnInvoker} for a non-window observing method. */ @@ -2278,6 +2293,7 @@ public OutputBuilder builder(OutputT value) { .setWindow(currentWindow) .setTimestamp(currentTimer.getHoldTimestamp()) .setPaneInfo(currentTimer.getPaneInfo()) + .setCausedByDrain(causedByDrain) .setReceiver( windowedValue -> { checkOnWindowExpirationTimestamp(windowedValue.getTimestamp()); @@ -2395,6 +2411,7 @@ public OutputBuilder builder(Row value) { .setValue(value) .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) + .setCausedByDrain(causedByDrain) .setReceiver( windowedValue -> context.outputWindowedValue( @@ -2432,6 +2449,7 @@ public OutputBuilder builder(T value) { .setValue(value) .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) + .setCausedByDrain(causedByDrain) .setReceiver( windowedValue -> context.outputWindowedValue( @@ -2469,6 +2487,7 @@ public OutputBuilder builder(Row value) { .setValue(value) .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) + .setCausedByDrain(causedByDrain) .setReceiver( windowedValue -> context.outputWindowedValue( @@ -2546,6 +2565,11 @@ public BoundedWindow window() { return currentWindow; } + @Override + public CausedByDrain causedByDrain() { + return causedByDrain; + } + @Override public OutputBuilder builder(OutputT value) { return WindowedValues.builder() @@ -2553,6 +2577,7 @@ public OutputBuilder builder(OutputT value) { .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) .setPaneInfo(currentTimer.getPaneInfo()) + .setCausedByDrain(causedByDrain) .setReceiver( windowedValue -> { checkTimerTimestamp(windowedValue.getTimestamp()); @@ -2734,6 +2759,7 @@ public OutputBuilder builder(T value) { .setValue(value) .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) + .setCausedByDrain(causedByDrain) .setPaneInfo(currentTimer.getPaneInfo()) .setReceiver( windowedValue -> @@ -2772,6 +2798,7 @@ public OutputBuilder builder(Row value) { .setTimestamp(currentTimer.getHoldTimestamp()) .setWindow(currentWindow) .setPaneInfo(currentTimer.getPaneInfo()) + .setCausedByDrain(causedByDrain) .setReceiver( windowedValue -> context.outputWindowedValue( diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index ef19b7c18804..50a2fec0b5a2 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -123,6 +123,7 @@ import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.graph.ProtoOverrides; import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -1161,7 +1162,8 @@ private org.apache.beam.sdk.util.construction.Timer dynamicTimerInGlobalW Collections.singletonList(GlobalWindow.INSTANCE), fireTimestamp, holdTimestamp, - PaneInfo.NO_FIRING); + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL); } private WindowedValue valueInWindows( diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java index 13a0b105ec39..756f17fdfa32 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java @@ -64,6 +64,7 @@ import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.graph.ProtoOverrides; import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -123,7 +124,8 @@ private org.apache.beam.sdk.util.construction.Timer dynamicTimerInGlobalW Collections.singletonList(GlobalWindow.INSTANCE), fireTimestamp, holdTimestamp, - PaneInfo.NO_FIRING); + PaneInfo.NO_FIRING, + CausedByDrain.NORMAL); } private WindowedValue valueInWindows( diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index 52b6c87a5c05..d86ef653dcab 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -145,6 +145,7 @@ import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.ParDoTranslation; import org.apache.beam.sdk.util.construction.Timer; +import org.apache.beam.sdk.values.CausedByDrain; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; @@ -1129,7 +1130,8 @@ public void testInstructionEmbeddedElementsAreProcessed() throws Exception { Collections.singletonList(GlobalWindow.INSTANCE), Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L), - PaneInfo.ON_TIME_AND_ONLY_FIRING), + PaneInfo.ON_TIME_AND_ONLY_FIRING, + CausedByDrain.NORMAL), encodedTimer); Elements elements = Elements.newBuilder() @@ -1248,7 +1250,8 @@ public void testInstructionEmbeddedElementsWithMalformedTimers() throws Exceptio Collections.singletonList(GlobalWindow.INSTANCE), Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L), - PaneInfo.ON_TIME_AND_ONLY_FIRING), + PaneInfo.ON_TIME_AND_ONLY_FIRING, + CausedByDrain.NORMAL), encodedTimer); assertThrows( @@ -1342,7 +1345,8 @@ public void testOutputEmbeddedElementsAreProcessed() throws Exception { Collections.singletonList(GlobalWindow.INSTANCE), Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L), - PaneInfo.ON_TIME_AND_ONLY_FIRING), + PaneInfo.ON_TIME_AND_ONLY_FIRING, + CausedByDrain.NORMAL), encodedTimer); InstructionResponse.Builder builder = @@ -1961,7 +1965,8 @@ public void testTimerMetrics() throws Exception { Collections.singletonList(GlobalWindow.INSTANCE), Instant.ofEpochMilli(1L), Instant.ofEpochMilli(1L), - PaneInfo.ON_TIME_AND_ONLY_FIRING), + PaneInfo.ON_TIME_AND_ONLY_FIRING, + CausedByDrain.NORMAL), encodedTimer); Elements elements = Elements.newBuilder()