Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@ set(WITH_DATA_STORE "ROCKSDB" CACHE STRING "The KV data store for EloqKV")
set_property(CACHE WITH_DATA_STORE PROPERTY STRINGS "DYNAMODB" "ROCKSDB" "ROCKSDB_CLOUD_S3" "ROCKSDB_CLOUD_GCS" "ELOQDSS_ROCKSDB_CLOUD_S3" "ELOQDSS_ROCKSDB_CLOUD_GCS" "ELOQDSS_ROCKSDB" "ELOQDSS_ELOQSTORE")
message(NOTICE "With DATA_STORE: ${WITH_DATA_STORE}")

option(USE_ONE_ELOQDSS_PARTITION_ENABLED "Whether use one partition for kv store" OFF)
message(NOTICE "USE_ONE_ELOQDSS_PARTITION_ENABLED : ${USE_ONE_ELOQDSS_PARTITION_ENABLED}")

if(USE_ONE_ELOQDSS_PARTITION_ENABLED)
add_compile_definitions(USE_ONE_ELOQDSS_PARTITION)
endif()

# Add compile flags for KV stores
if(WITH_DATA_STORE STREQUAL "DYNAMODB")
Expand Down
2 changes: 1 addition & 1 deletion concourse/scripts/build_tarball.bash
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ if [ -n "${DSS_TYPE}" ]; then
DSS_SRC_DIR="${ELOQKV_SRC}/store_handler/eloq_data_store_service"
cd "${DSS_SRC_DIR}"
mkdir -p build && cd build
cmake .. -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DWITH_DATA_STORE=${DSS_TYPE} -DUSE_ONE_ELOQDSS_PARTITION_ENABLED=OFF
cmake .. -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DWITH_DATA_STORE=${DSS_TYPE}
cmake --build . --config ${BUILD_TYPE} -j${NCORE}
copy_libraries dss_server ${DEST_DIR}/lib
mv dss_server ${DEST_DIR}/bin/
Expand Down
4 changes: 1 addition & 3 deletions concourse/scripts/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ function run_build() {
-DCMAKE_BUILD_TYPE=$build_type \
-DWITH_DATA_STORE=$kv_store_type \
-DBUILD_WITH_TESTS=ON \
-DWITH_LOG_SERVICE=ON \
-DUSE_ONE_ELOQDSS_PARTITION_ENABLED=ON
-DWITH_LOG_SERVICE=ON

# Define the output log file
log_file="/tmp/compile_info.log"
Expand Down Expand Up @@ -214,7 +213,6 @@ function run_build_ent() {
-DWITH_DATA_STORE=$kv_store_type \
-DBUILD_WITH_TESTS=ON \
-DWITH_LOG_SERVICE=ON \
-DUSE_ONE_ELOQDSS_PARTITION_ENABLED=ON \
-DOPEN_LOG_SERVICE=OFF \
-DFORK_HM_PROCESS=ON

Expand Down
2 changes: 1 addition & 1 deletion include/redis_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class RedisServiceImpl : public brpc::RedisService
// For ClusterSlots command. Results will be returned through arg 'info'.
void RedisClusterSlots(std::vector<SlotInfo> &info);

std::string GenerateMovedErrorMessage(uint16_t slot_num);
std::string GenerateMovedErrorMessage(uint16_t slot_id);

std::unique_ptr<brpc::ConnectionContext> NewConnectionContext(
brpc::Socket *socket) const override;
Expand Down
3 changes: 1 addition & 2 deletions src/redis_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1676,8 +1676,7 @@ void ClusterSlotsCommand::Execute(RedisServiceImpl *redis_impl,
void ClusterKeySlotCommand::Execute(RedisServiceImpl *redis_impl,
RedisConnectionContext *ctx)
{
int32_t slot_id = key_.Hash() & 0x3fff;
result_ = slot_id % total_range_buckets;
result_ = key_.Hash() & 0x3fff;
}

void FailoverCommand::Execute(RedisServiceImpl *redis_impl,
Expand Down
202 changes: 143 additions & 59 deletions src/redis_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -838,21 +838,32 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server)
}
}

if (!txservice::ReadClusterConfigFile(
cluster_config_file_path, ng_configs, cluster_config_version))
{
// Read cluster topology from general config file in this case
auto parse_res = txservice::ParseNgConfig(ip_port_list,
standby_ip_port_list,
voter_ip_port_list,
ng_configs,
tx_ng_replica_num,
10000);
if (!parse_res)
{
LOG(ERROR)
<< "Failed to extract cluster configs from ip_port_list.";
return false;
if (FLAGS_bootstrap)
{
// For bootstrap, only use current node to start txservie and
// logservice.
std::vector<txservice::NodeConfig> soloConfig;
soloConfig.emplace_back(0, local_ip, local_tx_port, true);
ng_configs.try_emplace(0, std::move(soloConfig));
}
else
{
if (!txservice::ReadClusterConfigFile(
cluster_config_file_path, ng_configs, cluster_config_version))
{
// Read cluster topology from general config file in this case
auto parse_res = txservice::ParseNgConfig(ip_port_list,
standby_ip_port_list,
voter_ip_port_list,
ng_configs,
tx_ng_replica_num,
10000);
if (!parse_res)
{
LOG(ERROR)
<< "Failed to extract cluster configs from ip_port_list.";
return false;
}
}
}

Expand Down Expand Up @@ -1056,21 +1067,9 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server)

std::string dss_config_file_path = "";
EloqDS::DataStoreServiceClusterManager ds_config;
uint32_t dss_leader_id = EloqDS::UNKNOWN_DSS_LEADER_NODE_ID;

// use tx node id as the dss node id
// since they are deployed together
uint32_t dss_node_id = node_id;
if (FLAGS_bootstrap || is_single_node)
{
dss_leader_id = node_id;
}

if (!eloq_dss_peer_node.empty())
{
ds_config.SetThisNode(
local_ip,
EloqDS::DataStoreServiceClient::TxPort2DssPort(local_tx_port));
// Fetch ds topology from peer node
if (!EloqDS::DataStoreService::FetchConfigFromPeer(
eloq_dss_peer_node, ds_config))
Expand All @@ -1079,22 +1078,85 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server)
<< eloq_dss_peer_node;
return false;
}
ds_config.SetThisNode(
local_ip,
EloqDS::DataStoreServiceClient::TxPort2DssPort(local_tx_port));
}
else
{
if (ng_configs.size() > 1)
if (FLAGS_bootstrap)
{
LOG(ERROR) << "EloqDS multi-node cluster must specify "
"eloq_dss_peer_node.";
return false;
}
// For bootstrap mode, we need to fetch all node groups to init
// data store shards.
std::unordered_map<uint32_t, std::vector<NodeConfig>>
tmp_ng_configs;
uint64_t tmp_cluster_version = 2;
if (!txservice::ReadClusterConfigFile(cluster_config_file_path,
tmp_ng_configs,
tmp_cluster_version))
{
auto parse_res =
txservice::ParseNgConfig(ip_port_list,
standby_ip_port_list,
voter_ip_port_list,
tmp_ng_configs,
tx_ng_replica_num,
10000);
if (!parse_res)
{
LOG(ERROR) << "Failed to extract cluster configs from "
"ip_port_list.";
return false;
}
}

bool found = false;
uint32_t tmp_node_id = 0;
for (auto &pair : tmp_ng_configs)
{
auto &ng_nodes = pair.second;
for (size_t i = 0; i < ng_nodes.size(); i++)
{
if (ng_nodes[i].host_name_ == local_ip &&
ng_nodes[i].port_ == local_tx_port)
{
tmp_node_id = ng_nodes[i].node_id_;
found = true;
break;
}
}
if (found)
{
break;
}
}
if (!found)
{
LOG(ERROR) << "Failed to find this node in cluster config.";
return false;
}

EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig(
dss_node_id,
native_ng_id,
ng_configs,
dss_leader_id,
ds_config);
std::unordered_map<uint32_t, uint32_t> ng_leaders;
for (const auto &[ng_id, ng_config] : tmp_ng_configs)
{
ng_leaders.emplace(ng_id, tmp_node_id);
}
EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig(
tmp_node_id, tmp_ng_configs, ng_leaders, ds_config);
}
else
{
std::unordered_map<uint32_t, uint32_t> ng_leaders;
if (is_single_node)
{
for (const auto &[ng_id, ng_config] : ng_configs)
{
ng_leaders.emplace(ng_id, node_id);
}
}
EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig(
node_id, ng_configs, ng_leaders, ds_config);
}
}

// Set file cache sync interval
Expand Down Expand Up @@ -1137,11 +1199,10 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server)
// we always set create_if_missing to true
// since non conflicts will happen under
// multi-node deployment.
ret =
data_store_service_->StartService(true, dss_leader_id, dss_node_id);
ret = data_store_service_->StartService(true);
#else
ret = data_store_service_->StartService(
(FLAGS_bootstrap || is_single_node), dss_leader_id, dss_node_id);
ret = data_store_service_->StartService(FLAGS_bootstrap ||
is_single_node);
#endif

if (!ret)
Expand All @@ -1151,7 +1212,15 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server)
}
// setup data store service client
store_hd_ = std::make_unique<EloqDS::DataStoreServiceClient>(
catalog_factory, ds_config, data_store_service_.get());
#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB)
true,
#else
FLAGS_bootstrap || is_single_node,
#endif
catalog_factory,
ds_config,
eloq_dss_peer_node.empty(),
data_store_service_.get());

#endif

Expand Down Expand Up @@ -2231,21 +2300,36 @@ void RedisServiceImpl::GetNodeSlotsInfo(
{
continue;
}
std::sort(bucket_ids.begin(), bucket_ids.end());

nodes_slots.try_emplace(node_id);
std::vector<SlotPair> &slots_vec = nodes_slots.at(node_id);
// map buckets to slots
assert(txservice::total_range_buckets == 1024);
std::vector<uint32_t> slot_ids;
// slots count is 16 times of bucket count
slot_ids.reserve(bucket_ids.size() * 16);
for (uint16_t bucket_id : bucket_ids)
{
for (int i = 0; i < 16; i++)
{
slot_ids.push_back(bucket_id + i * 1024);
}
}

std::sort(slot_ids.begin(), slot_ids.end());

auto res = nodes_slots.try_emplace(node_id);
std::vector<SlotPair> &slots_vec = res.first->second;
SlotPair *slot_pair = &(slots_vec.emplace_back());
slot_pair->start_slot_range = bucket_ids.front();
slot_pair->start_slot_range = slot_ids.front();
slot_pair->end_slot_range = slot_ids.front();

for (size_t idx = 1; idx < bucket_ids.size(); idx++)
for (size_t idx = 1; idx < slot_ids.size(); idx++)
{
if (bucket_ids[idx - 1] + 1 != bucket_ids[idx])
if (slot_ids[idx - 1] + 1 != slot_ids[idx])
{
slot_pair = &(slots_vec.emplace_back());
slot_pair->start_slot_range = bucket_ids[idx];
slot_pair->start_slot_range = slot_ids[idx];
}
slot_pair->end_slot_range = bucket_ids[idx];
slot_pair->end_slot_range = slot_ids[idx];
}
}
}
Expand Down Expand Up @@ -2433,17 +2517,17 @@ void RedisServiceImpl::RedisClusterSlots(std::vector<SlotInfo> &info)
}
}

std::string RedisServiceImpl::GenerateMovedErrorMessage(uint16_t slot_num)
std::string RedisServiceImpl::GenerateMovedErrorMessage(uint16_t slot_id)
{
std::vector<SlotInfo> slot_infos;
RedisClusterSlots(slot_infos);

std::string error_msg("MOVED ");
error_msg.append(std::to_string(slot_num));
error_msg.append(std::to_string(slot_id));
for (auto &slot_info : slot_infos)
{
if (slot_info.start_slot_range <= slot_num &&
slot_info.end_slot_range >= slot_num)
if (slot_info.start_slot_range <= slot_id &&
slot_info.end_slot_range >= slot_id)
{
error_msg.append(" ");
error_msg.append(slot_info.hosts.front().ip);
Expand Down Expand Up @@ -2520,11 +2604,11 @@ bool RedisServiceImpl::SendTxRequestAndWaitResult(
dynamic_cast<ObjectCommandTxRequest *>(tx_req))
{
auto key = object_tx_req->Key();
uint16_t slot_num = key->Hash() & 0x3fff;
uint16_t slot_id = key->Hash() & 0x3fff;

if (error != nullptr)
{
std::string error_msg = GenerateMovedErrorMessage(slot_num);
std::string error_msg = GenerateMovedErrorMessage(slot_id);
error->OnError(error_msg);
}
return false;
Expand All @@ -2539,11 +2623,11 @@ bool RedisServiceImpl::SendTxRequestAndWaitResult(
multi_obj_cmd->KeyPointers(0);
assert(!keys->empty());
// first key slot
uint16_t slot_num = keys->at(0).Hash() & 0x3fff;
uint16_t slot_id = keys->at(0).Hash() & 0x3fff;

if (error != nullptr)
{
std::string error_msg = GenerateMovedErrorMessage(slot_num);
std::string error_msg = GenerateMovedErrorMessage(slot_id);
error->OnError(error_msg);
}

Expand Down
5 changes: 4 additions & 1 deletion tests/unit/mono/expire.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ start_server {tags {"expire"}} {
r del c
r setex c 1 somevalue
set ttl [r pttl c]
assert {$ttl > 500 && $ttl <= 1000}
assert {$ttl > 500 && $ttl <= 1100}
# Why ttl < 1100 ? Because, SETEX calculate the expired timestamp on the node that client connected
# and PTTL calculate the expired timestamp on the node that key located.
# The clock on the two nodes is not same. (TODO: the two command should calculate on same node)
}

test {TTL / PTTL / EXPIRETIME / PEXPIRETIME return -1 if key has no expire} {
Expand Down