diff --git a/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java b/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java index 630fb560f5e..e4381ca35c8 100644 --- a/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java +++ b/activemq-broker/src/test/java/org/apache/activemq/bugs/AMQ7085Test.java @@ -62,7 +62,7 @@ public void setUp() throws Exception { final Message toSend = session.createMessage(); toSend.setStringProperty("foo", "bar"); final MessageProducer producer = session.createProducer(queue); - producer.send(queue, toSend); + producer.send(toSend); } finally { conn.close(); } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 86008616759..abe48100211 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -609,6 +609,10 @@ public void start() throws JMSException { */ @Override public void stop() throws JMSException { + for (final ActiveMQSession session : sessions) { + session.checkNotInCompletionListenerCallback("stop"); + session.checkNotInMessageListenerCallback("stop"); + } doStop(true); } @@ -677,6 +681,10 @@ void doStop(boolean checkClosed) throws JMSException { */ @Override public void close() throws JMSException { + for (final ActiveMQSession session : sessions) { + session.checkNotInCompletionListenerCallback("close"); + session.checkNotInMessageListenerCallback("close"); + } try { // If we were running, lets stop first. if (!closed.get() && !transportFailed.get()) { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index a6bf1b20952..1d7524e3889 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -1454,7 +1454,12 @@ public void dispatch(MessageDispatch md) { try { boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired(); if (!expired) { - listener.onMessage(message); + session.messageListenerThread.set(Thread.currentThread()); + try { + listener.onMessage(message); + } finally { + session.messageListenerThread.set(null); + } } afterMessageIsConsumed(md, expired); } catch (RuntimeException e) { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java index fbd093f10f2..5197c100172 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java @@ -168,6 +168,7 @@ public Destination getDestination() throws JMSException { */ @Override public void close() throws JMSException { + session.checkNotInCompletionListenerCallback("close"); if (!closed) { dispose(); this.session.asyncSendPacket(info.createRemoveCommand()); @@ -197,6 +198,35 @@ protected void checkClosed() throws IllegalStateException { } } + @Override + public void send(Message message) throws JMSException { + checkClosed(); + if (info.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + super.send(message); + } + + @Override + public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { + checkClosed(); + if (info.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + validateDeliveryMode(deliveryMode); + validatePriority(priority); + super.send(message, deliveryMode, priority, timeToLive); + } + + @Override + public void send(Destination destination, Message message) throws JMSException { + checkClosed(); + if (info.getDestination() != null) { + throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message) variants."); + } + super.send(destination, message); + } + /** * Sends a message to a destination for an unidentified message producer, * specifying delivery mode, priority and time to live. @@ -221,42 +251,217 @@ protected void checkClosed() throws IllegalStateException { */ @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { - this.send(destination, message, deliveryMode, priority, timeToLive, (AsyncCallback)null); + validateDeliveryMode(deliveryMode); + validatePriority(priority); + this.send(destination, message, deliveryMode, priority, timeToLive, (AsyncCallback) null); } /** + * Sends a message using the default delivery mode, priority and time to live, + * notifying the specified {@code CompletionListener} when the send has completed. + * + *
Implementation note: the current ActiveMQ Classic implementation performs the + * send synchronously and then invokes the {@code CompletionListener} on a separate thread. + * This is explicitly permitted by the JMS 2.0 specification (section 7.3). + * A future version may implement fully asynchronous sending; application code that follows + * the specification will be compatible with both behaviours. + * For high-throughput asynchronous sending outside the JMS specification, see + * {@link ActiveMQMessageProducer#send(Destination, Message, AsyncCallback)}. * * @param message the message to send - * @param CompletionListener to callback + * @param completionListener to callback * @throws JMSException if the JMS provider fails to send the message due to * some internal error. - * @throws UnsupportedOperationException if an invalid destination is - * specified. - * @throws InvalidDestinationException if a client uses this method with an - * invalid destination. + * @throws UnsupportedOperationException if called on an anonymous producer (no fixed destination) * @see jakarta.jms.Session#createProducer * @since 2.0 */ @Override public void send(Message message, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Message, CompletionListener) is not supported"); + checkClosed(); + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } + if (info.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + this.doSendWithCompletionListener(info.getDestination(), message, this.defaultDeliveryMode, + this.defaultPriority, this.defaultTimeToLive, + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); } + /** + * Sends a message with the specified delivery mode, priority and time to live, + * notifying the specified {@code CompletionListener} when the send has completed. + * + *
Implementation note: the current ActiveMQ Classic implementation performs the + * send synchronously and then invokes the {@code CompletionListener} on a separate thread. + * See {@link #send(Message, CompletionListener)} for details. + * + * @param message the message to send + * @param deliveryMode the delivery mode to use + * @param priority the priority for this message + * @param timeToLive the message's lifetime (in milliseconds) + * @param completionListener to callback + * @throws JMSException if the JMS provider fails to send the message due to some internal error. + * @throws UnsupportedOperationException if called on an anonymous producer (no fixed destination) + * @since 2.0 + */ @Override public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported"); + checkClosed(); + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } + if (info.getDestination() == null) { + throw new UnsupportedOperationException("A destination must be specified."); + } + validateDeliveryMode(deliveryMode); + validatePriority(priority); + this.doSendWithCompletionListener(info.getDestination(), message, deliveryMode, priority, timeToLive, + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); } + /** + * Sends a message to the specified destination using the default delivery mode, priority + * and time to live, notifying the specified {@code CompletionListener} when the send + * has completed. + * + *
Implementation note: the current ActiveMQ Classic implementation performs the + * send synchronously and then invokes the {@code CompletionListener} on a separate thread. + * See {@link #send(Message, CompletionListener)} for details. + * + * @param destination the destination to send this message to + * @param message the message to send + * @param completionListener to callback + * @throws JMSException if the JMS provider fails to send the message due to some internal error. + * @throws UnsupportedOperationException if called on a producer with a fixed destination + * @throws InvalidDestinationException if a null destination is specified + * @since 2.0 + */ @Override public void send(Destination destination, Message message, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Destination, Message, CompletionListener) is not supported"); + checkClosed(); + if (info.getDestination() != null) { + throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants."); + } + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } + if (destination == null) { + throw new InvalidDestinationException("Don't understand null destinations"); + } + this.doSendWithCompletionListener(ActiveMQDestination.transform(destination), message, + this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive, + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); } + /** + * Sends a message to the specified destination with the specified delivery mode, priority + * and time to live, notifying the specified {@code CompletionListener} when the send + * has completed. + * + *
Implementation note: the current ActiveMQ Classic implementation performs the + * send synchronously and then invokes the {@code CompletionListener} on a separate thread. + * See {@link #send(Message, CompletionListener)} for details. + * + * @param destination the destination to send this message to + * @param message the message to send + * @param deliveryMode the delivery mode to use + * @param priority the priority for this message + * @param timeToLive the message's lifetime (in milliseconds) + * @param completionListener to callback + * @throws JMSException if the JMS provider fails to send the message due to some internal error. + * @throws UnsupportedOperationException if called on a producer with a fixed destination + * @throws InvalidDestinationException if a null destination is specified + * @since 2.0 + */ @Override public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException { - throw new UnsupportedOperationException("send(Destination, Message, deliveryMode, priority, timetoLive, CompletionListener) is not supported"); + checkClosed(); + if (info.getDestination() != null) { + throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants."); + } + if (completionListener == null) { + throw new IllegalArgumentException("CompletionListener must not be null"); + } + if (destination == null) { + throw new InvalidDestinationException("Don't understand null destinations"); + } + validateDeliveryMode(deliveryMode); + validatePriority(priority); + this.doSendWithCompletionListener(ActiveMQDestination.transform(destination), message, + deliveryMode, priority, timeToLive, + getDisableMessageID(), getDisableMessageTimestamp(), completionListener); + } + + /** + * Sends a message to the specified destination with full control over delivery parameters, + * notifying the specified {@code CompletionListener} when the send has completed. + * + *
Implementation note: the current ActiveMQ Classic implementation performs the
+ * send synchronously and then invokes the {@code CompletionListener} on a separate thread.
+ * See {@link #send(Message, CompletionListener)} for details.
+ *
+ * @param destination the destination to send this message to
+ * @param message the message to send
+ * @param deliveryMode the delivery mode to use
+ * @param priority the priority for this message
+ * @param timeToLive the message's lifetime (in milliseconds)
+ * @param disableMessageID whether to disable setting the message ID
+ * @param disableMessageTimestamp whether to disable setting the message timestamp
+ * @param completionListener to callback
+ * @throws JMSException if the JMS provider fails to send the message due to some internal error.
+ * @throws UnsupportedOperationException if called on a producer with a fixed destination
+ * @throws InvalidDestinationException if a null destination is specified
+ */
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
+ boolean disableMessageID, boolean disableMessageTimestamp,
+ CompletionListener completionListener) throws JMSException {
+ checkClosed();
+ if (info.getDestination() != null) {
+ throw new UnsupportedOperationException("This producer was created with a specific destination. Use send(Message, CompletionListener) variants.");
+ }
+ if (completionListener == null) {
+ throw new IllegalArgumentException("CompletionListener must not be null");
+ }
+ if (destination == null) {
+ throw new InvalidDestinationException("Don't understand null destinations");
+ }
+ validateDeliveryMode(deliveryMode);
+ validatePriority(priority);
+ this.doSendWithCompletionListener(ActiveMQDestination.transform(destination), message,
+ deliveryMode, priority, timeToLive, disableMessageID, disableMessageTimestamp, completionListener);
+ }
+
+ private void doSendWithCompletionListener(final ActiveMQDestination dest, Message message,
+ final int deliveryMode, final int priority, final long timeToLive,
+ final boolean disableMessageID, final boolean disableMessageTimestamp,
+ final CompletionListener completionListener) throws JMSException {
+ if (dest == null) {
+ throw new JMSException("No destination specified");
+ }
+
+ if (transformer != null) {
+ final Message transformedMessage = transformer.producerTransform(session, this, message);
+ if (transformedMessage != null) {
+ message = transformedMessage;
+ }
+ }
+
+ if (producerWindow != null) {
+ try {
+ producerWindow.waitForSpace();
+ } catch (InterruptedException e) {
+ throw new JMSException("Send aborted due to thread interrupt.");
+ }
+ }
+
+ this.session.send(this, dest, message, deliveryMode, priority, timeToLive,
+ disableMessageID, disableMessageTimestamp, producerWindow, sendTimeout, completionListener);
+ stats.onMessage();
}
public void send(Message message, AsyncCallback onComplete) throws JMSException {
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
index 5816d70e30c..5e36a15f55f 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageProducerSupport.java
@@ -336,6 +336,18 @@ public void send(Destination destination, Message message) throws JMSException {
protected abstract void checkClosed() throws IllegalStateException;
+ protected static void validateDeliveryMode(final int deliveryMode) throws JMSException {
+ if (deliveryMode != DeliveryMode.PERSISTENT && deliveryMode != DeliveryMode.NON_PERSISTENT) {
+ throw new JMSException("Invalid delivery mode: " + deliveryMode);
+ }
+ }
+
+ protected static void validatePriority(final int priority) throws JMSException {
+ if (priority < 0 || priority > 9) {
+ throw new JMSException("Invalid priority: " + priority + " (must be 0-9)");
+ }
+ }
+
/**
* @return the sendTimeout
*/
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java
index abf74930242..ae574565e0e 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQProducer.java
@@ -57,6 +57,8 @@ public class ActiveMQProducer implements JMSProducer {
// Properties applied to all messages on a per-JMS producer instance basis
private Map
+ * The wire-level send is performed synchronously (inside sendMutex to preserve ordering). The
+ * CompletionListener is then invoked on a dedicated single-threaded executor, ensuring:
+ *
+ *
+ * The sender thread blocks until the send completes and the callback has been invoked.
+ */
+ protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message,
+ int deliveryMode, int priority, long timeToLive,
+ boolean disableMessageID, boolean disableMessageTimestamp,
+ MemoryUsage producerWindow, int sendTimeout,
+ CompletionListener completionListener) throws JMSException {
+
+ checkClosed();
+ if (destination.isTemporary() && connection.isDeleted(destination)) {
+ throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
+ }
+
+ final ActiveMQMessage msg;
+ final Message originalMessage = message;
+ JMSException sendException = null;
+
+ synchronized (sendMutex) {
+ doStartTransaction();
+ if (transactionContext.isRollbackOnly()) {
+ throw new IllegalStateException("transaction marked rollback only");
+ }
+ final TransactionId txid = transactionContext.getTransactionId();
+ final long sequenceNumber = producer.getMessageSequence();
+
+ // Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
+ message.setJMSDeliveryMode(deliveryMode);
+ final long timeStamp = System.currentTimeMillis();
+ final long expiration = timeToLive > 0 ? timeToLive + timeStamp : 0L;
+
+ if (!(message instanceof ActiveMQMessage)) {
+ setForeignMessageDeliveryTime(message, timeStamp);
+ } else {
+ message.setJMSDeliveryTime(timeStamp);
+ }
+ if (!disableMessageTimestamp && !producer.getDisableMessageTimestamp()) {
+ message.setJMSTimestamp(timeStamp);
+ } else {
+ message.setJMSTimestamp(0L);
+ }
+ message.setJMSExpiration(expiration);
+ message.setJMSPriority(priority);
+ message.setJMSRedelivered(false);
+
+ // Transform to our own message format
+ ActiveMQMessage amqMsg = ActiveMQMessageTransformation.transformMessage(message, connection);
+ amqMsg.setDestination(destination);
+ amqMsg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
+
+ // Propagate the message id and destination back to the original message
+ if (amqMsg != message) {
+ message.setJMSMessageID(amqMsg.getMessageId().toString());
+ message.setJMSDestination(destination);
+ }
+ amqMsg.setBrokerPath(null);
+ amqMsg.setTransactionId(txid);
+
+ // Always copy when sending async so the user can safely modify the message after send()
+ // returns without affecting the in-flight message
+ msg = (ActiveMQMessage) amqMsg.copy();
+ msg.setConnection(connection);
+ msg.onSend();
+ msg.setProducerId(msg.getMessageId().getProducerId());
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(getSessionId() + " async sending message: " + msg);
+ }
+
+ // Perform the wire-level send synchronously while holding sendMutex.
+ // This ensures messages are delivered to the broker in send order.
+ // Capture any send failure so the callback can be invoked outside sendMutex,
+ // preventing a deadlock if the CompletionListener calls producer.send().
+ try {
+ this.connection.syncSendPacket(msg);
+ } catch (JMSException sendEx) {
+ sendException = sendEx;
+ }
+ }
+
+ // Both success and error callbacks are invoked outside sendMutex to avoid deadlock.
+ // A CompletionListener is allowed to call producer.send() which would re-acquire sendMutex.
+ if (sendException != null) {
+ final JMSException finalEx = sendException;
+ final Future> future = asyncSendExecutor.submit(() -> {
+ completionListenerThread.set(Thread.currentThread());
+ try {
+ completionListener.onException(originalMessage, finalEx);
+ } finally {
+ completionListenerThread.set(null);
+ }
+ });
+ awaitAsyncSendFuture(future, originalMessage, completionListener);
+ return;
+ }
+
+ // Send succeeded - invoke onCompletion on executor thread (not sender thread) per spec 7.3.8
+ final Future> future = asyncSendExecutor.submit(() -> {
+ completionListenerThread.set(Thread.currentThread());
+ try {
+ completionListener.onCompletion(originalMessage);
+ } catch (Exception e) {
+ // Per spec 7.3.2, exceptions thrown by the callback are swallowed
+ LOG.warn("CompletionListener.onCompletion threw an exception", e);
+ } finally {
+ completionListenerThread.set(null);
+ }
+ });
+ awaitAsyncSendFuture(future, originalMessage, completionListener);
+ }
+
+ private void awaitAsyncSendFuture(final Future> future, final Message originalMessage,
+ final CompletionListener completionListener) throws JMSException {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new JMSException("Async send interrupted while waiting for CompletionListener");
+ } catch (ExecutionException e) {
+ // Should not happen since we catch all exceptions inside the submitted task
+ LOG.warn("Unexpected error executing CompletionListener", e.getCause());
+ }
+ }
+
+ /**
+ * Throws {@link jakarta.jms.IllegalStateException} if the current thread is executing a
+ * CompletionListener callback for this session, per Jakarta Messaging 3.1 spec section 7.3.5.
+ * The check is session-scoped: callbacks from other sessions on the same thread are unaffected.
+ */
+ void checkNotInCompletionListenerCallback(final String operation) throws jakarta.jms.IllegalStateException {
+ if (Thread.currentThread() == completionListenerThread.get()) {
+ throw new jakarta.jms.IllegalStateException(
+ "Cannot call " + operation + "() from within a CompletionListener callback");
+ }
+ }
+
+ /**
+ * Throws {@link jakarta.jms.IllegalStateException} if the current thread is executing a
+ * MessageListener.onMessage() callback for this session, per Jakarta Messaging spec section 4.4.
+ * The check is session-scoped: callbacks from other sessions on the same thread are unaffected.
+ */
+ void checkNotInMessageListenerCallback(final String operation) throws jakarta.jms.IllegalStateException {
+ if (Thread.currentThread() == messageListenerThread.get()) {
+ throw new jakarta.jms.IllegalStateException(
+ "Cannot call " + operation + "() from within a MessageListener callback");
+ }
+ }
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
index 66df1d4647a..57dbb2ae629 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java
@@ -495,8 +495,8 @@ public void onMessage(Message m) {
counter.incrementAndGet();
if (counter.get() == 2) {
sendDone.await();
- connection.close();
got2Done.countDown();
+ return; // Don't acknowledge - message stays unacked (CLIENT_ACK mode)
}
tm.acknowledge();
} catch (Throwable e) {
@@ -511,6 +511,8 @@ public void onMessage(Message m) {
// Wait for first 2 messages to arrive.
assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+ // Close connection from main thread (spec: Connection.close() from MessageListener throws ISE)
+ connection.close();
// Re-start connection.
connection = (ActiveMQConnection)factory.createConnection();
@@ -584,8 +586,9 @@ public void onMessage(Message m) {
m.acknowledge();
if (counter.get() == 2) {
sendDone.await();
- connection.close();
got2Done.countDown();
+ // Don't call connection.close() from MessageListener - spec violation (throws ISE)
+ // Main thread will close the connection after this latch
}
} catch (Throwable e) {
e.printStackTrace();
@@ -599,6 +602,8 @@ public void onMessage(Message m) {
// Wait for first 2 messages to arrive.
assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
+ // Close connection from main thread (spec: Connection.close() from MessageListener throws ISE)
+ connection.close();
// Re-start connection.
connection = (ActiveMQConnection)factory.createConnection();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java
index 901fbca7e64..2f6345d0781 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserExpirationTest.java
@@ -130,7 +130,7 @@ public void testDoNotReceiveExpiredMessage() throws Exception {
producer.setTimeToLive(WAIT_TIME);
TextMessage message = session.createTextMessage("Test message");
- producer.send(producerQueue, message);
+ producer.send(message);
int count = getMessageCount(producerQueue, session);
assertEquals(1, count);
@@ -165,6 +165,9 @@ private int browse(ActiveMQQueue queue, Connection connection) throws JMSExcepti
int browsed = 0;
while (enumeration.hasMoreElements()) {
TextMessage m = (TextMessage) enumeration.nextElement();
+ if (m == null) {
+ continue; // message expired during browse
+ }
browsed++;
LOG.debug("B[{}]: {}", browsed, m.getText());
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java
index 6c1a3e540ad..0de8434a885 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyMemoryUsageTest.java
@@ -105,7 +105,7 @@ public void testMemoryUsageBodyIncrease() throws Exception {
BytesMessage sendMessageP = session.createBytesMessage();
byte[] origBody = new byte[1*1024];
sendMessageP.writeBytes(origBody);
- producer.send(queue, sendMessageP);
+ producer.send(sendMessageP);
}
QueueViewMBean queueViewMBean = getProxyToQueue(queueName);
@@ -127,7 +127,7 @@ public void testMemoryUsageBodyDecrease() throws Exception {
BytesMessage sendMessageP = session.createBytesMessage();
byte[] origBody = new byte[1*1024*1024];
sendMessageP.writeBytes(origBody);
- producer.send(queue, sendMessageP);
+ producer.send(sendMessageP);
}
QueueViewMBean queueViewMBean = getProxyToQueue(queueName);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java
index 5537037cb74..267bd7a5a59 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageInterceptorStrategyTest.java
@@ -99,11 +99,11 @@ public void testForceDeliveryModePersistent() throws Exception {
Queue queue = createQueue("mis.forceDeliveryMode.true");
Message sendMessageP = session.createTextMessage("forceDeliveryMode=true");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- producer.send(queue, sendMessageP);
+ producer.send(sendMessageP);
Message sendMessageNP = session.createTextMessage("forceDeliveryMode=true");
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(queue, sendMessageNP);
+ producer.send(sendMessageNP);
queueBrowser = session.createBrowser(queue);
Enumeration> browseEnumeration = queueBrowser.getEnumeration();
@@ -127,11 +127,11 @@ public void testForceDeliveryModeNonPersistent() throws Exception {
Queue queue = createQueue("mis.forceDeliveryMode.false");
Message sendMessageP = session.createTextMessage("forceDeliveryMode=false");
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
- producer.send(queue, sendMessageP);
+ producer.send(sendMessageP);
Message sendMessageNP = session.createTextMessage("forceDeliveryMode=false");
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- producer.send(queue, sendMessageNP);
+ producer.send(sendMessageNP);
queueBrowser = session.createBrowser(queue);
Enumeration> browseEnumeration = queueBrowser.getEnumeration();
@@ -155,7 +155,7 @@ public void testForceExpirationDisabled() throws Exception {
Queue queue = createQueue("mis.forceExpiration.zero");
Message sendMessageP = session.createTextMessage("expiration=zero");
producer.setTimeToLive(0l);
- producer.send(queue, sendMessageP);
+ producer.send(sendMessageP);
queueBrowser = session.createBrowser(queue);
Enumeration> browseEnumeration = queueBrowser.getEnumeration();
@@ -181,7 +181,7 @@ public void testForceExpirationZeroOverride() throws Exception {
Queue queue = createQueue("mis.forceExpiration.100k");
Message sendMessageP = session.createTextMessage("expiration=zero");
producer.setTimeToLive(100_000l);
- producer.send(queue, sendMessageP);
+ producer.send(sendMessageP);
queueBrowser = session.createBrowser(queue);
Enumeration> browseEnumeration = queueBrowser.getEnumeration();
@@ -205,7 +205,7 @@ public void testForceExpirationZeroOverrideDLQ() throws Exception {
Queue queue = createQueue("mis.forceExpiration.zero-no-dlq-expiry");
Message sendMessageP = session.createTextMessage("expiration=zero-no-dlq-expiry");
- producer.send(queue, sendMessageP);
+ producer.send(sendMessageP);
Thread.sleep(250l);
@@ -245,7 +245,7 @@ public void testForceExpirationCeilingOverride() throws Exception {
Queue queue = createQueue("mis.forceExpiration.maxValue");
Message sendMessageP = session.createTextMessage("expiration=ceiling");
producer.setTimeToLive(expiryTime);
- producer.send(queue, sendMessageP);
+ producer.send(sendMessageP);
queueBrowser = session.createBrowser(queue);
Enumeration> browseEnumeration = queueBrowser.getEnumeration();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java
index 59e3f15bace..1a8a09cf69f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3934Test.java
@@ -74,7 +74,7 @@ public void sendMessage() throws Exception {
final Destination queue = session.createQueue(TEST_QUEUE);
final Message toSend = session.createMessage();
final MessageProducer producer = session.createProducer(queue);
- producer.send(queue, toSend);
+ producer.send(toSend);
} finally {
conn.close();
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java
index 26a8c4daa7e..43c0566de45 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4083Test.java
@@ -466,12 +466,12 @@ public void testConsumeExpiredQueueAndDlq() throws Exception {
String msgBody = new String(new byte[20*1024]);
for (int i = 0; i < data.length; i++) {
Message message = session.createTextMessage(msgBody);
- producerExpire.send(queue, message);
+ producerExpire.send(message);
}
for (int i = 0; i < data.length; i++) {
Message message = session.createTextMessage(msgBody);
- producerNormal.send(queue, message);
+ producerNormal.send(message);
}
ArrayList