Convert txservice cluster config to dss_config #122

Conversation

Walkthrough

Adds utilities to convert TX node-group configs into DSS cluster configs, extends the DataStoreService lifecycle with OpenDataStore/CloseDataStore and a mode-aware StartService, updates leader/follower handler signatures and wiring, defers RocksDB query worker-pool creation, introduces UNKNOWN_DSS_LEADER_NODE_ID, and tightens logging and the namespace qualifier.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as DataStoreServiceClient
    participant CM as DataStoreServiceClusterManager
    participant DS as DataStoreService
    rect rgb(248,250,255)
        Note over Client,CM: Build DSS cluster config from TX configs
        Client->>Client: TxConfigsToDssClusterConfig(dss_node_id, ng_id, ng_configs, dss_leader_node_id, cluster_manager)
        Client->>CM: Init(local node host:port)
        Client->>CM: AddShardMembers(ng_id, members...)
        alt dss_leader_node_id specified
            Client->>CM: SetPrimaryNode(leader_node)
        end
    end
    rect rgb(240,255,245)
        Note over Client,DS: Leadership lifecycle (high-level)
        alt Become leader
            Client->>DS: OnLeaderStart(term)
            DS->>DS: OpenDataStore(shard=0)
            DS->>DS: ConnectAndStartDataStore(ReadWrite)
        else Start following
            Client->>DS: OnStartFollowing(leader_node_id, term, standby_term, resubscribe)
            DS->>CM: Update membership & SetPrimaryNode
            DS->>DS: SetupConfig(...), bump shard version
            DS->>DS: ConnectAndStartDataStore(ReadOnly) (if not owner)
        end
    end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 5
🧹 Nitpick comments (4)
data_store_service_client.h (1)
121-128: Clarify single-shard mapping and inputs for TxConfigsToDssClusterConfig

This API should state that DSS currently supports a single shard (id 0) and map any TX ng_id to shard 0, or update the client to support multiple shards. Also document that leader_node_id may be UINT32_MAX.

Would you like me to add a brief comment block and precondition checks here?
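To make the single-shard contract concrete, here is a minimal sketch of the invariants this comment asks for. The names (kUnknownDssLeaderNodeId, MapNgIdToShardId, LeaderSpecified) are illustrative stand-ins, not the PR's actual identifiers:

```cpp
#include <cassert>
#include <cstdint>

// DSS currently supports a single shard (id 0); the leader id may be the
// UINT32_MAX sentinel, meaning "no leader designated yet".
constexpr uint32_t kUnknownDssLeaderNodeId = UINT32_MAX;
constexpr uint32_t kOnlySupportedShardId = 0;

// Maps any TX node-group id onto the single supported DSS shard id.
inline uint32_t MapNgIdToShardId(uint32_t /*ng_id*/)
{
    // Every node group collapses onto shard 0 until multi-shard support lands.
    return kOnlySupportedShardId;
}

// True when a concrete leader was designated (i.e., not the sentinel).
inline bool LeaderSpecified(uint32_t dss_leader_node_id)
{
    return dss_leader_node_id != kUnknownDssLeaderNodeId;
}
```

Documenting these two facts next to TxConfigsToDssClusterConfig would spare callers from guessing the sentinel semantics.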
data_store_service_client.cpp (2)
2932-2937: Guard OpenDataStore call with service state

OpenDataStore(0) will assert in its impl if shard_id_ isn't yet initialized to 0. Add a precondition check or make OpenDataStore tolerate the initial open.
Example:
- Ensure StartService/ConnectAndStartDataStore ran and set shard_id_ before calling.
- Or update OpenDataStore to set shard_id_ if unset.
2950-2954: CloseDataStore precondition

Same concern as open: ensure shard_id_ matches 0 before calling, or make CloseDataStore a no-op if uninitialized to avoid assertions.
eloq_data_store_service/data_store_service.h (1)
610-612: Define safe semantics for OpenDataStore/CloseDataStore

The impl asserts shard_id == shard_id_; that can trip before shard_id_ is initialized. Document preconditions or make these APIs idempotent and tolerant of uninitialized state (e.g., set shard_id_ on first open, no-op close if not owner).
Do you want a patch to relax the assert and guard transitions around shard_status_ and ongoing_write_requests_?
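A rough sketch of what idempotent, tolerant open/close semantics could look like, assuming shard_id_ and shard_status_ are the only state involved (the real class has more moving parts, e.g. ongoing_write_requests_, so this is a shape, not a patch):

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

enum class DSShardStatus { Closed, ReadOnly, ReadWrite };
constexpr uint32_t kUninitializedShard = UINT32_MAX;

struct ShardLifecycle
{
    std::atomic<uint32_t> shard_id_{kUninitializedShard};
    std::atomic<DSShardStatus> status_{DSShardStatus::Closed};

    void Open(uint32_t shard_id)
    {
        uint32_t expected = kUninitializedShard;
        // First open claims the shard id; later opens must match it.
        if (!shard_id_.compare_exchange_strong(expected, shard_id) &&
            expected != shard_id)
        {
            return;  // Different shard: refuse instead of asserting.
        }
        status_.store(DSShardStatus::ReadWrite);
    }

    void Close(uint32_t shard_id)
    {
        if (shard_id_.load() != shard_id)
        {
            return;  // No-op when uninitialized or not the owner.
        }
        status_.store(DSShardStatus::Closed);
    }
};
```

The point is that both calls become safe from any state: an early Close is a no-op, and the first Open establishes shard_id_ rather than tripping an assert.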
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- data_store_service_client.cpp (3 hunks)
- data_store_service_client.h (1 hunks)
- eloq_data_store_service/data_store_service.cpp (3 hunks)
- eloq_data_store_service/data_store_service.h (1 hunks)
- eloq_data_store_service/data_store_service_config.cpp (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
data_store_service_client.h (1)
data_store_service_client.cpp (2)
- TxConfigsToDssClusterConfig (136-188), TxConfigsToDssClusterConfig (136-142)
eloq_data_store_service/data_store_service.cpp (3)
eloq_data_store_service/data_store_service_util.h (1)
- shard_id_ (118-118)

eloq_data_store_service/data_store_service.h (1)
- IsOwnerOfShard (603-608)

eloq_data_store_service/data_store_service_config.cpp (4)
- IsOwnerOfShard (285-293), IsOwnerOfShard (285-285), IsOwnerOfShard (690-701), IsOwnerOfShard (690-691)
data_store_service_client.cpp (1)
data_store_service_client.h (1)
TxPort2DssPort(117-120)
eloq_data_store_service/data_store_service.h (1)
eloq_data_store_service/data_store_service.cpp (4)
- CloseDataStore (1794-1809), CloseDataStore (1794-1794), OpenDataStore (1811-1821), OpenDataStore (1811-1811)
🔇 Additional comments (2)
eloq_data_store_service/data_store_service.cpp (2)
313-314: Assert removal is appropriate

Given RW is now supported, removing the old invariant on READ_ONLY is correct. No action needed.
1811-1822: Verification clarifies the review concern but reveals misdirection on leadership gating

The only caller of OpenDataStore() is at data_store_service_client.cpp:2935 within OnLeaderStart(), which is already gated by leadership; the method is "called when the node becomes a leader." However, the review's suggestion to add cluster_manager_.IsOwnerOfShard() is valid for a different reason: it checks cluster topology membership (whether this node is primary), which is semantically distinct from the local IsOwnerOfShard() check that examines shard status. The method CloseDataStore() at line 1797 already uses this pattern.

The secondary suggestions (memory_order_acquire, handling the ConnectAndStartDataStore return value) remain valid for defensive programming and error visibility.
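The acquire/release pairing the memory_order_acquire suggestion refers to can be illustrated with a small stand-alone sketch (the names here mirror the discussion but are illustrative):

```cpp
#include <atomic>
#include <cassert>

enum class DSShardStatus { Closed, ReadOnly, ReadWrite };

// Read shard status with acquire ordering so that status transitions
// published by another thread (with a release store) are fully visible
// before this thread acts on them.
struct ShardGate
{
    std::atomic<DSShardStatus> shard_status_{DSShardStatus::Closed};

    bool CanServeWrites() const
    {
        return shard_status_.load(std::memory_order_acquire) ==
               DSShardStatus::ReadWrite;
    }

    void PublishReadWrite()
    {
        shard_status_.store(DSShardStatus::ReadWrite,
                            std::memory_order_release);
    }
};
```

With a plain relaxed or unordered load, a reader could observe the ReadWrite status before the writes that set up the shard, which is exactly what the defensive-programming suggestion guards against.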
```cpp
if (filename.empty())
{
    return true;
}
return SaveInternal(filename);
```
Avoid silent success on empty filename in Save()
Returning true hides misconfiguration. Emit a warning or return false so callers can react.
Apply one of:
bool DataStoreServiceClusterManager::Save(const std::string &filename) const
{
std::shared_lock<std::shared_mutex> lk(mutex_);
- if (filename.empty())
- {
- return true;
- }
+ if (filename.empty())
+ {
+ LOG(WARNING) << "Save skipped: empty filename";
+ return false;
+ }
return SaveInternal(filename);
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (filename.empty()) | |
| { | |
| return true; | |
| } | |
| return SaveInternal(filename); | |
| if (filename.empty()) | |
| { | |
| LOG(WARNING) << "Save skipped: empty filename"; | |
| return false; | |
| } | |
| return SaveInternal(filename); |
🤖 Prompt for AI Agents
In eloq_data_store_service/data_store_service_config.cpp around lines 439-443
the Save() function currently returns true when filename.empty(), which hides
misconfiguration; change this to emit a warning via the project's logging
facility (or std::cerr if no logger exists) and return false instead so callers
can detect the error, or at minimum log a warning and return false consistently;
ensure the log message clearly references Save() and the empty filename so it’s
actionable.
```cpp
if (open_mode == DSShardStatus::ReadOnly)
{
    data_store_->SwitchToReadOnly();
    cluster_manager_.SwitchShardToReadOnly(data_shard_id,
                                           DSShardStatus::Closed);
}
else
{
    assert(open_mode == DSShardStatus::ReadWrite);
    cluster_manager_.SwitchShardToReadWrite(data_shard_id,
                                            DSShardStatus::Closed);
}
```
RW open path doesn’t switch engine to ReadWrite
When opening in READ_WRITE, we update cluster status but never call data_store_->SwitchToReadWrite(). Relying on implicit defaults is fragile and can leave the engine in RO after StartDB in some implementations.
Add an explicit switch before updating cluster state.
- else
- {
- assert(open_mode == DSShardStatus::ReadWrite);
- cluster_manager_.SwitchShardToReadWrite(data_shard_id,
- DSShardStatus::Closed);
- }
+ else
+ {
+ assert(open_mode == DSShardStatus::ReadWrite);
+ // Ensure engine mode matches cluster state before announcing RW.
+ data_store_->SwitchToReadWrite();
+ cluster_manager_.SwitchShardToReadWrite(data_shard_id,
+ DSShardStatus::Closed);
+ }
🤖 Prompt for AI Agents
In eloq_data_store_service/data_store_service.cpp around lines 360 to 371, the
READ_WRITE branch updates cluster state but never calls
data_store_->SwitchToReadWrite(); add an explicit call to
data_store_->SwitchToReadWrite() in the else branch (before calling
cluster_manager_.SwitchShardToReadWrite) so the local engine is switched to
ReadWrite prior to updating cluster status; keep the existing assert and
ordering to ensure engine state is set then cluster state is updated.
The trailing namespace comment is corrected (both variants lack a newline at end of file):

```diff
-}  // namespace EloqD
+}  // namespace EloqDS
```
@CodeRabbit review

✅ Actions performed

Review triggered.
```cpp
bool DataStoreServiceClusterManager::Save(const std::string &filename) const
{
    std::shared_lock<std::shared_mutex> lk(mutex_);
    if (filename.empty())
```
when is this func called with filename empty?
Force-pushed from 43a2aa6 to aee83c3.
Actionable comments posted: 4
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
- data_store_service_client.cpp (6 hunks)
- data_store_service_client.h (2 hunks)
- eloq_data_store_service/data_store_service.cpp (7 hunks)
- eloq_data_store_service/data_store_service.h (4 hunks)
- eloq_data_store_service/data_store_service_config.cpp (3 hunks)
- eloq_data_store_service/data_store_service_config.h (1 hunks)
- eloq_data_store_service/main.cpp (3 hunks)
- eloq_data_store_service/rocksdb_data_store_common.cpp (1 hunks)
- eloq_data_store_service/rocksdb_data_store_common.h (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- eloq_data_store_service/data_store_service_config.h
- eloq_data_store_service/data_store_service_config.cpp
- eloq_data_store_service/rocksdb_data_store_common.cpp
- eloq_data_store_service/data_store_service.cpp
🧰 Additional context used
🧬 Code graph analysis (3)
eloq_data_store_service/data_store_service.h (2)
eloq_data_store_service/data_store_service.cpp (10)
- StartService (240-316), StartService (240-242), CreateSnapshotForBackup (1164-1208), CreateSnapshotForBackup (1164-1168), CreateSnapshotForBackup (1210-1255), CreateSnapshotForBackup (1210-1216), CloseDataStore (1814-1830), CloseDataStore (1814-1814), OpenDataStore (1832-1844), OpenDataStore (1832-1832)

eloq_data_store_service/data_store_service_config.h (1)
- DataStoreServiceClusterManager (317-513)
data_store_service_client.cpp (2)
data_store_service_client.h (2)
- TxPort2DssPort (129-132), Connect (150-155)

rocksdb_handler.cpp (4)
- Connect (139-257), Connect (139-139), OnStartFollowing (2156-2177), OnStartFollowing (2156-2156)
data_store_service_client.h (1)
data_store_service_client.cpp (6)
- TxConfigsToDssClusterConfig (150-209), TxConfigsToDssClusterConfig (150-156), OnLeaderStop (2957-2969), OnLeaderStop (2957-2957), OnStartFollowing (2978-3025), OnStartFollowing (2978-2981)
🔇 Additional comments (1)
eloq_data_store_service/rocksdb_data_store_common.h (1)
153-154: LGTM: Constructor correctly captures worker count.

The initialization of query_worker_number_ from the config is correct and supports the deferred ThreadWorkerPool creation pattern.
```cpp
DLOG(INFO)
    << "DataStoreServiceClient OnStartFollowing called data_store_service_:"
    << data_store_service_;
if (data_store_service_ != nullptr)
{
    // Now, only support one shard.
    data_store_service_->CloseDataStore(0);
}

// Treat leader_node_id as dss_leader_node_id
uint32_t dss_leader_node_id = leader_node_id;
uint32_t dss_shard_id = txservice::Sharder::Instance().NativeNodeGroup();

// Update leader node in cluster_manager if necessary
auto ng_configs = txservice::Sharder::Instance().GetNodeGroupConfigs();
auto ng_config_it = ng_configs.find(dss_shard_id);
assert(ng_config_it != ng_configs.end());
auto ng_member_configs = ng_config_it->second;
const txservice::NodeConfig *dss_leader_node_config = nullptr;
for (const auto &node_config : ng_member_configs)
{
    if (node_config.node_id_ == dss_leader_node_id)
    {
        dss_leader_node_config = &node_config;
        break;
    }
}
assert(dss_leader_node_config != nullptr);
DSSNode dss_leader_node(dss_leader_node_config->host_name_,
                        TxPort2DssPort(dss_leader_node_config->port_));
auto &cluster_manager = data_store_service_->GetClusterManager();
cluster_manager.UpdatePrimaryNode(dss_shard_id, dss_leader_node);
DLOG(INFO) << "UpdatePrimaryNode, dss_shard_id: " << dss_shard_id
           << ", DSSNode: " << dss_leader_node.host_name_ << ":"
           << dss_leader_node.port_;
// Pump the dss shard version
cluster_manager.UpdateDSShardVersion(
    dss_shard_id, cluster_manager.FetchDSShardVersion(dss_shard_id) + 1);
// Update the client config
SetupConfig(cluster_manager);

Connect();
}
```
Don’t dereference data_store_service_ when it’s null
data_store_service_ is optional; on remote clients it stays nullptr. After the early if, the code still calls data_store_service_->GetClusterManager() which dereferences null and crashes every follower transition. Keep all of the cluster-manager work inside the data_store_service_ != nullptr branch (or bail out early) so we only touch the colocated service when it actually exists.
Apply this diff:
- if (data_store_service_ != nullptr)
- {
- // Now, only support one shard.
- data_store_service_->CloseDataStore(0);
- }
-
- // Treat leader_node_id as dss_leader_node_id
- uint32_t dss_leader_node_id = leader_node_id;
- uint32_t dss_shard_id = txservice::Sharder::Instance().NativeNodeGroup();
-
- // Update leader node in cluster_manager if necessary
- auto ng_configs = txservice::Sharder::Instance().GetNodeGroupConfigs();
- auto ng_config_it = ng_configs.find(dss_shard_id);
- assert(ng_config_it != ng_configs.end());
- auto ng_member_configs = ng_config_it->second;
- const txservice::NodeConfig *dss_leader_node_config = nullptr;
- for (const auto &node_config : ng_member_configs)
- {
- if (node_config.node_id_ == dss_leader_node_id)
- {
- dss_leader_node_config = &node_config;
- break;
- }
- }
- assert(dss_leader_node_config != nullptr);
- DSSNode dss_leader_node(dss_leader_node_config->host_name_,
- TxPort2DssPort(dss_leader_node_config->port_));
- auto &cluster_manager = data_store_service_->GetClusterManager();
- cluster_manager.UpdatePrimaryNode(dss_shard_id, dss_leader_node);
- DLOG(INFO) << "UpdatePrimaryNode, dss_shard_id: " << dss_shard_id
- << ", DSSNode: " << dss_leader_node.host_name_ << ":"
- << dss_leader_node.port_;
- // Pump the dss shard version
- cluster_manager.UpdateDSShardVersion(
- dss_shard_id, cluster_manager.FetchDSShardVersion(dss_shard_id) + 1);
- // Update the client config
- SetupConfig(cluster_manager);
-
- Connect();
+ if (data_store_service_ != nullptr)
+ {
+ // Now, only support one shard.
+ data_store_service_->CloseDataStore(0);
+
+ // Treat leader_node_id as dss_leader_node_id
+ uint32_t dss_leader_node_id = leader_node_id;
+ uint32_t dss_shard_id =
+ txservice::Sharder::Instance().NativeNodeGroup();
+
+ // Update leader node in cluster_manager if necessary
+ auto ng_configs = txservice::Sharder::Instance().GetNodeGroupConfigs();
+ auto ng_config_it = ng_configs.find(dss_shard_id);
+ assert(ng_config_it != ng_configs.end());
+ auto ng_member_configs = ng_config_it->second;
+ const txservice::NodeConfig *dss_leader_node_config = nullptr;
+ for (const auto &node_config : ng_member_configs)
+ {
+ if (node_config.node_id_ == dss_leader_node_id)
+ {
+ dss_leader_node_config = &node_config;
+ break;
+ }
+ }
+ assert(dss_leader_node_config != nullptr);
+ DSSNode dss_leader_node(dss_leader_node_config->host_name_,
+ TxPort2DssPort(dss_leader_node_config->port_));
+ auto &cluster_manager = data_store_service_->GetClusterManager();
+ cluster_manager.UpdatePrimaryNode(dss_shard_id, dss_leader_node);
+ DLOG(INFO) << "UpdatePrimaryNode, dss_shard_id: " << dss_shard_id
+ << ", DSSNode: " << dss_leader_node.host_name_ << ":"
+ << dss_leader_node.port_;
+ // Pump the dss shard version
+ cluster_manager.UpdateDSShardVersion(
+ dss_shard_id, cluster_manager.FetchDSShardVersion(dss_shard_id) + 1);
+ // Update the client config
+ SetupConfig(cluster_manager);
+ }
+
+ Connect();🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 2983-3025, the code dereferences
data_store_service_ after an early null check (calling
data_store_service_->GetClusterManager()), which will crash when
data_store_service_ is nullptr on remote clients; either move all
cluster-manager-related work (finding dss_leader_node_config, constructing
DSSNode, calling GetClusterManager(), UpdatePrimaryNode, UpdateDSShardVersion,
and SetupConfig) inside the existing if (data_store_service_ != nullptr) block,
or bail out early (return) right after CloseDataStore(0) when
data_store_service_ is nullptr so no member access occurs; ensure Connect() is
only called in the appropriate code path depending on whether it relies on
data_store_service_.
```cpp
static uint16_t TxPort2DssPort(uint16_t tx_port)
{
    return tx_port + 7;
}
```
Guard against Tx→DSS port overflow
tx_port + 7 on a uint16_t silently wraps once tx_port > 65528, so callers end up dialing the wrong TCP port (e.g., 65535 becomes 6). That breaks reconnects for perfectly valid configurations. Please promote to a wider type and assert/guard before casting back so we never wrap.
Apply this diff:
- static uint16_t TxPort2DssPort(uint16_t tx_port)
- {
- return tx_port + 7;
- }
+ static uint16_t TxPort2DssPort(uint16_t tx_port)
+ {
+ constexpr uint16_t kDelta = 7;
+ assert(tx_port <= static_cast<uint16_t>(UINT16_MAX - kDelta));
+ return static_cast<uint16_t>(tx_port + kDelta);
+ }
🤖 Prompt for AI Agents
In data_store_service_client.h around lines 129 to 132, the function
TxPort2DssPort adds 7 to a uint16_t which can overflow and wrap for tx_port >
65528; change the implementation to perform the addition using a wider integer
type (e.g., uint32_t or unsigned), check/guard that tx_port + 7 <=
std::numeric_limits<uint16_t>::max() (or assert/throw if out of range), and only
then cast the result back to uint16_t when returning so wrapping cannot occur.
```diff
 // setup local data store service
-bool ret =
-    data_store_service_->StartService(FLAGS_bootstrap || is_single_node);
+bool ret = data_store_service_->StartService(
+    FLAGS_bootstrap || is_single_node, 0, 0);
 if (!ret)
```
Pass real leader/node IDs into StartService
With the new signature we now key leadership off dss_leader_node_id == dss_node_id. This call site always feeds 0, 0, so every process thinks it is the leader and opens the DB in read-write mode even when the config says otherwise. Please plumb the actual node id for this process and the current leader id from ds_config (or teach StartService to derive it from cluster_manager_) before invoking it.
Need help wiring the IDs? I can sketch the change once you point me at the helpers you want to use.
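As a sketch of the decision the new signature implies, assuming leadership is keyed on dss_leader_node_id == dss_node_id with bootstrap/single-node as an explicit override (the real helpers for fetching these IDs from ds_config or cluster_manager_ are up to the maintainers):

```cpp
#include <cassert>
#include <cstdint>

// A process is the startup leader only when its own node id matches the
// configured leader id, unless bootstrap/single-node mode forces it.
inline bool IsStartupLeader(bool bootstrap_or_single_node,
                            uint32_t local_node_id,
                            uint32_t leader_node_id)
{
    return bootstrap_or_single_node || local_node_id == leader_node_id;
}
```

Passing literal 0, 0 makes this predicate true on every process, which is why each one opens the DB read-write.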
🤖 Prompt for AI Agents
In eloq_data_store_service/main.cpp around lines 320 to 323, the call to
data_store_service_->StartService currently passes literal 0, 0 which causes
every process to think it's the leader; instead obtain the real local node id
and current leader id and pass them. Retrieve the local node id from your config
or cluster manager (e.g. ds_config.node_id() or
cluster_manager_->GetLocalNodeId()) and the leader id from ds_config or
cluster_manager (e.g. ds_config.leader_node_id() or
cluster_manager_->GetLeaderNodeId()), handle the single-node/bootstrap override
(FLAGS_bootstrap || is_single_node) if still desired, and call
StartService(FLAGS_bootstrap || is_single_node, local_node_id, leader_node_id)
so the service opens the DB with correct leadership information.
```cpp
bool tx_enable_cache_replacement_{true};
std::unique_ptr<EloqDS::TTLCompactionFilter> ttl_compaction_filter_{
    nullptr};
size_t query_worker_number_{4};
```
🧩 Analysis chain
Verify thread-safe lazy initialization of query_worker_pool_.
The new query_worker_number_ member stores the worker count for deferred pool creation. Ensure that the lazy initialization of query_worker_pool_ in Initialize() is thread-safe, especially given the presence of db_mux_ for synchronization.
Run the following script to verify the initialization pattern and usage:
🏁 Script executed:
```shell
#!/bin/bash
# Description: Verify lazy initialization pattern for query_worker_pool_
# Check Initialize() implementation for thread-safe lazy initialization
echo "=== Initialize() implementation ==="
ast-grep --pattern 'bool RocksDBDataStoreCommon::Initialize() {
  $$$
}'
# Find all accesses to query_worker_pool_ to ensure proper initialization checks
echo -e "\n=== All accesses to query_worker_pool_ ==="
rg -nP --type=cpp -C3 'query_worker_pool_\s*[->=]'
# Check if there are any accesses before Initialize() is called
echo -e "\n=== Constructor usages (should not access query_worker_pool_) ==="
ast-grep --pattern 'RocksDBDataStoreCommon::RocksDBDataStoreCommon($$$) {
  $$$
  query_worker_pool_
  $$$
}'
```

Length of output: 12158
Fix race condition in query_worker_pool_ lazy initialization and add defensive nullptr checks.

The lazy initialization at lines 274-278 is unprotected and violates thread-safe initialization patterns. While Initialize() is called once during setup, the code lacks defensive measures:

1. Unprotected initialization (lines 274-278): the nullptr check and assignment are not protected by db_mux_, creating a classic double-checked-locking anti-pattern. If two threads call Initialize() concurrently, both may see nullptr and create separate pools, causing resource leaks.
2. No defensive nullptr checks: all operation methods (FlushData, Read, Write, DeleteRange, etc.) call query_worker_pool_->SubmitWork(...) without checking that the pool exists. If Initialize() is skipped or Shutdown() is called prematurely, crashes will occur.

Required fixes:
- Protect the initialization with std::unique_lock<std::shared_mutex> lk(db_mux_) at line 274
- Add assertion or early-return checks in operation methods before accessing query_worker_pool_ to catch missing Initialize() calls
- Document the contract that Initialize() must be called before operations and that after Shutdown() the object cannot be reused
🤖 Prompt for AI Agents
In eloq_data_store_service/rocksdb_data_store_common.h around line 274 (and
referencing the declaration at 323), the lazy initialization of
query_worker_pool_ uses an unprotected nullptr check (double-checked locking)
and operation methods call query_worker_pool_->SubmitWork(...) without verifying
the pool exists; to fix, wrap the initialization block with
std::unique_lock<std::shared_mutex> lk(db_mux_) so only one thread creates the
pool, remove the unprotected DCL pattern, add defensive checks
(assert(query_worker_pool_) or early-return/error) at the start of all methods
that use query_worker_pool_ (FlushData, Read, Write, DeleteRange, etc.) to avoid
dereferencing a null pointer, and update the header comments to state the
Initialize() must be called before any operations and that after Shutdown() the
object is not reusable.
Actionable comments posted: 1
♻️ Duplicate comments (2)
eloq_data_store_service/data_store_service.cpp (2)
378-389: Missing explicit SwitchToReadWrite() call in ReadWrite branch

The ReadOnly branch (lines 378-383) correctly calls data_store_->SwitchToReadOnly() before updating cluster state, but the ReadWrite branch (lines 384-389) only updates cluster state without calling data_store_->SwitchToReadWrite(). This creates an inconsistency where the local engine mode may not match the cluster-announced mode.

Apply this diff to add the explicit switch:

```diff
 else
 {
     assert(open_mode == DSShardStatus::ReadWrite);
+    data_store_->SwitchToReadWrite();
     cluster_manager_.SwitchShardToReadWrite(data_shard_id,
                                             DSShardStatus::Closed);
 }
```
1814-1830: CloseDataStore implementation doesn't align with previously addressed review

The past review comment (marked as addressed in commits 9e63cf3 to 34b41ab) recommended using cluster_manager_.IsOwnerOfShard to verify primary ownership, using acquire memory order, and checking the return values of the Switch operations. However, the current implementation:

- Still uses the local IsOwnerOfShard(shard_id) (line 1817), which only checks that the shard is not Closed and the IDs match; a read-only follower would pass this check
- Uses shard_status_.load() without an explicit memory order (lines 1821, 1826)
- Doesn't check the return values of SwitchReadWriteToReadOnly and SwitchReadOnlyToClosed (lines 1823, 1828)

Apply this diff to address the concerns:

```diff
 void DataStoreService::CloseDataStore(uint32_t shard_id)
 {
     assert(shard_id == shard_id_);
-    if (!IsOwnerOfShard(shard_id))
+    uint64_t shard_ver{};
+    if (!cluster_manager_.IsOwnerOfShard(shard_id, &shard_ver))
     {
+        DLOG(INFO) << "CloseDataStore skipped; node is not primary for shard "
+                   << shard_id;
         return;
     }
-    if (shard_status_.load() == DSShardStatus::ReadWrite)
+    if (shard_status_.load(std::memory_order_acquire) ==
+        DSShardStatus::ReadWrite)
     {
-        SwitchReadWriteToReadOnly(shard_id);
+        if (!SwitchReadWriteToReadOnly(shard_id))
+        {
+            LOG(WARNING) << "CloseDataStore: RW->RO switch failed for shard "
+                         << shard_id;
+            return;
+        }
     }
-    if (shard_status_.load() == DSShardStatus::ReadOnly)
+    if (shard_status_.load(std::memory_order_acquire) ==
+        DSShardStatus::ReadOnly)
     {
-        SwitchReadOnlyToClosed(shard_id);
+        if (!SwitchReadOnlyToClosed(shard_id))
+        {
+            LOG(WARNING) << "CloseDataStore: RO->Closed switch failed for shard "
+                         << shard_id;
+        }
     }
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- eloq_data_store_service/data_store_service.cpp (8 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
eloq_data_store_service/data_store_service.cpp (2)
eloq_data_store_service/data_store_service_config.cpp (4)
- IsOwnerOfShard (285-293)
- IsOwnerOfShard (285-285)
- IsOwnerOfShard (690-701)
- IsOwnerOfShard (690-691)

eloq_data_store_service/data_store_service.h (1)
- IsOwnerOfShard (606-614)
```cpp
void DataStoreService::OpenDataStore(uint32_t shard_id)
{
    // no-op if this DSS does not own any shard
    if (shard_id_ == UINT32_MAX)
    {
        DLOG(INFO) << "OpenDataStore no-op for non-owner DSS"
                   << ", shard " << shard_id
                   << ", shard_id_: " << shard_id_;
        return;
    }

    assert(shard_id == shard_id_);

    DLOG(INFO) << "OpenDataStore for shard " << shard_id
               << ", current status: " << static_cast<int>(shard_status_.load());
    if (shard_status_.load() != DSShardStatus::Closed)
    {
        return;
    }
    DSShardStatus open_mode = DSShardStatus::ReadWrite;
    bool create_db_if_missing = false;
    ConnectAndStartDataStore(shard_id, open_mode, create_db_if_missing);
}
```
OpenDataStore ignores ConnectAndStartDataStore failure
Line 1853 calls ConnectAndStartDataStore but doesn't check or log the return value. If the operation fails (e.g., database initialization error), the shard remains Closed, but the caller receives no indication of failure.
Apply this diff to handle failures:
```diff
     DSShardStatus open_mode = DSShardStatus::ReadWrite;
     bool create_db_if_missing = false;
-    ConnectAndStartDataStore(shard_id, open_mode, create_db_if_missing);
+    bool res = ConnectAndStartDataStore(shard_id, open_mode, create_db_if_missing);
+    if (!res)
+    {
+        LOG(ERROR) << "OpenDataStore: failed to connect and start data store for shard "
+                   << shard_id;
+    }
 }
```

🤖 Prompt for AI Agents
In eloq_data_store_service/data_store_service.cpp around lines 1832-1854,
OpenDataStore currently calls ConnectAndStartDataStore without checking its
result; modify it to capture the return value, verify success, and handle
failures: after calling ConnectAndStartDataStore(shard_id, open_mode,
create_db_if_missing) store its bool return, if false log an error with context
(shard id and any available error info), ensure shard_status_ remains Closed (or
explicitly set it back to Closed) and return/fail the caller (or propagate an
error/boolean) so callers can observe the failure; if true proceed as before.
Ensure logging is clear and avoid swallowing errors.
Actionable comments posted: 0
🧹 Nitpick comments (1)
rocksdb_handler.h (1)
533-536: LGTM! Consider adding parameter documentation.

The updated signature correctly extends `OnStartFollowing` to receive leader context, term information, and the resubscribe flag, enabling proper follower lifecycle handling as outlined in the PR objectives.

Optionally add a brief comment documenting the parameters:

```diff
+    /**
+     * @brief Called when this node starts following a leader
+     * @param leader_node_id ID of the leader node
+     * @param term Current term
+     * @param standby_term Standby term
+     * @param resubscribe Whether to resubscribe to updates
+     */
     void OnStartFollowing(uint32_t leader_node_id,
                           int64_t term,
                           int64_t standby_term,
                           bool resubscribe) override;
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- rocksdb_handler.cpp (3 hunks)
- rocksdb_handler.h (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
rocksdb_handler.h (2)
rocksdb_handler.cpp (2)
- OnStartFollowing (2155-2179)
- OnStartFollowing (2155-2158)

data_store_service_client.cpp (2)
- OnStartFollowing (2978-3025)
- OnStartFollowing (2978-2981)
rocksdb_handler.cpp (1)
data_store_service_client.cpp (2)
- OnStartFollowing (2978-3025)
- OnStartFollowing (2978-2981)
🔇 Additional comments (1)
rocksdb_handler.cpp (1)
2155-2179: Manually verify the parameter usage in `OnStartFollowing`.

The method signature in `rocksdb_handler.cpp` includes parameters `leader_node_id`, `term`, `standby_term`, and `resubscribe`, which are not used in the implementation. This is intentional for a base class providing simplified behavior. If `RocksDBHandler` is designed as a base class and these parameters are required in derived classes, marking them as intentionally unused would improve code clarity. However, the implementation's purpose needs verification.

Using the class hierarchy `RocksDBHandler` -> `txservice::store::DataStoreHandler` identified earlier, locate the `DataStoreHandler` base class definition and check whether `OnStartFollowing` is declared as virtual. If its parameters match, they are part of the interface contract. Investigate whether other derived classes override this method and how they use these parameters. If the base class doesn't declare these parameters, consider removing them from `RocksDBHandler` to avoid confusion.
Force-pushed from ccd48e3 to e8d2088 (Compare)
Actionable comments posted: 3
♻️ Duplicate comments (7)
data_store_service_client.h (1)
129-133: Guard Tx→DSS port addition against overflow

Adding 7 on a uint16_t wraps for tx_port > 65528; callers will dial wrong ports.

Apply:

```diff
-    static uint16_t TxPort2DssPort(uint16_t tx_port)
-    {
-        return tx_port + 7;
-    }
+    static uint16_t TxPort2DssPort(uint16_t tx_port)
+    {
+        constexpr uint16_t kDelta = 7;
+        assert(tx_port <= static_cast<uint16_t>(UINT16_MAX - kDelta));
+        return static_cast<uint16_t>(tx_port + kDelta);
+    }
```

data_store_service_client.cpp (2)
2978-3025: Avoid null deref when following: keep cluster-manager work inside the guard

If data_store_service_ is nullptr (remote client), dereferencing it crashes. Move all cluster-manager updates inside the existing `if (data_store_service_ != nullptr)` block.
Apply:
```diff
-    if (data_store_service_ != nullptr)
-    {
-        // Now, only support one shard.
-        data_store_service_->CloseDataStore(0);
-    }
-
-    // Treat leader_node_id as dss_leader_node_id
-    uint32_t dss_leader_node_id = leader_node_id;
-    uint32_t dss_shard_id = txservice::Sharder::Instance().NativeNodeGroup();
-
-    // Update leader node in cluster_manager if necessary
-    auto ng_configs = txservice::Sharder::Instance().GetNodeGroupConfigs();
-    auto ng_config_it = ng_configs.find(dss_shard_id);
-    assert(ng_config_it != ng_configs.end());
-    auto ng_member_configs = ng_config_it->second;
-    const txservice::NodeConfig *dss_leader_node_config = nullptr;
-    for (const auto &node_config : ng_member_configs)
-    {
-        if (node_config.node_id_ == dss_leader_node_id)
-        {
-            dss_leader_node_config = &node_config;
-            break;
-        }
-    }
-    assert(dss_leader_node_config != nullptr);
-    DSSNode dss_leader_node(dss_leader_node_config->host_name_,
-                            TxPort2DssPort(dss_leader_node_config->port_));
-    auto &cluster_manager = data_store_service_->GetClusterManager();
-    cluster_manager.UpdatePrimaryNode(dss_shard_id, dss_leader_node);
-    DLOG(INFO) << "UpdatePrimaryNode, dss_shard_id: " << dss_shard_id
-               << ", DSSNode: " << dss_leader_node.host_name_ << ":"
-               << dss_leader_node.port_;
-    // Pump the dss shard version
-    cluster_manager.UpdateDSShardVersion(
-        dss_shard_id, cluster_manager.FetchDSShardVersion(dss_shard_id) + 1);
-    // Update the client config
-    SetupConfig(cluster_manager);
-
-    Connect();
+    if (data_store_service_ != nullptr)
+    {
+        // Now, only support one shard.
+        data_store_service_->CloseDataStore(0);
+
+        // Treat leader_node_id as dss_leader_node_id
+        uint32_t dss_leader_node_id = leader_node_id;
+        uint32_t dss_shard_id =
+            txservice::Sharder::Instance().NativeNodeGroup();
+
+        // Update leader node in cluster_manager if necessary
+        auto ng_configs = txservice::Sharder::Instance().GetNodeGroupConfigs();
+        auto ng_config_it = ng_configs.find(dss_shard_id);
+        assert(ng_config_it != ng_configs.end());
+        auto ng_member_configs = ng_config_it->second;
+        const txservice::NodeConfig *dss_leader_node_config = nullptr;
+        for (const auto &node_config : ng_member_configs)
+        {
+            if (node_config.node_id_ == dss_leader_node_id)
+            {
+                dss_leader_node_config = &node_config;
+                break;
+            }
+        }
+        assert(dss_leader_node_config != nullptr);
+        DSSNode dss_leader_node(dss_leader_node_config->host_name_,
+                                TxPort2DssPort(dss_leader_node_config->port_));
+        auto &cluster_manager = data_store_service_->GetClusterManager();
+        cluster_manager.UpdatePrimaryNode(dss_shard_id, dss_leader_node);
+        DLOG(INFO) << "UpdatePrimaryNode, dss_shard_id: " << dss_shard_id
+                   << ", DSSNode: " << dss_leader_node.host_name_ << ":"
+                   << dss_leader_node.port_;
+        // Pump the dss shard version
+        cluster_manager.UpdateDSShardVersion(
+            dss_shard_id,
+            cluster_manager.FetchDSShardVersion(dss_shard_id) + 1);
+        // Update the client config
+        SetupConfig(cluster_manager);
+    }
+
+    Connect();
```
150-209: Fix shard-id mapping, status transition, and unused var
- Single-shard client expects shard_id = 0; using ng_id risks OOB later.
- SwitchShardToClosed expects ReadOnly as prior state; passing ReadWrite asserts.
- Remove unused shard_nodes.
Apply:
```diff
-    std::vector<DSSNode> shard_nodes;
+    // DSS currently supports a single data shard (id 0).
+    constexpr uint32_t kDssShardId = 0;
@@
-    cluster_manager.AddShardMember(ng_id, dss_node);
+    cluster_manager.AddShardMember(kDssShardId, dss_node);
@@
-    if (dss_leader_node_id != dss_node_id)
+    if (dss_leader_node_id != dss_node_id)
     {
-        LOG(INFO) << "cluster_manager change shard status " << ng_id << " from "
+        LOG(INFO) << "cluster_manager change shard status " << kDssShardId
+                  << " from "
                   << static_cast<int>(
-                         cluster_manager.FetchDSShardStatus(ng_id));
-        cluster_manager.SwitchShardToClosed(ng_id, DSShardStatus::ReadWrite);
-        LOG(INFO) << "cluster_manager change shard status " << ng_id << " to "
+                         cluster_manager.FetchDSShardStatus(kDssShardId));
+        // Transition ReadWrite -> ReadOnly -> Closed
+        cluster_manager.SwitchShardToReadOnly(kDssShardId,
+                                              DSShardStatus::ReadWrite);
+        cluster_manager.SwitchShardToClosed(kDssShardId,
+                                            DSShardStatus::ReadOnly);
+        LOG(INFO) << "cluster_manager change shard status " << kDssShardId
+                  << " to "
                   << static_cast<int>(
-                         cluster_manager.FetchDSShardStatus(ng_id));
+                         cluster_manager.FetchDSShardStatus(kDssShardId));
         if (dss_leader_node_id != UNKNOWN_DSS_LEADER_NODE_ID)
         {
             DSSNode dss_node(leader_node->host_name_,
                              TxPort2DssPort(leader_node->port_));
-            cluster_manager.UpdatePrimaryNode(ng_id, dss_node);
+            cluster_manager.UpdatePrimaryNode(kDssShardId, dss_node);
         }
     }
```

eloq_data_store_service/main.cpp (1)
321-323: Pass real DSS node/leader IDs into StartService (avoid always-leader bug)

Hardcoding 0,0 makes every node believe it's the leader, opening RW database access in multi-node clusters.
Apply this corrected fix (the proposed diff uses an incorrect shard lookup):
```cpp
    bool ret = data_store_service_->StartService(
        FLAGS_bootstrap || is_single_node, 0, 0);
```

Replace with:
```diff
+    // Determine DSS node indices from cluster config.
+    // Note: shard_id_ will be set on line 255, but we need to compute
+    // node indices BEFORE calling StartService.
+    uint32_t dss_node_id = 0;
+    uint32_t dss_leader_node_id = 0;
+    {
+        auto dss_shards = ds_config.GetShardsForThisNode();
+        if (!dss_shards.empty()) {
+            uint32_t shard_id = dss_shards[0];
+            const auto& all_shards = ds_config.GetAllShards();
+            auto shard_it = all_shards.find(shard_id);
+            if (shard_it != all_shards.end()) {
+                const auto& shard = shard_it->second;
+                auto this_node = ds_config.GetThisNode();
+                // Find this node's index in the shard
+                for (uint32_t i = 0; i < shard.nodes_.size(); ++i) {
+                    if (shard.nodes_[i].host_name_ == this_node.host_name_ &&
+                        shard.nodes_[i].port_ == this_node.port_) {
+                        dss_node_id = i;
+                        break;
+                    }
+                }
+                // Leader is always nodes_[0] per GetPrimaryNode contract
+                dss_leader_node_id = 0;
+            }
+        }
+    }
     bool ret = data_store_service_->StartService(
-        FLAGS_bootstrap || is_single_node, 0, 0);
+        FLAGS_bootstrap || is_single_node, dss_leader_node_id, dss_node_id);
```

Key difference from the original proposal: use `GetShardsForThisNode()` to identify the correct shard for this node, then look up that specific shard from the `GetAllShards()` map, not an arbitrary shard from `begin()->second`.

eloq_data_store_service/data_store_service.cpp (3)
378-389: Missing explicit SwitchToReadWrite() call in READ_WRITE open path.

When opening in READ_WRITE mode (lines 384-389), the code updates cluster state but never calls `data_store_->SwitchToReadWrite()`. This matches the concern raised in a previous review that was marked as addressed, but the fix is not present in the current code.

Apply this diff to ensure the engine switches to ReadWrite before updating cluster state:

```diff
     else
     {
         assert(open_mode == DSShardStatus::ReadWrite);
+        data_store_->SwitchToReadWrite();
         cluster_manager_.SwitchShardToReadWrite(data_shard_id,
                                                 DSShardStatus::Closed);
     }
```
1814-1830: CloseDataStore still uses unsafe ownership check and ignores switch failures.

This method has the same issues flagged in a previous review marked as addressed:

- Incorrect ownership check: line 1817 uses the local `IsOwnerOfShard(shard_id)`, which only verifies `shard_status_ != Closed && shard_id_ == shard_id`. A read-only follower would incorrectly pass this check. The correct approach is `cluster_manager_.IsOwnerOfShard(shard_id)`, which verifies primary ownership.
- Unchecked return values: lines 1823 and 1828 call `SwitchReadWriteToReadOnly` and `SwitchReadOnlyToClosed` but ignore their boolean return values, preventing error detection.

Apply this diff to fix both issues:

```diff
 void DataStoreService::CloseDataStore(uint32_t shard_id)
 {
     assert(shard_id == shard_id_);
-    if (!IsOwnerOfShard(shard_id))
+    uint64_t shard_ver{};
+    if (!cluster_manager_.IsOwnerOfShard(shard_id, &shard_ver))
     {
+        DLOG(INFO) << "CloseDataStore skipped; node is not primary for shard "
+                   << shard_id;
         return;
     }
-    if (shard_status_.load() == DSShardStatus::ReadWrite)
+    if (shard_status_.load(std::memory_order_acquire) ==
+        DSShardStatus::ReadWrite)
     {
-        SwitchReadWriteToReadOnly(shard_id);
+        if (!SwitchReadWriteToReadOnly(shard_id))
+        {
+            LOG(WARNING) << "CloseDataStore: RW->RO switch failed for shard "
+                         << shard_id;
+            return;
+        }
     }
-    if (shard_status_.load() == DSShardStatus::ReadOnly)
+    if (shard_status_.load(std::memory_order_acquire) ==
+        DSShardStatus::ReadOnly)
     {
-        SwitchReadOnlyToClosed(shard_id);
+        if (!SwitchReadOnlyToClosed(shard_id))
+        {
+            LOG(WARNING) << "CloseDataStore: RO->Closed switch failed for shard "
+                         << shard_id;
+        }
     }
 }
```
1832-1854: OpenDataStore ignores ConnectAndStartDataStore failure.

Line 1853 calls `ConnectAndStartDataStore` but doesn't check its boolean return value. This issue was flagged in a previous review marked as addressed, but the fix is not present in the current code. If the connection fails, the shard remains Closed with no error indication to the caller.

Apply this diff to handle failures:

```diff
     DSShardStatus open_mode = DSShardStatus::ReadWrite;
     bool create_db_if_missing = false;
-    ConnectAndStartDataStore(shard_id, open_mode, create_db_if_missing);
+    bool res = ConnectAndStartDataStore(shard_id, open_mode, create_db_if_missing);
+    if (!res)
+    {
+        LOG(ERROR) << "OpenDataStore: failed to connect and start data store for shard "
+                   << shard_id;
+    }
 }
```
🧹 Nitpick comments (4)
eloq_data_store_service/data_store_service_config.h (1)
314-316: Make sentinel constexpr and fix typo; avoid macro dependency

Prefer an inline constexpr with std::numeric_limits and correct the comment spelling.

Apply:

```diff
-// The DSS leader node ID is unknown if euqal to this value
-static const uint32_t UNKNOWN_DSS_LEADER_NODE_ID = UINT32_MAX;
+// The DSS leader node ID is unknown if equal to this value
+#include <limits>
+inline constexpr uint32_t UNKNOWN_DSS_LEADER_NODE_ID =
+    std::numeric_limits<uint32_t>::max();
```
240-275: Consider validating leadership parameters before use.

The leadership determination `(dss_leader_node_id == dss_node_id)` assumes valid IDs. If both parameters are default/sentinel values (e.g., 0 or UINT32_MAX), this could incorrectly identify a node as leader.

Consider adding validation:

```diff
+    if (dss_leader_node_id == UINT32_MAX || dss_node_id == UINT32_MAX)
+    {
+        LOG(ERROR) << "Invalid DSS node IDs provided to StartService";
+        return false;
+    }
     // start underling db if this dss node is the
     // leader dss node
     data_store_ = data_store_factory_->CreateDataStore(
         create_db_if_missing,
         shard_id_,
         this,
         dss_leader_node_id == dss_node_id);
```
326-326: Clarify the removal of the ReadOnly assertion.

The commented-out assertion suggests this function now handles modes beyond ReadOnly, but the reason isn't documented. If this is intentional, remove the commented code; otherwise, restore the assertion with appropriate logic.
eloq_data_store_service/data_store_service.h (1)
619-622: Fix indentation in GetClusterManager.

The method body uses 2-space indentation while the rest of the codebase uses 4 spaces.

Apply this diff to fix indentation:

```diff
     DataStoreServiceClusterManager &GetClusterManager()
     {
-      return cluster_manager_;
+        return cluster_manager_;
     }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
- data_store_service_client.cpp (6 hunks)
- data_store_service_client.h (2 hunks)
- eloq_data_store_service/data_store_service.cpp (8 hunks)
- eloq_data_store_service/data_store_service.h (4 hunks)
- eloq_data_store_service/data_store_service_config.cpp (3 hunks)
- eloq_data_store_service/data_store_service_config.h (1 hunk)
- eloq_data_store_service/main.cpp (3 hunks)
- eloq_data_store_service/rocksdb_data_store_common.cpp (1 hunk)
- eloq_data_store_service/rocksdb_data_store_common.h (2 hunks)
- rocksdb_handler.cpp (3 hunks)
- rocksdb_handler.h (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- eloq_data_store_service/rocksdb_data_store_common.cpp
- eloq_data_store_service/rocksdb_data_store_common.h
- rocksdb_handler.cpp
- rocksdb_handler.h
- eloq_data_store_service/data_store_service_config.cpp
🧰 Additional context used
🧬 Code graph analysis (4)
eloq_data_store_service/data_store_service.cpp (3)
eloq_data_store_service/data_store_service_util.h (1)
- shard_id_ (118-118)

eloq_data_store_service/data_store_service.h (1)
- IsOwnerOfShard (606-614)

eloq_data_store_service/data_store_service_config.cpp (4)
- IsOwnerOfShard (285-293)
- IsOwnerOfShard (285-285)
- IsOwnerOfShard (690-701)
- IsOwnerOfShard (690-691)
data_store_service_client.h (2)
data_store_service_client.cpp (6)
- TxConfigsToDssClusterConfig (150-209)
- TxConfigsToDssClusterConfig (150-156)
- OnLeaderStop (2957-2969)
- OnLeaderStop (2957-2957)
- OnStartFollowing (2978-3025)
- OnStartFollowing (2978-2981)

rocksdb_handler.cpp (2)
- OnStartFollowing (2155-2179)
- OnStartFollowing (2155-2158)
data_store_service_client.cpp (2)
data_store_service_client.h (2)
- TxPort2DssPort (129-132)
- Connect (150-155)

rocksdb_handler.cpp (4)
- Connect (138-256)
- Connect (138-138)
- OnStartFollowing (2155-2179)
- OnStartFollowing (2155-2158)
eloq_data_store_service/data_store_service.h (2)
eloq_data_store_service/data_store_service.cpp (10)
- StartService (240-316)
- StartService (240-242)
- CreateSnapshotForBackup (1164-1208)
- CreateSnapshotForBackup (1164-1168)
- CreateSnapshotForBackup (1210-1255)
- CreateSnapshotForBackup (1210-1216)
- CloseDataStore (1814-1830)
- CloseDataStore (1814-1814)
- OpenDataStore (1832-1854)
- OpenDataStore (1832-1832)

eloq_data_store_service/data_store_service_config.h (1)
- DataStoreServiceClusterManager (317-513)
🔇 Additional comments (7)
data_store_service_client.cpp (3)
286-288: LGTM: extra debug log helps visibility

Non-invasive DLOG with table count is fine.

2943-2955: Leader start: open local store then reconnect

Flow looks correct and guarded by nullptr check. Please confirm OpenDataStore(0) is idempotent when called repeatedly during re-elections.

2957-2969: Leader stop: close local store

Looks good and safe under nullptr check.
eloq_data_store_service/data_store_service.cpp (1)
2529-2529: LGTM: Namespace closing comment is clear.

The explicit namespace qualifier in the closing comment aids readability in large files.
eloq_data_store_service/data_store_service.h (3)
190-192: LGTM: StartService signature extension is appropriate.

The addition of `dss_leader_node_id` and `dss_node_id` parameters enables leadership-aware initialization, consistent with the implementation.
606-614: LGTM: Diagnostic logging aids ownership debugging.

The added DLOG statements provide valuable context for troubleshooting ownership checks. Since this is a frequently-called inline method, using DLOG (debug-only) is appropriate to avoid production overhead.
616-617: LGTM: Lifecycle method declarations support dynamic shard management.

The `CloseDataStore` and `OpenDataStore` declarations provide the necessary public interface for shard lifecycle control. (Note: implementation issues are flagged in the .cpp file review.)
```cpp
        LOG(INFO) << "DataStoreServiceCliet UpgradeShardVersion success, "
                     "shard_id:"
                  << group.shard_id_ << ", version:" << group.version_
                  << ", owner_node:" << group.nodes_[0].host_name_ << ":"
                  << group.nodes_[0].port_;
    }
}
else
{
    LOG(INFO)
        << "DataStoreServiceCliet SetupConfig skipped, current_version:"
        << current_version << ", new_version:" << new_version;
}
```
Fix log typos for clarity
"DataStoreServiceCliet" → "DataStoreServiceClient".
Apply:
```diff
-        LOG(INFO) << "DataStoreServiceCliet UpgradeShardVersion success, "
+        LOG(INFO) << "DataStoreServiceClient UpgradeShardVersion success, "
@@
-        LOG(INFO)
-            << "DataStoreServiceCliet SetupConfig skipped, current_version:"
+        LOG(INFO)
+            << "DataStoreServiceClient SetupConfig skipped, current_version:"
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-            LOG(INFO) << "DataStoreServiceCliet UpgradeShardVersion success, "
+            LOG(INFO) << "DataStoreServiceClient UpgradeShardVersion success, "
                          "shard_id:"
                       << group.shard_id_ << ", version:" << group.version_
                       << ", owner_node:" << group.nodes_[0].host_name_ << ":"
                       << group.nodes_[0].port_;
         }
     }
     else
     {
         LOG(INFO)
-            << "DataStoreServiceCliet SetupConfig skipped, current_version:"
+            << "DataStoreServiceClient SetupConfig skipped, current_version:"
             << current_version << ", new_version:" << new_version;
     }
```
🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 135 to 147, fix the typo in the
log messages where "DataStoreServiceCliet" should be "DataStoreServiceClient":
update both occurrences in the INFO logs (the success message and the skipped
SetupConfig message) to use the correct service name while keeping existing
context (shard_id, version, owner_node and current_version/new_version)
unchanged.
```cpp
        << "DataStoreServiceClient OnLeaderStop called data_store_service_:"
        << data_store_service_;
    // swith to read only in case of data store status is read write
    if (data_store_service_ != nullptr)
    {
        // Now, only support one shard.
        data_store_service_->CloseDataStore(0);
```
Nit: fix log/comment typos
"swith to read only" → "switch to read-only".
Apply:
```diff
-    // swith to read only in case of data store status is read write
+    // switch to read-only in case data store status is read-write
```
In data_store_service_client.cpp around lines 2960 to 2966, fix the typo in the
inline comment and log text: change "swith to read only" to "switch to
read-only" (and ensure consistent hyphenation/casing if used elsewhere in the
block). Update the comment and any adjacent log strings to the corrected wording
so the code reads clearly and consistently.
```diff
 }

-} // namespace EloqDS
\ No newline at end of file
+} // namespace EloqD
\ No newline at end of file
```
Namespace comment typo
Close comment says EloqD; should be EloqDS.
Apply:
```diff
-} // namespace EloqD
+} // namespace EloqDS
```
In data_store_service_client.cpp around line 4755, the namespace closing comment
reads "EloqD" but should be "EloqDS"; update the trailing comment to exactly
match the opened namespace name (EloqDS) so it reads "} // namespace EloqDS".
Summary by CodeRabbit
New Features
Chores
Note
Integrates txservice-to-DSS config translation and leader/follower lifecycle handling, expands DSS service startup/mode transitions, relaxes config status checks, and defers RocksDB worker pool init.

- Add `TxPort2DssPort` and `TxConfigsToDssClusterConfig` to translate txservice configs into DSS cluster config and update client topology.
- `PutAll`; minor lambda formatting.
- Extend `StartService(create_db_if_missing, dss_leader_node_id, dss_node_id)` and start the underlying DB only on leader nodes.
- Extend `ConnectAndStartDataStore` to support both ReadOnly and ReadWrite modes; add detailed logging.
- Add `OpenDataStore`/`CloseDataStore` to transition shard states.
- Make `Save` a no-op on an empty filename.
- Defer creation of `query_worker_pool_` until `Initialize()`; track `query_worker_number_` from config.

Written by Cursor Bugbot for commit 34b41ab. This will update automatically on new commits. Configure here.