Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean watchTopicAdvisories = true;
private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout =0;
private int requestTimeout =0;
private boolean sendAcksAsync=true;
private boolean checkForDuplicates = true;
private boolean queueOnlyConnection = false;
Expand Down Expand Up @@ -1509,7 +1510,7 @@ public Response syncSendPacket(Command command, int timeout) throws JMSException
* @throws JMSException
*/
public Response syncSendPacket(Command command) throws JMSException {
return syncSendPacket(command, 0);
return syncSendPacket(command, requestTimeout);
}

/**
Expand Down Expand Up @@ -1838,6 +1839,20 @@ public void setSendTimeout(int sendTimeout) {
this.sendTimeout = sendTimeout;
}

/**
* @return the requestTimeout (in milliseconds)
*/
public int getRequestTimeout() {
return requestTimeout;
}

/**
* @param requestTimeout the requestTimeout to set (in milliseconds)
*/
public void setRequestTimeout(int requestTimeout) {
this.requestTimeout = requestTimeout;
}

/**
* @return the sendAcksAsync
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout = 0;
private int requestTimeout = 0;
private int connectResponseTimeout = 0;
private boolean sendAcksAsync=true;
private TransportListener transportListener;
Expand Down Expand Up @@ -434,6 +435,7 @@ protected void configureConnection(ActiveMQConnection connection) throws JMSExce
connection.setProducerWindowSize(getProducerWindowSize());
connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
connection.setSendTimeout(getSendTimeout());
connection.setRequestTimeout(getRequestTimeout());
connection.setCloseTimeout(getCloseTimeout());
connection.setSendAcksAsync(isSendAcksAsync());
connection.setAuditDepth(getAuditDepth());
Expand Down Expand Up @@ -738,6 +740,20 @@ public void setSendTimeout(int sendTimeout) {
this.sendTimeout = sendTimeout;
}

/**
* @return the requestTimeout (in milliseconds)
*/
public int getRequestTimeout() {
return requestTimeout;
}

/**
* @param requestTimeout the requestTimeout to set (in milliseconds)
*/
public void setRequestTimeout(int requestTimeout) {
this.requestTimeout = requestTimeout;
}

/**
* @return the sendAcksAsync
*/
Expand Down Expand Up @@ -864,6 +880,7 @@ public void populateProperties(Properties props) {
props.setProperty("alwaysSyncSend", Boolean.toString(isAlwaysSyncSend()));
props.setProperty("producerWindowSize", Integer.toString(getProducerWindowSize()));
props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
props.setProperty("requestTimeout", Integer.toString(getRequestTimeout()));
props.setProperty("connectResponseTimeout", Integer.toString(getConnectResponseTimeout()));
props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public void rollback() throws JMSException {
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
this.transactionId = null;
//make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0);
this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : this.connection.getRequestTimeout());
// Notify the listener that the tx was rolled back
if (localTransactionEventListener != null) {
localTransactionEventListener.rollbackEvent();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.transport.RequestTimedOutIOException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
* Tests that {@link ActiveMQConnection#syncSendPacket(org.apache.activemq.command.Command)}
* applies the configured {@code requestTimeout}.
*/
public class SyncSendPacketTimeoutTest {

private BrokerService broker;
private String brokerUrl;

@Before
public void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:0");
broker.start();
broker.waitUntilStarted();
brokerUrl = broker.getTransportConnectors().get(0).getPublishableConnectString();
}

@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
}
}

@Test
public void testRequestTimeoutDefaultIsZero() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
assertEquals("Default requestTimeout should be 0", 0, connection.getRequestTimeout());
}
}

@Test
public void testRequestTimeoutConfiguredViaFactory() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
factory.setRequestTimeout(5000);
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
assertEquals("requestTimeout should be propagated from factory", 5000, connection.getRequestTimeout());
}
}

@Test
public void testSyncSendPacketSucceedsWithRequestTimeout() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
factory.setRequestTimeout(5000);
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
connection.start();
// Creating a session triggers syncSendPacket internally — should succeed within
// timeout
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Creating a consumer triggers syncSendPacket internally — should succeed
// within timeout
MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.QUEUE"))) {
assertNotNull("Session should be created successfully", session);
assertNotNull("Consumer should be created successfully", consumer);
}
}
}

@Test
public void testSyncSendPacketSucceedsWithoutRequestTimeout() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
// requestTimeout=0 means no timeout (default)
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
connection.start();
try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Creating a consumer triggers syncSendPacket internally — should succeed
// no timeout
MessageConsumer consumer = session.createConsumer(session.createQueue("TEST.QUEUE"))) {
assertNotNull("Session should be created successfully with no timeout", session);
assertNotNull("Consumer should be created successfully", consumer);
}
}
}

@Test
public void testRequestTimeoutConfiguredViaUrl() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl + "?jms.requestTimeout=3000");
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
assertEquals("requestTimeout should be set via URL parameter", 3000, connection.getRequestTimeout());
}
}

@Test
public void testSyncSendPacketFailFromTimeout() throws Exception {
// Restart the broker with a plugin that delays addConsumer responses
broker.stop();
broker.waitUntilStopped();

broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() {
@Override
public Broker installPlugin(Broker broker) {
return new BrokerFilter(broker) {
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
// Only delay consumers on our test queue, not advisory consumers
if (info.getDestination().getPhysicalName().equals("test")) {
Thread.sleep(5000);
}
return super.addConsumer(context, info);
}
};
}
}});
broker.addConnector("tcp://localhost:0");
broker.start();
broker.waitUntilStarted();
brokerUrl = broker.getTransportConnectors().get(0).getPublishableConnectString();

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
factory.setWatchTopicAdvisories(false);
factory.setRequestTimeout(500);
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Exception exception = null;
try {
session.createConsumer(session.createQueue("test"));
fail("Expected JMSException due to request timeout");
} catch (JMSException expected) {
exception = expected;
}
assertNotNull("Should have caught a JMSException", exception);
assertEquals(RequestTimedOutIOException.class,
TransportConnector.getRootCause(exception).getClass());
}
}

@Test
public void testSyncSendPacketOverrideDefaultRequestTimeout() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
try (ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection()) {
connection.start();
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// After session creation set the timeout default to be very short to test that
// overriding directly works
connection.setRequestTimeout(1);
ConsumerInfo info = new ConsumerInfo(session.getSessionInfo(),
session.getNextConsumerId().getValue());
info.setDestination(new ActiveMQQueue("test"));
// Send info packet with timeout override
assertNotNull("Consumer should be created successfully with no timeout",
connection.syncSendPacket(info, 5000));
}
}
}
Loading