Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ protected CompletableFuture<Void> closeProducerAsync(boolean closeTheStartingPro
return;
}
if (setDisconnectedRes.getRight() == State.Terminating
|| setDisconnectingRes.getRight() == State.Terminated) {
|| setDisconnectedRes.getRight() == State.Terminated) {
log.info("[{}] Skip setting state to terminated because it was terminated, state : {}",
replicatorId, state);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.logging.log4j.Level;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -46,10 +47,12 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.utils.TestLogAppender;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.Assert;
Expand Down Expand Up @@ -132,6 +135,70 @@ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
});
}

@Test
public void testCloseProducerAsyncNoWarnWhenAlreadyTerminated() throws Exception {
final String localCluster = "localCluster";
final String remoteCluster = "remoteCluster";
final String topicName = "remoteTopicName";
final String replicatorPrefix = "pulsar.repl";

// Mock services.
final ServiceConfiguration pulsarConfig = mock(ServiceConfiguration.class);
final PulsarService pulsar = mock(PulsarService.class);
final BrokerService broker = mock(BrokerService.class);
final Topic localTopic = mock(Topic.class);
final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
final ProducerBuilderImpl producerBuilder = mock(ProducerBuilderImpl.class);
when(broker.pulsar()).thenReturn(pulsar);
when(pulsar.getClient()).thenReturn(localClient);
when(pulsar.getConfiguration()).thenReturn(pulsarConfig);
when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100);
when(localTopic.getName()).thenReturn(topicName);
when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder);
when(producerBuilder.topic(any())).thenReturn(producerBuilder);
when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder);
when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder);
when(producerBuilder.sendTimeout(anyInt(), any())).thenReturn(producerBuilder);
when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);

@Cleanup
PulsarAdmin admin = mock(PulsarAdmin.class);

final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, localTopic, remoteCluster, topicName,
replicatorPrefix, broker, remoteClient, admin);

ProducerImpl producer = mock(ProducerImpl.class);
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
when(producer.closeAsync()).thenReturn(closeFuture);
replicator.producer = producer;
AbstractReplicator.STATE_UPDATER.set(replicator, AbstractReplicator.State.Started);

@Cleanup
TestLogAppender appender = TestLogAppender.create(AbstractReplicator.class);

CompletableFuture<Void> closing = replicator.closeProducerAsync(false);
Assert.assertEquals(replicator.getState(), AbstractReplicator.State.Disconnecting);

// Simulate another thread terminating the replicator while the producer is closing.
AbstractReplicator.STATE_UPDATER.set(replicator, AbstractReplicator.State.Terminated);
closeFuture.complete(null);
closing.join();

boolean hasUnexpectedWarnLog = appender.getEvents().stream()
.anyMatch(event -> event.getLevel() == Level.WARN
&& event.getMessage().getFormattedMessage()
.contains("Other task has change the state to terminated"));
Assert.assertFalse(hasUnexpectedWarnLog);

boolean hasExpectedInfoLog = appender.getEvents().stream()
.anyMatch(event -> event.getLevel() == Level.INFO
&& event.getMessage().getFormattedMessage()
.contains("Skip setting state to terminated because it was terminated"));
Assert.assertTrue(hasExpectedInfoLog);
}

private static class ReplicatorInTest extends AbstractReplicator {

public ReplicatorInTest(String localCluster, Topic localTopic, String remoteCluster, String remoteTopicName,
Expand Down
Loading