Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/Processors/Formats/Impl/Parquet/ColumnFilterHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ FilterSplitResultPtr ColumnFilterHelper::splitFilterForPushDown(const ActionsDAG
if (case_insensitive)
col_name = Poco::toLower(col_name);
if (!split_result->filters.contains(col_name))
{
split_result->filters.emplace(col_name, named_filter.second);
split_result->conditions.emplace(col_name,condition->result_type);
}
else
{
auto merged = split_result->filters[col_name]->merge(named_filter.second.get());
Expand Down Expand Up @@ -60,5 +63,6 @@ void pushFilterToParquetReader(const ActionsDAG & filter_expression, ParquetRead
return;
auto split_result = ColumnFilterHelper::splitFilterForPushDown(filter_expression);
reader.pushDownFilter(split_result);
reader.addCondations(split_result->conditions);
}
}
1 change: 1 addition & 0 deletions src/Processors/Formats/Impl/Parquet/ColumnFilterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ struct FilterSplitResult
{
/// Filter expression the split was derived from.
/// NOTE(review): presumably holds the residual/original DAG — confirm against
/// ColumnFilterHelper::splitFilterForPushDown.
ActionsDAG filter_expression;
/// Per-column pushed-down filters, keyed by column name (lower-cased when the
/// match is case-insensitive); merged when several filters target one column.
std::unordered_map<String, ColumnFilterPtr> filters;
/// Result type of the condition node that produced each column's filter;
/// populated together with `filters` and forwarded to the reader
/// (ParquetReader::addCondations) after the push-down split.
std::unordered_map<String, DataTypePtr> conditions;
/// Filter nodes kept for row-level fallback evaluation — e.g. when a column
/// reader cannot apply the filter natively (see the "fallback filter" path in
/// RowGroupChunkReader) — TODO confirm exact semantics.
std::unordered_map<String, ActionsDAG::NodeRawConstPtrs> fallback_filters;
/// Filters evaluated as whole expressions rather than single-column predicates.
std::vector<std::shared_ptr<ExpressionFilter>> expression_filters;
};
Expand Down
36 changes: 36 additions & 0 deletions src/Processors/Formats/Impl/Parquet/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ Block ParquetReader::read() const
return header.cloneWithColumns(chunk.detachColumns());
}

Block ParquetReader::read_with_select_conditions() const
{
    /// Pull the next chunk through the select-conditions read path and wrap it
    /// into a Block shaped like this reader's output header.
    auto chunk = chunk_reader->read_with_select_conditions(max_block_size);
    if (chunk)
        return header.cloneWithColumns(chunk.detachColumns());
    /// No rows left: return an empty block with the header's structure.
    return header.cloneEmpty();
}

void ParquetReader::setSourceArrowFile(std::shared_ptr<arrow::io::RandomAccessFile> arrow_file_)
{
this->arrow_file = arrow_file_;
Expand All @@ -109,6 +117,23 @@ void ParquetReader::addFilter(const String & column_name, const ColumnFilterPtr
// std::cerr << "filter on column " << column_name << ": " << filters[column_name]->toString() << std::endl;
}

void ParquetReader::addCondations(std::unordered_map<String, DataTypePtr> conditions)
{
    /// Record the result types of condition columns that are NOT part of the
    /// output header, so dedicated column readers can be built for them later
    /// (see RowGroupChunkReader's condition_data_types handling).
    /// NOTE(review): "Condations" looks like a typo for "Conditions"; renaming
    /// would also require updating the declaration and every call site.
    std::unordered_set<std::string> header_names;
    for (const auto & column : header)
        header_names.insert(column.name);

    for (const auto & [name, type] : conditions)
    {
        /// Columns already produced by the header need no extra entry.
        if (header_names.contains(name))
            continue;
        condition_data_types.emplace(name, type);
    }
}


std::unique_ptr<RowGroupChunkReader>
ParquetReader::getRowGroupChunkReader(size_t row_group_idx, RowGroupPrefetchPtr conditions_prefetch, RowGroupPrefetchPtr prefetch)
{
Expand Down Expand Up @@ -190,6 +215,17 @@ DB::Chunk SubRowGroupRangeReader::read(size_t rows)
}
return chunk;
}
DB::Chunk SubRowGroupRangeReader::read_with_select_conditions(size_t rows)
{
    /// Advance through row-group chunk readers until one yields a non-empty
    /// chunk or no further reader can be loaded (end of the range).
    /// NOTE(review): this body appears identical to read(); presumably the
    /// select-condition handling happens inside readChunk — confirm.
    Chunk chunk;
    do
    {
        if (!loadRowGroupChunkReaderIfNeeded())
            break;
        chunk = row_group_chunk_reader->readChunk(rows);
    } while (chunk.getNumRows() == 0);
    return chunk;
}
bool SubRowGroupRangeReader::loadRowGroupChunkReaderIfNeeded()
{
if (row_group_chunk_reader && !row_group_chunk_reader->hasMoreRows() && next_row_group_idx >= row_group_indices.size())
Expand Down
4 changes: 4 additions & 0 deletions src/Processors/Formats/Impl/Parquet/ParquetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class SubRowGroupRangeReader
std::vector<RowGroupPrefetchPtr> && row_group_prefetches,
RowGroupReaderCreator && creator);
DB::Chunk read(size_t rows);
DB::Chunk read_with_select_conditions(size_t rows);

private:
bool loadRowGroupChunkReaderIfNeeded();
Expand Down Expand Up @@ -61,6 +62,9 @@ class ParquetReader
std::shared_ptr<ThreadPool> io_pool_ = nullptr);

Block read() const;
Block read_with_select_conditions() const;
std::unordered_map<String, DataTypePtr> condition_data_types;
void addCondations(std::unordered_map<String, DataTypePtr> conditions);
void setSourceArrowFile(std::shared_ptr<arrow::io::RandomAccessFile> arrow_file_);
void pushDownFilter(FilterSplitResultPtr filter_split_result);
std::unique_ptr<RowGroupChunkReader>
Expand Down
17 changes: 15 additions & 2 deletions src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#include "RowGroupChunkReader.h"

#include <memory>

#include <Columns/FilterDescription.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/SharedThreadPools.h>
Expand Down Expand Up @@ -75,7 +78,6 @@ RowGroupChunkReader::RowGroupChunkReader(
reader_data_types.push_back(column_reader->getResultType());
reader_columns_mapping[col_with_name.name] = column_reader;
}

// fallback filter, for example, read number data using string type, number reader doesn't support BytesValuesFilter
std::call_once(
parquet_reader->filter_fallback_checked,
Expand All @@ -101,7 +103,18 @@ RowGroupChunkReader::RowGroupChunkReader(
parquet_reader->addExpressionFilter(expr_filter);
}
});

if (!parquet_reader->condition_data_types.empty())
{
for (const auto& [name, data_type] : parquet_reader->condition_data_types)
{
const auto & node = context.parquet_reader->getParquetColumn(name);
SelectiveColumnReaderPtr column_reader;
column_reader = builder->buildReader(node, data_type, 0, 0);
column_readers.push_back(column_reader);
reader_data_types.push_back(column_reader->getResultType());
reader_columns_mapping[name] = column_reader;
}
}
for (auto & [name, filter] : parquet_reader->filters)
{
if (reader_columns_mapping.contains(name))
Expand Down
5 changes: 1 addition & 4 deletions src/Processors/Formats/Impl/Parquet/RowGroupChunkReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,7 @@ class RowGroupChunkReader
RowGroupPrefetchPtr prefetch_conditions,
RowGroupPrefetchPtr prefetch,
std::unordered_map<String, ColumnFilterPtr> filters);
~RowGroupChunkReader()
{
// printMetrics(std::cerr);
}
~RowGroupChunkReader() = default;
Chunk readChunk(size_t rows);
bool hasMoreRows() const { return remain_rows > 0; }
void printMetrics(std::ostream & out) const
Expand Down