From cea8902e18fbe6672461f437728d6272a399946b Mon Sep 17 00:00:00 2001 From: lzxddz Date: Fri, 10 Oct 2025 16:54:16 +0800 Subject: [PATCH] Don't save deleted records into datastore if mvcc not enabled --- data_store_service_client.cpp | 18 +++++++++++------- .../rocksdb_data_store_common.cpp | 4 ++-- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/data_store_service_client.cpp b/data_store_service_client.cpp index eab681d..c3a1426 100644 --- a/data_store_service_client.cpp +++ b/data_store_service_client.cpp @@ -206,7 +206,6 @@ bool DataStoreServiceClient::PutAll( std::unordered_map>> hash_partitions_map; std::unordered_map> range_partitions_map; - std::unordered_map partition_record_cnt; size_t flush_task_entry_idx = 0; for (auto &entry : entries) @@ -232,9 +231,6 @@ bool DataStoreServiceClient::PutAll( } it->second.emplace_back( std::make_pair(flush_task_entry_idx, i)); - - partition_record_cnt.try_emplace(kv_partition_id, 0); - partition_record_cnt[kv_partition_id]++; } } else @@ -246,8 +242,6 @@ bool DataStoreServiceClient::PutAll( auto [it, inserted] = range_partitions_map.try_emplace(parition_id); it->second.emplace_back(flush_task_entry_idx); - partition_record_cnt.try_emplace(parition_id, 0); - partition_record_cnt[parition_id] += batch.size(); } flush_task_entry_idx++; } @@ -4313,6 +4307,9 @@ void DataStoreServiceClient::PrepareRangePartitionBatches( size_t write_batch_size = 0; PartitionBatchRequest batch_request; + bool enabled_mvcc = + txservice::Sharder::Instance().GetLocalCcShards()->EnableMvcc(); + auto PrepareRecordData = [&](txservice::FlushRecord &ckpt_rec, size_t &batch_size, PartitionBatchRequest &batch_request) @@ -4336,7 +4333,14 @@ void DataStoreServiceClient::PrepareRangePartitionBatches( } batch_size += sizeof(uint64_t); - batch_request.op_types.push_back(WriteOpType::PUT); + if (is_deleted && !enabled_mvcc) + { + batch_request.op_types.push_back(WriteOpType::DELETE); + } + else + { + batch_request.op_types.push_back(WriteOpType::PUT); + } batch_size += sizeof(WriteOpType); SerializeTxRecord(is_deleted, diff --git a/eloq_data_store_service/rocksdb_data_store_common.cpp b/eloq_data_store_service/rocksdb_data_store_common.cpp index cfc0f7f..9515de5 100644 --- a/eloq_data_store_service/rocksdb_data_store_common.cpp +++ b/eloq_data_store_service/rocksdb_data_store_common.cpp @@ -702,8 +702,8 @@ void RocksDBDataStoreCommon::ScanNext(ScanRequest *scan_req) query_worker_pool_->SubmitWork( [this, scan_req]() { - DLOG(INFO) << "RocksDBDataStoreCommon::ScanNext " - << scan_req->GetPartitionId(); + // DLOG(INFO) << "RocksDBDataStoreCommon::ScanNext " + // << scan_req->GetPartitionId(); PoolableGuard poolable_guard(scan_req); uint32_t partition_id = scan_req->GetPartitionId();