feat: adapt to eloqstore local standby feature #424

Merged
thweetkomputer merged 11 commits into main from feat-eloqstore-local-standby-zc
Mar 17, 2026

Conversation

@thweetkomputer (Collaborator) commented Mar 4, 2026

Here are some reminders before you submit the pull request

  • Add tests for the change
  • Document changes
  • Reference the link of issue using fixes eloqdb/tx_service#issue_id
  • Reference the link of RFC if exists
  • Pass ./mtr --suite=mono_main,mono_multi,mono_basic

Summary by CodeRabbit

  • New Features
    • Standby snapshot lifecycle: create, request sync, sync status, and delete with coordinated acknowledgements and archive cleanup.
    • Support for weighted storage path lists and automatic cloud credential retrieval.
  • Chores
    • Per-shard write concurrency controls and clarified DirectIO buffer pool sizing documentation.

@coderabbitai bot commented Mar 4, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds comprehensive EloqStore standby support: new standby snapshot APIs, reload/sync flows, task queue and RPC coordination, config updates, build inclusion of standby_service.cpp, and multiple interface and implementation changes across data-store and transaction-service layers.

Changes

Cohort / File(s) / Summary

Build & Config
store_handler/eloq_data_store_service/build_eloq_store.cmake, store_handler/eloq_data_store_service/eloq_store_config.cpp, store_handler/eloq_data_store_service/eloq_store_config.h
Add src/standby_service.cpp to ELOQ_STORE_SOURCES; update eloq_store flags (data_path_list format with weights, per-shard max write concurrency wording, DirectIO buffer pool description); add eloq_store_cloud_auto_credentials; remove the ParseStoragePath declaration.

Core DataStore Interfaces
store_handler/eloq_data_store_service/data_store.h, store_handler/eloq_data_store_service/rocksdb_data_store_common.h
Introduce virtual APIs for standby: DeleteStandbySnapshot, ReloadData(term, snapshot_ts), UpdateStandbyMasterStorePaths, UpdateStandbyMasterAddr; replace ReloadDataFromCloud with a generalized ReloadData (bool).

EloqStore Implementation
store_handler/eloq_data_store_service/eloq_store_data_store.h, .../eloq_store_data_store.cpp, .../eloq_store_data_store_factory.h
Add the ArchiveEntry struct and methods CreateSnapshotForStandby, DeleteStandbySnapshot, ListArchiveTags, ReloadData, UpdateStandbyMasterStorePaths/Addr; add factory setters and an enable_local_standby setter.

DataStore Service & Client
store_handler/eloq_data_store_service/data_store_service.h, .../data_store_service.cpp, store_handler/data_store_service_client.h, .../data_store_service_client.cpp
Expose standby APIs (SetStandbySnapshotPayload, ClearStandby..., Create/Delete snapshots, ReloadData, CurrentStandbySnapshotTs, archive tracking); update the OnSnapshotReceived signature; add per-shard atomic latest_snapshot_ts/latest_delete_archive_ts; leader/follower hooks set enable_local_standby.

RocksDB Handler & Common
store_handler/rocksdb_handler.h, store_handler/rocksdb_handler.cpp, store_handler/eloq_data_store_service/rocksdb_data_store_common.h
Extend CreateSnapshotForStandby to return snapshot_ts via an out parameter; extend SendSnapshotToRemote with standby_node_id; update the rocksdb-common ReloadData signature.

TX Service RPCs & Interfaces
tx_service/include/proto/cc_request.proto, tx_service/include/remote/cc_node_service.h, tx_service/include/store/data_store_handler.h
Add the RequestSyncSnapshot RPC and messages; add snapshot_path/current_ckpt_ts fields; add CcNodeService RequestSyncSnapshot and standby task worker infrastructure; extend DataStoreHandler virtuals for RequestSyncSnapshot, DeleteStandbySnapshot(s), CurrentStandbySnapshotTs, SetStandbySnapshotPayload; adjust signatures to include snapshot_ts/standby_node_id.

Snapshot Management & Standby Coordination
tx_service/src/store/snapshot_manager.cpp, tx_service/src/standby.cpp, tx_service/src/data_sync_task.cpp, tx_service/src/checkpointer.cpp, tx_service/src/fault/cc_node.cpp, tx_service/src/remote/cc_node_service.cpp
Add async aggregation contexts, per-RPC contexts, and completion closures for broadcast standby RPCs; SnapshotManager/Standby flows now create standby snapshots with snapshot_ts, dispatch RequestSyncSnapshot to standbys, wait for acknowledgements (min_ack_ckpt_ts), and delete old standby archives; add a standby task queue/worker and enqueue semantics.

Misc & Submodule
store_handler/eloq_data_store_service/eloqstore
Bumped the eloqstore submodule pointer (commit update only).

Other minor
tx_service/include/cc/catalog_cc_map.h
Add diagnostic logging on unexpected DDL error codes.

Sequence Diagram(s)

sequenceDiagram
    participant Leader as Leader Node
    participant TxSvc as TX Service
    participant StandbyTask as Standby Task Queue
    participant DataStore as DataStoreService
    participant EloqStore as EloqStore

    Leader->>TxSvc: OnLeaderStart()
    TxSvc->>DataStore: SetEnableLocalStandby(true)
    DataStore->>EloqStore: Configure standby mode

    TxSvc->>TxSvc: Create checkpoint snapshot
    TxSvc->>DataStore: CreateSnapshotForStandby(ng_id, snapshot_ts)
    DataStore->>EloqStore: Create snapshot tag

    TxSvc->>StandbyTask: EnqueueStandbyTask(sync_fn)
    StandbyTask->>DataStore: RequestSyncSnapshot(ng_id, term, snapshot_ts)
    DataStore->>StandbyTask: Dispatch RequestSyncSnapshot RPCs to standby nodes
    StandbyTask->>DataStore: Collect min_ack_ckpt_ts
    DataStore->>EloqStore: DeleteStandbySnapshotsBefore(min_ack_ckpt_ts)

sequenceDiagram
    participant Standby as Standby Node
    participant RpcSvc as CcNodeService
    participant StandbyWorker as Standby Task Worker
    participant DataStore as DataStoreService

    Standby->>RpcSvc: RequestSyncSnapshot(snapshot_ts)
    RpcSvc->>StandbyWorker: EnqueueStandbyTask(reload_fn)
    StandbyWorker->>DataStore: ReloadData(term, snapshot_ts)
    DataStore->>DataStore: Reload from master paths
    StandbyWorker->>RpcSvc: Task complete
    RpcSvc->>Standby: RequestSyncSnapshotResponse(current_ckpt_ts)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes


Suggested reviewers

  • lzxddz
  • liunyl

Poem

🐰
I hopped through snapshots, tags in tow,
Enqueued the tasks, set sync to go,
Archives trimmed where soft winds blow,
Standby hums in a steady flow,
EloqStore sings—replicas grow.

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

  • Description check — ⚠️ Warning. The PR description contains only the template checklist, with no implementation details, context, testing information, or issue/RFC references. Resolution: fill in the description with (1) a summary of the changes, (2) an explanation of the local standby feature, (3) testing status, (4) issue/RFC references, and (5) confirmation that the test suite passes.
  • Docstring Coverage — ⚠️ Warning. Docstring coverage is 10.28%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
✅ Passed checks (1 passed)
  • Title check — ✅ Passed. The title 'feat: adapt to eloqstore local standby feature' clearly and concisely describes the main change: adding support for EloqStore's local standby feature across multiple files.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@store_handler/eloq_data_store_service/eloq_store_config.cpp`:
- Around lines 294-299: ParseUint64List calls std::stoull(token) after checking
is_number(token), but std::stoull can still throw std::out_of_range for
oversized values. Handle oversized numeric tokens explicitly: either validate
the token against numeric limits before conversion, or wrap the std::stoull
call in a try/catch that catches std::out_of_range and emits the same
LOG(FATAL) error ("Invalid uint64 list entry: " + token) before failing.
Update the code paths around is_number, std::stoull, LOG(FATAL), and
values.emplace_back so that no exceptions escape and out-of-range tokens
produce the explicit fatal log.
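A minimal sketch of the out-of-range-safe parsing the comment asks for. ParseUint64ListSafe and its comma-separated token grammar are assumptions standing in for the real ParseUint64List (which calls LOG(FATAL) rather than returning false); the point is that both non-numeric and oversized tokens are caught instead of letting std::stoull throw out of the function.

```cpp
#include <cstdint>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical variant of ParseUint64List: splits on ',' and converts each
// token, treating non-numeric, partially numeric, and out-of-range tokens
// as parse failures so no exception escapes.
static bool ParseUint64ListSafe(const std::string &input,
                                std::vector<uint64_t> &values)
{
    std::stringstream ss(input);
    std::string token;
    while (std::getline(ss, token, ','))
    {
        try
        {
            size_t pos = 0;
            uint64_t v = std::stoull(token, &pos);
            if (pos != token.size())
            {
                return false;  // trailing garbage, e.g. "12ab"
            }
            values.emplace_back(v);
        }
        catch (const std::invalid_argument &)
        {
            return false;  // non-numeric token
        }
        catch (const std::out_of_range &)
        {
            return false;  // numeric but exceeds uint64_t
        }
    }
    return true;
}
```

The real fix would replace the `return false` branches with the existing `LOG(FATAL) << "Invalid uint64 list entry: " << token;` call.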

ℹ️ Review info
Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5bb1989a-771d-4431-8924-152f038b2287

📥 Commits

Reviewing files that changed from the base of the PR and between 225ad67 and 5763f59.

📒 Files selected for processing (3)
  • store_handler/eloq_data_store_service/build_eloq_store.cmake
  • store_handler/eloq_data_store_service/eloq_store_config.cpp
  • store_handler/eloq_data_store_service/eloqstore

thweetkomputer force-pushed the feat-eloqstore-local-standby-zc branch from 5763f59 to ffb191a (March 5, 2026 06:42)
@coderabbitai bot left a comment

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
tx_service/src/store/snapshot_manager.cpp (1)

280-330: ⚠️ Potential issue | 🟠 Major

Don't delete the standby snapshot unless OnSnapshotSynced actually succeeded.

all_notify_succeeded only flips to false when the send step fails. A null channel or five failed OnSnapshotSynced attempts still leave it true, so Line 385 can delete the snapshot archive while a standby never received the reopen notification.

Suggested change
-            if (channel)
+            bool notify_succeeded = false;
+            if (channel)
             {
                 // needs retry if failed
                 // since the standby node may be still spinning up.
                 remote::CcRpcService_Stub stub(channel.get());
                 int retry_times = 5;
@@
                     else
                     {
+                        notify_succeeded = true;
                         DLOG(INFO)
                             << "OnSnapshotSynced notify succeeded, ng_id="
                             << req.ng_id() << ", standby_node_id="
                             << req.standby_node_id() << ", standby_term="
                             << req.standby_node_term();
                         break;
                     }
                 }
             }
+            all_notify_succeeded = all_notify_succeeded && notify_succeeded;

Also applies to: 377-385

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/store/snapshot_manager.cpp` around lines 280 - 330, The code
currently may delete the standby snapshot even when OnSnapshotSynced never
succeeded (null channel or all retries failed); update the logic in
snapshot_manager.cpp so that all_notify_succeeded is set to false when channel
is null and also set to false when the retry loop exhausts without a successful
stub.OnSnapshotSynced; within the retry loop, set all_notify_succeeded = true
only after a successful RPC return (i.e., when !cntl.Failed() and before break),
and ensure after the loop you check that all_notify_succeeded is true before
proceeding to delete the snapshot archive.
store_handler/eloq_data_store_service/data_store_service.cpp (1)

2449-2512: ⚠️ Potential issue | 🔴 Critical

The standby reload guard still allows overlap and drops retries.

is_standby_reloading_ is never consulted before leaving the lock, so snapshot N+1 can start a second ReloadDataFromCloud() while N is still running. Then clear_reloading_flag() advances latest_snapshot_ts_ even on the closed/null-data-store path, which suppresses retries for a snapshot that was never applied. Serialize one reload per shard and advance latest_snapshot_ts_ only after a successful apply.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/eloq_data_store_service/data_store_service.cpp` around lines
2449 - 2512, Inside the StandbySyncAndReloadData flow, prevent overlapping
reloads by checking ds_ref.is_standby_reloading_ under the standby_reload_mutex_
before releasing the lock and starting ReloadDataFromCloud; if it's already
true, return early. Only set ds_ref.is_standby_reloading_ and
ds_ref.reloading_snapshot_ts_ while holding the lock. Move the advancement of
ds_ref.latest_snapshot_ts_ into the success path: after ReloadDataFromCloud()
completes successfully (and only when ds_ref.data_store_ was non-null and shard
not Closed), acquire the standby_reload_mutex_ and update latest_snapshot_ts_
and clear is_standby_reloading_; in all other paths (no-op/skip/failure) clear
is_standby_reloading_ under the lock but do not advance latest_snapshot_ts_. Use
the existing lambda names (clear_reloading_flag) or replace with an explicit
locked cleanup to ensure the flag is always cleared and snapshots are only
advanced on successful apply.
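The serialization the reviewer describes can be sketched as below. StandbyShard, TryReload, and DoReload are hypothetical stand-ins for the per-shard state in data_store_service.cpp; what matters is the ordering: claim the flag under the lock, reload outside it, and advance latest_snapshot_ts_ only on the success path.

```cpp
#include <cstdint>
#include <mutex>

// Simplified per-shard standby state; not the actual DataStoreService types.
struct StandbyShard
{
    std::mutex standby_reload_mutex_;
    bool is_standby_reloading_ = false;
    uint64_t latest_snapshot_ts_ = 0;

    // Placeholder for ReloadDataFromCloud(); always succeeds in this sketch.
    bool DoReload(uint64_t snapshot_ts)
    {
        (void) snapshot_ts;
        return true;
    }

    bool TryReload(uint64_t snapshot_ts)
    {
        {
            std::lock_guard<std::mutex> lk(standby_reload_mutex_);
            if (is_standby_reloading_ || snapshot_ts <= latest_snapshot_ts_)
            {
                return false;  // a reload is in flight, or the snapshot is stale
            }
            is_standby_reloading_ = true;  // claimed under the lock
        }
        bool ok = DoReload(snapshot_ts);  // run outside the lock
        std::lock_guard<std::mutex> lk(standby_reload_mutex_);
        is_standby_reloading_ = false;  // always cleared
        if (ok)
        {
            latest_snapshot_ts_ = snapshot_ts;  // advance only on success
        }
        return ok;
    }
};
```

On failure the timestamp is left untouched, so a retry for the same snapshot_ts is not suppressed.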
🧹 Nitpick comments (2)
tx_service/include/cc/cc_request.h (1)

6027-6032: Update the standby-path comment to match the new behavior.

Lines 6030-6031 still say purge-deleted is the only kickout variant that runs on standby, but Line 6028 now routes CleanDataForTest through the same fallback. Please fix the comment so it doesn't mislead the next change here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_request.h` around lines 6027 - 6032, Update the
misleading comment that states "Purge deleted data is the only type of kickout
cc that will be executed on standby node" to reflect that both
CleanType::CleanDeletedData and CleanType::CleanDataForTest are now routed to
the standby fallback when ng_term < 0; edit the comment immediately above the
ng_term check (the block using clean_type_ and CleanType::CleanDeletedData /
CleanType::CleanDataForTest and the subsequent call to
Sharder::Instance().StandbyNodeTerm()) to describe that both clean types will
run on standby via the same fallback path.
store_handler/data_store_service_client.h (1)

509-510: Document the snapshot_path wire format here.

SetStandbySnapshotPayload() now takes an encoded string that is parsed elsewhere as [addr:]store_path_list with extra address heuristics. Documenting that grammar on the public API would make the producer/parser contract much less likely to drift out of sync.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/data_store_service_client.h` around lines 509 - 510, Add a
clear header comment above the SetStandbySnapshotPayload declaration that
documents the wire format of snapshot_path (e.g. snapshot_path is an encoded
string of the form "[addr:]store_path_list"), describe what "addr" and
"store_path_list" represent and mention any address heuristics or parsing
expectations used by the consumer/parser (so producer and parser contracts are
explicit); reference the parser function or class that consumes this string if
one exists to avoid future drift between SetStandbySnapshotPayload and the
parser.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@store_handler/eloq_data_store_service/data_store_service.cpp`:
- Around line 2368-2419: The code prematurely updates the factory/live store and
overwrites a single pending payload (ds_ref.pending_standby_snapshot_path_)
before validation and without tying the payload to its snapshot_ts; change the
logic to (1) keep pending standby payloads keyed by snapshot_ts (e.g.
replace/augment pending_standby_snapshot_path_ with a map like
pending_standby_payloads_[snapshot_ts] = snapshot_path), (2) run
ParseStorePathListWithWeights(store_path_list, ...) and any other validation
before publishing/applying, and (3) only call
EloqStoreDataStoreFactory::UpdateStandbyMasterAddr,
EloqStoreDataStoreFactory::UpdateStandbyMasterStorePaths and
DataStore::UpdateStandbyMasterAddr/UpdateStandbyMasterStorePaths after
validation succeeds and when the matching snapshot ready notification for that
snapshot_ts is handled; also ensure malformed payloads are not marked as present
or used.

In `@store_handler/eloq_data_store_service/data_store.h`:
- Around line 160-165: The base implementation of ReloadDataFromMasterNode
incorrectly reports success; change the default virtual implementation of
ReloadDataFromMasterNode(int64_t term, uint64_t snapshot_ts) to return false (or
otherwise indicate "not implemented") instead of true so
DataStoreService::OnStandbySnapshotReady does not assume a missing override
succeeded; update the method comment/signature if needed to document that
derived stores must override to perform a reload and propagate actual
success/failure.

In `@store_handler/eloq_data_store_service/internal_request.h`:
- Line 1534: The code treats the literal backup_name == "standby_snapshot" as
control flow via IsStandbySnapshot(), which conflates a user backup name with a
control flag; add an explicit boolean or enum field (e.g., standby_snapshot_flag
or BackupType enum) to the request/proto and implement IsStandbySnapshot() to
return that field instead of inspecting backup_name, update call sites that
construct the request to populate the new field, and change the logic in the
archive/tagging path (the code that currently reads backup_name to choose
between <ts> and snapshot_<ts>, referenced by IsStandbySnapshot and the
archive-tagging logic in eloq_store_data_store.cpp) to consult the new explicit
field so a real backup named "standby_snapshot" is treated as a normal name.
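The explicit-flag refactor this comment proposes can be illustrated as follows. BackupType and CreateSnapshotRequest are illustrative names, not the actual internal_request.h types; the idea is that control flow reads a dedicated field rather than inspecting the backup name, so a user backup literally named "standby_snapshot" stays a normal backup.

```cpp
#include <string>

// Hypothetical explicit request classification, replacing the
// backup_name == "standby_snapshot" check.
enum class BackupType
{
    UserBackup,
    StandbySnapshot,
};

struct CreateSnapshotRequest
{
    std::string backup_name;                      // free-form user name
    BackupType type = BackupType::UserBackup;     // explicit control flag

    bool IsStandbySnapshot() const
    {
        // Consults the flag, never the name.
        return type == BackupType::StandbySnapshot;
    }
};
```

Call sites constructing standby-snapshot requests would set `type = BackupType::StandbySnapshot` explicitly, and the archive-tagging logic would branch on `IsStandbySnapshot()` as before.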

In `@tx_service/src/remote/cc_node_service.cpp`:
- Around line 1664-1690: The snapshot_path fallback emits "leader_ip:path" when
USER is missing which lacks '@' and will be parsed as a plain store-path list;
change the fallback to include an '@' so the prefix contains '@' (e.g.
"@leader_ip:...") so SetStandbySnapshotPayload recognizes it as a standby
address. Locate the block using local_shards_.store_hd_,
Sharder::Instance().GetNodeAddress(...), and response->set_snapshot_path(...)
and replace the else branch that builds snapshot_path = leader_ip + ":" +
store_hd->SnapshotSyncDestPath() with a variant that inserts an '@' before
leader_ip.

In `@tx_service/src/store/snapshot_manager.cpp`:
- Around line 297-307: The code sends an empty synced_snapshot_path to
standbys—populate synced_snapshot_path with the actual snapshot path before
calling on_synced_req.set_snapshot_path so file-based standbys receive the real
path; locate the block that declares synced_snapshot_path in
snapshot_manager.cpp (used with on_synced_req.set_snapshot_path and DLOG) and
assign it the path produced by the snapshot transfer/receiver (the same value
used by OnSnapshotReceived/OverrideDB on the standby) or fall back to the
transfer result, then pass that populated string into
on_synced_req.set_snapshot_path and the DLOG.
- Around line 234-265: The code builds remote_dest but then calls
store_hd_->SendSnapshotToRemote with an empty string literal; replace that ""
with the computed remote_dest so the destination is forwarded into
SendSnapshotToRemote (i.e., change the parameter passed in the
SendSnapshotToRemote call in snapshot_manager.cpp to use remote_dest) to ensure
rsync/remote transfer receives the correct standby destination.

---

Outside diff comments:
In `@store_handler/eloq_data_store_service/data_store_service.cpp`:
- Around line 2449-2512: Inside the StandbySyncAndReloadData flow, prevent
overlapping reloads by checking ds_ref.is_standby_reloading_ under the
standby_reload_mutex_ before releasing the lock and starting
ReloadDataFromCloud; if it's already true, return early. Only set
ds_ref.is_standby_reloading_ and ds_ref.reloading_snapshot_ts_ while holding the
lock. Move the advancement of ds_ref.latest_snapshot_ts_ into the success path:
after ReloadDataFromCloud() completes successfully (and only when
ds_ref.data_store_ was non-null and shard not Closed), acquire the
standby_reload_mutex_ and update latest_snapshot_ts_ and clear
is_standby_reloading_; in all other paths (no-op/skip/failure) clear
is_standby_reloading_ under the lock but do not advance latest_snapshot_ts_. Use
the existing lambda names (clear_reloading_flag) or replace with an explicit
locked cleanup to ensure the flag is always cleared and snapshots are only
advanced on successful apply.

In `@tx_service/src/store/snapshot_manager.cpp`:
- Around line 280-330: The code currently may delete the standby snapshot even
when OnSnapshotSynced never succeeded (null channel or all retries failed);
update the logic in snapshot_manager.cpp so that all_notify_succeeded is set to
false when channel is null and also set to false when the retry loop exhausts
without a successful stub.OnSnapshotSynced; within the retry loop, set
all_notify_succeeded = true only after a successful RPC return (i.e., when
!cntl.Failed() and before break), and ensure after the loop you check that
all_notify_succeeded is true before proceeding to delete the snapshot archive.

---

Nitpick comments:
In `@store_handler/data_store_service_client.h`:
- Around line 509-510: Add a clear header comment above the
SetStandbySnapshotPayload declaration that documents the wire format of
snapshot_path (e.g. snapshot_path is an encoded string of the form
"[addr:]store_path_list"), describe what "addr" and "store_path_list" represent
and mention any address heuristics or parsing expectations used by the
consumer/parser (so producer and parser contracts are explicit); reference the
parser function or class that consumes this string if one exists to avoid future
drift between SetStandbySnapshotPayload and the parser.

In `@tx_service/include/cc/cc_request.h`:
- Around line 6027-6032: Update the misleading comment that states "Purge
deleted data is the only type of kickout cc that will be executed on standby
node" to reflect that both CleanType::CleanDeletedData and
CleanType::CleanDataForTest are now routed to the standby fallback when ng_term
< 0; edit the comment immediately above the ng_term check (the block using
clean_type_ and CleanType::CleanDeletedData / CleanType::CleanDataForTest and
the subsequent call to Sharder::Instance().StandbyNodeTerm()) to describe that
both clean types will run on standby via the same fallback path.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 9681aeaa-4af2-4513-a24a-233ee6c707c1

📥 Commits

Reviewing files that changed from the base of the PR and between 5763f59 and 770299a.

📒 Files selected for processing (23)
  • store_handler/data_store_service_client.cpp
  • store_handler/data_store_service_client.h
  • store_handler/eloq_data_store_service/build_eloq_store.cmake
  • store_handler/eloq_data_store_service/data_store.h
  • store_handler/eloq_data_store_service/data_store_service.cpp
  • store_handler/eloq_data_store_service/data_store_service.h
  • store_handler/eloq_data_store_service/eloq_store_config.cpp
  • store_handler/eloq_data_store_service/eloq_store_config.h
  • store_handler/eloq_data_store_service/eloq_store_data_store.cpp
  • store_handler/eloq_data_store_service/eloq_store_data_store.h
  • store_handler/eloq_data_store_service/eloq_store_data_store_factory.h
  • store_handler/eloq_data_store_service/eloqstore
  • store_handler/eloq_data_store_service/internal_request.h
  • store_handler/rocksdb_handler.cpp
  • store_handler/rocksdb_handler.h
  • tx_service/include/cc/cc_request.h
  • tx_service/include/proto/cc_request.proto
  • tx_service/include/remote/cc_node_service.h
  • tx_service/include/store/data_store_handler.h
  • tx_service/src/checkpointer.cpp
  • tx_service/src/fault/cc_node.cpp
  • tx_service/src/remote/cc_node_service.cpp
  • tx_service/src/store/snapshot_manager.cpp
💤 Files with no reviewable changes (1)
  • store_handler/eloq_data_store_service/eloq_store_config.h

thweetkomputer force-pushed the feat-eloqstore-local-standby-zc branch from 770299a to 3403738 (March 11, 2026 11:21)
@coderabbitai bot left a comment

Actionable comments posted: 14

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
tx_service/include/cc/cc_request.h (1)

6027-6032: ⚠️ Potential issue | 🟡 Minor

Clarify and cover the new standby cleanup contract.

CleanDataForTest now enters the standby-term path too, but the adjacent comment still says deleted-data purge is the only kickout type supported on standby. Please update that invariant/comment and add a regression test for the standby CleanDataForTest flow so this behavior does not get reverted accidentally later.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_request.h` around lines 6027 - 6032, The comment and
invariant near the ng_term check are out of date: CleanType::CleanDataForTest
now follows the standby-term path alongside CleanType::CleanDeletedData, so
update the inlined comment next to the ng_term check (the block that references
CleanType::CleanDeletedData and CleanType::CleanDataForTest) to reflect that
both CleanDeletedData and CleanDataForTest are permitted on standby; then add a
regression test that exercises the standby cleanup path (drive ng_term < 0 +
clean_type_ == CleanType::CleanDataForTest and assert the code follows the
StandbyNodeTerm() branch and performs the expected cleanup behavior) to prevent
accidental reversion of this contract.
tx_service/src/checkpointer.cpp (1)

170-189: ⚠️ Potential issue | 🟠 Major

Don't dereference a failed popen().

If popen("echo $USER", "r") fails, Line 174 passes a null handle to fgets and the checkpointer crashes before it can request snapshot sync. std::getenv("USER") would avoid the subprocess entirely.

🛠️ Safer alternative
-            std::array<char, 200> buffer;
-            std::string username;
-            FILE *output_stream = popen("echo $USER", "r");
-            while (fgets(buffer.data(), 200, output_stream) != nullptr)
-            {
-                username.append(buffer.data());
-            }
-            if (!username.empty())
-            {
-                // remove the trailing \n of output.
-                assert(username.back() == '\n');
-                username.pop_back();
-            }
-            pclose(output_stream);
+            std::string username;
+            if (const char *user = std::getenv("USER"); user != nullptr)
+            {
+                username = user;
+            }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/checkpointer.cpp` around lines 170 - 189, The code
dereferences the result of popen("echo $USER", "r") without checking for
failure; replace this subprocess approach with a direct getenv lookup (use
std::getenv("USER") and treat nullptr as empty) to populate username for
snapshot_req.set_user, and if you must keep the popen branch ensure you check
the FILE* before calling fgets/pclose. Update the block that currently opens
output_stream, reads into buffer, and pclose it (the code around
popen/fgets/username) so it uses getenv("USER") first and only falls back to a
safely-checked popen path; keep the subsequent snapshot_req.set_user(username)
and use store_hd_->SnapshotSyncDestPath() unchanged.
tx_service/src/data_sync_task.cpp (1)

156-171: ⚠️ Potential issue | 🔴 Critical

Stop the checkpoint flow if CreateSnapshotForStandby() fails.

This branch still falls through to BrocastPrimaryCkptTs() even when the standby snapshot was not created. That can advertise truncate_log_ts_ to followers for a snapshot tag that does not exist, while the primary has already treated the checkpoint as successful. Handle the false return here and convert it into a checkpoint failure instead of broadcasting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/data_sync_task.cpp` around lines 156 - 171, The code currently
ignores failures from
Sharder::Instance().GetDataStoreHandler()->CreateSnapshotForStandby(...) and
proceeds to call BrocastPrimaryCkptTs(...), which can advertise a
truncate_log_ts_ for a non-existent snapshot; change the flow to check the
boolean return of CreateSnapshotForStandby (when status_->HasDataStoreWrite()
and IsSharedStorage() is false), and if it returns false, abort/mark the
checkpoint as failed (e.g., set the checkpoint status to failure/return early or
invoke the existing checkpoint-failure handling code) and do NOT call
BrocastPrimaryCkptTs(node_group_id_, node_group_term_,
status_->truncate_log_ts_, status_->HasDataStoreWrite()); ensure the failure
path logs the error and preserves existing semantics for successful snapshot
creation.
tx_service/include/store/data_store_handler.h (1)

373-395: ⚠️ Potential issue | 🔴 Critical

Fix signature mismatch in RocksDBHandlerImpl::CreateSnapshotForStandby and RocksDBCloudHandlerImpl::CreateSnapshotForStandby.

The base class takes uint64_t snapshot_ts (by value), but the RocksDB implementations take uint64_t *snapshot_ts (by pointer). This parameter type mismatch prevents these methods from properly overriding the base virtual and breaks virtual dispatch. Change the RocksDB implementations to use uint64_t snapshot_ts by value to match the base signature.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/store/data_store_handler.h` around lines 373 - 395, The
RocksDB implementations have a mismatched parameter type for
CreateSnapshotForStandby; update the method signatures in
RocksDBHandlerImpl::CreateSnapshotForStandby and
RocksDBCloudHandlerImpl::CreateSnapshotForStandby to accept uint64_t snapshot_ts
(by value) to match the base class, then update any uses inside those functions
to stop treating snapshot_ts as a pointer (remove dereferences like
*snapshot_ts) and use the value directly; also add/keep the override specifier
so the compiler verifies the override.
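A minimal illustration of why the override must match the base signature exactly, with simplified stand-ins for DataStoreHandler and its RocksDB subclass (not the real tx_service classes). A pointer parameter where the base takes a value would declare a new overload instead of an override; marking the method `override` turns that mismatch into a compile error, and matching the signature restores virtual dispatch.

```cpp
#include <cstdint>

// Simplified base class; the real one lives in data_store_handler.h.
struct DataStoreHandler
{
    virtual ~DataStoreHandler() = default;

    // Base takes snapshot_ts by value.
    virtual bool CreateSnapshotForStandby(uint32_t ng_id, uint64_t snapshot_ts)
    {
        (void) ng_id;
        (void) snapshot_ts;
        return false;  // default: no standby snapshot support
    }
};

struct RocksDBHandlerImpl : DataStoreHandler
{
    // Matching by-value signature plus `override`: dispatch works, and any
    // future signature drift fails to compile.
    bool CreateSnapshotForStandby(uint32_t ng_id, uint64_t snapshot_ts) override
    {
        (void) ng_id;
        return snapshot_ts > 0;  // pretend success for a valid timestamp
    }
};
```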
♻️ Duplicate comments (2)
store_handler/data_store_service_client.h (1)

507-508: ⚠️ Potential issue | 🟠 Major

Carry snapshot_ts in SetStandbySnapshotPayload().

This API cannot tie a payload to the snapshot it belongs to. If standby notifications are reordered, snapshot N can be reloaded using snapshot N+1's source settings; the payload needs to stay keyed by snapshot_ts until that exact snapshot is applied.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/data_store_service_client.h` around lines 507 - 508,
SetStandbySnapshotPayload currently takes (uint32_t ng_id, const std::string
&snapshot_path) and cannot associate the payload with a specific snapshot
timestamp, causing reordering bugs; change its signature to include a
snapshot_ts (e.g., uint64_t snapshot_ts) so the payload is keyed by snapshot_ts.
Update the declaration of SetStandbySnapshotPayload in
data_store_service_client.h, adjust all implementing/overriding definitions and
call sites to accept and pass the new snapshot_ts parameter, and ensure the
implementation stores/looks up the payload keyed by (ng_id, snapshot_ts) rather
than (ng_id, snapshot_path) so the exact snapshot's payload persists until
applied.
tx_service/src/remote/cc_node_service.cpp (1)

1718-1728: ⚠️ Potential issue | 🟠 Major

Keep the USER-missing fallback in remote-address form.

When USER is unavailable, this builds leader_ip:path, which drops the @ delimiter. The standby side then parses it as a plain path list instead of <user>@<host>:<path>, so the leader address/path metadata never gets installed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/remote/cc_node_service.cpp` around lines 1718 - 1728, The
constructed snapshot_path must always use the remote-address form with the '@'
delimiter so the standby parses it as <user>@<host>:<path>; if getenv("USER")
returns null or empty, build snapshot_path by prefixing an '@' before leader_ip
(e.g., "@" + leader_ip + ":" + store_hd->SnapshotSyncDestPath()) instead of
omitting the '@', and update the code paths that set snapshot_path (the
getenv("USER") check and the snapshot_path assignment) to ensure the '@' is
present even when the username is missing.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@store_handler/data_store_service_client.cpp`:
- Around line 3899-3903: SnapshotSyncDestPath() currently returns the raw
FLAGS_eloq_store_data_path_list which may be a multi/weighted-list; change it to
parse that config and return a single concrete filesystem directory (e.g., pick
the primary/first path or the unweighted path entry), not the raw blob. In the
DataStoreServiceClient::SnapshotSyncDestPath implementation, replace the direct
return of FLAGS_eloq_store_data_path_list with logic to split the list format,
handle weights/whitespace, validate the chosen path exists or is well-formed,
and return that single path string so callers get a usable destination rather
than the full config list.
- Around line 3639-3649: CreateSnapshotForStandby is updating a single
process-wide latest_standby_snapshot_ts_ which breaks per-node-group semantics
used by CurrentStandbySnapshotTs(ng_id); instead maintain and update a per-ng_id
timestamp store (e.g., std::unordered_map<ng_id, uint64_t> or concurrent map
keyed by ng_id, or a map of std::atomic<uint64_t> values) and update the entry
for the incoming ng_id when CreateSnapshotForStandby succeeds; make sure to add
thread-safety (mutex or atomic entries) and replace reads of
latest_standby_snapshot_ts_ with lookups in the per-ng_id structure so
CurrentStandbySnapshotTs(ng_id) returns the correct timestamp for that ng_id.
- Around line 3944-3968: In DataStoreServiceClient::RequestSyncSnapshot: after
building bucket_ids (from bucket_infos_), add a guard to fail fast when
bucket_ids is empty before calling OpenDataStore or ReloadData—mirror the
behavior in OnLeaderStart by returning an error/false (instead of proceeding) if
bucket_ids.empty(); ensure this check sits before the
FetchDSShardStatus/OpenDataStore block so you don't open or reload an empty
shard; keep references to ng_id/ng_term and still call FetchDSShardStatus only
when bucket_ids is non-empty if needed.

In `@store_handler/data_store_service_client.h`:
- Line 898: The single global std::atomic<uint64_t> latest_standby_snapshot_ts_
must be replaced with a per-node-group mapping so updates/query are scoped by
ng_id; change latest_standby_snapshot_ts_ to a map keyed by uint32_t (e.g.
std::unordered_map<uint32_t, std::atomic<uint64_t>> or a vector indexed by
ng_id) and protect map mutations/initialization with a mutex, then update all
callers (e.g. CurrentStandbySnapshotTs(uint32_t ng_id) and the standby snapshot
cleanup APIs) to read/write the timestamp using the provided ng_id key and
initialize an entry if missing. Ensure reads/writes of the stored atomic value
remain atomic while map access is synchronized.

In `@store_handler/eloq_data_store_service/data_store_service.cpp`:
- Around line 2352-2382: ReloadData can apply snapshots out-of-order because two
threads can both pass the pre-check on ds_ref.latest_snapshot_ts_ and then
finish in reverse order; fix by serializing per-shard reloads or re-checking
staleness after acquiring a per-shard reload lock: add a per-shard mutex/flag on
the shard reference (e.g., ds_ref.reload_mutex_ or ds_ref.reload_in_progress_),
acquire it before re-checking and before calling
ds_ref.data_store_->ReloadData(term, snapshot_ts), then after acquiring the lock
re-read ds_ref.latest_snapshot_ts_ and bail if snapshot_ts <
latest_snapshot_ts_, otherwise perform ReloadData and update
ds_ref.latest_snapshot_ts_ as now but only if snapshot_ts is newer; release the
lock at the end so concurrent reloads are ordered and stale snapshots cannot
overwrite newer ones.

In `@store_handler/rocksdb_handler.cpp`:
- Around line 2442-2450: The CreateSnapshotForStandby implementation is
discarding the snapshot identifier and always reusing ckpt_path_, which risks
concurrent overwrite; modify RocksDBCloudHandlerImpl::CreateSnapshotForStandby
to obtain a unique snapshot id (assign into snapshot_ts) and create/point
CreateSnapshot at an isolated snapshot directory/name derived from that id
(instead of ckpt_path_), so the standby sync uses the per-snapshot path; apply
the same change to the other
EloqKV::RocksDBCloudHandlerImpl::CreateSnapshotForStandby occurrence.

In `@store_handler/rocksdb_handler.h`:
- Around line 691-693: The override signatures for CreateSnapshotForStandby in
rocksdb_handler (the methods named CreateSnapshotForStandby) currently take a
uint64_t *snapshot_ts but the base declaration expects uint64_t snapshot_ts by
value; update the derived declarations to match the base exactly by changing the
parameter to uint64_t snapshot_ts (and adjust any corresponding
implementation/uses in rocksdb_handler.cpp to accept/use a value, not a pointer)
so the override keyword compiles correctly.

In `@tx_service/src/checkpointer.cpp`:
- Around line 353-365: The new CreateSnapshotForStandby(...) calls currently
ignore their bool return value; update the blocks where
status->HasDataStoreWrite() and Sharder::Instance().GetDataStoreHandler() are
used (both occurrences, including the similar block at lines ~432-444) to
capture the returned bool, and if it is false treat snapshot creation as a
failure: log an error with context, abort/propagate failure so the checkpoint is
not advertised to standbys (i.e., do not move forward with the
checkpoint/standby-visible state), and ensure any calling code (the checkpoint
publish path) observes this failure and refrains from announcing the checkpoint.
Ensure you respect store_handler->IsSharedStorage() logic and only perform this
check when snapshot creation was attempted.
- Around line 205-209: The conditional currently compiled under
DATA_STORE_TYPE_ELOQDSS_ELOQSTORE makes the if reduce to just is_standby_node
and incorrectly skips the local-standby checkpoint flow for EloqStore; change
the condition so it always checks the datastore sharing at runtime rather than
relying on the preprocessor. Replace the preprocessor-wrapped clause with a
runtime check using Sharder::Instance().GetDataStoreHandler()->IsSharedStorage()
so the if becomes a combined check of is_standby_node &&
Sharder::Instance().GetDataStoreHandler()->IsSharedStorage(), ensuring EloqStore
standbys do not incorrectly skip checkpointing and the local-standby flow still
runs when appropriate.

In `@tx_service/src/remote/cc_node_service.cpp`:
- Around line 63-83: The destructor currently flips standby_task_running_ to
false causing StandbyTaskWorkerMain() to exit immediately and drop queued tasks;
modify the worker loop so it only exits when both standby_task_running_ is false
AND standby_tasks_ is empty (e.g., loop condition: while (standby_task_running_
|| !standby_tasks_.empty()) ), and ensure condition_variable waits/notifications
in EnqueueStandbyTask and ~CcNodeService still wake the worker so it can drain
remaining tasks; update StandbyTaskWorkerMain(), reference to
standby_task_running_, standby_tasks_, EnqueueStandbyTask(), and
~CcNodeService() accordingly.
- Around line 1765-1777: The async task currently relies on a pre-enqueue
stale-check and can still apply an older checkpoint; inside the lambda passed to
EnqueueStandbyTask re-validate monotonicity before updating by either calling
store_hd->OnUpdateStandbyCkptTs against the current in-memory checkpoint (i.e.,
perform the same check again) or compute a clamped value = max(existing_ckpt_ts,
ckpt_ts) and pass that to OnUpdateStandbyCkptTs, and only if the resulting value
actually increases call Sharder::Instance().UpdateNodeGroupCkptTs(ng_id,
new_ckpt_ts).

In `@tx_service/src/standby.cpp`:
- Around line 65-74: The current update of agg->min_ack_ckpt_ts in the
UpdateStandbyCkptTs success path incorrectly includes zero timestamps, which can
pin the cleanup watermark to 0; modify the success branch in the block handling
succ (around UpdateStandbyCkptTs / ctx_->resp.current_ckpt_ts()) to only update
agg->min_ack_ckpt_ts when ctx_->resp.current_ckpt_ts() > 0 (i.e., ignore zero
ack timestamps), leaving the existing success++ logic intact, so
DeleteStandbySnapshotsBefore() can advance correctly.

In `@tx_service/src/store/snapshot_manager.cpp`:
- Around line 313-345: The inner `#ifdef` duplicates the outer
DATA_STORE_TYPE_ELOQDSS_ELOQSTORE guard and makes the SendSnapshotToRemote
branch unreachable; flatten the preprocessor logic in SyncWithStandby by
removing the nested `#ifdef` and using a single conditional around the alternate
behavior: if DATA_STORE_TYPE_ELOQDSS_ELOQSTORE then log the skip and set succ =
true (keep the message referencing req.ng_id(), node_id, and
req.standby_node_id()), else call store_hd_->SendSnapshotToRemote (use the
signature that includes req.standby_node_id() as in the inner dead branch) and
preserve the surrounding DLOGs and succ handling; ensure references to
store_hd_, SendSnapshotToRemote, succ and the DLOGs are updated so there is only
one `#ifdef` controlling the eloqstore-special-case.
- Around line 289-292: min_ack_ckpt_ts is declared unconditionally but only used
inside the DATA_STORE_TYPE_ELOQDSS_ELOQSTORE blocks; move the declaration of
uint64_t min_ack_ckpt_ts (and RequestSyncSnapshotAggregate if appropriate)
inside the `#ifdef` DATA_STORE_TYPE_ELOQDSS_ELOQSTORE region so they are only
compiled when used, ensuring references at the usage sites (around the code that
references min_ack_ckpt_ts at the later lines) remain valid.

---

Outside diff comments:
In `@tx_service/include/cc/cc_request.h`:
- Around line 6027-6032: The comment and invariant near the ng_term check are
out of date: CleanType::CleanDataForTest now follows the standby-term path
alongside CleanType::CleanDeletedData, so update the inlined comment next to the
ng_term check (the block that references CleanType::CleanDeletedData and
CleanType::CleanDataForTest) to reflect that both CleanDeletedData and
CleanDataForTest are permitted on standby; then add a regression test that
exercises the standby cleanup path (drive ng_term < 0 + clean_type_ ==
CleanType::CleanDataForTest and assert the code follows the StandbyNodeTerm()
branch and performs the expected cleanup behavior) to prevent accidental
reversion of this contract.

In `@tx_service/include/store/data_store_handler.h`:
- Around line 373-395: The RocksDB implementations have a mismatched parameter
type for CreateSnapshotForStandby; update the method signatures in
RocksDBHandlerImpl::CreateSnapshotForStandby and
RocksDBCloudHandlerImpl::CreateSnapshotForStandby to accept uint64_t snapshot_ts
(by value) to match the base class, then update any uses inside those functions
to stop treating snapshot_ts as a pointer (remove dereferences like
*snapshot_ts) and use the value directly; also add/keep the override specifier
so the compiler verifies the override.

In `@tx_service/src/checkpointer.cpp`:
- Around line 170-189: The code dereferences the result of popen("echo $USER",
"r") without checking for failure; replace this subprocess approach with a
direct getenv lookup (use std::getenv("USER") and treat nullptr as empty) to
populate username for snapshot_req.set_user, and if you must keep the popen
branch ensure you check the FILE* before calling fgets/pclose. Update the block
that currently opens output_stream, reads into buffer, and pclose it (the code
around popen/fgets/username) so it uses getenv("USER") first and only falls back
to a safely-checked popen path; keep the subsequent
snapshot_req.set_user(username) and use store_hd_->SnapshotSyncDestPath()
unchanged.

In `@tx_service/src/data_sync_task.cpp`:
- Around line 156-171: The code currently ignores failures from
Sharder::Instance().GetDataStoreHandler()->CreateSnapshotForStandby(...) and
proceeds to call BrocastPrimaryCkptTs(...), which can advertise a
truncate_log_ts_ for a non-existent snapshot; change the flow to check the
boolean return of CreateSnapshotForStandby (when status_->HasDataStoreWrite()
and IsSharedStorage() is false), and if it returns false, abort/mark the
checkpoint as failed (e.g., set the checkpoint status to failure/return early or
invoke the existing checkpoint-failure handling code) and do NOT call
BrocastPrimaryCkptTs(node_group_id_, node_group_term_,
status_->truncate_log_ts_, status_->HasDataStoreWrite()); ensure the failure
path logs the error and preserves existing semantics for successful snapshot
creation.

---

Duplicate comments:
In `@store_handler/data_store_service_client.h`:
- Around line 507-508: SetStandbySnapshotPayload currently takes (uint32_t
ng_id, const std::string &snapshot_path) and cannot associate the payload with a
specific snapshot timestamp, causing reordering bugs; change its signature to
include a snapshot_ts (e.g., uint64_t snapshot_ts) so the payload is keyed by
snapshot_ts. Update the declaration of SetStandbySnapshotPayload in
data_store_service_client.h, adjust all implementing/overriding definitions and
call sites to accept and pass the new snapshot_ts parameter, and ensure the
implementation stores/looks up the payload keyed by (ng_id, snapshot_ts) rather
than (ng_id, snapshot_path) so the exact snapshot's payload persists until
applied.

In `@tx_service/src/remote/cc_node_service.cpp`:
- Around line 1718-1728: The constructed snapshot_path must always use the
remote-address form with the '@' delimiter so the standby parses it as
<user>@<host>:<path>; if getenv("USER") returns null or empty, build
snapshot_path by prefixing an '@' before leader_ip (e.g., "@" + leader_ip + ":"
+ store_hd->SnapshotSyncDestPath()) instead of omitting the '@', and update the
code paths that set snapshot_path (the getenv("USER") check and the
snapshot_path assignment) to ensure the '@' is present even when the username is
missing.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 47094bf6-8d1e-4f04-b27f-b8fe252e6c91

📥 Commits

Reviewing files that changed from the base of the PR and between 770299a and 3403738.

📒 Files selected for processing (26)
  • store_handler/data_store_service_client.cpp
  • store_handler/data_store_service_client.h
  • store_handler/eloq_data_store_service/build_eloq_store.cmake
  • store_handler/eloq_data_store_service/data_store.h
  • store_handler/eloq_data_store_service/data_store_service.cpp
  • store_handler/eloq_data_store_service/data_store_service.h
  • store_handler/eloq_data_store_service/eloq_store_config.cpp
  • store_handler/eloq_data_store_service/eloq_store_config.h
  • store_handler/eloq_data_store_service/eloq_store_data_store.cpp
  • store_handler/eloq_data_store_service/eloq_store_data_store.h
  • store_handler/eloq_data_store_service/eloq_store_data_store_factory.h
  • store_handler/eloq_data_store_service/eloqstore
  • store_handler/eloq_data_store_service/internal_request.h
  • store_handler/eloq_data_store_service/rocksdb_data_store_common.h
  • store_handler/rocksdb_handler.cpp
  • store_handler/rocksdb_handler.h
  • tx_service/include/cc/cc_request.h
  • tx_service/include/proto/cc_request.proto
  • tx_service/include/remote/cc_node_service.h
  • tx_service/include/store/data_store_handler.h
  • tx_service/src/checkpointer.cpp
  • tx_service/src/data_sync_task.cpp
  • tx_service/src/fault/cc_node.cpp
  • tx_service/src/remote/cc_node_service.cpp
  • tx_service/src/standby.cpp
  • tx_service/src/store/snapshot_manager.cpp
💤 Files with no reviewable changes (1)
  • store_handler/eloq_data_store_service/eloq_store_config.h
✅ Files skipped from review due to trivial changes (1)
  • store_handler/eloq_data_store_service/eloqstore
🚧 Files skipped from review as they are similar to previous changes (4)
  • store_handler/eloq_data_store_service/data_store.h
  • store_handler/eloq_data_store_service/eloq_store_data_store_factory.h
  • tx_service/include/remote/cc_node_service.h
  • tx_service/src/fault/cc_node.cpp

Comment on lines +3899 to +3903
std::string DataStoreServiceClient::SnapshotSyncDestPath() const
{
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
return FLAGS_eloq_store_data_path_list;
#else

⚠️ Potential issue | 🟠 Major

Return a real sync directory here, not the raw *_path_list config.

SnapshotSyncDestPath() is a singular destination accessor, but it currently returns FLAGS_eloq_store_data_path_list verbatim. As soon as that flag contains multiple or weighted paths, callers will get a config blob instead of a usable filesystem path, and snapshot sync will target an invalid destination.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/data_store_service_client.cpp` around lines 3899 - 3903,
SnapshotSyncDestPath() currently returns the raw FLAGS_eloq_store_data_path_list
which may be a multi/weighted-list; change it to parse that config and return a
single concrete filesystem directory (e.g., pick the primary/first path or the
unweighted path entry), not the raw blob. In the
DataStoreServiceClient::SnapshotSyncDestPath implementation, replace the direct
return of FLAGS_eloq_store_data_path_list with logic to split the list format,
handle weights/whitespace, validate the chosen path exists or is well-formed,
and return that single path string so callers get a usable destination rather
than the full config list.
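A hedged sketch of the parsing this comment asks for. The separator (`,`) and the optional trailing weight token are assumptions about the flag's list syntax, not the actual EloqStore format; the point is that the accessor should hand callers one concrete directory, not the raw config blob:

```cpp
#include <string>

// Sketch: pick the first concrete path out of a multi-path config value
// such as "/data/a 3, /data/b 1" (assumed "path [weight]" entries,
// comma-separated). Returns "" if no path can be extracted.
std::string FirstDataPath(const std::string &path_list)
{
    // Take everything up to the first comma.
    std::string entry = path_list.substr(0, path_list.find(','));

    // Trim surrounding whitespace.
    const char *ws = " \t";
    size_t b = entry.find_first_not_of(ws);
    size_t e = entry.find_last_not_of(ws);
    if (b == std::string::npos)
    {
        return "";
    }
    entry = entry.substr(b, e - b + 1);

    // Drop a trailing " <weight>" token if one is present (assumed syntax).
    if (entry.find_last_of(" \t") != std::string::npos)
    {
        entry = entry.substr(0, entry.find_first_of(" \t"));
    }
    return entry;
}
```

Real code would additionally validate that the chosen directory exists before returning it as the sync destination.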

// shard owner.
std::array<std::atomic<uint32_t>, 1000> dss_shards_;
std::atomic<uint64_t> dss_topology_version_{0};
std::atomic<uint64_t> latest_standby_snapshot_ts_{0};

⚠️ Potential issue | 🔴 Critical

Track standby snapshot ts per ng_id, not globally.

CurrentStandbySnapshotTs(uint32_t ng_id) and the standby snapshot cleanup APIs are scoped by node group, but this stores only one timestamp for the whole client. As soon as two node groups advance independently, an update from ng B can be reported back for ng A, which can make the primary garbage-collect the wrong standby snapshots.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/data_store_service_client.h` at line 898, The single global
std::atomic<uint64_t> latest_standby_snapshot_ts_ must be replaced with a
per-node-group mapping so updates/query are scoped by ng_id; change
latest_standby_snapshot_ts_ to a map keyed by uint32_t (e.g.
std::unordered_map<uint32_t, std::atomic<uint64_t>> or a vector indexed by
ng_id) and protect map mutations/initialization with a mutex, then update all
callers (e.g. CurrentStandbySnapshotTs(uint32_t ng_id) and the standby snapshot
cleanup APIs) to read/write the timestamp using the provided ng_id key and
initialize an entry if missing. Ensure reads/writes of the stored atomic value
remain atomic while map access is synchronized.
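The per-node-group registry that prompt describes can be sketched as below. For simplicity this uses a mutex-protected map of plain values rather than a map of `std::atomic` entries; the class and member names are illustrative, not the project's actual members:

```cpp
#include <cstdint>
#include <map>
#include <mutex>

// Sketch: one snapshot-timestamp slot per node group, replacing a single
// process-wide atomic. Updates never move a node group's slot backward.
class StandbySnapshotTsRegistry
{
public:
    void Update(uint32_t ng_id, uint64_t snapshot_ts)
    {
        std::lock_guard<std::mutex> lk(mux_);
        uint64_t &slot = ts_by_ng_[ng_id];  // default-initializes to 0
        if (snapshot_ts > slot)
        {
            slot = snapshot_ts;
        }
    }

    // Latest snapshot ts for one node group; 0 if none recorded yet.
    uint64_t Current(uint32_t ng_id) const
    {
        std::lock_guard<std::mutex> lk(mux_);
        auto it = ts_by_ng_.find(ng_id);
        return it == ts_by_ng_.end() ? 0 : it->second;
    }

private:
    mutable std::mutex mux_;
    std::map<uint32_t, uint64_t> ts_by_ng_;
};
```

With this shape, an advance for ng B can no longer be reported back for ng A, so the primary's snapshot garbage collection stays scoped to the right node group.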

@thweetkomputer thweetkomputer force-pushed the feat-eloqstore-local-standby-zc branch from 3403738 to 8425155 Compare March 12, 2026 10:42

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tx_service/src/store/snapshot_manager.cpp (1)

276-282: ⚠️ Potential issue | 🟠 Major

Don't consume the queued request before the retry decision.

pending_task is a reference to an entry in pending_req_, and std::move(pending_task) moves the actual map element into the tasks vector. Although the iterator advances and the entry remains in the map, after the move the map entry is left in a "valid but unspecified state", which is all that C++ and protobuf guarantee for moved-from objects. The retry path that reads from pending_req_ later cannot reliably access field values like standby_node_term(), causing wrong term comparisons or other incorrect behavior.

🩹 Suggested fix
-            tasks.emplace_back(std::move(pending_task));
+            tasks.emplace_back(pending_task);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/store/snapshot_manager.cpp` around lines 276 - 282, The
current code moves the map element referenced by pending_task into tasks
(std::move(pending_task)), leaving the pending_req_ entry in a
valid-but-unspecified state and breaking later retry reads (e.g.,
standby_node_term()); change this to copy the request/value instead of moving
the map element so the entry in pending_req_ remains intact. Locate the loop
where pending_task (the map element/iterator into pending_req_) is pushed into
tasks and replace the move with a copy/emplace of the element's value (retain
the iterator advancement logic: it++), ensuring subsequent retry logic that
reads pending_req_ reads stable data.
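The copy-instead-of-move fix is easy to demonstrate in isolation. `CollectPending` is a hypothetical helper standing in for the batching loop; the map value here is a plain string where the real code holds a protobuf request:

```cpp
#include <map>
#include <string>
#include <vector>

// Sketch: build the task batch by copying map values, so each entry in
// the pending map stays fully readable for a later retry. A std::move
// here would leave the map element in a valid-but-unspecified state.
std::vector<std::string> CollectPending(
    const std::map<int, std::string> &pending)
{
    std::vector<std::string> tasks;
    for (const auto &entry : pending)
    {
        tasks.emplace_back(entry.second);  // copy: map entry stays intact
    }
    return tasks;
}
```

After collection, the retry path can still read the original request data out of the map, which is the property the moved-from version loses.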
♻️ Duplicate comments (11)
store_handler/rocksdb_handler.h (1)

691-693: ⚠️ Potential issue | 🔴 Critical

Fix the CreateSnapshotForStandby override signature.

The base now requires (uint32_t ng_id, std::vector<std::string> &, uint64_t snapshot_ts), but these declarations still use (std::vector<std::string> &, uint64_t *). With override, this is a hard compile failure, so neither RocksDB implementation actually overrides the new standby snapshot API.

🛠 Suggested fix
-    bool CreateSnapshotForStandby(
-        std::vector<std::string> &snapshot_files,
-        uint64_t *snapshot_ts) override;
+    bool CreateSnapshotForStandby(
+        uint32_t ng_id,
+        std::vector<std::string> &snapshot_files,
+        uint64_t snapshot_ts) override;

Apply the same change to both declarations and their corresponding .cpp definitions.

Run this to verify the base/derived mismatch:

#!/bin/bash
set -euo pipefail

echo "Base declaration:"
fd 'data_store_handler\.h$' | xargs -r rg -n -C2 '\bCreateSnapshotForStandby\s*\('

echo
echo "Derived declarations:"
fd 'rocksdb_handler\.h$' | xargs -r rg -n -C2 '\bCreateSnapshotForStandby\s*\('

Also applies to: 749-751

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/rocksdb_handler.h` around lines 691 - 693, The
CreateSnapshotForStandby override signature in rocksdb_handler currently uses
(std::vector<std::string>&, uint64_t*) and must be changed to match the base:
use (uint32_t ng_id, std::vector<std::string>& snapshot_files, uint64_t
snapshot_ts) — update both declarations in rocksdb_handler.h (both occurrences)
and the corresponding definitions in rocksdb_handler.cpp to accept the uint32_t
ng_id and a uint64_t snapshot_ts (not a pointer) so the override matches the
base method name CreateSnapshotForStandby.
tx_service/src/checkpointer.cpp (2)

353-366: ⚠️ Potential issue | 🟠 Major

Don't publish a ckpt ts if standby snapshot creation failed.

Both CreateSnapshotForStandby(...) calls drop the bool result and BrocastPrimaryCkptTs(...) still runs immediately after. That advertises a ckpt ts the standby may never be able to fetch.

Also applies to: 429-445

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/checkpointer.cpp` around lines 353 - 366, The code currently
ignores the boolean result from CreateSnapshotForStandby and always calls
BrocastPrimaryCkptTs, which can advertise a checkpoint the standby can't fetch;
update the logic around
Sharder::Instance().GetDataStoreHandler()->CreateSnapshotForStandby(...) to
capture and check its returned success bool and only call
BrocastPrimaryCkptTs(node_group, leader_term, last_succ_ckpt_ts,
status->HasDataStoreWrite()) when the snapshot creation succeeded (and preserve
existing behavior when status->HasDataStoreWrite() is false); apply the same fix
to the other identical block that calls CreateSnapshotForStandby and
BrocastPrimaryCkptTs.

205-209: ⚠️ Potential issue | 🔴 Critical

This #ifndef disables local-standby checkpointing for EloqStore.

With DATA_STORE_TYPE_ELOQDSS_ELOQSTORE defined, the condition collapses to if (is_standby_node), so the standby checkpoint path below never executes.

🛠 Suggested fix
-    if (is_standby_node
-#ifndef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
-        && Sharder::Instance().GetDataStoreHandler()->IsSharedStorage()
-#endif
-    )
+    if (is_standby_node &&
+        Sharder::Instance().GetDataStoreHandler()->IsSharedStorage())
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/checkpointer.cpp` around lines 205 - 209, The preprocessor
guard around the shared-storage check causes the condition to be reduced to only
is_standby_node when DATA_STORE_TYPE_ELOQDSS_ELOQSTORE is defined, preventing
the standby checkpoint branch from running; remove the `#ifndef`/`#endif` so the if
reads: if (is_standby_node &&
Sharder::Instance().GetDataStoreHandler()->IsSharedStorage()) thereby ensuring
the IsSharedStorage() check always runs (references: is_standby_node,
Sharder::Instance().GetDataStoreHandler()->IsSharedStorage(),
DATA_STORE_TYPE_ELOQDSS_ELOQSTORE).
tx_service/src/remote/cc_node_service.cpp (3)

63-74: ⚠️ Potential issue | 🟠 Major

Drain queued standby work before exiting the worker.

~CcNodeService() flips standby_task_running_ to false, and StandbyTaskWorkerMain() breaks as soon as the flag is false. Anything already accepted through EnqueueStandbyTask() can be dropped during teardown.

🛠 Suggested fix
 void CcNodeService::StandbyTaskWorkerMain()
 {
     while (true)
     {
         std::function<void()> task;
         {
             std::unique_lock<std::mutex> lk(standby_task_mu_);
-            while (standby_task_running_ && standby_tasks_.empty())
-            {
-                standby_task_cv_.wait_for(lk, std::chrono::milliseconds(100));
-            }
-            if (!standby_task_running_)
+            standby_task_cv_.wait(
+                lk,
+                [this]
+                { return !standby_task_running_ || !standby_tasks_.empty(); });
+            if (!standby_task_running_ && standby_tasks_.empty())
             {
                 break;
             }
             task = std::move(standby_tasks_.front());
             standby_tasks_.pop_front();

Also applies to: 85-104

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/remote/cc_node_service.cpp` around lines 63 - 74, Destructor
currently flips standby_task_running_ to false causing StandbyTaskWorkerMain to
exit immediately and drop items enqueued via EnqueueStandbyTask; fix by ensuring
queued tasks are drained before stopping the worker: either (A) change the
worker loop in StandbyTaskWorkerMain to continue processing until
standby_task_queue_ is empty even after standby_task_running_ becomes false, or
(B) in CcNodeService::~CcNodeService wait (using standby_task_mu_ and
standby_task_cv_) until standby_task_queue_ is empty before setting
standby_task_running_ to false and notifying/joining standby_task_worker_;
update synchronization around standby_task_queue_, standby_task_running_,
EnqueueStandbyTask(), standby_task_cv_ to avoid races and ensure proper wakeups.
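Option (A) from that prompt — keep the worker alive until the queue drains — can be sketched as a self-contained queue. Names echo the review discussion (`running_`, the CV wait predicate) but this is an illustrative pattern, not the project's class:

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Sketch: the worker exits only when the stop flag is set AND the queue
// is empty, so tasks accepted before shutdown are never dropped.
class TaskQueue
{
public:
    TaskQueue() : worker_([this] { WorkerMain(); }) {}

    ~TaskQueue()
    {
        {
            std::lock_guard<std::mutex> lk(mu_);
            running_ = false;
        }
        cv_.notify_all();
        worker_.join();  // join only returns after the queue is drained
    }

    void Enqueue(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lk(mu_);
            tasks_.push_back(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void WorkerMain()
    {
        while (true)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(mu_);
                cv_.wait(lk,
                         [this] { return !running_ || !tasks_.empty(); });
                if (!running_ && tasks_.empty())
                {
                    break;  // stop only once every queued task has run
                }
                task = std::move(tasks_.front());
                tasks_.pop_front();
            }
            task();  // run outside the lock
        }
    }

    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> tasks_;
    bool running_{true};
    std::thread worker_;  // declared last: starts after state is ready
};
```

Destroying the queue while tasks are still pending now drains them before joining, which is the behavior the review asks `~CcNodeService()` to preserve.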

1765-1777: ⚠️ Potential issue | 🟠 Major

Re-check ckpt monotonicity inside the queued task.

The stale-check only happens before enqueue. If a newer ckpt is queued first and an older one is queued before the worker updates NativeNodeGroupCkptTs(), both tasks still run and the later task can move the standby ckpt backward.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/remote/cc_node_service.cpp` around lines 1765 - 1777, The
queued lambda must re-check checkpoint monotonicity to avoid moving standby ckpt
backwards: inside the EnqueueStandbyTask lambda (the closure capturing store_hd,
ng_id, ng_term, ckpt_ts) fetch the current checkpoint for ng_id (e.g., via
Sharder::Instance().NativeNodeGroupCkptTs(ng_id) or equivalent getter) and skip
calling OnUpdateStandbyCkptTs/UpdateNodeGroupCkptTs if ckpt_ts is older or not
greater than the current value; alternatively ensure OnUpdateStandbyCkptTs
itself enforces monotonicity and returns false for stale ckpts so
UpdateNodeGroupCkptTs is only called when the stored checkpoint actually
advanced.
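The apply-time monotonicity re-check can be expressed as a small compare-exchange loop. `AdvanceCkptTs` is a hypothetical helper; the atomic stands in for the per-node-group checkpoint slot the lambda would consult:

```cpp
#include <atomic>
#include <cstdint>

// Sketch: advance the stored checkpoint ts only if the incoming value is
// strictly newer, even when queued tasks execute out of order. Returns
// true only when this call actually moved the slot forward.
bool AdvanceCkptTs(std::atomic<uint64_t> &slot, uint64_t ckpt_ts)
{
    uint64_t cur = slot.load(std::memory_order_relaxed);
    while (ckpt_ts > cur)
    {
        if (slot.compare_exchange_weak(cur, ckpt_ts,
                                       std::memory_order_relaxed))
        {
            return true;  // we installed the newer checkpoint
        }
        // cur was refreshed by compare_exchange_weak; loop re-checks.
    }
    return false;  // stale: an equal or newer ts is already installed
}
```

Inside the queued lambda, `UpdateNodeGroupCkptTs` would then only be called when this returns true, so an older task that runs late becomes a no-op.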

1718-1729: ⚠️ Potential issue | 🟠 Major

Don't emit a bare leader_ip:path fallback.

When USER is missing, this builds leader_ip:path. The standby side only treats payloads with an empty/local prefix or an @ as remote addresses, so this fallback is parsed as a plain store-path list and the master address never gets installed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/remote/cc_node_service.cpp` around lines 1718 - 1729, The
fallback when getenv("USER") is empty currently builds "leader_ip:path" which is
parsed as a local store-path; instead ensure the master address is recognized by
prepending an '@' (or otherwise ensuring a remote-prefix) when constructing
snapshot_path in the else branch where user is null/empty — update the logic
that builds snapshot_path (using leader_ip and store_hd->SnapshotSyncDestPath())
before calling response->set_snapshot_path(...) so the resulting string includes
the '@' marker (e.g., "@"+leader_ip+":"+store_hd->SnapshotSyncDestPath()).
tx_service/src/standby.cpp (1)

67-73: ⚠️ Potential issue | 🟠 Major

Ignore zero checkpoint ACKs when advancing the cleanup watermark.

A successful standby can legitimately reply with current_ckpt_ts() == 0. Folding that into min_ack_ckpt_ts pins the watermark to 0, so the snapshot cleanup below never advances.

🩹 Suggested fix
         {
             std::lock_guard<bthread::Mutex> lk(agg->mux);
             if (succ)
             {
                 agg->success++;
-                agg->min_ack_ckpt_ts = std::min(agg->min_ack_ckpt_ts,
-                                                ctx_->resp.current_ckpt_ts());
+                const uint64_t ack_ckpt_ts = ctx_->resp.current_ckpt_ts();
+                if (ack_ckpt_ts != 0)
+                {
+                    agg->min_ack_ckpt_ts =
+                        std::min(agg->min_ack_ckpt_ts, ack_ckpt_ts);
+                }
             }
             agg->pending--;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/standby.cpp` around lines 67 - 73, The code updates
agg->min_ack_ckpt_ts with ctx_->resp.current_ckpt_ts() even when that value is
0, which can pin the cleanup watermark; change the update in the success path
(inside the std::lock_guard on agg->mux where succ is true and agg->success is
incremented) to only take the min when ctx_->resp.current_ckpt_ts() > 0 (i.e.,
skip zero ACKs), leaving all locking and agg->success logic unchanged.
store_handler/rocksdb_handler.cpp (1)

2442-2450: ⚠️ Potential issue | 🟠 Major

Use the standby snapshot id to isolate checkpoint directories.

SnapshotManager::SyncWithStandby() now creates a per-sync snapshot timestamp, but both implementations overwrite it with 0 and still recreate ckpt_path_. That lets a newer standby sync clobber files an older sync is still transferring, and the timestamp-based cleanup path no longer matches the on-disk snapshot.

🩹 Suggested fix shape
 bool RocksDBHandlerImpl::CreateSnapshotForStandby(
     std::vector<std::string> &snapshot_files,
     uint64_t *snapshot_ts)
 {
-    if (snapshot_ts != nullptr)
-    {
-        *snapshot_ts = 0;
-    }
-    return CreateSnapshot(ckpt_path_, snapshot_files);
+    const uint64_t ts =
+        (snapshot_ts != nullptr && *snapshot_ts != 0)
+            ? *snapshot_ts
+            : static_cast<uint64_t>(
+                  std::chrono::duration_cast<std::chrono::microseconds>(
+                      std::chrono::system_clock::now().time_since_epoch())
+                      .count());
+    if (snapshot_ts != nullptr)
+    {
+        *snapshot_ts = ts;
+    }
+    return CreateSnapshot(ckpt_path_ + std::to_string(ts) + "/",
+                          snapshot_files);
 }

Apply the same idea to EloqKV::RocksDBCloudHandlerImpl::CreateSnapshotForStandby.

Also applies to: 3775-3783

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/rocksdb_handler.cpp` around lines 2442 - 2450,
CreateSnapshotForStandby currently zeroes out the per-sync snapshot timestamp
and reuses ckpt_path_, allowing concurrent standby syncs to clobber each other's
checkpoint dirs; instead propagate the per-sync timestamp returned by
SnapshotManager::SyncWithStandby into snapshot_ts and use it to build a unique
checkpoint path rather than always recreating ckpt_path_. Concretely, in
RocksDBCloudHandlerImpl::CreateSnapshotForStandby (and
EloqKV::RocksDBCloudHandlerImpl::CreateSnapshotForStandby) do not set
*snapshot_ts = 0; accept and assign the provided snapshot_ts from the created
snapshot, compute a timestamped ckpt path (derived from ckpt_path_ +
snapshot_ts) and pass that path to CreateSnapshot so each standby sync writes to
an isolated directory.
store_handler/eloq_data_store_service/data_store.h (1)

152-160: ⚠️ Potential issue | 🟠 Major

Don't report success from the default reload hook.

This implementation is documented as a no-op, so returning true makes a store with no standby-reload support look like it successfully reopened. That masks real install failures at the call site.

🩹 Suggested fix
     virtual bool ReloadData(int64_t term, uint64_t snapshot_ts)
     {
         (void) term;
         (void) snapshot_ts;
-        return true;
+        return false;
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/eloq_data_store_service/data_store.h` around lines 152 - 160,
The default ReloadData hook currently returns true, which incorrectly signals
success for stores that are no-op; change the default implementation of virtual
bool ReloadData(int64_t term, uint64_t snapshot_ts) to return false (indicating
unsupported/failure) while keeping the unused-parameter casts ((void) term;
(void) snapshot_ts;) and update the brief comment if needed; ensure the
signature and behavior of ReloadData are preserved so concrete stores that
actually implement standby reload continue to override and return true on
success.
store_handler/eloq_data_store_service/data_store_service.cpp (2)

2379-2401: ⚠️ Potential issue | 🔴 Critical

Serialize ReloadData() per shard before advancing latest_snapshot_ts_.

The pre-check is still racy: two reloads can both pass it, then finish in reverse order. If snapshot 200 completes first and snapshot 100 completes later, the store is reopened back to 100 while CurrentStandbySnapshotTs() still reports 200. Take a per-shard reload lock and re-check staleness after acquiring it.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/eloq_data_store_service/data_store_service.cpp` around lines
2379 - 2401, This is racy: serialize ReloadData per shard by acquiring a
per-shard reload lock (e.g., a mutex on ds_ref such as ds_ref.reload_mutex_)
before performing the operation; after taking that lock re-check
latest_snapshot_ts_.load(...) to ensure snapshot_ts is still >=
latest_snapshot_ts_, then call ds_ref.data_store_->ReloadData(term, snapshot_ts)
and only on success store the new latest_snapshot_ts_.store(snapshot_ts, ...);
release the lock after updating so concurrent reloads cannot interleave and
regress the snapshot time.
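The lock-then-recheck pattern described above can be sketched as follows, using `std::mutex` in place of the service's `bthread::Mutex`; the member names follow the review prompt and are assumptions:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>

// Sketch of a serialized per-shard reload: the staleness check is repeated
// under the lock so two racing reloads cannot finish in reverse order.
struct ShardReloader
{
    std::mutex reload_mutex_;
    std::atomic<uint64_t> latest_snapshot_ts_{0};

    // Returns true iff this snapshot was installed (not stale, reload ok).
    template <typename ReloadFn>
    bool Reload(int64_t term, uint64_t snapshot_ts, ReloadFn &&do_reload)
    {
        std::lock_guard<std::mutex> lk(reload_mutex_);
        // Re-check after acquiring the lock: a newer snapshot may have
        // completed while this reload was waiting.
        if (snapshot_ts < latest_snapshot_ts_.load(std::memory_order_acquire))
        {
            return false;
        }
        if (!do_reload(term, snapshot_ts))
        {
            return false;
        }
        // Advance the watermark only after the store reopened successfully.
        latest_snapshot_ts_.store(snapshot_ts, std::memory_order_release);
        return true;
    }
};
```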

2276-2335: ⚠️ Potential issue | 🔴 Critical

Keep standby source state scoped to the target snapshot instead of mutating shared defaults.

This setter immediately writes the payload into EloqStoreDataStoreFactory, which is process-wide, and it still has no snapshot_ts to bind the payload to a specific reload. On a multi-shard node, shard B can overwrite shard A’s standby source before A opens/reloads; even on one shard, a later payload can retarget an earlier snapshot. Store pending payloads by (shard_id, snapshot_ts) and apply them only when handling that snapshot on the target store.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/eloq_data_store_service/data_store_service.cpp` around lines
2276 - 2335, The current SetStandbySnapshotPayload mutates process-wide defaults
on EloqStoreDataStoreFactory (via
UpdateStandbyMasterAddr/UpdateStandbyMasterStorePaths) causing
cross-shard/snapshot races; instead, change SetStandbySnapshotPayload to record
the parsed standby_addr and standby_master_store_paths/weights into a pending
map keyed by (shard_id, snapshot_ts) (associate with data_shards_ or a new
pending_payloads_ member) and do NOT call
EloqStoreDataStoreFactory::UpdateStandbyMasterAddr or
UpdateStandbyMasterStorePaths there; apply those pending values only when the
target shard actually opens or reloads that specific snapshot (in the store
open/reload path that has access to snapshot_ts and ds_ref/data_store_), and
then clear the pending entry; keep references to symbols:
DataStoreService::SetStandbySnapshotPayload, data_shards_/ds_ref,
EloqStoreDataStoreFactory, UpdateStandbyMasterAddr,
UpdateStandbyMasterStorePaths.
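The pending map keyed by `(shard_id, snapshot_ts)` might look like the following sketch; `StandbyPayload` and both method names are illustrative, not the real types:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Illustrative payload: the parsed standby master address plus weighted
// store paths.
struct StandbyPayload
{
    std::string master_addr;
    std::vector<std::pair<std::string, uint32_t>> store_paths;  // path, weight
};

class PendingStandbyPayloads
{
public:
    void Set(uint32_t shard_id, uint64_t snapshot_ts, StandbyPayload payload)
    {
        std::lock_guard<std::mutex> lk(mux_);
        pending_[{shard_id, snapshot_ts}] = std::move(payload);
    }

    // Called from the open/reload path that knows which snapshot it is
    // installing; consumes and returns the matching payload, if any.
    std::optional<StandbyPayload> Take(uint32_t shard_id, uint64_t snapshot_ts)
    {
        std::lock_guard<std::mutex> lk(mux_);
        auto it = pending_.find({shard_id, snapshot_ts});
        if (it == pending_.end())
        {
            return std::nullopt;
        }
        StandbyPayload p = std::move(it->second);
        pending_.erase(it);
        return p;
    }

private:
    std::mutex mux_;
    std::map<std::pair<uint32_t, uint64_t>, StandbyPayload> pending_;
};
```

Because each payload is bound to one shard and one snapshot, shard B can no longer retarget shard A's reload, and a late payload for an old snapshot is simply never consumed.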
🧹 Nitpick comments (2)
tx_service/include/cc/catalog_cc_map.h (1)

2166-2172: Consider using LOG(ERROR) instead of DLOG(INFO) for unexpected error conditions.

DLOG macros are typically compiled out in release builds (when NDEBUG is defined), which means this diagnostic information won't be captured in production logs when the unexpected error occurs. Since this is an error path leading to assert(false), using LOG(ERROR) would ensure the diagnostic context is always logged, aiding in production debugging.

♻️ Suggested change
-                    DLOG(INFO) << "Unexpected DDLKvOpErrorCode in KvOpPhase, "
+                    LOG(ERROR) << "Unexpected DDLKvOpErrorCode in KvOpPhase, "
                                << "table=" << table_key->Name().StringView()
                                << ", ng_id=" << cc_ng_id_
                                << ", commit_ts=" << commit_ts
                                << ", ddl_kv_op_err="
                                << static_cast<int>(req.DDLKvOpErrorCode());
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/catalog_cc_map.h` around lines 2166 - 2172, The
unexpected-error logging in KvOpPhase currently uses DLOG(INFO) which may be
compiled out; replace the DLOG(INFO) call that logs
table_key->Name().StringView(), cc_ng_id_, commit_ts and
static_cast<int>(req.DDLKvOpErrorCode()) with LOG(ERROR) so the diagnostic is
always emitted in production before the assert(false); ensure the message
content remains the same and keep the subsequent assert(false) intact (or
convert to an explicit abort if desired) so the error path behavior is
unchanged.
store_handler/data_store_service_client.cpp (1)

3904-3909: Add a clarifying comment explaining the no-op design.

OnSnapshotReceived unconditionally returns true without processing the request. This is intentional for DataStoreServiceClient, which is an RPC client class that does not store data locally. Actual snapshot handling occurs either on the remote service side or via handler implementations like RocksDBHandlerImpl. However, add a comment explaining why this method is intentionally a no-op to prevent future confusion.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/data_store_service_client.cpp` around lines 3904 - 3909, Add a
clarifying comment to the DataStoreServiceClient::OnSnapshotReceived method
explaining that it is intentionally a no-op because this class is an RPC client
that does not persist snapshots locally; note that snapshot processing is
performed on the remote service or in server-side handlers such as
RocksDBHandlerImpl, and that returning true acknowledges receipt for RPC
semantics to avoid future confusion.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@store_handler/data_store_service_client.cpp`:
- Around line 3639-3663: The CreateSnapshotForStandby implementation clears
snapshot_files but never fills it; update
DataStoreServiceClient::CreateSnapshotForStandby to either (A) populate
snapshot_files by invoking or adding a method on data_store_service_ that
returns the generated snapshot file paths (e.g., extend
data_store_service_->CreateSnapshotForStandby or add
data_store_service_->GetSnapshotFiles(ng_id, snapshot_ts, snapshot_files) and
call it after the snapshot is created), or (B) remove the snapshot_files
parameter from the DataStoreServiceClient::CreateSnapshotForStandby signature
and update callers (snapshot_manager.cpp, data_sync_task.cpp, checkpointer.cpp)
accordingly so they no longer expect file paths, or (C) if files are
intentionally unavailable, document in the function comment and update all
callers to handle an empty snapshot_files; ensure you reference the
DataStoreServiceClient::CreateSnapshotForStandby and
data_store_service_->CreateSnapshotForStandby symbols when making the change.

In `@store_handler/eloq_data_store_service/data_store_service.cpp`:
- Around line 2455-2471: The method DataStoreService::DeleteStandbySnapshot
currently returns true for any non-null data_store_ without confirming deletion;
change it to reflect the real outcome by using the result from
ds_ref.data_store_->DeleteStandbySnapshot (or by checking that the snapshot tag
no longer exists before returning) and propagate that success/failure to callers
(and adjust callers like DeleteStandbySnapshotsBefore() to rely on the
propagated result) — locate the call site in DeleteStandbySnapshot, replace the
unconditional return true with a boolean based on the
data_store_->DeleteStandbySnapshot return value or an existence-check of the
"snapshot_<ts>" tag, and ensure logging reports success only when the store
confirms deletion.
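The propagate-the-result shape can be sketched as follows; `FakeStore` and the `snapshot_<ts>` tag format are stand-ins taken from the review wording, not the real implementation:

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <string>

// Stand-in for the underlying store: erase() reports whether the tag existed.
struct FakeStore
{
    std::set<std::string> tags;
    bool DeleteStandbySnapshot(uint64_t ts)
    {
        return tags.erase("snapshot_" + std::to_string(ts)) > 0;
    }
};

bool DeleteStandbySnapshot(FakeStore *store, uint64_t ts)
{
    if (store == nullptr)
    {
        return false;
    }
    // Propagate the store's result instead of returning true unconditionally.
    return store->DeleteStandbySnapshot(ts);
}
```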

In `@tx_service/include/store/data_store_handler.h`:
- Around line 373-383: The base implementations of CreateSnapshotForStandby (and
the similar method at lines 393-402) currently assert and then return true,
which in release builds falsely reports success; change these defaults to "fail
closed" by returning false (and optionally logging an error) instead of true so
any handler that forgets to override will cause the caller to detect failure;
update the implementations of CreateSnapshotForStandby and the corresponding
method(s) to remove the assert and return false (or throw a clear exception) so
callers do not proceed on a missing snapshot.
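A minimal sketch of the fail-closed default, with a hypothetical `DataStoreHandlerBase` standing in for the real handler interface:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Sketch: a handler that does not support standby snapshots reports failure
// instead of asserting and then returning true in release builds.
class DataStoreHandlerBase
{
public:
    virtual ~DataStoreHandlerBase() = default;

    virtual bool CreateSnapshotForStandby(uint32_t ng_id,
                                          std::vector<std::string> &files,
                                          uint64_t snapshot_ts)
    {
        (void) ng_id;
        (void) files;
        (void) snapshot_ts;
        // Fail closed: callers must not proceed as if a snapshot exists.
        return false;
    }
};
```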

In `@tx_service/src/store/snapshot_manager.cpp`:
- Around line 75-80: The aggregate min ACK watermark is being set to
ctx_->resp.current_ckpt_ts() even when that value is 0, which pins
agg->min_ack_ckpt_ts to 0; change the update in the success path to ignore zero
checkpoint timestamps by only updating agg->min_ack_ckpt_ts when succ is true
AND ctx_->resp.current_ckpt_ts() != 0 (or > 0), i.e., modify the block that
updates agg->min_ack_ckpt_ts to skip RequestSyncSnapshot ACKs reporting
current_ckpt_ts() == 0 so archive cleanup can advance.

---

Outside diff comments:
In `@tx_service/src/store/snapshot_manager.cpp`:
- Around line 276-282: The current code moves the map element referenced by
pending_task into tasks (std::move(pending_task)), leaving the pending_req_
entry in a valid-but-unspecified state and breaking later retry reads (e.g.,
standby_node_term()); change this to copy the request/value instead of moving
the map element so the entry in pending_req_ remains intact. Locate the loop
where pending_task (the map element/iterator into pending_req_) is pushed into
tasks and replace the move with a copy/emplace of the element's value (retain
the iterator advancement logic: it++), ensuring subsequent retry logic that
reads pending_req_ reads stable data.
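The copy-instead-of-move fix can be sketched as below; `SyncReq` and `CollectTasks` are illustrative stand-ins for the pending_req_ element type and the collection loop:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Stand-in for the pending request: retry logic later reads this field.
struct SyncReq
{
    int64_t standby_node_term = 0;
};

// Copy each value out of the map instead of moving it, so the entries in
// pending_req_ remain intact for later retries.
std::vector<SyncReq> CollectTasks(const std::map<uint32_t, SyncReq> &pending)
{
    std::vector<SyncReq> tasks;
    for (auto it = pending.begin(); it != pending.end(); ++it)
    {
        tasks.push_back(it->second);  // copy; the map entry stays valid
    }
    return tasks;
}
```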

---

Duplicate comments:
In `@store_handler/eloq_data_store_service/data_store_service.cpp`:
- Around line 2379-2401: This is racy: serialize ReloadData per shard by
acquiring a per-shard reload lock (e.g., a mutex on ds_ref such as
ds_ref.reload_mutex_) before performing the operation; after taking that lock
re-check latest_snapshot_ts_.load(...) to ensure snapshot_ts is still >=
latest_snapshot_ts_, then call ds_ref.data_store_->ReloadData(term, snapshot_ts)
and only on success store the new latest_snapshot_ts_.store(snapshot_ts, ...);
release the lock after updating so concurrent reloads cannot interleave and
regress the snapshot time.
- Around line 2276-2335: The current SetStandbySnapshotPayload mutates
process-wide defaults on EloqStoreDataStoreFactory (via
UpdateStandbyMasterAddr/UpdateStandbyMasterStorePaths) causing
cross-shard/snapshot races; instead, change SetStandbySnapshotPayload to record
the parsed standby_addr and standby_master_store_paths/weights into a pending
map keyed by (shard_id, snapshot_ts) (associate with data_shards_ or a new
pending_payloads_ member) and do NOT call
EloqStoreDataStoreFactory::UpdateStandbyMasterAddr or
UpdateStandbyMasterStorePaths there; apply those pending values only when the
target shard actually opens or reloads that specific snapshot (in the store
open/reload path that has access to snapshot_ts and ds_ref/data_store_), and
then clear the pending entry; keep references to symbols:
DataStoreService::SetStandbySnapshotPayload, data_shards_/ds_ref,
EloqStoreDataStoreFactory, UpdateStandbyMasterAddr,
UpdateStandbyMasterStorePaths.

In `@store_handler/eloq_data_store_service/data_store.h`:
- Around line 152-160: The default ReloadData hook currently returns true, which
incorrectly signals success for stores that are no-op; change the default
implementation of virtual bool ReloadData(int64_t term, uint64_t snapshot_ts) to
return false (indicating unsupported/failure) while keeping the unused-parameter
casts ((void) term; (void) snapshot_ts;) and update the brief comment if needed;
ensure the signature and behavior of ReloadData are preserved so concrete stores
that actually implement standby reload continue to override and return true on
success.

In `@store_handler/rocksdb_handler.cpp`:
- Around line 2442-2450: CreateSnapshotForStandby currently zeroes out the
per-sync snapshot timestamp and reuses ckpt_path_, allowing concurrent standby
syncs to clobber each other's checkpoint dirs; instead propagate the per-sync
timestamp returned by SnapshotManager::SyncWithStandby into snapshot_ts and use
it to build a unique checkpoint path rather than always recreating ckpt_path_.
Concretely, in RocksDBCloudHandlerImpl::CreateSnapshotForStandby (and
EloqKV::RocksDBCloudHandlerImpl::CreateSnapshotForStandby) do not set
*snapshot_ts = 0; accept and assign the provided snapshot_ts from the created
snapshot, compute a timestamped ckpt path (derived from ckpt_path_ +
snapshot_ts) and pass that path to CreateSnapshot so each standby sync writes to
an isolated directory.

In `@store_handler/rocksdb_handler.h`:
- Around line 691-693: The CreateSnapshotForStandby override signature in
rocksdb_handler currently uses (std::vector<std::string>&, uint64_t*) and must
be changed to match the base: use (uint32_t ng_id, std::vector<std::string>&
snapshot_files, uint64_t snapshot_ts) — update both declarations in
rocksdb_handler.h (both occurrences) and the corresponding definitions in
rocksdb_handler.cpp to accept the uint32_t ng_id and a uint64_t snapshot_ts (not
a pointer) so the override matches the base method name
CreateSnapshotForStandby.

In `@tx_service/src/checkpointer.cpp`:
- Around line 353-366: The code currently ignores the boolean result from
CreateSnapshotForStandby and always calls BrocastPrimaryCkptTs, which can
advertise a checkpoint the standby can't fetch; update the logic around
Sharder::Instance().GetDataStoreHandler()->CreateSnapshotForStandby(...) to
capture and check its returned success bool and only call
BrocastPrimaryCkptTs(node_group, leader_term, last_succ_ckpt_ts,
status->HasDataStoreWrite()) when the snapshot creation succeeded (and preserve
existing behavior when status->HasDataStoreWrite() is false); apply the same fix
to the other identical block that calls CreateSnapshotForStandby and
BrocastPrimaryCkptTs.
- Around line 205-209: The preprocessor guard around the shared-storage check
causes the condition to be reduced to only is_standby_node when
DATA_STORE_TYPE_ELOQDSS_ELOQSTORE is defined, preventing the standby checkpoint
branch from running; remove the `#ifndef`/`#endif` so the if reads: if
(is_standby_node &&
Sharder::Instance().GetDataStoreHandler()->IsSharedStorage()) thereby ensuring
the IsSharedStorage() check always runs (references: is_standby_node,
Sharder::Instance().GetDataStoreHandler()->IsSharedStorage(),
DATA_STORE_TYPE_ELOQDSS_ELOQSTORE).

In `@tx_service/src/remote/cc_node_service.cpp`:
- Around line 63-74: Destructor currently flips standby_task_running_ to false
causing StandbyTaskWorkerMain to exit immediately and drop items enqueued via
EnqueueStandbyTask; fix by ensuring queued tasks are drained before stopping the
worker: either (A) change the worker loop in StandbyTaskWorkerMain to continue
processing until standby_task_queue_ is empty even after standby_task_running_
becomes false, or (B) in CcNodeService::~CcNodeService wait (using
standby_task_mu_ and standby_task_cv_) until standby_task_queue_ is empty before
setting standby_task_running_ to false and notifying/joining
standby_task_worker_; update synchronization around standby_task_queue_,
standby_task_running_, EnqueueStandbyTask(), standby_task_cv_ to avoid races and
ensure proper wakeups.
- Around line 1765-1777: The queued lambda must re-check checkpoint monotonicity
to avoid moving standby ckpt backwards: inside the EnqueueStandbyTask lambda
(the closure capturing store_hd, ng_id, ng_term, ckpt_ts) fetch the current
checkpoint for ng_id (e.g., via Sharder::Instance().NativeNodeGroupCkptTs(ng_id)
or equivalent getter) and skip calling
OnUpdateStandbyCkptTs/UpdateNodeGroupCkptTs if ckpt_ts is older or not greater
than the current value; alternatively ensure OnUpdateStandbyCkptTs itself
enforces monotonicity and returns false for stale ckpts so UpdateNodeGroupCkptTs
is only called when the stored checkpoint actually advanced.
- Around line 1718-1729: The fallback when getenv("USER") is empty currently
builds "leader_ip:path" which is parsed as a local store-path; instead ensure
the master address is recognized by prepending an '@' (or otherwise ensuring a
remote-prefix) when constructing snapshot_path in the else branch where user is
null/empty — update the logic that builds snapshot_path (using leader_ip and
store_hd->SnapshotSyncDestPath()) before calling
response->set_snapshot_path(...) so the resulting string includes the '@' marker
(e.g., "@"+leader_ip+":"+store_hd->SnapshotSyncDestPath()).
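Option (A) for the destructor issue above (around line 63-74), keeping the worker draining until the queue is empty, can be sketched with std primitives standing in for the bthread types; the class and member names mirror the review, not the real code:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Sketch: the worker loop exits only when stopped AND the queue is empty,
// so tasks enqueued before shutdown are never dropped.
class StandbyTaskWorker
{
public:
    StandbyTaskWorker() : worker_([this] { Run(); }) {}

    ~StandbyTaskWorker()
    {
        {
            std::lock_guard<std::mutex> lk(mu_);
            running_ = false;
        }
        cv_.notify_all();
        worker_.join();  // joins only after the queue has drained
    }

    void Enqueue(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lk(mu_);
            queue_.push_back(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void Run()
    {
        std::unique_lock<std::mutex> lk(mu_);
        while (running_ || !queue_.empty())
        {
            if (queue_.empty())
            {
                cv_.wait(lk);
                continue;
            }
            auto task = std::move(queue_.front());
            queue_.pop_front();
            lk.unlock();
            task();  // run outside the lock
            lk.lock();
        }
    }

    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> queue_;
    bool running_ = true;
    std::thread worker_;
};
```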

In `@tx_service/src/standby.cpp`:
- Around line 67-73: The code updates agg->min_ack_ckpt_ts with
ctx_->resp.current_ckpt_ts() even when that value is 0, which can pin the
cleanup watermark; change the update in the success path (inside the
std::lock_guard on agg->mux where succ is true and agg->success is incremented)
to only take the min when ctx_->resp.current_ckpt_ts() > 0 (i.e., skip zero
ACKs), leaving all locking and agg->success logic unchanged.

---

Nitpick comments:
In `@store_handler/data_store_service_client.cpp`:
- Around line 3904-3909: Add a clarifying comment to the
DataStoreServiceClient::OnSnapshotReceived method explaining that it is
intentionally a no-op because this class is an RPC client that does not persist
snapshots locally; note that snapshot processing is performed on the remote
service or in server-side handlers such as RocksDBHandlerImpl, and that
returning true acknowledges receipt for RPC semantics to avoid future confusion.

In `@tx_service/include/cc/catalog_cc_map.h`:
- Around line 2166-2172: The unexpected-error logging in KvOpPhase currently
uses DLOG(INFO) which may be compiled out; replace the DLOG(INFO) call that logs
table_key->Name().StringView(), cc_ng_id_, commit_ts and
static_cast<int>(req.DDLKvOpErrorCode()) with LOG(ERROR) so the diagnostic is
always emitted in production before the assert(false); ensure the message
content remains the same and keep the subsequent assert(false) intact (or
convert to an explicit abort if desired) so the error path behavior is
unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 68080583-3a60-42a9-a35b-18b1c0227fa2

📥 Commits

Reviewing files that changed from the base of the PR and between 3403738 and 8425155.

📒 Files selected for processing (25)
  • store_handler/data_store_service_client.cpp
  • store_handler/data_store_service_client.h
  • store_handler/eloq_data_store_service/build_eloq_store.cmake
  • store_handler/eloq_data_store_service/data_store.h
  • store_handler/eloq_data_store_service/data_store_service.cpp
  • store_handler/eloq_data_store_service/data_store_service.h
  • store_handler/eloq_data_store_service/eloq_store_config.cpp
  • store_handler/eloq_data_store_service/eloq_store_config.h
  • store_handler/eloq_data_store_service/eloq_store_data_store.cpp
  • store_handler/eloq_data_store_service/eloq_store_data_store.h
  • store_handler/eloq_data_store_service/eloq_store_data_store_factory.h
  • store_handler/eloq_data_store_service/eloqstore
  • store_handler/eloq_data_store_service/rocksdb_data_store_common.h
  • store_handler/rocksdb_handler.cpp
  • store_handler/rocksdb_handler.h
  • tx_service/include/cc/catalog_cc_map.h
  • tx_service/include/proto/cc_request.proto
  • tx_service/include/remote/cc_node_service.h
  • tx_service/include/store/data_store_handler.h
  • tx_service/src/checkpointer.cpp
  • tx_service/src/data_sync_task.cpp
  • tx_service/src/fault/cc_node.cpp
  • tx_service/src/remote/cc_node_service.cpp
  • tx_service/src/standby.cpp
  • tx_service/src/store/snapshot_manager.cpp
💤 Files with no reviewable changes (1)
  • store_handler/eloq_data_store_service/eloq_store_config.h
🚧 Files skipped from review as they are similar to previous changes (3)
  • store_handler/eloq_data_store_service/eloqstore
  • tx_service/src/data_sync_task.cpp
  • tx_service/include/remote/cc_node_service.h

Comment on lines +3639 to +3663
bool DataStoreServiceClient::CreateSnapshotForStandby(
    uint32_t ng_id,
    std::vector<std::string> &snapshot_files,
    uint64_t snapshot_ts)
{
#ifdef DATA_STORE_TYPE_ELOQDSS_ELOQSTORE
    if (!bind_data_shard_with_ng_ || data_store_service_ == nullptr)
    {
        return false;
    }
    DLOG(INFO) << "CreateSnapshotForStandby begin, generated_snapshot_ts="
               << snapshot_ts;
    const bool ok = data_store_service_->CreateSnapshotForStandby(
        ng_id, ng_id, snapshot_ts);
    snapshot_files.clear();
    DLOG(INFO) << "CreateSnapshotForStandby success, snapshot_ts=" << snapshot_ts
               << ", snapshot_files=" << snapshot_files.size() << ", ok=" << ok;
    return ok;
#else
    (void) ng_id;
    (void) snapshot_files;
    (void) snapshot_ts;
    return false;
#endif
}

⚠️ Potential issue | 🟠 Major



Output parameter snapshot_files is cleared but never populated.

The function signature promises to populate snapshot_files with snapshot file paths (line 3641), but the implementation clears the vector (line 3653) and never populates it. The underlying data_store_service_->CreateSnapshotForStandby() method doesn't provide a way to retrieve snapshot files, making it impossible to fulfill this contract. Multiple callers (snapshot_manager.cpp, data_sync_task.cpp, checkpointer.cpp) expect snapshot_files to contain actual file paths after this call returns.

Either remove the snapshot_files parameter from the function signature, provide a way to retrieve snapshot file paths from the underlying service, or document that this parameter will remain empty.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/data_store_service_client.cpp` around lines 3639 - 3663, The
CreateSnapshotForStandby implementation clears snapshot_files but never fills
it; update DataStoreServiceClient::CreateSnapshotForStandby to either (A)
populate snapshot_files by invoking or adding a method on data_store_service_
that returns the generated snapshot file paths (e.g., extend
data_store_service_->CreateSnapshotForStandby or add
data_store_service_->GetSnapshotFiles(ng_id, snapshot_ts, snapshot_files) and
call it after the snapshot is created), or (B) remove the snapshot_files
parameter from the DataStoreServiceClient::CreateSnapshotForStandby signature
and update callers (snapshot_manager.cpp, data_sync_task.cpp, checkpointer.cpp)
accordingly so they no longer expect file paths, or (C) if files are
intentionally unavailable, document in the function comment and update all
callers to handle an empty snapshot_files; ensure you reference the
DataStoreServiceClient::CreateSnapshotForStandby and
data_store_service_->CreateSnapshotForStandby symbols when making the change.
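Option (A), adding an accessor so the output parameter can actually be filled, might look like the following sketch; `FakeService` and `GetSnapshotFiles` are hypothetical, not the real data_store_service_ API:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical service exposing the generated snapshot file paths.
struct FakeService
{
    bool CreateSnapshotForStandby(uint32_t, uint32_t, uint64_t)
    {
        return true;
    }
    void GetSnapshotFiles(uint32_t, uint64_t ts, std::vector<std::string> &out)
    {
        out.push_back("snapshot_" + std::to_string(ts) + "/MANIFEST");
    }
};

bool CreateSnapshotForStandby(FakeService *svc,
                              uint32_t ng_id,
                              std::vector<std::string> &snapshot_files,
                              uint64_t snapshot_ts)
{
    if (svc == nullptr)
    {
        return false;
    }
    snapshot_files.clear();
    if (!svc->CreateSnapshotForStandby(ng_id, ng_id, snapshot_ts))
    {
        return false;
    }
    // Fulfill the contract: populate the output parameter once the
    // snapshot exists.
    svc->GetSnapshotFiles(ng_id, snapshot_ts, snapshot_files);
    return !snapshot_files.empty();
}
```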

Comment on lines +75 to +80
{
    std::lock_guard<bthread::Mutex> lk(agg->mux);
    if (succ)
    {
        agg->min_ack_ckpt_ts = std::min(agg->min_ack_ckpt_ts,
                                        ctx_->resp.current_ckpt_ts());

⚠️ Potential issue | 🟠 Major

Ignore zero RequestSyncSnapshot ACKs in the aggregate.

A successful standby can return current_ckpt_ts() == 0. Recording that as the minimum ACK watermark pins min_ack_ckpt_ts to 0, so the archive cleanup later in this function never advances.

🩹 Suggested fix
         {
             std::lock_guard<bthread::Mutex> lk(agg->mux);
             if (succ)
             {
-                agg->min_ack_ckpt_ts = std::min(agg->min_ack_ckpt_ts,
-                                                ctx_->resp.current_ckpt_ts());
+                const uint64_t ack_ckpt_ts = ctx_->resp.current_ckpt_ts();
+                if (ack_ckpt_ts != 0)
+                {
+                    agg->min_ack_ckpt_ts =
+                        std::min(agg->min_ack_ckpt_ts, ack_ckpt_ts);
+                }
             }
             agg->pending--;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/store/snapshot_manager.cpp` around lines 75 - 80, The
aggregate min ACK watermark is being set to ctx_->resp.current_ckpt_ts() even
when that value is 0, which pins agg->min_ack_ckpt_ts to 0; change the update in
the success path to ignore zero checkpoint timestamps by only updating
agg->min_ack_ckpt_ts when succ is true AND ctx_->resp.current_ckpt_ts() != 0 (or
> 0), i.e., modify the block that updates agg->min_ack_ckpt_ts to skip
RequestSyncSnapshot ACKs reporting current_ckpt_ts() == 0 so archive cleanup can
advance.

@thweetkomputer thweetkomputer force-pushed the feat-eloqstore-local-standby-zc branch 5 times, most recently from 7944161 to 3daf349 on March 16, 2026 15:41
@MrGuin MrGuin force-pushed the feat-eloqstore-local-standby-zc branch from 7c941bc to 1dc28b7 on March 17, 2026 09:13
@thweetkomputer thweetkomputer merged commit 1a4729b into main Mar 17, 2026
4 checks passed
@thweetkomputer thweetkomputer deleted the feat-eloqstore-local-standby-zc branch March 17, 2026 10:57