Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/kv_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ struct KvOptions
* @brief The maximum number of running archive tasks at the same time.
*/
uint16_t max_archive_tasks = 256;
/**
* @brief Maximum number of per-partition requests submitted at a time by
* global operations such as global archive/reopen.
*/
uint32_t max_global_request_batch = 1000;
/**
* @brief Move pages in data file that space amplification factor
* bigger than this value.
Expand Down
264 changes: 196 additions & 68 deletions src/eloq_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ bool EloqStore::ValidateOptions(KvOptions &opts)
LOG(ERROR) << "Option max_inflight_write cannot be zero";
return false;
}
if (opts.max_global_request_batch == 0)
{
LOG(ERROR) << "Option max_global_request_batch cannot be zero";
return false;
}
if ((opts.data_page_size & (page_align - 1)) != 0)
{
LOG(ERROR) << "Option data_page_size is not page aligned";
Expand Down Expand Up @@ -930,40 +935,98 @@ void EloqStore::HandleDropTableRequest(DropTableRequest *req)
req->pending_.store(static_cast<uint32_t>(partitions.size()),
std::memory_order_relaxed);

auto on_truncate_done = [req](KvRequest *sub_req)
for (const TableIdent &partition : partitions)
{
auto trunc_req = std::make_unique<TruncateRequest>();
trunc_req->SetArgs(partition, std::string_view{});
req->truncate_reqs_.push_back(std::move(trunc_req));
}

struct DropTableScheduleState
: public std::enable_shared_from_this<DropTableScheduleState>
{
KvError sub_err = sub_req->Error();
if (sub_err != KvError::NoError)
EloqStore *store = nullptr;
DropTableRequest *req = nullptr;
size_t total = 0;
std::atomic<size_t> next_index{0};

bool HandleTruncateResult(KvError sub_err)
{
uint8_t expected = static_cast<uint8_t>(KvError::NoError);
uint8_t desired = static_cast<uint8_t>(sub_err);
req->first_error_.compare_exchange_strong(
expected,
desired,
std::memory_order_relaxed,
std::memory_order_relaxed);
if (sub_err != KvError::NoError)
{
uint8_t expected = static_cast<uint8_t>(KvError::NoError);
uint8_t desired = static_cast<uint8_t>(sub_err);
req->first_error_.compare_exchange_strong(
expected,
desired,
std::memory_order_relaxed,
std::memory_order_relaxed);
}
if (req->pending_.fetch_sub(1, std::memory_order_acq_rel) == 1)
{
KvError final_err = static_cast<KvError>(
req->first_error_.load(std::memory_order_relaxed));
req->SetDone(final_err);
return true;
}
return false;
}
if (req->pending_.fetch_sub(1, std::memory_order_acq_rel) == 1)

void OnTruncateDone(KvRequest *sub_req)
{
KvError final_err = static_cast<KvError>(
req->first_error_.load(std::memory_order_relaxed));
req->SetDone(final_err);
if (HandleTruncateResult(sub_req->Error()))
{
return;
}
ScheduleNext();
}
};

req->truncate_reqs_.reserve(partitions.size());
for (const TableIdent &partition : partitions)
{
auto trunc_req = std::make_unique<TruncateRequest>();
trunc_req->SetArgs(partition, std::string_view{});
TruncateRequest *ptr = trunc_req.get();
req->truncate_reqs_.push_back(std::move(trunc_req));
if (!ExecAsyn(ptr, 0, on_truncate_done))
void ScheduleNext()
{
LOG(ERROR)
<< "Handle droptable request, enqueue truncate request fail";
ptr->SetDone(KvError::NotRunning);
while (true)
{
size_t idx = next_index.fetch_add(1, std::memory_order_relaxed);
if (idx >= total)
{
return;
}

TruncateRequest *ptr = req->truncate_reqs_[idx].get();
auto self = shared_from_this();
auto on_truncate_done = [self](KvRequest *sub_req)
{ self->OnTruncateDone(sub_req); };
if (store->ExecAsyn(ptr, 0, on_truncate_done))
{
return;
}

LOG(ERROR) << "Handle droptable request, enqueue truncate "
"request fail";
ptr->callback_ = nullptr;
ptr->SetDone(KvError::NotRunning);
if (HandleTruncateResult(KvError::NotRunning))
{
return;
}
}
}
};

auto state = std::make_shared<DropTableScheduleState>();
state->store = this;
state->req = req;
state->total = req->truncate_reqs_.size();

size_t max_inflight =
std::max<uint32_t>(options_.max_global_request_batch, 1);
if (max_inflight > state->total)
{
max_inflight = state->total;
}

for (size_t i = 0; i < max_inflight; ++i)
{
state->ScheduleNext();
}
}

Expand Down Expand Up @@ -1211,6 +1274,9 @@ void EloqStore::HandleGlobalArchiveRequest(GlobalArchiveRequest *req)
{
max_inflight = 1;
}
max_inflight = std::min(max_inflight,
static_cast<size_t>(std::max<uint32_t>(
options_.max_global_request_batch, 1)));
if (max_inflight > state->total)
{
max_inflight = state->total;
Expand Down Expand Up @@ -1334,59 +1400,121 @@ void EloqStore::HandleGlobalReopenRequest(GlobalReopenRequest *req)
req->pending_.store(static_cast<uint32_t>(partitions.size()),
std::memory_order_relaxed);

auto on_reopen_done = [req](KvRequest *sub_req)
{
auto *reopen_req = static_cast<ReopenRequest *>(sub_req);
KvError sub_err = reopen_req->Error();
if (sub_err != KvError::NoError)
{
LOG(ERROR) << "HandleGlobalReopenRequest sub request failed, table "
<< reopen_req->TableId() << ", tag " << reopen_req->Tag()
<< ", error " << static_cast<uint32_t>(sub_err)
<< ", msg " << reopen_req->ErrMessage();
uint8_t expected = static_cast<uint8_t>(KvError::NoError);
uint8_t desired = static_cast<uint8_t>(sub_err);
req->first_error_.compare_exchange_strong(
expected,
desired,
std::memory_order_relaxed,
std::memory_order_relaxed);
}
else
{
DLOG(INFO) << "HandleGlobalReopenRequest sub request succeeded, "
<< "table " << reopen_req->TableId() << ", tag "
<< reopen_req->Tag();
}
if (req->pending_.fetch_sub(1, std::memory_order_acq_rel) == 1)
{
KvError final_err = static_cast<KvError>(
req->first_error_.load(std::memory_order_relaxed));
DLOG(INFO) << "HandleGlobalReopenRequest finish, tag " << req->Tag()
<< ", final_error " << static_cast<uint32_t>(final_err);
req->SetDone(final_err);
}
};

for (const TableIdent &partition : partitions)
{
DLOG(INFO) << "HandleGlobalReopenRequest enqueue partition "
<< partition << ", tag " << req->Tag();
auto reopen_req = std::make_unique<ReopenRequest>();
reopen_req->SetArgs(partition);
if (!req->Tag().empty())
{
reopen_req->SetTag(req->Tag());
}
ReopenRequest *ptr = reopen_req.get();
req->reopen_reqs_.push_back(std::move(reopen_req));
if (!ExecAsyn(ptr, 0, on_reopen_done))
}

struct ReopenScheduleState
: public std::enable_shared_from_this<ReopenScheduleState>
{
EloqStore *store = nullptr;
GlobalReopenRequest *req = nullptr;
size_t total = 0;
std::atomic<size_t> next_index{0};

bool HandleReopenResult(ReopenRequest *reopen_req)
{
LOG(ERROR)
<< "Handle global reopen request, enqueue reopen request fail, "
<< "partition " << partition << ", tag " << req->Tag();
ptr->SetDone(KvError::NotRunning);
KvError sub_err = reopen_req->Error();
if (sub_err != KvError::NoError)
{
LOG(ERROR) << "HandleGlobalReopenRequest sub request failed, "
<< "table " << reopen_req->TableId() << ", tag "
<< reopen_req->Tag() << ", error "
<< static_cast<uint32_t>(sub_err) << ", msg "
<< reopen_req->ErrMessage();
uint8_t expected = static_cast<uint8_t>(KvError::NoError);
uint8_t desired = static_cast<uint8_t>(sub_err);
req->first_error_.compare_exchange_strong(
expected,
desired,
std::memory_order_relaxed,
std::memory_order_relaxed);
}
else
{
DLOG(INFO) << "HandleGlobalReopenRequest sub request "
<< "succeeded, table " << reopen_req->TableId()
<< ", tag " << reopen_req->Tag();
}
if (req->pending_.fetch_sub(1, std::memory_order_acq_rel) == 1)
{
KvError final_err = static_cast<KvError>(
req->first_error_.load(std::memory_order_relaxed));
DLOG(INFO) << "HandleGlobalReopenRequest finish, tag "
<< req->Tag() << ", final_error "
<< static_cast<uint32_t>(final_err);
req->SetDone(final_err);
return true;
}
return false;
}

void OnReopenDone(KvRequest *sub_req)
{
auto *reopen_req = static_cast<ReopenRequest *>(sub_req);
if (HandleReopenResult(reopen_req))
{
return;
}
ScheduleNext();
}

void ScheduleNext()
{
while (true)
{
size_t idx = next_index.fetch_add(1, std::memory_order_relaxed);
if (idx >= total)
{
return;
}

ReopenRequest *ptr = req->reopen_reqs_[idx].get();
DLOG(INFO) << "HandleGlobalReopenRequest enqueue partition "
<< ptr->TableId() << ", tag " << req->Tag();
auto self = shared_from_this();
auto on_reopen_done = [self](KvRequest *sub_req)
{ self->OnReopenDone(sub_req); };
if (store->ExecAsyn(ptr, 0, on_reopen_done))
{
return;
}

LOG(ERROR) << "Handle global reopen request, enqueue reopen "
"request fail, partition "
<< ptr->TableId() << ", tag " << req->Tag();
ptr->callback_ = nullptr;
ptr->SetDone(KvError::NotRunning);
if (HandleReopenResult(ptr))
{
return;
}
}
}
};

auto state = std::make_shared<ReopenScheduleState>();
state->store = this;
state->req = req;
state->total = req->reopen_reqs_.size();

size_t max_inflight =
std::max<uint32_t>(options_.max_global_request_batch, 1);
if (max_inflight > state->total)
{
max_inflight = state->total;
}

for (size_t i = 0; i < max_inflight; ++i)
{
state->ScheduleNext();
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/kv_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ int KvOptions::LoadFromIni(const char *path)
max_archive_tasks =
reader.GetUnsigned(sec_run, "max_archive_tasks", 256);
}
if (reader.HasValue(sec_run, "max_global_request_batch"))
{
max_global_request_batch = reader.GetUnsigned(
sec_run, "max_global_request_batch", max_global_request_batch);
}
if (reader.HasValue(sec_run, "file_amplify_factor"))
{
file_amplify_factor =
Expand Down Expand Up @@ -392,6 +397,7 @@ bool KvOptions::operator==(const KvOptions &other) const
num_retained_archives == other.num_retained_archives &&
archive_interval_secs == other.archive_interval_secs &&
max_archive_tasks == other.max_archive_tasks &&
max_global_request_batch == other.max_global_request_batch &&
file_amplify_factor == other.file_amplify_factor &&
local_space_limit == other.local_space_limit &&
reserve_space_ratio == other.reserve_space_ratio &&
Expand Down
Loading