diff --git a/data_store_service_client.cpp b/data_store_service_client.cpp index 980bf80..cb5b647 100644 --- a/data_store_service_client.cpp +++ b/data_store_service_client.cpp @@ -19,8 +19,6 @@ * . * */ -#include "data_store_service_client.h" - #include #include @@ -36,7 +34,9 @@ #include #include "cc_req_misc.h" +#include "data_store_service_client.h" #include "data_store_service_client_closure.h" +#include "data_store_service_config.h" #include "data_store_service_scanner.h" #include "eloq_data_store_service/object_pool.h" // ObjectPool #include "eloq_data_store_service/thread_worker_pool.h" @@ -132,8 +132,78 @@ void DataStoreServiceClient::SetupConfig( LOG(INFO) << "UpgradeShardVersion failed, retry"; bthread_usleep(1000000); } - LOG(INFO) << "UpgradeShardVersion success, shard_id:" - << group.shard_id_ << ", version:" << group.version_; + LOG(INFO) << "DataStoreServiceCliet UpgradeShardVersion success, " + "shard_id:" + << group.shard_id_ << ", version:" << group.version_ + << ", owner_node:" << group.nodes_[0].host_name_ << ":" + << group.nodes_[0].port_; + } + } + else + { + LOG(INFO) + << "DataStoreServiceCliet SetupConfig skipped, current_version:" + << current_version << ", new_version:" << new_version; + } +} + +void DataStoreServiceClient::TxConfigsToDssClusterConfig( + uint32_t dss_node_id, // = 0, + uint32_t ng_id, // = 0, + const std::unordered_map> + &ng_configs, + uint32_t dss_leader_node_id, // if no leader,set uint32t_max + DataStoreServiceClusterManager &cluster_manager) +{ + assert(ng_configs.size() == 1); + auto it = ng_configs.find(ng_id); + assert(it != ng_configs.end()); + auto &ng_member_configs = it->second; + + const txservice::NodeConfig *this_node = nullptr; + const txservice::NodeConfig *leader_node = nullptr; + for (auto &node_config : ng_member_configs) + { + if (node_config.node_id_ == dss_node_id) + { + this_node = &node_config; + } + if (node_config.node_id_ == dss_leader_node_id) + { + leader_node = &node_config; + } + } + 
assert(this_node != nullptr); + assert(dss_leader_node_id == UNKNOWN_DSS_LEADER_NODE_ID || + leader_node != nullptr); + cluster_manager.Initialize(this_node->host_name_, + TxPort2DssPort(this_node->port_)); + + std::vector shard_nodes; + for (auto &node_config : ng_member_configs) + { + if (node_config.node_id_ != dss_node_id) + { + DSSNode dss_node(node_config.host_name_, + TxPort2DssPort(node_config.port_)); + cluster_manager.AddShardMember(ng_id, dss_node); + } + } + + if (dss_leader_node_id != dss_node_id) + { + LOG(INFO) << "cluster_manager change shard status " << ng_id << " from " + << static_cast( + cluster_manager.FetchDSShardStatus(ng_id)); + cluster_manager.SwitchShardToClosed(ng_id, DSShardStatus::ReadWrite); + LOG(INFO) << "cluster_manager change shard status " << ng_id << " to " + << static_cast( + cluster_manager.FetchDSShardStatus(ng_id)); + if (dss_leader_node_id != UNKNOWN_DSS_LEADER_NODE_ID) + { + DSSNode dss_node(leader_node->host_name_, + TxPort2DssPort(leader_node->port_)); + cluster_manager.UpdatePrimaryNode(ng_id, dss_node); } } } @@ -213,6 +283,8 @@ bool DataStoreServiceClient::PutAll( std::vector>> &flush_task) { + DLOG(INFO) << "DataStoreServiceClient::PutAll called with " + << flush_task.size() << " tables to flush."; uint64_t now = txservice::LocalCcShards::ClockTsInMillseconds(); // Process each table @@ -2868,6 +2940,31 @@ void DataStoreServiceClient::RestoreTxCache(txservice::NodeGroupId cc_ng_id, */ bool DataStoreServiceClient::OnLeaderStart(uint32_t *next_leader_node) { + DLOG(INFO) + << "DataStoreServiceClient OnLeaderStart called data_store_service_:" + << data_store_service_; + if (data_store_service_ != nullptr) + { + // Now, only support one shard. 
+ data_store_service_->OpenDataStore(0); + } + + Connect(); + + return true; +} + +bool DataStoreServiceClient::OnLeaderStop(int64_t term) +{ + DLOG(INFO) + << "DataStoreServiceClient OnLeaderStop called data_store_service_:" + << data_store_service_; + // swith to read only in case of data store status is read write + if (data_store_service_ != nullptr) + { + // Now, only support one shard. + data_store_service_->CloseDataStore(0); + } return true; } @@ -2878,8 +2975,53 @@ bool DataStoreServiceClient::OnLeaderStart(uint32_t *next_leader_node) * following another leader and can be used to perform follower-specific * initialization. */ -void DataStoreServiceClient::OnStartFollowing() +void DataStoreServiceClient::OnStartFollowing(uint32_t leader_node_id, + int64_t term, + int64_t standby_term, + bool resubscribe) { + DLOG(INFO) + << "DataStoreServiceClient OnStartFollowing called data_store_service_:" + << data_store_service_; + if (data_store_service_ != nullptr) + { + // Now, only support one shard. 
+ data_store_service_->CloseDataStore(0); + } + + // Treat leader_node_id as dss_leader_node_id + uint32_t dss_leader_node_id = leader_node_id; + uint32_t dss_shard_id = txservice::Sharder::Instance().NativeNodeGroup(); + + // Update leader node in cluster_manager if necessary + auto ng_configs = txservice::Sharder::Instance().GetNodeGroupConfigs(); + auto ng_config_it = ng_configs.find(dss_shard_id); + assert(ng_config_it != ng_configs.end()); + auto ng_member_configs = ng_config_it->second; + const txservice::NodeConfig *dss_leader_node_config = nullptr; + for (const auto &node_config : ng_member_configs) + { + if (node_config.node_id_ == dss_leader_node_id) + { + dss_leader_node_config = &node_config; + break; + } + } + assert(dss_leader_node_config != nullptr); + DSSNode dss_leader_node(dss_leader_node_config->host_name_, + TxPort2DssPort(dss_leader_node_config->port_)); + auto &cluster_manager = data_store_service_->GetClusterManager(); + cluster_manager.UpdatePrimaryNode(dss_shard_id, dss_leader_node); + DLOG(INFO) << "UpdatePrimaryNode, dss_shard_id: " << dss_shard_id + << ", DSSNode: " << dss_leader_node.host_name_ << ":" + << dss_leader_node.port_; + // Pump the dss shard version + cluster_manager.UpdateDSShardVersion( + dss_shard_id, cluster_manager.FetchDSShardVersion(dss_shard_id) + 1); + // Update the client config + SetupConfig(cluster_manager); + + Connect(); } /** @@ -4610,4 +4752,4 @@ void DataStoreServiceClient::PrepareRangePartitionBatches( } } -} // namespace EloqDS \ No newline at end of file +} // namespace EloqD \ No newline at end of file diff --git a/data_store_service_client.h b/data_store_service_client.h index cbc13d5..33c7501 100644 --- a/data_store_service_client.h +++ b/data_store_service_client.h @@ -126,6 +126,19 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler */ void SetupConfig(const DataStoreServiceClusterManager &config); + static uint16_t TxPort2DssPort(uint16_t tx_port) + { + return tx_port + 7; + } 
+ + static void TxConfigsToDssClusterConfig( + uint32_t dss_node_id, // = 0, + uint32_t ng_id, // = 0, + const std::unordered_map> + &ng_configs, + uint32_t dss_leader_node_id, // if no leader,set uint32t_max + DataStoreServiceClusterManager &cluster_manager); + void ConnectToLocalDataStoreService( std::unique_ptr ds_serv); @@ -377,7 +390,12 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler bool OnLeaderStart(uint32_t *next_leader_node) override; - void OnStartFollowing() override; + bool OnLeaderStop(int64_t term) override; + + void OnStartFollowing(uint32_t leader_node_id, + int64_t term, + int64_t standby_term, + bool resubscribe) override; void OnShutdown() override; diff --git a/eloq_data_store_service/data_store_service.cpp b/eloq_data_store_service/data_store_service.cpp index ab1209d..d3056e8 100644 --- a/eloq_data_store_service/data_store_service.cpp +++ b/eloq_data_store_service/data_store_service.cpp @@ -237,7 +237,9 @@ DataStoreService::~DataStoreService() } } -bool DataStoreService::StartService(bool create_db_if_missing) +bool DataStoreService::StartService(bool create_db_if_missing, + uint32_t dss_leader_node_id, + uint32_t dss_node_id) { if (server_ != nullptr) { @@ -252,6 +254,11 @@ bool DataStoreService::StartService(bool create_db_if_missing) { shard_id_ = dss_shards.at(0); auto open_mode = cluster_manager_.FetchDSShardStatus(shard_id_); + DLOG(INFO) << "StartService data store shard id:" << shard_id_ + << ", open_mode:" << static_cast(open_mode) + << ", create_db_if_missing:" << create_db_if_missing + << ", dss_leader_node_id:" << dss_leader_node_id + << ", dss_node_id:" << dss_node_id; if (open_mode == DSShardStatus::ReadOnly || open_mode == DSShardStatus::ReadWrite) { @@ -259,8 +266,13 @@ bool DataStoreService::StartService(bool create_db_if_missing) if (shard_status_.compare_exchange_strong(expect_status, DSShardStatus::Starting)) { + // start underling db if this dss node is the + // leader dss node data_store_ = 
data_store_factory_->CreateDataStore( - create_db_if_missing, shard_id_, this, true); + create_db_if_missing, + shard_id_, + this, + dss_leader_node_id == dss_node_id); if (data_store_ == nullptr) { LOG(ERROR) << "Failed to create data store on starting " @@ -277,7 +289,7 @@ bool DataStoreService::StartService(bool create_db_if_missing) } DLOG(INFO) << "Created data store shard id:" << shard_id_ - << ", shard_status:" << shard_status_; + << ", shard_status:" << static_cast(shard_status_); } server_ = std::make_unique(); @@ -311,7 +323,7 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id, { return true; } - assert(open_mode == DSShardStatus::ReadOnly); + // assert(open_mode == DSShardStatus::ReadOnly); DSShardStatus expect_status = DSShardStatus::Closed; if (!shard_status_.compare_exchange_strong(expect_status, @@ -329,6 +341,11 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id, return expect_status == open_mode; } + DLOG(INFO) << "Connecting and starting data store for shard id:" + << data_shard_id + << ", open_mode:" << static_cast(open_mode) + << ", create_db_if_missing:" << create_db_if_missing + << ", data_store_ is null:" << (data_store_ == nullptr); assert(data_store_factory_ != nullptr); if (data_store_ == nullptr) { @@ -358,10 +375,18 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id, return false; } } - - data_store_->SwitchToReadOnly(); - cluster_manager_.SwitchShardToReadOnly(data_shard_id, - DSShardStatus::Closed); + if (open_mode == DSShardStatus::ReadOnly) + { + data_store_->SwitchToReadOnly(); + cluster_manager_.SwitchShardToReadOnly(data_shard_id, + DSShardStatus::Closed); + } + else + { + assert(open_mode == DSShardStatus::ReadWrite); + cluster_manager_.SwitchShardToReadWrite(data_shard_id, + DSShardStatus::Closed); + } expect_status = DSShardStatus::Starting; shard_status_.compare_exchange_strong( @@ -1786,6 +1811,48 @@ bool DataStoreService::FetchConfigFromPeer( return true; } 
+void DataStoreService::CloseDataStore(uint32_t shard_id) +{ + assert(shard_id == shard_id_); + if (!IsOwnerOfShard(shard_id)) + { + return; + } + if (shard_status_.load() == DSShardStatus::ReadWrite) + { + SwitchReadWriteToReadOnly(shard_id); + } + + if (shard_status_.load() == DSShardStatus::ReadOnly) + { + SwitchReadOnlyToClosed(shard_id); + } +} + +void DataStoreService::OpenDataStore(uint32_t shard_id) +{ + // no-op if this DSS does not own any shard + if (shard_id_ == UINT32_MAX) + { + DLOG(INFO) << "OpenDataStore no-op for non-owner DSS" + << ", shard " << shard_id + << ", shard_id_: " << shard_id_; + return; + } + + assert(shard_id == shard_id_); + + DLOG(INFO) << "OpenDataStore for shard " << shard_id + << ", current status: " << static_cast(shard_status_.load()); + if (shard_status_.load() != DSShardStatus::Closed) + { + return; + } + DSShardStatus open_mode = DSShardStatus::ReadWrite; + bool create_db_if_missing = false; + ConnectAndStartDataStore(shard_id, open_mode, create_db_if_missing); +} + std::pair DataStoreService::NewMigrateTask(const std::string &event_id, int data_shard_id, @@ -2459,4 +2526,4 @@ void DataStoreService::CleanupOldMigrateLogs() } } -} // namespace EloqDS +} // namespace EloqDS \ No newline at end of file diff --git a/eloq_data_store_service/data_store_service.h b/eloq_data_store_service/data_store_service.h index 5c9742b..35bd337 100644 --- a/eloq_data_store_service/data_store_service.h +++ b/eloq_data_store_service/data_store_service.h @@ -187,7 +187,9 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService ~DataStoreService(); - bool StartService(bool create_db_if_missing); + bool StartService(bool create_db_if_missing, + uint32_t dss_leader_node_id, + uint32_t dss_node_id); brpc::Server *GetBrpcServer() { @@ -441,10 +443,11 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService * @param response Create snapshot for backup response * @param done Callback function */ - void 
CreateSnapshotForBackup(::google::protobuf::RpcController *controller, - const ::EloqDS::remote::CreateSnapshotForBackupRequest *request, - ::EloqDS::remote::CreateSnapshotForBackupResponse *response, - ::google::protobuf::Closure *done) override; + void CreateSnapshotForBackup( + ::google::protobuf::RpcController *controller, + const ::EloqDS::remote::CreateSnapshotForBackupRequest *request, + ::EloqDS::remote::CreateSnapshotForBackupResponse *response, + ::google::protobuf::Closure *done) override; /** * @brief Create snapshot for backup operation @@ -453,13 +456,12 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService * @param backup_ts Backup timestamp * @param done Callback function */ - void CreateSnapshotForBackup( - uint32_t shard_id, - std::string_view backup_name, - uint64_t backup_ts, - std::vector *backup_files, - remote::CommonResult *result, - ::google::protobuf::Closure *done); + void CreateSnapshotForBackup(uint32_t shard_id, + std::string_view backup_name, + uint64_t backup_ts, + std::vector *backup_files, + remote::CommonResult *result, + ::google::protobuf::Closure *done); /** * @brief Append the key string of this node to the specified string stream. 
@@ -603,11 +605,22 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService bool IsOwnerOfShard(uint32_t shard_id) const { + DLOG(INFO) << "IsOwnerOfShard check: shard_status=" + << static_cast(shard_status_.load()) << ", shard_id_=" + << shard_id_ << ", check_shard_id=" << shard_id; return shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed && shard_id_ == shard_id; } + void CloseDataStore(uint32_t shard_id); + void OpenDataStore(uint32_t shard_id); + + DataStoreServiceClusterManager &GetClusterManager() + { + return cluster_manager_; + } + private: uint32_t GetShardIdByPartitionId(int32_t partition_id) { @@ -616,7 +629,6 @@ class DataStoreService : EloqDS::remote::DataStoreRpcService // return cluster_manager_.GetShardIdByPartitionId(partition_id); } - DataStore *GetDataStore(uint32_t shard_id) { if (shard_id_ == shard_id) diff --git a/eloq_data_store_service/data_store_service_config.cpp b/eloq_data_store_service/data_store_service_config.cpp index 07c9164..d87e7ad 100644 --- a/eloq_data_store_service/data_store_service_config.cpp +++ b/eloq_data_store_service/data_store_service_config.cpp @@ -436,6 +436,10 @@ bool DataStoreServiceClusterManager::Load(const std::string &filename) bool DataStoreServiceClusterManager::Save(const std::string &filename) const { std::shared_lock lk(mutex_); + if (filename.empty()) + { + return true; + } return SaveInternal(filename); } @@ -821,7 +825,7 @@ bool DataStoreServiceClusterManager::SwitchShardToClosed(uint32_t shard_id, std::unique_lock lk(mutex_); auto shard_status = topology_.FetchDSShardStatus(shard_id); - assert(expected == DSShardStatus::ReadOnly); + // assert(expected == DSShardStatus::ReadOnly); if (shard_status != expected) { return false; @@ -834,12 +838,12 @@ bool DataStoreServiceClusterManager::SwitchShardToClosed(uint32_t shard_id, return true; } - if (shard_status != DSShardStatus::ReadOnly) - { - DLOG(ERROR) << "Shard " << shard_id << " is not in read only mode" - << ", status: " << 
shard_status; - return false; - } + // if (shard_status != DSShardStatus::ReadOnly) + // { + // DLOG(ERROR) << "Shard " << shard_id << " is not in read only mode" + // << ", status: " << static_cast(shard_status); + // return false; + // } topology_.UpdateDSShardStatus(shard_id, DSShardStatus::Closed); DLOG(INFO) << "SwitchToClosed, shard " << shard_id @@ -901,7 +905,6 @@ void DataStoreServiceClusterManager::PrepareShardingError( uint32_t shard_id, ::EloqDS::remote::CommonResult *result) { std::shared_lock lk(mutex_); - result->set_error_code( ::EloqDS::remote::DataStoreError::REQUESTED_NODE_NOT_OWNER); DLOG(INFO) << "=====PrepareShardingError"; diff --git a/eloq_data_store_service/data_store_service_config.h b/eloq_data_store_service/data_store_service_config.h index ae6b8aa..9cb696b 100644 --- a/eloq_data_store_service/data_store_service_config.h +++ b/eloq_data_store_service/data_store_service_config.h @@ -311,6 +311,9 @@ class Topology uint64_t topology_version_{1}; }; +// The DSS leader node ID is unknown if equal to this value +static const uint32_t UNKNOWN_DSS_LEADER_NODE_ID = UINT32_MAX; + class DataStoreServiceClusterManager { public: diff --git a/eloq_data_store_service/main.cpp b/eloq_data_store_service/main.cpp index 0799ab8..fa222ca 100644 --- a/eloq_data_store_service/main.cpp +++ b/eloq_data_store_service/main.cpp @@ -67,9 +67,7 @@ DEFINE_string(eloq_dss_peer_node, "Data store peer node address. 
Used to get cluster topology if " "data_store_config_file is not provided."); -DEFINE_string(eloq_dss_branch_name, - "development", - "Data store branch name."); +DEFINE_string(eloq_dss_branch_name, "development", "Data store branch name."); DEFINE_string(ip, "127.0.0.1", "Server IP"); DEFINE_int32(port, 9100, "Server Port"); @@ -320,8 +318,8 @@ int main(int argc, char *argv[]) std::move(ds_factory)); // setup local data store service - bool ret = - data_store_service_->StartService(FLAGS_bootstrap || is_single_node); + bool ret = data_store_service_->StartService( + FLAGS_bootstrap || is_single_node, 0, 0); if (!ret) { LOG(ERROR) << "Failed to start data store service"; @@ -342,4 +340,4 @@ int main(int argc, char *argv[]) ShutDown(); return 0; -} +} \ No newline at end of file diff --git a/eloq_data_store_service/rocksdb_data_store_common.cpp b/eloq_data_store_service/rocksdb_data_store_common.cpp index 20d94f3..1e517bb 100644 --- a/eloq_data_store_service/rocksdb_data_store_common.cpp +++ b/eloq_data_store_service/rocksdb_data_store_common.cpp @@ -271,6 +271,12 @@ bool RocksDBDataStoreCommon::Initialize() } } + if (query_worker_pool_ == nullptr) + { + query_worker_pool_ = + std::make_unique(query_worker_number_); + } + return true; } diff --git a/eloq_data_store_service/rocksdb_data_store_common.h b/eloq_data_store_service/rocksdb_data_store_common.h index 54959a3..f2663cb 100644 --- a/eloq_data_store_service/rocksdb_data_store_common.h +++ b/eloq_data_store_service/rocksdb_data_store_common.h @@ -150,11 +150,10 @@ class RocksDBDataStoreCommon : public DataStore received_snapshot_path_(storage_path_ + "/received_snapshot/"), create_db_if_missing_(create_if_missing), tx_enable_cache_replacement_(tx_enable_cache_replacement), - ttl_compaction_filter_(nullptr) + ttl_compaction_filter_(nullptr), + query_worker_number_(config.query_worker_num_) { info_log_level_ = StringToInfoLogLevel(config.info_log_level_); - query_worker_pool_ = - 
std::make_unique(config.query_worker_num_); } /** @@ -321,6 +320,7 @@ class RocksDBDataStoreCommon : public DataStore bool tx_enable_cache_replacement_{true}; std::unique_ptr ttl_compaction_filter_{ nullptr}; + size_t query_worker_number_{4}; std::unique_ptr query_worker_pool_; std::shared_mutex db_mux_; diff --git a/rocksdb_handler.cpp b/rocksdb_handler.cpp index 01fa9f0..d403f49 100644 --- a/rocksdb_handler.cpp +++ b/rocksdb_handler.cpp @@ -20,8 +20,6 @@ * */ -#include "rocksdb_handler.h" - #include #include #include @@ -72,6 +70,7 @@ #include "redis_zset_object.h" #include "rocksdb/iostats_context.h" #include "rocksdb/rate_limiter.h" +#include "rocksdb_handler.h" #include "store_util.h" #include "tx_key.h" #include "tx_record.h" @@ -2153,7 +2152,10 @@ uint16_t RocksDBHandler::DecodeBucketIdFromKvKey(const char *data, size_t size) return EloqShare::big_endian_to_host(be_bucket_id); } -void RocksDBHandler::OnStartFollowing() +void RocksDBHandler::OnStartFollowing(uint32_t leader_node_id, + int64_t term, + int64_t standby_term, + bool resubscribe) { // shutdown previous opened db bthread::Mutex mux; @@ -3651,4 +3653,4 @@ bool RocksDBHandlerImpl::SendFileToRemoteNode(const std::string &snapshot_path, } #endif // ROCKSDB_CLOUD_FS -} // namespace EloqKV +} // namespace EloqKV \ No newline at end of file diff --git a/rocksdb_handler.h b/rocksdb_handler.h index 903ec18..55529e8 100644 --- a/rocksdb_handler.h +++ b/rocksdb_handler.h @@ -530,7 +530,10 @@ class RocksDBHandler : public txservice::store::DataStoreHandler bool OnLeaderStart(uint32_t *next_leader_node) override; - void OnStartFollowing() override; + void OnStartFollowing(uint32_t leader_node_id, + int64_t term, + int64_t standby_term, + bool resubscribe) override; void OnShutdown() override; @@ -754,4 +757,4 @@ class RocksDBHandlerImpl : public RocksDBHandler #endif // ROCKSDB_CLOUD_FS -} // namespace EloqKV +} // namespace EloqKV \ No newline at end of file