From 9a5de4f46ce1cec45d4aab78ebdcac2f4701deb7 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 5 Dec 2025 15:05:26 +0100 Subject: [PATCH] [Drain] Propagate drain information to timerData and TImer data structures. mostly noop. move from boolean to CausedByDrain { CAUSED_BY_DRAIN, NORMAL } dsdas --- .../runners/core/ReduceFnContextFactory.java | 12 +- .../SplittableParDoViaKeyedWorkItems.java | 6 +- .../beam/runners/core/TimerInternals.java | 67 +++++++-- .../core/InMemoryTimerInternalsTest.java | 84 +++++++++-- .../runners/core/KeyedWorkItemCoderTest.java | 3 +- .../beam/runners/core/ReduceFnTester.java | 9 +- .../runners/core/SimpleDoFnRunnerTest.java | 3 +- ...SimplePushbackSideInputDoFnRunnerTest.java | 6 +- .../beam/runners/core/TimerInternalsTest.java | 86 +++++++++-- .../triggers/TriggerStateMachineTester.java | 12 +- .../direct/DirectTimerInternalsTest.java | 18 ++- ...ManagerRemovingTransformEvaluatorTest.java | 3 +- .../runners/direct/EvaluationContextTest.java | 6 +- .../runners/direct/WatermarkManagerTest.java | 137 ++++++++++++++---- .../worker/WindmillTimerInternals.java | 12 +- .../StreamingGroupAlsoByWindowFnsTest.java | 3 +- ...gKeyedWorkItemSideInputDoFnRunnerTest.java | 3 +- .../StreamingModeExecutionContextTest.java | 3 +- .../worker/StreamingSideInputFetcherTest.java | 3 +- .../worker/UserParDoFnFactoryTest.java | 9 +- .../worker/WindmillKeyedWorkItemTest.java | 6 +- .../state/WindmillTagEncodingV1Test.java | 30 +++- .../samza/runtime/ClassicBundleManager.java | 3 +- .../samza/runtime/PortableBundleManager.java | 3 +- .../samza/runtime/KeyedTimerDataTest.java | 7 +- .../SamzaTimerInternalsFactoryTest.java | 84 +++++++++-- 26 files changed, 501 insertions(+), 117 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index 1ca927f16caa..f09c0cd6eff2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -135,17 +135,23 @@ public TimersImpl(StateNamespace namespace) { @Override public void setTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain)); + timerInternals.setTimer( + TimerData.of( + namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); } @Override public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) { - timerInternals.setTimer(TimerData.of(namespace, timestamp, outputTimestamp, timeDomain)); + timerInternals.setTimer( + TimerData.of( + namespace, timestamp, outputTimestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); } @Override public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain)); + timerInternals.deleteTimer( + TimerData.of( + namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 9cf6db23f244..4cc96ee4ad54 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -600,7 +600,11 @@ public String getErrorContext() { // Set a timer to continue processing this element. timerInternals.setTimer( TimerInternals.TimerData.of( - stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME)); + stateNamespace, + wakeupTime, + wakeupTime, + TimeDomain.PROCESSING_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL)); } private DoFnInvoker.ArgumentProvider wrapOptionsAsSetup( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java index 254e6f5fcf5b..92b9f92d0594 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java @@ -168,6 +168,10 @@ void deleteTimer( /** Data about a timer as represented within {@link TimerInternals}. */ @AutoValue abstract class TimerData implements Comparable { + public enum CausedByDrain { + CAUSED_BY_DRAIN, + NORMAL + } public abstract String getTimerId(); @@ -188,6 +192,8 @@ abstract class TimerData implements Comparable { public abstract boolean getDeleted(); + public abstract CausedByDrain causedByDrain(); + // When adding a new field, make sure to add it to the compareTo() method. /** Construct a {@link TimerData} for the given parameters. */ @@ -196,9 +202,10 @@ public static TimerData of( StateNamespace namespace, Instant timestamp, Instant outputTimestamp, - TimeDomain domain) { + TimeDomain domain, + CausedByDrain causedByDrain) { return new AutoValue_TimerInternals_TimerData( - timerId, "", namespace, timestamp, outputTimestamp, domain, false); + timerId, "", namespace, timestamp, outputTimestamp, domain, false, causedByDrain); } /** @@ -211,9 +218,34 @@ public static TimerData of( StateNamespace namespace, Instant timestamp, Instant outputTimestamp, - TimeDomain domain) { + TimeDomain domain, + CausedByDrain causedByDrain) { return new AutoValue_TimerInternals_TimerData( - timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain, false); + timerId, + timerFamilyId, + namespace, + timestamp, + outputTimestamp, + domain, + false, + causedByDrain); + } + + public static TimerData of( + String timerId, + String timerFamilyId, + StateNamespace namespace, + Instant timestamp, + Instant outputTimestamp, + TimeDomain domain) { + return of( + timerId, + timerFamilyId, + namespace, + timestamp, + outputTimestamp, + domain, + TimerData.CausedByDrain.NORMAL); } /** @@ -221,9 +253,13 @@ public static TimerData of( * deterministically generated from the {@code timestamp} and {@code domain}. */ public static TimerData of( - StateNamespace namespace, Instant timestamp, Instant outputTimestamp, TimeDomain domain) { + StateNamespace namespace, + Instant timestamp, + Instant outputTimestamp, + TimeDomain domain, + CausedByDrain causedByDrain) { String timerId = String.valueOf(domain.ordinal()) + ':' + timestamp.getMillis(); - return of(timerId, namespace, timestamp, outputTimestamp, domain); + return of(timerId, namespace, timestamp, outputTimestamp, domain, causedByDrain); } public TimerData deleted() { @@ -234,7 +270,8 @@ public TimerData deleted() { getTimestamp(), getOutputTimestamp(), getDomain(), - true); + true, + causedByDrain()); } /** @@ -272,7 +309,9 @@ public String stringKey() { + "/" + getTimerFamilyId() + ":" - + getTimerId(); + + getTimerId() + + ":" + + causedByDrain(); } } @@ -309,7 +348,14 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException Instant timestamp = INSTANT_CODER.decode(inStream); Instant outputTimestamp = INSTANT_CODER.decode(inStream); TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream)); - return TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, domain); + return TimerData.of( + timerId, + timerFamilyId, + namespace, + timestamp, + outputTimestamp, + domain, + TimerData.CausedByDrain.NORMAL); } @Override @@ -355,7 +401,8 @@ public TimerData decode(InputStream inStream) throws CoderException, IOException StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder); Instant timestamp = INSTANT_CODER.decode(inStream); TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream)); - return TimerData.of(timerId, namespace, timestamp, timestamp, domain); + return TimerData.of( + timerId, namespace, timestamp, timestamp, domain, TimerData.CausedByDrain.NORMAL); } @Override diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java index abdc49d887ea..0444048b0798 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java @@ -41,9 +41,21 @@ public class InMemoryTimerInternalsTest { public void testFiringEventTimers() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData eventTimer1 = - TimerData.of(ID1, NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME); + TimerData.of( + ID1, + NS1, + new Instant(19), + new Instant(19), + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData eventTimer2 = - TimerData.of(ID2, NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME); + TimerData.of( + ID2, + NS1, + new Instant(29), + new Instant(29), + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); underTest.setTimer(eventTimer1); underTest.setTimer(eventTimer2); @@ -111,9 +123,19 @@ public void testDeletionById() throws Exception { public void testFiringProcessingTimeTimers() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData processingTime1 = - TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME); + TimerData.of( + NS1, + new Instant(19), + new Instant(19), + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData processingTime2 = - TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME); + TimerData.of( + NS1, + new Instant(29), + new Instant(29), + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); underTest.setTimer(processingTime1); underTest.setTimer(processingTime2); @@ -142,19 +164,47 @@ public void testFiringProcessingTimeTimers() throws Exception { public void testTimerOrdering() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData eventTime1 = - TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME); + TimerData.of( + NS1, + new Instant(19), + new Instant(19), + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData processingTime1 = - TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME); + TimerData.of( + NS1, + new Instant(19), + new Instant(19), + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData synchronizedProcessingTime1 = TimerData.of( - NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + NS1, + new Instant(19), + new Instant(19), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData eventTime2 = - TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME); + TimerData.of( + NS1, + new Instant(29), + new Instant(29), + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData processingTime2 = - TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME); + TimerData.of( + NS1, + new Instant(29), + new Instant(29), + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData synchronizedProcessingTime2 = TimerData.of( - NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + NS1, + new Instant(29), + new Instant(29), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); underTest.setTimer(processingTime1); underTest.setTimer(eventTime1); @@ -188,9 +238,19 @@ public void testTimerOrdering() throws Exception { public void testDeduplicate() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData eventTime = - TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME); + TimerData.of( + NS1, + new Instant(19), + new Instant(19), + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData processingTime = - TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME); + TimerData.of( + NS1, + new Instant(19), + new Instant(19), + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); underTest.setTimer(eventTime); underTest.setTimer(eventTime); underTest.setTimer(processingTime); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java index 6099f1fdc8e1..e5355dd3cfe5 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java @@ -48,7 +48,8 @@ public void testEncodeDecodeEqual() throws Exception { StateNamespaces.global(), new Instant(500L), new Instant(500L), - TimeDomain.EVENT_TIME)); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL)); Iterable> elements = ImmutableList.of( WindowedValues.valueInGlobalWindow(1), diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 55791d582717..ecaf3c9433cb 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -574,7 +574,11 @@ public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exc ArrayList timers = new ArrayList<>(1); timers.add( TimerData.of( - StateNamespaces.window(windowFn.windowCoder(), window), timestamp, timestamp, domain)); + StateNamespaces.window(windowFn.windowCoder(), window), + timestamp, + timestamp, + domain, + TimerData.CausedByDrain.NORMAL)); runner.onTimers(timers); runner.persist(); } @@ -588,7 +592,8 @@ public void fireTimers(W window, TimestampedValue... timers) throws StateNamespaces.window(windowFn.windowCoder(), window), timer.getTimestamp(), timer.getTimestamp(), - timer.getValue())); + timer.getValue(), + TimerData.CausedByDrain.NORMAL)); } runner.onTimers(timerData); runner.persist(); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java index 40bcb70e32c6..e13b97ee62db 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java @@ -701,7 +701,8 @@ public void onTimer(OnTimerContext context) { StateNamespaces.window(windowCoder, (W) context.window()), context.fireTimestamp(), context.timestamp(), - context.timeDomain())); + context.timeDomain(), + TimerData.CausedByDrain.NORMAL)); } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java index b7dda599b8d2..fe95fcec2e8d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java @@ -317,7 +317,8 @@ public void testOnTimerCalled() { StateNamespaces.window(IntervalWindow.getCoder(), window), timestamp, timestamp, - TimeDomain.EVENT_TIME))); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL))); } private static class TestDoFnRunner implements DoFnRunner { @@ -359,7 +360,8 @@ public void onTimer( StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window), timestamp, outputTimestamp, - timeDomain)); + timeDomain, + TimerData.CausedByDrain.NORMAL)); } @Override diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java index 83fb30eb8120..0e0b8ba40982 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java @@ -46,7 +46,8 @@ public void testTimerDataCoder() throws Exception { StateNamespaces.global(), new Instant(0), new Instant(0), - TimeDomain.EVENT_TIME)); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL)); Coder windowCoder = IntervalWindow.getCoder(); CoderProperties.coderDecodeEncodeEqual( @@ -57,7 +58,8 @@ public void testTimerDataCoder() throws Exception { windowCoder, new IntervalWindow(new Instant(0), new Instant(100))), new Instant(99), new Instant(99), - TimeDomain.PROCESSING_TIME)); + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL)); } @Test @@ -69,12 +71,25 @@ public void testCoderIsSerializableWithWellKnownCoderType() { public void testCompareEqual() { Instant timestamp = new Instant(100); StateNamespace namespace = StateNamespaces.global(); - TimerData timer = TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME); + TimerData timer = + TimerData.of( + "id", + namespace, + timestamp, + timestamp, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); assertThat( timer, comparesEqualTo( - TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME))); + TimerData.of( + "id", + namespace, + timestamp, + timestamp, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL))); } @Test @@ -84,9 +99,19 @@ public void testCompareByTimestamp() { StateNamespace namespace = StateNamespaces.global(); TimerData firstTimer = - TimerData.of(namespace, firstTimestamp, firstTimestamp, TimeDomain.EVENT_TIME); + TimerData.of( + namespace, + firstTimestamp, + firstTimestamp, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData secondTimer = - TimerData.of(namespace, secondTimestamp, secondTimestamp, TimeDomain.EVENT_TIME); + TimerData.of( + namespace, + secondTimestamp, + secondTimestamp, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); assertThat(firstTimer, lessThan(secondTimer)); } @@ -96,10 +121,23 @@ public void testCompareByDomain() { Instant timestamp = new Instant(100); StateNamespace namespace = StateNamespaces.global(); - TimerData eventTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.EVENT_TIME); - TimerData procTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.PROCESSING_TIME); + TimerData eventTimer = + TimerData.of( + namespace, timestamp, timestamp, TimeDomain.EVENT_TIME, TimerData.CausedByDrain.NORMAL); + TimerData procTimer = + TimerData.of( + namespace, + timestamp, + timestamp, + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData synchronizedProcTimer = - TimerData.of(namespace, timestamp, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + TimerData.of( + namespace, + timestamp, + timestamp, + TimeDomain.SYNCHRONIZED_PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); assertThat(eventTimer, lessThan(procTimer)); assertThat(eventTimer, lessThan(synchronizedProcTimer)); @@ -117,9 +155,19 @@ public void testCompareByNamespace() { StateNamespace secondWindowNs = StateNamespaces.window(windowCoder, secondWindow); TimerData secondEventTime = - TimerData.of(firstWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME); + TimerData.of( + firstWindowNs, + timestamp, + timestamp, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData thirdEventTime = - TimerData.of(secondWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME); + TimerData.of( + secondWindowNs, + timestamp, + timestamp, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); assertThat(secondEventTime, lessThan(thirdEventTime)); } @@ -130,9 +178,21 @@ public void testCompareByTimerId() { StateNamespace namespace = StateNamespaces.global(); TimerData id0Timer = - TimerData.of("id0", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME); + TimerData.of( + "id0", + namespace, + timestamp, + timestamp, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData id1Timer = - TimerData.of("id1", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME); + TimerData.of( + "id1", + namespace, + timestamp, + timestamp, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); assertThat(id0Timer, lessThan(id1Timer)); } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index 21dfddce7a6e..4819efa73f96 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -400,17 +400,23 @@ public TestTimers(StateNamespace namespace) { @Override public void setTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain)); + timerInternals.setTimer( + TimerData.of( + namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); } @Override public void setTimer(Instant timestamp, Instant outputTimestamp, TimeDomain timeDomain) { - timerInternals.setTimer(TimerData.of(namespace, timestamp, outputTimestamp, timeDomain)); + timerInternals.setTimer( + TimerData.of( + namespace, timestamp, outputTimestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); } @Override public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain)); + timerInternals.deleteTimer( + TimerData.of( + namespace, timestamp, timestamp, timeDomain, TimerData.CausedByDrain.NORMAL)); } @Override diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java index b81ca28da077..cf9296139ddb 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java @@ -69,19 +69,22 @@ public void setTimerAddsToBuilder() { StateNamespaces.global(), new Instant(20145L), new Instant(20145L), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData processingTimer = TimerData.of( StateNamespaces.global(), new Instant(125555555L), new Instant(125555555L), - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData synchronizedProcessingTimer = TimerData.of( StateNamespaces.global(), new Instant(98745632189L), new Instant(98745632189L), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + TimeDomain.SYNCHRONIZED_PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); internals.setTimer(eventTimer); internals.setTimer(processingTimer); internals.setTimer(synchronizedProcessingTimer); @@ -98,19 +101,22 @@ public void deleteTimerDeletesOnBuilder() { StateNamespaces.global(), new Instant(20145L), new Instant(20145L), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData processingTimer = TimerData.of( StateNamespaces.global(), new Instant(125555555L), new Instant(125555555L), - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData synchronizedProcessingTimer = TimerData.of( StateNamespaces.global(), new Instant(98745632189L), new Instant(98745632189L), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + TimeDomain.SYNCHRONIZED_PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); internals.deleteTimer(eventTimer); internals.deleteTimer(processingTimer); internals.deleteTimer(synchronizedProcessingTimer); diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java index 1a9cdc187bf9..d0da3a293dbe 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java @@ -110,7 +110,8 @@ public void removesOnExceptionInOnTimer() throws Exception { StateNamespaces.global(), new Instant(0), new Instant(0), - TimeDomain.EVENT_TIME), + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL), "", GlobalWindow.INSTANCE); } catch (Exception e) { diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 3e470c63c5dd..0427c2ed5401 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -334,7 +334,11 @@ public void extractFiredTimersExtractsTimers() { StructuralKey key = StructuralKey.of("foo".length(), VarIntCoder.of()); TimerData toFire = TimerData.of( - StateNamespaces.global(), new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME); + StateNamespaces.global(), + new Instant(100L), + new Instant(100L), + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TransformResult timerResult = StepTransformResult.withoutHold(downstreamProducer) .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index 785c59bae84f..a2e163acc648 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -1002,13 +1002,15 @@ public void getSynchronizedProcessingTimeOutputHeldToPendingTimers() { StateNamespaces.global(), new Instant(250L), new Instant(250L), - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData futureTimer = TimerData.of( StateNamespaces.global(), new Instant(4096L), new Instant(4096L), - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdate timers = TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build(); manager.updateWatermarks( impulseBundle, @@ -1144,7 +1146,11 @@ public void synchronizedProcessingInputTimeIsHeldToUpstreamProcessingTimeTimers( Instant upstreamHold = new Instant(2048L); TimerData upstreamProcessingTimer = TimerData.of( - StateNamespaces.global(), upstreamHold, upstreamHold, TimeDomain.PROCESSING_TIME); + StateNamespaces.global(), + upstreamHold, + upstreamHold, + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); manager.updateWatermarks( impulseBundle, TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of())) @@ -1236,19 +1242,25 @@ public void extractFiredTimersReturnsFiredEventTimeTimers() { TimerData earliestTimer = TimerData.of( - StateNamespaces.global(), new Instant(1000), new Instant(1000), TimeDomain.EVENT_TIME); + StateNamespaces.global(), + new Instant(1000), + new Instant(1000), + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData middleTimer = TimerData.of( StateNamespaces.global(), new Instant(5000L), new Instant(5000L), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData lastTimer = TimerData.of( StateNamespaces.global(), new Instant(10000L), new Instant(10000L), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); StructuralKey key = StructuralKey.of(new byte[] {1, 4, 9}, ByteArrayCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1324,19 +1336,22 @@ public void extractFiredTimersReturnsFiredProcessingTimeTimers() { StateNamespaces.global(), new Instant(999L), new Instant(999L), - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData middleTimer = TimerData.of( StateNamespaces.global(), new Instant(5000L), new Instant(5000L), - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData lastTimer = TimerData.of( StateNamespaces.global(), new Instant(10000L), new Instant(10000L), - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); StructuralKey key = StructuralKey.of(-12L, VarLongCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1412,19 +1427,22 @@ public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { StateNamespaces.global(), new Instant(999L), new Instant(999L), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + TimeDomain.SYNCHRONIZED_PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData middleTimer = TimerData.of( StateNamespaces.global(), new Instant(5000L), new Instant(5000L), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + TimeDomain.SYNCHRONIZED_PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData lastTimer = TimerData.of( StateNamespaces.global(), new Instant(10000L), new Instant(10000L), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + TimeDomain.SYNCHRONIZED_PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); StructuralKey key = StructuralKey.of(new byte[] {2, -2, 22}, ByteArrayCoder.of()); TimerUpdate update = TimerUpdate.builder(key) @@ -1491,7 +1509,8 @@ public void processingTimeTimersCanBeReset() { StateNamespaces.global(), new Instant(5000L), new Instant(5000L), - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData overridingTimer = TimerData.of( @@ -1499,7 +1518,8 @@ public void processingTimeTimersCanBeReset() { StateNamespaces.global(), new Instant(10000L), new Instant(10000L), - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build(); TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build(); @@ -1547,14 +1567,16 @@ public void eventTimeTimersCanBeReset() { StateNamespaces.global(), new Instant(1000L), new Instant(1000L), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData overridingTimer = TimerData.of( timerId, StateNamespaces.global(), new Instant(2000L), new Instant(2000L), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build(); TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build(); @@ -1617,14 +1639,16 @@ public void inputWatermarkDuplicates() { StateNamespaces.global(), new Instant(100), new Instant(100), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData timer2 = TimerData.of( "a", StateNamespaces.global(), new Instant(200), new Instant(200), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build()); // Only the last timer update should be observable @@ -1656,25 +1680,32 @@ public void inputWatermarkDuplicates() { public void timerUpdateBuilderBuildAddsAllAddedTimers() { TimerData set = TimerData.of( - StateNamespaces.global(), new Instant(10L), new Instant(10L), TimeDomain.EVENT_TIME); + StateNamespaces.global(), + new Instant(10L), + new Instant(10L), + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerData deleted = TimerData.of( StateNamespaces.global(), new Instant(24L), new Instant(24L), - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData completedOne = TimerData.of( StateNamespaces.global(), new Instant(1024L), new Instant(1024L), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + TimeDomain.SYNCHRONIZED_PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); TimerData completedTwo = TimerData.of( StateNamespaces.global(), new Instant(2048L), new Instant(2048L), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdate update = TimerUpdate.builder(StructuralKey.of("foo", StringUtf8Coder.of())) @@ -1692,7 +1723,12 @@ public void timerUpdateBuilderBuildAddsAllAddedTimers() { public void timerUpdateBuilderWithSetAtEndOfTime() { Instant timerStamp = BoundedWindow.TIMESTAMP_MAX_VALUE; TimerData tooFar = - TimerData.of(StateNamespaces.global(), timerStamp, timerStamp, TimeDomain.EVENT_TIME); + TimerData.of( + StateNamespaces.global(), + timerStamp, + timerStamp, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty()); thrown.expect(IllegalArgumentException.class); @@ -1704,7 +1740,12 @@ public void timerUpdateBuilderWithSetAtEndOfTime() { public void timerUpdateBuilderWithSetPastEndOfTime() { Instant timerStamp = BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.standardMinutes(2)); TimerData tooFar = - TimerData.of(StateNamespaces.global(), timerStamp, timerStamp, TimeDomain.EVENT_TIME); + TimerData.of( + StateNamespaces.global(), + timerStamp, + timerStamp, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty()); thrown.expect(IllegalArgumentException.class); @@ -1716,7 +1757,13 @@ public void timerUpdateBuilderWithSetPastEndOfTime() { public void timerUpdateBuilderWithSetThenDeleteHasOnlyDeleted() { TimerUpdateBuilder builder = TimerUpdate.builder(null); Instant now = Instant.now(); - TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); + TimerData timer = + TimerData.of( + StateNamespaces.global(), + now, + now, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdate built = builder.setTimer(timer).deletedTimer(timer).build(); @@ -1728,7 +1775,13 @@ public void timerUpdateBuilderWithSetThenDeleteHasOnlyDeleted() { public void timerUpdateBuilderWithDeleteThenSetHasOnlySet() { TimerUpdateBuilder builder = TimerUpdate.builder(null); Instant now = Instant.now(); - TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); + TimerData timer = + TimerData.of( + StateNamespaces.global(), + now, + now, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdate built = builder.deletedTimer(timer).setTimer(timer).build(); @@ -1740,7 +1793,13 @@ public void timerUpdateBuilderWithDeleteThenSetHasOnlySet() { public void timerUpdateBuilderWithSetAfterBuildNotAddedToBuilt() { TimerUpdateBuilder builder = TimerUpdate.builder(null); Instant now = Instant.now(); - TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); + TimerData timer = + TimerData.of( + StateNamespaces.global(), + now, + now, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdate built = builder.build(); builder.setTimer(timer); @@ -1753,7 +1812,13 @@ public void timerUpdateBuilderWithSetAfterBuildNotAddedToBuilt() { public void timerUpdateBuilderWithDeleteAfterBuildNotAddedToBuilt() { TimerUpdateBuilder builder = TimerUpdate.builder(null); Instant now = Instant.now(); - TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); + TimerData timer = + TimerData.of( + StateNamespaces.global(), + now, + now, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdate built = builder.build(); builder.deletedTimer(timer); @@ -1766,7 +1831,13 @@ public void timerUpdateBuilderWithDeleteAfterBuildNotAddedToBuilt() { public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() { TimerUpdateBuilder builder = TimerUpdate.builder(null); Instant now = Instant.now(); - TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); + TimerData timer = + TimerData.of( + StateNamespaces.global(), + now, + now, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdate built = builder.build(); builder.withCompletedTimers(ImmutableList.of(timer)); @@ -1779,7 +1850,13 @@ public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() { public void timerUpdateWithCompletedTimersNotAddedToExisting() { TimerUpdateBuilder builder = TimerUpdate.builder(null); Instant now = Instant.now(); - TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME); + TimerData timer = + TimerData.of( + StateNamespaces.global(), + now, + now, + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); TimerUpdate built = builder.build(); assertThat(built.getCompletedTimers(), emptyIterable()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java index cb41aa1ccab4..99c10ba23dad 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java @@ -101,7 +101,14 @@ public void setTimer( Instant outputTimestamp, TimeDomain timeDomain) { TimerData timer = - TimerData.of(timerId, timerFamilyId, namespace, timestamp, outputTimestamp, timeDomain); + TimerData.of( + timerId, + timerFamilyId, + namespace, + timestamp, + outputTimestamp, + timeDomain, + TimerData.CausedByDrain.NORMAL); setTimer(timer); } @@ -138,7 +145,8 @@ public void deleteTimer( namespace, BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE, - timeDomain)); + timeDomain, + TimerData.CausedByDrain.NORMAL)); } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java index 094623b81311..520e9b3f5745 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java @@ -158,7 +158,8 @@ private void addTimer( timestamp, type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME - : TimeDomain.PROCESSING_TIME))) + : TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL))) .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timestamp)) .setType(type) .setStateFamily(STATE_FAMILY); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java index de4ec8e5aa9e..d30c59b2709a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java @@ -166,7 +166,8 @@ private TimerData timerData(IntervalWindow window, Instant timestamp, Timer.Type StateNamespaces.window(IntervalWindow.getCoder(), window), timestamp, timestamp, - type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME : TimeDomain.PROCESSING_TIME); + type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME : TimeDomain.PROCESSING_TIME, + TimerData.CausedByDrain.NORMAL); } private IntervalWindow window(long start, long end) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 93b279f0aec5..a3e6933d5b4f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -173,7 +173,8 @@ public void testTimerInternalsSetTimer() { new StateNamespaceForTest("key"), new Instant(5000), new Instant(5000), - TimeDomain.EVENT_TIME)); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL)); executionContext.flushState(); Windmill.Timer timer = outputBuilder.buildPartial().getOutputTimers(0); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java index cf616ee6ac0d..6400bb8b10d7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java @@ -210,7 +210,8 @@ private TimerData createTimer(long timestamp) { StateNamespaces.window(IntervalWindow.getCoder(), createWindow(timestamp)), new Instant(timestamp), new Instant(timestamp), - TimeDomain.EVENT_TIME); + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL); } private IntervalWindow createWindow(long timestamp) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java index 771ffe192baf..00614c4b7397 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java @@ -462,7 +462,8 @@ public void testCleanupTimerForGlobalWindowWithAllowedLateness() throws Exceptio globalWindowNamespace, BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)), - TimeDomain.EVENT_TIME)) + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL)) .thenReturn(null); // Set up non-empty state. We don't mock + verify calls to clear() but instead @@ -537,7 +538,8 @@ public void testCleanupWorks() throws Exception { firstWindowNamespace, firstWindow.maxTimestamp().plus(Duration.millis(1L)), firstWindow.maxTimestamp().plus(Duration.millis(1L)), - TimeDomain.EVENT_TIME)) + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL)) .thenReturn(null); // This should fire the timer to clean up the first window @@ -553,7 +555,8 @@ public void testCleanupWorks() throws Exception { secondWindowNamespace, secondWindow.maxTimestamp().plus(Duration.millis(1L)), secondWindow.maxTimestamp().plus(Duration.millis(1L)), - TimeDomain.EVENT_TIME)) + TimeDomain.EVENT_TIME, + TimerData.CausedByDrain.NORMAL)) .thenReturn(null); // And this should clean up the second window diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java index 2227c25ef15d..080be93a8955 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java @@ -202,7 +202,8 @@ private Windmill.Timer makeSerializedTimer( ns, new Instant(timestamp), new Instant(timestamp), - timerTypeToTimeDomain(type)))) + timerTypeToTimeDomain(type), + TimerData.CausedByDrain.NORMAL))) .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(new Instant(timestamp))) .setType(type) .setStateFamily(STATE_FAMILY) @@ -210,7 +211,8 @@ private Windmill.Timer makeSerializedTimer( } private static TimerData makeTimer(StateNamespace ns, long timestamp, TimeDomain domain) { - return TimerData.of(ns, new Instant(timestamp), new Instant(timestamp), domain); + return TimerData.of( + ns, new Instant(timestamp), new Instant(timestamp), domain, TimerData.CausedByDrain.NORMAL); } @Test diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java index 73acdf937811..9c4e1daaf3d0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java @@ -143,9 +143,18 @@ public void testTimerDataToFromTimer() { for (Instant timestamp : TEST_TIMESTAMPS) { List anonymousTimers = ImmutableList.of( - TimerData.of(namespace, timestamp, timestamp, timeDomain), TimerData.of( - namespace, timestamp, timestamp.minus(Duration.millis(1)), timeDomain)); + namespace, + timestamp, + timestamp, + timeDomain, + TimerData.CausedByDrain.NORMAL), + TimerData.of( + namespace, + timestamp, + timestamp.minus(Duration.millis(1)), + timeDomain, + TimerData.CausedByDrain.NORMAL)); for (TimerData timer : anonymousTimers) { Instant expectedTimestamp = timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE) @@ -165,7 +174,11 @@ public void testTimerDataToFromTimer() { // output, we expect it to be bounded TimerData expected = TimerData.of( - timer.getNamespace(), timestamp, expectedTimestamp, timer.getDomain()); + timer.getNamespace(), + timestamp, + expectedTimestamp, + timer.getDomain(), + TimerData.CausedByDrain.NORMAL); assertThat(computed, equalTo(expected)); } @@ -173,7 +186,13 @@ public void testTimerDataToFromTimer() { for (String timerId : TEST_TIMER_IDS) { List timers = ImmutableList.of( - TimerData.of(timerId, namespace, timestamp, timestamp, timeDomain), + TimerData.of( + timerId, + namespace, + timestamp, + timestamp, + timeDomain, + TimerData.CausedByDrain.NORMAL), TimerData.of( timerId, "family", namespace, timestamp, timestamp, timeDomain), TimerData.of( @@ -181,7 +200,8 @@ public void testTimerDataToFromTimer() { namespace, timestamp, timestamp.minus(Duration.millis(1)), - timeDomain), + timeDomain, + TimerData.CausedByDrain.NORMAL), TimerData.of( timerId, "family", diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java index cf68cdb5b65c..bc4eb94758cc 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java @@ -117,7 +117,8 @@ private void scheduleNextBundleCheck() { StateNamespaces.global(), nextBundleCheckTime, nextBundleCheckTime, - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); bundleTimerScheduler.schedule( new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis()); } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java index a332aa2fbe60..8391049e7b14 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableBundleManager.java @@ -104,7 +104,8 @@ private void scheduleNextBundleCheck() { StateNamespaces.global(), nextBundleCheckTime, nextBundleCheckTime, - TimeDomain.PROCESSING_TIME); + TimeDomain.PROCESSING_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); bundleTimerScheduler.schedule( new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis()); } diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java index d3da93a054bf..9b19471fe7a7 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java @@ -42,7 +42,12 @@ public class KeyedTimerDataTest { public void testCoder() throws Exception { final TimerInternals.TimerData td = TimerInternals.TimerData.of( - "timer", StateNamespaces.global(), TIMESTAMP, TIMESTAMP, TimeDomain.EVENT_TIME); + "timer", + StateNamespaces.global(), + TIMESTAMP, + TIMESTAMP, + TimeDomain.EVENT_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); final String key = "timer-key"; final ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java index e2db0cb7c842..e9c90bbbf464 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java @@ -151,12 +151,22 @@ public void testEventTimeTimers() { final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey"); final TimerInternals.TimerData timer1 = TimerInternals.TimerData.of( - "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME); + "timer1", + nameSpace, + new Instant(10), + new Instant(10), + TimeDomain.EVENT_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = TimerInternals.TimerData.of( - "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.EVENT_TIME); + "timer2", + nameSpace, + new Instant(100), + new Instant(100), + TimeDomain.EVENT_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); timerInternals.setTimer(timer2); timerInternalsFactory.setInputWatermark(new Instant(5)); @@ -190,7 +200,12 @@ public void testRestoreEventBufferSize() throws Exception { final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key); final TimerInternals.TimerData timer1 = TimerInternals.TimerData.of( - "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME); + "timer1", + nameSpace, + new Instant(10), + new Instant(10), + TimeDomain.EVENT_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); timerInternals.setTimer(timer1); store.close(); @@ -211,7 +226,12 @@ public void testRestoreEventBufferSize() throws Exception { final TimerInternals restoredTimerInternals = restoredFactory.timerInternalsForKey(key); final TimerInternals.TimerData timer2 = TimerInternals.TimerData.of( - "timer2", nameSpace, new Instant(200), new Instant(200), TimeDomain.EVENT_TIME); + "timer2", + nameSpace, + new Instant(200), + new Instant(200), + TimeDomain.EVENT_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); restoredTimerInternals.setTimer(timer2); // Timer 2 should be added to the Event buffer @@ -247,12 +267,22 @@ public void testRestore() throws Exception { final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key); final TimerInternals.TimerData timer1 = TimerInternals.TimerData.of( - "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME); + "timer1", + nameSpace, + new Instant(10), + new Instant(10), + TimeDomain.EVENT_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = TimerInternals.TimerData.of( - "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.EVENT_TIME); + "timer2", + nameSpace, + new Instant(100), + new Instant(100), + TimeDomain.EVENT_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); timerInternals.setTimer(timer2); store.close(); @@ -293,12 +323,22 @@ public void testProcessingTimeTimers() throws IOException { final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey"); final TimerInternals.TimerData timer1 = TimerInternals.TimerData.of( - "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.PROCESSING_TIME); + "timer1", + nameSpace, + new Instant(10), + new Instant(10), + TimeDomain.PROCESSING_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); timerInternals.setTimer(timer1); final TimerInternals.TimerData timer2 = TimerInternals.TimerData.of( - "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.PROCESSING_TIME); + "timer2", + nameSpace, + new Instant(100), + new Instant(100), + TimeDomain.PROCESSING_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); timerInternals.setTimer(timer2); final TimerInternals.TimerData timer3 = @@ -344,18 +384,33 @@ public void testOverride() { final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey"); final TimerInternals.TimerData timer1 = TimerInternals.TimerData.of( - "timerId", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME); + "timerId", + nameSpace, + new Instant(10), + new Instant(10), + TimeDomain.EVENT_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); timerInternals.setTimer(timer1); // this timer should override the first timer final TimerInternals.TimerData timer2 = TimerInternals.TimerData.of( - "timerId", nameSpace, new Instant(100), new Instant(100), TimeDomain.EVENT_TIME); + "timerId", + nameSpace, + new Instant(100), + new Instant(100), + TimeDomain.EVENT_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); timerInternals.setTimer(timer2); final TimerInternals.TimerData timer3 = TimerInternals.TimerData.of( - "timerId2", nameSpace, new Instant(200), new Instant(200), TimeDomain.EVENT_TIME); + "timerId2", + nameSpace, + new Instant(200), + new Instant(200), + TimeDomain.EVENT_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); timerInternals.setTimer(timer3); // this timer shouldn't override since it has a different id @@ -410,7 +465,12 @@ private void testMaxExpiredEventTimersProcessAtOnce( for (int i = 0; i < totalNumberOfTimersInStore; i++) { timer = TimerInternals.TimerData.of( - "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME); + "timer" + i, + nameSpace, + new Instant(i), + new Instant(i), + TimeDomain.EVENT_TIME, + TimerInternals.TimerData.CausedByDrain.NORMAL); timerInternals.setTimer(timer); }