Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions include/cc/catalog_cc_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class CatalogCcMap

const TableName &tbl_name = catalog_key->Name();
std::shared_ptr<ReaderWriterObject<TableSchema>> schema_cntl =
shard_->FindEmplaceSchemaCntl(tbl_name);
shard_->FindEmplaceSchemaCntl(tbl_name, ng_id);
AddWriterResult ret = schema_cntl->AddWriter(&req);
switch (ret)
{
Expand All @@ -132,8 +132,8 @@ class CatalogCcMap
return false;
case AddWriterResult::Invalid:
// The control block is invalid. Deletes it from the shard.
shard_->DeleteSchemaCntl(tbl_name);
schema_cntl = shard_->FindEmplaceSchemaCntl(tbl_name);
shard_->DeleteSchemaCntl(tbl_name, ng_id);
schema_cntl = shard_->FindEmplaceSchemaCntl(tbl_name, ng_id);
ret = schema_cntl->AddWriter(&req);
assert(ret == AddWriterResult::Success);
break;
Expand Down Expand Up @@ -572,32 +572,28 @@ class CatalogCcMap
{
const TableName &tbl_name = table_key->Name();
std::shared_ptr<ReaderWriterObject<TableSchema>> schema_cntl =
shard_->FindSchemaCntl(tbl_name);
shard_->FindSchemaCntl(tbl_name, req.NodeGroupId());
if (schema_cntl != nullptr)
{
schema_cntl->FinishWriter();
shard_->DeleteSchemaCntl(tbl_name);
schema_cntl->FinishWriter(req.Txn());
shard_->DeleteSchemaCntl(tbl_name, req.NodeGroupId());
}

catalog_entry =
shard_->GetCatalog(table_key->Name(), req.NodeGroupId());

if (catalog_entry == nullptr)
{
// The target table catalog haven't been initialized. Just
// return.
req.Result()->SetError(CcErrorCode::REQUESTED_TABLE_NOT_EXISTS);
return true;
}

if (req.CommitTs() == TransactionOperation::tx_op_failed_ts_)
{
// The catalog_entry could be null if the txn aborts after
// acquire write intent failure.

// For add index op, we create new sk ccmap, table ranges and
// table statistics for the new sk during prepare phase. If
// flush kv failed, should clean these up. But, if the dirty
// schema is nullptr, that is mean, this is the recovering
// transaction, and there is no need to drop the new sk ccmap.
if (req.OpType() == OperationType::AddIndex &&
catalog_entry != nullptr &&
catalog_entry->dirty_schema_ != nullptr)
{
std::vector<TableName> new_index_names =
Expand Down Expand Up @@ -633,7 +629,8 @@ class CatalogCcMap
}

// Flush kv fails, need to clear dirty CatalogEntry
if (shard_->core_id_ == shard_->core_cnt_ - 1)
if (shard_->core_id_ == shard_->core_cnt_ - 1 &&
catalog_entry != nullptr)
{
catalog_entry->RejectDirtySchema();
}
Expand All @@ -646,6 +643,7 @@ class CatalogCcMap
return TemplateCcMap::Execute(req);
}

assert(catalog_entry != nullptr);
if (shard_->core_id_ == 0)
{
// For post commit, retrieves the current and dirty schema pair
Expand Down Expand Up @@ -1219,7 +1217,7 @@ class CatalogCcMap
sch_rec->DirtySchema() == nullptr)
{
std::shared_ptr<ReaderWriterObject<TableSchema>> sch_cntl =
shard_->FindEmplaceSchemaCntl(table_key->Name());
shard_->FindEmplaceSchemaCntl(table_key->Name(), ng_id);
if (sch_cntl->HasNoWriter())
{
if (sch_cntl->GetObjectPtr() == nullptr)
Expand Down
14 changes: 8 additions & 6 deletions include/cc/cc_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -1000,14 +1000,14 @@ class CcShard
}

std::shared_ptr<ReaderWriterObject<TableSchema>> FindSchemaCntl(
const TableName &tbl_name);
const TableName &tbl_name, NodeGroupId cc_ng_id);

std::shared_ptr<ReaderWriterObject<TableSchema>> FindEmplaceSchemaCntl(
const TableName &tbl_name);
const TableName &tbl_name, NodeGroupId cc_ng_id);

void DeleteSchemaCntl(const TableName &tbl_name);
void DeleteSchemaCntl(const TableName &tbl_name, NodeGroupId cc_ng_id);

void ClearNativeSchemaCntl();
void ClearSchemaCntl(NodeGroupId cc_ng_id);
void CollectCacheHit();
void CollectCacheMiss();

Expand Down Expand Up @@ -1151,8 +1151,10 @@ class CcShard

SystemHandler *const system_handler_;

absl::flat_hash_map<TableName,
std::shared_ptr<ReaderWriterObject<TableSchema>>>
absl::flat_hash_map<
NodeGroupId,
absl::flat_hash_map<TableName,
std::shared_ptr<ReaderWriterObject<TableSchema>>>>
catalog_rw_cntl_;

// The max number of cc page to scan in one invocation of Clean().
Expand Down
2 changes: 1 addition & 1 deletion include/cc/reader_writer_cntl.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class ReaderWriterCntl
bool AddReader();
void FinishReader();
AddWriterResult AddWriter(CcRequestBase *write_req);
void FinishWriter();
void FinishWriter(uint64_t tx_number);
void Invalidate();

bool HasNoWriter() const
Expand Down
1 change: 1 addition & 0 deletions include/type.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <iostream>
#include <iterator>
#include <mutex>
#include <set>
Expand Down
2 changes: 1 addition & 1 deletion src/cc/cc_req_misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,11 @@ bool ClearCcNodeGroup::Execute(CcShard &ccs)
ccs.DropLockHoldingTxs(cc_ng_id_);
ccs.DropCcms(cc_ng_id_);
ccs.ResetStandbySequence();
ccs.ClearSchemaCntl(cc_ng_id_);

if (ccs.IsNative(cc_ng_id_))
{
ccs.ClearActvieSiTxs();
ccs.ClearNativeSchemaCntl();
ccs.ClearActiveBlockingTxs();
}

Expand Down
58 changes: 44 additions & 14 deletions src/cc/cc_shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2911,40 +2911,70 @@ void CcShard::NotifyTxProcessor()
}

std::shared_ptr<ReaderWriterObject<TableSchema>> CcShard::FindSchemaCntl(
const TableName &tbl_name)
const TableName &tbl_name, NodeGroupId cc_ng_id)
{
auto it = catalog_rw_cntl_.find(tbl_name);
return it == catalog_rw_cntl_.end() ? nullptr : it->second;
auto ng_it = catalog_rw_cntl_.find(cc_ng_id);
if (ng_it == catalog_rw_cntl_.end())
{
return nullptr;
}
absl::flat_hash_map<TableName,
std::shared_ptr<ReaderWriterObject<TableSchema>>>
&tbl_rw_cntls = ng_it->second;
auto it = tbl_rw_cntls.find(tbl_name);
return it == tbl_rw_cntls.end() ? nullptr : it->second;
}

std::shared_ptr<ReaderWriterObject<TableSchema>> CcShard::FindEmplaceSchemaCntl(
const TableName &tbl_name)
const TableName &tbl_name, NodeGroupId cc_ng_id)
{
auto [it, insert] = catalog_rw_cntl_.try_emplace(tbl_name);
if (insert)
auto [ng_it, _] = catalog_rw_cntl_.try_emplace(cc_ng_id);
absl::flat_hash_map<TableName,
std::shared_ptr<ReaderWriterObject<TableSchema>>>
&tbl_rw_cntls = ng_it->second;
auto [it, inserted] = tbl_rw_cntls.try_emplace(tbl_name);
if (inserted)
{
it->second = std::make_shared<ReaderWriterObject<TableSchema>>(this);
}

return it->second;
}

void CcShard::DeleteSchemaCntl(const TableName &tbl_name)
void CcShard::DeleteSchemaCntl(const TableName &tbl_name, NodeGroupId cc_ng_id)
{
catalog_rw_cntl_.erase(tbl_name);
auto ng_it = catalog_rw_cntl_.find(cc_ng_id);
if (ng_it == catalog_rw_cntl_.end())
{
return;
}
absl::flat_hash_map<TableName,
std::shared_ptr<ReaderWriterObject<TableSchema>>>
&tbl_rw_cntls = ng_it->second;
tbl_rw_cntls.erase(tbl_name);
if (tbl_rw_cntls.empty())
{
catalog_rw_cntl_.erase(ng_it);
}
}

void CcShard::ClearNativeSchemaCntl()
void CcShard::ClearSchemaCntl(NodeGroupId cc_ng_id)
{
// When the native cc shard fails over, invalidates all schema reader-writer
// When the node group fails over, invalidates all schema reader-writer
// control blocks. Invalidation ensures that future runtime queries will not
// use cached schema and falls back to reading the schema via concurrency
// control. Ongoing runtime queries will continue to use the old schema.
for (auto &[tbl_name, cntl] : catalog_rw_cntl_)
auto ng_it = catalog_rw_cntl_.find(cc_ng_id);
if (ng_it != catalog_rw_cntl_.end())
{
cntl->Invalidate();
absl::flat_hash_map<TableName,
std::shared_ptr<ReaderWriterObject<TableSchema>>>
&tbl_rw_cntls = ng_it->second;
for (auto &[tbl_name, cntl] : tbl_rw_cntls)
{
cntl->Invalidate();
}
catalog_rw_cntl_.erase(ng_it);
}
catalog_rw_cntl_.clear();
}

TxLockInfo::uptr CcShard::GetTxLockInfo(int64_t tx_term)
Expand Down
8 changes: 7 additions & 1 deletion src/cc/reader_writer_cntl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,14 @@ AddWriterResult ReaderWriterCntl::AddWriter(CcRequestBase *write_req)
return success ? AddWriterResult::Success : AddWriterResult::WritePending;
}

void ReaderWriterCntl::FinishWriter()
void ReaderWriterCntl::FinishWriter(uint64_t tx_number)
{
// The writer txn might have failed to acquire the write lock and aborted.
auto write_req = write_req_.load(std::memory_order_relaxed);
if (write_req == nullptr || write_req->Txn() != tx_number)
{
return;
}
assert(
[&]()
{
Expand Down