Remove lock serv_mux_ from DataStoreService#68

Merged
lzxddz merged 4 commits into main from remove-mutex on Oct 14, 2025

Conversation

Collaborator

@lzxddz lzxddz commented Sep 7, 2025

Change DataStoreService so that each DataStoreService instance contains exactly one data shard.

Create the DataStore inside DataStoreService instead of in the eloqsql or eloqkv server.

Remove the lock serv_mux_ from DataStoreService; use the atomic shard_status to indicate whether the data_store is available.

Add write_req_cnt and read_req_cnt to DataStoreService to determine when the datastore mode can be changed to ReadOnly or Closed.

Related PRs:
eloqdata/eloqkv#140
eloqdata/eloqsql#107
eloqdata/eloqdoc#225

Summary by CodeRabbit

  • New Features

    • Optional DB auto-creation at startup for bootstrap/single-node mode.
    • Shard-aware routing for remote requests with per-node targeting.
  • Improvements

    • Simplified single-shard lifecycle with explicit Read/Write/Closed states and ownership checks.
    • Exposed write-request accounting for accurate tracking.
    • Thread pool now reports submission success.
    • Sharding messages include shard_version.
  • Bug Fixes

    • Requests fail fast with clear errors when work cannot be queued.
    • Missing shard status standardized to "Closed".

@lzxddz lzxddz self-assigned this Sep 7, 2025
@lzxddz lzxddz linked an issue Sep 7, 2025 that may be closed by this pull request

void IncreaseReadReqCount()
{
    read_req_cnt_++;
Collaborator

read_req_cnt_++ is equal to read_req_cnt_.fetch_add(1, std::memory_order_seq_cst).
Do we need the strongest memory order here? Or is std::memory_order_release enough?

Collaborator Author

I think std::memory_order_release is enough.
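As a minimal sketch of the change being discussed (illustrative, not the project's code), the increment can be spelled out with an explicit, weaker order instead of the seq_cst default of operator++:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

std::atomic<uint64_t> read_req_cnt_{0};

// operator++ on an atomic defaults to std::memory_order_seq_cst; writing
// fetch_add explicitly lets us pick a weaker order when full sequential
// consistency is not needed for a simple counter.
void IncreaseReadReqCount()
{
    read_req_cnt_.fetch_add(1, std::memory_order_release);
}

// Pairing acquire on the reader side with release on the writer side gives
// the counter's observer visibility of writes made before each increment.
uint64_t LoadReadReqCount()
{
    return read_req_cnt_.load(std::memory_order_acquire);
}
```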

if (shard_status_.load(std::memory_order_acquire) ==
    DSShardStatus::ReadOnly)
{
    cluster_manager_.SwitchShardToReadOnly(shard_id, expected);
Collaborator

The statuses of the data store service, the cluster_manager, and the data store should be kept in sync. How can we guarantee they stay in sync in a transactional manner?


 // shutdown all data_store
-if (!data_store_map_.empty())
+if (data_store_ != nullptr)
Contributor

how is data_store_ protected?

return true;
}

auto dss_shards = cluster_manager_.GetShardsForThisNode();
Contributor

The API should be updated. If each node only contains one shard, it should return a single uint64_t shard id.

Contributor

coderabbitai bot commented Sep 23, 2025

Walkthrough

Refactors the service from multi-shard maps to a single-shard model with a single data_store_, explicit shard_status_, and ownership checks; StartService now requires a boolean; RPC requests carry DataStoreService pointers for write-count accounting; ThreadWorkerPool::SubmitWork returns bool, and submission-failure paths propagate across datastore operations.

Changes

  • Service single-shard refactor — eloq_data_store_service/data_store_service.cpp, eloq_data_store_service/data_store_service.h: Replace the per-shard map with a single data_store_, shard_id_, shard_status_, and ongoing_write_requests_; add ownership helpers (IsOwnerOfShard, GetDataStore), Increase/DecreaseWriteReqCount, and new shard transition methods; change StartService to StartService(bool create_db_if_missing); remove the public Connect/Disconnect APIs.
  • Request lifecycle wiring — eloq_data_store_service/internal_request.h: Many Reset(...) methods now accept a DataStoreService* and store it in request objects; SetFinish/completion handlers call DecreaseWriteReqCount() to decrement service write counters.
  • Worker pool API change — eloq_data_store_service/thread_worker_pool.h, eloq_data_store_service/thread_worker_pool.cpp: SubmitWork changed from void to bool; callers now check submission acceptance (false when the pool is shutting down or rejects work).
  • RocksDB submission/error handling — eloq_data_store_service/rocksdb_data_store_common.cpp, eloq_data_store_service/rocksdb_data_store_common.h, eloq_data_store_service/rocksdb_cloud_data_store.cpp: Removed per-store write-counter APIs and ongoing_write_requests_; operations now inspect the SubmitWork return value and fail early with appropriate errors (e.g., DB_NOT_OPEN, CREATE_SNAPSHOT_ERROR) when submission is rejected.
  • Client shard/node routing & closures — data_store_service_client.cpp, data_store_service_client.h, data_store_service_client_closure.h: Replace global channel accessors with shard-aware per-node routing and a DssNode array; add shard helpers (GetShardIdByPartitionId, AllDataShardCount, GetOwnerNodeIndexOfShard, UpdateOwnerNodeIndexOfShard, FindFreeNodeIndex, UpgradeShardVersion, HandleShardingError); closures gain remote_node_index_ and SetRemoteNodeIndex for retry/ownership updates.
  • Service config / enum / proto changes — eloq_data_store_service/data_store_service_config.cpp, eloq_data_store_service/data_store_service_util.h, eloq_data_store_service/ds_request.proto: FetchDSShardStatus now returns Closed for missing shards; DSShardStatus becomes enum : uint8_t with Unknown removed and Starting=4 added; the KeyShardingChanged proto adds uint64 shard_version = 4 and moves new_cluster_config to tag 5.
  • Startup/main simplification — eloq_data_store_service/main.cpp: Removed the per-shard startup loop and per-shard Connect/Start; compute is_single_node and call StartService(bool) once; simplified startup/shutdown to a single-store flow.
  • API surface / headers — multiple headers & protos (see diffs): Public signature changes: StartService(bool); removed public Connect/Disconnect/ConnectAndStart; added DataStoreService helpers and closure setter methods; ThreadWorkerPool::SubmitWork now returns bool.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Main
  participant Service
  participant Pool
  participant DS
  Note over Main,Service: Startup (single-store)
  Main->>Service: StartService(create_db_if_missing)
  Service->>Service: set shard_id_ & shard_status_
  Service->>Pool: SubmitWork(init task)
  alt Submit accepted
    Pool-->>DS: run init task
    DS-->>Service: init result
    Service-->>Main: StartService OK
  else Submit rejected
    Service-->>Main: StartService fail (DB_NOT_OPEN)
  end
sequenceDiagram
  autonumber
  participant Client
  participant Service
  participant Pool
  participant DS
  Note over Client,Service: Write request flow
  Client->>Service: Request(partition/shard, payload)
  Service->>Service: IsOwnerOfShard / check shard_status_
  alt Not owner or Closed
    Service-->>Client: Error (non-owner / DB_NOT_OPEN)
  else Owned & Open
    Service->>Service: IncreaseWriteReqCount()
    Service->>Pool: SubmitWork(operation) returns bool
    alt Submit accepted
      Pool-->>DS: Execute operation
      DS-->>Service: Callback/finish
      Service->>Service: DecreaseWriteReqCount()
      Service-->>Client: Result
    else Submit rejected
      Service->>Service: DecreaseWriteReqCount()
      Service-->>Client: Error (DB_NOT_OPEN)
    end
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

Suggested reviewers

  • liunyl

Poem

In one snug burrow, one shard I tend,
No branching maps — a single friend.
I submit my work, accept or toss,
I count the hops, then fix what’s lost.
Closed or open, I tidy the nest and thump for tests. 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The title “Remove lock serv_mux_ from DataStoreService” accurately reflects a significant part of the changeset by emphasizing the removal of the serv_mux_ mutex, which is integral to the broader refactor introducing atomic shard state and single-shard hosting in DataStoreService.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch remove-mutex


Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
eloq_data_store_service/data_store_service.cpp (1)

411-432: Race condition vulnerability in write request handling

There's a potential race condition: the write count is incremented before checking shard status. If the status changes between lines 411 and 413, the count may never be decremented, leading to a deadlock during migration when waiting for ongoing_write_requests_ to reach zero.

Move the write count increment after the status check to prevent this race condition:

-    IncreaseWriteReqCount();
-
-    auto shard_status = shard_status_.load(std::memory_order_acquire);
+    auto shard_status = shard_status_.load(std::memory_order_acquire);
     if (shard_status != DSShardStatus::ReadWrite)
     {
-        DecreaseWriteReqCount();
         brpc::ClosureGuard done_guard(done);
         ::EloqDS::remote::CommonResult *result = response->mutable_result();
         if (shard_status == DSShardStatus::Closed)
         {
             PrepareShardingError(shard_id, result);
         }
         else
         {
             assert(shard_status == DSShardStatus::ReadOnly);
             result->set_error_code(
                 ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB);
             result->set_error_msg("Write to read-only DB.");
         }
         return;
     }
+
+    IncreaseWriteReqCount();

This same pattern appears in multiple places and should be fixed consistently.
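The suggested ordering can be sketched as a small, self-contained example (names mirror the PR but this is illustrative, not the project's code): the status is checked first, and the counter is touched only after the request is known to proceed, so a rejected request can never leave ongoing_write_requests_ elevated.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

enum class DSShardStatus : uint8_t { ReadWrite, ReadOnly, Closed };

std::atomic<DSShardStatus> shard_status_{DSShardStatus::ReadWrite};
std::atomic<uint64_t> ongoing_write_requests_{0};

// Check the shard status before counting the request: if the shard is not
// writable we fail fast and the counter is never incremented, so a later
// drain on ongoing_write_requests_ cannot wait forever on a request that
// was already rejected.
bool TryBeginWrite()
{
    if (shard_status_.load(std::memory_order_acquire) !=
        DSShardStatus::ReadWrite)
    {
        return false;
    }
    ongoing_write_requests_.fetch_add(1, std::memory_order_release);
    return true;
}

void EndWrite()
{
    ongoing_write_requests_.fetch_sub(1, std::memory_order_release);
}
```

Note that this trades one race for another: the status can still flip between the load and the fetch_add, so the drain logic must tolerate one late increment (e.g., by re-checking the status on the request path after incrementing).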

🧹 Nitpick comments (3)
eloq_data_store_service/main.cpp (1)

275-275: Initialization order consideration for is_single_node.

The is_single_node flag is initialized based on eloq_dss_peer_node.empty(), but eloq_dss_peer_node gets its value from command line or config later (lines 175-179). Ensure this initialization happens after the value is properly set.

-    bool is_single_node = eloq_dss_peer_node.empty();
+    // Move this after line 179 where eloq_dss_peer_node is fully initialized
+    bool is_single_node = eloq_dss_peer_node.empty();
eloq_data_store_service/rocksdb_data_store_common.cpp (1)

484-631: Consider extracting common error handling logic.

The error handling pattern for SubmitWork failures is repeated across multiple methods. Consider extracting it into a helper method to reduce code duplication:

+template<typename RequestType>
+void HandleSubmitFailure(RequestType* req, const char* operation_name) {
+    LOG(ERROR) << "Failed to submit " << operation_name << " work to query worker pool";
+    ::EloqDS::remote::CommonResult result;
+    result.set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN);
+    result.set_error_msg("DB is not opened");
+    req->SetFinish(result);
+    req->Clear();
+    req->Free();
+}

 void RocksDBDataStoreCommon::BatchWriteRecords(
     WriteRecordsRequest *batch_write_req)
 {
     // ... existing code ...
     
     if (!res)
     {
-        LOG(ERROR) << "Failed to submit batch write work to query worker pool";
-        ::EloqDS::remote::CommonResult result;
-        result.set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN);
-        result.set_error_msg("DB is not opened");
-        batch_write_req->SetFinish(result);
-        batch_write_req->Clear();
-        batch_write_req->Free();
-        return;
+        HandleSubmitFailure(batch_write_req, "batch write");
+        return;
     }
 }
eloq_data_store_service/data_store_service.cpp (1)

229-233: Use unique_ptr::reset() for clarity

Consider using reset() instead of explicit assignment to nullptr for better semantics.

-        data_store_->Shutdown();
-        data_store_ = nullptr;
+        data_store_->Shutdown();
+        data_store_.reset();
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 12155c6 and 1e78c2e.

📒 Files selected for processing (11)
  • eloq_data_store_service/data_store_service.cpp (40 hunks)
  • eloq_data_store_service/data_store_service.h (4 hunks)
  • eloq_data_store_service/data_store_service_config.cpp (1 hunks)
  • eloq_data_store_service/data_store_service_util.h (1 hunks)
  • eloq_data_store_service/internal_request.h (27 hunks)
  • eloq_data_store_service/main.cpp (2 hunks)
  • eloq_data_store_service/rocksdb_cloud_data_store.cpp (2 hunks)
  • eloq_data_store_service/rocksdb_data_store_common.cpp (12 hunks)
  • eloq_data_store_service/rocksdb_data_store_common.h (1 hunks)
  • eloq_data_store_service/thread_worker_pool.cpp (1 hunks)
  • eloq_data_store_service/thread_worker_pool.h (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-17T11:08:35.322Z
Learnt from: liunyl
PR: eloqdata/store_handler#83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.

Applied to files:

  • eloq_data_store_service/data_store_service.cpp
🧬 Code graph analysis (5)
eloq_data_store_service/thread_worker_pool.h (1)
eloq_data_store_service/thread_worker_pool.cpp (2)
  • SubmitWork (79-89)
  • SubmitWork (79-79)
eloq_data_store_service/data_store_service.cpp (5)
eloq_data_store_service/data_store_service_util.h (1)
  • shard_id_ (115-115)
eloq_data_store_service/rocksdb_data_store_common.cpp (13)
  • Read (409-470)
  • Read (409-409)
  • key (823-823)
  • FlushData (276-330)
  • FlushData (276-276)
  • start_key (360-360)
  • start_key (670-670)
  • end_key (375-375)
  • end_key (675-675)
  • CreateTable (634-643)
  • CreateTable (634-634)
  • DropTable (645-724)
  • DropTable (645-645)
eloq_data_store_service/data_store_service_config.cpp (6)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
  • IsOwnerOfShard (285-293)
  • IsOwnerOfShard (285-285)
  • IsOwnerOfShard (686-697)
  • IsOwnerOfShard (686-687)
data_store_service_client.h (1)
  • GetShardIdByPartitionId (625-628)
eloq_data_store_service/data_store_service.h (4)
  • IsOwnerOfShard (611-614)
  • IncreaseWriteReqCount (593-596)
  • DecreaseWriteReqCount (598-601)
  • GetDataStore (616-626)
eloq_data_store_service/data_store_service.h (3)
eloq_data_store_service/data_store_service.cpp (10)
  • StartService (236-284)
  • StartService (236-236)
  • ConnectAndStartDataStore (286-326)
  • ConnectAndStartDataStore (286-288)
  • SwitchReadWriteToReadOnly (2144-2178)
  • SwitchReadWriteToReadOnly (2144-2144)
  • SwitchReadOnlyToClosed (2180-2203)
  • SwitchReadOnlyToClosed (2180-2180)
  • SwitchReadOnlyToReadWrite (2205-2225)
  • SwitchReadOnlyToReadWrite (2205-2205)
eloq_data_store_service/data_store_service_config.cpp (6)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
  • IsOwnerOfShard (285-293)
  • IsOwnerOfShard (285-285)
  • IsOwnerOfShard (686-697)
  • IsOwnerOfShard (686-687)
eloq_data_store_service/data_store.h (1)
  • DataStore (42-144)
eloq_data_store_service/rocksdb_data_store_common.cpp (1)
eloq_data_store_service/data_store_service.cpp (16)
  • DeleteRange (484-530)
  • DeleteRange (484-488)
  • DeleteRange (532-584)
  • DeleteRange (532-538)
  • Read (328-359)
  • Read (328-331)
  • Read (361-394)
  • Read (361-368)
  • ScanNext (812-873)
  • ScanNext (812-825)
  • ScanNext (875-916)
  • ScanNext (875-878)
  • ScanClose (918-959)
  • ScanClose (918-921)
  • ScanClose (961-1000)
  • ScanClose (961-965)
eloq_data_store_service/internal_request.h (1)
data_store_service_client_closure.h (31)
  • Reset (81-85)
  • Reset (144-153)
  • Reset (210-216)
  • Reset (269-274)
  • Reset (383-390)
  • Reset (598-614)
  • Reset (914-928)
  • Reset (1125-1145)
  • Reset (1345-1359)
  • Reset (1541-1564)
  • Reset (1567-1596)
  • Reset (1598-1630)
  • Reset (1840-1871)
  • Reset (2195-2213)
  • Reset (2478-2484)
  • Reset (2875-2883)
  • string_view (311-366)
  • string_view (745-748)
  • string_view (755-758)
  • string_view (780-785)
  • string_view (1257-1260)
  • string_view (1482-1485)
  • string_view (2013-2016)
  • string_view (2018-2021)
  • string_view (2116-2121)
  • string_view (2330-2333)
  • vector (1056-1059)
  • vector (1093-1100)
  • vector (2128-2131)
  • string (792-797)
  • string (2048-2053)
🔇 Additional comments (22)
eloq_data_store_service/data_store_service_config.cpp (1)

191-199: Good change: Returning Closed for missing shards aligns with the single-store model.

The change from Unknown to Closed for missing shards is semantically clearer and aligns well with the new single-store ownership model.

eloq_data_store_service/thread_worker_pool.h (1)

42-42: LGTM: Good API improvement for error handling.

Changing SubmitWork to return bool enables proper error handling when the thread pool cannot accept work (e.g., during shutdown). This is a necessary improvement for the new error handling patterns throughout the codebase.

eloq_data_store_service/rocksdb_cloud_data_store.cpp (1)

633-699: Good error handling pattern for async submission failures.

The code correctly checks the return value of SubmitWork and handles submission failures by setting an appropriate error status. This prevents silent failures when the worker pool is shutting down.

eloq_data_store_service/thread_worker_pool.cpp (1)

79-89: LGTM: Clean implementation of submission result signaling.

The implementation correctly returns false when shutdown is active and true after successfully queuing work. This provides clear feedback to callers about submission success.
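A minimal sketch of this contract (not the project's ThreadWorkerPool; class and member names here are invented for illustration) shows why the bool return matters: once shutdown has begun, SubmitWork refuses work and the caller can fail the request instead of silently dropping it.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

class TinyPool
{
public:
    TinyPool() : worker_([this] { Run(); }) {}
    ~TinyPool()
    {
        Shutdown();
        worker_.join();
    }

    // Returns false once shutdown has begun, true after the task is queued,
    // mirroring the SubmitWork contract described above.
    bool SubmitWork(std::function<void()> task)
    {
        std::lock_guard<std::mutex> lk(mux_);
        if (shutdown_)
        {
            return false;  // caller should fail fast, e.g. with DB_NOT_OPEN
        }
        queue_.push(std::move(task));
        cv_.notify_one();
        return true;
    }

    void Shutdown()
    {
        std::lock_guard<std::mutex> lk(mux_);
        shutdown_ = true;
        cv_.notify_all();
    }

private:
    void Run()
    {
        std::unique_lock<std::mutex> lk(mux_);
        for (;;)
        {
            cv_.wait(lk, [this] { return shutdown_ || !queue_.empty(); });
            if (queue_.empty())
            {
                return;  // shutdown requested and queue fully drained
            }
            auto task = std::move(queue_.front());
            queue_.pop();
            lk.unlock();
            task();  // run outside the lock
            lk.lock();
        }
    }

    std::mutex mux_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> queue_;
    bool shutdown_{false};
    std::thread worker_;
};
```

In this sketch already-queued work is still drained after Shutdown(); only new submissions are rejected, which matches the "submission acceptance" semantics in the diff.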

eloq_data_store_service/rocksdb_data_store_common.h (1)

106-154: Good cleanup: Removing write counter management simplifies the concurrency model.

The removal of WaitForPendingWrites, IncreaseWriteCounter, DecreaseWriteCounter, and ongoing_write_requests_ aligns with the new single-store model and atomic shard status tracking. The simplified approach using submission results is cleaner than maintaining per-operation counters.

eloq_data_store_service/main.cpp (1)

318-326: Good: Simplified service startup with bootstrap flag.

The change to pass a boolean parameter to StartService based on bootstrap or single-node mode is cleaner than the previous per-shard initialization approach.

eloq_data_store_service/rocksdb_data_store_common.cpp (2)

278-330: Consistent error handling pattern for async operations.

The pattern of checking SubmitWork return value and handling failures with appropriate error codes is consistently applied. Good defensive programming.


1063-1084: No dangling-reference risk in SwitchToReadOnly(). Locally declared mutex, cond_var, and done remain in scope until after the lambda signals and the wait completes, so capturing them by reference here is safe.

eloq_data_store_service/data_store_service_util.h (1)

31-34: Resolved — removal of Unknown is safe.
All DSShardStatus instances are explicitly initialized or set via explicit remote->local mapping; no uninitialized/default-zero usage found. See: eloq_data_store_service/data_store_service_util.h:118 (status_{DSShardStatus::Closed}), eloq_data_store_service/data_store_service.h:668 (shard_status_{DSShardStatus::Closed}), eloq_data_store_service/data_store_service.cpp:1360–1374 (remote->local switch).

eloq_data_store_service/data_store_service.cpp (9)

236-261: Good architectural change to single-shard design

The StartService refactoring properly initializes a single data store per service instance and handles the optional database creation flag appropriately.


454-482: Apply the race condition fix consistently

The same race condition pattern exists in these functions. The write count should be incremented only after confirming shard_status is ReadWrite.

Also applies to: 499-530, 549-584


601-630: Apply the race condition fix to table operations

The same race condition exists in CreateTable and DropTable operations (both RPC and local versions). Apply the same fix pattern.

Also applies to: 644-672, 689-718, 732-760


779-810: Apply the race condition fix to BatchWriteRecords

Apply the same fix to prevent the write count race condition.


1076-1117: Apply the race condition fix to local BatchWriteRecords

Apply the same fix to prevent the write count race condition.


1135-1163: Apply the race condition fix to CreateSnapshotForBackup

Apply the same fix to prevent the write count race condition in both RPC and local versions of CreateSnapshotForBackup.

Also applies to: 1180-1210


2144-2178: Good implementation of state transition synchronization

The SwitchReadWriteToReadOnly implementation properly uses compare_exchange_strong and waits for ongoing write requests to complete before transitioning. The atomic operations ensure thread safety.

Verify that the busy-wait loop with 1ms sleep is acceptable for your latency requirements. Consider implementing an exponential backoff or using a condition variable for more efficient waiting if migrations happen frequently.
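A hypothetical sketch of this transition (names mirror the PR but the code is illustrative): flip the status with compare_exchange_strong, then drain in-flight writes, here with a capped exponential backoff instead of a fixed 1ms sleep.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <thread>

enum class DSShardStatus : uint8_t { ReadWrite, ReadOnly, Closed };

std::atomic<DSShardStatus> shard_status_{DSShardStatus::ReadWrite};
std::atomic<uint64_t> ongoing_write_requests_{0};

bool SwitchReadWriteToReadOnly()
{
    // The CAS ensures exactly one caller performs the transition.
    DSShardStatus expected = DSShardStatus::ReadWrite;
    if (!shard_status_.compare_exchange_strong(
            expected, DSShardStatus::ReadOnly, std::memory_order_acq_rel))
    {
        return false;  // shard was not in ReadWrite, someone else moved it
    }

    // Drain: back off exponentially up to ~10ms instead of a fixed sleep.
    auto backoff = std::chrono::microseconds(100);
    while (ongoing_write_requests_.load(std::memory_order_acquire) != 0)
    {
        std::this_thread::sleep_for(backoff);
        if (backoff < std::chrono::milliseconds(10))
        {
            backoff *= 2;
        }
    }
    return true;
}
```

A condition variable signaled by the last DecreaseWriteReqCount() would avoid polling entirely, at the cost of taking a mutex on every decrement.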


2168-2168: Status synchronization needs careful handling

The synchronization between data store service, cluster_manager, and data store statuses needs to be atomic or properly ordered to prevent inconsistencies. Consider using a two-phase commit pattern or ensuring strict ordering of updates.


320-326: Verify ConnectAndStartDataStore precondition: shard_status_ == Closed

rg shows the function (definition ~ eloq_data_store_service/data_store_service.cpp) and a caller at line ~1381, but I couldn't inspect all call sites. Confirm every call path guarantees shard_status_ == DSShardStatus::Closed before the hard assert at eloq_data_store_service/data_store_service.cpp:320-326; if not, remove/replace the assert with a runtime check (log + return false) or document the precondition.

eloq_data_store_service/data_store_service.h (2)

593-601: Memory ordering can be optimized for write counters

The memory ordering for write request counting can be relaxed. Use std::memory_order_relaxed for the increment since we don't need synchronization with other memory operations, only the atomic counter itself matters.

 void IncreaseWriteReqCount()
 {
-    ongoing_write_requests_.fetch_add(1, std::memory_order_release);
+    ongoing_write_requests_.fetch_add(1, std::memory_order_relaxed);
 }

 void DecreaseWriteReqCount()
 {
-    ongoing_write_requests_.fetch_sub(1, std::memory_order_release);
+    ongoing_write_requests_.fetch_sub(1, std::memory_order_relaxed);
 }

The acquire ordering in SwitchReadWriteToReadOnly when reading the counter is sufficient for synchronization.


658-669: Good architectural documentation

The detailed comment explaining the concurrency control strategy using atomic variables instead of mutexes is excellent. This makes the design intent clear for future maintainers.

eloq_data_store_service/internal_request.h (2)

83-92: LGTM: Consistent DataStoreService pointer integration

The changes properly wire the DataStoreService pointer through all request types to enable write count tracking. The implementation is consistent across RPC and local request variants.

Also applies to: 198-225


160-160: Verify SetFinish (and DecreaseWriteReqCount) on every path
For each RPC request obtained via rpc_*_req_pool_.NextObject() in data_store_service.cpp (e.g. FlushDataRpcRequest @ L435, DeleteRangeRpcRequest @ L525, CreateTableRpcRequest @ L625, DropTableRpcRequest @ L713, WriteRecordsRpcRequest @ L805, ScanRpcRequest, CreateSnapshotForBackupRpcRequest @ L1159), confirm that its SetFinish override in internal_request.h is invoked on both success and all error branches so DecreaseWriteReqCount() can never be skipped.
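One way to make the "decrement on every path" property structural rather than something each SetFinish override must remember is an RAII guard; the sketch below is hypothetical (the guard class is not in the PR) and only illustrates the idea.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

std::atomic<uint64_t> ongoing_write_requests_{0};

// Hypothetical guard: the constructor counts the request in and the
// destructor counts it out, so early returns and error branches cannot
// skip the decrement.
class WriteReqGuard
{
public:
    WriteReqGuard()
    {
        ongoing_write_requests_.fetch_add(1, std::memory_order_relaxed);
    }
    ~WriteReqGuard()
    {
        ongoing_write_requests_.fetch_sub(1, std::memory_order_release);
    }
    WriteReqGuard(const WriteReqGuard &) = delete;
    WriteReqGuard &operator=(const WriteReqGuard &) = delete;
};

uint64_t InFlight()
{
    return ongoing_write_requests_.load(std::memory_order_acquire);
}
```

A scope-bound guard does not directly fit request objects whose lifetime ends in an asynchronous SetFinish, but the same idea can be carried by the request object itself (count in Reset, count out in Free).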

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
eloq_data_store_service/data_store_service_util.h (1)

24-27: Add missing cstdint include for fixed-width integer types

This header uses uint8_t/uint16_t/uint64_t but does not include <cstdint>. Add it here to avoid relying on transitive includes.

 #include <string>
+#include <cstdint>
 #include <utility>
 #include <vector>
eloq_data_store_service/rocksdb_data_store_common.h (1)

24-38: Include headers for shared_mutex, mutex, memory, and string_view

This header declares members and methods that require these standard headers but does not include them. Relying on transitive includes is brittle and may break builds.

 #pragma once

 #include <rocksdb/compaction_filter.h>
 #include <rocksdb/db.h>
 #include <rocksdb/listener.h>
 #include <rocksdb/slice.h>

 #include <cassert>
 #include <chrono>
 #include <string>
+#include <string_view>
+#include <memory>
+#include <shared_mutex>
+#include <mutex>
eloq_data_store_service/data_store_service.cpp (1)

836-852: Fix: Scan must be allowed in ReadOnly; also fix incorrect PrepareShardingError arg

ScanNext/ScanClose currently reject ReadOnly with WRITE_TO_READ_ONLY_DB and also pass partition_id to PrepareShardingError (expects shard_id). This breaks read-only nodes and returns the wrong error on closed shards.

Apply these diffs:

@@ void DataStoreService::ScanNext(..., CommonResult *result, Closure *done)
-    auto shard_status = shard_status_.load(std::memory_order_acquire);
-    if (shard_status != DSShardStatus::ReadWrite)
-    {
-        brpc::ClosureGuard done_guard(done);
-        if (shard_status == DSShardStatus::Closed)
-        {
-            PrepareShardingError(partition_id, result);
-        }
-        else
-        {
-            assert(shard_status == DSShardStatus::ReadOnly);
-            result->set_error_code(
-                ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB);
-            result->set_error_msg("Write to read-only DB.");
-        }
-        return;
-    }
+    auto shard_status = shard_status_.load(std::memory_order_acquire);
+    if (shard_status == DSShardStatus::Closed)
+    {
+        brpc::ClosureGuard done_guard(done);
+        PrepareShardingError(shard_id, result);
+        return;
+    }
@@ void DataStoreService::ScanNext(RpcController*, const ScanRequest*, ScanResponse*, Closure*)
-    auto shard_status = shard_status_.load(std::memory_order_acquire);
-    if (shard_status != DSShardStatus::ReadWrite)
-    {
-        brpc::ClosureGuard done_guard(done);
-        if (shard_status == DSShardStatus::Closed)
-        {
-            PrepareShardingError(partition_id, response->mutable_result());
-        }
-        else
-        {
-            assert(shard_status == DSShardStatus::ReadOnly);
-            auto *result = response->mutable_result();
-            result->set_error_code(
-                ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB);
-            result->set_error_msg("Write to read-only DB.");
-        }
-        return;
-    }
+    auto shard_status = shard_status_.load(std::memory_order_acquire);
+    if (shard_status == DSShardStatus::Closed)
+    {
+        brpc::ClosureGuard done_guard(done);
+        PrepareShardingError(shard_id, response->mutable_result());
+        return;
+    }
@@ void DataStoreService::ScanClose(RpcController*, const ScanRequest*, ScanResponse*, Closure*)
-    auto shard_status = shard_status_.load(std::memory_order_acquire);
-    if (shard_status != DSShardStatus::ReadWrite)
-    {
-        brpc::ClosureGuard done_guard(done);
-        if (shard_status == DSShardStatus::Closed)
-        {
-            PrepareShardingError(partition_id, response->mutable_result());
-        }
-        else
-        {
-            assert(shard_status == DSShardStatus::ReadOnly);
-            auto *result = response->mutable_result();
-            result->set_error_code(
-                ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB);
-            result->set_error_msg("Write to read-only DB.");
-        }
-        return;
-    }
+    auto shard_status = shard_status_.load(std::memory_order_acquire);
+    if (shard_status == DSShardStatus::Closed)
+    {
+        brpc::ClosureGuard done_guard(done);
+        PrepareShardingError(shard_id, response->mutable_result());
+        return;
+    }
@@ void DataStoreService::ScanClose(const std::string_view, uint32_t, std::string*, CommonResult*, Closure*)
-    auto shard_status = shard_status_.load(std::memory_order_acquire);
-    if (shard_status != DSShardStatus::ReadWrite)
-    {
-        brpc::ClosureGuard done_guard(done);
-        if (shard_status == DSShardStatus::Closed)
-        {
-            PrepareShardingError(partition_id, result);
-        }
-        else
-        {
-            assert(shard_status == DSShardStatus::ReadOnly);
-            result->set_error_code(
-                ::EloqDS::remote::DataStoreError::WRITE_TO_READ_ONLY_DB);
-            result->set_error_msg("Write to read-only DB.");
-        }
-        return;
-    }
+    auto shard_status = shard_status_.load(std::memory_order_acquire);
+    if (shard_status == DSShardStatus::Closed)
+    {
+        brpc::ClosureGuard done_guard(done);
+        PrepareShardingError(shard_id, result);
+        return;
+    }

Also applies to: 969-992, 891-909, 934-951

🧹 Nitpick comments (4)
eloq_data_store_service/data_store_service.cpp (2)

1007-1051: Scan iterator helpers: assert shard_id in all helpers

Borrow/Return/Erase/ForceErase helpers ignore the shard_id parameter except for a couple asserts. For consistency, add asserts in Return/Erase/ForceErase too.

 void DataStoreService::ReturnScanIter(uint32_t shard_id, TTLWrapper *iter)
 {
-    scan_iter_cache_.Return(iter);
+    assert(shard_id == shard_id_);
+    scan_iter_cache_.Return(iter);
 }
@@
 void DataStoreService::EraseScanIter(uint32_t shard_id,
                                      const std::string &session_id)
 {
-    scan_iter_cache_.Erase(session_id);
+    assert(shard_id == shard_id_);
+    scan_iter_cache_.Erase(session_id);
 }
@@
 void DataStoreService::ForceEraseScanIters(uint32_t shard_id)
 {
-    scan_iter_cache_.ForceEraseIters();
+    assert(shard_id == shard_id_);
+    scan_iter_cache_.ForceEraseIters();
 }

328-359: Remove or implement read counters

The inline "decrease read req count when read done" comment in DataStoreService::Read lacks a matching IncreaseReadReqCount; either add the counter at entry or remove the comment (also apply to the Write handler at lines 361-394).

eloq_data_store_service/data_store_service.h (2)

575-580: Explicitly load atomic shard_status with acquire

Avoid implicit atomic-to-enum conversion (seq_cst). Use an explicit acquire load for clarity and sufficient ordering.

-        if (shard_id_ == shard_id)
-        {
-            return shard_status_;
-        }
+        if (shard_id_ == shard_id)
+        {
+            return shard_status_.load(std::memory_order_acquire);
+        }

593-601: Relax increment; keep release on decrement for the zero-observation handoff

Increment can be relaxed; release on decrement pairs with the acquire load in the waiter.

-        ongoing_write_requests_.fetch_add(1, std::memory_order_release);
+        ongoing_write_requests_.fetch_add(1, std::memory_order_relaxed);
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1e78c2e and b9e65f7.

📒 Files selected for processing (11)
  • eloq_data_store_service/data_store_service.cpp (40 hunks)
  • eloq_data_store_service/data_store_service.h (4 hunks)
  • eloq_data_store_service/data_store_service_config.cpp (1 hunks)
  • eloq_data_store_service/data_store_service_util.h (1 hunks)
  • eloq_data_store_service/internal_request.h (27 hunks)
  • eloq_data_store_service/main.cpp (2 hunks)
  • eloq_data_store_service/rocksdb_cloud_data_store.cpp (2 hunks)
  • eloq_data_store_service/rocksdb_data_store_common.cpp (12 hunks)
  • eloq_data_store_service/rocksdb_data_store_common.h (1 hunks)
  • eloq_data_store_service/thread_worker_pool.cpp (1 hunks)
  • eloq_data_store_service/thread_worker_pool.h (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • eloq_data_store_service/thread_worker_pool.h
  • eloq_data_store_service/rocksdb_cloud_data_store.cpp
  • eloq_data_store_service/internal_request.h
  • eloq_data_store_service/data_store_service_config.cpp
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-17T11:08:35.322Z
Learnt from: liunyl
PR: eloqdata/store_handler#83
File: data_store_service_client_closure.cpp:408-448
Timestamp: 2025-09-17T11:08:35.322Z
Learning: In the DataStoreServiceClient batch write system, the global coordinator (SyncPutAllData) manages concurrency at the partition level, not at the individual batch level. The global unfinished_request_cnt_ is only decremented when an entire partition is fully completed or failed, not after each individual batch within a partition.

Applied to files:

  • eloq_data_store_service/data_store_service.cpp
🧬 Code graph analysis (3)
eloq_data_store_service/rocksdb_data_store_common.cpp (1)
eloq_data_store_service/data_store_service.cpp (16)
  • DeleteRange (484-530)
  • DeleteRange (484-488)
  • DeleteRange (532-584)
  • DeleteRange (532-538)
  • Read (328-359)
  • Read (328-331)
  • Read (361-394)
  • Read (361-368)
  • ScanNext (812-873)
  • ScanNext (812-825)
  • ScanNext (875-916)
  • ScanNext (875-878)
  • ScanClose (918-959)
  • ScanClose (918-921)
  • ScanClose (961-1000)
  • ScanClose (961-965)
eloq_data_store_service/data_store_service.h (4)
eloq_data_store_service/data_store_service.cpp (10)
  • StartService (236-284)
  • StartService (236-236)
  • ConnectAndStartDataStore (286-326)
  • ConnectAndStartDataStore (286-288)
  • SwitchReadWriteToReadOnly (2144-2178)
  • SwitchReadWriteToReadOnly (2144-2144)
  • SwitchReadOnlyToClosed (2180-2203)
  • SwitchReadOnlyToClosed (2180-2180)
  • SwitchReadOnlyToReadWrite (2205-2225)
  • SwitchReadOnlyToReadWrite (2205-2205)
eloq_data_store_service/data_store_service_config.cpp (6)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
  • IsOwnerOfShard (285-293)
  • IsOwnerOfShard (285-285)
  • IsOwnerOfShard (686-697)
  • IsOwnerOfShard (686-687)
data_store_service_client.h (1)
  • GetShardIdByPartitionId (626-629)
eloq_data_store_service/data_store.h (1)
  • DataStore (42-143)
eloq_data_store_service/data_store_service.cpp (4)
eloq_data_store_service/data_store_service_util.h (1)
  • shard_id_ (115-115)
eloq_data_store_service/rocksdb_data_store_common.cpp (5)
  • Read (409-470)
  • Read (409-409)
  • key (823-823)
  • FlushData (276-330)
  • FlushData (276-276)
eloq_data_store_service/data_store_service_config.cpp (6)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
  • IsOwnerOfShard (285-293)
  • IsOwnerOfShard (285-285)
  • IsOwnerOfShard (686-697)
  • IsOwnerOfShard (686-687)
eloq_data_store_service/data_store_service.h (4)
  • IsOwnerOfShard (611-616)
  • IncreaseWriteReqCount (593-596)
  • DecreaseWriteReqCount (598-601)
  • GetDataStore (618-628)
🔇 Additional comments (12)
eloq_data_store_service/thread_worker_pool.cpp (1)

79-89: SubmitWork() return contract looks good

Return false when shutdown is active and true on successful enqueue is clear and safe. Callers correctly handle the bool in downstream changes.

eloq_data_store_service/rocksdb_data_store_common.h (1)

153-154: Ctor cleanup looks fine

Dropping ongoing_write_requests_ and related methods aligns with service-level accounting.

eloq_data_store_service/data_store_service.cpp (2)

229-234: Guard against null data_store_ in destructor; avoid double-shutdown

If shard_status_ is not Closed but data_store_ is null, this dereferences null. Also, Shutdown may be called elsewhere (migration/close), so make guard idempotent.

-    if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed)
-    {
-        data_store_->Shutdown();
-        data_store_ = nullptr;
-    }
+    if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed &&
+        data_store_)
+    {
+        data_store_->Shutdown();
+        data_store_ = nullptr;
+    }

A prior review also flagged idempotent Shutdown. Please ensure underlying DataStore::Shutdown() is idempotent as well.


243-261: CreateDataStore(true) performs Initialize() and StartDB()
Verified that all DataStoreFactory implementations (RocksDBDataStoreFactory, RocksDBCloudDataStoreFactory, EloqStoreDataStoreFactory) call ds->Initialize() and, when start_db==true, ds->StartDB(). No further initialization calls are needed.

eloq_data_store_service/data_store_service_util.h (1)

31-41: No remaining references to DSShardStatus::Unknown

Ran rg -nP --type=cpp '\bDSShardStatus\s*::\s*Unknown\b'; no occurrences found.

eloq_data_store_service/main.cpp (1)

279-325: Auto-initialization covered by CreateDataStore
CreateDataStore(bool…, bool start_db=true) and both calls pass true, so the DB is opened/initialized automatically.

eloq_data_store_service/rocksdb_data_store_common.cpp (1)

276-330: Verified counter decrement on enqueue failure
All FlushDataRequest::SetFinish implementations (RPC and local) invoke DecreaseWriteReqCount() before Clear/Free in the SubmitWork failure path, ensuring no write‐request count leak.

eloq_data_store_service/data_store_service.h (5)

603-609: Single-shard stub for partition→shard mapping — document or restore mapping

This always returns 0. If intentional for single-shard, add a clear comment/TODO. Otherwise, call cluster_manager_.GetShardIdByPartitionId.


652-652: Removal of serv_mux_ aligns with new atomic-based concurrency design

LGTM; matches PR goal to remove the lock.


630-633: ConnectAndStartDataStore private usage verified Only internal calls within DataStoreService implementation remain.


190-190: StartService signature change verified main.cpp now passes the new boolean flag and checks the return value to abort startup on failure.


674-674: TTL check worker lifecycle verified TTLWrapperCache starts the worker in its constructor (creates ThreadWorkerPool and submits TTLCheckWorker) and the destructor clears ttl_check_running_, notifies the CV and shuts down the worker pool — lifecycle is covered.

Comment on lines +2144 to 2178
 bool DataStoreService::SwitchReadWriteToReadOnly(uint32_t shard_id)
 {
-    if (!cluster_manager_.SwitchShardToReadOnly(shard_id, expected))
+    if (!IsOwnerOfShard(shard_id))
     {
         return false;
     }
-
-    // Wait for all started write requests to finish before opening db on
-    // target node
-    std::shared_lock<std::shared_mutex> lk(serv_mux_);
-    data_store_map_[shard_id]->SwitchToReadOnly();
-
-    return true;
+
+    DSShardStatus expected = DSShardStatus::ReadWrite;
+    if (!shard_status_.compare_exchange_strong(expected,
+                                               DSShardStatus::ReadOnly) &&
+        expected != DSShardStatus::ReadOnly)
+    {
+        DLOG(ERROR) << "SwitchReadWriteToReadOnly failed, shard status is not "
+                       "ReadWrite or ReadOnly";
+        return false;
+    }
+    // wait for all write requests to finish
+    while (ongoing_write_requests_.load(std::memory_order_acquire) > 0)
+    {
+        bthread_usleep(1000);
+    }
+    if (shard_status_.load(std::memory_order_acquire) ==
+        DSShardStatus::ReadOnly)
+    {
+        cluster_manager_.SwitchShardToReadOnly(shard_id, expected);
+        data_store_->SwitchToReadOnly();
+        return true;
+    }
+    else
+    {
+        DLOG(ERROR) << "Switch data store to ReadOnly failed for shard "
+                       "status never be ReadOnly";
+        return false;
+    }
 }
Contributor

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

Wait only for writes when switching to ReadOnly; ensure reads drain before closing

SwitchReadWriteToReadOnly drains writes, which is correct. However, SwitchReadOnlyToClosed immediately calls data_store_->Shutdown() without ensuring reads have drained. That risks shutting down the DB while read tasks still hold db_mux_ shared locks.

  • Ensure read requests are tracked (IncreaseReadReqCount/DecreaseReadReqCount) and wait for them to reach zero before Shutdown in SwitchReadOnlyToClosed.
  • Alternatively, make Shutdown acquire an exclusive lock and wait until no readers are active.

Please confirm read counters exist and are wired in read paths:

Also applies to: 2180-2203


🏁 Script executed:

#!/bin/bash
rg -nP --type=cpp -C3 '(IncreaseReadReqCount|DecreaseReadReqCount|ongoing_read_requests_)'

Length of output: 0


🏁 Script executed:

#!/bin/bash
rg -nP --type=cpp -C5 'SwitchReadOnlyToClosed'

Length of output: 3822


Drain active reads before Shutdown in SwitchReadOnlyToClosed
SwitchReadOnlyToClosed (lines 2180–2203) calls data_store_->Shutdown() immediately after marking the shard Closed, without waiting for in‐flight reads to finish. Track active read requests (e.g. IncreaseReadReqCount/DecreaseReadReqCount) and block until the counter is zero—or have Shutdown acquire an exclusive db_mux_ lock to wait for all shared readers to release—before calling Shutdown.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
eloq_data_store_service/data_store_service.h (1)

28-34: Add the missing <atomic> include.

This header uses std::atomic but doesn’t include <atomic>. Depending on transitive includes is brittle and can break compilation as soon as upstream headers change. Please include it explicitly.

 #include <bthread/mutex.h>

 #include <chrono>
+#include <atomic>
 #include <memory>
♻️ Duplicate comments (2)
eloq_data_store_service/data_store_service.cpp (2)

229-233: Verify data_store_->Shutdown() idempotency.

The destructor now calls data_store_->Shutdown() on the single data store instance. As noted in previous review comments, ensure that Shutdown() is safe to call multiple times, as concurrent state transitions (e.g., in SwitchReadOnlyToClosed) may also invoke it.


2194-2217: Drain active reads before Shutdown in SwitchReadOnlyToClosed.

As noted in previous review comments, SwitchReadOnlyToClosed calls data_store_->Shutdown() (line 2214) immediately after transitioning to Closed, without waiting for in-flight read requests to complete. This risks shutting down the database while read tasks still hold db_mux_ shared locks.

Fix: Track active read requests (e.g., ongoing_read_requests_ counter similar to ongoing_write_requests_) and wait for them to drain before calling Shutdown().

🧹 Nitpick comments (4)
eloq_data_store_service/data_store_service_config.cpp (1)

868-868: Consider using an enum-to-string helper for better log readability.

Logging static_cast<int>(shard_status) reduces readability compared to printing the enum name. Consider adding a helper function to convert DSShardStatus to a string for clearer diagnostics.

eloq_data_store_service/data_store_service.cpp (2)

320-325: Consider using condition variable instead of spin-wait.

The busy-wait loop (lines 320-325) polling shard_status_ every 10ms is inefficient. Consider using a condition variable to block until the state transition completes, reducing CPU usage during concurrent initialization attempts.


455-484: Consider RAII guard for write request counting.

Write handlers manually call IncreaseWriteReqCount() and DecreaseWriteReqCount() (lines 455, 460, etc.). Consider using an RAII guard class to ensure the counter is always decremented, even if early returns or exceptions occur:

class WriteReqGuard {
  DataStoreService* svc_;
public:
  explicit WriteReqGuard(DataStoreService* svc) : svc_(svc) { svc_->IncreaseWriteReqCount(); }
  ~WriteReqGuard() { svc_->DecreaseWriteReqCount(); }
};

This pattern prevents counter leaks and simplifies error handling.

eloq_data_store_service/data_store_service_util.h (1)

34-34: Remove commented-out Unknown enumerator
In eloq_data_store_service/data_store_service_util.h, delete the commented lines

    // Node status is unknown, can be dead or alive, need to check
    // Unknown = 0,

—they’re no longer referenced and only clutter the enum definition.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b9e65f7 and 57d1ffc.

📒 Files selected for processing (8)
  • data_store_service_client.cpp (15 hunks)
  • data_store_service_client.h (4 hunks)
  • data_store_service_client_closure.h (34 hunks)
  • eloq_data_store_service/data_store_service.cpp (37 hunks)
  • eloq_data_store_service/data_store_service.h (4 hunks)
  • eloq_data_store_service/data_store_service_config.cpp (4 hunks)
  • eloq_data_store_service/data_store_service_util.h (1 hunks)
  • eloq_data_store_service/ds_request.proto (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
data_store_service_client.cpp (2)
data_store_service_client.h (1)
  • AllDataShardCount (635-647)
eloq_data_store_service/data_store_service_config.cpp (4)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
  • HandleShardingError (947-989)
  • HandleShardingError (947-948)
eloq_data_store_service/data_store_service.h (4)
eloq_data_store_service/data_store_service.cpp (10)
  • StartService (236-300)
  • StartService (236-236)
  • ConnectAndStartDataStore (302-366)
  • ConnectAndStartDataStore (302-304)
  • SwitchReadWriteToReadOnly (2158-2192)
  • SwitchReadWriteToReadOnly (2158-2158)
  • SwitchReadOnlyToClosed (2194-2217)
  • SwitchReadOnlyToClosed (2194-2194)
  • SwitchReadOnlyToReadWrite (2219-2241)
  • SwitchReadOnlyToReadWrite (2219-2219)
eloq_data_store_service/data_store_service_config.cpp (6)
  • IsOwnerOfShard (285-293)
  • IsOwnerOfShard (285-285)
  • IsOwnerOfShard (686-697)
  • IsOwnerOfShard (686-687)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
data_store_service_client.cpp (2)
  • GetShardIdByPartitionId (2943-2948)
  • GetShardIdByPartitionId (2943-2944)
eloq_data_store_service/data_store.h (1)
  • DataStore (42-143)
data_store_service_client.h (2)
data_store_service_client.cpp (18)
  • FindFreeNodeIndex (3015-3034)
  • FindFreeNodeIndex (3015-3015)
  • IsLocalShard (2918-2926)
  • IsLocalShard (2918-2918)
  • IsLocalPartition (2938-2941)
  • IsLocalPartition (2938-2938)
  • GetShardIdByPartitionId (2943-2948)
  • GetShardIdByPartitionId (2943-2944)
  • AllDataShardCount (2950-2953)
  • AllDataShardCount (2950-2950)
  • GetOwnerNodeIndexOfShard (2955-2960)
  • GetOwnerNodeIndexOfShard (2955-2956)
  • UpdateOwnerNodeIndexOfShard (2962-3013)
  • UpdateOwnerNodeIndexOfShard (2962-2963)
  • HandleShardingError (3036-3070)
  • HandleShardingError (3036-3037)
  • UpgradeShardVersion (3072-3120)
  • UpgradeShardVersion (3072-3075)
eloq_data_store_service/data_store_service_config.cpp (4)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
  • HandleShardingError (947-989)
  • HandleShardingError (947-948)
eloq_data_store_service/data_store_service.cpp (4)
eloq_data_store_service/data_store_service_util.h (1)
  • shard_id_ (118-118)
eloq_data_store_service/rocksdb_data_store_common.cpp (9)
  • Read (409-470)
  • Read (409-409)
  • key (823-823)
  • FlushData (276-330)
  • FlushData (276-276)
  • CreateTable (634-643)
  • CreateTable (634-634)
  • DropTable (645-724)
  • DropTable (645-645)
eloq_data_store_service/data_store_service_config.cpp (6)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
  • IsOwnerOfShard (285-293)
  • IsOwnerOfShard (285-285)
  • IsOwnerOfShard (686-697)
  • IsOwnerOfShard (686-687)
eloq_data_store_service/data_store_service.h (4)
  • IsOwnerOfShard (603-608)
  • IncreaseWriteReqCount (593-596)
  • DecreaseWriteReqCount (598-601)
  • GetDataStore (619-629)
data_store_service_client_closure.h (1)
eloq_data_store_service/internal_request.h (5)
  • partition_id_ (601-601)
  • partition_id_ (807-807)
  • partition_id_ (1400-1400)
  • parts_cnt_per_key_ (309-309)
  • parts_cnt_per_record_ (310-310)
🔇 Additional comments (9)
eloq_data_store_service/data_store_service_util.h (2)

31-31: LGTM: Explicit underlying type improves atomic operations.

The explicit uint8_t underlying type ensures portability and proper sizing for the atomic shard_status_ field used throughout the refactored single-shard model.


41-42: LGTM: Starting state clarifies initialization flow.

The new Starting = 4 state explicitly models the transient phase during data store initialization, improving state tracking in the single-shard model (used in ConnectAndStartDataStore and StartService).

eloq_data_store_service/data_store_service_config.cpp (2)

198-198: LGTM: Returning Closed for unknown shards is semantically correct.

Returning DSShardStatus::Closed when a shard is not found aligns with the removal of the Unknown state and correctly represents that the shard is not owned by this node.


922-922: LGTM: Shard version propagation enables client-side version tracking.

Adding shard_version to both cluster config broadcasts (line 922) and primary-node-changed error messages (lines 936, 943) enables clients to track shard versions for consistency checks.

Note: This implementation depends on the proto field addition at line 82 of ds_request.proto, which currently has a breaking compatibility issue (see comment on that file).

Also applies to: 936-936, 943-943

eloq_data_store_service/data_store_service.cpp (3)

243-277: LGTM: Single-shard initialization with proper atomic state management.

The refactored StartService correctly:

  • Enforces the single-shard-per-node constraint with an assertion (line 244)
  • Uses atomic compare-exchange to transition Closed → Starting → ReadOnly/ReadWrite (lines 255-256, 271)
  • Initializes the data store conditionally based on cluster config status

The assertion at line 244 will catch configuration errors during development/testing.


373-401: LGTM: Consistent ownership and state checking pattern.

The Read handler (and others following the same pattern) correctly:

  1. Resolves partition → shard (line 374)
  2. Checks ownership via IsOwnerOfShard() (line 376)
  3. Checks shard open status via atomic shard_status_ (lines 384-387)
  4. Asserts data_store_ != nullptr before use (line 395)

This layered checking prevents operations on non-owned or closed shards.


1884-1904: LGTM: Migration flow correctly uses refactored state transitions.

The migration logic properly:

  • Waits for data store availability (lines 1884-1892)
  • Transitions ReadWrite → ReadOnly before snapshot transfer (line 1904)
  • Transitions ReadOnly → Closed after target node takes over (line 2031)

This sequencing ensures safe handoff during shard migration.

Also applies to: 2031-2031

data_store_service_client_closure.h (2)

614-614: LGTM: Consistent remote node index tracking across closures.

The addition of remote_node_index_ with:

  • Sentinel value UINT32_MAX initialization (line 614, 876)
  • Reset on local requests (line 642)
  • Public setter SetRemoteNodeIndex() (lines 861-864)

enables proper shard-aware retry logic and aligns with the client-side per-node channel management introduced in this PR.

Also applies to: 861-864, 876-876


680-686: LGTM: Retry logic correctly updates shard ownership.

The refactored retry logic (lines 680-686):

  1. Resolves partition → shard via GetShardIdByPartitionId()
  2. Updates owner node index via UpdateOwnerNodeIndexOfShard()
  3. Retries the request with updated routing

This pattern correctly handles shard ownership changes and is consistently applied across all closure types.

Comment on lines +82 to +94
dss_topology_version_ = cluster_manager.GetTopologyVersion();
auto all_shards = cluster_manager.GetAllShards();
assert(all_shards.size() == 1);
for (auto &[shard_id, shard] : all_shards)
{
uint32_t node_idx = FindFreeNodeIndex();
auto &node_ref = dss_nodes_[node_idx];
node_ref.Reset(shard.nodes_[0].host_name_,
shard.nodes_[0].port_,
shard.version_);
dss_shards_[shard_id].store(shard_id);
}

Contributor

⚠️ Potential issue | 🔴 Critical

Fix out-of-bounds access on dss_shards_.

dss_shards_ now has size 1, but we still subscript it with shard_id. Any non-zero shard id (perfectly valid even in a single-shard deployment) will index past the end and corrupt memory. Please store the owner slot at index 0 (and assert shard.shard_id_ == 0 if that’s the intended invariant), or switch back to a container keyed by the actual shard id. Keeping the current code is undefined behaviour.

🤖 Prompt for AI Agents
data_store_service_client.h around lines 82-94: the code subscripts dss_shards_
with shard_id while dss_shards_ has fixed size 1, causing out-of-bounds access
for non-zero shard ids. Fix by either (A) if the design guarantees a single
shard, assert that the shard's id is 0 and store the owner into dss_shards_[0]
instead of dss_shards_[shard_id]; or (B) if multiple shard ids must be
supported, replace dss_shards_ with a container keyed by shard_id (e.g., a map
or vector sized to max shard id) and insert/store using the actual shard_id.
Implement one of these two fixes and add an assertion or bounds-check to prevent
future UB.

Comment on lines +619 to +629
DataStore *GetDataStore(uint32_t shard_id)
{
if (shard_id_ == shard_id)
{
return data_store_.get();
}
else
{
return nullptr;
}
}
Contributor

⚠️ Potential issue | 🟠 Major

Don’t expose data_store_ once the shard is Closed/Starting.

After the lock removal, callers can grab this raw pointer while the shard is transitioning. If SwitchReadOnlyToClosed wins the CAS and calls Shutdown(), any caller that fetched the pointer just before the status change will continue using a store that is closing. Please gate the return on shard_status_ (Load Acquire) and refuse to hand out the pointer when the status is not ReadOnly/ReadWrite.

     DataStore *GetDataStore(uint32_t shard_id)
     {
-        if (shard_id_ == shard_id)
-        {
-            return data_store_.get();
-        }
-        else
-        {
-            return nullptr;
-        }
+        if (shard_id_ == shard_id)
+        {
+            auto status = shard_status_.load(std::memory_order_acquire);
+            if (status == DSShardStatus::ReadOnly ||
+                status == DSShardStatus::ReadWrite)
+            {
+                return data_store_.get();
+            }
+        }
+        return nullptr;
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-DataStore *GetDataStore(uint32_t shard_id)
-{
-    if (shard_id_ == shard_id)
-    {
-        return data_store_.get();
-    }
-    else
-    {
-        return nullptr;
-    }
-}
+DataStore *GetDataStore(uint32_t shard_id)
+{
+    if (shard_id_ == shard_id)
+    {
+        auto status = shard_status_.load(std::memory_order_acquire);
+        if (status == DSShardStatus::ReadOnly ||
+            status == DSShardStatus::ReadWrite)
+        {
+            return data_store_.get();
+        }
+    }
+    return nullptr;
+}
🤖 Prompt for AI Agents
In eloq_data_store_service/data_store_service.h around lines 619 to 629, the
GetDataStore method currently returns data_store_ solely based on shard_id_,
which can hand out the raw pointer while the shard is transitioning; change it
to also load shard_status_ with std::memory_order_acquire and only return
data_store_.get() when shard_status_ is either ReadOnly or ReadWrite (otherwise
return nullptr). Keep the existing shard_id_ check, perform the shard_status_
check after validating the id, and use the atomic load acquire semantics to
prevent racing with SwitchReadOnlyToClosed/Shutdown.

Comment on lines +661 to 673
// Now, there is only one data store shard in a DataStoreService.
// To avoid using mutex in read or write APIs, use an atomic variable
// (shard_status_) to control concurrency conflicts.
// - During migration, we change the shard_status_ first, then change the
// data_store_ after all read/write requests are finished.
// - In write functions, we increase the ongoing_write_requests_ first and
// then check the shard_status_. After the request is executed, or if the
// shard_status_ check fails, decrease it.
uint32_t shard_id_{UINT32_MAX};
std::unique_ptr<DataStore> data_store_{nullptr};
std::atomic<DSShardStatus> shard_status_{DSShardStatus::Closed};
std::atomic<uint64_t> ongoing_write_requests_{0};

Contributor

⚠️ Potential issue | 🔴 Critical

We still need to track in-flight reads before shutdown.

Removing serv_mux_ means read RPCs now run without any mutual exclusion, yet we only count writes. When SwitchReadOnlyToClosed flips the status and calls data_store_->Shutdown(), ongoing reads in ReadOnly mode can still be using that pointer and will race with the shutdown path, violating the “after all read/write requests are finished” guarantee and risking crashes or corrupted responses. We need a symmetric read counter (increment after allowing the read, decrement in the finally path) and SwitchReadOnlyToClosed must wait until both read and write counters reach zero before shutting the store down.

Comment on lines +82 to +83
uint64 shard_version = 4;
DSSClusterConfig new_cluster_config = 5;
Contributor

⚠️ Potential issue | 🔴 Critical

Critical: Proto field renumbering breaks wire compatibility.

Moving new_cluster_config from field number 4 to 5 and assigning 4 to the new shard_version field breaks backward compatibility with existing clients. In protobuf, field numbers are part of the wire format.

Fix: Assign shard_version a new field number (e.g., 6) without renumbering existing fields:

   DSSNodeBuf new_primary_node = 2;
   uint32 shard_id = 3;
+  DSSClusterConfig new_cluster_config = 4;
+  uint64 shard_version = 5;
-  uint64 shard_version = 4;
-  DSSClusterConfig new_cluster_config = 5;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
   DSSNodeBuf new_primary_node = 2;
   uint32 shard_id = 3;
-  uint64 shard_version = 4;
-  DSSClusterConfig new_cluster_config = 5;
+  DSSClusterConfig new_cluster_config = 4;
+  uint64 shard_version = 5;
🤖 Prompt for AI Agents
In eloq_data_store_service/ds_request.proto around lines 82 to 83, the proto
field numbers were renumbered which breaks wire compatibility; restore the
original numbers by leaving DSSClusterConfig as field 5 (do not change its
number) and assign shard_version a new unused field number (e.g., 6) instead of
4, then update any server/client code or generated stubs that reference the new
field number to use the new number 6 so existing clients remain compatible.

Collaborator

@githubzilla githubzilla left a comment

+1

lzxddz and others added 3 commits October 13, 2025 17:01
Change DataStoreService: only contain one data shard in one
DataStoreService.

Create DataStore in DataStoreService instead in eloqsql or eloqkv server.

remove ongoing_write_requests_ from RocksDBDataStoreCommon
* Update cluster config in client

* fix datastore migration error
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
eloq_data_store_service/rocksdb_cloud_data_store.cpp (3)

166-193: Deadlock risk and null-derefs in Shutdown; reorder and guard pointers

Holding db_mux_ unique_lock while calling query_worker_pool_->Shutdown() can deadlock if in‑flight tasks are blocked on db_mux_ shared lock. Also, query_worker_pool_ and data_store_service_ are dereferenced unconditionally.

Apply this fix: drain the pool first, then take the exclusive DB lock; guard all pointers.

 void RocksDBCloudDataStore::Shutdown()
 {
-    std::unique_lock<std::shared_mutex> db_lk(db_mux_);
-
-    // shutdown query worker pool
-    query_worker_pool_->Shutdown();
-    query_worker_pool_ = nullptr;
-
-    data_store_service_->ForceEraseScanIters(shard_id_);
-    data_store_service_ = nullptr;
+    // 1) Stop accepting new work and drain in-flight tasks
+    if (query_worker_pool_ != nullptr)
+    {
+        query_worker_pool_->Shutdown();
+        query_worker_pool_ = nullptr;
+    }
+
+    // 2) Exclusive DB lock after workers are drained to avoid deadlocks
+    std::unique_lock<std::shared_mutex> db_lk(db_mux_);
+
+    if (data_store_service_ != nullptr)
+    {
+        data_store_service_->ForceEraseScanIters(shard_id_);
+        data_store_service_ = nullptr;
+    }
 
     if (db_ != nullptr)
     {
         DLOG(INFO) << "RocksDBCloudDataStore Shutdown, db->Close()";
         db_->Close();
         DLOG(INFO) << "RocksDBCloudDataStore Shutdown, delete db_";
         delete db_;
         DLOG(INFO) << "RocksDBCloudDataStore Shutdown, db_ = nullptr";
         db_ = nullptr;
         DLOG(INFO) << "RocksDBCloudDataStore Shutdown, ttl_compaction_filter_ "
                       "= nullptr";
         ttl_compaction_filter_ = nullptr;
         DLOG(INFO) << "RocksDBCloudDataStore Shutdown, cloud_env_ = nullptr";
         cloud_env_ = nullptr;
         DLOG(INFO) << "RocksDBCloudDataStore Shutdown, cloud_fs_ = nullptr";
         cloud_fs_ = nullptr;
     }
 }

157-164: Make destructor Shutdown path safe

Shutdown unconditionally dereferences members. Call Shutdown without relying on the loose OR condition, or ensure Shutdown has null-guards (see above). At minimum, guard the call.

-RocksDBCloudDataStore::~RocksDBCloudDataStore()
-{
-    if (query_worker_pool_ != nullptr || data_store_service_ != nullptr ||
-        db_ != nullptr)
-    {
-        Shutdown();
-    }
-}
+RocksDBCloudDataStore::~RocksDBCloudDataStore()
+{
+    Shutdown(); // idempotent with null checks inside
+}

247-251: HTTPS verifySSL is disabled

verifySSL=false undermines TLS. Gate it behind a config flag and keep verification enabled by default.

♻️ Duplicate comments (4)
eloq_data_store_service/ds_request.proto (1)

82-84: Critical: field renumbering breaks wire compatibility (same as prior review)

Keep existing tag numbers; add shard_version on a new tag instead of reusing 4.

 message KeyShardingChanged {
   KeyShardingErrorType type = 1;
   DSSNodeBuf new_primary_node = 2;
   uint32 shard_id = 3;
-  uint64 shard_version = 4;
-  DSSClusterConfig new_cluster_config = 5;
+  DSSClusterConfig new_cluster_config = 4;
+  uint64 shard_version = 5; // or the next free tag
 }
eloq_data_store_service/data_store_service.cpp (2)

2194-2217: Drain active reads before Shutdown, or make Shutdown wait for readers

SwitchReadOnlyToClosed immediately shuts down after marking Closed. Track read requests and wait for zero, or guarantee the datastore Shutdown blocks until all shared readers release (without deadlock).

If read counters exist, increment/decrement around Read/Scan and wait here; otherwise, ensure datastore Shutdown strategy handles reader drain safely.
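One way to realize the increment/decrement-around-Read/Scan suggestion is an RAII guard, so every exit path (including early returns and exceptions) decrements the counter. This is an illustrative sketch — ReadReqGuard is not in the PR; the counter name mirrors the existing ongoing_write_requests_ convention:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// RAII guard for an in-flight read counter: increments on entry to a
// Read/Scan path and decrements on every exit, so a shutdown path can
// spin-wait for the count to drain to zero before closing the store.
class ReadReqGuard
{
public:
    explicit ReadReqGuard(std::atomic<uint64_t> &cnt) : cnt_(cnt)
    {
        cnt_.fetch_add(1, std::memory_order_acq_rel);
    }
    ~ReadReqGuard()
    {
        cnt_.fetch_sub(1, std::memory_order_acq_rel);
    }
    // Non-copyable: one guard per in-flight request.
    ReadReqGuard(const ReadReqGuard &) = delete;
    ReadReqGuard &operator=(const ReadReqGuard &) = delete;

private:
    std::atomic<uint64_t> &cnt_;
};
```

A Read handler would construct the guard as its first statement; the shutdown path then loops on `cnt.load(std::memory_order_acquire) == 0` before calling Shutdown().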


228-234: Guard data_store_ in destructor

Status may be Closed or Starting; add a null check to avoid potential deref if data_store_ wasn’t created.

-    if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed)
-    {
-        data_store_->Shutdown();
-        data_store_ = nullptr;
-    }
+    if (shard_status_.load(std::memory_order_acquire) != DSShardStatus::Closed &&
+        data_store_ != nullptr)
+    {
+        data_store_->Shutdown();
+        data_store_ = nullptr;
+    }
eloq_data_store_service/data_store_service.h (1)

619-629: Don’t hand out data_store_ during Closed/Starting transitions

Return the raw pointer only when status is ReadOnly/ReadWrite; else return nullptr to avoid races with Shutdown/Start.

Apply this diff:

-        if (shard_id_ == shard_id)
-        {
-            return data_store_.get();
-        }
-        else
-        {
-            return nullptr;
-        }
+        if (shard_id_ == shard_id)
+        {
+            auto s = shard_status_.load(std::memory_order_acquire);
+            if (s == DSShardStatus::ReadOnly || s == DSShardStatus::ReadWrite)
+            {
+                return data_store_.get();
+            }
+        }
+        return nullptr;
🧹 Nitpick comments (7)
eloq_data_store_service/thread_worker_pool.h (1)

42-42: Enforce handling of SubmitWork’s return value

Mark SubmitWork as nodiscard so call sites can’t accidentally ignore the boolean. Safe in C++17+.

-    bool SubmitWork(std::function<void()> work);
+    [[nodiscard]] bool SubmitWork(std::function<void()> work);
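Once SubmitWork is [[nodiscard]], call sites have to consume the result. A minimal sketch of the intended contract, using a hypothetical MiniPool stand-in rather than the real ThreadWorkerPool:

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Toy pool illustrating the [[nodiscard]] contract: SubmitWork refuses
// work after shutdown, and the compiler warns if a caller drops the bool.
class MiniPool
{
public:
    [[nodiscard]] bool SubmitWork(std::function<void()> work)
    {
        std::lock_guard<std::mutex> lk(mux_);
        if (shutdown_)
        {
            return false;  // caller must fail the request, not silently drop it
        }
        queue_.push_back(std::move(work));
        return true;
    }
    void Shutdown()
    {
        std::lock_guard<std::mutex> lk(mux_);
        shutdown_ = true;
    }
    void Drain()  // run queued work inline (stand-in for worker threads)
    {
        std::deque<std::function<void()>> q;
        {
            std::lock_guard<std::mutex> lk(mux_);
            q.swap(queue_);
        }
        for (auto &w : q)
        {
            w();
        }
    }

private:
    std::mutex mux_;
    std::deque<std::function<void()>> queue_;
    bool shutdown_ = false;
};

// Call-site pattern: propagate the failure instead of ignoring the bool.
inline bool TrySubmit(MiniPool &pool, int &ran)
{
    return pool.SubmitWork([&ran] { ++ran; });
}
```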
eloq_data_store_service/thread_worker_pool.cpp (1)

79-89: LGTM; consider unlocking before notify (minor)

Implementation is correct. Optionally unlock before notify_one() to reduce unnecessary wakeups under the mutex.

     work_queue_.push_back(std::move(work));
-    work_queue_cv_.notify_one();
+    lk.unlock();
+    work_queue_cv_.notify_one();
     return true;
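The unlock-before-notify ordering suggested above can be sketched in isolation. NotifyDemo is an illustrative toy queue assuming the same mutex/condition-variable layout as ThreadWorkerPool, not the actual class:

```cpp
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Submit releases the queue mutex before notify_one(), so a woken worker
// does not immediately block on the mutex the notifier still holds.
class NotifyDemo
{
public:
    bool Submit(std::function<void()> work)
    {
        std::unique_lock<std::mutex> lk(mux_);
        if (stopped_)
        {
            return false;
        }
        queue_.push_back(std::move(work));
        lk.unlock();       // drop the lock first...
        cv_.notify_one();  // ...then wake a worker
        return true;
    }
    void RunOne()  // worker side: wait for one item and execute it
    {
        std::unique_lock<std::mutex> lk(mux_);
        cv_.wait(lk, [this] { return stopped_ || !queue_.empty(); });
        if (queue_.empty())
        {
            return;
        }
        auto work = std::move(queue_.front());
        queue_.pop_front();
        lk.unlock();
        work();
    }

private:
    std::mutex mux_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> queue_;
    bool stopped_ = false;
};
```

The wait predicate makes the ordering race-free either way; unlocking first is purely a wakeup-efficiency improvement.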
eloq_data_store_service/rocksdb_data_store_common.cpp (1)

1056-1085: Best-effort fallback when pausing background work fails to enqueue

If SubmitWork fails, background jobs remain unpaused. Consider pausing inline as a fallback (holding db_mux_ shared lock) to honor the ReadOnly transition intent.

data_store_service_client_closure.h (1)

861-877: Ensure remote_node_index_ is set before UpdateOwnerNodeIndexOfShard usage

Run paths call UpdateOwnerNodeIndexOfShard using remote_node_index_, but it’s reset to UINT32_MAX in Prepare/Reset. Verify DataStoreServiceClient sets it on each remote attempt before RPC.

Also, several Clear() methods call cntl_.Reset() twice; drop the duplicate.

Also applies to: 1096-1106, 1329-1335, 1545-1555, 1827-1837, 2199-2209, 2456-2466

eloq_data_store_service/data_store_service.cpp (2)

267-272: Call SwitchToReadWrite when open_mode is ReadWrite (symmetry/clarity)

For consistency with the ReadOnly branch, explicitly invoke SwitchToReadWrite when starting in RW.

-        if (open_mode == DSShardStatus::ReadOnly)
-        {
-            data_store_->SwitchToReadOnly();
-        }
+        if (open_mode == DSShardStatus::ReadOnly)
+        {
+            data_store_->SwitchToReadOnly();
+        }
+        else
+        {
+            data_store_->SwitchToReadWrite();
+        }

962-971: Avoid asserts in RPC error paths

Asserts here can crash in production on legitimate states. Return DB_NOT_OPEN without assert.

-    if (shard_status != DSShardStatus::ReadWrite &&
-        shard_status != DSShardStatus::ReadOnly)
-    {
-        assert(false);
+    if (shard_status != DSShardStatus::ReadWrite &&
+        shard_status != DSShardStatus::ReadOnly)
+    {
         brpc::ClosureGuard done_guard(done);
         auto *result = response->mutable_result();
         result->set_error_code(::EloqDS::remote::DataStoreError::DB_NOT_OPEN);
         result->set_error_msg("KV store not opened yet.");
         return;
     }

Also applies to: 998-1005

data_store_service_client.h (1)

671-674: Avoid string copies in Reset

Pass hostname by const reference to prevent copies.

-        void Reset(const std::string hostname,
+        void Reset(const std::string& hostname,
                    uint16_t port,
                    uint64_t shard_version)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 57d1ffc and 52ecce2.

📒 Files selected for processing (15)
  • data_store_service_client.cpp (15 hunks)
  • data_store_service_client.h (4 hunks)
  • data_store_service_client_closure.h (34 hunks)
  • eloq_data_store_service/data_store_service.cpp (37 hunks)
  • eloq_data_store_service/data_store_service.h (4 hunks)
  • eloq_data_store_service/data_store_service_config.cpp (4 hunks)
  • eloq_data_store_service/data_store_service_util.h (1 hunks)
  • eloq_data_store_service/ds_request.proto (1 hunks)
  • eloq_data_store_service/internal_request.h (27 hunks)
  • eloq_data_store_service/main.cpp (2 hunks)
  • eloq_data_store_service/rocksdb_cloud_data_store.cpp (2 hunks)
  • eloq_data_store_service/rocksdb_data_store_common.cpp (12 hunks)
  • eloq_data_store_service/rocksdb_data_store_common.h (1 hunks)
  • eloq_data_store_service/thread_worker_pool.cpp (1 hunks)
  • eloq_data_store_service/thread_worker_pool.h (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • eloq_data_store_service/data_store_service_config.cpp
  • eloq_data_store_service/rocksdb_data_store_common.h
  • eloq_data_store_service/data_store_service_util.h
  • eloq_data_store_service/internal_request.h
🧰 Additional context used
🧬 Code graph analysis (6)
eloq_data_store_service/rocksdb_data_store_common.cpp (1)
eloq_data_store_service/data_store_service.cpp (16)
  • DeleteRange (528-574)
  • DeleteRange (528-532)
  • DeleteRange (576-628)
  • DeleteRange (576-582)
  • Read (368-401)
  • Read (368-371)
  • Read (403-438)
  • Read (403-410)
  • ScanNext (856-909)
  • ScanNext (856-869)
  • ScanNext (911-944)
  • ScanNext (911-914)
  • ScanClose (946-980)
  • ScanClose (946-949)
  • ScanClose (982-1014)
  • ScanClose (982-986)
data_store_service_client.cpp (2)
data_store_service_client.h (1)
  • AllDataShardCount (635-647)
eloq_data_store_service/data_store_service_config.cpp (4)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
  • HandleShardingError (947-989)
  • HandleShardingError (947-948)
eloq_data_store_service/data_store_service.cpp (4)
eloq_data_store_service/data_store_service_util.h (1)
  • shard_id_ (118-118)
data_store_service_client.cpp (2)
  • GetShardIdByPartitionId (2937-2942)
  • GetShardIdByPartitionId (2937-2938)
eloq_data_store_service/data_store_service_config.cpp (6)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
  • IsOwnerOfShard (285-293)
  • IsOwnerOfShard (285-285)
  • IsOwnerOfShard (686-697)
  • IsOwnerOfShard (686-687)
eloq_data_store_service/data_store_service.h (4)
  • IsOwnerOfShard (603-608)
  • IncreaseWriteReqCount (593-596)
  • DecreaseWriteReqCount (598-601)
  • GetDataStore (619-629)
eloq_data_store_service/data_store_service.h (5)
eloq_data_store_service/data_store_service.cpp (10)
  • StartService (236-300)
  • StartService (236-236)
  • ConnectAndStartDataStore (302-366)
  • ConnectAndStartDataStore (302-304)
  • SwitchReadWriteToReadOnly (2158-2192)
  • SwitchReadWriteToReadOnly (2158-2158)
  • SwitchReadOnlyToClosed (2194-2217)
  • SwitchReadOnlyToClosed (2194-2194)
  • SwitchReadOnlyToReadWrite (2219-2241)
  • SwitchReadOnlyToReadWrite (2219-2219)
eloq_data_store_service/data_store_service_util.h (1)
  • shard_id_ (118-118)
eloq_data_store_service/data_store_service_config.cpp (6)
  • IsOwnerOfShard (285-293)
  • IsOwnerOfShard (285-285)
  • IsOwnerOfShard (686-697)
  • IsOwnerOfShard (686-687)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
data_store_service_client.cpp (2)
  • GetShardIdByPartitionId (2937-2942)
  • GetShardIdByPartitionId (2937-2938)
eloq_data_store_service/data_store.h (1)
  • DataStore (42-143)
eloq_data_store_service/thread_worker_pool.h (1)
eloq_data_store_service/thread_worker_pool.cpp (2)
  • SubmitWork (79-89)
  • SubmitWork (79-79)
data_store_service_client.h (2)
data_store_service_client.cpp (18)
  • FindFreeNodeIndex (3009-3028)
  • FindFreeNodeIndex (3009-3009)
  • IsLocalShard (2912-2920)
  • IsLocalShard (2912-2912)
  • IsLocalPartition (2932-2935)
  • IsLocalPartition (2932-2932)
  • GetShardIdByPartitionId (2937-2942)
  • GetShardIdByPartitionId (2937-2938)
  • AllDataShardCount (2944-2947)
  • AllDataShardCount (2944-2944)
  • GetOwnerNodeIndexOfShard (2949-2954)
  • GetOwnerNodeIndexOfShard (2949-2950)
  • UpdateOwnerNodeIndexOfShard (2956-3007)
  • UpdateOwnerNodeIndexOfShard (2956-2957)
  • HandleShardingError (3030-3064)
  • HandleShardingError (3030-3031)
  • UpgradeShardVersion (3066-3114)
  • UpgradeShardVersion (3066-3069)
eloq_data_store_service/data_store_service_config.cpp (4)
  • GetShardIdByPartitionId (798-803)
  • GetShardIdByPartitionId (798-799)
  • HandleShardingError (947-989)
  • HandleShardingError (947-948)
🔇 Additional comments (4)
eloq_data_store_service/main.cpp (1)

322-325: LGTM; startup flag propagation is clear

Passing create_db_if_missing as (bootstrap || single-node) aligns with the new API.

eloq_data_store_service/rocksdb_data_store_common.cpp (1)

276-330: Good: handles SubmitWork failure and cleans up request

The res path sets DB_NOT_OPEN and returns the object to the pool. Pattern looks consistent.

data_store_service_client.cpp (2)

2956-3007: Avoid OOB and incorrect mapping in owner-index updates

Route through slot 0 and assert shard id; otherwise UB.

-    new_node_index = dss_shards_[shard_id].load(std::memory_order_acquire);
+    assert(shard_id == single_shard_id_);
+    new_node_index = dss_shards_[0].load(std::memory_order_acquire);
     if (new_node_index != old_node_index)
     {
         return true;
     }
@@
-        if (dss_shards_[shard_id].compare_exchange_strong(old_node_index,
-                                                          free_index))
+        if (dss_shards_[0].compare_exchange_strong(old_node_index,
+                                                   free_index))
         {
             new_node_index = free_index;
             return true;
         }
@@
-            DLOG(INFO) << "Other thread updated the data shard, shard_id:"
-                       << shard_id;
+            DLOG(INFO) << "Other thread updated the data shard, shard_id:" << shard_id;

Likely an incorrect or invalid review comment.


2949-2954: Avoid OOB: index slot 0 and validate shard id

Guard shard id and read dss_shards_[0].

-    assert(dss_shards_[shard_id].load(std::memory_order_acquire) != UINT32_MAX);
-    return dss_shards_[shard_id].load(std::memory_order_acquire);
+    assert(shard_id == single_shard_id_);
+    return dss_shards_[0].load(std::memory_order_acquire);

Likely an incorrect or invalid review comment.

Comment on lines +105 to 134
assert(cluster_manager.GetShardCount() == 1);
auto current_version =
dss_topology_version_.load(std::memory_order_acquire);
auto new_version = cluster_manager.GetTopologyVersion();
if (current_version <= cluster_manager.GetTopologyVersion() &&
dss_topology_version_.compare_exchange_strong(current_version,
new_version))
{
for (const auto &[_, group] : cluster_manager.GetAllShards())
{
for (const auto &node : group.nodes_)
{
LOG(INFO) << "Node Hostname: " << node.host_name_
<< ", Port: " << node.port_;
}
// The first node is the owner of shard.
assert(group.nodes_.size() > 0);
while (!UpgradeShardVersion(group.shard_id_,
group.version_,
group.nodes_[0].host_name_,
group.nodes_[0].port_))
{
LOG(INFO) << "UpgradeShardVersion failed, retry";
bthread_usleep(1000000);
}
LOG(INFO) << "UpgradeShardVersion success, shard_id:"
<< group.shard_id_ << ", version:" << group.version_;
}
}
cluster_manager_ = cluster_manager;
}

⚠️ Potential issue | 🔴 Critical

Setup config: bind to actual single shard id and store owner index at slot 0

Use single_shard_id_ and store node_idx in dss_shards_[0]. Current code will crash/UB if the shard id isn’t 0.

-    assert(cluster_manager.GetShardCount() == 1);
+    assert(cluster_manager.GetShardCount() == 1);
     auto current_version =
         dss_topology_version_.load(std::memory_order_acquire);
     auto new_version = cluster_manager.GetTopologyVersion();
     if (current_version <= cluster_manager.GetTopologyVersion() &&
         dss_topology_version_.compare_exchange_strong(current_version,
                                                       new_version))
     {
-        for (const auto &[_, group] : cluster_manager.GetAllShards())
+        for (const auto &[_, group] : cluster_manager.GetAllShards())
         {
             for (const auto &node : group.nodes_)
             {
                 LOG(INFO) << "Node Hostname: " << node.host_name_
                           << ", Port: " << node.port_;
             }
             // The first node is the owner of shard.
             assert(group.nodes_.size() > 0);
+            // Remember the actual shard id we operate on
+            single_shard_id_ = group.shard_id_;
             while (!UpgradeShardVersion(group.shard_id_,
                                         group.version_,
                                         group.nodes_[0].host_name_,
                                        group.nodes_[0].port_))
             {
                 LOG(INFO) << "UpgradeShardVersion failed, retry";
                 bthread_usleep(1000000);
             }
+            // Initialize owner index at slot 0 if still unset
+            uint32_t node_idx = GetOwnerNodeIndexOfShard(group.shard_id_);
+            (void)node_idx; // ensure mapping exists
             LOG(INFO) << "UpgradeShardVersion success, shard_id:"
                       << group.shard_id_ << ", version:" << group.version_;
         }
     }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 105-134, the code assumes the
single shard's id is 0 and directly indexes dss_shards_[group.shard_id_] which
causes UB if shard id != 0; update the logic to bind to single_shard_id_ (assert
cluster_manager.GetShardCount() == 1), set single_shard_id_ = group.shard_id_,
and write the owner node index into dss_shards_[0] (store node_idx = 0 as the
owner slot) instead of indexing by group.shard_id_; ensure subsequent calls
(UpgradeShardVersion) use single_shard_id_ and access dss_shards_[0] for owner
info so the implementation no longer assumes shard id==0.

Comment on lines 2758 to 2765
 create_snapshot_for_backup_closure_pool_.NextObject();
-auto shards = cluster_manager_.GetAllShards();
+uint32_t shard_cnt = AllDataShardCount();
 std::vector<uint32_t> shard_ids;
-shard_ids.reserve(shards.size());
-for (auto &[s_id, _] : shards)
+shard_ids.reserve(shard_cnt);
+for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++)
 {
-    shard_ids.push_back(s_id);
+    shard_ids.push_back(shard_id);
 }

⚠️ Potential issue | 🟠 Major

Build shard-id list from the actual shard id, not 0..count-1

Loops over 0..count-1 are wrong if the only shard id ≠ 0. Use single_shard_id_.

-    uint32_t shard_cnt = AllDataShardCount();
-    std::vector<uint32_t> shard_ids;
-    shard_ids.reserve(shard_cnt);
-    for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++)
-    {
-        shard_ids.push_back(shard_id);
-    }
+    std::vector<uint32_t> shard_ids = { single_shard_id_ };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
     create_snapshot_for_backup_closure_pool_.NextObject();
-    uint32_t shard_cnt = AllDataShardCount();
-    std::vector<uint32_t> shard_ids;
-    shard_ids.reserve(shard_cnt);
-    for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++)
-    {
-        shard_ids.push_back(shard_id);
-    }
+    std::vector<uint32_t> shard_ids = { single_shard_id_ };
🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 2758 to 2765, the code builds
shard_ids by iterating 0..shard_cnt-1 which is incorrect when the service uses a
single non-zero shard id; replace the loop with logic that uses the actual shard
id(s) (e.g., push_back(single_shard_id_) when running in single-shard mode) so
shard_ids contains the real shard identifier(s) instead of 0..count-1; ensure
shard_cnt and single_shard_id_ are used consistently and remove the incorrect
for-loop.

Comment on lines +2937 to +2942
uint32_t DataStoreServiceClient::GetShardIdByPartitionId(
int32_t partition_id) const
{
// Now, only support one shard.
return 0;
}

⚠️ Potential issue | 🟠 Major

Return the actual single shard id

Hardcoding 0 breaks when the singleton shard id differs.

-    // Now, only support one shard.
-    return 0;
+    // Single-shard client: always map partitions to the actual lone shard id
+    return single_shard_id_;
🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 2937-2942, the
GetShardIdByPartitionId implementation returns a hardcoded 0; replace this with
logic that returns the actual singleton shard id (read from the service/client
configuration or metadata source used elsewhere in the class — e.g., a member
variable, config manager, or cluster metadata/shard manager API), and add a safe
fallback (log/error or assert) if that metadata cannot be obtained instead of
returning 0.

Comment on lines +3066 to 3114
bool DataStoreServiceClient::UpgradeShardVersion(uint32_t shard_id,
uint64_t shard_version,
const std::string &host_name,
uint16_t port)
{
if (shard_id >= dss_shards_.size())
{
assert(false);
// Now only support one shard.
LOG(FATAL) << "Shard id not found, shard_id: " << shard_id;
return true;
}

uint32_t node_index = dss_shards_[shard_id].load(std::memory_order_acquire);
auto &node_ref = dss_nodes_[node_index];
if (node_ref.ShardVersion() < shard_version)
{
uint64_t expect_val = 0;
uint64_t current_ts =
std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
if (!node_ref.expired_ts_.compare_exchange_strong(expect_val,
current_ts))
{
// Other thread is updating the shard, retry.
DLOG(INFO) << "Other thread is updating the data shard, shard_id: "
<< shard_id;
return false;
}

uint32_t free_node_index = FindFreeNodeIndex();
if (free_node_index == dss_nodes_.size())
{
DLOG(INFO) << "Find free node index failed";
node_ref.expired_ts_.store(expect_val, std::memory_order_release);
return false;
}
auto &free_node_ref = dss_nodes_[free_node_index];
free_node_ref.Reset(host_name, port, shard_version);
if (!dss_shards_[shard_id].compare_exchange_strong(node_index,
free_node_index))
{
assert(false);
free_node_ref.expired_ts_.store(1, std::memory_order_release);
}
}
return true;
}

⚠️ Potential issue | 🔴 Critical

Upgrade shard version: handle arbitrary shard id and update slot 0

Do not fatal if shard_id != 0; validate against single_shard_id_ and use slot 0.

-    if (shard_id >= dss_shards_.size())
-    {
-        assert(false);
-        // Now only support one shard.
-        LOG(FATAL) << "Shard id not found, shard_id: " << shard_id;
-        return true;
-    }
+    // Single-shard mode: ensure shard_id matches the configured lone shard
+    assert(shard_id == single_shard_id_);
 
-    uint32_t node_index = dss_shards_[shard_id].load(std::memory_order_acquire);
+    uint32_t node_index = dss_shards_[0].load(std::memory_order_acquire);
@@
-        if (!dss_shards_[shard_id].compare_exchange_strong(node_index,
-                                                           free_node_index))
+        if (!dss_shards_[0].compare_exchange_strong(node_index,
+                                                    free_node_index))
         {
             assert(false);
             free_node_ref.expired_ts_.store(1, std::memory_order_release);
         }

Comment on lines +3282 to 3288
 uint32_t shard_cnt = AllDataShardCount();
 std::vector<uint32_t> shard_ids;
-shard_ids.reserve(shards.size());
-for (auto &[s_id, _] : shards)
+shard_ids.reserve(shard_cnt);
+for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++)
 {
-    shard_ids.push_back(s_id);
+    shard_ids.push_back(shard_id);
 }

🛠️ Refactor suggestion | 🟠 Major

FlushData: use the actual shard id

Same shard-id enumeration issue as above.

-    uint32_t shard_cnt = AllDataShardCount();
-    std::vector<uint32_t> shard_ids;
-    shard_ids.reserve(shard_cnt);
-    for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++)
-    {
-        shard_ids.push_back(shard_id);
-    }
+    std::vector<uint32_t> shard_ids = { single_shard_id_ };

Suggested change
-    uint32_t shard_cnt = AllDataShardCount();
-    std::vector<uint32_t> shard_ids;
-    shard_ids.reserve(shard_cnt);
-    for (uint32_t shard_id = 0; shard_id < shard_cnt; shard_id++)
-    {
-        shard_ids.push_back(shard_id);
-    }
+    std::vector<uint32_t> shard_ids = { single_shard_id_ };
🤖 Prompt for AI Agents
In data_store_service_client.cpp around lines 3282 to 3288, the loop currently
builds shard_ids by assuming shard IDs are 0..shard_cnt-1; instead fetch and use
the actual shard identifiers (e.g., call the function that returns the
list/vector of shard IDs or iterate the container of shards) and push those real
IDs into shard_ids so FlushData operates on the correct shard IDs.

Comment on lines +74 to +94
: catalog_factory_array_{catalog_factory[0],
catalog_factory[1],
catalog_factory[2],
&range_catalog_factory_,
&hash_catalog_factory_},
cluster_manager_(cluster_manager),
data_store_service_(data_store_service)
{
// Init dss cluster config.
dss_topology_version_ = cluster_manager.GetTopologyVersion();
auto all_shards = cluster_manager.GetAllShards();
assert(all_shards.size() == 1);
for (auto &[shard_id, shard] : all_shards)
{
uint32_t node_idx = FindFreeNodeIndex();
auto &node_ref = dss_nodes_[node_idx];
node_ref.Reset(shard.nodes_[0].host_name_,
shard.nodes_[0].port_,
shard.version_);
dss_shards_[shard_id].store(shard_id);
}


⚠️ Potential issue | 🔴 Critical

Fix OOB and wrong value stored in dss_shards_; remember actual shard id

dss_shards_ has size 1, but you subscript with shard_id and store shard_id (not node index). This is UB and incorrect mapping when the actual lone shard id ≠ 0.

Apply this diff to the ctor body:

-        for (auto &[shard_id, shard] : all_shards)
+        for (auto &[shard_id, shard] : all_shards)
         {
             uint32_t node_idx = FindFreeNodeIndex();
             auto &node_ref = dss_nodes_[node_idx];
             node_ref.Reset(shard.nodes_[0].host_name_,
                            shard.nodes_[0].port_,
                            shard.version_);
-            dss_shards_[shard_id].store(shard_id);
+            // Remember the only shard id, and store owner node index at slot 0
+            single_shard_id_ = shard_id;
+            dss_shards_[0].store(node_idx, std::memory_order_release);
         }

Declare a field to hold the actual shard id (outside this hunk, in the private section):

uint32_t single_shard_id_{0};

Follow-up: update client.cpp call sites to (a) assert the passed shard_id equals single_shard_id_, and (b) read/write dss_shards_[0]. See suggested diffs in client.cpp comments.

🤖 Prompt for AI Agents
In data_store_service_client.h around lines 74-94, the ctor indexes dss_shards_
with the actual shard_id (which can be nonzero) causing OOB and stores the
shard_id instead of recording the actual shard id; declare a new private field
uint32_t single_shard_id_ and in this ctor set single_shard_id_ = shard_id for
the lone shard, then always read/write dss_shards_[0] (not
dss_shards_[shard_id]) and store the node index/slot or relevant node data
there; additionally update client.cpp call sites to assert that any passed
shard_id equals single_shard_id_ and change accesses to use dss_shards_[0] for
reads/writes.

Comment on lines +713 to +719
std::array<DssNode, 1024> dss_nodes_;
const uint64_t NodeExpiredTime = 10 * 1000 * 1000; // 10s
// Now only support one shard. dss_shards_ caches the index in dss_nodes_ of
// shard owner.
std::array<std::atomic<uint32_t>, 1> dss_shards_;
std::atomic<uint64_t> dss_topology_version_{0};


🛠️ Refactor suggestion | 🟠 Major

Single-shard modeling: clarify and constrain API

With std::array<std::atomic<uint32_t>, 1> dss_shards_, all getters/setters should use index 0 and validate shard_id == single_shard_id_. Current API suggests arbitrary shard ids and leads to UB elsewhere.

If multi-shard isn't supported yet:

  • Add asserts in getters/setters that shard_id == single_shard_id_.
  • Route all reads/writes to dss_shards_[0].
  • Expose single_shard_id_ to build correct shard_id vectors (don’t synthesize 0..N-1). See client.cpp diffs.
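The constrained API described above might look like the following sketch. SingleShardMap and its members are hypothetical illustrations of the suggested single_shard_id_/slot-0 discipline, not the actual DataStoreServiceClient:

```cpp
#include <array>
#include <atomic>
#include <cassert>
#include <cstdint>

// Single-shard mapping: the lone shard id may be nonzero, but its owner
// node index always lives in slot 0. Every accessor validates shard_id
// against the configured lone shard before touching the slot.
class SingleShardMap
{
public:
    explicit SingleShardMap(uint32_t shard_id) : single_shard_id_(shard_id)
    {
    }
    void SetOwnerNodeIndex(uint32_t shard_id, uint32_t node_idx)
    {
        assert(shard_id == single_shard_id_);
        dss_shards_[0].store(node_idx, std::memory_order_release);
    }
    uint32_t GetOwnerNodeIndex(uint32_t shard_id) const
    {
        assert(shard_id == single_shard_id_);
        return dss_shards_[0].load(std::memory_order_acquire);
    }
    // Exposed so callers can build correct shard-id lists instead of
    // synthesizing 0..N-1.
    uint32_t SingleShardId() const
    {
        return single_shard_id_;
    }

private:
    uint32_t single_shard_id_;
    std::array<std::atomic<uint32_t>, 1> dss_shards_{};
};
```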

Comment on lines 573 to 580
DSShardStatus FetchDSShardStatus(uint32_t shard_id)
{
if (shard_id_ == shard_id)
{
return shard_status_;
}
return DSShardStatus::Closed;
}

⚠️ Potential issue | 🔴 Critical

Atomic returned without .load(); enforce acquire load

Returning shard_status_ directly is invalid and bypasses needed ordering. Load it with acquire.

Apply this diff:

-        if (shard_id_ == shard_id)
-        {
-            return shard_status_;
-        }
+        if (shard_id_ == shard_id)
+        {
+            return shard_status_.load(std::memory_order_acquire);
+        }

Suggested change
 DSShardStatus FetchDSShardStatus(uint32_t shard_id)
 {
     if (shard_id_ == shard_id)
     {
-        return shard_status_;
+        return shard_status_.load(std::memory_order_acquire);
     }
     return DSShardStatus::Closed;
 }
🤖 Prompt for AI Agents
In eloq_data_store_service/data_store_service.h around lines 573 to 580, the
function returns the atomic shard_status_ directly, which relies on the
implicit conversion and obscures the intended memory ordering; when shard_id_
matches, return shard_status_.load(std::memory_order_acquire) instead of the
atomic itself (the non-matching branch already returns DSShardStatus::Closed);
ensure <atomic> is included.

Comment on lines +593 to +601
void IncreaseWriteReqCount()
{
ongoing_write_requests_.fetch_add(1, std::memory_order_release);
}

void DecreaseWriteReqCount()
{
ongoing_write_requests_.fetch_sub(1, std::memory_order_release);
}

⚠️ Potential issue | 🔴 Critical

Missing in‑flight read tracking; refine memory ordering of counters

Only writes are counted. Reads can still be in-flight when SwitchReadOnlyToClosed calls Shutdown(), risking UAF/corruption. Add symmetric read counters and wait for both to reach zero before shutdown. Also, use acq_rel for inc/dec, acquire for wait loads.

Header additions (outside current hunk):

// In class DataStoreService private section
std::atomic<uint64_t> ongoing_read_requests_{0};

// In public section
void IncreaseReadReqCount() {
  ongoing_read_requests_.fetch_add(1, std::memory_order_acq_rel);
}
void DecreaseReadReqCount() {
  ongoing_read_requests_.fetch_sub(1, std::memory_order_acq_rel);
}

Tighten write counters:

-ongoing_write_requests_.fetch_add(1, std::memory_order_release);
+ongoing_write_requests_.fetch_add(1, std::memory_order_acq_rel);
...
-ongoing_write_requests_.fetch_sub(1, std::memory_order_release);
+ongoing_write_requests_.fetch_sub(1, std::memory_order_acq_rel);

And in eloq_data_store_service/data_store_service.cpp (shutdown paths), wait for both counters, e.g.:

 bool DataStoreService::SwitchReadOnlyToClosed(uint32_t shard_id)
 {
   ...
-  if (expected == DSShardStatus::ReadOnly)
-  {
-      cluster_manager_.SwitchShardToClosed(shard_id, expected);
-      data_store_->Shutdown();
-  }
+  if (expected == DSShardStatus::ReadOnly)
+  {
+      // wait for all in-flight ops to drain
+      while (ongoing_write_requests_.load(std::memory_order_acquire) > 0 ||
+             ongoing_read_requests_.load(std::memory_order_acquire) > 0)
+      {
+          bthread_usleep(1000);
+      }
+      cluster_manager_.SwitchShardToClosed(shard_id, expected);
+      data_store_->Shutdown();
+  }
   return true;
 }

Instrument read entry/exit in server read paths (e.g., before calling into store, increment; on completion/defer, decrement). Based on learnings.
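Manually pairing each increment with a decrement at every entry/exit point is easy to get wrong on early-return and error paths. A minimal RAII sketch (the `RequestGuard` type here is hypothetical, not part of the actual headers) ties each count to a scope so no path can leak an in-flight count:

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical RAII guard: the constructor registers an in-flight request,
// the destructor unregisters it on every exit path (return, exception, etc.).
struct RequestGuard
{
    explicit RequestGuard(std::atomic<uint64_t> &cnt) : cnt_(cnt)
    {
        cnt_.fetch_add(1, std::memory_order_acq_rel);
    }
    ~RequestGuard()
    {
        cnt_.fetch_sub(1, std::memory_order_acq_rel);
    }
    RequestGuard(const RequestGuard &) = delete;
    RequestGuard &operator=(const RequestGuard &) = delete;

private:
    std::atomic<uint64_t> &cnt_;
};

std::atomic<uint64_t> ongoing_read_requests{0};

// Sketch of a read path: the guard covers the whole body.
uint64_t count_during_read()
{
    RequestGuard g(ongoing_read_requests);
    // ... the actual read against the store would happen here ...
    return ongoing_read_requests.load(std::memory_order_acquire);
}
```

The same guard can wrap write paths with `ongoing_write_requests_`, leaving the shutdown loop to wait for both counters to drain.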

🤖 Prompt for AI Agents
In eloq_data_store_service/data_store_service.h around lines 593-601, the
current code only tracks ongoing_write_requests_ with memory_order_release on
fetch_add/fetch_sub; add a symmetric std::atomic<uint64_t>
ongoing_read_requests_{0} in the private section, add public
IncreaseReadReqCount/DecreaseReadReqCount methods and change
IncreaseWriteReqCount/DecreaseWriteReqCount to use std::memory_order_acq_rel for
both fetch_add and fetch_sub; in shutdown paths in
eloq_data_store_service/data_store_service.cpp change any wait logic to loop by
loading both counters with std::memory_order_acquire and wait until both are
zero before calling Shutdown(); finally instrument server read entry/exit points
(before calling into the store increment IncreaseReadReqCount, and on
completion/defer call DecreaseReadReqCount) so reads are tracked symmetrically
with writes.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
eloq_data_store_service/rocksdb_data_store_common.cpp (1)

1065-1094: Critical: Deadlock when SubmitWork fails.

If SubmitWork returns false (line 1081), the method logs an error and returns early (lines 1082-1086). However, the condition variable setup on lines 1059-1061 remains, and the main thread proceeds to wait on lines 1090-1093. Since the lambda never executes, done is never set to true and cond_var.notify_one() is never called, causing a permanent deadlock.

Apply this diff to signal the condition variable before returning on failure:

     bool res = query_worker_pool_->SubmitWork(
         [this, &mutex, &cond_var, &done]()
         {
             // Run pause background work in a separate thread to avoid blocking
             // bthread worker threads
             std::shared_lock<std::shared_mutex> db_lk(db_mux_);
             auto db = GetDBPtr();
             if (db != nullptr)
             {
                 db->PauseBackgroundWork();
             }
 
             std::unique_lock<bthread::Mutex> lk(mutex);
             done = true;
             cond_var.notify_one();
         });
     if (!res)
     {
         LOG(ERROR)
             << "Failed to submit switch to read only work to query worker pool";
+        std::unique_lock<bthread::Mutex> lk(mutex);
+        done = true;
+        cond_var.notify_one();
         return;
     }
🧹 Nitpick comments (1)
eloq_data_store_service/rocksdb_data_store_common.cpp (1)

318-329: Consider extracting common error handling pattern.

The submission failure handling code is duplicated across seven methods. Consider extracting this into a helper method to improve maintainability.

Example helper method:

template<typename RequestType>
void HandleSubmitWorkFailure(RequestType* request, const char* operation_name) {
    LOG(ERROR) << "Failed to submit " << operation_name << " work to query worker pool";
    request->SetFinish(::EloqDS::remote::DataStoreError::DB_NOT_OPEN);
    request->Clear();
    request->Free();
}

Then replace each error handling block with:

if (!res)
{
    HandleSubmitWorkFailure(flush_data_req, "flush data");
    return;
}

Also applies to: 396-406, 462-469, 622-633, 714-725, 1015-1022, 1042-1048
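A compilable sketch of the suggested helper, using a hypothetical `StubRequest` type and a plain string in place of the real `DataStoreError` enum (the `SetFinish`/`Clear`/`Free` interface mirrors the review suggestion, not the actual headers):

```cpp
#include <iostream>
#include <string>

// Stand-in for the real request types (FlushDataRequest etc.).
struct StubRequest
{
    std::string finish_error;
    bool cleared = false;
    bool freed = false;
    void SetFinish(const std::string &err) { finish_error = err; }
    void Clear() { cleared = true; }
    void Free() { freed = true; }
};

// One helper replaces the seven duplicated failure blocks.
template <typename RequestType>
void HandleSubmitWorkFailure(RequestType *request, const char *operation_name)
{
    std::cerr << "Failed to submit " << operation_name
              << " work to query worker pool\n";
    request->SetFinish("DB_NOT_OPEN");
    request->Clear();
    request->Free();
}
```

Each call site then shrinks to `if (!res) { HandleSubmitWorkFailure(req, "flush data"); return; }`.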

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 52ecce2 and 45f7dca.

📒 Files selected for processing (2)
  • eloq_data_store_service/rocksdb_cloud_data_store.cpp (2 hunks)
  • eloq_data_store_service/rocksdb_data_store_common.cpp (12 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • eloq_data_store_service/rocksdb_cloud_data_store.cpp
🧰 Additional context used
🧬 Code graph analysis (1)
eloq_data_store_service/rocksdb_data_store_common.cpp (1)
eloq_data_store_service/data_store_service.cpp (16)
  • DeleteRange (528-574)
  • DeleteRange (528-532)
  • DeleteRange (576-628)
  • DeleteRange (576-582)
  • Read (368-401)
  • Read (368-371)
  • Read (403-438)
  • Read (403-410)
  • ScanNext (856-909)
  • ScanNext (856-869)
  • ScanNext (911-944)
  • ScanNext (911-914)
  • ScanClose (946-980)
  • ScanClose (946-949)
  • ScanClose (982-1014)
  • ScanClose (982-986)
🔇 Additional comments (7)
eloq_data_store_service/rocksdb_data_store_common.cpp (7)

278-329: LGTM: Proper submission failure handling.

The error handling for SubmitWork failure is correctly implemented. On failure, the code logs an error, returns DB_NOT_OPEN to the caller, and properly cleans up resources.


334-406: LGTM: Consistent error handling pattern.

The submission failure handling follows the same correct pattern as FlushData.


411-469: LGTM: Proper error handling.

The Read method correctly handles submission failures with appropriate cleanup.


472-634: LGTM: Handles both early return and submission failure correctly.

The method properly handles the edge case where RecordsCount() is zero, and follows the standard error handling pattern for submission failures.


649-725: LGTM: Consistent error handling.

The DropTable method follows the established pattern for handling submission failures.


730-1022: LGTM: Proper submission failure handling.

The ScanNext method correctly implements the submission failure handling pattern.


1027-1048: LGTM: Consistent error handling.

The ScanClose method follows the standard pattern for handling submission failures.

@lzxddz lzxddz merged commit a3fdbbc into main Oct 14, 2025
1 check passed
@lzxddz lzxddz deleted the remove-mutex branch October 14, 2025 07:33

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Remove mutex from DataStoreService

3 participants