From dc9aec8d46e52fe2b3e136079fa22a6b7a06347a Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 1 Sep 2025 16:12:39 +0800 Subject: [PATCH 1/3] organize schema ReaderWriterCntl by node group and clears it on leader stop; fix FinishWriter --- include/cc/catalog_cc_map.h | 14 ++++---- include/cc/cc_shard.h | 14 ++++---- include/cc/reader_writer_cntl.h | 2 +- include/type.h | 1 + src/cc/cc_req_misc.cpp | 2 +- src/cc/cc_shard.cpp | 58 +++++++++++++++++++++++++-------- src/cc/reader_writer_cntl.cpp | 8 ++++- 7 files changed, 69 insertions(+), 30 deletions(-) diff --git a/include/cc/catalog_cc_map.h b/include/cc/catalog_cc_map.h index 2643f4f9..042cf88a 100644 --- a/include/cc/catalog_cc_map.h +++ b/include/cc/catalog_cc_map.h @@ -118,7 +118,7 @@ class CatalogCcMap const TableName &tbl_name = catalog_key->Name(); std::shared_ptr> schema_cntl = - shard_->FindEmplaceSchemaCntl(tbl_name); + shard_->FindEmplaceSchemaCntl(tbl_name, ng_id); AddWriterResult ret = schema_cntl->AddWriter(&req); switch (ret) { @@ -132,8 +132,8 @@ class CatalogCcMap return false; case AddWriterResult::Invalid: // The control block is invalid. Deletes it from the shard. - shard_->DeleteSchemaCntl(tbl_name); - schema_cntl = shard_->FindEmplaceSchemaCntl(tbl_name); + shard_->DeleteSchemaCntl(tbl_name, ng_id); + schema_cntl = shard_->FindEmplaceSchemaCntl(tbl_name, ng_id); ret = schema_cntl->AddWriter(&req); assert(ret == AddWriterResult::Success); break; @@ -572,11 +572,11 @@ class CatalogCcMap { const TableName &tbl_name = table_key->Name(); std::shared_ptr> schema_cntl = - shard_->FindSchemaCntl(tbl_name); + shard_->FindSchemaCntl(tbl_name, req.NodeGroupId()); if (schema_cntl != nullptr) { - schema_cntl->FinishWriter(); - shard_->DeleteSchemaCntl(tbl_name); + schema_cntl->FinishWriter(req.Txn()); + shard_->DeleteSchemaCntl(tbl_name, req.NodeGroupId()); } catalog_entry = @@ -1219,7 +1219,7 @@ class CatalogCcMap sch_rec->DirtySchema() == nullptr) { std::shared_ptr> sch_cntl = - shard_->FindEmplaceSchemaCntl(table_key->Name()); + shard_->FindEmplaceSchemaCntl(table_key->Name(), ng_id); if (sch_cntl->HasNoWriter()) { if (sch_cntl->GetObjectPtr() == nullptr) diff --git a/include/cc/cc_shard.h b/include/cc/cc_shard.h index 6d2d4f0b..8bdada83 100644 --- a/include/cc/cc_shard.h +++ b/include/cc/cc_shard.h @@ -1000,14 +1000,14 @@ class CcShard } std::shared_ptr> FindSchemaCntl( - const TableName &tbl_name); + const TableName &tbl_name, NodeGroupId cc_ng_id); std::shared_ptr> FindEmplaceSchemaCntl( - const TableName &tbl_name); + const TableName &tbl_name, NodeGroupId cc_ng_id); - void DeleteSchemaCntl(const TableName &tbl_name); + void DeleteSchemaCntl(const TableName &tbl_name, NodeGroupId cc_ng_id); - void ClearNativeSchemaCntl(); + void ClearSchemaCntl(NodeGroupId cc_ng_id); void CollectCacheHit(); void CollectCacheMiss(); @@ -1151,8 +1151,10 @@ class CcShard SystemHandler *const system_handler_; - absl::flat_hash_map>> + absl::flat_hash_map< + NodeGroupId, + absl::flat_hash_map>>> catalog_rw_cntl_; // The max number of cc page to scan in one invocation of Clean(). diff --git a/include/cc/reader_writer_cntl.h b/include/cc/reader_writer_cntl.h index 7367efca..aeefd78b 100644 --- a/include/cc/reader_writer_cntl.h +++ b/include/cc/reader_writer_cntl.h @@ -51,7 +51,7 @@ class ReaderWriterCntl bool AddReader(); void FinishReader(); AddWriterResult AddWriter(CcRequestBase *write_req); - void FinishWriter(); + void FinishWriter(uint64_t tx_number); void Invalidate(); bool HasNoWriter() const diff --git a/include/type.h b/include/type.h index 9f585f38..ceb72e87 100644 --- a/include/type.h +++ b/include/type.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include diff --git a/src/cc/cc_req_misc.cpp b/src/cc/cc_req_misc.cpp index 1ff19f49..46b2297e 100644 --- a/src/cc/cc_req_misc.cpp +++ b/src/cc/cc_req_misc.cpp @@ -410,11 +410,11 @@ bool ClearCcNodeGroup::Execute(CcShard &ccs) ccs.DropLockHoldingTxs(cc_ng_id_); ccs.DropCcms(cc_ng_id_); ccs.ResetStandbySequence(); + ccs.ClearSchemaCntl(cc_ng_id_); if (ccs.IsNative(cc_ng_id_)) { ccs.ClearActvieSiTxs(); - ccs.ClearNativeSchemaCntl(); ccs.ClearActiveBlockingTxs(); } diff --git a/src/cc/cc_shard.cpp b/src/cc/cc_shard.cpp index 6504ee28..94499094 100644 --- a/src/cc/cc_shard.cpp +++ b/src/cc/cc_shard.cpp @@ -2911,40 +2911,70 @@ void CcShard::NotifyTxProcessor() } std::shared_ptr> CcShard::FindSchemaCntl( - const TableName &tbl_name) + const TableName &tbl_name, NodeGroupId cc_ng_id) { - auto it = catalog_rw_cntl_.find(tbl_name); - return it == catalog_rw_cntl_.end() ? nullptr : it->second; + auto ng_it = catalog_rw_cntl_.find(cc_ng_id); + if (ng_it == catalog_rw_cntl_.end()) + { + return nullptr; + } + absl::flat_hash_map>> + &tbl_rw_cntls = ng_it->second; + auto it = tbl_rw_cntls.find(tbl_name); + return it == tbl_rw_cntls.end() ? nullptr : it->second; } std::shared_ptr> CcShard::FindEmplaceSchemaCntl( - const TableName &tbl_name) + const TableName &tbl_name, NodeGroupId cc_ng_id) { - auto [it, insert] = catalog_rw_cntl_.try_emplace(tbl_name); - if (insert) + auto [ng_it, _] = catalog_rw_cntl_.try_emplace(cc_ng_id); + absl::flat_hash_map>> + &tbl_rw_cntls = ng_it->second; + auto [it, inserted] = tbl_rw_cntls.try_emplace(tbl_name); + if (inserted) { it->second = std::make_shared>(this); } - return it->second; } -void CcShard::DeleteSchemaCntl(const TableName &tbl_name) +void CcShard::DeleteSchemaCntl(const TableName &tbl_name, NodeGroupId cc_ng_id) { - catalog_rw_cntl_.erase(tbl_name); + auto ng_it = catalog_rw_cntl_.find(cc_ng_id); + if (ng_it == catalog_rw_cntl_.end()) + { + return; + } + absl::flat_hash_map>> + &tbl_rw_cntls = ng_it->second; + tbl_rw_cntls.erase(tbl_name); + if (tbl_rw_cntls.empty()) + { + catalog_rw_cntl_.erase(ng_it); + } } -void CcShard::ClearNativeSchemaCntl() +void CcShard::ClearSchemaCntl(NodeGroupId cc_ng_id) { - // When the native cc shard fails over, invalidates all schema reader-writer + // When the node gorup fails over, invalidates all schema reader-writer // control blocks. Invalidation ensures that future runtime queries will not // use cached schema and falls back to reading the schema via concurrency // control. Ongoing runtime queries will continue to use the old schema. - for (auto &[tbl_name, cntl] : catalog_rw_cntl_) + auto ng_it = catalog_rw_cntl_.find(cc_ng_id); + if (ng_it != catalog_rw_cntl_.end()) { - cntl->Invalidate(); + absl::flat_hash_map>> + &tbl_rw_cntls = ng_it->second; + for (auto &[tbl_name, cntl] : tbl_rw_cntls) + { + cntl->Invalidate(); + } + catalog_rw_cntl_.erase(ng_it); } - catalog_rw_cntl_.clear(); } TxLockInfo::uptr CcShard::GetTxLockInfo(int64_t tx_term) diff --git a/src/cc/reader_writer_cntl.cpp b/src/cc/reader_writer_cntl.cpp index a91dc2e7..f755cba8 100644 --- a/src/cc/reader_writer_cntl.cpp +++ b/src/cc/reader_writer_cntl.cpp @@ -129,8 +129,14 @@ AddWriterResult ReaderWriterCntl::AddWriter(CcRequestBase *write_req) return success ? AddWriterResult::Success : AddWriterResult::WritePending; } -void ReaderWriterCntl::FinishWriter() +void ReaderWriterCntl::FinishWriter(uint64_t tx_number) { + // The writer txn might have failed to acquire the write lock and aborted. + auto write_req = write_req_.load(std::memory_order_relaxed); + if (write_req == nullptr || write_req->Txn() != tx_number) + { + return; + } assert( [&]() { From 0e180418056f8ee971863f045a31929ce55f7ec0 Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Tue, 2 Sep 2025 11:58:07 +0800 Subject: [PATCH 2/3] format, typo fix --- src/cc/cc_shard.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cc/cc_shard.cpp b/src/cc/cc_shard.cpp index 94499094..8c69013b 100644 --- a/src/cc/cc_shard.cpp +++ b/src/cc/cc_shard.cpp @@ -2959,7 +2959,7 @@ void CcShard::DeleteSchemaCntl(const TableName &tbl_name, NodeGroupId cc_ng_id) void CcShard::ClearSchemaCntl(NodeGroupId cc_ng_id) { - // When the node gorup fails over, invalidates all schema reader-writer + // When the node group fails over, invalidates all schema reader-writer // control blocks. Invalidation ensures that future runtime queries will not // use cached schema and falls back to reading the schema via concurrency // control. Ongoing runtime queries will continue to use the old schema. @@ -2967,7 +2967,7 @@ void CcShard::ClearSchemaCntl(NodeGroupId cc_ng_id) if (ng_it != catalog_rw_cntl_.end()) { absl::flat_hash_map>> + std::shared_ptr>> &tbl_rw_cntls = ng_it->second; for (auto &[tbl_name, cntl] : tbl_rw_cntls) { From 487719310a3383abd09ed33030d46f15bbe30d3d Mon Sep 17 00:00:00 2001 From: Kevin Chou Date: Mon, 8 Sep 2025 15:18:07 +0800 Subject: [PATCH 3/3] fix DDL abort at acquire write intent failure PostWriteAllCc execution --- include/cc/catalog_cc_map.h | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/include/cc/catalog_cc_map.h b/include/cc/catalog_cc_map.h index 042cf88a..e499b5b4 100644 --- a/include/cc/catalog_cc_map.h +++ b/include/cc/catalog_cc_map.h @@ -582,22 +582,18 @@ class CatalogCcMap catalog_entry = shard_->GetCatalog(table_key->Name(), req.NodeGroupId()); - if (catalog_entry == nullptr) - { - // The target table catalog haven't been initialized. Just - // return. - req.Result()->SetError(CcErrorCode::REQUESTED_TABLE_NOT_EXISTS); - return true; - } - if (req.CommitTs() == TransactionOperation::tx_op_failed_ts_) { + // The catalog_entry could be null if the txn aborts after + // acquire write intent failure. + // For add index op, we create new sk ccmap, table ranges and // table statistics for the new sk during prepare phase. If // flush kv failed, should clean these up. But, if the dirty // schema is nullptr, that is mean, this is the recovering // transaction, and there is no need to drop the new sk ccmap. if (req.OpType() == OperationType::AddIndex && + catalog_entry != nullptr && catalog_entry->dirty_schema_ != nullptr) { std::vector new_index_names = @@ -633,7 +629,8 @@ class CatalogCcMap } // Flush kv fails, need to clear dirty CatalogEntry - if (shard_->core_id_ == shard_->core_cnt_ - 1) + if (shard_->core_id_ == shard_->core_cnt_ - 1 && + catalog_entry != nullptr) { catalog_entry->RejectDirtySchema(); } @@ -646,6 +643,7 @@ class CatalogCcMap return TemplateCcMap::Execute(req); } + assert(catalog_entry != nullptr); if (shard_->core_id_ == 0) { // For post commit, retrieves the current and dirty schema pair