From b6fa072a82525d35d5c57e65e4648e148d0c291a Mon Sep 17 00:00:00 2001 From: yehaolan Date: Fri, 27 Sep 2024 08:38:02 -0700 Subject: [PATCH 1/6] init --- .../org/apache/samza/config/TaskConfig.java | 7 + .../apache/samza/container/TaskInstance.scala | 34 +++-- .../samza/container/TaskInstanceMetrics.scala | 2 + .../samza/container/TestTaskInstance.scala | 125 ++++++++++++++++++ 4 files changed, 160 insertions(+), 8 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index 0f168be18d..d3f0c0192c 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -65,6 +65,9 @@ public class TaskConfig extends MapConfig { public static final String COMMIT_TIMEOUT_MS = "task.commit.timeout.ms"; static final long DEFAULT_COMMIT_TIMEOUT_MS = Duration.ofMinutes(30).toMillis(); + public static final String SKIP_COMMIT_DURING_FAILURES_ENABLED = "task.commit.skip.commit.during.failures.enabled"; + private static final boolean DEFAULT_SKIP_COMMIT_DURING_FAILURES_ENABLED = false; + // how long to wait for a clean shutdown public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms"; static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L; @@ -418,4 +421,8 @@ public long getWatermarkIdleTimeoutMs() { public double getWatermarkQuorumSizePercentage() { return getDouble(WATERMARK_QUORUM_SIZE_PERCENTAGE, DEFAULT_WATERMARK_QUORUM_SIZE_PERCENTAGE); } + + public boolean getSkipCommitDuringFailuresEnabled() { + return getBoolean(SKIP_COMMIT_DURING_FAILURES_ENABLED, DEFAULT_SKIP_COMMIT_DURING_FAILURES_ENABLED); + } } diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 70d9ca3800..9748cff2a8 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -135,6 +135,7 @@ class TaskInstance( @volatile var lastCommitStartTimeMs = System.currentTimeMillis() val commitMaxDelayMs = taskConfig.getCommitMaxDelayMs val commitTimeoutMs = taskConfig.getCommitTimeoutMs + val skipCommitDuringFailureEnabled = taskConfig.getSkipCommitDuringFailuresEnabled val commitInProgress = new Semaphore(1) val commitException = new AtomicReference[Exception]() @@ -312,10 +313,18 @@ class TaskInstance( val commitStartNs = System.nanoTime() // first check if there were any unrecoverable errors during the async stage of the pending commit - // and if so, shut down the container. + // If there is unrecoverable error and skipCommitDuringFailureEnabled is enabled, ignore the error. + // Otherwise, shut down the container. if (commitException.get() != null) { - throw new SamzaException("Unrecoverable error during pending commit for taskName: %s." format taskName, - commitException.get()) + if (skipCommitDuringFailureEnabled) { + warn("Ignored the commit failure for taskName %s: %s" format (taskName, commitException.get().getMessage)) + metrics.commitExceptionIgnored.set(metrics.commitExceptionIgnored.getValue + 1) + commitException.set(null) + commitInProgress.release() + } else { + throw new SamzaException("Unrecoverable error during pending commit for taskName: %s." format taskName, + commitException.get()) + } } // if no commit is in progress for this task, continue with this commit. @@ -339,10 +348,18 @@ class TaskInstance( if (!commitInProgress.tryAcquire(commitTimeoutMs, TimeUnit.MILLISECONDS)) { val timeSinceLastCommit = System.currentTimeMillis() - lastCommitStartTimeMs metrics.commitsTimedOut.set(metrics.commitsTimedOut.getValue + 1) - throw new SamzaException("Timeout waiting for pending commit for taskName: %s to finish. " + - "%s ms have elapsed since the pending commit started. Max allowed commit delay is %s ms " + - "and commit timeout beyond that is %s ms" format (taskName, timeSinceLastCommit, - commitMaxDelayMs, commitTimeoutMs)) + if (skipCommitDuringFailureEnabled) { + warn("Ignoring commit timeout for taskName: %s. %s ms have elapsed since another commit started. " + + "Max allowed commit delay is %s ms and commit timeout beyond that is %s ms." + format (taskName, timeSinceLastCommit, commitMaxDelayMs, commitTimeoutMs)) + commitInProgress.release() + return + } else { + throw new SamzaException("Timeout waiting for pending commit for taskName: %s to finish. " + + "%s ms have elapsed since the pending commit started. Max allowed commit delay is %s ms " + + "and commit timeout beyond that is %s ms" format (taskName, timeSinceLastCommit, + commitMaxDelayMs, commitTimeoutMs)) + } } } } @@ -426,7 +443,7 @@ class TaskInstance( } }) - metrics.lastCommitNs.set(System.nanoTime() - commitStartNs) + metrics.lastCommitNs.set(System.nanoTime()) metrics.commitSyncNs.update(System.nanoTime() - commitStartNs) debug("Finishing sync stage of commit for taskName: %s checkpointId: %s" format (taskName, checkpointId)) } @@ -533,6 +550,7 @@ class TaskInstance( } else { metrics.commitAsyncNs.update(System.nanoTime() - asyncStageStartNs) metrics.commitNs.update(System.nanoTime() - commitStartNs) + metrics.lastAsyncCommitNs.set(System.nanoTime()) } } finally { // release the permit indicating that previous commit is complete. diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala index 54d3665253..807d50efee 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala @@ -40,8 +40,10 @@ class TaskInstanceMetrics( val asyncCallbackCompleted = newCounter("async-callback-complete-calls") val commitsTimedOut = newGauge("commits-timed-out", 0) val commitsSkipped = newGauge("commits-skipped", 0) + val commitExceptionIgnored = newGauge("commit-exceptions-ignored", 0) val commitNs = newTimer("commit-ns") val lastCommitNs = newGauge("last-commit-ns", 0L) + val lastAsyncCommitNs = newGauge("last-async-commit-ns", 0L) val commitSyncNs = newTimer("commit-sync-ns") val commitAsyncNs = newTimer("commit-async-ns") val snapshotNs = newTimer("snapshot-ns") diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 6afec52e72..0e34d864d2 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -1004,6 +1004,131 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { verify(snapshotTimer, times(2)).update(anyLong()) } + @Test + def testSkipExceptionFromFirstCommitAndContinueSecondCommit(): Unit = { + val commitsCounter = mock[Counter] + when(this.metrics.commits).thenReturn(commitsCounter) + val snapshotTimer = mock[Timer] + when(this.metrics.snapshotNs).thenReturn(snapshotTimer) + val uploadTimer = mock[Timer] + when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) + val commitTimer = mock[Timer] + when(this.metrics.commitNs).thenReturn(commitTimer) + val commitSyncTimer = mock[Timer] + when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer) + val commitAsyncTimer = mock[Timer] + when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer) + val cleanUpTimer = mock[Timer] + when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) + val skippedCounter = mock[Gauge[Int]] + when(this.metrics.commitsSkipped).thenReturn(skippedCounter) + val lastCommitGauge = mock[Gauge[Long]] + when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) + val commitExceptionIgnoredCounter = mock[Gauge[Int]] + when(this.metrics.commitExceptionIgnored).thenReturn(commitExceptionIgnoredCounter) + + val taskConfigsMap = new util.HashMap[String, String]() + taskConfigsMap.put("task.commit.ms", "-1") + taskConfigsMap.put("task.commit.max.delay.ms", "-1") + taskConfigsMap.put("task.commit.timeout.ms", "2000000") + // skip commit if exception occurs during the commit + taskConfigsMap.put("task.commit.skip.commit.during.failures.enabled", "true") + when(this.jobContext.getConfig).thenReturn(new MapConfig(taskConfigsMap)) + setupTaskInstance(None, ForkJoinPool.commonPool()) + + val inputOffsets = new util.HashMap[SystemStreamPartition, String]() + inputOffsets.put(SYSTEM_STREAM_PARTITION, "4") + val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]() + when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets) + // Ensure the second commit proceeds without exceptions + when(this.taskCommitManager.upload(any(), any())) + .thenReturn(CompletableFuture.completedFuture( + Collections.singletonMap(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, stateCheckpointMarkers))) + // exception during the first commit + when(this.taskCommitManager.upload(any(), any())) + .thenReturn(FutureUtil.failedFuture[util.Map[String, util.Map[String, String]]](new RuntimeException)) + + taskInstance.commit + verify(commitsCounter).inc() + verify(snapshotTimer).update(anyLong()) + verifyZeroInteractions(uploadTimer) + verifyZeroInteractions(commitTimer) + verifyZeroInteractions(skippedCounter) + Thread.sleep(1000) // ensure the commitException is updated by the previous commit + taskInstance.commit + verify(commitsCounter, times(2)).inc() // should only have been incremented twice - once for each commit + verify(commitExceptionIgnoredCounter).set(1) + } + + @Test + def testIgnoreTimeoutAndContinueCommitIfPreviousAsyncCommitInProgressAfterMaxCommitDelayAndBlockTime(): Unit = { + val commitsCounter = mock[Counter] + when(this.metrics.commits).thenReturn(commitsCounter) + val snapshotTimer = mock[Timer] + when(this.metrics.snapshotNs).thenReturn(snapshotTimer) + val commitTimer = mock[Timer] + when(this.metrics.commitNs).thenReturn(commitTimer) + val commitSyncTimer = mock[Timer] + when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer) + val commitAsyncTimer = mock[Timer] + when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer) + val uploadTimer = mock[Timer] + when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) + val cleanUpTimer = mock[Timer] + when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) + val skippedCounter = mock[Gauge[Int]] + when(this.metrics.commitsSkipped).thenReturn(skippedCounter) + val commitsTimedOutCounter = mock[Gauge[Int]] + when(this.metrics.commitsTimedOut).thenReturn(commitsTimedOutCounter) + val lastCommitGauge = mock[Gauge[Long]] + when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) + val commitExceptionIgnoredCounter = mock[Gauge[Int]] + when(this.metrics.commitExceptionIgnored).thenReturn(commitExceptionIgnoredCounter) + + val inputOffsets = new util.HashMap[SystemStreamPartition, String]() + inputOffsets.put(SYSTEM_STREAM_PARTITION,"4") + val changelogSSP = new SystemStreamPartition(new SystemStream(SYSTEM_NAME, "test-changelog-stream"), new Partition(0)) + + val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]() + val stateCheckpointMarker = KafkaStateCheckpointMarker.serialize(new KafkaStateCheckpointMarker(changelogSSP, "5")) + stateCheckpointMarkers.put("storeName", stateCheckpointMarker) + when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets) + + val snapshotSCMs = ImmutableMap.of(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, stateCheckpointMarkers) + when(this.taskCommitManager.snapshot(any())).thenReturn(snapshotSCMs) + val snapshotSCMFuture: CompletableFuture[util.Map[String, util.Map[String, String]]] = + CompletableFuture.completedFuture(snapshotSCMs) + + when(this.taskCommitManager.upload(any(), Matchers.eq(snapshotSCMs))).thenReturn(snapshotSCMFuture) // kafka is no-op + + val cleanUpFuture = new CompletableFuture[Void]() + when(this.taskCommitManager.cleanUp(any(), any())).thenReturn(cleanUpFuture) + + // use a separate executor to perform async operations on to test caller thread blocking behavior + val taskConfigsMap = new util.HashMap[String, String]() + taskConfigsMap.put("task.commit.ms", "-1") + // "block" immediately if previous commit async stage not complete + taskConfigsMap.put("task.commit.max.delay.ms", "-1") + taskConfigsMap.put("task.commit.timeout.ms", "0") // throw exception immediately if blocked + taskConfigsMap.put("task.commit.skip.commit.during.failures.enabled", "true") + when(this.jobContext.getConfig).thenReturn(new MapConfig(taskConfigsMap)) // override default behavior + + setupTaskInstance(None, ForkJoinPool.commonPool()) + + taskInstance.commit // async stage will not complete until cleanUpFuture is completed + taskInstance.commit // second commit found commit timeout and release the semaphore + cleanUpFuture.complete(null) // just to unblock shared executor + + verifyZeroInteractions(commitExceptionIgnoredCounter) + verifyZeroInteractions(skippedCounter) + verify(commitsTimedOutCounter).set(1) + verify(commitsCounter, times(1)).inc() // should only have been incremented once now - second commit was skipped + + taskInstance.commit // third commit should proceed without any issues + + verify(commitsCounter, times(2)).inc() // should only have been incremented twice - second commit was skipped + } + /** * Given that no application task context factory is provided, then no lifecycle calls should be made. From 8f2fb807fe1dd74b55fb699d77a97b2f32f192a1 Mon Sep 17 00:00:00 2001 From: yehaolan Date: Tue, 22 Oct 2024 07:38:50 -0700 Subject: [PATCH 2/6] add max limit for exceptions and timeouts --- .../org/apache/samza/config/TaskConfig.java | 14 ++ .../apache/samza/container/TaskInstance.scala | 45 +++-- .../samza/container/TaskInstanceMetrics.scala | 6 +- .../samza/container/TestTaskInstance.scala | 158 ++++++++++++++---- 4 files changed, 175 insertions(+), 48 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index d3f0c0192c..84b16f0334 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -68,6 +68,12 @@ public class TaskConfig extends MapConfig { public static final String SKIP_COMMIT_DURING_FAILURES_ENABLED = "task.commit.skip.commit.during.failures.enabled"; private static final boolean DEFAULT_SKIP_COMMIT_DURING_FAILURES_ENABLED = false; + public static final String SKIP_COMMIT_EXCEPTION_MAX_LIMIT = "task.commit.skip.commit.exception.max.limit"; + private static final int DEFAULT_SKIP_COMMIT_EXCEPTION_MAX_LIMIT = 5; + + public static final String SKIP_COMMIT_TIMEOUT_MAX_LIMIT = "task.commit.skip.commit.timeout.max.limit"; + private static final int DEFAULT_SKIP_COMMIT_TIMEOUT_MAX_LIMIT = 2; + // how long to wait for a clean shutdown public static final String TASK_SHUTDOWN_MS = "task.shutdown.ms"; static final long DEFAULT_TASK_SHUTDOWN_MS = 30000L; @@ -425,4 +431,12 @@ public double getWatermarkQuorumSizePercentage() { public boolean getSkipCommitDuringFailuresEnabled() { return getBoolean(SKIP_COMMIT_DURING_FAILURES_ENABLED, DEFAULT_SKIP_COMMIT_DURING_FAILURES_ENABLED); } + + public int getSkipCommitExceptionMaxLimit() { + return getInt(SKIP_COMMIT_EXCEPTION_MAX_LIMIT, DEFAULT_SKIP_COMMIT_EXCEPTION_MAX_LIMIT); + } + + public int getSkipCommitTimeoutMaxLimit() { + return getInt(SKIP_COMMIT_TIMEOUT_MAX_LIMIT, DEFAULT_SKIP_COMMIT_TIMEOUT_MAX_LIMIT); + } } diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 9748cff2a8..224fa67ee0 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -133,9 +133,13 @@ class TaskInstance( val checkpointWriteVersions = new TaskConfig(config).getCheckpointWriteVersions @volatile var lastCommitStartTimeMs = System.currentTimeMillis() + @volatile var commitExceptionCounter = 0 + @volatile var commitTimeoutCounter = 0 val commitMaxDelayMs = taskConfig.getCommitMaxDelayMs val commitTimeoutMs = taskConfig.getCommitTimeoutMs val skipCommitDuringFailureEnabled = taskConfig.getSkipCommitDuringFailuresEnabled + val skipCommitExceptionMaxLimit = taskConfig.getSkipCommitExceptionMaxLimit + val skipCommitTimeoutMaxLimit = taskConfig.getSkipCommitTimeoutMaxLimit val commitInProgress = new Semaphore(1) val commitException = new AtomicReference[Exception]() @@ -313,17 +317,21 @@ class TaskInstance( val commitStartNs = System.nanoTime() // first check if there were any unrecoverable errors during the async stage of the pending commit - // If there is unrecoverable error and skipCommitDuringFailureEnabled is enabled, ignore the error. - // Otherwise, shut down the container. + // If there is unrecoverable error, increment the metric and the counter. + // Shutdown the container in the following scenarios: + // 1. skipCommitDuringFailureEnabled is not enabled + // 2. skipCommitDuringFailureEnabled is enabled but the number of exceptions exceeded the max count + // Otherwise, ignore the exception. if (commitException.get() != null) { - if (skipCommitDuringFailureEnabled) { + metrics.commitExceptions.inc() + commitExceptionCounter += 1 + if (!skipCommitDuringFailureEnabled || commitExceptionCounter > skipCommitExceptionMaxLimit) { + throw new SamzaException("Unrecoverable error during pending commit for taskName: %s. Exception Counter: %s" + format (taskName, commitExceptionCounter), commitException.get()) + } else { warn("Ignored the commit failure for taskName %s: %s" format (taskName, commitException.get().getMessage)) - metrics.commitExceptionIgnored.set(metrics.commitExceptionIgnored.getValue + 1) commitException.set(null) commitInProgress.release() - } else { - throw new SamzaException("Unrecoverable error during pending commit for taskName: %s." format taskName, - commitException.get()) } } @@ -337,7 +345,7 @@ class TaskInstance( if (timeSinceLastCommit < commitMaxDelayMs) { info("Skipping commit for taskName: %s since another commit is in progress. " + "%s ms have elapsed since the pending commit started." format (taskName, timeSinceLastCommit)) - metrics.commitsSkipped.set(metrics.commitsSkipped.getValue + 1) + metrics.commitsSkipped.inc() return } else { warn("Blocking processing for taskName: %s until in-flight commit is complete. " + @@ -345,20 +353,27 @@ class TaskInstance( "which is greater than the max allowed commit delay: %s." format (taskName, timeSinceLastCommit, commitMaxDelayMs)) + // Wait for the previous commit to complete within the timeout. + // If it doesn't complete within the timeout, increment metric and the counter. + // Shutdown the container in the following scenarios: + // 1. skipCommitDuringFailureEnabled is not enabled + // 2. skipCommitDuringFailureEnabled is enabled but the number of timeouts exceeded the max count + // Otherwise, ignore the timeout. if (!commitInProgress.tryAcquire(commitTimeoutMs, TimeUnit.MILLISECONDS)) { val timeSinceLastCommit = System.currentTimeMillis() - lastCommitStartTimeMs - metrics.commitsTimedOut.set(metrics.commitsTimedOut.getValue + 1) - if (skipCommitDuringFailureEnabled) { + metrics.commitsTimedOut.inc() + commitTimeoutCounter += 1 + if (!skipCommitDuringFailureEnabled || commitTimeoutCounter > skipCommitTimeoutMaxLimit) { + throw new SamzaException("Timeout waiting for pending commit for taskName: %s to finish. " + + "%s ms have elapsed since the pending commit started. Max allowed commit delay is %s ms " + + "and commit timeout beyond that is %s ms. Timeout Counter: %s" format (taskName, timeSinceLastCommit, + commitMaxDelayMs, commitTimeoutMs, commitTimeoutCounter)) + } else { warn("Ignoring commit timeout for taskName: %s. %s ms have elapsed since another commit started. " + "Max allowed commit delay is %s ms and commit timeout beyond that is %s ms." format (taskName, timeSinceLastCommit, commitMaxDelayMs, commitTimeoutMs)) commitInProgress.release() return - } else { - throw new SamzaException("Timeout waiting for pending commit for taskName: %s to finish. " + - "%s ms have elapsed since the pending commit started. Max allowed commit delay is %s ms " + - "and commit timeout beyond that is %s ms" format (taskName, timeSinceLastCommit, - commitMaxDelayMs, commitTimeoutMs)) } } } diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala index 807d50efee..81fb4751c0 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala @@ -38,9 +38,9 @@ class TaskInstanceMetrics( val pendingMessages = newGauge("pending-messages", 0) val messagesInFlight = newGauge("messages-in-flight", 0) val asyncCallbackCompleted = newCounter("async-callback-complete-calls") - val commitsTimedOut = newGauge("commits-timed-out", 0) - val commitsSkipped = newGauge("commits-skipped", 0) - val commitExceptionIgnored = newGauge("commit-exceptions-ignored", 0) + val commitsTimedOut = newCounter("commits-timed-out") + val commitsSkipped = newCounter("commits-skipped") + val commitExceptions = newCounter("commit-exceptions") val commitNs = newTimer("commit-ns") val lastCommitNs = newGauge("last-commit-ns", 0L) val lastAsyncCommitNs = newGauge("last-async-commit-ns", 0L) diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 0e34d864d2..ff52b40062 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -277,7 +277,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val inputOffsets = new util.HashMap[SystemStreamPartition, String]() inputOffsets.put(SYSTEM_STREAM_PARTITION,"4") @@ -370,7 +370,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer) val uploadTimer = mock[Timer] when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val inputOffsets = Map(SYSTEM_STREAM_PARTITION -> "4").asJava @@ -431,7 +431,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer) val uploadTimer = mock[Timer] when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) @@ -504,10 +504,12 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) + val commitExceptionsGauge = mock[Counter] + when(this.metrics.commitExceptions).thenReturn(commitExceptionsGauge) val inputOffsets = new util.HashMap[SystemStreamPartition, String]() inputOffsets.put(SYSTEM_STREAM_PARTITION,"4") @@ -556,10 +558,12 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) + val commitExceptionsGauge = mock[Counter] + when(this.metrics.commitExceptions).thenReturn(commitExceptionsGauge) val inputOffsets = new util.HashMap[SystemStreamPartition, String]() inputOffsets.put(SYSTEM_STREAM_PARTITION,"4") @@ -608,10 +612,12 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) + val commitExceptionsGauge = mock[Counter] + when(this.metrics.commitExceptions).thenReturn(commitExceptionsGauge) val inputOffsets = new util.HashMap[SystemStreamPartition, String]() inputOffsets.put(SYSTEM_STREAM_PARTITION,"4") @@ -661,10 +667,12 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) + val commitExceptionsGauge = mock[Counter] + when(this.metrics.commitExceptions).thenReturn(commitExceptionsGauge) val inputOffsets = new util.HashMap[SystemStreamPartition, String]() inputOffsets.put(SYSTEM_STREAM_PARTITION,"4") @@ -714,10 +722,12 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) + val commitExceptionsGauge = mock[Counter] + when(this.metrics.commitExceptions).thenReturn(commitExceptionsGauge) val inputOffsets = new util.HashMap[SystemStreamPartition, String]() inputOffsets.put(SYSTEM_STREAM_PARTITION,"4") @@ -768,7 +778,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) @@ -828,7 +838,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) @@ -859,7 +869,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { taskInstance.commit - verify(skippedCounter).set(1) + verify(skippedCounter, times(1)).inc() verify(commitsCounter, times(1)).inc() // should only have been incremented once on the initial commit verify(snapshotTimer).update(anyLong()) @@ -884,7 +894,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) @@ -947,7 +957,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) @@ -1020,12 +1030,12 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) - val commitExceptionIgnoredCounter = mock[Gauge[Int]] - when(this.metrics.commitExceptionIgnored).thenReturn(commitExceptionIgnoredCounter) + val commitExceptionCounter = mock[Counter] + when(this.metrics.commitExceptions).thenReturn(commitExceptionCounter) val taskConfigsMap = new util.HashMap[String, String]() taskConfigsMap.put("task.commit.ms", "-1") @@ -1033,6 +1043,8 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { taskConfigsMap.put("task.commit.timeout.ms", "2000000") // skip commit if exception occurs during the commit taskConfigsMap.put("task.commit.skip.commit.during.failures.enabled", "true") + // should throw exception if second commit exception occurs + taskConfigsMap.put("task.commit.skip.commit.exception.max.limit", "1") when(this.jobContext.getConfig).thenReturn(new MapConfig(taskConfigsMap)) setupTaskInstance(None, ForkJoinPool.commonPool()) @@ -1048,20 +1060,87 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.taskCommitManager.upload(any(), any())) .thenReturn(FutureUtil.failedFuture[util.Map[String, util.Map[String, String]]](new RuntimeException)) + // First commit fails but should not throw exception taskInstance.commit verify(commitsCounter).inc() verify(snapshotTimer).update(anyLong()) verifyZeroInteractions(uploadTimer) verifyZeroInteractions(commitTimer) verifyZeroInteractions(skippedCounter) - Thread.sleep(1000) // ensure the commitException is updated by the previous commit + waitForCommitExceptionIsSet(100, 5) + // Second commit should succeed taskInstance.commit verify(commitsCounter, times(2)).inc() // should only have been incremented twice - once for each commit - verify(commitExceptionIgnoredCounter).set(1) + verify(commitExceptionCounter).inc() } @Test - def testIgnoreTimeoutAndContinueCommitIfPreviousAsyncCommitInProgressAfterMaxCommitDelayAndBlockTime(): Unit = { + def testCommitThrowsIfAllowSkipCommitButExceptionCountReachMaxLimit(): Unit = { + val commitsCounter = mock[Counter] + when(this.metrics.commits).thenReturn(commitsCounter) + val snapshotTimer = mock[Timer] + when(this.metrics.snapshotNs).thenReturn(snapshotTimer) + val uploadTimer = mock[Timer] + when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) + val commitTimer = mock[Timer] + when(this.metrics.commitNs).thenReturn(commitTimer) + val commitSyncTimer = mock[Timer] + when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer) + val commitAsyncTimer = mock[Timer] + when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer) + val cleanUpTimer = mock[Timer] + when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) + val skippedCounter = mock[Counter] + when(this.metrics.commitsSkipped).thenReturn(skippedCounter) + val lastCommitGauge = mock[Gauge[Long]] + when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) + val commitExceptionCounter = mock[Counter] + when(this.metrics.commitExceptions).thenReturn(commitExceptionCounter) + + val taskConfigsMap = new util.HashMap[String, String]() + taskConfigsMap.put("task.commit.ms", "-1") + taskConfigsMap.put("task.commit.max.delay.ms", "-1") + taskConfigsMap.put("task.commit.timeout.ms", "2000000") + // skip commit if exception occurs during the commit + taskConfigsMap.put("task.commit.skip.commit.during.failures.enabled", "true") + // should throw exception if second commit exception occurs + taskConfigsMap.put("task.commit.skip.commit.exception.max.limit", "1") + when(this.jobContext.getConfig).thenReturn(new MapConfig(taskConfigsMap)) + setupTaskInstance(None, ForkJoinPool.commonPool()) + + val inputOffsets = new util.HashMap[SystemStreamPartition, String]() + inputOffsets.put(SYSTEM_STREAM_PARTITION, "4") + when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets) + // exception for commits + when(this.taskCommitManager.upload(any(), any())) + .thenReturn(FutureUtil.failedFuture[util.Map[String, util.Map[String, String]]](new RuntimeException)) + + // First commit fails but should not throw exception + taskInstance.commit + waitForCommitExceptionIsSet(100, 5) + // Second commit fails but should not throw exception + taskInstance.commit + verify(commitExceptionCounter).inc() + verify(commitsCounter, times(2)).inc() + verify(snapshotTimer, times(2)).update(anyLong()) + verifyZeroInteractions(uploadTimer) + verifyZeroInteractions(commitTimer) + verifyZeroInteractions(skippedCounter) + waitForCommitExceptionIsSet(100, 5) + // third commit should fail as the the commit exception counter is greater than the max limit + try { + taskInstance.commit + fail("Should have thrown an exception if exception count reached the max limit.") + } catch { + case e: Exception => + // expected + } + verify(commitExceptionCounter, times(2)).inc() + verify(commitsCounter, times(2)).inc() + } + + @Test + def testCommitThrowsIfAllowSkipTimeoutButTimeoutCountReachMaxLimit(): Unit = { val commitsCounter = mock[Counter] when(this.metrics.commits).thenReturn(commitsCounter) val snapshotTimer = mock[Timer] @@ -1076,14 +1155,14 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { when(this.metrics.asyncUploadNs).thenReturn(uploadTimer) val cleanUpTimer = mock[Timer] when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer) - val skippedCounter = mock[Gauge[Int]] + val skippedCounter = mock[Counter] when(this.metrics.commitsSkipped).thenReturn(skippedCounter) - val commitsTimedOutCounter = mock[Gauge[Int]] + val commitsTimedOutCounter = mock[Counter] when(this.metrics.commitsTimedOut).thenReturn(commitsTimedOutCounter) val lastCommitGauge = mock[Gauge[Long]] when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge) - val commitExceptionIgnoredCounter = mock[Gauge[Int]] - when(this.metrics.commitExceptionIgnored).thenReturn(commitExceptionIgnoredCounter) + val commitExceptionCounter = mock[Counter] + when(this.metrics.commitExceptions).thenReturn(commitExceptionCounter) val inputOffsets = new util.HashMap[SystemStreamPartition, String]() inputOffsets.put(SYSTEM_STREAM_PARTITION,"4") @@ -1111,22 +1190,30 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { taskConfigsMap.put("task.commit.max.delay.ms", "-1") taskConfigsMap.put("task.commit.timeout.ms", "0") // throw exception immediately if blocked taskConfigsMap.put("task.commit.skip.commit.during.failures.enabled", "true") + // should throw exception if second commit timeout occurs + taskConfigsMap.put("task.commit.skip.commit.timeout.max.limit", "1") when(this.jobContext.getConfig).thenReturn(new MapConfig(taskConfigsMap)) // override default behavior setupTaskInstance(None, ForkJoinPool.commonPool()) taskInstance.commit // async stage will not complete until cleanUpFuture is completed taskInstance.commit // second commit found commit timeout and release the semaphore - cleanUpFuture.complete(null) // just to unblock shared executor - verifyZeroInteractions(commitExceptionIgnoredCounter) + verifyZeroInteractions(commitExceptionCounter) verifyZeroInteractions(skippedCounter) - verify(commitsTimedOutCounter).set(1) + verify(commitsTimedOutCounter).inc() verify(commitsCounter, times(1)).inc() // should only have been incremented once now - second commit was skipped - - taskInstance.commit // third commit should proceed without any issues - - verify(commitsCounter, times(2)).inc() // should only have been incremented twice - second commit was skipped + taskInstance.commit // third commit should proceed without any issues and acquire the semaphore + try { + taskInstance.commit // fourth commit should throw exception as the timeout count reached the max limit + fail("Should have thrown an exception due to exceeding timeout limit.") + } catch { + case e: Exception => + // expected + } + verify(commitsTimedOutCounter, times(2)).inc() // incremented twice (second and fourth commit) + verify(commitsCounter, times(2)).inc() // incremented twice (first and third commit) + cleanUpFuture.complete(null) // just to unblock shared executor } @@ -1216,6 +1303,17 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar { externalContextOption = Some(this.externalContext), elasticityFactor = elasticityFactor) } + private def waitForCommitExceptionIsSet(sleepTimeInMs: Int, maxRetry: Int): Unit = { + var retries = 0 + while (taskInstance.commitException.get() == null && retries < maxRetry) { + retries += 1 + Thread.sleep(sleepTimeInMs) + } + if (taskInstance.commitException.get() == null) { + fail("Should have set the commit exception.") + } + } + /** * Task type which has all task traits, which can be mocked. */ From b3453c4c73dae17040e49dc8c3cee36901673175 Mon Sep 17 00:00:00 2001 From: yehaolan Date: Wed, 23 Oct 2024 14:26:51 -0700 Subject: [PATCH 3/6] add comments and reset counters --- .../main/java/org/apache/samza/config/TaskConfig.java | 5 +++++ .../org/apache/samza/container/TaskInstance.scala | 10 ++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index 84b16f0334..c4ea6a1e0a 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -65,12 +65,17 @@ public class TaskConfig extends MapConfig { public static final String COMMIT_TIMEOUT_MS = "task.commit.timeout.ms"; static final long DEFAULT_COMMIT_TIMEOUT_MS = Duration.ofMinutes(30).toMillis(); + // Flag to indicate whether to skip commit during failures (exceptions or timeouts) public static final String SKIP_COMMIT_DURING_FAILURES_ENABLED = "task.commit.skip.commit.during.failures.enabled"; private static final boolean DEFAULT_SKIP_COMMIT_DURING_FAILURES_ENABLED = false; + // Maximum number of allowed commit exceptions. + // If the number of commit exceptions exceeds this limit, the task will be shut down. public static final String SKIP_COMMIT_EXCEPTION_MAX_LIMIT = "task.commit.skip.commit.exception.max.limit"; private static final int DEFAULT_SKIP_COMMIT_EXCEPTION_MAX_LIMIT = 5; + // Maximum number of allowed commit timeouts. + // If the number of commit timeout exceeds this limit, the task will be shut down. public static final String SKIP_COMMIT_TIMEOUT_MAX_LIMIT = "task.commit.skip.commit.timeout.max.limit"; private static final int DEFAULT_SKIP_COMMIT_TIMEOUT_MAX_LIMIT = 2; diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 224fa67ee0..b77b508bb2 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -329,9 +329,9 @@ class TaskInstance( throw new SamzaException("Unrecoverable error during pending commit for taskName: %s. Exception Counter: %s" format (taskName, commitExceptionCounter), commitException.get()) } else { - warn("Ignored the commit failure for taskName %s: %s" format (taskName, commitException.get().getMessage)) + warn("Ignored the commit failure for taskName %s: %s. Exception Counter: %s." + format (taskName, commitException.get().getMessage, commitExceptionCounter)) commitException.set(null) - commitInProgress.release() } } @@ -370,8 +370,8 @@ class TaskInstance( commitMaxDelayMs, commitTimeoutMs, commitTimeoutCounter)) } else { warn("Ignoring commit timeout for taskName: %s. %s ms have elapsed since another commit started. " + - "Max allowed commit delay is %s ms and commit timeout beyond that is %s ms." - format (taskName, timeSinceLastCommit, commitMaxDelayMs, commitTimeoutMs)) + "Max allowed commit delay is %s ms and commit timeout beyond that is %s ms. Timeout Counter: %s." + format (taskName, timeSinceLastCommit, commitMaxDelayMs, commitTimeoutMs, commitTimeoutCounter)) commitInProgress.release() return } @@ -563,6 +563,8 @@ class TaskInstance( "Saved exception under Caused By.", commitException.get()) } } else { + commitExceptionCounter = 0 + commitTimeoutCounter = 0 metrics.commitAsyncNs.update(System.nanoTime() - asyncStageStartNs) metrics.commitNs.update(System.nanoTime() - commitStartNs) metrics.lastAsyncCommitNs.set(System.nanoTime()) From 7d88a04aa8a8bb884506d14fce6c801e39d49bde Mon Sep 17 00:00:00 2001 From: yehaolan Date: Fri, 8 Nov 2024 00:27:19 -0800 Subject: [PATCH 4/6] change exception/timeout counter to AtomicInteger --- .../apache/samza/container/TaskInstance.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index b77b508bb2..611bb60459 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -38,7 +38,7 @@ import org.apache.samza.util.ScalaJavaUtil.JavaOptionals.toRichOptional import org.apache.samza.util.{Logging, ReflectionUtil, ScalaJavaUtil} import java.util -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import java.util.function.BiConsumer import java.util.function.Function import scala.collection.JavaConversions._ @@ -133,8 +133,8 @@ class TaskInstance( val checkpointWriteVersions = new TaskConfig(config).getCheckpointWriteVersions @volatile var lastCommitStartTimeMs = System.currentTimeMillis() - @volatile var commitExceptionCounter = 0 - @volatile var commitTimeoutCounter = 0 + val commitExceptionCounter = new AtomicInteger(0) + val commitTimeoutCounter = new AtomicInteger(0) val commitMaxDelayMs = taskConfig.getCommitMaxDelayMs val commitTimeoutMs = taskConfig.getCommitTimeoutMs val skipCommitDuringFailureEnabled = taskConfig.getSkipCommitDuringFailuresEnabled @@ -324,13 +324,13 @@ class TaskInstance( // Otherwise, ignore the exception. if (commitException.get() != null) { metrics.commitExceptions.inc() - commitExceptionCounter += 1 - if (!skipCommitDuringFailureEnabled || commitExceptionCounter > skipCommitExceptionMaxLimit) { + commitExceptionCounter.incrementAndGet() + if (!skipCommitDuringFailureEnabled || commitExceptionCounter.get() > skipCommitExceptionMaxLimit) { throw new SamzaException("Unrecoverable error during pending commit for taskName: %s. Exception Counter: %s" - format (taskName, commitExceptionCounter), commitException.get()) + format (taskName, commitExceptionCounter.get()), commitException.get()) } else { warn("Ignored the commit failure for taskName %s: %s. Exception Counter: %s." - format (taskName, commitException.get().getMessage, commitExceptionCounter)) + format (taskName, commitException.get().getMessage, commitExceptionCounter.get())) commitException.set(null) } } @@ -362,16 +362,16 @@ class TaskInstance( if (!commitInProgress.tryAcquire(commitTimeoutMs, TimeUnit.MILLISECONDS)) { val timeSinceLastCommit = System.currentTimeMillis() - lastCommitStartTimeMs metrics.commitsTimedOut.inc() - commitTimeoutCounter += 1 - if (!skipCommitDuringFailureEnabled || commitTimeoutCounter > skipCommitTimeoutMaxLimit) { + commitTimeoutCounter.incrementAndGet() + if (!skipCommitDuringFailureEnabled || commitTimeoutCounter.get() > skipCommitTimeoutMaxLimit) { throw new SamzaException("Timeout waiting for pending commit for taskName: %s to finish. " + "%s ms have elapsed since the pending commit started. Max allowed commit delay is %s ms " + "and commit timeout beyond that is %s ms. Timeout Counter: %s" format (taskName, timeSinceLastCommit, - commitMaxDelayMs, commitTimeoutMs, commitTimeoutCounter)) + commitMaxDelayMs, commitTimeoutMs, commitTimeoutCounter.get())) } else { warn("Ignoring commit timeout for taskName: %s. %s ms have elapsed since another commit started. " + "Max allowed commit delay is %s ms and commit timeout beyond that is %s ms. Timeout Counter: %s." - format (taskName, timeSinceLastCommit, commitMaxDelayMs, commitTimeoutMs, commitTimeoutCounter)) + format (taskName, timeSinceLastCommit, commitMaxDelayMs, commitTimeoutMs, commitTimeoutCounter.get())) commitInProgress.release() return } @@ -563,8 +563,8 @@ class TaskInstance( "Saved exception under Caused By.", commitException.get()) } } else { - commitExceptionCounter = 0 - commitTimeoutCounter = 0 + commitExceptionCounter.set(0) + commitTimeoutCounter.set(0) metrics.commitAsyncNs.update(System.nanoTime() - asyncStageStartNs) metrics.commitNs.update(System.nanoTime() - commitStartNs) metrics.lastAsyncCommitNs.set(System.nanoTime()) From cdfe7aa8e8f4537230d0060b763732d01a09639d Mon Sep 17 00:00:00 2001 From: yehaolan Date: Mon, 18 Nov 2024 17:51:15 -0800 Subject: [PATCH 5/6] rename metric and update comment --- .../main/java/org/apache/samza/config/TaskConfig.java | 9 +++++---- .../scala/org/apache/samza/container/TaskInstance.scala | 2 +- .../org/apache/samza/container/TaskInstanceMetrics.scala | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index c4ea6a1e0a..276f3812ff 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -66,16 +66,17 @@ public class TaskConfig extends MapConfig { static final long DEFAULT_COMMIT_TIMEOUT_MS = Duration.ofMinutes(30).toMillis(); // Flag to indicate whether to skip commit during failures (exceptions or timeouts) + // The number of allowed successive commit exceptions and timeouts are controlled by the following two configs. public static final String SKIP_COMMIT_DURING_FAILURES_ENABLED = "task.commit.skip.commit.during.failures.enabled"; private static final boolean DEFAULT_SKIP_COMMIT_DURING_FAILURES_ENABLED = false; - // Maximum number of allowed commit exceptions. - // If the number of commit exceptions exceeds this limit, the task will be shut down. + // Maximum number of allowed successive commit exceptions. + // If the number of successive commit exceptions exceeds this limit, the task will be shut down. public static final String SKIP_COMMIT_EXCEPTION_MAX_LIMIT = "task.commit.skip.commit.exception.max.limit"; private static final int DEFAULT_SKIP_COMMIT_EXCEPTION_MAX_LIMIT = 5; - // Maximum number of allowed commit timeouts. - // If the number of commit timeout exceeds this limit, the task will be shut down. + // Maximum number of allowed successive commit timeouts. + // If the number of successive commit timeout exceeds this limit, the task will be shut down. public static final String SKIP_COMMIT_TIMEOUT_MAX_LIMIT = "task.commit.skip.commit.timeout.max.limit"; private static final int DEFAULT_SKIP_COMMIT_TIMEOUT_MAX_LIMIT = 2; diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 611bb60459..d3f22f07eb 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -567,7 +567,7 @@ class TaskInstance( commitTimeoutCounter.set(0) metrics.commitAsyncNs.update(System.nanoTime() - asyncStageStartNs) metrics.commitNs.update(System.nanoTime() - commitStartNs) - metrics.lastAsyncCommitNs.set(System.nanoTime()) + metrics.lastCommitAsyncTimestamp.set(System.nanoTime()) } } finally { // release the permit indicating that previous commit is complete. diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala index 81fb4751c0..02674fb7eb 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala @@ -43,7 +43,7 @@ class TaskInstanceMetrics( val commitExceptions = newCounter("commit-exceptions") val commitNs = newTimer("commit-ns") val lastCommitNs = newGauge("last-commit-ns", 0L) - val lastAsyncCommitNs = newGauge("last-async-commit-ns", 0L) + val lastCommitAsyncTimestamp = newGauge("last-async-commit-timestamp", 0L) val commitSyncNs = newTimer("commit-sync-ns") val commitAsyncNs = newTimer("commit-async-ns") val snapshotNs = newTimer("snapshot-ns") From 603dd2d1a97d289fa7f163c39059e3605e79a3d3 Mon Sep 17 00:00:00 2001 From: yehaolan Date: Mon, 25 Nov 2024 10:12:01 -0800 Subject: [PATCH 6/6] print stacktrace --- .../main/scala/org/apache/samza/container/TaskInstance.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index d3f22f07eb..f5d13106f3 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -329,8 +329,8 @@ class TaskInstance( throw new SamzaException("Unrecoverable error during pending commit for taskName: %s. Exception Counter: %s" format (taskName, commitExceptionCounter.get()), commitException.get()) } else { - warn("Ignored the commit failure for taskName %s: %s. Exception Counter: %s." - format (taskName, commitException.get().getMessage, commitExceptionCounter.get())) + warn("Ignored the commit failure for taskName %s. Exception Counter: %s." + format (taskName, commitExceptionCounter.get()), commitException.get()) commitException.set(null) } }