6 changes: 5 additions & 1 deletion kvrocks.conf
@@ -723,13 +723,17 @@ migrate-batch-size-kb 16
# Default: 16M
migrate-batch-rate-limit-mb 16


# If it is set to yes, kvrocks will skip the deallocation of block cache
# while closing the database to speed up the shutdown
#
# Default: no
# skip-block-cache-deallocation-on-close no

# The parallelism of slot migration passing SST files
Copilot AI (Dec 19, 2025):
The comment states "passing SST files" but the actual implementation sends snapshots by raw key-value pairs, not SST files. This is misleading. The comment should accurately describe that this setting controls the parallelism of sending snapshot data during slot migration.

Suggested change:
- # The parallelism of slot migration passing SST files
+ # The parallelism of sending snapshot data (raw key-value pairs) during slot migration

#
# Default: the number of Kvrocks node cores
Copilot AI (Dec 19, 2025):
The comment is incomplete: it should read "Default: 0 (which uses the number of Kvrocks node cores)" to clarify that 0 is a valid value that triggers automatic detection, rather than only stating what the automatic value would be.

Suggested change:
- # Default: the number of Kvrocks node cores
+ # Default: 0 (which uses the number of Kvrocks node cores)

# migrate-slots-send-snapshots-parallelism

################################ ROCKSDB #####################################

# Specify the capacity of column family block cache. A larger block cache
16 changes: 3 additions & 13 deletions src/cluster/batch_sender.cc
@@ -71,12 +71,12 @@ Status BatchSender::Send() {
}

// rate limit
if (bytes_per_sec_ > 0) {
auto single_burst = rate_limiter_->GetSingleBurstBytes();
if (global_rate_limiter_) {
auto single_burst = global_rate_limiter_->GetSingleBurstBytes();
auto left = static_cast<int64_t>(write_batch_.GetDataSize());
while (left > 0) {
auto request_size = std::min(left, single_burst);
rate_limiter_->Request(request_size, rocksdb::Env::IOPriority::IO_HIGH, nullptr);
global_rate_limiter_->Request(request_size, rocksdb::Env::IOPriority::IO_HIGH, nullptr);
left -= request_size;
}
}
@@ -109,16 +109,6 @@ Status BatchSender::sendApplyBatchCmd(int fd, const rocksdb::WriteBatch &write_b
return Status::OK();
}

void BatchSender::SetBytesPerSecond(size_t bytes_per_sec) {
if (bytes_per_sec_ == bytes_per_sec) {
return;
}
bytes_per_sec_ = bytes_per_sec;
if (bytes_per_sec > 0) {
rate_limiter_->SetBytesPerSecond(static_cast<int64_t>(bytes_per_sec));
}
}

double BatchSender::GetRate(uint64_t since) const {
auto t = util::GetTimeStampMS();
if (t <= since) {
12 changes: 3 additions & 9 deletions src/cluster/batch_sender.h
@@ -28,12 +28,8 @@
class BatchSender {
public:
BatchSender() = default;
BatchSender(int fd, size_t max_bytes, size_t bytes_per_sec)
: dst_fd_(fd),
max_bytes_(max_bytes),
bytes_per_sec_(bytes_per_sec),
rate_limiter_(std::unique_ptr<rocksdb::RateLimiter>(
rocksdb::NewGenericRateLimiter(static_cast<int64_t>(bytes_per_sec_)))) {}
BatchSender(int fd, size_t max_bytes, std::shared_ptr<rocksdb::RateLimiter> global_rate_limiter)
: dst_fd_(fd), max_bytes_(max_bytes), global_rate_limiter_(std::move(global_rate_limiter)) {}

~BatchSender() = default;

@@ -50,7 +46,6 @@ class BatchSender {
uint64_t GetSentBytes() const { return sent_bytes_; }
uint32_t GetSentBatchesNum() const { return sent_batches_num_; }
uint32_t GetEntriesNum() const { return entries_num_; }
void SetBytesPerSecond(size_t bytes_per_sec);
double GetRate(uint64_t since) const;

private:
@@ -66,6 +61,5 @@ class BatchSender {
int dst_fd_;
size_t max_bytes_;

size_t bytes_per_sec_ = 0; // 0 means no limit
std::unique_ptr<rocksdb::RateLimiter> rate_limiter_;
std::shared_ptr<rocksdb::RateLimiter> global_rate_limiter_;
};
85 changes: 70 additions & 15 deletions src/cluster/slot_migrate.cc
@@ -20,13 +20,16 @@

#include "slot_migrate.h"

#include <future>
#include <memory>
#include <utility>

#include "arpa/inet.h"
#include "db_util.h"
#include "event_util.h"
#include "fmt/format.h"
#include "io_util.h"
#include "netinet/tcp.h"
#include "storage/batch_extractor.h"
#include "storage/iterator.h"
#include "storage/redis_metadata.h"
@@ -52,7 +55,8 @@ SlotMigrator::SlotMigrator(Server *srv)
max_pipeline_size_(srv->GetConfig()->pipeline_size),
seq_gap_limit_(srv->GetConfig()->sequence_gap),
migrate_batch_bytes_per_sec_(srv->GetConfig()->migrate_batch_rate_limit_mb * MiB),
migrate_batch_size_bytes_(srv->GetConfig()->migrate_batch_size_kb * KiB) {
migrate_batch_size_bytes_(srv->GetConfig()->migrate_batch_size_kb * KiB),
migrate_slots_send_snapshots_parallelism_(srv->GetConfig()->migrate_slots_send_snapshots_parallelism) {
// Let metadata_cf_handle_ be nullptr, and get them in real time to avoid accessing invalid pointer,
// because metadata_cf_handle_ and db_ will be destroyed if DB is reopened.
// [Situation]:
Expand All @@ -69,6 +73,7 @@ SlotMigrator::SlotMigrator(Server *srv)
// [Note]:
// This problem may exist in all functions of Database called in slot migration process.
metadata_cf_handle_ = nullptr;
global_rate_limiter_.reset(rocksdb::NewGenericRateLimiter(static_cast<int64_t>(migrate_batch_bytes_per_sec_)));
Copilot AI (Dec 19, 2025):
The global_rate_limiter_ is shared across multiple threads without any synchronization mechanism. While RocksDB's RateLimiter is designed to be thread-safe, the shared_ptr itself needs to be properly initialized before being accessed by multiple threads. Currently, it's initialized in the constructor but then accessed in parallel threads in migrateSlotRange. Ensure that the rate limiter is fully initialized before any parallel migration begins.
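For reference, a minimal standalone sketch (not kvrocks code; the limiter type below is a stand-in for rocksdb::RateLimiter) of the pattern this comment asks to verify: construct the shared object before any worker exists, give each task its own shared_ptr copy, and never reseat the pointer while workers run, so only the object's own thread safety matters.

#include <atomic>
#include <future>
#include <memory>
#include <vector>

struct ThreadSafeLimiter {  // stand-in for a thread-safe rate limiter
  std::atomic<long> consumed{0};
  void Request(long bytes) { consumed += bytes; }
};

int main() {
  // 1. Construct the shared object before any thread exists.
  auto limiter = std::make_shared<ThreadSafeLimiter>();

  // 2. Each task holds its own shared_ptr copy; the pointer itself is never
  //    modified after the tasks are launched, so concurrent use is safe.
  std::vector<std::future<void>> tasks;
  for (int i = 0; i < 4; i++) {
    tasks.emplace_back(std::async(std::launch::async, [limiter] { limiter->Request(1024); }));
  }
  for (auto &t : tasks) t.get();
  return limiter->consumed == 4 * 1024 ? 0 : 1;
}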


if (srv->IsSlave()) {
SetStopMigrationFlag(true);
@@ -1251,7 +1256,6 @@ void SlotMigrator::resumeSyncCtx(const Status &migrate_result) {
Status SlotMigrator::sendMigrationBatch(BatchSender *batch) {
// user may dynamically change some configs, apply it when send data
batch->SetMaxBytes(migrate_batch_size_bytes_);
Copilot AI (Dec 19, 2025):
The removal of the SetBytesPerSecond call means that dynamic changes to migrate_batch_bytes_per_sec_ during migration will not be applied to the global rate limiter. Since the rate limiter is now created once in the constructor and shared globally, updates to the config value won't take effect until the SlotMigrator is recreated. Consider whether dynamic rate limit updates should still be supported and if so, add thread-safe updating of the global rate limiter.

Suggested change:
- batch->SetMaxBytes(migrate_batch_size_bytes_);
+ batch->SetMaxBytes(migrate_batch_size_bytes_);
+ batch->SetBytesPerSecond(migrate_batch_bytes_per_sec_);

batch->SetBytesPerSecond(migrate_batch_bytes_per_sec_);
return batch->Send();
}

@@ -1260,8 +1264,48 @@ Status SlotMigrator::sendSnapshotByRawKV() {
auto slot_range = slot_range_.load();
info("[migrate] Migrating snapshot of slot(s) {} by raw key value", slot_range.String());

auto prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
auto upper_bound = ComposeSlotKeyUpperBound(namespace_, slot_range.end);
int total_slots = slot_range.end - slot_range.start + 1;
int parallelism = std::min(migrate_slots_send_snapshots_parallelism_, total_slots);
Copilot AI (Dec 19, 2025):
If parallelism equals 0 (when migrate_slots_send_snapshots_parallelism_ is not properly initialized), std::min will return 0, and no threads will be created. This would result in silent failure to migrate any data. Add explicit validation to ensure parallelism is at least 1 before proceeding with the migration.

Suggested change:
- int parallelism = std::min(migrate_slots_send_snapshots_parallelism_, total_slots);
+ int parallelism = std::min(migrate_slots_send_snapshots_parallelism_, total_slots);
+ if (parallelism < 1) {
+   parallelism = 1;
+ }

int slots_per_thread = total_slots / parallelism;
int remain_slots = total_slots % parallelism;

std::vector<std::future<Status>> results;
int cur_start = slot_range.start;
for (int i = 0; i < parallelism; i++) {
int count = slots_per_thread + (i < remain_slots ? 1 : 0);
int cur_end = cur_start + count - 1;

results.emplace_back(std::async(std::launch::async, [=]() -> Status {
Copilot AI (Dec 19, 2025):
The lambda captures variables by value (using [=]), including the loop variable 'i'. However, 'i' is only used in the error message on line 1281, so the capture is still needed. The other captured variables (cur_start and cur_end) are correctly captured by value since they change in each iteration. This is correct as written, but consider listing the captures explicitly for better code clarity.
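As a small standalone illustration (a demo, not the PR's code) of the explicit-capture suggestion: listing the captured names instead of [=] makes it obvious that each task copies exactly its own i, cur_start, and cur_end.

#include <future>
#include <iostream>
#include <string>
#include <vector>

int main() {
  std::vector<std::future<std::string>> results;
  int cur_start = 0;
  for (int i = 0; i < 4; i++) {
    int cur_end = cur_start + 9;
    // Explicit captures: each task copies exactly these three variables by value.
    results.emplace_back(std::async(std::launch::async, [i, cur_start, cur_end]() {
      return "task " + std::to_string(i) + ": slots " + std::to_string(cur_start) + "-" +
             std::to_string(cur_end);
    }));
    cur_start = cur_end + 1;
  }
  for (auto &r : results) std::cout << r.get() << "\n";
  return 0;
}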

int fd = createConnectToDstNode();
if (fd < 0) {
return {Status::NotOK, fmt::format("failed to connect the destination node in thread[{}]", i)};
}
Comment on lines +1278 to +1282
Copilot AI (Dec 19, 2025):
Each parallel thread creates its own connection to the destination node via createConnectToDstNode(), but there's no mechanism to ensure these connections don't overwhelm the destination node. Consider adding configuration or documentation about the impact of parallel connections, or implementing connection pooling/throttling to prevent resource exhaustion on the destination.
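One possible shape for such throttling, sketched standalone here under the assumption that a C++20 counting semaphore is acceptable (the connect/migrate functions are placeholders, not kvrocks APIs): cap how many tasks hold an open connection at the same time.

#include <future>
#include <semaphore>
#include <vector>

std::counting_semaphore<16> connection_slots(4);  // at most 4 connections open at once

int fake_connect() { return 42; }    // placeholder for creating a connection
void fake_migrate(int /*fd*/) {}     // placeholder for migrating one slot range

int main() {
  std::vector<std::future<void>> tasks;
  for (int i = 0; i < 16; i++) {
    tasks.emplace_back(std::async(std::launch::async, [] {
      connection_slots.acquire();    // block until a connection slot is free
      int fd = fake_connect();
      fake_migrate(fd);
      connection_slots.release();    // hand the slot to the next waiting task
    }));
  }
  for (auto &t : tasks) t.get();
  return 0;
}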

auto s = migrateSlotRange(cur_start, cur_end, fd);
close(fd);
return s;
}));

cur_start = cur_end + 1;
}

// Wait til finish
for (auto &result : results) {
auto s = result.get();
if (!s.IsOK()) {
return {Status::NotOK, fmt::format("[migrate] Parallel migrate get result error: {}", s.Msg())};
}
}

Comment on lines +1292 to +1298
Copilot AI (Dec 19, 2025):
When any thread fails during parallel migration, the function returns immediately without waiting for other threads to complete. This could leave running threads that continue to execute and potentially access shared resources after the migration has been marked as failed. Consider implementing proper cleanup or cancellation of remaining threads when one fails, or at minimum wait for all threads to finish before returning the error.

Suggested change:
-   for (auto &result : results) {
-     auto s = result.get();
-     if (!s.IsOK()) {
-       return {Status::NotOK, fmt::format("[migrate] Parallel migrate get result error: {}", s.Msg())};
-     }
-   }
+   Status first_error;
+   bool has_error = false;
+   for (auto &result : results) {
+     auto s = result.get();
+     if (!s.IsOK() && !has_error) {
+       first_error = s;
+       has_error = true;
+     }
+   }
+   if (has_error) {
+     return {Status::NotOK,
+             fmt::format("[migrate] Parallel migrate get result error: {}", first_error.Msg())};
+   }

auto elapsed = util::GetTimeStampMS() - start_ts;
info("[migrate] Parallel snapshot migrate succeeded, slot(s) {}, elapsed: {} ms", slot_range.String(), elapsed);

return Status::OK();
}

Status SlotMigrator::migrateSlotRange(int start_slot, int end_slot, int fd) {
SlotRange sub{start_slot, end_slot};
auto prefix = ComposeSlotKeyPrefix(namespace_, start_slot);
auto upper_bound = ComposeSlotKeyUpperBound(namespace_, end_slot);
Comment on lines +1307 to +1308
Copilot AI (Dec 19, 2025):
The namespace_ member variable is accessed by multiple threads in parallel without synchronization. While it's likely set before migration begins and not modified during migration, this should be verified. Consider documenting thread-safety assumptions for member variables accessed in parallel contexts.


rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
read_options.snapshot = slot_snapshot_;
Copilot AI (Dec 19, 2025):
The slot_snapshot_ member variable is accessed by multiple threads without synchronization. While RocksDB snapshots are immutable and thread-safe to read from, the pointer itself should be properly synchronized or documented as being set before parallel access begins. Verify that slot_snapshot_ is fully initialized and won't change during the parallel migration phase.

@@ -1272,12 +1316,11 @@ Status SlotMigrator::sendSnapshotByRawKV() {
auto no_txn_ctx = engine::Context::NoTransactionContext(storage_);
engine::DBIterator iter(no_txn_ctx, read_options);

BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_);
BatchSender batch_sender(fd, migrate_batch_size_bytes_, global_rate_limiter_);

for (iter.Seek(prefix); iter.Valid(); iter.Next()) {
// Iteration is out of range
auto key_slot_id = ExtractSlotId(iter.Key());
if (!slot_range.Contains(key_slot_id)) {
if (!sub.Contains(key_slot_id)) {
break;
}

@@ -1325,20 +1368,32 @@

GET_OR_RET(sendMigrationBatch(&batch_sender));

auto elapsed = util::GetTimeStampMS() - start_ts;
info(
"[migrate] Succeed to migrate snapshot range, slot(s): {}, elapsed: {} ms, sent: {} bytes, rate: {:.2f} kb/s, "
"batches: {}, entries: {}",
slot_range.String(), elapsed, batch_sender.GetSentBytes(), batch_sender.GetRate(start_ts),
batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum());

return Status::OK();
Comment on lines 1369 to 1371
Copilot AI (Dec 19, 2025):
The logging information about migration progress (bytes sent, rate, batches, entries) has been removed from the individual thread migrations. This makes debugging and monitoring parallel migrations more difficult, as there's no per-thread visibility. Consider adding aggregate logging or at least debug-level logs for each thread's progress to aid troubleshooting.
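A rough standalone sketch (an assumption about what such logging could look like, not the PR's code) of per-thread visibility plus an aggregate summary: each worker reports its own counters and the parent sums them for the final line.

#include <atomic>
#include <cstdio>
#include <future>
#include <vector>

int main() {
  std::atomic<unsigned long long> total_bytes{0};
  std::vector<std::future<void>> workers;
  for (int t = 0; t < 4; t++) {
    workers.emplace_back(std::async(std::launch::async, [t, &total_bytes] {
      unsigned long long sent = (t + 1) * 1024ULL;  // pretend this worker sent some data
      std::printf("[migrate] worker %d sent %llu bytes\n", t, sent);  // per-worker progress
      total_bytes += sent;
    }));
  }
  for (auto &w : workers) w.get();
  std::printf("[migrate] all workers done, total %llu bytes\n", total_bytes.load());
  return 0;
}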

}

Copilot AI (Dec 19, 2025):
The dst_ip_ and dst_port_ member variables are accessed by multiple threads without synchronization. While these are set before the parallel migration begins and are not modified during migration, they should be documented as thread-safe or protected. Consider making them const or adding documentation that they must not be modified during parallel operations.

Suggested change:
+ // NOTE: dst_ip_ and dst_port_ are configured before any parallel migration begins
+ // and are not modified during migration. They must not be mutated while parallel
+ // operations are in progress, so concurrent reads from multiple threads here are
+ // considered thread-safe by design.

int SlotMigrator::createConnectToDstNode() {
// Connect to the destination node
auto fd = util::SockConnect(dst_ip_, dst_port_);
if (!fd.IsOK()) {
error("failed to connect to the node error: {}", fd.Msg());
return -1;
}

std::string pass = srv_->GetConfig()->requirepass;
if (!pass.empty()) {
auto s = authOnDstNode(*fd, pass);
if (!s.IsOK()) {
error("failed to authenticate on destination node error: {}", s.Msg());
return -1;
Comment on lines +1384 to +1387
Copilot AI (Dec 19, 2025):
The error handling returns -1 but doesn't close the file descriptor that was successfully created by SockConnect. When authentication fails, the established connection is leaked. The file descriptor from SockConnect should be closed before returning on authentication failure.
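A minimal standalone sketch of the fix being described, with stand-in functions for util::SockConnect() and authOnDstNode() (the real signatures differ): once the connect succeeds, every early-return path must close the descriptor.

#include <unistd.h>

static int connect_to_peer() { return dup(STDOUT_FILENO); }  // stand-in: returns a live fd
static bool authenticate(int /*fd*/) { return false; }       // stand-in: simulate auth failure

static int create_connection() {
  int fd = connect_to_peer();
  if (fd < 0) return -1;       // nothing to clean up: connect itself failed
  if (!authenticate(fd)) {
    close(fd);                 // connect succeeded, so the fd must be released on this path
    return -1;
  }
  return fd;
}

int main() { return create_connection() == -1 ? 0 : 1; }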

}
}
return *fd;
Comment on lines +1374 to +1390
Copilot AI (Dec 19, 2025):
The function signature returns an int that can be negative to indicate failure, but the return type should more clearly express this. Consider using a StatusOr or Result type pattern instead of returning a raw int where negative values mean error. This would make the API more consistent with the rest of the codebase which uses Status objects.

Suggested change:
- int SlotMigrator::createConnectToDstNode() {
-   // Connect to the destination node
-   auto fd = util::SockConnect(dst_ip_, dst_port_);
-   if (!fd.IsOK()) {
-     error("failed to connect to the node error: {}", fd.Msg());
-     return -1;
-   }
-   std::string pass = srv_->GetConfig()->requirepass;
-   if (!pass.empty()) {
-     auto s = authOnDstNode(*fd, pass);
-     if (!s.IsOK()) {
-       error("failed to authenticate on destination node error: {}", s.Msg());
-       return -1;
-     }
-   }
-   return *fd;
+ Status SlotMigrator::createConnectToDstNode(int *out_fd) {
+   // Connect to the destination node
+   auto fd = util::SockConnect(dst_ip_, dst_port_);
+   if (!fd.IsOK()) {
+     auto msg = fmt::format("failed to connect to the node error: {}", fd.Msg());
+     error("{}", msg);
+     return {Status::NotOK, msg};
+   }
+   std::string pass = srv_->GetConfig()->requirepass;
+   if (!pass.empty()) {
+     auto s = authOnDstNode(*fd, pass);
+     if (!s.IsOK()) {
+       auto msg = fmt::format("failed to authenticate on destination node error: {}", s.Msg());
+       error("{}", msg);
+       return {Status::NotOK, msg};
+     }
+   }
+   *out_fd = *fd;
+   return Status::OK();

}

Status SlotMigrator::syncWALByRawKV() {
uint64_t start_ts = util::GetTimeStampMS();
info("[migrate] Syncing WAL of slot(s) {} by raw key value", slot_range_.load().String());
BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_);
BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, global_rate_limiter_);

int epoch = 1;
uint64_t wal_incremental_seq = 0;
9 changes: 9 additions & 0 deletions src/cluster/slot_migrate.h
@@ -21,6 +21,7 @@
#pragma once

#include <rocksdb/db.h>
#include <rocksdb/rate_limiter.h>
#include <rocksdb/status.h>
#include <rocksdb/transaction_log.h>
#include <rocksdb/write_batch.h>
@@ -99,6 +100,9 @@ class SlotMigrator : public redis::Database {
void SetSequenceGapLimit(int value) {
if (value > 0) seq_gap_limit_ = value;
}
void SetMigrateSlotsSendSnapshotsParallelism(int value) {
if (value > 0) migrate_slots_send_snapshots_parallelism_ = value;
}
Comment on lines +103 to +105
Copilot AI (Dec 19, 2025):
The validation only checks if the value is greater than 0, but the configuration allows 0 as a valid value (minimum is 0 in IntField). This inconsistency means if a user explicitly sets the value to 0, it will be accepted in configuration but ignored here. Consider aligning the validation with the configuration constraints or handling 0 explicitly as a special case in the setter.
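One way to align the two, sketched standalone below (an assumption about the intended behavior, not the PR's code): let the setter treat 0 as "auto-detect" instead of silently ignoring it, so the setter accepts exactly the range the config field allows.

#include <thread>

struct ParallelismSettingSketch {
  int value = 1;
  void Set(int requested) {
    if (requested > 0) {
      value = requested;
    } else if (requested == 0) {                    // 0 means "use the core count"
      unsigned hw = std::thread::hardware_concurrency();
      value = hw > 0 ? static_cast<int>(hw) : 1;    // hardware_concurrency() may return 0
    }                                               // negative values are rejected
  }
};

int main() {
  ParallelismSettingSketch s;
  s.Set(0);
  return s.value >= 1 ? 0 : 1;
}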

void SetMigrateBatchRateLimit(size_t bytes_per_sec) { migrate_batch_bytes_per_sec_ = bytes_per_sec; }
void SetMigrateBatchSize(size_t size) { migrate_batch_size_bytes_ = size; }
void SetStopMigrationFlag(bool value) { stop_migration_ = value; }
@@ -148,6 +152,8 @@ class SlotMigrator : public redis::Database {

Status sendMigrationBatch(BatchSender *batch);
Status sendSnapshotByRawKV();
Status migrateSlotRange(int start_slot, int end_slot, int fd);
int createConnectToDstNode();
Status syncWALByRawKV();
bool catchUpIncrementalWAL();
Status migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender *batch_sender);
@@ -173,6 +179,9 @@ class SlotMigrator : public redis::Database {
uint64_t seq_gap_limit_ = kDefaultSequenceGapLimit;
std::atomic<size_t> migrate_batch_bytes_per_sec_ = 1 * GiB;
std::atomic<size_t> migrate_batch_size_bytes_;
int migrate_slots_send_snapshots_parallelism_ = 0;
Copilot AI (Dec 19, 2025):
The member variable is initialized to 0, which would be treated as an invalid value by SetMigrateSlotsSendSnapshotsParallelism. However, the configuration callback sets it to hardware_concurrency() when 0 is detected. This creates a potential race condition or inconsistent state if the setter is called before the configuration is fully initialized. Consider initializing this to a sensible default value (e.g., 1 or std::thread::hardware_concurrency()) in the constructor instead of relying on the callback.

Suggested change:
- int migrate_slots_send_snapshots_parallelism_ = 0;
+ int migrate_slots_send_snapshots_parallelism_ =
+     std::thread::hardware_concurrency() == 0
+         ? 1
+         : static_cast<int>(std::thread::hardware_concurrency());


std::shared_ptr<rocksdb::RateLimiter> global_rate_limiter_;

SlotMigrationStage current_stage_ = SlotMigrationStage::kNone;
ParserState parser_state_ = ParserState::ArrayLen;
12 changes: 12 additions & 0 deletions src/config/config.cc
@@ -228,6 +228,8 @@ Config::Config() {
new EnumField<MigrationType>(&migrate_type, migration_types, MigrationType::kRawKeyValue)},
{"migrate-batch-size-kb", false, new IntField(&migrate_batch_size_kb, 16, 1, INT_MAX)},
{"migrate-batch-rate-limit-mb", false, new IntField(&migrate_batch_rate_limit_mb, 16, 1, INT_MAX)},
{"migrate-slots-send-snapshots-parallelism", false,
new IntField(&migrate_slots_send_snapshots_parallelism, 0, 0, INT_MAX)},
{"unixsocket", true, new StringField(&unixsocket, "")},
{"unixsocketperm", true, new OctalField(&unixsocketperm, 0777, 1, INT_MAX)},
{"log-retention-days", true, new IntField(&log_retention_days, -1, -1, INT_MAX)},
@@ -610,6 +612,16 @@ void Config::initFieldCallback() {
srv->slot_migrator->SetMigrateBatchSize(migrate_batch_size_kb * KiB);
return Status::OK();
}},
{"migrate-slots-send-snapshots-parallelism",
[this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
if (migrate_slots_send_snapshots_parallelism == 0) {
unsigned int max_parallelism = std::thread::hardware_concurrency();
migrate_slots_send_snapshots_parallelism = static_cast<int>(max_parallelism);
}
if (!srv) return Status::OK();
srv->slot_migrator->SetMigrateSlotsSendSnapshotsParallelism(migrate_slots_send_snapshots_parallelism);
Comment on lines +617 to +622
Copilot AI (Dec 19, 2025):
The callback modifies the configuration value (migrate_slots_send_snapshots_parallelism) directly when it's set to 0, but this modified value won't be persisted to the configuration file. This could lead to inconsistency between the runtime value and the saved configuration. Consider whether the default should be set during initialization instead, or if the behavior should be documented more clearly.

Suggested change:
-   if (migrate_slots_send_snapshots_parallelism == 0) {
-     unsigned int max_parallelism = std::thread::hardware_concurrency();
-     migrate_slots_send_snapshots_parallelism = static_cast<int>(max_parallelism);
-   }
-   if (!srv) return Status::OK();
-   srv->slot_migrator->SetMigrateSlotsSendSnapshotsParallelism(migrate_slots_send_snapshots_parallelism);
+   int effective_parallelism = migrate_slots_send_snapshots_parallelism;
+   if (effective_parallelism == 0) {
+     unsigned int max_parallelism = std::thread::hardware_concurrency();
+     effective_parallelism = static_cast<int>(max_parallelism);
+   }
+   if (!srv) return Status::OK();
+   srv->slot_migrator->SetMigrateSlotsSendSnapshotsParallelism(effective_parallelism);

return Status::OK();
}},
{"log-level",
[this](Server *srv, [[maybe_unused]] const std::string &k, [[maybe_unused]] const std::string &v) -> Status {
if (!srv) return Status::OK();
1 change: 1 addition & 0 deletions src/config/config.h
@@ -172,6 +172,7 @@ struct Config {
MigrationType migrate_type;
int migrate_batch_size_kb;
int migrate_batch_rate_limit_mb;
int migrate_slots_send_snapshots_parallelism;

bool redis_cursor_compatible = false;
bool resp3_enabled = false;