From 2f6c02f2f7df81b040c21f79495cfbc841e13c5c Mon Sep 17 00:00:00 2001 From: "liyang.127" Date: Tue, 20 Jan 2026 19:42:01 +0800 Subject: [PATCH 1/6] add snappy as gluten columnar shuffle compression codec --- .../org/apache/gluten/backendsapi/velox/VeloxBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 0698c7242673..80840be629e0 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -91,7 +91,7 @@ object VeloxBackend { } object VeloxBackendSettings extends BackendSettingsApi { - val SHUFFLE_SUPPORTED_CODEC = Set("lz4", "zstd") + val SHUFFLE_SUPPORTED_CODEC = Set("lz4", "zstd", "snappy") val GLUTEN_VELOX_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + ".udfLibraryPaths" val GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + ".driver.udfLibraryPaths" val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + ".internal.udfLibraryPaths" From e667b97c4b06fce15b972fe3011185513d224f97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=89=AC?= <654010905@qq.com> Date: Sat, 24 Jan 2026 19:09:44 +0800 Subject: [PATCH 2/6] [Velox] Add snappy support for columnar shuffle codec --- cpp/core/utils/Compression.cc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cpp/core/utils/Compression.cc b/cpp/core/utils/Compression.cc index 236990106b6f..832e44214048 100644 --- a/cpp/core/utils/Compression.cc +++ b/cpp/core/utils/Compression.cc @@ -29,9 +29,15 @@ std::unique_ptr createCompressionCodec(arrow::Compression::type compressedType, CodecBackend codecBackend, int32_t compressionLevel) { std::unique_ptr codec; switch (compressedType) { + case arrow::Compression::UNCOMPRESSED: { + return nullptr; + } case arrow::Compression::LZ4_FRAME: { GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(compressedType)); } break; + case arrow::Compression::SNAPPY: { + GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(compressedType)); + } break; case arrow::Compression::ZSTD: { if (codecBackend == CodecBackend::NONE) { GLUTEN_ASSIGN_OR_THROW(codec, arrow::util::Codec::Create(compressedType, compressionLevel)); From c0f7d2c0f05b38de64e2731a15bd18fcae05194c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=89=AC?= <654010905@qq.com> Date: Sat, 24 Jan 2026 19:09:48 +0800 Subject: [PATCH 3/6] [Velox] Add snappy support for columnar shuffle codec --- cpp/velox/tests/VeloxShuffleWriterTest.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc index e79c8de1de1d..e5162dda5d4d 100644 --- a/cpp/velox/tests/VeloxShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc @@ -99,7 +99,10 @@ std::vector getTestParams() { } const std::vector compressions = { - arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME, arrow::Compression::ZSTD}; + arrow::Compression::UNCOMPRESSED, + arrow::Compression::LZ4_FRAME, + arrow::Compression::ZSTD, + arrow::Compression::SNAPPY}; const std::vector compressionThresholds = {-1, 0, 3, 4, 10, 4096}; const std::vector mergeBufferSizes = {0, 3, 4, 10, 4096}; From f00b4956524dc236def0c99a0a14980209b56256 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=89=AC?= <654010905@qq.com> Date: Sat, 24 Jan 2026 19:09:50 +0800 Subject: [PATCH 4/6] [Velox] Add snappy support for columnar shuffle codec --- cpp/velox/tests/VeloxGpuShuffleWriterTest.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc b/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc index 364d31e180cc..4e2220faaeb3 100644 --- a/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc +++ b/cpp/velox/tests/VeloxGpuShuffleWriterTest.cc @@ -137,7 +137,10 @@ std::vector getTestParams() { } const std::vector compressions = { - arrow::Compression::UNCOMPRESSED, arrow::Compression::LZ4_FRAME, arrow::Compression::ZSTD}; + arrow::Compression::UNCOMPRESSED, + arrow::Compression::LZ4_FRAME, + arrow::Compression::ZSTD, + arrow::Compression::SNAPPY}; const std::vector compressionThresholds = {-1, 0, 3, 4, 10, 4096}; const std::vector mergeBufferSizes = {0, 3, 4, 10, 4096}; From f1f8bf23c0976ea4f56c7efcfe4439f53782b1b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E6=89=AC?= <654010905@qq.com> Date: Sat, 24 Jan 2026 19:09:54 +0800 Subject: [PATCH 5/6] [Velox] Add snappy support for columnar shuffle codec --- .../scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala index 80b0e94830c9..21e6017766ed 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala @@ -105,6 +105,8 @@ object GlutenShuffleUtils { checkAndGetBufferSize(IO_COMPRESSION_LZ4_BLOCKSIZE) } else if ("zstd" == codec) { checkAndGetBufferSize(IO_COMPRESSION_ZSTD_BUFFERSIZE) + } else if ("snappy" == codec) { + 32 * 1024 } else if ("gzip" == codec) { // QAT supports it only. // Temporarily hard-coded to 32k. 32 * 1024 From 0213a5265ee87d6ec874cddf0f6b2983c4c90e68 Mon Sep 17 00:00:00 2001 From: "liyang.127" Date: Sat, 24 Jan 2026 20:16:41 +0800 Subject: [PATCH 6/6] [Velox] Add snappy support for columnar shuffle codec: implement SNAPPY in Compression.cc, enable SNAPPY in velox shuffle tests, allow snappy in VeloxBackendSettings, and set snappy buffer size (32k). Co-Authored-By: Aime Change-Id: Ie25a5bd1e4e3f978e4ce386175bb1eb67d841e3d --- .../scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala index 21e6017766ed..f48c3be0b473 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala @@ -106,7 +106,7 @@ object GlutenShuffleUtils { } else if ("zstd" == codec) { checkAndGetBufferSize(IO_COMPRESSION_ZSTD_BUFFERSIZE) } else if ("snappy" == codec) { - 32 * 1024 + checkAndGetBufferSize(IO_COMPRESSION_SNAPPY_BLOCKSIZE) } else if ("gzip" == codec) { // QAT supports it only. // Temporarily hard-coded to 32k. 32 * 1024