Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions tx_service/include/cc/cc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -6373,6 +6373,11 @@ struct KickoutCcEntryCc : public TemplatedCcRequest<KickoutCcEntryCc, Void>
return clean_type_;
}

// Returns the partition id targeted by this kickout request (backed by
// range_id_). Used by CleanRangeDataForMigration handling to identify the
// range whose cached size info must be removed after migration.
int32_t GetPartitionId() const
{
    return range_id_;
}

private:
CleanType clean_type_;
// Target buckets to be cleaned if clean type is CleanBucketData.
Expand Down
23 changes: 20 additions & 3 deletions tx_service/include/cc/template_cc_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -6899,10 +6899,11 @@ class TemplateCcMap : public CcMap
uint16_t pause_idx = shard_->core_id_;
CleanType clean_type = req.GetCleanType();
if (clean_type == CleanType::CleanBucketData ||
clean_type == CleanType::CleanRangeData)
clean_type == CleanType::CleanRangeData ||
clean_type == CleanType::CleanRangeDataForMigration)
{
// For clean bucket data and range data, cc req is only sent to 1
// core.
// For clean bucket data and range data (for data migration), cc req
// is only sent to 1 core.
pause_idx = 0;
}
if (req.ResumeKey(pause_idx)->KeyPtr() != nullptr)
Expand Down Expand Up @@ -6982,6 +6983,14 @@ class TemplateCcMap : public CcMap

if (ccp == &pos_inf_page_ || !(ccp->FirstKey() < *end_key))
{
if (req.GetCleanType() == CleanType::CleanRangeDataForMigration)
{
// For data migration, we need to delete the range size info
// for the range that has been migrated to another node group.
int32_t partition_id = req.GetPartitionId();
assert(partition_id > 0 && partition_id != INT32_MAX);
RemoveRangeSize(static_cast<uint32_t>(partition_id));
}
Comment on lines +6986 to +6993
Copy link

@coderabbitai coderabbitai bot Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect KickoutCcEntryCc partition-id type/default/sentinels.
rg -n -C4 '\bclass\s+KickoutCcEntryCc\b|\brange_id_\b|GetPartitionId\s*\(|SetPartitionId\s*\(' tx_service/include/cc/cc_request.h

# Inspect where migration-clean requests set/read the partition id.
rg -n -C3 'CleanRangeDataForMigration|SetPartitionId\s*\(|GetPartitionId\s*\(' tx_service -g '!**/build/**'

# Confirm all RemoveRangeSize callsites and casts.
rg -n -C3 'RemoveRangeSize\s*\(' tx_service/include/cc/template_cc_map.h tx_service -g '!**/build/**'

Repository: eloqdata/tx_service

Length of output: 14477


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find where range_id is set for CleanRangeDataForMigration requests
rg -n -B5 -A5 'clean_type.*CleanRangeDataForMigration|CleanRangeDataForMigration.*range_id' tx_service/src -g '!**/build/**'

# Check the Reset method that sets range_id for KickoutCcEntryCc
rg -n -B3 -A10 'void Reset.*range_id|range_id_\s*=\s*range_id' tx_service/include/cc/cc_request.h | head -60

# Check RemoveRangeSize implementation to confirm it uses uint32_t as key
rg -n -A15 'void RemoveRangeSize\(uint32_t' tx_service/include/cc/template_cc_map.h

# Check if there are other casts from int32_t to uint32_t in similar cleanup paths
rg -n 'static_cast<uint32_t>' tx_service/include/cc/template_cc_map.h | grep -i 'partition\|range'

Repository: eloqdata/tx_service

Length of output: 5027


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find where CleanRangeDataForMigration requests are created (from tx_operation.cpp hint)
rg -n -B10 -A10 'CleanRangeDataForMigration.*=' tx_service/src/tx_operation.cpp | head -80

# Look for where range_id is populated for migration requests
rg -n -B5 'range_id.*=' tx_service/src/tx_operation.cpp | grep -A5 -B5 'kickout\|migration'

# Check function signature for CreateKickoutCcEntry / similar to see range_id source
rg -n 'CreateKickoutCcEntry|PrepareCcRequest' tx_service/src -g '!**/build/**' | head -20

Repository: eloqdata/tx_service

Length of output: 45


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Look at tx_operation.cpp around the migration cleanup code
sed -n '7880,7950p' tx_service/src/tx_operation.cpp

# Search for where range_id is passed to Reset() in migration context
rg -n 'Reset.*range_id|range_id.*Reset' tx_service/src/tx_operation.cpp

# Look for migration-related functions that might create requests
rg -n 'MigrateRange|migration.*range|range.*migration' tx_service/src/tx_operation.cpp | head -20

Repository: eloqdata/tx_service

Length of output: 3167


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find declaration of ranges_in_bucket_snapshot_
rg -n 'ranges_in_bucket_snapshot_' tx_service/src/tx_operation.cpp | head -10

# Look at the type and how it's populated
rg -n -B3 -A3 'ranges_in_bucket_snapshot_.*=' tx_service/src/tx_operation.cpp | head -40

# Check GetTableRangeKeys to understand what range IDs are returned
rg -n 'GetTableRangeKeys' tx_service/include -g '!**/build/**' | head -5

Repository: eloqdata/tx_service

Length of output: 705


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find the declaration of ranges_in_bucket_snapshot_ in class definition
rg -n 'ranges_in_bucket_snapshot_' tx_service/src/tx_operation.h tx_service/include -g '!**/build/**'

# Look at the try_emplace context to see what's being inserted
sed -n '7605,7620p' tx_service/src/tx_operation.cpp

# Look at what populates the container values
sed -n '7620,7640p' tx_service/src/tx_operation.cpp

Repository: eloqdata/tx_service

Length of output: 224


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Get declaration context around line 1504
sed -n '1495,1515p' tx_service/include/tx_operation.h

# Look at where ranges_in_bucket_snapshot_ is populated with range IDs
sed -n '7605,7635p' tx_service/src/tx_operation.cpp

Repository: eloqdata/tx_service

Length of output: 2167


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check if there are existing checks for negative partition/range IDs in similar code paths
rg -n 'partition.*>= 0|range_id.*>= 0|if.*>=.*0.*partition|if.*>=.*0.*range' tx_service/include/cc -g '!**/build/**'

# Check all places where partition_id is cast to uint32_t to see if any validate
rg -n -B2 'static_cast<uint32_t>\(.*partition' tx_service/include/cc/template_cc_map.h

# Look at UpdateRangeSize calls to see if they have bounds checking
rg -n -B5 'UpdateRangeSize\(static_cast<uint32_t>' tx_service/include/cc/template_cc_map.h | head -40

Repository: eloqdata/tx_service

Length of output: 2146


Add runtime bounds check before casting partition_id to uint32_t.

At lines 6990–6992, the code directly casts GetPartitionId() (int32_t) to uint32_t without checking for negative values. The assertion at line 6991 only guards against INT32_MAX and is disabled in release builds. If a negative value reaches this path (through upstream corruption or a bug), it will wrap to a large positive value when cast, erasing the wrong entry from range_sizes_ and leaving stale range-size metadata.

This pattern already exists defensively in the codebase (e.g., local_cc_shards.h:1192, cc_request.h:7836). Add a runtime check consistent with established practice:

Suggested fix
             if (req.GetCleanType() == CleanType::CleanRangeDataForMigration)
             {
                 // For data migration, we need to delete the range size info
                 // for the range that has been migrated to other node group.
                 int32_t partition_id = req.GetPartitionId();
-                assert(partition_id != INT32_MAX);
-                RemoveRangeSize(static_cast<uint32_t>(partition_id));
+                if (partition_id >= 0 && partition_id != INT32_MAX)
+                {
+                    RemoveRangeSize(static_cast<uint32_t>(partition_id));
+                }
+                else
+                {
+                    LOG(ERROR) << "Invalid partition_id in migration cleanup: "
+                               << partition_id;
+                }
             }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (req.GetCleanType() == CleanType::CleanRangeDataForMigration)
{
// For data migration, we need to delete the range size info
// for the range that has been migrated to other node group.
int32_t partition_id = req.GetPartitionId();
assert(partition_id != INT32_MAX);
RemoveRangeSize(static_cast<uint32_t>(partition_id));
}
if (req.GetCleanType() == CleanType::CleanRangeDataForMigration)
{
// For data migration, we need to delete the range size info
// for the range that has been migrated to other node group.
int32_t partition_id = req.GetPartitionId();
if (partition_id >= 0 && partition_id != INT32_MAX)
{
RemoveRangeSize(static_cast<uint32_t>(partition_id));
}
else
{
LOG(ERROR) << "Invalid partition_id in migration cleanup: "
<< partition_id;
}
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/template_cc_map.h` around lines 6986 - 6993, The code
casts req.GetPartitionId() (int32_t) to uint32_t and calls RemoveRangeSize when
CleanType::CleanRangeDataForMigration, but only asserts partition_id !=
INT32_MAX which is disabled in release builds; add a runtime bounds check to
ensure partition_id is non-negative (and not INT32_MAX if needed) before the
static_cast and RemoveRangeSize call to prevent negative values wrapping to huge
uint32_t values. Locate the block handling CleanType::CleanRangeDataForMigration
around req.GetPartitionId(), validate partition_id >= 0 (and still guard against
INT32_MAX if required), and only call
RemoveRangeSize(static_cast<uint32_t>(partition_id)) when the check passes; if
the check fails, log or handle the error consistently with existing patterns in
local_cc_shards.h and cc_request.h.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller will guarantee that this value is non-negative.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yi-xmu Thanks for the clarification. If the caller guarantees partition_id is non-negative, I’m good to drop the runtime check.

Optional tiny follow-up for maintainability: tighten the existing debug assertion to reflect the invariant and leave a brief comment. For example:

// Caller guarantees non-negative partition_id for migration cleanups.
assert(partition_id >= 0 && partition_id != INT32_MAX);
RemoveRangeSize(static_cast<uint32_t>(partition_id));

If you prefer, I can prep a small patch to do just this (assert + comment) and nothing else.


✏️ Learnings added
Learnt from: yi-xmu
Repo: eloqdata/tx_service PR: 462
File: tx_service/include/cc/template_cc_map.h:6986-6993
Timestamp: 2026-03-18T04:01:54.203Z
Learning: In eloqdata/tx_service, during CleanRangeDataForMigration handling in TemplateCcMap::Execute(KickoutCcEntryCc &req) (tx_service/include/cc/template_cc_map.h), the caller guarantees KickoutCcEntryCc::GetPartitionId() returns a non-negative partition_id; negative IDs will not reach RemoveRangeSize().

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: yi-xmu
Repo: eloqdata/tx_service PR: 438
File: tx_service/include/cc/template_cc_map.h:11969-11981
Timestamp: 2026-03-12T05:24:16.445Z
Learning: In eloqdata/tx_service, TemplateCcMap::UpdateRangeSize(uint32_t,int32_t,bool) in tx_service/include/cc/template_cc_map.h must return true only once when the loaded, non-dirty range size first crosses StoreRange::range_max_size; it must latch the internal need_split flag as true until ResetRangeStatus(partition_id) clears it, and must not overwrite the latched flag back to false on later calls.

Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.

Learnt from: yi-xmu
Repo: eloqdata/tx_service PR: 454
File: tx_service/include/cc/object_cc_map.h:1574-1575
Timestamp: 2026-03-16T02:32:29.159Z
Learning: In `tx_service/include/cc/object_cc_map.h`, the `flags_str` field unpacked from entry tuples in `Execute(UploadBatchCc &req)` is intentionally unused. `ObjectCcMap` is non-range-partitioned (`TemplateCcMap<KeyT, ValueT, false, false>`), so `range_size_flags` carried in `flags_str` have no meaning in this context. The pause/resume logic does not need to track a flags offset for this class.

Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.

Learnt from: lokax
Repo: eloqdata/tx_service PR: 149
File: src/remote/cc_stream_receiver.cpp:1066-1075
Timestamp: 2025-10-21T06:46:53.700Z
Learning: In src/remote/cc_stream_receiver.cpp, for ScanNextRequest handling, BucketIds() on RemoteScanNextBatch should never be empty—this is an expected invariant of the scan protocol.

Learnt from: yi-xmu
Repo: eloqdata/tx_service PR: 454
File: tx_service/include/data_sync_task.h:185-189
Timestamp: 2026-03-16T02:36:20.407Z
Learning: In eloqdata/tx_service, within a `DataSyncTask` (tx_service/include/data_sync_task.h), the member `id_` represents the partition ID of the current partition for range-partitioned tables (and core_idx for hash-partitioned tables). It is the same identifier used as the key in `range_sizes_` by `UpdateRangeSize()` and `ResetRangeStatus()`. Do not confuse `id_` with a separate "range_id" — in the range partition context, `id_` IS the partition_id.

Comment on lines +6986 to +6993
Copy link

@coderabbitai coderabbitai bot Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Allow partition 0 in the migration cleanup assertion.

At Line 6991, assert(partition_id > 0 && partition_id != INT32_MAX); is stricter than the established invariant and can trip debug builds for a valid partition_id == 0.

Suggested fix
-                assert(partition_id > 0 && partition_id != INT32_MAX);
+                assert(partition_id >= 0 && partition_id != INT32_MAX);
                 RemoveRangeSize(static_cast<uint32_t>(partition_id));

Based on learnings: during CleanRangeDataForMigration, the caller guarantees KickoutCcEntryCc::GetPartitionId() is non-negative.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/template_cc_map.h` around lines 6986 - 6993, The
assertion in the CleanRangeDataForMigration branch is too strict and disallows a
valid partition_id == 0; update the check where you read partition_id from
req.GetPartitionId() (inside the CleanType::CleanRangeDataForMigration block) to
allow zero by asserting partition_id >= 0 && partition_id != INT32_MAX, then
continue to call RemoveRangeSize(static_cast<uint32_t>(partition_id)) as before;
ensure you only change the assert condition (not the cast or RemoveRangeSize
call) so negative values and INT32_MAX remain guarded.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yi-xmu Got it—leaving the assert as-is. If the invariant ever changes and a debug hit shows partition_id can be 0 during migration cleanup, we can relax it to >= 0 at that time.


🧠 Learnings used
Learnt from: yi-xmu
Repo: eloqdata/tx_service PR: 462
File: tx_service/include/cc/template_cc_map.h:6986-6993
Timestamp: 2026-03-18T04:01:54.203Z
Learning: In eloqdata/tx_service, during CleanRangeDataForMigration handling in TemplateCcMap::Execute(KickoutCcEntryCc &req) (tx_service/include/cc/template_cc_map.h), the caller guarantees KickoutCcEntryCc::GetPartitionId() returns a non-negative partition_id; negative IDs will not reach RemoveRangeSize().

Learnt from: yi-xmu
Repo: eloqdata/tx_service PR: 438
File: tx_service/include/cc/template_cc_map.h:11969-11981
Timestamp: 2026-03-12T05:24:16.445Z
Learning: In eloqdata/tx_service, TemplateCcMap::UpdateRangeSize(uint32_t,int32_t,bool) in tx_service/include/cc/template_cc_map.h must return true only once when the loaded, non-dirty range size first crosses StoreRange::range_max_size; it must latch the internal need_split flag as true until ResetRangeStatus(partition_id) clears it, and must not overwrite the latched flag back to false on later calls.

Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.

Learnt from: yi-xmu
Repo: eloqdata/tx_service PR: 454
File: tx_service/include/cc/object_cc_map.h:1574-1575
Timestamp: 2026-03-16T02:32:29.159Z
Learning: In `tx_service/include/cc/object_cc_map.h`, the `flags_str` field unpacked from entry tuples in `Execute(UploadBatchCc &req)` is intentionally unused. `ObjectCcMap` is non-range-partitioned (`TemplateCcMap<KeyT, ValueT, false, false>`), so `range_size_flags` carried in `flags_str` have no meaning in this context. The pause/resume logic does not need to track a flags offset for this class.

Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Learnt from: lokax
Repo: eloqdata/tx_service PR: 149
File: src/remote/cc_stream_receiver.cpp:1066-1075
Timestamp: 2025-10-21T06:46:53.700Z
Learning: In src/remote/cc_stream_receiver.cpp, for ScanNextRequest handling, BucketIds() on RemoteScanNextBatch should never be empty—this is an expected invariant of the scan protocol.

Learnt from: lokax
Repo: eloqdata/tx_service PR: 254
File: tx_service/src/cc/local_cc_shards.cpp:2949-3188
Timestamp: 2025-12-02T10:43:27.431Z
Learning: In tx_service/src/cc/local_cc_shards.cpp, whenever TryPinNodeGroupData is used, only call Sharder::Instance().UnpinNodeGroupData(node_group) if the recorded term is >= 0 (i.e., pin succeeded). Example: LocalCcShards::PostProcessFlushTaskEntries guards the unpin with `if (term >= 0)`.

return req.SetFinish();
}
else
Expand Down Expand Up @@ -11551,6 +11560,14 @@ class TemplateCcMap : public CcMap
return false;
}

// Erases the cached size entry for the given range partition from
// range_sizes_. Compiled to a no-op when this map is not range
// partitioned, since only range-partitioned maps track per-range sizes.
void RemoveRangeSize(uint32_t partition_id)
{
    if constexpr (RangePartitioned)
    {
        range_sizes_.erase(partition_id);
    }
}

absl::btree_map<
KeyT,
std::unique_ptr<
Expand Down
1 change: 1 addition & 0 deletions tx_service/include/read_write_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ struct WriteSetEntry
// Used in double write scenarios during online DDL.
// key shard code -> (partition id, cce addr)
std::unordered_map<uint32_t, std::pair<int32_t, CcEntryAddr>> forward_addr_;
// True if the key is located in a splitting/migrating range.
bool on_dirty_range_{false};
};

Expand Down
13 changes: 8 additions & 5 deletions tx_service/src/cc/local_cc_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1901,10 +1901,12 @@ void txservice::LocalCcHandler::KickoutData(const TableName &table_name,
KickoutCcEntryCc *req = kickout_ccentry_pool_.NextRequest();
// For hash partition, all data in a single bucket should be hashed to
// the same core.
uint16_t core_cnt = (clean_type == CleanType::CleanBucketData ||
clean_type == CleanType::CleanRangeData)
? 1
: Sharder::Instance().GetLocalCcShardsCount();
uint16_t core_cnt =
(clean_type == CleanType::CleanBucketData ||
clean_type == CleanType::CleanRangeData ||
clean_type == CleanType::CleanRangeDataForMigration)
? 1
: Sharder::Instance().GetLocalCcShardsCount();
req->Reset(table_name,
ng_id,
&hres,
Expand All @@ -1929,7 +1931,8 @@ void txservice::LocalCcHandler::KickoutData(const TableName &table_name,
Sharder::Instance().ShardBucketIdToCoreIdx((*bucket_id)[0]),
req);
}
else if (clean_type == CleanType::CleanRangeData)
else if (clean_type == CleanType::CleanRangeData ||
clean_type == CleanType::CleanRangeDataForMigration)
{
assert(range_id != INT32_MAX);
uint16_t dest_core = static_cast<uint16_t>(
Expand Down
182 changes: 106 additions & 76 deletions tx_service/src/tx_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -723,21 +723,25 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm)
auto *range_info = txm->range_rec_.GetRangeInfo();
int32_t range_id = range_info->PartitionId();
uint32_t residual = static_cast<uint32_t>(range_id & 0x3FF);
bool on_dirty_range = range_info->IsDirty();
bool range_splitting = range_info->IsDirty();
while (write_key_it_ != next_range_start)
{
const TxKey &write_tx_key = write_key_it_->first;
WriteSetEntry &write_entry = write_key_it_->second;
write_entry.key_shard_code_ = (range_ng << 10) | residual;
write_entry.partition_id_ = range_id;
write_entry.on_dirty_range_ = on_dirty_range;
write_entry.on_dirty_range_ = range_splitting;

uint32_t new_bucket_forward_key = UINT32_MAX;
// If current range is migrating, forward to new range owner.
if (new_bucket_ng != UINT32_MAX)
{
assert(new_bucket_ng != range_ng);
new_bucket_forward_key = (new_bucket_ng << 10) | residual;
write_entry.forward_addr_.try_emplace(
((new_bucket_ng << 10) | residual),
new_bucket_forward_key,
std::make_pair(range_id, CcEntryAddr()));
write_entry.on_dirty_range_ = true;
}

// If range is splitting and the key will fall on a new range after
Expand Down Expand Up @@ -765,36 +769,76 @@ void LockWriteRangeBucketsOp::Advance(TransactionExecution *txm)
static_cast<uint16_t>(new_residual % core_cnt);
uint16_t range_shard =
static_cast<uint16_t>(residual % core_cnt);
if (new_range_ng != range_ng || new_range_shard != range_shard)
if ((new_range_ng != range_ng &&
(new_bucket_ng == UINT32_MAX ||
new_range_ng != new_bucket_ng)) ||
new_range_shard != range_shard)
{
write_entry.forward_addr_.try_emplace(
((new_range_ng << 10) | new_residual),
std::make_pair(new_range_id, CcEntryAddr()));
// There is no need to update the range size of the old
// range.
write_entry.partition_id_ = -1;
if (new_bucket_forward_key != UINT32_MAX)
{
auto fwd_it = write_entry.forward_addr_.find(
new_bucket_forward_key);
assert(fwd_it != write_entry.forward_addr_.end());
fwd_it->second.first = -1;
}
}
else if (new_range_ng == range_ng &&
new_range_shard == range_shard)
else if (new_range_ng == range_ng)
{
assert(new_range_shard == range_shard);
// Only update the range size on the new range id in case of
// the new range and the old range are located on the same
// shard.
write_entry.partition_id_ = new_range_id;
}
else if (new_bucket_ng != UINT32_MAX &&
new_range_ng == new_bucket_ng)
{
assert(new_range_shard == range_shard);
assert(new_bucket_forward_key != UINT32_MAX);
auto fwd_it =
write_entry.forward_addr_.find(new_bucket_forward_key);
assert(fwd_it != write_entry.forward_addr_.end());
fwd_it->second.first = new_range_id;
}

// If the new range is migrating, forward to the new owner of
// new range.
// TODO(ysw): double check the logic here.
if (new_range_new_bucket_ng != UINT32_MAX)
{
assert(new_range_new_bucket_ng != new_range_ng);
if (new_range_new_bucket_ng != range_ng ||
if ((new_range_new_bucket_ng != range_ng &&
(new_bucket_ng == UINT32_MAX ||
new_range_new_bucket_ng != new_bucket_ng)) ||
new_range_shard != range_shard)
{
write_entry.forward_addr_.try_emplace(
((new_range_new_bucket_ng << 10) | new_residual),
std::make_pair(new_range_id, CcEntryAddr()));
write_entry.on_dirty_range_ = true;
}
else if (new_range_new_bucket_ng == range_ng)
{
assert(new_range_shard == range_shard);
// Only update the range size on the new range id in
// case of the new range and the old range are located
// on the same shard.
write_entry.partition_id_ = new_range_id;
}
else if (new_bucket_ng != UINT32_MAX &&
new_range_new_bucket_ng == new_bucket_ng)
{
assert(new_range_shard == range_shard);
assert(new_bucket_forward_key != UINT32_MAX);
auto fwd_it = write_entry.forward_addr_.find(
new_bucket_forward_key);
assert(fwd_it != write_entry.forward_addr_.end());
fwd_it->second.first = new_range_id;
}
}
}
Expand Down Expand Up @@ -7847,20 +7891,8 @@ void DataMigrationOp::Forward(TransactionExecution *txm)

// Table name in ranges_in_bucket_snapshot_ is of type range
// partition; need to convert it first.
TableType type;
if (TableName::IsBase(kickout_range_tbl_it_->first.StringView()))
{
type = TableType::Primary;
}
else if (TableName::IsUniqueSecondary(
kickout_range_tbl_it_->first.StringView()))
{
type = TableType::UniqueSecondary;
}
else
{
type = TableType::Secondary;
}
TableType type =
TableName::Type(kickout_range_tbl_it_->first.StringView());
kickout_range_table_ =
TableName{kickout_range_tbl_it_->first.StringView(),
type,
Expand Down Expand Up @@ -7892,21 +7924,8 @@ void DataMigrationOp::Forward(TransactionExecution *txm)
break;
}

TableType type;
if (TableName::IsBase(
kickout_range_tbl_it_->first.StringView()))
{
type = TableType::Primary;
}
else if (TableName::IsUniqueSecondary(
kickout_range_tbl_it_->first.StringView()))
{
type = TableType::UniqueSecondary;
}
else
{
type = TableType::Secondary;
}
TableType type = TableName::Type(
kickout_range_tbl_it_->first.StringView());
kickout_range_table_ =
TableName{kickout_range_tbl_it_->first.StringView(),
type,
Expand Down Expand Up @@ -7962,9 +7981,8 @@ void DataMigrationOp::Forward(TransactionExecution *txm)

if (kickout_data_op_.hd_result_.IsError())
{
LOG(ERROR) << "Data migration: fail to kickout range data"
<< ", table name "
<< kickout_range_tbl_it_->first.StringView()
LOG(ERROR) << "Data migration: fail to kickout data, table name: "
<< kickout_data_op_.table_name_->StringView()
<< ", tx_number:" << txm->TxNumber()
<< ", keep retrying";
RetrySubOperation(txm, &kickout_data_op_);
Expand All @@ -7978,28 +7996,31 @@ void DataMigrationOp::Forward(TransactionExecution *txm)
if (++kickout_range_tbl_it_ ==
ranges_in_bucket_snapshot_.cend())
{
LOG(INFO) << "Data migration: post write all"
<< ", txn: " << txm->TxNumber();
post_all_bucket_lock_op_.write_type_ =
PostWriteType::PostCommit;
ForwardToSubOperation(txm, &post_all_bucket_lock_op_);
// Try handle hash partitioned tables
if (kickout_hash_partitioned_tbl_it_ ==
hash_partitioned_tables_snapshot_.cend())
{
LOG(INFO) << "Data migration: post write all"
<< ", txn: " << txm->TxNumber();
post_all_bucket_lock_op_.write_type_ =
PostWriteType::PostCommit;
ForwardToSubOperation(txm, &post_all_bucket_lock_op_);
return;
}
kickout_data_op_.node_group_ = txm->TxCcNodeId();
kickout_data_op_.table_name_ =
&(*kickout_hash_partitioned_tbl_it_);
kickout_data_op_.start_key_ = TxKey();
kickout_data_op_.end_key_ = TxKey();
kickout_data_op_.bucket_ids_ =
&status_->bucket_ids_[migrate_bucket_idx_];
// Check if the key is hashed to this bucket
kickout_data_op_.clean_type_ = CleanType::CleanBucketData;
ForwardToSubOperation(txm, &kickout_data_op_);
return;
}
TableType type;
if (TableName::IsBase(
kickout_range_tbl_it_->first.StringView()))
{
type = TableType::Primary;
}
else if (TableName::IsUniqueSecondary(
kickout_range_tbl_it_->first.StringView()))
{
type = TableType::UniqueSecondary;
}
else
{
type = TableType::Secondary;
}
TableType type =
TableName::Type(kickout_range_tbl_it_->first.StringView());
kickout_range_table_ =
TableName{kickout_range_tbl_it_->first.StringView(),
type,
Expand Down Expand Up @@ -8029,23 +8050,32 @@ void DataMigrationOp::Forward(TransactionExecution *txm)
ranges_in_bucket_snapshot_.cend())
{
// Move to hash partitioned tables
break;
}
TableType type;
if (TableName::IsBase(
kickout_range_tbl_it_->first.StringView()))
{
type = TableType::Primary;
}
else if (TableName::IsUniqueSecondary(
kickout_range_tbl_it_->first.StringView()))
{
type = TableType::UniqueSecondary;
}
else
{
type = TableType::Secondary;
if (kickout_hash_partitioned_tbl_it_ ==
hash_partitioned_tables_snapshot_.cend())
{
LOG(INFO) << "Data migration: post write all"
<< ", txn: " << txm->TxNumber();
post_all_bucket_lock_op_.write_type_ =
PostWriteType::PostCommit;
ForwardToSubOperation(txm,
&post_all_bucket_lock_op_);
return;
}
kickout_data_op_.node_group_ = txm->TxCcNodeId();
kickout_data_op_.table_name_ =
&(*kickout_hash_partitioned_tbl_it_);
kickout_data_op_.start_key_ = TxKey();
kickout_data_op_.end_key_ = TxKey();
kickout_data_op_.bucket_ids_ =
&status_->bucket_ids_[migrate_bucket_idx_];
// Check if the key is hashed to this bucket
kickout_data_op_.clean_type_ =
CleanType::CleanBucketData;
ForwardToSubOperation(txm, &kickout_data_op_);
return;
}
TableType type = TableName::Type(
kickout_range_tbl_it_->first.StringView());
kickout_range_table_ =
TableName{kickout_range_tbl_it_->first.StringView(),
type,
Expand Down
Loading