Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion store_handler/data_store_service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ bool DataStoreServiceClient::PutAll(
DLOG(INFO) << "DataStoreServiceClient::PutAll called with "
<< flush_task.size() << " tables to flush.";
uint64_t now = txservice::LocalCcShards::ClockTsInMillseconds();

// Create global coordinator
SyncPutAllData *sync_putall = sync_putall_data_pool_.NextObject();
PoolableGuard sync_putall_guard(sync_putall);
Expand Down Expand Up @@ -478,6 +477,7 @@ bool DataStoreServiceClient::PutAll(
callback_data->Clear();
callback_data->Free();
}

return false;
}
}
Expand All @@ -493,6 +493,7 @@ bool DataStoreServiceClient::PutAll(
metrics::kv_meter->Collect(
metrics::NAME_KV_FLUSH_ROWS_TOTAL, records_count, "base");
}

return true;
}

Expand Down
39 changes: 27 additions & 12 deletions store_handler/eloq_data_store_service/eloq_store_data_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ inline void BuildKey(const WriteRecordsRequest &write_req,
uint16_t key_parts,
std::string &key_out)
{
size_t total_size = 0;
for (uint16_t i = 0; i < key_parts; ++i)
{
total_size += write_req.GetKeyPart(key_first_idx + i).size();
}
key_out.reserve(key_out.size() + total_size);
size_t part_idx = key_first_idx;
for (uint16_t i = 0; i < key_parts; ++i, ++part_idx)
{
Expand All @@ -73,6 +79,12 @@ inline void BuildValue(const WriteRecordsRequest &write_req,
uint16_t rec_parts,
std::string &rec_out)
{
size_t total_size = 0;
for (uint16_t i = 0; i < rec_parts; ++i)
{
total_size += write_req.GetRecordPart(rec_first_idx + i).size();
}
rec_out.reserve(rec_out.size() + total_size);
size_t part_idx = rec_first_idx;
for (uint16_t i = 0; i < rec_parts; ++i, ++part_idx)
{
Expand Down Expand Up @@ -215,26 +227,28 @@ void EloqStoreDataStore::BatchWriteRecords(WriteRecordsRequest *write_req)

::eloqstore::BatchWriteRequest &kv_write_req = write_op->EloqStoreRequest();

std::vector<::eloqstore::WriteDataEntry> entries;
size_t rec_cnt = write_req->RecordsCount();
entries.reserve(rec_cnt);
const size_t rec_cnt = write_req->RecordsCount();
const uint16_t parts_per_key = write_req->PartsCountPerKey();
const uint16_t parts_per_record = write_req->PartsCountPerRecord();
size_t first_idx = 0;

std::vector<::eloqstore::WriteDataEntry> entries(rec_cnt);
size_t key_offset = 0;
size_t val_offset = 0;

for (size_t i = 0; i < rec_cnt; ++i)
{
::eloqstore::WriteDataEntry entry;
first_idx = i * parts_per_key;
BuildKey(*write_req, first_idx, parts_per_key, entry.key_);
first_idx = i * parts_per_record;
BuildValue(*write_req, first_idx, parts_per_record, entry.val_);
::eloqstore::WriteDataEntry &entry = entries[i];
BuildKey(*write_req, key_offset, parts_per_key, entry.key_);
BuildValue(*write_req, val_offset, parts_per_record, entry.val_);
key_offset += parts_per_key;
val_offset += parts_per_record;

entry.timestamp_ = write_req->GetRecordTs(i);
entry.op_ = (write_req->KeyOpType(i) == WriteOpType::PUT
? ::eloqstore::WriteOp::Upsert
: ::eloqstore::WriteOp::Delete);
uint64_t ttl = write_req->GetRecordTtl(i);
entry.expire_ts_ = ttl == UINT64_MAX ? 0 : ttl;
entries.emplace_back(std::move(entry));
const uint64_t ttl = write_req->GetRecordTtl(i);
entry.expire_ts_ = (ttl == UINT64_MAX) ? 0u : ttl;
}

if (!std::ranges::is_sorted(
Expand All @@ -251,6 +265,7 @@ void EloqStoreDataStore::BatchWriteRecords(WriteRecordsRequest *write_req)
kv_write_req.SetArgs(eloq_store_table_id, std::move(entries));

uint64_t user_data = reinterpret_cast<uint64_t>(write_op);

if (!eloq_store_service_->ExecAsyn(&kv_write_req, user_data, OnBatchWrite))
{
LOG(ERROR) << "Send write request to EloqStore failed for table: "
Expand Down
52 changes: 10 additions & 42 deletions tx_service/include/cc/cc_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -8991,20 +8991,14 @@ struct ScanSliceDeltaSizeCcForRangePartition : public CcRequestBase

struct ScanDeltaSizeCcForHashPartition : public CcRequestBase
{
static constexpr size_t ScanBatchSize = 128;

ScanDeltaSizeCcForHashPartition(const TableName &table_name,
uint64_t last_datasync_ts,
uint64_t scan_ts,
uint64_t ng_id,
int64_t ng_term,
uint64_t txn,
uint64_t schema_version)
: table_name_(table_name),
node_group_id_(ng_id),
node_group_term_(ng_term),
last_datasync_ts_(last_datasync_ts),
scan_ts_(scan_ts),
schema_version_(schema_version)
{
tx_number_ = txn;
Expand Down Expand Up @@ -9096,44 +9090,27 @@ struct ScanDeltaSizeCcForHashPartition : public CcRequestBase
return node_group_term_;
}

uint64_t LastDataSyncTs() const
{
return last_datasync_ts_;
}

uint64_t ScanTs() const
{
return scan_ts_;
}

void AbortCcRequest(CcErrorCode err_code) override
{
assert(err_code != CcErrorCode::NO_ERROR);
SetError(err_code);
}

TxKey &PausedKey()
void SetKeyCounts(const size_t data_key_count, const size_t dirty_key_count)
{
return pause_key_;
}

void UpdateKeyCount(const bool need_ckpt)
{
++scanned_key_count_;
if (need_ckpt)
{
++updated_key_count_;
}
data_key_count_ = data_key_count;
dirty_key_count_ = dirty_key_count;
}

size_t UpdatedMemory() const
{
const uint64_t scanned = scanned_key_count_;
if (scanned == 0)
if (data_key_count_ == 0)
return 0;
// Use cc map's data_key_count_ and dirty_key_count_ for estimation.
// integer math with rounding up to avoid systematic underestimation
return static_cast<size_t>(
(updated_key_count_ * memory_usage_ + scanned - 1) / scanned);
(dirty_key_count_ * memory_usage_ + data_key_count_ - 1) /
data_key_count_);
Comment on lines 9105 to +9113
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Guard UpdatedMemory() against counter drift and arithmetic overflow.

Line 9112 can overflow in dirty_key_count_ * memory_usage_, and if dirty_key_count_ temporarily exceeds data_key_count_, the estimate can exceed total memory and distort flush decisions. Clamp dirty count to total count and compute with overflow-safe arithmetic.

🔧 Suggested fix
 size_t UpdatedMemory() const
 {
     if (data_key_count_ == 0)
         return 0;
-    // Use cc map's data_key_count_ and dirty_key_count_ for estimation.
-    // integer math with rounding up to avoid systematic underestimation
-    return static_cast<size_t>(
-        (dirty_key_count_ * memory_usage_ + data_key_count_ - 1) /
-        data_key_count_);
+    // Use cc map's data_key_count_ and dirty_key_count_ for estimation.
+    // Clamp to avoid over-estimation when counters are temporarily inconsistent.
+    const size_t effective_dirty_key_count =
+        std::min(dirty_key_count_, data_key_count_);
+    // Integer math with rounding up; split multiply/divide to reduce overflow risk.
+    const uint64_t base = memory_usage_ / data_key_count_;
+    const uint64_t rem = memory_usage_ % data_key_count_;
+    return static_cast<size_t>(
+        base * effective_dirty_key_count +
+        (rem * effective_dirty_key_count + data_key_count_ - 1) / data_key_count_);
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tx_service/include/cc/cc_request.h` around lines 9105 - 9113, The
UpdatedMemory() method can overflow when computing dirty_key_count_ *
memory_usage_ and can return values > total memory if dirty_key_count_
temporarily exceeds data_key_count_; update UpdatedMemory() to first clamp
dirty_key_count_ to at most data_key_count_, perform the arithmetic in a wider
unsigned integer type (e.g., uint64_t or a similarly widened type) to avoid
multiplication overflow, compute the rounded-up division safely, and finally
clamp the result to not exceed memory_usage_ (or the total memory represented by
memory_usage_) before casting back to size_t; reference UpdatedMemory(),
dirty_key_count_, data_key_count_, and memory_usage_ to locate and change the
logic.

}

void SetMemoryUsage(const uint64_t memory)
Expand All @@ -9150,20 +9127,11 @@ struct ScanDeltaSizeCcForHashPartition : public CcRequestBase
const TableName &table_name_;
uint32_t node_group_id_;
int64_t node_group_term_;
// It is used as a hint to decide if a page has dirty data since last round
// of checkpoint. It is guaranteed that all entries committed before this ts
// are synced into data store.
uint64_t last_datasync_ts_;
// Target ts. Collect all data changes committed before this ts into data
// sync vec.
uint64_t scan_ts_;
// Position that we left off during last round of scan.
TxKey pause_key_;
uint64_t schema_version_;

// Number of keys scanned / updated, and per-core memory usage.
uint64_t scanned_key_count_{0};
uint64_t updated_key_count_{0};
// From cc map: total data keys and dirty keys needing checkpoint.
size_t data_key_count_{0};
size_t dirty_key_count_{0};
uint64_t memory_usage_{0};

CcErrorCode err_{CcErrorCode::NO_ERROR};
Expand Down
34 changes: 23 additions & 11 deletions tx_service/include/cc/local_cc_shards.h
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,7 @@ class LocalCcShards
range_entry->UpdateRangeEntry(version, std::move(range_slices));
}

mi_heap_set_default(prev_heap);
if (is_override_thd)
{
mi_override_thread(prev_thd);
Expand All @@ -1025,7 +1026,6 @@ class LocalCcShards
{
mi_restore_default_thread_id();
}
mi_heap_set_default(prev_heap);

#if defined(WITH_JEMALLOC)
JemallocArenaSwitcher::SwitchToArena(prev_arena_id);
Expand Down Expand Up @@ -2452,6 +2452,13 @@ class LocalCcShards
uint32_t range_cnt,
int32_t &out_next_partition_id);

/**
* @brief Map a data_sync_worker index to the fixed flush_data_worker index.
* Used when data_sync_worker count != flush_data_worker count so that each
* data_sync_worker consistently targets one flush_data_worker.
*/
size_t DataSyncWorkerToFlushDataWorker(size_t data_sync_worker_id) const;

Comment on lines +2455 to +2461
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find the .cpp file and locate the implementation of DataSyncWorkerToFlushDataWorker
fd -e cpp -path "*/tx_service/*" local_cc_shards | head -20

Repository: eloqdata/tx_service

Length of output: 231


🏁 Script executed:

# Search for DataSyncWorkerToFlushDataWorker implementation
rg -n "DataSyncWorkerToFlushDataWorker" --type cpp -A 10

Repository: eloqdata/tx_service

Length of output: 3867


🏁 Script executed:

# Search for flush_data_worker_ctx_ initialization and worker_num_
rg -n "flush_data_worker_ctx_" --type cpp -A 3 -B 1

Repository: eloqdata/tx_service

Length of output: 8710


🏁 Script executed:

# Search for worker_num_ validation/checks
rg -n "worker_num_" --type cpp --type h -A 2 -B 1

Repository: eloqdata/tx_service

Length of output: 9840


Validate flush_data_worker_ctx_.worker_num_ is never zero to prevent divide-by-zero errors.

The modulo operation in DataSyncWorkerToFlushDataWorker (line 5809) will crash if flush_data_worker_ctx_.worker_num_ is 0. In non-EXT_TX_PROC_ENABLED builds, the initialization at line 141 uses std::min(conf.at("core_num"), 10), which could be 0 if core_num is misconfigured. A divide-by-zero would occur even earlier in StartBackgroundWorkers (line 290) when calculating buffer size, but the root issue is the lack of validation at initialization. Add an assert or check to ensure worker_num_ >= 1 immediately after initialization in the constructor initializer list.

🤖 Prompt for AI Agents
In `@tx_service/include/cc/local_cc_shards.h` around lines 2455 - 2461, Ensure
flush_data_worker_ctx_.worker_num_ is validated right after it is initialized in
the constructor for the class that defines DataSyncWorkerToFlushDataWorker; add
an assert or runtime check (and fallback/throw) so worker_num_ >= 1 (e.g.,
assert(worker_num_ >= 1) or if (worker_num_ == 0) set to 1 or throw
std::runtime_error). This prevents divide-by-zero in
DataSyncWorkerToFlushDataWorker (which uses flush_data_worker_ctx_.worker_num_)
and in StartBackgroundWorkers buffer-size calculations; locate the constructor
that sets flush_data_worker_ctx_.worker_num_ and add the validation there.

/**
* @brief Add a flush task entry to the flush task. If there's no
* pending flush task, create a new flush task and add the entry to it.
Expand All @@ -2463,16 +2470,21 @@ class LocalCcShards

WorkerThreadContext flush_data_worker_ctx_;

// The flush task that has not reached the max pending flush size.
// New flush task entry will be added to this buffer. This task will
// be appended to pending_flush_work_ when it reaches the max pending flush
// size, which will then be processed by flush data worker.
FlushDataTask cur_flush_buffer_;
// Flush task queue for flush data worker to process.
std::deque<std::unique_ptr<FlushDataTask>> pending_flush_work_;

void FlushDataWorker();
void FlushData(std::unique_lock<std::mutex> &flush_worker_lk);
// Per-worker flush buffers. Each DataSyncWorker has its own buffer.
// New flush task entry will be added to the corresponding buffer. This task
// will be appended to pending_flush_work_[worker_idx] when it reaches the
// max pending flush size, which will then be processed by the corresponding
// flush data worker. Store as pointers because FlushDataTask contains a
// bthread::Mutex and is non-movable/non-copyable, which cannot be stored
// directly in a vector that may reallocate.
std::vector<std::unique_ptr<FlushDataTask>> cur_flush_buffers_;
// Per-worker flush task queues. Each FlushDataWorker processes its
// corresponding queue.
std::vector<std::deque<std::unique_ptr<FlushDataTask>>> pending_flush_work_;

void FlushDataWorker(size_t worker_idx);
void FlushData(std::unique_lock<std::mutex> &flush_worker_lk,
size_t worker_idx);

// Memory controller for data sync.
DataSyncMemoryController data_sync_mem_controller_;
Expand Down
Loading