Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class MemoryUsageConcurrencyTest {
@Test
public void testCycle() throws Exception {
final Random r = new Random(0xb4a14);
for (int i = 0; i < 30000; i++) {
for (int i = 0; i < 3000; i++) {
checkPercentage(i, i, r.nextInt(100) + 10, i % 2 == 0, i % 5 == 0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1701,7 +1701,7 @@ public void testReceiveMessageSentWhileOffline() throws Exception {
// ConsumerInfo asynchronously, so messages may not be ready for dispatch yet.
assertTrue("Subscription should become active in run " + (j + 1),
Wait.waitFor(() -> isSubscriptionActive(topics[0], mqttSub.getClientId().toString()),
TimeUnit.SECONDS.toMillis(30), 100));
TimeUnit.SECONDS.toMillis(60), 100));

for (int i = 0; i < messagesPerRun; ++i) {
Message message = connectionSub.receive(5, TimeUnit.SECONDS);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -338,17 +338,17 @@ public void testBrokerZeroPrefetchConfig() throws Exception {
final MessageProducer producer = session.createProducer(brokerZeroQueue);
producer.send(session.createTextMessage("Msg1"));
// now lets receive it
final MessageConsumer consumer = session.createConsumer(brokerZeroQueue);
final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(brokerZeroQueue);

// Wait for broker subscription to be created and policy applied (same as testBrokerZeroPrefetchConfigWithConsumerControl)
// Wait for broker subscription to be created, policy applied, and ConsumerControl
// propagated back to the client (the broker sends a ConsumerControl to override
// the prefetch to 0, but the client processes it asynchronously)
final ActiveMQDestination transformedDest = ActiveMQDestination.transform(brokerZeroQueue);
org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
&& !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
org.apache.activemq.util.Wait.waitFor(() ->
broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
&& !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty()
&& consumer.info.getCurrentPrefetchSize() == 0
, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));

final TextMessage answer = (TextMessage)consumer.receive(TimeUnit.SECONDS.toMillis(5));
assertNotNull("Consumer should have read a message", answer);
Expand All @@ -358,22 +358,19 @@ public boolean isSatisified() throws Exception {
// https://issues.apache.org/jira/browse/AMQ-4234
// https://issues.apache.org/jira/browse/AMQ-4235
public void testBrokerZeroPrefetchConfigWithConsumerControl() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(brokerZeroQueue);

// Wait for broker subscription to be created, policy applied, and ConsumerControl
// propagated back to the client (the broker sends a ConsumerControl to override
// the prefetch to 0, but the client processes it asynchronously)
final ActiveMQDestination transformedDest = ActiveMQDestination.transform(brokerZeroQueue);
org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
&& !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty()
&& consumer.info.getCurrentPrefetchSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
org.apache.activemq.util.Wait.waitFor(() ->
broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
&& !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty()
&& consumer.info.getCurrentPrefetchSize() == 0
, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));

assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());

Expand All @@ -383,32 +380,29 @@ public boolean isSatisified() throws Exception {
assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());

// manipulate Prefetch (like failover and stomp)
ConsumerControl consumerControl = new ConsumerControl();
final ConsumerControl consumerControl = new ConsumerControl();
consumerControl.setConsumerId(consumer.info.getConsumerId());
consumerControl.setDestination(transformedDest);
consumerControl.setPrefetch(1000); // default for a q

Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl);
final Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl);
assertTrue("good request", !(reply instanceof ExceptionResponse));

// Wait for the ConsumerControl to be processed - broker policy should override back to 0
org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return consumer.info.getCurrentPrefetchSize() == 0
&& sub.getConsumerInfo().getCurrentPrefetchSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
org.apache.activemq.util.Wait.waitFor(() ->
consumer.info.getCurrentPrefetchSize() == 0
&& sub.getConsumerInfo().getCurrentPrefetchSize() == 0
, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));

assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
}

@Override
protected BrokerService createBroker() throws Exception {
BrokerService brokerService = super.createBroker();
PolicyMap policyMap = new PolicyMap();
PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
final BrokerService brokerService = super.createBroker();
final PolicyMap policyMap = new PolicyMap();
final PolicyEntry zeroPrefetchPolicy = new PolicyEntry();
zeroPrefetchPolicy.setQueuePrefetch(0);
policyMap.put(ActiveMQDestination.transform(brokerZeroQueue), zeroPrefetchPolicy);
brokerService.setDestinationPolicy(policyMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public void testMBeanPresenceOnRestart() throws Exception {
private void restartBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
Thread.sleep(5 * 1000);
createBroker(false);
broker.waitUntilStarted();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
*/
package org.apache.activemq.broker.advisory;

import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;

import java.net.URI;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;

public class AdvisoryDuplexNetworkBridgeTest extends AdvisoryNetworkBridgeTest {

@Override
public void createBroker1() throws Exception {
broker1 = new BrokerService();
broker1.setBrokerName("broker1");
broker1.addConnector("tcp://localhost:61617");
broker1.addConnector("tcp://localhost:0");
broker1.setUseJmx(false);
broker1.setPersistent(false);
broker1.start();
Expand All @@ -36,12 +36,28 @@ public void createBroker1() throws Exception {

@Override
public void createBroker2() throws Exception {
broker2 = BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/duplexLocalBroker.xml"));
// Programmatic equivalent of duplexLocalBroker.xml with ephemeral port
broker2 = new BrokerService();
broker2.setBrokerName("localBroker");
broker2.setPersistent(true);
broker2.setUseShutdownHook(false);
broker2.setUseJmx(false);
broker2.addConnector("tcp://localhost:0");

final String broker1Uri = broker1.getTransportConnectors().get(0).getConnectUri().toString();
final NetworkConnector nc = broker2.addNetworkConnector("static:(" + broker1Uri + ")");
nc.setDuplex(true);
nc.setDynamicOnly(false);
nc.setConduitSubscriptions(true);
nc.setDecreaseNetworkConsumerPriority(false);
nc.getExcludedDestinations().add(new ActiveMQQueue("exclude.test.foo"));
nc.getExcludedDestinations().add(new ActiveMQTopic("exclude.test.bar"));

broker2.start();
broker2.waitUntilStarted();
}

public void assertCreatedByDuplex(boolean createdByDuplex) {
public void assertCreatedByDuplex(final boolean createdByDuplex) {
assertTrue(createdByDuplex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.util.Wait;

import java.net.URI;

Expand All @@ -38,26 +39,24 @@ public class AdvisoryNetworkBridgeTest extends TestCase {
public void testAdvisory() throws Exception {
createBroker1();

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
Connection conn = factory.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
final Connection conn = factory.createConnection();
final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());

Thread.sleep(1000);
final MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());

createBroker2();
ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(2000);

ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(5000);
assertNotNull(advisory);
assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
assertTrue(advisory.getBooleanProperty("started"));
assertCreatedByDuplex(advisory.getBooleanProperty("createdByDuplex"));

broker2.stop();
broker2.waitUntilStopped();

advisory = (ActiveMQMessage)consumer.receive(2000);
advisory = (ActiveMQMessage)consumer.receive(5000);
assertNotNull(advisory);
assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
assertFalse(advisory.getBooleanProperty("started"));
Expand All @@ -70,15 +69,13 @@ public void testAddConsumerLater() throws Exception {

createBroker2();

Thread.sleep(1000);

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
Connection conn = factory.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
final Connection conn = factory.createConnection();
final Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());

ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(2000);
ActiveMQMessage advisory = (ActiveMQMessage)consumer.receive(5000);
assertNotNull(advisory);
assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
assertTrue(advisory.getBooleanProperty("started"));
Expand All @@ -87,7 +84,7 @@ public void testAddConsumerLater() throws Exception {
broker2.stop();
broker2.waitUntilStopped();

advisory = (ActiveMQMessage)consumer.receive(2000);
advisory = (ActiveMQMessage)consumer.receive(5000);
assertNotNull(advisory);
assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
assertFalse(advisory.getBooleanProperty("started"));
Expand Down
Loading
Loading