From dcc837f9556759f7eefbdc74a237095b0b2fa9aa Mon Sep 17 00:00:00 2001 From: yxheartipp Date: Tue, 25 Mar 2025 13:53:39 +0800 Subject: [PATCH] add filter conditions to ParquetReader --- .../Impl/Parquet/ColumnFilterHelper.cpp | 4 +++ .../Formats/Impl/Parquet/ColumnFilterHelper.h | 1 + .../Formats/Impl/Parquet/ParquetReader.cpp | 36 +++++++++++++++++++ .../Formats/Impl/Parquet/ParquetReader.h | 4 +++ .../Impl/Parquet/RowGroupChunkReader.cpp | 17 +++++++-- .../Impl/Parquet/RowGroupChunkReader.h | 5 +-- 6 files changed, 61 insertions(+), 6 deletions(-) diff --git a/src/Processors/Formats/Impl/Parquet/ColumnFilterHelper.cpp b/src/Processors/Formats/Impl/Parquet/ColumnFilterHelper.cpp index bba69b75a830..dd6769eab103 100644 --- a/src/Processors/Formats/Impl/Parquet/ColumnFilterHelper.cpp +++ b/src/Processors/Formats/Impl/Parquet/ColumnFilterHelper.cpp @@ -31,7 +31,10 @@ FilterSplitResultPtr ColumnFilterHelper::splitFilterForPushDown(const ActionsDAG if (case_insensitive) col_name = Poco::toLower(col_name); if (!split_result->filters.contains(col_name)) + { split_result->filters.emplace(col_name, named_filter.second); + split_result->conditions.emplace(col_name,condition->result_type); + } else { auto merged = split_result->filters[col_name]->merge(named_filter.second.get()); @@ -60,5 +63,6 @@ void pushFilterToParquetReader(const ActionsDAG & filter_expression, ParquetRead return; auto split_result = ColumnFilterHelper::splitFilterForPushDown(filter_expression); reader.pushDownFilter(split_result); + reader.addCondations(split_result->conditions); } } diff --git a/src/Processors/Formats/Impl/Parquet/ColumnFilterHelper.h b/src/Processors/Formats/Impl/Parquet/ColumnFilterHelper.h index 2ed2c0189a9d..71701b98d558 100644 --- a/src/Processors/Formats/Impl/Parquet/ColumnFilterHelper.h +++ b/src/Processors/Formats/Impl/Parquet/ColumnFilterHelper.h @@ -10,6 +10,7 @@ struct FilterSplitResult { ActionsDAG filter_expression; std::unordered_map filters; + std::unordered_map conditions; std::unordered_map fallback_filters; std::vector> expression_filters; }; diff --git a/src/Processors/Formats/Impl/Parquet/ParquetReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetReader.cpp index d818b23b7ed0..696a27645c49 100644 --- a/src/Processors/Formats/Impl/Parquet/ParquetReader.cpp +++ b/src/Processors/Formats/Impl/Parquet/ParquetReader.cpp @@ -93,6 +93,14 @@ Block ParquetReader::read() const return header.cloneWithColumns(chunk.detachColumns()); } +Block ParquetReader::read_with_select_conditions() const +{ + Chunk chunk = chunk_reader->read_with_select_conditions(max_block_size); + if (!chunk) + return header.cloneEmpty(); + return header.cloneWithColumns(chunk.detachColumns()); +} + void ParquetReader::setSourceArrowFile(std::shared_ptr arrow_file_) { this->arrow_file = arrow_file_; @@ -109,6 +117,23 @@ void ParquetReader::addFilter(const String & column_name, const ColumnFilterPtr // std::cerr << "filter on column " << column_name << ": " << filters[column_name]->toString() << std::endl; } +void ParquetReader::addCondations(std::unordered_map conditions) +{ + std::unordered_set header_names; + for (const auto & col_with_name : header) + { + header_names.insert(col_with_name.name); + } + for (const auto & [name, type] : conditions) + { + if (!header_names.contains(name)) + { + condition_data_types.emplace(name, type); + } + } +} + + std::unique_ptr ParquetReader::getRowGroupChunkReader(size_t row_group_idx, RowGroupPrefetchPtr conditions_prefetch, RowGroupPrefetchPtr prefetch) { @@ -190,6 +215,17 @@ DB::Chunk SubRowGroupRangeReader::read(size_t rows) } return chunk; } +DB::Chunk SubRowGroupRangeReader::read_with_select_conditions(size_t rows) +{ + Chunk chunk; + while (chunk.getNumRows() == 0) + { + if (!loadRowGroupChunkReaderIfNeeded()) + break; + chunk = row_group_chunk_reader->readChunk(rows); + } + return chunk; +} bool SubRowGroupRangeReader::loadRowGroupChunkReaderIfNeeded() { if (row_group_chunk_reader && !row_group_chunk_reader->hasMoreRows() && next_row_group_idx >= row_group_indices.size()) diff --git a/src/Processors/Formats/Impl/Parquet/ParquetReader.h b/src/Processors/Formats/Impl/Parquet/ParquetReader.h index 9916e8e8df0d..56dfd5324174 100644 --- a/src/Processors/Formats/Impl/Parquet/ParquetReader.h +++ b/src/Processors/Formats/Impl/Parquet/ParquetReader.h @@ -27,6 +27,7 @@ class SubRowGroupRangeReader std::vector && row_group_prefetches, RowGroupReaderCreator && creator); DB::Chunk read(size_t rows); + DB::Chunk read_with_select_conditions(size_t rows); private: bool loadRowGroupChunkReaderIfNeeded(); @@ -61,6 +62,9 @@ class ParquetReader std::shared_ptr io_pool_ = nullptr); Block read() const; + Block read_with_select_conditions() const; + std::unordered_map condition_data_types; + void addCondations(std::unordered_map conditions); void setSourceArrowFile(std::shared_ptr arrow_file_); void pushDownFilter(FilterSplitResultPtr filter_split_result); std::unique_ptr diff --git a/src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.cpp b/src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.cpp index d1e93d4661cf..d8e1f750c614 100644 --- a/src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.cpp +++ b/src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.cpp @@ -1,4 +1,7 @@ #include "RowGroupChunkReader.h" + +#include + #include #include #include @@ -75,7 +78,6 @@ RowGroupChunkReader::RowGroupChunkReader( reader_data_types.push_back(column_reader->getResultType()); reader_columns_mapping[col_with_name.name] = column_reader; } - // fallback filter, for example, read number data using string type, number reader doesn't support BytesValuesFilter std::call_once( parquet_reader->filter_fallback_checked, @@ -101,7 +103,18 @@ RowGroupChunkReader::RowGroupChunkReader( parquet_reader->addExpressionFilter(expr_filter); } }); - + if (!parquet_reader->condition_data_types.empty()) + { + for (const auto& [name, data_type] : parquet_reader->condition_data_types) + { + const auto & node = context.parquet_reader->getParquetColumn(name); + SelectiveColumnReaderPtr column_reader; + column_reader = builder->buildReader(node, data_type, 0, 0); + column_readers.push_back(column_reader); + reader_data_types.push_back(column_reader->getResultType()); + reader_columns_mapping[name] = column_reader; + } + } for (auto & [name, filter] : parquet_reader->filters) { if (reader_columns_mapping.contains(name)) diff --git a/src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.h b/src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.h index 24ac4d8f099b..036c31adc595 100644 --- a/src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.h +++ b/src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.h @@ -110,10 +110,7 @@ class RowGroupChunkReader RowGroupPrefetchPtr prefetch_conditions, RowGroupPrefetchPtr prefetch, std::unordered_map filters); - ~RowGroupChunkReader() - { -// printMetrics(std::cerr); - } + ~RowGroupChunkReader() = default; Chunk readChunk(size_t rows); bool hasMoreRows() const { return remain_rows > 0; } void printMetrics(std::ostream & out) const