Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions store_handler/bigtable_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,13 @@ void EloqDS::BigTableHandler::FetchRangeSlices(
fetch_cc));
}

void EloqDS::BigTableHandler::FetchTableRangeSize(
txservice::FetchTableRangeSizeCc *fetch_cc)
{
LOG(ERROR) << "BigTableHandler::FetchTableRangeSize not implemented";
assert(false);
}

void EloqDS::BigTableHandler::OnFetchRangeSlices(
google::cloud::future<google::cloud::StatusOr<
std::pair<bool, google::cloud::bigtable::Row>>> f,
Expand Down
3 changes: 3 additions & 0 deletions store_handler/bigtable_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ class BigTableHandler : public txservice::store::DataStoreHandler

void FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc) override;

void FetchTableRangeSize(
txservice::FetchTableRangeSizeCc *fetch_cc) override;

/**
* @brief Read a row from base table or skindex table in datastore with
* specified key. Caller should pass in complete primary key or skindex key.
Expand Down
48 changes: 43 additions & 5 deletions store_handler/data_store_service_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1059,6 +1059,30 @@ void DataStoreServiceClient::FetchRangeSlices(
&FetchRangeSlicesCallback);
}

void DataStoreServiceClient::FetchTableRangeSize(
txservice::FetchTableRangeSizeCc *fetch_cc)
{
txservice::TableName range_table_name(fetch_cc->table_name_->StringView(),
txservice::TableType::RangePartition,
fetch_cc->table_name_->Engine());

int32_t kv_partition_id =
KvPartitionIdOfRangeSlices(range_table_name, fetch_cc->partition_id_);
uint32_t shard_id = GetShardIdByPartitionId(kv_partition_id, false);

auto catalog_factory = GetCatalogFactory(range_table_name.Engine());
assert(catalog_factory != nullptr);
fetch_cc->kv_start_key_ =
EncodeRangeKey(catalog_factory, range_table_name, fetch_cc->start_key_);

Read(kv_range_table_name,
kv_partition_id,
shard_id,
fetch_cc->kv_start_key_,
fetch_cc,
&FetchRangeSizeCallback);
}

/**
* @brief Deletes data that is out of the specified range.
*
Expand Down Expand Up @@ -1275,16 +1299,19 @@ std::string DataStoreServiceClient::EncodeRangeKey(
* @param range_version The version of the range.
* @param version The general version number.
* @param segment_cnt The number of segments in the range.
* @param range_size The size of the range.
* @return Binary string containing the encoded range value.
*/
std::string DataStoreServiceClient::EncodeRangeValue(int32_t range_id,
uint64_t range_version,
uint64_t version,
uint32_t segment_cnt)
uint32_t segment_cnt,
int32_t range_size)
{
std::string kv_range_record;
kv_range_record.reserve(sizeof(int32_t) + sizeof(uint64_t) +
sizeof(uint64_t) + sizeof(uint32_t));
sizeof(uint64_t) + sizeof(uint32_t) +
sizeof(int32_t));
kv_range_record.append(reinterpret_cast<const char *>(&range_id),
sizeof(int32_t));
kv_range_record.append(reinterpret_cast<const char *>(&range_version),
Expand All @@ -1294,6 +1321,8 @@ std::string DataStoreServiceClient::EncodeRangeValue(int32_t range_id,
// segment_cnt of slices
kv_range_record.append(reinterpret_cast<const char *>(&segment_cnt),
sizeof(uint32_t));
kv_range_record.append(reinterpret_cast<const char *>(&range_size),
sizeof(int32_t));
return kv_range_record;
}

Expand Down Expand Up @@ -1361,6 +1390,7 @@ RangeSliceBatchPlan DataStoreServiceClient::PrepareRangeSliceBatches(
RangeSliceBatchPlan plan;
plan.segment_cnt = 0;
plan.version = version;
plan.range_size = 0;

// Estimate capacity based on slices size
plan.segment_keys.reserve(slices.size() / 10 + 1); // Rough estimate
Expand Down Expand Up @@ -1409,6 +1439,7 @@ RangeSliceBatchPlan DataStoreServiceClient::PrepareRangeSliceBatches(
sizeof(uint32_t));
segment_record.append(slice_start_key.Data(), key_size);
uint32_t slice_size = static_cast<uint32_t>(slices[i]->Size());
plan.range_size += static_cast<int32_t>(slice_size);
segment_record.append(reinterpret_cast<const char *>(&slice_size),
sizeof(uint32_t));
}
Expand Down Expand Up @@ -1574,6 +1605,7 @@ void DataStoreServiceClient::EnqueueRangeMetadataRecord(
uint64_t range_version,
uint64_t version,
uint32_t segment_cnt,
int32_t range_size,
RangeMetadataAccumulator &accumulator)
{
// Compute kv_table_name and kv_partition_id
Expand All @@ -1584,8 +1616,8 @@ void DataStoreServiceClient::EnqueueRangeMetadataRecord(
// Encode key and value
std::string key_str =
EncodeRangeKey(catalog_factory, table_name, range_start_key);
std::string rec_str =
EncodeRangeValue(partition_id, range_version, version, segment_cnt);
std::string rec_str = EncodeRangeValue(
partition_id, range_version, version, segment_cnt, range_size);

// Get or create entry in accumulator
auto key = std::make_pair(kv_table_name, kv_partition_id);
Expand Down Expand Up @@ -1753,6 +1785,7 @@ bool DataStoreServiceClient::UpdateRangeSlices(
req.range_slices_,
req.partition_id_);
uint32_t segment_cnt = slice_plan.segment_cnt;
int32_t range_size = slice_plan.range_size;
int32_t kv_partition_id =
KvPartitionIdOfRangeSlices(*req.table_name_, req.partition_id_);
auto iter = slice_plans.find(kv_partition_id);
Expand All @@ -1777,6 +1810,7 @@ bool DataStoreServiceClient::UpdateRangeSlices(
req.range_version_,
req.ckpt_ts_,
segment_cnt,
range_size,
meta_acc);
}

Expand Down Expand Up @@ -1978,6 +2012,7 @@ bool DataStoreServiceClient::UpdateRangeSlices(
range_version,
version,
segment_cnt,
slice_plans[0].range_size,
meta_acc);

SyncConcurrentRequest *meta_sync_concurrent =
Expand Down Expand Up @@ -2069,6 +2104,7 @@ bool DataStoreServiceClient::UpsertRanges(
auto slice_plan = PrepareRangeSliceBatches(
table_name, version, range.slices_, range.partition_id_);
uint32_t segment_cnt = slice_plan.segment_cnt;
int32_t range_size = slice_plan.range_size;

int32_t kv_partition_id =
KvPartitionIdOfRangeSlices(table_name, range.partition_id_);
Expand All @@ -2092,6 +2128,7 @@ bool DataStoreServiceClient::UpsertRanges(
version, // range_version (using version for now)
version,
segment_cnt,
range_size,
meta_acc);
}

Expand Down Expand Up @@ -4683,7 +4720,8 @@ bool DataStoreServiceClient::InitTableRanges(

std::string key_str =
EncodeRangeKey(catalog_factory, table_name, *neg_inf_key);
std::string rec_str = EncodeRangeValue(init_range_id, version, version, 0);
std::string rec_str =
EncodeRangeValue(init_range_id, version, version, 0, 0);

keys.emplace_back(std::string_view(key_str.data(), key_str.size()));
records.emplace_back(std::string_view(rec_str.data(), rec_str.size()));
Expand Down
14 changes: 13 additions & 1 deletion store_handler/data_store_service_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct RangeSliceBatchPlan
std::vector<std::string> segment_keys; // Owned string buffers
std::vector<std::string> segment_records; // Owned string buffers
size_t version;
int32_t range_size{0};

// Clear method for reuse
void Clear()
Expand All @@ -74,6 +75,7 @@ struct RangeSliceBatchPlan
segment_keys.clear();
segment_records.clear();
version = 0;
range_size = 0;
}
};

Expand Down Expand Up @@ -278,6 +280,9 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler

void FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc) override;

void FetchTableRangeSize(
txservice::FetchTableRangeSizeCc *fetch_cc) override;

bool DeleteOutOfRangeData(
const txservice::TableName &table_name,
int32_t partition_id,
Expand Down Expand Up @@ -346,7 +351,8 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
std::string EncodeRangeValue(int32_t range_id,
uint64_t range_version,
uint64_t version,
uint32_t segment_cnt);
uint32_t segment_cnt,
int32_t range_size);
std::string EncodeRangeSliceKey(const txservice::TableName &table_name,
int32_t range_id,
uint32_t segment_id);
Expand Down Expand Up @@ -654,6 +660,7 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
uint64_t range_version,
uint64_t version,
uint32_t segment_cnt,
int32_t range_size,
RangeMetadataAccumulator &accumulator);

void DispatchRangeMetadataBatches(
Expand Down Expand Up @@ -934,6 +941,11 @@ class DataStoreServiceClient : public txservice::store::DataStoreHandler
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result);

friend void FetchRangeSizeCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result);
};

struct UpsertTableData
Expand Down
49 changes: 45 additions & 4 deletions store_handler/data_store_service_client_closure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -811,8 +811,9 @@ void FetchTableRangesCallback(void *data,
for (uint32_t i = 0; i < items_size; i++)
{
scan_next_closure->GetItem(i, key, value, ts, ttl);
assert(value.size() == (sizeof(int32_t) + sizeof(uint64_t) +
sizeof(uint64_t) + sizeof(uint32_t)));
assert(value.size() ==
(sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) +
sizeof(uint32_t) + sizeof(int32_t)));
const char *buf = value.data();
int32_t partition_id = *(reinterpret_cast<const int32_t *>(buf));
buf += sizeof(partition_id);
Expand Down Expand Up @@ -925,6 +926,45 @@ void FetchTableRangesCallback(void *data,
}
}

void FetchRangeSizeCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result)
{
txservice::FetchTableRangeSizeCc *fetch_range_size_cc =
static_cast<txservice::FetchTableRangeSizeCc *>(data);

if (result.error_code() == remote::DataStoreError::KEY_NOT_FOUND)
{
fetch_range_size_cc->store_range_size_ = 0;
fetch_range_size_cc->SetFinish(
static_cast<uint32_t>(txservice::CcErrorCode::NO_ERROR));
}
else if (result.error_code() != remote::DataStoreError::NO_ERROR)
{
LOG(ERROR) << "Fetch range size failed with error code: "
<< result.error_code();
fetch_range_size_cc->SetFinish(
static_cast<uint32_t>(txservice::CcErrorCode::DATA_STORE_ERR));
}
else
{
ReadClosure *read_closure = static_cast<ReadClosure *>(closure);
std::string_view read_val = read_closure->Value();
assert(read_closure->TableName() == kv_range_table_name);
assert(read_val.size() ==
(sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) +
sizeof(uint32_t) + sizeof(int32_t)));
const char *buf = read_val.data();
buf += read_val.size() - sizeof(int32_t);
fetch_range_size_cc->store_range_size_ =
*reinterpret_cast<const int32_t *>(buf);

fetch_range_size_cc->SetFinish(
static_cast<uint32_t>(txservice::CcErrorCode::NO_ERROR));
}
}

void FetchRangeSlicesCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
Expand Down Expand Up @@ -965,8 +1005,9 @@ void FetchRangeSlicesCallback(void *data,
else
{
assert(read_closure->TableName() == kv_range_table_name);
assert(read_val.size() == (sizeof(int32_t) + sizeof(uint64_t) +
sizeof(uint64_t) + sizeof(uint32_t)));
assert(read_val.size() ==
(sizeof(int32_t) + sizeof(uint64_t) + sizeof(uint64_t) +
sizeof(uint32_t) + sizeof(int32_t)));
const char *buf = read_val.data();
int32_t range_partition_id =
*(reinterpret_cast<const int32_t *>(buf));
Expand Down
8 changes: 8 additions & 0 deletions store_handler/data_store_service_client_closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -3102,6 +3102,14 @@ void FetchTableRangesCallback(void *data,
DataStoreServiceClient &client,
const remote::CommonResult &result);

/**
* Callback for fetching range size from table_ranges.
*/
void FetchRangeSizeCallback(void *data,
::google::protobuf::Closure *closure,
DataStoreServiceClient &client,
const remote::CommonResult &result);

/**
* Callback for fetching range slices.
*
Expand Down
6 changes: 6 additions & 0 deletions store_handler/dynamo_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2534,6 +2534,12 @@ void EloqDS::DynamoHandler::FetchRangeSlices(FetchRangeSlicesReq *fetch_cc)
assert(false);
}

void EloqDS::DynamoHandler::FetchTableRangeSize(FetchTableRangeSizeCc *fetch_cc)
{
LOG(ERROR) << "DynamoHandler::FetchTableRangeSize not implemented";
assert(false);
}

void EloqDS::DynamoHandler::OnFetchRangeSlices(
const Aws::DynamoDB::DynamoDBClient *client,
const Aws::DynamoDB::Model::GetItemRequest &request,
Expand Down
1 change: 1 addition & 0 deletions store_handler/dynamo_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class DynamoHandler : public txservice::store::DataStoreHandler
//-- range partition
void FetchTableRanges(FetchTableRangesCc *fetch_cc) override;
void FetchRangeSlices(FetchRangeSlicesReq *fetch_cc) override;
void FetchTableRangeSize(FetchTableRangeSizeCc *fetch_cc) override;

bool DeleteOutOfRangeData(
const txservice::TableName &table_name,
Expand Down
7 changes: 7 additions & 0 deletions store_handler/rocksdb_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,13 @@ void RocksDBHandler::FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc)
assert(false);
}

void RocksDBHandler::FetchTableRangeSize(
txservice::FetchTableRangeSizeCc *fetch_cc)
{
LOG(ERROR) << "RocksDBHandler::FetchTableRangeSize not implemented";
assert(false);
}

bool DeleteOutOfRangeDataInternal(std::string delete_from_partition_sql,
int32_t partition_id,
const txservice::TxKey *start_k)
Expand Down
3 changes: 3 additions & 0 deletions store_handler/rocksdb_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ class RocksDBHandler : public txservice::store::DataStoreHandler

void FetchRangeSlices(txservice::FetchRangeSlicesReq *fetch_cc) override;

void FetchTableRangeSize(
txservice::FetchTableRangeSizeCc *fetch_cc) override;

bool DeleteOutOfRangeDataInternal(std::string delete_from_partition_sql,
int32_t partition_id,
const txservice::TxKey *start_k);
Expand Down
4 changes: 3 additions & 1 deletion tx_service/include/cc/cc_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ class CcHandler
const TxRecord *record,
OperationType operation_type,
uint32_t key_shard_code,
CcHandlerResult<PostProcessResult> &hres) = 0;
CcHandlerResult<PostProcessResult> &hres,
int32_t partition_id = -1,
bool on_dirty_range = false) = 0;

/**
* @briefPost-processes a read/scan key. Post-processing clears the read
Expand Down
Loading
Loading