Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 219 additions & 6 deletions storage/eloq/ha_eloq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ static char *eloq_dss_rocksdb_cloud_endpoint_url= nullptr;
static char *eloq_dss_rocksdb_cloud_sst_file_cache_size= nullptr;
static int eloq_dss_rocksdb_cloud_sst_file_cache_num_shard_bits= 5;
static char *eloq_dss_rocksdb_target_file_size_base= nullptr;
static unsigned int eloq_dss_rocksdb_cloud_purger_periodicity_secs= 600; // 10 minutes
static unsigned int eloq_dss_rocksdb_cloud_purger_periodicity_secs=
600; // 10 minutes
static unsigned int eloq_dss_rocksdb_cloud_file_deletion_delay= 3600;
static unsigned int eloq_dss_rocksdb_max_write_buffer_number= 8;
static unsigned int eloq_dss_rocksdb_max_background_jobs= 8;
Expand All @@ -369,11 +370,31 @@ static unsigned int eloq_eloqstore_open_files_limit= 1024;
static char *eloq_eloqstore_cloud_store_path= nullptr;
static unsigned int eloq_eloqstore_gc_threads= 1;
static unsigned int eloq_eloqstore_cloud_worker_count= 1;
static unsigned int eloq_eloqstore_data_page_restart_interval= 16;
static unsigned int eloq_eloqstore_index_page_restart_interval= 16;
static unsigned int eloq_eloqstore_init_page_count= 1 << 15;
static my_bool eloq_eloqstore_skip_verify_checksum= false;
static unsigned int eloq_eloqstore_index_buffer_pool_size= 1 << 15;
static unsigned int eloq_eloqstore_manifest_limit= 8 << 20;
static unsigned int eloq_eloqstore_io_queue_size= 4096;
static unsigned int eloq_eloqstore_max_inflight_write= 64 << 10;
static unsigned int eloq_eloqstore_max_write_batch_pages= 256;
static unsigned int eloq_eloqstore_buf_ring_size= 1 << 12;
static unsigned int eloq_eloqstore_coroutine_stack_size= 32 * 1024;
static unsigned int eloq_eloqstore_num_retained_archives= 0;
static unsigned int eloq_eloqstore_archive_interval_secs= 86400;
static unsigned int eloq_eloqstore_max_archive_tasks= 256;
static unsigned int eloq_eloqstore_file_amplify_factor= 4;
static unsigned long long eloq_eloqstore_local_space_limit= 1ULL << 40;
static unsigned int eloq_eloqstore_reserve_space_ratio= 100;
static unsigned int eloq_eloqstore_data_page_size= 1 << 12;
static unsigned int eloq_eloqstore_pages_per_file_shift= 11;
static unsigned int eloq_eloqstore_overflow_pointers= 16;
static my_bool eloq_eloqstore_data_append_mode= false;
#endif

const char *enum_var_names[]= {"e1", "e2", NullS};
const char *kv_storage_names[]= { "dynamo", "bigtable", "eloqds",
NullS};
const char *kv_storage_names[]= {"dynamo", "bigtable", "eloqds", NullS};
const char *partition_names[]= {"Hash", "Range", NullS};

#define KV_DYNAMO 0
Expand Down Expand Up @@ -1029,6 +1050,107 @@ static MYSQL_SYSVAR_UINT(eloqstore_cloud_worker_count,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore server cloud worker count", NULL, NULL, 1,
1, UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_data_page_restart_interval,
eloq_eloqstore_data_page_restart_interval,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore data page restart interval", NULL, NULL,
16, 1, UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_index_page_restart_interval,
eloq_eloqstore_index_page_restart_interval,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore index page restart interval", NULL, NULL,
16, 1, UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_init_page_count,
eloq_eloqstore_init_page_count,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore init page count", NULL, NULL, 1 << 15, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_BOOL(eloqstore_skip_verify_checksum,
eloq_eloqstore_skip_verify_checksum,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore skip verify checksum", NULL, NULL, FALSE);
static MYSQL_SYSVAR_UINT(eloqstore_index_buffer_pool_size,
eloq_eloqstore_index_buffer_pool_size,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore index buffer pool size", NULL, NULL,
1 << 15, 1, UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_manifest_limit,
eloq_eloqstore_manifest_limit,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore manifest limit", NULL, NULL, 8 << 20, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_io_queue_size, eloq_eloqstore_io_queue_size,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore io queue size", NULL, NULL, 4096, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_max_inflight_write,
eloq_eloqstore_max_inflight_write,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore max inflight write", NULL, NULL, 64 << 10,
1, UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_max_write_batch_pages,
eloq_eloqstore_max_write_batch_pages,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore max write batch pages", NULL, NULL, 256, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_buf_ring_size, eloq_eloqstore_buf_ring_size,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore buf ring size", NULL, NULL, 1 << 12, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_coroutine_stack_size,
eloq_eloqstore_coroutine_stack_size,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore coroutine stack size", NULL, NULL,
32 * 1024, 1, UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_num_retained_archives,
eloq_eloqstore_num_retained_archives,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore num retained archives", NULL, NULL, 0, 0,
UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_archive_interval_secs,
eloq_eloqstore_archive_interval_secs,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore archive interval secs", NULL, NULL, 86400,
1, UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_max_archive_tasks,
eloq_eloqstore_max_archive_tasks,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore max archive tasks", NULL, NULL, 256, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_file_amplify_factor,
eloq_eloqstore_file_amplify_factor,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore file amplify factor", NULL, NULL, 4, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_ULONGLONG(eloqstore_local_space_limit,
eloq_eloqstore_local_space_limit,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore local space limit", NULL, NULL,
1ULL << 40, 1, ULONGLONG_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_reserve_space_ratio,
eloq_eloqstore_reserve_space_ratio,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore reserve space ratio", NULL, NULL, 100, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_data_page_size,
eloq_eloqstore_data_page_size,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore data page size", NULL, NULL, 1 << 12, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_pages_per_file_shift,
eloq_eloqstore_pages_per_file_shift,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore pages per file shift", NULL, NULL, 11, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_UINT(eloqstore_overflow_pointers,
eloq_eloqstore_overflow_pointers,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore overflow pointers", NULL, NULL, 16, 1,
UINT_MAX, 1);
static MYSQL_SYSVAR_BOOL(eloqstore_data_append_mode,
eloq_eloqstore_data_append_mode,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"EloqStore data append mode", NULL, NULL, FALSE);
#endif

static struct st_mysql_sys_var *eloq_system_variables[]= {
Expand Down Expand Up @@ -1136,6 +1258,27 @@ static struct st_mysql_sys_var *eloq_system_variables[]= {
MYSQL_SYSVAR(eloqstore_cloud_store_path),
MYSQL_SYSVAR(eloqstore_gc_threads),
MYSQL_SYSVAR(eloqstore_cloud_worker_count),
MYSQL_SYSVAR(eloqstore_data_page_restart_interval),
MYSQL_SYSVAR(eloqstore_index_page_restart_interval),
MYSQL_SYSVAR(eloqstore_init_page_count),
MYSQL_SYSVAR(eloqstore_skip_verify_checksum),
MYSQL_SYSVAR(eloqstore_index_buffer_pool_size),
MYSQL_SYSVAR(eloqstore_manifest_limit),
MYSQL_SYSVAR(eloqstore_io_queue_size),
MYSQL_SYSVAR(eloqstore_max_inflight_write),
MYSQL_SYSVAR(eloqstore_max_write_batch_pages),
MYSQL_SYSVAR(eloqstore_buf_ring_size),
MYSQL_SYSVAR(eloqstore_coroutine_stack_size),
MYSQL_SYSVAR(eloqstore_num_retained_archives),
MYSQL_SYSVAR(eloqstore_archive_interval_secs),
MYSQL_SYSVAR(eloqstore_max_archive_tasks),
MYSQL_SYSVAR(eloqstore_file_amplify_factor),
MYSQL_SYSVAR(eloqstore_local_space_limit),
MYSQL_SYSVAR(eloqstore_reserve_space_ratio),
MYSQL_SYSVAR(eloqstore_data_page_size),
MYSQL_SYSVAR(eloqstore_pages_per_file_shift),
MYSQL_SYSVAR(eloqstore_overflow_pointers),
MYSQL_SYSVAR(eloqstore_data_append_mode),
#endif
NULL};

Expand Down Expand Up @@ -2438,7 +2581,7 @@ static int eloq_init_func(void *p)
eloq_dss_rocksdb_cloud_sst_file_cache_num_shard_bits;
rocksdb_cloud_config.db_file_deletion_delay_=
eloq_dss_rocksdb_cloud_file_deletion_delay;
rocksdb_cloud_config.purger_periodicity_millis_ =
rocksdb_cloud_config.purger_periodicity_millis_=
eloq_dss_rocksdb_cloud_purger_periodicity_secs * 1000;

bool enable_cache_replacement_= fake_config_reader.GetBoolean(
Expand All @@ -2454,7 +2597,7 @@ static int eloq_init_func(void *p)
rocksdb_config, enable_cache_replacement_);
#elif defined(DATA_STORE_TYPE_ELOQDSS_ELOQSTORE)
EloqDS::EloqStoreConfig eloq_store_config;
eloq_store_config.worker_count_= eloq_eloqstore_worker_num;
eloq_store_config.worker_count_= std::max(eloq_eloqstore_worker_num, 1U);
eloq_store_config.storage_path_= eloq_eloqstore_data_path;
if (eloq_store_config.storage_path_.empty())
{
Expand All @@ -2470,6 +2613,40 @@ static int eloq_init_func(void *p)
eloq_store_config.cloud_worker_count_= eloq_eloqstore_cloud_worker_count;
LOG_IF(INFO, !eloq_store_config.cloud_store_path_.empty())
<< "EloqStore cloud store enabled.";
eloq_store_config.data_page_restart_interval_=
eloq_eloqstore_data_page_restart_interval;
eloq_store_config.index_page_restart_interval_=
eloq_eloqstore_index_page_restart_interval;
eloq_store_config.init_page_count_= eloq_eloqstore_init_page_count;
eloq_store_config.skip_verify_checksum_=
eloq_eloqstore_skip_verify_checksum;
eloq_store_config.index_buffer_pool_size_=
eloq_eloqstore_index_buffer_pool_size /
eloq_store_config.worker_count_;
eloq_store_config.manifest_limit_= eloq_eloqstore_manifest_limit;
eloq_store_config.io_queue_size_=
eloq_eloqstore_io_queue_size / eloq_store_config.worker_count_;
eloq_store_config.max_inflight_write_=
eloq_eloqstore_max_inflight_write / eloq_store_config.worker_count_;
eloq_store_config.max_write_batch_pages_=
eloq_eloqstore_max_write_batch_pages;
eloq_store_config.buf_ring_size_= eloq_eloqstore_buf_ring_size;
eloq_store_config.coroutine_stack_size_=
eloq_eloqstore_coroutine_stack_size;
eloq_store_config.num_retained_archives_=
eloq_eloqstore_num_retained_archives;
eloq_store_config.archive_interval_secs_=
eloq_eloqstore_archive_interval_secs;
eloq_store_config.max_archive_tasks_= eloq_eloqstore_max_archive_tasks;
eloq_store_config.file_amplify_factor_= eloq_eloqstore_file_amplify_factor;
eloq_store_config.local_space_limit_=
eloq_eloqstore_local_space_limit / eloq_store_config.worker_count_;
eloq_store_config.reserve_space_ratio_= eloq_eloqstore_reserve_space_ratio;
eloq_store_config.data_page_size_= eloq_eloqstore_data_page_size;
eloq_store_config.pages_per_file_shift_=
eloq_eloqstore_pages_per_file_shift;
eloq_store_config.overflow_pointers_= eloq_eloqstore_overflow_pointers;
eloq_store_config.data_append_mode_= eloq_eloqstore_data_append_mode;
auto ds_factory=
std::make_unique<EloqDS::EloqStoreDataStoreFactory>(eloq_store_config);
#endif
Expand Down Expand Up @@ -2502,7 +2679,9 @@ static int eloq_init_func(void *p)
<< eloq_store_config.cloud_store_path_
<< ", gc threads: " << eloq_store_config.gc_threads_
<< ", cloud worker count: "
<< eloq_store_config.cloud_worker_count_;
<< eloq_store_config.cloud_worker_count_
<< ", buffer pool size per shard: "
<< eloq_store_config.index_buffer_pool_size_;
::eloqstore::KvOptions store_config;
store_config.num_threads= eloq_store_config.worker_count_;
store_config.store_path.emplace_back()
Expand All @@ -2519,6 +2698,40 @@ static int eloq_init_func(void *p)
}
store_config.num_gc_threads= eloq_store_config.gc_threads_;
store_config.rclone_threads= eloq_store_config.cloud_worker_count_;
store_config.data_page_restart_interval=
eloq_store_config.data_page_restart_interval_;
store_config.index_page_restart_interval=
eloq_store_config.index_page_restart_interval_;
store_config.init_page_count= eloq_store_config.init_page_count_;
store_config.skip_verify_checksum=
eloq_store_config.skip_verify_checksum_;
store_config.index_buffer_pool_size=
eloq_store_config.index_buffer_pool_size_;
store_config.manifest_limit= eloq_store_config.manifest_limit_;
store_config.io_queue_size= eloq_store_config.io_queue_size_;
store_config.max_inflight_write= eloq_store_config.max_inflight_write_;
store_config.max_write_batch_pages=
eloq_store_config.max_write_batch_pages_;
store_config.buf_ring_size= eloq_store_config.buf_ring_size_;
store_config.coroutine_stack_size=
eloq_store_config.coroutine_stack_size_;
store_config.num_retained_archives=
eloq_store_config.num_retained_archives_;
store_config.archive_interval_secs=
eloq_store_config.archive_interval_secs_;
store_config.max_archive_tasks= eloq_store_config.max_archive_tasks_;
store_config.file_amplify_factor= eloq_store_config.file_amplify_factor_;
store_config.local_space_limit= eloq_store_config.local_space_limit_;
store_config.reserve_space_ratio= eloq_store_config.reserve_space_ratio_;
store_config.data_page_size= eloq_store_config.data_page_size_;
store_config.pages_per_file_shift=
eloq_store_config.pages_per_file_shift_;
store_config.overflow_pointers= eloq_store_config.overflow_pointers_;
store_config.data_append_mode= eloq_store_config.data_append_mode_;
if (eloq_store_config.comparator_ != nullptr)
{
store_config.comparator_= eloq_store_config.comparator_;
}
auto ds= std::make_unique<EloqDS::EloqStoreDataStore>(
shard_id, data_store_service_.get(), store_config);
#endif
Expand Down