From 160f421efe3ca001880ea2ffc912c580cd08f953 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 4 Nov 2025 17:33:56 +0800 Subject: [PATCH 1/3] Fix possible zombie consumer when closing after reconnection --- lib/ConsumerImpl.cc | 31 +++++++++++++++++++++++++++++-- lib/HandlerBase.h | 5 +++++ tests/ConsumerTest.cc | 15 +++++++++++++++ 3 files changed, 49 insertions(+), 2 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 92d25cb0..a4b64e3c 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -293,13 +293,40 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result Result handleResult = ResultOk; if (result == ResultOk) { - LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); { Lock mutexLock(mutex_); + if (!changeToReadyState()) { + resetCnx(); + // The consumer has been + auto client = client_.lock(); + if (client) { + LOG_INFO(getName() << "Closing subscribed consumer since it was already closed"); + int requestId = client->newRequestId(); + auto name = getName(); + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) + .addListener([name](Result result, const ResponseData&) { + if (result == ResultOk) { + LOG_INFO(name << "Closed consumer successfully after subscribe completed"); + } else { + LOG_WARN(name << "Failed to close consumer: " << strResult(result)); + } + }); + } else { + // This should not happen normally because if client is destroyed, the connection pool + // should also be closed, which means all connections should be closed. Close the + // connection to let broker know this registered consumer is inactive. + LOG_WARN(getName() + << "Client already closed when subscribe completed, close the connection " + << cnx->cnxString()); + cnx->close(ResultNotConnected); + } + return ResultAlreadyClosed; + } + + LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); setCnx(cnx); incomingMessages_.clear(); possibleSendToDeadLetterTopicMessages_.clear(); - state_ = Ready; backoff_.reset(); if (!messageListener_ && config_.getReceiverQueueSize() == 0) { // Complicated logic since we don't have a isLocked() function for mutex diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index df7dc244..967322fe 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -148,6 +148,11 @@ class HandlerBase : public std::enable_shared_from_this { firstRequestIdAfterConnect_.store(requestId, std::memory_order_release); } + bool changeToReadyState() noexcept { + State expected = Pending; + return state_ == Ready || state_.compare_exchange_strong(expected, Ready); + } + private: DeadlineTimerPtr timer_; DeadlineTimerPtr creationTimer_; diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 3aa1dd3c..31730892 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1573,4 +1573,19 @@ TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) { ASSERT_EQ(ResultOk, client.close()); } +TEST(ConsumerTest, testCloseAfterSeek) { + const auto topic = "test-close-after-seek-" + std::to_string(time(nullptr)); + const auto subscription = "sub"; + Client client(lookupUrl); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer)); + ASSERT_EQ(ResultOk, consumer.seek(TimeUtils::currentTimeMillis())); + consumer.closeAsync(nullptr); + + // Test the previous consumer will be closed even after seek is done, at the moment the connection might + // not be established. + ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer)); + client.close(); +} + } // namespace pulsar From 271d00458132df2cbc8e7627209ed9faa0bef8a6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 4 Nov 2025 17:37:08 +0800 Subject: [PATCH 2/3] Remove unused code --- lib/ConsumerImpl.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index a4b64e3c..3d5d294b 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -296,8 +296,6 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result { Lock mutexLock(mutex_); if (!changeToReadyState()) { - resetCnx(); - // The consumer has been auto client = client_.lock(); if (client) { LOG_INFO(getName() << "Closing subscribed consumer since it was already closed"); From dc8c2be5d46343e036c5907a427d991ae6d8cb9d Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 5 Nov 2025 17:15:41 +0800 Subject: [PATCH 3/3] Improve tests --- tests/ConsumerTest.cc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 31730892..f1bca77d 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1585,7 +1585,14 @@ TEST(ConsumerTest, testCloseAfterSeek) { // Test the previous consumer will be closed even after seek is done, at the moment the connection might // not be established. ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer)); + + // Test creating a consumer from a different client should also work for this case + Client anotherClient(lookupUrl); + consumer.closeAsync(nullptr); + ASSERT_EQ(ResultOk, anotherClient.subscribe(topic, subscription, consumer)); + client.close(); + anotherClient.close(); } } // namespace pulsar