Merged
467 changes: 281 additions & 186 deletions data_store_service_client.cpp

Large diffs are not rendered by default.

152 changes: 106 additions & 46 deletions data_store_service_client.h
@@ -55,6 +55,8 @@ class ScanNextClosure;
class CreateSnapshotForBackupClosure;
class SinglePartitionScanner;

class DssClusterConfig;

typedef void (*DataStoreCallback)(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
@@ -69,16 +71,27 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
txservice::CatalogFactory *catalog_factory[3],
const DataStoreServiceClusterManager &cluster_manager,
DataStoreService *data_store_service = nullptr)
: ds_serv_shutdown_indicator_(false),
catalog_factory_array_{catalog_factory[0],
: catalog_factory_array_{catalog_factory[0],
catalog_factory[1],
catalog_factory[2],
&range_catalog_factory_,
&hash_catalog_factory_},
cluster_manager_(cluster_manager),
data_store_service_(data_store_service),
flying_remote_fetch_count_(0)
data_store_service_(data_store_service)
{
// Init dss cluster config.
dss_topology_version_ = cluster_manager.GetTopologyVersion();
auto all_shards = cluster_manager.GetAllShards();
assert(all_shards.size() == 1);
for (auto &[shard_id, shard] : all_shards)
{
uint32_t node_idx = FindFreeNodeIndex();
auto &node_ref = dss_nodes_[node_idx];
node_ref.Reset(shard.nodes_[0].host_name_,
shard.nodes_[0].port_,
shard.version_);
dss_shards_[shard_id].store(shard_id);
}

Comment on lines +82 to +94
Contributor

⚠️ Potential issue | 🔴 Critical

Fix out-of-bounds access on dss_shards_.

dss_shards_ now has size 1, but we still subscript it with shard_id. Any non-zero shard id (perfectly valid even in a single-shard deployment) will index past the end and corrupt memory. Please store the owner slot at index 0 (and assert shard.shard_id_ == 0 if that’s the intended invariant), or switch back to a container keyed by the actual shard id. Keeping the current code is undefined behaviour.

🤖 Prompt for AI Agents
data_store_service_client.h around lines 82-94: the code subscripts dss_shards_
with shard_id while dss_shards_ has fixed size 1, causing out-of-bounds access
for non-zero shard ids. Fix by either (A) if the design guarantees a single
shard, assert that the shard's id is 0 and store the owner into dss_shards_[0]
instead of dss_shards_[shard_id]; or (B) if multiple shard ids must be
supported, replace dss_shards_ with a container keyed by shard_id (e.g., a map
or vector sized to max shard id) and insert/store using the actual shard_id.
Implement one of these two fixes and add an assertion or bounds-check to prevent
future UB.

Comment on lines +74 to +94

⚠️ Potential issue | 🔴 Critical

Fix OOB and wrong value stored in dss_shards_; remember actual shard id

dss_shards_ has size 1, but you subscript it with shard_id and store shard_id rather than the node index. That is undefined behaviour, and the mapping is wrong whenever the lone shard's id is non-zero.

Apply this diff to the ctor body:

-        for (auto &[shard_id, shard] : all_shards)
+        for (auto &[shard_id, shard] : all_shards)
         {
             uint32_t node_idx = FindFreeNodeIndex();
             auto &node_ref = dss_nodes_[node_idx];
             node_ref.Reset(shard.nodes_[0].host_name_,
                            shard.nodes_[0].port_,
                            shard.version_);
-            dss_shards_[shard_id].store(shard_id);
+            // Remember the only shard id, and store owner node index at slot 0
+            single_shard_id_ = shard_id;
+            dss_shards_[0].store(node_idx, std::memory_order_release);
         }

Declare a field to hold the actual shard id (outside this hunk, in the private section):

uint32_t single_shard_id_{0};

Follow-up: update client.cpp call sites to (a) assert the passed shard_id equals single_shard_id_, and (b) read/write dss_shards_[0]. See suggested diffs in client.cpp comments.

🤖 Prompt for AI Agents
In data_store_service_client.h around lines 74-94, the ctor indexes dss_shards_
with the actual shard_id (which can be nonzero) causing OOB and stores the
shard_id instead of recording the actual shard id; declare a new private field
uint32_t single_shard_id_ and in this ctor set single_shard_id_ = shard_id for
the lone shard, then always read/write dss_shards_[0] (not
dss_shards_[shard_id]) and store the node index/slot or relevant node data
there; additionally update client.cpp call sites to assert that any passed
shard_id equals single_shard_id_ and change accesses to use dss_shards_[0] for
reads/writes.

if (data_store_service_ != nullptr)
{
data_store_service_->AddListenerForUpdateConfig(
@@ -359,24 +372,6 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler

void OnShutdown() override;

void HandleShardingError(const ::EloqDS::remote::CommonResult &result)
{
cluster_manager_.HandleShardingError(result);
}

std::shared_ptr<brpc::Channel> GetDataStoreServiceChannelByPartitionId(
uint32_t partition_id);
std::shared_ptr<brpc::Channel> UpdateDataStoreServiceChannelByPartitionId(
uint32_t partition_id);
std::shared_ptr<brpc::Channel> GetDataStoreServiceChannelByShardId(
uint32_t shard_id);
std::shared_ptr<brpc::Channel> UpdateDataStoreServiceChannelByShardId(
uint32_t shard_id);
std::shared_ptr<brpc::Channel> GetDataStoreServiceChannel(
const DSSNode &node);
std::shared_ptr<brpc::Channel> UpdateDataStoreServiceChannel(
const DSSNode &node);

/**
* Serialize a record with is_deleted flag and record string.
* @param is_deleted
@@ -616,50 +611,115 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
#endif
}

const txservice::CatalogFactory *GetCatalogFactory(
txservice::TableEngine table_engine)
{
return catalog_factory_array_.at(static_cast<int>(table_engine) - 1);
}

/**
* @brief Check if the shard_id is local to the current node.
* @param shard_id
* @return true if the shard_id is local to the current node.
*/
bool IsLocalShard(uint32_t shard_id);

uint32_t GetShardIdByPartitionId(int32_t partition_id)
{
return cluster_manager_.GetShardIdByPartitionId(partition_id);
}

const txservice::CatalogFactory *GetCatalogFactory(
txservice::TableEngine table_engine)
{
return catalog_factory_array_.at(static_cast<int>(table_engine) - 1);
}

/**
* @brief Check if the partition_id is local to the current node.
* @param partition_id
* @return true if the partition_id is local to the current node.
*/
bool IsLocalPartition(int32_t partition_id);

bthread::Mutex ds_service_mutex_;
bthread::ConditionVariable ds_service_cv_;
std::atomic<bool> ds_serv_shutdown_indicator_;
uint32_t GetShardIdByPartitionId(int32_t partition_id) const;
uint32_t AllDataShardCount() const;
uint32_t GetOwnerNodeIndexOfShard(uint32_t shard_id) const;
bool UpdateOwnerNodeIndexOfShard(uint32_t shard_id,
uint32_t old_node_index,
uint32_t &new_node_index);
uint32_t FindFreeNodeIndex();
void HandleShardingError(const ::EloqDS::remote::CommonResult &result);
bool UpgradeShardVersion(uint32_t shard_id,
uint64_t shard_version,
const std::string &host_name,
uint16_t port);

txservice::EloqHashCatalogFactory hash_catalog_factory_{};
txservice::EloqRangeCatalogFactory range_catalog_factory_{};
// TODO(lzx): define a global catalog factory array that used by
// EngineServer TxService and DataStoreHandler
std::array<const txservice::CatalogFactory *, 5> catalog_factory_array_;

// remote data store service configuration
DataStoreServiceClusterManager cluster_manager_;

// bthread::Mutex ds_service_mutex_;
// bthread::ConditionVariable ds_service_cv_;
// std::atomic<bool> ds_serv_shutdown_indicator_;
// point to the data store service if it is colocated
DataStoreService *data_store_service_;

std::atomic<uint64_t> flying_remote_fetch_count_{0};
// Work queue for fetch records from primary node
std::deque<txservice::FetchRecordCc *> remote_fetch_cc_queue_;
struct DssNode
{
DssNode() = default;
~DssNode() = default;
DssNode(const DssNode &rhs)
: host_name_(rhs.host_name_),
port_(rhs.port_),
shard_verion_(rhs.shard_verion_)
{
}
DssNode &operator=(const DssNode &) = delete;

void Reset(const std::string hostname,
uint16_t port,
uint64_t shard_version)
{
assert(expired_ts_.load(std::memory_order_acquire) == 0);
host_name_ = hostname;
port_ = port;
shard_verion_ = shard_version;
channel_.Init(host_name_.c_str(), port_, nullptr);
}

const std::string &HostName() const
{
return host_name_;
}
uint16_t Port() const
{
return port_;
}
uint64_t ShardVersion() const
{
return shard_verion_;
}
brpc::Channel *Channel()
{
assert(!host_name_.empty() && port_ != 0);
return &channel_;
}

// expired_ts_ is the timestamp when the node is expired.
// If expired_ts_ is 0, the node is not expired.
// If expired_ts_ is not 0, the node is expired and the value is the
// timestamp when the node is expired.
std::atomic<uint64_t> expired_ts_{1U};

private:
std::string host_name_;
uint16_t port_;
brpc::Channel channel_;
uint64_t shard_verion_;
};
// Cached leader nodes info of data shard.
std::array<DssNode, 1024> dss_nodes_;
const uint64_t NodeExpiredTime = 10 * 1000 * 1000; // 10s
// Now only support one shard. dss_shards_ caches the index in dss_nodes_ of
// shard owner.
std::array<std::atomic<uint32_t>, 1> dss_shards_;
std::atomic<uint64_t> dss_topology_version_{0};

Comment on lines +713 to +719

🛠️ Refactor suggestion | 🟠 Major

Single-shard modeling: clarify and constrain API

With std::array<std::atomic<uint32_t>, 1> dss_shards_, all getters/setters should use index 0 and validate shard_id == single_shard_id_. Current API suggests arbitrary shard ids and leads to UB elsewhere.

If multi-shard isn't supported yet:

  • Add asserts in getters/setters that shard_id == single_shard_id_.
  • Route all reads/writes to dss_shards_[0].
  • Expose single_shard_id_ to build correct shard_id vectors (don’t synthesize 0..N-1). See client.cpp diffs.

// std::atomic<uint64_t> flying_remote_fetch_count_{0};
// // Work queue for fetch records from primary node
// std::deque<txservice::FetchRecordCc *> remote_fetch_cc_queue_;

// table names and their kv table names
std::unordered_map<txservice::TableName, std::string>
@@ -674,9 +734,9 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
friend class ScanNextClosure;
friend class CreateSnapshotForBackupClosure;
friend void PartitionBatchCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result);
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result);
friend class SinglePartitionScanner;
friend void FetchAllDatabaseCallback(void *data,
::google::protobuf::Closure *closure,