From 4da062dabcc5004ecb73e50a92f7f152e9b0d398 Mon Sep 17 00:00:00 2001 From: Andy Reid Date: Mon, 27 Oct 2025 12:38:13 +0000 Subject: [PATCH 1/4] Update StagingQueueTargetQueuePair.java --- .../lowlevel/StagingQueueTargetQueuePair.java | 54 +++++++++++-------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java b/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java index f005259a..0ba2ef7f 100644 --- a/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java +++ b/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java @@ -211,24 +211,28 @@ public void handleDeliveryToTargetQueueAck(final long deliveryTag, final boolean } if (multiple) { - final ConcurrentNavigableMap confirmed = outstandingConfirms.headMap( - deliveryTag, true - ); + synchronized (outstandingConfirms) { + final ConcurrentNavigableMap confirmed = outstandingConfirms.headMap( + deliveryTag, true + ); - LOGGER.trace("Ack (multiple) message source delivery {} from {} after publish confirm {} of message to {}", - confirmed.lastKey(), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), - targetQueue.getName()); + LOGGER.trace("Ack (multiple) message source delivery {} from {} after publish confirm {} of message to {}", + confirmed.lastKey(), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), + targetQueue.getName()); - stagingQueueChannel.basicAck(confirmed.lastKey(), true); - confirmed.clear(); + stagingQueueChannel.basicAck(confirmed.lastKey(), true); + confirmed.clear(); + } timeSinceLastDoneWork = Instant.now(); } else { - LOGGER.trace("Ack message source delivery {} from {} after publish confirm {} of message to {}", - outstandingConfirms.get(deliveryTag), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), - targetQueue.getName()); + synchronized (outstandingConfirms) { + LOGGER.trace("Ack message source delivery {} from {} after publish confirm {} of message to {}", + outstandingConfirms.get(deliveryTag), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), + targetQueue.getName()); - stagingQueueChannel.basicAck(outstandingConfirms.get(deliveryTag), false); - outstandingConfirms.remove(deliveryTag); + stagingQueueChannel.basicAck(outstandingConfirms.get(deliveryTag), false); + outstandingConfirms.remove(deliveryTag); + } timeSinceLastDoneWork = Instant.now(); } } @@ -256,18 +260,22 @@ public void handleDeliveryToTargetQueueNack(final long deliveryTag, final boolea targetQueue.getName()); if (multiple) { - final ConcurrentNavigableMap confirmed = outstandingConfirms.headMap( - deliveryTag, true - ); - - for(final Long messageDeliveryTagToNack: confirmed.values()) { - stagingQueueChannel.basicNack(messageDeliveryTagToNack, true, true); + synchronized (outstandingConfirms) { + final ConcurrentNavigableMap confirmed = outstandingConfirms.headMap( + deliveryTag, true + ); + + if (!confirmed.isEmpty()) { + stagingQueueChannel.basicNack(confirmed.lastEntry().getValue(), true, true); + } + + confirmed.clear(); } - - confirmed.clear(); } else { - stagingQueueChannel.basicNack(outstandingConfirms.get(deliveryTag), false, true); - outstandingConfirms.remove(deliveryTag); + synchronized (outstandingConfirms) { + stagingQueueChannel.basicNack(outstandingConfirms.get(deliveryTag), false, true); + outstandingConfirms.remove(deliveryTag); + } } } From 69ac7e13c0e408376b39949457a343a7d2387ce7 Mon Sep 17 00:00:00 2001 From: Andy Reid Date: Wed, 29 Oct 2025 11:37:46 +0000 Subject: [PATCH 2/4] Revert "Update StagingQueueTargetQueuePair.java" This reverts commit 4da062dabcc5004ecb73e50a92f7f152e9b0d398. --- .../lowlevel/StagingQueueTargetQueuePair.java | 54 ++++++++----------- 1 file changed, 23 insertions(+), 31 deletions(-) diff --git a/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java b/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java index 0ba2ef7f..f005259a 100644 --- a/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java +++ b/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java @@ -211,28 +211,24 @@ public void handleDeliveryToTargetQueueAck(final long deliveryTag, final boolean } if (multiple) { - synchronized (outstandingConfirms) { - final ConcurrentNavigableMap confirmed = outstandingConfirms.headMap( - deliveryTag, true - ); + final ConcurrentNavigableMap confirmed = outstandingConfirms.headMap( + deliveryTag, true + ); - LOGGER.trace("Ack (multiple) message source delivery {} from {} after publish confirm {} of message to {}", - confirmed.lastKey(), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), - targetQueue.getName()); + LOGGER.trace("Ack (multiple) message source delivery {} from {} after publish confirm {} of message to {}", + confirmed.lastKey(), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), + targetQueue.getName()); - stagingQueueChannel.basicAck(confirmed.lastKey(), true); - confirmed.clear(); - } + stagingQueueChannel.basicAck(confirmed.lastKey(), true); + confirmed.clear(); timeSinceLastDoneWork = Instant.now(); } else { - synchronized (outstandingConfirms) { - LOGGER.trace("Ack message source delivery {} from {} after publish confirm {} of message to {}", - outstandingConfirms.get(deliveryTag), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), - targetQueue.getName()); + LOGGER.trace("Ack message source delivery {} from {} after publish confirm {} of message to {}", + outstandingConfirms.get(deliveryTag), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), + targetQueue.getName()); - stagingQueueChannel.basicAck(outstandingConfirms.get(deliveryTag), false); - outstandingConfirms.remove(deliveryTag); - } + stagingQueueChannel.basicAck(outstandingConfirms.get(deliveryTag), false); + outstandingConfirms.remove(deliveryTag); timeSinceLastDoneWork = Instant.now(); } } @@ -260,22 +256,18 @@ public void handleDeliveryToTargetQueueNack(final long deliveryTag, final boolea targetQueue.getName()); if (multiple) { - synchronized (outstandingConfirms) { - final ConcurrentNavigableMap confirmed = outstandingConfirms.headMap( - deliveryTag, true - ); - - if (!confirmed.isEmpty()) { - stagingQueueChannel.basicNack(confirmed.lastEntry().getValue(), true, true); - } - - confirmed.clear(); + final ConcurrentNavigableMap confirmed = outstandingConfirms.headMap( + deliveryTag, true + ); + + for(final Long messageDeliveryTagToNack: confirmed.values()) { + stagingQueueChannel.basicNack(messageDeliveryTagToNack, true, true); } + + confirmed.clear(); } else { - synchronized (outstandingConfirms) { - stagingQueueChannel.basicNack(outstandingConfirms.get(deliveryTag), false, true); - outstandingConfirms.remove(deliveryTag); - } + stagingQueueChannel.basicNack(outstandingConfirms.get(deliveryTag), false, true); + outstandingConfirms.remove(deliveryTag); } } From 4a074b4821127e67d203a70dc3f36dc065af72e3 Mon Sep 17 00:00:00 2001 From: Andy Reid Date: Wed, 29 Oct 2025 11:50:18 +0000 Subject: [PATCH 3/4] Align the log levels for publish confirms and remove the loop for multiple nacks --- .../lowlevel/StagingQueueTargetQueuePair.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java b/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java index f005259a..31527517 100644 --- a/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java +++ b/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java @@ -215,7 +215,7 @@ public void handleDeliveryToTargetQueueAck(final long deliveryTag, final boolean deliveryTag, true ); - LOGGER.trace("Ack (multiple) message source delivery {} from {} after publish confirm {} of message to {}", + LOGGER.debug("Ack (multiple) message source delivery {} from {} after publish confirm {} of message to {}", confirmed.lastKey(), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), targetQueue.getName()); @@ -223,7 +223,7 @@ public void handleDeliveryToTargetQueueAck(final long deliveryTag, final boolean confirmed.clear(); timeSinceLastDoneWork = Instant.now(); } else { - LOGGER.trace("Ack message source delivery {} from {} after publish confirm {} of message to {}", + LOGGER.debug("Ack message source delivery {} from {} after publish confirm {} of message to {}", outstandingConfirms.get(deliveryTag), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), targetQueue.getName()); @@ -259,10 +259,11 @@ public void handleDeliveryToTargetQueueNack(final long deliveryTag, final boolea final ConcurrentNavigableMap confirmed = outstandingConfirms.headMap( deliveryTag, true ); - - for(final Long messageDeliveryTagToNack: confirmed.values()) { - stagingQueueChannel.basicNack(messageDeliveryTagToNack, true, true); - } + LOGGER.debug("Nack (multiple) message source delivery {} from {} after publish confirm {} of message to {}", + confirmed.lastKey(), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), + targetQueue.getName()); + + stagingQueueChannel.basicNack(confirmed.lastKey(), true, true); confirmed.clear(); } else { From 9b1a59bda1a7d565b2e4bc8a836eaaf60caee616 Mon Sep 17 00:00:00 2001 From: Andy Reid Date: Wed, 29 Oct 2025 11:52:34 +0000 Subject: [PATCH 4/4] Update StagingQueueTargetQueuePair.java --- .../lowlevel/StagingQueueTargetQueuePair.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java b/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java index 31527517..1d743d5c 100644 --- a/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java +++ b/worker-message-prioritization-distribution/src/main/java/com/github/workerframework/workermessageprioritization/redistribution/lowlevel/StagingQueueTargetQueuePair.java @@ -263,6 +263,12 @@ public void handleDeliveryToTargetQueueNack(final long deliveryTag, final boolea confirmed.lastKey(), stagingQueue.getName(), outstandingConfirms.get(deliveryTag), targetQueue.getName()); + /// Using stagingQueueChannel.basicNack(confirmed.lastKey(), true, true); is preferred over looping and nacking each message individually because: + /// Efficiency: The multiple=true flag tells RabbitMQ to nack all messages up to and including lastKey in a single network call, reducing protocol overhead and improving performance. + /// Atomicity: It ensures all relevant messages are nacked together, avoiding race conditions or partial failures that could occur if nacking in a loop. + /// Simplicity: The code is simpler, easier to read, and less error-prone than managing a loop and multiple nack calls. + /// Consistency: This approach matches the semantics of how RabbitMQ delivers multiple acks/nacks, ensuring message order and state are handled as expected. + /// In summary, using the multiple flag with a single nack is more efficient, reliable, and idiomatic for RabbitMQ. stagingQueueChannel.basicNack(confirmed.lastKey(), true, true); confirmed.clear();