Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions storage/eloq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ elseif(WITH_DATA_STORE STREQUAL "ELOQDSS_ELOQSTORE")
store_handler/eloq_data_store_service/data_store_service_config.cpp
store_handler/eloq_data_store_service/eloq_store_data_store.cpp
store_handler/eloq_data_store_service/ds_request.pb.cc
store_handler/eloq_data_store_service/eloq_store_config.cpp
)
SET(MYELOQ_LIBRARY ${MYELOQ_LIBRARY} txservice eloqstore)

Expand Down
213 changes: 96 additions & 117 deletions storage/eloq/ha_eloq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ static unsigned int eloq_dss_rocksdb_max_write_buffer_number= 8;
static unsigned int eloq_dss_rocksdb_max_background_jobs= 8;
#if defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE)
static unsigned int eloq_eloqstore_worker_num= 1;
static char *eloq_eloqstore_data_path= nullptr;
static char *eloq_eloqstore_data_path_list= nullptr;
static unsigned int eloq_eloqstore_open_files_limit= 1024;
static char *eloq_eloqstore_cloud_store_path= nullptr;
static unsigned int eloq_eloqstore_gc_threads= 1;
Expand Down Expand Up @@ -1026,9 +1026,9 @@ static MYSQL_SYSVAR_UINT(eloqstore_worker_num, eloq_eloqstore_worker_num,
"EloqStore server worker num.", NULL, NULL, 1, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_STR(
eloqstore_data_path, eloq_eloqstore_data_path,
eloqstore_data_path_list, eloq_eloqstore_data_path_list,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY | PLUGIN_VAR_MEMALLOC,
"The data path of the EloqStore (use memory store if empty).", NULL, NULL,
"The data paths of the EloqStore (use memory store if empty).", NULL, NULL,
"");
static MYSQL_SYSVAR_UINT(eloqstore_open_files_limit,
eloq_eloqstore_open_files_limit,
Expand Down Expand Up @@ -1253,7 +1253,7 @@ static struct st_mysql_sys_var *eloq_system_variables[]= {
MYSQL_SYSVAR(dss_rocksdb_max_background_jobs),
#if defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE)
MYSQL_SYSVAR(eloqstore_worker_num),
MYSQL_SYSVAR(eloqstore_data_path),
MYSQL_SYSVAR(eloqstore_data_path_list),
MYSQL_SYSVAR(eloqstore_open_files_limit),
MYSQL_SYSVAR(eloqstore_cloud_store_path),
MYSQL_SYSVAR(eloqstore_gc_threads),
Expand Down Expand Up @@ -2310,6 +2310,94 @@ static void RegisterFactory()
txservice::TxRecordFactory::RegisterCreateTxRecordFunc(EloqRecord::Create);
}

#if defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE)
/**
* Configure EloqStore settings for the data store service
* @param eloq_store_config Reference to EloqStoreConfig to be configured
* @param dss_data_path Base data path for the data store service
*/
static void configure_eloq_store(EloqDS::EloqStoreConfig &eloq_store_config,
const std::string &dss_data_path)
{
eloq_store_config.eloqstore_configs_.num_threads=
std::max(eloq_eloqstore_worker_num, 1U);

std::string storage_path_list(eloq_eloqstore_data_path_list);
if (!storage_path_list.empty())
{
EloqDS::EloqStoreConfig::ParseStoragePath(
storage_path_list, eloq_store_config.eloqstore_configs_.store_path);
}
else
{
eloq_store_config.eloqstore_configs_.store_path.emplace_back()
.append(dss_data_path)
.append("/eloqstore");
}

eloq_store_config.eloqstore_configs_.fd_limit=
eloq_eloqstore_open_files_limit;
eloq_store_config.eloqstore_configs_.cloud_store_path=
eloq_eloqstore_cloud_store_path;
eloq_store_config.eloqstore_configs_.num_gc_threads=
!eloq_store_config.eloqstore_configs_.cloud_store_path.empty()
? 0
: eloq_eloqstore_gc_threads;
eloq_store_config.eloqstore_configs_.rclone_threads=
eloq_eloqstore_cloud_worker_count;

LOG_IF(INFO, !eloq_store_config.eloqstore_configs_.cloud_store_path.empty())
<< "EloqStore cloud store enabled.";

eloq_store_config.eloqstore_configs_.data_page_restart_interval=
eloq_eloqstore_data_page_restart_interval;
eloq_store_config.eloqstore_configs_.index_page_restart_interval=
eloq_eloqstore_index_page_restart_interval;
eloq_store_config.eloqstore_configs_.init_page_count=
eloq_eloqstore_init_page_count;
eloq_store_config.eloqstore_configs_.skip_verify_checksum=
eloq_eloqstore_skip_verify_checksum;
eloq_store_config.eloqstore_configs_.index_buffer_pool_size=
eloq_eloqstore_index_buffer_pool_size /
eloq_store_config.eloqstore_configs_.num_threads;
eloq_store_config.eloqstore_configs_.manifest_limit=
eloq_eloqstore_manifest_limit;
eloq_store_config.eloqstore_configs_.io_queue_size=
eloq_eloqstore_io_queue_size /
eloq_store_config.eloqstore_configs_.num_threads;
eloq_store_config.eloqstore_configs_.max_inflight_write=
eloq_eloqstore_max_inflight_write /
eloq_store_config.eloqstore_configs_.num_threads;
eloq_store_config.eloqstore_configs_.max_write_batch_pages=
eloq_eloqstore_max_write_batch_pages;
eloq_store_config.eloqstore_configs_.buf_ring_size=
eloq_eloqstore_buf_ring_size;
eloq_store_config.eloqstore_configs_.coroutine_stack_size=
eloq_eloqstore_coroutine_stack_size;
eloq_store_config.eloqstore_configs_.num_retained_archives=
eloq_eloqstore_num_retained_archives;
eloq_store_config.eloqstore_configs_.archive_interval_secs=
eloq_eloqstore_archive_interval_secs;
eloq_store_config.eloqstore_configs_.max_archive_tasks=
eloq_eloqstore_max_archive_tasks;
eloq_store_config.eloqstore_configs_.file_amplify_factor=
eloq_eloqstore_file_amplify_factor;
eloq_store_config.eloqstore_configs_.local_space_limit=
eloq_eloqstore_local_space_limit /
eloq_store_config.eloqstore_configs_.num_threads;
eloq_store_config.eloqstore_configs_.reserve_space_ratio=
eloq_eloqstore_reserve_space_ratio;
eloq_store_config.eloqstore_configs_.data_page_size=
eloq_eloqstore_data_page_size;
eloq_store_config.eloqstore_configs_.pages_per_file_shift=
eloq_eloqstore_pages_per_file_shift;
eloq_store_config.eloqstore_configs_.overflow_pointers=
eloq_eloqstore_overflow_pointers;
eloq_store_config.eloqstore_configs_.data_append_mode=
eloq_eloqstore_data_append_mode;
}
#endif

static int eloq_init_func(void *p)
{
DBUG_ENTER_FUNC();
Expand Down Expand Up @@ -2597,58 +2685,9 @@ static int eloq_init_func(void *p)
rocksdb_config, enable_cache_replacement_);
#elif defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE)
EloqDS::EloqStoreConfig eloq_store_config;
eloq_store_config.worker_count_= std::max(eloq_eloqstore_worker_num, 1U);
eloq_store_config.storage_path_= eloq_eloqstore_data_path;
if (eloq_store_config.storage_path_.empty())
{
eloq_store_config.storage_path_.append(dss_data_path)
.append("/eloqstore");
}

eloq_store_config.open_files_limit_= eloq_eloqstore_open_files_limit;
eloq_store_config.cloud_store_path_= eloq_eloqstore_cloud_store_path;
eloq_store_config.gc_threads_= !eloq_store_config.cloud_store_path_.empty()
? 0
: eloq_eloqstore_gc_threads;
eloq_store_config.cloud_worker_count_= eloq_eloqstore_cloud_worker_count;
LOG_IF(INFO, !eloq_store_config.cloud_store_path_.empty())
<< "EloqStore cloud store enabled.";
eloq_store_config.data_page_restart_interval_=
eloq_eloqstore_data_page_restart_interval;
eloq_store_config.index_page_restart_interval_=
eloq_eloqstore_index_page_restart_interval;
eloq_store_config.init_page_count_= eloq_eloqstore_init_page_count;
eloq_store_config.skip_verify_checksum_=
eloq_eloqstore_skip_verify_checksum;
eloq_store_config.index_buffer_pool_size_=
eloq_eloqstore_index_buffer_pool_size /
eloq_store_config.worker_count_;
eloq_store_config.manifest_limit_= eloq_eloqstore_manifest_limit;
eloq_store_config.io_queue_size_=
eloq_eloqstore_io_queue_size / eloq_store_config.worker_count_;
eloq_store_config.max_inflight_write_=
eloq_eloqstore_max_inflight_write / eloq_store_config.worker_count_;
eloq_store_config.max_write_batch_pages_=
eloq_eloqstore_max_write_batch_pages;
eloq_store_config.buf_ring_size_= eloq_eloqstore_buf_ring_size;
eloq_store_config.coroutine_stack_size_=
eloq_eloqstore_coroutine_stack_size;
eloq_store_config.num_retained_archives_=
eloq_eloqstore_num_retained_archives;
eloq_store_config.archive_interval_secs_=
eloq_eloqstore_archive_interval_secs;
eloq_store_config.max_archive_tasks_= eloq_eloqstore_max_archive_tasks;
eloq_store_config.file_amplify_factor_= eloq_eloqstore_file_amplify_factor;
eloq_store_config.local_space_limit_=
eloq_eloqstore_local_space_limit / eloq_store_config.worker_count_;
eloq_store_config.reserve_space_ratio_= eloq_eloqstore_reserve_space_ratio;
eloq_store_config.data_page_size_= eloq_eloqstore_data_page_size;
eloq_store_config.pages_per_file_shift_=
eloq_eloqstore_pages_per_file_shift;
eloq_store_config.overflow_pointers_= eloq_eloqstore_overflow_pointers;
eloq_store_config.data_append_mode_= eloq_eloqstore_data_append_mode;
auto ds_factory=
std::make_unique<EloqDS::EloqStoreDataStoreFactory>(eloq_store_config);
configure_eloq_store(eloq_store_config, dss_data_path);
auto ds_factory= std::make_unique<EloqDS::EloqStoreDataStoreFactory>(
std::move(eloq_store_config));
#endif

data_store_service_= std::make_unique<EloqDS::DataStoreService>(
Expand All @@ -2672,68 +2711,8 @@ static int eloq_init_func(void *p)
rocksdb_config, (opt_bootstrap || is_single_node),
enable_cache_replacement_, shard_id, data_store_service_.get());
#elif defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE)
DLOG(INFO) << "worker: " << eloq_store_config.worker_count_
<< ", path: " << eloq_store_config.storage_path_
<< ", max open files: " << eloq_store_config.open_files_limit_
<< ", cloud store path: "
<< eloq_store_config.cloud_store_path_
<< ", gc threads: " << eloq_store_config.gc_threads_
<< ", cloud worker count: "
<< eloq_store_config.cloud_worker_count_
<< ", buffer pool size per shard: "
<< eloq_store_config.index_buffer_pool_size_;
::eloqstore::KvOptions store_config;
store_config.num_threads= eloq_store_config.worker_count_;
store_config.store_path.emplace_back()
.append(eloq_store_config.storage_path_)
.append("/ds_")
.append(std::to_string(shard_id));
store_config.fd_limit= eloq_store_config.open_files_limit_;
if (!eloq_store_config.cloud_store_path_.empty())
{
store_config.cloud_store_path
.append(eloq_store_config.cloud_store_path_)
.append("/ds_")
.append(std::to_string(shard_id));
}
store_config.num_gc_threads= eloq_store_config.gc_threads_;
store_config.rclone_threads= eloq_store_config.cloud_worker_count_;
store_config.data_page_restart_interval=
eloq_store_config.data_page_restart_interval_;
store_config.index_page_restart_interval=
eloq_store_config.index_page_restart_interval_;
store_config.init_page_count= eloq_store_config.init_page_count_;
store_config.skip_verify_checksum=
eloq_store_config.skip_verify_checksum_;
store_config.index_buffer_pool_size=
eloq_store_config.index_buffer_pool_size_;
store_config.manifest_limit= eloq_store_config.manifest_limit_;
store_config.io_queue_size= eloq_store_config.io_queue_size_;
store_config.max_inflight_write= eloq_store_config.max_inflight_write_;
store_config.max_write_batch_pages=
eloq_store_config.max_write_batch_pages_;
store_config.buf_ring_size= eloq_store_config.buf_ring_size_;
store_config.coroutine_stack_size=
eloq_store_config.coroutine_stack_size_;
store_config.num_retained_archives=
eloq_store_config.num_retained_archives_;
store_config.archive_interval_secs=
eloq_store_config.archive_interval_secs_;
store_config.max_archive_tasks= eloq_store_config.max_archive_tasks_;
store_config.file_amplify_factor= eloq_store_config.file_amplify_factor_;
store_config.local_space_limit= eloq_store_config.local_space_limit_;
store_config.reserve_space_ratio= eloq_store_config.reserve_space_ratio_;
store_config.data_page_size= eloq_store_config.data_page_size_;
store_config.pages_per_file_shift=
eloq_store_config.pages_per_file_shift_;
store_config.overflow_pointers= eloq_store_config.overflow_pointers_;
store_config.data_append_mode= eloq_store_config.data_append_mode_;
if (eloq_store_config.comparator_ != nullptr)
{
store_config.comparator_= eloq_store_config.comparator_;
}
auto ds= std::make_unique<EloqDS::EloqStoreDataStore>(
shard_id, data_store_service_.get(), store_config);
shard_id, data_store_service_.get());
#endif
ds->Initialize();

Expand Down