
Shard keys on one core for range partition #454

Merged
yi-xmu merged 15 commits into main from shard_keys_one_core_for_rangepartition_base
Mar 16, 2026
Conversation

@yi-xmu (Collaborator) commented Mar 13, 2026

For range partitioning, keys within the same range are sharded onto the same core.
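The core-selection rule this PR applies shows up in the review excerpts below as `(range_id & 0x3FF) % core_cnt`. A minimal sketch of that mapping, with an illustrative function name of our own choosing:

```cpp
#include <cassert>
#include <cstdint>

// Sketch of the single-core dispatch rule: the low 10 bits of the range
// partition id, modulo the node's core count, pick one core, so every key
// in the same range lands on the same core. Name is ours, not the PR's.
inline uint16_t CoreForRange(int32_t partition_id, uint16_t core_cnt)
{
    return static_cast<uint16_t>((partition_id & 0x3FF) % core_cnt);
}
```

Because the mapping depends only on the partition id, requests for a range can be dispatched to a single core instead of being broadcast to all of them.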

Summary by CodeRabbit

  • New Features

    • Table-range size fetching and tracking, with range-split triggering based on size.
    • Partition-aware metadata propagation across write/read/sync flows.
    • Scan/stream responses now include core identification for targeted handling.
  • Performance Improvements

    • Unified (range-level) key cache replacing per-core caches.
    • Targeted single-core dispatch for range/partition work and upload batches.
    • Prioritized data-sync/task queuing and partition-aware batching for efficiency.

yi-xmu added 14 commits March 13, 2026 18:33
In order to trigger range splits in a timely manner, the range size is tracked in memory.

Init range sizes interface
Update range size interface
Reset range splitting status interface
1. Update storage range size
After a successful flush, the store range size is persisted.

2. Load range size from storage
When accessing the memory range size, if it has not yet been initialized, a fetch operation from storage is performed to retrieve the range size.
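The lazy-initialization pattern described above can be sketched as follows; `RangeSizeTracker` and its members are illustrative names, not APIs from this PR:

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <utility>

// Sketch: consult the in-memory size first; only on a miss, fetch the
// persisted size from storage once and cache it.
class RangeSizeTracker
{
public:
    using StorageFetch = std::function<int64_t(uint32_t)>;

    explicit RangeSizeTracker(StorageFetch fetch) : fetch_(std::move(fetch)) {}

    int64_t GetRangeSize(uint32_t partition_id)
    {
        auto it = sizes_.find(partition_id);
        if (it == sizes_.end())
        {
            // Not initialized yet: fall back to storage, then cache.
            it = sizes_.emplace(partition_id, fetch_(partition_id)).first;
        }
        return it->second;
    }

    void AddDelta(uint32_t partition_id, int64_t delta)
    {
        // Ensure the base size is loaded before applying the delta.
        GetRangeSize(partition_id);
        sizes_[partition_id] += delta;
    }

private:
    StorageFetch fetch_;
    std::unordered_map<uint32_t, int64_t> sizes_;
};
```

The actual implementation performs the storage fetch asynchronously through `FetchTableRangeSizeCc`; the synchronous callback here only illustrates the miss-then-populate flow.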
1. During a post-write operation, the range size information for the corresponding partition is maintained.

2. If a range is in the process of splitting, the range size is updated in the delta size information.

3. For a "double-write" operation, only the range size information for the newly split partition is updated.
1. In the post-commit phase of a range split transaction, the range size of all related partitions is updated: base range size + delta size.

2. Reset the range splitting flag.

3. Update the KickoutCc process to accommodate the new key sharding logic.

4. Update the processing procedure for SampleSubRangeKeys to accommodate the new key sharding logic.
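Steps 1 and 2 above (base size + delta at post-commit, then clearing the splitting flag) can be sketched as the following state transition; the struct and field names are ours, not the PR's:

```cpp
#include <cassert>
#include <cstdint>

// Sketch of the split bookkeeping: while a split is in flight, writes
// accumulate in delta_size so the base stays stable; at post-commit the
// final size is base + delta and the splitting flag is reset.
struct RangeSizeState
{
    int64_t base_size = 0;
    int64_t delta_size = 0;  // accumulated while the range is splitting
    bool splitting = false;

    void ApplyWrite(int64_t bytes)
    {
        (splitting ? delta_size : base_size) += bytes;
    }

    int64_t PostCommit()
    {
        base_size += delta_size;
        delta_size = 0;
        splitting = false;  // reset the range-splitting flag
        return base_size;
    }
};
```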
1. Update range size during UploadBatchCc request for new index.

2. Update the UploadBatchCc process to accommodate the new key sharding logic.

3. Update the create secondary index process to accommodate the new key sharding logic.
1. Update the range size during data log replay.

2. For the post-commit range split log, update the range size for each newly split range.

3. Update the data log replay process to accommodate the new key sharding logic.
Update the structure definitions and related processing procedures of ScanSliceCc and RemoteScanSlice to adapt to the new key sharding logic.
1. Update the structure definitions and related processing procedures of DataSyncScanCc and ScanSliceDeltaSizeCc, as well as the DataSync processing procedure, to adapt to the new key sharding logic.

2. Update key shard code for UpdateCkptTs request.
Update the structure definition and related processing procedures of FillStoreSlice to adapt to the new key sharding logic.
Adapt the read operation to the new key sharding for range partitions.
Update the DataSyncTask constructor to check the new and old range owner shards
To improve the cache hit rate during range splitting, keys located on the new range that fall on other cores (on the local node or remote nodes) can be sent to the corresponding core.

1. Update the logic and related requests for sending range cache during range split

2. Update key shard for UploadBatchSlices rpc.
Including InitKeyCacheCc, UpdateKeyCacheCc, StoreSlice::cache_validity_, and StoreRange::key_cache_
If a datasync task that is supposed to trigger a range split ends without actually triggering the split, the range splitting state needs to be reset.
@yi-xmu yi-xmu self-assigned this Mar 13, 2026
@yi-xmu yi-xmu requested a review from liunyl March 13, 2026 10:46
@coderabbitai
Copy link

coderabbitai bot commented Mar 13, 2026

Walkthrough

The PR adds range-partition single-core sharding and range-size tracking: it introduces FetchTableRangeSize APIs, threads a new range_size field through data-store encoding and metadata batches, centralizes per-core state into single-range state, and adds split-triggering and data-sync task plumbing across CC, shard, and data-sync layers.

Changes

Cohort / File(s) Summary
Store Handler Interface Extensions
store_handler/bigtable_handler.h, store_handler/bigtable_handler.cpp, store_handler/dynamo_handler.h, store_handler/dynamo_handler.cpp, store_handler/rocksdb_handler.h, store_handler/rocksdb_handler.cpp
Added FetchTableRangeSize(FetchTableRangeSizeCc*) to DataStore handlers; .cpp stubs log error and assert (not implemented).
Data Store Client & Closures
store_handler/data_store_service_client.h, store_handler/data_store_service_client.cpp, store_handler/data_store_service_client_closure.h, store_handler/data_store_service_client_closure.cpp
Propagated new range_size through RangeSliceBatchPlan, EncodeRangeValue, metadata enqueue/dispatch, and added FetchTableRangeSize + FetchRangeSizeCallback; adjusted value-size assertions and parsing.
DataStoreHandler Interface
tx_service/include/store/data_store_handler.h
Added pure-virtual FetchTableRangeSize(FetchTableRangeSizeCc*) to the store handler interface.
CC Request / CC Misc Refactor
tx_service/include/cc/cc_req_misc.h, tx_service/src/cc/cc_req_misc.cpp, tx_service/include/cc/cc_request.h
Introduced FetchTableRangeSizeCc; removed many per-core parameters (core_id/core_cnt) and consolidated per-core state into single-instance fields; added partition_id and on_dirty_range propagation to PostWriteCc and related CC types.
CcMap / Range Size State
tx_service/include/cc/cc_map.h, tx_service/src/cc/cc_map.cpp, tx_service/include/type.h
Added range_sizes_ map, RangeSizeStatus enum, and InitRangeSize/ResetRangeStatus APIs to track persisted/loading sizes and decide split triggers.
TemplateCcMap & Range-split Logic
tx_service/include/cc/template_cc_map.h, tx_service/src/cc/template_cc_map.cpp
Added UpdateRangeSize(partition_id, delta_size, is_dirty) to centralize range-size updates and potential split triggering; replaced broadcast/per-core flows with partition-targeted dispatch.
CcShard & Local Shards
tx_service/include/cc/cc_shard.h, tx_service/src/cc/cc_shard.cpp, tx_service/include/cc/local_cc_shards.h, tx_service/src/cc/local_cc_shards.cpp
Added CcShard APIs: FetchTableRangeSize, ResetRangeSplittingStatus, CreateSplitRangeDataSyncTask; introduced fetch_range_size_cc_pool_; added CreateSplitRangeDataSyncTask and high_priority handling when enqueuing range-data-sync tasks; changed pending_tasks_ queue to deque and added priority enqueueing.
Per-core -> Single-range Key-cache & Scanners
tx_service/include/cc/range_slice.h, tx_service/src/cc/range_slice.cpp, tx_service/include/cc/ccm_scanner.h
Removed per-core key-cache APIs and data structures, moved to a single range-wide key_cache_; RangePartitionedCcmScanner now uses a single scan_cache_ instead of per-core scans; adjusted related APIs.
Remote/Local CC Handler Signatures & Routing
tx_service/include/cc/cc_handler.h, tx_service/include/cc/local_cc_handler.h, tx_service/include/remote/remote_cc_handler.h, tx_service/src/cc/local_cc_handler.cpp, tx_service/src/remote/remote_cc_handler.cpp
Extended PostWrite signatures to accept partition_id and on_dirty_range (defaulted); handlers propagate these fields into PostCommit messages; routing now often targets a single core derived from partition_id.
Remote CC Request / Stream / Node Service
tx_service/include/remote/remote_cc_request.h, tx_service/src/remote/remote_cc_request.cpp, tx_service/src/remote/cc_stream_receiver.cpp, tx_service/src/remote/cc_node_service.cpp
Removed per-core Reset parameters (core_cnt), replaced per-core scan_cache_vec_ with single scan_cache_, simplified scan response packing to a single tuple_cnt and added core_id mapping from range_id; UploadBatch now accepts partition_id and range_size_flags and routes to single core when partitioned.
WriteSetEntry & Upload Indexing
tx_service/include/read_write_entry.h, tx_service/src/sk_generator.cpp, tx_service/include/sk_generator.h
Added partition_id_ and on_dirty_range_ to WriteSetEntry, changed forward_addr_ map to carry partition_id with CcEntryAddr; UploadIndexContext::SendIndexes signature now accepts partition_id and per-entry range_size_flags alongside write entries.
Tx Operation & Execution Adjustments
tx_service/src/tx_execution.cpp, tx_service/src/tx_operation.cpp
Read-path shard calculation now derives core from range PartitionId for range-partitioned tables; propagate partition_id and on_dirty_range through PostWrite/forward paths; update forwarding/address pair handling.
DataSync Task & Scan Flow Simplification
tx_service/include/data_sync_task.h, tx_service/src/data_sync_task.cpp, tx_service/src/cc/local_cc_shards.cpp, tx_service/src/cc/range_slice.cpp
Added high_priority_ flag to DataSyncTask; added ResetRangeSplittingStatus() and logic to enqueue high-priority split tasks to front of per-partition queues; many per-core loops replaced by single-target enqueues derived from partition_id.
Log Replay / Recovery Split Info
tx_service/include/fault/log_replay_service.h, tx_service/src/fault/log_replay_service.cpp
Added tracking API and storage for split-range commit timestamps per node-group/table/partition (SetSplitRangeInfo, GetSplitRangeInfo, CleanSplitRangeInfo) and integrated split-range parsing during log replay.
Proto Changes
tx_service/include/proto/cc_request.proto
Added partition_id and range_size_flags to UploadBatchRequest; added partition_id and on_dirty_range to PostCommitRequest; simplified ScanSliceRequest/Response fields (single prior lock, added core_id).
Operation Result / Scan Result Simplification
tx_service/include/tx_operation_result.h
Removed per-shard trailing bookkeeping; simplified LastKey semantics (removed atomic status), adjusted RemoteScanSliceCache/RangeScanSliceResult shapes to single-instance caches.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client/CcShard
    participant TemplateCcMap as TemplateCcMap
    participant CcMap as CcMap
    participant DataStore as DataStoreServiceClient
    participant LocalShards as LocalCcShards

    Client->>TemplateCcMap: UpdateRangeSize(partition_id, delta_size, is_dirty)
    TemplateCcMap->>CcMap: InitRangeSize(partition_id, persisted_size, succeed, emplace)
    CcMap-->>TemplateCcMap: trigger_split (bool)
    
    alt Split Triggered
        TemplateCcMap->>LocalShards: CreateSplitRangeDataSyncTask(...)
        LocalShards->>LocalShards: EnqueueRangeDataSyncTask(high_priority=true)
    end
    
    alt Persisted Size Missing
        TemplateCcMap->>DataStore: FetchTableRangeSize(partition_id)
        DataStore->>DataStore: Read table_ranges
        DataStore-->>TemplateCcMap: store_range_size (via callback)
        TemplateCcMap->>CcMap: InitRangeSize(partition_id, store_range_size)
    end
sequenceDiagram
    participant Remote as RemoteNode
    participant Handler as RemoteCcHandler
    participant Request as PostWriteCc
    participant CcMap as TemplateCcMap

    Remote->>Handler: PostWrite(..., partition_id, on_dirty_range)
    Handler->>Request: Reset(..., partition_id, on_dirty_range)
    
    alt Range-Partitioned
        Request->>CcMap: Execute (includes partition context)
        alt Dirty Range
            CcMap->>CcMap: UpdateRangeSize(partition_id, delta, is_dirty=true)
        end
    end
    
    Handler-->>Remote: PostCommitResponse (with partition metadata)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Range sizes definition #438 — overlaps on range-size APIs and CcMap/TemplateCcMap range-size state changes (InitRangeSize/ResetRangeStatus, RangeSizeStatus).
  • add jemalloc support #287 — overlaps on data_store_service_client_closure callbacks and value layout parsing (FetchTableRanges/FetchRangeSlices changes and FetchRangeSizeCallback additions).
  • optimze update range slices #254 — touches DataStoreServiceClient range metadata batching and EncodeRangeValue/metadata propagation similar to this PR.

Suggested reviewers

  • liunyl
  • lokax

"I nibble bytes and bytes nibble me,
Ranges shrink and ranges grow with glee,
One core to rule them, sizes to trace,
A split, a hop — we keep the pace! 🐇"


@coderabbitai bot left a comment

Actionable comments posted: 6

Note

Due to the large number of review comments, Critical severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (8)
tx_service/include/cc/cc_request.h (2)

2517-2522: ⚠️ Potential issue | 🟠 Major

Unpin retained slices before failing a resumed ScanSliceCc.

Line 2521 marks the request failed, but this path skips the slice cleanup that AbortCcRequest() already does. If leadership changes after the request has pinned slices, those pins are leaked.

🧹 Suggested fix
         if (!ValidTermCheck())
         {
             // Do not modify res_ directly since there could be other cores
             // still working on this cc req.
             SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER);
+            if (range_slice_id_.Range() != nullptr)
+            {
+                UnpinSlices();
+            }
             return true;
         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_request.h` around lines 2517 - 2522, The failure
path in ValidTermCheck() sets the error and returns without unpinning retained
slices for a resumed ScanSliceCc, which leaks pins; before calling
SetError(CcErrorCode::REQUESTED_NODE_NOT_LEADER) and returning, invoke the same
cleanup used by AbortCcRequest() (or inline the retained-slice unpin logic used
by AbortCcRequest()) for ScanSliceCc instances so all pinned slices are released
when leadership validation fails; locate ValidTermCheck(), ScanSliceCc, and
AbortCcRequest() to add the cleanup call just prior to SetError and return.

7558-7587: ⚠️ Potential issue | 🔴 Critical

The local UploadBatchCc::Reset() no longer matches the partition_id_ == -1 broadcast contract.

Lines 7583-7586 hardcode a single unfinished slot, but Lines 7809-7810 still say -1 means the hash-partition broadcast path. In that mode, the next shard to call SetPausedPosition() / GetPausedPosition() will index past paused_pos_, and SetFinish() can report completion after the first shard only.

🛠️ Suggested fix
         req_cv_ = &req_cv;
         finished_req_cnt_ = &finished_req_cnt;
         req_result_ = &req_result;
-        unfinished_cnt_.store(1, std::memory_order_relaxed);
+        const size_t slot_cnt =
+            partition_id >= 0 ? 1
+                              : Sharder::Instance().GetLocalCcShardsCount();
+        unfinished_cnt_.store(slot_cnt, std::memory_order_relaxed);
         err_code_.store(CcErrorCode::NO_ERROR, std::memory_order_relaxed);
         paused_pos_.clear();
-        paused_pos_.resize(1, {});
+        paused_pos_.resize(slot_cnt, {});
         data_type_ = data_type;

Also applies to: 7764-7786, 7809-7831

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_request.h` around lines 7558 - 7587, The Reset in
UploadBatchCc no longer respects the broadcast contract where partition_id_ ==
-1 means per-shard slots: update UploadBatchCc::Reset to detect when
partition_id == -1 and initialize unfinished_cnt_ and paused_pos_ to accommodate
all shards (use entry_vec.size() or another appropriate shard count instead of
hardcoding 1); specifically, replace the single-slot initialization
(unfinished_cnt_.store(1, ...) and paused_pos_.resize(1, {})) with logic that
sets unfinished_cnt_ to the shard count and paused_pos_.resize(shard_count, {}),
so SetPausedPosition(), GetPausedPosition() and SetFinish() index and complete
correctly in broadcast mode. Ensure the same change is applied in the other
Reset variants noted (the blocks around 7764-7786 and 7809-7831).
tx_service/src/tx_execution.cpp (1)

4615-4634: ⚠️ Potential issue | 🔴 Critical

Forward-write log entries are being dropped for existing node groups.

At Line 4617, if (!inserted) continue; skips appending when forward_ng_id already exists in ng_table_set. That causes only the first forwarded write per node group to be logged; subsequent forwarded writes are silently omitted from WAL payload construction.

💡 Proposed fix
-                auto [table_rec_it, inserted] =
-                    ng_table_set.try_emplace(forward_ng_id);
-                if (!inserted)
-                {
-                    continue;
-                }
+                auto [table_rec_it, inserted] =
+                    ng_table_set.try_emplace(forward_ng_id);
                 std::unordered_map<
                     TableName,
                     std::vector<
                         std::pair<const TxKey *, const WriteSetEntry *>>>
                     &table_rec_set = table_rec_it->second.second;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/tx_execution.cpp` around lines 4615 - 4634, The code currently
skips appending forwarded writes when ng_table_set already contains
forward_ng_id because of the early "if (!inserted) continue;" check; instead,
always use the iterator returned by ng_table_set.try_emplace (table_rec_it) to
obtain the table_rec_set and append the write. Remove the "if (!inserted)
continue;" branch and proceed to access table_rec_it->second.second (the
std::unordered_map named table_rec_set), then emulate the existing emplace into
that map (using rec_vec_it and emplacing the pair of &write_key and &wset_entry)
so subsequent forwarded writes for the same forward_ng_id are recorded.
tx_service/include/proto/cc_request.proto (1)

1114-1126: ⚠️ Potential issue | 🟠 Major

Avoid protobuf tag reuse for ScanSliceResponse.

Tag 16 was previously assigned to key_start_offsets in deployed versions. Reusing it for core_id is wire-incompatible and will cause deserialization errors in clients/servers running older code. Reserve the previous tags and assign core_id to a new tag instead.

🔧 Suggested fix
 message ScanSliceResponse {
     uint64 tx_number = 1;
     uint64 handler_addr = 2;
     int64 tx_term = 3;
     uint32 command_id=4;
     int32 error_code = 5;
+    reserved 16, 17, 18;
     bytes tuple_cnt = 6;
     bytes last_key = 7;
     SlicePosition slice_position = 8;
     bytes keys = 9;
     bytes records = 10;
     bytes rec_status = 11;
     bytes key_ts = 12;
     bytes gap_ts = 13;
     bytes cce_lock_ptr = 14;
     bytes term = 15;
     uint64 txm_addr = 19;
+    uint32 core_id = 20;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/proto/cc_request.proto` around lines 1114 - 1126,
ScanSliceResponse reuses protobuf tag 16 for the new field core_id but that tag
was previously used for key_start_offsets in deployed versions, causing
wire-incompatibility; update ScanSliceResponse to keep the old tag numbers
reserved and move core_id to an unused, new tag number (e.g., >19), ensure the
proto comment or reserved statement marks tag 16 as reserved for
key_start_offsets, and update any references to core_id to use the new tag
identifier so clients/servers with older code remain compatible.
tx_service/include/cc/template_cc_map.h (1)

6295-6339: ⚠️ Potential issue | 🟠 Major

Keep source and target range IDs separate during replay.

In the new_range_id path, partition_id still names the source range, but the record is replayed onto new_range_id’s core. The later UpdateRangeSize(...partition_id...) call will therefore charge the delta to the old range and leave the new range’s size stale after recovery. Keep the source ID for RangeSplitCommitTs() and use a separate target ID for size bookkeeping.

🛠️ Proposed fix
-            int32_t partition_id = -1;
+            int32_t partition_id = -1;
+            int32_t target_partition_id = -1;
...
-                partition_id = range_entry->GetRangeInfo()->PartitionId();
+                partition_id = range_entry->GetRangeInfo()->PartitionId();
+                target_partition_id = partition_id;
...
-                        core_id = static_cast<uint16_t>((new_range_id & 0x3FF) %
-                                                        shard_->core_cnt_);
+                        core_id = static_cast<uint16_t>((new_range_id & 0x3FF) %
+                                                        shard_->core_cnt_);
+                        target_partition_id = new_range_id;
                         is_dirty = true;
...
-                        UpdateRangeSize(static_cast<uint32_t>(partition_id),
+                        UpdateRangeSize(
+                            static_cast<uint32_t>(target_partition_id),
                             delta_size,
                             is_dirty);

Also applies to: 6499-6515

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/template_cc_map.h` around lines 6295 - 6339, When
handling the new_range_id path we are reusing partition_id (the source range)
for both commit-ts checks and later size updates, which charges size deltas to
the old range; introduce a separate target_range_id (or similarly named
variable) and keep partition_id as the source_range_id, call
req.RangeSplitCommitTs(source_range_id) and use target_range_id (new_range_id)
for computing core_id and for any UpdateRangeSize/size bookkeeping (i.e., use
target_range_id when invoking UpdateRangeSize and related size logic); make the
same change in the other occurrence referenced (around the block for lines
6499-6515) so split replay charges size to the target range not the source.
store_handler/data_store_service_client.cpp (1)

1305-1325: ⚠️ Potential issue | 🟠 Major

Fix backward compatibility for legacy range records without range_size field.

FetchRangeSizeCallback asserts that stored values must have size exactly equal to sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(uint32_t) + sizeof(int32_t) (lines 956-957). Pre-existing records without the trailing int32_t range_size will fail this assertion. Add conditional handling to support legacy record format: check actual value size and only read range_size if the field is present; otherwise default to 0 or appropriate fallback value.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/data_store_service_client.cpp` around lines 1305 - 1325,
FetchRangeSizeCallback currently asserts the stored value length equals the full
new format which breaks on legacy records; update FetchRangeSizeCallback to
check the actual value size and parse only the fields present (parse range_id,
range_version, version, segment_cnt always, and read range_size only if
value.size() >=
sizeof(int32_t)+sizeof(uint64_t)+sizeof(uint64_t)+sizeof(uint32_t)+sizeof(int32_t)),
otherwise set range_size to 0 (or the agreed default) and avoid the assertion;
keep EncodeRangeValue unchanged so new records continue to include range_size.
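The conditional parse suggested above can be sketched as follows, using the field layout quoted in the comment (int32 + uint64 + uint64 + uint32, with an optional trailing int32 range_size); the function name and the default of 0 are our assumptions:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Sketch: read the trailing range_size only when the stored value is long
// enough to contain it; legacy records without the field default to 0.
inline int32_t ParseRangeSize(const std::string &value)
{
    constexpr size_t kLegacySize = sizeof(int32_t) + sizeof(uint64_t) +
                                   sizeof(uint64_t) + sizeof(uint32_t);
    constexpr size_t kNewSize = kLegacySize + sizeof(int32_t);
    if (value.size() < kNewSize)
    {
        assert(value.size() == kLegacySize);
        return 0;  // Legacy record: no range_size field persisted.
    }
    int32_t range_size = 0;
    std::memcpy(&range_size, value.data() + kLegacySize, sizeof(int32_t));
    return range_size;
}
```

Keeping the writer (EncodeRangeValue) emitting the new format while the reader tolerates both lengths lets old and new records coexist until every range has been rewritten.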
tx_service/src/cc/local_cc_shards.cpp (1)

2540-2582: ⚠️ Potential issue | 🔴 Critical

Use references when selecting the split-task queue.

Line 2540 and Line 2567 bind data_sync_task_queue_[...] by value, so both push_front() calls only mutate temporary deques and the split subtasks never reach the workers. After fixing that, keep the worker wakeup after the shared status->unfinished_* initialization; otherwise the now-visible tasks can complete against a zero-task status object.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/cc/local_cc_shards.cpp` around lines 2540 - 2582, The code
binds data_sync_task_queue_[...] to a temporary deque (auto task_queue) so
push_front mutates a copy and tasks never reach workers; change both
declarations that create task_queue (the one before creating old_range_task and
the one inside the for-loop creating new_range_task) to use a reference (auto&
task_queue) so push_front mutates the actual queue in data_sync_task_queue_.
After making this change, ensure the worker wakeup call remains after the shared
status object's unfinished_* fields are initialized (the status used in
DataSyncTask) so workers see a non-zero unfinished count before tasks can
complete.
tx_service/include/cc/cc_req_misc.h (1)

439-447: ⚠️ Potential issue | 🔴 Critical

Remove unconditional Free() from AbortCcRequest to prevent use-after-free with async callbacks.

SetFinish() does not call Free() and the comment at line 707-708 explicitly states the object "will be freed after CommitLoading." Calling Free() unconditionally in AbortCcRequest creates a race: if an async data store callback still references this request (e.g., completion → SetFinish()), it becomes a use-after-free.

Proposed fix
 void AbortCcRequest(CcErrorCode err_code) override
 {
     assert(err_code != CcErrorCode::NO_ERROR);
     DLOG(ERROR) << "Abort this FillStoreSliceCc request with error: "
                 << CcErrorMessage(err_code);
     SetError(err_code);
-    // Recycle request
-    Free();
+    // Do not Free() here; let SetError/SetFinish own terminal lifecycle.
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_req_misc.h` around lines 439 - 447, AbortCcRequest
currently calls Free() unconditionally which can cause use-after-free when async
datastore callbacks still hold references (e.g., those that will call
SetFinish/CommitLoading); change AbortCcRequest (in class handling
FillStoreSliceCc requests) to set the error via SetError(err_code), log as
before, but do NOT call Free() here—let the normal completion path
(SetFinish()/CommitLoading) or the owner decide when to Free() the object; if a
synchronous early-free is truly required, gate Free() behind a safe check/flag
that ensures no outstanding callbacks, otherwise remove the Free() call and add
a clarifying comment.
🟠 Major comments (25)
tx_service/src/sk_generator.cpp-350-355 (1)

350-355: ⚠️ Potential issue | 🟠 Major

Leader-term check cadence is broken by a per-batch counter reset.

Line 350 resets batch_tuples each loop, so Line 453 (batch_tuples % 10240 == 0) usually won’t run for normal batch sizes. That delays/blocks termination on leader transfer.

Proposed fix
-    size_t batch_tuples = 0;
+    size_t batch_tuples = 0;
+    size_t term_check_counter = 0;
...
-                batch_tuples += scan_req.accumulated_scan_cnt_;
-                if (batch_tuples % 10240 == 0 &&
-                    !task_status_->CheckTxTermStatus())
-                {
-                    LOG(WARNING)
-                        << "ScanAndEncodeIndex: Terminate this task cause "
-                        << "the tx leader transferred of ng#" << node_group_id_;
-                    task_status_->TerminateGenerateSk();
-                    task_result_ = CcErrorCode::TX_NODE_NOT_LEADER;
-                    return;
-                }
+                batch_tuples += scan_req.accumulated_scan_cnt_;
+                term_check_counter += scan_req.accumulated_scan_cnt_;
+                if (term_check_counter >= 10240)
+                {
+                    if (!task_status_->CheckTxTermStatus())
+                    {
+                        LOG(WARNING)
+                            << "ScanAndEncodeIndex: Terminate this task cause "
+                            << "the tx leader transferred of ng#" << node_group_id_;
+                        task_status_->TerminateGenerateSk();
+                        task_result_ = CcErrorCode::TX_NODE_NOT_LEADER;
+                        return;
+                    }
+                    term_check_counter = 0;
+                }

Also applies to: 450-455

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/sk_generator.cpp` around lines 350 - 355, The bug is that
batch_tuples is being reset inside the per-batch loop (the line setting
batch_tuples = 0 before EnqueueToCcShard/scan_req.Wait()), which prevents the
periodic leader-term check (the condition using batch_tuples % 10240 == 0) from
ever firing for normal batch sizes; remove or move that reset so batch_tuples
persists across batches (only initialize it once before the outer scan loop or
reset it when you truly begin a new scan sequence) so the cadence check in the
code that references batch_tuples will run as intended; locate the reset near
the EnqueueToCcShard/scan_req.Wait() calls (references: batch_tuples,
partition_id_, cc_shards->EnqueueToCcShard, scan_req.Wait()) and adjust
initialization scope accordingly.
tx_service/include/cc/range_slice.h-1051-1068 (1)

1051-1068: ⚠️ Potential issue | 🟠 Major

Potential data race when replacing key_cache_ under shared lock.

The method acquires a shared_lock (line 1059) but then modifies key_cache_ (lines 1067-1068). Concurrent readers holding the same shared lock may be accessing key_cache_ (e.g., via ContainsKey, AddKey), leading to undefined behavior when the unique_ptr is reassigned.

Consider using an exclusive lock (unique_lock) for this operation, or restructuring to separate the invalidation of slice validity (which only needs shared lock) from the cache replacement.

🔒 Proposed fix to use exclusive lock
     void InvalidateKeyCache()
     {
         if (key_cache_ == nullptr)
         {
             return;
         }
         LOG(INFO) << "Invalidate key cache of range " << partition_id_
                   << " due to collision";
-        std::shared_lock<std::shared_mutex> s_lk(mux_);
+        std::unique_lock<std::shared_mutex> u_lk(mux_);
         // shared lock to avoid slice split
         for (auto &slice : slices_)
         {
             slice->SetKeyCacheValidity(false);
         }
         // Create a larger key cache if the old one cannot hold enough keys.
         size_t last_key_cache_size = key_cache_->Size();
         key_cache_ = std::make_unique<cuckoofilter::CuckooFilter<size_t, 12>>(
             last_key_cache_size * 1.2);
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/range_slice.h` around lines 1051 - 1068,
InvalidateKeyCache currently holds a shared_lock on mux_ while reassigning
key_cache_, which can race with readers like ContainsKey/AddKey; change the
locking so mutation happens under exclusive ownership: keep the shared_lock to
iterate slices_ and call slice->SetKeyCacheValidity(false), then release it and
acquire a std::unique_lock<std::shared_mutex> (or upgrade to exclusive) before
reading last_key_cache_size and resetting key_cache_ (the std::unique_ptr
assignment). Ensure all accesses to key_cache_ (e.g., ContainsKey, AddKey)
require at least a shared_lock so the exclusive section is safe.
tx_service/include/cc/range_cc_map.h-775-793 (1)

775-793: ⚠️ Potential issue | 🟠 Major

Replay also needs to clear the old range's latched split flag.

The post-commit path reinitializes the old range size and then calls ResetRangeStatus(), but the replay path only reinitializes the size. If need_split was already latched before recovery, replay leaves the old partition immediately split-eligible again.

Proposed fix
             if (range_owner == this->cc_ng_id_ &&
                 static_cast<uint16_t>((old_range_id & 0x3FF) %
                                       shard_->core_cnt_) == shard_->core_id_)
             {
                 size_t old_range_size =
                     old_table_range_entry->RangeSlices()->PostCkptSize();
                 data_ccm->InitRangeSize(static_cast<uint32_t>(old_range_id),
                                         static_cast<int32_t>(old_range_size),
                                         true,
                                         true);
+                data_ccm->ResetRangeStatus(
+                    static_cast<uint32_t>(old_range_id));
             }

Based on learnings, TemplateCcMap::UpdateRangeSize(uint32_t,int32_t,bool) must keep need_split latched until ResetRangeStatus(partition_id) clears it.

Also applies to: 1269-1285

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/range_cc_map.h` around lines 775 - 793, Replay path
re-initializes the old range size via CcMap::InitRangeSize but fails to clear
the latched split flag (need_split), leaving the old partition immediately
split-eligible after recovery; modify the replay code that calls InitRangeSize
(the block using shard_->GetCcm(...), ccm->InitRangeSize(...)) to call
TemplateCcMap::UpdateRangeSize(partition_id, size, /*keep_latch=*/true) or
otherwise ensure UpdateRangeSize is used with the parameter that keeps the
need_split latch set until you then call CcMap::ResetRangeStatus(partition_id),
mirroring the post-commit path so ResetRangeStatus actually clears the latched
need_split; update both occurrences mentioned (around the
InitRangeSize/ResetRangeStatus block and the other similar block referenced) to
use UpdateRangeSize with the latch-preserving flag before ResetRangeStatus.
tx_service/include/cc/ccm_scanner.h-1200-1204 (1)

1200-1204: ⚠️ Potential issue | 🟠 Major

Don’t erase the owning shard code.

This refactor keeps one cache, but it also drops the actual shard_code and later reports 0 from ShardCacheSizes(). For any range that lives on a non-zero core, the scanner can no longer tell callers which shard owns the cached tuples.

Possible fix
     ScanCache *Cache(uint32_t shard_code) override
     {
-        (void) shard_code;
+        if (!has_shard_code_)
+        {
+            shard_code_ = shard_code;
+            has_shard_code_ = true;
+        }
+        else
+        {
+            assert(shard_code_ == shard_code);
+        }
         return &scan_cache_;
     }
 
     void ShardCacheSizes(std::vector<std::pair<uint32_t, size_t>>
                              *shard_code_and_sizes) const override
     {
-        shard_code_and_sizes->emplace_back(0u, scan_cache_.Size());
+        if (has_shard_code_)
+        {
+            shard_code_and_sizes->emplace_back(shard_code_, scan_cache_.Size());
+        }
     }
 
 private:
+    bool has_shard_code_{false};
+    uint32_t shard_code_{0};
     TemplateScanCache<KeyT, ValueT> scan_cache_;

Also clear the saved shard code in Reset()/Close() when the scanner is reused.

Also applies to: 1206-1210

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/ccm_scanner.h` around lines 1200 - 1204, The
Cache(uint32_t shard_code) implementation currently discards the incoming
shard_code causing ShardCacheSizes() to report 0; fix this by storing the
provided shard_code in a member (e.g., saved_shard_code_ or owned_shard_code_)
when Cache(uint32_t) is called and still return &scan_cache_; also update any
other Cache overloads (the block around the other occurrence at 1206-1210) to
set the same member; finally clear/reset that saved shard code in Reset() and
Close() so reused scanners do not retain stale ownership information.
tx_service/include/cc/ccm_scanner.h-1224-1238 (1)

1224-1238: ⚠️ Potential issue | 🟠 Major

Mark the scanner blocked when MoveNext() exhausts the cache.

After the final MoveNext(), status_ stays Open until Current() is called again. That leaves Status() and Current() out of sync and differs from HashParitionCcScanner::MoveNext().

Possible fix
     void MoveNext() override
     {
         if (status_ != ScannerStatus::Open)
         {
             return;
         }
 
         scan_cache_.MoveNext();
+        if (scan_cache_.Current() == nullptr)
+        {
+            status_ = ScannerStatus::Blocked;
+        }
     }

Also applies to: 1240-1248

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/ccm_scanner.h` around lines 1224 - 1238, The Current()
method leaves status_ as Open when the cache was exhausted by MoveNext(),
causing Status() and Current() to be out of sync; change Current() in the
scanner (and the analogous overload around the other implementation) to set
status_ = ScannerStatus::Blocked whenever scan_cache_.Current() returns nullptr
(just like HashParitionCcScanner::MoveNext() does) so callers see Blocked
immediately after the cache is exhausted; update the TemplateScanTuple<KeyT,
ValueT> pointer handling in Current() to set status_ before returning nullptr.
tx_service/include/cc/ccm_scanner.h-1185-1187 (1)

1185-1187: ⚠️ Potential issue | 🟠 Major

Don’t freeze the cache on the constructor schema.

scan_cache_ is now persistent, but Reset(const KeySchema *) only updates RangePartitionedCcmScanner::key_schema_. After reuse, DecodeKey() will parse with the new schema while scan_cache_ still deserializes tuples with the old one.

Possible fix
 template <typename KeyT, typename ValueT>
 struct TemplateScanCache : public ScanCache
 {
 public:
+    void SetKeySchema(const KeySchema *key_schema)
+    {
+        key_schema_ = key_schema;
+    }
+
 private:
     std::vector<TemplateScanTuple<KeyT, ValueT>> cache_;
-    const KeySchema *const key_schema_;
+    const KeySchema *key_schema_;
 };
 
     void Reset(const KeySchema *key_schema) override
     {
         key_schema_ = key_schema;
+        scan_cache_.Reset();
+        scan_cache_.SetKeySchema(key_schema);
+        status_ = ScannerStatus::Blocked;
         partition_ng_term_ = -1;
     }

Also applies to: 1292-1292

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/ccm_scanner.h` around lines 1185 - 1187, Ctor stores
the initial schema in scan_cache_ while Reset(const KeySchema*) only updates
RangePartitionedCcmScanner::key_schema_, which causes DecodeKey() to use the new
schema but scan_cache_ to keep deserializing with the old one; update
Reset(const KeySchema*) to also update scan_cache_'s schema (or provide a
scan_cache_.ResetSchema/SetKeySchema call) so scan_cache_ is reinitialized or
its key schema replaced whenever key_schema_ changes, and ensure any
constructors (CcScanner(...)) do not permanently bind scan_cache_ to the
original schema without a way to refresh it.
tx_service/src/tx_operation.cpp-739-740 (1)

739-740: ⚠️ Potential issue | 🟠 Major

Don’t collapse distinct forwarded partitions onto one shard-code entry.

forward_addr_ now carries partition_id because the target partition matters, but these inserts still use only ((ng << 10) | residual) as the key and try_emplace. If the old-range forward and a split-range forward alias on the same low 10 bits on the same NG (for example, partition ids 1 and 1025), the later insert is silently dropped even though it belongs to a different partition. That loses one forwarded acquire/post-write path and updates the wrong range size.

Also applies to: 771-772, 796-797

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/tx_operation.cpp` around lines 739 - 740, The current map key
built from ((new_bucket_ng << 10) | residual) used in try_emplace collapses
different forwarded partitions (forward_addr_.partition_id) that share the same
low-10 bits; change the keying so it includes the partition id as well (e.g.,
incorporate forward_addr_.partition_id into the key or switch to a composite
key/tuple) wherever that try_emplace is used (the entries around the insert
using ((new_bucket_ng << 10) | residual), and the analogous places mentioned at
the other occurrences), so that lookups/inserts distinguish by partition_id and
no forwarded entry is silently dropped or mis-attributed. Ensure the same new
key scheme is applied consistently to the inserts and subsequent lookups that
reference that map.
tx_service/src/tx_operation.cpp-5178-5184 (1)

5178-5184: ⚠️ Potential issue | 🟠 Major

Apply the same shard-move cleanup to the dirty-owner pass.

This pass now correctly kicks out subranges that stay on the same NG but move to a different core. The cleaning_old_range_dirty_owner_ branch below still only compares node-group ownership, so split+migration can leave stale entries behind when a subrange remains on the dirty owner NG but maps to a different core.

Possible follow-up
-            if (new_owner != old_range_dirty_owner &&
-                dirty_new_owner != old_range_dirty_owner)
+            uint16_t old_range_shard_id = static_cast<uint16_t>(
+                (range_info_->PartitionId() & 0x3FF) % local_shards->Count());
+            uint16_t new_range_shard_id = static_cast<uint16_t>(
+                (kickout_data_it_->second & 0x3FF) % local_shards->Count());
+            if ((new_owner != old_range_dirty_owner &&
+                 dirty_new_owner != old_range_dirty_owner) ||
+                old_range_shard_id != new_range_shard_id)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/tx_operation.cpp` around lines 5178 - 5184, The dirty-owner
cleanup branch (cleaning_old_range_dirty_owner_) currently only checks NG
ownership and can miss shard-moves; update its condition to mirror the earlier
check by computing range_shard_id and new_range_shard_id (using
range_info_->PartitionId(), kickout_data_it_->second, and local_shards->Count())
and include the shard comparison (range_shard_id != new_range_shard_id)
alongside the existing node-id checks involving new_owner, dirty_new_owner and
txm->TxCcNodeId(); effectively apply the same shard-move cleanup logic used in
the first branch so subranges that stay on the same NG but move to a different
core are also kicked out.
tx_service/src/remote/cc_stream_receiver.cpp-379-385 (1)

379-385: ⚠️ Potential issue | 🟠 Major

Use fixed-width integer type instead of size_t for wire encoding.

tuple_cnt() is serialized from a size_t and must be deserialized safely. The current code reads directly from a pointer cast (*((const size_t *) tuple_cnt_info)), which creates two problems:

  1. ABI incompatibility: size_t is 4 bytes on 32-bit systems and 8 bytes on 64-bit systems. If serialization and deserialization occur on builds with different pointer widths, the data will misalign.
  2. Alignment hazard: Protobuf string fields have no alignment guarantees, so dereferencing a cast pointer is undefined behavior.

Change both the encoder in remote_cc_request.cpp and decoders (here and in cc_request.h) to use uint64_t instead of size_t, with proper serialization via std::memcpy or protobuf's binary field methods.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/remote/cc_stream_receiver.cpp` around lines 379 - 385, The
current deserialization reads tuple_cnt via a pointer-cast to size_t
(tuple_cnt_info -> *((const size_t*)...)), which is unsafe across ABIs and can
violate alignment; update the wire type to a fixed-width integer (uint64_t) and
change both encoder in remote_cc_request.cpp and decoders in
cc_stream_receiver.cpp / cc_request.h to serialize/deserialize using uint64_t,
reading/writing via std::memcpy (or protobuf binary methods) into a local
uint64_t temp and then assign/convert to size_t if needed, and replace uses of
tuple_cnt_info/tuple_cnt with this safe memcpy-based uint64_t handling.
tx_service/include/cc/cc_request.h-8113-8130 (1)

8113-8130: ⚠️ Potential issue | 🟠 Major

Reset parsed_ and parse_offset_ when reusing UploadBatchSlicesCc.

This Reset() clears slice_data_ and next_idx_, but it leaves the previous parse cursor and parsed_ flag behind. A reused request can therefore skip parsing or resume in the middle of the last payload.

♻️ Suggested fix
         node_group_term_ = &ng_term;
         slice_data_.clear();
         next_idx_ = 0;
+        parse_offset_ = {0, 0, 0, 0};
+        parsed_ = false;
 
         entry_tuples_ = &entry_tuple;
         slices_info_ = slice_info;
 
         finished_ = false;
         err_code_ = CcErrorCode::NO_ERROR;

Also applies to: 8254-8268, 8285-8292

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_request.h` around lines 8113 - 8130, The Reset
method for UploadBatchSlicesCc currently clears slice_data_ and next_idx_ but
fails to reset the parsing state, so update UploadBatchSlicesCc::Reset to also
set parsed_ = false and parse_offset_ = 0 (or the appropriate initial offset)
whenever reinitializing the request; apply the same change to the other Reset
implementations referenced (the overloads around the other Reset occurrences) so
parsed_ and parse_offset_ are consistently reinitialized along with slice_data_,
next_idx_, entry_tuples_, slices_info_, finished_, and err_code_.
tx_service/include/cc/cc_request.h-2448-2450 (1)

2448-2450: ⚠️ Potential issue | 🟠 Major

Reset wait_for_snapshot_cnt_ in both Set() overloads.

blocking_info_.Reset() clears the wait type, but the new scalar counter survives object reuse. If a previous scan was aborted while waiting on snapshots, the next scan starts with a stale count and can either stall or trip the assert in DecreaseWaitForSnapshotCnt().

🔁 Suggested fix
         err_ = CcErrorCode::NO_ERROR;
         cache_hit_miss_collected_ = false;
         blocking_info_.Reset();
+        wait_for_snapshot_cnt_ = 0;

Apply the same reset in both Set() overloads.

Also applies to: 2510-2512, 2975-2975

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_request.h` around lines 2448 - 2450, The object
reuse path fails to reset the scalar counter wait_for_snapshot_cnt_, so after
blocking_info_.Reset() the leftover count can stall or trigger the assert in
DecreaseWaitForSnapshotCnt(); update both Set() overloads (the two
CcRequest::Set(...) methods referenced) to also reset wait_for_snapshot_cnt_
(e.g., assign zero) wherever blocking_info_.Reset() is called, and apply the
same fix at the other occurrences noted (around the blocks near the other Set()
overload and the third reset site referenced) to ensure the counter is cleared
on reuse.
store_handler/dynamo_handler.cpp-2537-2541 (1)

2537-2541: ⚠️ Potential issue | 🟠 Major

Complete fetch_cc on unsupported path instead of only asserting.

Line 2539–2540 can crash in debug and can leave the request unfinished in release (assert removed), which risks hangs in callers waiting for completion.

🔧 Proposed fix
 void EloqDS::DynamoHandler::FetchTableRangeSize(FetchTableRangeSizeCc *fetch_cc)
 {
     LOG(ERROR) << "DynamoHandler::FetchTableRangeSize not implemented";
-    assert(false);
+    if (fetch_cc != nullptr)
+    {
+        fetch_cc->SetFinish(
+            static_cast<int>(txservice::CcErrorCode::DATA_STORE_ERR));
+    }
+    assert(false);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/dynamo_handler.cpp` around lines 2537 - 2541, The function
EloqDS::DynamoHandler::FetchTableRangeSize currently asserts on the unsupported
path which can crash in debug or leave callers hanging in release; instead,
remove the assert and complete the request object: check fetch_cc for nullptr,
populate an appropriate error/status on the FetchTableRangeSizeCc (e.g. set an
error code/message or call its failure setter), log the error, and then call the
request completion method (e.g. fetch_cc->Complete() or the class's completion
API) so the caller is always notified of failure.
store_handler/data_store_service_client_closure.cpp-814-816 (1)

814-816: ⚠️ Potential issue | 🟠 Major

Keep range-metadata decoding backward-compatible.

These paths now require the new 28-byte table_ranges payload everywhere. Pre-upgrade rows are still 24 bytes, so debug builds trip the asserts, and FetchRangeSizeCallback can read past the buffer in release builds. Please accept both layouts and handle a missing trailing range_size explicitly.

Also applies to: 929-966, 1008-1010

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/data_store_service_client_closure.cpp` around lines 814 - 816,
The assert in the range-metadata decoder forces a 28-byte layout and can read
past older 24-byte payloads; update the parsing in FetchRangeSizeCallback (and
the similar parsing branches around the other occurrences) to accept both sizes
by checking value.size() == 24 or value.size() == 28, parse the common fields
from the first 24 bytes, and only read/assign the trailing range_size when
value.size() == 28; if the trailing range_size is missing (24-byte pre-upgrade
row) set range_size to a safe default (e.g., 0) or mark it as absent and avoid
reading past the buffer.
store_handler/rocksdb_handler.cpp-1131-1136 (1)

1131-1136: ⚠️ Potential issue | 🟠 Major

Fail this request explicitly instead of asserting.

If FetchTableRangeSize() is hit, debug builds abort here, and release builds can leave fetch_cc unfinished because assert(false) disappears. Please complete the CC with an error until RocksDB has a real implementation.

💡 Suggested fallback
 void RocksDBHandler::FetchTableRangeSize(
     txservice::FetchTableRangeSizeCc *fetch_cc)
 {
     LOG(ERROR) << "RocksDBHandler::FetchTableRangeSize not implemented";
-    assert(false);
+    if (fetch_cc != nullptr)
+    {
+        fetch_cc->SetFinish(
+            static_cast<uint32_t>(txservice::CcErrorCode::DATA_STORE_ERR));
+    }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/rocksdb_handler.cpp` around lines 1131 - 1136, Replace the
assert with explicit failure handling in RocksDBHandler::FetchTableRangeSize:
set an error status/message on the txservice::FetchTableRangeSizeCc object
(fetch_cc) and complete the call (e.g., call its completion method such as
Done/Finish/Reply) so the RPC is returned with an error instead of aborting or
leaving the request hanging; include a clear message like "FetchTableRangeSize
not implemented" and an appropriate error code when completing fetch_cc.
tx_service/src/fault/log_replay_service.cpp-593-595 (1)

593-595: ⚠️ Potential issue | 🟠 Major

Delay split-range cleanup until the whole node-group replay is done.

split_range_info_ is scoped by cc_ng_id, but these paths erase it when a single stream errors out or finishes. Other log-group streams for the same node group can still be replaying and will lose the split timestamps they need for later ParseDataLogCc work.

Also applies to: 629-630, 710-711

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/fault/log_replay_service.cpp` around lines 593 - 595, The
current code sets recovery_error and calls CleanSplitRangeInfo(cc_ng_id) as soon
as a single stream errors/finishes, which erases split_range_info_ for that
cc_ng_id while other streams in the same node-group still need it for
ParseDataLogCc; remove those immediate CleanSplitRangeInfo(cc_ng_id) calls (the
ones adjacent to recovery_error and early returns) and instead defer cleanup
until the entire node-group replay is complete (e.g., when the last stream for
cc_ng_id has finished or in the node-group completion handler), by tracking
per-cc_ng_id active stream count or a node-group completion flag and invoking
CleanSplitRangeInfo(cc_ng_id) only when that count reaches zero or the
node-group completion path runs; ensure ParseDataLogCc callers still find
split_range_info_ available until then.
tx_service/src/cc/cc_shard.cpp-406-418 (1)

406-418: ⚠️ Potential issue | 🟠 Major

Handle a missing range entry before allocating and dereferencing.

Line 413 only asserts. In release builds, a stale or not-yet-loaded partition turns into a null dereference at Line 414, and NextRequest() has already handed out a pooled object. Do the lookup first and return cleanly when the range metadata is absent.

Suggested fix
-    FetchTableRangeSizeCc *fetch_cc = fetch_range_size_cc_pool_.NextRequest();
-
     const TableName range_table_name(table_name.StringView(),
                                      TableType::RangePartition,
                                      table_name.Engine());
     const TableRangeEntry *range_entry =
         GetTableRangeEntry(range_table_name, cc_ng_id, partition_id);
-    assert(range_entry != nullptr);
+    if (range_entry == nullptr)
+    {
+        LOG(WARNING) << "Skip FetchTableRangeSize: missing range entry for "
+                     << table_name.StringView() << ", partition "
+                     << partition_id;
+        return;
+    }
     TxKey start_key = range_entry->GetRangeInfo()->StartTxKey();
 
+    FetchTableRangeSizeCc *fetch_cc = fetch_range_size_cc_pool_.NextRequest();
     fetch_cc->Reset(
         table_name, partition_id, start_key, this, cc_ng_id, cc_ng_term);
     local_shards_.store_hd_->FetchTableRangeSize(fetch_cc);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/cc/cc_shard.cpp` around lines 406 - 418, Move the
GetTableRangeEntry() lookup and null-check before calling
fetch_range_size_cc_pool_.NextRequest() so you don't allocate/hand out a pooled
FetchTableRangeSizeCc and then return or dereference a null range_entry;
specifically, call GetTableRangeEntry(range_table_name, cc_ng_id, partition_id),
if range_entry is nullptr return/exit cleanly, and only then call
fetch_range_size_cc_pool_.NextRequest() and use fetch_cc->Reset(...) followed by
local_shards_.store_hd_->FetchTableRangeSize(fetch_cc). If you must call
NextRequest() earlier, ensure you return the object to the pool on the error
path to avoid leaking a pooled object.
tx_service/src/cc/cc_shard.cpp-3583-3594 (1)

3583-3594: ⚠️ Potential issue | 🟠 Major

Reset against the range CCM, not the caller's table type.

range_sizes_ is only tracked on range-partition CCMs, but this helper calls GetCcm(table_name, ng_id) directly. If a base/index table name reaches here, the reset is a silent no-op and the split latch never clears.

Suggested fix
 void CcShard::ResetRangeSplittingStatus(const TableName &table_name,
                                         uint32_t ng_id,
                                         uint32_t range_id)
 {
-    CcMap *ccm = GetCcm(table_name, ng_id);
+    const TableName range_table_name(table_name.StringView(),
+                                     TableType::RangePartition,
+                                     table_name.Engine());
+    CcMap *ccm = GetCcm(range_table_name, ng_id);
     if (ccm == nullptr)
     {
         return;
     }
 
     ccm->ResetRangeStatus(range_id);
 }
Based on learnings: `TemplateCcMap::UpdateRangeSize(uint32_t,int32_t,bool)` must keep the split flag latched until `ResetRangeStatus(partition_id)` clears it.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/cc/cc_shard.cpp` around lines 3583 - 3594,
ResetRangeSplittingStatus is calling GetCcm(table_name, ng_id) on the caller's
table type which can be a base/index table; change it to locate and use the
range-partition CCM (the CCM that actually tracks range_sizes_) instead of the
caller table's CCM, then call ResetRangeStatus(range_id) on that range-partition
CcMap and handle nullptr as before; also ensure
TemplateCcMap::UpdateRangeSize(uint32_t,int32_t,bool) keeps the split latch set
until ResetRangeStatus(partition_id) clears it.
tx_service/src/remote/remote_cc_handler.cpp-730-735 (1)

730-735: ⚠️ Potential issue | 🟠 Major

Align remote scan-resume condition with local semantics.

Line 730 uses cc_ng_term > 0, but the local range-scan path treats cc_ng_term < 0 as “first scan.” This makes cc_ng_term == 0 behave differently between local and remote resume flows.

🔧 Suggested fix
-    if (cc_ng_term > 0)
+    if (cc_ng_term >= 0)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/remote/remote_cc_handler.cpp` around lines 730 - 735, The
remote resume branch uses the wrong condition for detecting a "first scan":
change the check on cc_ng_term so it matches local semantics (where cc_ng_term <
0 means first scan) — i.e. only set prior CCE lock when cc_ng_term >= 0; update
the if condition surrounding the call that gets ScanCache via scanner.Cache(0),
inspects ScanCache::LastTuple(), and calls scan_slice->set_prior_cce_lock(...)
so cc_ng_term == 0 is treated like other non-first scans.
tx_service/include/cc/template_cc_map.h-7338-7356 (1)

7338-7356: ⚠️ Potential issue | 🟠 Major

Use the uploaded commit timestamp as the split-task watermark.

This branch installs a version at commit_ts but seeds CreateSplitRangeDataSyncTask() with a fresh local timestamp. If local clock time is behind the uploaded commit ts, the split scan can start before the record that triggered it and miss that version. Use commit_ts here so the watermark stays in the same timestamp domain as the write path.

🛠️ Proposed fix
-                        uint64_t data_sync_ts =
-                            std::chrono::duration_cast<
-                                std::chrono::microseconds>(
-                                std::chrono::high_resolution_clock::now()
-                                    .time_since_epoch())
-                                .count();
                         shard_->CreateSplitRangeDataSyncTask(
                             table_name_,
                             cc_ng_id_,
                             req.CcNgTerm(),
                             static_cast<uint32_t>(partition_id),
-                            data_sync_ts);
+                            commit_ts);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/template_cc_map.h` around lines 7338 - 7356, The
split-range data-sync task is currently seeded with a fresh local timestamp
(data_sync_ts) causing potential ordering issues; change the watermark passed to
shard_->CreateSplitRangeDataSyncTask(...) to use the uploaded commit timestamp
(commit_ts) instead of the locally generated data_sync_ts so the split scan uses
the same timestamp domain as the write path; update the call site where
UpdateRangeSize(...) returns need_split and replace the data_sync_ts argument
with commit_ts (keeping table_name_, cc_ng_id_, req.CcNgTerm(), and
static_cast<uint32_t>(partition_id) unchanged).
store_handler/data_store_service_client.cpp-1393-1393 (1)

1393-1393: ⚠️ Potential issue | 🟠 Major

Protect range_size accumulation from signed overflow.

slice_size is uint32_t, but plan.range_size is int32_t. Large ranges can overflow to negative values and break split decisions.

Suggested fix
+#include <limits>
@@
     RangeSliceBatchPlan plan;
     plan.segment_cnt = 0;
     plan.version = version;
     plan.range_size = 0;
+    int64_t range_size_acc = 0;
@@
-        uint32_t slice_size = static_cast<uint32_t>(slices[i]->Size());
-        plan.range_size += static_cast<int32_t>(slice_size);
+        uint32_t slice_size = static_cast<uint32_t>(slices[i]->Size());
+        range_size_acc += static_cast<int64_t>(slice_size);
+        if (range_size_acc > std::numeric_limits<int32_t>::max())
+        {
+            range_size_acc = std::numeric_limits<int32_t>::max();
+        }
+        plan.range_size = static_cast<int32_t>(range_size_acc);

Based on learnings: TemplateCcMap::UpdateRangeSize(uint32_t,int32_t,bool) must reliably trigger and latch split state when range size crosses StoreRange::range_max_size.

Also applies to: 1442-1442

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@store_handler/data_store_service_client.cpp` at line 1393, The code sets
plan.range_size (int32_t) += slice_size (uint32_t) which can overflow for large
accumulations; change the accumulation to use a wider signed type (e.g.,
int64_t) or an unsigned 64-bit temp variable when adding slice_size, then clamp
or saturate the result back into plan.range_size safely (e.g., min(max_res,
INT32_MAX) or ensure it is at least StoreRange::range_max_size+1) so that
TemplateCcMap::UpdateRangeSize(uint32_t,int32_t,bool) reliably sees crosses of
StoreRange::range_max_size and latches split state without signed overflow.
Ensure the same fix is applied at the other occurrence referenced (around line
1442).
tx_service/src/remote/cc_node_service.cpp-1175-1175 (1)

1175-1175: ⚠️ Potential issue | 🟠 Major

Guard single-core UploadBatch routing behind explicit range-partition intent.

Routing switches to single-core whenever partition_id >= 0. If callers omit partition_id, protobuf defaults to 0, silently triggering single-core routing instead of the expected broadcast mode.

Suggested fix
-    size_t core_cnt = (partition_id >= 0) ? 1 : cc_shards->Count();
+    const bool single_core_route =
+        (table_type == TableType::RangePartition) && (partition_id >= 0);
+    size_t core_cnt = single_core_route ? 1 : cc_shards->Count();
@@
-    if (partition_id >= 0)
+    if (single_core_route)

Fix logging bug in UploadBatchSlices: variable err is initialized to NO_ERROR at line 1405 and logged at line 1411 before being assigned the actual error code at line 1412. This masks errors in the log output.

Suggested fix
     CcErrorCode err = CcErrorCode::NO_ERROR;
     if (req.ErrorCode() != CcErrorCode::NO_ERROR)
     {
+        err = req.ErrorCode();
         LOG(INFO) << "CcNodeService UploadBatchRecordCache RPC of `#ng`" << ng_id
                   << " for range#" << slices_info->range_ << ", new_range#"
                   << slices_info->new_range_
                   << " finished with error: " << static_cast<uint32_t>(err);
-        err = req.ErrorCode();
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/remote/cc_node_service.cpp` at line 1175, The routing logic
currently treats partition_id >= 0 as intent for single-core routing but
protobuf defaults to 0, so change the check to require an explicit partition
intent (e.g., use request->has_partition_id() && request->partition_id() >= 0)
wherever partition_id is used for routing (reference the partition_id variable
and the UploadBatch routing code) so omission results in broadcast behavior;
also fix the logging bug in UploadBatchSlices by ensuring the err variable is
assigned the actual error code before logging (either move the
processLogger.error call to after err is set or initialize err with the call
result) and reference the err variable and UploadBatchSlices function to locate
the change.
tx_service/src/remote/remote_cc_request.cpp-1363-1365 (1)

1363-1365: ⚠️ Potential issue | 🟠 Major

Allow successful empty scan batches here.

LastKey() can legitimately be null when a slice returns zero tuples. This new assert turns those responses into debug-time crashes before slice_position_ can describe the empty/terminal batch.

🐛 Proposed fix
         const RangeScanSliceResult &slice_result = cc_res_.Value();
         output_msg_.clear_last_key();
         const TxKey *last_key = slice_result.LastKey();
-        assert(last_key != nullptr || cc_res_.IsError());
+        assert(last_key != nullptr || cc_res_.IsError() ||
+               slice_result.remote_scan_caches_->Size() == 0);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/remote/remote_cc_request.cpp` around lines 1363 - 1365, The
assert on LastKey() is too strict: LastKey() may be nullptr for successful empty
scan batches, causing debug crashes; update the check in the block where
slice_result.LastKey() is read (symbol: LastKey(), variable: slice_result) so
that nullptr is allowed when the response is successful and the batch is
terminal/empty (use slice_position_ to detect this) — i.e., replace or relax
assert(last_key != nullptr || cc_res_.IsError()) to permit (cc_res_.IsOK() &&
slice_position_ indicates terminal/empty) or remove the assert and explicitly
handle a null last_key as a valid empty/terminal batch when composing the
response. Ensure cc_res_ and slice_position_ are used to decide validity so only
real error cases still trigger failures.
tx_service/src/cc/local_cc_shards.cpp-6931-7002 (1)

6931-7002: ⚠️ Potential issue | 🟠 Major

Don't skip step 1 on same-NG cross-core handoff.

The new shard-based path can call SendRangeCacheRequest() even when new_range_owner_ == ng_id_ but the destination core changed. With this guard, that case skips UploadRangeSlices, so the receiver never gets the slice metadata or resolved ng_term before UploadBatchSlices starts. Those batch RPCs only carry slices_idxs, and the first wave still goes out with INIT_TERM.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/cc/local_cc_shards.cpp` around lines 6931 - 7002, The check
"if (new_range_owner_ != ng_id_)" skips UploadRangeSlices when the range owner
NG matches but the destination core changed, causing the receiver to miss slice
metadata and ng_term; modify SendRangeCacheRequest so UploadRangeSlices is
invoked whenever the destination core differs (not just when new_range_owner_ !=
ng_id_) — i.e., replace or augment the new_range_owner_ vs ng_id_ guard with a
condition that compares both NG and destination core (or simply always call
remote::CcRpcService_Stub::UploadRangeSlices before UploadBatchSlices), ensure
you still handle cntl.Failed() and resp.error_code() the same way, and update
ng_term from resp.ng_term() so UploadBatchSlices does not proceed with INIT_TERM
(refer to new_range_owner_, ng_id_, ng_term, UploadRangeSlices,
UploadBatchSlices, and SendRangeCacheRequest).
tx_service/include/cc/cc_req_misc.h-479-489 (1)

479-489: ⚠️ Potential issue | 🟠 Major

Initialize next_idx_ defensively to avoid undefined reads.

Line 538 declares `size_t next_idx_;` without initialization, while lines 479-483 read it. Any path that reads the field before Reset/SetNextIndex executes is undefined behavior.

Minimal safe initialization
-    size_t next_idx_;
+    size_t next_idx_{0};

Also applies to: 538-538

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_req_misc.h` around lines 479 - 489, The field
next_idx_ is used by NextIndex() before it is guaranteed to be set, so
initialize it defensively to avoid UB; update the declaration of next_idx_
(and/or the class constructor/Reset method) to initialize next_idx_ to a safe
default (e.g., 0) and ensure Reset() sets next_idx_ back to that same default;
references: next_idx_, NextIndex(), SetNextIndex(), and Reset() to locate where
to apply the initialization.
tx_service/include/cc/cc_req_misc.h-1157-1169 (1)

1157-1169: ⚠️ Potential issue | 🟠 Major

Change FetchTableRangeSizeCc to own the table_name_ value instead of storing a pointer.

Reset(const TableName &table_name, ...) combined with const TableName *table_name_; risks dangling pointers in async execution. Since Execute() dereferences table_name_ asynchronously (line 1523), the stored pointer can outlive its referent. Other similar code in the file (lines 938–939, 1055–1056) correctly copies the TableName object instead. Change table_name_ to store by value:

Fix
-    const TableName *table_name_;
+    TableName table_name_{std::string(""), TableType::Primary, TableEngine::None};

And in Reset (cc_req_misc.cpp:1499):

-    table_name_ = &table_name;
+    table_name_ = TableName(table_name.StringView(), table_name.Type(), table_name.Engine());
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_req_misc.h` around lines 1157 - 1169, The field
table_name_ currently stores a pointer which can dangle during async execution
(see Reset(...) and Execute()), so change table_name_ from const TableName* to a
TableName (or const TableName) member to own the value; update Reset(const
TableName &table_name, ...) to copy into table_name_ (e.g., table_name_ =
table_name) and update all uses that dereference the pointer (in Execute,
ValidTermCheck, SetFinish and any other accesses) to use the value member
directly; ensure construction/assignment and any headers/initializers reflect
the new type.

Comment on lines +1085 to +1118
void RecoveryService::SetSplitRangeInfo(uint32_t ng_id,
                                        TableName table_name,
                                        int32_t range_id,
                                        uint64_t commit_ts)
{
    auto ng_it = split_range_info_.try_emplace(ng_id).first;
    auto &table_map = ng_it->second;
    auto table_it =
        table_map
            .try_emplace(table_name, std::unordered_map<int32_t, uint64_t>{})
            .first;
    auto &range_map = table_it->second;
    auto [it, inserted] = range_map.try_emplace(range_id, commit_ts);
    if (!inserted)
    {
        it->second = commit_ts;
    }
}

const std::unordered_map<TableName, std::unordered_map<int32_t, uint64_t>> *
RecoveryService::GetSplitRangeInfo(uint32_t ng_id) const
{
    auto ng_it = split_range_info_.find(ng_id);
    if (ng_it == split_range_info_.end())
    {
        return nullptr;
    }
    return &ng_it->second;
}

void RecoveryService::CleanSplitRangeInfo(uint32_t ng_id)
{
    split_range_info_.erase(ng_id);
}

⚠️ Potential issue | 🔴 Critical

Synchronize access to split_range_info_.

on_received_messages() can run concurrently for multiple replay streams, but these helpers call try_emplace, find, and erase on the shared std::unordered_map with no lock. That makes the replay path racy and can corrupt the container or hand downstream code inconsistent split metadata.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/fault/log_replay_service.cpp` around lines 1085 - 1118, Access
to the shared split_range_info_ must be synchronized: add a mutex (e.g.,
split_range_info_mutex_) and take a std::lock_guard (or std::unique_lock) around
all manipulations in SetSplitRangeInfo and CleanSplitRangeInfo to protect
try_emplace/erase, and protect lookup in GetSplitRangeInfo as well; also do not
return a raw pointer into the map while unlocked—change GetSplitRangeInfo to
return a safe copy or an optional copy of the table map (e.g.,
std::optional<std::unordered_map<TableName, std::unordered_map<int32_t,
uint64_t>>>) so callers do not access internal state without holding the lock;
update callers (including on_received_messages()) accordingly to use the copied
data or new API.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
tx_service/src/cc/cc_req_misc.cpp (1)

1515-1540: ⚠️ Potential issue | 🔴 Critical

Return immediately on term mismatch before touching CCM/range-size state.

On Line 1517-1520, term failure sets error_code_ but execution continues into GetCcm/InitRangeSize (Line 1523-1530), and may still reach split-task scheduling (Line 1532-1539). This can mutate stale CCM state after leadership change. Also, Line 1526’s assert(error_code_ != 0) is brittle for async CCM disappearance and can crash debug builds.

🐛 Proposed fix
 bool FetchTableRangeSizeCc::Execute(CcShard &ccs)
 {
     if (!ValidTermCheck())
     {
         error_code_ = static_cast<uint32_t>(CcErrorCode::NG_TERM_CHANGED);
+        return true;
     }
 
     bool succ = (error_code_ == 0);
     CcMap *ccm = ccs.GetCcm(*table_name_, node_group_id_);
     if (ccm == nullptr)
     {
-        assert(error_code_ != 0);
         return true;
     }
     bool need_split = ccm->InitRangeSize(
         static_cast<uint32_t>(partition_id_), store_range_size_, succ);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/src/cc/cc_req_misc.cpp` around lines 1515 - 1540, In
FetchTableRangeSizeCc::Execute, stop further processing when ValidTermCheck()
fails by returning immediately after setting error_code_ so you do not call
GetCcm/InitRangeSize or schedule CreateSplitRangeDataSyncTask with a stale term;
remove the brittle assert(error_code_ != 0) and ensure any path that calls
CcShard::GetCcm, CcMap::InitRangeSize, or CcShard::CreateSplitRangeDataSyncTask
only runs when the term is valid (i.e., after confirming error_code_ == 0 or
succ == true).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tx_service/src/cc/local_cc_shards.cpp`:
- Around line 2960-2964: Get rid of the sole assert and add a runtime null guard
for range_entry returned by GetTableRangeEntryInternal: check if range_entry is
nullptr before calling EnqueueRangeDataSyncTask, log or handle the error (e.g.,
processLogger/error return) and return early to avoid dereferencing a null
pointer; this change touches the range lookup around GetTableRangeEntryInternal
and the subsequent call to EnqueueRangeDataSyncTask where range_entry is used.

---

Duplicate comments:
In `@tx_service/src/cc/cc_req_misc.cpp`:
- Around line 1515-1540: In FetchTableRangeSizeCc::Execute, stop further
processing when ValidTermCheck() fails by returning immediately after setting
error_code_ so you do not call GetCcm/InitRangeSize or schedule
CreateSplitRangeDataSyncTask with a stale term; remove the brittle
assert(error_code_ != 0) and ensure any path that calls CcShard::GetCcm,
CcMap::InitRangeSize, or CcShard::CreateSplitRangeDataSyncTask only runs when
the term is valid (i.e., after confirming error_code_ == 0 or succ == true).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2ebca5ec-6f9d-4de6-85f5-544b21ac003a

📥 Commits

Reviewing files that changed from the base of the PR and between 5eb0b7a and 39b6d07.

📒 Files selected for processing (2)
  • tx_service/src/cc/cc_req_misc.cpp
  • tx_service/src/cc/local_cc_shards.cpp

@yi-xmu yi-xmu merged commit a64a31d into main Mar 16, 2026
4 checks passed