From 4273254ea8a8429b32a19018629e2fcfde898fbf Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Mon, 12 Jan 2026 17:03:05 +0100 Subject: [PATCH 1/3] AER-4181 Fix taskmanager to handle RabbitMQ blocking on memory limit trigger When RabbitMQ runs out of memory it will temporary block tasks on the queue. The taskmanager will than get disconnected. It will handle this, but it didn't update this correctly in the load statistics. As a result the taskmanager reported it was busy while it wasn't actually. This will result in few to none new tasks being put on the queue. The fix here is a change in the QueueWatchDog. That class now does the actually watching and calls reset to all relevant processes when a watch dog situation appears. Added unit test to test reset conditions in the different classes. Also update to use the latest aerius parent root pom (to also get ride of junit startup issues in Eclipse). --- source/pom.xml | 2 +- .../nl/aerius/taskmanager/TaskManager.java | 13 ++++- .../nl/aerius/taskmanager/WorkerPool.java | 35 ++++-------- .../taskmanager/adaptor/WorkerProducer.java | 17 ++---- .../{ => domain}/QueueWatchDog.java | 57 ++++++++++++++++++- .../taskmanager/metrics/LoadMetric.java | 8 +++ .../metrics/PerformanceMetricsReporter.java | 18 ++++-- .../mq/RabbitMQWorkerEventProducer.java | 6 +- .../mq/RabbitMQWorkerProducer.java | 20 +++++-- .../mq/RabbitMQWorkerSizeProvider.java | 6 +- .../taskmanager/scheduler/TaskScheduler.java | 3 +- .../priorityqueue/PriorityQueueMap.java | 6 +- .../priorityqueue/PriorityTaskScheduler.java | 26 ++------- .../aerius/taskmanager/MockTaskScheduler.java | 5 ++ .../taskmanager/MockWorkerProducer.java | 2 +- .../aerius/taskmanager/QueueWatchDogTest.java | 52 +++++++++++++---- .../nl/aerius/taskmanager/WorkerPoolTest.java | 51 ++++++++--------- .../PerformanceMetricsReporterTest.java | 45 ++++++++++----- .../PriorityTaskSchedulerTest.java | 4 +- 19 files changed, 238 insertions(+), 138 deletions(-) rename source/taskmanager/src/main/java/nl/aerius/taskmanager/{ => domain}/QueueWatchDog.java (54%) diff --git a/source/pom.xml b/source/pom.xml index 2a86881..3be6b92 100644 --- a/source/pom.xml +++ b/source/pom.xml @@ -22,7 +22,7 @@ nl.aerius aerius-root-pom - 1.1.0 + 1.2.0 diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java index b954bd7..758c392 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java @@ -37,6 +37,7 @@ import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.domain.QueueConfig; +import nl.aerius.taskmanager.domain.QueueWatchDog; import nl.aerius.taskmanager.domain.TaskConsumer; import nl.aerius.taskmanager.domain.TaskQueue; import nl.aerius.taskmanager.domain.TaskSchedule; @@ -120,13 +121,23 @@ private class TaskScheduleBucket { public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedException { this.workerQueueName = queueConfig.queueName(); + final QueueWatchDog watchDog = new QueueWatchDog(workerQueueName); taskScheduler = schedulerFactory.createScheduler(queueConfig); workerProducer = factory.createWorkerProducer(queueConfig); final WorkerPool workerPool = new WorkerPool(workerQueueName, workerProducer, taskScheduler); final PerformanceMetricsReporter reporter = new PerformanceMetricsReporter(scheduledExecutorService, queueConfig.queueName(), OpenTelemetryMetrics.METER, workerPool); - workerProducer.addWorkerFinishedHandler(reporter); + + watchDog.addQueueWatchDogListener(workerPool); + watchDog.addQueueWatchDogListener(taskScheduler); + watchDog.addQueueWatchDogListener(reporter); + + workerProducer.addWorkerProducerHandler(reporter); + workerProducer.addWorkerProducerHandler(watchDog); + workerSizeObserverProxy.addObserver(workerQueueName, workerPool); + workerSizeObserverProxy.addObserver(workerQueueName, watchDog); + if (taskScheduler instanceof final WorkerSizeObserver wzo) { workerSizeObserverProxy.addObserver(workerQueueName, wzo); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java index 739d19d..32769a2 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java @@ -26,9 +26,10 @@ import org.slf4j.LoggerFactory; import nl.aerius.taskmanager.adaptor.WorkerProducer; -import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerFinishedHandler; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; +import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; +import nl.aerius.taskmanager.domain.QueueWatchDog.QueueWatchDogListener; import nl.aerius.taskmanager.domain.Task; import nl.aerius.taskmanager.domain.TaskRecord; import nl.aerius.taskmanager.domain.WorkerUpdateHandler; @@ -40,13 +41,12 @@ *

Reserved workers are workers that are waiting for a task to become available on the queue. *

Running workers are workers for that are busy running the task and are waiting for the task to finish. */ -class WorkerPool implements WorkerSizeObserver, WorkerFinishedHandler, WorkerMetrics { +class WorkerPool implements WorkerSizeObserver, WorkerProducerHandler, WorkerMetrics, QueueWatchDogListener { private static final Logger LOG = LoggerFactory.getLogger(WorkerPool.class); private final Semaphore freeWorkers = new Semaphore(0); private final Map runningWorkers = new ConcurrentHashMap<>(); - private final QueueWatchDog watchDog = new QueueWatchDog(); private int totalConfiguredWorkers; private final String workerQueueName; private final WorkerProducer wp; @@ -56,7 +56,7 @@ public WorkerPool(final String workerQueueName, final WorkerProducer wp, final W this.workerQueueName = workerQueueName; this.wp = wp; this.workerUpdateHandler = workerUpdateHandler; - wp.addWorkerFinishedHandler(this); + wp.addWorkerProducerHandler(this); } /** @@ -104,22 +104,12 @@ public void onWorkerFinished(final String messageId, final Map m releaseWorker(messageId); } - @Override - public void reset() { - synchronized (this) { - for (final Entry taskEntry : runningWorkers.entrySet()) { - releaseWorker(taskEntry.getKey(), taskEntry.getValue()); - } - updateNumberOfWorkers(0); - } - } - /** * Adds the worker to the pool of available workers and calls onWorkerReady. * * @param taskId Id of the task to release */ - public void releaseWorker(final String taskId) { + void releaseWorker(final String taskId) { releaseWorker(taskId, runningWorkers.get(taskId)); } @@ -129,7 +119,7 @@ public void releaseWorker(final String taskId) { * @param taskId Id of the task that was reported done and can be released * @param taskRecord the task is expected to be on. */ - public void releaseWorker(final String taskId, final TaskRecord taskRecord) { + void releaseWorker(final String taskId, final TaskRecord taskRecord) { if (taskRecord != null) { synchronized (this) { if (runningWorkers.containsKey(taskId)) { @@ -164,8 +154,6 @@ public void reserveWorker() { } } - - /** * Sets the number of workers which are actually available. This number should * be determined on the number of workers that are actually in operation. @@ -182,7 +170,6 @@ public void reserveWorker() { public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { synchronized (this) { updateNumberOfWorkers(numberOfWorkers); - checkDeadTasks(numberOfMessages); } } @@ -203,10 +190,12 @@ private void updateNumberOfWorkers(final int numberOfWorkers) { } } - private void checkDeadTasks(final int numberOfMessages) { - if (watchDog.isItDead(!runningWorkers.isEmpty(), numberOfMessages)) { - LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName); - reset(); + @Override + public void reset() { + synchronized (this) { + for (final Entry taskEntry : runningWorkers.entrySet()) { + releaseWorker(taskEntry.getKey(), taskEntry.getValue()); + } } } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java index 2083eec..72d9d45 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java @@ -28,9 +28,9 @@ public interface WorkerProducer { /** * Sets the handler to call when a task is finished by a worker. - * @param workerFinishedHandler handler. + * @param workerProducerHandler handler. */ - void addWorkerFinishedHandler(WorkerFinishedHandler workerFinishedHandler); + void addWorkerProducerHandler(WorkerProducerHandler workerProducerHandler); /** * Starts the worker producer. @@ -39,7 +39,7 @@ public interface WorkerProducer { /** * Dispatch a message to the worker. - * @param message message to ispatch + * @param message message to dispatch * @throws IOException connection errors */ void dispatchMessage(final Message message) throws IOException; @@ -50,9 +50,9 @@ public interface WorkerProducer { void shutdown(); /** - * Interface for handling finished tasks from the communication layer send by the workers. + * Interface for called when the tasks is finished by the worker. */ - interface WorkerFinishedHandler { + interface WorkerProducerHandler { /** * Called when work dispatched to the worker. * @@ -70,13 +70,6 @@ default void onWorkDispatched(final String messageId, final Map * @param messageMetaData message meta data */ void onWorkerFinished(String messageId, Map messageMetaData); - - /** - * Instruct the handler to reset; that means all tasks that are waiting to be finished will never be marked as finished and therefor should - * be cleaned up. - */ - default void reset() { - } } /** diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDog.java similarity index 54% rename from source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java rename to source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDog.java index 2bac89e..6db3676 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDog.java @@ -14,24 +14,74 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see http://www.gnu.org/licenses/. */ -package nl.aerius.taskmanager; +package nl.aerius.taskmanager.domain; import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; /** * WatchDog to detect dead messages. Dead messages are messages once put on the queue, but those messages have gone. For example because * the queue was purged after some restart. In such a case the scheduler keeps the tasks locked and since there will never come an message * for the task it's locked indefinitely. This watch dog tries to detect such tasks and release them at some point. */ -public class QueueWatchDog { +public class QueueWatchDog implements WorkerSizeObserver, WorkerProducerHandler { + + /** + * Interface for classes that need to be reset when the watch dog is triggered. + */ + public interface QueueWatchDogListener { + void reset(); + } + + private static final Logger LOG = LoggerFactory.getLogger(QueueWatchDog.class); /** * If for more than 10 minutes the problem remains the sign to reset is given. */ private static final long RESET_TIME_MINUTES = 10; + private final String workerQueueName; + private final List listeners = new ArrayList<>(); + private final Set runningTasks = new HashSet<>(); + private LocalDateTime firstProblem; + public QueueWatchDog(final String workerQueueName) { + this.workerQueueName = workerQueueName; + } + + public void addQueueWatchDogListener(final QueueWatchDogListener listener) { + listeners.add(listener); + } + + @Override + public void onWorkDispatched(final String messageId, final Map messageMetaData) { + runningTasks.add(messageId); + } + + @Override + public void onWorkerFinished(final String messageId, final Map messageMetaData) { + runningTasks.remove(messageId); + } + + @Override + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + if (isItDead(!runningTasks.isEmpty(), numberOfMessages)) { + LOG.info("It looks like some tasks are zombies on {} worker queue, so all tasks currently in state running are released.", workerQueueName); + listeners.forEach(QueueWatchDogListener::reset); + } + } + /** * Check if the condition is met to do a reset. This is if for more than {@link #RESET_TIME_MINUTES} workers are running, * but no messages were on the queue it's time to free all tasks. @@ -39,7 +89,7 @@ public class QueueWatchDog { * @param numberOfMessages number of messages on queue * @return true if it's time to free all tasks */ - public boolean isItDead(final boolean runningWorkers, final int numberOfMessages) { + private boolean isItDead(final boolean runningWorkers, final int numberOfMessages) { boolean doReset = false; if (runningWorkers && numberOfMessages == 0) { if (firstProblem == null) { @@ -59,4 +109,5 @@ public boolean isItDead(final boolean runningWorkers, final int numberOfMessages protected LocalDateTime now() { return LocalDateTime.now(); } + } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java index 76c817a..2a2a837 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/LoadMetric.java @@ -62,6 +62,14 @@ public synchronized void register(final int deltaActiveWorkers, final int totalW runningWorkers += deltaActiveWorkers; } + /** + * Resets the metric state. Sets running workers to 0, and resets the average load time by calling process. + */ + public synchronized void reset() { + runningWorkers = 0; + process(); + } + /** * Calculates average duration over the last time frame since this method was called. Internals are reset in this method to a new measure point. * diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index b644d53..fe1a7e9 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -31,9 +31,10 @@ import io.opentelemetry.api.metrics.DoubleGauge; import io.opentelemetry.api.metrics.Meter; -import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerFinishedHandler; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; +import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; import nl.aerius.taskmanager.client.TaskMetrics; +import nl.aerius.taskmanager.domain.QueueWatchDog.QueueWatchDogListener; import nl.aerius.taskmanager.metrics.DurationMetric.DurationMetricValue; /** @@ -51,7 +52,7 @@ * * - Average load (in percentage) of all workers (of a certain type) together. */ -public class PerformanceMetricsReporter implements WorkerFinishedHandler { +public class PerformanceMetricsReporter implements WorkerProducerHandler, QueueWatchDogListener { private static final Logger LOG = LoggerFactory.getLogger(PerformanceMetricsReporter.class); @@ -149,6 +150,15 @@ public synchronized void onWorkerFinished(final String messageId, final Map e.getValue().process()); + dispatchedWorkerMetrics.process(); + // work metrics not needed to be reset because they are about work already done. + loadMetrics.reset(); + } + private DurationMetric createQueueDurationMetric(final TaskMetrics taskMetrics) { return new DurationMetric(OpenTelemetryMetrics.queueAttributes(queueGroupName, taskMetrics.queueName())); } @@ -172,13 +182,13 @@ private static void metrics(final String prefixText, final Map 0) { LOG.debug("{} for {}: {} ms/task (#tasks: {})", prefixText, name, metric.avgDuration(), count); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java index cbba798..e177c41 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java @@ -127,9 +127,7 @@ private void updateMetrics() { } private void publish(final String queueName, final int size, final int utilisation) throws IOException, TimeoutException { - final Channel channel = factory.getConnection().createChannel(); - - try { + try (final Channel channel = factory.getConnection().createChannel()) { channel.exchangeDeclare(AERIUS_EVENT_EXCHANGE, EXCHANGE_TYPE); final Map headers = new HashMap<>(); @@ -139,8 +137,6 @@ private void publish(final String queueName, final int size, final int utilisati final BasicProperties props = new BasicProperties().builder().headers(headers).build(); channel.basicPublish(AERIUS_EVENT_EXCHANGE, "", props, null); debugLogState(queueName, size, utilisation); - } finally { - channel.close(); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java index 33a21ce..8cb1b2a 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java @@ -57,7 +57,7 @@ class RabbitMQWorkerProducer implements WorkerProducer { private final String workerQueueName; private final boolean durable; private final RabbitMQQueueType queueType; - private final List workerFinishedHandlers = new ArrayList<>(); + private final List workerProducerHandlers = new ArrayList<>(); private Channel channel; private boolean isShutdown; @@ -72,8 +72,8 @@ public RabbitMQWorkerProducer(final BrokerConnectionFactory factory, final Queue } @Override - public void addWorkerFinishedHandler(final WorkerFinishedHandler workerFinishedHandler) { - this.workerFinishedHandlers.add(workerFinishedHandler); + public void addWorkerProducerHandler(final WorkerProducerHandler workerProducerHandler) { + this.workerProducerHandlers.add(workerProducerHandler); } @Override @@ -99,7 +99,15 @@ public void dispatchMessage(final Message message) throws IOException { forwardBuilder.headers(headers); final BasicProperties forwardProperties = forwardBuilder.deliveryMode(2).build(); channel.basicPublish("", workerQueueName, forwardProperties, rabbitMQMessage.getBody()); - workerFinishedHandlers.forEach(h -> h.onWorkDispatched(message.getMessageId(), headers)); + workerProducerHandlers.forEach(h -> handleWorkDispatched(message, headers, h)); + } + + private static void handleWorkDispatched(final Message message, final Map headers, final WorkerProducerHandler handler) { + try { + handler.onWorkDispatched(message.getMessageId(), headers); + } catch (final RuntimeException e) { + LOG.error("Runtime exception during onWorkDispatched of {}", handler.getClass(), e); + } } private synchronized void ensureChanne() throws IOException { @@ -182,13 +190,13 @@ private boolean startReplyConsumer(final Connection connection) throws IOExcepti replyChannel.basicConsume(workerReplyQueue, true, workerReplyQueue, new DefaultConsumer(replyChannel) { @Override public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) { - workerFinishedHandlers.forEach(h -> handleWorkFinished(h, properties)); + workerProducerHandlers.forEach(h -> handleWorkFinished(h, properties)); } }); return true; } - private static void handleWorkFinished(final WorkerFinishedHandler handler, final BasicProperties properties) { + private static void handleWorkFinished(final WorkerProducerHandler handler, final BasicProperties properties) { try { handler.onWorkerFinished(properties.getMessageId(), properties.getHeaders()); } catch (final RuntimeException e) { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java index 520317a..0aa5a16 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java @@ -171,15 +171,15 @@ private void updateWorkerQueueState(final String queueName) { } private static class WorkerSizeObserverComposite implements WorkerSizeObserver { - private final List list = new ArrayList<>(); + private final List observers = new ArrayList<>(); public void add(final WorkerSizeObserver observer) { - list.add(observer); + observers.add(observer); } @Override public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { - for (final WorkerSizeObserver observer : list) { + for (final WorkerSizeObserver observer : observers) { try { observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); } catch (final RuntimeException e) { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java index 0fd70b6..d219102 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java @@ -17,6 +17,7 @@ package nl.aerius.taskmanager.scheduler; import nl.aerius.taskmanager.domain.QueueConfig; +import nl.aerius.taskmanager.domain.QueueWatchDog.QueueWatchDogListener; import nl.aerius.taskmanager.domain.Task; import nl.aerius.taskmanager.domain.TaskQueue; import nl.aerius.taskmanager.domain.TaskSchedule; @@ -26,7 +27,7 @@ * Interface for the scheduling algorithm. The implementation should maintain an internal list of all tasks added and return the task to be processed * in {@link #getNextTask()} based on whatever priority algorithm the scheduler implements. */ -public interface TaskScheduler extends WorkerUpdateHandler { +public interface TaskScheduler extends WorkerUpdateHandler, QueueWatchDogListener { /** * Adds a Task to the scheduler to being processed. The scheduler will return this task in {@link #getNextTask()} diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java index a2ab392..435cb7b 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java @@ -70,7 +70,11 @@ public void incrementOnWorker(final TaskRecord taskRecord) { tasksOnWorkersPerQueue.computeIfAbsent(key(taskRecord), k -> new AtomicInteger()).incrementAndGet(); } - public int onWorkerTotal(final String queueName) { + public int onWorkerTotal() { + return tasksOnWorkersPerQueue.entrySet().stream().mapToInt(e -> e.getValue().get()).sum(); + } + + public int onWorkerByQueue(final String queueName) { return tasksOnWorkersPerQueue.entrySet().stream() .filter(e -> keyMapper.queueName(e.getKey()).equals(queueName)) .mapToInt(e -> e.getValue().get()) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java index 152e8d3..2ba755a 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java @@ -27,8 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import nl.aerius.taskmanager.QueueWatchDog; -import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.domain.PriorityTaskQueue; import nl.aerius.taskmanager.domain.Task; import nl.aerius.taskmanager.domain.TaskRecord; @@ -41,12 +39,11 @@ * like with other priorities. * */ -class PriorityTaskScheduler implements TaskScheduler, Comparator, WorkerSizeObserver { +class PriorityTaskScheduler implements TaskScheduler, Comparator { private static final Logger LOG = LoggerFactory.getLogger(PriorityTaskScheduler.class); private static final int NEXT_TASK_MAX_WAIT_TIME_SECONDS = 120; - private final QueueWatchDog watchDog = new QueueWatchDog(); private final PriorityTaskSchedulerMetrics metrics = new PriorityTaskSchedulerMetrics(); private final Queue queue; private final PriorityQueueMap priorityQueueMap; @@ -54,7 +51,6 @@ class PriorityTaskScheduler implements TaskScheduler, Compara private final Condition nextTaskCondition = lock.newCondition(); private final String workerQueueName; private int numberOfWorkers; - private int tasksOnWorkers; /** * Constructs scheduler for given configuration. @@ -119,7 +115,6 @@ public Task getNextTask() throws InterruptedException { } private void obtainTask() { - tasksOnWorkers++; final Task task = queue.poll(); priorityQueueMap.incrementOnWorker(task.getTaskRecord()); @@ -146,14 +141,14 @@ private boolean isTaskNext(final TaskRecord taskRecord) { if (!taskNext) { LOG.trace("Task for queue '{}.{}' not scheduled: queueConfiguration:{}, numberOfWorkers:{}, tasksOnWorkers:{}, tasksForQueue:{}", - workerQueueName, taskRecord, priorityQueueMap.get(taskRecord), numberOfWorkers, tasksOnWorkers, + workerQueueName, taskRecord, priorityQueueMap.get(taskRecord), numberOfWorkers, priorityQueueMap.onWorkerTotal(), priorityQueueMap.onWorker(taskRecord)); } return taskNext; } private int getFreeWorkers() { - return numberOfWorkers - tasksOnWorkers; + return numberOfWorkers - priorityQueueMap.onWorkerTotal(); } private boolean hasCapacityRemaining(final TaskRecord taskRecord) { @@ -166,7 +161,6 @@ public void onTaskFinished(final TaskRecord taskRecord) { lock.lock(); try { priorityQueueMap.decrementOnWorker(taskRecord); - tasksOnWorkers--; signalNextTask(); // clean up queue if it has been removed. if (!priorityQueueMap.containsKey(taskRecord.queueName())) { @@ -198,7 +192,7 @@ public void updateQueue(final PriorityTaskQueue queue) { try { final String queueName = queue.getQueueName(); if (!priorityQueueMap.containsKey(queueName)) { - metrics.addMetric(() -> priorityQueueMap.onWorkerTotal(queueName), workerQueueName, queueName); + metrics.addMetric(() -> priorityQueueMap.onWorkerByQueue(queueName), workerQueueName, queueName); } final PriorityTaskQueue old = priorityQueueMap.put(queueName, queue); @@ -293,22 +287,14 @@ private void signalNextTask() { } } - @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { - if (watchDog.isItDead(tasksOnWorkers > 0, numberOfMessages)) { - LOG.info("It looks like some tasks are zombies on {} worker queue in priority scheduler, so all tasks currently in state running are released.", workerQueueName); - reset(); - } - } - /** * Resets the internal state. Called in case a difference was detected that internally it still has messages as being on the queue, * while the queue is empty. */ - void reset() { + @Override + public void reset() { lock.lock(); try { - tasksOnWorkers = 0; priorityQueueMap.reset(); signalNextTask(); } finally { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTaskScheduler.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTaskScheduler.java index da85353..4ffff18 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTaskScheduler.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTaskScheduler.java @@ -83,4 +83,9 @@ public PriorityTaskSchedulerFileHandler getHandler() { } } + @Override + public void reset() { + // No-op + } + } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockWorkerProducer.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockWorkerProducer.java index e90ef76..43b6846 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockWorkerProducer.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockWorkerProducer.java @@ -31,7 +31,7 @@ public class MockWorkerProducer implements WorkerProducer { private boolean shutdownExceptionOnForward = false; @Override - public void addWorkerFinishedHandler(final WorkerFinishedHandler workerFinishedHandler) { + public void addWorkerProducerHandler(final WorkerProducerHandler workerFinishedHandler) { // no-op } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java index e57682e..0e79b50 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java @@ -16,33 +16,61 @@ */ package nl.aerius.taskmanager; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.time.LocalDateTime; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import nl.aerius.taskmanager.domain.QueueWatchDog; +import nl.aerius.taskmanager.domain.QueueWatchDog.QueueWatchDogListener; /** * Test class for {@link QueueWatchDog}. */ class QueueWatchDogTest { - @Test - void testIsItDead() { + private static final String QUEUE_NAME = "queue1"; + + @ParameterizedTest + @MethodSource("isDeadTests") + void testIsItDead(final int runningWorkers, final int finishedWorkers, final int numberOfMessages, final int expected, final String message) { final AtomicReference now = new AtomicReference<>(LocalDateTime.now()); - final QueueWatchDog qwd = new QueueWatchDog() { + final QueueWatchDog qwd = new QueueWatchDog(QUEUE_NAME) { @Override protected LocalDateTime now() { return now.get(); } }; - assertFalse(qwd.isItDead(false, 0), "No running workers, with no messages, no problem"); - assertFalse(qwd.isItDead(false, 10), "No running workers, no problem"); - assertFalse(qwd.isItDead(true, 10), "Running workers, with messages, no problem"); - assertFalse(qwd.isItDead(true, 0), "Running workers, with no messages, possible problem, we just wait"); - now.set(now.get().plusMinutes(20)); //fast forward 20 minutes. - assertTrue(qwd.isItDead(true, 0), "Running workers, with no messages, after specified time; yes reset"); + final QueueWatchDogListener listener = mock(QueueWatchDogListener.class); + + qwd.addQueueWatchDogListener(listener); + IntStream.range(0, runningWorkers).forEach(i -> qwd.onWorkDispatched(String.valueOf(i), null)); + IntStream.range(0, finishedWorkers).forEach(i -> qwd.onWorkerFinished(String.valueOf(i), null)); + + qwd.onNumberOfWorkersUpdate(0, numberOfMessages); + // reset should never trigger the first time the problem was reported. + verify(listener, never()).reset(); + + // Fast forward 20 minutes to trigger reset if there is a problem. + now.set(now.get().plusMinutes(20)); + qwd.onNumberOfWorkersUpdate(0, numberOfMessages); + verify(listener, times(expected)).reset(); + } + + private static List isDeadTests() { + return List.of( + Arguments.of(5, 5, 0, 0, "No running workers, with no messages, no problem"), + Arguments.of(5, 5, 10, 0, "No running workers, 10 messages, no problem"), + Arguments.of(5, 1, 10, 0, "Running workers, with messages, no problem"), + Arguments.of(5, 1, 0, 1, "Running workers, with no messages, after specified time; yes reset")); } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java index 3cdf948..562cdbf 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java @@ -28,7 +28,6 @@ import java.util.concurrent.ExecutorService; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -41,7 +40,6 @@ import nl.aerius.taskmanager.domain.TaskConsumer; import nl.aerius.taskmanager.domain.WorkerUpdateHandler; import nl.aerius.taskmanager.exception.NoFreeWorkersException; -import nl.aerius.taskmanager.exception.TaskAlreadySentException; import nl.aerius.taskmanager.mq.RabbitMQMessage; /** @@ -83,8 +81,7 @@ void testWorkerPoolSizing() throws IOException { assertEquals(10, numberOfWorkers, "Check if workerPool change handler called."); workerPool.reserveWorker(); assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); - final Task task = createTask(); - workerPool.sendTaskToWorker(task); + final Task task = createAndSendTaskToWorker(); assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); workerPool.releaseWorker(task.getId()); assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after releasing 1 worker"); @@ -92,19 +89,16 @@ void testWorkerPoolSizing() throws IOException { @Test void testNoFreeWorkers() { - assertThrows(NoFreeWorkersException.class, () -> workerPool.sendTaskToWorker(createTask()), + assertThrows(NoFreeWorkersException.class, () -> createAndSendTaskToWorker(), "Expected NoFreeWorkersException when trying to send a task while there are no free workers."); } @Test void testWorkerPoolScaleDown() throws IOException { workerPool.onNumberOfWorkersUpdate(5, 0); - final Task task1 = createTask(); - workerPool.sendTaskToWorker(task1); - final Task task2 = createTask(); - workerPool.sendTaskToWorker(task2); - final Task task3 = createTask(); - workerPool.sendTaskToWorker(task3); + final Task task1 = createAndSendTaskToWorker(); + final Task task2 = createAndSendTaskToWorker(); + final Task task3 = createAndSendTaskToWorker(); assertEquals(5, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after 2 workers running"); workerPool.onNumberOfWorkersUpdate(1, 0); assertEquals(3, workerPool.getWorkerSize(), @@ -118,11 +112,11 @@ void testWorkerPoolScaleDown() throws IOException { assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size should remain the same"); } + @Test void testReleaseTaskTwice() throws IOException { workerPool.onNumberOfWorkersUpdate(2, 0); - final Task task1 = createTask(); - workerPool.sendTaskToWorker(task1); + final Task task1 = createAndSendTaskToWorker(); final String id = task1.getId(); workerPool.releaseWorker(id); final int currentWorkerSize1 = workerPool.getReportedWorkerSize(); @@ -132,26 +126,27 @@ void testReleaseTaskTwice() throws IOException { assertEquals(2, workerPool.getReportedWorkerSize(), "Check if task worker size not decreased to much"); } - @Disabled("Exception is not thrown anymore, so test ignored for now") - @Test - void testSendSameTaskTwice() throws IOException { - workerPool.onNumberOfWorkersUpdate(3, 0); - final Task task1 = createTask(); - workerPool.sendTaskToWorker(task1); - - assertThrows(TaskAlreadySentException.class, () -> workerPool.sendTaskToWorker(task1), - "Expected TaskAlreadySentException when a message is send a second time."); - } - @Test void testMessageDeliverd() throws IOException { workerPool.onNumberOfWorkersUpdate(1, 0); - final Task task1 = createTask(); - workerPool.sendTaskToWorker(task1); + createAndSendTaskToWorker(); assertNotSame(0, message.getDeliveryTag(), "Check if message is delivered"); } - private Task createTask() { - return new MockTask(taskConsumer); + @Test + void testReset() throws IOException { + workerPool.onNumberOfWorkersUpdate(5, 0); + createAndSendTaskToWorker(); + createAndSendTaskToWorker(); + assertEquals(2, workerPool.getRunningWorkerSize(), "Should report 2 workers running."); + workerPool.reset(); + assertEquals(0, workerPool.getRunningWorkerSize(), "Should report no workers running after internal state reset."); + } + + private Task createAndSendTaskToWorker() throws IOException { + final Task task = new MockTask(taskConsumer); + + workerPool.sendTaskToWorker(task); + return task; } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index d1a0bdf..16df98e 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -25,7 +25,6 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -33,6 +32,7 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -87,35 +87,34 @@ void beforeEach() { @Test void testOnWorkDispatched() { - doReturn(10).when(workMetrics).getReportedWorkerSize(); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); + lenient().doReturn(10).when(workMetrics).getReportedWorkerSize(); methodCaptor.getValue().run(); - verify(mockedGauges.get("aer.taskmanager.dispatched")).set(eq(2.0), any()); - verify(mockedGauges.get("aer.taskmanager.dispatched.wait")).set(durationCaptor.capture(), any()); - verify(mockedGauges.get("aer.taskmanager.dispatched.queue")).set(eq(2.0), any()); - verify(mockedGauges.get("aer.taskmanager.dispatched.queue.wait")).set(durationCaptor.capture(), any()); - durationCaptor.getAllValues() - .forEach(v -> assertTrue(v > 99.0, "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); + assertGaugeCalls("dispatched", "wait", 2.0, v -> v > 99.0); } @Test void testOnWorkerFinished() { - lenient().doReturn(10).when(workMetrics).getReportedWorkerSize(); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("2", createMap(QUEUE_2, 200L)); + lenient().doReturn(10).when(workMetrics).getReportedWorkerSize(); methodCaptor.getValue().run(); - verify(mockedGauges.get("aer.taskmanager.work")).set(eq(2.0), any()); - verify(mockedGauges.get("aer.taskmanager.work.duration")).set(durationCaptor.capture(), any()); - verify(mockedGauges.get("aer.taskmanager.work.queue")).set(eq(2.0), any()); - verify(mockedGauges.get("aer.taskmanager.work.queue.duration")).set(durationCaptor.capture(), any()); - durationCaptor.getAllValues() - .forEach(v -> assertTrue(v > 99.0, "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); + assertGaugeCalls("work", "duration", 2.0, v -> v > 99.0); // getReportedWorkerSize should only be called on tasks also dispatched, so only for task "1" verify(workMetrics, times(2)).getReportedWorkerSize(); } + private void assertGaugeCalls(final String label, final String type, final double expected, final Predicate duration) { + verify(mockedGauges.get("aer.taskmanager." + label)).set(eq(expected), any()); + verify(mockedGauges.get("aer.taskmanager.%s.%s".formatted(label, type))).set(durationCaptor.capture(), any()); + verify(mockedGauges.get("aer.taskmanager.%s.queue".formatted(label))).set(eq(expected), any()); + verify(mockedGauges.get("aer.taskmanager.%s.queue.%s".formatted(label, type))).set(durationCaptor.capture(), any()); + durationCaptor.getAllValues() + .forEach(v -> assertTrue(duration.test(v), "Duration should report at least 100.0 as it is the offset of the start time, but was " + v)); + } + @Test void testWorkLoad() throws InterruptedException { doReturn(4).when(workMetrics).getReportedWorkerSize(); @@ -128,6 +127,22 @@ void testWorkLoad() throws InterruptedException { assertEquals(50.0, durationCaptor.getAllValues().get(1), "Expected workload of 50%"); } + @Test + void testReset() throws InterruptedException { + doReturn(4).when(workMetrics).getReportedWorkerSize(); + reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); + reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); + Thread.sleep(2); // Add a bit of delay to get some time frame between updates.. + reporter.reset(); + methodCaptor.getValue().run(); + // Verify dispatched metrics have been reset. + assertGaugeCalls("dispatched", "wait", 0.0, v -> v == 0.0); + + // Verify load metric have been reset. + verify(mockedGauges.get("aer.taskmanager.work.load"), times(1)).set(durationCaptor.capture(), any()); + assertEquals(0.0, durationCaptor.getAllValues().get(0), 1E-5, "Expected to have no workload anymore"); + } + private Map createMap(final String queueName, final long duration) { return new TaskMetrics().duration(duration).queueName(queueName).start(System.currentTimeMillis() - 100).build(); } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java index a89ffea..227f6d0 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java @@ -63,7 +63,7 @@ class PriorityTaskSchedulerTest { private static final String QUEUE2 = "queue2"; private static final String QUEUE3 = "queue3"; private static final double TEST_CAPACITY = 0.7; - private static final PriorityTaskSchedulerFactory factory = new PriorityTaskSchedulerFactory(); + private static final PriorityTaskSchedulerFactory FACTORY = new PriorityTaskSchedulerFactory(); private TaskConsumer taskConsumer1; private TaskConsumer taskConsumer2; @@ -86,7 +86,7 @@ void setUp() throws IOException { configuration.getQueues().add(tc1); configuration.getQueues().add(tc2); configuration.getQueues().add(tc3); - scheduler = (PriorityTaskScheduler) factory.createScheduler(new QueueConfig(QUEUE1, false, true, null)); + scheduler = (PriorityTaskScheduler) FACTORY.createScheduler(new QueueConfig(QUEUE1, false, true, null)); configuration.getQueues().forEach(scheduler::updateQueue); task1 = createTask(taskConsumer1, "1"); task2a = createTask(taskConsumer2, "2a"); From 5ada1c18146335df5b4421b0478862bd66fb879f Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Wed, 14 Jan 2026 17:14:28 +0100 Subject: [PATCH 2/3] Review comments --- .../{domain => }/QueueWatchDog.java | 12 +++------- .../nl/aerius/taskmanager/TaskManager.java | 1 - .../nl/aerius/taskmanager/WorkerPool.java | 2 +- .../taskmanager/adaptor/WorkerProducer.java | 2 +- .../domain/QueueWatchDogListener.java | 24 +++++++++++++++++++ .../metrics/PerformanceMetricsReporter.java | 2 +- .../taskmanager/scheduler/TaskScheduler.java | 2 +- .../aerius/taskmanager/QueueWatchDogTest.java | 3 +-- 8 files changed, 32 insertions(+), 16 deletions(-) rename source/taskmanager/src/main/java/nl/aerius/taskmanager/{domain => }/QueueWatchDog.java (93%) create mode 100644 source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDogListener.java diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDog.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java similarity index 93% rename from source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDog.java rename to source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java index 6db3676..2dd111c 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDog.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java @@ -14,7 +14,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see http://www.gnu.org/licenses/. */ -package nl.aerius.taskmanager.domain; +package nl.aerius.taskmanager; import java.time.LocalDateTime; import java.util.ArrayList; @@ -28,20 +28,14 @@ import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; +import nl.aerius.taskmanager.domain.QueueWatchDogListener; /** * WatchDog to detect dead messages. Dead messages are messages once put on the queue, but those messages have gone. For example because * the queue was purged after some restart. In such a case the scheduler keeps the tasks locked and since there will never come an message * for the task it's locked indefinitely. This watch dog tries to detect such tasks and release them at some point. */ -public class QueueWatchDog implements WorkerSizeObserver, WorkerProducerHandler { - - /** - * Interface for classes that need to be reset when the watch dog is triggered. - */ - public interface QueueWatchDogListener { - void reset(); - } +class QueueWatchDog implements WorkerSizeObserver, WorkerProducerHandler { private static final Logger LOG = LoggerFactory.getLogger(QueueWatchDog.class); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java index 758c392..c92b1c7 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java @@ -37,7 +37,6 @@ import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.domain.QueueConfig; -import nl.aerius.taskmanager.domain.QueueWatchDog; import nl.aerius.taskmanager.domain.TaskConsumer; import nl.aerius.taskmanager.domain.TaskQueue; import nl.aerius.taskmanager.domain.TaskSchedule; diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java index 32769a2..d489735 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java @@ -29,7 +29,7 @@ import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; -import nl.aerius.taskmanager.domain.QueueWatchDog.QueueWatchDogListener; +import nl.aerius.taskmanager.domain.QueueWatchDogListener; import nl.aerius.taskmanager.domain.Task; import nl.aerius.taskmanager.domain.TaskRecord; import nl.aerius.taskmanager.domain.WorkerUpdateHandler; diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java index 72d9d45..34c67ab 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java @@ -50,7 +50,7 @@ public interface WorkerProducer { void shutdown(); /** - * Interface for called when the tasks is finished by the worker. + * Interface called when the tasks is finished by the worker. */ interface WorkerProducerHandler { /** diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDogListener.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDogListener.java new file mode 100644 index 0000000..844a5b4 --- /dev/null +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDogListener.java @@ -0,0 +1,24 @@ +/* + * Copyright the State of the Netherlands + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package nl.aerius.taskmanager.domain; + +/** + * Interface for classes that need to be reset when the watch dog is triggered. + */ +public interface QueueWatchDogListener { + void reset(); +} \ No newline at end of file diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index fe1a7e9..e13338d 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -34,7 +34,7 @@ import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerMetrics; import nl.aerius.taskmanager.adaptor.WorkerProducer.WorkerProducerHandler; import nl.aerius.taskmanager.client.TaskMetrics; -import nl.aerius.taskmanager.domain.QueueWatchDog.QueueWatchDogListener; +import nl.aerius.taskmanager.domain.QueueWatchDogListener; import nl.aerius.taskmanager.metrics.DurationMetric.DurationMetricValue; /** diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java index d219102..73ace23 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/TaskScheduler.java @@ -17,7 +17,7 @@ package nl.aerius.taskmanager.scheduler; import nl.aerius.taskmanager.domain.QueueConfig; -import nl.aerius.taskmanager.domain.QueueWatchDog.QueueWatchDogListener; +import nl.aerius.taskmanager.domain.QueueWatchDogListener; import nl.aerius.taskmanager.domain.Task; import nl.aerius.taskmanager.domain.TaskQueue; import nl.aerius.taskmanager.domain.TaskSchedule; diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java index 0e79b50..1974a67 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/QueueWatchDogTest.java @@ -30,8 +30,7 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import nl.aerius.taskmanager.domain.QueueWatchDog; -import nl.aerius.taskmanager.domain.QueueWatchDog.QueueWatchDogListener; +import nl.aerius.taskmanager.domain.QueueWatchDogListener; /** * Test class for {@link QueueWatchDog}. From 537ef86c8c7741c897635c4fe1d2a7e80191a87d Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Thu, 15 Jan 2026 11:10:52 +0100 Subject: [PATCH 3/3] Newline --- .../nl/aerius/taskmanager/domain/QueueWatchDogListener.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDogListener.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDogListener.java index 844a5b4..1514cf0 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDogListener.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/domain/QueueWatchDogListener.java @@ -21,4 +21,4 @@ */ public interface QueueWatchDogListener { void reset(); -} \ No newline at end of file +}