From 8e367c56507cee774a3dd09ce433373401f38983 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Wed, 25 Feb 2026 22:15:25 +0100 Subject: [PATCH 1/9] feat(client): implement async send with CompletionListener support and enforce callback restrictions --- .../apache/activemq/ActiveMQConnection.java | 4 + .../activemq/ActiveMQMessageConsumer.java | 7 +- .../activemq/ActiveMQMessageProducer.java | 141 ++++++++++++- .../ActiveMQMessageProducerSupport.java | 12 ++ .../org/apache/activemq/ActiveMQProducer.java | 14 +- .../org/apache/activemq/ActiveMQSession.java | 188 +++++++++++++++++- 6 files changed, 351 insertions(+), 15 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 86008616759..bd75cb97811 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -609,6 +609,8 @@ public void start() throws JMSException { */ @Override public void stop() throws JMSException { + ActiveMQSession.checkNotInCompletionListenerCallback("stop"); + ActiveMQSession.checkNotInMessageListenerCallback("stop"); doStop(true); } @@ -677,6 +679,8 @@ void doStop(boolean checkClosed) throws JMSException { */ @Override public void close() throws JMSException { + ActiveMQSession.checkNotInCompletionListenerCallback("close"); + ActiveMQSession.checkNotInMessageListenerCallback("close"); try { // If we were running, lets stop first. if (!closed.get() && !transportFailed.get()) { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index a6bf1b20952..469e04d43e8 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -1454,7 +1454,12 @@ public void dispatch(MessageDispatch md) { try { boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired(); if (!expired) { - listener.onMessage(message); + ActiveMQSession.IN_MESSAGE_LISTENER_CALLBACK.set(true); + try { + listener.onMessage(message); + } finally { + ActiveMQSession.IN_MESSAGE_LISTENER_CALLBACK.set(false); + } } afterMessageIsConsumed(md, expired); } catch (RuntimeException e) { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index fbd093f10f2..3db69c82c56 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -168,6 +168,7 @@ public Destination getDestination() throws JMSException { */ @Override public void close() throws JMSException { + ActiveMQSession.checkNotInCompletionListenerCallback("close"); if (!closed) { dispose(); this.session.asyncSendPacket(info.createRemoveCommand()); @@ -197,6 +198,35 @@ protected void checkClosed() throws IllegalStateException { } } + @Override + public void send(Message message) throws JMSException { + checkClosed(); + if (info.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + super.send(message); + } + + @Override + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkClosed(); + if (info.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + validateDeliveryMode(deliveryMode); + validatePriority(priority); + super.send(message, deliveryMode, priority, timeToLive); + } + + @Override + public void send(Destination destination, Message message) throws JMSException { + checkClosed(); + if (info.getDestination() != null) { + throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message) variants."); + } + super.send(destination, message); + } + /** * Sends a message to a destination for an unidentified message producer, * specifying delivery mode, priority and time to live. @@ -221,42 +251,133 @@ protected void checkClosed() throws IllegalStateException { */ @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - this.send(destination, message, deliveryMode, priority, timeToLive, (AsyncCallback)null); + validateDeliveryMode(deliveryMode); + validatePriority(priority); + this.send(destination, message, deliveryMode, priority, timeToLive, (AsyncCallback) null); } /** * * @param message the message to send - * @param CompletionListener to callback + * @param completionListener to callback * @throws JMSException if the JMS provider fails to send the message due to * some internal error. - * @throws UnsupportedOperationException if an invalid destination is - * specified. - * @throws InvalidDestinationException if a client uses this method with an - * invalid destination. + * @throws UnsupportedOperationException if called on an anonymous producer (no fixed destination) * @see jakarta.jms.Session#createProducer * @since 2.0 */ @Override public void send(Message message, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Message, CompletionListener) is not supported"); + checkClosed(); + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } + if (info.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + this.doSendWithCompletionListener(info.getDestination(), message, this.defaultDeliveryMode, + this.defaultPriority, this.defaultTimeToLive, + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); } @Override public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported"); + checkClosed(); + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } + if (info.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + validateDeliveryMode(deliveryMode); + validatePriority(priority); + this.doSendWithCompletionListener(info.getDestination(), message, deliveryMode, priority, timeToLive, + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); } @Override public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Destination, Message, CompletionListener) is not supported"); + checkClosed(); + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } + if (info.getDestination() != null) { + throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants."); + } + if (destination == null) { + throw new InvalidDestinationException("Don't understand null destinations"); + } + this.doSendWithCompletionListener(ActiveMQDestination.transform(destination), message, + this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive, + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); } @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Destination, Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported"); + checkClosed(); + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } + if (info.getDestination() != null) { + throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants."); + } + if (destination == null) { + throw new InvalidDestinationException("Don't understand null destinations"); + } + validateDeliveryMode(deliveryMode); + validatePriority(priority); + this.doSendWithCompletionListener(ActiveMQDestination.transform(destination), message, + deliveryMode, priority, timeToLive, + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); + } + + public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, + boolean disableMessageID, boolean disableMessageTimestamp, + CompletionListener completionListener) throws JMSException { + checkClosed(); + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } + if (info.getDestination() != null) { + throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants."); + } + if (destination == null) { + throw new InvalidDestinationException("Don't understand null destinations"); + } + validateDeliveryMode(deliveryMode); + validatePriority(priority); + this.doSendWithCompletionListener(ActiveMQDestination.transform(destination), message, + deliveryMode, priority, timeToLive, disableMessageID, disableMessageTimestamp, completionListener); + } + + private void doSendWithCompletionListener(final ActiveMQDestination dest, Message message, + final int deliveryMode, final int priority, final long timeToLive, + final boolean disableMessageID, final boolean disableMessageTimestamp, + final CompletionListener completionListener) throws JMSException { + if (dest == null) { + throw new JMSException("No destination specified"); + } + + if (transformer != null) { + final Message transformedMessage = transformer.producerTransform(session, this, message); + if (transformedMessage != null) { + message = transformedMessage; + } + } + + if (producerWindow != null) { + try { + producerWindow.waitForSpace(); + } catch (InterruptedException e) { + throw new JMSException("Send aborted due to thread interrupt."); + } + } + + this.session.send(this, dest, message, deliveryMode, priority, timeToLive, + disableMessageID, disableMessageTimestamp, producerWindow, sendTimeout, completionListener); + stats.onMessage(); } public void send(Message message, AsyncCallback onComplete) throws JMSException { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java index 5816d70e30c..5e36a15f55f 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java @@ -336,6 +336,18 @@ public void send(Destination destination, Message message) throws JMSException { protected abstract void checkClosed() throws IllegalStateException; + protected static void validateDeliveryMode(final int deliveryMode) throws JMSException { + if (deliveryMode != DeliveryMode.PERSISTENT && deliveryMode != DeliveryMode.NON_PERSISTENT) { + throw new JMSException("Invalid delivery mode: " + deliveryMode); + } + } + + protected static void validatePriority(final int priority) throws JMSException { + if (priority < 0 || priority > 9) { + throw new JMSException("Invalid priority: " + priority + " (must be 0-9)"); + } + } + /** * @return the sendTimeout */ diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java index abf74930242..ae574565e0e 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java @@ -57,6 +57,8 @@ public class ActiveMQProducer implements JMSProducer { // Properties applied to all messages on a per-JMS producer instance basis private Map messageProperties = null; + private CompletionListener completionListener = null; + ActiveMQProducer(ActiveMQContext activemqContext, ActiveMQMessageProducer activemqMessageProducer) { this.activemqContext = activemqContext; this.activemqMessageProducer = activemqMessageProducer; @@ -90,7 +92,12 @@ public JMSProducer send(Destination destination, Message message) { } } - activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), null); + if (completionListener != null) { + activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); + } else { + activemqMessageProducer.send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive(), getDisableMessageID(), getDisableMessageTimestamp(), (AsyncCallback) null); + } } catch (JMSException e) { throw JMSExceptionSupport.convertToJMSRuntimeException(e); } @@ -256,12 +263,13 @@ public long getDeliveryDelay() { @Override public JMSProducer setAsync(CompletionListener completionListener) { - throw new UnsupportedOperationException("setAsync(CompletionListener) is not supported"); + this.completionListener = completionListener; + return this; } @Override public CompletionListener getAsync() { - throw new UnsupportedOperationException("getAsync() is not supported"); + return this.completionListener; } @Override diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index dc206850415..43d3166f25e 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -26,13 +26,20 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import jakarta.jms.BytesMessage; +import jakarta.jms.CompletionListener; import jakarta.jms.Destination; import jakarta.jms.IllegalStateException; +import jakarta.jms.IllegalStateRuntimeException; import jakarta.jms.InvalidDestinationException; import jakarta.jms.InvalidSelectorException; import jakarta.jms.JMSException; @@ -236,6 +243,20 @@ public static interface DeliveryListener { private BlobTransferPolicy blobTransferPolicy; private long lastDeliveredSequenceId = -2; + // Single-threaded executor for async send: ensures one CompletionListener callback at a time + // and that callbacks are invoked in the same order as the corresponding send calls + // per Jakarta Messaging 3.1 spec section 7.3.8 + private final ExecutorService asyncSendExecutor = Executors.newSingleThreadExecutor( + r -> new Thread(r, "ActiveMQ async-send")); + + // Set to true on the executor thread while a CompletionListener callback is executing. + // Used to detect illegal session operations (close/commit/rollback) from within a callback. + static final ThreadLocal IN_COMPLETION_LISTENER_CALLBACK = ThreadLocal.withInitial(() -> false); + + // Set to true on the dispatch thread while a MessageListener.onMessage() callback is executing. + // Used to detect illegal connection/session operations from within a MessageListener callback. + static final ThreadLocal IN_MESSAGE_LISTENER_CALLBACK = ThreadLocal.withInitial(() -> false); + /** * Construct the Session * @@ -577,6 +598,7 @@ public int getAcknowledgeMode() throws JMSException { @Override public void commit() throws JMSException { checkClosed(); + checkNotInCompletionListenerCallback("commit"); if (!getTransacted()) { throw new jakarta.jms.IllegalStateException("Not a transacted session"); } @@ -598,6 +620,7 @@ public void commit() throws JMSException { @Override public void rollback() throws JMSException { checkClosed(); + checkNotInCompletionListenerCallback("rollback"); if (!getTransacted()) { throw new jakarta.jms.IllegalStateException("Not a transacted session"); } @@ -637,6 +660,8 @@ public void rollback() throws JMSException { */ @Override public void close() throws JMSException { + checkNotInCompletionListenerCallback("close"); + checkNotInMessageListenerCallback("close"); if (!closed) { if (getTransactionContext().isInXATransaction()) { if (!synchronizationRegistered) { @@ -725,6 +750,15 @@ public synchronized void dispose() throws JMSException { if (!closed) { try { + // Shutdown async send executor and wait for any in-progress callbacks to finish + // per Jakarta Messaging 3.1 spec section 7.3.5 + asyncSendExecutor.shutdown(); + try { + asyncSendExecutor.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.debug("Interrupted while waiting for async send executor to terminate", e); + } executor.close(); for (Iterator iter = consumers.iterator(); iter.hasNext();) { @@ -1054,7 +1088,12 @@ public void run() { } LOG.trace("{} onMessage({})", this, message.getMessageId()); - messageListener.onMessage(message); + IN_MESSAGE_LISTENER_CALLBACK.set(true); + try { + messageListener.onMessage(message); + } finally { + IN_MESSAGE_LISTENER_CALLBACK.set(false); + } } catch (Throwable e) { if (!isClosed()) { @@ -2362,4 +2401,151 @@ private static void setForeignMessageDeliveryTime(final Message foreignMessage, foreignMessage.setJMSDeliveryTime(deliveryTime); } } + + /** + * Sends a message with a CompletionListener for async notification per Jakarta Messaging 3.1 spec section 7.3. + *

+ * The wire-level send is performed synchronously (inside sendMutex to preserve ordering). The + * CompletionListener is then invoked on a dedicated single-threaded executor, ensuring: + *

    + *
  • Callbacks are not called on the sender's thread (spec 7.3.8)
  • + *
  • Only one callback executes at a time (spec 7.3.8)
  • + *
  • Callbacks are in the same order as the corresponding send calls (spec 7.3.8)
  • + *
+ * The sender thread blocks until the send completes and the callback has been invoked. + */ + protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, + int deliveryMode, int priority, long timeToLive, + boolean disableMessageID, boolean disableMessageTimestamp, + MemoryUsage producerWindow, int sendTimeout, + CompletionListener completionListener) throws JMSException { + + checkClosed(); + if (destination.isTemporary() && connection.isDeleted(destination)) { + throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); + } + + final ActiveMQMessage msg; + final Message originalMessage = message; + + synchronized (sendMutex) { + doStartTransaction(); + if (transactionContext.isRollbackOnly()) { + throw new IllegalStateException("transaction marked rollback only"); + } + final TransactionId txid = transactionContext.getTransactionId(); + final long sequenceNumber = producer.getMessageSequence(); + + // Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 + message.setJMSDeliveryMode(deliveryMode); + final long timeStamp = System.currentTimeMillis(); + final long expiration = timeToLive > 0 ? timeToLive + timeStamp : 0L; + + if (!(message instanceof ActiveMQMessage)) { + setForeignMessageDeliveryTime(message, timeStamp); + } else { + message.setJMSDeliveryTime(timeStamp); + } + if (!disableMessageTimestamp && !producer.getDisableMessageTimestamp()) { + message.setJMSTimestamp(timeStamp); + } else { + message.setJMSTimestamp(0L); + } + message.setJMSExpiration(expiration); + message.setJMSPriority(priority); + message.setJMSRedelivered(false); + + // Transform to our own message format + ActiveMQMessage amqMsg = ActiveMQMessageTransformation.transformMessage(message, connection); + amqMsg.setDestination(destination); + amqMsg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); + + // Propagate the message id and destination back to the original message + if (amqMsg != message) { + message.setJMSMessageID(amqMsg.getMessageId().toString()); + message.setJMSDestination(destination); + } + amqMsg.setBrokerPath(null); + amqMsg.setTransactionId(txid); + + // Always copy when sending async so the user can safely modify the message after send() + // returns without affecting the in-flight message + msg = (ActiveMQMessage) amqMsg.copy(); + msg.setConnection(connection); + msg.onSend(); + msg.setProducerId(msg.getMessageId().getProducerId()); + + if (LOG.isTraceEnabled()) { + LOG.trace(getSessionId() + " async sending message: " + msg); + } + + // Perform the wire-level send synchronously while holding sendMutex. + // This ensures messages are delivered to the broker in send order. + try { + this.connection.syncSendPacket(msg); + } catch (JMSException sendEx) { + // Send failed - invoke onException on executor thread (not sender thread) + final Future future = asyncSendExecutor.submit(() -> { + IN_COMPLETION_LISTENER_CALLBACK.set(true); + try { + completionListener.onException(originalMessage, sendEx); + } finally { + IN_COMPLETION_LISTENER_CALLBACK.set(false); + } + }); + awaitAsyncSendFuture(future, originalMessage, completionListener); + return; + } + } + + // Send succeeded - invoke onCompletion on executor thread (not sender thread) per spec 7.3.8 + final Future future = asyncSendExecutor.submit(() -> { + IN_COMPLETION_LISTENER_CALLBACK.set(true); + try { + completionListener.onCompletion(originalMessage); + } catch (Exception e) { + // Per spec 7.3.2, exceptions thrown by the callback are swallowed + LOG.warn("CompletionListener.onCompletion threw an exception", e); + } finally { + IN_COMPLETION_LISTENER_CALLBACK.set(false); + } + }); + awaitAsyncSendFuture(future, originalMessage, completionListener); + } + + private void awaitAsyncSendFuture(final Future future, final Message originalMessage, + final CompletionListener completionListener) throws JMSException { + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new JMSException("Async send interrupted while waiting for CompletionListener"); + } catch (ExecutionException e) { + // Should not happen since we catch all exceptions inside the submitted task + LOG.warn("Unexpected error executing CompletionListener", e.getCause()); + } + } + + /** + * Throws {@link jakarta.jms.IllegalStateException} if the current thread is executing a + * CompletionListener callback, per Jakarta Messaging 3.1 spec section 7.3.5. + * The classic JMS API uses checked IllegalStateException (not the runtime variant). + */ + static void checkNotInCompletionListenerCallback(final String operation) throws jakarta.jms.IllegalStateException { + if (Boolean.TRUE.equals(IN_COMPLETION_LISTENER_CALLBACK.get())) { + throw new jakarta.jms.IllegalStateException( + "Cannot call " + operation + "() from within a CompletionListener callback"); + } + } + + /** + * Throws {@link jakarta.jms.IllegalStateException} if the current thread is executing a + * MessageListener.onMessage() callback, per Jakarta Messaging spec section 4.4. + */ + static void checkNotInMessageListenerCallback(final String operation) throws jakarta.jms.IllegalStateException { + if (Boolean.TRUE.equals(IN_MESSAGE_LISTENER_CALLBACK.get())) { + throw new jakarta.jms.IllegalStateException( + "Cannot call " + operation + "() from within a MessageListener callback"); + } + } } From 3bc5f45df59706ed19e049a96523d0545ea3f94b Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Wed, 25 Feb 2026 22:51:41 +0100 Subject: [PATCH 2/9] fix(client): simplify MessageProducer send calls by removing redundant destination parameter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit calling send(destination, message) on fixed-destination producers — a JMS spec violation that the old code silently allowed --- .../src/test/java/org/apache/activemq/bugs/AMQ7085Test.java | 2 +- .../src/test/java/org/apache/activemq/bugs/AMQ4361Test.java | 2 +- .../src/test/java/org/apache/activemq/bugs/AMQ6059Test.java | 2 +- .../org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java | 4 ++-- .../activemq/transport/failover/InitalReconnectDelayTest.java | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java b/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java index 630fb560f5e..e4381ca35c8 100644 --- a/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java +++ b/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java @@ -62,7 +62,7 @@ public void setUp() throws Exception { final Message toSend = session.createMessage(); toSend.setStringProperty("foo", "bar"); final MessageProducer producer = session.createProducer(queue); - producer.send(queue, toSend); + producer.send(toSend); } finally { conn.close(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java index f8c49e01aff..a2c57b01ade 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java @@ -113,7 +113,7 @@ public void run() { lastLoop.set(System.currentTimeMillis()); ObjectMessage objMsg = session.createObjectMessage(); objMsg.setObject(data); - producer.send(destination, objMsg); + producer.send(objMsg); } } catch (Exception e) { publishException.set(e); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java index 8fbc6337a07..7b8eb1951f6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java @@ -176,7 +176,7 @@ private void sendMessage(Destination destination) throws Exception { connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageProducer producer = session.createProducer(destination); - producer.send(destination, session.createTextMessage("DLQ message"), DeliveryMode.PERSISTENT, 4, 1000); + producer.send(session.createTextMessage("DLQ message"), DeliveryMode.PERSISTENT, 4, 1000); connection.stop(); LOG.info("### Send message that will expire."); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java index 2f59fcf3b2a..13d5a0ad52c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java @@ -112,7 +112,7 @@ public void run() { producer.setDeliveryMode(deliveryMode); for (int idx = 0; idx < toSend; ++idx) { Message message = session.createTextMessage(new String(buf) + idx); - producer.send(destination, message); + producer.send(message); messagesSent.incrementAndGet(); LOG.info("After little:" + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage()); } @@ -132,7 +132,7 @@ public void run() { producer.setDeliveryMode(deliveryMode); for (int idx = 0; idx < toSend; ++idx) { Message message = session.createTextMessage(new String(buf) + idx); - producer.send(destination, message); + producer.send(message); messagesSent.incrementAndGet(); LOG.info("After little:" + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage()); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java index c12cbf4755d..008f9c1eacc 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java @@ -129,7 +129,7 @@ public void transportResumed() { LOG.info("Attempting to send... failover should throw on disconnect"); try { - producer.send(destination, message); + producer.send(message); fail("Expect IOException to bubble up on send"); } catch (jakarta.jms.IllegalStateException producerClosed) { } From a7a147659ff893e06621dd87628f6b29ec774643 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Thu, 26 Feb 2026 01:53:38 +0100 Subject: [PATCH 3/9] fix(client): ensure CompletionListener is not null in send methods fix send calls to remove the destination --- .../activemq/ActiveMQMessageProducer.java | 18 +++++++++--------- .../org/apache/activemq/ActiveMQSession.java | 18 ------------------ ...sageInterceptorStrategyMemoryUsageTest.java | 4 ++-- .../policy/MessageInterceptorStrategyTest.java | 16 ++++++++-------- .../org/apache/activemq/bugs/AMQ3934Test.java | 2 +- .../org/apache/activemq/bugs/AMQ4530Test.java | 2 +- .../org/apache/activemq/bugs/AMQ4930Test.java | 2 +- .../org/apache/activemq/bugs/AMQ7270Test.java | 2 +- .../activemq/jms2/ActiveMQJMS2ContextTest.java | 2 +- .../activemq/jmx/OpenTypeSupportTest.java | 2 +- .../activemq/jmx/TotalMessageCountTest.java | 2 +- .../usecases/TopicDurableConnectStatsTest.java | 2 +- 12 files changed, 27 insertions(+), 45 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index 3db69c82c56..3a99cc02198 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -299,12 +299,12 @@ public void send(Message message, int deliveryMode, int priority, long timeToLiv @Override public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException { checkClosed(); - if (completionListener == null) { - throw new IllegalArgumentException("CompletionListener must not be null"); - } if (info.getDestination() != null) { throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants."); } + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } if (destination == null) { throw new InvalidDestinationException("Don't understand null destinations"); } @@ -317,12 +317,12 @@ public void send(Destination destination, Message message, CompletionListener co public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { checkClosed(); - if (completionListener == null) { - throw new IllegalArgumentException("CompletionListener must not be null"); - } if (info.getDestination() != null) { throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants."); } + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } if (destination == null) { throw new InvalidDestinationException("Don't understand null destinations"); } @@ -337,12 +337,12 @@ public void send(Destination destination, Message message, int deliveryMode, int boolean disableMessageID, boolean disableMessageTimestamp, CompletionListener completionListener) throws JMSException { checkClosed(); - if (completionListener == null) { - throw new IllegalArgumentException("CompletionListener must not be null"); - } if (info.getDestination() != null) { throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants."); } + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } if (destination == null) { throw new InvalidDestinationException("Don't understand null destinations"); } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 43d3166f25e..4aaa6940693 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -1427,21 +1427,12 @@ public Topic createTopic(String topicName) throws JMSException { @Override public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException { checkClosed(); - if (topic == null) { - throw new InvalidDestinationException("Topic cannot be null"); - } throw new UnsupportedOperationException("createSharedConsumer(Topic, sharedSubscriptionName) is not supported"); } @Override public MessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) throws JMSException { checkClosed(); - if (topic == null) { - throw new InvalidDestinationException("Topic cannot be null"); - } - if (messageSelector != null && !messageSelector.trim().isEmpty()) { - SelectorParser.parse(messageSelector); - } throw new UnsupportedOperationException("createSharedConsumer(Topic, sharedSubscriptionName, messageSelector) is not supported"); } @@ -1460,21 +1451,12 @@ public MessageConsumer createDurableConsumer(Topic topic, String name, String me @Override public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException { checkClosed(); - if (topic == null) { - throw new InvalidDestinationException("Topic cannot be null"); - } throw new UnsupportedOperationException("createSharedDurableConsumer(Topic, name) is not supported"); } @Override public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException { checkClosed(); - if (topic == null) { - throw new InvalidDestinationException("Topic cannot be null"); - } - if (messageSelector != null && !messageSelector.trim().isEmpty()) { - SelectorParser.parse(messageSelector); - } throw new UnsupportedOperationException("createSharedDurableConsumer(Topic, name, messageSelector) is not supported"); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java index 6c1a3e540ad..0de8434a885 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java @@ -105,7 +105,7 @@ public void testMemoryUsageBodyIncrease() throws Exception { BytesMessage sendMessageP = session.createBytesMessage(); byte[] origBody = new byte[1*1024]; sendMessageP.writeBytes(origBody); - producer.send(queue, sendMessageP); + producer.send(sendMessageP); } QueueViewMBean queueViewMBean = getProxyToQueue(queueName); @@ -127,7 +127,7 @@ public void testMemoryUsageBodyDecrease() throws Exception { BytesMessage sendMessageP = session.createBytesMessage(); byte[] origBody = new byte[1*1024*1024]; sendMessageP.writeBytes(origBody); - producer.send(queue, sendMessageP); + producer.send(sendMessageP); } QueueViewMBean queueViewMBean = getProxyToQueue(queueName); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java index 5537037cb74..267bd7a5a59 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java @@ -99,11 +99,11 @@ public void testForceDeliveryModePersistent() throws Exception { Queue queue = createQueue("mis.forceDeliveryMode.true"); Message sendMessageP = session.createTextMessage("forceDeliveryMode=true"); producer.setDeliveryMode(DeliveryMode.PERSISTENT); - producer.send(queue, sendMessageP); + producer.send(sendMessageP); Message sendMessageNP = session.createTextMessage("forceDeliveryMode=true"); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.send(queue, sendMessageNP); + producer.send(sendMessageNP); queueBrowser = session.createBrowser(queue); Enumeration browseEnumeration = queueBrowser.getEnumeration(); @@ -127,11 +127,11 @@ public void testForceDeliveryModeNonPersistent() throws Exception { Queue queue = createQueue("mis.forceDeliveryMode.false"); Message sendMessageP = session.createTextMessage("forceDeliveryMode=false"); producer.setDeliveryMode(DeliveryMode.PERSISTENT); - producer.send(queue, sendMessageP); + producer.send(sendMessageP); Message sendMessageNP = session.createTextMessage("forceDeliveryMode=false"); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.send(queue, sendMessageNP); + producer.send(sendMessageNP); queueBrowser = session.createBrowser(queue); Enumeration browseEnumeration = queueBrowser.getEnumeration(); @@ -155,7 +155,7 @@ public void testForceExpirationDisabled() throws Exception { Queue queue = createQueue("mis.forceExpiration.zero"); Message sendMessageP = session.createTextMessage("expiration=zero"); producer.setTimeToLive(0l); - producer.send(queue, sendMessageP); + producer.send(sendMessageP); queueBrowser = session.createBrowser(queue); Enumeration browseEnumeration = queueBrowser.getEnumeration(); @@ -181,7 +181,7 @@ public void testForceExpirationZeroOverride() throws Exception { Queue queue = createQueue("mis.forceExpiration.100k"); Message sendMessageP = session.createTextMessage("expiration=zero"); producer.setTimeToLive(100_000l); - producer.send(queue, sendMessageP); + producer.send(sendMessageP); queueBrowser = session.createBrowser(queue); Enumeration browseEnumeration = queueBrowser.getEnumeration(); @@ -205,7 +205,7 @@ public void testForceExpirationZeroOverrideDLQ() throws Exception { Queue queue = createQueue("mis.forceExpiration.zero-no-dlq-expiry"); Message sendMessageP = session.createTextMessage("expiration=zero-no-dlq-expiry"); - producer.send(queue, sendMessageP); + producer.send(sendMessageP); Thread.sleep(250l); @@ -245,7 +245,7 @@ public void testForceExpirationCeilingOverride() throws Exception { Queue queue = createQueue("mis.forceExpiration.maxValue"); Message sendMessageP = session.createTextMessage("expiration=ceiling"); producer.setTimeToLive(expiryTime); - producer.send(queue, sendMessageP); + producer.send(sendMessageP); queueBrowser = session.createBrowser(queue); Enumeration browseEnumeration = queueBrowser.getEnumeration(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java index 59e3f15bace..1a8a09cf69f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java @@ -74,7 +74,7 @@ public void sendMessage() throws Exception { final Destination queue = session.createQueue(TEST_QUEUE); final Message toSend = session.createMessage(); final MessageProducer producer = session.createProducer(queue); - producer.send(queue, toSend); + producer.send(toSend); } finally { conn.close(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java index ee8499628c1..78c31e998c9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4530Test.java @@ -81,7 +81,7 @@ public void sendMessage() throws Exception { final Message toSend = session.createMessage(); toSend.setStringProperty(KEY, VALUE); final MessageProducer producer = session.createProducer(queue); - producer.send(queue, toSend); + producer.send(toSend); } finally { conn.close(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java index b5c4f87bc14..51b4905076a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java @@ -86,7 +86,7 @@ public void doTestBrowsePending(int deliveryMode) throws Exception { bytesMessage.writeBytes(new byte[messageSize]); for (int i = 0; i < messageCount; i++) { - producer.send(bigQueue, bytesMessage); + producer.send(bytesMessage); } final QueueViewMBean queueViewMBean = (QueueViewMBean) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7270Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7270Test.java index 1291d3c6fc8..c080bc71643 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7270Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7270Test.java @@ -70,7 +70,7 @@ public void testConcurrentCopyMatchingPageSizeOk() throws Exception { for (int i = 0; i < messageCount; i++) { bytesMessage.setIntProperty("id", i); - producer.send(activeMQQueue, bytesMessage); + producer.send(bytesMessage); } final QueueViewMBean queueViewMBean = (QueueViewMBean) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java index b5f54679314..46a81baec46 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java @@ -304,7 +304,7 @@ public void testProducerSendMessageCompletionListener() throws JMSException { messageProducer.send(session.createQueue(methodNameDestinationName), null, (CompletionListener)null); } - @Test(expected = UnsupportedOperationException.class) + @Test(expected = IllegalArgumentException.class) public void testProducerSendMessageQoSParamsCompletionListener() throws JMSException { messageProducer.send(null, 1, 4, 0l, null); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/OpenTypeSupportTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/OpenTypeSupportTest.java index 1314618decd..06fddd5c1b8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/OpenTypeSupportTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/OpenTypeSupportTest.java @@ -80,7 +80,7 @@ private static void sendMessage() throws JMSException { BytesMessage toSend = session.createBytesMessage(); toSend.writeBytes(BYTESMESSAGE_TEXT.getBytes()); MessageProducer producer = session.createProducer(queue); - producer.send(queue, toSend); + producer.send(toSend); } finally { conn.close(); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/TotalMessageCountTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/TotalMessageCountTest.java index b641cf05bb4..949090aa24b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/TotalMessageCountTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jmx/TotalMessageCountTest.java @@ -108,7 +108,7 @@ private void sendMessage() throws JMSException { Destination queue = session.createQueue(TESTQUEUE); TextMessage msg = session.createTextMessage("This is a message."); MessageProducer producer = session.createProducer(queue); - producer.send(queue, msg); + producer.send(msg); LOG.info("Message sent to " + TESTQUEUE); } finally { conn.close(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java index 2d169b6dfd6..d729ae12525 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicDurableConnectStatsTest.java @@ -190,7 +190,7 @@ public void testPendingTopicStat() throws Exception { TextMessage message = producerSessions.createTextMessage(createMessageText(i)); message.setJMSExpiration(0); message.setStringProperty("filter", "true"); - producer.send(topic, message); + producer.send(message); producerSessions.commit(); } From af4c1b43579e0b395c8fd08a0d0af5a653f2b825 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Thu, 26 Feb 2026 02:00:02 +0100 Subject: [PATCH 4/9] test: fix send calls to remove the destination --- .../org/apache/activemq/JmsQueueBrowserExpirationTest.java | 2 +- .../src/test/java/org/apache/activemq/bugs/AMQ4083Test.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java index 901fbca7e64..65d5db068ad 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java @@ -130,7 +130,7 @@ public void testDoNotReceiveExpiredMessage() throws Exception { producer.setTimeToLive(WAIT_TIME); TextMessage message = session.createTextMessage("Test message"); - producer.send(producerQueue, message); + producer.send(message); int count = getMessageCount(producerQueue, session); assertEquals(1, count); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java index 26a8c4daa7e..43c0566de45 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java @@ -466,12 +466,12 @@ public void testConsumeExpiredQueueAndDlq() throws Exception { String msgBody = new String(new byte[20*1024]); for (int i = 0; i < data.length; i++) { Message message = session.createTextMessage(msgBody); - producerExpire.send(queue, message); + producerExpire.send(message); } for (int i = 0; i < data.length; i++) { Message message = session.createTextMessage(msgBody); - producerNormal.send(queue, message); + producerNormal.send(message); } ArrayList messages = new ArrayList(); From cbd30895d782e26927baab5bc33ab0f3a70d663c Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Thu, 26 Feb 2026 13:06:27 +0100 Subject: [PATCH 5/9] fix(tests): handle connection closure and expired messages in JMS tests --- .../test/java/org/apache/activemq/JMSConsumerTest.java | 9 +++++++-- .../apache/activemq/JmsQueueBrowserExpirationTest.java | 3 +++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java index 66df1d4647a..57dbb2ae629 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -495,8 +495,8 @@ public void onMessage(Message m) { counter.incrementAndGet(); if (counter.get() == 2) { sendDone.await(); - connection.close(); got2Done.countDown(); + return; // Don't acknowledge - message stays unacked (CLIENT_ACK mode) } tm.acknowledge(); } catch (Throwable e) { @@ -511,6 +511,8 @@ public void onMessage(Message m) { // Wait for first 2 messages to arrive. assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS)); + // Close connection from main thread (spec: Connection.close() from MessageListener throws ISE) + connection.close(); // Re-start connection. connection = (ActiveMQConnection)factory.createConnection(); @@ -584,8 +586,9 @@ public void onMessage(Message m) { m.acknowledge(); if (counter.get() == 2) { sendDone.await(); - connection.close(); got2Done.countDown(); + // Don't call connection.close() from MessageListener - spec violation (throws ISE) + // Main thread will close the connection after this latch } } catch (Throwable e) { e.printStackTrace(); @@ -599,6 +602,8 @@ public void onMessage(Message m) { // Wait for first 2 messages to arrive. assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS)); + // Close connection from main thread (spec: Connection.close() from MessageListener throws ISE) + connection.close(); // Re-start connection. connection = (ActiveMQConnection)factory.createConnection(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java index 65d5db068ad..2f6345d0781 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java @@ -165,6 +165,9 @@ private int browse(ActiveMQQueue queue, Connection connection) throws JMSExcepti int browsed = 0; while (enumeration.hasMoreElements()) { TextMessage m = (TextMessage) enumeration.nextElement(); + if (m == null) { + continue; // message expired during browse + } browsed++; LOG.debug("B[{}]: {}", browsed, m.getText()); } From 04a4325ad7a04f6cbd46c312a0380d53c829ee39 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Tue, 3 Mar 2026 22:24:33 +0100 Subject: [PATCH 6/9] fix(client): handle send failures in async send to prevent deadlocks --- .../org/apache/activemq/ActiveMQSession.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 4aaa6940693..6d6308b5fdb 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -2409,6 +2409,7 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin final ActiveMQMessage msg; final Message originalMessage = message; + JMSException sendException = null; synchronized (sendMutex) { doStartTransaction(); @@ -2463,23 +2464,31 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin // Perform the wire-level send synchronously while holding sendMutex. // This ensures messages are delivered to the broker in send order. + // Capture any send failure so the callback can be invoked outside sendMutex, + // preventing a deadlock if the CompletionListener calls producer.send(). try { this.connection.syncSendPacket(msg); } catch (JMSException sendEx) { - // Send failed - invoke onException on executor thread (not sender thread) - final Future future = asyncSendExecutor.submit(() -> { - IN_COMPLETION_LISTENER_CALLBACK.set(true); - try { - completionListener.onException(originalMessage, sendEx); - } finally { - IN_COMPLETION_LISTENER_CALLBACK.set(false); - } - }); - awaitAsyncSendFuture(future, originalMessage, completionListener); - return; + sendException = sendEx; } } + // Both success and error callbacks are invoked outside sendMutex to avoid deadlock. + // A CompletionListener is allowed to call producer.send() which would re-acquire sendMutex. + if (sendException != null) { + final JMSException finalEx = sendException; + final Future future = asyncSendExecutor.submit(() -> { + IN_COMPLETION_LISTENER_CALLBACK.set(true); + try { + completionListener.onException(originalMessage, finalEx); + } finally { + IN_COMPLETION_LISTENER_CALLBACK.set(false); + } + }); + awaitAsyncSendFuture(future, originalMessage, completionListener); + return; + } + // Send succeeded - invoke onCompletion on executor thread (not sender thread) per spec 7.3.8 final Future future = asyncSendExecutor.submit(() -> { IN_COMPLETION_LISTENER_CALLBACK.set(true); From 9e21d33ba4c91de28f4d1529b188899b6397ccaf Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Thu, 5 Mar 2026 17:36:15 +0100 Subject: [PATCH 7/9] fix(client): update session checks to use an atomic reference to the thread instead of a global thread local We need the check to be session scoped and to JVM scoped per spec. --- .../apache/activemq/ActiveMQConnection.java | 12 ++++-- .../activemq/ActiveMQMessageConsumer.java | 4 +- .../activemq/ActiveMQMessageProducer.java | 2 +- .../org/apache/activemq/ActiveMQSession.java | 42 +++++++++---------- 4 files changed, 32 insertions(+), 28 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index bd75cb97811..abe48100211 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -609,8 +609,10 @@ public void start() throws JMSException { */ @Override public void stop() throws JMSException { - ActiveMQSession.checkNotInCompletionListenerCallback("stop"); - ActiveMQSession.checkNotInMessageListenerCallback("stop"); + for (final ActiveMQSession session : sessions) { + session.checkNotInCompletionListenerCallback("stop"); + session.checkNotInMessageListenerCallback("stop"); + } doStop(true); } @@ -679,8 +681,10 @@ void doStop(boolean checkClosed) throws JMSException { */ @Override public void close() throws JMSException { - ActiveMQSession.checkNotInCompletionListenerCallback("close"); - ActiveMQSession.checkNotInMessageListenerCallback("close"); + for (final ActiveMQSession session : sessions) { + session.checkNotInCompletionListenerCallback("close"); + session.checkNotInMessageListenerCallback("close"); + } try { // If we were running, lets stop first. if (!closed.get() && !transportFailed.get()) { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 469e04d43e8..1d7524e3889 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -1454,11 +1454,11 @@ public void dispatch(MessageDispatch md) { try { boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired(); if (!expired) { - ActiveMQSession.IN_MESSAGE_LISTENER_CALLBACK.set(true); + session.messageListenerThread.set(Thread.currentThread()); try { listener.onMessage(message); } finally { - ActiveMQSession.IN_MESSAGE_LISTENER_CALLBACK.set(false); + session.messageListenerThread.set(null); } } afterMessageIsConsumed(md, expired); diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index 3a99cc02198..5b050d6b65b 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -168,7 +168,7 @@ public Destination getDestination() throws JMSException { */ @Override public void close() throws JMSException { - ActiveMQSession.checkNotInCompletionListenerCallback("close"); + session.checkNotInCompletionListenerCallback("close"); if (!closed) { dispose(); this.session.asyncSendPacket(info.createRemoveCommand()); diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 6d6308b5fdb..5e8ff3d23e0 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -34,12 +34,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import jakarta.jms.BytesMessage; import jakarta.jms.CompletionListener; import jakarta.jms.Destination; import jakarta.jms.IllegalStateException; -import jakarta.jms.IllegalStateRuntimeException; import jakarta.jms.InvalidDestinationException; import jakarta.jms.InvalidSelectorException; import jakarta.jms.JMSException; @@ -65,7 +65,6 @@ import jakarta.jms.TopicSubscriber; import jakarta.jms.TransactionRolledBackException; -import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.blob.BlobDownloader; import org.apache.activemq.blob.BlobTransferPolicy; import org.apache.activemq.blob.BlobUploader; @@ -249,13 +248,13 @@ public static interface DeliveryListener { private final ExecutorService asyncSendExecutor = Executors.newSingleThreadExecutor( r -> new Thread(r, "ActiveMQ async-send")); - // Set to true on the executor thread while a CompletionListener callback is executing. - // Used to detect illegal session operations (close/commit/rollback) from within a callback. - static final ThreadLocal IN_COMPLETION_LISTENER_CALLBACK = ThreadLocal.withInitial(() -> false); + // Tracks the thread currently executing a CompletionListener callback for this session. + // Session-scoped (instance field) so checks only apply to this session's own callbacks, + // not to callbacks from other sessions running on the same thread. + final AtomicReference completionListenerThread = new AtomicReference<>(); - // Set to true on the dispatch thread while a MessageListener.onMessage() callback is executing. - // Used to detect illegal connection/session operations from within a MessageListener callback. - static final ThreadLocal IN_MESSAGE_LISTENER_CALLBACK = ThreadLocal.withInitial(() -> false); + // Tracks the thread currently executing a MessageListener.onMessage() callback for this session. + final AtomicReference messageListenerThread = new AtomicReference<>(); /** * Construct the Session @@ -1088,11 +1087,11 @@ public void run() { } LOG.trace("{} onMessage({})", this, message.getMessageId()); - IN_MESSAGE_LISTENER_CALLBACK.set(true); + messageListenerThread.set(Thread.currentThread()); try { messageListener.onMessage(message); } finally { - IN_MESSAGE_LISTENER_CALLBACK.set(false); + messageListenerThread.set(null); } } catch (Throwable e) { @@ -2478,11 +2477,11 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin if (sendException != null) { final JMSException finalEx = sendException; final Future future = asyncSendExecutor.submit(() -> { - IN_COMPLETION_LISTENER_CALLBACK.set(true); + completionListenerThread.set(Thread.currentThread()); try { completionListener.onException(originalMessage, finalEx); } finally { - IN_COMPLETION_LISTENER_CALLBACK.set(false); + completionListenerThread.set(null); } }); awaitAsyncSendFuture(future, originalMessage, completionListener); @@ -2491,14 +2490,14 @@ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destin // Send succeeded - invoke onCompletion on executor thread (not sender thread) per spec 7.3.8 final Future future = asyncSendExecutor.submit(() -> { - IN_COMPLETION_LISTENER_CALLBACK.set(true); + completionListenerThread.set(Thread.currentThread()); try { completionListener.onCompletion(originalMessage); } catch (Exception e) { // Per spec 7.3.2, exceptions thrown by the callback are swallowed LOG.warn("CompletionListener.onCompletion threw an exception", e); } finally { - IN_COMPLETION_LISTENER_CALLBACK.set(false); + completionListenerThread.set(null); } }); awaitAsyncSendFuture(future, originalMessage, completionListener); @@ -2519,11 +2518,11 @@ private void awaitAsyncSendFuture(final Future future, final Message original /** * Throws {@link jakarta.jms.IllegalStateException} if the current thread is executing a - * CompletionListener callback, per Jakarta Messaging 3.1 spec section 7.3.5. - * The classic JMS API uses checked IllegalStateException (not the runtime variant). + * CompletionListener callback for this session, per Jakarta Messaging 3.1 spec section 7.3.5. + * The check is session-scoped: callbacks from other sessions on the same thread are unaffected. */ - static void checkNotInCompletionListenerCallback(final String operation) throws jakarta.jms.IllegalStateException { - if (Boolean.TRUE.equals(IN_COMPLETION_LISTENER_CALLBACK.get())) { + void checkNotInCompletionListenerCallback(final String operation) throws jakarta.jms.IllegalStateException { + if (Thread.currentThread() == completionListenerThread.get()) { throw new jakarta.jms.IllegalStateException( "Cannot call " + operation + "() from within a CompletionListener callback"); } @@ -2531,10 +2530,11 @@ static void checkNotInCompletionListenerCallback(final String operation) throws /** * Throws {@link jakarta.jms.IllegalStateException} if the current thread is executing a - * MessageListener.onMessage() callback, per Jakarta Messaging spec section 4.4. + * MessageListener.onMessage() callback for this session, per Jakarta Messaging spec section 4.4. + * The check is session-scoped: callbacks from other sessions on the same thread are unaffected. */ - static void checkNotInMessageListenerCallback(final String operation) throws jakarta.jms.IllegalStateException { - if (Boolean.TRUE.equals(IN_MESSAGE_LISTENER_CALLBACK.get())) { + void checkNotInMessageListenerCallback(final String operation) throws jakarta.jms.IllegalStateException { + if (Thread.currentThread() == messageListenerThread.get()) { throw new jakarta.jms.IllegalStateException( "Cannot call " + operation + "() from within a MessageListener callback"); } From cf2fa9b3216efff22cf1e8faeeaf12dd2a3080f0 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Mon, 9 Mar 2026 10:41:54 +0100 Subject: [PATCH 8/9] fix(client): add missing recover() restriction check in CompletionListener callback --- .../org/apache/activemq/ActiveMQSession.java | 1 + .../jms2/ActiveMQJMS2ContextTest.java | 29 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 5e8ff3d23e0..3fe319b1881 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -845,6 +845,7 @@ public boolean isClosed() { public void recover() throws JMSException { checkClosed(); + checkNotInCompletionListenerCallback("recover"); if (getTransacted()) { throw new IllegalStateException("This session is transacted"); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java index 46a81baec46..c7adfced516 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ContextTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.util.Enumeration; +import java.util.concurrent.atomic.AtomicReference; import jakarta.jms.CompletionListener; import jakarta.jms.Destination; @@ -319,6 +320,34 @@ public void testProducerSendDestinationMessageQosParamsCompletionListener() thro messageProducer.send(session.createQueue(methodNameDestinationName), null, 1, 4, 0l, null); } + /** + * Jakarta Messaging 3.1 spec section 7.3.8: calling recover() from within a CompletionListener + * callback must throw IllegalStateException. + */ + @Test + public void testRecoverThrowsIllegalStateFromCompletionListenerCallback() throws JMSException { + final AtomicReference callbackException = new AtomicReference<>(); + + messageProducer.send(session.createTextMessage("test"), new CompletionListener() { + @Override + public void onCompletion(final Message message) { + try { + session.recover(); + } catch (final Exception e) { + callbackException.set(e); + } + } + + @Override + public void onException(final Message message, final Exception exception) { + } + }); + + assertNotNull("recover() must throw from within CompletionListener callback", callbackException.get()); + assertTrue("recover() must throw IllegalStateException from within CompletionListener callback", + callbackException.get() instanceof jakarta.jms.IllegalStateException); + } + protected static void sendMessage(JMSContext jmsContext, Destination testDestination, String textBody) { assertNotNull(jmsContext); JMSProducer jmsProducer = jmsContext.createProducer(); From ae1f26a150ec7eef7f888db86ef1043eec35cba0 Mon Sep 17 00:00:00 2001 From: Jean-Louis Monteiro Date: Wed, 11 Mar 2026 13:32:12 +0100 Subject: [PATCH 9/9] docs(client): add CompletionListener support documentation for various send methods --- .../activemq/ActiveMQMessageProducer.java | 84 +++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index 5b050d6b65b..5197c100172 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -257,6 +257,16 @@ public void send(Destination destination, Message message, int deliveryMode, int } /** + * Sends a message using the default delivery mode, priority and time to live, + * notifying the specified {@code CompletionListener} when the send has completed. + * + *

Implementation note: the current ActiveMQ Classic implementation performs the + * send synchronously and then invokes the {@code CompletionListener} on a separate thread. + * This is explicitly permitted by the JMS 2.0 specification (section 7.3). + * A future version may implement fully asynchronous sending; application code that follows + * the specification will be compatible with both behaviours. + * For high-throughput asynchronous sending outside the JMS specification, see + * {@link ActiveMQMessageProducer#send(Destination, Message, AsyncCallback)}. * * @param message the message to send * @param completionListener to callback @@ -280,6 +290,23 @@ public void send(Message message, CompletionListener completionListener) throws getDisableMessageID(), getDisableMessageTimestamp(), completionListener); } + /** + * Sends a message with the specified delivery mode, priority and time to live, + * notifying the specified {@code CompletionListener} when the send has completed. + * + *

Implementation note: the current ActiveMQ Classic implementation performs the + * send synchronously and then invokes the {@code CompletionListener} on a separate thread. + * See {@link #send(Message, CompletionListener)} for details. + * + * @param message the message to send + * @param deliveryMode the delivery mode to use + * @param priority the priority for this message + * @param timeToLive the message's lifetime (in milliseconds) + * @param completionListener to callback + * @throws JMSException if the JMS provider fails to send the message due to some internal error. + * @throws UnsupportedOperationException if called on an anonymous producer (no fixed destination) + * @since 2.0 + */ @Override public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { @@ -296,6 +323,23 @@ public void send(Message message, int deliveryMode, int priority, long timeToLiv getDisableMessageID(), getDisableMessageTimestamp(), completionListener); } + /** + * Sends a message to the specified destination using the default delivery mode, priority + * and time to live, notifying the specified {@code CompletionListener} when the send + * has completed. + * + *

Implementation note: the current ActiveMQ Classic implementation performs the + * send synchronously and then invokes the {@code CompletionListener} on a separate thread. + * See {@link #send(Message, CompletionListener)} for details. + * + * @param destination the destination to send this message to + * @param message the message to send + * @param completionListener to callback + * @throws JMSException if the JMS provider fails to send the message due to some internal error. + * @throws UnsupportedOperationException if called on a producer with a fixed destination + * @throws InvalidDestinationException if a null destination is specified + * @since 2.0 + */ @Override public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException { checkClosed(); @@ -313,6 +357,26 @@ public void send(Destination destination, Message message, CompletionListener co getDisableMessageID(), getDisableMessageTimestamp(), completionListener); } + /** + * Sends a message to the specified destination with the specified delivery mode, priority + * and time to live, notifying the specified {@code CompletionListener} when the send + * has completed. + * + *

Implementation note: the current ActiveMQ Classic implementation performs the + * send synchronously and then invokes the {@code CompletionListener} on a separate thread. + * See {@link #send(Message, CompletionListener)} for details. + * + * @param destination the destination to send this message to + * @param message the message to send + * @param deliveryMode the delivery mode to use + * @param priority the priority for this message + * @param timeToLive the message's lifetime (in milliseconds) + * @param completionListener to callback + * @throws JMSException if the JMS provider fails to send the message due to some internal error. + * @throws UnsupportedOperationException if called on a producer with a fixed destination + * @throws InvalidDestinationException if a null destination is specified + * @since 2.0 + */ @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { @@ -333,6 +397,26 @@ public void send(Destination destination, Message message, int deliveryMode, int getDisableMessageID(), getDisableMessageTimestamp(), completionListener); } + /** + * Sends a message to the specified destination with full control over delivery parameters, + * notifying the specified {@code CompletionListener} when the send has completed. + * + *

Implementation note: the current ActiveMQ Classic implementation performs the + * send synchronously and then invokes the {@code CompletionListener} on a separate thread. + * See {@link #send(Message, CompletionListener)} for details. + * + * @param destination the destination to send this message to + * @param message the message to send + * @param deliveryMode the delivery mode to use + * @param priority the priority for this message + * @param timeToLive the message's lifetime (in milliseconds) + * @param disableMessageID whether to disable setting the message ID + * @param disableMessageTimestamp whether to disable setting the message timestamp + * @param completionListener to callback + * @throws JMSException if the JMS provider fails to send the message due to some internal error. + * @throws UnsupportedOperationException if called on a producer with a fixed destination + * @throws InvalidDestinationException if a null destination is specified + */ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, boolean disableMessageID, boolean disableMessageTimestamp, CompletionListener completionListener) throws JMSException {