diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java index 37723c0..2bac89e 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/QueueWatchDog.java @@ -23,7 +23,7 @@ * the queue was purged after some restart. In such a case the scheduler keeps the tasks locked and since there will never come an message * for the task it's locked indefinitely. This watch dog tries to detect such tasks and release them at some point. */ -class QueueWatchDog { +public class QueueWatchDog { /** * If for more than 10 minutes the problem remains the sign to reset is given. diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java index 5187f53..b954bd7 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/TaskManager.java @@ -34,6 +34,7 @@ import nl.aerius.taskmanager.adaptor.AdaptorFactory; import nl.aerius.taskmanager.adaptor.WorkerProducer; +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.domain.QueueConfig; import nl.aerius.taskmanager.domain.TaskConsumer; @@ -126,6 +127,9 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep OpenTelemetryMetrics.METER, workerPool); workerProducer.addWorkerFinishedHandler(reporter); workerSizeObserverProxy.addObserver(workerQueueName, workerPool); + if (taskScheduler instanceof final WorkerSizeObserver wzo) { + workerSizeObserverProxy.addObserver(workerQueueName, wzo); + } workerProducer.start(); // Set up metrics WorkerPoolMetrics.setupMetrics(workerPool, workerQueueName); diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java index 1bba8cb..c977852 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPool.java @@ -19,8 +19,6 @@ import java.io.IOException; import java.util.Map; import java.util.Map.Entry; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; @@ -54,10 +52,6 @@ class WorkerPool implements WorkerSizeObserver, WorkerFinishedHandler, WorkerMet private final WorkerProducer wp; private final WorkerUpdateHandler workerUpdateHandler; - private final Timer deltaTimer = new Timer(); - private TimerTask deltaTimerTask; - private int deltaCounter; - public WorkerPool(final String workerQueueName, final WorkerProducer wp, final WorkerUpdateHandler workerUpdateHandler) { this.workerQueueName = workerQueueName; this.wp = wp; @@ -89,15 +83,13 @@ public void sendTaskToWorker(final Task task) throws IOException { public int getWorkerSize() { synchronized (this) { - return totalConfiguredWorkers; + return freeWorkers.availablePermits() + runningWorkers.size(); } } @Override - public int getCurrentWorkerSize() { - synchronized (this) { - return freeWorkers.availablePermits() + runningWorkers.size(); - } + public int getReportedWorkerSize() { + return totalConfiguredWorkers; } @Override @@ -143,7 +135,7 @@ public void releaseWorker(final String taskId, final TaskRecord taskRecord) { if (runningWorkers.containsKey(taskId)) { // if currentSize is smaller than the worker size it means the worker // must not be re-added as free worker but removed from the pool. - if (totalConfiguredWorkers >= getCurrentWorkerSize()) { + if (totalConfiguredWorkers >= runningWorkers.size()) { freeWorkers.release(1); } runningWorkers.remove(taskId); @@ -172,24 +164,7 @@ public void reserveWorker() { } } - @Override - public void onDeltaNumberOfWorkersUpdate(final int delta) { - synchronized (this) { - deltaCounter += delta; - if (deltaTimerTask != null) { - deltaTimerTask.cancel(); - } - deltaTimerTask = new TimerTask() { - @Override - public void run() { - synchronized (WorkerPool.this) { - updateNumberOfWorkers(totalConfiguredWorkers + deltaCounter); - } - } - }; - deltaTimer.schedule(deltaTimerTask, 50L); - } - } + /** * Sets the number of workers which are actually available. This number should @@ -212,22 +187,18 @@ public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberO } private void updateNumberOfWorkers(final int numberOfWorkers) { - if (deltaTimerTask != null) { - deltaTimerTask.cancel(); - deltaTimerTask = null; - } - deltaCounter = 0; + final int previousTotalConfiguredWorkers = totalConfiguredWorkers; totalConfiguredWorkers = numberOfWorkers; - final int deltaWorkers = totalConfiguredWorkers - getCurrentWorkerSize(); + final int deltaWorkers = totalConfiguredWorkers - getWorkerSize(); if (deltaWorkers > 0) { freeWorkers.release(deltaWorkers); LOG.info("# Workers of {} increased to {}(+{})", workerQueueName, totalConfiguredWorkers, deltaWorkers); } else if ((deltaWorkers < 0) && (freeWorkers.availablePermits() > 0)) { - freeWorkers.acquireUninterruptibly(Math.min(freeWorkers.availablePermits(), -deltaWorkers)); + freeWorkers.tryAcquire(Math.min(freeWorkers.availablePermits(), -deltaWorkers)); LOG.info("# Workers of {} decreased to {}({})", workerQueueName, totalConfiguredWorkers, deltaWorkers); } - if (deltaWorkers != 0) { + if (previousTotalConfiguredWorkers != totalConfiguredWorkers) { workerUpdateHandler.onWorkerPoolSizeChange(totalConfiguredWorkers); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java index 3dc101e..23f1bf4 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/WorkerPoolMetrics.java @@ -35,8 +35,8 @@ public final class WorkerPoolMetrics { private enum WorkerPoolMetricType { // @formatter:off - WORKER_SIZE(WorkerPool::getWorkerSize, "Configured number of workers according to taskmanager"), - CURRENT_WORKER_SIZE(WorkerPool::getCurrentWorkerSize, "Current number of workers according to taskmanager"), + WORKER_SIZE(WorkerPool::getWorkerSize, "Number of workers based on internal state of taskmanager"), + CURRENT_WORKER_SIZE(WorkerPool::getReportedWorkerSize, "Current number of workers according to taskmanager"), RUNNING_WORKER_SIZE(WorkerPool::getRunningWorkerSize, "Running (or occupied) number of workers according to taskmanager"); // @formatter:on diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java index 4253da7..2083eec 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerProducer.java @@ -89,8 +89,8 @@ interface WorkerMetrics { int getRunningWorkerSize(); /** - * @return Returns the number total number of workers . + * @return Returns the number total number of workers based on what the queue reports as being active. */ - int getCurrentWorkerSize(); + int getReportedWorkerSize(); } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java index 517f208..7bcb427 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeObserver.java @@ -28,11 +28,4 @@ public interface WorkerSizeObserver { * @param numberOfMessages Actual total number of messages on the queue */ void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages); - - /** - * Gives an increment or decrement number of workers processes connected on the queue. - * - * @param deltaNumberOfWorkers increase/decrease of number of workers processes - */ - void onDeltaNumberOfWorkersUpdate(final int deltaNumberOfWorkers); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeProviderProxy.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeProviderProxy.java index d17d187..fe7b269 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeProviderProxy.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/adaptor/WorkerSizeProviderProxy.java @@ -40,12 +40,11 @@ public interface WorkerSizeProviderProxy { boolean removeObserver(String workerQueueName); /** - * Returns the {@link WorkerSizeObserver} for the given worker queue name. + * Triggers to get the worker queue state. * - * @param workerQueueName name of the worker queue - * @return observer for the worker queue + * @param queueName name of the worker queue */ - WorkerSizeObserver getWorkerSizeObserver(String workerQueueName); + void triggerWorkerQueueState(final String queueName); /** * Starts the worker size provider. diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java index 386df21..60fb1e5 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporter.java @@ -129,7 +129,7 @@ public void onWorkDispatched(final String messageId, final Map m taskMetrics.determineDuration(); dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics); dispatchedWorkerMetrics.register(taskMetrics); - loadMetrics.register(1, workerMetrics.getCurrentWorkerSize()); + loadMetrics.register(1, workerMetrics.getReportedWorkerSize()); } @Override @@ -138,7 +138,7 @@ public synchronized void onWorkerFinished(final String messageId, final Map createQueueDurationMetric(taskMetrics)).register(taskMetrics); workWorkerMetrics.register(taskMetrics); - loadMetrics.register(-1, workerMetrics.getCurrentWorkerSize()); + loadMetrics.register(-1, workerMetrics.getReportedWorkerSize()); } private DurationMetric createQueueDurationMetric(final TaskMetrics taskMetrics) { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcher.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcher.java index b1ac8b1..2143a18 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcher.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcher.java @@ -30,7 +30,6 @@ import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; -import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.client.BrokerConnectionFactory; @@ -42,7 +41,6 @@ class RabbitMQChannelQueueEventsWatcher { private static final String AMQ_RABBITMQ_EVENT = "amq.rabbitmq.event"; private static final String CHANNEL_PATTERN = "consumer.*"; private static final String HEADER_PARAM_QUEUE = "queue"; - private static final String CONSUMER_CREATED = "consumer.created"; private static final Logger LOG = LoggerFactory.getLogger(RabbitMQChannelQueueEventsWatcher.class); @@ -104,20 +102,9 @@ public void handleDelivery(final String consumerTag, final Envelope envelope, fi final Map headers = properties.getHeaders(); final Object queue = headers.get(HEADER_PARAM_QUEUE); final String queueName = queue == null ? null : queue.toString(); - final WorkerSizeObserver observer = proxy.getWorkerSizeObserver(queueName); - if (observer == null) { - LOG.trace("No handler to watch channel changes for queue: {}", queueName); - return; - } - final String event = envelope.getRoutingKey(); - - LOG.trace("Event: {} - queue: {}", event, queueName); - if (CONSUMER_CREATED.equals(event)) { - observer.onDeltaNumberOfWorkersUpdate(1); - } else { // consumer.deleted is the only other possibility - observer.onDeltaNumberOfWorkersUpdate(-1); - } + LOG.trace("Event: {} - queue: {}", envelope.getRoutingKey(), queueName); + proxy.triggerWorkerQueueState(queueName); } }; } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java index b706333..cbba798 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerEventProducer.java @@ -110,7 +110,7 @@ public void shutdown() { private void updateMetrics() { try { metrics.forEach((q, wpm) -> { - final int size = wpm.getCurrentWorkerSize(); + final int size = wpm.getReportedWorkerSize(); final int utilisation = wpm.getRunningWorkerSize(); try { diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java index ec88cb7..b090ce1 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducer.java @@ -182,9 +182,17 @@ private boolean startReplyConsumer(final Connection connection) throws IOExcepti replyChannel.basicConsume(workerReplyQueue, true, workerReplyQueue, new DefaultConsumer(replyChannel) { @Override public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) { - workerFinishedHandlers.forEach(h -> h.onWorkerFinished(properties.getMessageId(), properties.getHeaders())); + workerFinishedHandlers.forEach(h -> handleWorkFinished(h, properties)); } }); return true; } + + private void handleWorkFinished(final WorkerFinishedHandler handler, final BasicProperties properties) { + try { + handler.onWorkerFinished(properties.getMessageId(), properties.getHeaders()); + } catch (final RuntimeException e) { + LOG.error("Runtime exception during handleWorkFinished of {}", handler.getClass(), e); + } + } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java index 6500519..aa19d7b 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProvider.java @@ -19,8 +19,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -44,14 +47,26 @@ public class RabbitMQWorkerSizeProvider implements WorkerSizeProviderProxy { * Delay first read from the RabittMQ admin to give the taskmanager some time to start up and register all observers. */ private static final int INITIAL_DELAY_SECONDS = 10; + /** + * The minimum time before the RabbitMQ management api is fetched again to get an update on the queue state. + */ + private static final long DELAY_BEFORE_UPDATE_TIME_SECONDS = 15; private final ScheduledExecutorService executorService; private final BrokerConnectionFactory factory; private final RabbitMQChannelQueueEventsWatcher channelQueueEventsWatcher; private final RabbitMQWorkerEventProducer eventProducer; - private final long refreshDelaySeconds; + /** + * The time in seconds between each scheduled update. + */ + private final long refreshRateSeconds; + /** + * The time delay in seconds before the update call is made. + */ + private final long refreshDelayBeforeUpdateSeconds; - private final Map observers = new HashMap<>(); + private final Map> lastRuns = new HashMap<>(); + private final Map observers = new HashMap<>(); private final Map monitors = new HashMap<>(); private boolean running; @@ -60,22 +75,36 @@ public RabbitMQWorkerSizeProvider(final ScheduledExecutorService executorService this.executorService = executorService; this.factory = factory; channelQueueEventsWatcher = new RabbitMQChannelQueueEventsWatcher(factory, this); - refreshDelaySeconds = factory.getConnectionConfiguration().getBrokerManagementRefreshRate(); + refreshRateSeconds = factory.getConnectionConfiguration().getBrokerManagementRefreshRate(); eventProducer = new RabbitMQWorkerEventProducer(executorService, factory); + refreshDelayBeforeUpdateSeconds = Math.min(refreshRateSeconds / 2, DELAY_BEFORE_UPDATE_TIME_SECONDS); } @Override public void addObserver(final String workerQueueName, final WorkerSizeObserver observer) { - observers.put(workerQueueName, observer); + if (!observers.containsKey(workerQueueName)) { + if (refreshRateSeconds > 0) { + final RabbitMQQueueMonitor monitor = new RabbitMQQueueMonitor(factory.getConnectionConfiguration()); + + putMonitor(workerQueueName, monitor); + } else { + LOG.info("Not monitoring RabbitMQ admin api because refresh delay was {} seconds", refreshRateSeconds); + } + } + observers.computeIfAbsent(workerQueueName, k -> new WorkerSizeObserverComposite()).add(observer); if (observer instanceof WorkerMetrics) { eventProducer.addMetrics(workerQueueName, (WorkerMetrics) observer); } - if (refreshDelaySeconds > 0) { - final RabbitMQQueueMonitor monitor = new RabbitMQQueueMonitor(factory.getConnectionConfiguration()); - monitors.put(workerQueueName, monitor); - } else { - LOG.info("Not monitoring RabbitMQ admin api because refresh delay was {} seconds", refreshDelaySeconds); - } + } + + /** + * Store the monitor. Should only be called outside of this class from unit tests to add a mock monitor. + * + * @param workerQueueName + * @param monitor + */ + void putMonitor(final String workerQueueName, final RabbitMQQueueMonitor monitor) { + monitors.put(workerQueueName, monitor); } @Override @@ -93,9 +122,9 @@ public boolean removeObserver(final String workerQueueName) { public void start() throws IOException { channelQueueEventsWatcher.start(); eventProducer.start(); - if (refreshDelaySeconds > 0) { + if (refreshRateSeconds > 0) { running = true; - executorService.scheduleWithFixedDelay(this::updateWorkerQueueState, INITIAL_DELAY_SECONDS, refreshDelaySeconds, TimeUnit.SECONDS); + executorService.scheduleWithFixedDelay(this::updateWorkerQueueState, INITIAL_DELAY_SECONDS, refreshRateSeconds, TimeUnit.SECONDS); } } @@ -108,18 +137,53 @@ public void shutdown() { channelQueueEventsWatcher.shutdown(); } - @Override - public WorkerSizeObserver getWorkerSizeObserver(final String workerQueueName) { - return observers.get(workerQueueName); - } - private void updateWorkerQueueState() { if (running) { try { - monitors.forEach((k, v) -> v.updateWorkerQueueState(k, getWorkerSizeObserver(k))); + monitors.forEach((k, v) -> triggerWorkerQueueState(k)); } catch (final RuntimeException e) { LOG.error("Runtime error during updateWorkerQueueState", e); } } } + + @Override + public void triggerWorkerQueueState(final String queueName) { + // This uses a delayed update. It schedules a task to run in x-seconds. + // If a new update is received before the schedule has run it will cancel the current schedule and reschedule. + // This is mainly for when multiple events are triggered to not trigger a call for every event, + // and also to manage the events trigger in combination with the scheduled process. + synchronized (queueName) { + Optional.ofNullable(lastRuns.get(queueName)).ifPresent(f -> f.cancel(false)); + final Runnable updateTask = () -> updateWorkerQueueState(queueName); + + lastRuns.put(queueName, executorService.schedule(updateTask, refreshDelayBeforeUpdateSeconds, TimeUnit.SECONDS)); + } + } + + private void updateWorkerQueueState(final String queueName) { + synchronized (queueName) { + monitors.get(queueName).updateWorkerQueueState(queueName, observers.get(queueName)); + lastRuns.remove(queueName); + } + } + + private static class WorkerSizeObserverComposite implements WorkerSizeObserver { + private final List list = new ArrayList<>(); + + public void add(final WorkerSizeObserver observer) { + list.add(observer); + } + + @Override + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + for (final WorkerSizeObserver observer : list) { + try { + observer.onNumberOfWorkersUpdate(numberOfWorkers, numberOfMessages); + } catch (final RuntimeException e) { + LOG.error("RuntimeException during onNumberOfWorkersUpdate in {}", observer.getClass(), e); + } + } + } + } } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java index 971e4c2..a667e12 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityQueueMap.java @@ -77,6 +77,10 @@ public int onWorkerTotal(final String queueName) { .sum(); } + public void reset() { + tasksOnWorkersPerQueue.forEach((k, v) -> v.set(0)); + } + public int onWorker(final TaskRecord taskRecord) { return Optional.ofNullable(tasksOnWorkersPerQueue.get(key(taskRecord))).map(AtomicInteger::intValue).orElse(0); } diff --git a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java index 7d05ca7..152e8d3 100644 --- a/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java +++ b/source/taskmanager/src/main/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskScheduler.java @@ -27,6 +27,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import nl.aerius.taskmanager.QueueWatchDog; +import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.domain.PriorityTaskQueue; import nl.aerius.taskmanager.domain.Task; import nl.aerius.taskmanager.domain.TaskRecord; @@ -39,14 +41,15 @@ * like with other priorities. * */ -class PriorityTaskScheduler implements TaskScheduler, Comparator { +class PriorityTaskScheduler implements TaskScheduler, Comparator, WorkerSizeObserver { private static final Logger LOG = LoggerFactory.getLogger(PriorityTaskScheduler.class); private static final int NEXT_TASK_MAX_WAIT_TIME_SECONDS = 120; + private final QueueWatchDog watchDog = new QueueWatchDog(); private final PriorityTaskSchedulerMetrics metrics = new PriorityTaskSchedulerMetrics(); private final Queue queue; - private final PriorityQueueMap priorityQueueMap; + private final PriorityQueueMap priorityQueueMap; private final Lock lock = new ReentrantLock(); private final Condition nextTaskCondition = lock.newCondition(); private final String workerQueueName; @@ -55,9 +58,8 @@ class PriorityTaskScheduler implements TaskScheduler, Compara /** * Constructs scheduler for given configuration. - * */ - PriorityTaskScheduler(final PriorityQueueMap priorityQueueKeyMap, final Function, Queue> queueCreator, + PriorityTaskScheduler(final PriorityQueueMap priorityQueueKeyMap, final Function, Queue> queueCreator, final String workerQueueName) { this.priorityQueueMap = priorityQueueKeyMap; this.workerQueueName = workerQueueName; @@ -116,14 +118,14 @@ public Task getNextTask() throws InterruptedException { return task; } - private Task obtainTask() { + private void obtainTask() { tasksOnWorkers++; final Task task = queue.poll(); + priorityQueueMap.incrementOnWorker(task.getTaskRecord()); if (task.getContext() != null) { task.getContext().makeCurrent(); } - return task; } /** @@ -284,6 +286,33 @@ private int comparePrioWithoutTask(final TaskRecord taskRecord1, final TaskRecor * Signal that the next task process can check again.. */ private void signalNextTask() { - nextTaskCondition.signalAll(); + try { + nextTaskCondition.signalAll(); + } catch (final IllegalMonitorStateException e) { + LOG.error("Caller of signalNextTask did not wrap call with lock field.", e); + } + } + + @Override + public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { + if (watchDog.isItDead(tasksOnWorkers > 0, numberOfMessages)) { + LOG.info("It looks like some tasks are zombies on {} worker queue in priority scheduler, so all tasks currently in state running are released.", workerQueueName); + reset(); + } + } + + /** + * Resets the internal state. Called in case a difference was detected that internally it still has messages as being on the queue, + * while the queue is empty. + */ + void reset() { + lock.lock(); + try { + tasksOnWorkers = 0; + priorityQueueMap.reset(); + signalNextTask(); + } finally { + lock.unlock(); + } } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/ConfigurationManagerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/ConfigurationManagerTest.java index 05804df..4e2fd96 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/ConfigurationManagerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/ConfigurationManagerTest.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Properties; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -41,7 +42,7 @@ void before() throws IOException { } @Test - @Timeout(2000) + @Timeout(value = 2, unit = TimeUnit.SECONDS) void testLoadConfiguration() { final TaskManagerConfiguration tmc = ConfigurationManager.loadConfiguration(properties); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTask.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTask.java index 36dfae1..5ec3996 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTask.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/MockTask.java @@ -34,11 +34,12 @@ public MockTask(final TaskConsumer taskConsumer) { this(taskConsumer, UUID.randomUUID().toString()); } - public MockTask(final TaskConsumer taskConsumer, final String id) { + public MockTask(final TaskConsumer taskConsumer, final String messageId) { super(taskConsumer); final RabbitMQMessage message = mock(RabbitMQMessage.class); - doReturn(id).when(message).getMessageId(); + doReturn(messageId).when(message).getMessageId(); + doReturn(taskConsumer.getQueueName()).when(message).getCorrelationId(); setData(message); } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java index d3f7296..e7a130e 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/TaskDispatcherTest.java @@ -73,7 +73,7 @@ void after() throws InterruptedException { } @Test - @Timeout(3000) + @Timeout(value = 3, unit = TimeUnit.SECONDS) void testNoFreeWorkers() { // Add Worker which will unlock workerPool.onNumberOfWorkersUpdate(1, 0); @@ -85,13 +85,13 @@ void testNoFreeWorkers() { forwardTaskAsync(createTask(), null); // Dispatcher should go back to wait for worker to become available. await().until(() -> dispatcher.getState() == State.WAIT_FOR_WORKER); - assertEquals(0, workerPool.getCurrentWorkerSize(), "WorkerPool should be empty"); + assertEquals(0, workerPool.getReportedWorkerSize(), "WorkerPool should be empty"); workerPool.onNumberOfWorkersUpdate(1, 0); - assertEquals(1, workerPool.getCurrentWorkerSize(), "WorkerPool should have 1 running"); + assertEquals(1, workerPool.getReportedWorkerSize(), "WorkerPool should have 1 running"); } @Test - @Timeout(3000) + @Timeout(value = 3, unit = TimeUnit.SECONDS) void testForwardTest() { final Task task = createTask(); final Future future = forwardTaskAsync(task, null); @@ -104,7 +104,7 @@ void testForwardTest() { @Disabled("TaskAlreadySendexception error willl not be thrown") @Test - @Timeout(3000) + @Timeout(value = 3, unit = TimeUnit.SECONDS) void testForwardDuplicateTask() { final Task task = createTask(); executor.execute(dispatcher); @@ -122,7 +122,7 @@ void testForwardDuplicateTask() { } @Test - @Timeout(3000) + @Timeout(value = 3, unit = TimeUnit.SECONDS) void testExceptionDuringForward() { workerProducer.setShutdownExceptionOnForward(true); final Task task = createTask(); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java index db13d25..03d921a 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/WorkerPoolTest.java @@ -78,17 +78,17 @@ public void messageDelivered(final Message message) { @Test void testWorkerPoolSizing() throws IOException { - assertSame(0, workerPool.getCurrentWorkerSize(), "Check if workerPool size is empty at start"); + assertSame(0, workerPool.getReportedWorkerSize(), "Check if workerPool size is empty at start"); workerPool.onNumberOfWorkersUpdate(10, 0); - assertSame(10, workerPool.getCurrentWorkerSize(), "Check if workerPool size is changed after sizing"); + assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is changed after sizing"); assertEquals(10, numberOfWorkers, "Check if workerPool change handler called."); workerPool.reserveWorker(); - assertSame(10, workerPool.getCurrentWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); + assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); final Task task = createTask(); workerPool.sendTaskToWorker(task); - assertSame(10, workerPool.getCurrentWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); + assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after reserving 1 worker"); workerPool.releaseWorker(task.getId()); - assertSame(10, workerPool.getCurrentWorkerSize(), "Check if workerPool size is same after releasing 1 worker"); + assertSame(10, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after releasing 1 worker"); } @Test @@ -105,16 +105,16 @@ void testWorkerPoolScaleDown() throws IOException { workerPool.sendTaskToWorker(task2); final Task task3 = createTask(); workerPool.sendTaskToWorker(task3); - assertSame(5, workerPool.getCurrentWorkerSize(), "Check if workerPool size is same after 2 workers running"); + assertEquals(5, workerPool.getReportedWorkerSize(), "Check if workerPool size is same after 2 workers running"); workerPool.onNumberOfWorkersUpdate(1, 0); - assertSame(1, workerPool.getWorkerSize(), "Check if workerPool size is lower"); - assertSame(3, workerPool.getCurrentWorkerSize(), "Check if current workerPool size is same after decreasing # workers"); + assertEquals(3, workerPool.getWorkerSize(), "Workpool size should match number of running tasks, since new total is lower than currently running"); + assertEquals(1, workerPool.getReportedWorkerSize(), "Check if current workerPool size is same after decreasing # workers"); workerPool.releaseWorker(task1.getId()); - assertSame(2, workerPool.getCurrentWorkerSize(), "Check if workerPool size is lower, but not yet same as total because still process running"); + assertEquals(2, workerPool.getWorkerSize(), "Check if workerPool size is lower, but not yet same as total because still process running"); workerPool.releaseWorker(task2.getId()); - assertSame(1, workerPool.getCurrentWorkerSize(), "Check if workerPool size is lower"); + assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size is lower"); workerPool.releaseWorker(task3.getId()); - assertSame(1, workerPool.getCurrentWorkerSize(), "Check if workerPool size should remain the same"); + assertEquals(1, workerPool.getWorkerSize(), "Check if workerPool size should remain the same"); } @Test @@ -124,11 +124,11 @@ void testReleaseTaskTwice() throws IOException { workerPool.sendTaskToWorker(task1); final String id = task1.getId(); workerPool.releaseWorker(id); - final int currentWorkerSize1 = workerPool.getCurrentWorkerSize(); + final int currentWorkerSize1 = workerPool.getReportedWorkerSize(); workerPool.releaseWorker(id); - final int currentWorkerSize2 = workerPool.getCurrentWorkerSize(); + final int currentWorkerSize2 = workerPool.getReportedWorkerSize(); assertEquals(currentWorkerSize1, currentWorkerSize2, "Check if task is not sent twice"); - assertEquals(2, workerPool.getCurrentWorkerSize(), "Check if task worker size not decreased to much"); + assertEquals(2, workerPool.getReportedWorkerSize(), "Check if task worker size not decreased to much"); } @Disabled("Exception is not thrown anymore, so test ignored for now") diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java index eff414b..0d9fde2 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/metrics/PerformanceMetricsReporterTest.java @@ -87,7 +87,7 @@ void beforeEach() { @Test void testOnWorkDispatched() { - doReturn(10).when(workMetrics).getCurrentWorkerSize(); + doReturn(10).when(workMetrics).getReportedWorkerSize(); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); @@ -101,7 +101,7 @@ void testOnWorkDispatched() { @Test void testOnWorkerFinished() { - doReturn(10).when(workMetrics).getCurrentWorkerSize(); + doReturn(10).when(workMetrics).getReportedWorkerSize(); reporter.onWorkerFinished("1", createMap(QUEUE_1, 100L)); reporter.onWorkerFinished("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); @@ -115,7 +115,7 @@ void testOnWorkerFinished() { @Test void testWorkLoad() throws InterruptedException { - doReturn(4).when(workMetrics).getCurrentWorkerSize(); + doReturn(4).when(workMetrics).getReportedWorkerSize(); reporter.onWorkDispatched("1", createMap(QUEUE_1, 100L)); reporter.onWorkDispatched("2", createMap(QUEUE_2, 200L)); methodCaptor.getValue().run(); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/AbstractRabbitMQTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/AbstractRabbitMQTest.java index 196c164..13e08c1 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/AbstractRabbitMQTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/AbstractRabbitMQTest.java @@ -45,6 +45,7 @@ class AbstractRabbitMQTest { protected BrokerConnectionFactory factory; protected Channel mockChannel; protected AdaptorFactory adapterFactory; + protected int brokerManagementRefreshRate; @BeforeAll static void setupClass() { @@ -61,7 +62,7 @@ static void afterClass() throws InterruptedException { void setUp() throws Exception { final Connection mockConnection = Mockito.mock(Connection.class); final ConnectionConfiguration configuration = ConnectionConfiguration.builder() - .brokerHost("localhost").brokerUsername("guest").brokerPassword("guest").build(); + .brokerHost("localhost").brokerUsername("guest").brokerPassword("guest").brokerManagementRefreshRate(brokerManagementRefreshRate).build(); mockChannel = MockedChannelFactory.create(); lenient().doReturn(mockChannel).when(mockConnection).createChannel(); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java index 5b5a1fb..a572901 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQChannelQueueEventsWatcherTest.java @@ -17,19 +17,18 @@ package nl.aerius.taskmanager.mq; import static nl.aerius.taskmanager.client.mq.RabbitMQWorkerMonitor.HEADER_PARAM_QUEUE; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -44,7 +43,6 @@ import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; -import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy; import nl.aerius.taskmanager.client.BrokerConnectionFactory; @@ -89,25 +87,13 @@ protected Connection createNewConnection() throws IOException { @Test void testReceiving() throws IOException { - final AtomicInteger deltaAI = new AtomicInteger(); - final WorkerSizeObserver observer = new WorkerSizeObserver() { - - @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { - fail("Should not call onNumberOfWorkersUpdate in this class"); - } - - @Override - public void onDeltaNumberOfWorkersUpdate(final int deltaNumberOfWorkers) { - deltaAI.set(deltaNumberOfWorkers); - } - }; - doReturn(observer).when(proxy).getWorkerSizeObserver(TEST_QUEUENAME); - assertDeltaCheck("consumer.created", deltaAI, 1); - assertDeltaCheck("consumer.removed", deltaAI, -1); + assertDeltaCheck("consumer.created"); + verify(proxy).triggerWorkerQueueState(TEST_QUEUENAME); + assertDeltaCheck("consumer.removed"); + verify(proxy, times(2)).triggerWorkerQueueState(TEST_QUEUENAME); } - private void assertDeltaCheck(final String event, final AtomicInteger deltaAI, final int expected) throws IOException { + private void assertDeltaCheck(final String event) throws IOException { doAnswer(i -> { final Envelope envelope = Mockito.mock(Envelope.class); doReturn(event).when(envelope).getRoutingKey(); @@ -118,6 +104,5 @@ private void assertDeltaCheck(final String event, final AtomicInteger deltaAI, f return null; }).when(mockChannel).basicConsume(any(), eq(true), any()); watcher.start(); - assertEquals(expected, deltaAI.get(), "Expected increment by 1"); } } diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java index cbb90e4..6188dca 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQMessageHandlerTest.java @@ -59,7 +59,7 @@ class RabbitMQMessageHandlerTest extends AbstractRabbitMQTest { private @Captor ArgumentCaptor shutdownListenerCaptor; @Test - @Timeout(10000) + @Timeout(value = 10, unit = TimeUnit.SECONDS) void testMessageReceivedHandler() throws IOException, InterruptedException { final byte[] receivedBody = "4321".getBytes(); final TaskMessageHandler tmh = adapterFactory.createTaskMessageHandler(new QueueConfig(taskQueueName, false, false, null)); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java index 1d26054..e82f254 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQQueueMonitorTest.java @@ -44,17 +44,7 @@ void testGetWorkerQueueState() { final ConnectionConfiguration configuration = ConnectionConfiguration.builder() .brokerHost(DUMMY).brokerPort(0).brokerUsername(DUMMY).brokerPassword(DUMMY).build(); final AtomicInteger workerSize = new AtomicInteger(); - final WorkerSizeObserver mwps = new WorkerSizeObserver() { - @Override - public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages) { - workerSize.set(numberOfWorkers); - } - - @Override - public void onDeltaNumberOfWorkersUpdate(final int deltaNumberOfWorkers) { - // not tested here. - } - }; + final WorkerSizeObserver mwps = (numberOfWorkers, numberOfMessages) -> workerSize.set(numberOfWorkers); final RabbitMQQueueMonitor rpm = new RabbitMQQueueMonitor(configuration) { @Override protected JsonNode getJsonResultFromApi(final String apiPath) throws IOException { diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java index 215fe73..5a6f2e3 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerProducerTest.java @@ -44,7 +44,7 @@ class RabbitMQWorkerProducerTest extends AbstractRabbitMQTest { private @Mock WorkerSizeObserver queueSizeObserver; @Test - @Timeout(5000) + @Timeout(value = 5, unit = TimeUnit.SECONDS) void testForwardMessage() throws IOException, InterruptedException { final byte[] sendBody = "4321".getBytes(); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java index 81863a1..21bfd74 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/mq/RabbitMQWorkerSizeProviderTest.java @@ -17,12 +17,19 @@ package nl.aerius.taskmanager.mq; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; +import org.junit.jupiter.api.Timeout; import nl.aerius.taskmanager.adaptor.WorkerSizeObserver; @@ -31,19 +38,41 @@ */ public class RabbitMQWorkerSizeProviderTest extends AbstractRabbitMQTest { + private static final String TEST_QUEUE = "test"; + private RabbitMQWorkerSizeProvider provider; @Override @BeforeEach void setUp() throws Exception { + brokerManagementRefreshRate = 5; super.setUp(); provider = new RabbitMQWorkerSizeProvider(executor, factory); } + @Test + @Timeout(value = 10, unit = TimeUnit.SECONDS) + void testTriggerWorkerQueueState() throws IOException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final RabbitMQQueueMonitor mockMonitor = mock(RabbitMQQueueMonitor.class); + + doAnswer(inv -> { + latch.countDown(); + return null; + }).when(mockMonitor).updateWorkerQueueState(eq(TEST_QUEUE), any()); + provider.putMonitor(TEST_QUEUE, mockMonitor); + // Call twice, which should result in only 1 call to updateWorkerQueueState + provider.triggerWorkerQueueState(TEST_QUEUE); + provider.triggerWorkerQueueState(TEST_QUEUE); + latch.await(); + verify(mockMonitor).updateWorkerQueueState(eq(TEST_QUEUE), any()); + } + @Test void testStartShutdown() throws IOException { - final WorkerSizeObserver dummyObserver = Mockito.mock(WorkerSizeObserver.class); - provider.addObserver("test", dummyObserver); + final WorkerSizeObserver dummyObserver = mock(WorkerSizeObserver.class); + + provider.addObserver(TEST_QUEUE, dummyObserver); provider.start(); provider.shutdown(); assertFalse(provider.removeObserver("test"), "Observer should already have been removed"); diff --git a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java index 9965734..ea8c25b 100644 --- a/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java +++ b/source/taskmanager/src/test/java/nl/aerius/taskmanager/scheduler/priorityqueue/PriorityTaskSchedulerTest.java @@ -82,7 +82,7 @@ void setUp() throws IOException { configuration.getQueues().add(tc1); configuration.getQueues().add(tc2); configuration.getQueues().add(tc3); - scheduler = (PriorityTaskScheduler) factory.createScheduler(new QueueConfig(QUEUE1, false, false, null)); + scheduler = (PriorityTaskScheduler) factory.createScheduler(new QueueConfig(QUEUE1, false, true, null)); configuration.getQueues().forEach(scheduler::updateQueue); task1 = createTask(taskConsumer1, "1", QUEUE1); task2a = createTask(taskConsumer2, "2a", QUEUE2); @@ -91,13 +91,13 @@ void setUp() throws IOException { } @Test - @Timeout(5000) + @Timeout(value = 5, unit = TimeUnit.SECONDS) void testCompare() throws InterruptedException { assertTrue(compare(task1, task2a, 1), "Compare Ok"); } @Test - @Timeout(5000) + @Timeout(value = 5, unit = TimeUnit.SECONDS) void testCompareReverse() throws InterruptedException { assertTrue(compare(task2a, task1, -1), "Compare reserve Ok"); } @@ -118,13 +118,13 @@ private boolean compare(final Task taskA, final Task taskB, final int returnResu } @Test - @Timeout(5000) + @Timeout(value = 5, unit = TimeUnit.SECONDS) void testCompareSame() throws InterruptedException { assertTrue(compareSame(task2a, task3, -1), "Compare same Ok"); } @Test - @Timeout(5000) + @Timeout(value = 5, unit = TimeUnit.SECONDS) void testCompareSameReverse() throws InterruptedException { assertTrue(compareSame(task3, task2a, 1), "Compare same reserve Ok"); } @@ -167,7 +167,7 @@ void testGetTaskWith1WorkerAvailable() throws InterruptedException, ExecutionExc * it should not be run until that more than one or the task of that queue is finished. */ @Test - @Timeout(7000) + @Timeout(value = 7, unit = TimeUnit.SECONDS) void testGetTask() throws InterruptedException, ExecutionException { scheduler.onWorkerPoolSizeChange(2); final Task task1a = createTask(taskConsumer1, "1a", QUEUE1); @@ -200,7 +200,7 @@ void testGetTask() throws InterruptedException, ExecutionException { * In the meanwhile, other tasks can start/finish (as long as there is a capacity for those tasks). */ @Test - @Timeout(7000) + @Timeout(value = 7, unit = TimeUnit.SECONDS) void testGetTaskBigPool() throws InterruptedException, ExecutionException { scheduler.onWorkerPoolSizeChange(10); final List tasks = new ArrayList<>(); @@ -243,7 +243,7 @@ void testGetTaskBigPool() throws InterruptedException, ExecutionException { * @throws InterruptedException */ @Test - @Timeout(1000) + @Timeout(value = 1, unit = TimeUnit.SECONDS) void testCompare2Workers() throws InterruptedException { scheduler.onWorkerPoolSizeChange(2); scheduler.addTask(task2a); @@ -254,6 +254,17 @@ void testCompare2Workers() throws InterruptedException { assertSame(task3, scheduler.getNextTask(), "Scheduler should prefer task3"); } + @Test + @Timeout(value = 1, unit = TimeUnit.SECONDS) + void testReset() throws InterruptedException { + scheduler.onWorkerPoolSizeChange(2); + scheduler.addTask(task2a); + scheduler.getNextTask(); + assertEquals(1, scheduler.compare(task2b, task3), "Scheduler should prefer task3"); + scheduler.reset(); + assertEquals(0, scheduler.compare(task2b, task3), "After reset Scheduler should not have a preference because all are on the same level"); + } + private Future waitForTask(final Task task, final AtomicInteger chkCounter) { final ExecutorService es = Executors.newCachedThreadPool(); final Future receivedTask = es.submit(new Callable() {