From 64fda7f23108d31de5e24e6647573c9a5145e405 Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Thu, 10 Jul 2025 19:56:54 +0200 Subject: [PATCH 1/2] AER-3864 Improved handling changes in number of workers available To improve the handling of changes in number of workers running the following improvements are made: 1) Changed the event handling of changes in number of workers triggered by RabbitMQ channel event queue watchter. Instead of directly updating the number of workers it will trigger the RabbitMQ api call. The api will give the actual number of workers available. Because the event is triggered for every worker added/removed a timer is used to delay the api call. When a new event is given within 15 seconds, the timer is reset. The actual recurring api call is included in the timer process, so it will not run also when the timer is running from an event or the other way around. This time mechanism assures an update id done within 15 seconds after a change. This seems appropriate. (The alternative is to just do the call each 15 seconds, and remove the whole event triggering). 2) The changes in number of workers is also propagated to the priority scheduler so it can also check it's internal state. 3) The workerFinishedHandlers are guarded from RuntimeExceptions, which could result in not all handlers to be called when an exception happens in one of the handlers. 4) In workerpool to check if a freeworker should be released when a task is finished it doesn't include the number of freeworkers. Because if no change in number of workers this test would always test on equals size(configured == free + running). Unless the number of free workers is just a bit out of sync due to concurrency. While number of running workers is always in sync because when called it's when a task is finished. So if all workers were running it would valid the == condition. 5) To free the workers when the number of workers is reduced no waiting acquire is used. It could block if the data isn't completely in sync. By removing the potential block this is avoided. Even when this would mean less workers are marked as free than are actually free this would recover from when running tasks a finishing and than won't release a free worker, this will than recover itself to the correct condition. It's unclear if it every actually blocked. But just removing the block makes the code more robust. 6) The timeout values on unit tests have been fixed. The default unit is seconds. Therefore all timeout that had no unit actually had a timeout that was 1000 time seconds. The annotations have been fixed and have a explicit time unit added. --- .../nl/aerius/taskmanager/QueueWatchDog.java | 2 +- .../nl/aerius/taskmanager/TaskManager.java | 4 + .../nl/aerius/taskmanager/WorkerPool.java | 47 ++-------- .../aerius/taskmanager/WorkerPoolMetrics.java | 4 +- .../taskmanager/adaptor/WorkerProducer.java | 4 +- .../adaptor/WorkerSizeObserver.java | 7 -- .../adaptor/WorkerSizeProviderProxy.java | 7 +- .../metrics/PerformanceMetricsReporter.java | 4 +- .../mq/RabbitMQChannelQueueEventsWatcher.java | 17 +--- .../mq/RabbitMQWorkerEventProducer.java | 2 +- .../mq/RabbitMQWorkerProducer.java | 10 +- .../mq/RabbitMQWorkerSizeProvider.java | 92 ++++++++++++++++--- .../priorityqueue/PriorityQueueMap.java | 4 + .../priorityqueue/PriorityTaskScheduler.java | 43 +++++++-- .../taskmanager/ConfigurationManagerTest.java | 3 +- .../java/nl/aerius/taskmanager/MockTask.java | 5 +- .../taskmanager/TaskDispatcherTest.java | 12 +-- .../nl/aerius/taskmanager/WorkerPoolTest.java | 28 +++--- .../PerformanceMetricsReporterTest.java | 6 +- .../taskmanager/mq/AbstractRabbitMQTest.java | 3 +- ...RabbitMQChannelQueueEventsWatcherTest.java | 29 ++---- .../mq/RabbitMQMessageHandlerTest.java | 2 +- .../mq/RabbitMQQueueMonitorTest.java | 12 +-- .../mq/RabbitMQWorkerProducerTest.java | 2 +- .../mq/RabbitMQWorkerSizeProviderTest.java | 35 ++++++- .../PriorityTaskSchedulerTest.java | 27 ++++-- 26 files changed, 243 insertions(+), 168 deletions(-) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java index 37723c0..2bac89e 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java @@ -23,7 +23,7 @@ * the queue was purged after some restart. In such a case the scheduler keeps the tasks locked and since there will never come an message * for the task it's locked indefinitely. This watch dog tries to detect such tasks and release them at some point. */ -class QueueWatchDog { +public class QueueWatchDog { /** * If for more than 10 minutes the problem remains the sign to reset is given. diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java index 5187f53..b954bd7 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java @@ -34,6 +34,7 @@ import nl.aerius.taskmanager.adaptor.AdaptorFactory; import nl.aerius.taskmanager.adaptor.WorkerProducer; +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.domain.QueueConfig; import nl.aerius.taskmanager.domain.TaskConsumer; @@ -126,6 +127,9 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep OpenTelemetryMetrics.METER, workerPool); workerProducer.addWorkerFinishedHandler(reporter); workerSizeObserverProxy.addObserver(workerQueueName, workerPool); + if (taskScheduler instanceof final WorkerSizeObserver wzo) { + workerSizeObserverProxy.addObserver(workerQueueName, wzo); + } workerProducer.start(); // Set up metrics WorkerPoolMetrics.setupMetrics(workerPool, workerQueueName); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java index 1bba8cb..c977852 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java @@ -19,8 +19,6 @@ import java.io.IOException; import java.util.Map; import java.util.Map.Entry; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; @@ -54,10 +52,6 @@ class WorkerPool implements WorkerSizeObserver, WorkerFinishedHandler, WorkerMet private final WorkerProducer wp; private final WorkerUpdateHandler workerUpdateHandler; - private final Timer deltaTimer = new Timer(); - private TimerTask deltaTimerTask; - private int deltaCounter; - public WorkerPool(final String workerQueueName, final WorkerProducer wp, final WorkerUpdateHandler workerUpdateHandler) { this.workerQueueName = workerQueueName; this.wp = wp; @@ -89,15 +83,13 @@ public void sendTaskToWorker(final Task task) throws IOException { public int getWorkerSize() { synchronized (this) { - return totalConfiguredWorkers; + return freeWorkers.availablePermits() + runningWorkers.size(); } } @Override - public int getCurrentWorkerSize() { - synchronized (this) { - return freeWorkers.availablePermits() + runningWorkers.size(); - } + public int getReportedWorkerSize() { + return totalConfiguredWorkers; } @Override @@ -143,7 +135,7 @@ public void releaseWorker(final String taskId, final TaskRecord taskRecord) { if (runningWorkers.containsKey(taskId)) { // if currentSize is smaller than the worker size it means the worker // must not be re-added as free worker but removed from the pool. - if (totalConfiguredWorkers >= getCurrentWorkerSize()) { + if (totalConfiguredWorkers >= runningWorkers.size()) { freeWorkers.release(1); } runningWorkers.remove(taskId); @@ -172,24 +164,7 @@ public void reserveWorker() { } } - @Override - public void onDeltaNumberOfWorkersUpdate(final int delta) { - synchronized (this) { - deltaCounter += delta; - if (deltaTimerTask != null) { - deltaTimerTask.cancel(); - } - deltaTimerTask = new TimerTask() { - @Override - public void run() { - synchronized (WorkerPool.this) { - updateNumberOfWorkers(totalConfiguredWorkers + deltaCounter); - } - } - }; - deltaTimer.schedule(deltaTimerTask, 50L); - } - } + /** * Sets the number of workers which are actually available. This number should @@ -212,22 +187,18 @@ public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberO } private void updateNumberOfWorkers(final int numberOfWorkers) { - if (deltaTimerTask != null) { - deltaTimerTask.cancel(); - deltaTimerTask = null; - } - deltaCounter = 0; + final int previousTotalConfiguredWorkers = totalConfiguredWorkers; totalConfiguredWorkers = numberOfWorkers; - final int deltaWorkers = totalConfiguredWorkers - getCurrentWorkerSize(); + final int deltaWorkers = totalConfiguredWorkers - getWorkerSize(); if (deltaWorkers > 0) { freeWorkers.release(deltaWorkers); LOG.info("# Workers of {} increased to {}(+{})", workerQueueName, totalConfiguredWorkers, deltaWorkers); } else if ((deltaWorkers < 0) && (freeWorkers.availablePermits() > 0)) { - freeWorkers.acquireUninterruptibly(Math.min(freeWorkers.availablePermits(), -deltaWorkers)); + freeWorkers.tryAcquire(Math.min(freeWorkers.availablePermits(), -deltaWorkers)); LOG.info("# Workers of {} decreased to {}({})", workerQueueName, totalConfiguredWorkers, deltaWorkers); } - if (deltaWorkers != 0) { + if (previousTotalConfiguredWorkers != totalConfiguredWorkers) { workerUpdateHandler.onWorkerPoolSizeChange(totalConfiguredWorkers); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java index 3dc101e..23f1bf4 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java @@ -35,8 +35,8 @@ public final class WorkerPoolMetrics { private enum WorkerPoolMetricType { // @formatter:off - WORKER_SIZE(WorkerPool::getWorkerSize, "Configured number of workers according to taskmanager"), - CURRENT_WORKER_SIZE(WorkerPool::getCurrentWorkerSize, "Current number of workers according to taskmanager"), + WORKER_SIZE(WorkerPool::getWorkerSize, "Number of workers based on internal state of taskmanager"), + CURRENT_WORKER_SIZE(WorkerPool::getReportedWorkerSize, "Current number of workers according to taskmanager"), RUNNING_WORKER_SIZE(WorkerPool::getRunningWorkerSize, "Running (or occupied) number of workers according to taskmanager"); // @formatter:on diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java index 4253da7..2083eec 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java @@ -89,8 +89,8 @@ interface WorkerMetrics { int getRunningWorkerSize(); /** - * @return Returns the number total number of workers . + * @return Returns the number total number of workers based on what the queue reports as being active. */ - int getCurrentWorkerSize(); + int getReportedWorkerSize(); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java index 517f208..7bcb427 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java @@ -28,11 +28,4 @@ public interface WorkerSizeObserver { * @param numberOfMessages Actual total number of messages on the queue */ void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages); - - /** - * Gives an increment or decrement number of workers processes connected on the queue. - * - * @param deltaNumberOfWorkers increase/decrease of number of workers processes - */ - void onDeltaNumberOfWorkersUpdate(final int deltaNumberOfWorkers); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeProviderProxy.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeProviderProxy.java index d17d187..fe7b269 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeProviderProxy.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeProviderProxy.java @@ -40,12 +40,11 @@ public interface WorkerSizeProviderProxy { boolean removeObserver(String workerQueueName); /** - * Returns the {@link WorkerSizeObserver} for the given worker queue name. + * Triggers to get the worker queue state. * - * @param workerQueueName name of the worker queue - * @return observer for the worker queue + * @param queueName name of the worker queue */ - WorkerSizeObserver getWorkerSizeObserver(String workerQueueName); + void triggerWorkerQueueState(final String queueName); /** * Starts the worker size provider. diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index 386df21..60fb1e5 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -129,7 +129,7 @@ public void onWorkDispatched(final String messageId, final Map m taskMetrics.determineDuration(); dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics); dispatchedWorkerMetrics.register(taskMetrics); - loadMetrics.register(1, workerMetrics.getCurrentWorkerSize()); + loadMetrics.register(1, workerMetrics.getReportedWorkerSize()); } @Override @@ -138,7 +138,7 @@ public synchronized void onWorkerFinished(final String messageId, final Map createQueueDurationMetric(taskMetrics)).register(taskMetrics); workWorkerMetrics.register(taskMetrics); - loadMetrics.register(-1, workerMetrics.getCurrentWorkerSize()); + loadMetrics.register(-1, workerMetrics.getReportedWorkerSize()); } private DurationMetric createQueueDurationMetric(final TaskMetrics taskMetrics) { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcher.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcher.java index b1ac8b1..2143a18 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcher.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcher.java @@ -30,7 +30,6 @@ import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; -import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.client.BrokerConnectionFactory; @@ -42,7 +41,6 @@ class RabbitMQChannelQueueEventsWatcher { private static final String AMQ_RABBITMQ_EVENT = "amq.rabbitmq.event"; private static final String CHANNEL_PATTERN = "consumer.*"; private static final String HEADER_PARAM_QUEUE = "queue"; - private static final String CONSUMER_CREATED = "consumer.created"; private static final Logger LOG = LoggerFactory.getLogger(RabbitMQChannelQueueEventsWatcher.class); @@ -104,20 +102,9 @@ public void handleDelivery(final String consumerTag, final Envelope envelope, fi final Map headers = properties.getHeaders(); final Object queue = headers.get(HEADER_PARAM_QUEUE); final String queueName = queue == null ? null : queue.toString(); - final WorkerSizeObserver observer = proxy.getWorkerSizeObserver(queueName); - if (observer == null) { - LOG.trace("No handler to watch channel changes for queue: {}", queueName); - return; - } - final String event = envelope.getRoutingKey(); - - LOG.trace("Event: {} - queue: {}", event, queueName); - if (CONSUMER_CREATED.equals(event)) { - observer.onDeltaNumberOfWorkersUpdate(1); - } else { // consumer.deleted is the only other possibility - observer.onDeltaNumberOfWorkersUpdate(-1); - } + LOG.trace("Event: {} - queue: {}", envelope.getRoutingKey(), queueName); + proxy.triggerWorkerQueueState(queueName); } }; } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java index b706333..cbba798 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java @@ -110,7 +110,7 @@ public void shutdown() { private void updateMetrics() { try { metrics.forEach((q, wpm) -> { - final int size = wpm.getCurrentWorkerSize(); + final int size = wpm.getReportedWorkerSize(); final int utilisation = wpm.getRunningWorkerSize(); try { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java index ec88cb7..b090ce1 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java @@ -182,9 +182,17 @@ private boolean startReplyConsumer(final Connection connection) throws IOExcepti replyChannel.basicConsume(workerReplyQueue, true, workerReplyQueue, new DefaultConsumer(replyChannel) { @Override public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) { - workerFinishedHandlers.forEach(h -> h.onWorkerFinished(properties.getMessageId(), properties.getHeaders())); + workerFinishedHandlers.forEach(h -> handleWorkFinished(h, properties)); } }); return true; } + + private void handleWorkFinished(final WorkerFinishedHandler handler, final BasicProperties properties) { + try { + handler.onWorkerFinished(properties.getMessageId(), properties.getHeaders()); + } catch (final RuntimeException e) { + LOG.error("Runtime exception during handleWorkFinished of {}", handler.getClass(), e); + } + } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java index 6500519..4cb22ce 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java @@ -19,8 +19,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -44,14 +47,26 @@ public class RabbitMQWorkerSizeProvider implements WorkerSizeProviderProxy { * Delay first read from the RabittMQ admin to give the taskmanager some time to start up and register all observers. */ private static final int INITIAL_DELAY_SECONDS = 10; + /** + * The minimum time before the RabbitMQ management api is fetch again to get an update on the queue state. + */ + private static final long DELAY_BEFORE_UPDATE_TIME_SECONDS = 15; private final ScheduledExecutorService executorService; private final BrokerConnectionFactory factory; private final RabbitMQChannelQueueEventsWatcher channelQueueEventsWatcher; private final RabbitMQWorkerEventProducer eventProducer; - private final long refreshDelaySeconds; + /** + * The time in seconds between each scheduled update. + */ + private final long refreshRateSeconds; + /** + * The time delay in seconds of the update call is made. + */ + private final long refreshDelayBeforeUpdateSeconds; - private final Map observers = new HashMap<>(); + private final Map> lastRuns = new HashMap<>(); + private final Map observers = new HashMap<>(); private final Map monitors = new HashMap<>(); private boolean running; @@ -60,24 +75,36 @@ public RabbitMQWorkerSizeProvider(final ScheduledExecutorService executorService this.executorService = executorService; this.factory = factory; channelQueueEventsWatcher = new RabbitMQChannelQueueEventsWatcher(factory, this); - refreshDelaySeconds = factory.getConnectionConfiguration().getBrokerManagementRefreshRate(); + refreshRateSeconds = factory.getConnectionConfiguration().getBrokerManagementRefreshRate(); eventProducer = new RabbitMQWorkerEventProducer(executorService, factory); + refreshDelayBeforeUpdateSeconds = Math.min(refreshRateSeconds / 2, DELAY_BEFORE_UPDATE_TIME_SECONDS); } @Override public void addObserver(final String workerQueueName, final WorkerSizeObserver observer) { - observers.put(workerQueueName, observer); + observers.computeIfAbsent(workerQueueName, k -> new WorkerSizeObserverComposite()).add(observer); if (observer instanceof WorkerMetrics) { eventProducer.addMetrics(workerQueueName, (WorkerMetrics) observer); } - if (refreshDelaySeconds > 0) { + if (refreshRateSeconds > 0) { final RabbitMQQueueMonitor monitor = new RabbitMQQueueMonitor(factory.getConnectionConfiguration()); - monitors.put(workerQueueName, monitor); + + putMonitor(workerQueueName, monitor); } else { - LOG.info("Not monitoring RabbitMQ admin api because refresh delay was {} seconds", refreshDelaySeconds); + LOG.info("Not monitoring RabbitMQ admin api because refresh delay was {} seconds", refreshRateSeconds); } } + /** + * Store the monitor. Should only be called outside of this class from unit tests to add a mock monitor. + * + * @param workerQueueName + * @param monitor + */ + void putMonitor(final String workerQueueName, final RabbitMQQueueMonitor monitor) { + monitors.put(workerQueueName, monitor); + } + @Override public boolean removeObserver(final String workerQueueName) { final RabbitMQQueueMonitor monitor = monitors.remove(workerQueueName); @@ -93,9 +120,9 @@ public boolean removeObserver(final String workerQueueName) { public void start() throws IOException { channelQueueEventsWatcher.start(); eventProducer.start(); - if (refreshDelaySeconds > 0) { + if (refreshRateSeconds > 0) { running = true; - executorService.scheduleWithFixedDelay(this::updateWorkerQueueState, INITIAL_DELAY_SECONDS, refreshDelaySeconds, TimeUnit.SECONDS); + executorService.scheduleWithFixedDelay(this::updateWorkerQueueState, INITIAL_DELAY_SECONDS, refreshRateSeconds, TimeUnit.SECONDS); } } @@ -108,18 +135,53 @@ public void shutdown() { channelQueueEventsWatcher.shutdown(); } - @Override - public WorkerSizeObserver getWorkerSizeObserver(final String workerQueueName) { - return observers.get(workerQueueName); - } - private void updateWorkerQueueState() { if (running) { try { - monitors.forEach((k, v) -> v.updateWorkerQueueState(k, getWorkerSizeObserver(k))); + monitors.forEach((k, v) -> triggerWorkerQueueState(k)); } catch (final RuntimeException e) { LOG.error("Runtime error during updateWorkerQueueState", e); } } } + + @Override + public void triggerWorkerQueueState(final String queueName) { + // This uses a delayed update. It schedules a task to run in x-seconds. + // If a new update is received before the schedule has run it will cancel the current schedule and reschedule. + // This is mainly for when multiple events are triggered to not trigger a call for every event, + // and also to manage the events trigger in combination with the scheduled process. + synchronized (queueName) { + Optional.ofNullable(lastRuns.get(queueName)).ifPresent(f -> f.cancel(false)); + final Runnable updateTask = () -> updateWorkerQueueState(queueName); + + lastRuns.put(queueName, executorService.schedule(updateTask, refreshDelayBeforeUpdateSeconds, TimeUnit.SECONDS)); + } + } + + private void updateWorkerQueueState(final String queueName) { + synchronized (queueName) { + monitors.get(queueName).updateWorkerQueueState(queueName, observers.get(queueName)); + lastRuns.remove(queueName); + } + } + + private static class WorkerSizeObserverComposite implements WorkerSizeObserver { + private final List list = new ArrayList<>(); + + public void add(final WorkerSizeObserver observer) { + list.add(observer); + } + + @Override + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + for (final WorkerSizeObserver observer : list) { + try { + observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); + } catch (final RuntimeException e) { + LOG.error("RuntimeException during onNumberOfWorkersUpdate in {}", observer.getClass(), e); + } + } + } + } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java index 971e4c2..a667e12 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java @@ -77,6 +77,10 @@ public int onWorkerTotal(final String queueName) { .sum(); } + public void reset() { + tasksOnWorkersPerQueue.forEach((k, v) -> v.set(0)); + } + public int onWorker(final TaskRecord taskRecord) { return Optional.ofNullable(tasksOnWorkersPerQueue.get(key(taskRecord))).map(AtomicInteger::intValue).orElse(0); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java index 7d05ca7..353160e 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java @@ -27,6 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import nl.aerius.taskmanager.QueueWatchDog; +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.domain.PriorityTaskQueue; import nl.aerius.taskmanager.domain.Task; import nl.aerius.taskmanager.domain.TaskRecord; @@ -39,14 +41,15 @@ * like with other priorities. * */ -class PriorityTaskScheduler implements TaskScheduler, Comparator { +class PriorityTaskScheduler implements TaskScheduler, Comparator, WorkerSizeObserver { private static final Logger LOG = LoggerFactory.getLogger(PriorityTaskScheduler.class); private static final int NEXT_TASK_MAX_WAIT_TIME_SECONDS = 120; + private final QueueWatchDog watchDog = new QueueWatchDog(); private final PriorityTaskSchedulerMetrics metrics = new PriorityTaskSchedulerMetrics(); private final Queue queue; - private final PriorityQueueMap priorityQueueMap; + private final PriorityQueueMap priorityQueueMap; private final Lock lock = new ReentrantLock(); private final Condition nextTaskCondition = lock.newCondition(); private final String workerQueueName; @@ -55,9 +58,8 @@ class PriorityTaskScheduler implements TaskScheduler, Compara /** * Constructs scheduler for given configuration. - * */ - PriorityTaskScheduler(final PriorityQueueMap priorityQueueKeyMap, final Function, Queue> queueCreator, + PriorityTaskScheduler(final PriorityQueueMap priorityQueueKeyMap, final Function, Queue> queueCreator, final String workerQueueName) { this.priorityQueueMap = priorityQueueKeyMap; this.workerQueueName = workerQueueName; @@ -116,14 +118,14 @@ public Task getNextTask() throws InterruptedException { return task; } - private Task obtainTask() { + private void obtainTask() { tasksOnWorkers++; final Task task = queue.poll(); + priorityQueueMap.incrementOnWorker(task.getTaskRecord()); if (task.getContext() != null) { task.getContext().makeCurrent(); } - return task; } /** @@ -284,6 +286,33 @@ private int comparePrioWithoutTask(final TaskRecord taskRecord1, final TaskRecor * Signal that the next task process can check again.. */ private void signalNextTask() { - nextTaskCondition.signalAll(); + try { + nextTaskCondition.signalAll(); + } catch (final IllegalMonitorStateException e) { + LOG.error("Caller of signalNextTask did not wrapped call with lock field.", e); + } + } + + @Override + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + if (watchDog.isItDead(tasksOnWorkers > 0, numberOfMessages)) { + LOG.info("It looks like some tasks are zombies on {} worker queue in priority scheduler, so all tasks currently in state running are released.", workerQueueName); + reset(); + } + } + + /** + * Resets the internal state. Called in case a difference was detected that internally it still has messages as being on the queue, + * while the queue is empty. + */ + void reset() { + lock.lock(); + try { + tasksOnWorkers = 0; + priorityQueueMap.reset(); + signalNextTask(); + } finally { + lock.unlock(); + } } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/ConfigurationManagerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/ConfigurationManagerTest.java index 05804df..4e2fd96 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/ConfigurationManagerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/ConfigurationManagerTest.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -41,7 +42,7 @@ void before() throws IOException { } @Test - @Timeout(2000) + @Timeout(value = 2, unit = TimeUnit.SECONDS) void testLoadConfiguration() { final TaskManagerConfiguration tmc = ConfigurationManager.loadConfiguration(properties); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTask.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTask.java index 36dfae1..5ec3996 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTask.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTask.java @@ -34,11 +34,12 @@ public MockTask(final TaskConsumer taskConsumer) { this(taskConsumer, UUID.randomUUID().toString()); } - public MockTask(final TaskConsumer taskConsumer, final String id) { + public MockTask(final TaskConsumer taskConsumer, final String messageId) { super(taskConsumer); final RabbitMQMessage message = mock(RabbitMQMessage.class); - doReturn(id).when(message).getMessageId(); + doReturn(messageId).when(message).getMessageId(); + doReturn(taskConsumer.getQueueName()).when(message).getCorrelationId(); setData(message); } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java index d3f7296..e7a130e 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java @@ -73,7 +73,7 @@ void after() throws InterruptedException { } @Test - @Timeout(3000) + @Timeout(value = 3, unit = TimeUnit.SECONDS) void testNoFreeWorkers() { // Add Worker which will unlock workerPool.onNumberOfWorkersUpdate(1, 0); @@ -85,13 +85,13 @@ void testNoFreeWorkers() { forwardTaskAsync(createTask(), null); // Dispatcher should go back to wait for worker to become available. await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); - assertEquals(0, workerPool.getCurrentWorkerSize(), "WorkerPool should be empty"); + assertEquals(0, workerPool.getReportedWorkerSize(), "WorkerPool should be empty"); workerPool.onNumberOfWorkersUpdate(1, 0); - assertEquals(1, workerPool.getCurrentWorkerSize(), "WorkerPool should have 1 running"); + assertEquals(1, workerPool.getReportedWorkerSize(), "WorkerPool should have 1 running"); } @Test - @Timeout(3000) + @Timeout(value = 3, unit = TimeUnit.SECONDS) void testForwardTest() { final Task task = createTask(); final Future future = forwardTaskAsync(task, null); @@ -104,7 +104,7 @@ void testForwardTest() { @Disabled("TaskAlreadySendexception error willl not be thrown") @Test - @Timeout(3000) + @Timeout(value = 3, unit = TimeUnit.SECONDS) void testForwardDuplicateTask() { final Task task = createTask(); executor.execute(dispatcher); @@ -122,7 +122,7 @@ void testForwardDuplicateTask() { } @Test - @Timeout(3000) + @Timeout(value = 3, unit = TimeUnit.SECONDS) void testExceptionDuringForward() { workerProducer.setShutdownExceptionOnForward(true); final Task task = createTask(); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java index db13d25..03d921a 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java @@ -78,17 +78,17 @@ public void messageDelivered(final Message message) { @Test void testWorkerPoolSizing() throws IOException { - assertSame(0, workerPool.getCurrentWorkerSize(), "Check if workerPool size is empty at start"); + assertSame(0, workerPool.getReportedWorkerSize(), "Check if workerPool size is empty at start"); workerPool.onNumberOfWorkersUpdate(10, 0); - assertSame(10, workerPool.getCurrentWorkerSize(), "Check if workerPool size is changed after sizing"); + assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is changed after sizing"); assertEquals(10, numberOfWorkers, "Check if workerPool change handler called."); workerPool.reserveWorker(); - assertSame(10, workerPool.getCurrentWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); + assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); final Task task = createTask(); workerPool.sendTaskToWorker(task); - assertSame(10, workerPool.getCurrentWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); + assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); workerPool.releaseWorker(task.getId()); - assertSame(10, workerPool.getCurrentWorkerSize(), "Check if workerPool size is same after releasing 1 worker"); + assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after releasing 1 worker"); } @Test @@ -105,16 +105,16 @@ void testWorkerPoolScaleDown() throws IOException { workerPool.sendTaskToWorker(task2); final Task task3 = createTask(); workerPool.sendTaskToWorker(task3); - assertSame(5, workerPool.getCurrentWorkerSize(), "Check if workerPool size is same after 2 workers running"); + assertEquals(5, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after 2 workers running"); workerPool.onNumberOfWorkersUpdate(1, 0); - assertSame(1, workerPool.getWorkerSize(), "Check if workerPool size is lower"); - assertSame(3, workerPool.getCurrentWorkerSize(), "Check if current workerPool size is same after decreasing # workers"); + assertEquals(3, workerPool.getWorkerSize(), "Workpool size should match number of running tasks, since new total is lower than currently running"); + assertEquals(1, workerPool.getReportedWorkerSize(), "Check if current workerPool size is same after decreasing # workers"); workerPool.releaseWorker(task1.getId()); - assertSame(2, workerPool.getCurrentWorkerSize(), "Check if workerPool size is lower, but not yet same as total because still process running"); + assertEquals(2, workerPool.getWorkerSize(), "Check if workerPool size is lower, but not yet same as total because still process running"); workerPool.releaseWorker(task2.getId()); - assertSame(1, workerPool.getCurrentWorkerSize(), "Check if workerPool size is lower"); + assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size is lower"); workerPool.releaseWorker(task3.getId()); - assertSame(1, workerPool.getCurrentWorkerSize(), "Check if workerPool size should remain the same"); + assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size should remain the same"); } @Test @@ -124,11 +124,11 @@ void testReleaseTaskTwice() throws IOException { workerPool.sendTaskToWorker(task1); final String id = task1.getId(); workerPool.releaseWorker(id); - final int currentWorkerSize1 = workerPool.getCurrentWorkerSize(); + final int currentWorkerSize1 = workerPool.getReportedWorkerSize(); workerPool.releaseWorker(id); - final int currentWorkerSize2 = workerPool.getCurrentWorkerSize(); + final int currentWorkerSize2 = workerPool.getReportedWorkerSize(); assertEquals(currentWorkerSize1, currentWorkerSize2, "Check if task is not sent twice"); - assertEquals(2, workerPool.getCurrentWorkerSize(), "Check if task worker size not decreased to much"); + assertEquals(2, workerPool.getReportedWorkerSize(), "Check if task worker size not decreased to much"); } @Disabled("Exception is not thrown anymore, so test ignored for now") diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index eff414b..0d9fde2 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -87,7 +87,7 @@ void beforeEach() { @Test void testOnWorkDispatched() { - doReturn(10).when(workMetrics).getCurrentWorkerSize(); + doReturn(10).when(workMetrics).getReportedWorkerSize(); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); @@ -101,7 +101,7 @@ void testOnWorkDispatched() { @Test void testOnWorkerFinished() { - doReturn(10).when(workMetrics).getCurrentWorkerSize(); + doReturn(10).when(workMetrics).getReportedWorkerSize(); reporter.onWorkerFinished("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); @@ -115,7 +115,7 @@ void testOnWorkerFinished() { @Test void testWorkLoad() throws InterruptedException { - doReturn(4).when(workMetrics).getCurrentWorkerSize(); + doReturn(4).when(workMetrics).getReportedWorkerSize(); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/AbstractRabbitMQTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/AbstractRabbitMQTest.java index 196c164..13e08c1 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/AbstractRabbitMQTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/AbstractRabbitMQTest.java @@ -45,6 +45,7 @@ class AbstractRabbitMQTest { protected BrokerConnectionFactory factory; protected Channel mockChannel; protected AdaptorFactory adapterFactory; + protected int brokerManagementRefreshRate; @BeforeAll static void setupClass() { @@ -61,7 +62,7 @@ static void afterClass() throws InterruptedException { void setUp() throws Exception { final Connection mockConnection = Mockito.mock(Connection.class); final ConnectionConfiguration configuration = ConnectionConfiguration.builder() - .brokerHost("localhost").brokerUsername("guest").brokerPassword("guest").build(); + .brokerHost("localhost").brokerUsername("guest").brokerPassword("guest").brokerManagementRefreshRate(brokerManagementRefreshRate).build(); mockChannel = MockedChannelFactory.create(); lenient().doReturn(mockChannel).when(mockConnection).createChannel(); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java index 5b5a1fb..a572901 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java @@ -17,19 +17,18 @@ package nl.aerius.taskmanager.mq; import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.HEADER_PARAM_QUEUE; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -44,7 +43,6 @@ import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; -import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.client.BrokerConnectionFactory; @@ -89,25 +87,13 @@ protected Connection createNewConnection() throws IOException { @Test void testReceiving() throws IOException { - final AtomicInteger deltaAI = new AtomicInteger(); - final WorkerSizeObserver observer = new WorkerSizeObserver() { - - @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { - fail("Should not call onNumberOfWorkersUpdate in this class"); - } - - @Override - public void onDeltaNumberOfWorkersUpdate(final int deltaNumberOfWorkers) { - deltaAI.set(deltaNumberOfWorkers); - } - }; - doReturn(observer).when(proxy).getWorkerSizeObserver(TEST_QUEUENAME); - assertDeltaCheck("consumer.created", deltaAI, 1); - assertDeltaCheck("consumer.removed", deltaAI, -1); + assertDeltaCheck("consumer.created"); + verify(proxy).triggerWorkerQueueState(TEST_QUEUENAME); + assertDeltaCheck("consumer.removed"); + verify(proxy, times(2)).triggerWorkerQueueState(TEST_QUEUENAME); } - private void assertDeltaCheck(final String event, final AtomicInteger deltaAI, final int expected) throws IOException { + private void assertDeltaCheck(final String event) throws IOException { doAnswer(i -> { final Envelope envelope = Mockito.mock(Envelope.class); doReturn(event).when(envelope).getRoutingKey(); @@ -118,6 +104,5 @@ private void assertDeltaCheck(final String event, final AtomicInteger deltaAI, f return null; }).when(mockChannel).basicConsume(any(), eq(true), any()); watcher.start(); - assertEquals(expected, deltaAI.get(), "Expected increment by 1"); } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java index cbb90e4..6188dca 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java @@ -59,7 +59,7 @@ class RabbitMQMessageHandlerTest extends AbstractRabbitMQTest { private @Captor ArgumentCaptor shutdownListenerCaptor; @Test - @Timeout(10000) + @Timeout(value = 10, unit = TimeUnit.SECONDS) void testMessageReceivedHandler() throws IOException, InterruptedException { final byte[] receivedBody = "4321".getBytes(); final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, null)); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java index 1d26054..e82f254 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java @@ -44,17 +44,7 @@ void testGetWorkerQueueState() { final ConnectionConfiguration configuration = ConnectionConfiguration.builder() .brokerHost(DUMMY).brokerPort(0).brokerUsername(DUMMY).brokerPassword(DUMMY).build(); final AtomicInteger workerSize = new AtomicInteger(); - final WorkerSizeObserver mwps = new WorkerSizeObserver() { - @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { - workerSize.set(numberOfWorkers); - } - - @Override - public void onDeltaNumberOfWorkersUpdate(final int deltaNumberOfWorkers) { - // not tested here. - } - }; + final WorkerSizeObserver mwps = (numberOfWorkers, numberOfMessages) -> workerSize.set(numberOfWorkers); final RabbitMQQueueMonitor rpm = new RabbitMQQueueMonitor(configuration) { @Override protected JsonNode getJsonResultFromApi(final String apiPath) throws IOException { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java index 215fe73..5a6f2e3 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java @@ -44,7 +44,7 @@ class RabbitMQWorkerProducerTest extends AbstractRabbitMQTest { private @Mock WorkerSizeObserver queueSizeObserver; @Test - @Timeout(5000) + @Timeout(value = 5, unit = TimeUnit.SECONDS) void testForwardMessage() throws IOException, InterruptedException { final byte[] sendBody = "4321".getBytes(); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java index 81863a1..21bfd74 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java @@ -17,12 +17,19 @@ package nl.aerius.taskmanager.mq; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; +import org.junit.jupiter.api.Timeout; import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; @@ -31,19 +38,41 @@ */ public class RabbitMQWorkerSizeProviderTest extends AbstractRabbitMQTest { + private static final String TEST_QUEUE = "test"; + private RabbitMQWorkerSizeProvider provider; @Override @BeforeEach void setUp() throws Exception { + brokerManagementRefreshRate = 5; super.setUp(); provider = new RabbitMQWorkerSizeProvider(executor, factory); } + @Test + @Timeout(value = 10, unit = TimeUnit.SECONDS) + void testTriggerWorkerQueueState() throws IOException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final RabbitMQQueueMonitor mockMonitor = mock(RabbitMQQueueMonitor.class); + + doAnswer(inv -> { + latch.countDown(); + return null; + }).when(mockMonitor).updateWorkerQueueState(eq(TEST_QUEUE), any()); + provider.putMonitor(TEST_QUEUE, mockMonitor); + // Call twice, which should result in only 1 call to updateWorkerQueueState + provider.triggerWorkerQueueState(TEST_QUEUE); + provider.triggerWorkerQueueState(TEST_QUEUE); + latch.await(); + verify(mockMonitor).updateWorkerQueueState(eq(TEST_QUEUE), any()); + } + @Test void testStartShutdown() throws IOException { - final WorkerSizeObserver dummyObserver = Mockito.mock(WorkerSizeObserver.class); - provider.addObserver("test", dummyObserver); + final WorkerSizeObserver dummyObserver = mock(WorkerSizeObserver.class); + + provider.addObserver(TEST_QUEUE, dummyObserver); provider.start(); provider.shutdown(); assertFalse(provider.removeObserver("test"), "Observer should already have been removed"); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java index 9965734..ea8c25b 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java @@ -82,7 +82,7 @@ void setUp() throws IOException { configuration.getQueues().add(tc1); configuration.getQueues().add(tc2); configuration.getQueues().add(tc3); - scheduler = (PriorityTaskScheduler) factory.createScheduler(new QueueConfig(QUEUE1, false, false, null)); + scheduler = (PriorityTaskScheduler) factory.createScheduler(new QueueConfig(QUEUE1, false, true, null)); configuration.getQueues().forEach(scheduler::updateQueue); task1 = createTask(taskConsumer1, "1", QUEUE1); task2a = createTask(taskConsumer2, "2a", QUEUE2); @@ -91,13 +91,13 @@ void setUp() throws IOException { } @Test - @Timeout(5000) + @Timeout(value = 5, unit = TimeUnit.SECONDS) void testCompare() throws InterruptedException { assertTrue(compare(task1, task2a, 1), "Compare Ok"); } @Test - @Timeout(5000) + @Timeout(value = 5, unit = TimeUnit.SECONDS) void testCompareReverse() throws InterruptedException { assertTrue(compare(task2a, task1, -1), "Compare reserve Ok"); } @@ -118,13 +118,13 @@ private boolean compare(final Task taskA, final Task taskB, final int returnResu } @Test - @Timeout(5000) + @Timeout(value = 5, unit = TimeUnit.SECONDS) void testCompareSame() throws InterruptedException { assertTrue(compareSame(task2a, task3, -1), "Compare same Ok"); } @Test - @Timeout(5000) + @Timeout(value = 5, unit = TimeUnit.SECONDS) void testCompareSameReverse() throws InterruptedException { assertTrue(compareSame(task3, task2a, 1), "Compare same reserve Ok"); } @@ -167,7 +167,7 @@ void testGetTaskWith1WorkerAvailable() throws InterruptedException, ExecutionExc * it should not be run until that more than one or the task of that queue is finished. */ @Test - @Timeout(7000) + @Timeout(value = 7, unit = TimeUnit.SECONDS) void testGetTask() throws InterruptedException, ExecutionException { scheduler.onWorkerPoolSizeChange(2); final Task task1a = createTask(taskConsumer1, "1a", QUEUE1); @@ -200,7 +200,7 @@ void testGetTask() throws InterruptedException, ExecutionException { * In the meanwhile, other tasks can start/finish (as long as there is a capacity for those tasks). */ @Test - @Timeout(7000) + @Timeout(value = 7, unit = TimeUnit.SECONDS) void testGetTaskBigPool() throws InterruptedException, ExecutionException { scheduler.onWorkerPoolSizeChange(10); final List tasks = new ArrayList<>(); @@ -243,7 +243,7 @@ void testGetTaskBigPool() throws InterruptedException, ExecutionException { * @throws InterruptedException */ @Test - @Timeout(1000) + @Timeout(value = 1, unit = TimeUnit.SECONDS) void testCompare2Workers() throws InterruptedException { scheduler.onWorkerPoolSizeChange(2); scheduler.addTask(task2a); @@ -254,6 +254,17 @@ void testCompare2Workers() throws InterruptedException { assertSame(task3, scheduler.getNextTask(), "Scheduler should prefer task3"); } + @Test + @Timeout(value = 1, unit = TimeUnit.SECONDS) + void testReset() throws InterruptedException { + scheduler.onWorkerPoolSizeChange(2); + scheduler.addTask(task2a); + scheduler.getNextTask(); + assertEquals(1, scheduler.compare(task2b, task3), "Scheduler should prefer task3"); + scheduler.reset(); + assertEquals(0, scheduler.compare(task2b, task3), "After reset Scheduler should not have a preference because all are on the same level"); + } + private Future waitForTask(final Task task, final AtomicInteger chkCounter) { final ExecutorService es = Executors.newCachedThreadPool(); final Future receivedTask = es.submit(new Callable() { From 1177445268c6b79662ce46ca4c3739522d95a3af Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Mon, 21 Jul 2025 17:04:36 +0200 Subject: [PATCH 2/2] Review comments --- .../mq/RabbitMQWorkerSizeProvider.java | 20 ++++++++++--------- .../priorityqueue/PriorityTaskScheduler.java | 2 +- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java index 4cb22ce..aa19d7b 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java @@ -48,7 +48,7 @@ public class RabbitMQWorkerSizeProvider implements WorkerSizeProviderProxy { */ private static final int INITIAL_DELAY_SECONDS = 10; /** - * The minimum time before the RabbitMQ management api is fetch again to get an update on the queue state. + * The minimum time before the RabbitMQ management api is fetched again to get an update on the queue state. */ private static final long DELAY_BEFORE_UPDATE_TIME_SECONDS = 15; @@ -61,7 +61,7 @@ public class RabbitMQWorkerSizeProvider implements WorkerSizeProviderProxy { */ private final long refreshRateSeconds; /** - * The time delay in seconds of the update call is made. + * The time delay in seconds before the update call is made. */ private final long refreshDelayBeforeUpdateSeconds; @@ -82,17 +82,19 @@ public RabbitMQWorkerSizeProvider(final ScheduledExecutorService executorService @Override public void addObserver(final String workerQueueName, final WorkerSizeObserver observer) { + if (!observers.containsKey(workerQueueName)) { + if (refreshRateSeconds > 0) { + final RabbitMQQueueMonitor monitor = new RabbitMQQueueMonitor(factory.getConnectionConfiguration()); + + putMonitor(workerQueueName, monitor); + } else { + LOG.info("Not monitoring RabbitMQ admin api because refresh delay was {} seconds", refreshRateSeconds); + } + } observers.computeIfAbsent(workerQueueName, k -> new WorkerSizeObserverComposite()).add(observer); if (observer instanceof WorkerMetrics) { eventProducer.addMetrics(workerQueueName, (WorkerMetrics) observer); } - if (refreshRateSeconds > 0) { - final RabbitMQQueueMonitor monitor = new RabbitMQQueueMonitor(factory.getConnectionConfiguration()); - - putMonitor(workerQueueName, monitor); - } else { - LOG.info("Not monitoring RabbitMQ admin api because refresh delay was {} seconds", refreshRateSeconds); - } } /** diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java index 353160e..152e8d3 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java @@ -289,7 +289,7 @@ private void signalNextTask() { try { nextTaskCondition.signalAll(); } catch (final IllegalMonitorStateException e) { - LOG.error("Caller of signalNextTask did not wrapped call with lock field.", e); + LOG.error("Caller of signalNextTask did not wrap call with lock field.", e); } }