From 4cb7775a50bcf1735c66a60c67f5dcda1046fa94 Mon Sep 17 00:00:00 2001 From: lzxddz Date: Wed, 3 Sep 2025 14:34:58 +0800 Subject: [PATCH 1/4] Remove lock serv_mux_ Change DataStoreService: only contain one data shard in one DataStoreService. Create DataStore in DataStoreService instead in eloqsql or eloqkv server. remove ongoing_write_requests_ from RocksDBDataStoreCommon --- .../data_store_service.cpp | 727 +++++++++--------- eloq_data_store_service/data_store_service.h | 78 +- .../data_store_service_config.cpp | 2 +- .../data_store_service_util.h | 4 +- eloq_data_store_service/internal_request.h | 62 +- eloq_data_store_service/main.cpp | 56 +- .../rocksdb_cloud_data_store.cpp | 17 +- .../rocksdb_data_store_common.cpp | 147 ++-- .../rocksdb_data_store_common.h | 20 +- .../thread_worker_pool.cpp | 5 +- eloq_data_store_service/thread_worker_pool.h | 2 +- 11 files changed, 593 insertions(+), 527 deletions(-) diff --git a/eloq_data_store_service/data_store_service.cpp b/eloq_data_store_service/data_store_service.cpp index 2e47afc..bd836f4 100644 --- a/eloq_data_store_service/data_store_service.cpp +++ b/eloq_data_store_service/data_store_service.cpp @@ -216,8 +216,6 @@ DataStoreService::DataStoreService( DataStoreService::~DataStoreService() { - std::unique_lock lk(serv_mux_); - if (server_ != nullptr) { server_->Stop(0); @@ -225,33 +223,43 @@ DataStoreService::~DataStoreService() server_.reset(nullptr); } - // shutdown scan iter ttl check worker - { - } - migrate_worker_.Shutdown(); // shutdown all data_store - if (!data_store_map_.empty()) + if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed) { - for (auto &it : data_store_map_) - { - if (it.second != nullptr) - { - it.second->Shutdown(); - } - } + data_store_->Shutdown(); + data_store_ = nullptr; } } -bool DataStoreService::StartService() +bool DataStoreService::StartService(bool create_db_if_missing) { - std::unique_lock lk(serv_mux_); if (server_ != nullptr) { return true; } + auto 
dss_shards = cluster_manager_.GetShardsForThisNode(); + assert(dss_shards.size() <= 1); + if (!dss_shards.empty()) + { + shard_id_ = dss_shards.at(0); + shard_status_ = cluster_manager_.FetchDSShardStatus(shard_id_); + if (shard_status_ == DSShardStatus::ReadOnly || + shard_status_ == DSShardStatus::ReadWrite) + { + data_store_ = data_store_factory_->CreateDataStore( + create_db_if_missing, shard_id_, this, true); + if (data_store_ == nullptr) + { + LOG(ERROR) << "Failed to create data store on starting " + "DataStoreService."; + return false; + } + } + } + server_ = std::make_unique(); if (server_->AddService(this, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { @@ -275,112 +283,79 @@ bool DataStoreService::StartService() return true; } -void DataStoreService::ConnectDataStore( - std::unordered_map> &&data_store_map) -{ - std::unique_lock lk(serv_mux_); - data_store_map_ = std::move(data_store_map); - // create scan iterator cache for each data store - for (auto &data_store : data_store_map_) - { - std::unique_lock lk(scan_iter_cache_map_mux_); - scan_iter_cache_map_.emplace(data_store.first, - std::make_unique()); - } -} - -void DataStoreService::DisconnectDataStore() -{ - std::unique_lock lk(serv_mux_); - for (auto &data_store : data_store_map_) - { - data_store.second->Shutdown(); - } - data_store_map_.clear(); -} - bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id, DSShardStatus open_mode, bool create_db_if_missing) { + assert(data_store_factory_ != nullptr); + if (data_store_ == nullptr) { - std::unique_lock lk(serv_mux_); - assert(data_store_factory_ != nullptr); - if (data_store_map_.find(data_shard_id) == data_store_map_.end() || - data_store_map_[data_shard_id] == nullptr) + shard_id_ = data_shard_id; + data_store_ = data_store_factory_->CreateDataStore( + create_db_if_missing, data_shard_id, this, true); + if (data_store_ == nullptr) { - data_store_map_[data_shard_id] = - data_store_factory_->CreateDataStore( - create_db_if_missing, 
data_shard_id, this, true); - if (data_store_map_[data_shard_id] == nullptr) - { - LOG(ERROR) << "Failed to create data store"; - return false; - } - } - else - { - bool res = data_store_map_[data_shard_id]->Initialize(); - if (!res) - { - LOG(ERROR) << "Failed to initialize data store"; - return false; - } - - res = data_store_map_[data_shard_id]->StartDB(); - if (!res) - { - LOG(ERROR) - << "Failed to start db instance in data store service"; - return false; - } + LOG(ERROR) << "Failed to create data store"; + return false; } } + else { - if (open_mode == DSShardStatus::ReadOnly) - { - SwitchToReadOnly(data_shard_id, DSShardStatus::Closed); - } - else if (open_mode == DSShardStatus::ReadWrite) + assert(shard_id_ == data_shard_id); + bool res = data_store_->Initialize(); + if (!res) { - SwitchToReadWrite(data_shard_id, DSShardStatus::Closed); + LOG(ERROR) << "Failed to initialize data store"; + return false; } - else + + res = data_store_->StartDB(); + if (!res) { - assert(false); + LOG(ERROR) << "Failed to start db instance in data store service"; + return false; } } + + cluster_manager_.SwitchShardToReadOnly(data_shard_id, + DSShardStatus::Closed); + assert(shard_status_.load(std::memory_order_acquire) == + DSShardStatus::Closed); + shard_status_.store(open_mode, std::memory_order_release); return true; } + void DataStoreService::Read(::google::protobuf::RpcController *controller, const ::EloqDS::remote::ReadRequest *request, ::EloqDS::remote::ReadResponse *response, ::google::protobuf::Closure *done) { uint32_t partition_id = request->partition_id(); - uint32_t shard_id = cluster_manager_.GetShardIdByPartitionId(partition_id); + uint32_t shard_id = GetShardIdByPartitionId(partition_id); - auto *result = response->mutable_result(); - if (!cluster_manager_.IsOwnerOfShard(shard_id)) + if (!IsOwnerOfShard(shard_id)) { brpc::ClosureGuard done_guard(done); + auto *result = response->mutable_result(); cluster_manager_.PrepareShardingError(shard_id, result); return; } 
- std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) + if (shard_status_.load(std::memory_order_acquire) == DSShardStatus::Closed) { brpc::ClosureGuard done_guard(done); + auto *result = response->mutable_result(); result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); result->set_error_msg("KV store not opened yet."); return; } + assert(data_store_ != nullptr); + // decrease read req count when read done ReadRpcRequest *req = rpc_read_request_pool_.NextObject(); req->Reset(this, request, response, done); - data_store_map_[shard_id]->Read(req); + data_store_->Read(req); } void DataStoreService::Read(const std::string_view table_name, @@ -392,17 +367,16 @@ void DataStoreService::Read(const std::string_view table_name, ::EloqDS::remote::CommonResult *result, ::google::protobuf::Closure *done) { - uint32_t shard_id = cluster_manager_.GetShardIdByPartitionId(partition_id); + uint32_t shard_id = GetShardIdByPartitionId(partition_id); - if (!cluster_manager_.IsOwnerOfShard(shard_id)) + if (!IsOwnerOfShard(shard_id)) { brpc::ClosureGuard done_guard(done); cluster_manager_.PrepareShardingError(shard_id, result); return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) + if (shard_status_.load(std::memory_order_acquire) == DSShardStatus::Closed) { brpc::ClosureGuard done_guard(done); record->clear(); @@ -412,11 +386,11 @@ void DataStoreService::Read(const std::string_view table_name, return; } + assert(data_store_ != nullptr); ReadLocalRequest *req = local_read_request_pool_.NextObject(); req->Reset( this, table_name, partition_id, key, record, ts, ttl, result, done); - - data_store_map_[shard_id]->Read(req); + data_store_->Read(req); } void DataStoreService::FlushData( @@ -425,16 +399,23 @@ void DataStoreService::FlushData( ::EloqDS::remote::FlushDataResponse *response, ::google::protobuf::Closure *done) { - // This object helps to call done->Run() in RAII style. 
If you need to - // process the request asynchronously, pass done_guard.release(). - brpc::ClosureGuard done_guard(done); + uint32_t shard_id = request->shard_id(); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + ::EloqDS::remote::CommonResult *result = response->mutable_result(); + cluster_manager_.PrepareShardingError(shard_id, result); + return; + } - ::EloqDS::remote::CommonResult *result = response->mutable_result(); + IncreaseWriteReqCount(); - uint32_t shard_id = request->shard_id(); - auto shard_status = FetchDSShardStatus(shard_id); + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { + DecreaseWriteReqCount(); + brpc::ClosureGuard done_guard(done); + ::EloqDS::remote::CommonResult *result = response->mutable_result(); if (shard_status == DSShardStatus::Closed) { PrepareShardingError(shard_id, result); @@ -449,20 +430,13 @@ void DataStoreService::FlushData( return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result->set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); FlushDataRpcRequest *req = rpc_flush_data_req_pool_.NextObject(); - req->Reset(request, response, done); + req->Reset(this, request, response, done); // Process request async. 
- data_store_map_[shard_id]->FlushData(req); - done_guard.release(); + data_store_->FlushData(req); } void DataStoreService::FlushData(const std::vector &kv_table_names, @@ -470,9 +444,19 @@ void DataStoreService::FlushData(const std::vector &kv_table_names, remote::CommonResult &result, ::google::protobuf::Closure *done) { - auto shard_status = FetchDSShardStatus(shard_id); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + cluster_manager_.PrepareShardingError(shard_id, &result); + return; + } + + IncreaseWriteReqCount(); + + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { + DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done); if (shard_status == DSShardStatus::Closed) { @@ -488,20 +472,13 @@ void DataStoreService::FlushData(const std::vector &kv_table_names, return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - brpc::ClosureGuard done_guard(done); - result.set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result.set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); FlushDataLocalRequest *req = local_flush_data_req_pool_.NextObject(); - req->Reset(&kv_table_names, result, done); + req->Reset(this, &kv_table_names, result, done); // Process request async. - data_store_map_[shard_id]->FlushData(req); + data_store_->FlushData(req); } void DataStoreService::DeleteRange( @@ -510,16 +487,25 @@ void DataStoreService::DeleteRange( ::EloqDS::remote::DeleteRangeResponse *response, ::google::protobuf::Closure *done) { - // This object helps to call done->Run() in RAII style. If you need to - // process the request asynchronously, pass done_guard.release(). 
- brpc::ClosureGuard done_guard(done); + uint32_t shard_id = GetShardIdByPartitionId(request->partition_id()); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + ::EloqDS::remote::CommonResult *result = response->mutable_result(); + cluster_manager_.PrepareShardingError(shard_id, result); + return; + } - ::EloqDS::remote::CommonResult *result = response->mutable_result(); + IncreaseWriteReqCount(); - uint32_t shard_id = GetShardIdByPartitionId(request->partition_id()); - auto shard_status = FetchDSShardStatus(shard_id); + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { + DecreaseWriteReqCount(); + // This object helps to call done->Run() in RAII style. If you need to + // process the request asynchronously, pass done_guard.release(). + brpc::ClosureGuard done_guard(done); + ::EloqDS::remote::CommonResult *result = response->mutable_result(); if (shard_status == DSShardStatus::Closed) { PrepareShardingError(shard_id, result); @@ -534,20 +520,13 @@ void DataStoreService::DeleteRange( return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result->set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); DeleteRangeRpcRequest *req = rpc_delete_range_req_pool_.NextObject(); - req->Reset(request, response, done); + req->Reset(this, request, response, done); // Process request async. 
- data_store_map_[shard_id]->DeleteRange(req); - done_guard.release(); + data_store_->DeleteRange(req); } void DataStoreService::DeleteRange(const std::string_view table_name, @@ -560,9 +539,19 @@ void DataStoreService::DeleteRange(const std::string_view table_name, { uint32_t shard_id = GetShardIdByPartitionId(partition_id); - auto shard_status = FetchDSShardStatus(shard_id); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + cluster_manager_.PrepareShardingError(shard_id, &result); + return; + } + + IncreaseWriteReqCount(); + + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { + DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done); if (shard_status == DSShardStatus::Closed) { @@ -578,21 +567,20 @@ void DataStoreService::DeleteRange(const std::string_view table_name, return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - brpc::ClosureGuard done_guard(done); - result.set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result.set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); DeleteRangeLocalRequest *req = local_delete_range_req_pool_.NextObject(); - req->Reset( - table_name, partition_id, start_key, end_key, skip_wal, result, done); + req->Reset(this, + table_name, + partition_id, + start_key, + end_key, + skip_wal, + result, + done); // Process request async. - data_store_map_[shard_id]->DeleteRange(req); + data_store_->DeleteRange(req); } void DataStoreService::CreateTable( @@ -601,16 +589,23 @@ void DataStoreService::CreateTable( ::EloqDS::remote::CreateTableResponse *response, ::google::protobuf::Closure *done) { - // This object helps to call done->Run() in RAII style. If you need to - // process the request asynchronously, pass done_guard.release(). 
- brpc::ClosureGuard done_guard(done); + uint32_t shard_id = request->shard_id(); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + ::EloqDS::remote::CommonResult *result = response->mutable_result(); + cluster_manager_.PrepareShardingError(shard_id, result); + return; + } - ::EloqDS::remote::CommonResult *result = response->mutable_result(); + IncreaseWriteReqCount(); - uint32_t shard_id = request->shard_id(); - auto shard_status = FetchDSShardStatus(shard_id); + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { + DecreaseWriteReqCount(); + brpc::ClosureGuard done_guard(done); + ::EloqDS::remote::CommonResult *result = response->mutable_result(); if (shard_status == DSShardStatus::Closed) { PrepareShardingError(shard_id, result); @@ -625,20 +620,13 @@ void DataStoreService::CreateTable( return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result->set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); CreateTableRpcRequest *req = rpc_create_table_req_pool_.NextObject(); - req->Reset(request, response, done); + req->Reset(this, request, response, done); // Process request async. 
- data_store_map_[shard_id]->CreateTable(req); - done_guard.release(); + data_store_->CreateTable(req); } void DataStoreService::CreateTable(const std::string_view table_name, @@ -646,9 +634,19 @@ void DataStoreService::CreateTable(const std::string_view table_name, remote::CommonResult &result, ::google::protobuf::Closure *done) { - auto shard_status = FetchDSShardStatus(shard_id); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + cluster_manager_.PrepareShardingError(shard_id, &result); + return; + } + + IncreaseWriteReqCount(); + + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { + DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done); if (shard_status == DSShardStatus::Closed) { @@ -664,20 +662,13 @@ void DataStoreService::CreateTable(const std::string_view table_name, return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - brpc::ClosureGuard done_guard(done); - result.set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result.set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); CreateTableLocalRequest *req = local_create_table_req_pool_.NextObject(); - req->Reset(table_name, result, done); + req->Reset(this, table_name, result, done); // Process request async. - data_store_map_[shard_id]->CreateTable(req); + data_store_->CreateTable(req); } void DataStoreService::DropTable( @@ -686,16 +677,23 @@ void DataStoreService::DropTable( ::EloqDS::remote::DropTableResponse *response, ::google::protobuf::Closure *done) { - // This object helps to call done->Run() in RAII style. If you need to - // process the request asynchronously, pass done_guard.release(). 
- brpc::ClosureGuard done_guard(done); + uint32_t shard_id = request->shard_id(); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + ::EloqDS::remote::CommonResult *result = response->mutable_result(); + cluster_manager_.PrepareShardingError(shard_id, result); + return; + } - ::EloqDS::remote::CommonResult *result = response->mutable_result(); + IncreaseWriteReqCount(); - uint32_t shard_id = request->shard_id(); - auto shard_status = FetchDSShardStatus(shard_id); + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { + DecreaseWriteReqCount(); + brpc::ClosureGuard done_guard(done); + ::EloqDS::remote::CommonResult *result = response->mutable_result(); if (shard_status == DSShardStatus::Closed) { PrepareShardingError(shard_id, result); @@ -710,20 +708,13 @@ void DataStoreService::DropTable( return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result->set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); DropTableRpcRequest *req = rpc_drop_table_req_pool_.NextObject(); - req->Reset(request, response, done); + req->Reset(this, request, response, done); // Process request async. 
- data_store_map_[shard_id]->DropTable(req); - done_guard.release(); + data_store_->DropTable(req); } void DataStoreService::DropTable(const std::string_view table_name, @@ -731,9 +722,19 @@ void DataStoreService::DropTable(const std::string_view table_name, remote::CommonResult &result, ::google::protobuf::Closure *done) { - auto shard_status = FetchDSShardStatus(shard_id); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + cluster_manager_.PrepareShardingError(shard_id, &result); + return; + } + + IncreaseWriteReqCount(); + + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { + DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done); if (shard_status == DSShardStatus::Closed) { @@ -749,20 +750,13 @@ void DataStoreService::DropTable(const std::string_view table_name, return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - brpc::ClosureGuard done_guard(done); - result.set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result.set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); DropTableLocalRequest *req = local_drop_table_req_pool_.NextObject(); - req->Reset(table_name, result, done); + req->Reset(this, table_name, result, done); // Process request async. 
- data_store_map_[shard_id]->DropTable(req); + data_store_->DropTable(req); } void DataStoreService::BatchWriteRecords( @@ -773,12 +767,23 @@ void DataStoreService::BatchWriteRecords( { uint32_t partition_id = request->partition_id(); uint32_t shard_id = GetShardIdByPartitionId(partition_id); - ::EloqDS::remote::CommonResult *result = response->mutable_result(); - auto shard_status = cluster_manager_.FetchDSShardStatus(shard_id); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + auto *result = response->mutable_result(); + cluster_manager_.PrepareShardingError(shard_id, result); + return; + } + + IncreaseWriteReqCount(); + + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { + DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done); + auto *result = response->mutable_result(); if (shard_status == DSShardStatus::Closed) { result->set_error_code( @@ -795,22 +800,13 @@ void DataStoreService::BatchWriteRecords( return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - brpc::ClosureGuard done_guard(done); - auto *result = response->mutable_result(); - result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result->set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); - // TODO(lzx): fetch req from pool. 
WriteRecordsRpcRequest *batch_write_req = rpc_write_records_request_pool_.NextObject(); - batch_write_req->Reset(request, response, done); + batch_write_req->Reset(this, request, response, done); - data_store_map_[shard_id]->BatchWriteRecords(batch_write_req); + data_store_->BatchWriteRecords(batch_write_req); } void DataStoreService::ScanNext( @@ -830,7 +826,14 @@ void DataStoreService::ScanNext( { uint32_t shard_id = GetShardIdByPartitionId(partition_id); - auto shard_status = FetchDSShardStatus(shard_id); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + cluster_manager_.PrepareShardingError(shard_id, result); + return; + } + + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { brpc::ClosureGuard done_guard(done); @@ -848,14 +851,7 @@ void DataStoreService::ScanNext( return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - brpc::ClosureGuard done_guard(done); - result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result->set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); ScanLocalRequest *req = local_scan_request_pool_.NextObject(); req->Reset(this, @@ -873,7 +869,7 @@ void DataStoreService::ScanNext( result, done); - data_store_map_[shard_id]->ScanNext(req); + data_store_->ScanNext(req); } void DataStoreService::ScanNext(::google::protobuf::RpcController *controller, @@ -884,7 +880,15 @@ void DataStoreService::ScanNext(::google::protobuf::RpcController *controller, uint32_t partition_id = request->partition_id(); uint32_t shard_id = GetShardIdByPartitionId(partition_id); - auto shard_status = FetchDSShardStatus(shard_id); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + cluster_manager_.PrepareShardingError(shard_id, + response->mutable_result()); + return; + } + + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != 
DSShardStatus::ReadWrite) { brpc::ClosureGuard done_guard(done); @@ -903,20 +907,12 @@ void DataStoreService::ScanNext(::google::protobuf::RpcController *controller, return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - brpc::ClosureGuard done_guard(done); - auto *result = response->mutable_result(); - result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result->set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); ScanRpcRequest *req = rpc_scan_request_pool_.NextObject(); req->Reset(this, request, response, done); - data_store_map_[shard_id]->ScanNext(req); + data_store_->ScanNext(req); } void DataStoreService::ScanClose(::google::protobuf::RpcController *controller, @@ -927,7 +923,15 @@ void DataStoreService::ScanClose(::google::protobuf::RpcController *controller, uint32_t partition_id = request->partition_id(); uint32_t shard_id = GetShardIdByPartitionId(partition_id); - auto shard_status = FetchDSShardStatus(shard_id); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + cluster_manager_.PrepareShardingError(shard_id, + response->mutable_result()); + return; + } + + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { brpc::ClosureGuard done_guard(done); @@ -946,20 +950,12 @@ void DataStoreService::ScanClose(::google::protobuf::RpcController *controller, return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - brpc::ClosureGuard done_guard(done); - response->mutable_result()->set_error_code( - ::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - response->mutable_result()->set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); ScanRpcRequest *req = rpc_scan_request_pool_.NextObject(); req->Reset(this, request, response, done); - data_store_map_[shard_id]->ScanClose(req); + data_store_->ScanClose(req); } void 
DataStoreService::ScanClose(const std::string_view table_name, @@ -970,7 +966,14 @@ void DataStoreService::ScanClose(const std::string_view table_name, { uint32_t shard_id = GetShardIdByPartitionId(partition_id); - auto shard_status = FetchDSShardStatus(shard_id); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + cluster_manager_.PrepareShardingError(shard_id, result); + return; + } + + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { brpc::ClosureGuard done_guard(done); @@ -988,19 +991,12 @@ void DataStoreService::ScanClose(const std::string_view table_name, return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) - { - brpc::ClosureGuard done_guard(done); - result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result->set_error_msg("KV store not opened yet."); - return; - } + assert(data_store_ != nullptr); ScanLocalRequest *req = local_scan_request_pool_.NextObject(); req->Reset(this, table_name, partition_id, session_id, result, done); - data_store_map_[shard_id]->ScanClose(req); + data_store_->ScanClose(req); } void DataStoreService::AppendThisNodeKey(std::stringstream &ss) @@ -1026,56 +1022,32 @@ void DataStoreService::EmplaceScanIter(uint32_t shard_id, std::string &session_id, std::unique_ptr iter) { - std::shared_lock lk(scan_iter_cache_map_mux_); - auto scan_iter_cache = scan_iter_cache_map_.find(shard_id); - if (scan_iter_cache != scan_iter_cache_map_.end()) - { - scan_iter_cache->second->Emplace(session_id, std::move(iter)); - } + assert(shard_id == shard_id_); + scan_iter_cache_.Emplace(session_id, std::move(iter)); } TTLWrapper *DataStoreService::BorrowScanIter(uint32_t shard_id, const std::string &session_id) { - std::shared_lock lk(scan_iter_cache_map_mux_); - auto scan_iter_cache = scan_iter_cache_map_.find(shard_id); - if (scan_iter_cache != scan_iter_cache_map_.end()) - { - auto *scan_iter_wrapper = 
scan_iter_cache->second->Borrow(session_id); - return scan_iter_wrapper; - } - return nullptr; + assert(shard_id == shard_id_); + auto *scan_iter_wrapper = scan_iter_cache_.Borrow(session_id); + return scan_iter_wrapper; } void DataStoreService::ReturnScanIter(uint32_t shard_id, TTLWrapper *iter) { - std::shared_lock lk(scan_iter_cache_map_mux_); - auto scan_iter_cache = scan_iter_cache_map_.find(shard_id); - if (scan_iter_cache != scan_iter_cache_map_.end()) - { - scan_iter_cache->second->Return(iter); - } + scan_iter_cache_.Return(iter); } void DataStoreService::EraseScanIter(uint32_t shard_id, const std::string &session_id) { - std::shared_lock lk(scan_iter_cache_map_mux_); - auto scan_iter_cache = scan_iter_cache_map_.find(shard_id); - if (scan_iter_cache != scan_iter_cache_map_.end()) - { - scan_iter_cache->second->Erase(session_id); - } + scan_iter_cache_.Erase(session_id); } void DataStoreService::ForceEraseScanIters(uint32_t shard_id) { - std::shared_lock lk(scan_iter_cache_map_mux_); - auto scan_iter_cache = scan_iter_cache_map_.find(shard_id); - if (scan_iter_cache != scan_iter_cache_map_.end()) - { - scan_iter_cache->second->ForceEraseIters(); - } + scan_iter_cache_.ForceEraseIters(); } void DataStoreService::BatchWriteRecords( @@ -1094,9 +1066,19 @@ void DataStoreService::BatchWriteRecords( { uint32_t shard_id = GetShardIdByPartitionId(partition_id); - auto shard_status = cluster_manager_.FetchDSShardStatus(shard_id); + if (!IsOwnerOfShard(shard_id)) + { + brpc::ClosureGuard done_guard(done); + cluster_manager_.PrepareShardingError(shard_id, &result); + return; + } + + IncreaseWriteReqCount(); + + auto shard_status = shard_status_.load(std::memory_order_acquire); if (shard_status != DSShardStatus::ReadWrite) { + DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done); if (shard_status == DSShardStatus::Closed) { @@ -1114,19 +1096,11 @@ void DataStoreService::BatchWriteRecords( return; } - std::shared_lock lk(serv_mux_); - if 
(!data_store_map_[shard_id]) - { - brpc::ClosureGuard done_guard(done); - result.set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result.set_error_msg("KV store not opened yet."); - return; - } - - // TODO(lzx): fetch req from pool. + assert(data_store_ != nullptr); WriteRecordsLocalRequest *batch_write_req = local_write_records_request_pool_.NextObject(); - batch_write_req->Reset(table_name, + batch_write_req->Reset(this, + table_name, partition_id, key_parts, record_parts, @@ -1139,7 +1113,7 @@ void DataStoreService::BatchWriteRecords( parts_cnt_per_key, parts_cnt_per_record); - data_store_map_[shard_id]->BatchWriteRecords(batch_write_req); + data_store_->BatchWriteRecords(batch_write_req); } void DataStoreService::CreateSnapshotForBackup( @@ -1151,18 +1125,33 @@ void DataStoreService::CreateSnapshotForBackup( auto *result = response->mutable_result(); uint32_t shard_id = request->shard_id(); - if (!cluster_manager_.IsOwnerOfShard(shard_id)) + if (!IsOwnerOfShard(shard_id)) { + brpc::ClosureGuard done_guard(done); cluster_manager_.PrepareShardingError(shard_id, result); return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) + IncreaseWriteReqCount(); + + auto shard_status = shard_status_.load(std::memory_order_acquire); + if (shard_status != DSShardStatus::ReadWrite) { + DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done); - result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result->set_error_msg("KV store not opened yet."); + if (shard_status == DSShardStatus::Closed) + { + result->set_error_code( + ::EloqDS::remote::DataStoreError::REQUESTED_NODE_NOT_OWNER); + result->set_error_msg("Requested data not on local node."); + } + else + { + assert(shard_status == DSShardStatus::ReadOnly); + result->set_error_code( + ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB); + result->set_error_msg("Write to read-only DB."); + } return; } @@ -1170,7 +1159,7 @@ void 
DataStoreService::CreateSnapshotForBackup( rpc_create_snapshot_req_pool_.NextObject(); req->Reset(this, request, response, done); - data_store_map_[shard_id]->CreateSnapshotForBackup(req); + data_store_->CreateSnapshotForBackup(req); } void DataStoreService::CreateSnapshotForBackup( @@ -1181,17 +1170,33 @@ void DataStoreService::CreateSnapshotForBackup( remote::CommonResult *result, ::google::protobuf::Closure *done) { - if (!cluster_manager_.IsOwnerOfShard(shard_id)) + if (!IsOwnerOfShard(shard_id)) { + brpc::ClosureGuard done_guard(done); cluster_manager_.PrepareShardingError(shard_id, result); return; } - std::shared_lock lk(serv_mux_); - if (!data_store_map_[shard_id]) + IncreaseWriteReqCount(); + + auto shard_status = shard_status_.load(std::memory_order_acquire); + if (shard_status != DSShardStatus::ReadWrite) { - result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); - result->set_error_msg("KV store not opened yet."); + DecreaseWriteReqCount(); + brpc::ClosureGuard done_guard(done); + if (shard_status == DSShardStatus::Closed) + { + result->set_error_code( + ::EloqDS::remote::DataStoreError::REQUESTED_NODE_NOT_OWNER); + result->set_error_msg("Requested data not on local node."); + } + else + { + assert(shard_status == DSShardStatus::ReadOnly); + result->set_error_code( + ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB); + result->set_error_msg("Write to read-only DB."); + } return; } @@ -1201,7 +1206,7 @@ void DataStoreService::CreateSnapshotForBackup( req->Reset(this, backup_name, backup_ts, backup_files, result, done); // Process request async - data_store_map_[shard_id]->CreateSnapshotForBackup(req); + data_store_->CreateSnapshotForBackup(req); } void DataStoreService::FetchDSSClusterConfig( @@ -1463,12 +1468,12 @@ void DataStoreService::SwitchDSShardMode( if (mode == DSShardStatus::ReadOnly) { DLOG(INFO) << "SwitchDSShardMode to read only for shard " << shard_id; - res = SwitchToReadOnly(shard_id, DSShardStatus::ReadWrite); + res 
= SwitchReadWriteToReadOnly(shard_id); } else if (mode == DSShardStatus::ReadWrite) { DLOG(INFO) << "SwitchDSShardMode to read write for shard " << shard_id; - res = SwitchToReadWrite(shard_id, DSShardStatus::ReadOnly); + res = SwitchReadOnlyToReadWrite(shard_id); } ACTION_FAULT_INJECTOR("panic_after_target_switch_rw"); @@ -1862,12 +1867,11 @@ bool DataStoreService::DoMigrate(const std::string &event_id, << log_ptr->shard_id; break; } - std::shared_lock lk(serv_mux_); - if (data_store_map_.find(log_ptr->shard_id) != data_store_map_.end()) + auto data_store = GetDataStore(log_ptr->shard_id); + if (data_store != nullptr) { break; } - lk.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(500)); LOG(INFO) << "Waiting for data store connection for shard " << log_ptr->shard_id; @@ -1883,7 +1887,7 @@ bool DataStoreService::DoMigrate(const std::string &event_id, if (log.status == 1) { LOG(INFO) << "Switching shard " << log.shard_id << " to read-only"; - bool res = SwitchToReadOnly(log.shard_id, DSShardStatus::ReadWrite); + bool res = SwitchReadWriteToReadOnly(log.shard_id); // Only when shard status is not rw, SwitchToReadOnly return false. 
if (!res) { @@ -2010,7 +2014,7 @@ bool DataStoreService::DoMigrate(const std::string &event_id, LOG(INFO) << "Notifying target node to switch to read-write mode"; // Must switch to closed before notify target node switch mode - SwitchToClosed(log.shard_id, DSShardStatus::ReadOnly); + SwitchReadOnlyToClosed(log.shard_id); bool res = NotifyTargetNodeSwitchDSShardMode( target_node, @@ -2137,61 +2141,86 @@ bool DataStoreService::NotifyTargetNodeOpenDSShard( } } -bool DataStoreService::SwitchToReadOnly(uint32_t shard_id, - DSShardStatus expected) +bool DataStoreService::SwitchReadWriteToReadOnly(uint32_t shard_id) { - if (!cluster_manager_.SwitchShardToReadOnly(shard_id, expected)) + if (!IsOwnerOfShard(shard_id)) { return false; } - // Wait for all started write requests to finish before opening db on - // target node - std::shared_lock lk(serv_mux_); - data_store_map_[shard_id]->SwitchToReadOnly(); - - return true; + DSShardStatus expected = DSShardStatus::ReadWrite; + if (!shard_status_.compare_exchange_strong(expected, + DSShardStatus::ReadOnly) && + expected != DSShardStatus::ReadOnly) + { + DLOG(ERROR) << "SwitchReadWriteToReadOnly failed, shard status is not " + "ReadWrite or ReadOnly"; + return false; + } + // wait for all write requests to finish + while (ongoing_write_requests_.load(std::memory_order_acquire) > 0) + { + bthread_usleep(1000); + } + if (shard_status_.load(std::memory_order_acquire) == + DSShardStatus::ReadOnly) + { + cluster_manager_.SwitchShardToReadOnly(shard_id, expected); + data_store_->SwitchToReadOnly(); + return true; + } + else + { + DLOG(ERROR) << "Switch data store to ReadOnly failed for shard " + "status never be ReadOnly"; + return false; + } } -bool DataStoreService::SwitchToReadWrite(uint32_t shard_id, - DSShardStatus expected) +bool DataStoreService::SwitchReadOnlyToClosed(uint32_t shard_id) { - if (!cluster_manager_.SwitchShardToReadWrite(shard_id, expected)) + if (!IsOwnerOfShard(shard_id)) { return false; } - // Wait for all 
started write requests to finish before opening db on - // target node - std::shared_lock lk(serv_mux_); - data_store_map_[shard_id]->SwitchToReadWrite(); + DSShardStatus expected = DSShardStatus::ReadOnly; + if (!shard_status_.compare_exchange_strong(expected, + DSShardStatus::Closed) && + expected != DSShardStatus::Closed) + { + DLOG(ERROR) << "SwitchReadOnlyToClosed failed, shard status is not " + "ReadOnly or Closed"; + return false; + } + if (expected == DSShardStatus::ReadOnly) + { + cluster_manager_.SwitchShardToClosed(shard_id, expected); + data_store_->Shutdown(); + } return true; } -bool DataStoreService::SwitchToClosed(uint32_t shard_id, DSShardStatus expected) +bool DataStoreService::SwitchReadOnlyToReadWrite(uint32_t shard_id) { - if (!cluster_manager_.SwitchShardToClosed(shard_id, expected)) + if (!IsOwnerOfShard(shard_id)) { return false; } - // Wait for all started read and write requests to finish before - // shutting down db + DSShardStatus expected = DSShardStatus::ReadOnly; + if (!shard_status_.compare_exchange_strong(expected, + DSShardStatus::ReadWrite) && + expected != DSShardStatus::ReadWrite) { - std::shared_lock lk(serv_mux_); - if (data_store_map_[shard_id] == nullptr) - { - return true; - } - data_store_map_[shard_id]->Shutdown(); - } - { - std::unique_lock lk(serv_mux_); - data_store_map_[shard_id] = nullptr; + DLOG(ERROR) << "SwitchReadOnlyToReadWrite failed, shard status is not " + "ReadOnly or ReadWrite"; + return false; } - DLOG(INFO) << "SwitchToClosed, shutdown shard " << shard_id << " success"; + data_store_->SwitchToReadWrite(); + cluster_manager_.SwitchShardToReadWrite(shard_id, expected); return true; } diff --git a/eloq_data_store_service/data_store_service.h b/eloq_data_store_service/data_store_service.h index 45a88af..66f0ade 100644 --- a/eloq_data_store_service/data_store_service.h +++ b/eloq_data_store_service/data_store_service.h @@ -187,21 +187,10 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService 
~DataStoreService(); - bool StartService(); - - void ConnectDataStore( - std::unordered_map> - &&data_store_map); - - void DisconnectDataStore(); - - bool ConnectAndStartDataStore(uint32_t data_shard_id, - DSShardStatus open_mode, - bool create_db_if_missing = false); + bool StartService(bool create_db_if_missing); brpc::Server *GetBrpcServer() { - std::shared_lock lk(serv_mux_); return server_.get(); } @@ -583,7 +572,11 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService // ======================================================================= DSShardStatus FetchDSShardStatus(uint32_t shard_id) { - return cluster_manager_.FetchDSShardStatus(shard_id); + if (shard_id_ == shard_id) + { + return shard_status_; + } + return DSShardStatus::Closed; } void AddListenerForUpdateConfig( @@ -597,17 +590,48 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService return data_store_factory_.get(); } + void IncreaseWriteReqCount() + { + ongoing_write_requests_.fetch_add(1, std::memory_order_release); + } + + void DecreaseWriteReqCount() + { + ongoing_write_requests_.fetch_sub(1, std::memory_order_release); + } + private: uint32_t GetShardIdByPartitionId(int32_t partition_id) { - return cluster_manager_.GetShardIdByPartitionId(partition_id); + // Now only support single data shard + return 0; + // return cluster_manager_.GetShardIdByPartitionId(partition_id); } - bool SwitchToReadOnly(uint32_t shard_id, DSShardStatus expected); + bool IsOwnerOfShard(uint32_t shard_id) + { + return shard_id_ == shard_id; + } - bool SwitchToReadWrite(uint32_t shard_id, DSShardStatus expected); + DataStore *GetDataStore(uint32_t shard_id) + { + if (shard_id_ == shard_id) + { + return data_store_.get(); + } + else + { + return nullptr; + } + } + + bool ConnectAndStartDataStore(uint32_t data_shard_id, + DSShardStatus open_mode, + bool create_db_if_missing = false); - bool SwitchToClosed(uint32_t shard_id, DSShardStatus expected); + bool SwitchReadWriteToReadOnly(uint32_t 
shard_id); + bool SwitchReadOnlyToClosed(uint32_t shard_id); + bool SwitchReadOnlyToReadWrite(uint32_t shard_id); bool WriteMigrationLog(uint32_t shard_id, const std::string &event_id, @@ -623,7 +647,7 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService uint32_t &migration_status, uint64_t &shard_next_version); - std::shared_mutex serv_mux_; + // std::shared_mutex serv_mux_; int32_t service_port_; std::unique_ptr server_; @@ -631,13 +655,21 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService std::string config_file_path_; std::string migration_log_path_; - // shard id to data store - std::unordered_map> data_store_map_; + // Now, there is only one data store shard in a DataStoreService. + // To avoid using mutex in read or write APIs, use a atomic variable + // (shard_status_) to control concurrency conflicts. + // - During migraion, we change the shard_status_ firstly, then change the + // data_store_ after all read/write requests are finished. + // - In write functions, we increase the ongoing_write_requests_ firstly and + // then check the shard_status_. After the request is executed or if + // shard_status_ is not required, decrease them. 
+ uint32_t shard_id_{UINT32_MAX}; + std::unique_ptr data_store_{nullptr}; + std::atomic shard_status_{DSShardStatus::Closed}; + std::atomic ongoing_write_requests_{0}; // scan iterator cache - std::shared_mutex scan_iter_cache_map_mux_; - std::unordered_map> - scan_iter_cache_map_; + TTLWrapperCache scan_iter_cache_; std::unique_ptr data_store_factory_; diff --git a/eloq_data_store_service/data_store_service_config.cpp b/eloq_data_store_service/data_store_service_config.cpp index 6bc7920..a0a4e97 100644 --- a/eloq_data_store_service/data_store_service_config.cpp +++ b/eloq_data_store_service/data_store_service_config.cpp @@ -195,7 +195,7 @@ DSShardStatus Topology::FetchDSShardStatus(uint32_t shard_id) const { return it->second.status_; } - return DSShardStatus::Unknown; + return DSShardStatus::Closed; } void Topology::UpdateDSShardStatus(uint32_t shard_id, DSShardStatus status) diff --git a/eloq_data_store_service/data_store_service_util.h b/eloq_data_store_service/data_store_service_util.h index 5075b58..f157371 100644 --- a/eloq_data_store_service/data_store_service_util.h +++ b/eloq_data_store_service/data_store_service_util.h @@ -28,10 +28,10 @@ namespace EloqDS { // Node status in DSS -enum DSShardStatus +enum DSShardStatus : uint8_t { // Node status is unknown, can be dead or alive, need to check - Unknown = 0, + // Unknown = 0, // Node is online with read only mode ReadOnly = 1, // Node is online with read write mode diff --git a/eloq_data_store_service/internal_request.h b/eloq_data_store_service/internal_request.h index df116ce..dabec17 100644 --- a/eloq_data_store_service/internal_request.h +++ b/eloq_data_store_service/internal_request.h @@ -80,10 +80,12 @@ class WriteRecordsRpcRequest : public WriteRecordsRequest done_ = nullptr; } - void Reset(const remote::BatchWriteRecordsRequest *req, + void Reset(DataStoreService *ds_service, + const remote::BatchWriteRecordsRequest *req, remote::BatchWriteRecordsResponse *resp, google::protobuf::Closure *done) { + 
data_store_service_ = ds_service; req_ = req; resp_ = resp; done_ = done; @@ -155,6 +157,7 @@ class WriteRecordsRpcRequest : public WriteRecordsRequest void SetFinish(const remote::CommonResult &result) override { + data_store_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); // Set error code and error message resp_->mutable_result()->set_error_code(result.error_code()); @@ -162,6 +165,7 @@ class WriteRecordsRpcRequest : public WriteRecordsRequest } private: + DataStoreService *data_store_service_{nullptr}; const remote::BatchWriteRecordsRequest *req_{nullptr}; remote::BatchWriteRecordsResponse *resp_{nullptr}; google::protobuf::Closure *done_{nullptr}; @@ -191,7 +195,8 @@ class WriteRecordsLocalRequest : public WriteRecordsRequest parts_cnt_per_record_ = 1; } - void Reset(std::string_view table_name, + void Reset(DataStoreService *ds_service, + std::string_view table_name, int32_t partition_id, const std::vector &key_parts, const std::vector &record_parts, @@ -204,6 +209,7 @@ class WriteRecordsLocalRequest : public WriteRecordsRequest const uint16_t parts_cnt_per_key, const uint16_t parts_cnt_per_record) { + data_store_service_ = ds_service; table_name_ = table_name; partition_id_ = partition_id; key_parts_ = &key_parts; @@ -281,6 +287,7 @@ class WriteRecordsLocalRequest : public WriteRecordsRequest void SetFinish(const remote::CommonResult &result) override { + data_store_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); // Set error code and error message result_->set_error_code(result.error_code()); @@ -288,6 +295,7 @@ class WriteRecordsLocalRequest : public WriteRecordsRequest } private: + DataStoreService *data_store_service_{nullptr}; std::string_view table_name_; int32_t partition_id_; const std::vector *key_parts_{nullptr}; @@ -333,10 +341,12 @@ class FlushDataRpcRequest : public FlushDataRequest done_ = nullptr; } - void Reset(const remote::FlushDataRequest *req, + void Reset(DataStoreService *ds_service, + 
const remote::FlushDataRequest *req, remote::FlushDataResponse *resp, google::protobuf::Closure *done) { + data_store_service_ = ds_service; for (int idx = 0; idx < req->kv_table_name_size(); ++idx) { kv_table_names_.push_back(req->kv_table_name(idx)); @@ -354,6 +364,7 @@ class FlushDataRpcRequest : public FlushDataRequest void SetFinish(const remote::CommonResult &result) override { + data_store_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); ::EloqDS::remote::CommonResult *res = resp_->mutable_result(); @@ -372,6 +383,7 @@ class FlushDataRpcRequest : public FlushDataRequest } private: + DataStoreService *data_store_service_{nullptr}; std::vector kv_table_names_; const remote::FlushDataRequest *req_{nullptr}; remote::FlushDataResponse *resp_{nullptr}; @@ -393,10 +405,12 @@ class FlushDataLocalRequest : public FlushDataRequest done_ = nullptr; } - void Reset(const std::vector *kv_table_names, + void Reset(DataStoreService *ds_service, + const std::vector *kv_table_names, remote::CommonResult &result, google::protobuf::Closure *done) { + data_store_service_ = ds_service; kv_table_names_ = kv_table_names; result_ = &result; done_ = done; @@ -409,12 +423,14 @@ class FlushDataLocalRequest : public FlushDataRequest void SetFinish(const remote::CommonResult &result) override { + data_store_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); result_->set_error_code(result.error_code()); result_->set_error_msg(result.error_msg()); } private: + DataStoreService *data_store_service_{nullptr}; const std::vector *kv_table_names_{nullptr}; remote::CommonResult *result_{nullptr}; google::protobuf::Closure *done_{nullptr}; @@ -455,10 +471,12 @@ class DeleteRangeRpcRequest : public DeleteRangeRequest done_ = nullptr; } - void Reset(const remote::DeleteRangeRequest *req, + void Reset(DataStoreService *ds_service, + const remote::DeleteRangeRequest *req, remote::DeleteRangeResponse *resp, google::protobuf::Closure *done) { + 
data_store_service_ = ds_service; req_ = req; resp_ = resp; done_ = done; @@ -491,6 +509,7 @@ class DeleteRangeRpcRequest : public DeleteRangeRequest void SetFinish(const remote::CommonResult &result) override { + data_store_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); ::EloqDS::remote::CommonResult *res = resp_->mutable_result(); @@ -499,6 +518,7 @@ class DeleteRangeRpcRequest : public DeleteRangeRequest } private: + DataStoreService *data_store_service_{nullptr}; const remote::DeleteRangeRequest *req_{nullptr}; remote::DeleteRangeResponse *resp_{nullptr}; google::protobuf::Closure *done_{nullptr}; @@ -523,7 +543,8 @@ class DeleteRangeLocalRequest : public DeleteRangeRequest done_ = nullptr; } - void Reset(const std::string_view table_name, + void Reset(DataStoreService *ds_service, + const std::string_view table_name, const uint32_t partition_id, const std::string_view start_key, const std::string_view end_key, @@ -531,6 +552,7 @@ class DeleteRangeLocalRequest : public DeleteRangeRequest remote::CommonResult &result, google::protobuf::Closure *done) { + data_store_service_ = ds_service; table_name_ = table_name; partition_id_ = partition_id; start_key_ = start_key; @@ -567,12 +589,14 @@ class DeleteRangeLocalRequest : public DeleteRangeRequest void SetFinish(const remote::CommonResult &result) override { + data_store_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); result_->set_error_code(result.error_code()); result_->set_error_msg(result.error_msg()); } private: + DataStoreService *data_store_service_{nullptr}; std::string_view table_name_{""}; uint32_t partition_id_{0}; std::string_view start_key_{""}; @@ -819,10 +843,12 @@ class CreateTableRpcRequest : public CreateTableRequest done_ = nullptr; } - void Reset(const remote::CreateTableRequest *req, + void Reset(DataStoreService *ds_service, + const remote::CreateTableRequest *req, remote::CreateTableResponse *resp, google::protobuf::Closure *done) { + 
ds_service_ = ds_service; req_ = req; resp_ = resp; done_ = done; @@ -835,6 +861,7 @@ class CreateTableRpcRequest : public CreateTableRequest void SetFinish(const remote::CommonResult &result) override { + ds_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); ::EloqDS::remote::CommonResult *res = resp_->mutable_result(); @@ -853,6 +880,7 @@ class CreateTableRpcRequest : public CreateTableRequest } private: + DataStoreService *ds_service_{nullptr}; const remote::CreateTableRequest *req_{nullptr}; remote::CreateTableResponse *resp_{nullptr}; google::protobuf::Closure *done_{nullptr}; @@ -873,10 +901,12 @@ class CreateTableLocalRequest : public CreateTableRequest done_ = nullptr; } - void Reset(const std::string_view table_name, + void Reset(DataStoreService *ds_service, + const std::string_view table_name, remote::CommonResult &result, google::protobuf::Closure *done) { + ds_service_ = ds_service; table_name_ = table_name; result_ = &result; done_ = done; @@ -889,12 +919,14 @@ class CreateTableLocalRequest : public CreateTableRequest void SetFinish(const remote::CommonResult &result) override { + ds_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); result_->set_error_code(result.error_code()); result_->set_error_msg(result.error_msg()); } private: + DataStoreService *ds_service_{nullptr}; std::string_view table_name_{""}; remote::CommonResult *result_{nullptr}; google::protobuf::Closure *done_{nullptr}; @@ -930,10 +962,12 @@ class DropTableRpcRequest : public DropTableRequest done_ = nullptr; } - void Reset(const remote::DropTableRequest *req, + void Reset(DataStoreService *ds_service, + const remote::DropTableRequest *req, remote::DropTableResponse *resp, google::protobuf::Closure *done) { + ds_service_ = ds_service; req_ = req; resp_ = resp; done_ = done; @@ -946,6 +980,7 @@ class DropTableRpcRequest : public DropTableRequest void SetFinish(const remote::CommonResult &result) override { + 
ds_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); ::EloqDS::remote::CommonResult *res = resp_->mutable_result(); @@ -964,6 +999,7 @@ class DropTableRpcRequest : public DropTableRequest } private: + DataStoreService *ds_service_{nullptr}; const remote::DropTableRequest *req_{nullptr}; remote::DropTableResponse *resp_{nullptr}; google::protobuf::Closure *done_{nullptr}; @@ -984,10 +1020,12 @@ class DropTableLocalRequest : public DropTableRequest done_ = nullptr; } - void Reset(const std::string_view table_name, + void Reset(DataStoreService *ds_service, + const std::string_view table_name, remote::CommonResult &result, google::protobuf::Closure *done) { + ds_service_ = ds_service; table_name_ = table_name; result_ = &result; done_ = done; @@ -1000,12 +1038,14 @@ class DropTableLocalRequest : public DropTableRequest void SetFinish(const remote::CommonResult &result) override { + ds_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); result_->set_error_code(result.error_code()); result_->set_error_msg(result.error_msg()); } private: + DataStoreService *ds_service_{nullptr}; std::string_view table_name_{""}; remote::CommonResult *result_{nullptr}; google::protobuf::Closure *done_{nullptr}; @@ -1437,6 +1477,7 @@ class CreateSnapshotForBackupRpcRequest : public CreateSnapshotForBackupRequest void SetFinish(const ::EloqDS::remote::DataStoreError error_code, const std::string error_message) override { + ds_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); ::EloqDS::remote::CommonResult *result = resp_->mutable_result(); result->set_error_code(error_code); @@ -1503,6 +1544,7 @@ class CreateSnapshotForBackupLocalRequest void SetFinish(const ::EloqDS::remote::DataStoreError error_code, const std::string error_message) override { + ds_service_->DecreaseWriteReqCount(); brpc::ClosureGuard done_guard(done_); result_->set_error_code(error_code); result_->set_error_msg(error_message); diff --git 
a/eloq_data_store_service/main.cpp b/eloq_data_store_service/main.cpp index cfebed1..0799ab8 100644 --- a/eloq_data_store_service/main.cpp +++ b/eloq_data_store_service/main.cpp @@ -125,7 +125,6 @@ void ShutDown() if (data_store_service_ != nullptr) { - data_store_service_->DisconnectDataStore(); data_store_service_ = nullptr; } @@ -277,10 +276,10 @@ int main(int argc, char *argv[]) Aws::InitAPI(*aws_options_); #endif + bool is_single_node = eloq_dss_peer_node.empty(); #if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) || \ defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_GCS) bool enable_cache_replacement_ = FLAGS_enable_cache_replacement; - bool is_single_node = eloq_dss_peer_node.empty(); // INIReader config_reader(nullptr, 0); EloqDS::RocksDBConfig rocksdb_config(config_reader, data_path); @@ -291,7 +290,6 @@ int main(int argc, char *argv[]) #elif defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB) bool enable_cache_replacement_ = FLAGS_enable_cache_replacement; - bool is_single_node = eloq_dss_peer_node.empty(); EloqDS::RocksDBConfig rocksdb_config(config_reader, data_path); auto ds_factory = std::make_unique( @@ -320,64 +318,16 @@ int main(int argc, char *argv[]) ds_config_file_path, data_path + "/DSMigrateLog", std::move(ds_factory)); - std::vector dss_shards = ds_config.GetShardsForThisNode(); - std::unordered_map> - dss_shards_map; - // setup rocksdb cloud data store - for (int shard_id : dss_shards) - { -#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) || \ - defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_GCS) - // TODO(lzx): move setup datastore to data_store_service - auto ds = std::make_unique( - rocksdb_cloud_config, - rocksdb_config, - (FLAGS_bootstrap || is_single_node), - enable_cache_replacement_, - shard_id, - data_store_service_.get()); -#elif defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB) - auto ds = std::make_unique( - rocksdb_config, - (FLAGS_bootstrap || is_single_node), - enable_cache_replacement_, - shard_id, - data_store_service_.get()); - -#elif 
defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE) - auto ds = std::make_unique( - shard_id, data_store_service_.get()); -#else - assert(false); - std::unique_ptr ds = nullptr; -#endif - ds->Initialize(); - - // Start db if the shard status is not closed - if (ds_config.FetchDSShardStatus(shard_id) != - EloqDS::DSShardStatus::Closed) - { - bool ret = ds->StartDB(); - if (!ret) - { - LOG(ERROR) - << "Failed to start db instance in data store service"; - ShutDown(); - return 0; - } - } - dss_shards_map[shard_id] = std::move(ds); - } // setup local data store service - bool ret = data_store_service_->StartService(); + bool ret = + data_store_service_->StartService(FLAGS_bootstrap || is_single_node); if (!ret) { LOG(ERROR) << "Failed to start data store service"; ShutDown(); return 0; } - data_store_service_->ConnectDataStore(std::move(dss_shards_map)); if (!FLAGS_alsologtostderr) { diff --git a/eloq_data_store_service/rocksdb_cloud_data_store.cpp b/eloq_data_store_service/rocksdb_cloud_data_store.cpp index 0e4af21..c148999 100644 --- a/eloq_data_store_service/rocksdb_cloud_data_store.cpp +++ b/eloq_data_store_service/rocksdb_cloud_data_store.cpp @@ -705,16 +705,13 @@ bool RocksDBCloudDataStore::OpenCloudDB( void RocksDBCloudDataStore::CreateSnapshotForBackup( CreateSnapshotForBackupRequest *req) { - query_worker_pool_->SubmitWork( + bool res = query_worker_pool_->SubmitWork( [this, req]() { // Create a guard to ensure the poolable object is released to pool std::unique_ptr poolable_guard = std::make_unique(req); - // Increase write counter at the start of the operation - IncreaseWriteCounter(); - std::unique_lock db_lk(db_mux_); if (db_ == nullptr) @@ -764,10 +761,16 @@ void RocksDBCloudDataStore::CreateSnapshotForBackup( // Resume background work db_->ContinueBackgroundWork(); - - // Decrease counter before return - DecreaseWriteCounter(); }); + + if (!res) + { + LOG(ERROR) << "Failed to submit switch to create snapshot work to " + "query worker pool"; + 
req->SetFinish(::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR, + "Fail to create snapshot, error: Fail to submit work to " + "query worker pool"); + } } rocksdb::DBCloud *RocksDBCloudDataStore::GetDBPtr() diff --git a/eloq_data_store_service/rocksdb_data_store_common.cpp b/eloq_data_store_service/rocksdb_data_store_common.cpp index 9515de5..780f2eb 100644 --- a/eloq_data_store_service/rocksdb_data_store_common.cpp +++ b/eloq_data_store_service/rocksdb_data_store_common.cpp @@ -275,22 +275,18 @@ bool RocksDBDataStoreCommon::Initialize() void RocksDBDataStoreCommon::FlushData(FlushDataRequest *flush_data_req) { - query_worker_pool_->SubmitWork( + bool res = query_worker_pool_->SubmitWork( [this, flush_data_req]() { // Create a guard to ensure the poolable object is released to pool std::unique_ptr poolable_guard = std::make_unique(flush_data_req); - // Increase write counter at the start of the operation - IncreaseWriteCounter(); - ::EloqDS::remote::CommonResult result; std::shared_lock db_lk(db_mux_); auto db = GetDBPtr(); if (!db) { - DecreaseWriteCounter(); // Decrease counter before error return result.set_error_code( ::EloqDS::remote::DataStoreError::DB_NOT_OPEN); result.set_error_msg("DB is not opened"); @@ -307,7 +303,6 @@ void RocksDBDataStoreCommon::FlushData(FlushDataRequest *flush_data_req) { LOG(ERROR) << "Unable to flush db with error: " << status.ToString(); - DecreaseWriteCounter(); // Decrease counter before error return result.set_error_code( ::EloqDS::remote::DataStoreError::FLUSH_FAILED); result.set_error_msg(status.ToString()); @@ -315,33 +310,39 @@ void RocksDBDataStoreCommon::FlushData(FlushDataRequest *flush_data_req) return; } - // Decrease counter before successful return - DecreaseWriteCounter(); result.set_error_code(::EloqDS::remote::DataStoreError::NO_ERROR); flush_data_req->SetFinish(result); DLOG(INFO) << "FlushData successfully."; }); + + if (!res) + { + LOG(ERROR) << "Failed to submit flush data work to query worker pool"; + 
::EloqDS::remote::CommonResult result; + result.set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result.set_error_msg("DB is not opened"); + flush_data_req->SetFinish(result); + + flush_data_req->Clear(); + flush_data_req->Free(); + return; + } } void RocksDBDataStoreCommon::DeleteRange(DeleteRangeRequest *delete_range_req) { - query_worker_pool_->SubmitWork( + bool res = query_worker_pool_->SubmitWork( [this, delete_range_req]() { // Create a guard to ensure the poolable object is released to pool std::unique_ptr poolable_guard = std::make_unique(delete_range_req); - // Increase write counter at the start of the operation - IncreaseWriteCounter(); - ::EloqDS::remote::CommonResult result; std::shared_lock db_lk(db_mux_); auto db = GetDBPtr(); if (!db) { - // Decrease counter before error return - DecreaseWriteCounter(); result.set_error_code( ::EloqDS::remote::DataStoreError::DB_NOT_OPEN); result.set_error_msg("DB is not opened"); @@ -380,9 +381,6 @@ void RocksDBDataStoreCommon::DeleteRange(DeleteRangeRequest *delete_range_req) { LOG(ERROR) << "Unable to delete range with error: " << status.ToString(); - - // Decrease counter before error return - DecreaseWriteCounter(); result.set_error_code( ::EloqDS::remote::DataStoreError::WRITE_FAILED); result.set_error_msg(status.ToString()); @@ -390,17 +388,27 @@ void RocksDBDataStoreCommon::DeleteRange(DeleteRangeRequest *delete_range_req) return; } - // Decrease counter before successful return - DecreaseWriteCounter(); result.set_error_code(::EloqDS::remote::DataStoreError::NO_ERROR); delete_range_req->SetFinish(result); DLOG(INFO) << "DeleteRange successfully."; }); + + if (!res) + { + LOG(ERROR) << "Failed to submit delete range work to query worker pool"; + ::EloqDS::remote::CommonResult result; + result.set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result.set_error_msg("DB is not opened"); + delete_range_req->SetFinish(result); + delete_range_req->Clear(); + delete_range_req->Free(); 
+ return; + } } void RocksDBDataStoreCommon::Read(ReadRequest *req) { - query_worker_pool_->SubmitWork( + bool res = query_worker_pool_->SubmitWork( [this, req]() { // Create a guard to ensure the poolable object is released to pool @@ -451,6 +459,14 @@ void RocksDBDataStoreCommon::Read(ReadRequest *req) << " failed, status:" << status.ToString(); } }); + if (!res) + { + LOG(ERROR) << "Failed to submit read work to query worker pool"; + req->SetFinish(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + req->Clear(); + req->Free(); + return; + } } void RocksDBDataStoreCommon::BatchWriteRecords( @@ -465,7 +481,7 @@ void RocksDBDataStoreCommon::BatchWriteRecords( return; } - query_worker_pool_->SubmitWork( + bool res = query_worker_pool_->SubmitWork( [this, batch_write_req]() mutable { // Create a guard to ensure the poolable object is released to pool @@ -506,9 +522,6 @@ void RocksDBDataStoreCommon::BatchWriteRecords( return; } - // Increase write counter before starting the write operation - IncreaseWriteCounter(); - const uint16_t parts_cnt_per_key = batch_write_req->PartsCountPerKey(); const uint16_t parts_cnt_per_record = @@ -600,10 +613,22 @@ void RocksDBDataStoreCommon::BatchWriteRecords( } } - DecreaseWriteCounter(); result.set_error_code(::EloqDS::remote::DataStoreError::NO_ERROR); batch_write_req->SetFinish(result); }); + + if (!res) + { + LOG(ERROR) << "Failed to submit batch write work to query worker pool"; + ::EloqDS::remote::CommonResult result; + result.set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result.set_error_msg("DB is not opened"); + batch_write_req->SetFinish(result); + + batch_write_req->Clear(); + batch_write_req->Free(); + return; + } } void RocksDBDataStoreCommon::CreateTable(CreateTableRequest *create_table_req) @@ -619,23 +644,18 @@ void RocksDBDataStoreCommon::CreateTable(CreateTableRequest *create_table_req) void RocksDBDataStoreCommon::DropTable(DropTableRequest *drop_table_req) { - query_worker_pool_->SubmitWork( + 
bool res = query_worker_pool_->SubmitWork( [this, drop_table_req]() { // Create a guard to ensure the poolable object is released to pool std::unique_ptr poolable_guard = std::make_unique(drop_table_req); - // Increase write counter at the start of the operation - IncreaseWriteCounter(); - ::EloqDS::remote::CommonResult result; std::shared_lock db_lk(db_mux_); auto db = GetDBPtr(); if (!db) { - // Decrease counter before error return - DecreaseWriteCounter(); result.set_error_code( ::EloqDS::remote::DataStoreError::DB_NOT_OPEN); result.set_error_msg("DB is not opened"); @@ -661,9 +681,6 @@ void RocksDBDataStoreCommon::DropTable(DropTableRequest *drop_table_req) { LOG(ERROR) << "Unable to drop table with error: " << status.ToString(); - - // Decrease counter before error return - DecreaseWriteCounter(); result.set_error_code( ::EloqDS::remote::DataStoreError::WRITE_FAILED); result.set_error_msg(status.ToString()); @@ -680,8 +697,6 @@ void RocksDBDataStoreCommon::DropTable(DropTableRequest *drop_table_req) { LOG(ERROR) << "Unable to drop table with error: " << status.ToString(); - // Decrease counter before error return - DecreaseWriteCounter(); result.set_error_code( ::EloqDS::remote::DataStoreError::FLUSH_FAILED); result.set_error_msg(status.ToString()); @@ -689,17 +704,28 @@ void RocksDBDataStoreCommon::DropTable(DropTableRequest *drop_table_req) return; } - // Decrease counter before successful return - DecreaseWriteCounter(); result.set_error_code(::EloqDS::remote::DataStoreError::NO_ERROR); drop_table_req->SetFinish(result); DLOG(INFO) << "DropTable successfully."; }); + + if (!res) + { + LOG(ERROR) << "Failed to submit drop table work to query worker pool"; + ::EloqDS::remote::CommonResult result; + result.set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result.set_error_msg("DB is not opened"); + drop_table_req->SetFinish(result); + + drop_table_req->Clear(); + drop_table_req->Free(); + return; + } } void 
RocksDBDataStoreCommon::ScanNext(ScanRequest *scan_req) { - query_worker_pool_->SubmitWork( + bool res = query_worker_pool_->SubmitWork( [this, scan_req]() { // DLOG(INFO) << "RocksDBDataStoreCommon::ScanNext " @@ -983,11 +1009,20 @@ void RocksDBDataStoreCommon::ScanNext(ScanRequest *scan_req) scan_req->SetFinish(::EloqDS::remote::DataStoreError::NO_ERROR); }); + + if (!res) + { + LOG(ERROR) << "Failed to submit scan work to query worker pool"; + scan_req->SetFinish(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + scan_req->Clear(); + scan_req->Free(); + return; + } } void RocksDBDataStoreCommon::ScanClose(ScanRequest *scan_req) { - query_worker_pool_->SubmitWork( + bool res = query_worker_pool_->SubmitWork( [this, scan_req]() { // Create a guard to ensure the poolable object is released to pool @@ -1002,14 +1037,12 @@ void RocksDBDataStoreCommon::ScanClose(ScanRequest *scan_req) scan_req->SetFinish(::EloqDS::remote::DataStoreError::NO_ERROR); }); -} - -void RocksDBDataStoreCommon::WaitForPendingWrites() -{ - // Wait until all ongoing write requests are completed - while (ongoing_write_requests_.load(std::memory_order_acquire) > 0) + if (!res) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + LOG(ERROR) << "Failed to submit scan close work to query worker pool"; + scan_req->SetFinish(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + scan_req->Clear(); + scan_req->Free(); } } @@ -1019,27 +1052,15 @@ DSShardStatus RocksDBDataStoreCommon::FetchDSShardStatus() const return data_store_service_->FetchDSShardStatus(shard_id_); } -void RocksDBDataStoreCommon::IncreaseWriteCounter() -{ - ongoing_write_requests_.fetch_add(1, std::memory_order_release); -} - -void RocksDBDataStoreCommon::DecreaseWriteCounter() -{ - ongoing_write_requests_.fetch_sub(1, std::memory_order_release); -} - void RocksDBDataStoreCommon::SwitchToReadOnly() { - WaitForPendingWrites(); - bthread::Mutex mutex; bthread::ConditionVariable cond_var; bool done = false; // pause all 
background jobs to stop compaction and obselete file // deletion - query_worker_pool_->SubmitWork( + bool res = query_worker_pool_->SubmitWork( [this, &mutex, &cond_var, &done]() { // Run pause background work in a separate thread to avoid blocking @@ -1055,6 +1076,12 @@ void RocksDBDataStoreCommon::SwitchToReadOnly() done = true; cond_var.notify_one(); }); + if (!res) + { + LOG(ERROR) + << "Failed to submit switch to read only work to query worker pool"; + return; + } std::unique_lock lk(mutex); diff --git a/eloq_data_store_service/rocksdb_data_store_common.h b/eloq_data_store_service/rocksdb_data_store_common.h index ead10f1..617d54a 100644 --- a/eloq_data_store_service/rocksdb_data_store_common.h +++ b/eloq_data_store_service/rocksdb_data_store_common.h @@ -150,8 +150,7 @@ class RocksDBDataStoreCommon : public DataStore received_snapshot_path_(storage_path_ + "/received_snapshot/"), create_db_if_missing_(create_if_missing), tx_enable_cache_replacement_(tx_enable_cache_replacement), - ttl_compaction_filter_(nullptr), - ongoing_write_requests_(0) + ttl_compaction_filter_(nullptr) { info_log_level_ = StringToInfoLogLevel(config.info_log_level_); query_worker_pool_ = @@ -229,21 +228,6 @@ class RocksDBDataStoreCommon : public DataStore DSShardStatus FetchDSShardStatus() const; protected: - /** - * @brief Wait for all pending writes to complete. - */ - void WaitForPendingWrites(); - - /** - * @brief Increase the pending write counter. - */ - void IncreaseWriteCounter(); - - /** - * @brief Decrease the pending write counter. 
- */ - void DecreaseWriteCounter(); - #ifdef DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3 virtual rocksdb::DBCloud *GetDBPtr() = 0; #else @@ -336,7 +320,5 @@ class RocksDBDataStoreCommon : public DataStore std::unique_ptr query_worker_pool_; std::shared_mutex db_mux_; std::mutex ddl_mux_; - - std::atomic ongoing_write_requests_{0}; }; } // namespace EloqDS \ No newline at end of file diff --git a/eloq_data_store_service/thread_worker_pool.cpp b/eloq_data_store_service/thread_worker_pool.cpp index 7378441..3585111 100644 --- a/eloq_data_store_service/thread_worker_pool.cpp +++ b/eloq_data_store_service/thread_worker_pool.cpp @@ -76,15 +76,16 @@ size_t ThreadWorkerPool::WorkQueueSize() return work_queue_.size(); } -void ThreadWorkerPool::SubmitWork(std::function work) +bool ThreadWorkerPool::SubmitWork(std::function work) { std::unique_lock lk(work_queue_mutex_); if (shutdown_indicator_.load(std::memory_order_acquire)) { - return; + return false; } work_queue_.push_back(std::move(work)); work_queue_cv_.notify_one(); + return true; } void ThreadWorkerPool::Shutdown() diff --git a/eloq_data_store_service/thread_worker_pool.h b/eloq_data_store_service/thread_worker_pool.h index e057126..95ea212 100644 --- a/eloq_data_store_service/thread_worker_pool.h +++ b/eloq_data_store_service/thread_worker_pool.h @@ -39,7 +39,7 @@ class ThreadWorkerPool ThreadWorkerPool(size_t max_workers_num = 1); ~ThreadWorkerPool() = default; - void SubmitWork(std::function work); + bool SubmitWork(std::function work); size_t WorkQueueSize(); void Shutdown(); size_t WorkerPoolSize() From 7f95ea71ea49ca64a7353f59e665015a27c2f5d1 Mon Sep 17 00:00:00 2001 From: lzxddz Date: Wed, 24 Sep 2025 19:01:57 +0800 Subject: [PATCH 2/4] update --- eloq_data_store_service/data_store_service.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/eloq_data_store_service/data_store_service.h b/eloq_data_store_service/data_store_service.h index 66f0ade..b64a9f7 100644 --- 
a/eloq_data_store_service/data_store_service.h +++ b/eloq_data_store_service/data_store_service.h @@ -610,7 +610,9 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService bool IsOwnerOfShard(uint32_t shard_id) { - return shard_id_ == shard_id; + return shard_status_.load(std::memory_order_acquire) != + DSShardStatus::Closed && + shard_id_ == shard_id; } DataStore *GetDataStore(uint32_t shard_id) From 52ecce2f0cdeb59d5575d3502bd0c45e70ca964a Mon Sep 17 00:00:00 2001 From: LZX <37354838+lzxddz@users.noreply.github.com> Date: Tue, 30 Sep 2025 14:33:10 +0800 Subject: [PATCH 3/4] Refactor cluster manager on client (#95) * Update cluster config in client * fix datastore migration error --- data_store_service_client.cpp | 467 +++++++++++------- data_store_service_client.h | 152 ++++-- data_store_service_client_closure.h | 133 +++-- .../data_store_service.cpp | 144 +++--- eloq_data_store_service/data_store_service.h | 13 +- .../data_store_service_config.cpp | 5 +- .../data_store_service_util.h | 3 + eloq_data_store_service/ds_request.proto | 3 +- 8 files changed, 583 insertions(+), 337 deletions(-) diff --git a/data_store_service_client.cpp b/data_store_service_client.cpp index c3a1426..d885d0a 100644 --- a/data_store_service_client.cpp +++ b/data_store_service_client.cpp @@ -85,13 +85,6 @@ static const std::string_view KEY_SEPARATOR("\\"); DataStoreServiceClient::~DataStoreServiceClient() { - { - std::unique_lock lk(ds_service_mutex_); - ds_serv_shutdown_indicator_.store(true, std::memory_order_release); - ds_service_cv_.notify_all(); - LOG(INFO) << "Notify ds_serv_shutdown_indicator"; - } - upsert_table_worker_.Shutdown(); } @@ -109,15 +102,35 @@ DataStoreServiceClient::~DataStoreServiceClient() void DataStoreServiceClient::SetupConfig( const DataStoreServiceClusterManager &cluster_manager) { - for (const auto &[_, group] : cluster_manager.GetAllShards()) + assert(cluster_manager.GetShardCount() == 1); + auto current_version = + 
dss_topology_version_.load(std::memory_order_acquire); + auto new_version = cluster_manager.GetTopologyVersion(); + if (current_version <= cluster_manager.GetTopologyVersion() && + dss_topology_version_.compare_exchange_strong(current_version, + new_version)) { - for (const auto &node : group.nodes_) + for (const auto &[_, group] : cluster_manager.GetAllShards()) { - LOG(INFO) << "Node Hostname: " << node.host_name_ - << ", Port: " << node.port_; + for (const auto &node : group.nodes_) + { + LOG(INFO) << "Node Hostname: " << node.host_name_ + << ", Port: " << node.port_; + } + // The first node is the owner of shard. + assert(group.nodes_.size() > 0); + while (!UpgradeShardVersion(group.shard_id_, + group.version_, + group.nodes_[0].host_name_, + group.nodes_[0].port_)) + { + LOG(INFO) << "UpgradeShardVersion failed, retry"; + bthread_usleep(1000000); + } + LOG(INFO) << "UpgradeShardVersion success, shard_id:" + << group.shard_id_ << ", version:" << group.version_; } } - cluster_manager_ = cluster_manager; } /** @@ -2743,12 +2756,12 @@ bool DataStoreServiceClient::CreateSnapshotForBackup( { CreateSnapshotForBackupClosure *closure = create_snapshot_for_backup_closure_pool_.NextObject(); - auto shards = cluster_manager_.GetAllShards(); + uint32_t shard_cnt = AllDataShardCount(); std::vector shard_ids; - shard_ids.reserve(shards.size()); - for (auto &[s_id, _] : shards) + shard_ids.reserve(shard_cnt); + for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++) { - shard_ids.push_back(s_id); + shard_ids.push_back(shard_id); } CreateSnapshotForBackupCallbackData *callback_data = @@ -2807,17 +2820,11 @@ void DataStoreServiceClient::CreateSnapshotForBackupInternal( { // Handle remote shard closure->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByShardId(shard_id); - if (!channel) - { - LOG(WARNING) << "Failed to get channel for shard " << shard_id; - // Continue with next shard - CreateSnapshotForBackupInternal(closure); - return; - } + uint32_t 
node_index = GetOwnerNodeIndexOfShard(shard_id); + closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - closure->SetChannel(channel); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *closure->Controller(); cntl.set_timeout_ms(30000); // Longer timeout for backup operations auto *req = closure->RemoteRequest(); @@ -2904,9 +2911,12 @@ void DataStoreServiceClient::OnShutdown() */ bool DataStoreServiceClient::IsLocalShard(uint32_t shard_id) { - // this is a temporary solution for scale up scenario (from one smaller - // node to another bigger node) - return cluster_manager_.IsOwnerOfShard(shard_id); + if (data_store_service_ != nullptr) + { + return data_store_service_->IsOwnerOfShard(shard_id); + } + + return false; } /** @@ -2921,7 +2931,186 @@ bool DataStoreServiceClient::IsLocalShard(uint32_t shard_id) */ bool DataStoreServiceClient::IsLocalPartition(int32_t partition_id) { - return cluster_manager_.IsOwnerOfPartition(partition_id); + return IsLocalShard(GetShardIdByPartitionId(partition_id)); +} + +uint32_t DataStoreServiceClient::GetShardIdByPartitionId( + int32_t partition_id) const +{ + // Now, only support one shard. 
+ return 0; +} + +uint32_t DataStoreServiceClient::AllDataShardCount() const +{ + return dss_shards_.size(); +} + +uint32_t DataStoreServiceClient::GetOwnerNodeIndexOfShard( + uint32_t shard_id) const +{ + assert(dss_shards_[shard_id].load(std::memory_order_acquire) != UINT32_MAX); + return dss_shards_[shard_id].load(std::memory_order_acquire); +} + +bool DataStoreServiceClient::UpdateOwnerNodeIndexOfShard( + uint32_t shard_id, uint32_t old_node_index, uint32_t &new_node_index) +{ + new_node_index = dss_shards_[shard_id].load(std::memory_order_acquire); + if (new_node_index != old_node_index) + { + return true; + } + + uint64_t expect_val = 0; + uint64_t current_ts = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + if (dss_nodes_[old_node_index].expired_ts_.compare_exchange_strong( + expect_val, current_ts)) + { + // The old node channle is not updated by other, update it. + uint32_t free_index = FindFreeNodeIndex(); + if (free_index == dss_nodes_.size()) + { + LOG(ERROR) << "Find free node index failed"; + dss_nodes_[old_node_index].expired_ts_.store( + expect_val, std::memory_order_release); + return false; + } + auto &node = dss_nodes_[free_index]; + node.Reset(dss_nodes_[old_node_index].HostName(), + dss_nodes_[old_node_index].Port(), + dss_nodes_[old_node_index].ShardVersion()); + if (dss_shards_[shard_id].compare_exchange_strong(old_node_index, + free_index)) + { + new_node_index = free_index; + return true; + } + else + { + DLOG(INFO) << "Other thread updated the data shard, shard_id:" + << shard_id; + node.expired_ts_.store(1, std::memory_order_release); + new_node_index = old_node_index; + return true; + } + } + else + { + // Other thread is updating the shard. Waiting. 
+ return false; + } +} + +uint32_t DataStoreServiceClient::FindFreeNodeIndex() +{ + uint64_t current_ts = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + for (uint32_t i = 0; i < dss_nodes_.size(); i++) + { + uint64_t expired_ts = + dss_nodes_[i].expired_ts_.load(std::memory_order_acquire); + if (expired_ts > 0 && expired_ts < current_ts && + (current_ts - expired_ts) > NodeExpiredTime && + dss_nodes_[i].expired_ts_.compare_exchange_strong(expired_ts, 0)) + { + return i; + } + } + // not found + return dss_nodes_.size(); +} + +void DataStoreServiceClient::HandleShardingError( + const ::EloqDS::remote::CommonResult &result) +{ + assert(result.error_code() == + static_cast( + ::EloqDS::remote::DataStoreError::REQUESTED_NODE_NOT_OWNER)); + + auto &new_key_sharding = result.new_key_sharding(); + auto error_type = new_key_sharding.type(); + if (error_type == + ::EloqDS::remote::KeyShardingErrorType::PrimaryNodeChanged) + { + uint32_t shard_id = new_key_sharding.shard_id(); + uint64_t shard_version = new_key_sharding.shard_version(); + auto &primary_node = new_key_sharding.new_primary_node(); + DSSNode new_primary_node; + while (!UpgradeShardVersion(shard_id, + shard_version, + primary_node.host_name(), + primary_node.port())) + { + DLOG(INFO) << "Upgrade shard version failed, shard_id: " + << shard_id; + bthread_usleep(10000); + continue; + } + } + else + { + assert(false); + // the whole node group has changed + LOG(FATAL) << "The topology of data shards is changed"; + // TODO(lzx): handle the topology of cluster change. + } +} + +bool DataStoreServiceClient::UpgradeShardVersion(uint32_t shard_id, + uint64_t shard_version, + const std::string &host_name, + uint16_t port) +{ + if (shard_id >= dss_shards_.size()) + { + assert(false); + // Now only support one shard. 
+ LOG(FATAL) << "Shard id not found, shard_id: " << shard_id; + return true; + } + + uint32_t node_index = dss_shards_[shard_id].load(std::memory_order_acquire); + auto &node_ref = dss_nodes_[node_index]; + if (node_ref.ShardVersion() < shard_version) + { + uint64_t expect_val = 0; + uint64_t current_ts = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + if (!node_ref.expired_ts_.compare_exchange_strong(expect_val, + current_ts)) + { + // Other thread is updating the shard, retry. + DLOG(INFO) << "Other thread is updating the data shard, shard_id: " + << shard_id; + return false; + } + + uint32_t free_node_index = FindFreeNodeIndex(); + if (free_node_index == dss_nodes_.size()) + { + DLOG(INFO) << "Find free node index failed"; + node_ref.expired_ts_.store(expect_val, std::memory_order_release); + return false; + } + auto &free_node_ref = dss_nodes_[free_node_index]; + free_node_ref.Reset(host_name, port, shard_version); + if (!dss_shards_[shard_id].compare_exchange_strong(node_index, + free_node_index)) + { + assert(false); + free_node_ref.expired_ts_.store(1, std::memory_order_release); + } + } + return true; } txservice::store::DataStoreHandler::DataStoreOpStatus @@ -2994,46 +3183,40 @@ void DataStoreServiceClient::Read(const std::string_view kv_table_name, void *callback_data, DataStoreCallback callback) { - ReadClosure *read_clouse = read_closure_pool_.NextObject(); - read_clouse->Reset( + ReadClosure *read_closure = read_closure_pool_.NextObject(); + read_closure->Reset( this, kv_table_name, partition_id, key, callback_data, callback); - ReadInternal(read_clouse); + ReadInternal(read_closure); } -void DataStoreServiceClient::ReadInternal(ReadClosure *read_clouse) +void DataStoreServiceClient::ReadInternal(ReadClosure *read_closure) { - if (IsLocalPartition(read_clouse->PartitionId())) + if (IsLocalPartition(read_closure->PartitionId())) { - read_clouse->PrepareRequest(true); - 
data_store_service_->Read(read_clouse->TableName(), - read_clouse->PartitionId(), - read_clouse->Key(), - &read_clouse->LocalValueRef(), - &read_clouse->LocalTsRef(), - &read_clouse->LocalTtlRef(), - &read_clouse->LocalResultRef(), - read_clouse); + read_closure->PrepareRequest(true); + data_store_service_->Read(read_closure->TableName(), + read_closure->PartitionId(), + read_closure->Key(), + &read_closure->LocalValueRef(), + &read_closure->LocalTsRef(), + &read_closure->LocalTtlRef(), + &read_closure->LocalResultRef(), + read_closure); } else { - read_clouse->PrepareRequest(false); - auto channel = - GetDataStoreServiceChannelByPartitionId(read_clouse->PartitionId()); - if (!channel) - { - brpc::ClosureGuard guard(read_clouse); - ::EloqDS::remote::CommonResult &result = read_clouse->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } - - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); - brpc::Controller &cntl = *read_clouse->Controller(); + read_closure->PrepareRequest(false); + uint32_t node_index = GetOwnerNodeIndexOfShard( + GetShardIdByPartitionId(read_closure->PartitionId())); + read_closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); + + EloqDS::remote::DataStoreRpcService_Stub stub(channel); + brpc::Controller &cntl = *read_closure->Controller(); cntl.set_timeout_ms(5000); - auto *req = read_clouse->ReadRequest(); - auto *resp = read_clouse->ReadResponse(); - stub.Read(&cntl, req, resp, read_clouse); + auto *req = read_closure->ReadRequest(); + auto *resp = read_closure->ReadResponse(); + stub.Read(&cntl, req, resp, read_closure); } } @@ -3076,19 +3259,12 @@ void DataStoreServiceClient::DeleteRangeInternal( else { delete_range_clouse->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByPartitionId( - delete_range_clouse->PartitionId()); - if (!channel) - { - brpc::ClosureGuard guard(delete_range_clouse); - ::EloqDS::remote::CommonResult 
&result = - delete_range_clouse->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } + uint32_t node_index = GetOwnerNodeIndexOfShard( + GetShardIdByPartitionId(delete_range_clouse->PartitionId())); + delete_range_clouse->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *delete_range_clouse->Controller(); cntl.set_timeout_ms(5000); auto *req = delete_range_clouse->DeleteRangeRequest(); @@ -3103,12 +3279,12 @@ void DataStoreServiceClient::FlushData( DataStoreCallback callback) { FlushDataClosure *closure = flush_data_closure_pool_.NextObject(); - auto shards = cluster_manager_.GetAllShards(); + uint32_t shard_cnt = AllDataShardCount(); std::vector shard_ids; - shard_ids.reserve(shards.size()); - for (auto &[s_id, _] : shards) + shard_ids.reserve(shard_cnt); + for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++) { - shard_ids.push_back(s_id); + shard_ids.push_back(shard_id); } closure->Reset( @@ -3133,18 +3309,11 @@ void DataStoreServiceClient::FlushDataInternal( else { flush_data_closure->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByShardId(shard_id); - if (!channel) - { - brpc::ClosureGuard guard(flush_data_closure); - ::EloqDS::remote::CommonResult &result = - flush_data_closure->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } + uint32_t node_index = GetOwnerNodeIndexOfShard(shard_id); + flush_data_closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *flush_data_closure->Controller(); cntl.set_timeout_ms(5000); auto *req = flush_data_closure->FlushDataRequest(); @@ 
-3161,12 +3330,12 @@ void DataStoreServiceClient::DropTable(std::string_view table_name, DLOG(INFO) << "DropTableWithRetry for table: " << table_name; DropTableClosure *closure = drop_table_closure_pool_.NextObject(); - auto shards = cluster_manager_.GetAllShards(); + uint32_t shard_cnt = AllDataShardCount(); std::vector shard_ids; - shard_ids.reserve(shards.size()); - for (auto &[s_id, _] : shards) + shard_ids.reserve(shard_cnt); + for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++) { - shard_ids.push_back(s_id); + shard_ids.push_back(shard_id); } closure->Reset( @@ -3191,18 +3360,11 @@ void DataStoreServiceClient::DropTableInternal( else { drop_table_closure->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByShardId(shard_id); - if (!channel) - { - brpc::ClosureGuard guard(drop_table_closure); - ::EloqDS::remote::CommonResult &result = - drop_table_closure->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } + uint32_t node_index = GetOwnerNodeIndexOfShard(shard_id); + drop_table_closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *drop_table_closure->Controller(); cntl.set_timeout_ms(5000); auto *req = drop_table_closure->DropTableRequest(); @@ -3266,19 +3428,12 @@ void DataStoreServiceClient::ScanNextInternal( else { scan_next_closure->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByPartitionId( - scan_next_closure->PartitionId()); - if (!channel) - { - brpc::ClosureGuard guard(scan_next_closure); - ::EloqDS::remote::CommonResult &result = - scan_next_closure->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } + uint32_t node_index = GetOwnerNodeIndexOfShard( + GetShardIdByPartitionId(scan_next_closure->PartitionId())); + 
scan_next_closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *scan_next_closure->Controller(); cntl.set_timeout_ms(5000); auto *req = scan_next_closure->ScanNextRequest(); @@ -3325,19 +3480,12 @@ void DataStoreServiceClient::ScanCloseInternal( else { scan_next_closure->PrepareRequest(false); - auto channel = GetDataStoreServiceChannelByPartitionId( - scan_next_closure->PartitionId()); - if (!channel) - { - brpc::ClosureGuard guard(scan_next_closure); - ::EloqDS::remote::CommonResult &result = - scan_next_closure->Result(); - result.set_error_code( - ::EloqDS::remote::DataStoreError::NETWORK_ERROR); - return; - } + uint32_t node_index = GetOwnerNodeIndexOfShard( + GetShardIdByPartitionId(scan_next_closure->PartitionId())); + scan_next_closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); - EloqDS::remote::DataStoreRpcService_Stub stub(channel.get()); + EloqDS::remote::DataStoreRpcService_Stub stub(channel); brpc::Controller &cntl = *scan_next_closure->Controller(); cntl.set_timeout_ms(5000); auto *req = scan_next_closure->ScanNextRequest(); @@ -3591,47 +3739,6 @@ bool DataStoreServiceClient::DeleteTableStatistics( return true; } -std::shared_ptr -DataStoreServiceClient::GetDataStoreServiceChannelByPartitionId( - uint32_t partition_id) -{ - return cluster_manager_.GetDataStoreServiceChannelByPartitionId( - partition_id); -} - -std::shared_ptr -DataStoreServiceClient::UpdateDataStoreServiceChannelByPartitionId( - uint32_t partition_id) -{ - return cluster_manager_.UpdateDataStoreServiceChannelByPartitionId( - partition_id); -} - -std::shared_ptr -DataStoreServiceClient::GetDataStoreServiceChannel(const DSSNode &node) -{ - return cluster_manager_.GetDataStoreServiceChannel(node); -} - -std::shared_ptr 
-DataStoreServiceClient::GetDataStoreServiceChannelByShardId(uint32_t shard_id) -{ - return cluster_manager_.GetDataStoreServiceChannelByShardId(shard_id); -} - -std::shared_ptr -DataStoreServiceClient::UpdateDataStoreServiceChannelByShardId( - uint32_t shard_id) -{ - return cluster_manager_.UpdateDataStoreServiceChannelByShardId(shard_id); -} - -std::shared_ptr -DataStoreServiceClient::UpdateDataStoreServiceChannel(const DSSNode &node) -{ - return cluster_manager_.UpdateDataStoreServiceChannel(node); -} - void DataStoreServiceClient::BatchWriteRecords( std::string_view kv_table_name, int32_t partition_id, @@ -3675,7 +3782,7 @@ void DataStoreServiceClient::BatchWriteRecordsInternal( if (IsLocalShard(req_shard_id)) { - closure->is_local_request_ = true; + closure->PrepareRequest(true); data_store_service_->BatchWriteRecords(closure->kv_table_name_, closure->partition_id_, closure->key_parts_, @@ -3691,26 +3798,14 @@ void DataStoreServiceClient::BatchWriteRecordsInternal( } else { - closure->is_local_request_ = false; - - auto channel = - cluster_manager_.GetDataStoreServiceChannelByShardId(req_shard_id); - if (!channel) - { - // TODO(lzx): retry.. 
- assert(false); - closure->result_.set_error_code( - remote::DataStoreError::NETWORK_ERROR); - closure->Run(); - return; - } - // prepare request - closure->PrepareRemoteRequest(); - // timeout is set in the PrepareRemoteRequest + closure->PrepareRequest(false); + uint32_t node_index = GetOwnerNodeIndexOfShard(req_shard_id); + closure->SetRemoteNodeIndex(node_index); + auto *channel = dss_nodes_[node_index].Channel(); // send request - remote::DataStoreRpcService_Stub stub(channel.get()); + remote::DataStoreRpcService_Stub stub(channel); stub.BatchWriteRecords(closure->Controller(), closure->RemoteRequest(), closure->RemoteResponse(), diff --git a/data_store_service_client.h b/data_store_service_client.h index 1df3da4..a244e92 100644 --- a/data_store_service_client.h +++ b/data_store_service_client.h @@ -55,6 +55,8 @@ class ScanNextClosure; class CreateSnapshotForBackupClosure; class SinglePartitionScanner; +class DssClusterConfig; + typedef void (*DataStoreCallback)(void *data, ::google::protobuf::Closure *closure, DataStoreServiceClient &client, @@ -69,16 +71,27 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler txservice::CatalogFactory *catalog_factory[3], const DataStoreServiceClusterManager &cluster_manager, DataStoreService *data_store_service = nullptr) - : ds_serv_shutdown_indicator_(false), - catalog_factory_array_{catalog_factory[0], + : catalog_factory_array_{catalog_factory[0], catalog_factory[1], catalog_factory[2], &range_catalog_factory_, &hash_catalog_factory_}, - cluster_manager_(cluster_manager), - data_store_service_(data_store_service), - flying_remote_fetch_count_(0) + data_store_service_(data_store_service) { + // Init dss cluster config. 
+ dss_topology_version_ = cluster_manager.GetTopologyVersion(); + auto all_shards = cluster_manager.GetAllShards(); + assert(all_shards.size() == 1); + for (auto &[shard_id, shard] : all_shards) + { + uint32_t node_idx = FindFreeNodeIndex(); + auto &node_ref = dss_nodes_[node_idx]; + node_ref.Reset(shard.nodes_[0].host_name_, + shard.nodes_[0].port_, + shard.version_); + dss_shards_[shard_id].store(shard_id); + } + if (data_store_service_ != nullptr) { data_store_service_->AddListenerForUpdateConfig( @@ -359,24 +372,6 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler void OnShutdown() override; - void HandleShardingError(const ::EloqDS::remote::CommonResult &result) - { - cluster_manager_.HandleShardingError(result); - } - - std::shared_ptr GetDataStoreServiceChannelByPartitionId( - uint32_t partition_id); - std::shared_ptr UpdateDataStoreServiceChannelByPartitionId( - uint32_t partition_id); - std::shared_ptr GetDataStoreServiceChannelByShardId( - uint32_t shard_id); - std::shared_ptr UpdateDataStoreServiceChannelByShardId( - uint32_t shard_id); - std::shared_ptr GetDataStoreServiceChannel( - const DSSNode &node); - std::shared_ptr UpdateDataStoreServiceChannel( - const DSSNode &node); - /** * Serialize a record with is_deleted flag and record string. * @param is_deleted @@ -616,6 +611,12 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler #endif } + const txservice::CatalogFactory *GetCatalogFactory( + txservice::TableEngine table_engine) + { + return catalog_factory_array_.at(static_cast(table_engine) - 1); + } + /** * @brief Check if the shard_id is local to the current node. 
* @param shard_id @@ -623,17 +624,6 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler */ bool IsLocalShard(uint32_t shard_id); - uint32_t GetShardIdByPartitionId(int32_t partition_id) - { - return cluster_manager_.GetShardIdByPartitionId(partition_id); - } - - const txservice::CatalogFactory *GetCatalogFactory( - txservice::TableEngine table_engine) - { - return catalog_factory_array_.at(static_cast(table_engine) - 1); - } - /** * @brief Check if the partition_id is local to the current node. * @param partition_id @@ -641,9 +631,18 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler */ bool IsLocalPartition(int32_t partition_id); - bthread::Mutex ds_service_mutex_; - bthread::ConditionVariable ds_service_cv_; - std::atomic ds_serv_shutdown_indicator_; + uint32_t GetShardIdByPartitionId(int32_t partition_id) const; + uint32_t AllDataShardCount() const; + uint32_t GetOwnerNodeIndexOfShard(uint32_t shard_id) const; + bool UpdateOwnerNodeIndexOfShard(uint32_t shard_id, + uint32_t old_node_index, + uint32_t &new_node_index); + uint32_t FindFreeNodeIndex(); + void HandleShardingError(const ::EloqDS::remote::CommonResult &result); + bool UpgradeShardVersion(uint32_t shard_id, + uint64_t shard_version, + const std::string &host_name, + uint16_t port); txservice::EloqHashCatalogFactory hash_catalog_factory_{}; txservice::EloqRangeCatalogFactory range_catalog_factory_{}; @@ -651,15 +650,76 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler // EngineServer TxService and DataStoreHandler std::array catalog_factory_array_; - // remote data store service configuration - DataStoreServiceClusterManager cluster_manager_; - + // bthread::Mutex ds_service_mutex_; + // bthread::ConditionVariable ds_service_cv_; + // std::atomic ds_serv_shutdown_indicator_; // point to the data store service if it is colocated DataStoreService *data_store_service_; - std::atomic flying_remote_fetch_count_{0}; - // Work queue for 
fetch records from primary node - std::deque remote_fetch_cc_queue_; + struct DssNode + { + DssNode() = default; + ~DssNode() = default; + DssNode(const DssNode &rhs) + : host_name_(rhs.host_name_), + port_(rhs.port_), + shard_verion_(rhs.shard_verion_) + { + } + DssNode &operator=(const DssNode &) = delete; + + void Reset(const std::string hostname, + uint16_t port, + uint64_t shard_version) + { + assert(expired_ts_.load(std::memory_order_acquire) == 0); + host_name_ = hostname; + port_ = port; + shard_verion_ = shard_version; + channel_.Init(host_name_.c_str(), port_, nullptr); + } + + const std::string &HostName() const + { + return host_name_; + } + uint16_t Port() const + { + return port_; + } + uint64_t ShardVersion() const + { + return shard_verion_; + } + brpc::Channel *Channel() + { + assert(!host_name_.empty() && port_ != 0); + return &channel_; + } + + // expired_ts_ is the timestamp when the node is expired. + // If expired_ts_ is 0, the node is not expired. + // If expired_ts_ is not 0, the node is expired and the value is the + // timestamp when the node is expired. + std::atomic expired_ts_{1U}; + + private: + std::string host_name_; + uint16_t port_; + brpc::Channel channel_; + uint64_t shard_verion_; + }; + // Cached leader nodes info of data shard. + std::array dss_nodes_; + const uint64_t NodeExpiredTime = 10 * 1000 * 1000; // 10s + // Now only support one shard. dss_shards_ caches the index in dss_nodes_ of + // shard owner. 
+ std::array, 1> dss_shards_; + std::atomic dss_topology_version_{0}; + + // std::atomic flying_remote_fetch_count_{0}; + // // Work queue for fetch records from primary node + // std::deque remote_fetch_cc_queue_; // table names and their kv table names std::unordered_map @@ -674,9 +734,9 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler friend class ScanNextClosure; friend class CreateSnapshotForBackupClosure; friend void PartitionBatchCallback(void *data, - ::google::protobuf::Closure *closure, - DataStoreServiceClient &client, - const remote::CommonResult &result); + ::google::protobuf::Closure *closure, + DataStoreServiceClient &client, + const remote::CommonResult &result); friend class SinglePartitionScanner; friend void FetchAllDatabaseCallback(void *data, ::google::protobuf::Closure *closure, diff --git a/data_store_service_client_closure.h b/data_store_service_client_closure.h index 7878c31..d7ee3b2 100644 --- a/data_store_service_client_closure.h +++ b/data_store_service_client_closure.h @@ -611,6 +611,7 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable ds_service_client_ = client; callback_data_ = callback_data; callback_ = callback; + remote_node_index_ = UINT32_MAX; } void Clear() override @@ -638,6 +639,7 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -675,10 +677,12 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable cntl_.ErrorCode() != EAGAIN && cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { - auto channel = - ds_service_client_ - ->UpdateDataStoreServiceChannelByPartitionId( - partition_id_); + uint32_t shard_id = + ds_service_client_->GetShardIdByPartitionId( + partition_id_); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_id, remote_node_index_, new_node_index); // Retry if (retry_count_ < 
ds_service_client_->retry_limit_) @@ -854,6 +858,11 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable return is_local_request_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: bool is_local_request_{false}; bool rpc_request_prepare_{false}; @@ -864,7 +873,7 @@ class ReadClosure : public ::google::protobuf::Closure, public Poolable brpc::Controller cntl_; EloqDS::remote::ReadRequest request_; EloqDS::remote::ReadResponse response_; - // keep channel alive + uint32_t remote_node_index_{UINT32_MAX}; // serve local call std::string_view table_name_; @@ -899,6 +908,7 @@ class FlushDataClosure : public ::google::protobuf::Closure, public Poolable cntl_.Reset(); request_.Clear(); response_.Clear(); + remote_node_index_ = UINT32_MAX; cntl_.Reset(); ds_service_client_ = nullptr; retry_count_ = 0; @@ -919,6 +929,7 @@ class FlushDataClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; rpc_request_prepare_ = false; + remote_node_index_ = UINT32_MAX; retry_count_ = 0; ds_service_client_ = &store_hd; kv_table_names_ = kv_table_names; @@ -933,6 +944,7 @@ class FlushDataClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -973,8 +985,9 @@ class FlushDataClosure : public ::google::protobuf::Closure, public Poolable cntl_.ErrorCode() != EAGAIN && cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { - ds_service_client_->UpdateDataStoreServiceChannelByShardId( - shard_ids_.back()); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_ids_.back(), remote_node_index_, new_node_index); // Retry if (retry_count_ < ds_service_client_->retry_limit_) @@ -1080,10 +1093,16 @@ class FlushDataClosure : public ::google::protobuf::Closure, public Poolable return shard_ids_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + 
remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; ::EloqDS::remote::FlushDataRequest request_; ::EloqDS::remote::FlushDataResponse response_; + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_; uint16_t retry_count_{0}; @@ -1112,6 +1131,7 @@ class DeleteRangeClosure : public ::google::protobuf::Closure, public Poolable request_.Clear(); response_.Clear(); cntl_.Reset(); + remote_node_index_ = UINT32_MAX; ds_service_client_ = nullptr; retry_count_ = 0; is_local_request_ = false; @@ -1133,6 +1153,7 @@ class DeleteRangeClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; rpc_request_prepare_ = false; + remote_node_index_ = UINT32_MAX; retry_count_ = 0; ds_service_client_ = &store_hd; table_name_ = table_name; @@ -1150,6 +1171,7 @@ class DeleteRangeClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -1189,9 +1211,12 @@ class DeleteRangeClosure : public ::google::protobuf::Closure, public Poolable cntl_.ErrorCode() != EAGAIN && cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { - ds_service_client_ - ->UpdateDataStoreServiceChannelByPartitionId( + uint32_t shard_id = + ds_service_client_->GetShardIdByPartitionId( partition_id_); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_id, remote_node_index_, new_node_index); // Retry if (retry_count_ < ds_service_client_->retry_limit_) @@ -1296,10 +1321,17 @@ class DeleteRangeClosure : public ::google::protobuf::Closure, public Poolable } } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; ::EloqDS::remote::DeleteRangeRequest request_; ::EloqDS::remote::DeleteRangeResponse response_; + // remote node index in dss_nodes_ + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_; 
uint16_t retry_count_{0}; @@ -1330,6 +1362,7 @@ class DropTableClosure : public ::google::protobuf::Closure, public Poolable cntl_.Reset(); request_.Clear(); response_.Clear(); + remote_node_index_ = UINT32_MAX; cntl_.Reset(); ds_service_client_ = nullptr; retry_count_ = 0; @@ -1350,6 +1383,7 @@ class DropTableClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; rpc_request_prepare_ = false; + remote_node_index_ = UINT32_MAX; retry_count_ = 0; ds_service_client_ = &store_hd; table_name_ = table_name; @@ -1364,6 +1398,7 @@ class DropTableClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -1399,8 +1434,9 @@ class DropTableClosure : public ::google::protobuf::Closure, public Poolable cntl_.ErrorCode() != EAGAIN && cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { - ds_service_client_->UpdateDataStoreServiceChannelByShardId( - shard_ids_.back()); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_ids_.back(), remote_node_index_, new_node_index); // Retry if (retry_count_ < ds_service_client_->retry_limit_) @@ -1506,10 +1542,16 @@ class DropTableClosure : public ::google::protobuf::Closure, public Poolable return shard_ids_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; ::EloqDS::remote::DropTableRequest request_; ::EloqDS::remote::DropTableResponse response_; + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_; uint16_t retry_count_{0}; @@ -1559,8 +1601,10 @@ class BatchWriteRecordsClosure : public ::google::protobuf::Closure, request_.Clear(); response_.Clear(); cntl_.Reset(); + remote_node_index_ = UINT32_MAX; parts_cnt_per_key_ = 1; parts_cnt_per_record_ = 1; + result_.Clear(); } // for writing single record @@ -1649,8 +1693,10 @@ class BatchWriteRecordsClosure : public 
::google::protobuf::Closure, uint32_t req_shard_id = ds_service_client_->GetShardIdByPartitionId( partition_id_); - ds_service_client_->UpdateDataStoreServiceChannelByShardId( - req_shard_id); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + req_shard_id, remote_node_index_, new_node_index); + need_retry = true; } else @@ -1690,8 +1736,16 @@ class BatchWriteRecordsClosure : public ::google::protobuf::Closure, (*callback_)(callback_data_, this, *ds_service_client_, result_); } - void PrepareRemoteRequest() + void PrepareRequest(bool is_local_request) { + if (is_local_request) + { + is_local_request_ = true; + result_.Clear(); + remote_node_index_ = UINT32_MAX; + return; + } + // clear cntl_.Reset(); cntl_.set_timeout_ms(5000); @@ -1770,10 +1824,16 @@ class BatchWriteRecordsClosure : public ::google::protobuf::Closure, return parts_cnt_per_record_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; EloqDS::remote::BatchWriteRecordsRequest request_; EloqDS::remote::BatchWriteRecordsResponse response_; + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_{nullptr}; uint16_t retry_count_{0}; @@ -1814,6 +1874,7 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable cntl_.Reset(); request_.Clear(); response_.Clear(); + remote_node_index_ = UINT32_MAX; cntl_.Reset(); ds_service_client_ = nullptr; retry_count_ = 0; @@ -1853,6 +1914,7 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; rpc_request_prepare_ = false; + remote_node_index_ = UINT32_MAX; retry_count_ = 0; ds_service_client_ = &store_hd; table_name_ = table_name; @@ -1876,6 +1938,7 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -1935,9 +1998,12 @@ class 
ScanNextClosure : public ::google::protobuf::Closure, public Poolable cntl_.ErrorCode() != EAGAIN && cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { - ds_service_client_ - ->UpdateDataStoreServiceChannelByPartitionId( + uint32_t shard_id = + ds_service_client_->GetShardIdByPartitionId( partition_id_); + uint32_t new_node_index; + ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_id, remote_node_index_, new_node_index); // Retry if (retry_count_ < ds_service_client_->retry_limit_) @@ -2130,10 +2196,16 @@ class ScanNextClosure : public ::google::protobuf::Closure, public Poolable return search_conditions_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; ::EloqDS::remote::ScanRequest request_; ::EloqDS::remote::ScanResponse response_; + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_; uint16_t retry_count_{0}; @@ -2175,7 +2247,7 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, cntl_.Reset(); request_.Clear(); response_.Clear(); - channel_ = nullptr; + remote_node_index_ = UINT32_MAX; ds_service_client_ = nullptr; retry_count_ = 0; is_local_request_ = false; @@ -2202,6 +2274,7 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, { is_local_request_ = true; rpc_request_prepare_ = false; + remote_node_index_ = UINT32_MAX; retry_count_ = 0; ds_service_client_ = &store_hd; shard_ids_ = std::move(shard_ids); @@ -2218,6 +2291,7 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, { is_local_request_ = true; result_.Clear(); + remote_node_index_ = UINT32_MAX; } else { @@ -2262,15 +2336,14 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, cntl_.ErrorCode() != brpc::ERPCTIMEDOUT) { uint32_t shard_id = shard_ids_.back(); - channel_ = - ds_service_client_ - ->UpdateDataStoreServiceChannelByShardId(shard_id); + uint32_t new_node_index; + 
ds_service_client_->UpdateOwnerNodeIndexOfShard( + shard_id, remote_node_index_, new_node_index); // Retry if (retry_count_ < ds_service_client_->retry_limit_) { self_guard.Release(); - channel_ = nullptr; retry_count_++; ds_service_client_->CreateSnapshotForBackupInternal( this); @@ -2300,7 +2373,6 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, if (retry_count_ < ds_service_client_->retry_limit_) { self_guard.Release(); - channel_ = nullptr; response_.Clear(); cntl_.Reset(); retry_count_++; @@ -2317,16 +2389,6 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, return &cntl_; } - brpc::Channel *GetChannel() - { - return channel_.get(); - } - - void SetChannel(std::shared_ptr channel) - { - channel_ = channel; - } - const std::string_view GetBackupName() { return backup_name_; @@ -2391,11 +2453,16 @@ class CreateSnapshotForBackupClosure : public ::google::protobuf::Closure, return &response_; } + void SetRemoteNodeIndex(uint32_t remote_node_index) + { + remote_node_index_ = remote_node_index; + } + private: brpc::Controller cntl_; ::EloqDS::remote::CreateSnapshotForBackupRequest request_; ::EloqDS::remote::CreateSnapshotForBackupResponse response_; - std::shared_ptr channel_; + uint32_t remote_node_index_{UINT32_MAX}; DataStoreServiceClient *ds_service_client_; uint16_t retry_count_{0}; diff --git a/eloq_data_store_service/data_store_service.cpp b/eloq_data_store_service/data_store_service.cpp index bd836f4..43b3e04 100644 --- a/eloq_data_store_service/data_store_service.cpp +++ b/eloq_data_store_service/data_store_service.cpp @@ -242,22 +242,38 @@ bool DataStoreService::StartService(bool create_db_if_missing) auto dss_shards = cluster_manager_.GetShardsForThisNode(); assert(dss_shards.size() <= 1); + assert(shard_status_.load(std::memory_order_acquire) == + DSShardStatus::Closed); if (!dss_shards.empty()) { shard_id_ = dss_shards.at(0); - shard_status_ = cluster_manager_.FetchDSShardStatus(shard_id_); - if 
(shard_status_ == DSShardStatus::ReadOnly || - shard_status_ == DSShardStatus::ReadWrite) + auto open_mode = cluster_manager_.FetchDSShardStatus(shard_id_); + if (open_mode == DSShardStatus::ReadOnly || + open_mode == DSShardStatus::ReadWrite) { - data_store_ = data_store_factory_->CreateDataStore( - create_db_if_missing, shard_id_, this, true); - if (data_store_ == nullptr) + auto expect_status = DSShardStatus::Closed; + if (shard_status_.compare_exchange_strong(expect_status, + DSShardStatus::Starting)) { - LOG(ERROR) << "Failed to create data store on starting " - "DataStoreService."; - return false; + data_store_ = data_store_factory_->CreateDataStore( + create_db_if_missing, shard_id_, this, true); + if (data_store_ == nullptr) + { + LOG(ERROR) << "Failed to create data store on starting " + "DataStoreService."; + return false; + } + + if (open_mode == DSShardStatus::ReadOnly) + { + data_store_->SwitchToReadOnly(); + } + shard_status_.store(open_mode, std::memory_order_release); } } + + DLOG(INFO) << "Created data store shard id:" << shard_id_ + << ", shard_status:" << shard_status_; } server_ = std::make_unique(); @@ -287,6 +303,28 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id, DSShardStatus open_mode, bool create_db_if_missing) { + if (open_mode == DSShardStatus::Closed) + { + return true; + } + assert(open_mode == DSShardStatus::ReadOnly); + + DSShardStatus expect_status = DSShardStatus::Closed; + if (!shard_status_.compare_exchange_strong(expect_status, + DSShardStatus::Starting)) + { + if (expect_status == open_mode) + { + return true; + } + while (expect_status == DSShardStatus::Starting) + { + bthread_usleep(10000); + expect_status = shard_status_.load(std::memory_order_acquire); + } + return expect_status == open_mode; + } + assert(data_store_factory_ != nullptr); if (data_store_ == nullptr) { @@ -317,11 +355,13 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id, } } + data_store_->SwitchToReadOnly(); 
cluster_manager_.SwitchShardToReadOnly(data_shard_id, DSShardStatus::Closed); - assert(shard_status_.load(std::memory_order_acquire) == - DSShardStatus::Closed); - shard_status_.store(open_mode, std::memory_order_release); + + expect_status = DSShardStatus::Starting; + shard_status_.compare_exchange_strong( + expect_status, open_mode, std::memory_order_release); return true; } @@ -341,7 +381,9 @@ void DataStoreService::Read(::google::protobuf::RpcController *controller, return; } - if (shard_status_.load(std::memory_order_acquire) == DSShardStatus::Closed) + auto shard_status = shard_status_.load(std::memory_order_acquire); + if (shard_status != DSShardStatus::ReadOnly && + shard_status != DSShardStatus::ReadWrite) { brpc::ClosureGuard done_guard(done); auto *result = response->mutable_result(); @@ -376,7 +418,9 @@ void DataStoreService::Read(const std::string_view table_name, return; } - if (shard_status_.load(std::memory_order_acquire) == DSShardStatus::Closed) + auto shard_status = shard_status_.load(std::memory_order_acquire); + if (shard_status != DSShardStatus::ReadOnly && + shard_status != DSShardStatus::ReadWrite) { brpc::ClosureGuard done_guard(done); record->clear(); @@ -834,20 +878,12 @@ void DataStoreService::ScanNext( } auto shard_status = shard_status_.load(std::memory_order_acquire); - if (shard_status != DSShardStatus::ReadWrite) + if (shard_status != DSShardStatus::ReadWrite && + shard_status != DSShardStatus::ReadOnly) { brpc::ClosureGuard done_guard(done); - if (shard_status == DSShardStatus::Closed) - { - PrepareShardingError(partition_id, result); - } - else - { - assert(shard_status == DSShardStatus::ReadOnly); - result->set_error_code( - ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB); - result->set_error_msg("Write to read-only DB."); - } + result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result->set_error_msg("KV store not opened yet."); return; } @@ -889,21 +925,13 @@ void 
DataStoreService::ScanNext(::google::protobuf::RpcController *controller, } auto shard_status = shard_status_.load(std::memory_order_acquire); - if (shard_status != DSShardStatus::ReadWrite) + if (shard_status != DSShardStatus::ReadWrite && + shard_status != DSShardStatus::ReadOnly) { brpc::ClosureGuard done_guard(done); - if (shard_status == DSShardStatus::Closed) - { - PrepareShardingError(partition_id, response->mutable_result()); - } - else - { - assert(shard_status == DSShardStatus::ReadOnly); - auto *result = response->mutable_result(); - result->set_error_code( - ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB); - result->set_error_msg("Write to read-only DB."); - } + auto *result = response->mutable_result(); + result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result->set_error_msg("KV store not opened yet."); return; } @@ -932,21 +960,14 @@ void DataStoreService::ScanClose(::google::protobuf::RpcController *controller, } auto shard_status = shard_status_.load(std::memory_order_acquire); - if (shard_status != DSShardStatus::ReadWrite) + if (shard_status != DSShardStatus::ReadWrite && + shard_status != DSShardStatus::ReadOnly) { + assert(false); brpc::ClosureGuard done_guard(done); - if (shard_status == DSShardStatus::Closed) - { - PrepareShardingError(partition_id, response->mutable_result()); - } - else - { - assert(shard_status == DSShardStatus::ReadOnly); - auto *result = response->mutable_result(); - result->set_error_code( - ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB); - result->set_error_msg("Write to read-only DB."); - } + auto *result = response->mutable_result(); + result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result->set_error_msg("KV store not opened yet."); return; } @@ -974,20 +995,13 @@ void DataStoreService::ScanClose(const std::string_view table_name, } auto shard_status = shard_status_.load(std::memory_order_acquire); - if (shard_status != DSShardStatus::ReadWrite) + if 
(shard_status != DSShardStatus::ReadWrite && + shard_status != DSShardStatus::ReadOnly) { + assert(false); brpc::ClosureGuard done_guard(done); - if (shard_status == DSShardStatus::Closed) - { - PrepareShardingError(partition_id, result); - } - else - { - assert(shard_status == DSShardStatus::ReadOnly); - result->set_error_code( - ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB); - result->set_error_msg("Write to read-only DB."); - } + result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN); + result->set_error_msg("KV store not opened yet."); return; } @@ -2206,6 +2220,8 @@ bool DataStoreService::SwitchReadOnlyToReadWrite(uint32_t shard_id) { if (!IsOwnerOfShard(shard_id)) { + DLOG(INFO) << "SwitchReadOnlyToReadWrite failed, shard " << shard_id + << " is not owner"; return false; } diff --git a/eloq_data_store_service/data_store_service.h b/eloq_data_store_service/data_store_service.h index b64a9f7..e718aa3 100644 --- a/eloq_data_store_service/data_store_service.h +++ b/eloq_data_store_service/data_store_service.h @@ -600,6 +600,13 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService ongoing_write_requests_.fetch_sub(1, std::memory_order_release); } + bool IsOwnerOfShard(uint32_t shard_id) const + { + return shard_status_.load(std::memory_order_acquire) != + DSShardStatus::Closed && + shard_id_ == shard_id; + } + private: uint32_t GetShardIdByPartitionId(int32_t partition_id) { @@ -608,12 +615,6 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService // return cluster_manager_.GetShardIdByPartitionId(partition_id); } - bool IsOwnerOfShard(uint32_t shard_id) - { - return shard_status_.load(std::memory_order_acquire) != - DSShardStatus::Closed && - shard_id_ == shard_id; - } DataStore *GetDataStore(uint32_t shard_id) { diff --git a/eloq_data_store_service/data_store_service_config.cpp b/eloq_data_store_service/data_store_service_config.cpp index a0a4e97..07c9164 100644 --- 
a/eloq_data_store_service/data_store_service_config.cpp +++ b/eloq_data_store_service/data_store_service_config.cpp @@ -865,7 +865,7 @@ bool DataStoreServiceClusterManager::SwitchShardToReadWrite( topology_.UpdateDSShardStatus(shard_id, DSShardStatus::ReadWrite); DLOG(INFO) << "SwitchToReadWrite, shard " << shard_id - << " status: " << shard_status; + << " status: " << static_cast(shard_status); return true; } @@ -919,6 +919,7 @@ void DataStoreServiceClusterManager::PrepareShardingError( { auto *new_shard = new_shards->add_shards(); new_shard->set_shard_id(shard_id); + new_shard->set_shard_version(shard.version_); for (const auto &node : shard.nodes_) { auto *new_node = new_shard->add_member_nodes(); @@ -932,12 +933,14 @@ void DataStoreServiceClusterManager::PrepareShardingError( // Same shard but primary changed - just send new primary key_sharding_changed_message->set_type( ::EloqDS::remote::KeyShardingErrorType::PrimaryNodeChanged); + auto shard_version = topology_.FetchDSShardVersion(shard_id); DSSNode primary_node = topology_.GetPrimaryNode(shard_id); auto *new_primary_node = key_sharding_changed_message->mutable_new_primary_node(); new_primary_node->set_host_name(primary_node.host_name_); new_primary_node->set_port(primary_node.port_); key_sharding_changed_message->set_shard_id(shard_id); + key_sharding_changed_message->set_shard_version(shard_version); } } diff --git a/eloq_data_store_service/data_store_service_util.h b/eloq_data_store_service/data_store_service_util.h index f157371..cd1d6d7 100644 --- a/eloq_data_store_service/data_store_service_util.h +++ b/eloq_data_store_service/data_store_service_util.h @@ -38,6 +38,9 @@ enum DSShardStatus : uint8_t ReadWrite = 2, // Node is closed Closed = 3, + // Node is opening data store. 
+ Starting = 4 + }; // TODO(liunyl): define error code diff --git a/eloq_data_store_service/ds_request.proto b/eloq_data_store_service/ds_request.proto index 9123b84..89ce577 100644 --- a/eloq_data_store_service/ds_request.proto +++ b/eloq_data_store_service/ds_request.proto @@ -79,7 +79,8 @@ message KeyShardingChanged { KeyShardingErrorType type = 1; DSSNodeBuf new_primary_node = 2; uint32 shard_id = 3; - DSSClusterConfig new_cluster_config = 4; + uint64 shard_version = 4; + DSSClusterConfig new_cluster_config = 5; } message CommonResult { From 45f7dca2839731663fc1a8e4997b002558d69712 Mon Sep 17 00:00:00 2001 From: lzxddz Date: Tue, 14 Oct 2025 11:57:41 +0800 Subject: [PATCH 4/4] Clear request after finishing it in datastore --- eloq_data_store_service/rocksdb_cloud_data_store.cpp | 2 ++ eloq_data_store_service/rocksdb_data_store_common.cpp | 2 ++ 2 files changed, 4 insertions(+) diff --git a/eloq_data_store_service/rocksdb_cloud_data_store.cpp b/eloq_data_store_service/rocksdb_cloud_data_store.cpp index c148999..283dafb 100644 --- a/eloq_data_store_service/rocksdb_cloud_data_store.cpp +++ b/eloq_data_store_service/rocksdb_cloud_data_store.cpp @@ -770,6 +770,8 @@ void RocksDBCloudDataStore::CreateSnapshotForBackup( req->SetFinish(::EloqDS::remote::DataStoreError::CREATE_SNAPSHOT_ERROR, "Fail to create snapshot, error: Fail to submit work to " "query worker pool"); + req->Clear(); + req->Free(); } } diff --git a/eloq_data_store_service/rocksdb_data_store_common.cpp b/eloq_data_store_service/rocksdb_data_store_common.cpp index 780f2eb..2668e71 100644 --- a/eloq_data_store_service/rocksdb_data_store_common.cpp +++ b/eloq_data_store_service/rocksdb_data_store_common.cpp @@ -478,6 +478,8 @@ void RocksDBDataStoreCommon::BatchWriteRecords( ::EloqDS::remote::CommonResult result; result.set_error_code(::EloqDS::remote::DataStoreError::NO_ERROR); batch_write_req->SetFinish(result); + batch_write_req->Clear(); + batch_write_req->Free(); return; }