From ae1bbd79ab84e14e2e01adb210b7119c6b7b9ffc Mon Sep 17 00:00:00 2001 From: Yan Shangpeng Date: Thu, 18 Nov 2021 02:40:59 -0500 Subject: [PATCH 1/3] modifying ci_scripts/install_pmem_common.sh oap-ape/README.md --- oap-ape/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oap-ape/README.md b/oap-ape/README.md index 31c921528..dd882513e 100644 --- a/oap-ape/README.md +++ b/oap-ape/README.md @@ -72,7 +72,7 @@ mvn clean install Native and Java build. We will use `$OAP_ROOT_DIR/oap-ape/ape-java/ape-spark/target/ape-spark-1.1.0-SNAPSHOT.jar`, `$OAP_ROOT_DIR/oap-ape/ape-java/ape-flink/ape-flink-x.xx.x/target/ape-flink-x.xx.x-1.1.0-SNAPSHOT.jar` later. ``` cd $OAP_ROOT_DIR/oap-ape/ape-java -mvn clean package -am -Pshading +mvn clean package -am --batch-mode -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn ``` Note: This will package libparquet_jni.so into ape-spark-1.1.0-SNAPSHOT.jar under linux/64/lib folder. From 0894433542d1b9447ad13bb79f8838798ed227c6 Mon Sep 17 00:00:00 2001 From: yspMing <1844341967@qq.com> Date: Thu, 18 Nov 2021 02:57:53 -0500 Subject: [PATCH 2/3] modifying ci_scripts/install_pmem_common.sh oap-ape/README.md --- ci_scripts/install_pmem_common.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/ci_scripts/install_pmem_common.sh b/ci_scripts/install_pmem_common.sh index f8d9d279b..4c76e0ed4 100644 --- a/ci_scripts/install_pmem_common.sh +++ b/ci_scripts/install_pmem_common.sh @@ -1,4 +1,5 @@ cd /tmp git clone https://github.com/oap-project/pmem-common.git cd pmem-common/ +git checkout branch-1.1-spark-3.x mvn install -am -q -DskipTests \ No newline at end of file From c8feaf3af0b762a4c374f8a0131d5cc38809012e Mon Sep 17 00:00:00 2001 From: yspMing <1844341967@qq.com> Date: Fri, 7 Jan 2022 02:07:14 -0500 Subject: [PATCH 3/3] Add predicate filter function to ape reader --- .../src/test/predicateFilterTest.cc | 170 ++++++++++++++++ .../src/utils/PredicateExpression.h | 186 ++++++++++++++++++ 2 files changed, 356 insertions(+) create mode 100644 oap-ape/ape-native/src/test/predicateFilterTest.cc create mode 100644 oap-ape/ape-native/src/utils/PredicateExpression.h diff --git a/oap-ape/ape-native/src/test/predicateFilterTest.cc b/oap-ape/ape-native/src/test/predicateFilterTest.cc new file mode 100644 index 000000000..6f572678d --- /dev/null +++ b/oap-ape/ape-native/src/test/predicateFilterTest.cc @@ -0,0 +1,170 @@ +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include "src/reader.h" +#include "src/utils/PredicateExpression.h" + +TEST(predicateFilterTest, minMaxTest) +{ + arrow::fs::HdfsOptions options_; + + std::string hdfs_host = "clx06-AEP"; + int hdfs_port = 8020; + + options_.ConfigureEndPoint(hdfs_host, hdfs_port); + + auto result = arrow::fs::HadoopFileSystem::Make(options_); + EXPECT_TRUE(result.ok()) << "HadoopFileSystem Make failed"; + + std::shared_ptr fs_ = + std::make_shared("", *result); + + std::string file_name = + "/user/hive/warehouse/tpcds_hdfs_parquet_10.db/store_sales/ss_sold_date_sk=2450817/" + "part-00013-0828d1ab-ef1f-4b55-bf94-c071fb76c353.c000.snappy.parquet"; + + auto file_result = fs_->OpenInputFile(file_name); + EXPECT_TRUE(file_result.ok()) << "Open hdfs file failed"; + + std::shared_ptr file = file_result.ValueOrDie(); + std::cout << "file size is " << file->GetSize().ValueOrDie() << std::endl; + + parquet::ReaderProperties properties; + // std::shared_ptr metadata; + std::unique_ptr parquetReader = + parquet::ParquetFileReader::Open(file, properties, NULLPTR); + + std::shared_ptr fileMetaData = parquetReader->metadata(); + + int numRows = fileMetaData->num_rows(); + int numCols = fileMetaData->num_columns(); + int numRowGroups = fileMetaData->num_row_groups(); + std::string schema = fileMetaData->schema()->ToString(); + + std::cout<<"parquet file has "< urgMataData = fileMetaData->RowGroup(rowGroupIndex); + std::shared_ptr rgMataData = std::move(urgMataData); + + //std::vector> columnChunkMeta; + //columnChunkMeta.resize(numCols); + std::unique_ptr columnChunkMeta; + + for(int i=0; iColumnChunk(i); + std::string column_name = fileMetaData->schema()->Column(i)->name(); + parquet::Type::type column_type = fileMetaData->schema()->Column(i)->physical_type(); + std::cout<<"current column["< statistic = columnChunkMeta->statistics(); + if(!statistic->HasMinMax()) + { + std::cout<<"This column does not have valid min max value."<(statistic); + std::cout<<"min: "<min()<<" max: "<max()<(statistic); + std::cout<<"min: "<min()<<" max: "<max()<(statistic); + std::cout<<"min: "<min()<<" max: "<max()<(statistic); + std::cout<<"min: "<min()<<" max: "<max()< +#include + +#include "src/utils/Expression.h" +#include "src/utils/Type.h" + +namespace ape { + +class PredicateExpression : public Expression { + public: + explicit PredicateExpression(std::string type_); + virtual void Execute() {} + int ExecuteWithParam(int batchSize, const std::vector& dataBuffers, + const std::vector& nullBuffers, + std::vector& outBuffers) { + return 0; + } + void setSchema(std::shared_ptr> schema_) {} + virtual int PredicateWithParam(int8_t& out) = 0; + virtual void setStatistic(std::shared_ptr rgMataData_) = 0; + ~PredicateExpression(); +}; + +class RootPredicateExpression : public PredicateExpression { + public: + RootPredicateExpression(std::string type_, std::shared_ptr child_); + void Execute() {} + int PredicateWithParam(int8_t& out); + ~RootPredicateExpression(); + void setStatistic(std::shared_ptr rgMataData_){ + child->setStatistic(rgMataData_); + } + void setSchema(std::shared_ptr> schema_) { + schema = schema_; + child->setSchema(schema); + } + + private: + std::shared_ptr child; +}; + +class NotPredicateExpression : public PredicateExpression { + public: + NotPredicateExpression(std::string type_, std::shared_ptr child_); + void Execute() {} + int PredicateWithParam(int8_t& out); + void setStatistic(std::shared_ptr rgMataData_){ + child->setStatistic(rgMataData_); + } + ~NotPredicateExpression(); + void setSchema(std::shared_ptr> schema_) { + schema = schema_; + child->setSchema(schema); + } + std::shared_ptr getChild() { return child; } + + private: + std::shared_ptr child; +}; + +class BinaryPredicateExpression : public PredicateExpression { + public: + BinaryPredicateExpression(std::string type_, std::shared_ptr left_, + std::shared_ptr right_); + void Execute() {} + int PredicateWithParam(int8_t& out); + ~BinaryPredicateExpression(); + void setSchema(std::shared_ptr> schema_) { + schema = schema_; + left->setSchema(schema); + right->setSchema(schema); + } + void setStatistic(std::shared_ptr rgMataData_){ + left->setStatistic(rgMataData_); + right->setStatistic(rgMataData_); + } + std::shared_ptr getLeftChild() { return left; } + std::shared_ptr getRightChild() { return right; } + + private: + std::shared_ptr left; + std::shared_ptr right; + std::string opType; +}; + +class UnaryPredicateExpression : public PredicateExpression { + public: + UnaryPredicateExpression(std::string type_, std::string columnName_) + : PredicateExpression(type_) { + columnName = columnName_; + } + ~UnaryPredicateExpression() {} + std::string getColumnName(); + + protected: + std::string columnName; +}; + +template +class TypedUnaryPredicateExpression : public UnaryPredicateExpression { + public: + TypedUnaryPredicateExpression(std::string type_, std::string columnName_, T value_); + void Execute() {} + int PredicateWithParam(int8_t& out); + ~TypedUnaryPredicateExpression(); + void setSchema(std::shared_ptr> schema_); + void setStatistic(std::shared_ptr rgMataData_); + + private: + std::string compareType; + T value; + T minVal; + T maxVal; + bool hasMinMax; + int columnIndex; +}; + +class StringPredicateExpression : public UnaryPredicateExpression { + public: + StringPredicateExpression(std::string type_, std::string columnName_, std::string value_); + ~StringPredicateExpression() {} + void setSchema(std::shared_ptr> schema_); + void setStatistic(std::shared_ptr rgMataData_) {} + int PredicateWithParam(int8_t& out) = 0; + std::string getColumnName(); + + protected: + std::string type; + std::string value; + int columnIndex; +}; + +class StartWithPredicateExpression : public StringPredicateExpression { + public: + StartWithPredicateExpression(std::string type_, std::string columnName_, + std::string value_) + : StringPredicateExpression(type_, columnName_, value_) {} + int PredicateWithParam(int8_t& out); + ~StartWithPredicateExpression() {} +}; + +class EndWithPredicateExpression : public StringPredicateExpression { + public: + EndWithPredicateExpression(std::string type_, std::string columnName_, std::string value_) + : StringPredicateExpression(type_, columnName_, value_) {} + int PredicateWithParam(int8_t& out); + ~EndWithPredicateExpression() {} +}; + +class ContainsPredicateExpression : public StringPredicateExpression { + public: + ContainsPredicateExpression(std::string type_, std::string columnName_, std::string value_) + : StringPredicateExpression(type_, columnName_, value_) {} + int PredicateWithParam(int8_t& out); + ~ContainsPredicateExpression() {} +}; + +using BoolUnaryPredicateExpression = TypedUnaryPredicateExpression; +using Int32UnaryPredicateExpression = TypedUnaryPredicateExpression; +using Int64UnaryPredicateExpression = TypedUnaryPredicateExpression; +// using Int96UnaryPredicateExpression = TypedUnaryExpression; +using FloatUnaryPredicateExpression = TypedUnaryPredicateExpression; +using DoubleUnaryPredicateExpression = TypedUnaryPredicateExpression; +using NullUnaryPredicateExpression = TypedUnaryPredicateExpression; +using ByteArrayUnaryPredicateExpression = TypedUnaryPredicateExpression; + +} // namespace ape