From 89577e1ad6d18fecc173ba62637ab50875739e52 Mon Sep 17 00:00:00 2001 From: liunyl Date: Tue, 2 Dec 2025 08:09:39 +0000 Subject: [PATCH 1/2] refactor init process --- data_substrate | 2 +- include/redis_service.h | 3 + src/redis_server.cpp | 36 ++++- src/redis_service.cpp | 284 +++++++++++++++++++++++++--------------- 4 files changed, 211 insertions(+), 114 deletions(-) diff --git a/data_substrate b/data_substrate index 9f83976a..213ff3a9 160000 --- a/data_substrate +++ b/data_substrate @@ -1 +1 @@ -Subproject commit 9f83976a3f3b5f90480d927d92806ecb06bd76d1 +Subproject commit 213ff3a99fe2074b137e58c4e8f4f13f25cf0b5e diff --git a/include/redis_service.h b/include/redis_service.h index 6340aca8..ce6aaf54 100644 --- a/include/redis_service.h +++ b/include/redis_service.h @@ -165,6 +165,9 @@ class RedisServiceImpl : public brpc::RedisService bool Init(brpc::Server &brpc_server); + // Second phase of initialization, to be called after DataSubstrate::Start() + bool Start(brpc::Server &brpc_server); + void Stop(); // The number of master nodes serving at least one hash slot in the cluster. diff --git a/src/redis_server.cpp b/src/redis_server.cpp index 90a20c47..8f4e4e62 100644 --- a/src/redis_server.cpp +++ b/src/redis_server.cpp @@ -449,12 +449,16 @@ int main(int argc, char *argv[]) // Convert eloqkv flags to tx flags ConvertEloqkvFlagsToTxFlags(&config_reader); - if (!DataSubstrate::InitializeGlobal(config_file)) + // Step 1: Initialize DataSubstrate + if (!DataSubstrate::Instance().Init(config_file)) { LOG(ERROR) << "Failed to initialize DataSubstrate."; return -1; } + + // Step 2: Initialize and register EloqKv engine LOG(INFO) << "Starting EloqKV Server ..."; + DataSubstrate::Instance().EnableEngine(txservice::TableEngine::EloqKv); brpc::Server server; brpc::ServerOptions server_options; auto redis_service_impl = @@ -463,13 +467,37 @@ int main(int argc, char *argv[]) { LOG(ERROR) << "Failed to start EloqKV server."; redis_service_impl->Stop(); - DataSubstrate::GetGlobal()->Shutdown(); + DataSubstrate::Instance().Shutdown(); +#if BRPC_WITH_GLOG + google::ShutdownGoogleLogging(); +#endif + return -1; + } + + // Step 3: Start DataSubstrate + if (!DataSubstrate::Instance().Start()) + { + LOG(ERROR) << "Failed to start DataSubstrate."; + redis_service_impl->Stop(); + DataSubstrate::Instance().Shutdown(); #if BRPC_WITH_GLOG google::ShutdownGoogleLogging(); #endif return -1; } + + // Step 4: Start Redis service EloqKV::RedisServiceImpl *redis_service_ptr = redis_service_impl.get(); + if (!redis_service_ptr->Start(server)) + { + LOG(ERROR) << "Failed to start Redis service."; + redis_service_ptr->Stop(); + DataSubstrate::Instance().Shutdown(); +#if BRPC_WITH_GLOG + google::ShutdownGoogleLogging(); +#endif + return -1; + } std::string n_bthreads; GFLAGS_NAMESPACE::GetCommandLineOption("bthread_concurrency", &n_bthreads); server_options.num_threads = std::stoi(n_bthreads); @@ -480,7 +508,7 @@ int main(int argc, char *argv[]) { LOG(ERROR) << "Failed to start EloqKV server."; redis_service_ptr->Stop(); - DataSubstrate::GetGlobal()->Shutdown(); + DataSubstrate::Instance().Shutdown(); #if BRPC_WITH_GLOG google::ShutdownGoogleLogging(); #endif @@ -503,7 +531,7 @@ int main(int argc, char *argv[]) { std::cout << "\nEloqKV Server Stopping..." << std::endl; } - DataSubstrate::GetGlobal()->Shutdown(); + DataSubstrate::Instance().Shutdown(); redis_service_ptr->Stop(); if (!FLAGS_alsologtostderr) diff --git a/src/redis_service.cpp b/src/redis_service.cpp index f1a4fb8e..a867528f 100644 --- a/src/redis_service.cpp +++ b/src/redis_service.cpp @@ -90,7 +90,6 @@ DECLARE_int32(event_dispatcher_num); } EloqKV::RedisCatalogFactory catalog_factory; -txservice::CatalogFactory *eloqkv_catalog_factory = &catalog_factory; // DEFINE all gflags here DECLARE_int32(node_memory_limit_mb); @@ -184,29 +183,15 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) return false; } - tx_service_ = DataSubstrate::GetGlobal()->GetTxService(); - if (tx_service_ == nullptr) - { - LOG(ERROR) << "Error: TxService is not initialized."; - return false; - } - bool enable_store = - DataSubstrate::GetGlobal()->GetCoreConfig().enable_data_store; - store_hd_ = DataSubstrate::GetGlobal()->GetStoreHandler(); - if (enable_store && store_hd_ == nullptr) - { - LOG(ERROR) << "Error: DataStoreHandler is not initialized."; - return false; - } - core_num_ = DataSubstrate::GetGlobal()->GetCoreConfig().core_num; - if (core_num_ == 0) - { - LOG(ERROR) << "Error: core_num is 0."; - return false; - } - node_memory_limit_mb_ = FLAGS_node_memory_limit_mb; + // Engine registration: EloqKv + auto &ds = DataSubstrate::Instance(); databases = config_reader.GetInteger("local", "databases", 16); + + // Prebuilt Redis tables (same names as today: data_table_0 .. data_table_15) + std::vector> prebuilt_tables; + prebuilt_tables.reserve(databases); + for (int i = 0; i < databases; i++) { std::string table_name("data_table_"); @@ -223,16 +208,56 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) #elif defined(DATA_STORE_TYPE_ROCKSDB) // TODO(lokax): #endif + prebuilt_tables.emplace_back(redis_table_name, image); redis_table_names_.push_back(redis_table_name); } - requirepass = config_reader.GetString("local", "requirepass", ""); + // EloqKV-specific metrics (follow commented metrics_init.cpp patterns) + std::vector>> + engine_metrics; + + for (const auto &[cmd, _] : EloqKV::command_types) + { + std::vector label_groups = {{"type", {cmd}}}; + + engine_metrics.push_back(std::make_tuple( + metrics::NAME_REDIS_COMMAND_DURATION, + metrics::Type::Histogram, + label_groups)); + engine_metrics.push_back(std::make_tuple( + metrics::NAME_REDIS_COMMAND_TOTAL, + metrics::Type::Counter, + label_groups)); + } + + for (const auto &access_type : {"read", "write"}) + { + engine_metrics.push_back( + std::make_tuple(metrics::NAME_REDIS_COMMAND_AGGREGATED_TOTAL, + metrics::Type::Counter, + std::vector{ + {"access_type", {access_type}}})); + engine_metrics.push_back( + std::make_tuple(metrics::NAME_REDIS_COMMAND_AGGREGATED_DURATION, + metrics::Type::Histogram, + std::vector{ + {"access_type", {access_type}}})); + } - skip_kv_ = !DataSubstrate::GetGlobal()->GetCoreConfig().enable_data_store; - skip_wal_ = !DataSubstrate::GetGlobal()->GetCoreConfig().enable_wal; + if (!ds.RegisterEngine(txservice::TableEngine::EloqKv, + &catalog_factory, + /*system_handler=*/nullptr, + std::move(prebuilt_tables), + std::move(engine_metrics))) + { + LOG(ERROR) << "Failed to register EloqKV engine with DataSubstrate"; + return false; + } + + requirepass = config_reader.GetString("local", "requirepass", ""); - std::string local_ip = - DataSubstrate::GetGlobal()->GetNetworkConfig().local_ip; redis_port_ = !CheckCommandLineFlagIsDefault("eloqkv_port") ? FLAGS_eloqkv_port : config_reader.GetInteger( @@ -262,34 +287,12 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) ? FLAGS_slow_log_max_length : config_reader.GetInteger("local", "slow_log_max_length", 128); - for (size_t core_idx = 0; core_idx < core_num_; ++core_idx) - { - slow_log_mutexes_.emplace_back(std::make_unique()); - } - - slow_log_.resize(core_num_); - next_slow_log_idx_.resize(core_num_, 0); - for (size_t core_idx = 0; core_idx < core_num_; ++core_idx) - { - // high 16 bits: core_id - // low 48 bits: counter - next_slow_log_unique_id_.push_back(core_idx << 48); - } - - slow_log_len_.resize(core_num_, 0); if (slow_log_threshold_ < -1) { // slowlog threshold master >= -1 slow_log_threshold_ = -1; } - // Each task group will maintain its own slow log to avoid race - // condition. - for (uint32_t i = 0; i < core_num_; ++i) - { - slow_log_[i].resize(slow_log_max_length_); - } - config_.try_emplace("slowlog-log-slower-than", std::to_string(slow_log_threshold_)); config_.try_emplace("slowlog-max-len", @@ -298,59 +301,6 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) std::srand(std::time(nullptr)); crc64speed_init_big(); - // Change(lzx): change the "local.port" and "cluster.ip_port_list" to the - // address of local redis and cluster instead of txservice. The port of - // txservice is set as redis port add "10000". eg.6379->16379 - if (DataSubstrate::GetGlobal()->GetNetworkConfig().bind_all) - { - redis_ip_port = "0.0.0.0:" + std::to_string(redis_port_); - } - else - { - redis_ip_port = local_ip + ":" + std::to_string(redis_port_); - } - - DLOG(INFO) << "Local EloqKv server ip port: " << redis_ip_port; - - if (DataSubstrate::GetGlobal()->GetCoreConfig().bootstrap) - { - Stop(); - DataSubstrate::GetGlobal()->Shutdown(); - - LOG(INFO) << "bootstrap done !!!"; - - if (!FLAGS_alsologtostderr) - { - std::cout << "bootstrap done !!!" << std::endl; - } - -#if BRPC_WITH_GLOG - google::ShutdownGoogleLogging(); -#endif - - exit(0); - } - - /* Initialize metrics registery and register metrics */ - stopping_indicator_.store(false, std::memory_order_release); - - if (metrics::enable_metrics) - { - redis_cmd_current_rounds_.resize(core_num_); - for (auto &vec : redis_cmd_current_rounds_) - { - vec.resize(command_types.size() + 10, 1); - } - } - - if (metrics::enable_metrics) - { - metrics_collector_thd_ = - std::thread(&RedisServiceImpl::CollectConnectionsMetrics, - this, - std::ref(brpc_server)); - } - enable_redis_stats_ = !CheckCommandLineFlagIsDefault("enable_redis_stats") ? FLAGS_enable_redis_stats @@ -487,17 +437,132 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) vector_index_worker_pool_ = std::make_unique(vector_index_worker_num); } + // Vector handler initialization moved to Start() since it requires tx_service_ +#endif + + return true; +} + +bool RedisServiceImpl::Start(brpc::Server &brpc_server) +{ + auto &ds = DataSubstrate::Instance(); + + // Attach to TxService and DataStoreHandler now that DataSubstrate is started. + + tx_service_ = ds.GetTxService(); + if (tx_service_ == nullptr) + { + LOG(ERROR) << "Error: TxService is not initialized."; + return false; + } + bool enable_store = + ds.GetCoreConfig().enable_data_store; + store_hd_ = ds.GetStoreHandler(); + if (enable_store && store_hd_ == nullptr) + { + LOG(ERROR) << "Error: DataStoreHandler is not initialized."; + return false; + } + core_num_ = ds.GetCoreConfig().core_num; + if (core_num_ == 0) + { + LOG(ERROR) << "Error: core_num is 0."; + return false; + } + node_memory_limit_mb_ = FLAGS_node_memory_limit_mb; - // Initialize vector handler - EloqVec::CloudConfig vector_cloud_config(config_reader); - if (!EloqVec::VectorHandler::InitHandlerInstance( + // Size slow_log_* structures based on core_num_ + slow_log_mutexes_.clear(); + for (size_t core_idx = 0; core_idx < core_num_; ++core_idx) + { + slow_log_mutexes_.emplace_back(std::make_unique()); + } + + slow_log_.resize(core_num_); + next_slow_log_idx_.resize(core_num_, 0); + for (size_t core_idx = 0; core_idx < core_num_; ++core_idx) + { + // high 16 bits: core_id + // low 48 bits: counter + next_slow_log_unique_id_.push_back(core_idx << 48); + } + + slow_log_len_.resize(core_num_, 0); + + // Each task group will maintain its own slow log to avoid race + // condition. + for (uint32_t i = 0; i < core_num_; ++i) + { + slow_log_[i].resize(slow_log_max_length_); + } + + // Compute redis_ip_port using network config + std::string local_ip = ds.GetNetworkConfig().local_ip; + if (ds.GetNetworkConfig().bind_all) + { + redis_ip_port = "0.0.0.0:" + std::to_string(redis_port_); + } + else + { + redis_ip_port = local_ip + ":" + std::to_string(redis_port_); + } + + DLOG(INFO) << "Local EloqKv server ip port: " << redis_ip_port; + + // Check bootstrap and handle it + if (ds.GetCoreConfig().bootstrap) + { + Stop(); + ds.Shutdown(); + + LOG(INFO) << "bootstrap done !!!"; + + if (!FLAGS_alsologtostderr) + { + std::cout << "bootstrap done !!!" << std::endl; + } + +#if BRPC_WITH_GLOG + google::ShutdownGoogleLogging(); +#endif + + exit(0); + } + + // Metrics-related initialization that depends on DataSubstrate::InitializeMetrics + stopping_indicator_.store(false, std::memory_order_release); + + if (metrics::enable_metrics) + { + redis_cmd_current_rounds_.resize(core_num_); + for (auto &vec : redis_cmd_current_rounds_) + { + vec.resize(command_types.size() + 10, 1); + } + + // The metrics registry and redis_meter are already created by DataSubstrate. + // We only need to start our collector thread here. + metrics_collector_thd_ = + std::thread(&RedisServiceImpl::CollectConnectionsMetrics, + this, + std::ref(brpc_server)); + } + +#ifdef VECTOR_INDEX_ENABLED + // Initialize vector handler now that tx_service_ is available + if (vector_index_worker_pool_ != nullptr) + { + INIReader config_reader(config_file_); + EloqVec::CloudConfig vector_cloud_config(config_reader); + if (!EloqVec::VectorHandler::InitHandlerInstance( tx_service_, vector_index_worker_pool_.get(), eloq_data_path, &vector_cloud_config)) - { - LOG(ERROR) << "Failed to initialize vector handler instance"; - return false; + { + LOG(ERROR) << "Failed to initialize vector handler instance"; + return false; + } } #endif @@ -5985,7 +6050,8 @@ metrics::Meter *RedisServiceImpl::GetMeter(std::size_t core_id) const size_t RedisServiceImpl::MaxConnectionCount() const { - return DataSubstrate::GetGlobal()->GetCoreConfig().maxclients; + auto &ds = DataSubstrate::Instance(); + return ds.GetCoreConfig().maxclients; } } // namespace EloqKV From 63dc8cdf06fc6827569672d44fa18e6b147fad06 Mon Sep 17 00:00:00 2001 From: liunyl Date: Wed, 3 Dec 2025 07:34:15 +0000 Subject: [PATCH 2/2] fix skip_wal_ not set --- src/redis_service.cpp | 63 ++++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/src/redis_service.cpp b/src/redis_service.cpp index a867528f..198345d7 100644 --- a/src/redis_service.cpp +++ b/src/redis_service.cpp @@ -188,7 +188,8 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) databases = config_reader.GetInteger("local", "databases", 16); - // Prebuilt Redis tables (same names as today: data_table_0 .. data_table_15) + // Prebuilt Redis tables (same names as today: data_table_0 .. + // data_table_15) std::vector> prebuilt_tables; prebuilt_tables.reserve(databases); @@ -222,28 +223,26 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) { std::vector label_groups = {{"type", {cmd}}}; - engine_metrics.push_back(std::make_tuple( - metrics::NAME_REDIS_COMMAND_DURATION, - metrics::Type::Histogram, - label_groups)); - engine_metrics.push_back(std::make_tuple( - metrics::NAME_REDIS_COMMAND_TOTAL, - metrics::Type::Counter, - label_groups)); + engine_metrics.push_back( + std::make_tuple(metrics::NAME_REDIS_COMMAND_DURATION, + metrics::Type::Histogram, + label_groups)); + engine_metrics.push_back( + std::make_tuple(metrics::NAME_REDIS_COMMAND_TOTAL, + metrics::Type::Counter, + label_groups)); } for (const auto &access_type : {"read", "write"}) { - engine_metrics.push_back( - std::make_tuple(metrics::NAME_REDIS_COMMAND_AGGREGATED_TOTAL, - metrics::Type::Counter, - std::vector{ - {"access_type", {access_type}}})); - engine_metrics.push_back( - std::make_tuple(metrics::NAME_REDIS_COMMAND_AGGREGATED_DURATION, - metrics::Type::Histogram, - std::vector{ - {"access_type", {access_type}}})); + engine_metrics.push_back(std::make_tuple( + metrics::NAME_REDIS_COMMAND_AGGREGATED_TOTAL, + metrics::Type::Counter, + std::vector{{"access_type", {access_type}}})); + engine_metrics.push_back(std::make_tuple( + metrics::NAME_REDIS_COMMAND_AGGREGATED_DURATION, + metrics::Type::Histogram, + std::vector{{"access_type", {access_type}}})); } if (!ds.RegisterEngine(txservice::TableEngine::EloqKv, @@ -437,7 +436,8 @@ bool RedisServiceImpl::Init(brpc::Server &brpc_server) vector_index_worker_pool_ = std::make_unique(vector_index_worker_num); } - // Vector handler initialization moved to Start() since it requires tx_service_ + // Vector handler initialization moved to Start() since it requires + // tx_service_ #endif return true; @@ -447,7 +447,8 @@ bool RedisServiceImpl::Start(brpc::Server &brpc_server) { auto &ds = DataSubstrate::Instance(); - // Attach to TxService and DataStoreHandler now that DataSubstrate is started. + // Attach to TxService and DataStoreHandler now that DataSubstrate is + // started. tx_service_ = ds.GetTxService(); if (tx_service_ == nullptr) @@ -455,8 +456,7 @@ bool RedisServiceImpl::Start(brpc::Server &brpc_server) LOG(ERROR) << "Error: TxService is not initialized."; return false; } - bool enable_store = - ds.GetCoreConfig().enable_data_store; + bool enable_store = ds.GetCoreConfig().enable_data_store; store_hd_ = ds.GetStoreHandler(); if (enable_store && store_hd_ == nullptr) { @@ -470,6 +470,8 @@ bool RedisServiceImpl::Start(brpc::Server &brpc_server) return false; } node_memory_limit_mb_ = FLAGS_node_memory_limit_mb; + skip_kv_ = !ds.GetCoreConfig().enable_data_store; + skip_wal_ = !ds.GetCoreConfig().enable_wal; // Size slow_log_* structures based on core_num_ slow_log_mutexes_.clear(); @@ -529,7 +531,8 @@ bool RedisServiceImpl::Start(brpc::Server &brpc_server) exit(0); } - // Metrics-related initialization that depends on DataSubstrate::InitializeMetrics + // Metrics-related initialization that depends on + // DataSubstrate::InitializeMetrics stopping_indicator_.store(false, std::memory_order_release); if (metrics::enable_metrics) @@ -540,8 +543,8 @@ bool RedisServiceImpl::Start(brpc::Server &brpc_server) vec.resize(command_types.size() + 10, 1); } - // The metrics registry and redis_meter are already created by DataSubstrate. - // We only need to start our collector thread here. + // The metrics registry and redis_meter are already created by + // DataSubstrate. We only need to start our collector thread here. metrics_collector_thd_ = std::thread(&RedisServiceImpl::CollectConnectionsMetrics, this, @@ -555,10 +558,10 @@ bool RedisServiceImpl::Start(brpc::Server &brpc_server) INIReader config_reader(config_file_); EloqVec::CloudConfig vector_cloud_config(config_reader); if (!EloqVec::VectorHandler::InitHandlerInstance( - tx_service_, - vector_index_worker_pool_.get(), - eloq_data_path, - &vector_cloud_config)) + tx_service_, + vector_index_worker_pool_.get(), + ds.GetCoreConfig().data_path, + &vector_cloud_config)) { LOG(ERROR) << "Failed to initialize vector handler instance"; return false;