From df20da32638c93253ce2075af85a201ee7c2d374 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JB=20Onofr=C3=A9?= Date: Mon, 9 Mar 2026 20:56:02 +0100 Subject: [PATCH 1/3] Add requestTimeout to ConnectionFactory to prevent hanging threads in syncSendPacket() When the broker becomes unreachable without the TCP connection being properly closed (network partition, half-open connection), FutureResponse.getResult() calls ArrayBlockingQueue.take() which blocks indefinitely, causing threads to hang forever. Introduce a configurable requestTimeout (default 0, no timeout) on ActiveMQConnectionFactory and ActiveMQConnection, similar to the existing sendTimeout. When set, syncSendPacket(Command) uses poll(timeout) instead of take(), throwing RequestTimedOutIOException when the timeout expires. Can be configured programmatically via factory.setRequestTimeout(ms) or via URL parameter jms.requestTimeout=ms. --- .../apache/activemq/ActiveMQConnection.java | 17 +- .../activemq/ActiveMQConnectionFactory.java | 17 ++ .../apache/activemq/TransactionContext.java | 2 +- .../activemq/SyncSendPacketTimeoutTest.java | 156 ++++++++++++++++++ 4 files changed, 190 insertions(+), 2 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java index 86008616759..9c0cee7bbf4 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -158,6 +158,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon private boolean watchTopicAdvisories = true; private long warnAboutUnstartedConnectionTimeout = 500L; private int sendTimeout =0; + private int requestTimeout =0; private boolean sendAcksAsync=true; private boolean checkForDuplicates = true; private boolean queueOnlyConnection = false; @@ -1509,7 +1510,7 @@ public Response syncSendPacket(Command command, int timeout) throws JMSException * @throws JMSException */ public Response syncSendPacket(Command command) throws JMSException { - return syncSendPacket(command, 0); + return syncSendPacket(command, requestTimeout); } /** @@ -1838,6 +1839,20 @@ public void setSendTimeout(int sendTimeout) { this.sendTimeout = sendTimeout; } + /** + * @return the requestTimeout (in milliseconds) + */ + public int getRequestTimeout() { + return requestTimeout; + } + + /** + * @param requestTimeout the requestTimeout to set (in milliseconds) + */ + public void setRequestTimeout(int requestTimeout) { + this.requestTimeout = requestTimeout; + } + /** * @return the sendAcksAsync */ diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index ae57d2624b6..14ee96a80bb 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -141,6 +141,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE; private long warnAboutUnstartedConnectionTimeout = 500L; private int sendTimeout = 0; + private int requestTimeout = 0; private int connectResponseTimeout = 0; private boolean sendAcksAsync=true; private TransportListener transportListener; @@ -434,6 +435,7 @@ protected void configureConnection(ActiveMQConnection connection) throws JMSExce connection.setProducerWindowSize(getProducerWindowSize()); connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout()); connection.setSendTimeout(getSendTimeout()); + connection.setRequestTimeout(getRequestTimeout()); connection.setCloseTimeout(getCloseTimeout()); connection.setSendAcksAsync(isSendAcksAsync()); connection.setAuditDepth(getAuditDepth()); @@ -738,6 +740,20 @@ public void setSendTimeout(int sendTimeout) { this.sendTimeout = sendTimeout; } + /** + * @return the requestTimeout (in milliseconds) + */ + public int getRequestTimeout() { + return requestTimeout; + } + + /** + * @param requestTimeout the requestTimeout to set (in milliseconds) + */ + public void setRequestTimeout(int requestTimeout) { + this.requestTimeout = requestTimeout; + } + /** * @return the sendAcksAsync */ @@ -864,6 +880,7 @@ public void populateProperties(Properties props) { props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend())); props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize())); props.setProperty("sendTimeout", Integer.toString(getSendTimeout())); + props.setProperty("requestTimeout", Integer.toString(getRequestTimeout())); props.setProperty("connectResponseTimeout", Integer.toString(getConnectResponseTimeout())); props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync())); props.setProperty("auditDepth", Integer.toString(getAuditDepth())); diff --git a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java index 652eeba3b14..4166f35af5c 100644 --- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java +++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java @@ -281,7 +281,7 @@ public void rollback() throws JMSException { TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); this.transactionId = null; //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364 - this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0); + this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : this.connection.getRequestTimeout()); // Notify the listener that the tx was rolled back if (localTransactionEventListener != null) { localTransactionEventListener.rollbackEvent(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java new file mode 100644 index 00000000000..f03b03e2bdf --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import jakarta.jms.JMSException; +import jakarta.jms.MessageConsumer; +import jakarta.jms.Session; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.transport.RequestTimedOutIOException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests that {@link ActiveMQConnection#syncSendPacket(org.apache.activemq.command.Command)} + * applies the configured {@code requestTimeout}. + */ +public class SyncSendPacketTimeoutTest { + + private BrokerService broker; + private String brokerUrl; + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.addConnector("tcp://localhost:0"); + broker.start(); + broker.waitUntilStarted(); + brokerUrl = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void testRequestTimeoutDefaultIsZero() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); + try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { + assertEquals("Default requestTimeout should be 0", 0, connection.getRequestTimeout()); + } + } + + @Test + public void testRequestTimeoutConfiguredViaFactory() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); + factory.setRequestTimeout(5000); + try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { + assertEquals("requestTimeout should be propagated from factory", 5000, connection.getRequestTimeout()); + } + } + + @Test + public void testSyncSendPacketSucceedsWithRequestTimeout() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); + factory.setRequestTimeout(5000); + try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { + connection.start(); + // Creating a session triggers syncSendPacket internally — should succeed within timeout + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + // Creating a consumer triggers syncSendPacket internally — should succeed within timeout + MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.QUEUE"))) { + assertNotNull("Session should be created successfully", session); + assertNotNull("Consumer should be created successfully", consumer); + } + } + } + + @Test + public void testSyncSendPacketSucceedsWithoutRequestTimeout() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); + // requestTimeout=0 means no timeout (default) + try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { + connection.start(); + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + assertNotNull("Session should be created successfully with no timeout", session); + } + } + } + + @Test + public void testRequestTimeoutConfiguredViaUrl() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl + "?jms.requestTimeout=3000"); + try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { + assertEquals("requestTimeout should be set via URL parameter", 3000, connection.getRequestTimeout()); + } + } + + @Test + public void testSyncSendPacketFailFromTimeout() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); + // Set to super short 1 millisecond so we always time out + factory.setRequestTimeout(1); + try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { + Exception exception = null; + try { + connection.start(); + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue("test"))) { + assertNotNull("Consumer should be created successfully", consumer); + } + fail("Expected JMSException due to request timeout"); + } catch (JMSException expected) { + exception = expected; + } + assertEquals(RequestTimedOutIOException.class, + TransportConnector.getRootCause(exception).getClass()); + } + } + + @Test + public void testSyncSendPacketOverrideDefaultRequestTimeout() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); + try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { + connection.start(); + ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + // After session creation set the timeout default to be very short to test that + // overriding directly works + connection.setRequestTimeout(1); + ConsumerInfo info = new ConsumerInfo(session.getSessionInfo(), + session.getNextConsumerId().getValue()); + info.setDestination(new ActiveMQQueue("test")); + // Send info packet with timeout override + assertNotNull("Consumer should be created successfully with no timeout", + connection.syncSendPacket(info, 5000)); + } + } +} From cd13ad951a1efae5bb4700f76d8ccbfdc11c8157 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JB=20Onofr=C3=A9?= Date: Tue, 10 Mar 2026 19:33:42 +0100 Subject: [PATCH 2/3] Add consumer creation in without timeout test --- .../apache/activemq/SyncSendPacketTimeoutTest.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java index f03b03e2bdf..003017ccd31 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java @@ -84,10 +84,12 @@ public void testSyncSendPacketSucceedsWithRequestTimeout() throws Exception { factory.setRequestTimeout(5000); try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { connection.start(); - // Creating a session triggers syncSendPacket internally — should succeed within timeout + // Creating a session triggers syncSendPacket internally — should succeed within + // timeout try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - // Creating a consumer triggers syncSendPacket internally — should succeed within timeout - MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.QUEUE"))) { + // Creating a consumer triggers syncSendPacket internally — should succeed + // within timeout + MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.QUEUE"))) { assertNotNull("Session should be created successfully", session); assertNotNull("Consumer should be created successfully", consumer); } @@ -100,8 +102,12 @@ public void testSyncSendPacketSucceedsWithoutRequestTimeout() throws Exception { // requestTimeout=0 means no timeout (default) try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { connection.start(); - try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) { + try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + // Creating a consumer triggers syncSendPacket internally — should succeed + // no timeout + MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.QUEUE"))) { assertNotNull("Session should be created successfully with no timeout", session); + assertNotNull("Consumer should be created successfully", consumer); } } } From 53393851d9cf2adfbdddb25ba9d0180d915eba3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?JB=20Onofr=C3=A9?= Date: Fri, 13 Mar 2026 09:43:26 +0100 Subject: [PATCH 3/3] Fix flaky testSyncSendPacketFailFromTimeout by using BrokerPlugin to delay responses Use a BrokerFilter that delays addConsumer for the test queue by 5s, making the 500ms requestTimeout fire deterministically instead of relying on a 1ms race condition. --- .../activemq/SyncSendPacketTimeoutTest.java | 46 ++++++++++++++++--- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java index 003017ccd31..fb2a13f8dfb 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java @@ -24,8 +24,13 @@ import jakarta.jms.MessageConsumer; import jakarta.jms.Session; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.transport.RequestTimedOutIOException; @@ -122,21 +127,48 @@ public void testRequestTimeoutConfiguredViaUrl() throws Exception { @Test public void testSyncSendPacketFailFromTimeout() throws Exception { + // Restart the broker with a plugin that delays addConsumer responses + broker.stop(); + broker.waitUntilStopped(); + + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() { + @Override + public Broker installPlugin(Broker broker) { + return new BrokerFilter(broker) { + @Override + public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + // Only delay consumers on our test queue, not advisory consumers + if (info.getDestination().getPhysicalName().equals("test")) { + Thread.sleep(5000); + } + return super.addConsumer(context, info); + } + }; + } + }}); + broker.addConnector("tcp://localhost:0"); + broker.start(); + broker.waitUntilStarted(); + brokerUrl = broker.getTransportConnectors().get(0).getPublishableConnectString(); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl); - // Set to super short 1 millisecond so we always time out - factory.setRequestTimeout(1); + factory.setWatchTopicAdvisories(false); + factory.setRequestTimeout(500); try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Exception exception = null; try { - connection.start(); - try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(session.createQueue("test"))) { - assertNotNull("Consumer should be created successfully", consumer); - } + session.createConsumer(session.createQueue("test")); fail("Expected JMSException due to request timeout"); } catch (JMSException expected) { exception = expected; } + assertNotNull("Should have caught a JMSException", exception); assertEquals(RequestTimedOutIOException.class, TransportConnector.getRootCause(exception).getClass()); }