From 155e8975c49f723c7374933ed851761911a94175 Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Tue, 7 Oct 2025 21:07:40 +0200 Subject: [PATCH 1/2] AER-4020 fix for misscounting the load metric When the taskmanager is restarted and there were still tasks on the queue the PerformanceMetricsReport will get these tasks and compute them even if the taskmanager doesn't know the metric. Because after restart it has no information on the tasks that were still on the worker queue. Therefore it should keep track on what it has put on the queue and only call register on tasks it knows and ignore tasks that it doesn't know about. It can compute the duration because that is based on information in the task which it receives from the queue and not from information in the taskmanager itself. Added safeguard to LoadMetric that it should never go negative. If the calculation would go to negative it means there were more tasks completed than have been accounted for. By making sure it won't go negative it will correct itself for when task would be reported that were not known (for example after restart). Technically with the other change in the commit it should never get to negative as tasks unknown would not trigger register call. --- .../aerius/taskmanager/metrics/LoadMetric.java | 2 +- .../metrics/PerformanceMetricsReporter.java | 10 +++++++++- .../metrics/PerformanceMetricsReporterTest.java | 16 ++++++++++------ 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java index 76c817a..8a80732 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java @@ -59,7 +59,7 @@ public synchronized void register(final int deltaActiveWorkers, final int totalW totalLoad += delta * (totalWorkers > 0 ? (runningWorkers / (double) totalWorkers) : 0); totalMeasureTime += delta; last = newLast; - runningWorkers += deltaActiveWorkers; + runningWorkers = Math.max(0, runningWorkers + deltaActiveWorkers); } /** diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index cb2325a..b644d53 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -17,8 +17,10 @@ package nl.aerius.taskmanager.metrics; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -80,6 +82,9 @@ public class PerformanceMetricsReporter implements WorkerFinishedHandler { private final DoubleGauge loadGauge; private final Attributes workerAttributes; + // Keep track of dispatched tasks, because when taskmanager restarts it should not register tasks already on the queue + // as it doesn't have any metrics on it anymore. + private final Set dispatchedTasks = new HashSet<>(); public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter, final WorkerMetrics workerMetrics) { @@ -125,6 +130,7 @@ private DoubleGauge createGauge(final String name, final String description) { @Override public void onWorkDispatched(final String messageId, final Map messageMetaData) { + dispatchedTasks.add(messageId); final TaskMetrics taskMetrics = new TaskMetrics(messageMetaData); taskMetrics.determineDuration(); dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics); @@ -138,7 +144,9 @@ public synchronized void onWorkerFinished(final String messageId, final Map createQueueDurationMetric(taskMetrics)).register(taskMetrics); workWorkerMetrics.register(taskMetrics); - loadMetrics.register(-1, workerMetrics.getReportedWorkerSize()); + if (dispatchedTasks.remove(messageId)) { + loadMetrics.register(-1, workerMetrics.getReportedWorkerSize()); + } } private DurationMetric createQueueDurationMetric(final TaskMetrics taskMetrics) { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index d32b18e..d1a0bdf 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -61,11 +62,11 @@ class PerformanceMetricsReporterTest { private final Map mockedGauges = new HashMap<>(); - @Mock Meter mockedMeter; - @Mock WorkerMetrics workMetrics; - @Mock ScheduledExecutorService scheduledExecutorService; - @Captor ArgumentCaptor methodCaptor; - @Captor ArgumentCaptor durationCaptor; + private @Mock Meter mockedMeter; + private @Mock WorkerMetrics workMetrics; + private @Mock ScheduledExecutorService scheduledExecutorService; + private @Captor ArgumentCaptor methodCaptor; + private @Captor ArgumentCaptor durationCaptor; private PerformanceMetricsReporter reporter; @@ -100,7 +101,8 @@ void testOnWorkDispatched() { @Test void testOnWorkerFinished() { - doReturn(10).when(workMetrics).getReportedWorkerSize(); + lenient().doReturn(10).when(workMetrics).getReportedWorkerSize(); + reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); @@ -110,6 +112,8 @@ void testOnWorkerFinished() { verify(mockedGauges.get("aer.taskmanager.work.queue.duration")).set(durationCaptor.capture(), any()); durationCaptor.getAllValues() .forEach(v -> assertTrue(v > 99.0, "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); + // getReportedWorkerSize should only be called on tasks also dispatched, so only for task "1" + verify(workMetrics, times(2)).getReportedWorkerSize(); } @Test From d7fe5c841a39feff2427335d4a617bcfd55445a0 Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Fri, 10 Oct 2025 10:10:03 +0200 Subject: [PATCH 2/2] Review comment --- .../src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java index 8a80732..76c817a 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java @@ -59,7 +59,7 @@ public synchronized void register(final int deltaActiveWorkers, final int totalW totalLoad += delta * (totalWorkers > 0 ? (runningWorkers / (double) totalWorkers) : 0); totalMeasureTime += delta; last = newLast; - runningWorkers = Math.max(0, runningWorkers + deltaActiveWorkers); + runningWorkers += deltaActiveWorkers; } /**