diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java index e06dc3e1f55..3c705d52358 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java @@ -362,20 +362,23 @@ public void testBrokerZeroPrefetchConfigWithConsumerControl() throws Exception { final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(brokerZeroQueue); - // Wait for broker subscription to be created and policy applied + // Wait for broker subscription to be created, policy applied, and ConsumerControl + // propagated back to the client (the broker sends a ConsumerControl to override + // the prefetch to 0, but the client processes it asynchronously) final ActiveMQDestination transformedDest = ActiveMQDestination.transform(brokerZeroQueue); org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() { @Override public boolean isSatisified() throws Exception { return broker.getRegionBroker().getDestinationMap().get(transformedDest) != null - && !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty(); + && !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty() + && consumer.info.getCurrentPrefetchSize() == 0; } }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100)); assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize()); // verify sub view broker - Subscription sub = + final Subscription sub = broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().get(0); assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize()); @@ -388,8 +391,14 @@ public boolean isSatisified() throws Exception { Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl); assertTrue("good request", !(reply instanceof ExceptionResponse)); - // Wait for the ConsumerControl to be processed - Thread.sleep(500); + // Wait for the ConsumerControl to be processed - broker policy should override back to 0 + org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return consumer.info.getCurrentPrefetchSize() == 0 + && sub.getConsumerInfo().getCurrentPrefetchSize() == 0; + } + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100)); assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize()); assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());