diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index cb2325a..b644d53 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -17,8 +17,10 @@ package nl.aerius.taskmanager.metrics; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -80,6 +82,9 @@ public class PerformanceMetricsReporter implements WorkerFinishedHandler { private final DoubleGauge loadGauge; private final Attributes workerAttributes; + // Keep track of dispatched tasks, because when taskmanager restarts it should not register tasks already on the queue + // as it doesn't have any metrics on it anymore. + private final Set dispatchedTasks = new HashSet<>(); public PerformanceMetricsReporter(final ScheduledExecutorService newScheduledThreadPool, final String queueGroupName, final Meter meter, final WorkerMetrics workerMetrics) { @@ -125,6 +130,7 @@ private DoubleGauge createGauge(final String name, final String description) { @Override public void onWorkDispatched(final String messageId, final Map messageMetaData) { + dispatchedTasks.add(messageId); final TaskMetrics taskMetrics = new TaskMetrics(messageMetaData); taskMetrics.determineDuration(); dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics); @@ -138,7 +144,9 @@ public synchronized void onWorkerFinished(final String messageId, final Map createQueueDurationMetric(taskMetrics)).register(taskMetrics); workWorkerMetrics.register(taskMetrics); - loadMetrics.register(-1, workerMetrics.getReportedWorkerSize()); + if (dispatchedTasks.remove(messageId)) { + loadMetrics.register(-1, workerMetrics.getReportedWorkerSize()); + } } private DurationMetric createQueueDurationMetric(final TaskMetrics taskMetrics) { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index d32b18e..d1a0bdf 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -61,11 +62,11 @@ class PerformanceMetricsReporterTest { private final Map mockedGauges = new HashMap<>(); - @Mock Meter mockedMeter; - @Mock WorkerMetrics workMetrics; - @Mock ScheduledExecutorService scheduledExecutorService; - @Captor ArgumentCaptor methodCaptor; - @Captor ArgumentCaptor durationCaptor; + private @Mock Meter mockedMeter; + private @Mock WorkerMetrics workMetrics; + private @Mock ScheduledExecutorService scheduledExecutorService; + private @Captor ArgumentCaptor methodCaptor; + private @Captor ArgumentCaptor durationCaptor; private PerformanceMetricsReporter reporter; @@ -100,7 +101,8 @@ void testOnWorkDispatched() { @Test void testOnWorkerFinished() { - doReturn(10).when(workMetrics).getReportedWorkerSize(); + lenient().doReturn(10).when(workMetrics).getReportedWorkerSize(); + reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); @@ -110,6 +112,8 @@ void testOnWorkerFinished() { verify(mockedGauges.get("aer.taskmanager.work.queue.duration")).set(durationCaptor.capture(), any()); durationCaptor.getAllValues() .forEach(v -> assertTrue(v > 99.0, "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); + // getReportedWorkerSize should only be called on tasks also dispatched, so only for task "1" + verify(workMetrics, times(2)).getReportedWorkerSize(); } @Test