diff --git a/include/log_shipping_agent.h b/include/log_shipping_agent.h index e6aca6c..b0eea25 100644 --- a/include/log_shipping_agent.h +++ b/include/log_shipping_agent.h @@ -143,6 +143,7 @@ class LogShippingAgent finish_msg->set_log_group_id(log_group_id_); finish_msg->set_latest_txn_no(latest_txn_no_); finish_msg->set_last_ckpt_ts(last_ckpt_ts_); + finish_msg->set_max_ts_in_log(max_ts_in_log_); int eagain = 0; if (SendMessage(replay_msg, iobuf_, true, eagain) != 0) @@ -399,11 +400,16 @@ class LogShippingAgent int idx = 0; int cnt = 0; int eagain = 0; + uint64_t ddl_max_ts = last_ckpt_ts_; for (iterator->SeekToDDLFirst(); iterator->ValidDDL(); iterator->NextDDL(), idx++) { cnt++; const Item &item = iterator->GetDDLItem(); + if (item.timestamp_ > ddl_max_ts) + { + ddl_max_ts = item.timestamp_; + } if (item.item_type_ == LogItemType::ClusterScaleLog) { ReplayClusterScaleMsg *scale_msg = @@ -459,7 +465,6 @@ class LogShippingAgent { int eagain = 0; int cnt = 0; - int msg_cnt = 0; ReplayMessage replay_msg; replay_msg.set_cc_node_group_id(cc_node_group_id_); replay_msg.set_cc_node_group_term(cc_node_group_term_); @@ -507,7 +512,6 @@ class LogShippingAgent err, std::memory_order_relaxed); return; } - msg_cnt++; replay_msg.clear_binary_log_records(); } @@ -530,7 +534,6 @@ class LogShippingAgent data_log_send_err.store(err, std::memory_order_relaxed); return; } - msg_cnt++; // Store results for this thread thread_results[i].max_ts = thd_max_ts; @@ -546,22 +549,27 @@ class LogShippingAgent thd.join(); } - // Update latest_txn_no_ based on the thread with the max timestamp - uint64_t global_max_ts = last_ckpt_ts_; + // Aggregate max timestamp across data-log threads and DDL pass + uint64_t data_global_max_ts = last_ckpt_ts_; uint32_t global_latest_txn_no = 0; for (const auto &[max_ts, latest_txn_no] : thread_results) { - if (max_ts > global_max_ts) + if (max_ts > data_global_max_ts) { - global_max_ts = max_ts; + data_global_max_ts = max_ts; global_latest_txn_no = latest_txn_no; } } - if (global_max_ts > last_ckpt_ts_) + // Only update latest_txn_no_ based on data logs; DDLs have no txn id + if (data_global_max_ts > last_ckpt_ts_) { assert(latest_txn_no_ <= global_latest_txn_no); latest_txn_no_ = global_latest_txn_no; } + // Final per-log-group max commit ts includes both DDL and data logs + uint64_t final_max_ts = + (ddl_max_ts > data_global_max_ts) ? ddl_max_ts : data_global_max_ts; + max_ts_in_log_ = final_max_ts; return data_log_send_err.load(std::memory_order_relaxed); } @@ -648,6 +656,7 @@ class LogShippingAgent std::condition_variable to_send_cv_; uint32_t latest_txn_no_{}; uint64_t last_ckpt_ts_{}; + uint64_t max_ts_in_log_{}; bool start_with_replay_; butil::IOBuf iobuf_; diff --git a/tx-log-protos b/tx-log-protos index d066029..19bd827 160000 --- a/tx-log-protos +++ b/tx-log-protos @@ -1 +1 @@ -Subproject commit d066029c2731d3c10d174b15c3130f3d7a32ad6b +Subproject commit 19bd8277219c7440a5b26a45b1ce788c827f4aa9