Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/core/utility/buffer_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class BufferStorage : public IndexStorage {
}

//! Open storage
int open(const std::string &path, bool /*create*/) override {
int open(const std::string &path, bool /*create_if_missing*/) override {
file_name_ = path;
buffer_pool_ = std::make_shared<ailego::VecBufferPool>(path);
buffer_pool_handle_ = std::make_shared<ailego::VecBufferPoolHandle>(
Expand Down
4 changes: 2 additions & 2 deletions src/core/utility/mmap_file_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ class MMapFileStorage : public IndexStorage {
}

//! Open storage
int open(const std::string &path, bool create) override {
if (!ailego::File::IsExist(path) && create) {
int open(const std::string &path, bool create_if_missing) override {
if (!ailego::File::IsExist(path) && create_if_missing) {
size_t last_slash = path.rfind('/');
if (last_slash != std::string::npos) {
ailego::File::MakePath(path.substr(0, last_slash));
Expand Down
6 changes: 3 additions & 3 deletions src/db/collection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ std::vector<SegmentTask::Ptr> CollectionImpl::build_compact_task(
if (current_actual_doc_count + actual_doc_count >
max_doc_count_per_segment) {
// only create SegmentCompactTask when rebuild=true
task = SegmentTask::CreateComapctTask(
task = SegmentTask::CreateCompactTask(
CompactTask{path_, schema, current_group,
allocate_segment_id_for_tmp_segment(), filter,
!options_.enable_mmap_, concurrency});
Expand All @@ -972,7 +972,7 @@ std::vector<SegmentTask::Ptr> CollectionImpl::build_compact_task(
current_group[0], "", nullptr, concurrency});
skip_task = current_group[0]->all_vector_index_ready();
} else {
task = SegmentTask::CreateComapctTask(
task = SegmentTask::CreateCompactTask(
CompactTask{path_, schema, current_group,
allocate_segment_id_for_tmp_segment(), nullptr,
!options_.enable_mmap_, concurrency});
Expand Down Expand Up @@ -1001,7 +1001,7 @@ std::vector<SegmentTask::Ptr> CollectionImpl::build_compact_task(
task = SegmentTask::CreateCreateVectorIndexTask(
CreateVectorIndexTask{current_group[0], "", nullptr, concurrency});
} else {
task = SegmentTask::CreateComapctTask(CompactTask{
task = SegmentTask::CreateCompactTask(CompactTask{
path_, schema, current_group, allocate_segment_id_for_tmp_segment(),
rebuild ? filter : nullptr, !options_.enable_mmap_, concurrency});
}
Expand Down
4 changes: 2 additions & 2 deletions src/db/index/common/doc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1154,15 +1154,15 @@ struct Doc::ValueEqual {
const std::vector<float> &b) const {
if (a.size() != b.size()) return false;
for (size_t i = 0; i < a.size(); ++i)
if (std::fabs(a[i] - b[i]) >= 1e-6f) return false;
if (std::fabs(a[i] - b[i]) >= 1e-4f) return false;
return true;
}

bool operator()(const std::vector<double> &a,
const std::vector<double> &b) const {
if (a.size() != b.size()) return false;
for (size_t i = 0; i < a.size(); ++i)
if (std::fabs(a[i] - b[i]) >= 1e-9) return false;
if (std::fabs(a[i] - b[i]) >= 1e-6) return false;
return true;
}
};
Expand Down
18 changes: 16 additions & 2 deletions src/db/index/segment/segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class SegmentImpl : public Segment,
const std::vector<int> &indices) const override;

ExecBatchPtr fetch(const std::vector<std::string> &columns,
int indice) const override;
int index) const override;

RecordBatchReaderPtr scan(
const std::vector<std::string> &columns) const override;
Expand Down Expand Up @@ -1631,6 +1631,13 @@ Status SegmentImpl::create_vector_index(

std::string index_file_path = FileHelper::MakeVectorIndexPath(
path_, column, segment_meta_->id(), block_id);
if (FileHelper::FileExists(index_file_path)) {
LOG_WARN(
"Index file[%s] already exists (possible crash residue); cleaning "
"and overwriting.",
index_file_path.c_str());
FileHelper::RemoveFile(index_file_path);
}
auto vector_indexer = merge_vector_indexer(
index_file_path, column, *field_with_new_index_params, concurrency);
if (!vector_indexer.has_value()) {
Expand Down Expand Up @@ -1668,6 +1675,13 @@ Status SegmentImpl::create_vector_index(

std::string index_file_path = FileHelper::MakeVectorIndexPath(
path_, column, segment_meta_->id(), block_id);
if (FileHelper::FileExists(index_file_path)) {
LOG_WARN(
"Index file[%s] already exists (possible crash residue); cleaning "
"and overwriting.",
index_file_path.c_str());
FileHelper::RemoveFile(index_file_path);
}
auto vector_indexer = merge_vector_indexer(index_file_path, column,
*field_with_flat, concurrency);
if (!vector_indexer.has_value()) {
Expand Down Expand Up @@ -2031,7 +2045,7 @@ Status SegmentImpl::create_scalar_index(const std::vector<std::string> &columns,
s = SegmentHelper::ReduceScalarIndex(new_scalar_indexer, batch_value,
accu_doc_count);
if (!s.ok()) {
LOG_ERROR("Reduce Scalar Index faield, err: %s", s.message().c_str());
LOG_ERROR("Reduce Scalar Index failed, err: %s", s.message().c_str());
}
CHECK_RETURN_STATUS(s);

Expand Down
6 changes: 3 additions & 3 deletions src/db/index/segment/segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class Segment {
virtual Status drop_column(const std::string &column_name) = 0;

virtual Status create_all_vector_index(
int concurrency, SegmentMeta::Ptr *new_segmnet_meta,
int concurrency, SegmentMeta::Ptr *new_segment_meta,
std::unordered_map<std::string, VectorColumnIndexer::Ptr>
*vector_indexers,
std::unordered_map<std::string, VectorColumnIndexer::Ptr>
Expand All @@ -86,14 +86,14 @@ class Segment {
// defined in segment.h cause it needs to access block_id generator
virtual Status create_vector_index(
const std::string &column, const IndexParams::Ptr &index_params,
int concurrency, SegmentMeta::Ptr *new_segmnet_meta,
int concurrency, SegmentMeta::Ptr *new_segment_meta,
std::unordered_map<std::string, VectorColumnIndexer::Ptr>
*vector_indexers,
std::unordered_map<std::string, VectorColumnIndexer::Ptr>
*quant_vector_indexers) = 0;

virtual Status drop_vector_index(
const std::string &column, SegmentMeta::Ptr *new_segmnet_meta,
const std::string &column, SegmentMeta::Ptr *new_segment_meta,
std::unordered_map<std::string, VectorColumnIndexer::Ptr>
*vector_indexers) = 0;

Expand Down
2 changes: 1 addition & 1 deletion src/db/index/segment/segment_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class SegmentTask {
std::variant<CompactTask, CreateVectorIndexTask, DropVectorIndexTask,
CreateScalarIndexTask, DropScalarIndexTask>;

static Ptr CreateComapctTask(const CompactTask &task) {
static Ptr CreateCompactTask(const CompactTask &task) {
return std::make_shared<SegmentTask>(task);
}

Expand Down
2 changes: 1 addition & 1 deletion src/include/zvec/core/framework/index_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ class IndexStorage : public IndexModule {
virtual int cleanup(void) = 0;

//! Open storage
virtual int open(const std::string &path, bool create) = 0;
virtual int open(const std::string &path, bool create_if_missing) = 0;

//! Flush storage
virtual int flush(void) = 0;
Expand Down
40 changes: 22 additions & 18 deletions tests/db/crash_recovery/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ if(APPLE)
endif()


# Build data_generator executable
cc_binary(
NAME data_generator
LIBS zvec_db
# Common libraries
set(CRASH_RECOVERY_COMMON_LIBS
zvec_db
zvec_proto
core_knn_flat
core_knn_flat_sparse
Expand All @@ -34,36 +33,41 @@ cc_binary(
core_quantizer
${CMAKE_THREAD_LIBS_INIT}
${CMAKE_DL_LIBS}
)


# Build data_generator executable
cc_binary(
NAME data_generator
LIBS ${CRASH_RECOVERY_COMMON_LIBS}
SRCS data_generator.cc
INCS .. ../../src
LDFLAGS ${APPLE_FRAMEWORK_LIBS}
)


# Build collection_optimizer executable
cc_binary(
NAME collection_optimizer
LIBS ${CRASH_RECOVERY_COMMON_LIBS}
SRCS collection_optimizer.cc
INCS .. ../../src
LDFLAGS ${APPLE_FRAMEWORK_LIBS}
)


# Build test executables
file(GLOB ALL_TEST_SRCS *_test.cc)
foreach(CC_SRCS ${ALL_TEST_SRCS})
get_filename_component(CC_TARGET ${CC_SRCS} NAME_WE)
cc_gmock(
NAME ${CC_TARGET} STRICT
LIBS zvec_db
zvec_proto
core_knn_flat
core_knn_flat_sparse
core_knn_hnsw
core_knn_hnsw_sparse
core_knn_ivf
core_knn_hnsw_rabitq
core_mix_reducer
core_metric
core_utility
core_quantizer
${CMAKE_THREAD_LIBS_INIT}
${CMAKE_DL_LIBS}
LIBS ${CRASH_RECOVERY_COMMON_LIBS}
SRCS ${CC_SRCS}
INCS .. ../../src
LDFLAGS ${APPLE_FRAMEWORK_LIBS}
)
add_dependencies(${CC_TARGET} data_generator)
add_dependencies(${CC_TARGET} collection_optimizer)
cc_test_suite(zvec_crash_recovery ${CC_TARGET})
endforeach()
106 changes: 106 additions & 0 deletions tests/db/crash_recovery/collection_optimizer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2025-present the zvec project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


#include <unistd.h>
#include <filesystem>
#include <zvec/db/collection.h>
#include <zvec/db/options.h>
#include "zvec/ailego/logger/logger.h"


// Command-line settings for the collection_optimizer executable.
struct Config {
  // Location of the collection to open; stays empty until "--path" is parsed.
  std::string path{};
};


// Reads the command line into `config`.
//
// Recognised arguments:
//   --path <value>   stored in config.path (only when a value follows it)
//   --help / -h      stops parsing immediately and reports failure
// Any other argument is skipped without complaint.
//
// Returns true only when parsing finished without a help request and
// config.path ended up non-empty.
bool ParseArgs(int argc, char **argv, Config &config) {
  int pos = 1;  // argv[0] is the program name
  while (pos < argc) {
    const std::string token(argv[pos]);
    const bool has_value = (pos + 1 < argc);

    if (token == "--path" && has_value) {
      // Consume both the flag and its value.
      config.path = argv[pos + 1];
      pos += 2;
      continue;
    }

    if (token == "-h" || token == "--help") {
      return false;
    }

    ++pos;
  }

  // The path is the one required argument.
  return !config.path.empty();
}


// Writes the command-line help text to standard output.
//
// `program` is the executable name shown in the "Usage:" line.
void PrintUsage(const char *program) {
  // Each std::endl both terminates the line and flushes the stream.
  std::cout << "Usage: " << program << " --path <collection_path>" << std::endl
            << std::endl
            << "Arguments:" << std::endl
            << " --path Path to the collection (required)" << std::endl;
}


int main(int argc, char **argv) {
Config config;

// Parse arguments
if (!ParseArgs(argc, argv, config)) {
PrintUsage(argv[0]);
return 1;
}

try {
std::filesystem::path cwd = std::filesystem::current_path();
std::cout << "[collection_optimizer] Current Working Directory: "
<< cwd.string() << std::endl;
} catch (const std::filesystem::filesystem_error &e) {
std::cout << "[collection_optimizer] Failed to get the current working "
"directory: "
<< e.what() << std::endl;
}

std::cout << "Configuration:" << std::endl;
std::cout << " Path: " << config.path << std::endl;
std::cout << std::endl;

auto result =
zvec::Collection::Open(config.path, zvec::CollectionOptions{false, true});
Comment on lines +80 to +81
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 CollectionOptions mismatch with test configuration

The test file optimize_recovery_test.cc creates and reopens the collection with zvec::CollectionOptions{false, true, 256 * 1024} (a max-doc-per-segment limit of 256 * 1024), but the collection_optimizer binary opens the same on-disk collection with only zvec::CollectionOptions{false, true} (no segment size limit). This mismatch means the optimizer may behave differently than expected during the crash recovery test — for example, it might not trigger segment compaction at the same thresholds, leading to a test that doesn't actually exercise the intended code paths.

Suggested change
auto result =
zvec::Collection::Open(config.path, zvec::CollectionOptions{false, true});
zvec::Collection::Open(config.path, zvec::CollectionOptions{false, true, 256 * 1024});

if (!result) {
LOG_ERROR("Failed to open collection[%s]: %s", config.path.c_str(),
result.error().c_str());
return -1;
}

auto collection = result.value();
std::cout << "Collection[" << config.path.c_str() << "] opened successfully"
<< std::endl;

// Print initial stats
std::cout << "Initial stats: " << collection->Stats()->to_string_formatted()
<< std::endl;

auto s = collection->Optimize();
if (s.ok()) {
std::cout << "Optimize completed successfully" << std::endl;
// Print final stats
std::cout << "Final stats: " << collection->Stats()->to_string_formatted();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Missing newline after final stats output

The final stats line has no std::endl or "\n", so the output may be buffered and mixed with subsequent output (e.g., the process's exit trace) without a line break. All other output statements in this file include std::endl.

Suggested change
std::cout << "Final stats: " << collection->Stats()->to_string_formatted();
std::cout << "Final stats: " << collection->Stats()->to_string_formatted() << std::endl;

return 0;
} else {
std::cout << "Optimize failed: " << s.message() << std::endl;
return 1;
}
}
Loading
Loading