From 9674aa1b233fce5335d4711ead219c5bf164c68c Mon Sep 17 00:00:00 2001 From: lokax Date: Thu, 30 Oct 2025 14:17:27 +0800 Subject: [PATCH 1/3] fix --- storage/eloq/eloq_catalog_factory.cpp | 2 +- storage/eloq/ha_eloq.cc | 64 ++++++++++++++++++--------- storage/eloq/ha_eloq.h | 5 +-- storage/eloq/store_handler | 2 +- storage/eloq/tx_service | 2 +- 5 files changed, 47 insertions(+), 28 deletions(-) diff --git a/storage/eloq/eloq_catalog_factory.cpp b/storage/eloq/eloq_catalog_factory.cpp index 23f1a619e3b..36aa7fc0df2 100644 --- a/storage/eloq/eloq_catalog_factory.cpp +++ b/storage/eloq/eloq_catalog_factory.cpp @@ -684,7 +684,7 @@ MariaCatalogFactory::CreateRangeCcmScanner( { assert(range_table_name.Type() == txservice::TableType::RangePartition); return std::make_unique< - txservice::TemplateCcScanner>( + txservice::HashParitionCcScanner>( direction, range_table_name.IsBase() ? txservice::ScanIndexType::Primary : txservice::ScanIndexType::Secondary, diff --git a/storage/eloq/ha_eloq.cc b/storage/eloq/ha_eloq.cc index 71d8157e003..feadfe2461e 100644 --- a/storage/eloq/ha_eloq.cc +++ b/storage/eloq/ha_eloq.cc @@ -116,6 +116,7 @@ #include "mysql_version.h" #include #include +#include #ifdef USE_PRAGMA_IMPLEMENTATION #pragma implementation // gcc: Class implementation #endif @@ -165,7 +166,7 @@ #define ELOQDS 1 #endif -#if (defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) || \ +#if (defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) || \ defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_GCS)) #define ELOQDS_RKDB_CLOUD 1 #endif @@ -212,7 +213,7 @@ #if !defined(LOG_STATE_TYPE_RKDB_CLOUD) // Only if LOG_STATE_TYPE_RKDB_CLOUD undefined -#if ((defined(LOG_STATE_TYPE_RKDB_S3) || defined(LOG_STATE_TYPE_RKDB_GCS)) && \ +#if ((defined(LOG_STATE_TYPE_RKDB_S3) || defined(LOG_STATE_TYPE_RKDB_GCS)) && \ !defined(LOG_STATE_TYPE_RKDB)) #define LOG_STATE_TYPE_RKDB_CLOUD 1 #endif @@ -222,7 +223,7 @@ #if !defined(LOG_STATE_TYPE_RKDB_ALL) // Only if LOG_STATE_TYPE_RKDB_ALL undefined -#if (defined(LOG_STATE_TYPE_RKDB_S3) || defined(LOG_STATE_TYPE_RKDB_GCS) || \ +#if (defined(LOG_STATE_TYPE_RKDB_S3) || defined(LOG_STATE_TYPE_RKDB_GCS) || \ defined(LOG_STATE_TYPE_RKDB)) #define LOG_STATE_TYPE_RKDB_ALL 1 #endif @@ -233,7 +234,6 @@ #include "rocksdb_cloud_config.h" #endif - // Don't put this include after sql_class.h include, it will cause compile // error #if (defined(DATA_STORE_TYPE_DYNAMODB) || defined(LOG_STATE_TYPE_RKDB_S3) || \ @@ -3301,8 +3301,7 @@ static int eloq_done_func(void *p) metrics_registry= nullptr; } -#if defined(DATA_STORE_TYPE_DYNAMODB) || \ - defined(LOG_STATE_TYPE_RKDB_S3) || \ +#if defined(DATA_STORE_TYPE_DYNAMODB) || defined(LOG_STATE_TYPE_RKDB_S3) || \ defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) aws_deinit(); #endif @@ -4844,11 +4843,13 @@ int ha_eloq::analyze(THD *thd, HA_CHECK_OPT *check_opt) bool end_inclusive= false; ScanDirection scan_direction= ScanDirection::Forward; + auto save_point= std::make_unique(); + // uint64_t schema_version= txm->GetSchemaVersion(table_name); ScanOpenTxRequest scan_open_req( &scan_table_name, 0, scan_type, start_key, start_inclusive, end_key, end_inclusive, scan_direction, false, false, false, true, true, true, - true, true, yield_func, resume_func, txm); + true, true, yield_func, resume_func, txm, -1, {}, save_point.get()); txm->Execute(&scan_open_req); scan_open_req.Wait(); if (scan_open_req.IsError()) @@ -4859,12 +4860,21 @@ int ha_eloq::analyze(THD *thd, HA_CHECK_OPT *check_opt) size_t scan_alias= scan_open_req.Result(); std::vector scan_batch; - do + + size_t current_index= 0; + if (save_point->prev_pause_idx_ != UINT64_MAX) + { + current_index= save_point->prev_pause_idx_; + } + + size_t plan_size= save_point->PlanSize(); + BucketScanPlan plan= save_point->PickPlan(current_index); + while (current_index < plan_size) { scan_batch.clear(); ScanBatchTxRequest scan_batch_req(scan_alias, scan_table_name, &scan_batch, yield_func, resume_func, - txm); + txm, -1, {}, &plan); txm->Execute(&scan_batch_req); scan_batch_req.Wait(); if (scan_batch_req.IsError()) @@ -4872,7 +4882,18 @@ int ha_eloq::analyze(THD *thd, HA_CHECK_OPT *check_opt) ret= HA_ADMIN_FAILED; break; } - } while (!scan_batch.empty()); + + if (scan_batch_req.Result()) + { + current_index++; + if (current_index < plan_size) + { + plan= save_point->PickPlan(current_index); + } + } + } + + scan_batch.clear(); if (!ret) { @@ -7279,7 +7300,7 @@ void ha_eloq::AddPushedDownCondition(Item *cond_item) * When field value type can not match with filed type, do not pushdown. * Otherwise, pushdwon. */ -std::vector ha_eloq::BindPushedCond() +std::vector ha_eloq::BindPushedCond() { // Don't take active index field as pushdown condition. std::set active_index_fields; @@ -7292,7 +7313,7 @@ std::vector ha_eloq::BindPushedCond() table->key_info[active_index].key_part[i].field); } } - std::vector res; + std::vector res; for (const auto &pushed_cond : pushed_conds_) { @@ -7303,8 +7324,8 @@ std::vector ha_eloq::BindPushedCond() } bool pushdown= true; - txservice::store::DataStoreDataType data_type= - txservice::store::DataStoreDataType::Numeric; + txservice::DataStoreDataType data_type= + txservice::DataStoreDataType::Numeric; Item_func::Functype func_type= pushed_cond.func_type_; Item_field *col_field= pushed_cond.col_field_; @@ -7318,7 +7339,7 @@ std::vector ha_eloq::BindPushedCond() case MYSQL_TYPE_LONGLONG: case MYSQL_TYPE_FLOAT: case MYSQL_TYPE_DOUBLE: - data_type= txservice::store::DataStoreDataType::Numeric; + data_type= txservice::DataStoreDataType::Numeric; break; case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VAR_STRING: @@ -7334,8 +7355,8 @@ std::vector ha_eloq::BindPushedCond() : false; // binary character set when number is 63. data_type= (col_field->field->charset()->number == 63) - ? txservice::store::DataStoreDataType::Blob - : txservice::store::DataStoreDataType::String; + ? txservice::DataStoreDataType::Blob + : txservice::DataStoreDataType::String; break; default: // Only push down conditions on certain data types. The list may be @@ -7457,8 +7478,7 @@ std::vector ha_eloq::BindPushedCond() } std::string ha_eloq::PushedConditionString( - const std::vector - &pushdown_condition) + const std::vector &pushdown_condition) { std::string pushed_conds_str; @@ -7475,7 +7495,7 @@ std::string ha_eloq::PushedConditionString( switch (pushed_cond.data_type_) { - case txservice::store::DataStoreDataType::String: { + case txservice::DataStoreDataType::String: { std::stringstream ss; ss << '\''; @@ -7497,7 +7517,7 @@ std::string ha_eloq::PushedConditionString( pushed_conds_str.append(ss.str()); break; } - case txservice::store::DataStoreDataType::Blob: { + case txservice::DataStoreDataType::Blob: { std::stringstream ss; ss << "0x" << std::hex << std::setfill('0'); for (size_t pos= 0; pos < pushed_cond.val_str_.length(); ++pos) @@ -7509,7 +7529,7 @@ std::string ha_eloq::PushedConditionString( pushed_conds_str.append(ss.str()); break; } - case txservice::store::DataStoreDataType::Numeric: { + case txservice::DataStoreDataType::Numeric: { pushed_conds_str.append(pushed_cond.val_str_); break; } diff --git a/storage/eloq/ha_eloq.h b/storage/eloq/ha_eloq.h index 1dc6fc1bc33..cb775ef5262 100644 --- a/storage/eloq/ha_eloq.h +++ b/storage/eloq/ha_eloq.h @@ -666,8 +666,7 @@ class ha_eloq : public handler void AddPushedDownCondition(Item *cond_item); std::string PushedConditionString( - const std::vector - &pushdown_condition); + const std::vector &pushdown_condition); int MatchIcp(uchar *table_record, const EloqKey &sk, const EloqRecord &rec, const EloqKeySchema *sk_schema); @@ -744,7 +743,7 @@ class ha_eloq : public handler Item_literal *val_field_; }; - std::vector BindPushedCond(); + std::vector BindPushedCond(); int BulkInsert(const uchar *buf, std::unique_ptr eloq_key, std::unique_ptr eloq_rec); diff --git a/storage/eloq/store_handler b/storage/eloq/store_handler index 516fe3faaa9..7192b83377b 160000 --- a/storage/eloq/store_handler +++ b/storage/eloq/store_handler @@ -1 +1 @@ -Subproject commit 516fe3faaa991212abffb3fd01a5709a4698234d +Subproject commit 7192b83377bde9b96da70d3f96286d6083b2f30e diff --git a/storage/eloq/tx_service b/storage/eloq/tx_service index 03d2d52bf10..731cb1c3dbd 160000 --- a/storage/eloq/tx_service +++ b/storage/eloq/tx_service @@ -1 +1 @@ -Subproject commit 03d2d52bf103cee0f87f2bd7157d6b0d69f6f2e8 +Subproject commit 731cb1c3dbd9b2aedd163ce6a1ad21dca9386b3b From 6b1767b5024b367a4dbfe592c7591829a3447917 Mon Sep 17 00:00:00 2001 From: lokax Date: Fri, 31 Oct 2025 10:39:26 +0800 Subject: [PATCH 2/3] update submodule --- storage/eloq/store_handler | 2 +- storage/eloq/tx_service | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/eloq/store_handler b/storage/eloq/store_handler index 7192b83377b..19a088c7aa6 160000 --- a/storage/eloq/store_handler +++ b/storage/eloq/store_handler @@ -1 +1 @@ -Subproject commit 7192b83377bde9b96da70d3f96286d6083b2f30e +Subproject commit 19a088c7aa6f83d4f55362fb13076a3c22ade5ef diff --git a/storage/eloq/tx_service b/storage/eloq/tx_service index 731cb1c3dbd..88f8b8b9858 160000 --- a/storage/eloq/tx_service +++ b/storage/eloq/tx_service @@ -1 +1 @@ -Subproject commit 731cb1c3dbd9b2aedd163ce6a1ad21dca9386b3b +Subproject commit 88f8b8b98589a4a86e32d103f7d2ee83842b0e1d From 8b3f3f8d292ae42fca8cd2fc427936c872e23627 Mon Sep 17 00:00:00 2001 From: lokax Date: Mon, 3 Nov 2025 11:06:53 +0800 Subject: [PATCH 3/3] update submodule --- storage/eloq/store_handler | 2 +- storage/eloq/tx_service | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/storage/eloq/store_handler b/storage/eloq/store_handler index 19a088c7aa6..62e17d3b8c8 160000 --- a/storage/eloq/store_handler +++ b/storage/eloq/store_handler @@ -1 +1 @@ -Subproject commit 19a088c7aa6f83d4f55362fb13076a3c22ade5ef +Subproject commit 62e17d3b8c8c4095d094b66351fdd7dec3a4f4d4 diff --git a/storage/eloq/tx_service b/storage/eloq/tx_service index 88f8b8b9858..d7cab24b218 160000 --- a/storage/eloq/tx_service +++ b/storage/eloq/tx_service @@ -1 +1 @@ -Subproject commit 88f8b8b98589a4a86e32d103f7d2ee83842b0e1d +Subproject commit d7cab24b218083969875f2bb291c9129ae350df1