diff --git a/src/core/utility/buffer_storage.cc b/src/core/utility/buffer_storage.cc index 12a84bc7..a20a0316 100644 --- a/src/core/utility/buffer_storage.cc +++ b/src/core/utility/buffer_storage.cc @@ -187,7 +187,7 @@ class BufferStorage : public IndexStorage { } //! Open storage - int open(const std::string &path, bool /*create*/) override { + int open(const std::string &path, bool /*create_if_missing*/) override { file_name_ = path; buffer_pool_ = std::make_shared(path); buffer_pool_handle_ = std::make_shared( diff --git a/src/core/utility/mmap_file_storage.cc b/src/core/utility/mmap_file_storage.cc index e4a8e238..9a1261f4 100644 --- a/src/core/utility/mmap_file_storage.cc +++ b/src/core/utility/mmap_file_storage.cc @@ -171,8 +171,8 @@ class MMapFileStorage : public IndexStorage { } //! Open storage - int open(const std::string &path, bool create) override { - if (!ailego::File::IsExist(path) && create) { + int open(const std::string &path, bool create_if_missing) override { + if (!ailego::File::IsExist(path) && create_if_missing) { size_t last_slash = path.rfind('/'); if (last_slash != std::string::npos) { ailego::File::MakePath(path.substr(0, last_slash)); diff --git a/src/db/collection.cc b/src/db/collection.cc index 1ff0eecb..e510a228 100644 --- a/src/db/collection.cc +++ b/src/db/collection.cc @@ -958,7 +958,7 @@ std::vector CollectionImpl::build_compact_task( if (current_actual_doc_count + actual_doc_count > max_doc_count_per_segment) { // only create SegmentCompactTask when rebuild=true - task = SegmentTask::CreateComapctTask( + task = SegmentTask::CreateCompactTask( CompactTask{path_, schema, current_group, allocate_segment_id_for_tmp_segment(), filter, !options_.enable_mmap_, concurrency}); @@ -972,7 +972,7 @@ std::vector CollectionImpl::build_compact_task( current_group[0], "", nullptr, concurrency}); skip_task = current_group[0]->all_vector_index_ready(); } else { - task = SegmentTask::CreateComapctTask( + task = SegmentTask::CreateCompactTask( CompactTask{path_, schema, current_group, allocate_segment_id_for_tmp_segment(), nullptr, !options_.enable_mmap_, concurrency}); @@ -1001,7 +1001,7 @@ std::vector CollectionImpl::build_compact_task( task = SegmentTask::CreateCreateVectorIndexTask( CreateVectorIndexTask{current_group[0], "", nullptr, concurrency}); } else { - task = SegmentTask::CreateComapctTask(CompactTask{ + task = SegmentTask::CreateCompactTask(CompactTask{ path_, schema, current_group, allocate_segment_id_for_tmp_segment(), rebuild ? filter : nullptr, !options_.enable_mmap_, concurrency}); } diff --git a/src/db/index/common/doc.cc b/src/db/index/common/doc.cc index 2f9f12b0..39e7097f 100644 --- a/src/db/index/common/doc.cc +++ b/src/db/index/common/doc.cc @@ -1154,7 +1154,7 @@ struct Doc::ValueEqual { const std::vector &b) const { if (a.size() != b.size()) return false; for (size_t i = 0; i < a.size(); ++i) - if (std::fabs(a[i] - b[i]) >= 1e-6f) return false; + if (std::fabs(a[i] - b[i]) >= 1e-4f) return false; return true; } @@ -1162,7 +1162,7 @@ struct Doc::ValueEqual { const std::vector &b) const { if (a.size() != b.size()) return false; for (size_t i = 0; i < a.size(); ++i) - if (std::fabs(a[i] - b[i]) >= 1e-9) return false; + if (std::fabs(a[i] - b[i]) >= 1e-6) return false; return true; } }; diff --git a/src/db/index/segment/segment.cc b/src/db/index/segment/segment.cc index b3919d1e..748c9d88 100644 --- a/src/db/index/segment/segment.cc +++ b/src/db/index/segment/segment.cc @@ -213,7 +213,7 @@ class SegmentImpl : public Segment, const std::vector &indices) const override; ExecBatchPtr fetch(const std::vector &columns, - int indice) const override; + int index) const override; RecordBatchReaderPtr scan( const std::vector &columns) const override; @@ -1631,6 +1631,13 @@ Status SegmentImpl::create_vector_index( std::string index_file_path = FileHelper::MakeVectorIndexPath( path_, column, segment_meta_->id(), block_id); + if (FileHelper::FileExists(index_file_path)) { + LOG_WARN( + "Index file[%s] already exists (possible crash residue); cleaning " + "and overwriting.", + index_file_path.c_str()); + FileHelper::RemoveFile(index_file_path); + } auto vector_indexer = merge_vector_indexer( index_file_path, column, *field_with_new_index_params, concurrency); if (!vector_indexer.has_value()) { @@ -1668,6 +1675,13 @@ Status SegmentImpl::create_vector_index( std::string index_file_path = FileHelper::MakeVectorIndexPath( path_, column, segment_meta_->id(), block_id); + if (FileHelper::FileExists(index_file_path)) { + LOG_WARN( + "Index file[%s] already exists (possible crash residue); cleaning " + "and overwriting.", + index_file_path.c_str()); + FileHelper::RemoveFile(index_file_path); + } auto vector_indexer = merge_vector_indexer(index_file_path, column, *field_with_flat, concurrency); if (!vector_indexer.has_value()) { @@ -2031,7 +2045,7 @@ Status SegmentImpl::create_scalar_index(const std::vector &columns, s = SegmentHelper::ReduceScalarIndex(new_scalar_indexer, batch_value, accu_doc_count); if (!s.ok()) { - LOG_ERROR("Reduce Scalar Index faield, err: %s", s.message().c_str()); + LOG_ERROR("Reduce Scalar Index failed, err: %s", s.message().c_str()); } CHECK_RETURN_STATUS(s); diff --git a/src/db/index/segment/segment.h b/src/db/index/segment/segment.h index b59058db..263463ea 100644 --- a/src/db/index/segment/segment.h +++ b/src/db/index/segment/segment.h @@ -77,7 +77,7 @@ class Segment { virtual Status drop_column(const std::string &column_name) = 0; virtual Status create_all_vector_index( - int concurrency, SegmentMeta::Ptr *new_segmnet_meta, + int concurrency, SegmentMeta::Ptr *new_segment_meta, std::unordered_map *vector_indexers, std::unordered_map @@ -86,14 +86,14 @@ class Segment { // defined in segment.h cause it needs to access block_id generator virtual Status create_vector_index( const std::string &column, const IndexParams::Ptr &index_params, - int concurrency, SegmentMeta::Ptr *new_segmnet_meta, + int concurrency, SegmentMeta::Ptr *new_segment_meta, std::unordered_map *vector_indexers, std::unordered_map *quant_vector_indexers) = 0; virtual Status drop_vector_index( - const std::string &column, SegmentMeta::Ptr *new_segmnet_meta, + const std::string &column, SegmentMeta::Ptr *new_segment_meta, std::unordered_map *vector_indexers) = 0; diff --git a/src/db/index/segment/segment_helper.h b/src/db/index/segment/segment_helper.h index 31b09631..f30d9a80 100644 --- a/src/db/index/segment/segment_helper.h +++ b/src/db/index/segment/segment_helper.h @@ -136,7 +136,7 @@ class SegmentTask { std::variant; - static Ptr CreateComapctTask(const CompactTask &task) { + static Ptr CreateCompactTask(const CompactTask &task) { return std::make_shared(task); } diff --git a/src/include/zvec/core/framework/index_storage.h b/src/include/zvec/core/framework/index_storage.h index 5e8b7728..8273004a 100644 --- a/src/include/zvec/core/framework/index_storage.h +++ b/src/include/zvec/core/framework/index_storage.h @@ -228,7 +228,7 @@ class IndexStorage : public IndexModule { virtual int cleanup(void) = 0; //! Open storage - virtual int open(const std::string &path, bool create) = 0; + virtual int open(const std::string &path, bool create_if_missing) = 0; //! Flush storage virtual int flush(void) = 0; diff --git a/tests/db/crash_recovery/CMakeLists.txt b/tests/db/crash_recovery/CMakeLists.txt index aebf0bc6..12c2423a 100644 --- a/tests/db/crash_recovery/CMakeLists.txt +++ b/tests/db/crash_recovery/CMakeLists.txt @@ -17,10 +17,9 @@ if(APPLE) endif() -# Build data_generator executable -cc_binary( - NAME data_generator - LIBS zvec_db +# Common libraries +set(CRASH_RECOVERY_COMMON_LIBS + zvec_db zvec_proto core_knn_flat core_knn_flat_sparse @@ -34,36 +33,41 @@ cc_binary( core_quantizer ${CMAKE_THREAD_LIBS_INIT} ${CMAKE_DL_LIBS} +) + + +# Build data_generator executable +cc_binary( + NAME data_generator + LIBS ${CRASH_RECOVERY_COMMON_LIBS} SRCS data_generator.cc INCS .. ../../src LDFLAGS ${APPLE_FRAMEWORK_LIBS} ) +# Build collection_optimizer executable +cc_binary( + NAME collection_optimizer + LIBS ${CRASH_RECOVERY_COMMON_LIBS} + SRCS collection_optimizer.cc + INCS .. ../../src + LDFLAGS ${APPLE_FRAMEWORK_LIBS} +) + + # Build test executables file(GLOB ALL_TEST_SRCS *_test.cc) foreach(CC_SRCS ${ALL_TEST_SRCS}) get_filename_component(CC_TARGET ${CC_SRCS} NAME_WE) cc_gmock( NAME ${CC_TARGET} STRICT - LIBS zvec_db - zvec_proto - core_knn_flat - core_knn_flat_sparse - core_knn_hnsw - core_knn_hnsw_sparse - core_knn_ivf - core_knn_hnsw_rabitq - core_mix_reducer - core_metric - core_utility - core_quantizer - ${CMAKE_THREAD_LIBS_INIT} - ${CMAKE_DL_LIBS} + LIBS ${CRASH_RECOVERY_COMMON_LIBS} SRCS ${CC_SRCS} INCS .. ../../src LDFLAGS ${APPLE_FRAMEWORK_LIBS} ) add_dependencies(${CC_TARGET} data_generator) + add_dependencies(${CC_TARGET} collection_optimizer) cc_test_suite(zvec_crash_recovery ${CC_TARGET}) endforeach() diff --git a/tests/db/crash_recovery/collection_optimizer.cc b/tests/db/crash_recovery/collection_optimizer.cc new file mode 100644 index 00000000..726d8b1b --- /dev/null +++ b/tests/db/crash_recovery/collection_optimizer.cc @@ -0,0 +1,106 @@ +// Copyright 2025-present the zvec project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#include +#include +#include +#include +#include "zvec/ailego/logger/logger.h" + + +struct Config { + std::string path; +}; + + +bool ParseArgs(int argc, char **argv, Config &config) { + for (int i = 1; i < argc; i++) { + std::string arg = argv[i]; + + if (arg == "--path" && i + 1 < argc) { + config.path = argv[++i]; + } else if (arg == "--help" || arg == "-h") { + return false; + } + } + + // Validate required arguments + if (config.path.empty()) { + return false; + } + + return true; +} + + +void PrintUsage(const char *program) { + std::cout << "Usage: " << program << " --path " << std::endl; + std::cout << std::endl; + std::cout << "Arguments:" << std::endl; + std::cout << " --path Path to the collection (required)" + << std::endl; +} + + +int main(int argc, char **argv) { + Config config; + + // Parse arguments + if (!ParseArgs(argc, argv, config)) { + PrintUsage(argv[0]); + return 1; + } + + try { + std::filesystem::path cwd = std::filesystem::current_path(); + std::cout << "[collection_optimizer] Current Working Directory: " + << cwd.string() << std::endl; + } catch (const std::filesystem::filesystem_error &e) { + std::cout << "[collection_optimizer] Failed to get the current working " + "directory: " + << e.what() << std::endl; + } + + std::cout << "Configuration:" << std::endl; + std::cout << " Path: " << config.path << std::endl; + std::cout << std::endl; + + auto result = + zvec::Collection::Open(config.path, zvec::CollectionOptions{false, true}); + if (!result) { + LOG_ERROR("Failed to open collection[%s]: %s", config.path.c_str(), + result.error().c_str()); + return -1; + } + + auto collection = result.value(); + std::cout << "Collection[" << config.path.c_str() << "] opened successfully" + << std::endl; + + // Print initial stats + std::cout << "Initial stats: " << collection->Stats()->to_string_formatted() + << std::endl; + + auto s = collection->Optimize(); + if (s.ok()) { + std::cout << "Optimize completed successfully" << std::endl; + // Print final stats + std::cout << "Final stats: " << collection->Stats()->to_string_formatted(); + return 0; + } else { + std::cout << "Optimize failed: " << s.message() << std::endl; + return 1; + } +} diff --git a/tests/db/crash_recovery/optimize_recovery_test.cc b/tests/db/crash_recovery/optimize_recovery_test.cc new file mode 100644 index 00000000..fdc078dd --- /dev/null +++ b/tests/db/crash_recovery/optimize_recovery_test.cc @@ -0,0 +1,210 @@ +// Copyright 2025-present the zvec project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + + +#include +#include +#include +#include +#include +#include +#include +#include "utility.h" + + +namespace zvec { + + +static std::string optimizer_bin_; +const std::string collection_name_{"optimize_recovery_test"}; +const std::string dir_path_{"optimize_recovery_test_db"}; +const zvec::CollectionOptions options_{false, true, 256 * 1024}; +const int batch_size{50}; +const int num_batches{2000}; + + +static std::string LocateOptimizeGenerator() { + namespace fs = std::filesystem; + const std::vector candidates{"./collection_optimizer", + "./bin/collection_optimizer"}; + for (const auto &p : candidates) { + if (fs::exists(p)) { + return fs::canonical(p).string(); + } + } + throw std::runtime_error("collection_optimizer binary not found"); +} + + +void RunOptimizer(const std::string &path) { + pid_t pid = fork(); + ASSERT_GE(pid, 0); + + if (pid == 0) { // Child process + char arg_path[] = "--path"; + char *args[] = {const_cast(optimizer_bin_.c_str()), arg_path, + const_cast(path.c_str()), nullptr}; + execvp(args[0], args); + perror("execvp failed"); + _exit(1); + } + + int status; + waitpid(pid, &status, 0); + ASSERT_TRUE(WIFEXITED(status)) + << "Child process did not exit normally. Terminated by signal?"; + int exit_code = WEXITSTATUS(status); + ASSERT_EQ(exit_code, 0) << "optimizer failed with exit code: " << exit_code; +} + + +void RunOptimizerAndCrash(const std::string &path, int seconds) { + pid_t pid = fork(); + ASSERT_GE(pid, 0); + + if (pid == 0) { // Child process + char arg_path[] = "--path"; + char *args[] = {const_cast(optimizer_bin_.c_str()), arg_path, + const_cast(path.c_str()), nullptr}; + execvp(args[0], args); + perror("execvp failed"); + _exit(1); + } + + std::this_thread::sleep_for(std::chrono::seconds(seconds)); + if (kill(pid, 0) == 0) { + kill(pid, SIGKILL); + } + int status; + waitpid(pid, &status, 0); + ASSERT_TRUE(WIFSIGNALED(status)) + << "Child process was not killed by a signal. It exited normally?"; +} + + +class OptimizeRecoveryTest : public ::testing::Test { + protected: + void SetUp() override { + system("rm -rf ./optimize_recovery_test_db"); + ASSERT_NO_THROW(optimizer_bin_ = LocateOptimizeGenerator()); + } + + void TearDown() override { + system("rm -rf ./optimize_recovery_test_db"); + } +}; + + +TEST_F(OptimizeRecoveryTest, CrashDuringOptimize) { + { // Create a collection and insert some documents + auto schema = CreateTestSchema(collection_name_); + auto result = Collection::CreateAndOpen(dir_path_, *schema, options_); + ASSERT_TRUE(result.has_value()); + auto collection = result.value(); + + for (int batch = 0; batch < num_batches; batch++) { + std::vector docs; + for (int i = 0; i < batch_size; i++) { + docs.push_back(CreateTestDoc(batch * batch_size + i, 0)); + } + auto write_result = collection->Insert(docs); + ASSERT_TRUE(write_result); + for (auto &s : write_result.value()) { + ASSERT_TRUE(s.ok()); + } + } + + ASSERT_EQ(collection->Stats()->doc_count, num_batches * batch_size); + collection.reset(); + } + + RunOptimizerAndCrash(dir_path_, 4); + + { // Open the collection and verify data integrity + auto result = Collection::Open(dir_path_, options_); + ASSERT_TRUE(result.has_value()) + << "Failed to reopen collection after crash. " + "Recovery mechanism may be broken."; + auto collection = result.value(); + uint64_t doc_count{collection->Stats().value().doc_count}; + ASSERT_EQ(doc_count, num_batches * batch_size); + for (uint64_t doc_id = 0; doc_id < doc_count; doc_id++) { + Doc expected_doc = CreateTestDoc(doc_id, 0); + std::vector pks{}; + pks.emplace_back(expected_doc.pk()); + if (auto res = collection->Fetch(pks); res) { + auto map = res.value(); + if (map.find(expected_doc.pk()) == map.end()) { + FAIL() << "Returned map does not contain doc[" << expected_doc.pk() + << "]"; + } + const auto actual_doc = map.at(expected_doc.pk()); + ASSERT_EQ(*actual_doc, expected_doc) + << "Data mismatch for doc[" << expected_doc.pk() << "]"; + } else { + FAIL() << "Failed to fetch doc[" << expected_doc.pk() << "]"; + } + } + + // Insert some more documents + for (int batch = num_batches; batch < num_batches + 1000; batch++) { + std::vector docs; + for (int i = 0; i < batch_size; i++) { + docs.push_back(CreateTestDoc(batch * batch_size + i, 0)); + } + auto write_result = collection->Insert(docs); + ASSERT_TRUE(write_result); + for (auto &s : write_result.value()) { + ASSERT_TRUE(s.ok()); + } + } + + collection.reset(); + } + + RunOptimizer(dir_path_); + + // Open the collection and verify data integrity + auto result = Collection::Open(dir_path_, options_); + ASSERT_TRUE(result.has_value()) << "Failed to reopen collection after crash. " + "Recovery mechanism may be broken."; + auto collection = result.value(); + uint64_t doc_count{collection->Stats().value().doc_count}; + ASSERT_EQ(doc_count, (num_batches + 1000) * batch_size); + + // for (int batch = num_batches; batch < (num_batches + 1000); batch++) { + // std::vector docs; + // for (int i = 0; i < batch_size; i++) { + // docs.push_back(CreateTestDoc(batch * batch_size + i, 0)); + // } + // auto write_result = collection->Insert(docs); + // ASSERT_TRUE(write_result); + // for (auto &s : write_result.value()) { + // ASSERT_TRUE(s.ok()) << s.message(); + // } + // } + + // VectorQuery query; + // query.output_fields_ = {"name", "age"}; + // query.filter_ = "invert_id >= 6000 and id < 6080"; + // query.topk_ = 100; + // std::vector feature(4, 0.0); + // query.query_vector_.assign((const char *)feature.data(), + // feature.size() * sizeof(float)); + // query.field_name_ = "dense"; + // collection->Query(); +} + + +} // namespace zvec diff --git a/tests/db/crash_recovery/write_recovery_test.cc b/tests/db/crash_recovery/write_recovery_test.cc index 1f53a5f4..6c11e0c4 100644 --- a/tests/db/crash_recovery/write_recovery_test.cc +++ b/tests/db/crash_recovery/write_recovery_test.cc @@ -27,9 +27,9 @@ namespace zvec { static std::string data_generator_bin_; -const std::string collection_name_{"crash_test"}; -const std::string dir_path_{"crash_test_db"}; -const zvec::CollectionOptions options_{false, true}; +const std::string collection_name_{"write_recovery_test"}; +const std::string dir_path_{"write_recovery_test_db"}; +const zvec::CollectionOptions options_{false, true, 256 * 1024}; static std::string LocateDataGenerator() { diff --git a/tests/db/index/segment/segment_helper_test.cc b/tests/db/index/segment/segment_helper_test.cc index 03f1c377..ca9902f2 100644 --- a/tests/db/index/segment/segment_helper_test.cc +++ b/tests/db/index/segment/segment_helper_test.cc @@ -121,7 +121,7 @@ TEST_F(SegmentHelperTest, CompactTask_General) { ); // Create segment task - auto segment_task = SegmentTask::CreateComapctTask(task); + auto segment_task = SegmentTask::CreateCompactTask(task); // Verify task creation ASSERT_TRUE(segment_task != nullptr); @@ -214,7 +214,7 @@ TEST_F(SegmentHelperTest, CompactTask_ScalarIndex) { ); // Create segment task - auto segment_task = SegmentTask::CreateComapctTask(task); + auto segment_task = SegmentTask::CreateCompactTask(task); // Verify task creation ASSERT_TRUE(segment_task != nullptr); @@ -307,7 +307,7 @@ TEST_F(SegmentHelperTest, CompactTask_VectorIndex) { ); // Create segment task - auto segment_task = SegmentTask::CreateComapctTask(task); + auto segment_task = SegmentTask::CreateCompactTask(task); // Verify task creation ASSERT_TRUE(segment_task != nullptr); @@ -395,7 +395,7 @@ TEST_F(SegmentHelperTest, CompactTask_MultipleSegments) { ); // Create segment task - auto segment_task = SegmentTask::CreateComapctTask(task); + auto segment_task = SegmentTask::CreateCompactTask(task); // Verify task creation ASSERT_TRUE(segment_task != nullptr); @@ -484,7 +484,7 @@ TEST_F(SegmentHelperTest, CompactTask_Filter) { ); // Create and execute task - auto segment_task = SegmentTask::CreateComapctTask(task); + auto segment_task = SegmentTask::CreateCompactTask(task); ASSERT_TRUE(segment_task != nullptr); Status status = SegmentHelper::Execute(segment_task); @@ -563,7 +563,7 @@ TEST_F(SegmentHelperTest, CompactTask_FilterAll) { ); // Create and execute task - auto segment_task = SegmentTask::CreateComapctTask(task); + auto segment_task = SegmentTask::CreateCompactTask(task); ASSERT_TRUE(segment_task != nullptr); Status status = SegmentHelper::Execute(segment_task);