diff --git a/storage/eloq/CMakeLists.txt b/storage/eloq/CMakeLists.txt index 8c0b966474a..7830ab1a697 100644 --- a/storage/eloq/CMakeLists.txt +++ b/storage/eloq/CMakeLists.txt @@ -329,6 +329,7 @@ elseif(WITH_DATA_STORE STREQUAL "ELOQDSS_ELOQSTORE") store_handler/eloq_data_store_service/data_store_service_config.cpp store_handler/eloq_data_store_service/eloq_store_data_store.cpp store_handler/eloq_data_store_service/ds_request.pb.cc + store_handler/eloq_data_store_service/eloq_store_config.cpp ) SET(MYELOQ_LIBRARY ${MYELOQ_LIBRARY} txservice eloqstore) diff --git a/storage/eloq/ha_eloq.cc b/storage/eloq/ha_eloq.cc index 47d5ee248b4..611c5d3ccd4 100644 --- a/storage/eloq/ha_eloq.cc +++ b/storage/eloq/ha_eloq.cc @@ -365,7 +365,7 @@ static unsigned int eloq_dss_rocksdb_max_write_buffer_number= 8; static unsigned int eloq_dss_rocksdb_max_background_jobs= 8; #if defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE) static unsigned int eloq_eloqstore_worker_num= 1; -static char *eloq_eloqstore_data_path= nullptr; +static char *eloq_eloqstore_data_path_list= nullptr; static unsigned int eloq_eloqstore_open_files_limit= 1024; static char *eloq_eloqstore_cloud_store_path= nullptr; static unsigned int eloq_eloqstore_gc_threads= 1; @@ -1026,9 +1026,9 @@ static MYSQL_SYSVAR_UINT(eloqstore_worker_num, eloq_eloqstore_worker_num, "EloqStore server worker num.", NULL, NULL, 1, 1, UINT_MAX, 1); static MYSQL_SYSVAR_STR( - eloqstore_data_path, eloq_eloqstore_data_path, + eloqstore_data_path_list, eloq_eloqstore_data_path_list, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC, - "The data path of the EloqStore (use memory store if empty).", NULL, NULL, + "The data paths of the EloqStore (use memory store if empty).", NULL, NULL, ""); static MYSQL_SYSVAR_UINT(eloqstore_open_files_limit, eloq_eloqstore_open_files_limit, @@ -1253,7 +1253,7 @@ static struct st_mysql_sys_var *eloq_system_variables[]= { MYSQL_SYSVAR(dss_rocksdb_max_background_jobs), #if defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE) MYSQL_SYSVAR(eloqstore_worker_num), - MYSQL_SYSVAR(eloqstore_data_path), + MYSQL_SYSVAR(eloqstore_data_path_list), MYSQL_SYSVAR(eloqstore_open_files_limit), MYSQL_SYSVAR(eloqstore_cloud_store_path), MYSQL_SYSVAR(eloqstore_gc_threads), @@ -2310,6 +2310,94 @@ static void RegisterFactory() txservice::TxRecordFactory::RegisterCreateTxRecordFunc(EloqRecord::Create); } +#if defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE) +/** + * Configure EloqStore settings for the data store service + * @param eloq_store_config Reference to EloqStoreConfig to be configured + * @param dss_data_path Base data path for the data store service + */ +static void configure_eloq_store(EloqDS::EloqStoreConfig &eloq_store_config, + const std::string &dss_data_path) +{ + eloq_store_config.eloqstore_configs_.num_threads= + std::max(eloq_eloqstore_worker_num, 1U); + + std::string storage_path_list(eloq_eloqstore_data_path_list); + if (!storage_path_list.empty()) + { + EloqDS::EloqStoreConfig::ParseStoragePath( + storage_path_list, eloq_store_config.eloqstore_configs_.store_path); + } + else + { + eloq_store_config.eloqstore_configs_.store_path.emplace_back() + .append(dss_data_path) + .append("/eloqstore"); + } + + eloq_store_config.eloqstore_configs_.fd_limit= + eloq_eloqstore_open_files_limit; + eloq_store_config.eloqstore_configs_.cloud_store_path= + eloq_eloqstore_cloud_store_path; + eloq_store_config.eloqstore_configs_.num_gc_threads= + !eloq_store_config.eloqstore_configs_.cloud_store_path.empty() + ? 0 + : eloq_eloqstore_gc_threads; + eloq_store_config.eloqstore_configs_.rclone_threads= + eloq_eloqstore_cloud_worker_count; + + LOG_IF(INFO, !eloq_store_config.eloqstore_configs_.cloud_store_path.empty()) + << "EloqStore cloud store enabled."; + + eloq_store_config.eloqstore_configs_.data_page_restart_interval= + eloq_eloqstore_data_page_restart_interval; + eloq_store_config.eloqstore_configs_.index_page_restart_interval= + eloq_eloqstore_index_page_restart_interval; + eloq_store_config.eloqstore_configs_.init_page_count= + eloq_eloqstore_init_page_count; + eloq_store_config.eloqstore_configs_.skip_verify_checksum= + eloq_eloqstore_skip_verify_checksum; + eloq_store_config.eloqstore_configs_.index_buffer_pool_size= + eloq_eloqstore_index_buffer_pool_size / + eloq_store_config.eloqstore_configs_.num_threads; + eloq_store_config.eloqstore_configs_.manifest_limit= + eloq_eloqstore_manifest_limit; + eloq_store_config.eloqstore_configs_.io_queue_size= + eloq_eloqstore_io_queue_size / + eloq_store_config.eloqstore_configs_.num_threads; + eloq_store_config.eloqstore_configs_.max_inflight_write= + eloq_eloqstore_max_inflight_write / + eloq_store_config.eloqstore_configs_.num_threads; + eloq_store_config.eloqstore_configs_.max_write_batch_pages= + eloq_eloqstore_max_write_batch_pages; + eloq_store_config.eloqstore_configs_.buf_ring_size= + eloq_eloqstore_buf_ring_size; + eloq_store_config.eloqstore_configs_.coroutine_stack_size= + eloq_eloqstore_coroutine_stack_size; + eloq_store_config.eloqstore_configs_.num_retained_archives= + eloq_eloqstore_num_retained_archives; + eloq_store_config.eloqstore_configs_.archive_interval_secs= + eloq_eloqstore_archive_interval_secs; + eloq_store_config.eloqstore_configs_.max_archive_tasks= + eloq_eloqstore_max_archive_tasks; + eloq_store_config.eloqstore_configs_.file_amplify_factor= + eloq_eloqstore_file_amplify_factor; + eloq_store_config.eloqstore_configs_.local_space_limit= + eloq_eloqstore_local_space_limit / + eloq_store_config.eloqstore_configs_.num_threads; + eloq_store_config.eloqstore_configs_.reserve_space_ratio= + eloq_eloqstore_reserve_space_ratio; + eloq_store_config.eloqstore_configs_.data_page_size= + eloq_eloqstore_data_page_size; + eloq_store_config.eloqstore_configs_.pages_per_file_shift= + eloq_eloqstore_pages_per_file_shift; + eloq_store_config.eloqstore_configs_.overflow_pointers= + eloq_eloqstore_overflow_pointers; + eloq_store_config.eloqstore_configs_.data_append_mode= + eloq_eloqstore_data_append_mode; +} +#endif + static int eloq_init_func(void *p) { DBUG_ENTER_FUNC(); @@ -2597,58 +2685,9 @@ static int eloq_init_func(void *p) rocksdb_config, enable_cache_replacement_); #elif defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE) EloqDS::EloqStoreConfig eloq_store_config; - eloq_store_config.worker_count_= std::max(eloq_eloqstore_worker_num, 1U); - eloq_store_config.storage_path_= eloq_eloqstore_data_path; - if (eloq_store_config.storage_path_.empty()) - { - eloq_store_config.storage_path_.append(dss_data_path) - .append("/eloqstore"); - } - - eloq_store_config.open_files_limit_= eloq_eloqstore_open_files_limit; - eloq_store_config.cloud_store_path_= eloq_eloqstore_cloud_store_path; - eloq_store_config.gc_threads_= !eloq_store_config.cloud_store_path_.empty() - ? 0 - : eloq_eloqstore_gc_threads; - eloq_store_config.cloud_worker_count_= eloq_eloqstore_cloud_worker_count; - LOG_IF(INFO, !eloq_store_config.cloud_store_path_.empty()) - << "EloqStore cloud store enabled."; - eloq_store_config.data_page_restart_interval_= - eloq_eloqstore_data_page_restart_interval; - eloq_store_config.index_page_restart_interval_= - eloq_eloqstore_index_page_restart_interval; - eloq_store_config.init_page_count_= eloq_eloqstore_init_page_count; - eloq_store_config.skip_verify_checksum_= - eloq_eloqstore_skip_verify_checksum; - eloq_store_config.index_buffer_pool_size_= - eloq_eloqstore_index_buffer_pool_size / - eloq_store_config.worker_count_; - eloq_store_config.manifest_limit_= eloq_eloqstore_manifest_limit; - eloq_store_config.io_queue_size_= - eloq_eloqstore_io_queue_size / eloq_store_config.worker_count_; - eloq_store_config.max_inflight_write_= - eloq_eloqstore_max_inflight_write / eloq_store_config.worker_count_; - eloq_store_config.max_write_batch_pages_= - eloq_eloqstore_max_write_batch_pages; - eloq_store_config.buf_ring_size_= eloq_eloqstore_buf_ring_size; - eloq_store_config.coroutine_stack_size_= - eloq_eloqstore_coroutine_stack_size; - eloq_store_config.num_retained_archives_= - eloq_eloqstore_num_retained_archives; - eloq_store_config.archive_interval_secs_= - eloq_eloqstore_archive_interval_secs; - eloq_store_config.max_archive_tasks_= eloq_eloqstore_max_archive_tasks; - eloq_store_config.file_amplify_factor_= eloq_eloqstore_file_amplify_factor; - eloq_store_config.local_space_limit_= - eloq_eloqstore_local_space_limit / eloq_store_config.worker_count_; - eloq_store_config.reserve_space_ratio_= eloq_eloqstore_reserve_space_ratio; - eloq_store_config.data_page_size_= eloq_eloqstore_data_page_size; - eloq_store_config.pages_per_file_shift_= - eloq_eloqstore_pages_per_file_shift; - eloq_store_config.overflow_pointers_= eloq_eloqstore_overflow_pointers; - eloq_store_config.data_append_mode_= eloq_eloqstore_data_append_mode; - auto ds_factory= - std::make_unique(eloq_store_config); + configure_eloq_store(eloq_store_config, dss_data_path); + auto ds_factory= std::make_unique( + std::move(eloq_store_config)); #endif data_store_service_= std::make_unique( @@ -2672,68 +2711,8 @@ static int eloq_init_func(void *p) rocksdb_config, (opt_bootstrap || is_single_node), enable_cache_replacement_, shard_id, data_store_service_.get()); #elif defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE) - DLOG(INFO) << "worker: " << eloq_store_config.worker_count_ - << ", path: " << eloq_store_config.storage_path_ - << ", max open files: " << eloq_store_config.open_files_limit_ - << ", cloud store path: " - << eloq_store_config.cloud_store_path_ - << ", gc threads: " << eloq_store_config.gc_threads_ - << ", cloud worker count: " - << eloq_store_config.cloud_worker_count_ - << ", buffer pool size per shard: " - << eloq_store_config.index_buffer_pool_size_; - ::eloqstore::KvOptions store_config; - store_config.num_threads= eloq_store_config.worker_count_; - store_config.store_path.emplace_back() - .append(eloq_store_config.storage_path_) - .append("/ds_") - .append(std::to_string(shard_id)); - store_config.fd_limit= eloq_store_config.open_files_limit_; - if (!eloq_store_config.cloud_store_path_.empty()) - { - store_config.cloud_store_path - .append(eloq_store_config.cloud_store_path_) - .append("/ds_") - .append(std::to_string(shard_id)); - } - store_config.num_gc_threads= eloq_store_config.gc_threads_; - store_config.rclone_threads= eloq_store_config.cloud_worker_count_; - store_config.data_page_restart_interval= - eloq_store_config.data_page_restart_interval_; - store_config.index_page_restart_interval= - eloq_store_config.index_page_restart_interval_; - store_config.init_page_count= eloq_store_config.init_page_count_; - store_config.skip_verify_checksum= - eloq_store_config.skip_verify_checksum_; - store_config.index_buffer_pool_size= - eloq_store_config.index_buffer_pool_size_; - store_config.manifest_limit= eloq_store_config.manifest_limit_; - store_config.io_queue_size= eloq_store_config.io_queue_size_; - store_config.max_inflight_write= eloq_store_config.max_inflight_write_; - store_config.max_write_batch_pages= - eloq_store_config.max_write_batch_pages_; - store_config.buf_ring_size= eloq_store_config.buf_ring_size_; - store_config.coroutine_stack_size= - eloq_store_config.coroutine_stack_size_; - store_config.num_retained_archives= - eloq_store_config.num_retained_archives_; - store_config.archive_interval_secs= - eloq_store_config.archive_interval_secs_; - store_config.max_archive_tasks= eloq_store_config.max_archive_tasks_; - store_config.file_amplify_factor= eloq_store_config.file_amplify_factor_; - store_config.local_space_limit= eloq_store_config.local_space_limit_; - store_config.reserve_space_ratio= eloq_store_config.reserve_space_ratio_; - store_config.data_page_size= eloq_store_config.data_page_size_; - store_config.pages_per_file_shift= - eloq_store_config.pages_per_file_shift_; - store_config.overflow_pointers= eloq_store_config.overflow_pointers_; - store_config.data_append_mode= eloq_store_config.data_append_mode_; - if (eloq_store_config.comparator_ != nullptr) - { - store_config.comparator_= eloq_store_config.comparator_; - } auto ds= std::make_unique( - shard_id, data_store_service_.get(), store_config); + shard_id, data_store_service_.get()); #endif ds->Initialize(); diff --git a/storage/eloq/store_handler b/storage/eloq/store_handler index fa7a7242455..8710d8b6cf4 160000 --- a/storage/eloq/store_handler +++ b/storage/eloq/store_handler @@ -1 +1 @@ -Subproject commit fa7a7242455ce40afc70f3f53c3affb9eb67ef21 +Subproject commit 8710d8b6cf4c3b9bbbec7e94933fa19734a3e067