From cc67cce418f76e4259dfc7e7151c12d8ad54f207 Mon Sep 17 00:00:00 2001 From: lzxddz Date: Fri, 7 Nov 2025 15:32:05 +0800 Subject: [PATCH 1/2] DataStoreService and DataStoreServiceClient support multi shard Update bucket map to key-slot for total_bucket_count changed to 1024 Remove compile option USE_ONE_ELOQDSS_PARTITION_ENABLED Only use one node group to start logservice and txservice on bootstrap Update ClusterKeySlot command Dss client add arg bind_data_shard_with_ng Update test case expire.tcl --- CMakeLists.txt | 6 - concourse/scripts/build_tarball.bash | 2 +- concourse/scripts/common.sh | 4 +- include/redis_service.h | 2 +- src/redis_command.cpp | 3 +- src/redis_service.cpp | 202 +++++++++++++++++++-------- tests/unit/mono/expire.tcl | 5 +- 7 files changed, 151 insertions(+), 73 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ffe4105a..7b70edbf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,12 +56,6 @@ set(WITH_DATA_STORE "ROCKSDB" CACHE STRING "The KV data store for EloqKV") set_property(CACHE WITH_DATA_STORE PROPERTY STRINGS "DYNAMODB" "ROCKSDB" "ROCKSDB_CLOUD_S3" "ROCKSDB_CLOUD_GCS" "ELOQDSS_ROCKSDB_CLOUD_S3" "ELOQDSS_ROCKSDB_CLOUD_GCS" "ELOQDSS_ROCKSDB" "ELOQDSS_ELOQSTORE") message(NOTICE "With DATA_STORE: ${WITH_DATA_STORE}") -option(USE_ONE_ELOQDSS_PARTITION_ENABLED "Whether use one partition for kv store" OFF) -message(NOTICE "USE_ONE_ELOQDSS_PARTITION_ENABLED : ${USE_ONE_ELOQDSS_PARTITION_ENABLED}") - -if(USE_ONE_ELOQDSS_PARTITION_ENABLED) - add_compile_definitions(USE_ONE_ELOQDSS_PARTITION) -endif() # Add compile flags for KV stores if(WITH_DATA_STORE STREQUAL "DYNAMODB") diff --git a/concourse/scripts/build_tarball.bash b/concourse/scripts/build_tarball.bash index a15155df..1828b20a 100755 --- a/concourse/scripts/build_tarball.bash +++ b/concourse/scripts/build_tarball.bash @@ -192,7 +192,7 @@ if [ -n "${DSS_TYPE}" ]; then DSS_SRC_DIR="${ELOQKV_SRC}/store_handler/eloq_data_store_service" cd "${DSS_SRC_DIR}" mkdir -p build && cd build - cmake .. -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DWITH_DATA_STORE=${DSS_TYPE} -DUSE_ONE_ELOQDSS_PARTITION_ENABLED=OFF + cmake .. -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DWITH_DATA_STORE=${DSS_TYPE} cmake --build . --config ${BUILD_TYPE} -j${NCORE} copy_libraries dss_server ${DEST_DIR}/lib mv dss_server ${DEST_DIR}/bin/ diff --git a/concourse/scripts/common.sh b/concourse/scripts/common.sh index fe37e961..186fb283 100644 --- a/concourse/scripts/common.sh +++ b/concourse/scripts/common.sh @@ -151,8 +151,7 @@ function run_build() { -DCMAKE_BUILD_TYPE=$build_type \ -DWITH_DATA_STORE=$kv_store_type \ -DBUILD_WITH_TESTS=ON \ - -DWITH_LOG_SERVICE=ON \ - -DUSE_ONE_ELOQDSS_PARTITION_ENABLED=ON + -DWITH_LOG_SERVICE=ON # Define the output log file log_file="/tmp/compile_info.log" @@ -214,7 +213,6 @@ function run_build_ent() { -DWITH_DATA_STORE=$kv_store_type \ -DBUILD_WITH_TESTS=ON \ -DWITH_LOG_SERVICE=ON \ - -DUSE_ONE_ELOQDSS_PARTITION_ENABLED=ON \ -DOPEN_LOG_SERVICE=OFF \ -DFORK_HM_PROCESS=ON diff --git a/include/redis_service.h b/include/redis_service.h index 801740a3..2897baa8 100644 --- a/include/redis_service.h +++ b/include/redis_service.h @@ -239,7 +239,7 @@ class RedisServiceImpl : public brpc::RedisService // For ClusterSlots command. Results will be returned through arg 'info'. void RedisClusterSlots(std::vector &info); - std::string GenerateMovedErrorMessage(uint16_t slot_num); + std::string GenerateMovedErrorMessage(uint16_t slot_id); std::unique_ptr NewConnectionContext( brpc::Socket *socket) const override; diff --git a/src/redis_command.cpp b/src/redis_command.cpp index 9700cd29..5fb211bf 100644 --- a/src/redis_command.cpp +++ b/src/redis_command.cpp @@ -1676,8 +1676,7 @@ void ClusterSlotsCommand::Execute(RedisServiceImpl *redis_impl, void ClusterKeySlotCommand::Execute(RedisServiceImpl *redis_impl, RedisConnectionContext *ctx) { - int32_t slot_id = key_.Hash() & 0x3fff; - result_ = slot_id % total_range_buckets; + result_ = key_.Hash() & 0x3fff; } void FailoverCommand::Execute(RedisServiceImpl *redis_impl, diff --git a/src/redis_service.cpp b/src/redis_service.cpp index 4cbb0f20..7ff927ac 100644 --- a/src/redis_service.cpp +++ b/src/redis_service.cpp @@ -838,21 +838,32 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) } } - if (!txservice::ReadClusterConfigFile( - cluster_config_file_path, ng_configs, cluster_config_version)) - { - // Read cluster topology from general config file in this case - auto parse_res = txservice::ParseNgConfig(ip_port_list, - standby_ip_port_list, - voter_ip_port_list, - ng_configs, - tx_ng_replica_num, - 10000); - if (!parse_res) - { - LOG(ERROR) - << "Failed to extract cluster configs from ip_port_list."; - return false; + if (FLAGS_bootstrap) + { + // For bootstrap, only use current node to start txservie and + // logservice. + std::vector soloConfig; + soloConfig.emplace_back(0, local_ip, local_tx_port, true); + ng_configs.try_emplace(0, std::move(soloConfig)); + } + else + { + if (!txservice::ReadClusterConfigFile( + cluster_config_file_path, ng_configs, cluster_config_version)) + { + // Read cluster topology from general config file in this case + auto parse_res = txservice::ParseNgConfig(ip_port_list, + standby_ip_port_list, + voter_ip_port_list, + ng_configs, + tx_ng_replica_num, + 10000); + if (!parse_res) + { + LOG(ERROR) + << "Failed to extract cluster configs from ip_port_list."; + return false; + } } } @@ -1056,21 +1067,9 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) std::string dss_config_file_path = ""; EloqDS::DataStoreServiceClusterManager ds_config; - uint32_t dss_leader_id = EloqDS::UNKNOWN_DSS_LEADER_NODE_ID; - - // use tx node id as the dss node id - // since they are deployed together - uint32_t dss_node_id = node_id; - if (FLAGS_bootstrap || is_single_node) - { - dss_leader_id = node_id; - } if (!eloq_dss_peer_node.empty()) { - ds_config.SetThisNode( - local_ip, - EloqDS::DataStoreServiceClient::TxPort2DssPort(local_tx_port)); // Fetch ds topology from peer node if (!EloqDS::DataStoreService::FetchConfigFromPeer( eloq_dss_peer_node, ds_config)) @@ -1079,22 +1078,85 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) << eloq_dss_peer_node; return false; } + ds_config.SetThisNode( + local_ip, + EloqDS::DataStoreServiceClient::TxPort2DssPort(local_tx_port)); } else { - if (ng_configs.size() > 1) + if (FLAGS_bootstrap) { - LOG(ERROR) << "EloqDS multi-node cluster must specify " - "eloq_dss_peer_node."; - return false; - } + // For bootstrap mode, we need to fetch all node groups to init + // data store shards. + std::unordered_map> + tmp_ng_configs; + uint64_t tmp_cluster_version = 2; + if (!txservice::ReadClusterConfigFile(cluster_config_file_path, + tmp_ng_configs, + tmp_cluster_version)) + { + auto parse_res = + txservice::ParseNgConfig(ip_port_list, + standby_ip_port_list, + voter_ip_port_list, + tmp_ng_configs, + tx_ng_replica_num, + 10000); + if (!parse_res) + { + LOG(ERROR) << "Failed to extract cluster configs from " + "ip_port_list."; + return false; + } + } + + bool found = false; + uint32_t tmp_node_id = 0; + for (auto &pair : tmp_ng_configs) + { + auto &ng_nodes = pair.second; + for (size_t i = 0; i < ng_nodes.size(); i++) + { + if (ng_nodes[i].host_name_ == local_ip && + ng_nodes[i].port_ == local_tx_port) + { + tmp_node_id = ng_nodes[i].node_id_; + found = true; + break; + } + } + if (found) + { + break; + } + } + if (!found) + { + LOG(ERROR) << "Failed to find this node in cluster config."; + return false; + } - EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig( - dss_node_id, - native_ng_id, - ng_configs, - dss_leader_id, - ds_config); + std::unordered_map ng_leaders; + for (const auto &[ng_id, ng_config] : tmp_ng_configs) + { + ng_leaders.emplace(ng_id, tmp_node_id); + } + EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig( + tmp_node_id, tmp_ng_configs, ng_leaders, ds_config); + } + else + { + std::unordered_map ng_leaders; + if (is_single_node) + { + for (const auto &[ng_id, ng_config] : ng_configs) + { + ng_leaders.emplace(ng_id, node_id); + } + } + EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig( + node_id, ng_configs, ng_leaders, ds_config); + } } // Set file cache sync interval @@ -1137,11 +1199,10 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) // we always set create_if_missing to true // since non conflicts will happen under // multi-node deployment. - ret = - data_store_service_->StartService(true, dss_leader_id, dss_node_id); + ret = data_store_service_->StartService(true); #else - ret = data_store_service_->StartService( - (FLAGS_bootstrap || is_single_node), dss_leader_id, dss_node_id); + ret = data_store_service_->StartService(FLAGS_bootstrap || + is_single_node); #endif if (!ret) @@ -1151,7 +1212,15 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) } // setup data store service client store_hd_ = std::make_unique( - catalog_factory, ds_config, data_store_service_.get()); +#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB) + true, +#else + FLAGS_bootstrap || is_single_node, +#endif + catalog_factory, + ds_config, + eloq_dss_peer_node.empty(), + data_store_service_.get()); #endif @@ -2231,21 +2300,36 @@ void RedisServiceImpl::GetNodeSlotsInfo( { continue; } - std::sort(bucket_ids.begin(), bucket_ids.end()); - nodes_slots.try_emplace(node_id); - std::vector &slots_vec = nodes_slots.at(node_id); + // map buckets to slots + assert(txservice::total_range_buckets == 1024); + std::vector slot_ids; + // slots count is 16 times of bucket count + slot_ids.reserve(bucket_ids.size() * 16); + for (uint16_t bucket_id : bucket_ids) + { + for (int i = 0; i < 16; i++) + { + slot_ids.push_back(bucket_id + i * 1024); + } + } + + std::sort(slot_ids.begin(), slot_ids.end()); + + auto res = nodes_slots.try_emplace(node_id); + std::vector &slots_vec = res.first->second; SlotPair *slot_pair = &(slots_vec.emplace_back()); - slot_pair->start_slot_range = bucket_ids.front(); + slot_pair->start_slot_range = slot_ids.front(); + slot_pair->end_slot_range = slot_ids.front(); - for (size_t idx = 1; idx < bucket_ids.size(); idx++) + for (size_t idx = 1; idx < slot_ids.size(); idx++) { - if (bucket_ids[idx - 1] + 1 != bucket_ids[idx]) + if (slot_ids[idx - 1] + 1 != slot_ids[idx]) { slot_pair = &(slots_vec.emplace_back()); - slot_pair->start_slot_range = bucket_ids[idx]; + slot_pair->start_slot_range = slot_ids[idx]; } - slot_pair->end_slot_range = bucket_ids[idx]; + slot_pair->end_slot_range = slot_ids[idx]; } } } @@ -2433,17 +2517,17 @@ void RedisServiceImpl::RedisClusterSlots(std::vector &info) } } -std::string RedisServiceImpl::GenerateMovedErrorMessage(uint16_t slot_num) +std::string RedisServiceImpl::GenerateMovedErrorMessage(uint16_t slot_id) { std::vector slot_infos; RedisClusterSlots(slot_infos); std::string error_msg("MOVED "); - error_msg.append(std::to_string(slot_num)); + error_msg.append(std::to_string(slot_id)); for (auto &slot_info : slot_infos) { - if (slot_info.start_slot_range <= slot_num && - slot_info.end_slot_range >= slot_num) + if (slot_info.start_slot_range <= slot_id && + slot_info.end_slot_range >= slot_id) { error_msg.append(" "); error_msg.append(slot_info.hosts.front().ip); @@ -2520,11 +2604,11 @@ bool RedisServiceImpl::SendTxRequestAndWaitResult( dynamic_cast(tx_req)) { auto key = object_tx_req->Key(); - uint16_t slot_num = key->Hash() & 0x3fff; + uint16_t slot_id = key->Hash() & 0x3fff; if (error != nullptr) { - std::string error_msg = GenerateMovedErrorMessage(slot_num); + std::string error_msg = GenerateMovedErrorMessage(slot_id); error->OnError(error_msg); } return false; @@ -2539,11 +2623,11 @@ bool RedisServiceImpl::SendTxRequestAndWaitResult( multi_obj_cmd->KeyPointers(0); assert(!keys->empty()); // first key slot - uint16_t slot_num = keys->at(0).Hash() & 0x3fff; + uint16_t slot_id = keys->at(0).Hash() & 0x3fff; if (error != nullptr) { - std::string error_msg = GenerateMovedErrorMessage(slot_num); + std::string error_msg = GenerateMovedErrorMessage(slot_id); error->OnError(error_msg); } diff --git a/tests/unit/mono/expire.tcl b/tests/unit/mono/expire.tcl index 92e06903..f1b5c70b 100644 --- a/tests/unit/mono/expire.tcl +++ b/tests/unit/mono/expire.tcl @@ -153,7 +153,10 @@ start_server {tags {"expire"}} { r del c r setex c 1 somevalue set ttl [r pttl c] - assert {$ttl > 500 && $ttl <= 1000} + assert {$ttl > 500 && $ttl <= 1100} + # Why ttl < 1100 ? Because, SETEX calculate the expired timestamp on the node that client connected + # and PTTL calculate the expired timestamp on the node that key located. + # The clock on the two nodes is not same. (TODO: the two command should calculate on same node) } test {TTL / PTTL / EXPIRETIME / PEXPIRETIME return -1 if key has no expire} { From e89f616135a8fb03768b29dcab22f69e93fed29a Mon Sep 17 00:00:00 2001 From: lzxddz Date: Fri, 21 Nov 2025 11:26:29 +0800 Subject: [PATCH 2/2] update submodule --- store_handler | 2 +- tx_service | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/store_handler b/store_handler index 9e6949e9..9fc13524 160000 --- a/store_handler +++ b/store_handler @@ -1 +1 @@ -Subproject commit 9e6949e9400e95dc389043a91fa5b5799389f0ff +Subproject commit 9fc13524deb51390850482456d2a0b524a1e83ba diff --git a/tx_service b/tx_service index 0965ea45..7e38a11f 160000 --- a/tx_service +++ b/tx_service @@ -1 +1 @@ -Subproject commit 0965ea4556d742f5530db7e6a8ba179c809c02ae +Subproject commit 7e38a11f4f8dafe6e6c38fa7f10c83c48412699c