diff --git a/backends-velox/src-celeborn/main/java/org/apache/gluten/vectorized/CelebornPartitionWriterJniWrapper.java b/backends-velox/src-celeborn/main/java/org/apache/gluten/vectorized/CelebornPartitionWriterJniWrapper.java index 7cbfe6895560..0acaa12006f0 100644 --- a/backends-velox/src-celeborn/main/java/org/apache/gluten/vectorized/CelebornPartitionWriterJniWrapper.java +++ b/backends-velox/src-celeborn/main/java/org/apache/gluten/vectorized/CelebornPartitionWriterJniWrapper.java @@ -38,7 +38,6 @@ public long rtHandle() { public native long createPartitionWriter( int numPartitions, String codec, - String codecBackend, int compressionLevel, int compressionBufferSize, int pushBufferMaxSize, diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala index 0869ad3c30c8..4773bd9aa764 100644 --- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarBatchSerializer.scala @@ -89,8 +89,6 @@ private class CelebornColumnarBatchSerializerInstance( } else { null // uncompressed } - val compressionCodecBackend = - GlutenConfig.get.columnarShuffleCodecBackend.orNull val jniWrapper = ShuffleReaderJniWrapper.create(runtime) val batchSize = GlutenConfig.get.maxBatchSize val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize @@ -99,7 +97,6 @@ private class CelebornColumnarBatchSerializerInstance( .make( cSchema.memoryAddress(), compressionCodec, - compressionCodecBackend, batchSize, readerBufferSize, deserializerBufferSize, diff --git a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala index d2c27e960c84..067ac5ce9e3d 100644 --- a/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala +++ b/backends-velox/src-celeborn/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala @@ -133,7 +133,6 @@ class VeloxCelebornColumnarShuffleWriter[K, V]( val partitionWriterHandle = partitionWriterJniWrapper.createPartitionWriter( numPartitions, compressionCodec.orNull, - GlutenConfig.get.columnarShuffleCodecBackend.orNull, compressionLevel, compressionBufferSize, clientPushBufferMaxSize, diff --git a/backends-velox/src-uniffle/main/java/org/apache/gluten/vectorized/UnifflePartitionWriterJniWrapper.java b/backends-velox/src-uniffle/main/java/org/apache/gluten/vectorized/UnifflePartitionWriterJniWrapper.java index cf3ee9c6648b..5e0c549a2543 100644 --- a/backends-velox/src-uniffle/main/java/org/apache/gluten/vectorized/UnifflePartitionWriterJniWrapper.java +++ b/backends-velox/src-uniffle/main/java/org/apache/gluten/vectorized/UnifflePartitionWriterJniWrapper.java @@ -38,7 +38,6 @@ public long rtHandle() { public native long createPartitionWriter( int numPartitions, String codec, - String codecBackend, int compressionLevel, int compressionBufferSize, int pushBufferMaxSize, diff --git a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index 3240a500642c..225ac184fece 100644 --- a/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/backends-velox/src-uniffle/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -66,7 +66,6 @@ public class VeloxUniffleColumnarShuffleWriter extends RssShuffleWriter codecBackend = GlutenConfig.get().columnarShuffleCodecBackend(); - if (codecBackend.isDefined()) { - this.codecBackend = codecBackend.get(); - } } } @@ -156,7 +151,6 @@ protected void writeImpl(Iterator> records) { partitionWriterJniWrapper.createPartitionWriter( numPartitions, compressionCodec, - codecBackend, compressionLevel, compressionBufferSize, bufferSize, diff --git a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala index 53354f432bfc..e2378ffedc66 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/vectorized/ColumnarBatchSerializer.scala @@ -93,8 +93,6 @@ private class ColumnarBatchSerializerInstance( } else { null // uncompressed } - val compressionCodecBackend = - GlutenConfig.get.columnarShuffleCodecBackend.orNull val batchSize = GlutenConfig.get.maxBatchSize val readerBufferSize = GlutenConfig.get.columnarShuffleReaderBufferSize val deserializerBufferSize = GlutenConfig.get.columnarSortShuffleDeserializerBufferSize @@ -103,7 +101,6 @@ private class ColumnarBatchSerializerInstance( val shuffleReaderHandle = jniWrapper.make( cSchema.memoryAddress(), compressionCodec, - compressionCodecBackend, batchSize, readerBufferSize, deserializerBufferSize, diff --git a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala index 94735e5b70a0..b236711590ef 100644 --- a/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala +++ b/backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala @@ -146,7 +146,6 @@ class ColumnarShuffleWriter[K, V]( val partitionWriterHandle = partitionWriterJniWrapper.createPartitionWriter( numPartitions, compressionCodec.orNull, - GlutenConfig.get.columnarShuffleCodecBackend.orNull, compressionLevel, compressionBufferSize, GlutenConfig.get.columnarShuffleCompressionThreshold, diff --git a/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json b/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json index 1bb7562e5372..03be6e9b5579 100644 --- a/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json +++ b/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json @@ -40,7 +40,6 @@ "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.endpoint": "hdfs://127.0.0.1:8020/", "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.max_size": "10Gi", "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path": "/tmp/metadata/hdfs/3.5/", - "spark.gluten.sql.columnar.shuffle.codecBackend": "", "spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.input_write_timeout": "180000", "spark.gluten.memory.task.offHeap.size.in.bytes": "10737418240", "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes.main.disk": "hdfs_cache", diff --git a/cpp/CMake/BuildQATZstd.cmake b/cpp/CMake/BuildQATZstd.cmake deleted file mode 100644 index f79e9ea58fcc..000000000000 --- a/cpp/CMake/BuildQATZstd.cmake +++ /dev/null @@ -1,103 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -include(ExternalProject) - -if("${MAKE}" STREQUAL "") - if(NOT MSVC) - find_program(MAKE make) - endif() -endif() - -macro(build_qatzstd) - # Find ZSTD - include(FindZstd) - - message(STATUS "Building QAT-ZSTD from source") - set(QATZSTD_SOURCE_URL "https://github.com/marin-ma/QAT-ZSTD-Plugin.git") - set(QATZSTD_SOURCE_BRANCH "fix-duplicate-symbol") - set(QATZSTD_LIB_NAME "qatseqprod") - - set(QATZSTD_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qatzstd_ep-install") - set(QATZSTD_SOURCE_DIR "${QATZSTD_PREFIX}/src/qatzstd_ep") - set(QATZSTD_INCLUDE_DIR "${QATZSTD_SOURCE_DIR}/src") - set(QATZSTD_STATIC_LIB_NAME - "${CMAKE_STATIC_LIBRARY_PREFIX}${QATZSTD_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX}" - ) - set(QATZSTD_STATIC_LIB_TARGETS - "${QATZSTD_SOURCE_DIR}/src/${QATZSTD_STATIC_LIB_NAME}") - set(QATZSTD_MAKE_ARGS "ENABLE_USDM_DRV=1" "ZSTDLIB=${ZSTD_INCLUDE_DIR}") - - ExternalProject_Add( - qatzstd_ep - PREFIX ${QATZSTD_PREFIX} - GIT_REPOSITORY ${QATZSTD_SOURCE_URL} - GIT_TAG ${QATZSTD_SOURCE_BRANCH} - SOURCE_DIR ${QATZSTD_SOURCE_DIR} - CONFIGURE_COMMAND "" - BUILD_COMMAND ${MAKE} ${QATZSTD_MAKE_ARGS} - INSTALL_COMMAND "" - BUILD_BYPRODUCTS ${QATZSTD_STATIC_LIB_TARGETS} - BUILD_IN_SOURCE 1) - - add_library(qatzstd::qatzstd STATIC IMPORTED) - - # The include directory must exist before it is referenced by a target. - file(MAKE_DIRECTORY "${QATZSTD_INCLUDE_DIR}") - - set(QATZSTD_INCLUDE_DIRS "${QATZSTD_INCLUDE_DIR}" "${ZSTD_INCLUDE_DIR}") - - set(QATZSTD_LINK_LIBRARIES - "${ZSTD_LIBRARY}" "${QAT_LIBRARY}" "${USDM_DRV_LIBRARY}" "${ADF_LIBRARY}" - "${OSAL_LIBRARY}") - - set_target_properties( - qatzstd::qatzstd - PROPERTIES IMPORTED_LOCATION "${QATZSTD_STATIC_LIB_TARGETS}" - INTERFACE_INCLUDE_DIRECTORIES "${QATZSTD_INCLUDE_DIRS}" - INTERFACE_LINK_LIBRARIES "${QATZSTD_LINK_LIBRARIES}") - - add_dependencies(qatzstd::qatzstd qatzstd_ep) -endmacro() - -find_library( - QAT_LIBRARY REQUIRED - NAMES qat - PATHS "$ENV{ICP_ROOT}/build" - NO_DEFAULT_PATH) -find_library( - USDM_DRV_LIBRARY REQUIRED - NAMES usdm_drv - PATHS "$ENV{ICP_ROOT}/build" - NO_DEFAULT_PATH) -find_library( - ADF_LIBRARY REQUIRED - NAMES adf - PATHS "$ENV{ICP_ROOT}/build" - NO_DEFAULT_PATH) -find_library( - OSAL_LIBRARY REQUIRED - NAMES osal - PATHS "$ENV{ICP_ROOT}/build" - NO_DEFAULT_PATH) - -message(STATUS "Found qat: ${QAT_LIBRARY}") -message(STATUS "Found usdm_drv: ${USDM_DRV_LIBRARY}") -message(STATUS "Found adf: ${ADF_LIBRARY}") -message(STATUS "Found osal: ${OSAL_LIBRARY}") - -build_qatzstd() diff --git a/cpp/CMake/BuildQATzip.cmake b/cpp/CMake/BuildQATzip.cmake deleted file mode 100644 index fd75757d7286..000000000000 --- a/cpp/CMake/BuildQATzip.cmake +++ /dev/null @@ -1,126 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -include(ExternalProject) - -if("${MAKE}" STREQUAL "") - if(NOT MSVC) - find_program(MAKE make) - endif() -endif() - -macro(build_qatzip) - message(STATUS "Building QATzip from source") - set(QATZIP_BUILD_VERSION "v1.1.1") - set(QATZIP_BUILD_SHA256_CHECKSUM - "679f5522deb35e7ffa36f227ae49d07ef2d69a83e56bfda849303829b274e79b") - set(QATZIP_SOURCE_URL - "https://github.com/intel/QATzip/archive/refs/tags/${QATZIP_BUILD_VERSION}.tar.gz" - ) - set(QATZIP_LIB_NAME "qatzip") - - set(QATZIP_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/qatzip_ep-install") - set(QATZIP_SOURCE_DIR "${QATZIP_PREFIX}/src/qatzip_ep") - set(QATZIP_INCLUDE_DIR "${QATZIP_SOURCE_DIR}/include") - set(QATZIP_STATIC_LIB_NAME - "${CMAKE_STATIC_LIBRARY_PREFIX}${QATZIP_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX}" - ) - set(QATZIP_STATIC_LIB_TARGETS - "${QATZIP_SOURCE_DIR}/src/.libs/${QATZIP_STATIC_LIB_NAME}") - set(QATZIP_CONFIGURE_ARGS "--prefix=${QATZIP_PREFIX}" "--with-pic" - "--with-ICP_ROOT=$ENV{ICP_ROOT}") - - ExternalProject_Add( - qatzip_ep - PREFIX ${QATZIP_PREFIX} - URL ${QATZIP_SOURCE_URL} - URL_HASH "SHA256=${QATZIP_BUILD_SHA256_CHECKSUM}" - SOURCE_DIR ${QATZIP_SOURCE_DIR} - CONFIGURE_COMMAND ${CMAKE_COMMAND} -E env QZ_ROOT=${QATZIP_SOURCE_DIR} - ./configure ${QATZIP_CONFIGURE_ARGS} - BUILD_COMMAND ${MAKE} all - BUILD_BYPRODUCTS ${QATZIP_STATIC_LIB_TARGETS} - BUILD_IN_SOURCE 1) - - ExternalProject_Add_Step( - qatzip_ep pre-configure - COMMAND ./autogen.sh - DEPENDEES download - DEPENDERS configure - WORKING_DIRECTORY ${QATZIP_SOURCE_DIR}) - - # The include directory must exist before it is referenced by a target. - file(MAKE_DIRECTORY "${QATZIP_INCLUDE_DIR}") - - set(QATZIP_LINK_LIBRARIES - "${ZLIB_LIBRARY}" - "${LZ4_LIBRARY}" - "${UDEV_LIBRARY}" - "${QAT_LIBRARY}" - "${USDM_DRV_LIBRARY}" - "${ADF_LIBRARY}" - "${OSAL_LIBRARY}" - Threads::Threads) - - # Fix libudev.so not get linked. - set(QATZIP_LINK_OPTIONS "-Wl,--no-as-needed") - - add_library(qatzip::qatzip STATIC IMPORTED) - set_target_properties( - qatzip::qatzip - PROPERTIES IMPORTED_LOCATION "${QATZIP_STATIC_LIB_TARGETS}" - INTERFACE_INCLUDE_DIRECTORIES "${QATZIP_INCLUDE_DIR}" - INTERFACE_LINK_LIBRARIES "${QATZIP_LINK_LIBRARIES}" - INTERFACE_LINK_OPTIONS "${QATZIP_LINK_OPTIONS}") - - add_dependencies(qatzip::qatzip qatzip_ep) -endmacro() - -set(THREADS_PREFER_PTHREAD_FLAG ON) -find_package(Threads REQUIRED) - -find_library(ZLIB_LIBRARY REQUIRED NAMES z) -find_library(LZ4_LIBRARY REQUIRED NAMES lz4) -find_library(UDEV_LIBRARY REQUIRED NAMES udev) -find_library( - USDM_DRV_LIBRARY REQUIRED - NAMES usdm_drv - PATHS "$ENV{ICP_ROOT}/build" - NO_DEFAULT_PATH) -find_library( - QAT_LIBRARY REQUIRED - NAMES qat - PATHS "$ENV{ICP_ROOT}/build" - NO_DEFAULT_PATH) -find_library( - OSAL_LIBRARY REQUIRED - NAMES osal - PATHS "$ENV{ICP_ROOT}/build" - NO_DEFAULT_PATH) -find_library( - ADF_LIBRARY REQUIRED - NAMES adf - PATHS "$ENV{ICP_ROOT}/build" - NO_DEFAULT_PATH) - -message(STATUS "Found zlib: ${ZLIB_LIBRARY}") -message(STATUS "Found lz4: ${LZ4_LIBRARY}") -message(STATUS "Found udev: ${UDEV_LIBRARY}") -message(STATUS "Found usdm_drv: ${USDM_DRV_LIBRARY}") -message(STATUS "Found qat: ${QAT_LIBRARY}") - -build_qatzip() diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 13995253119c..719ff59f9c9c 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -48,7 +48,6 @@ option(BUILD_EXAMPLES "Build Examples" OFF) option(BUILD_BENCHMARKS "Build Benchmarks" OFF) option(ENABLE_JEMALLOC_STATS "Prints Jemalloc stats for debugging" OFF) option(BUILD_GLOG "Build Glog from Source" OFF) -option(ENABLE_QAT "Enable QAT for de/compression" OFF) option(ENABLE_GCS "Enable GCS" OFF) option(ENABLE_S3 "Enable S3" OFF) option(ENABLE_HDFS "Enable HDFS" OFF) @@ -228,10 +227,6 @@ if(BUILD_TESTS OR BUILD_BENCHMARKS) endif() endif() -if(ENABLE_QAT) - add_definitions(-DGLUTEN_ENABLE_QAT) -endif() - if(ENABLE_GPU) add_definitions(-DGLUTEN_ENABLE_GPU) endif() diff --git a/cpp/core/CMakeLists.txt b/cpp/core/CMakeLists.txt index 58dd301b6968..ac354c0c74fb 100644 --- a/cpp/core/CMakeLists.txt +++ b/cpp/core/CMakeLists.txt @@ -178,15 +178,6 @@ endif() find_arrow_lib(${ARROW_LIB_NAME}) find_arrow_lib(${ARROW_BUNDLED_DEPS}) -if(ENABLE_QAT) - include(BuildQATzip) - include(BuildQATZstd) - target_sources(gluten PRIVATE utils/qat/QatCodec.cc) - target_include_directories(gluten PUBLIC ${QATZIP_INCLUDE_DIR} - ${QATZSTD_INCLUDE_DIR}) - target_link_libraries(gluten PUBLIC qatzip::qatzip qatzstd::qatzstd) -endif() - find_protobuf() message(STATUS "Found Protobuf: ${PROTOBUF_LIBRARY}") target_link_libraries(gluten LINK_PUBLIC ${PROTOBUF_LIBRARY}) diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h index a1aeb5ac354c..8d10a562b0db 100644 --- a/cpp/core/config/GlutenConfig.h +++ b/cpp/core/config/GlutenConfig.h @@ -74,11 +74,9 @@ const std::string kUGIUserName = "spark.gluten.ugi.username"; const std::string kUGITokens = "spark.gluten.ugi.tokens"; const std::string kShuffleCompressionCodec = "spark.gluten.sql.columnar.shuffle.codec"; -const std::string kShuffleCompressionCodecBackend = "spark.gluten.sql.columnar.shuffle.codecBackend"; const std::string kShuffleSpillDiskWriteBufferSize = "spark.shuffle.spill.diskWriteBufferSize"; const std::string kSortShuffleReaderDeserializerBufferSize = "spark.gluten.sql.columnar.shuffle.sort.deserializerBufferSize"; -const std::string kQatBackendName = "qat"; const std::string kSparkRedactionRegex = "spark.redaction.regex"; const std::string kSparkRedactionString = "*********(redacted)"; diff --git a/cpp/core/jni/JniCommon.h b/cpp/core/jni/JniCommon.h index bc59a2a48555..e8f49e0b0b03 100644 --- a/cpp/core/jni/JniCommon.h +++ b/cpp/core/jni/JniCommon.h @@ -354,17 +354,6 @@ static inline arrow::Compression::type getCompressionType(JNIEnv* env, jstring c return compressionType; } -static inline gluten::CodecBackend getCodecBackend(JNIEnv* env, jstring codecBackendJstr) { - if (codecBackendJstr == nullptr) { - return gluten::CodecBackend::NONE; - } - auto codecBackend = jStringToCString(env, codecBackendJstr); - if (codecBackend == "qat") { - return gluten::CodecBackend::QAT; - } - throw std::invalid_argument("Not support this codec backend " + codecBackend); -} - static inline gluten::CompressionMode getCompressionMode(JNIEnv* env, jstring compressionModeJstr) { GLUTEN_DCHECK(compressionModeJstr != nullptr, "CompressionMode cannot be null"); auto compressionMode = jStringToCString(env, compressionModeJstr); diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 523c4a74940e..f9fbecd4d4ab 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -798,7 +798,6 @@ Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition jobject wrapper, jint numPartitions, jstring codecJstr, - jstring codecBackendJstr, jint compressionLevel, jint compressionBufferSize, jint compressionThreshold, @@ -827,7 +826,7 @@ Java_org_apache_gluten_vectorized_LocalPartitionWriterJniWrapper_createPartition auto partitionWriter = std::make_shared( numPartitions, - createArrowIpcCodec(getCompressionType(env, codecJstr), getCodecBackend(env, codecBackendJstr), compressionLevel), + createCompressionCodec(getCompressionType(env, codecJstr), compressionLevel), ctx->memoryManager(), partitionWriterOptions, dataFile, @@ -1035,7 +1034,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe jobject wrapper, jlong cSchema, jstring compressionType, - jstring compressionBackend, jint batchSize, jlong readerBufferSize, jlong deserializerBufferSize, @@ -1045,9 +1043,6 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleReaderJniWrappe ShuffleReaderOptions options = ShuffleReaderOptions{}; options.compressionType = getCompressionType(env, compressionType); - if (compressionType != nullptr) { - options.codecBackend = getCodecBackend(env, compressionBackend); - } options.batchSize = batchSize; options.readerBufferSize = readerBufferSize; options.deserializerBufferSize = deserializerBufferSize; diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 717f75dea54e..80d47f855ca8 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -52,7 +52,6 @@ struct ShuffleReaderOptions { // Compression options. arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME; - CodecBackend codecBackend = CodecBackend::NONE; // Output batch size. int32_t batchSize = kDefaultBatchSize; diff --git a/cpp/core/utils/Compression.cc b/cpp/core/utils/Compression.cc index 913ee3b55fcc..8fcf817bf095 100644 --- a/cpp/core/utils/Compression.cc +++ b/cpp/core/utils/Compression.cc @@ -19,44 +19,21 @@ #include "Exception.h" -#ifdef GLUTEN_ENABLE_QAT -#include "utils/qat/QatCodec.h" -#endif - namespace gluten { -std::unique_ptr -createArrowIpcCodec(arrow::Compression::type compressedType, CodecBackend codecBackend, int32_t compressionLevel) { +std::unique_ptr createCompressionCodec( + arrow::Compression::type compressedType, + int32_t compressionLevel) { std::unique_ptr codec; + // TODO: More compression codecs should be supported. switch (compressedType) { - case arrow::Compression::LZ4_FRAME: { - GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(compressedType)); - } break; + case arrow::Compression::LZ4_FRAME: case arrow::Compression::ZSTD: { - if (codecBackend == CodecBackend::NONE) { - GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(compressedType, compressionLevel)); - } else if (codecBackend == CodecBackend::QAT) { -#if defined(GLUTEN_ENABLE_QAT) - codec = qat::makeDefaultQatZstdCodec(); -#else - throw GlutenException("Backend QAT but not compile with option GLUTEN_ENABLE_QAT"); -#endif - } - } break; - case arrow::Compression::GZIP: { - if (codecBackend == CodecBackend::NONE) { - return nullptr; - } else if (codecBackend == CodecBackend::QAT) { -#if defined(GLUTEN_ENABLE_QAT) - codec = qat::makeDefaultQatGZipCodec(); -#else - throw GlutenException("Backend QAT but not compile with option GLUTEN_ENABLE_QAT"); -#endif - } - } break; + GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(compressedType, compressionLevel)); + return codec; + } default: return nullptr; } - return codec; } } // namespace gluten diff --git a/cpp/core/utils/Compression.h b/cpp/core/utils/Compression.h index 5a3e170fb4a1..702833ddf39b 100644 --- a/cpp/core/utils/Compression.h +++ b/cpp/core/utils/Compression.h @@ -21,15 +21,12 @@ namespace gluten { -enum CodecBackend { NONE, QAT }; - // BUFFER mode will preallocate max compressed buffer, and then compress each buffer to the max compressed buffer // ROWVECTOR mode will copy the buffers to a big buffer and then compress the big buffer enum CompressionMode { BUFFER, ROWVECTOR }; -std::unique_ptr createArrowIpcCodec( +std::unique_ptr createCompressionCodec( arrow::Compression::type compressedType, - CodecBackend codecBackend, int32_t compressionLevel = arrow::util::kUseDefaultCompressionLevel); } // namespace gluten diff --git a/cpp/core/utils/qat/QatCodec.cc b/cpp/core/utils/qat/QatCodec.cc deleted file mode 100644 index c86fc3bc2152..000000000000 --- a/cpp/core/utils/qat/QatCodec.cc +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "QatCodec.h" - -#define QZ_INIT_FAIL(rc) ((QZ_OK != (rc)) && (QZ_DUPLICATE != (rc))) - -#define QZ_SETUP_SESSION_FAIL(rc) (QZ_PARAMS == (rc) || QZ_NOSW_NO_HW == (rc) || QZ_NOSW_LOW_MEM == (rc)) - -namespace gluten { -namespace qat { - -class QatZipCodec : public arrow::util::Codec { - protected: - explicit QatZipCodec(int compressionLevel) : compressionLevel_(compressionLevel) {} - - ~QatZipCodec() { - static_cast(qzTeardownSession(&qzSession_)); - static_cast(qzClose(&qzSession_)); - } - - arrow::Result Decompress(int64_t inputLen, const uint8_t* input, int64_t outputLen, uint8_t* output) - override { - uint32_t compressedSize = static_cast(inputLen); - uint32_t uncompressedSize = static_cast(outputLen); - int ret = qzDecompress(&qzSession_, input, &compressedSize, output, &uncompressedSize); - if (ret == QZ_OK) { - return static_cast(uncompressedSize); - } else if (ret == QZ_PARAMS) { - return arrow::Status::IOError("QAT decompression failure: params is invalid"); - } else if (ret == QZ_FAIL) { - return arrow::Status::IOError("QAT decompression failure: Function did not succeed"); - } else { - return arrow::Status::IOError("QAT decompression failure with error:", ret); - } - } - - int64_t MaxCompressedLen(int64_t inputLen, const uint8_t* ARROW_ARG_UNUSED(input)) override { - ARROW_DCHECK_GE(inputLen, 0); - return qzMaxCompressedLength(static_cast(inputLen), &qzSession_); - } - - arrow::Result Compress(int64_t inputLen, const uint8_t* input, int64_t outputLen, uint8_t* output) override { - uint32_t uncompressedSize = static_cast(inputLen); - uint32_t compressedSize = static_cast(outputLen); - int ret = qzCompress(&qzSession_, input, &uncompressedSize, output, &compressedSize, 1); - if (ret == QZ_OK) { - return static_cast(compressedSize); - } else if (ret == QZ_PARAMS) { - return arrow::Status::IOError("QAT compression failure: params is invalid"); - } else if (ret == QZ_FAIL) { - return arrow::Status::IOError("QAT compression failure: function did not succeed"); - } else { - return arrow::Status::IOError("QAT compression failure with error:", ret); - } - } - - arrow::Result> MakeCompressor() override { - return arrow::Status::NotImplemented("Streaming compression unsupported with QAT"); - } - - arrow::Result> MakeDecompressor() override { - return arrow::Status::NotImplemented("Streaming decompression unsupported with QAT"); - } - - int compression_level() const override { - return compressionLevel_; - } - - int compressionLevel_; - QzSession_T qzSession_ = {0}; -}; - -class QatGZipCodec final : public QatZipCodec { - public: - QatGZipCodec(QzPollingMode_T pollingMode, int compressionLevel) : QatZipCodec(compressionLevel) { - auto rc = qzInit(&qzSession_, /* sw_backup = */ 1); - if (QZ_INIT_FAIL(rc)) { - ARROW_LOG(WARNING) << "qzInit failed with error: " << rc; - } else { - QzSessionParamsDeflate_T params; - qzGetDefaultsDeflate(¶ms); // get the default value. - params.common_params.polling_mode = pollingMode; - params.common_params.comp_lvl = compressionLevel; - rc = qzSetupSessionDeflate(&qzSession_, ¶ms); - if (QZ_SETUP_SESSION_FAIL(rc)) { - ARROW_LOG(WARNING) << "qzSetupSession failed with error: " << rc; - } - } - } - - arrow::Compression::type compression_type() const override { - return arrow::Compression::GZIP; - } - - int minimum_compression_level() const override { - return QZ_DEFLATE_COMP_LVL_MINIMUM; - } - int maximum_compression_level() const override { - return QZ_DEFLATE_COMP_LVL_MAXIMUM; - } - int default_compression_level() const override { - return QZ_COMP_LEVEL_DEFAULT; - } -}; - -bool supportsCodec(const std::string& codec) { - return !codec.empty() && std::any_of(kQatSupportedCodec.begin(), kQatSupportedCodec.end(), [&](const auto& qatCodec) { - return qatCodec == codec; - }); -} - -std::unique_ptr makeQatGZipCodec(QzPollingMode_T pollingMode, int compressionLevel) { - return std::unique_ptr(new QatGZipCodec(pollingMode, compressionLevel)); -} - -std::unique_ptr makeDefaultQatGZipCodec() { - return makeQatGZipCodec(QZ_BUSY_POLLING, QZ_COMP_LEVEL_DEFAULT); -} - -namespace { -constexpr int kZSTDDefaultCompressionLevel = 1; -arrow::Status ZSTDError(size_t ret, const char* prefix_msg) { - return arrow::Status::IOError(prefix_msg, ZSTD_getErrorName(ret)); -} - -void logZstdOnError(size_t ret, const char* prefixMsg) { - if (ZSTD_isError(ret)) { - ARROW_LOG(WARNING) << prefixMsg << ZSTD_getErrorName(ret); - } -} - -} // namespace - -class QatDevice { - public: - QatDevice() { - if (QZSTD_startQatDevice() == QZSTD_FAIL) { - ARROW_LOG(WARNING) << "QZSTD_startQatDevice failed"; - } else { - initialized_ = true; - } - } - - ~QatDevice() { - if (initialized_) { - QZSTD_stopQatDevice(); - } - } - - static std::shared_ptr getInstance() { - std::call_once(initQat_, []() { instance_ = std::make_shared(); }); - return instance_; - } - - bool deviceInitialized() { - return initialized_; - } - - private: - inline static std::shared_ptr instance_; - inline static std::once_flag initQat_; - bool initialized_{false}; -}; - -class QatZstdCodec final : public arrow::util::Codec { - public: - explicit QatZstdCodec(int compressionLevel) : compressionLevel_(compressionLevel) {} - - ~QatZstdCodec() { - if (initCCtx_) { - ZSTD_freeCCtx(zc_); - if (sequenceProducerState_) { - QZSTD_freeSeqProdState(sequenceProducerState_); - } - } - } - - arrow::Compression::type compression_type() const override { - return arrow::Compression::ZSTD; - } - - arrow::Result Decompress(int64_t inputLen, const uint8_t* input, int64_t outputLen, uint8_t* output) - override { - if (output == nullptr) { - // We may pass a NULL 0-byte output buffer but some zstd versions demand - // a valid pointer: https://github.com/facebook/zstd/issues/1385 - static uint8_t emptyBuffer; - DCHECK_EQ(outputLen, 0); - output = &emptyBuffer; - } - - size_t ret = ZSTD_decompress(output, static_cast(outputLen), input, static_cast(inputLen)); - if (ZSTD_isError(ret)) { - return ZSTDError(ret, "ZSTD decompression failed: "); - } - if (static_cast(ret) != outputLen) { - return arrow::Status::IOError("Corrupt ZSTD compressed data."); - } - return static_cast(ret); - } - - int64_t MaxCompressedLen(int64_t inputLen, const uint8_t* ARROW_ARG_UNUSED(input)) override { - DCHECK_GE(inputLen, 0); - return ZSTD_compressBound(static_cast(inputLen)); - } - - arrow::Result Compress(int64_t inputLen, const uint8_t* input, int64_t outputLen, uint8_t* output) override { - RETURN_NOT_OK(initCCtx()); - size_t ret = ZSTD_compress2(zc_, output, static_cast(outputLen), input, static_cast(inputLen)); - if (ZSTD_isError(ret)) { - return ZSTDError(ret, "ZSTD compression failed: "); - } - return static_cast(ret); - } - - arrow::Result> MakeCompressor() override { - return arrow::Status::NotImplemented("Streaming compression unsupported with QAT"); - } - - arrow::Result> MakeDecompressor() override { - return arrow::Status::NotImplemented("Streaming decompression unsupported with QAT"); - } - - int minimum_compression_level() const override { - return ZSTD_minCLevel(); - } - int maximum_compression_level() const override { - return ZSTD_maxCLevel(); - } - int default_compression_level() const override { - return kZSTDDefaultCompressionLevel; - } - - int compression_level() const override { - return compressionLevel_; - } - - private: - int compressionLevel_; - ZSTD_CCtx* zc_; - bool initCCtx_{false}; - - std::shared_ptr qatDevice_; - void* sequenceProducerState_{nullptr}; - - arrow::Status initCCtx() { - if (initCCtx_) { - return arrow::Status::OK(); - } - zc_ = ZSTD_createCCtx(); - logZstdOnError( - ZSTD_CCtx_setParameter(zc_, ZSTD_c_compressionLevel, compressionLevel_), - "ZSTD_CCtx_setParameter failed on ZSTD_c_compressionLevel: "); - if (!qatDevice_) { - qatDevice_ = QatDevice::getInstance(); - } - if (qatDevice_->deviceInitialized()) { - sequenceProducerState_ = QZSTD_createSeqProdState(); - /* register qatSequenceProducer */ - ZSTD_registerSequenceProducer(zc_, sequenceProducerState_, qatSequenceProducer); - /* Enable sequence producer fallback */ - logZstdOnError( - ZSTD_CCtx_setParameter(zc_, ZSTD_c_enableSeqProducerFallback, 1), - "ZSTD_CCtx_setParameter failed on ZSTD_c_enableSeqProducerFallback: "); - } - initCCtx_ = true; - return arrow::Status::OK(); - } -}; - -std::unique_ptr makeQatZstdCodec(int compressionLevel) { - return std::unique_ptr(new QatZstdCodec(compressionLevel)); -} - -std::unique_ptr makeDefaultQatZstdCodec() { - return makeQatZstdCodec(kZSTDDefaultCompressionLevel); -} - -} // namespace qat -} // namespace gluten diff --git a/cpp/core/utils/qat/QatCodec.h b/cpp/core/utils/qat/QatCodec.h deleted file mode 100644 index d56f2fda1244..000000000000 --- a/cpp/core/utils/qat/QatCodec.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include -#include - -namespace gluten { -namespace qat { - -static const std::vector kQatSupportedCodec = {"gzip", "zstd"}; - -bool supportsCodec(const std::string& qatCodec); - -std::unique_ptr makeQatGZipCodec(QzPollingMode_T pollingMode, int compressionLevel); - -std::unique_ptr makeDefaultQatGZipCodec(); - -std::unique_ptr makeQatZstdCodec(int compressionLevel); - -std::unique_ptr makeDefaultQatZstdCodec(); -} // namespace qat -} // namespace gluten diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 98c92b732b3f..07969e3be2d9 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -58,7 +58,10 @@ DEFINE_string( "rr", "Short partitioning name. Valid options are rr, hash, range, single, random (only for test purpose)"); DEFINE_bool(rss, false, "Mocking rss."); -DEFINE_string(compression, "lz4", "Specify the compression codec. Valid options are none, lz4, zstd"); +DEFINE_string( + compression, + "lz4", + "Specify the compression codec. Valid options are none, lz4, zstd"); DEFINE_int32(shuffle_partitions, 200, "Number of shuffle split (reducer) partitions"); DEFINE_bool(shuffle_dictionary, false, "Whether to enable dictionary encoding for shuffle write."); @@ -161,8 +164,7 @@ void cleanupLocalDirs(const std::vector& localDirs) { } } -void setCompressionTypeFromFlag(arrow::Compression::type& compressionType, CodecBackend& codecBackend) { - codecBackend = CodecBackend::NONE; +void setCompressionTypeFromFlag(arrow::Compression::type& compressionType) { if (FLAGS_compression == "none") { compressionType = arrow::Compression::UNCOMPRESSED; } else if (FLAGS_compression == "lz4") { @@ -181,11 +183,10 @@ std::unique_ptr createCodec() { } arrow::Compression::type compressionType; - CodecBackend codecBackend; - setCompressionTypeFromFlag(compressionType, codecBackend); + setCompressionTypeFromFlag(compressionType); - return createArrowIpcCodec(compressionType, codecBackend); + return createCompressionCodec(compressionType); } std::shared_ptr @@ -237,7 +238,7 @@ std::shared_ptr createShuffleWriter( std::shared_ptr createShuffleReader(Runtime* runtime, const std::shared_ptr& schema) { auto readerOptions = ShuffleReaderOptions{}; readerOptions.shuffleWriterType = ShuffleWriter::stringToType(FLAGS_shuffle_writer), - setCompressionTypeFromFlag(readerOptions.compressionType, readerOptions.codecBackend); + setCompressionTypeFromFlag(readerOptions.compressionType); return runtime->createShuffleReader(schema, readerOptions); } diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 96fa3e5f4c79..5696a53be50f 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -24,9 +24,6 @@ #include "operators/plannodes/RowVectorStream.h" #include "utils/ConfigExtractor.h" -#ifdef GLUTEN_ENABLE_QAT -#include "utils/qat/QatCodec.h" -#endif #ifdef GLUTEN_ENABLE_GPU #include "velox/experimental/cudf/exec/ToCudf.h" #endif diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc index e826837e934f..cc488bc2f241 100644 --- a/cpp/velox/compute/VeloxRuntime.cc +++ b/cpp/velox/compute/VeloxRuntime.cc @@ -288,7 +288,7 @@ std::shared_ptr VeloxRuntime::createDataSource( std::shared_ptr VeloxRuntime::createShuffleReader( std::shared_ptr schema, ShuffleReaderOptions options) { - auto codec = gluten::createArrowIpcCodec(options.compressionType, options.codecBackend); + auto codec = gluten::createCompressionCodec(options.compressionType); const auto veloxCompressionKind = arrowCompressionTypeToVelox(options.compressionType); const auto rowType = facebook::velox::asRowType(gluten::fromArrowSchema(schema)); diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 624a93f17005..6da20ecb2bfe 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -587,7 +587,6 @@ Java_org_apache_gluten_vectorized_CelebornPartitionWriterJniWrapper_createPartit jobject wrapper, jint numPartitions, jstring codecJstr, - jstring codecBackendJstr, jint compressionLevel, jint compressionBufferSize, jint pushBufferMaxSize, @@ -615,7 +614,7 @@ Java_org_apache_gluten_vectorized_CelebornPartitionWriterJniWrapper_createPartit auto partitionWriter = std::make_shared( numPartitions, - createArrowIpcCodec(getCompressionType(env, codecJstr), getCodecBackend(env, codecBackendJstr), compressionLevel), + createCompressionCodec(getCompressionType(env, codecJstr), compressionLevel), ctx->memoryManager(), partitionWriterOptions, celebornClient); @@ -630,7 +629,6 @@ Java_org_apache_gluten_vectorized_UnifflePartitionWriterJniWrapper_createPartiti jobject wrapper, jint numPartitions, jstring codecJstr, - jstring codecBackendJstr, jint compressionLevel, jint compressionBufferSize, jint pushBufferMaxSize, @@ -658,7 +656,7 @@ Java_org_apache_gluten_vectorized_UnifflePartitionWriterJniWrapper_createPartiti auto partitionWriter = std::make_shared( numPartitions, - createArrowIpcCodec(getCompressionType(env, codecJstr), getCodecBackend(env, codecBackendJstr), compressionLevel), + createCompressionCodec(getCompressionType(env, codecJstr), compressionLevel), ctx->memoryManager(), partitionWriterOptions, uniffleClient); diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index 419f18861e73..74a38888ce46 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -108,46 +108,50 @@ std::vector getTestParams() { for (const auto partitionWriterType : {PartitionWriterType::kLocal, PartitionWriterType::kRss}) { for (const auto diskWriteBufferSize : {4, 56, 32 * 1024}) { for (const bool useRadixSort : {true, false}) { - for (const int64_t deserializerBufferSize : {1L, kDefaultDeserializerBufferSize}) { - params.push_back(ShuffleTestParams{ - .shuffleWriterType = ShuffleWriterType::kSortShuffle, - .partitionWriterType = partitionWriterType, - .compressionType = compression, - .diskWriteBufferSize = diskWriteBufferSize, - .useRadixSort = useRadixSort, - .deserializerBufferSize = deserializerBufferSize}); + for (const auto deserializerBufferSize : {static_cast(1L), kDefaultDeserializerBufferSize}) { + params.push_back( + ShuffleTestParams{ + .shuffleWriterType = ShuffleWriterType::kSortShuffle, + .partitionWriterType = partitionWriterType, + .compressionType = compression, + .diskWriteBufferSize = diskWriteBufferSize, + .useRadixSort = useRadixSort, + .deserializerBufferSize = deserializerBufferSize}); } } } } // Rss sort-based shuffle. - params.push_back(ShuffleTestParams{ - .shuffleWriterType = ShuffleWriterType::kRssSortShuffle, - .partitionWriterType = PartitionWriterType::kRss, - .compressionType = compression}); + params.push_back( + ShuffleTestParams{ + .shuffleWriterType = ShuffleWriterType::kRssSortShuffle, + .partitionWriterType = PartitionWriterType::kRss, + .compressionType = compression}); // Hash-based shuffle. for (const auto compressionThreshold : compressionThresholds) { // Local. for (const auto mergeBufferSize : mergeBufferSizes) { for (const bool enableDictionary : {true, false}) { - params.push_back(ShuffleTestParams{ - .shuffleWriterType = ShuffleWriterType::kHashShuffle, - .partitionWriterType = PartitionWriterType::kLocal, - .compressionType = compression, - .compressionThreshold = compressionThreshold, - .mergeBufferSize = mergeBufferSize, - .enableDictionary = enableDictionary}); + params.push_back( + ShuffleTestParams{ + .shuffleWriterType = ShuffleWriterType::kHashShuffle, + .partitionWriterType = PartitionWriterType::kLocal, + .compressionType = compression, + .compressionThreshold = compressionThreshold, + .mergeBufferSize = mergeBufferSize, + .enableDictionary = enableDictionary}); } } // Rss. - params.push_back(ShuffleTestParams{ - .shuffleWriterType = ShuffleWriterType::kHashShuffle, - .partitionWriterType = PartitionWriterType::kRss, - .compressionType = compression, - .compressionThreshold = compressionThreshold}); + params.push_back( + ShuffleTestParams{ + .shuffleWriterType = ShuffleWriterType::kHashShuffle, + .partitionWriterType = PartitionWriterType::kRss, + .compressionType = compression, + .compressionThreshold = compressionThreshold}); } } @@ -163,7 +167,7 @@ std::shared_ptr createPartitionWriter( int32_t mergeBufferSize, int32_t compressionThreshold, bool enableDictionary) { - GLUTEN_ASSIGN_OR_THROW(auto codec, arrow::util::Codec::Create(compressionType)); + auto codec = createCompressionCodec(compressionType); switch (partitionWriterType) { case PartitionWriterType::kLocal: { auto options = std::make_shared(); @@ -292,7 +296,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParamgetLeafMemoryPool().get()); - auto codec = createArrowIpcCodec(compressionType, CodecBackend::NONE); + auto codec = createCompressionCodec(compressionType); // Set batchSize to a large value to make all batches are merged by reader. auto deserializerFactory = std::make_unique( @@ -501,12 +505,7 @@ TEST_P(HashPartitioningShuffleWriterTest, hashPart1Vector) { makeFlatVector({232, 34567235, 1212, 4567}, DECIMAL(20, 4)), makeFlatVector( 4, [](vector_size_t row) { return row % 2; }, nullEvery(5), DATE()), - makeFlatVector( - 4, - [](vector_size_t row) { - return Timestamp{row % 2, 0}; - }, - nullEvery(5))}; + makeFlatVector(4, [](vector_size_t row) { return Timestamp{row % 2, 0}; }, nullEvery(5))}; const auto vector = makeRowVector(data); diff --git a/dev/builddeps-veloxbe.sh b/dev/builddeps-veloxbe.sh index aeaf872804b3..10cfeeef2e72 100755 --- a/dev/builddeps-veloxbe.sh +++ b/dev/builddeps-veloxbe.sh @@ -31,7 +31,6 @@ BUILD_BENCHMARKS=OFF ENABLE_JEMALLOC_STATS=OFF BUILD_VELOX_TESTS=OFF BUILD_VELOX_BENCHMARKS=OFF -ENABLE_QAT=OFF ENABLE_GCS=OFF ENABLE_S3=OFF ENABLE_HDFS=OFF @@ -83,10 +82,6 @@ do ENABLE_JEMALLOC_STATS=("${arg#*=}") shift # Remove argument name from processing ;; - --enable_qat=*) - ENABLE_QAT=("${arg#*=}") - shift # Remove argument name from processing - ;; --enable_gcs=*) ENABLE_GCS=("${arg#*=}") shift # Remove argument name from processing @@ -235,7 +230,6 @@ function build_gluten_cpp { -DBUILD_EXAMPLES=$BUILD_EXAMPLES \ -DBUILD_BENCHMARKS=$BUILD_BENCHMARKS \ -DENABLE_JEMALLOC_STATS=$ENABLE_JEMALLOC_STATS \ - -DENABLE_QAT=$ENABLE_QAT \ -DENABLE_GCS=$ENABLE_GCS \ -DENABLE_S3=$ENABLE_S3 \ -DENABLE_HDFS=$ENABLE_HDFS \ diff --git a/dev/setup-qat-ubuntu.sh b/dev/setup-qat-ubuntu.sh deleted file mode 100755 index 979def4e2fab..000000000000 --- a/dev/setup-qat-ubuntu.sh +++ /dev/null @@ -1,59 +0,0 @@ -#!/bin/bash - -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Exit on any error -set -e - -# Define versions and URLs -QAT_VERSION="QAT20.L.1.0.50-00003" -QAT_URL="https://downloadmirror.intel.com/783270/${QAT_VERSION}.tar.gz" -ZSTD_VERSION="zstd-1.5.5" -ZSTD_URL="https://github.com/facebook/zstd/releases/download/v1.5.5/${ZSTD_VERSION}.tar.gz" - -# Install required packages for QAT -sudo apt-get update -sudo apt-get install -y zlib1g-dev libisal-dev libudev-dev udev yasm libboost-all-dev gcc g++ pkg-config linux-headers-$(uname -r) - -# Download and extract QAT driver -sudo rm -rf /opt/QAT20 -sudo mkdir -p /opt/QAT20 -sudo wget -O /opt/QAT20/${QAT_VERSION}.tar.gz ${QAT_URL} -sudo tar -C /opt/QAT20 -zxf /opt/QAT20/${QAT_VERSION}.tar.gz - -# Compile and install QAT driver -cd /opt/QAT20 -sudo ./configure -sudo make -sudo make install - -# Update environment variables for QAT driver -echo "export ICP_ROOT=/opt/QAT20" >> ~/.bashrc - -# Download and extract zstd -sudo wget -O /opt/${ZSTD_VERSION}.tar.gz ${ZSTD_URL} -sudo tar -C /opt -zxf /opt/${ZSTD_VERSION}.tar.gz - -# Compile and install zstd -sudo mkdir -p /opt/${ZSTD_VERSION}/build/cmake/build -cd /opt/${ZSTD_VERSION}/build/cmake/build -sudo cmake -DCMAKE_INSTALL_PREFIX=/usr/local .. -sudo make -j -sudo make install - -echo -e "QAT setup is complete." -echo -e "To apply the changes, please log out and log back in." - diff --git a/docs/Configuration.md b/docs/Configuration.md index 4b1013f8e214..bf2516ba53e7 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -109,8 +109,7 @@ nav_order: 15 | spark.gluten.sql.columnar.shuffle | true | Enable or disable columnar shuffle. | | spark.gluten.sql.columnar.shuffle.celeborn.fallback.enabled | true | If enabled, fall back to ColumnarShuffleManager when celeborn service is unavailable.Otherwise, throw an exception. | | spark.gluten.sql.columnar.shuffle.celeborn.useRssSort | true | If true, use RSS sort implementation for Celeborn sort-based shuffle.If false, use Gluten's row-based sort implementation. Only valid when `spark.celeborn.client.spark.shuffle.writer` is set to `sort`. | -| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=qat,the supported codecs are gzip and zstd. When spark.gluten.sql.columnar.shuffle.codecBackend=iaa,the supported codec is gzip. | -| spark.gluten.sql.columnar.shuffle.codecBackend | <undefined> | +| spark.gluten.sql.columnar.shuffle.codec | <undefined> | By default, the supported codecs are lz4 and zstd.| | spark.gluten.sql.columnar.shuffle.compression.threshold | 100 | If number of rows in a batch falls below this threshold, will copy all buffers into one buffer to compress. | | spark.gluten.sql.columnar.shuffle.compressionMode | buffer | buffer means compress each buffer to pre allocated big buffer,rowvector means to copy the buffers to a big buffer, and then compress the buffer | | spark.gluten.sql.columnar.shuffle.dictionary.enabled | false | Enable dictionary in hash-based shuffle. | diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index 64d35bba1c65..b65db1858de4 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -590,8 +590,3 @@ To enable this feature, you can set the following Spark configuration: This feature has been tested through a series of tests, and we are collecting more feedback from users. If you have memory problem on broadcast build relations, please try this feature and give more feedbacks. **Note**: This feature will become the default behavior once stabilized. Stay tuned for updates! - - -# Accelerators - -Please refer [QAT](VeloxQAT.md) for details diff --git a/docs/get-started/VeloxQAT.md b/docs/get-started/VeloxQAT.md deleted file mode 100644 index 6d33654e59d5..000000000000 --- a/docs/get-started/VeloxQAT.md +++ /dev/null @@ -1,155 +0,0 @@ ---- -layout: page -title: QAT Support in Velox Backend -nav_order: 1 -parent: Getting-Started ---- - -# Intel® QuickAssist Technology (QAT) support - -Gluten supports using Intel® QuickAssist Technology (QAT) for data compression during Spark Shuffle. It benefits from QAT Hardware-based acceleration on compression/decompression, and uses Gzip as compression format for higher compression ratio to reduce the pressure on disks and network transmission. - -This feature is based on QAT driver library and [QATzip](https://github.com/intel/QATzip) library. Please manually download QAT driver for your system, and follow its README to build and install on all Driver and Worker node: [Intel® QuickAssist Technology Driver for Linux* – HW Version 2.0](https://www.intel.com/content/www/us/en/download/765501/intel-quickassist-technology-driver-for-linux-hw-version-2-0.html?wapkw=quickassist). - -## Software Requirements - -- Download QAT driver for your system, and follow its README to build and install on all Driver and Worker nodes: [Intel® QuickAssist Technology Driver for Linux* – HW Version 2.0](https://www.intel.com/content/www/us/en/download/765501/intel-quickassist-technology-driver-for-linux-hw-version-2-0.html?wapkw=quickassist). -- Below compression libraries need to be installed on all Driver and Worker nodes: - - Zlib* library of version 1.2.7 or higher - - ZSTD* library of version 1.5.4 or higher - - LZ4* library - -## Build Gluten with QAT - -1. Setup ICP_ROOT environment variable to the directory where QAT driver is extracted. This environment variable is required during building Gluten and running Spark applications. It's recommended to put it in .bashrc on Driver and Worker nodes. - -```bash -echo "export ICP_ROOT=/path/to/QAT_driver" >> ~/.bashrc -source ~/.bashrc - -# Also set for root if running as non-root user -sudo su - -echo "export ICP_ROOT=/path/to/QAT_driver" >> ~/.bashrc -exit -``` - -2. **This step is required if your application is running as Non-root user.** - The users must be added to the 'qat' group after QAT drvier is installed. And change the amount of max locked memory for the username that is included in the group name. This can be done by specifying the limit in /etc/security/limits.conf. - -```bash -sudo su - -usermod -aG qat username # need relogin to take effect - -# To set 500MB add a line like this in /etc/security/limits.conf -echo "@qat - memlock 500000" >> /etc/security/limits.conf - -exit -``` - -3. Enable huge page. This step is required to execute each time after system reboot. We recommend using systemctl to manage at system startup. You change the values for "max_huge_pages" and "max_huge_pages_per_process" to make sure there are enough resources for your workload. As for Spark applications, one process matches one executor. Within the executor, every task is allocated a maximum of 5 huge pages. - -```bash -sudo su - - -cat << EOF > /usr/local/bin/qat_startup.sh -#!/bin/bash -echo 1024 > /sys/kernel/mm/hugepages/hugepages-2048kB/nr_hugepages -rmmod usdm_drv -insmod $ICP_ROOT/build/usdm_drv.ko max_huge_pages=1024 max_huge_pages_per_process=32 -EOF - -chmod +x /usr/local/bin/qat_startup.sh - -cat << EOF > /etc/systemd/system/qat_startup.service -[Unit] -Description=Configure QAT - -[Service] -ExecStart=/usr/local/bin/qat_startup.sh - -[Install] -WantedBy=multi-user.target -EOF - -systemctl enable qat_startup.service -systemctl start qat_startup.service # setup immediately -systemctl status qat_startup.service - -exit -``` - -4. After the setup, you are now ready to build Gluten with QAT. Use the command below to enable this feature: - -```bash -cd /path/to/gluten - -## The script builds four jars for spark 3.2.2, 3.3.1, 3.4.3 and 3.5.1. -./dev/buildbundle-veloxbe.sh --enable_qat=ON -``` - -## Enable QAT with Gzip/Zstd for shuffle compression - -1. To offload shuffle compression into QAT, first make sure you have the right QAT configuration file at /etc/4xxx_devX.conf. We provide a [example configuration file](../qat/4x16.conf). This configuration sets up to 4 processes that can bind to 1 QAT, and each process can use up to 16 QAT DC instances. - -```bash -## run as root -## Overwrite QAT configuration file. -cd /etc -for i in {0..7}; do echo "4xxx_dev$i.conf"; done | xargs -i cp -f /path/to/gluten/docs/qat/4x16.conf {} -## Restart QAT after updating configuration files. -adf_ctl restart -``` - -2. Check QAT status and make sure the status is up - -```bash -adf_ctl status -``` - -The output should be like: - -``` -Checking status of all devices. -There is 8 QAT acceleration device(s) in the system: - qat_dev0 - type: 4xxx, inst_id: 0, node_id: 0, bsf: 0000:6b:00.0, #accel: 1 #engines: 9 state: up - qat_dev1 - type: 4xxx, inst_id: 1, node_id: 1, bsf: 0000:70:00.0, #accel: 1 #engines: 9 state: up - qat_dev2 - type: 4xxx, inst_id: 2, node_id: 2, bsf: 0000:75:00.0, #accel: 1 #engines: 9 state: up - qat_dev3 - type: 4xxx, inst_id: 3, node_id: 3, bsf: 0000:7a:00.0, #accel: 1 #engines: 9 state: up - qat_dev4 - type: 4xxx, inst_id: 4, node_id: 4, bsf: 0000:e8:00.0, #accel: 1 #engines: 9 state: up - qat_dev5 - type: 4xxx, inst_id: 5, node_id: 5, bsf: 0000:ed:00.0, #accel: 1 #engines: 9 state: up - qat_dev6 - type: 4xxx, inst_id: 6, node_id: 6, bsf: 0000:f2:00.0, #accel: 1 #engines: 9 state: up - qat_dev7 - type: 4xxx, inst_id: 7, node_id: 7, bsf: 0000:f7:00.0, #accel: 1 #engines: 9 state: up -``` - -3. Extra Gluten configurations are required when starting Spark application - -``` ---conf spark.gluten.sql.columnar.shuffle.codec=gzip # Valid options are gzip and zstd ---conf spark.gluten.sql.columnar.shuffle.codecBackend=qat -``` - -4. You can use below command to check whether QAT is working normally at run-time. The value of fw_counters should continue to increase during shuffle. - -``` -while :; do cat /sys/kernel/debug/qat_4xxx_0000:6b:00.0/fw_counters; sleep 1; done -``` - -## QAT driver references - -**Documentation** - -[README Text Files (README_QAT20.L.1.0.0-00021.txt)](https://downloadmirror.intel.com/765523/README_QAT20.L.1.0.0-00021.txt) - -**Release Notes** - -Check out the [Intel® QuickAssist Technology Software for Linux*](https://www.intel.com/content/www/us/en/content-details/632507/intel-quickassist-technology-intel-qat-software-for-linux-release-notes-hardware-version-2-0.html) - Release Notes for the latest changes in this release. - -**Getting Started Guide** - -Check out the [Intel® QuickAssist Technology Software for Linux*](https://www.intel.com/content/www/us/en/content-details/632506/intel-quickassist-technology-intel-qat-software-for-linux-getting-started-guide-hardware-version-2-0.html) - Getting Started Guide for detailed installation instructions. - -**Programmer's Guide** - -Check out the [Intel® QuickAssist Technology Software for Linux*](https://www.intel.com/content/www/us/en/content-details/743912/intel-quickassist-technology-intel-qat-software-for-linux-programmers-guide-hardware-version-2-0.html) - Programmer's Guide for software usage guidelines. - -For more Intel® QuickAssist Technology resources go to [Intel® QuickAssist Technology (Intel® QAT)](https://developer.intel.com/quickassist) diff --git a/docs/get-started/build-guide.md b/docs/get-started/build-guide.md index ffd02ad7ab61..d71940f2703d 100644 --- a/docs/get-started/build-guide.md +++ b/docs/get-started/build-guide.md @@ -15,7 +15,6 @@ Please set them via `--`, e.g. `--build_type=Release`. | build_examples | Build udf example. | OFF | | build_benchmarks | Build gluten cpp benchmarks. | OFF | | enable_jemalloc_stats | Print jemalloc stats for debugging. | OFF | -| enable_qat | Enable QAT for shuffle data de/compression. | OFF | | enable_s3 | Build with S3 support. | OFF | | enable_gcs | Build with GCS support. | OFF | | enable_hdfs | Build with HDFS support. | OFF | diff --git a/docs/qat/4x16.conf b/docs/qat/4x16.conf deleted file mode 100644 index f9a3964da714..000000000000 --- a/docs/qat/4x16.conf +++ /dev/null @@ -1,253 +0,0 @@ -################################################################ -# This file is provided under a dual BSD/GPLv2 license. When using or -# redistributing this file, you may do so under either license. -# -# GPL LICENSE SUMMARY -# -# Copyright(c) 2007-2022 Intel Corporation. All rights reserved. -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of version 2 of the GNU General Public License as -# published by the Free Software Foundation. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA. -# The full GNU General Public License is included in this distribution -# in the file called LICENSE.GPL. -# -# Contact Information: -# Intel Corporation -# -# BSD LICENSE -# -# Copyright(c) 2007-2022 Intel Corporation. All rights reserved. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in -# the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Intel Corporation nor the names of its -# contributors may be used to endorse or promote products derived -# from this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# -# -# -################################################################ -[GENERAL] -ServicesEnabled = dc - -ConfigVersion = 2 - -#Default value for FW Auth loading -FirmwareAuthEnabled = 1 - -#Default values for number of concurrent requests*/ -CyNumConcurrentSymRequests = 512 -CyNumConcurrentAasymRequests = 64 - -#Statistics, valid values: 1,0 -statsGeneral = 1 -statsDh = 1 -statsDrbg = 1 -statsDsa = 1 -statsEcc = 1 -statsKeyGen = 1 -statsDc = 1 -statsLn = 1 -statsPrime = 1 -statsRsa = 1 -statsSym = 1 - -# This flag is to enable SSF features (CNV and BnP) -StorageEnabled = 0 - -# Disable public key crypto and prime number -# services by specifying a value of 1 (default is 0) -PkeServiceDisabled = 0 - -# This flag is to enable device auto reset on heartbeat error -AutoResetOnError = 0 - -# Default value for power management idle interrrupt delay -PmIdleInterruptDelay = 0 - -# This flag is to enable power management idle support -PmIdleSupport = 1 - -# This flag is to enable key protection technology -KptEnabled = 1 - -# Define the maximum SWK count per function can have -# Default value is 1, the maximum value is 128 -KptMaxSWKPerFn = 1 - -# Define the maximum SWK count per pasid can have -# Default value is 1, the maximum value is 128 -KptMaxSWKPerPASID = 1 - -# Define the maximum SWK lifetime in second -# Default value is 0 (eternal of life) -# The maximum value is 31536000 (one year) -KptMaxSWKLifetime = 31536000 - -# Flag to define whether to allow SWK to be shared among processes -# Default value is 0 (shared mode is off) -KptSWKShared = 0 - -############################################## -# Kernel Instances Section -############################################## -[KERNEL] -NumberCyInstances = 0 -NumberDcInstances = 0 - -# Crypto - Kernel instance #0 -Cy0Name = "IPSec0" -Cy0IsPolled = 0 -Cy0CoreAffinity = 0 - -# Data Compression - Kernel instance #0 -Dc0Name = "IPComp0" -Dc0IsPolled = 0 -Dc0CoreAffinity = 0 - -############################################## -# ADI Section for Scalable IOV -############################################## -[SIOV] -NumberAdis = 0 - -############################################## -# User Process Instance Section -############################################## -[SHIM] -NumberCyInstances = 0 -NumberDcInstances = 16 -NumProcesses = 4 -LimitDevAccess = 1 - -# Crypto - User instance #0 -#Cy0Name = "SSL0" -#Cy0IsPolled = 1 -## List of core affinities -#Cy0CoreAffinity = 1 - -# Data Compression - User instance #0 -Dc0Name = "Dc0" -Dc0IsPolled = 1 -# List of core affinities -Dc0CoreAffinity = 1 - -# Data Compression - User instance #1 -Dc1Name = "Dc1" -Dc1IsPolled = 1 -# List of core affinities -Dc1CoreAffinity =2 - -# Data Compression - User instance #2 -Dc2Name = "Dc2" -Dc2IsPolled = 1 -# List of core affinities -Dc2CoreAffinity =3 - -# Data Compression - User instance #3 -Dc3Name = "Dc3" -Dc3IsPolled = 1 -# List of core affinities -Dc3CoreAffinity =4 - -Dc4Name = "Dc4" -Dc4IsPolled = 1 -# List of core affinities -Dc4CoreAffinity = 4 # Core affinity not used for polled instance - -# Data Compression - User instance #5 -Dc5Name = "Dc5" -Dc5IsPolled = 1 -# List of core affinities -Dc5CoreAffinity = 5 # Core affinity not used for polled instance - -# Data Compression - User instance #6 -Dc6Name = "Dc6" -Dc6IsPolled = 1 -# List of core affinities -Dc6CoreAffinity = 6 # Core affinity not used for polled instance - -# Data Compression - User instance #7 -Dc7Name = "Dc7" -Dc7IsPolled = 1 -# List of core affinities -Dc7CoreAffinity = 7 # Core affinity not used for polled instance - -# Data Compression - User instance #8 -Dc8Name = "Dc8" -Dc8IsPolled = 1 -# List of core affinities -Dc8CoreAffinity = 8 # Core affinity not used for polled instance - -# Data Compression - User instance #9 -Dc9Name = "Dc9" -Dc9IsPolled = 1 -# List of core affinities -Dc9CoreAffinity = 9 # Core affinity not used for polled instance - -# Data Compression - User instance #10 -Dc10Name = "Dc10" -Dc10IsPolled = 1 -# List of core affinities -Dc10CoreAffinity = 10 # Core affinity not used for polled instance - -# Data Compression - User instance #11 -Dc11Name = "Dc11" -Dc11IsPolled = 1 -# List of core affinities -Dc11CoreAffinity = 11 # Core affinity not used for polled instance - -# Data Compression - User instance #12 -Dc12Name = "Dc12" -Dc12IsPolled = 1 -# List of core affinities -Dc12CoreAffinity = 12 # Core affinity not used for polled instance - -# Data Compression - User instance #13 -Dc13Name = "Dc13" -Dc13IsPolled = 1 -# List of core affinities -Dc13CoreAffinity = 13 # Core affinity not used for polled instance - -# Data Compression - User instance #14 -Dc14Name = "Dc14" -Dc14IsPolled = 1 -# List of core affinities -Dc14CoreAffinity = 14 # Core affinity not used for polled instance - -# Data Compression - User instance #15 -Dc15Name = "Dc15" -Dc15IsPolled = 1 -# List of core affinities -Dc15CoreAffinity = 15 # Core affinity not used for polled instance diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java index 3269e7b17610..38bf22aeade5 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/LocalPartitionWriterJniWrapper.java @@ -38,7 +38,6 @@ public long rtHandle() { public native long createPartitionWriter( int numPartitions, String codec, - String codecBackend, int compressionLevel, int compressionBufferSize, int compressionThreshold, diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java index 46787d4209d5..66c6f249562f 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/ShuffleReaderJniWrapper.java @@ -38,7 +38,6 @@ public long rtHandle() { public native long make( long cSchema, String compressionType, - String compressionCodecBackend, int batchSize, long readerBufferSize, long deserializerBufferSize, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 07a862e17554..962c28d42de3 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -206,11 +206,6 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) { def columnarShuffleCompressionMode: String = getConf(COLUMNAR_SHUFFLE_COMPRESSION_MODE) - def columnarShuffleCodecBackend: Option[String] = getConf(COLUMNAR_SHUFFLE_CODEC_BACKEND) - - def columnarShuffleEnableQat: Boolean = - columnarShuffleCodecBackend.contains(GlutenConfig.GLUTEN_QAT_BACKEND_NAME) - def columnarShuffleCompressionThreshold: Int = getConf(COLUMNAR_SHUFFLE_COMPRESSION_THRESHOLD) @@ -438,10 +433,6 @@ object GlutenConfig { val SPARK_GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE: String = HADOOP_PREFIX + GCS_PREFIX + AUTH_SERVICE_ACCOUNT_JSON_KEYFILE - // QAT config - val GLUTEN_QAT_BACKEND_NAME = "qat" - val GLUTEN_QAT_SUPPORTED_CODEC: Set[String] = Set("gzip", "zstd") - // Private Spark configs. val SPARK_OVERHEAD_SIZE_KEY = "spark.executor.memoryOverhead" val SPARK_OVERHEAD_FACTOR_KEY = "spark.executor.memoryOverheadFactor" @@ -603,7 +594,6 @@ object GlutenConfig { GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.key, GlutenCoreConfig.NUM_TASK_SLOTS_PER_EXECUTOR.defaultValueString)), (COLUMNAR_SHUFFLE_CODEC.key, ""), - (COLUMNAR_SHUFFLE_CODEC_BACKEND.key, ""), (DEBUG_CUDF.key, DEBUG_CUDF.defaultValueString), ("spark.hadoop.input.connect.timeout", "180000"), ("spark.hadoop.input.read.timeout", "180000"), @@ -1027,17 +1017,7 @@ object GlutenConfig { val COLUMNAR_SHUFFLE_CODEC = buildConf("spark.gluten.sql.columnar.shuffle.codec") .internal() - .doc( - "By default, the supported codecs are lz4 and zstd. " + - "When spark.gluten.sql.columnar.shuffle.codecBackend=qat," + - "the supported codecs are gzip and zstd.") - .stringConf - .transform(_.toLowerCase(Locale.ROOT)) - .createOptional - - val COLUMNAR_SHUFFLE_CODEC_BACKEND = - buildConf("spark.gluten.sql.columnar.shuffle.codecBackend") - .internal() + .doc("Supported codecs are lz4 and zstd.") .stringConf .transform(_.toLowerCase(Locale.ROOT)) .createOptional diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala index ec67b07207ad..61d6f1247fff 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala @@ -58,14 +58,10 @@ object GlutenShuffleUtils { glutenConfig.columnarShuffleCodec match { case Some(codec) => val glutenCodecKey = GlutenConfig.COLUMNAR_SHUFFLE_CODEC.key - if (glutenConfig.columnarShuffleEnableQat) { - checkCodecValues(glutenCodecKey, codec, GlutenConfig.GLUTEN_QAT_SUPPORTED_CODEC) - } else { - checkCodecValues( - glutenCodecKey, - codec, - BackendsApiManager.getSettings.shuffleSupportedCodec()) - } + checkCodecValues( + glutenCodecKey, + codec, + BackendsApiManager.getSettings.shuffleSupportedCodec()) codec case None => val sparkCodecKey = IO_COMPRESSION_CODEC.key diff --git a/tools/workload/benchmark_velox/params.yaml.template b/tools/workload/benchmark_velox/params.yaml.template index ce92bdeacafd..799a66de1254 100644 --- a/tools/workload/benchmark_velox/params.yaml.template +++ b/tools/workload/benchmark_velox/params.yaml.template @@ -52,8 +52,6 @@ gluten_offheap_ratio: 7.0 spark_codec: lz4 # spark.gluten.sql.columnar.shuffle.codec gluten_codec: lz4 -# spark.gluten.sql.columnar.shuffle.codecBackend -gluten_codec_backend: '' # spark.gluten.sql.columnar.maxBatchSize max_batch_size: 4096 # spark.app.name, empty to use default name.