diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index c7a36ad1b21d7..15470a03c049a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -396,7 +396,7 @@ protected CompletableFuture closeProducerAsync(boolean closeTheStartingPro return; } if (setDisconnectedRes.getRight() == State.Terminating - || setDisconnectingRes.getRight() == State.Terminated) { + || setDisconnectedRes.getRight() == State.Terminated) { log.info("[{}] Skip setting state to terminated because it was terminated, state : {}", replicatorId, state); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java index 5f1d3a8a6c50b..421e98b188743 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java @@ -37,6 +37,7 @@ import lombok.Cleanup; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; +import org.apache.logging.log4j.Level; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -46,10 +47,12 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.ProducerBuilderImpl; +import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; +import org.apache.pulsar.utils.TestLogAppender; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; import org.testng.Assert; @@ -132,6 +135,70 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception { }); } + @Test + public void testCloseProducerAsyncNoWarnWhenAlreadyTerminated() throws Exception { + final String localCluster = "localCluster"; + final String remoteCluster = "remoteCluster"; + final String topicName = "remoteTopicName"; + final String replicatorPrefix = "pulsar.repl"; + + // Mock services. + final ServiceConfiguration pulsarConfig = mock(ServiceConfiguration.class); + final PulsarService pulsar = mock(PulsarService.class); + final BrokerService broker = mock(BrokerService.class); + final Topic localTopic = mock(Topic.class); + final PulsarClientImpl localClient = mock(PulsarClientImpl.class); + final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class); + final ProducerBuilderImpl producerBuilder = mock(ProducerBuilderImpl.class); + when(broker.pulsar()).thenReturn(pulsar); + when(pulsar.getClient()).thenReturn(localClient); + when(pulsar.getConfiguration()).thenReturn(pulsarConfig); + when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100); + when(localTopic.getName()).thenReturn(topicName); + when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder); + when(producerBuilder.topic(any())).thenReturn(producerBuilder); + when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder); + when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder); + when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder); + when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder); + when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder); + + @Cleanup + PulsarAdmin admin = mock(PulsarAdmin.class); + + final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName, + replicatorPrefix, broker, remoteClient, admin); + + ProducerImpl producer = mock(ProducerImpl.class); + CompletableFuture closeFuture = new CompletableFuture<>(); + when(producer.closeAsync()).thenReturn(closeFuture); + replicator.producer = producer; + AbstractReplicator.STATE_UPDATER.set(replicator, AbstractReplicator.State.Started); + + @Cleanup + TestLogAppender appender = TestLogAppender.create(AbstractReplicator.class); + + CompletableFuture closing = replicator.closeProducerAsync(false); + Assert.assertEquals(replicator.getState(), AbstractReplicator.State.Disconnecting); + + // Simulate another thread terminating the replicator while the producer is closing. + AbstractReplicator.STATE_UPDATER.set(replicator, AbstractReplicator.State.Terminated); + closeFuture.complete(null); + closing.join(); + + boolean hasUnexpectedWarnLog = appender.getEvents().stream() + .anyMatch(event -> event.getLevel() == Level.WARN + && event.getMessage().getFormattedMessage() + .contains("Other task has change the state to terminated")); + Assert.assertFalse(hasUnexpectedWarnLog); + + boolean hasExpectedInfoLog = appender.getEvents().stream() + .anyMatch(event -> event.getLevel() == Level.INFO + && event.getMessage().getFormattedMessage() + .contains("Skip setting state to terminated because it was terminated")); + Assert.assertTrue(hasExpectedInfoLog); + } + private static class ReplicatorInTest extends AbstractReplicator { public ReplicatorInTest(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName,