From 666d554bbac45e02ee3acf2864476cabfffc0e45 Mon Sep 17 00:00:00 2001 From: lzxddz Date: Fri, 18 Jul 2025 12:45:21 +0800 Subject: [PATCH] store log in rocksdb groupby ng fix persist schema op --- include/log_shipping_agent.h | 12 +- include/log_state.h | 155 +++++++++++++++------ include/log_state_rocksdb_impl.h | 94 ++++++++----- src/log_state_rocksdb_impl.cpp | 229 +++++++++++++++++++------------ src/open_log_service.cpp | 8 +- src/open_log_task.cpp | 20 ++- 6 files changed, 334 insertions(+), 184 deletions(-) diff --git a/include/log_shipping_agent.h b/include/log_shipping_agent.h index 48c8eaf..884c3e5 100644 --- a/include/log_shipping_agent.h +++ b/include/log_shipping_agent.h @@ -39,10 +39,6 @@ namespace txlog { - -static int64_t DEFAULT_CC_NG_TERM = 1; -static uint32_t DEFAULT_CC_NG_ID = 0; - /* * Agent to ship redo log records to cc node leader, who is waiting for * uncheckpointed log record to replay. Shipping agent is implemented as a @@ -498,8 +494,12 @@ class LogShippingAgent msg_cnt++; replay_msg.clear_binary_log_records(); } - - AppendLogBlob(*log_records_blob, item); + // filter out log items not belong the + // cc_node_group_id_ + if (item.cc_ng_ == cc_node_group_id_) + { + AppendLogBlob(*log_records_blob, item); + } } } int err = SendMessage(replay_msg, buf, false, eagain); diff --git a/include/log_state.h b/include/log_state.h index 1b0bfff..d5aeac3 100644 --- a/include/log_state.h +++ b/include/log_state.h @@ -55,11 +55,13 @@ struct Item Item(uint64_t tx_number, uint64_t timestamp, std::string log_message, - LogItemType item_type) + LogItemType item_type, + uint32_t cc_ng = UINT32_MAX) : tx_number_(tx_number), timestamp_(timestamp), log_message_(std::move(log_message)), - item_type_(item_type) + item_type_(item_type), + cc_ng_(cc_ng) { } @@ -67,6 +69,7 @@ struct Item uint64_t timestamp_; std::string log_message_; LogItemType item_type_; + uint32_t cc_ng_; }; class ItemIterator @@ -115,29 +118,32 @@ class LogState LogState() = default; virtual ~LogState() = default; - virtual int AddLogItem(uint64_t tx_number, + virtual int AddLogItem(uint32_t cc_ng_id, + uint64_t tx_number, uint64_t timestamp, const std::string &log_message) = 0; virtual int AddLogItemBatch( - const std::vector> + const std::vector> &batch_logs) { int err = 0; - for (const auto &[tx, ts, log_message] : batch_logs) + for (const auto &[cc_ng_id, tx, ts, log_message] : batch_logs) { - err = AddLogItem(tx, ts, log_message); + err = AddLogItem(cc_ng_id, tx, ts, log_message); if (err != 0) + { break; + } } return err; } virtual std::pair> GetLogReplayList( - uint64_t start_timestamp) = 0; + uint32_t node_group_id, uint64_t start_timestamp) = 0; virtual std::pair SearchTxDataLog( - uint64_t tx_number, uint64_t lower_bound_ts = 0) = 0; + uint64_t tx_number, uint32_t cc_ng_id, uint64_t lower_bound_ts = 0) = 0; /** * Stores cc node's latest state. @@ -177,18 +183,15 @@ class LogState leader_ip_ = rhs.leader_ip_; leader_port_ = rhs.leader_port_; - latest_txn_no_.store( - rhs.latest_txn_no_.load(std::memory_order_relaxed)); - last_ckpt_ts_.store( - rhs.last_ckpt_ts_.load(std::memory_order_relaxed)); - + latest_txn_no_ = rhs.latest_txn_no_; + last_ckpt_ts_ = rhs.last_ckpt_ts_; return *this; } std::string leader_ip_; uint32_t leader_port_{}; - std::atomic latest_txn_no_{}; - std::atomic last_ckpt_ts_{}; + uint32_t latest_txn_no_{}; + uint64_t last_ckpt_ts_{}; }; /** @@ -493,51 +496,111 @@ class LogState } } - uint32_t LatestCommittedTxnNumber() const + uint32_t LatestCommittedTxnNumber(uint32_t cc_ng) const { - return cc_ng_info_.latest_txn_no_.load(std::memory_order_relaxed); + auto it = cc_ng_info_.find(cc_ng); + if (it == cc_ng_info_.end()) + { + return 0; + } + return it->second.latest_txn_no_; } - void UpdateLatestCommittedTxnNumber(uint32_t tx_ident) + void UpdateLatestCommittedTxnNumber(uint32_t tx_cc_ng, uint32_t tx_ident) { + // access different fields of node group info with RecoverTx RPC + // thread, no need to lock + std::shared_lock s_lk(log_state_mutex_); + + auto it = cc_ng_info_.find(tx_cc_ng); + if (it == cc_ng_info_.end()) + { + return; + } + CcNgInfo &info = it->second; + // to handle the situation that committed txn number wraps around // uint32, assuming that active txn numbers won't span half of // UINT32_MAX - if (tx_ident - cc_ng_info_.latest_txn_no_ < (UINT32_MAX >> 1)) + if (tx_ident - info.latest_txn_no_ < (UINT32_MAX >> 1)) { - cc_ng_info_.latest_txn_no_.store(tx_ident, - std::memory_order_relaxed); + // info.latest_txn_no_.store(tx_ident, std::memory_order_relaxed); + info.latest_txn_no_ = tx_ident; } } - void UpdateCkptTs(uint64_t timestamp) + void UpdateCkptTs(uint32_t cc_ng, uint64_t timestamp) { - std::unique_lock lk(log_state_mutex_); - if (timestamp > - cc_ng_info_.last_ckpt_ts_.load(std::memory_order_relaxed)) - { - uint32_t max_txn = - cc_ng_info_.latest_txn_no_.load(std::memory_order_relaxed); - int rc = PersistCkptAndMaxTxn(timestamp, max_txn); - while (rc != 0) + // this func is called when on_apply processing UpdateCkptTsRequest, + // might be concurrent with LastCkptTimestamp() in RecoverTx rpc + // thread + std::shared_lock s_lk(log_state_mutex_); + + auto it = cc_ng_info_.find(cc_ng); + if (it != cc_ng_info_.end()) + { + CcNgInfo &info = it->second; + if (timestamp > info.last_ckpt_ts_) { - rc = PersistCkptAndMaxTxn(timestamp, max_txn); + info.last_ckpt_ts_ = timestamp; + bool updated = UpdateMinCkptTsOfAllNodeGroups(); + if (updated) + { + TryCleanMultiStageOps(); + } + + auto cc_ng_info = GetCopyOfCcNgInfo(); + s_lk.unlock(); + + int rc = PersistCkptAndMaxTxn(cc_ng_info); + while (rc != 0) + { + rc = PersistCkptAndMaxTxn(cc_ng_info); + } } - cc_ng_info_.last_ckpt_ts_.store(timestamp, - std::memory_order_release); TryCleanMultiStageOps(); } } - uint64_t LastCkptTimestamp() + bool UpdateMinCkptTsOfAllNodeGroups() + { + uint64_t min_ts = UINT64_MAX; + for (const auto &[ng_id, ng_info] : cc_ng_info_) + { + min_ts = std::min(min_ts, ng_info.last_ckpt_ts_); + } + if (min_ts > min_ckpt_ts_) + { + min_ckpt_ts_ = min_ts; + return true; + } + return false; + } + + uint64_t LastCkptTimestamp(uint32_t cc_ng_id) { - return cc_ng_info_.last_ckpt_ts_.load(std::memory_order_relaxed); + // this func is also called in RecoverTx rpc thread, which might be + // concurrent with state machine's on_apply when processing + // ReplayLogRequest + std::shared_lock s_lk(log_state_mutex_); + + auto iter = cc_ng_info_.find(cc_ng_id); + if (iter == cc_ng_info_.end()) + { + return 0; + } + else + { + return iter->second.last_ckpt_ts_; + } } - CcNgInfo &GetCcNgInfo() + const std::unordered_map GetCopyOfCcNgInfo() const { - return cc_ng_info_; + std::shared_lock s_lk(log_state_mutex_); + std::unordered_map cc_ng_info_copy = cc_ng_info_; + return cc_ng_info_copy; } virtual uint64_t GetApproximateReplayLogSize() @@ -603,8 +666,7 @@ class LogState */ void TryCleanMultiStageOps() { - uint64_t ckpt_ts = - cc_ng_info_.last_ckpt_ts_.load(std::memory_order_relaxed); + uint64_t ckpt_ts = min_ckpt_ts_; using namespace std::chrono_literals; uint64_t one_hour = std::chrono::microseconds(1h).count(); for (auto it = tx_catalog_ops_.begin(); it != tx_catalog_ops_.end();) @@ -682,9 +744,14 @@ class LogState virtual int DeleteRangeOp(uint64_t txn, uint64_t timestamp) = 0; - virtual int PersistCkptAndMaxTxn(uint64_t ckpt_ts, uint32_t max_txn) = 0; + virtual int PersistCkptAndMaxTxn( + const std::unordered_map &ng_infos) = 0; + + // CcNgInfo cc_ng_info_; + std::unordered_map cc_ng_info_; - CcNgInfo cc_ng_info_; + // to erase finished schema ops and split range ops after one hour + std::atomic_uint64_t min_ckpt_ts_{}; struct CatalogOp { @@ -734,7 +801,8 @@ class LogState bool all_cleaned = std::all_of(schemas_op_msg_.begin(), schemas_op_msg_.end(), - [](const SchemaOpMessage &schema_op_msg) { + [](const SchemaOpMessage &schema_op_msg) + { return schema_op_msg.stage() == SchemaOpMessage_Stage_CleanSchema; }); @@ -747,7 +815,8 @@ class LogState bool all_committed = std::all_of(schemas_op_msg_.begin(), schemas_op_msg_.end(), - [](const SchemaOpMessage &schema_op_msg) { + [](const SchemaOpMessage &schema_op_msg) + { return schema_op_msg.stage() == SchemaOpMessage_Stage_CommitSchema; }); diff --git a/include/log_state_rocksdb_impl.h b/include/log_state_rocksdb_impl.h index 4ccdf9c..7bcb477 100644 --- a/include/log_state_rocksdb_impl.h +++ b/include/log_state_rocksdb_impl.h @@ -53,9 +53,10 @@ using current_time_func = std::function; namespace txlog { -// commit_ts(8)----tx_number(8) -inline void Serialize(std::array &res, +// Serialize DML op key: commit_ts(8)--ng_id(4)--tx_number(8) +inline void Serialize(std::array &res, uint64_t timestamp, + uint32_t ng_id, uint64_t tx_number) { char *p = res.data(); @@ -63,12 +64,36 @@ inline void Serialize(std::array &res, std::memcpy(p, &ts_be, sizeof(uint64_t)); p += sizeof(uint64_t); + uint32_t ng_id_be = __builtin_bswap32(ng_id); + std::memcpy(p, &ng_id_be, sizeof(uint32_t)); + p += sizeof(uint32_t); uint64_t tx_no_be = __builtin_bswap64(tx_number); std::memcpy(p, &tx_no_be, sizeof(uint64_t)); } -// tx_number(8)----commit_ts(8)----code(1) +inline void Deserialize(rocksdb::Slice key, + uint64_t ×tamp, + uint32_t &ng_id, + uint64_t &tx_number) +{ + assert(key.size() == 20); + const char *p = key.data(); + uint64_t ts_be, tx_no_be; + std::memcpy(&ts_be, p, sizeof(uint64_t)); + timestamp = __builtin_bswap64(ts_be); + + p += sizeof(uint64_t); + uint32_t ng_id_be; + std::memcpy(&ng_id_be, p, sizeof(uint32_t)); + ng_id = __builtin_bswap32(ng_id_be); + + p += sizeof(uint32_t); + std::memcpy(&tx_no_be, p, sizeof(uint64_t)); + tx_number = __builtin_bswap64(tx_no_be); +} + +// Serialize DDL op key: tx_number(8)----commit_ts(8)----code(1) inline void Serialize(std::array &res, uint64_t timestamp, uint64_t tx_number, @@ -87,21 +112,6 @@ inline void Serialize(std::array &res, res[res.size() - 1] = (char) code; } -inline void Deserialize(rocksdb::Slice key, - uint64_t ×tamp, - uint64_t &tx_number) -{ - assert(key.size() == 16); - const char *p = key.data(); - uint64_t ts_be, tx_no_be; - std::memcpy(&ts_be, p, sizeof(uint64_t)); - timestamp = __builtin_bswap64(ts_be); - p += sizeof(uint64_t); - - std::memcpy(&tx_no_be, p, sizeof(uint64_t)); - tx_number = __builtin_bswap64(tx_no_be); -} - inline void Deserialize( rocksdb::Slice key, rocksdb::Slice value, @@ -165,13 +175,14 @@ class ItemIteratorRocksDBImpl : public ItemIterator #else rocksdb::DB *db, #endif - uint64_t start_ts) + uint64_t start_ts, + uint32_t target_ng) : ItemIterator(std::move(ddl_list)), db_(db), start_key_(start_key_storage_.data(), start_key_storage_.size()), worker_num_(worker_num) { - Serialize(start_key_storage_, start_ts, 0); + Serialize(start_key_storage_, start_ts, target_ng, 0); rocksdb::ReadOptions read_options; // set iterate_lower_bound for read_options for better performance read_options.iterate_lower_bound = &start_key_; @@ -180,13 +191,14 @@ class ItemIteratorRocksDBImpl : public ItemIterator std::unique_ptr(db_->NewIterator(read_options)); uint64_t first_ts = 0; uint64_t last_ts = 0; + uint32_t tmp_ng; uint64_t tmp_txn; rocksdb_iterator_->SeekToFirst(); if (!rocksdb_iterator_->Valid()) { return; } - Deserialize(rocksdb_iterator_->key(), first_ts, tmp_txn); + Deserialize(rocksdb_iterator_->key(), first_ts, tmp_ng, tmp_txn); rocksdb_iterator_->SeekToLast(); if (!rocksdb_iterator_->Valid()) { @@ -194,7 +206,7 @@ class ItemIteratorRocksDBImpl : public ItemIterator } keys_storage_.reserve(worker_num_ * 2); keys_.reserve(worker_num_ * 2); - Deserialize(rocksdb_iterator_->key(), last_ts, tmp_txn); + Deserialize(rocksdb_iterator_->key(), last_ts, tmp_ng, tmp_txn); std::vector ts_list; ts_list.push_back(first_ts); auto gap = (last_ts - first_ts) / worker_num_; @@ -206,9 +218,9 @@ class ItemIteratorRocksDBImpl : public ItemIterator for (size_t i = 0; i < ts_list.size() - 1; i++) { rocksdb::ReadOptions read_options; - std::array start_key{}; + std::array start_key{}; // range start: current_ts, target_ng, txn number 0 - Serialize(start_key, ts_list[i], 0); + Serialize(start_key, ts_list[i], target_ng, 0); keys_storage_.push_back(start_key); keys_.push_back(rocksdb::Slice(keys_storage_.back().data(), keys_storage_.back().size())); @@ -216,9 +228,9 @@ class ItemIteratorRocksDBImpl : public ItemIterator if (i != ts_list.size() - 2) { - std::array end_key{}; + std::array end_key{}; // range end: next_ts - 1, target_ng, txn number UINT64_MAX - Serialize(end_key, ts_list[i + 1] - 1, UINT64_MAX); + Serialize(end_key, ts_list[i + 1] - 1, target_ng, UINT64_MAX); keys_storage_.push_back(end_key); keys_.push_back(rocksdb::Slice(keys_storage_.back().data(), keys_storage_.back().size())); @@ -272,13 +284,16 @@ class ItemIteratorRocksDBImpl : public ItemIterator { return *ddl_list_.at(ddl_idx_); } + uint32_t ng; uint64_t timestamp, tx_number; - Deserialize(rocksdb_iterator_->key(), timestamp, tx_number); + Deserialize(rocksdb_iterator_->key(), timestamp, ng, tx_number); + rocksdb::Slice value = rocksdb_iterator_->value(); item_.tx_number_ = tx_number; item_.timestamp_ = timestamp; item_.log_message_ = {value.data(), value.size()}; item_.item_type_ = LogItemType::DataLog; + item_.cc_ng_ = ng; return item_; }; @@ -313,13 +328,16 @@ class ItemIteratorRocksDBImpl : public ItemIterator const Item &GetItem(size_t idx) override { + uint32_t ng; uint64_t timestamp, tx_number; - Deserialize(rocksdb_iterators_[idx]->key(), timestamp, tx_number); + Deserialize(rocksdb_iterators_[idx]->key(), timestamp, ng, tx_number); + rocksdb::Slice value = rocksdb_iterators_[idx]->value(); items_[idx].tx_number_ = tx_number; items_[idx].timestamp_ = timestamp; items_[idx].log_message_ = {value.data(), value.size()}; items_[idx].item_type_ = LogItemType::DataLog; + items_[idx].cc_ng_ = ng; return items_[idx]; }; @@ -350,11 +368,11 @@ class ItemIteratorRocksDBImpl : public ItemIterator #else rocksdb::DB *db_; #endif - std::array start_key_storage_; + std::array start_key_storage_; rocksdb::Slice start_key_; std::unique_ptr rocksdb_iterator_; Item item_; - std::vector> keys_storage_; + std::vector> keys_storage_; std::vector keys_; std::vector> rocksdb_iterators_; std::vector items_; @@ -372,19 +390,22 @@ class LogStateRocksDBImpl : public LogState ~LogStateRocksDBImpl() override; - int AddLogItem(uint64_t tx_number, + int AddLogItem(uint32_t cc_ng_id, + uint64_t tx_number, uint64_t timestamp, const std::string &log_message) override; int AddLogItemBatch( - const std::vector> - &batch_logs); + const std::vector> + &batch_logs) override; std::pair> GetLogReplayList( - uint64_t start_timestamp) override; + uint32_t node_group_id, uint64_t start_timestamp) override; std::pair SearchTxDataLog( - uint64_t tx_number, uint64_t lower_bound_ts = 0) override; + uint64_t tx_number, + uint32_t cc_ng_id, + uint64_t lower_bound_ts = 0) override; /** * Start will be called in starting LogState. @@ -421,7 +442,8 @@ class LogStateRocksDBImpl : public LogState int DeleteRangeOp(uint64_t txn, uint64_t timestamp) override; - int PersistCkptAndMaxTxn(uint64_t ckpt_ts, uint32_t max_txn) override; + int PersistCkptAndMaxTxn( + const std::unordered_map &ng_infos) override; void StopRocksDB(); diff --git a/src/log_state_rocksdb_impl.cpp b/src/log_state_rocksdb_impl.cpp index 78c0b16..90fa10e 100644 --- a/src/log_state_rocksdb_impl.cpp +++ b/src/log_state_rocksdb_impl.cpp @@ -55,14 +55,15 @@ LogStateRocksDBImpl::~LogStateRocksDBImpl() } int LogStateRocksDBImpl::AddLogItemBatch( - const std::vector> &batch_logs) + const std::vector> + &batch_logs) { rocksdb::WriteBatch batch; - for (const auto &[tx_number, timestamp, log_message] : batch_logs) + for (const auto &[cc_ng_id, tx_number, timestamp, log_message] : batch_logs) { - std::array key{}; - Serialize(key, timestamp, tx_number); + std::array key{}; + Serialize(key, timestamp, cc_ng_id, tx_number); batch.Put(rocksdb::Slice(key.data(), key.size()), log_message); } @@ -77,12 +78,14 @@ int LogStateRocksDBImpl::AddLogItemBatch( return (int) status.code(); } -int LogStateRocksDBImpl::AddLogItem(uint64_t tx_number, +int LogStateRocksDBImpl::AddLogItem(uint32_t cc_ng_id, + uint64_t tx_number, uint64_t timestamp, const std::string &log_message) { - std::array key{}; - Serialize(key, timestamp, tx_number); + std::array key{}; + Serialize(key, timestamp, cc_ng_id, tx_number); + rocksdb::Status status = db_->Put( write_option_, rocksdb::Slice(key.data(), key.size()), log_message); if (!status.ok()) @@ -97,7 +100,8 @@ int LogStateRocksDBImpl::AddLogItem(uint64_t tx_number, } std::pair> -LogStateRocksDBImpl::GetLogReplayList(uint64_t start_timestamp) +LogStateRocksDBImpl::GetLogReplayList(uint32_t node_group_id, + uint64_t start_timestamp) { std::vector ddl_list; @@ -110,14 +114,17 @@ LogStateRocksDBImpl::GetLogReplayList(uint64_t start_timestamp) LOG(INFO) << "split_range_op_list size: " << rs_size; std::unique_ptr result = - std::make_unique( - rocksdb_scan_threads_, std::move(ddl_list), db_, start_timestamp); + std::make_unique(rocksdb_scan_threads_, + std::move(ddl_list), + db_, + start_timestamp, + node_group_id); return std::make_pair(true, std::move(result)); } std::pair LogStateRocksDBImpl::SearchTxDataLog( - uint64_t tx_number, uint64_t lower_bound_ts) + uint64_t tx_number, uint32_t ng_id, uint64_t lower_bound_ts) { LOG(INFO) << "log state search tx: " << tx_number << ", lower_bound_ts: " << lower_bound_ts; @@ -130,7 +137,7 @@ std::pair LogStateRocksDBImpl::SearchTxDataLog( } else { - start_ts = LastCkptTimestamp(); + start_ts = LastCkptTimestamp(ng_id); if (start_ts != 0) // 0 indicates no checkpoint happened { start_ts += 1; @@ -139,8 +146,8 @@ std::pair LogStateRocksDBImpl::SearchTxDataLog( LOG(INFO) << "iterate log records, start ts: " << start_ts; // iterate log records in range [start, limit) - std::array start_key{}; - Serialize(start_key, start_ts, 0); + std::array start_key{}; + Serialize(start_key, start_ts, ng_id, 0); rocksdb::Slice start(start_key.data(), start_key.size()); rocksdb::ReadOptions read_option; // set iterate_upper_bound for read_option for better performance @@ -159,9 +166,10 @@ std::pair LogStateRocksDBImpl::SearchTxDataLog( // key_sv is in form of timestamp(8) + tx_no(8) if (key_sv.compare(8, 8, target_txn.data(), 8) == 0) { + uint32_t ng; uint64_t ts; uint64_t tx_no; - Deserialize(it->key(), ts, tx_no); + Deserialize(it->key(), ts, ng, tx_no); LOG(INFO) << "Found matching key, tx_no: " << tx_no << ", timestamp: " << ts; ptr = std::make_shared( @@ -322,22 +330,20 @@ int LogStateRocksDBImpl::Start() meta_handle_, rocksdb::Slice(key.data(), key.size()), &value); - if (rc.ok()) - { - cc_ng_info_.last_ckpt_ts_ = *((uint64_t *) value.data()); - } - else if (rc.IsNotFound()) - { - cc_ng_info_.last_ckpt_ts_ = 0; - } - else + + uint32_t ng_id; + uint64_t ts; + size_t offset = 0; + char *ptr = value.data(); + while (offset < value.size()) { - LOG(ERROR) << "Failed to get last checkpoint timestamp from " - "rocksdb, rocksdb storage path: " - << rocksdb_storage_path_ - << ", error message: " << rc.ToString(); - assert(false); - return -1; + ng_id = *((uint32_t *) ptr); + ptr += sizeof(uint32_t); + offset += sizeof(uint32_t); + ts = *((uint64_t *) ptr); + ptr += sizeof(uint64_t); + offset += sizeof(uint64_t); + cc_ng_info_.try_emplace(ng_id, "", 0, 0, ts); } // latest_txn_no @@ -347,23 +353,18 @@ int LogStateRocksDBImpl::Start() meta_handle_, rocksdb::Slice(key.data(), key.size()), &value); - - if (rc.ok()) + ptr = value.data(); + offset = 0; + uint32_t txn_no; + while (offset < value.size()) { - cc_ng_info_.latest_txn_no_ = *((uint32_t *) value.data()); - } - else if (rc.IsNotFound()) - { - cc_ng_info_.latest_txn_no_ = 0; - } - else - { - LOG(ERROR) - << "Failed to get last txn from rocksdb, rocksdb storage path: " - << rocksdb_storage_path_ - << ", error message: " << rc.ToString(); - assert(false); - return -1; + ng_id = *((uint32_t *) ptr); + ptr += sizeof(uint32_t); + offset += sizeof(uint32_t); + txn_no = *((uint32_t *) ptr); + ptr += sizeof(uint32_t); + offset += sizeof(uint32_t); + cc_ng_info_[ng_id].latest_txn_no_ = txn_no; } // tx_catalog_ops and tx_split_range_ops_ @@ -430,9 +431,8 @@ int LogStateRocksDBImpl::Start() // The schema operation has been logged. Only // updates the stage. - assert(static_cast( - catalog_it->second.SchemaOpMsgCount()) == - new_schemas_op_msg.size()); + assert(catalog_it->second.SchemaOpMsgCount() == + static_cast(new_schemas_op_msg.size())); for (uint16_t idx = 0; idx < catalog_it->second.SchemaOpMsgCount(); @@ -537,8 +537,10 @@ void LogStateRocksDBImpl::StopRocksDB() void LogStateRocksDBImpl::PrintKey(rocksdb::Slice key) { uint64_t timestamp, tx_no; - Deserialize(key, timestamp, tx_no); - LOG(INFO) << "timestamp: " << timestamp << ",\ttx_number: " << tx_no; + uint32_t ng_id; + Deserialize(key, timestamp, ng_id, tx_no); + LOG(INFO) << "timestamp: " << timestamp << ",\tng_id: " << ng_id + << ",\ttx_number: " << tx_no; } /** @@ -671,13 +673,16 @@ void LogStateRocksDBImpl::PurgingSstFiles() << static_cast(sst_files_size_) / 1024 / 1024 << "MB, purge sst files"; - const auto &ng_info = GetCcNgInfo(); - // find last_ckpt_ts - uint64_t min_last_ckpt_ts = ng_info.last_ckpt_ts_; - if (min_last_ckpt_ts == 0) + const std::unordered_map ng_info_map = + GetCopyOfCcNgInfo(); + // find the minimum last_ckpt_ts from all cc_ngs + uint64_t min_last_ckpt_ts = UINT64_MAX; + uint32_t max_cc_ng_id = 0; + for (const auto &ng_info : ng_info_map) { - LOG(INFO) << "No checkpoint found, skip purge sst files"; - continue; + min_last_ckpt_ts = + std::min(min_last_ckpt_ts, ng_info.second.last_ckpt_ts_); + max_cc_ng_id = std::max(max_cc_ng_id, ng_info.first); } if (min_last_ckpt_ts <= last_purging_sst_ckpt_ts_) @@ -697,10 +702,10 @@ void LogStateRocksDBImpl::PurgingSstFiles() // all log entries before the min_last_ckpt_ts belongs to all cc // node group could be deleted min_last_ckpt_ts -= 1; - std::array start_key{}; - std::array end_key{}; - Serialize(start_key, 0, 0); - Serialize(end_key, min_last_ckpt_ts, 0); + std::array start_key{}; + std::array end_key{}; + Serialize(start_key, 0, 0, 0); + Serialize(end_key, min_last_ckpt_ts, max_cc_ng_id, 0); rocksdb::Slice start(start_key.data(), start_key.size()); rocksdb::Slice end(end_key.data(), end_key.size()); @@ -715,16 +720,19 @@ void LogStateRocksDBImpl::PurgingSstFiles() rocksdb::Slice largestkey(meta.largestkey); #ifndef NDEBUG uint64_t sk_ts, sk_tx_no; - assert(smallestkey.size() == 16); - Deserialize(smallestkey, sk_ts, sk_tx_no); + uint32_t sk_ng_id; + assert(smallestkey.size() == 20); + Deserialize(smallestkey, sk_ts, sk_ng_id, sk_tx_no); uint64_t lk_ts, lk_tx_no; - assert(largestkey.size() == 16); - Deserialize(largestkey, lk_ts, lk_tx_no); + uint32_t lk_ng_id; + assert(largestkey.size() == 20); + Deserialize(largestkey, lk_ts, lk_ng_id, lk_tx_no); DLOG(INFO) << "sst file: " << meta.name << ", size: " << meta.size << ", level: " << meta.level - << " smallest key: " << sk_ts << ", " << sk_tx_no - << " largest key: " << lk_ts << ", " << lk_tx_no; + << " smallest key: " << sk_ts << ", " << sk_ng_id + << ", " << sk_tx_no << " largest key: " << lk_ts << ", " + << lk_ng_id << ", " << lk_tx_no; #endif // without compaction, sst files must in level 0 @@ -772,16 +780,19 @@ void LogStateRocksDBImpl::PurgingSstFiles() rocksdb::Slice smallestkey(meta.smallestkey); rocksdb::Slice largestkey(meta.largestkey); uint64_t sk_ts, sk_tx_no; - assert(smallestkey.size() == 16); - Deserialize(smallestkey, sk_ts, sk_tx_no); + uint32_t sk_ng_id; + assert(smallestkey.size() == 20); + Deserialize(smallestkey, sk_ts, sk_ng_id, sk_tx_no); uint64_t lk_ts, lk_tx_no; - assert(largestkey.size() == 16); - Deserialize(largestkey, lk_ts, lk_tx_no); + uint32_t lk_ng_id; + assert(largestkey.size() == 20); + Deserialize(largestkey, lk_ts, lk_ng_id, lk_tx_no); DLOG(INFO) << "sst file: " << meta.name << ", size: " << meta.size << ", level: " << meta.level - << " smallest key: " << sk_ts << ", " << sk_tx_no - << " largest key: " << lk_ts << ", " << lk_tx_no; + << " smallest key: " << sk_ts << ", " << sk_ng_id + << ", " << sk_tx_no << " largest key: " << lk_ts << ", " + << lk_ng_id << ", " << lk_tx_no; } } #endif @@ -810,10 +821,24 @@ int LogStateRocksDBImpl::PersistSchemaOp(uint64_t txn, if (rc.IsNotFound()) { - schema_op_msg.SerializeToString(&schemas_op_str); + // schema_op_msg.SerializeToString(&schemas_op_str); + uint16_t cnt = 1; + schemas_op_str.append(reinterpret_cast(&cnt), sizeof(cnt)); + + std::string str = schema_op_msg.SerializeAsString(); + uint32_t len = str.size(); + schemas_op_str.append(reinterpret_cast(&len), sizeof(len)); + schemas_op_str += str; + + DLOG(INFO) << "schema_op_msg.SerializeToString===key-size:" + << key.size() << ", schema-size:" << schemas_op_str.size() + << ", tx:" << txn; } else { + DLOG(INFO) << "schema_op_msg found===key-size:" << key.size() + << ", schema-size:" << schemas_op_str.size() + << ", tx:" << txn; SchemaOpMessage schema_op_msg_stored; char *ptr = schemas_op_str.data(); @@ -837,8 +862,10 @@ int LogStateRocksDBImpl::PersistSchemaOp(uint64_t txn, } } - rc = db_->Put( - write_option_, rocksdb::Slice(key.data(), key.size()), schemas_op_str); + rc = db_->Put(write_option_, + meta_handle_, + rocksdb::Slice(key.data(), key.size()), + schemas_op_str); return rc.ok() ? 0 : rc.code(); } @@ -927,32 +954,53 @@ int LogStateRocksDBImpl::DeleteRangeOp(uint64_t txn, uint64_t timestamp) return rc.ok() ? 0 : rc.code(); } -int LogStateRocksDBImpl::PersistCkptAndMaxTxn(uint64_t ckpt_ts, - uint32_t max_txn) +int LogStateRocksDBImpl::PersistCkptAndMaxTxn( + const std::unordered_map &ng_infos) { - const char *ckpt_ptr = (const char *) (&ckpt_ts); - const char *txn_ptr = (const char *) (&max_txn); - std::array key; - Serialize( key, UINT64_MAX, UINT64_MAX, (uint8_t) LogState::MetaOp::LastCkpt); - rocksdb::Status rc = db_->Put(write_option_, - meta_handle_, - rocksdb::Slice(key.data(), key.size()), - rocksdb::Slice(ckpt_ptr, sizeof(uint64_t))); + std::string ckpt_str; + ckpt_str.reserve(ng_infos.size() * sizeof(uint32_t) + + ng_infos.size() * sizeof(uint64_t)); + for (const auto &ng_info : ng_infos) + { + ckpt_str.append(reinterpret_cast(&ng_info.first), + sizeof(uint32_t)); + ckpt_str.append( + reinterpret_cast(&ng_info.second.last_ckpt_ts_), + sizeof(uint64_t)); + } + + rocksdb::Status rc = + db_->Put(write_option_, + meta_handle_, + rocksdb::Slice(key.data(), key.size()), + rocksdb::Slice(ckpt_str.data(), ckpt_str.size())); if (!rc.ok()) { LOG(ERROR) << "PersistCkptAndMaxTxn (LastCkpt) failed, error: " - << rc.ToString() << ", ckpt_ts: " << ckpt_ts; + << rc.ToString(); return rc.code(); } Serialize(key, UINT64_MAX, UINT64_MAX, (uint8_t) LogState::MetaOp::MaxTxn); + std::string txn_str; + txn_str.reserve(ng_infos.size() * sizeof(uint32_t) + + ng_infos.size() * sizeof(uint32_t)); + for (const auto &ng_info : ng_infos) + { + txn_str.append(reinterpret_cast(&ng_info.first), + sizeof(uint32_t)); + txn_str.append( + reinterpret_cast(&ng_info.second.latest_txn_no_), + sizeof(uint32_t)); + } + rc = db_->Put(write_option_, meta_handle_, rocksdb::Slice(key.data(), key.size()), - rocksdb::Slice(txn_ptr, sizeof(uint32_t))); + rocksdb::Slice(txn_str.data(), txn_str.size())); return rc.ok() ? 0 : rc.code(); } @@ -982,10 +1030,11 @@ uint64_t LogStateRocksDBImpl::GetApproximateReplayLogSize() { rocksdb::Slice largestkey(meta.largestkey); uint64_t lk_ts, lk_tx_no; + uint32_t ng_id; assert(largestkey.size() == 16); - Deserialize(largestkey, lk_ts, lk_tx_no); + Deserialize(largestkey, lk_ts, ng_id, lk_tx_no); - if (cc_ng_info_.last_ckpt_ts_ <= lk_ts) + if (cc_ng_info_[ng_id].last_ckpt_ts_ <= lk_ts) { DLOG(INFO) << "SSTable " << meta.name << " used in replay with size: " diff --git a/src/open_log_service.cpp b/src/open_log_service.cpp index 94c2c20..cf750d4 100644 --- a/src/open_log_service.cpp +++ b/src/open_log_service.cpp @@ -26,6 +26,8 @@ namespace txlog { +static int64_t DEFAULT_CC_NG_TERM = 1; + thread_local size_t OpenLogServiceImpl::received_task_cnt_ = 0; void OpenLogServiceImpl::WriteLog(::google::protobuf::RpcController *controller, @@ -68,7 +70,7 @@ void OpenLogServiceImpl::ReplayLog( const std::string &cc_node_ip = req.source_ip(); uint16_t cc_node_port = req.source_port(); - uint64_t last_ckpt_ts = log_state_->LastCkptTimestamp(); + uint64_t last_ckpt_ts = log_state_->LastCkptTimestamp(cc_ng_id); // get log list since last_ckpt_ts+1 // 0 indicates no checkpoint happened uint64_t start_ts = last_ckpt_ts == 0 ? 0 : last_ckpt_ts + 1; @@ -78,7 +80,7 @@ void OpenLogServiceImpl::ReplayLog( } LOG(INFO) << "Start replaying from timestamp " << start_ts; - auto [ok, iterator] = log_state_->GetLogReplayList(start_ts); + auto [ok, iterator] = log_state_->GetLogReplayList(cc_ng_id, start_ts); if (!ok) { @@ -87,7 +89,7 @@ void OpenLogServiceImpl::ReplayLog( return; } - uint32_t latest_txn_no = log_state_->LatestCommittedTxnNumber(); + uint32_t latest_txn_no = log_state_->LatestCommittedTxnNumber(cc_ng_id); std::unique_lock lk(log_replay_workers_mutex_); diff --git a/src/open_log_task.cpp b/src/open_log_task.cpp index 349b3a9..a20043d 100644 --- a/src/open_log_task.cpp +++ b/src/open_log_task.cpp @@ -79,7 +79,8 @@ void OpenLogTaskWorker::WorkerThreadMain() task_cnt_.fetch_sub(count, std::memory_order_relaxed); // First pass: collect all data log writes from across tasks - std::vector> batch_logs; + std::vector> + batch_logs; // Keep track of which tasks have data logs to update their // responses later std::vector> data_log_tasks; @@ -111,7 +112,8 @@ void OpenLogTaskWorker::WorkerThreadMain() it != data_log.node_txn_logs().end(); ++it) { - batch_logs.emplace_back(txn, timestamp, it->second); + batch_logs.emplace_back( + it->first, txn, timestamp, it->second); } } } @@ -154,7 +156,11 @@ void OpenLogTaskWorker::WorkerThreadMain() // Update latest committed transaction number uint32_t tx_ident = req->write_log_request().txn_number() & 0xFFFFFFFF; - log_state_->UpdateLatestCommittedTxnNumber(tx_ident); + uint32_t high_half = + req->write_log_request().txn_number() >> 32L; + uint32_t ng_id = high_half >> 10; + log_state_->UpdateLatestCommittedTxnNumber(ng_id, + tx_ident); break; } } @@ -240,7 +246,7 @@ void OpenLogTaskWorker::HandleWriteLog(const WriteLogRequest &req, it != data_log.node_txn_logs().end(); ++it) { - err = log_state_->AddLogItem(txn, timestamp, it->second); + err = log_state_->AddLogItem(it->first, txn, timestamp, it->second); if (err != 0) break; } @@ -264,7 +270,9 @@ void OpenLogTaskWorker::HandleWriteLog(const WriteLogRequest &req, } uint32_t tx_ident = req.txn_number() & 0xFFFFFFFF; - log_state_->UpdateLatestCommittedTxnNumber(tx_ident); + uint32_t high_half = req.txn_number() >> 32L; + uint32_t ng_id = high_half >> 10; + log_state_->UpdateLatestCommittedTxnNumber(ng_id, tx_ident); if (err != 0) { @@ -280,7 +288,7 @@ void OpenLogTaskWorker::HandleWriteLog(const WriteLogRequest &req, void OpenLogTaskWorker::HandleUpdateCkptTs(const UpdateCheckpointTsRequest &req, LogResponse *resp) { - log_state_->UpdateCkptTs(req.ckpt_timestamp()); + log_state_->UpdateCkptTs(req.cc_node_group_id(), req.ckpt_timestamp()); resp->set_response_status( LogResponse::ResponseStatus::LogResponse_ResponseStatus_Success); }