From d245ac01f8746634070da101b0d01c596ca13389 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Fri, 19 Dec 2025 11:26:59 +0800 Subject: [PATCH] debug --- .../service/SystemTopicBasedTopicPoliciesService.java | 5 +++++ .../SystemTopicBasedTopicPoliciesServiceTest.java | 11 ++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index c3d88b9c7237d..33f68e19b5abb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -565,6 +565,7 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) { @VisibleForTesting @NonNull CompletableFuture prepareInitPoliciesCacheAsync(@NonNull NamespaceName namespace) { + log.info("enter prepareInitPoliciesCacheAsync. ns:{}. this:{}", namespace, this); requireNonNull(namespace); if (closed.get()) { return CompletableFuture.completedFuture(false); @@ -825,6 +826,8 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader reader) } reader.readNextAsync() .thenAccept(msg -> { + log.info("finish readMorePoliciesAsync 1. ns:{}, this:{}, reader:{}", + namespaceObject.toString(), this, reader); try { refreshTopicPoliciesCache(msg); try { @@ -838,6 +841,8 @@ private void readMorePoliciesAsync(SystemTopicClient.Reader reader) } }) .whenComplete((__, ex) -> { + log.info("finish readMorePoliciesAsync 2. ns:{}, this:{}, reader:{}, ex:{}", + namespaceObject.toString(), this, reader, ex); if (ex == null) { readMorePoliciesAsync(reader); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 2f503e5512a22..b4da532d75238 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -492,8 +492,10 @@ public void append(LogEvent event) { logger.addAppender(appender); // create namespace-5 and topic + log.info("origin SystemTopicBasedTopicPoliciesService:{}", pulsar.getTopicPoliciesService()); SystemTopicBasedTopicPoliciesService spyService = Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar)); + log.info("spy SystemTopicBasedTopicPoliciesService:{}", spyService); FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true); @@ -515,17 +517,22 @@ public void append(LogEvent event) { FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true); // set topic policy. create producer for __change_event topic + log.info("start setMaxConsumersPerSubscription"); admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1); + log.info("finish setMaxConsumersPerSubscription"); future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); Assert.assertNotNull(future); + assertTrue(future.isDone()); // trigger close reader of __change_event directly, simulate that reader // is closed for some reason, such as topic unload or broker restart. // since prepareInitPoliciesCacheAsync() has been executed, it would go into readMorePoliciesAsync(), // throw exception, output "Closing the topic policies reader for" and do cleanPoliciesCacheInitMap() + log.info("start close spy reader"); SystemTopicClient.Reader reader = readerCompletableFuture.get(); reader.close(); log.info("successfully close spy reader"); + Awaitility.await().untilAsserted(() -> { boolean logFound = logMessages.stream() .anyMatch(msg -> msg.contains("Closing the topic policies reader for")); @@ -598,9 +605,11 @@ public void append(LogEvent event) { logger.addAppender(appender); // create namespace-5 and topic + log.info("origin SystemTopicBasedTopicPoliciesService:{}", pulsar.getTopicPoliciesService()); + pulsar.getTopicPoliciesService().close(); SystemTopicBasedTopicPoliciesService spyService = Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar)); - FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true); + log.info("spy SystemTopicBasedTopicPoliciesService:{}", spyService); admin.namespaces().createNamespace(NAMESPACE5);