Merged
673 changes: 395 additions & 278 deletions data_store_service_client.cpp

Large diffs are not rendered by default.

132 changes: 59 additions & 73 deletions data_store_service_client.h
@@ -71,30 +71,44 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
~DataStoreServiceClient();

DataStoreServiceClient(
bool is_bootstrap,
txservice::CatalogFactory *catalog_factory[3],
const DataStoreServiceClusterManager &cluster_manager,
bool bind_data_shard_with_ng,
DataStoreService *data_store_service = nullptr)
: catalog_factory_array_{catalog_factory[0],
catalog_factory[1],
catalog_factory[2],
&range_catalog_factory_,
&hash_catalog_factory_},
data_store_service_(data_store_service)
data_store_service_(data_store_service),
need_bootstrap_(is_bootstrap),
bind_data_shard_with_ng_(bind_data_shard_with_ng)
{
// Init dss cluster config.
dss_topology_version_ = cluster_manager.GetTopologyVersion();
auto all_shards = cluster_manager.GetAllShards();
assert(all_shards.size() == 1);

for (auto &[shard_id, shard] : all_shards)
{
if (shard_id >= dss_shards_.size())
{
LOG(FATAL) << "Shard id " << shard_id
<< " is out of range, should expand the hard-coded "
"dss_shards_ size.";
}
Comment on lines +94 to +99
Contributor

⚠️ Potential issue | 🔴 Critical

Add bounds validation for shard_id to prevent array overrun.

The constructor now checks shard_id >= dss_shards_.size() and logs a fatal error, but this only covers initialization. Runtime accesses at line 659 and elsewhere (e.g., GetOwnerNodeIndexOfShard, UpdateOwnerNodeIndexOfShard) directly index dss_shards_[shard_id] without bounds checks. A shard_id ≥ 1000 from a malicious or buggy caller triggers undefined behavior.

Add validation before array access:

uint32_t GetOwnerNodeIndexOfShard(uint32_t shard_id) const
{
    if (shard_id >= dss_shards_.size()) {
        LOG(ERROR) << "shard_id " << shard_id << " exceeds array bounds";
        return UINT32_MAX;
    }
    assert(dss_shards_[shard_id].load(std::memory_order_acquire) != UINT32_MAX);
    return dss_shards_[shard_id].load(std::memory_order_acquire);
}

Apply similar checks in UpdateOwnerNodeIndexOfShard, UpgradeShardVersion, and other methods that index dss_shards_.

🤖 Prompt for AI Agents
In data_store_service_client.h around lines 94 to 99 and throughout the file
(notably methods that index dss_shards_ such as GetOwnerNodeIndexOfShard,
UpdateOwnerNodeIndexOfShard, UpgradeShardVersion and any other direct
dss_shards_[shard_id] accesses), add explicit bounds validation before accessing
dss_shards_: check if shard_id >= dss_shards_.size(), log an ERROR with the
offending shard_id, and return a sentinel (e.g., UINT32_MAX) for getter-style
methods; for mutators, log and early-return or return a failure code as
appropriate; after the bounds check you may keep the existing assert and
original load/store with the proper memory order. Ensure every direct
dss_shards_[shard_id] access is guarded this way to prevent out-of-range
indexing.
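The same guard extends naturally to the mutators the comment names. Below is a minimal, self-contained sketch of a bounds-checked update; the stand-alone `shards` array, the by-value parameters, and the CAS-based swap are assumptions for illustration, not the PR's actual implementation.

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <iostream>

// Stand-in for the client's dss_shards_ member (fixed at 1000 slots).
static std::array<std::atomic<uint32_t>, 1000> shards{};

// Bounds-checked mutator sketch: validate shard_id before indexing,
// then swap the cached owner index only if the caller's view is current.
bool UpdateOwnerNodeIndexOfShard(uint32_t shard_id,
                                 uint32_t old_node_index,
                                 uint32_t new_node_index)
{
    if (shard_id >= shards.size())
    {
        std::cerr << "shard_id " << shard_id << " exceeds array bounds\n";
        return false;  // fail instead of indexing out of range
    }
    // Install the new owner only if the slot still holds old_node_index.
    return shards[shard_id].compare_exchange_strong(
        old_node_index, new_node_index, std::memory_order_acq_rel);
}
```

A failed compare-exchange doubles as the "stale view" signal, so callers can re-read the owner index and retry rather than clobbering a concurrent update.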

uint32_t node_idx = FindFreeNodeIndex();
auto &node_ref = dss_nodes_[node_idx];
node_ref.Reset(shard.nodes_[0].host_name_,
shard.nodes_[0].port_,
shard.version_);
dss_shards_[shard_id].store(shard_id);
dss_shards_[shard_id].store(node_idx);
dss_shard_ids_.insert(shard_id);
}

// init bucket infos
InitBucketsInfo(dss_shard_ids_, 0, bucket_infos_);
Comment on lines 88 to +110
Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

Good: Fatal error on shard_id bounds violation during construction.

The constructor now validates shard_id < dss_shards_.size() and logs a fatal error if exceeded. This prevents undefined behavior during initialization. Combined with bounds checking in runtime accessors like GetOwnerNodeIndexOfShard (see data_store_service_client.cpp:3198-3208), the array access pattern is safer.

However, as noted in past reviews, ensure config validation limits shard_count to ≤ 1000 during parsing to prevent invalid configurations from reaching this point. Verify that data_store_service_config.cpp enforces this constraint.


🏁 Script executed:

#!/bin/bash
# Verify shard_count validation in config parsing
echo "=== Checking config validation for shard_count bounds ==="
rg -n "shard_count.*1000|shard_count.*>.*999|MAX.*SHARD" eloq_data_store_service/data_store_service_config.cpp -C 3

echo ""
echo "=== Checking for shard_count assignment/validation ==="
ast-grep --pattern 'shard_count_ = $_' 

Length of output: 595


I need to examine the actual validation logic around these shard_count assignments to determine if the review comment's concern is valid.


🏁 Script executed:

#!/bin/bash
# Examine context around shard_count assignment from config file
echo "=== Context around shard_count assignment (lines 65-75) ==="
sed -n '65,75p' eloq_data_store_service/data_store_service_config.cpp

echo ""
echo "=== Context around shard_count assignment from shards_map (lines 180-190) ==="
sed -n '180,190p' eloq_data_store_service/data_store_service_config.cpp

echo ""
echo "=== Searching for validation functions or checks on shard_count ==="
rg -n "shard_count" eloq_data_store_service/data_store_service_config.cpp | head -20

echo ""
echo "=== Looking for dss_shards_ initialization to determine its size ==="
rg -n "dss_shards_.*size|dss_shards_\s*\(" eloq_data_store_service/data_store_service_client.h -C 2

Length of output: 1391


🏁 Script executed:

#!/bin/bash
# Examine context around shard_count assignments
echo "=== Context around config file read (lines 65-75) ==="
sed -n '65,75p' eloq_data_store_service/data_store_service_config.cpp

echo ""
echo "=== Context around shards_map assignment (lines 180-190) ==="
sed -n '180,190p' eloq_data_store_service/data_store_service_config.cpp

echo ""
echo "=== Search for any validation or bounds checks on shard_count ==="
rg -n "shard_count.*[<>]|shard_count.*validate|shard_count.*check" eloq_data_store_service/ -C 2

echo ""
echo "=== Looking for MAX_SHARD or similar constants ==="
rg -n "MAX.*SHARD|SHARD.*MAX|1000" eloq_data_store_service/data_store_service_client.h -B 2 -A 2

echo ""
echo "=== Check dss_shards_ declaration and size ==="
rg -n "dss_shards_\s*\[|std::array.*dss_shards_" eloq_data_store_service/data_store_service_client.h -C 1

Length of output: 1517


I need to locate the correct path for the header file and dss_shards_ declaration. Let me search for the file.


🏁 Script executed:

#!/bin/bash
# Find the correct path for data_store_service_client.h
echo "=== Finding data_store_service_client.h ==="
fd -t f "data_store_service_client.h" 

echo ""
echo "=== Finding dss_shards_ declaration ==="
rg -n "dss_shards_\s*\[|std::array.*dss_shards_|dss_shards_\s*;" -C 2

echo ""
echo "=== Looking for array size constants (1000, MAX) ==="
rg -n "1000|MAX_SHARD" . -C 1 | head -30

Length of output: 4792


Config validation for shard_count is missing; add a bounds check during parsing.

Verification confirms the concern: data_store_service_config.cpp (lines 69-70) reads shard_count from the config file with no upper bounds validation. While the constructor adds a runtime guard (shard_id < dss_shards_.size()), dss_shards_ is fixed at 1000 elements (line 749 of data_store_service_client.h), and invalid configs can be parsed and stored without detection.

Add validation during config parsing to reject shard_count > 1000 before it reaches the client, ensuring invalid configurations fail fast at parse time rather than at runtime.

🤖 Prompt for AI Agents
In data_store_service_config.cpp around lines 69-70, add validation to reject
parsed shard_count values greater than the hard limit (1000) before storing them
in the config: after reading shard_count, check if shard_count == 0 or
shard_count > 1000 and return/log a clear parsing/configuration error (or throw)
so the config load fails fast; update any config parsing return path to
propagate the failure to callers so invalid configs are not accepted and later
cause runtime FATALs in data_store_service_client.h.
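The fail-fast check the prompt describes could look like the following sketch. `kMaxShardCount` and `ValidateShardCount` are illustrative names, and the real parser in data_store_service_config.cpp may prefer propagating an error code over throwing.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hard limit mirroring the fixed 1000-element dss_shards_ array.
constexpr uint32_t kMaxShardCount = 1000;

// Reject out-of-range shard counts at parse time so an invalid config
// fails fast instead of reaching the client and triggering a FATAL.
uint32_t ValidateShardCount(uint32_t shard_count)
{
    if (shard_count == 0 || shard_count > kMaxShardCount)
    {
        throw std::invalid_argument(
            "shard_count " + std::to_string(shard_count) +
            " out of range [1, " + std::to_string(kMaxShardCount) + "]");
    }
    return shard_count;
}
```

Keeping the limit as a single shared constant (rather than a literal in both the parser and the array declaration) also prevents the two sizes from drifting apart.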


if (data_store_service_ != nullptr)
{
data_store_service_->AddListenerForUpdateConfig(
@@ -107,15 +121,6 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
<< txservice::Sequences::table_name_sv_;

AppendPreBuiltTable(txservice::Sequences::table_name_);

be_bucket_ids_.reserve(txservice::Sharder::TotalRangeBuckets());
for (uint16_t bucket_id = 0;
bucket_id < txservice::Sharder::TotalRangeBuckets();
++bucket_id)
{
uint16_t be_bucket_id = EloqShare::host_to_big_endian(bucket_id);
be_bucket_ids_.push_back(be_bucket_id);
}
}

// The maximum number of retries for RPC requests.
@@ -132,11 +137,10 @@
}

static void TxConfigsToDssClusterConfig(
uint32_t dss_node_id, // = 0,
uint32_t ng_id, // = 0,
uint32_t node_id, // = 0,
const std::unordered_map<uint32_t, std::vector<txservice::NodeConfig>>
&ng_configs,
uint32_t dss_leader_node_id, // if no leader,set uint32t_max
const std::unordered_map<uint32_t, uint32_t> &ng_leaders,
DataStoreServiceClusterManager &cluster_manager);

void ConnectToLocalDataStoreService(
@@ -388,11 +392,12 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
void RestoreTxCache(txservice::NodeGroupId cc_ng_id,
int64_t cc_ng_term) override;

bool OnLeaderStart(uint32_t *next_leader_node) override;
bool OnLeaderStart(uint32_t ng_id, uint32_t *next_leader_node) override;

bool OnLeaderStop(int64_t term) override;
bool OnLeaderStop(uint32_t ng_id, int64_t term) override;

void OnStartFollowing(uint32_t leader_node_id,
void OnStartFollowing(uint32_t ng_id,
uint32_t leader_node_id,
int64_t term,
int64_t standby_term,
bool resubscribe) override;
@@ -439,15 +444,6 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
static uint32_t HashArchiveKey(const std::string &kv_table_name,
const txservice::TxKey &tx_key);

static void EncodeKvKeyForHashPart(uint16_t bucket_id,
std::string &key_out);
static void EncodeKvKeyForHashPart(uint16_t bucket_id,
const std::string_view &tx_key,
std::string &key_out);

static std::string_view DecodeKvKeyForHashPart(const char *data,
size_t size);

// NOTICE: be_commit_ts is the big endian encode value of commit_ts
static std::string EncodeArchiveKey(std::string_view table_name,
std::string_view key,
@@ -488,6 +484,9 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
bool DeleteCatalog(const txservice::TableName &base_table_name,
uint64_t write_time);

uint32_t GetShardIdByPartitionId(int32_t partition_id,
bool is_range_partition) const;

private:
int32_t MapKeyHashToPartitionId(const txservice::TxKey &key) const
{
@@ -500,8 +499,8 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
// =====================================================

void Read(const std::string_view kv_table_name,
const uint32_t partition_id,
const std::string_view be_bucket_id,
const int32_t partition_id,
const uint32_t shard_id,
const std::string_view key,
void *callback_data,
DataStoreCallback callback);
@@ -511,6 +510,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
void BatchWriteRecords(
std::string_view kv_table_name,
int32_t partition_id,
uint32_t shard_id,
std::vector<std::string_view> &&key_parts,
std::vector<std::string_view> &&record_parts,
std::vector<uint64_t> &&records_ts,
@@ -553,6 +553,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
*/
void DeleteRange(const std::string_view table_name,
const int32_t partition_id,
uint32_t shard_id,
const std::string &start_key,
const std::string &end_key,
const bool skip_wal,
@@ -572,7 +573,8 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler

void ScanNext(
const std::string_view table_name,
uint32_t partition_id,
int32_t partition_id,
uint32_t shard_id,
const std::string_view start_key,
const std::string_view end_key,
const std::string_view session_id,
@@ -588,7 +590,8 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
void ScanNextInternal(ScanNextClosure *scan_next_closure);

void ScanClose(const std::string_view table_name,
uint32_t partition_id,
int32_t partition_id,
uint32_t shard_id,
std::string &session_id,
void *callback_data,
DataStoreCallback callback);
@@ -625,29 +628,15 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
// statistics and etc.).
int32_t KvPartitionIdOf(const txservice::TableName &table) const
{
#ifdef USE_ONE_ELOQDSS_PARTITION
return 0;
#else
std::string_view sv = table.StringView();
return (std::hash<std::string_view>()(sv)) & 0x3FF;
#endif
auto hash_code = std::hash<std::string_view>()(sv);
return txservice::Sharder::MapKeyHashToHashPartitionId(hash_code);
}

int32_t KvPartitionIdOf(int32_t key_partition,
bool is_range_partition = true)
{
#ifdef USE_ONE_ELOQDSS_PARTITION
if (is_range_partition)
{
return key_partition;
}
else
{
return 0;
}
#else
return key_partition;
#endif
}

const txservice::CatalogFactory *GetCatalogFactory(
@@ -657,51 +646,48 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
}

/**
* @brief Check if the shard_id is local to the current node.
* @brief Check if the owner of shard is the local DataStoreService node.
* @param shard_id
* @return true if the shard_id is local to the current node.
* @return true if the owner of shard is the local DataStoreService node
*/
bool IsLocalShard(uint32_t shard_id);

/**
* @brief Check if the partition_id is local to the current node.
* @param partition_id
* @return true if the partition_id is local to the current node.
* @brief Get the index of the shard's owner node in dss_nodes_.
* @param shard_id
* @return uint32_t
*/
bool IsLocalPartition(int32_t partition_id);

uint32_t GetShardIdByPartitionId(int32_t partition_id) const;
uint32_t AllDataShardCount() const;
uint32_t GetOwnerNodeIndexOfShard(uint32_t shard_id) const;
std::vector<uint32_t> GetAllDataShards();
bool UpdateOwnerNodeIndexOfShard(uint32_t shard_id,
uint32_t old_node_index,
uint32_t &new_node_index);
void InitBucketsInfo(
const std::set<uint32_t> &node_groups,
uint64_t version,
std::unordered_map<uint16_t, std::unique_ptr<txservice::BucketInfo>>
&ng_bucket_infos);

void UpdateShardOwner(uint32_t shard_id, uint32_t node_id);

uint32_t FindFreeNodeIndex();
void HandleShardingError(const ::EloqDS::remote::CommonResult &result);
bool UpgradeShardVersion(uint32_t shard_id,
uint64_t shard_version,
const std::string &host_name,
uint16_t port);

std::string_view EncodeBucketId(uint16_t bucket_id)
{
uint16_t &be_bucket_id = be_bucket_ids_[bucket_id];
return std::string_view(reinterpret_cast<const char *>(&be_bucket_id),
sizeof(uint16_t));
}

txservice::EloqHashCatalogFactory hash_catalog_factory_{};
txservice::EloqRangeCatalogFactory range_catalog_factory_{};
// TODO(lzx): define a global catalog factory array that used by
// EngineServer TxService and DataStoreHandler
std::array<const txservice::CatalogFactory *, 5> catalog_factory_array_;

// bthread::Mutex ds_service_mutex_;
// bthread::ConditionVariable ds_service_cv_;
// std::atomic<bool> ds_serv_shutdown_indicator_;
// point to the data store service if it is colocated
DataStoreService *data_store_service_;

bool need_bootstrap_{false};
bool bind_data_shard_with_ng_{false};

struct DssNode
{
DssNode() = default;
@@ -760,20 +746,20 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
const uint64_t NodeExpiredTime = 10 * 1000 * 1000; // 10s
// Now only support one shard. dss_shards_ caches the index in dss_nodes_ of
// shard owner.
std::array<std::atomic<uint32_t>, 1> dss_shards_;
std::array<std::atomic<uint32_t>, 1000> dss_shards_;
std::atomic<uint64_t> dss_topology_version_{0};

// std::atomic<uint64_t> flying_remote_fetch_count_{0};
// // Work queue for fetch records from primary node
// std::deque<txservice::FetchRecordCc *> remote_fetch_cc_queue_;
std::shared_mutex dss_shard_ids_mutex_;
std::set<uint32_t> dss_shard_ids_;
// key is bucket id, value is bucket info.
std::unordered_map<uint16_t, std::unique_ptr<txservice::BucketInfo>>
bucket_infos_;

// table names and their kv table names
std::unordered_map<txservice::TableName, std::string>
pre_built_table_names_;
ThreadWorkerPool upsert_table_worker_{1};

std::vector<uint16_t> be_bucket_ids_;

friend class ReadClosure;
friend class BatchWriteRecordsClosure;
friend class FlushDataClosure;