Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,20 +362,23 @@ public void testBrokerZeroPrefetchConfigWithConsumerControl() throws Exception {

final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(brokerZeroQueue);

// Wait for broker subscription to be created and policy applied
// Wait for broker subscription to be created, policy applied, and ConsumerControl
// propagated back to the client (the broker sends a ConsumerControl to override
// the prefetch to 0, but the client processes it asynchronously)
final ActiveMQDestination transformedDest = ActiveMQDestination.transform(brokerZeroQueue);
org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
&& !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
&& !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty()
&& consumer.info.getCurrentPrefetchSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));

assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());

// verify sub view broker
Subscription sub =
final Subscription sub =
broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().get(0);
assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());

Expand All @@ -388,8 +391,14 @@ public boolean isSatisified() throws Exception {
Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl);
assertTrue("good request", !(reply instanceof ExceptionResponse));

// Wait for the ConsumerControl to be processed
Thread.sleep(500);
// Wait for the ConsumerControl to be processed - broker policy should override back to 0
org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return consumer.info.getCurrentPrefetchSize() == 0
&& sub.getConsumerInfo().getCurrentPrefetchSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));

assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
Expand Down
Loading