Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {

@VisibleForTesting
@NonNull CompletableFuture<Boolean> prepareInitPoliciesCacheAsync(@NonNull NamespaceName namespace) {
log.info("enter prepareInitPoliciesCacheAsync. ns:{}. this:{}", namespace, this);
requireNonNull(namespace);
if (closed.get()) {
return CompletableFuture.completedFuture(false);
Expand Down Expand Up @@ -825,6 +826,8 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader)
}
reader.readNextAsync()
.thenAccept(msg -> {
log.info("finish readMorePoliciesAsync 1. ns:{}, this:{}, reader:{}",
namespaceObject.toString(), this, reader);
try {
refreshTopicPoliciesCache(msg);
try {
Expand All @@ -838,6 +841,8 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader<PulsarEvent> reader)
}
})
.whenComplete((__, ex) -> {
log.info("finish readMorePoliciesAsync 2. ns:{}, this:{}, reader:{}, ex:{}",
namespaceObject.toString(), this, reader, ex);
if (ex == null) {
readMorePoliciesAsync(reader);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,10 @@ public void append(LogEvent event) {
logger.addAppender(appender);

// create namespace-5 and topic
log.info("origin SystemTopicBasedTopicPoliciesService:{}", pulsar.getTopicPoliciesService());
SystemTopicBasedTopicPoliciesService spyService =
Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
log.info("spy SystemTopicBasedTopicPoliciesService:{}", spyService);
FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);


Expand All @@ -515,17 +517,22 @@ public void append(LogEvent event) {
FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);

// set topic policy. create producer for __change_event topic
log.info("start setMaxConsumersPerSubscription");
admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
log.info("finish setMaxConsumersPerSubscription");
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNotNull(future);
assertTrue(future.isDone());

// trigger close reader of __change_event directly, simulate that reader
// is closed for some reason, such as topic unload or broker restart.
// since prepareInitPoliciesCacheAsync() has been executed, it would go into readMorePoliciesAsync(),
// throw exception, output "Closing the topic policies reader for" and do cleanPoliciesCacheInitMap()
log.info("start close spy reader");
SystemTopicClient.Reader<PulsarEvent> reader = readerCompletableFuture.get();
reader.close();
log.info("successfully close spy reader");

Awaitility.await().untilAsserted(() -> {
boolean logFound = logMessages.stream()
.anyMatch(msg -> msg.contains("Closing the topic policies reader for"));
Expand Down Expand Up @@ -598,9 +605,11 @@ public void append(LogEvent event) {
logger.addAppender(appender);

// create namespace-5 and topic
log.info("origin SystemTopicBasedTopicPoliciesService:{}", pulsar.getTopicPoliciesService());
pulsar.getTopicPoliciesService().close();
SystemTopicBasedTopicPoliciesService spyService =
Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
log.info("spy SystemTopicBasedTopicPoliciesService:{}", spyService);


admin.namespaces().createNamespace(NAMESPACE5);
Expand Down
Loading