Walkthrough
This PR introduces partition-filtering infrastructure with optional group ID mapping, new ChangePartition and ApplyPartitionChange request types to manage partition state transitions, a force-abort mechanism for tasks, error-to-IO result conversion utilities, and updates partition filter semantics from boolean to optional partition group ID across storage and task layers.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant EloqStore
participant Shard
participant TaskManager
Client->>EloqStore: HandleChangePartitionRequest(ChangePartitionRequest)
activate EloqStore
EloqStore->>TaskManager: ForEachActiveTask(visitor)
activate TaskManager
TaskManager-->>EloqStore: iterate active tasks
deactivate TaskManager
EloqStore->>Shard: SendRequest(ApplyPartitionChangeRequest)
activate Shard
Shard->>Shard: abort tasks outside new filter
Shard->>Shard: clear queued requests for excluded tables
Shard->>Shard: update partition_filter_ atomically
Shard-->>EloqStore: completion
deactivate Shard
EloqStore->>EloqStore: wait for pending changes
opt stop_store flag set
EloqStore->>EloqStore: stop store
end
EloqStore-->>Client: complete request
deactivate EloqStore
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (2 warnings)
    }
    if (req->Type() == RequestType::ChangePartition)
    {
        HandleChangePartitionRequest(static_cast<ChangePartitionRequest *>(req));
[cpplint] reported by reviewdog 🐶
Lines should be <= 80 characters long [whitespace/line_length] [2]
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@include/eloq_store.h`:
- Around line 660-674: ApplyPartitionChangeRequest::pending_ currently points at
a stack atomic leading to dangling pointers when requests are enqueued; change
pending_ to own shared state (e.g. std::shared_ptr<std::atomic<uint32_t>>) and
allocate that shared atomic with std::make_shared in the code that builds
sub_reqs, then store the same shared_ptr into each ApplyPartitionChangeRequest
so queued shards hold a heap-owned counter; similarly ensure queued requests
themselves are owned (use std::shared_ptr<ApplyPartitionChangeRequest> or
heap-allocate each request instead of taking addresses of stack elements) or,
alternatively, drain/rollback already-enqueued requests before returning on
error—update all places that assign to pending_ and that enqueue requests
(references to ApplyPartitionChangeRequest::pending_, the code that builds
sub_reqs, and AddKvRequest calls) to use the shared ownership model.
In `@include/kv_options.h`:
- Around line 311-319: Update the stale doc comment preceding the
PartitionFilter declaration to reflect the current contract: replace the old
"returns true" wording with language that the filter returns a
std::optional<PartitionGroupId> (i.e., returning std::nullopt means the
partition does not belong to this instance, returning a PartitionGroupId assigns
it to that group and runtime partition-group state decides activity), and note
that an empty PartitionFilter means all partitions belong to this instance;
reference the PartitionFilter type and PartitionGroupId in the updated text so
the behavior matches the code.
In `@include/tasks/task.h`:
- Around line 118-125: force_aborted_ on Task objects is never cleared on reuse
so tasks stay in a force-aborted state; ensure the flag is reset when tasks are
recycled by setting force_aborted_ = false in WriteTask::Reset()
(src/tasks/write_task.cpp) and clear the flag immediately after acquiring pooled
tasks in the task-manager getters GetReadTask(), GetScanTask(),
GetListObjectTask(), and GetListStandbyPartitionTask()
(src/tasks/task_manager.cpp) so that any reused
ReadTask/ScanTask/ListObjectTask/ListStandbyPartitionTask start with
ForceAborted() == false before use.
In `@src/async_io_manager.cpp`:
- Around line 1003-1028: KvErrorToIoResult currently maps unknown KvError values
to -EIO, which loses domain-specific errors—update the function so
unmapped/standby-produced KvError values round-trip correctly by returning the
negative numeric KvError value instead of -EIO in the default case; specifically
modify KvErrorToIoResult (the switch over KvError) so that known KvError cases
keep their current errno mappings but the default branch returns -(int)err (with
an explicit cast if needed) to preserve the original KvError when converted back
by ToKvError.
In `@src/eloq_store.cpp`:
- Around line 1555-1588: The loop in HandleChangePartitionRequest can return
early on AddKvRequest failure leaving already-enqueued
ApplyPartitionChangeRequest instances running and causing partial application;
modify the logic to detect a failure (e.g., set a local bool enqueue_failed),
stop enqueuing further requests (break the for loop) but do NOT return
immediately—wait for the atomic pending to reach zero (reuse the existing
while(pending.load(...)) backoff) so all already-enqueued sub_reqs finish, then
call req->SetDone(KvError::Busy) (and avoid calling Stop() on req->stop_store_
when enqueue_failed) to report the error; this keeps
HandleChangePartitionRequest, ApplyPartitionChangeRequest, pending,
AddKvRequest, and req->SetDone as the reference points to locate and implement
the change.
In `@src/tasks/task.cpp`:
- Around line 73-76: The current check only sets io_res_ = -ECANCELED when
force_aborted_ && io_res_ == 0, which lets positive (successful byte counts)
slip through; change the condition to detect any non-negative (successful)
completion: when force_aborted_ is true and io_res_ is >= 0, set io_res_ =
-ECANCELED so that force-abort overrides any successful result before conversion
via ToKvError(io_res_).
📒 Files selected for processing (17)
- include/async_io_manager.h
- include/eloq_store.h
- include/error.h
- include/kv_options.h
- include/storage/shard.h
- include/tasks/task.h
- include/tasks/task_manager.h
- include/types.h
- src/async_io_manager.cpp
- src/eloq_store.cpp
- src/standby_service.cpp
- src/storage/shard.cpp
- src/tasks/prewarm_task.cpp
- src/tasks/reopen_task.cpp
- src/tasks/task.cpp
- tests/cloud.cpp
- tests/manifest.cpp
class ApplyPartitionChangeRequest : public KvRequest
{
public:
    RequestType Type() const override
    {
        return RequestType::ApplyPartitionChange;
    }

private:
    std::shared_ptr<const PartitionFilter> partition_filter_{nullptr};
    std::atomic<uint32_t> *pending_{nullptr};

    friend class EloqStore;
    friend class Shard;
};
pending_ needs owned lifetime across shard queues.
The implementation at Lines 1555-1576 in src/eloq_store.cpp points this field at a stack std::atomic<uint32_t> and enqueues raw ApplyPartitionChangeRequest* owned by a local sub_reqs vector. If any later AddKvRequest() fails, the function returns while already-queued shards still hold dangling pointers to both the counter and the request objects. Please switch this to heap/shared state or drain already-enqueued work before returning.
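The shared-ownership option can be sketched as follows. This is a minimal illustration, not the PR's code: `SubRequest` and `BuildSubRequests` are hypothetical stand-ins for `ApplyPartitionChangeRequest` and the loop that builds `sub_reqs`, and the shard queues are assumed to hold `std::shared_ptr` copies of the requests.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for ApplyPartitionChangeRequest. It co-owns the
// completion counter instead of pointing at a stack variable.
struct SubRequest
{
    std::shared_ptr<std::atomic<uint32_t>> pending;

    // Called by the shard once the change has been applied.
    void Complete()
    {
        assert(pending != nullptr);
        pending->fetch_sub(1, std::memory_order_release);
    }
};

// Builds one request per shard. All requests share a single heap-owned
// counter, so the counter outlives the function that created it.
std::vector<std::shared_ptr<SubRequest>> BuildSubRequests(uint32_t shard_count)
{
    auto pending = std::make_shared<std::atomic<uint32_t>>(shard_count);
    std::vector<std::shared_ptr<SubRequest>> reqs;
    reqs.reserve(shard_count);
    for (uint32_t i = 0; i < shard_count; ++i)
    {
        auto req = std::make_shared<SubRequest>();
        req->pending = pending;
        reqs.push_back(std::move(req));
    }
    return reqs;
}
```

With this layout, a shard that still holds a queued request keeps both the request and the counter alive even if the coordinator has already dropped its own vector.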
 * @brief Classify which partition group a table partition belongs to.
 * Returning std::nullopt means that this partition does not belong to this
 * instance. If the function returns a partition group id, runtime
 * partition-group state decides whether that partition is currently active
 * for operations like prewarming, snapshotting, and reopen.
 * If not set (empty), all partitions are considered to belong to this
 * instance.
 */
- std::function<bool(const TableIdent &)> partition_filter;
+ PartitionFilter partition_filter;
There’s still one stale bool-era doc block above.
The comment at Lines 299-301 still says the filter "returns true", but the public contract is now std::optional<PartitionGroupId>. Please update that comment too so the options docs stay self-consistent.
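For reference, the contract the doc comment should describe can be sketched like this. The type definitions are assumptions made for illustration: the real `TableIdent`, `PartitionGroupId`, and `PartitionFilter` live in the PR's headers and may differ, and `MakeExampleFilter` and `BelongsToInstance` are hypothetical helpers.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>

// Hypothetical stand-ins for the real types in include/types.h and
// include/kv_options.h.
using PartitionGroupId = uint32_t;
struct TableIdent
{
    std::string tbl_name;
    uint32_t partition_id;
};
using PartitionFilter =
    std::function<std::optional<PartitionGroupId>(const TableIdent &)>;

// Example filter: partitions 0..7 belong to this instance and are split
// into two groups; everything else belongs elsewhere (std::nullopt).
PartitionFilter MakeExampleFilter()
{
    return [](const TableIdent &tbl) -> std::optional<PartitionGroupId>
    {
        if (tbl.partition_id >= 8)
        {
            return std::nullopt;
        }
        return tbl.partition_id % 2;
    };
}

// An empty filter means every partition belongs to this instance.
bool BelongsToInstance(const PartitionFilter &filter, const TableIdent &tbl)
{
    if (!filter)
    {
        return true;
    }
    return filter(tbl).has_value();
}
```

Whether a partition that maps to a group is currently active is a separate runtime decision and is deliberately not modeled here.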
    void ForceAbort()
    {
        force_aborted_ = true;
    }
    bool ForceAborted() const
    {
        return force_aborted_;
    }
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "Assignments to force_aborted_:"
rg -n -C2 '\bforce_aborted_\s*='
echo
echo "Task acquisition/reset sites:"
rg -n -C5 'TaskManager::Get(BatchWriteTask|BackgroundWrite|ReadTask|ScanTask|ListObjectTask|ListStandbyPartitionTask|ReopenTask)|\bFreeTask\s*\('
Repository: eloqdata/eloqstore
Length of output: 9054
🏁 Script executed:
# Find Reset() implementations in task classes
ast-grep --pattern 'void Reset($$$) {
$$$
}'
Repository: eloqdata/eloqstore
Length of output: 1731
🏁 Script executed:
# Also check the base KvTask class and derived task classes for initialization
rg -n 'class.*Task.*:' include/tasks/ | head -20
Repository: eloqdata/eloqstore
Length of output: 606
🏁 Script executed:
# Check if force_aborted_ is initialized in constructors
rg -n -B5 -A10 'force_aborted_' include/tasks/task.h
Repository: eloqdata/eloqstore
Length of output: 634
🏁 Script executed:
# Search for Reset() in WriteTask and derived classes
rg -n -A15 'void Reset' include/tasks/write_task.h
Repository: eloqdata/eloqstore
Length of output: 1552
🏁 Script executed:
# Search for Reset() in ReadTask
rg -n -A15 'void Reset' include/tasks/read_task.h
Repository: eloqdata/eloqstore
Length of output: 44
🏁 Script executed:
# Search for Reset() in ScanTask
rg -n -A15 'void Reset' include/tasks/scan_task.h
Repository: eloqdata/eloqstore
Length of output: 465
🏁 Script executed:
# Check ListObjectTask and ListStandbyPartitionTask
rg -n -A15 'void Reset' include/tasks/list_object_task.h include/tasks/list_standby_partition_task.h
Repository: eloqdata/eloqstore
Length of output: 44
🏁 Script executed:
# Find WriteTask::Reset() implementation
rg -n -A20 'void WriteTask::Reset' src/
Repository: eloqdata/eloqstore
Length of output: 1727
🏁 Script executed:
# Check if ReadTask constructor/destructor or any Reset exists
cat -n include/tasks/read_task.h
Repository: eloqdata/eloqstore
Length of output: 1215
🏁 Script executed:
# Check KvTask base class for any Reset method
rg -n 'virtual.*Reset|void Reset' include/tasks/task.h
Repository: eloqdata/eloqstore
Length of output: 44
Force-aborted state persists on task pool reuse—all task types need explicit reset.
force_aborted_ is never cleared when tasks are returned to the pool. WriteTask::Reset() does not reset this flag, and ReadTask, ScanTask, ListObjectTask, and ListStandbyPartitionTask have no Reset() method at all. Once a task is force-aborted, the flag remains true on the next reuse, causing subsequent requests to be incorrectly cancelled in FinishIo().
Add force_aborted_ = false; to:
- WriteTask::Reset() in src/tasks/write_task.cpp
- Task acquisition paths without Reset(): GetReadTask(), GetScanTask(), GetListObjectTask(), GetListStandbyPartitionTask() in src/tasks/task_manager.cpp
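The reuse problem and the reset-on-acquire fix can be shown with a small pool. This is a sketch under assumptions: `PooledTask`, `TaskPool`, and `ResetForReuse` are hypothetical names, and the real task manager's pooling logic is not visible in this review.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical task carrying only the flag under discussion.
struct PooledTask
{
    void ForceAbort() { force_aborted_ = true; }
    bool ForceAborted() const { return force_aborted_; }
    // Clears per-use state so a recycled task starts clean.
    void ResetForReuse() { force_aborted_ = false; }

private:
    bool force_aborted_{false};
};

// Hypothetical pool standing in for the task manager's getters.
class TaskPool
{
public:
    PooledTask *Acquire()
    {
        if (free_.empty())
        {
            owned_.push_back(std::make_unique<PooledTask>());
            return owned_.back().get();
        }
        PooledTask *task = free_.back();
        free_.pop_back();
        // Without this reset, an earlier ForceAbort() would leak into
        // the next request served by the same task object.
        task->ResetForReuse();
        return task;
    }

    void Release(PooledTask *task) { free_.push_back(task); }

private:
    std::vector<std::unique_ptr<PooledTask>> owned_;
    std::vector<PooledTask *> free_;
};
```

Resetting at acquisition time covers task types that have no `Reset()` of their own; resetting on release would work equally well as long as it happens on every path back to the pool.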
int KvErrorToIoResult(KvError err)
{
    switch (err)
    {
    case KvError::NoError:
        return 0;
    case KvError::NoPermission:
        return -EPERM;
    case KvError::NotFound:
        return -ENOENT;
    case KvError::TryAgain:
        return -EAGAIN;
    case KvError::OutOfMem:
        return -ENOMEM;
    case KvError::Busy:
        return -EBUSY;
    case KvError::OpenFileLimit:
        return -EMFILE;
    case KvError::OutOfSpace:
        return -ENOSPC;
    case KvError::Aborted:
        return -ECANCELED;
    default:
        return -EIO;
    }
}
KvErrorToIoResult currently loses domain-specific errors (non-round-trip mapping).
Unmapped values fall back to -EIO, so errors like KvError::ExpiredTerm get collapsed to KvError::IoFail after ToKvError(...) (downstream at Line 4705), which changes control-flow semantics.
🔧 Proposed fix (make conversion symmetric for standby-produced errors)
KvError ToKvError(int err_no)
{
if (err_no >= 0)
{
return KvError::NoError;
}
switch (err_no)
{
+ case -EINVAL:
+ return KvError::InvalidArgs;
case -EPERM:
return KvError::NoPermission;
case -ENOENT:
return KvError::NotFound;
+ case -ESTALE:
+ return KvError::ExpiredTerm;
+ case -ESHUTDOWN:
+ return KvError::NotRunning;
case -EINTR:
case -EAGAIN:
case -ENOBUFS:
return KvError::TryAgain;
case -ENOMEM:
@@
int KvErrorToIoResult(KvError err)
{
switch (err)
{
case KvError::NoError:
return 0;
+ case KvError::InvalidArgs:
+ return -EINVAL;
case KvError::NoPermission:
return -EPERM;
case KvError::NotFound:
return -ENOENT;
+ case KvError::ExpiredTerm:
+ return -ESTALE;
+ case KvError::NotRunning:
+ return -ESHUTDOWN;
case KvError::TryAgain:
return -EAGAIN;
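A round-trip check makes the symmetry requirement concrete. The sketch below follows the explicit errno mapping from the proposed diff, using a reduced, hypothetical `KvError` enum rather than the project's real one; the errno choices (`ESTALE`, `ESHUTDOWN`) are taken from the diff and assume a Linux `<cerrno>`. Negating the raw enum value instead would only round-trip if `ToKvError` decoded it the same way, which the quoted code does not show.

```cpp
#include <cassert>
#include <cerrno>

// Reduced, hypothetical subset of the project's KvError enum.
enum class KvError
{
    NoError,
    InvalidArgs,
    NotFound,
    ExpiredTerm,
    NotRunning,
    IoFail
};

// Encodes a KvError as a negative errno-style I/O result.
int KvErrorToIoResult(KvError err)
{
    switch (err)
    {
    case KvError::NoError:
        return 0;
    case KvError::InvalidArgs:
        return -EINVAL;
    case KvError::NotFound:
        return -ENOENT;
    case KvError::ExpiredTerm:
        return -ESTALE;
    case KvError::NotRunning:
        return -ESHUTDOWN;
    default:
        return -EIO;
    }
}

// Decodes an I/O result; non-negative values (byte counts) are success.
KvError ToKvError(int err_no)
{
    if (err_no >= 0)
    {
        return KvError::NoError;
    }
    switch (err_no)
    {
    case -EINVAL:
        return KvError::InvalidArgs;
    case -ENOENT:
        return KvError::NotFound;
    case -ESTALE:
        return KvError::ExpiredTerm;
    case -ESHUTDOWN:
        return KvError::NotRunning;
    default:
        return KvError::IoFail;
    }
}

// True when encoding then decoding yields the original error.
bool RoundTrips(KvError err)
{
    return ToKvError(KvErrorToIoResult(err)) == err;
}
```

A test that calls `RoundTrips` for every enumerator a standby path can produce would catch any future error code that silently collapses to `IoFail`.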
void EloqStore::HandleChangePartitionRequest(ChangePartitionRequest *req)
{
    std::vector<std::unique_ptr<ApplyPartitionChangeRequest>> sub_reqs;
    sub_reqs.reserve(shards_.size());
    std::atomic<uint32_t> pending(static_cast<uint32_t>(shards_.size()));

    for (auto &shard : shards_)
    {
        auto sub_req = std::make_unique<ApplyPartitionChangeRequest>();
        sub_req->partition_filter_ = req->partition_filter_;
        sub_req->pending_ = &pending;
        if (!shard->AddKvRequest(sub_req.get()))
        {
            req->SetDone(KvError::Busy);
            return;
        }
        sub_reqs.push_back(std::move(sub_req));
    }

    while (pending.load(std::memory_order_acquire) > 0)
    {
#ifdef ELOQ_MODULE_ENABLED
        bthread_usleep(1000);
#else
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
#endif
    }

    if (req->stop_store_)
    {
        Stop();
    }
    req->SetDone(KvError::NoError);
}
Consider handling partial enqueue failures more gracefully.
If AddKvRequest fails for a shard (Line 1566), the function returns KvError::Busy immediately. However, sub-requests already enqueued to earlier shards will still be processed and decrement pending, while the caller receives an error. This creates a partial state where some shards apply the change while others don't.
Consider either:
- Waiting for already-enqueued requests to complete before returning the error
- Adding a mechanism to cancel/rollback the partial change
This may be acceptable if Busy errors are rare and the system is expected to retry, but it's worth noting the potential for inconsistent partition filter state across shards.
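The first option (wait for already-enqueued work, then report the error) might look roughly like this. Everything here is a hypothetical model: `FakeShard` replaces the real shard and its worker thread, and the queues are driven by hand so the sketch stays single-threaded. One detail differs from the quoted code on purpose: the counter only counts requests that were actually enqueued, because a counter preset to the shard count would never reach zero after a failed enqueue.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

enum class Result { Ok, Busy };

// Hypothetical shard: a queue that may refuse new requests.
struct FakeShard
{
    bool accept{true};
    std::deque<std::function<void()>> queue;

    bool Enqueue(std::function<void()> work)
    {
        if (!accept)
        {
            return false;
        }
        queue.push_back(std::move(work));
        return true;
    }

    // Stands in for the shard's own worker loop.
    void RunQueued()
    {
        while (!queue.empty())
        {
            auto work = std::move(queue.front());
            queue.pop_front();
            work();
        }
    }
};

Result ApplyToAllShards(std::vector<FakeShard> &shards,
                        std::atomic<uint32_t> &applied)
{
    std::atomic<uint32_t> pending{0};
    bool enqueue_failed = false;
    for (auto &shard : shards)
    {
        // Count the request before enqueuing, roll back on refusal.
        pending.fetch_add(1, std::memory_order_relaxed);
        bool ok = shard.Enqueue(
            [&pending, &applied]
            {
                applied.fetch_add(1, std::memory_order_relaxed);
                pending.fetch_sub(1, std::memory_order_release);
            });
        if (!ok)
        {
            pending.fetch_sub(1, std::memory_order_relaxed);
            enqueue_failed = true;
            break;
        }
    }
    // Drain everything that was enqueued before reporting the outcome.
    // The real store would sleep while shard threads make progress.
    while (pending.load(std::memory_order_acquire) > 0)
    {
        for (auto &shard : shards)
        {
            shard.RunQueued();
        }
    }
    return enqueue_failed ? Result::Busy : Result::Ok;
}
```

Draining first also keeps the stack-allocated counter valid for every queued request, though it does not undo the change on shards that already applied it; a retry or rollback policy would still be needed for that.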
    if (force_aborted_ && io_res_ == 0)
    {
        io_res_ = -ECANCELED;
    }
Abort should override any successful io_res_, not only zero.
io_res_ carries raw completion results, so successful reads/writes can be positive byte counts. With == 0, a force-aborted task can still bubble up NoError through ToKvError(io_res_).
Suggested fix
- if (force_aborted_ && io_res_ == 0)
+ if (force_aborted_ && io_res_ >= 0)
{
io_res_ = -ECANCELED;
}
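The difference between `== 0` and `>= 0` is easiest to see when the check is pulled out into a pure function. `ApplyForceAbort` is a hypothetical helper written for this illustration; the real check operates on the task's `force_aborted_` and `io_res_` members.

```cpp
#include <cassert>
#include <cerrno>

// Hypothetical free-function form of the check. Any successful result
// (zero or a positive byte count) is overridden when the task was
// force-aborted; existing errors are passed through unchanged.
int ApplyForceAbort(bool force_aborted, int io_res)
{
    if (force_aborted && io_res >= 0)
    {
        return -ECANCELED;
    }
    return io_res;
}
```

With the original `== 0` comparison, a force-aborted read that completed with 4096 bytes would keep its positive result and later convert to `NoError`.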