diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 0698c7242673..80840be629e0 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -91,7 +91,7 @@ object VeloxBackend { } object VeloxBackendSettings extends BackendSettingsApi { - val SHUFFLE_SUPPORTED_CODEC = Set("lz4", "zstd") + val SHUFFLE_SUPPORTED_CODEC = Set("lz4", "zstd", "snappy") val GLUTEN_VELOX_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + ".udfLibraryPaths" val GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + ".driver.udfLibraryPaths" val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + ".internal.udfLibraryPaths" diff --git a/cpp/core/utils/Compression.cc b/cpp/core/utils/Compression.cc index 236990106b6f..832e44214048 100644 --- a/cpp/core/utils/Compression.cc +++ b/cpp/core/utils/Compression.cc @@ -29,9 +29,15 @@ std::unique_ptr createCompressionCodec(arrow::Compression::type compressedType, CodecBackend codecBackend, int32_t compressionLevel) { std::unique_ptr codec; switch (compressedType) { + case arrow::Compression::UNCOMPRESSED: { + return nullptr; + } case arrow::Compression::LZ4_FRAME: { GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(compressedType)); } break; + case arrow::Compression::SNAPPY: { + GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(compressedType)); + } break; case arrow::Compression::ZSTD: { if (codecBackend == CodecBackend::NONE) { GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(compressedType, compressionLevel)); diff --git a/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc b/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc index 364d31e180cc..4e2220faaeb3 100644 --- a/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc @@ -137,7 +137,10 @@ std::vector getTestParams() { } const std::vector compressions = { - arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME, arrow::Compression::ZSTD}; + arrow::Compression::UNCOMPRESSED, + arrow::Compression::LZ4_FRAME, + arrow::Compression::ZSTD, + arrow::Compression::SNAPPY}; const std::vector compressionThresholds = {-1, 0, 3, 4, 10, 4096}; const std::vector mergeBufferSizes = {0, 3, 4, 10, 4096}; diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index e79c8de1de1d..e5162dda5d4d 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -99,7 +99,10 @@ std::vector getTestParams() { } const std::vector compressions = { - arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME, arrow::Compression::ZSTD}; + arrow::Compression::UNCOMPRESSED, + arrow::Compression::LZ4_FRAME, + arrow::Compression::ZSTD, + arrow::Compression::SNAPPY}; const std::vector compressionThresholds = {-1, 0, 3, 4, 10, 4096}; const std::vector mergeBufferSizes = {0, 3, 4, 10, 4096}; diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala index 80b0e94830c9..f48c3be0b473 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala @@ -105,6 +105,8 @@ object GlutenShuffleUtils { checkAndGetBufferSize(IO_COMPRESSION_LZ4_BLOCKSIZE) } else if ("zstd" == codec) { checkAndGetBufferSize(IO_COMPRESSION_ZSTD_BUFFERSIZE) + } else if ("snappy" == codec) { + checkAndGetBufferSize(IO_COMPRESSION_SNAPPY_BLOCKSIZE) } else if ("gzip" == codec) { // QAT supports it only. // Temporarily hard-coded to 32k. 32 * 1024