diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index d024d43eb73db..d3e5067ed3123 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -53,8 +53,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; +import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -62,7 +63,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(MessageDispatchThrottlingTest.class); - @BeforeMethod + @BeforeClass @Override protected void setup() throws Exception { this.conf.setClusterName("test"); @@ -70,12 +71,35 @@ protected void setup() throws Exception { super.producerBaseSetup(); } - @AfterMethod(alwaysRun = true) + @AfterClass(alwaysRun = true) @Override protected void cleanup() throws Exception { super.internalCleanup(); } + @AfterMethod(alwaysRun = true) + protected void reset() throws Exception { + pulsar.getConfiguration().setForceDeleteTenantAllowed(true); + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true); + + for (String tenant : admin.tenants().getTenants()) { + for (String namespace : admin.namespaces().getNamespaces(tenant)) { + admin.namespaces().deleteNamespace(namespace, true); + } + admin.tenants().deleteTenant(tenant, true); + } + + for (String cluster : admin.clusters().getClusters()) { + admin.clusters().deleteCluster(cluster); + } + + pulsar.getConfiguration().setForceDeleteTenantAllowed(false); + pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false); + + super.producerBaseSetup(); + } + + @DataProvider(name = "subscriptions") public Object[][] subscriptionsProvider() { return new Object[][] { new Object[] { SubscriptionType.Shared }, { SubscriptionType.Exclusive } }; @@ -280,6 +304,7 @@ public void testClusterMsgByteRateLimitingClusterConfig() throws Exception { final long byteRate = 1024 * 1024;// 1MB rate enough to let all msg to be delivered int initValue = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInMsg(); + long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(); // (1) Update message-dispatch-rate limit admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg", Integer.toString(messageRate)); @@ -325,7 +350,9 @@ public void testClusterMsgByteRateLimitingClusterConfig() throws Exception { consumer.close(); producer.close(); - pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initValue); + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg", + Integer.toString(initValue)); + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInByte", Long.toString(initBytes)); log.info("-- Exiting {} test --", methodName); } @@ -675,7 +702,8 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription) consumer.close(); producer.close(); - pulsar.getConfiguration().setDispatchThrottlingRatePerTopicInMsg(initValue); + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg", + Integer.toString(initValue)); log.info("-- Exiting {} test --", methodName); } @@ -981,7 +1009,8 @@ public void testClusterPolicyOverrideConfiguration() throws Exception { producer.close(); producer2.close(); - + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerTopicInMsg", + Integer.toString(initValue)); log.info("-- Exiting {} test --", methodName); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java index 7881038cc7aaf..642bec7321edd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java @@ -18,12 +18,11 @@ */ package org.apache.pulsar.client.api; +import static org.awaitility.Awaitility.await; import com.google.common.collect.Sets; - import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; @@ -38,8 +37,6 @@ import org.testng.Assert; import org.testng.annotations.Test; -import static org.awaitility.Awaitility.await; - @Test(groups = "flaky") public class SubscriptionMessageDispatchThrottlingTest extends MessageDispatchThrottlingTest { private static final Logger log = LoggerFactory.getLogger(SubscriptionMessageDispatchThrottlingTest.class); @@ -243,6 +240,7 @@ private void testMessageNotDuplicated(SubscriptionType subscription) throws Exce admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate); admin.namespaces().setDispatchRate(namespace, topicDispatchRate); + long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(); admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate); final int numProducedMessages = 30; @@ -302,6 +300,9 @@ private void testMessageNotDuplicated(SubscriptionType subscription) throws Exce consumer.close(); producer.close(); + + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes)); + admin.topics().delete(topicName, true); admin.namespaces().deleteNamespace(namespace); } @@ -417,6 +418,7 @@ private void testDispatchRate(SubscriptionType subscription, admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate); admin.namespaces().setDispatchRate(namespace, topicDispatchRate); + long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(); admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate); final int numProducedMessages = 30; @@ -480,6 +482,7 @@ private void testDispatchRate(SubscriptionType subscription, consumer.close(); producer.close(); + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes)); admin.topics().delete(topicName, true); admin.namespaces().deleteNamespace(namespace); } @@ -532,6 +535,7 @@ public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(Subscri final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace2 + "/throttlingAll"); final String subName = "my-subscriber-name-" + subscription; + long initBytes = pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte(); final int byteRate = 1000; admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + byteRate); admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test")); @@ -591,6 +595,7 @@ public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(Subscri consumer2.close(); producer1.close(); producer2.close(); + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", Long.toString(initBytes)); log.info("-- Exiting {} test --", methodName); } @@ -739,7 +744,9 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription) consumer.close(); producer.close(); - pulsar.getConfiguration().setDispatchThrottlingRatePerSubscriptionInMsg(initValue); + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg", + Integer.toString(initValue)); + conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(false); log.info("-- Exiting {} test --", methodName); } @@ -855,11 +862,12 @@ public void testClusterPolicyOverrideConfiguration() throws Exception { producer.close(); producer2.close(); - + admin.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerSubscriptionInMsg", + Integer.toString(initValue)); log.info("-- Exiting {} test --", methodName); } - @Test(dataProvider = "subscriptions", timeOut = 10000) + @Test(dataProvider = "subscriptions", timeOut = 11000) public void testClosingRateLimiter(SubscriptionType subscription) throws Exception { log.info("-- Starting {} test --", methodName);