diff --git a/store_handler/eloq_data_store_service/data_store.h b/store_handler/eloq_data_store_service/data_store.h index dad914b1..65bd19e3 100644 --- a/store_handler/eloq_data_store_service/data_store.h +++ b/store_handler/eloq_data_store_service/data_store.h @@ -62,7 +62,7 @@ class DataStore * @param term The term value to use when starting the database. * @return True if start successfully, otherwise false. */ - virtual bool StartDB(int64_t term) = 0; + virtual bool StartDB(int64_t term, uint32_t shard_id) = 0; /** * @brief Close the data store. diff --git a/store_handler/eloq_data_store_service/data_store_service.cpp b/store_handler/eloq_data_store_service/data_store_service.cpp index c6d10e37..68acbda3 100644 --- a/store_handler/eloq_data_store_service/data_store_service.cpp +++ b/store_handler/eloq_data_store_service/data_store_service.cpp @@ -425,7 +425,7 @@ bool DataStoreService::ConnectAndStartDataStore(uint32_t data_shard_id, return false; } - res = shard_ref.data_store_->StartDB(term); + res = shard_ref.data_store_->StartDB(term, data_shard_id); if (!res) { LOG(ERROR) << "Failed to start db instance in data store service"; diff --git a/store_handler/eloq_data_store_service/eloq_store_config.cpp b/store_handler/eloq_data_store_service/eloq_store_config.cpp index 66844620..b6592252 100644 --- a/store_handler/eloq_data_store_service/eloq_store_config.cpp +++ b/store_handler/eloq_data_store_service/eloq_store_config.cpp @@ -153,6 +153,10 @@ DEFINE_uint32(eloq_store_max_cloud_concurrency, DEFINE_uint32(eloq_store_cloud_request_threads, 1, "EloqStore cloud request thread number"); +DEFINE_uint32(eloq_store_max_global_request_batch, + 1000, + "EloqStore maximum number of requests processed in one global " + "batch."); DEFINE_uint32(eloq_store_max_write_concurrency, 0, "EloqStore maximum number of concurrent write tasks per shard; " @@ -802,6 +806,13 @@ EloqStoreConfig::EloqStoreConfig(const INIReader &config_reader, : config_reader.GetInteger("store", "eloq_store_cloud_request_threads", FLAGS_eloq_store_cloud_request_threads); + eloqstore_configs_.max_global_request_batch = + !CheckCommandLineFlagIsDefault("eloq_store_max_global_request_batch") + ? FLAGS_eloq_store_max_global_request_batch + : config_reader.GetInteger( + "store", + "eloq_store_max_global_request_batch", + FLAGS_eloq_store_max_global_request_batch); eloqstore_configs_.max_write_concurrency = !CheckCommandLineFlagIsDefault("eloq_store_max_write_concurrency") ? FLAGS_eloq_store_max_write_concurrency diff --git a/store_handler/eloq_data_store_service/eloq_store_data_store.h b/store_handler/eloq_data_store_service/eloq_store_data_store.h index 0ba872b8..bad9bf51 100644 --- a/store_handler/eloq_data_store_service/eloq_store_data_store.h +++ b/store_handler/eloq_data_store_service/eloq_store_data_store.h @@ -177,9 +177,10 @@ class EloqStoreDataStore : public DataStore bool Initialize() override; - bool StartDB(int64_t term) override + bool StartDB(int64_t term, uint32_t data_shard_id = 0) override { - ::eloqstore::KvError res = eloq_store_service_->Start(term); + ::eloqstore::KvError res = + eloq_store_service_->Start(term, data_shard_id); if (res != ::eloqstore::KvError::NoError) { LOG(ERROR) << "EloqStore start failed with error code: " diff --git a/store_handler/eloq_data_store_service/eloqstore b/store_handler/eloq_data_store_service/eloqstore index 24483fbc..71bc06da 160000 --- a/store_handler/eloq_data_store_service/eloqstore +++ b/store_handler/eloq_data_store_service/eloqstore @@ -1 +1 @@ -Subproject commit 24483fbc9fbb53f608a13aeea72f2c0db1804544 +Subproject commit 71bc06dab527d1c134a7d0e5d13a00f68f5f0980 diff --git a/tx_service/src/remote/cc_node_service.cpp b/tx_service/src/remote/cc_node_service.cpp index 9278648d..8f3d02c7 100644 --- a/tx_service/src/remote/cc_node_service.cpp +++ b/tx_service/src/remote/cc_node_service.cpp @@ -1849,6 +1849,16 @@ void CcNodeService::UpdateStandbyCkptTs( DLOG(INFO) << "Receive UpdateStandbyCkptTs req, req ckpt ts:" << request->primary_succ_ckpt_ts() << ", has_data_store_write: " << (int) request->has_data_store_write(); + auto standby_node_term = Sharder::Instance().StandbyNodeTerm(); + if (standby_node_term == -1 || + (standby_node_term >> 32) != request->ng_term()) + { + DLOG(INFO) << "Discard UpdateStandbyCkptTs req, req ckpt ts:" + << request->primary_succ_ckpt_ts() + << ", standby_node_term=" << standby_node_term; + response->set_error(true); + return; + } if (request->primary_succ_ckpt_ts() <= Sharder::Instance().NativeNodeGroupCkptTs()) { @@ -2051,7 +2061,7 @@ void CcNodeService::RequestSyncSnapshot( const uint32_t local_node_id = Sharder::Instance().NodeId(); DLOG(INFO) << "RequestSyncSnapshot RPC received, ng_id=" << request->ng_id() << ", snapshot_ts=" << request->snapshot_ts() - << ", local_node_id=" << local_node_id << ", role=follower"; + << ", local_node_id=" << local_node_id; auto store_hd = Sharder::Instance().GetLocalCcShards()->store_hd_; if (!store_hd) {