Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion storage/eloq/eloq_catalog_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ MariaCatalogFactory::CreateRangeCcmScanner(
{
assert(range_table_name.Type() == txservice::TableType::RangePartition);
return std::make_unique<
txservice::TemplateCcScanner<EloqKey, txservice::RangeRecord>>(
txservice::HashParitionCcScanner<EloqKey, txservice::RangeRecord>>(
direction,
range_table_name.IsBase() ? txservice::ScanIndexType::Primary
: txservice::ScanIndexType::Secondary,
Expand Down
64 changes: 42 additions & 22 deletions storage/eloq/ha_eloq.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
#include "mysql_version.h"
#include <climits>
#include <cstdint>
#include <sys/types.h>
#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation // gcc: Class implementation
#endif
Expand Down Expand Up @@ -165,7 +166,7 @@
#define ELOQDS 1
#endif

#if (defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) || \
#if (defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3) || \
defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_GCS))
#define ELOQDS_RKDB_CLOUD 1
#endif
Expand Down Expand Up @@ -212,7 +213,7 @@
#if !defined(LOG_STATE_TYPE_RKDB_CLOUD)

// Only if LOG_STATE_TYPE_RKDB_CLOUD undefined
#if ((defined(LOG_STATE_TYPE_RKDB_S3) || defined(LOG_STATE_TYPE_RKDB_GCS)) && \
#if ((defined(LOG_STATE_TYPE_RKDB_S3) || defined(LOG_STATE_TYPE_RKDB_GCS)) && \
!defined(LOG_STATE_TYPE_RKDB))
#define LOG_STATE_TYPE_RKDB_CLOUD 1
#endif
Expand All @@ -222,7 +223,7 @@
#if !defined(LOG_STATE_TYPE_RKDB_ALL)

// Only if LOG_STATE_TYPE_RKDB_ALL undefined
#if (defined(LOG_STATE_TYPE_RKDB_S3) || defined(LOG_STATE_TYPE_RKDB_GCS) || \
#if (defined(LOG_STATE_TYPE_RKDB_S3) || defined(LOG_STATE_TYPE_RKDB_GCS) || \
defined(LOG_STATE_TYPE_RKDB))
#define LOG_STATE_TYPE_RKDB_ALL 1
#endif
Expand All @@ -233,7 +234,6 @@
#include "rocksdb_cloud_config.h"
#endif


// Don't put this include after sql_class.h include, it will cause compile
// error
#if (defined(DATA_STORE_TYPE_DYNAMODB) || defined(LOG_STATE_TYPE_RKDB_S3) || \
Expand Down Expand Up @@ -3301,8 +3301,7 @@ static int eloq_done_func(void *p)
metrics_registry= nullptr;
}

#if defined(DATA_STORE_TYPE_DYNAMODB) || \
defined(LOG_STATE_TYPE_RKDB_S3) || \
#if defined(DATA_STORE_TYPE_DYNAMODB) || defined(LOG_STATE_TYPE_RKDB_S3) || \
defined(DATA_STORE_TYPE_ELOQDSS_ROCKSDB_CLOUD_S3)
aws_deinit();
#endif
Expand Down Expand Up @@ -4844,11 +4843,13 @@ int ha_eloq::analyze(THD *thd, HA_CHECK_OPT *check_opt)
bool end_inclusive= false;
ScanDirection scan_direction= ScanDirection::Forward;

auto save_point= std::make_unique<BucketScanSavePoint>();

// uint64_t schema_version= txm->GetSchemaVersion(table_name);
ScanOpenTxRequest scan_open_req(
&scan_table_name, 0, scan_type, start_key, start_inclusive, end_key,
end_inclusive, scan_direction, false, false, false, true, true, true,
true, true, yield_func, resume_func, txm);
true, true, yield_func, resume_func, txm, -1, {}, save_point.get());
txm->Execute(&scan_open_req);
scan_open_req.Wait();
if (scan_open_req.IsError())
Expand All @@ -4859,20 +4860,40 @@ int ha_eloq::analyze(THD *thd, HA_CHECK_OPT *check_opt)

size_t scan_alias= scan_open_req.Result();
std::vector<txservice::ScanBatchTuple> scan_batch;
do

size_t current_index= 0;
if (save_point->prev_pause_idx_ != UINT64_MAX)
{
current_index= save_point->prev_pause_idx_;
}

size_t plan_size= save_point->PlanSize();
BucketScanPlan plan= save_point->PickPlan(current_index);
while (current_index < plan_size)
{
scan_batch.clear();
ScanBatchTxRequest scan_batch_req(scan_alias, scan_table_name,
&scan_batch, yield_func, resume_func,
txm);
txm, -1, {}, &plan);
txm->Execute(&scan_batch_req);
scan_batch_req.Wait();
if (scan_batch_req.IsError())
{
ret= HA_ADMIN_FAILED;
break;
}
} while (!scan_batch.empty());

if (scan_batch_req.Result())
{
current_index++;
if (current_index < plan_size)
{
plan= save_point->PickPlan(current_index);
}
}
}

scan_batch.clear();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Likely lock leak: clearing scan_batch before ScanCloseTxRequest

You pass scan_batch to ScanCloseTxRequest, but clear it immediately before. This likely drops the last batch’s unlock metadata.

Apply:

-    scan_batch.clear();
+    /* keep the last batch for ScanCloseTxRequest to unlock */
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
scan_batch.clear();
/* keep the last batch for ScanCloseTxRequest to unlock */
🤖 Prompt for AI Agents
In storage/eloq/ha_eloq.cc around line 4896, clearing scan_batch before calling
ScanCloseTxRequest risks dropping the last batch’s unlock metadata and causing a
lock leak; move the scan_batch.clear() so it executes after ScanCloseTxRequest
(or otherwise preserve the batch contents until the call completes) so that
ScanCloseTxRequest receives the full batch data before any clearing.


if (!ret)
{
Expand Down Expand Up @@ -7279,7 +7300,7 @@ void ha_eloq::AddPushedDownCondition(Item *cond_item)
* When field value type can not match with filed type, do not pushdown.
* Otherwise, pushdwon.
*/
std::vector<txservice::store::DataStoreSearchCond> ha_eloq::BindPushedCond()
std::vector<txservice::DataStoreSearchCond> ha_eloq::BindPushedCond()
{
// Don't take active index field as pushdown condition.
std::set<const mysql::Field *> active_index_fields;
Expand All @@ -7292,7 +7313,7 @@ std::vector<txservice::store::DataStoreSearchCond> ha_eloq::BindPushedCond()
table->key_info[active_index].key_part[i].field);
}
}
std::vector<txservice::store::DataStoreSearchCond> res;
std::vector<txservice::DataStoreSearchCond> res;

for (const auto &pushed_cond : pushed_conds_)
{
Expand All @@ -7303,8 +7324,8 @@ std::vector<txservice::store::DataStoreSearchCond> ha_eloq::BindPushedCond()
}

bool pushdown= true;
txservice::store::DataStoreDataType data_type=
txservice::store::DataStoreDataType::Numeric;
txservice::DataStoreDataType data_type=
txservice::DataStoreDataType::Numeric;

Item_func::Functype func_type= pushed_cond.func_type_;
Item_field *col_field= pushed_cond.col_field_;
Expand All @@ -7318,7 +7339,7 @@ std::vector<txservice::store::DataStoreSearchCond> ha_eloq::BindPushedCond()
case MYSQL_TYPE_LONGLONG:
case MYSQL_TYPE_FLOAT:
case MYSQL_TYPE_DOUBLE:
data_type= txservice::store::DataStoreDataType::Numeric;
data_type= txservice::DataStoreDataType::Numeric;
break;
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING:
Expand All @@ -7334,8 +7355,8 @@ std::vector<txservice::store::DataStoreSearchCond> ha_eloq::BindPushedCond()
: false;
// binary character set when number is 63.
data_type= (col_field->field->charset()->number == 63)
? txservice::store::DataStoreDataType::Blob
: txservice::store::DataStoreDataType::String;
? txservice::DataStoreDataType::Blob
: txservice::DataStoreDataType::String;
break;
default:
// Only push down conditions on certain data types. The list may be
Expand Down Expand Up @@ -7457,8 +7478,7 @@ std::vector<txservice::store::DataStoreSearchCond> ha_eloq::BindPushedCond()
}

std::string ha_eloq::PushedConditionString(
const std::vector<txservice::store::DataStoreSearchCond>
&pushdown_condition)
const std::vector<txservice::DataStoreSearchCond> &pushdown_condition)
{
std::string pushed_conds_str;

Expand All @@ -7475,7 +7495,7 @@ std::string ha_eloq::PushedConditionString(

switch (pushed_cond.data_type_)
{
case txservice::store::DataStoreDataType::String: {
case txservice::DataStoreDataType::String: {
std::stringstream ss;
ss << '\'';

Expand All @@ -7497,7 +7517,7 @@ std::string ha_eloq::PushedConditionString(
pushed_conds_str.append(ss.str());
break;
}
case txservice::store::DataStoreDataType::Blob: {
case txservice::DataStoreDataType::Blob: {
std::stringstream ss;
ss << "0x" << std::hex << std::setfill('0');
for (size_t pos= 0; pos < pushed_cond.val_str_.length(); ++pos)
Expand All @@ -7509,7 +7529,7 @@ std::string ha_eloq::PushedConditionString(
pushed_conds_str.append(ss.str());
break;
}
case txservice::store::DataStoreDataType::Numeric: {
case txservice::DataStoreDataType::Numeric: {
pushed_conds_str.append(pushed_cond.val_str_);
break;
}
Expand Down
5 changes: 2 additions & 3 deletions storage/eloq/ha_eloq.h
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,7 @@ class ha_eloq : public handler
void AddPushedDownCondition(Item *cond_item);

std::string PushedConditionString(
const std::vector<txservice::store::DataStoreSearchCond>
&pushdown_condition);
const std::vector<txservice::DataStoreSearchCond> &pushdown_condition);

int MatchIcp(uchar *table_record, const EloqKey &sk, const EloqRecord &rec,
const EloqKeySchema *sk_schema);
Expand Down Expand Up @@ -744,7 +743,7 @@ class ha_eloq : public handler
Item_literal *val_field_;
};

std::vector<txservice::store::DataStoreSearchCond> BindPushedCond();
std::vector<txservice::DataStoreSearchCond> BindPushedCond();

int BulkInsert(const uchar *buf, std::unique_ptr<EloqKey> eloq_key,
std::unique_ptr<EloqRecord> eloq_rec);
Expand Down