Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* the queue was purged after some restart. In such a case the scheduler keeps the tasks locked, and since no message will ever arrive
* for the task, it stays locked indefinitely. This watchdog tries to detect such tasks and release them at some point.
*/
class QueueWatchDog {
public class QueueWatchDog {

/**
* If the problem persists for more than 10 minutes, the signal to reset is given.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import nl.aerius.taskmanager.adaptor.AdaptorFactory;
import nl.aerius.taskmanager.adaptor.WorkerProducer;
import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;
import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy;
import nl.aerius.taskmanager.domain.QueueConfig;
import nl.aerius.taskmanager.domain.TaskConsumer;
Expand Down Expand Up @@ -126,6 +127,9 @@ public TaskScheduleBucket(final QueueConfig queueConfig) throws InterruptedExcep
OpenTelemetryMetrics.METER, workerPool);
workerProducer.addWorkerFinishedHandler(reporter);
workerSizeObserverProxy.addObserver(workerQueueName, workerPool);
if (taskScheduler instanceof final WorkerSizeObserver wzo) {
workerSizeObserverProxy.addObserver(workerQueueName, wzo);
}
workerProducer.start();
// Set up metrics
WorkerPoolMetrics.setupMetrics(workerPool, workerQueueName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

Expand Down Expand Up @@ -54,10 +52,6 @@ class WorkerPool implements WorkerSizeObserver, WorkerFinishedHandler, WorkerMet
private final WorkerProducer wp;
private final WorkerUpdateHandler workerUpdateHandler;

private final Timer deltaTimer = new Timer();
private TimerTask deltaTimerTask;
private int deltaCounter;

public WorkerPool(final String workerQueueName, final WorkerProducer wp, final WorkerUpdateHandler workerUpdateHandler) {
this.workerQueueName = workerQueueName;
this.wp = wp;
Expand Down Expand Up @@ -89,15 +83,13 @@ public void sendTaskToWorker(final Task task) throws IOException {

public int getWorkerSize() {
synchronized (this) {
return totalConfiguredWorkers;
return freeWorkers.availablePermits() + runningWorkers.size();
}
}

@Override
public int getCurrentWorkerSize() {
synchronized (this) {
return freeWorkers.availablePermits() + runningWorkers.size();
}
public int getReportedWorkerSize() {
return totalConfiguredWorkers;
}

@Override
Expand Down Expand Up @@ -143,7 +135,7 @@ public void releaseWorker(final String taskId, final TaskRecord taskRecord) {
if (runningWorkers.containsKey(taskId)) {
// if currentSize is smaller than the worker size it means the worker
// must not be re-added as free worker but removed from the pool.
if (totalConfiguredWorkers >= getCurrentWorkerSize()) {
if (totalConfiguredWorkers >= runningWorkers.size()) {
freeWorkers.release(1);
}
runningWorkers.remove(taskId);
Expand Down Expand Up @@ -172,24 +164,7 @@ public void reserveWorker() {
}
}

/**
* Records an incremental change in the number of workers and schedules a delayed update of the pool size.
*
* The delta is added to {@code deltaCounter}. Any previously scheduled {@code deltaTimerTask} is cancelled and a new
* task is scheduled on {@code deltaTimer}, so several deltas arriving shortly after each other result in a single
* call to {@code updateNumberOfWorkers} with the accumulated delta.
*
* @param delta increase (positive) or decrease (negative) to add to the pending worker count change
*/
@Override
public void onDeltaNumberOfWorkersUpdate(final int delta) {
// The lock guards deltaCounter and deltaTimerTask; the timer task below takes the same lock.
synchronized (this) {
deltaCounter += delta;
// Replace a still pending task so only the most recently scheduled task runs.
if (deltaTimerTask != null) {
deltaTimerTask.cancel();
}
deltaTimerTask = new TimerTask() {
@Override
public void run() {
// Runs on the timer thread, so the WorkerPool lock must be taken explicitly.
synchronized (WorkerPool.this) {
updateNumberOfWorkers(totalConfiguredWorkers + deltaCounter);
}
}
};
// Delay in milliseconds before the accumulated delta is applied.
deltaTimer.schedule(deltaTimerTask, 50L);
}
}


/**
* Sets the number of workers which are actually available. This number should
Expand All @@ -212,22 +187,18 @@ public void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberO
}

private void updateNumberOfWorkers(final int numberOfWorkers) {
if (deltaTimerTask != null) {
deltaTimerTask.cancel();
deltaTimerTask = null;
}
deltaCounter = 0;
final int previousTotalConfiguredWorkers = totalConfiguredWorkers;
totalConfiguredWorkers = numberOfWorkers;
final int deltaWorkers = totalConfiguredWorkers - getCurrentWorkerSize();
final int deltaWorkers = totalConfiguredWorkers - getWorkerSize();

if (deltaWorkers > 0) {
freeWorkers.release(deltaWorkers);
LOG.info("# Workers of {} increased to {}(+{})", workerQueueName, totalConfiguredWorkers, deltaWorkers);
} else if ((deltaWorkers < 0) && (freeWorkers.availablePermits() > 0)) {
freeWorkers.acquireUninterruptibly(Math.min(freeWorkers.availablePermits(), -deltaWorkers));
freeWorkers.tryAcquire(Math.min(freeWorkers.availablePermits(), -deltaWorkers));
LOG.info("# Workers of {} decreased to {}({})", workerQueueName, totalConfiguredWorkers, deltaWorkers);
}
if (deltaWorkers != 0) {
if (previousTotalConfiguredWorkers != totalConfiguredWorkers) {
workerUpdateHandler.onWorkerPoolSizeChange(totalConfiguredWorkers);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public final class WorkerPoolMetrics {

private enum WorkerPoolMetricType {
// @formatter:off
WORKER_SIZE(WorkerPool::getWorkerSize, "Configured number of workers according to taskmanager"),
CURRENT_WORKER_SIZE(WorkerPool::getCurrentWorkerSize, "Current number of workers according to taskmanager"),
WORKER_SIZE(WorkerPool::getWorkerSize, "Number of workers based on internal state of taskmanager"),
CURRENT_WORKER_SIZE(WorkerPool::getReportedWorkerSize, "Current number of workers according to taskmanager"),
RUNNING_WORKER_SIZE(WorkerPool::getRunningWorkerSize, "Running (or occupied) number of workers according to taskmanager");
// @formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ interface WorkerMetrics {
int getRunningWorkerSize();

/**
* @return Returns the total number of workers.
* @return Returns the total number of workers based on what the queue reports as being active.
*/
int getCurrentWorkerSize();
int getReportedWorkerSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,4 @@ public interface WorkerSizeObserver {
* @param numberOfMessages Actual total number of messages on the queue
*/
void onNumberOfWorkersUpdate(final int numberOfWorkers, final int numberOfMessages);

/**
* Gives the increment or decrement in the number of worker processes connected to the queue.
*
* @param deltaNumberOfWorkers increase/decrease in the number of worker processes
*/
void onDeltaNumberOfWorkersUpdate(final int deltaNumberOfWorkers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,11 @@ public interface WorkerSizeProviderProxy {
boolean removeObserver(String workerQueueName);

/**
* Returns the {@link WorkerSizeObserver} for the given worker queue name.
* Triggers to get the worker queue state.
*
* @param workerQueueName name of the worker queue
* @return observer for the worker queue
* @param queueName name of the worker queue
*/
WorkerSizeObserver getWorkerSizeObserver(String workerQueueName);
void triggerWorkerQueueState(final String queueName);

/**
* Starts the worker size provider.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void onWorkDispatched(final String messageId, final Map<String, Object> m
taskMetrics.determineDuration();
dispatchedQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics);
dispatchedWorkerMetrics.register(taskMetrics);
loadMetrics.register(1, workerMetrics.getCurrentWorkerSize());
loadMetrics.register(1, workerMetrics.getReportedWorkerSize());
}

@Override
Expand All @@ -138,7 +138,7 @@ public synchronized void onWorkerFinished(final String messageId, final Map<Stri
taskMetrics.determineDuration();
workQueueMetrics.computeIfAbsent(taskMetrics.queueName(), k -> createQueueDurationMetric(taskMetrics)).register(taskMetrics);
workWorkerMetrics.register(taskMetrics);
loadMetrics.register(-1, workerMetrics.getCurrentWorkerSize());
loadMetrics.register(-1, workerMetrics.getReportedWorkerSize());
}

private DurationMetric createQueueDurationMetric(final TaskMetrics taskMetrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import nl.aerius.taskmanager.adaptor.WorkerSizeObserver;
import nl.aerius.taskmanager.adaptor.WorkerSizeProviderProxy;
import nl.aerius.taskmanager.client.BrokerConnectionFactory;

Expand All @@ -42,7 +41,6 @@ class RabbitMQChannelQueueEventsWatcher {
private static final String AMQ_RABBITMQ_EVENT = "amq.rabbitmq.event";
private static final String CHANNEL_PATTERN = "consumer.*";
private static final String HEADER_PARAM_QUEUE = "queue";
private static final String CONSUMER_CREATED = "consumer.created";

private static final Logger LOG = LoggerFactory.getLogger(RabbitMQChannelQueueEventsWatcher.class);

Expand Down Expand Up @@ -104,20 +102,9 @@ public void handleDelivery(final String consumerTag, final Envelope envelope, fi
final Map<String, Object> headers = properties.getHeaders();
final Object queue = headers.get(HEADER_PARAM_QUEUE);
final String queueName = queue == null ? null : queue.toString();
final WorkerSizeObserver observer = proxy.getWorkerSizeObserver(queueName);

if (observer == null) {
LOG.trace("No handler to watch channel changes for queue: {}", queueName);
return;
}
final String event = envelope.getRoutingKey();

LOG.trace("Event: {} - queue: {}", event, queueName);
if (CONSUMER_CREATED.equals(event)) {
observer.onDeltaNumberOfWorkersUpdate(1);
} else { // consumer.deleted is the only other possibility
observer.onDeltaNumberOfWorkersUpdate(-1);
}
LOG.trace("Event: {} - queue: {}", envelope.getRoutingKey(), queueName);
proxy.triggerWorkerQueueState(queueName);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void shutdown() {
private void updateMetrics() {
try {
metrics.forEach((q, wpm) -> {
final int size = wpm.getCurrentWorkerSize();
final int size = wpm.getReportedWorkerSize();
final int utilisation = wpm.getRunningWorkerSize();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,17 @@ private boolean startReplyConsumer(final Connection connection) throws IOExcepti
replyChannel.basicConsume(workerReplyQueue, true, workerReplyQueue, new DefaultConsumer(replyChannel) {
@Override
public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) {
workerFinishedHandlers.forEach(h -> h.onWorkerFinished(properties.getMessageId(), properties.getHeaders()));
workerFinishedHandlers.forEach(h -> handleWorkFinished(h, properties));
}
});
return true;
}

/**
* Notifies a single handler that a worker has finished, passing the message id and headers taken from the given properties.
*
* A {@link RuntimeException} thrown while notifying the handler is logged as an error and not rethrown, so it does
* not propagate to the caller.
*
* @param handler handler to notify
* @param properties properties of the received reply message; supplies the message id and headers
*/
private void handleWorkFinished(final WorkerFinishedHandler handler, final BasicProperties properties) {
try {
handler.onWorkerFinished(properties.getMessageId(), properties.getHeaders());
} catch (final RuntimeException e) {
// Log with the handler class to identify which handler failed; the exception is deliberately not rethrown.
LOG.error("Runtime exception during handleWorkFinished of {}", handler.getClass(), e);
}
}
}
Loading