diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 92d25cb0..3d5d294b 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -293,13 +293,38 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result Result handleResult = ResultOk; if (result == ResultOk) { - LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); { Lock mutexLock(mutex_); + if (!changeToReadyState()) { + auto client = client_.lock(); + if (client) { + LOG_INFO(getName() << "Closing subscribed consumer since it was already closed"); + int requestId = client->newRequestId(); + auto name = getName(); + cnx->sendRequestWithId(Commands::newCloseConsumer(consumerId_, requestId), requestId) + .addListener([name](Result result, const ResponseData&) { + if (result == ResultOk) { + LOG_INFO(name << "Closed consumer successfully after subscribe completed"); + } else { + LOG_WARN(name << "Failed to close consumer: " << strResult(result)); + } + }); + } else { + // This should not happen normally because if client is destroyed, the connection pool + // should also be closed, which means all connections should be closed. Close the + // connection to let broker know this registered consumer is inactive. + LOG_WARN(getName() + << "Client already closed when subscribe completed, close the connection " + << cnx->cnxString()); + cnx->close(ResultNotConnected); + } + return ResultAlreadyClosed; + } + + LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); setCnx(cnx); incomingMessages_.clear(); possibleSendToDeadLetterTopicMessages_.clear(); - state_ = Ready; backoff_.reset(); if (!messageListener_ && config_.getReceiverQueueSize() == 0) { // Complicated logic since we don't have a isLocked() function for mutex diff --git a/lib/HandlerBase.h b/lib/HandlerBase.h index df7dc244..967322fe 100644 --- a/lib/HandlerBase.h +++ b/lib/HandlerBase.h @@ -148,6 +148,11 @@ class HandlerBase : public std::enable_shared_from_this { firstRequestIdAfterConnect_.store(requestId, std::memory_order_release); } + bool changeToReadyState() noexcept { + State expected = Pending; + return state_ == Ready || state_.compare_exchange_strong(expected, Ready); + } + private: DeadlineTimerPtr timer_; DeadlineTimerPtr creationTimer_; diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 3aa1dd3c..f1bca77d 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1573,4 +1573,26 @@ TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) { ASSERT_EQ(ResultOk, client.close()); } +TEST(ConsumerTest, testCloseAfterSeek) { + const auto topic = "test-close-after-seek-" + std::to_string(time(nullptr)); + const auto subscription = "sub"; + Client client(lookupUrl); + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer)); + ASSERT_EQ(ResultOk, consumer.seek(TimeUtils::currentTimeMillis())); + consumer.closeAsync(nullptr); + + // Test the previous consumer will be closed even after seek is done, at the moment the connection might + // not be established. + ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, consumer)); + + // Test creating a consumer from a different client should also work for this case + Client anotherClient(lookupUrl); + consumer.closeAsync(nullptr); + ASSERT_EQ(ResultOk, anotherClient.subscribe(topic, subscription, consumer)); + + client.close(); + anotherClient.close(); +} + } // namespace pulsar