Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions include/log_shipping_agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@

namespace txlog
{

// NOTE(review): both are non-const `static` variables at namespace scope in a
// header, so every translation unit that includes it gets its own copy.
// Presumably the default term and id of a cc node group — confirm with callers.
static int64_t DEFAULT_CC_NG_TERM = 1;
static uint32_t DEFAULT_CC_NG_ID = 0;

/*
* Agent to ship redo log records to cc node leader, who is waiting for
* uncheckpointed log record to replay. Shipping agent is implemented as a
Expand Down Expand Up @@ -498,8 +494,12 @@ class LogShippingAgent
msg_cnt++;
replay_msg.clear_binary_log_records();
}

AppendLogBlob(*log_records_blob, item);
// Filter out log items that do not belong to
// cc_node_group_id_.
if (item.cc_ng_ == cc_node_group_id_)
{
AppendLogBlob(*log_records_blob, item);
}
}
}
int err = SendMessage(replay_msg, buf, false, eagain);
Expand Down
155 changes: 112 additions & 43 deletions include/log_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,21 @@ struct Item
// Builds a log item from its transaction number, timestamp, payload and type.
// log_message is taken by value and moved into log_message_.
// cc_ng defaults to UINT32_MAX when the caller does not supply one.
// NOTE(review): the `item_type` parameter line and the `item_type_` initializer
// each appear twice below (with and without the trailing cc_ng part); this
// looks like the before/after lines of a rendered diff — confirm which
// revision should remain.
Item(uint64_t tx_number,
uint64_t timestamp,
std::string log_message,
LogItemType item_type)
LogItemType item_type,
uint32_t cc_ng = UINT32_MAX)
: tx_number_(tx_number),
timestamp_(timestamp),
log_message_(std::move(log_message)),
item_type_(item_type)
item_type_(item_type),
cc_ng_(cc_ng)
{
}

// Transaction number supplied to the constructor.
uint64_t tx_number_;
// Timestamp supplied to the constructor.
uint64_t timestamp_;
// Log payload; the constructor moves its by-value argument into this field.
std::string log_message_;
// Kind of log item, as supplied to the constructor.
LogItemType item_type_;
// Value of the constructor's cc_ng argument; UINT32_MAX when it was omitted.
// Presumably the id of the cc node group the item belongs to — confirm.
uint32_t cc_ng_;
};

class ItemIterator
Expand Down Expand Up @@ -115,29 +118,32 @@ class LogState
LogState() = default;
virtual ~LogState() = default;

// Adds a single log item; pure virtual, implemented by subclasses.
// AddLogItemBatch treats a non-zero return value as failure.
// NOTE(review): two opening lines follow (without and with the leading
// cc_ng_id parameter) — they look like the before/after sides of a rendered
// diff; confirm which revision should remain.
virtual int AddLogItem(uint64_t tx_number,
virtual int AddLogItem(uint32_t cc_ng_id,
uint64_t tx_number,
uint64_t timestamp,
const std::string &log_message) = 0;

// Adds every entry of batch_logs in order by calling AddLogItem and stops at
// the first call that returns non-zero.
// Returns 0 when every entry was added (or batch_logs is empty); otherwise the
// first non-zero AddLogItem result. No rollback of earlier entries is done
// here.
// NOTE(review): the tuple type, the structured binding and the AddLogItem call
// each appear twice below (3-element and 4-element forms) — looks like the
// before/after lines of a rendered diff; confirm which revision should remain.
virtual int AddLogItemBatch(
const std::vector<std::tuple<uint64_t, uint64_t, std::string>>
const std::vector<std::tuple<uint32_t, uint64_t, uint64_t, std::string>>
&batch_logs)
{
int err = 0;
for (const auto &[tx, ts, log_message] : batch_logs)
for (const auto &[cc_ng_id, tx, ts, log_message] : batch_logs)
{
err = AddLogItem(tx, ts, log_message);
err = AddLogItem(cc_ng_id, tx, ts, log_message);
if (err != 0)
{
// Stop at the first failure and report it to the caller.
break;
}
}
return err;
}

// Pure virtual. Returns a flag paired with an owning ItemIterator.
// Presumably the iterator walks the log items to replay from start_timestamp
// (for node_group_id in the two-parameter form) and the flag reports success —
// confirm against an implementation.
// NOTE(review): two parameter lines follow (without and with node_group_id) —
// looks like the before/after lines of a rendered diff.
virtual std::pair<bool, std::unique_ptr<ItemIterator>> GetLogReplayList(
uint64_t start_timestamp) = 0;
uint32_t node_group_id, uint64_t start_timestamp) = 0;

// Pure virtual. Returns a flag paired with an Item::Pointer for tx_number.
// lower_bound_ts defaults to 0. Presumably the flag reports whether a data log
// was found and lower_bound_ts limits the search range — confirm against an
// implementation.
// NOTE(review): two parameter lines follow (without and with cc_ng_id) — looks
// like the before/after lines of a rendered diff.
virtual std::pair<bool, Item::Pointer> SearchTxDataLog(
uint64_t tx_number, uint64_t lower_bound_ts = 0) = 0;
uint64_t tx_number, uint32_t cc_ng_id, uint64_t lower_bound_ts = 0) = 0;

/**
* Stores cc node's latest state.
Expand Down Expand Up @@ -177,18 +183,15 @@ class LogState

leader_ip_ = rhs.leader_ip_;
leader_port_ = rhs.leader_port_;
latest_txn_no_.store(
rhs.latest_txn_no_.load(std::memory_order_relaxed));
last_ckpt_ts_.store(
rhs.last_ckpt_ts_.load(std::memory_order_relaxed));

latest_txn_no_ = rhs.latest_txn_no_;
last_ckpt_ts_ = rhs.last_ckpt_ts_;
return *this;
}

// Leader address fields; copied member-wise by the assignment operator.
std::string leader_ip_;
uint32_t leader_port_{};
// NOTE(review): latest_txn_no_ and last_ckpt_ts_ are each declared twice, once
// as std::atomic and once as a plain integer — looks like the before/after
// lines of a rendered diff. With the plain-integer form, concurrent readers
// and writers of these fields need external synchronization.
std::atomic<uint32_t> latest_txn_no_{};
std::atomic<uint64_t> last_ckpt_ts_{};
uint32_t latest_txn_no_{};
uint64_t last_ckpt_ts_{};
};

/**
Expand Down Expand Up @@ -493,51 +496,111 @@ class LogState
}
}

// Returns latest_txn_no_ recorded for node group cc_ng, or 0 when cc_ng has no
// entry in cc_ng_info_.
// NOTE(review): the signature and the first body statement appear in two forms
// (single cc_ng_info_ with an atomic load vs. per-group map lookup) — looks
// like the before/after lines of a rendered diff.
// NOTE(review): the map lookup does not take log_state_mutex_ — confirm that
// no other thread can modify cc_ng_info_ or latest_txn_no_ concurrently.
uint32_t LatestCommittedTxnNumber() const
uint32_t LatestCommittedTxnNumber(uint32_t cc_ng) const
{
return cc_ng_info_.latest_txn_no_.load(std::memory_order_relaxed);
auto it = cc_ng_info_.find(cc_ng);
if (it == cc_ng_info_.end())
{
return 0;
}
return it->second.latest_txn_no_;
}

// Records tx_ident as the latest committed txn number of node group tx_cc_ng
// when it is "ahead" of the stored value under uint32 wrap-around arithmetic.
// Does nothing when tx_cc_ng has no entry in cc_ng_info_.
// NOTE(review): the signature line and the wrap-around check/store each appear
// in two forms below (single cc_ng_info_ vs. per-group map lookup) — looks like
// the before/after lines of a rendered diff; confirm which should remain.
void UpdateLatestCommittedTxnNumber(uint32_t tx_ident)
void UpdateLatestCommittedTxnNumber(uint32_t tx_cc_ng, uint32_t tx_ident)
{
// access different fields of node group info with RecoverTx RPC
// thread, no need to lock
// NOTE(review): a shared lock is nevertheless taken on the next line, and the
// plain assignment further down writes latest_txn_no_ while holding only that
// shared lock. If latest_txn_no_ is the non-atomic declaration, confirm there
// is a single writer and no concurrent reader of this field.
std::shared_lock s_lk(log_state_mutex_);

auto it = cc_ng_info_.find(tx_cc_ng);
if (it == cc_ng_info_.end())
{
return;
}
CcNgInfo &info = it->second;

// to handle the situation that committed txn number wraps around
// uint32, assuming that active txn numbers won't span half of
// UINT32_MAX
// The unsigned subtraction wraps modulo 2^32, so a difference below half the
// range means tx_ident is at or after the stored number.
if (tx_ident - cc_ng_info_.latest_txn_no_ < (UINT32_MAX >> 1))
if (tx_ident - info.latest_txn_no_ < (UINT32_MAX >> 1))
{
cc_ng_info_.latest_txn_no_.store(tx_ident,
std::memory_order_relaxed);
// info.latest_txn_no_.store(tx_ident, std::memory_order_relaxed);
info.latest_txn_no_ = tx_ident;
}
}

void UpdateCkptTs(uint64_t timestamp)
void UpdateCkptTs(uint32_t cc_ng, uint64_t timestamp)
{
std::unique_lock<std::shared_mutex> lk(log_state_mutex_);
if (timestamp >
cc_ng_info_.last_ckpt_ts_.load(std::memory_order_relaxed))
{
uint32_t max_txn =
cc_ng_info_.latest_txn_no_.load(std::memory_order_relaxed);
int rc = PersistCkptAndMaxTxn(timestamp, max_txn);
while (rc != 0)
// this func is called when on_apply processing UpdateCkptTsRequest,
// might be concurrent with LastCkptTimestamp() in RecoverTx rpc
// thread
std::shared_lock s_lk(log_state_mutex_);

auto it = cc_ng_info_.find(cc_ng);
if (it != cc_ng_info_.end())
{
CcNgInfo &info = it->second;
if (timestamp > info.last_ckpt_ts_)
{
rc = PersistCkptAndMaxTxn(timestamp, max_txn);
info.last_ckpt_ts_ = timestamp;
bool updated = UpdateMinCkptTsOfAllNodeGroups();
if (updated)
{
TryCleanMultiStageOps();
}

auto cc_ng_info = GetCopyOfCcNgInfo();
s_lk.unlock();

int rc = PersistCkptAndMaxTxn(cc_ng_info);
while (rc != 0)
{
rc = PersistCkptAndMaxTxn(cc_ng_info);
}
}

cc_ng_info_.last_ckpt_ts_.store(timestamp,
std::memory_order_release);
TryCleanMultiStageOps();
}
}

uint64_t LastCkptTimestamp()
bool UpdateMinCkptTsOfAllNodeGroups()
{
uint64_t min_ts = UINT64_MAX;
for (const auto &[ng_id, ng_info] : cc_ng_info_)
{
min_ts = std::min(min_ts, ng_info.last_ckpt_ts_);
}
if (min_ts > min_ckpt_ts_)
{
min_ckpt_ts_ = min_ts;
return true;
}
return false;
}

// Returns last_ckpt_ts_ recorded for node group cc_ng_id, or 0 when cc_ng_id
// has no entry in cc_ng_info_. The map lookup runs under a shared lock on
// log_state_mutex_.
// NOTE(review): the first body statement (an atomic load on a single
// cc_ng_info_) looks like a leftover pre-change line of a rendered diff; the
// per-group map lookup after it is the other revision.
uint64_t LastCkptTimestamp(uint32_t cc_ng_id)
{
return cc_ng_info_.last_ckpt_ts_.load(std::memory_order_relaxed);
// this func is also called in RecoverTx rpc thread, which might be
// concurrent with state machine's on_apply when processing
// ReplayLogRequest
std::shared_lock s_lk(log_state_mutex_);

auto iter = cc_ng_info_.find(cc_ng_id);
if (iter == cc_ng_info_.end())
{
return 0;
}
else
{
return iter->second.last_ckpt_ts_;
}
}

CcNgInfo &GetCcNgInfo()
const std::unordered_map<uint32_t, CcNgInfo> GetCopyOfCcNgInfo() const
{
return cc_ng_info_;
std::shared_lock s_lk(log_state_mutex_);
std::unordered_map<uint32_t, CcNgInfo> cc_ng_info_copy = cc_ng_info_;
return cc_ng_info_copy;
}

virtual uint64_t GetApproximateReplayLogSize()
Expand Down Expand Up @@ -603,8 +666,7 @@ class LogState
*/
void TryCleanMultiStageOps()
{
uint64_t ckpt_ts =
cc_ng_info_.last_ckpt_ts_.load(std::memory_order_relaxed);
uint64_t ckpt_ts = min_ckpt_ts_;
using namespace std::chrono_literals;
uint64_t one_hour = std::chrono::microseconds(1h).count();
for (auto it = tx_catalog_ops_.begin(); it != tx_catalog_ops_.end();)
Expand Down Expand Up @@ -682,9 +744,14 @@ class LogState

// Pure virtual hook taking a txn number and a timestamp. Presumably removes
// the persisted split-range op of that txn — confirm against an
// implementation.
virtual int DeleteRangeOp(uint64_t txn, uint64_t timestamp) = 0;

// Pure virtual hook that persists checkpoint / max-txn state. UpdateCkptTs
// keeps calling it until it returns 0, so 0 is treated as success.
// NOTE(review): two declarations follow (scalar ckpt_ts/max_txn vs. the whole
// per-node-group map) — looks like the before/after lines of a rendered diff.
virtual int PersistCkptAndMaxTxn(uint64_t ckpt_ts, uint32_t max_txn) = 0;
virtual int PersistCkptAndMaxTxn(
const std::unordered_map<uint32_t, CcNgInfo> &ng_infos) = 0;

// CcNgInfo cc_ng_info_;
// Per-node-group state, keyed by the uint32_t id used for lookups in the
// accessor functions of this class.
std::unordered_map<uint32_t, CcNgInfo> cc_ng_info_;

// NOTE(review): this second cc_ng_info_ declaration (single CcNgInfo) conflicts
// with the map above; it looks like the pre-change line of a rendered diff.
CcNgInfo cc_ng_info_;
// to erase finished schema ops and split range ops after one hour
// Holds the minimum last_ckpt_ts_ over cc_ng_info_ as maintained by
// UpdateMinCkptTsOfAllNodeGroups; TryCleanMultiStageOps reads it as its
// checkpoint timestamp.
std::atomic_uint64_t min_ckpt_ts_{};

struct CatalogOp
{
Expand Down Expand Up @@ -734,7 +801,8 @@ class LogState
bool all_cleaned =
std::all_of(schemas_op_msg_.begin(),
schemas_op_msg_.end(),
[](const SchemaOpMessage &schema_op_msg) {
[](const SchemaOpMessage &schema_op_msg)
{
return schema_op_msg.stage() ==
SchemaOpMessage_Stage_CleanSchema;
});
Expand All @@ -747,7 +815,8 @@ class LogState
bool all_committed =
std::all_of(schemas_op_msg_.begin(),
schemas_op_msg_.end(),
[](const SchemaOpMessage &schema_op_msg) {
[](const SchemaOpMessage &schema_op_msg)
{
return schema_op_msg.stage() ==
SchemaOpMessage_Stage_CommitSchema;
});
Expand Down
Loading