Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 90 additions & 20 deletions storage/eloq/ha_eloq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2298,7 +2298,11 @@ Aws::SDKOptions aws_options;
static int aws_init()
{
DBUG_ENTER("aws_init");
#if !defined(DBUG_OFF)
aws_options.loggingOptions.logLevel= Aws::Utils::Logging::LogLevel::Debug;
#else
aws_options.loggingOptions.logLevel= Aws::Utils::Logging::LogLevel::Info;
#endif
Aws::InitAPI(aws_options);
DBUG_RETURN(0);
}
Expand Down Expand Up @@ -2660,7 +2664,6 @@ static int eloq_init_func(void *p)

#elif ELOQDS
case KV_ELOQDS: {
bool is_single_node= true;
std::string ds_peer_node= eloq_dss_peer_node;
std::string ds_branch_name= eloq_dss_branch_name;
std::string dss_data_path= mysql_real_data_home_ptr;
Expand Down Expand Up @@ -2724,15 +2727,7 @@ static int eloq_init_func(void *p)

std::string dss_config_file_path= "";
EloqDS::DataStoreServiceClusterManager ds_config;
uint32_t dss_leader_id= EloqDS::UNKNOWN_DSS_LEADER_NODE_ID;

// use tx node id as the dss node id
// since they are deployed together
uint32_t dss_node_id= node_id;
if (opt_bootstrap || is_single_node)
{
dss_leader_id= node_id;
}
bool is_single_node= false;

Comment on lines 2728 to 2731
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Bug: user-provided DSS config file path is ignored (shadowed variable)

Local std::string dss_config_file_path is hardcoded to "", shadowing the global sysvar eloq_dss_config_file_path. This drops any configured path at startup.

Apply this fix:

-    std::string dss_config_file_path= "";
+    // Honor the sysvar if set; otherwise leave empty.
+    std::string dss_config_file_path =
+        (eloq_dss_config_file_path && std::strlen(eloq_dss_config_file_path) > 0)
+            ? std::string(eloq_dss_config_file_path)
+            : std::string();
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
std::string dss_config_file_path= "";
EloqDS::DataStoreServiceClusterManager ds_config;
uint32_t dss_leader_id= EloqDS::UNKNOWN_DSS_LEADER_NODE_ID;
// use tx node id as the dss node id
// since they are deployed together
uint32_t dss_node_id= node_id;
if (opt_bootstrap || is_single_node)
{
dss_leader_id= node_id;
}
bool is_single_node= false;
// Honor the sysvar if set; otherwise leave empty.
std::string dss_config_file_path =
(eloq_dss_config_file_path && std::strlen(eloq_dss_config_file_path) > 0)
? std::string(eloq_dss_config_file_path)
: std::string();
EloqDS::DataStoreServiceClusterManager ds_config;
bool is_single_node= false;
🤖 Prompt for AI Agents
In storage/eloq/ha_eloq.cc around lines 2728 to 2731, a local variable
std::string dss_config_file_path is initialized to "" which shadows the global
sysvar eloq_dss_config_file_path and drops any user-provided path; remove the
hardcoded local initialization and either use the global
eloq_dss_config_file_path directly or initialize the local variable from that
global (e.g., std::string dss_config_file_path = eloq_dss_config_file_path) so
the configured path is preserved, and ensure no other local variable with the
same name exists in this scope.

if (!ds_peer_node.empty())
{
Expand All @@ -2747,18 +2742,88 @@ static int eloq_init_func(void *p)
eloq_dss_peer_node);
DBUG_RETURN(eloq_init_abort());
}
is_single_node=
(ng_configs.size() == 1 && ng_configs.begin()->second.size() == 1);
}
else
{
if (ng_configs.size() > 1)
// Bind dss leaders with TxService ng leaders.
std::unordered_map<uint32_t, uint32_t> ng_leaders;

if (opt_bootstrap)
{
sql_print_error("EloqDS multi-node cluster must specify "
"eloq_dss_peer_node.");
DBUG_RETURN(eloq_init_abort());
// Must parse all node groups to generate data store shards configs.
std::unordered_map<uint32_t, std::vector<NodeConfig>> tmp_ng_configs;
uint64_t tmp_cluster_version= 2;
if (!txservice::ReadClusterConfigFile(
cluster_config_file, tmp_ng_configs, tmp_cluster_version))
{

// Read cluster topology from general config file in this case
auto parse_res= txservice::ParseNgConfig(
eloq_ip_list, eloq_standby_ip_list, eloq_voter_ip_list,
tmp_ng_configs, eloq_node_group_replica_num, 0);
if (!parse_res)
{
LOG(ERROR)
<< "Failed to extract cluster configs from ip_port_list.";
DBUG_RETURN(eloq_init_abort());
}
}

bool found= false;
uint32_t tmp_node_id;
// check whether this node is in cluster.
for (auto &pair : tmp_ng_configs)
{
auto &ng_nodes= pair.second;
for (size_t i= 0; i < ng_nodes.size(); i++)
{
if (ng_nodes[i].host_name_ == local_ip &&
ng_nodes[i].port_ == local_port)
{
tmp_node_id= ng_nodes[i].node_id_;
found= true;
break;
}
}
if (found)
{
break;
}
}
if (!found)
{
sql_print_error("!!!!!!!! Current node does not belong to any node "
"group, EloqDB "
"startup is terminated !!!!!!!!");
DBUG_RETURN(eloq_init_abort());
}

// For bootstrap, start all data store shards in current node.
for (auto &ng : tmp_ng_configs)
{
ng_leaders.emplace(ng.first, tmp_node_id);
}

EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig(
tmp_node_id, tmp_ng_configs, ng_leaders, ds_config);
}
else
{
is_single_node=
(ng_configs.size() == 1 && ng_configs.begin()->second.size() == 1);
if (is_single_node)
{
for (const auto &[ng_id, ng_config] : ng_configs)
{
ng_leaders.emplace(ng_id, node_id);
}
}

EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig(
dss_node_id, native_ng_id, ng_configs, dss_leader_id, ds_config);
EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig(
node_id, ng_configs, ng_leaders, ds_config);
}
}

data_store_service_= std::make_unique<EloqDS::DataStoreService>(
Expand All @@ -2773,10 +2838,9 @@ static int eloq_init_func(void *p)
// we always set create_if_missing to true
// since non conflicts will happen under
// multi-node deployment.
ret= data_store_service_->StartService(true, dss_leader_id, dss_node_id);
ret= data_store_service_->StartService(true);
#else
ret= data_store_service_->StartService((opt_bootstrap || is_single_node),
dss_leader_id, dss_node_id);
ret= data_store_service_->StartService((opt_bootstrap || is_single_node));
#endif

if (!ret)
Expand All @@ -2787,7 +2851,13 @@ static int eloq_init_func(void *p)

// setup data store service client
storage_hd= std::make_unique<EloqDS::DataStoreServiceClient>(
catalog_factory, ds_config, data_store_service_.get());
#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB)
true,
#else
(opt_bootstrap || is_single_node),
#endif
catalog_factory, ds_config, ds_peer_node.empty(),
data_store_service_.get());

if (!storage_hd->Connect())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ insert into t2 values(1,1);
select * from t2;
i j
1 1
SET SESSION debug_dbug="+d,eloq;remote_acquire_before_sendmessage;node_id=1;action=panic";
SET SESSION debug_dbug="+d,eloq;remote_acquire_before_sendmessage;node_id=2;action=panic";
insert into t2 values(2,2);
ERROR HY000: Eloq: failed to commit transaction. try restarting transaction. Transaction failed due to internal request timeout.
select * from t2;
Expand Down
2 changes: 1 addition & 1 deletion storage/eloq/mysql-test/mono_multi/r/multi_nodes.result
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ i j
1 2
Begin;
update t2 set j=3;
connection serv_2;
connection serv_3;
# restart
connection serv_1;
commit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ insert into t2 values(1,1);
select * from t2;

# The row(2,2) will be located on server3(ng2).
SET SESSION debug_dbug="+d,eloq;remote_acquire_before_sendmessage;node_id=1;action=panic";
SET SESSION debug_dbug="+d,eloq;remote_acquire_before_sendmessage;node_id=2;action=panic";
# Write file to make mysql-test-run.pl expect crash
--exec echo "restart" > $MYSQLTEST_VARDIR/tmp/mysqld.2.expect
--exec echo "restart" > $MYSQLTEST_VARDIR/tmp/mysqld.3.expect

# insert transaction will timeout when waiting for the acquire write response.
--error 199
Expand Down
6 changes: 3 additions & 3 deletions storage/eloq/mysql-test/mono_multi/t/multi_nodes.test
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ select * from t2;
Begin;
update t2 set j=3;

# The row(1,1) is located on server-2, then restart it to trigger changing node term.
# Here will restart server 2. It will decide which node to restart
# The row(1,1) is located on server-3, then restart it to trigger changing node term.
# Here will restart server-3. It will decide which node to restart
# according to the current connection.
--connection serv_2
--connection serv_3
--source include/restart_mysqld.inc

--connection serv_1
Expand Down
2 changes: 1 addition & 1 deletion storage/eloq/store_handler
Submodule store_handler updated 32 files
+395 −278 data_store_service_client.cpp
+59 −73 data_store_service_client.h
+28 −19 data_store_service_client_closure.cpp
+73 −52 data_store_service_client_closure.h
+5 −1 data_store_service_scanner.cpp
+8 −7 data_store_service_scanner.h
+5 −5 eloq_data_store_service/CMakeLists.txt
+1 −0 eloq_data_store_service/build_eloq_store.cmake
+48 −0 eloq_data_store_service/data_store_factory.h
+805 −239 eloq_data_store_service/data_store_service.cpp
+135 −45 eloq_data_store_service/data_store_service.h
+14 −0 eloq_data_store_service/data_store_service_config.cpp
+16 −1 eloq_data_store_service/data_store_service_config.h
+5 −0 eloq_data_store_service/data_store_service_util.h
+24 −5 eloq_data_store_service/ds_request.proto
+9 −0 eloq_data_store_service/eloq_store_config.cpp
+45 −0 eloq_data_store_service/eloq_store_data_store_factory.h
+198 −69 eloq_data_store_service/internal_request.h
+2 −2 eloq_data_store_service/main.cpp
+114 −16 eloq_data_store_service/rocksdb_cloud_data_store.cpp
+14 −0 eloq_data_store_service/rocksdb_cloud_data_store.h
+54 −1 eloq_data_store_service/rocksdb_cloud_data_store_factory.h
+2 −8 eloq_data_store_service/rocksdb_data_store.cpp
+22 −27 eloq_data_store_service/rocksdb_data_store_common.cpp
+2 −4 eloq_data_store_service/rocksdb_data_store_common.h
+40 −0 eloq_data_store_service/rocksdb_data_store_factory.h
+163 −0 eloq_data_store_service/s3_file_downloader.cpp
+68 −0 eloq_data_store_service/s3_file_downloader.h
+51 −35 eloq_data_store_service/thread_worker_pool.cpp
+1 −0 eloq_data_store_service/thread_worker_pool.h
+3 −2 rocksdb_handler.cpp
+3 −2 rocksdb_handler.h