Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/async_io_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ class AsyncIoManager
};

KvError ToKvError(int err_no);
int KvErrorToIoResult(KvError err);

class IouringMgr : public AsyncIoManager
{
Expand Down
63 changes: 62 additions & 1 deletion include/eloq_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <vector>

Expand Down Expand Up @@ -51,7 +52,9 @@ enum class RequestType : uint8_t
CleanExpired,
GlobalArchive,
GlobalReopen,
GlobalListArchiveTags
GlobalListArchiveTags,
ChangePartition,
ApplyPartitionChange
};

inline const char *RequestTypeToString(RequestType type)
Expand Down Expand Up @@ -90,6 +93,10 @@ inline const char *RequestTypeToString(RequestType type)
return "global_reopen";
case RequestType::GlobalListArchiveTags:
return "global_list_archive_tags";
case RequestType::ChangePartition:
return "change_partition";
case RequestType::ApplyPartitionChange:
return "apply_partition_change";
default:
return "unknown";
}
Expand Down Expand Up @@ -617,6 +624,55 @@ class GlobalListArchiveTagsRequest : public KvRequest
friend class EloqStore;
};

class ChangePartitionRequest : public KvRequest
{
public:
RequestType Type() const override
{
return RequestType::ChangePartition;
}

void SetPartitionFilter(PartitionFilter partition_filter)
{
if (partition_filter)
{
partition_filter_ = std::make_shared<PartitionFilter>(
std::move(partition_filter));
}
else
{
partition_filter_.reset();
}
}

void SetStopStore(bool stop_store)
{
stop_store_ = stop_store;
}

private:
std::shared_ptr<const PartitionFilter> partition_filter_{nullptr};
bool stop_store_{false};

friend class EloqStore;
};

// Internal per-shard request fanned out when a partition change is applied.
// Instances are populated by the friend classes (EloqStore / Shard), not by
// external callers; the public surface is only the request-type tag.
class ApplyPartitionChangeRequest : public KvRequest
{
public:
    RequestType Type() const override
    {
        return RequestType::ApplyPartitionChange;
    }

private:
    // New partition filter for the shard to install; shared with the
    // originating change-partition flow.
    std::shared_ptr<const PartitionFilter> partition_filter_{nullptr};
    // Countdown of shards that still have to apply the change.
    // NOTE(review): raw pointer into state owned by the fan-out code —
    // presumably that counter (and these sub-request objects themselves)
    // must outlive every enqueued sub-request, including on error paths
    // where only some shards were enqueued. Confirm the owner guarantees
    // this, or move to heap/shared ownership.
    std::atomic<uint32_t> *pending_{nullptr};

    friend class EloqStore;
    friend class Shard;
};
Comment on lines +660 to +674
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

pending_ needs owned lifetime across shard queues.

The implementation at Lines 1555-1576 in src/eloq_store.cpp points this field at a stack std::atomic<uint32_t> and enqueues raw ApplyPartitionChangeRequest* owned by a local sub_reqs vector. If any later AddKvRequest() fails, the function returns while already-queued shards still hold dangling pointers to both the counter and the request objects. Please switch this to heap/shared state or drain already-enqueued work before returning.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/eloq_store.h` around lines 660 - 674,
ApplyPartitionChangeRequest::pending_ currently points at a stack atomic leading
to dangling pointers when requests are enqueued; change pending_ to own shared
state (e.g. std::shared_ptr<std::atomic<uint32_t>>) and allocate that shared
atomic with std::make_shared in the code that builds sub_reqs, then store the
same shared_ptr into each ApplyPartitionChangeRequest so queued shards hold a
heap-owned counter; similarly ensure queued requests themselves are owned (use
std::shared_ptr<ApplyPartitionChangeRequest> or heap-allocate each request
instead of taking addresses of stack elements) or, alternatively, drain/rollback
already-enqueued requests before returning on error—update all places that
assign to pending_ and that enqueue requests (references to
ApplyPartitionChangeRequest::pending_, the code that builds sub_reqs, and
AddKvRequest calls) to use the shared ownership model.


class CompactRequest : public WriteRequest
{
public:
Expand Down Expand Up @@ -747,6 +803,10 @@ class EloqStore
void HandleGlobalArchiveRequest(GlobalArchiveRequest *req);
void HandleGlobalReopenRequest(GlobalReopenRequest *req);
void HandleGlobalListArchiveTagsRequest(GlobalListArchiveTagsRequest *req);
void HandleChangePartitionRequest(ChangePartitionRequest *req);
std::optional<PartitionGroupId> ResolvePartitionGroup(
const TableIdent &tbl_id) const;
bool PartitionIncluded(const TableIdent &tbl_id) const;
KvError CollectTablePartitions(const std::string &table_name,
std::vector<TableIdent> &partitions) const;
KvError InitStoreSpace();
Expand All @@ -773,6 +833,7 @@ class EloqStore

bool enable_eloqstore_metrics_{false};
std::atomic<StoreMode> store_mode_{StoreMode::Local};
std::shared_ptr<const PartitionFilter> partition_filter_{nullptr};

friend class Shard;
friend class AsyncIoManager;
Expand Down
3 changes: 3 additions & 0 deletions include/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ enum struct KvError : uint8_t
NoPermission, // Permission denied (EPERM).
CloudErr, // Cloud service error (non-timeout HTTP/CURL).
IoFail, // Unclassified local I/O error.
Aborted, // Operation aborted after in-flight I/O drained.
ExpiredTerm, // Cloud term file indicates stale process term.
OssInsufficientStorage, // Object storage out of capacity (HTTP 507).
};
Expand Down Expand Up @@ -59,6 +60,8 @@ constexpr const char *ErrorString(KvError err)
return "Device or resource busy";
case KvError::IoFail:
return "I/O failure";
case KvError::Aborted:
return "Operation aborted";
case KvError::CloudErr:
return "Cloud service is unavailable";
case KvError::Timeout:
Expand Down
16 changes: 10 additions & 6 deletions include/kv_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <cstdint>
#include <cstring>
#include <functional>
#include <optional>
#include <string>
#include <vector>

Expand All @@ -20,6 +21,9 @@ constexpr int64_t TB = 1LL << 40;

constexpr uint8_t max_overflow_pointers = 128;
constexpr uint16_t max_read_pages_batch = max_overflow_pointers;
using PartitionFilter =
std::function<std::optional<PartitionGroupId>(const TableIdent &)>;

struct KvOptions
{
int LoadFromIni(const char *path);
Expand Down Expand Up @@ -304,15 +308,15 @@ struct KvOptions
uint16_t prewarm_task_count = 3;

/**
* @brief Filter function to determine which partitions belong to this
* instance.
* The filter returning true means that this table partition belongs to the
* current instance and should be included in operations like prewarming,
* snapshotting, etc.
* @brief Classify which partition group a table partition belongs to.
* Returning std::nullopt means that this partition does not belong to this
* instance. If the function returns a partition group id, runtime
* partition-group state decides whether that partition is currently active
* for operations like prewarming, snapshotting, and reopen.
* If not set (empty), all partitions are considered to belong to this
* instance.
*/
std::function<bool(const TableIdent &)> partition_filter;
PartitionFilter partition_filter;
Comment on lines +311 to +319
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

There’s still one stale bool-era doc block above.

The comment at Lines 299-301 still says the filter "returns true", but the public contract is now std::optional<PartitionGroupId>. Please update that comment too so the options docs stay self-consistent.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/kv_options.h` around lines 311 - 319, Update the stale doc comment
preceding the PartitionFilter declaration to reflect the current contract:
replace the old "returns true" wording with language that the filter returns a
std::optional<PartitionGroupId> (i.e., returning std::nullopt means the
partition does not belong to this instance, returning a PartitionGroupId assigns
it to that group and runtime partition-group state decides activity), and note
that an empty PartitionFilter means all partitions belong to this instance;
reference the PartitionFilter type and PartitionGroupId in the updated text so
the behavior matches the code.

};

} // namespace eloqstore
16 changes: 16 additions & 0 deletions include/storage/shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@ class Shard
void OnTaskFinished(KvTask *task);
void OnReceivedReq(KvRequest *req);
bool ProcessReq(KvRequest *req);
static bool ShouldCheckPartitionFilter(RequestType type)
{
switch (type)
{
case RequestType::ListObject:
case RequestType::ListStandbyPartition:
case RequestType::GlobalArchive:
case RequestType::GlobalReopen:
case RequestType::GlobalListArchiveTags:
case RequestType::ChangePartition:
case RequestType::ApplyPartitionChange:
return false;
default:
return true;
}
}
void TryStartPendingWrite(const TableIdent &tbl_id);
void TryDispatchPendingWrites();

Expand Down
9 changes: 9 additions & 0 deletions include/tasks/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,19 @@ class KvTask
int WaitIoResult();
void WaitIo();
void FinishIo();
    // Mark this task as force-aborted; FinishIo() presumably consults the
    // flag to cancel the task once in-flight I/O drains — confirm in the
    // implementation. NOTE(review): the flag is only ever set here; whatever
    // recycles tasks back into a pool must clear it, or a reused task would
    // start out already aborted — verify the reset paths.
    void ForceAbort()
    {
        force_aborted_ = true;
    }
    // Whether ForceAbort() has been called on this task.
    bool ForceAborted() const
    {
        return force_aborted_;
    }
Comment on lines +118 to +125
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Assignments to force_aborted_:"
rg -n -C2 '\bforce_aborted_\s*='

echo
echo "Task acquisition/reset sites:"
rg -n -C5 'TaskManager::Get(BatchWriteTask|BackgroundWrite|ReadTask|ScanTask|ListObjectTask|ListStandbyPartitionTask|ReopenTask)|\bFreeTask\s*\('

Repository: eloqdata/eloqstore

Length of output: 9054


🏁 Script executed:

# Find Reset() implementations in task classes
ast-grep --pattern 'void Reset($$$) {
  $$$
}'

Repository: eloqdata/eloqstore

Length of output: 1731


🏁 Script executed:

# Also check the base KvTask class and derived task classes for initialization
rg -n 'class.*Task.*:' include/tasks/ | head -20

Repository: eloqdata/eloqstore

Length of output: 606


🏁 Script executed:

# Check if force_aborted_ is initialized in constructors
rg -n -B5 -A10 'force_aborted_' include/tasks/task.h

Repository: eloqdata/eloqstore

Length of output: 634


🏁 Script executed:

# Search for Reset() in WriteTask and derived classes
rg -n -A15 'void Reset' include/tasks/write_task.h

Repository: eloqdata/eloqstore

Length of output: 1552


🏁 Script executed:

# Search for Reset() in ReadTask
rg -n -A15 'void Reset' include/tasks/read_task.h

Repository: eloqdata/eloqstore

Length of output: 44


🏁 Script executed:

# Search for Reset() in ScanTask
rg -n -A15 'void Reset' include/tasks/scan_task.h

Repository: eloqdata/eloqstore

Length of output: 465


🏁 Script executed:

# Check ListObjectTask and ListStandbyPartitionTask
rg -n -A15 'void Reset' include/tasks/list_object_task.h include/tasks/list_standby_partition_task.h

Repository: eloqdata/eloqstore

Length of output: 44


🏁 Script executed:

# Find WriteTask::Reset() implementation
rg -n -A20 'void WriteTask::Reset' src/

Repository: eloqdata/eloqstore

Length of output: 1727


🏁 Script executed:

# Check if ReadTask constructor/destructor or any Reset exists
cat -n include/tasks/read_task.h

Repository: eloqdata/eloqstore

Length of output: 1215


🏁 Script executed:

# Check KvTask base class for any Reset method
rg -n 'virtual.*Reset|void Reset' include/tasks/task.h

Repository: eloqdata/eloqstore

Length of output: 44


Force-aborted state persists on task pool reuse—all task types need explicit reset.

force_aborted_ is never cleared when tasks are returned to the pool. WriteTask::Reset() does not reset this flag, and ReadTask, ScanTask, ListObjectTask, and ListStandbyPartitionTask have no Reset() method at all. Once a task is force-aborted, the flag remains true on the next reuse, causing subsequent requests to be incorrectly cancelled in FinishIo().

Add force_aborted_ = false; to:

  • WriteTask::Reset() in src/tasks/write_task.cpp
  • Task acquisition paths without Reset(): GetReadTask(), GetScanTask(), GetListObjectTask(), GetListStandbyPartitionTask() in src/tasks/task_manager.cpp
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/tasks/task.h` around lines 118 - 125, force_aborted_ on Task objects
is never cleared on reuse so tasks stay in a force-aborted state; ensure the
flag is reset when tasks are recycled by setting force_aborted_ = false in
WriteTask::Reset() (src/tasks/write_task.cpp) and clear the flag immediately
after acquiring pooled tasks in the task-manager getters GetReadTask(),
GetScanTask(), GetListObjectTask(), and GetListStandbyPartitionTask()
(src/tasks/task_manager.cpp) so that any reused
ReadTask/ScanTask/ListObjectTask/ListStandbyPartitionTask start with
ForceAborted() == false before use.


uint32_t inflight_io_{0};
int io_res_{0};
uint32_t io_flags_{0};
bool force_aborted_{false};

TaskStatus status_{TaskStatus::Idle};
KvRequest *req_{nullptr};
Expand Down
20 changes: 20 additions & 0 deletions include/tasks/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,26 @@ class TaskManager
void FinishExternalTask();

size_t NumActive() const;
template <typename F>
void ForEachActiveTask(F &&visitor)
{
auto visit = [&](KvTask *task)
{
if (task->status_ == TaskStatus::Idle ||
task->status_ == TaskStatus::Finished)
{
return;
}
visitor(task);
};
batch_write_pool_.ForEachTask(visit);
bg_write_pool_.ForEachTask(visit);
read_pool_.ForEachTask(visit);
scan_pool_.ForEachTask(visit);
list_object_pool_.ForEachTask(visit);
list_standby_partition_pool_.ForEachTask(visit);
reopen_pool_.ForEachTask(visit);
}

void Shutdown();

Expand Down
2 changes: 2 additions & 0 deletions include/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

namespace eloqstore
{
using PartitionGroupId = uint32_t;

enum class StoreMode
{
Local = 0,
Expand Down
31 changes: 30 additions & 1 deletion src/async_io_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -992,12 +992,41 @@ KvError ToKvError(int err_no)
return KvError::OpenFileLimit;
case -ENOSPC:
return KvError::OutOfSpace;
case -ECANCELED:
return KvError::Aborted;
default:
LOG(ERROR) << "ToKvError: " << err_no;
return KvError::IoFail;
}
}

// Translate a KvError back into a negative errno-style I/O result.
// Partial inverse of ToKvError(): KvError values without a dedicated errno
// mapping collapse to -EIO, so the round trip through ToKvError() is lossy
// for them. NOTE(review): domain-specific errors (e.g. KvError::ExpiredTerm)
// are flattened to -EIO here — confirm no caller needs to recover them
// after a round trip.
int KvErrorToIoResult(KvError err)
{
    switch (err)
    {
    case KvError::NoError:
        return 0;
    case KvError::NotFound:
        return -ENOENT;
    case KvError::NoPermission:
        return -EPERM;
    case KvError::TryAgain:
        return -EAGAIN;
    case KvError::Busy:
        return -EBUSY;
    case KvError::OutOfMem:
        return -ENOMEM;
    case KvError::OutOfSpace:
        return -ENOSPC;
    case KvError::OpenFileLimit:
        return -EMFILE;
    case KvError::Aborted:
        return -ECANCELED;
    default:
        return -EIO;
    }
}
Comment on lines +1003 to +1028
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

KvErrorToIoResult currently loses domain-specific errors (non-round-trip mapping).

Unmapped values fall back to -EIO, so errors like KvError::ExpiredTerm get collapsed to KvError::IoFail after ToKvError(...) (downstream at Line 4705), which changes control-flow semantics.

🔧 Proposed fix (make conversion symmetric for standby-produced errors)
 KvError ToKvError(int err_no)
 {
     if (err_no >= 0)
     {
         return KvError::NoError;
     }
     switch (err_no)
     {
+    case -EINVAL:
+        return KvError::InvalidArgs;
     case -EPERM:
         return KvError::NoPermission;
     case -ENOENT:
         return KvError::NotFound;
+    case -ESTALE:
+        return KvError::ExpiredTerm;
+    case -ESHUTDOWN:
+        return KvError::NotRunning;
     case -EINTR:
     case -EAGAIN:
     case -ENOBUFS:
         return KvError::TryAgain;
     case -ENOMEM:
@@
 int KvErrorToIoResult(KvError err)
 {
     switch (err)
     {
     case KvError::NoError:
         return 0;
+    case KvError::InvalidArgs:
+        return -EINVAL;
     case KvError::NoPermission:
         return -EPERM;
     case KvError::NotFound:
         return -ENOENT;
+    case KvError::ExpiredTerm:
+        return -ESTALE;
+    case KvError::NotRunning:
+        return -ESHUTDOWN;
     case KvError::TryAgain:
         return -EAGAIN;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/async_io_manager.cpp` around lines 1003 - 1028, KvErrorToIoResult
currently maps unknown KvError values to -EIO, which loses domain-specific
errors—update the function so unmapped/standby-produced KvError values
round-trip correctly by returning the negative numeric KvError value instead of
-EIO in the default case; specifically modify KvErrorToIoResult (the switch over
KvError) so that known KvError cases keep their current errno mappings but the
default branch returns -(int)err (with an explicit cast if needed) to preserve
the original KvError when converted back by ToKvError.


std::pair<void *, IouringMgr::UserDataType> IouringMgr::DecodeUserData(
uint64_t user_data)
{
Expand Down Expand Up @@ -4673,7 +4702,7 @@ std::pair<ManifestFilePtr, KvError> StandbyStoreMgr::GetManifest(
return {nullptr, prep_err};
}
current_task->WaitIo();
prep_err = static_cast<KvError>(current_task->io_res_);
prep_err = ToKvError(current_task->io_res_);
if (prep_err != KvError::NoError)
{
return {nullptr, prep_err};
Expand Down
Loading
Loading