diff --git a/storage/eloq/ha_eloq.cc b/storage/eloq/ha_eloq.cc index feadfe2461e..9f644d492fe 100644 --- a/storage/eloq/ha_eloq.cc +++ b/storage/eloq/ha_eloq.cc @@ -2568,6 +2568,59 @@ static int eloq_init_func(void *p) std::string store_keyspace_name(eloq_keyspace_name); + if (opt_bootstrap) + { + // When execute mysql_install_db.sh, eloq should run in solo mode. + std::vector solo_config; + solo_config.emplace_back(0, local_ip, local_port); + ng_configs.try_emplace(0, std::move(solo_config)); + } + else if (!txservice::ReadClusterConfigFile(cluster_config_file, ng_configs, + cluster_config_version)) + { + + // Read cluster topology from general config file in this case + auto parse_res= txservice::ParseNgConfig( + eloq_ip_list, eloq_standby_ip_list, eloq_voter_ip_list, ng_configs, + eloq_node_group_replica_num, 0); + if (!parse_res) + { + LOG(ERROR) << "Failed to extract cluster configs from ip_port_list."; + DBUG_RETURN(eloq_init_abort()); + } + } + + bool found= false; + uint32_t native_ng_id= 0; + // check whether this node is in cluster. + for (auto &pair : ng_configs) + { + auto &ng_nodes= pair.second; + for (size_t i= 0; i < ng_nodes.size(); i++) + { + if (ng_nodes[i].host_name_ == local_ip && + ng_nodes[i].port_ == local_port) + { + node_id= ng_nodes[i].node_id_; + found= true; + if (ng_nodes[i].is_candidate_) + { + // found native_ng_id. + native_ng_id= pair.first; + break; + } + } + } + } + + if (!found) + { + sql_print_error("!!!!!!!! Current node does not belong to any node " + "group, EloqDB " + "startup is terminated !!!!!!!!"); + DBUG_RETURN(eloq_init_abort()); + } + switch (eloq_kv_storage) { #if defined(DATA_STORE_TYPE_DYNAMODB) @@ -2624,58 +2677,6 @@ static int eloq_init_func(void *p) sql_print_error("Failed to create dir: %s", dss_data_path.c_str()); DBUG_RETURN(eloq_init_abort()); } - std::string dss_config_file_path= eloq_dss_config_file_path; - if (dss_config_file_path.empty()) - { - dss_config_file_path= dss_data_path + "/dss_config.ini"; - } - - EloqDS::DataStoreServiceClusterManager ds_config; - if (!ds_config.Load(dss_config_file_path)) - { - if (!ds_peer_node.empty()) - { - ds_config.SetThisNode(local_ip, local_port + 7); - // Fetch ds topology from peer node - if (!EloqDS::DataStoreService::FetchConfigFromPeer(ds_peer_node, - ds_config)) - { - sql_print_error("Failed to fetch config from peer node: %s", - ds_peer_node.c_str()); - DBUG_RETURN(eloq_init_abort()); - } - - // Save the fetched config to the local file - if (!ds_config.Save(dss_config_file_path)) - { - sql_print_error("Failed to save config to file: %s", - dss_config_file_path.c_str()); - DBUG_RETURN(eloq_init_abort()); - } - } - else if (opt_bootstrap || is_single_node) - { - // Initialize the data store service config - ds_config.Initialize(local_ip, local_port + 7); - if (!ds_config.Save(dss_config_file_path)) - { - sql_print_error("Failed to save config to file: %s", - dss_config_file_path.c_str()); - DBUG_RETURN(eloq_init_abort()); - } - } - else - { - sql_print_error("Failed to load data store service config file: %s", - dss_config_file_path.c_str()); - DBUG_RETURN(eloq_init_abort()); - } - } - else - { - sql_print_information("EloqDataStoreService loaded config file %s", - dss_config_file_path.c_str()); - } #if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) || \ defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_GCS) @@ -2721,14 +2722,63 @@ static int eloq_init_func(void *p) std::move(eloq_store_config)); #endif + std::string dss_config_file_path= ""; + EloqDS::DataStoreServiceClusterManager ds_config; + uint32_t dss_leader_id= EloqDS::UNKNOWN_DSS_LEADER_NODE_ID; + + // use tx node id as the dss node id + // since they are deployed together + uint32_t dss_node_id= node_id; + if (opt_bootstrap || is_single_node) + { + dss_leader_id= node_id; + } + + if (!ds_peer_node.empty()) + { + ds_config.SetThisNode( + local_ip, + EloqDS::DataStoreServiceClient::TxPort2DssPort(local_port)); + // Fetch ds topology from peer node + if (!EloqDS::DataStoreService::FetchConfigFromPeer(eloq_dss_peer_node, + ds_config)) + { + sql_print_error("Failed to fetch DSS config from peer node: %s", + eloq_dss_peer_node); + DBUG_RETURN(eloq_init_abort()); + } + } + else + { + if (ng_configs.size() > 1) + { + sql_print_error("EloqDS multi-node cluster must specify " + "eloq_dss_peer_node."); + DBUG_RETURN(eloq_init_abort()); + } + + EloqDS::DataStoreServiceClient::TxConfigsToDssClusterConfig( + dss_node_id, native_ng_id, ng_configs, dss_leader_id, ds_config); + } + data_store_service_= std::make_unique( ds_config, dss_config_file_path, dss_data_path + "/DSMigrateLog", std::move(ds_factory)); // setup local data store service, the data store service will start // data store if needed. - bool ret= - data_store_service_->StartService((opt_bootstrap || is_single_node)); + bool ret= true; +#if defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB) + // For non shared storage like rocksdb, + // we always set create_if_missing to true + // since non conflicts will happen under + // multi-node deployment. + ret= data_store_service_->StartService(true, dss_leader_id, dss_node_id); +#else + ret= data_store_service_->StartService((opt_bootstrap || is_single_node), + dss_leader_id, dss_node_id); +#endif + if (!ret) { sql_print_error("Failed to start data store service"); @@ -2754,58 +2804,6 @@ static int eloq_init_func(void *p) default: DBUG_RETURN(eloq_init_abort()); } - if (opt_bootstrap) - { - // When execute mysql_install_db.sh, eloq should run in solo mode. - std::vector solo_config; - solo_config.emplace_back(0, local_ip, local_port); - ng_configs.try_emplace(0, std::move(solo_config)); - } - else if (!txservice::ReadClusterConfigFile(cluster_config_file, ng_configs, - cluster_config_version)) - { - - // Read cluster topology from general config file in this case - auto parse_res= txservice::ParseNgConfig( - eloq_ip_list, eloq_standby_ip_list, eloq_voter_ip_list, ng_configs, - eloq_node_group_replica_num, 0); - if (!parse_res) - { - LOG(ERROR) << "Failed to extract cluster configs from ip_port_list."; - DBUG_RETURN(eloq_init_abort()); - } - } - - bool found= false; - uint32_t native_ng_id= 0; - // check whether this node is in cluster. - for (auto &pair : ng_configs) - { - auto &ng_nodes= pair.second; - for (size_t i= 0; i < ng_nodes.size(); i++) - { - if (ng_nodes[i].host_name_ == local_ip && - ng_nodes[i].port_ == local_port) - { - node_id= ng_nodes[i].node_id_; - found= true; - if (ng_nodes[i].is_candidate_) - { - // found native_ng_id. - native_ng_id= pair.first; - break; - } - } - } - } - - if (!found) - { - sql_print_error("!!!!!!!! Current node does not belong to any node " - "group, EloqDB " - "startup is terminated !!!!!!!!"); - DBUG_RETURN(eloq_init_abort()); - } // Set max rpc message size as 512mb. GFLAGS_NAMESPACE::SetCommandLineOption("max_body_size", "536870912"); diff --git a/storage/eloq/mysql-test/mono_multi/t/fault_inject_for_upsert_table.test b/storage/eloq/mysql-test/mono_multi/t/fault_inject_for_upsert_table.test index 9c5840781b4..bfb37618837 100644 --- a/storage/eloq/mysql-test/mono_multi/t/fault_inject_for_upsert_table.test +++ b/storage/eloq/mysql-test/mono_multi/t/fault_inject_for_upsert_table.test @@ -54,6 +54,10 @@ drop table t1; --error ER_NO_SUCH_TABLE select * from t1; SET SESSION debug_dbug="-d,eloq;kv_upsert_table;node_id=1"; +# Sleep for replay work finished to avoid ww conflict +--disable_result_log +--sleep 3 +--enable_result_log create table t1(i int primary key, j int) engine= eloq; select * from t1; drop table t1; diff --git a/storage/eloq/store_handler b/storage/eloq/store_handler index 62e17d3b8c8..ce0e9317fd4 160000 --- a/storage/eloq/store_handler +++ b/storage/eloq/store_handler @@ -1 +1 @@ -Subproject commit 62e17d3b8c8c4095d094b66351fdd7dec3a4f4d4 +Subproject commit ce0e9317fd486a0e5a1342228ca4f2805d492daf diff --git a/storage/eloq/tx_service b/storage/eloq/tx_service index f3f89e6e6d9..307387086a3 160000 --- a/storage/eloq/tx_service +++ b/storage/eloq/tx_service @@ -1 +1 @@ -Subproject commit f3f89e6e6d9a4c8dca6ceed4b6058848ab256597 +Subproject commit 307387086a3e05d9da8c31dc6b818923361b076b