Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 104 additions & 106 deletions storage/eloq/ha_eloq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2568,6 +2568,59 @@ static int eloq_init_func(void *p)

std::string store_keyspace_name(eloq_keyspace_name);

if (opt_bootstrap)
{
// When execute mysql_install_db.sh, eloq should run in solo mode.
std::vector<NodeConfig> solo_config;
solo_config.emplace_back(0, local_ip, local_port);
ng_configs.try_emplace(0, std::move(solo_config));
}
else if (!txservice::ReadClusterConfigFile(cluster_config_file, ng_configs,
cluster_config_version))
{

// Read cluster topology from general config file in this case
auto parse_res= txservice::ParseNgConfig(
eloq_ip_list, eloq_standby_ip_list, eloq_voter_ip_list, ng_configs,
eloq_node_group_replica_num, 0);
if (!parse_res)
{
LOG(ERROR) << "Failed to extract cluster configs from ip_port_list.";
DBUG_RETURN(eloq_init_abort());
}
}

bool found= false;
uint32_t native_ng_id= 0;
// check whether this node is in cluster.
for (auto &pair : ng_configs)
{
auto &ng_nodes= pair.second;
for (size_t i= 0; i < ng_nodes.size(); i++)
{
if (ng_nodes[i].host_name_ == local_ip &&
ng_nodes[i].port_ == local_port)
{
node_id= ng_nodes[i].node_id_;
found= true;
if (ng_nodes[i].is_candidate_)
{
// found native_ng_id.
native_ng_id= pair.first;
break;
}
}
}
}

if (!found)
{
sql_print_error("!!!!!!!! Current node does not belong to any node "
"group, EloqDB "
"startup is terminated !!!!!!!!");
DBUG_RETURN(eloq_init_abort());
}

Comment on lines +2571 to +2623
Copy link

@coderabbitai coderabbitai bot Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix native_ng_id assignment and stop scanning once the local node is found

  • native_ng_id is set only when is_candidate_ is true; for non-candidate (e.g., standby/learner) nodes it remains 0. This can misconfigure TxService/DSS when this node is part of a group but not a candidate.
  • The outer loop doesn’t break after a match; we keep scanning needlessly and risk overwriting state.

Apply this diff to always capture the group id and exit both loops after the first match:

-  bool found= false;
-  uint32_t native_ng_id= 0;
+  bool found= false;
+  uint32_t native_ng_id= 0;
   // check whether this node is in cluster.
-  for (auto &pair : ng_configs)
+  for (const auto &pair : ng_configs)
   {
     auto &ng_nodes= pair.second;
     for (size_t i= 0; i < ng_nodes.size(); i++)
     {
       if (ng_nodes[i].host_name_ == local_ip &&
           ng_nodes[i].port_ == local_port)
       {
         node_id= ng_nodes[i].node_id_;
-        found= true;
-        if (ng_nodes[i].is_candidate_)
-        {
-          // found native_ng_id.
-          native_ng_id= pair.first;
-          break;
-        }
+        native_ng_id= pair.first;   // set regardless of candidate role
+        found= true;
+        break;
       }
     }
+    if (found) break;
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (opt_bootstrap)
{
// When execute mysql_install_db.sh, eloq should run in solo mode.
std::vector<NodeConfig> solo_config;
solo_config.emplace_back(0, local_ip, local_port);
ng_configs.try_emplace(0, std::move(solo_config));
}
else if (!txservice::ReadClusterConfigFile(cluster_config_file, ng_configs,
cluster_config_version))
{
// Read cluster topology from general config file in this case
auto parse_res= txservice::ParseNgConfig(
eloq_ip_list, eloq_standby_ip_list, eloq_voter_ip_list, ng_configs,
eloq_node_group_replica_num, 0);
if (!parse_res)
{
LOG(ERROR) << "Failed to extract cluster configs from ip_port_list.";
DBUG_RETURN(eloq_init_abort());
}
}
bool found= false;
uint32_t native_ng_id= 0;
// check whether this node is in cluster.
for (auto &pair : ng_configs)
{
auto &ng_nodes= pair.second;
for (size_t i= 0; i < ng_nodes.size(); i++)
{
if (ng_nodes[i].host_name_ == local_ip &&
ng_nodes[i].port_ == local_port)
{
node_id= ng_nodes[i].node_id_;
found= true;
if (ng_nodes[i].is_candidate_)
{
// found native_ng_id.
native_ng_id= pair.first;
break;
}
}
}
}
if (!found)
{
sql_print_error("!!!!!!!! Current node does not belong to any node "
"group, EloqDB "
"startup is terminated !!!!!!!!");
DBUG_RETURN(eloq_init_abort());
}
if (opt_bootstrap)
{
// When execute mysql_install_db.sh, eloq should run in solo mode.
std::vector<NodeConfig> solo_config;
solo_config.emplace_back(0, local_ip, local_port);
ng_configs.try_emplace(0, std::move(solo_config));
}
else if (!txservice::ReadClusterConfigFile(cluster_config_file, ng_configs,
cluster_config_version))
{
// Read cluster topology from general config file in this case
auto parse_res= txservice::ParseNgConfig(
eloq_ip_list, eloq_standby_ip_list, eloq_voter_ip_list, ng_configs,
eloq_node_group_replica_num, 0);
if (!parse_res)
{
LOG(ERROR) << "Failed to extract cluster configs from ip_port_list.";
DBUG_RETURN(eloq_init_abort());
}
}
bool found= false;
uint32_t native_ng_id= 0;
// check whether this node is in cluster.
for (const auto &pair : ng_configs)
{
auto &ng_nodes= pair.second;
for (size_t i= 0; i < ng_nodes.size(); i++)
{
if (ng_nodes[i].host_name_ == local_ip &&
ng_nodes[i].port_ == local_port)
{
node_id= ng_nodes[i].node_id_;
native_ng_id= pair.first; // set regardless of candidate role
found= true;
break;
}
}
if (found) break;
}
if (!found)
{
sql_print_error("!!!!!!!! Current node does not belong to any node "
"group, EloqDB "
"startup is terminated !!!!!!!!");
DBUG_RETURN(eloq_init_abort());
}
🤖 Prompt for AI Agents
In storage/eloq/ha_eloq.cc around lines 2571-2623, the code only assigns
native_ng_id when ng_nodes[i].is_candidate_ is true and continues scanning after
finding the local node which can leave native_ng_id unset for non-candidate
nodes and risk overwriting state; change the logic so that when a local node
match is found you always set node_id, found = true and native_ng_id =
pair.first (regardless of is_candidate_), then break out of the inner loop and
also break the outer loop (use a boolean flag or an outer-break pattern) to stop
further scanning.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to ng_config, there will always be a candidate for a group; in other words, any node will always be a candidate of some group.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@githubzilla Got it — thanks for the clarification. If the local node is always a candidate per ng_config, then guarding native_ng_id behind is_candidate_ is fine.

One small improvement still worth making: break out of both loops as soon as the local node is found (and make the loop variables const). This avoids needless scanning and eliminates any chance of later iterations touching state.

Here’s the minimal change that preserves your current semantics while adding the early breaks:

// check whether this node is in cluster.
for (const auto &pair : ng_configs)
{
  const auto &ng_nodes = pair.second;
  for (size_t i = 0; i < ng_nodes.size(); ++i)
  {
    if (ng_nodes[i].host_name_ == local_ip &&
        ng_nodes[i].port_ == local_port)
    {
      node_id = ng_nodes[i].node_id_;
      if (ng_nodes[i].is_candidate_)
      {
        // found native_ng_id.
        native_ng_id = pair.first;
      }
      found = true;
      break;           // stop scanning this group
    }
  }
  if (found) break;    // stop scanning other groups
}

Optionally, we can add a debug assert to catch config drift in the future:

assert(ng_nodes[i].is_candidate_ && "Local node should be a candidate per ng_config");

If you’re good with this micro-change, I can prepare it as an update to the PR.


✏️ Learnings added
Learnt from: githubzilla
Repo: eloqdata/eloqsql PR: 154
File: storage/eloq/ha_eloq.cc:2571-2623
Timestamp: 2025-11-07T07:10:40.624Z
Learning: Repo: eloqdata/eloqsql PR: 154
File/Area: storage/eloq/ha_eloq.cc (eloq_init_func cluster membership resolution)
Learning: In this repo’s ng_config, the local node is guaranteed to be a candidate (NodeConfig::is_candidate_ == true). Therefore, assigning native_ng_id only when is_candidate_ is true is acceptable; the key improvement is to break early from both loops once the local node is found.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: githubzilla
Repo: eloqdata/eloqsql PR: 154
File: storage/eloq/ha_eloq.cc:2770-2781
Timestamp: 2025-11-07T05:12:43.190Z
Learning: In eloqdata/eloqsql, storage/eloq/ha_eloq.cc (KV_ELOQDS path): for DATA_STORE_TYPE_ELOQDSS_ROCKSDB (local RocksDB), each eloqsql instance owns its own RocksDB instance, and passing create_if_missing=true to DataStoreService::StartService is intentional and safe.

switch (eloq_kv_storage)
{
#if defined(DATA_STORE_TYPE_DYNAMODB)
Expand Down Expand Up @@ -2624,58 +2677,6 @@ static int eloq_init_func(void *p)
sql_print_error("Failed to create dir: %s", dss_data_path.c_str());
DBUG_RETURN(eloq_init_abort());
}
std::string dss_config_file_path= eloq_dss_config_file_path;
if (dss_config_file_path.empty())
{
dss_config_file_path= dss_data_path + "/dss_config.ini";
}

EloqDS::DataStoreServiceClusterManager ds_config;
if (!ds_config.Load(dss_config_file_path))
{
if (!ds_peer_node.empty())
{
ds_config.SetThisNode(local_ip, local_port + 7);
// Fetch ds topology from peer node
if (!EloqDS::DataStoreService::FetchConfigFromPeer(ds_peer_node,
ds_config))
{
sql_print_error("Failed to fetch config from peer node: %s",
ds_peer_node.c_str());
DBUG_RETURN(eloq_init_abort());
}

// Save the fetched config to the local file
if (!ds_config.Save(dss_config_file_path))
{
sql_print_error("Failed to save config to file: %s",
dss_config_file_path.c_str());
DBUG_RETURN(eloq_init_abort());
}
}
else if (opt_bootstrap || is_single_node)
{
// Initialize the data store service config
ds_config.Initialize(local_ip, local_port + 7);
if (!ds_config.Save(dss_config_file_path))
{
sql_print_error("Failed to save config to file: %s",
dss_config_file_path.c_str());
DBUG_RETURN(eloq_init_abort());
}
}
else
{
sql_print_error("Failed to load data store service config file: %s",
dss_config_file_path.c_str());
DBUG_RETURN(eloq_init_abort());
}
}
else
{
sql_print_information("EloqDataStoreService loaded config file %s",
dss_config_file_path.c_str());
}

#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) || \
defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_GCS)
Expand Down Expand Up @@ -2721,14 +2722,63 @@ static int eloq_init_func(void *p)
std::move(eloq_store_config));
#endif

std::string dss_config_file_path= "";
EloqDS::DataStoreServiceClusterManager ds_config;
uint32_t dss_leader_id= EloqDS::UNKNOWN_DSS_LEADER_NODE_ID;

// use tx node id as the dss node id
// since they are deployed together
uint32_t dss_node_id= node_id;
if (opt_bootstrap || is_single_node)
{
dss_leader_id= node_id;
}

if (!ds_peer_node.empty())
{
ds_config.SetThisNode(
local_ip,
EloqDS::DataStoreServiceClient::TxPort2DssPort(local_port));
// Fetch ds topology from peer node
if (!EloqDS::DataStoreService::FetchConfigFromPeer(eloq_dss_peer_node,
ds_config))
{
sql_print_error("Failed to fetch DSS config from peer node: %s",
eloq_dss_peer_node);
DBUG_RETURN(eloq_init_abort());
}
}
else
{
if (ng_configs.size() > 1)
{
sql_print_error("EloqDS multi-node cluster must specify "
"eloq_dss_peer_node.");
DBUG_RETURN(eloq_init_abort());
}

EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig(
dss_node_id, native_ng_id, ng_configs, dss_leader_id, ds_config);
}

data_store_service_= std::make_unique<EloqDS::DataStoreService>(
ds_config, dss_config_file_path, dss_data_path + "/DSMigrateLog",
std::move(ds_factory));

// setup local data store service, the data store service will start
// data store if needed.
bool ret=
data_store_service_->StartService((opt_bootstrap || is_single_node));
bool ret= true;
#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB)
// For non shared storage like rocksdb,
// we always set create_if_missing to true
// since non conflicts will happen under
// multi-node deployment.
ret= data_store_service_->StartService(true, dss_leader_id, dss_node_id);
#else
ret= data_store_service_->StartService((opt_bootstrap || is_single_node),
dss_leader_id, dss_node_id);
#endif

if (!ret)
{
sql_print_error("Failed to start data store service");
Expand All @@ -2754,58 +2804,6 @@ static int eloq_init_func(void *p)
default:
DBUG_RETURN(eloq_init_abort());
}
if (opt_bootstrap)
{
// When execute mysql_install_db.sh, eloq should run in solo mode.
std::vector<NodeConfig> solo_config;
solo_config.emplace_back(0, local_ip, local_port);
ng_configs.try_emplace(0, std::move(solo_config));
}
else if (!txservice::ReadClusterConfigFile(cluster_config_file, ng_configs,
cluster_config_version))
{

// Read cluster topology from general config file in this case
auto parse_res= txservice::ParseNgConfig(
eloq_ip_list, eloq_standby_ip_list, eloq_voter_ip_list, ng_configs,
eloq_node_group_replica_num, 0);
if (!parse_res)
{
LOG(ERROR) << "Failed to extract cluster configs from ip_port_list.";
DBUG_RETURN(eloq_init_abort());
}
}

bool found= false;
uint32_t native_ng_id= 0;
// check whether this node is in cluster.
for (auto &pair : ng_configs)
{
auto &ng_nodes= pair.second;
for (size_t i= 0; i < ng_nodes.size(); i++)
{
if (ng_nodes[i].host_name_ == local_ip &&
ng_nodes[i].port_ == local_port)
{
node_id= ng_nodes[i].node_id_;
found= true;
if (ng_nodes[i].is_candidate_)
{
// found native_ng_id.
native_ng_id= pair.first;
break;
}
}
}
}

if (!found)
{
sql_print_error("!!!!!!!! Current node does not belong to any node "
"group, EloqDB "
"startup is terminated !!!!!!!!");
DBUG_RETURN(eloq_init_abort());
}

// Set max rpc message size as 512mb.
GFLAGS_NAMESPACE::SetCommandLineOption("max_body_size", "536870912");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ drop table t1;
--error ER_NO_SUCH_TABLE
select * from t1;
SET SESSION debug_dbug="-d,eloq;kv_upsert_table;node_id=1";
# Sleep until the replay work has finished, to avoid a write-write (ww) conflict
--disable_result_log
--sleep 3
--enable_result_log
create table t1(i int primary key, j int) engine= eloq;
select * from t1;
drop table t1;
Expand Down