Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 148 additions & 6 deletions data_store_service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
* <http://www.gnu.org/licenses/>.
*
*/
#include "data_store_service_client.h"

#include <glog/logging.h>

#include <boost/lexical_cast.hpp>
Expand All @@ -36,7 +34,9 @@
#include <vector>

#include "cc_req_misc.h"
#include "data_store_service_client.h"
#include "data_store_service_client_closure.h"
#include "data_store_service_config.h"
#include "data_store_service_scanner.h"
#include "eloq_data_store_service/object_pool.h" // ObjectPool
#include "eloq_data_store_service/thread_worker_pool.h"
Expand Down Expand Up @@ -132,8 +132,78 @@ void DataStoreServiceClient::SetupConfig(
LOG(INFO) << "UpgradeShardVersion failed, retry";
bthread_usleep(1000000);
}
LOG(INFO) << "UpgradeShardVersion success, shard_id:"
<< group.shard_id_ << ", version:" << group.version_;
LOG(INFO) << "DataStoreServiceCliet UpgradeShardVersion success, "
"shard_id:"
<< group.shard_id_ << ", version:" << group.version_
<< ", owner_node:" << group.nodes_[0].host_name_ << ":"
<< group.nodes_[0].port_;
}
}
else
{
LOG(INFO)
<< "DataStoreServiceCliet SetupConfig skipped, current_version:"
<< current_version << ", new_version:" << new_version;
}
Comment on lines +135 to +147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix log typos for clarity

"DataStoreServiceCliet" → "DataStoreServiceClient".

Apply:

-            LOG(INFO) << "DataStoreServiceCliet UpgradeShardVersion success, "
+            LOG(INFO) << "DataStoreServiceClient UpgradeShardVersion success, "
@@
-        LOG(INFO)
-            << "DataStoreServiceCliet SetupConfig skipped, current_version:"
+        LOG(INFO)
+            << "DataStoreServiceClient SetupConfig skipped, current_version:"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
LOG(INFO) << "DataStoreServiceCliet UpgradeShardVersion success, "
"shard_id:"
<< group.shard_id_ << ", version:" << group.version_
<< ", owner_node:" << group.nodes_[0].host_name_ << ":"
<< group.nodes_[0].port_;
}
}
else
{
LOG(INFO)
<< "DataStoreServiceCliet SetupConfig skipped, current_version:"
<< current_version << ", new_version:" << new_version;
}
LOG(INFO) << "DataStoreServiceClient UpgradeShardVersion success, "
"shard_id:"
<< group.shard_id_ << ", version:" << group.version_
<< ", owner_node:" << group.nodes_[0].host_name_ << ":"
<< group.nodes_[0].port_;
}
}
else
{
LOG(INFO)
<< "DataStoreServiceClient SetupConfig skipped, current_version:"
<< current_version << ", new_version:" << new_version;
}
🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 135 to 147, fix the typo in the
log messages where "DataStoreServiceCliet" should be "DataStoreServiceClient":
update both occurrences in the INFO logs (the success message and the skipped
SetupConfig message) to use the correct service name while keeping existing
context (shard_id, version, owner_node and current_version/new_version)
unchanged.

}

/**
 * @brief Populates a DSS cluster manager from the tx-service node group
 * configuration.
 *
 * Looks up the members of node group @p ng_id, initializes @p cluster_manager
 * with this node's host name and DSS port, and registers every other member
 * of the group as a member of shard @p ng_id. When this node is not the DSS
 * leader, SwitchShardToClosed() is invoked for the shard and, if a leader is
 * known, that leader is recorded as the shard's primary node.
 *
 * @param dss_node_id Node id of this node; it must be a member of @p ng_id
 * (asserted).
 * @param ng_id Node group id; also used as the DSS shard id.
 * @param ng_configs Node group id -> member configs. Exactly one group is
 * expected (asserted) and it must contain @p ng_id.
 * @param dss_leader_node_id Node id of the DSS leader, or
 * UNKNOWN_DSS_LEADER_NODE_ID when no leader is known.
 * @param cluster_manager Cluster manager to populate.
 */
void DataStoreServiceClient::TxConfigsToDssClusterConfig(
    uint32_t dss_node_id,  // = 0,
    uint32_t ng_id,        // = 0,
    const std::unordered_map<uint32_t, std::vector<txservice::NodeConfig>>
        &ng_configs,
    uint32_t dss_leader_node_id,  // UNKNOWN_DSS_LEADER_NODE_ID if no leader
    DataStoreServiceClusterManager &cluster_manager)
{
    assert(ng_configs.size() == 1);
    auto it = ng_configs.find(ng_id);
    assert(it != ng_configs.end());
    const auto &ng_member_configs = it->second;

    // Locate this node and (optionally) the leader in a single pass. The two
    // checks are independent because this node may itself be the leader.
    const txservice::NodeConfig *this_node = nullptr;
    const txservice::NodeConfig *leader_node = nullptr;
    for (const auto &node_config : ng_member_configs)
    {
        if (node_config.node_id_ == dss_node_id)
        {
            this_node = &node_config;
        }
        if (node_config.node_id_ == dss_leader_node_id)
        {
            leader_node = &node_config;
        }
    }
    assert(this_node != nullptr);
    assert(dss_leader_node_id == UNKNOWN_DSS_LEADER_NODE_ID ||
           leader_node != nullptr);
    cluster_manager.Initialize(this_node->host_name_,
                               TxPort2DssPort(this_node->port_));

    // Every other member of the node group becomes a member of the shard.
    for (const auto &node_config : ng_member_configs)
    {
        if (node_config.node_id_ != dss_node_id)
        {
            DSSNode dss_node(node_config.host_name_,
                             TxPort2DssPort(node_config.port_));
            cluster_manager.AddShardMember(ng_id, dss_node);
        }
    }

    if (dss_leader_node_id != dss_node_id)
    {
        LOG(INFO) << "cluster_manager change shard status " << ng_id << " from "
                  << static_cast<int>(
                         cluster_manager.FetchDSShardStatus(ng_id));
        cluster_manager.SwitchShardToClosed(ng_id, DSShardStatus::ReadWrite);
        LOG(INFO) << "cluster_manager change shard status " << ng_id << " to "
                  << static_cast<int>(
                         cluster_manager.FetchDSShardStatus(ng_id));
        // The assert above vanishes in release builds, so also check the
        // pointer here instead of dereferencing a possibly-null leader.
        if (dss_leader_node_id != UNKNOWN_DSS_LEADER_NODE_ID &&
            leader_node != nullptr)
        {
            DSSNode dss_node(leader_node->host_name_,
                             TxPort2DssPort(leader_node->port_));
            cluster_manager.UpdatePrimaryNode(ng_id, dss_node);
        }
    }
}
Expand Down Expand Up @@ -213,6 +283,8 @@ bool DataStoreServiceClient::PutAll(
std::vector<std::unique_ptr<txservice::FlushTaskEntry>>>
&flush_task)
{
DLOG(INFO) << "DataStoreServiceClient::PutAll called with "
<< flush_task.size() << " tables to flush.";
uint64_t now = txservice::LocalCcShards::ClockTsInMillseconds();

// Process each table
Expand Down Expand Up @@ -2868,6 +2940,31 @@ void DataStoreServiceClient::RestoreTxCache(txservice::NodeGroupId cc_ng_id,
*/
bool DataStoreServiceClient::OnLeaderStart(uint32_t *next_leader_node)
{
    DLOG(INFO)
        << "DataStoreServiceClient OnLeaderStart called data_store_service_:"
        << data_store_service_;

    // Only a colocated data store service has a local store to open; without
    // one there is nothing to do before connecting.
    const bool has_local_service = (data_store_service_ != nullptr);
    if (has_local_service)
    {
        // Now, only support one shard, so the shard id is always 0.
        data_store_service_->OpenDataStore(0);
    }

    Connect();
    return true;
}

bool DataStoreServiceClient::OnLeaderStop(int64_t term)
{
DLOG(INFO)
<< "DataStoreServiceClient OnLeaderStop called data_store_service_:"
<< data_store_service_;
// switch to read-only in case the data store status is read-write
if (data_store_service_ != nullptr)
{
// Now, only support one shard.
data_store_service_->CloseDataStore(0);
Comment on lines +2960 to +2966
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Nit: fix log/comment typos

"swith to read only" → "switch to read-only".

Apply:

-    // swith to read only in case of data store status is read write
+    // switch to read-only in case data store status is read-write
🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 2960 to 2966, fix the typo in the
inline comment and log text: change "swith to read only" to "switch to
read-only" (and ensure consistent hyphenation/casing if used elsewhere in the
block). Update the comment and any adjacent log strings to the corrected wording
so the code reads clearly and consistently.

}
return true;
}

Expand All @@ -2878,8 +2975,53 @@ bool DataStoreServiceClient::OnLeaderStart(uint32_t *next_leader_node)
* following another leader and can be used to perform follower-specific
* initialization.
*/
void DataStoreServiceClient::OnStartFollowing()
void DataStoreServiceClient::OnStartFollowing(uint32_t leader_node_id,
int64_t term,
int64_t standby_term,
bool resubscribe)
{
DLOG(INFO)
<< "DataStoreServiceClient OnStartFollowing called data_store_service_:"
<< data_store_service_;
if (data_store_service_ != nullptr)
{
// Now, only support one shard.
data_store_service_->CloseDataStore(0);
}

// Treat leader_node_id as dss_leader_node_id
uint32_t dss_leader_node_id = leader_node_id;
uint32_t dss_shard_id = txservice::Sharder::Instance().NativeNodeGroup();

// Update leader node in cluster_manager if necessary
auto ng_configs = txservice::Sharder::Instance().GetNodeGroupConfigs();
auto ng_config_it = ng_configs.find(dss_shard_id);
assert(ng_config_it != ng_configs.end());
auto ng_member_configs = ng_config_it->second;
const txservice::NodeConfig *dss_leader_node_config = nullptr;
for (const auto &node_config : ng_member_configs)
{
if (node_config.node_id_ == dss_leader_node_id)
{
dss_leader_node_config = &node_config;
break;
}
}
assert(dss_leader_node_config != nullptr);
DSSNode dss_leader_node(dss_leader_node_config->host_name_,
TxPort2DssPort(dss_leader_node_config->port_));
auto &cluster_manager = data_store_service_->GetClusterManager();
cluster_manager.UpdatePrimaryNode(dss_shard_id, dss_leader_node);
DLOG(INFO) << "UpdatePrimaryNode, dss_shard_id: " << dss_shard_id
<< ", DSSNode: " << dss_leader_node.host_name_ << ":"
<< dss_leader_node.port_;
// Pump the dss shard version
cluster_manager.UpdateDSShardVersion(
dss_shard_id, cluster_manager.FetchDSShardVersion(dss_shard_id) + 1);
// Update the client config
SetupConfig(cluster_manager);

Connect();
}
Comment on lines +2983 to 3025
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don’t dereference data_store_service_ when it’s null

data_store_service_ is optional; on remote clients it stays nullptr. After the early if, the code still calls data_store_service_->GetClusterManager() which dereferences null and crashes every follower transition. Keep all of the cluster-manager work inside the data_store_service_ != nullptr branch (or bail out early) so we only touch the colocated service when it actually exists.

Apply this diff:

-    if (data_store_service_ != nullptr)
-    {
-        // Now, only support one shard.
-        data_store_service_->CloseDataStore(0);
-    }
-
-    // Treat leader_node_id as dss_leader_node_id
-    uint32_t dss_leader_node_id = leader_node_id;
-    uint32_t dss_shard_id = txservice::Sharder::Instance().NativeNodeGroup();
-
-    // Update leader node in cluster_manager if necessary
-    auto ng_configs = txservice::Sharder::Instance().GetNodeGroupConfigs();
-    auto ng_config_it = ng_configs.find(dss_shard_id);
-    assert(ng_config_it != ng_configs.end());
-    auto ng_member_configs = ng_config_it->second;
-    const txservice::NodeConfig *dss_leader_node_config = nullptr;
-    for (const auto &node_config : ng_member_configs)
-    {
-        if (node_config.node_id_ == dss_leader_node_id)
-        {
-            dss_leader_node_config = &node_config;
-            break;
-        }
-    }
-    assert(dss_leader_node_config != nullptr);
-    DSSNode dss_leader_node(dss_leader_node_config->host_name_,
-                            TxPort2DssPort(dss_leader_node_config->port_));
-    auto &cluster_manager = data_store_service_->GetClusterManager();
-    cluster_manager.UpdatePrimaryNode(dss_shard_id, dss_leader_node);
-    DLOG(INFO) << "UpdatePrimaryNode, dss_shard_id: " << dss_shard_id
-               << ", DSSNode: " << dss_leader_node.host_name_ << ":"
-               << dss_leader_node.port_;
-    // Pump the dss shard version
-    cluster_manager.UpdateDSShardVersion(
-        dss_shard_id, cluster_manager.FetchDSShardVersion(dss_shard_id) + 1);
-    // Update the client config
-    SetupConfig(cluster_manager);
-
-    Connect();
+    if (data_store_service_ != nullptr)
+    {
+        // Now, only support one shard.
+        data_store_service_->CloseDataStore(0);
+
+        // Treat leader_node_id as dss_leader_node_id
+        uint32_t dss_leader_node_id = leader_node_id;
+        uint32_t dss_shard_id =
+            txservice::Sharder::Instance().NativeNodeGroup();
+
+        // Update leader node in cluster_manager if necessary
+        auto ng_configs = txservice::Sharder::Instance().GetNodeGroupConfigs();
+        auto ng_config_it = ng_configs.find(dss_shard_id);
+        assert(ng_config_it != ng_configs.end());
+        auto ng_member_configs = ng_config_it->second;
+        const txservice::NodeConfig *dss_leader_node_config = nullptr;
+        for (const auto &node_config : ng_member_configs)
+        {
+            if (node_config.node_id_ == dss_leader_node_id)
+            {
+                dss_leader_node_config = &node_config;
+                break;
+            }
+        }
+        assert(dss_leader_node_config != nullptr);
+        DSSNode dss_leader_node(dss_leader_node_config->host_name_,
+                                TxPort2DssPort(dss_leader_node_config->port_));
+        auto &cluster_manager = data_store_service_->GetClusterManager();
+        cluster_manager.UpdatePrimaryNode(dss_shard_id, dss_leader_node);
+        DLOG(INFO) << "UpdatePrimaryNode, dss_shard_id: " << dss_shard_id
+                   << ", DSSNode: " << dss_leader_node.host_name_ << ":"
+                   << dss_leader_node.port_;
+        // Pump the dss shard version
+        cluster_manager.UpdateDSShardVersion(
+            dss_shard_id, cluster_manager.FetchDSShardVersion(dss_shard_id) + 1);
+        // Update the client config
+        SetupConfig(cluster_manager);
+    }
+
+    Connect();
🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 2983-3025, the code dereferences
data_store_service_ after an early null check (calling
data_store_service_->GetClusterManager()), which will crash when
data_store_service_ is nullptr on remote clients; either move all
cluster-manager-related work (finding dss_leader_node_config, constructing
DSSNode, calling GetClusterManager(), UpdatePrimaryNode, UpdateDSShardVersion,
and SetupConfig) inside the existing if (data_store_service_ != nullptr) block,
or bail out early (return) right after CloseDataStore(0) when
data_store_service_ is nullptr so no member access occurs; ensure Connect() is
only called in the appropriate code path depending on whether it relies on
data_store_service_.


/**
Expand Down Expand Up @@ -4610,4 +4752,4 @@ void DataStoreServiceClient::PrepareRangePartitionBatches(
}
}

} // namespace EloqDS
} // namespace EloqDS
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Namespace comment typo

Close comment says EloqD; should be EloqDS.

Apply:

-}  // namespace EloqD
+}  // namespace EloqDS
🤖 Prompt for AI Agents
In data_store_service_client.cpp around line 4755, the namespace closing comment
reads "EloqD" but should be "EloqDS"; update the trailing comment to exactly
match the opened namespace name (EloqDS) so it reads "}  // namespace EloqDS".

20 changes: 19 additions & 1 deletion data_store_service_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,19 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
*/
void SetupConfig(const DataStoreServiceClusterManager &config);

/**
 * @brief Maps a tx-service port to the corresponding DSS port.
 *
 * The DSS port is the tx-service port plus a fixed offset of 7.
 *
 * @param tx_port Port of the tx service.
 * @return tx_port + 7, converted back to uint16_t. The conversion is modulo
 * 65536, so inputs above 65528 wrap around (e.g. 65535 yields 6); callers
 * must pass ports that leave room for the offset.
 */
static uint16_t TxPort2DssPort(uint16_t tx_port)
{
    constexpr int kDssPortOffset = 7;
    // The sum is computed as int; the cast spells out the narrowing that the
    // return statement would otherwise perform implicitly.
    return static_cast<uint16_t>(tx_port + kDssPortOffset);
}
Comment on lines +129 to +132
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Guard against Tx→DSS port overflow

tx_port + 7 on a uint16_t silently wraps once tx_port > 65528, so callers end up dialing the wrong TCP port (e.g., 65535 becomes 6). That breaks reconnects for perfectly valid configurations. Please promote to a wider type and assert/guard before casting back so we never wrap.

Apply this diff:

-    static uint16_t TxPort2DssPort(uint16_t tx_port)
-    {
-        return tx_port + 7;
-    }
+    static uint16_t TxPort2DssPort(uint16_t tx_port)
+    {
+        constexpr uint16_t kDelta = 7;
+        assert(tx_port <= static_cast<uint16_t>(UINT16_MAX - kDelta));
+        return static_cast<uint16_t>(tx_port + kDelta);
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
static uint16_t TxPort2DssPort(uint16_t tx_port)
{
return tx_port + 7;
}
static uint16_t TxPort2DssPort(uint16_t tx_port)
{
constexpr uint16_t kDelta = 7;
assert(tx_port <= static_cast<uint16_t>(UINT16_MAX - kDelta));
return static_cast<uint16_t>(tx_port + kDelta);
}
🤖 Prompt for AI Agents
In data_store_service_client.h around lines 129 to 132, the function
TxPort2DssPort adds 7 to a uint16_t which can overflow and wrap for tx_port >
65528; change the implementation to perform the addition using a wider integer
type (e.g., uint32_t or unsigned), check/guard that tx_port + 7 <=
std::numeric_limits<uint16_t>::max() (or assert/throw if out of range), and only
then cast the result back to uint16_t when returning so wrapping cannot occur.


/**
 * @brief Populates a DSS cluster manager from the tx-service node group
 * configuration.
 *
 * Initializes cluster_manager with this node's host name and DSS port
 * (derived via TxPort2DssPort), adds the other members of node group ng_id
 * as shard members and, when this node is not the DSS leader, closes the
 * shard and records the known leader as the shard's primary node.
 *
 * @param dss_node_id Node id of this node within the node group.
 * @param ng_id Node group id; also used as the DSS shard id.
 * @param ng_configs Node group id -> member node configs.
 * @param dss_leader_node_id Node id of the DSS leader, or
 * UNKNOWN_DSS_LEADER_NODE_ID when no leader is known.
 * @param cluster_manager Cluster manager to populate.
 */
static void TxConfigsToDssClusterConfig(
uint32_t dss_node_id, // = 0,
uint32_t ng_id, // = 0,
const std::unordered_map<uint32_t, std::vector<txservice::NodeConfig>>
&ng_configs,
uint32_t dss_leader_node_id, // if no leader,set uint32t_max
DataStoreServiceClusterManager &cluster_manager);

void ConnectToLocalDataStoreService(
std::unique_ptr<DataStoreService> ds_serv);

Expand Down Expand Up @@ -377,7 +390,12 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler

bool OnLeaderStart(uint32_t *next_leader_node) override;

void OnStartFollowing() override;
bool OnLeaderStop(int64_t term) override;

void OnStartFollowing(uint32_t leader_node_id,
int64_t term,
int64_t standby_term,
bool resubscribe) override;

void OnShutdown() override;

Expand Down
85 changes: 76 additions & 9 deletions eloq_data_store_service/data_store_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ DataStoreService::~DataStoreService()
}
}

bool DataStoreService::StartService(bool create_db_if_missing)
bool DataStoreService::StartService(bool create_db_if_missing,
uint32_t dss_leader_node_id,
uint32_t dss_node_id)
{
if (server_ != nullptr)
{
Expand All @@ -252,15 +254,25 @@ bool DataStoreService::StartService(bool create_db_if_missing)
{
shard_id_ = dss_shards.at(0);
auto open_mode = cluster_manager_.FetchDSShardStatus(shard_id_);
DLOG(INFO) << "StartService data store shard id:" << shard_id_
<< ", open_mode:" << static_cast<int>(open_mode)
<< ", create_db_if_missing:" << create_db_if_missing
<< ", dss_leader_node_id:" << dss_leader_node_id
<< ", dss_node_id:" << dss_node_id;
if (open_mode == DSShardStatus::ReadOnly ||
open_mode == DSShardStatus::ReadWrite)
{
auto expect_status = DSShardStatus::Closed;
if (shard_status_.compare_exchange_strong(expect_status,
DSShardStatus::Starting))
{
// start underlying db if this dss node is the
// leader dss node
data_store_ = data_store_factory_->CreateDataStore(
create_db_if_missing, shard_id_, this, true);
create_db_if_missing,
shard_id_,
this,
dss_leader_node_id == dss_node_id);
if (data_store_ == nullptr)
{
LOG(ERROR) << "Failed to create data store on starting "
Expand All @@ -277,7 +289,7 @@ bool DataStoreService::StartService(bool create_db_if_missing)
}

DLOG(INFO) << "Created data store shard id:" << shard_id_
<< ", shard_status:" << shard_status_;
<< ", shard_status:" << static_cast<int>(shard_status_);
}

server_ = std::make_unique<brpc::Server>();
Expand Down Expand Up @@ -311,7 +323,7 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id,
{
return true;
}
assert(open_mode == DSShardStatus::ReadOnly);
// assert(open_mode == DSShardStatus::ReadOnly);

DSShardStatus expect_status = DSShardStatus::Closed;
if (!shard_status_.compare_exchange_strong(expect_status,
Expand All @@ -329,6 +341,11 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id,
return expect_status == open_mode;
}

DLOG(INFO) << "Connecting and starting data store for shard id:"
<< data_shard_id
<< ", open_mode:" << static_cast<int>(open_mode)
<< ", create_db_if_missing:" << create_db_if_missing
<< ", data_store_ is null:" << (data_store_ == nullptr);
assert(data_store_factory_ != nullptr);
if (data_store_ == nullptr)
{
Expand Down Expand Up @@ -358,10 +375,18 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id,
return false;
}
}

data_store_->SwitchToReadOnly();
cluster_manager_.SwitchShardToReadOnly(data_shard_id,
DSShardStatus::Closed);
if (open_mode == DSShardStatus::ReadOnly)
{
data_store_->SwitchToReadOnly();
cluster_manager_.SwitchShardToReadOnly(data_shard_id,
DSShardStatus::Closed);
}
else
{
assert(open_mode == DSShardStatus::ReadWrite);
cluster_manager_.SwitchShardToReadWrite(data_shard_id,
DSShardStatus::Closed);
}
Comment on lines +378 to +389
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

RW open path doesn’t switch engine to ReadWrite

When opening in READ_WRITE, we update cluster status but never call data_store_->SwitchToReadWrite(). Relying on implicit defaults is fragile and can leave the engine in RO after StartDB in some implementations.

Add an explicit switch before updating cluster state.

-    else
-    {
-        assert(open_mode == DSShardStatus::ReadWrite);
-        cluster_manager_.SwitchShardToReadWrite(data_shard_id,
-                                                DSShardStatus::Closed);
-    }
+    else
+    {
+        assert(open_mode == DSShardStatus::ReadWrite);
+        // Ensure engine mode matches cluster state before announcing RW.
+        data_store_->SwitchToReadWrite();
+        cluster_manager_.SwitchShardToReadWrite(data_shard_id,
+                                                DSShardStatus::Closed);
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (open_mode == DSShardStatus::ReadOnly)
{
data_store_->SwitchToReadOnly();
cluster_manager_.SwitchShardToReadOnly(data_shard_id,
DSShardStatus::Closed);
}
else
{
assert(open_mode == DSShardStatus::ReadWrite);
cluster_manager_.SwitchShardToReadWrite(data_shard_id,
DSShardStatus::Closed);
}
if (open_mode == DSShardStatus::ReadOnly)
{
data_store_->SwitchToReadOnly();
cluster_manager_.SwitchShardToReadOnly(data_shard_id,
DSShardStatus::Closed);
}
else
{
assert(open_mode == DSShardStatus::ReadWrite);
// Ensure engine mode matches cluster state before announcing RW.
data_store_->SwitchToReadWrite();
cluster_manager_.SwitchShardToReadWrite(data_shard_id,
DSShardStatus::Closed);
}
🤖 Prompt for AI Agents
In eloq_data_store_service/data_store_service.cpp around lines 360 to 371, the
READ_WRITE branch updates cluster state but never calls
data_store_->SwitchToReadWrite(); add an explicit call to
data_store_->SwitchToReadWrite() in the else branch (before calling
cluster_manager_.SwitchShardToReadWrite) so the local engine is switched to
ReadWrite prior to updating cluster status; keep the existing assert and
ordering to ensure engine state is set then cluster state is updated.


expect_status = DSShardStatus::Starting;
shard_status_.compare_exchange_strong(
Expand Down Expand Up @@ -1786,6 +1811,48 @@ bool DataStoreService::FetchConfigFromPeer(
return true;
}

void DataStoreService::CloseDataStore(uint32_t shard_id)
{
assert(shard_id == shard_id_);
if (!IsOwnerOfShard(shard_id))
{
return;
}
if (shard_status_.load() == DSShardStatus::ReadWrite)
{
SwitchReadWriteToReadOnly(shard_id);
}

if (shard_status_.load() == DSShardStatus::ReadOnly)
{
SwitchReadOnlyToClosed(shard_id);
}
}

void DataStoreService::OpenDataStore(uint32_t shard_id)
{
// no-op if this DSS does not own any shard
if (shard_id_ == UINT32_MAX)
{
DLOG(INFO) << "OpenDataStore no-op for non-owner DSS"
<< ", shard " << shard_id
<< ", shard_id_: " << shard_id_;
return;
}

assert(shard_id == shard_id_);

DLOG(INFO) << "OpenDataStore for shard " << shard_id
<< ", current status: " << static_cast<int>(shard_status_.load());
if (shard_status_.load() != DSShardStatus::Closed)
{
return;
}
DSShardStatus open_mode = DSShardStatus::ReadWrite;
bool create_db_if_missing = false;
ConnectAndStartDataStore(shard_id, open_mode, create_db_if_missing);
}
Comment on lines +1832 to +1854
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

OpenDataStore ignores ConnectAndStartDataStore failure

Line 1853 calls ConnectAndStartDataStore but doesn't check or log the return value. If the operation fails (e.g., database initialization error), the shard remains Closed, but the caller receives no indication of failure.

Apply this diff to handle failures:

     DSShardStatus open_mode = DSShardStatus::ReadWrite;
     bool create_db_if_missing = false;
-    ConnectAndStartDataStore(shard_id, open_mode, create_db_if_missing);
+    bool res = ConnectAndStartDataStore(shard_id, open_mode, create_db_if_missing);
+    if (!res)
+    {
+        LOG(ERROR) << "OpenDataStore: failed to connect and start data store for shard " << shard_id;
+    }
 }
🤖 Prompt for AI Agents
In eloq_data_store_service/data_store_service.cpp around lines 1832-1854,
OpenDataStore currently calls ConnectAndStartDataStore without checking its
result; modify it to capture the return value, verify success, and handle
failures: after calling ConnectAndStartDataStore(shard_id, open_mode,
create_db_if_missing) store its bool return, if false log an error with context
(shard id and any available error info), ensure shard_status_ remains Closed (or
explicitly set it back to Closed) and return/fail the caller (or propagate an
error/boolean) so callers can observe the failure; if true proceed as before.
Ensure logging is clear and avoid swallowing errors.


std::pair<remote::ShardMigrateError, std::string>
DataStoreService::NewMigrateTask(const std::string &event_id,
int data_shard_id,
Expand Down Expand Up @@ -2459,4 +2526,4 @@ void DataStoreService::CleanupOldMigrateLogs()
}
}

} // namespace EloqDS
} // namespace EloqDS
Loading