Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions include/log_shipping_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ class LogShippingAgent
finish_msg->set_log_group_id(log_group_id_);
finish_msg->set_latest_txn_no(latest_txn_no_);
finish_msg->set_last_ckpt_ts(last_ckpt_ts_);
finish_msg->set_max_ts_in_log(max_ts_in_log_);

int eagain = 0;
if (SendMessage(replay_msg, iobuf_, true, eagain) != 0)
Expand Down Expand Up @@ -399,11 +400,16 @@ class LogShippingAgent
int idx = 0;
int cnt = 0;
int eagain = 0;
uint64_t ddl_max_ts = last_ckpt_ts_;
for (iterator->SeekToDDLFirst(); iterator->ValidDDL();
iterator->NextDDL(), idx++)
{
cnt++;
const Item &item = iterator->GetDDLItem();
if (item.timestamp_ > ddl_max_ts)
{
ddl_max_ts = item.timestamp_;
}
if (item.item_type_ == LogItemType::ClusterScaleLog)
{
ReplayClusterScaleMsg *scale_msg =
Expand Down Expand Up @@ -459,7 +465,6 @@ class LogShippingAgent
{
int eagain = 0;
int cnt = 0;
int msg_cnt = 0;
ReplayMessage replay_msg;
replay_msg.set_cc_node_group_id(cc_node_group_id_);
replay_msg.set_cc_node_group_term(cc_node_group_term_);
Expand Down Expand Up @@ -507,7 +512,6 @@ class LogShippingAgent
err, std::memory_order_relaxed);
return;
}
msg_cnt++;
replay_msg.clear_binary_log_records();
}

Expand All @@ -530,7 +534,6 @@ class LogShippingAgent
data_log_send_err.store(err, std::memory_order_relaxed);
return;
}
msg_cnt++;

// Store results for this thread
thread_results[i].max_ts = thd_max_ts;
Expand All @@ -546,22 +549,27 @@ class LogShippingAgent
thd.join();
}

// Update latest_txn_no_ based on the thread with the max timestamp
uint64_t global_max_ts = last_ckpt_ts_;
// Aggregate max timestamp across data-log threads and DDL pass
uint64_t data_global_max_ts = last_ckpt_ts_;
uint32_t global_latest_txn_no = 0;
for (const auto &[max_ts, latest_txn_no] : thread_results)
{
if (max_ts > global_max_ts)
if (max_ts > data_global_max_ts)
{
global_max_ts = max_ts;
data_global_max_ts = max_ts;
global_latest_txn_no = latest_txn_no;
}
}
if (global_max_ts > last_ckpt_ts_)
// Only update latest_txn_no_ based on data logs; DDLs have no txn id
if (data_global_max_ts > last_ckpt_ts_)
{
assert(latest_txn_no_ <= global_latest_txn_no);
latest_txn_no_ = global_latest_txn_no;
}
// Final per-log-group max commit ts includes both DDL and data logs
uint64_t final_max_ts =
(ddl_max_ts > data_global_max_ts) ? ddl_max_ts : data_global_max_ts;
max_ts_in_log_ = final_max_ts;

return data_log_send_err.load(std::memory_order_relaxed);
}
Expand Down Expand Up @@ -648,6 +656,7 @@ class LogShippingAgent
std::condition_variable to_send_cv_;
uint32_t latest_txn_no_{};
uint64_t last_ckpt_ts_{};
uint64_t max_ts_in_log_{};
bool start_with_replay_;

butil::IOBuf iobuf_;
Expand Down
2 changes: 1 addition & 1 deletion tx-log-protos
Submodule tx-log-protos updated 1 files
+1 −0 log.proto