Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 121 additions & 9 deletions tidesdb/ha_tidesdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ static ulong srv_log_level = 0; /* TDB_LOG_
static ulonglong srv_block_cache_size = TIDESDB_DEFAULT_BLOCK_CACHE; /* 256MB */
static ulong srv_max_open_sstables = 256;
static ulonglong srv_max_memory_usage = 0; /* 0 = auto (library decides) */
static my_bool srv_log_to_file = 1; /* write TidesDB logs to file (default: yes) */
static ulonglong srv_log_truncation_at = 24ULL * 1024 * 1024; /* log file truncation size (24MB) */

/* Per-session TTL override (seconds). 0 = use table default. */
static MYSQL_THDVAR_ULONGLONG(ttl, PLUGIN_VAR_RQCMDARG,
Expand Down Expand Up @@ -196,6 +198,17 @@ static MYSQL_SYSVAR_ULONGLONG(max_memory_usage, srv_max_memory_usage,
"TidesDB global memory limit in bytes (0 = auto, ~80%% system RAM)",
NULL, NULL, 0, 0, ULONGLONG_MAX, 0);

static MYSQL_SYSVAR_BOOL(log_to_file, srv_log_to_file, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"Write TidesDB logs to a LOG file in the data directory "
"instead of stderr (default: ON)",
NULL, NULL, 1);

static MYSQL_SYSVAR_ULONGLONG(log_truncation_at, srv_log_truncation_at,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
"TidesDB log file truncation size in bytes "
"(0 = no truncation, default: 24MB)",
NULL, NULL, 24ULL * 1024 * 1024, 0, ULONGLONG_MAX, 0);

/* Configurable data directory.
Defaults to NULL which means the plugin computes a sibling directory
of mysql_real_data_home. Setting this overrides the auto-computed path. */
Expand Down Expand Up @@ -288,6 +301,51 @@ static MYSQL_SYSVAR_STR(backup_dir, srv_backup_dir, PLUGIN_VAR_RQCMDARG | PLUGIN
"Example: SET GLOBAL tidesdb_backup_dir = '/path/to/backup'",
NULL, tidesdb_backup_dir_update, NULL);

/* ---- Checkpoint (hard-link snapshot) via system variable ---- */

static char *srv_checkpoint_dir = NULL;

static void tidesdb_checkpoint_dir_update(THD *thd, struct st_mysql_sys_var *, void *var_ptr,
const void *save)
{
const char *new_dir = *static_cast<const char *const *>(save);

if (!new_dir || !new_dir[0])
{
*static_cast<char **>(var_ptr) = NULL;
return;
}

if (!tdb_global)
{
my_error(ER_UNKNOWN_ERROR, MYF(0), "TidesDB is not open");
return;
}

sql_print_information("TIDESDB: Starting checkpoint to '%s'", new_dir);

int rc = tidesdb_checkpoint(tdb_global, new_dir);

if (rc != TDB_SUCCESS)
{
sql_print_error("TIDESDB: Checkpoint to '%s' failed (err=%d)", new_dir, rc);
my_printf_error(ER_UNKNOWN_ERROR, "TIDESDB: Checkpoint to '%s' failed (err=%d)", MYF(0),
new_dir, rc);
return;
}

sql_print_information("TIDESDB: Checkpoint to '%s' completed successfully", new_dir);
*static_cast<const char **>(var_ptr) = new_dir;
}

static MYSQL_SYSVAR_STR(checkpoint_dir, srv_checkpoint_dir,
PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_MEMALLOC,
"Set to a directory path to trigger a TidesDB checkpoint "
"(hard-link snapshot, near-instant). "
"The directory must not exist or be empty. "
"Example: SET GLOBAL tidesdb_checkpoint_dir = '/path/to/checkpoint'",
NULL, tidesdb_checkpoint_dir_update, NULL);

static struct st_mysql_sys_var *tidesdb_system_variables[] = {
MYSQL_SYSVAR(flush_threads),
MYSQL_SYSVAR(compaction_threads),
Expand All @@ -296,6 +354,7 @@ static struct st_mysql_sys_var *tidesdb_system_variables[] = {
MYSQL_SYSVAR(max_open_sstables),
MYSQL_SYSVAR(max_memory_usage),
MYSQL_SYSVAR(backup_dir),
MYSQL_SYSVAR(checkpoint_dir),
MYSQL_SYSVAR(print_all_conflicts),
MYSQL_SYSVAR(data_home_dir),
MYSQL_SYSVAR(ttl),
Expand All @@ -305,6 +364,8 @@ static struct st_mysql_sys_var *tidesdb_system_variables[] = {
MYSQL_SYSVAR(default_bloom_filter),
MYSQL_SYSVAR(default_use_btree),
MYSQL_SYSVAR(default_block_indexes),
MYSQL_SYSVAR(log_to_file),
MYSQL_SYSVAR(log_truncation_at),
NULL};

/* ******************** Table options (per-table CF config) ******************** */
Expand Down Expand Up @@ -987,8 +1048,8 @@ static int tidesdb_init_func(void *p)
cfg.log_level = (tidesdb_log_level_t)log_level_map[srv_log_level];
cfg.block_cache_size = (size_t)srv_block_cache_size;
cfg.max_open_sstables = (int)srv_max_open_sstables;
cfg.log_to_file = 1;
cfg.log_truncation_at = 24 * 1024 * 1024;
cfg.log_to_file = srv_log_to_file ? 1 : 0;
cfg.log_truncation_at = (size_t)srv_log_truncation_at;
cfg.max_memory_usage = (size_t)srv_max_memory_usage;

int rc = tidesdb_open(&cfg, &tdb_global);
Expand Down Expand Up @@ -3839,17 +3900,21 @@ int ha_tidesdb::optimize(THD *thd, HA_CHECK_OPT *check_opt)

if (!share || !share->cf) DBUG_RETURN(HA_ADMIN_FAILED);

int rc = tidesdb_compact(share->cf);
/* tidesdb_purge_cf() is synchronous: flushes memtable to disk, then
runs a full compaction inline, blocking until complete. This is
the right semantic for OPTIMIZE TABLE -- the caller expects the
table to be fully compacted when the statement returns. */
int rc = tidesdb_purge_cf(share->cf);
if (rc != TDB_SUCCESS)
sql_print_warning("TIDESDB: optimize: compact data CF '%s' failed (err=%d)",
sql_print_warning("TIDESDB: optimize: purge data CF '%s' failed (err=%d)",
share->cf_name.c_str(), rc);

for (uint i = 0; i < share->idx_cfs.size(); i++)
{
if (!share->idx_cfs[i]) continue;
rc = tidesdb_compact(share->idx_cfs[i]);
rc = tidesdb_purge_cf(share->idx_cfs[i]);
if (rc != TDB_SUCCESS)
sql_print_warning("TIDESDB: optimize: compact idx CF '%s' failed (err=%d)",
sql_print_warning("TIDESDB: optimize: purge idx CF '%s' failed (err=%d)",
share->idx_cf_names[i].c_str(), rc);
}

Expand Down Expand Up @@ -4685,6 +4750,53 @@ bool ha_tidesdb::commit_inplace_alter_table(TABLE *altered_table, Alter_inplace_
for (uint i = 0; i < share->idx_cfs.size(); i++)
if (share->idx_cfs[i]) share->num_secondary_indexes++;

/* If table options changed (SYNC_MODE, COMPRESSION, BLOOM_FPR, etc.),
apply them to the live CF(s) so they take effect immediately instead
of only being persisted in the .frm. */
if (ha_alter_info->handler_flags & ALTER_CHANGE_CREATE_OPTION)
{
tidesdb_column_family_config_t cfg = build_cf_config(TDB_TABLE_OPTIONS(altered_table));

/* Main data CF */
if (share->cf)
{
int rc = tidesdb_cf_update_runtime_config(share->cf, &cfg, 1);
if (rc != TDB_SUCCESS)
sql_print_warning(
"TIDESDB: ALTER: failed to update runtime config for "
"data CF '%s' (err=%d)",
share->cf_name.c_str(), rc);
}

/* Secondary index CFs */
for (uint i = 0; i < share->idx_cfs.size(); i++)
{
if (share->idx_cfs[i])
{
int rc = tidesdb_cf_update_runtime_config(share->idx_cfs[i], &cfg, 1);
if (rc != TDB_SUCCESS)
sql_print_warning(
"TIDESDB: ALTER: failed to update runtime config for "
"index CF '%s' (err=%d)",
share->idx_cf_names[i].c_str(), rc);
}
}

/* Update share-level cached options that are read from table options */
if (TDB_TABLE_OPTIONS(altered_table))
{
uint iso_idx = TDB_TABLE_OPTIONS(altered_table)->isolation_level;
if (iso_idx < array_elements(tdb_isolation_map))
share->isolation_level = (tidesdb_isolation_level_t)tdb_isolation_map[iso_idx];
share->default_ttl = TDB_TABLE_OPTIONS(altered_table)->ttl;
share->has_ttl = (share->default_ttl > 0 || share->ttl_field_idx >= 0);
share->encrypted = TDB_TABLE_OPTIONS(altered_table)->encrypted;
if (share->encrypted)
share->encryption_key_id =
(uint)TDB_TABLE_OPTIONS(altered_table)->encryption_key_id;
}
}

/* We force a stats refresh on next info() call */
share->stats_refresh_us.store(0, std::memory_order_relaxed);
unlock_shared_ha_data();
Expand Down Expand Up @@ -4863,8 +4975,8 @@ maria_declare_plugin(tidesdb){
PLUGIN_LICENSE_GPL,
tidesdb_init_func,
tidesdb_deinit_func,
0x30401,
0x30500,
NULL,
tidesdb_system_variables,
"3.4.1",
MariaDB_PLUGIN_MATURITY_EXPERIMENTAL} maria_declare_plugin_end;
"3.5.0",
MariaDB_PLUGIN_MATURITY_GAMMA} maria_declare_plugin_end;
Loading