From dbebc5d77a4bdcb4ae550f11605144b0adce86d1 Mon Sep 17 00:00:00 2001 From: Becker Ewing Date: Mon, 15 Dec 2025 17:17:26 -0500 Subject: [PATCH 1/4] Add vectorized read support for parquet RLE encoded data pages - Split out from https://github.com/apache/iceberg/pull/14800 --- .../parquet/VectorizedPageIterator.java | 9 ++ ...edRunLengthEncodedParquetValuesReader.java | 111 ++++++++++++++++++ .../PLAIN/boolean_with_nulls.parquet | Bin 0 -> 618 bytes .../resources/encodings/RLE/boolean.parquet | Bin 0 -> 521 bytes .../encodings/RLE/boolean_with_nulls.parquet | Bin 0 -> 623 bytes .../parquet/TestParquetVectorizedReads.java | 18 ++- .../parquet/TestParquetVectorizedReads.java | 18 ++- 7 files changed, 146 insertions(+), 10 deletions(-) create mode 100644 arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedRunLengthEncodedParquetValuesReader.java create mode 100644 parquet/src/testFixtures/resources/encodings/PLAIN/boolean_with_nulls.parquet create mode 100644 parquet/src/testFixtures/resources/encodings/RLE/boolean.parquet create mode 100644 parquet/src/testFixtures/resources/encodings/RLE/boolean_with_nulls.parquet diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java index be1a3324ae43..0dffe9fb6499 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java @@ -34,6 +34,7 @@ import org.apache.parquet.column.values.RequiresPreviousReader; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.PrimitiveType; public class VectorizedPageIterator extends BasePageIterator { private final boolean setArrowValidityVector; @@ -100,6 +101,14 @@ protected void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, i case DELTA_BINARY_PACKED: valuesReader = new VectorizedDeltaEncodedValuesReader(); break; + case RLE: + if (desc.getPrimitiveType().getPrimitiveTypeName() + == PrimitiveType.PrimitiveTypeName.BOOLEAN) { + valuesReader = + new VectorizedRunLengthEncodedParquetValuesReader(setArrowValidityVector); + break; + } + // fall through default: throw new UnsupportedOperationException( "Cannot support vectorized reads for column " diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedRunLengthEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedRunLengthEncodedParquetValuesReader.java new file mode 100644 index 000000000000..f30701c1db0d --- /dev/null +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedRunLengthEncodedParquetValuesReader.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.arrow.vectorized.parquet; + +import org.apache.arrow.vector.FieldVector; +import org.apache.parquet.io.api.Binary; + +/** + * A {@link VectorizedValuesReader} implementation for the encoding type Run Length Encoding / RLE. + * + * @see + * Parquet format encodings: RLE + */ +public class VectorizedRunLengthEncodedParquetValuesReader extends BaseVectorizedParquetValuesReader + implements VectorizedValuesReader { + + // Since we can only read booleans, bit-width is always 1 + private static final int BOOLEAN_BIT_WIDTH = 1; + // Since this can only be used in the context of a data page, the definition level can be set to + // anything, and it doesn't really matter + private static final int IRRELEVANT_MAX_DEFINITION_LEVEL = 1; + // For boolean values in data page v1 & v2, length is always prepended to the encoded data + // See + // https://parquet.apache.org/docs/file-format/data-pages/encodings/#run-length-encoding--bit-packing-hybrid-rle--3 + private static final boolean ALWAYS_READ_LENGTH = true; + + public VectorizedRunLengthEncodedParquetValuesReader(boolean setArrowValidityVector) { + super( + BOOLEAN_BIT_WIDTH, + IRRELEVANT_MAX_DEFINITION_LEVEL, + ALWAYS_READ_LENGTH, + setArrowValidityVector); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public byte readByte() { + throw new UnsupportedOperationException("readByte is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public short readShort() { + throw new UnsupportedOperationException("readShort is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public long readLong() { + throw new UnsupportedOperationException("readLong is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public float readFloat() { + throw new UnsupportedOperationException("readFloat is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public double readDouble() { + throw new UnsupportedOperationException("readDouble is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public Binary readBinary(int len) { + throw new UnsupportedOperationException("readBinary is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public void readIntegers(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readIntegers is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public void readLongs(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readLongs is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public void readFloats(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readFloats is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public void readDoubles(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readDoubles is not supported"); + } +} diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/boolean_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/boolean_with_nulls.parquet new file mode 100644 index 0000000000000000000000000000000000000000..4678e31da17910d23a6f48f5afd6ae12d4c8a23e GIT binary patch literal 618 zcmWG=3^EjD5Ix5XL^`4u_+LY5C|q69laeL%N?E49o-xq(;c1D!Jfgr^Y;OY)m=>!%BGa=$2tDUkVBZ4wue4rMvJcEQnL1Iy1X=;gX nazTM^Vo_0kxk6cLQE_H|o`R8)o`If$q>K#Ep81~yAKubTD zV9buv^SxVM)TXV^nviku@@L)!_4>2DPb-U0>6s?~{Nl@&Z3mxp%|1QH#C7}D<{O{2 zy!^dh1*^?9OfzZS{iRgmr7`E$e5PPg7D*jb1_`#}+LY5C|q69laeL%N?E49o-xq(;c1D!Jfgr^Y;OY)m=>!%BGa=$2tDUkVBZ4wu ze4rMvJcEQnL1Iy1X=;gXazTM^Vo_0kxk6cLQE_H|o`R8)o`If$q>K#EpT3L?3<1DI F006r+la~Mh literal 0 HcmV?d00001 diff --git a/parquet/src/testFixtures/resources/encodings/RLE/boolean_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/RLE/boolean_with_nulls.parquet new file mode 100644 index 0000000000000000000000000000000000000000..6eed9b602c68041b8070fa904a6930c5c7ae1ac3 GIT binary patch literal 623 zcmWG=3^EjD5WU3=L^`4u_(j=3P)350QG$^{Yyz_em?0?0z`#%s!uP7*@BL0bQA>&D zz2DP7PT0Lkv!SVdg<9_&uiIsFb*~h*E;6$8N{zX2eEX$cuj5v;=|nXj*V$3Z=w-vo z(S7%vTHEw>qD|b_D;fXlz1`X8EvJ=O^RB}8%Hd5Lb!K{1+-?7}PSJ0w?W9>9K9bw3 z`c{~`d}9t1Ws%e|WsqPiPR>ZpO%!Ef5M@whl3++V*A~g;h%m9=Gss@HF6N8N=o21$V<^w!H&w$il;8)`S(hQO^5 GOLDEN_FILE_ENCODINGS = - ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED"); + ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED", "RLE"); private static final Map GOLDEN_FILE_TYPES = ImmutableMap.of( "string", Types.StringType.get(), @@ -404,13 +404,15 @@ public void testSupportedReadsForParquetV2() throws Exception { // Float and double column types are written using plain encoding with Parquet V2, // also Parquet V2 will dictionary encode decimals that use fixed length binary // (i.e. decimals > 8 bytes). Int and long types use DELTA_BINARY_PACKED. + // Boolean types use RLE. Schema schema = new Schema( optional(102, "float_data", Types.FloatType.get()), optional(103, "double_data", Types.DoubleType.get()), optional(104, "decimal_data", Types.DecimalType.of(25, 5)), optional(105, "int_data", Types.IntegerType.get()), - optional(106, "long_data", Types.LongType.get())); + optional(106, "long_data", Types.LongType.get()), + optional(107, "boolean_data", Types.BooleanType.get())); File dataFile = File.createTempFile("junit", null, temp.toFile()); assertThat(dataFile.delete()).as("Delete should succeed").isTrue(); @@ -504,10 +506,16 @@ static Stream goldenFilesAndEncodings() { .flatMap( e -> Stream.of(true, false) - .map( + .flatMap( vectorized -> - Arguments.of( - encoding, e.getKey(), e.getValue(), vectorized)))); + Stream.of( + Arguments.of( + encoding, e.getKey(), e.getValue(), vectorized), + Arguments.of( + encoding, + e.getKey() + "_with_nulls", + e.getValue(), + vectorized))))); } private File resourceUrlToLocalFile(URL url) throws IOException, URISyntaxException { diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java index 46a6a302e1c4..188c357c4f1f 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java @@ -81,7 +81,7 @@ public class TestParquetVectorizedReads extends AvroDataTestBase { private static final String PLAIN = "PLAIN"; private static final List GOLDEN_FILE_ENCODINGS = - ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED"); + ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED", "RLE"); private static final Map GOLDEN_FILE_TYPES = ImmutableMap.of( "string", Types.StringType.get(), @@ -404,13 +404,15 @@ public void testSupportedReadsForParquetV2() throws Exception { // Float and double column types are written using plain encoding with Parquet V2, // also Parquet V2 will dictionary encode decimals that use fixed length binary // (i.e. decimals > 8 bytes). Int and long types use DELTA_BINARY_PACKED. + // Boolean types use RLE. Schema schema = new Schema( optional(102, "float_data", Types.FloatType.get()), optional(103, "double_data", Types.DoubleType.get()), optional(104, "decimal_data", Types.DecimalType.of(25, 5)), optional(105, "int_data", Types.IntegerType.get()), - optional(106, "long_data", Types.LongType.get())); + optional(106, "long_data", Types.LongType.get()), + optional(107, "boolean_data", Types.BooleanType.get())); File dataFile = File.createTempFile("junit", null, temp.toFile()); assertThat(dataFile.delete()).as("Delete should succeed").isTrue(); @@ -490,10 +492,16 @@ static Stream goldenFilesAndEncodings() { .flatMap( e -> Stream.of(true, false) - .map( + .flatMap( vectorized -> - Arguments.of( - encoding, e.getKey(), e.getValue(), vectorized)))); + Stream.of( + Arguments.of( + encoding, e.getKey(), e.getValue(), vectorized), + Arguments.of( + encoding, + e.getKey() + "_with_nulls", + e.getValue(), + vectorized))))); } private File resourceUrlToLocalFile(URL url) throws IOException, URISyntaxException { From 51cbe97ea618d2935470d39a53e077c5bdf307a2 Mon Sep 17 00:00:00 2001 From: Becker Ewing Date: Mon, 15 Dec 2025 19:38:21 -0500 Subject: [PATCH 2/4] Fix dictionary-encoded vectorized read tests - Now that RLE boolean data page reads are vectorized, vectorized reads over parquet v2 written files with dictionary encoded columns should work completely --- .../TestParquetDictionaryEncodedVectorizedReads.java | 7 +++++++ .../parquet/vectorized/TestParquetVectorizedReads.java | 8 ++++++-- .../TestParquetDictionaryEncodedVectorizedReads.java | 6 ++++++ .../TestParquetDictionaryEncodedVectorizedReads.java | 6 ++++++ 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java index 98c25268cb45..00f334e8f9fa 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java @@ -55,6 +55,13 @@ public void testVectorizedReadsWithNewContainers() throws IOException { // Disabled since this code path is already tested in TestParquetVectorizedReads } + @Test + @Override + public void testUnsupportedReadsForParquetV2() throws Exception { + // Disabled since vectorized reads are supported for parquet v2 written files over + // dictionary-encoded files + } + @Test public void testMixedDictionaryNonDictionaryReads() throws IOException { Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields()); diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java index 6bff9010eb4f..362b65ec9116 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java @@ -295,12 +295,16 @@ public void testReadsForTypePromotedColumns() throws Exception { public void testSupportedReadsForParquetV2() throws Exception { // Float and double column types are written using plain encoding with Parquet V2, // also Parquet V2 will dictionary encode decimals that use fixed length binary - // (i.e. decimals > 8 bytes) + // (i.e. decimals > 8 bytes). Int and long types use DELTA_BINARY_PACKED. + // Boolean types use RLE. Schema schema = new Schema( optional(102, "float_data", Types.FloatType.get()), optional(103, "double_data", Types.DoubleType.get()), - optional(104, "decimal_data", Types.DecimalType.of(25, 5))); + optional(104, "decimal_data", Types.DecimalType.of(25, 5)), + optional(105, "int_data", Types.IntegerType.get()), + optional(106, "long_data", Types.LongType.get()), + optional(107, "boolean_data", Types.BooleanType.get())); OutputFile outputFile = new InMemoryOutputFile(); Iterable data = diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java index 284fa0b0552f..bb0eff70d096 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java @@ -94,6 +94,12 @@ Iterable generateData( @Disabled // Ignored since this code path is already tested in TestParquetVectorizedReads public void testVectorizedReadsWithNewContainers() throws IOException {} + @Test + @Override + @Disabled // Ignored since vectorized reads are supported for parquet v2 written files over + // dictionary-encoded files + public void testUnsupportedReadsForParquetV2() throws IOException {} + @Test public void testMixedDictionaryNonDictionaryReads() throws IOException { Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields()); diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java index 284fa0b0552f..bf965fda667e 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetDictionaryEncodedVectorizedReads.java @@ -196,4 +196,10 @@ public void testDecimalNotAllPagesDictionaryEncoded() throws Exception { } }); } + + @Test + @Override + @Disabled // Ignored since vectorized reads are supported for parquet v2 written files over + // dictionary-encoded files + public void testUnsupportedReadsForParquetV2() throws IOException {} } From dda0185ce0d3109d5faa5f8645f0a2f98df5ac27 Mon Sep 17 00:00:00 2001 From: Becker Ewing Date: Tue, 20 Jan 2026 21:57:24 -0500 Subject: [PATCH 3/4] Add test to verify fall through for reading non-boolean rle encoded data pages - https://github.com/apache/iceberg/pull/14853#discussion_r2699781090 --- .../TestParquetVectorizedReads.java | 34 +++++++++++++++++++ .../parquet/TestParquetVectorizedReads.java | 34 +++++++++++++++++++ .../parquet/TestParquetVectorizedReads.java | 34 +++++++++++++++++++ 3 files changed, 102 insertions(+) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java index 362b65ec9116..ed55c6577609 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java @@ -20,15 +20,18 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Iterator; import java.util.function.Consumer; import org.apache.arrow.memory.BufferAllocator; import org.apache.avro.generic.GenericData; +import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.ArrowAllocation; import org.apache.iceberg.inmemory.InMemoryOutputFile; @@ -48,9 +51,13 @@ import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import org.apache.spark.sql.vectorized.ColumnarBatch; import org.junit.jupiter.api.Test; @@ -335,6 +342,33 @@ public void testUnsupportedReadsForParquetV2() throws Exception { .hasMessageEndingWith("Disable vectorized reads to read this table/file"); } + @Test + public void testRLEEncodingOnlySupportsBooleanDataPage() { + MessageType schema = + new MessageType( + "test", + primitive(PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).id(1).named("int_col")); + ColumnDescriptor intColumnDesc = schema.getColumnDescription(new String[] {"int_col"}); + ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.allocate(0)); + + String expectedMessage = + "Cannot support vectorized reads for column " + + intColumnDesc + + " with encoding " + + Encoding.RLE + + ". Disable vectorized reads to read this table/file"; + + assertThatThrownBy( + () -> + new VectorizedPageIterator(intColumnDesc, "parquet-mr", false) { + { + initDataReader(Encoding.RLE, stream, 0); + } + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage(expectedMessage); + } + @Test public void testUuidReads() throws Exception { // Just one row to maintain dictionary encoding diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java index 6425fc4d0f60..de50cf12905f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java @@ -21,6 +21,7 @@ import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; @@ -30,6 +31,7 @@ import java.io.InputStream; import java.net.URISyntaxException; import java.net.URL; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Iterator; @@ -38,6 +40,7 @@ import java.util.function.Consumer; import java.util.stream.Stream; import org.apache.arrow.memory.BufferAllocator; +import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.ArrowAllocation; @@ -64,9 +67,13 @@ import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -441,6 +448,33 @@ public void testUnsupportedReadsForParquetV2() throws Exception { .hasMessageEndingWith("Disable vectorized reads to read this table/file"); } + @Test + public void testRLEEncodingOnlySupportsBooleanDataPage() { + MessageType schema = + new MessageType( + "test", + primitive(PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).id(1).named("int_col")); + ColumnDescriptor intColumnDesc = schema.getColumnDescription(new String[] {"int_col"}); + ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.allocate(0)); + + String expectedMessage = + "Cannot support vectorized reads for column " + + intColumnDesc + + " with encoding " + + Encoding.RLE + + ". Disable vectorized reads to read this table/file"; + + assertThatThrownBy( + () -> + new VectorizedPageIterator(intColumnDesc, "parquet-mr", false) { + { + initDataReader(Encoding.RLE, stream, 0); + } + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage(expectedMessage); + } + @Test public void testUuidReads() throws Exception { // Just one row to maintain dictionary encoding diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java index 188c357c4f1f..1a9587b452e9 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java @@ -21,6 +21,7 @@ import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; @@ -30,6 +31,7 @@ import java.io.InputStream; import java.net.URISyntaxException; import java.net.URL; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Iterator; @@ -41,6 +43,7 @@ import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.ArrowAllocation; +import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; @@ -64,9 +67,13 @@ import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -441,6 +448,33 @@ public void testUnsupportedReadsForParquetV2() throws Exception { .hasMessageEndingWith("Disable vectorized reads to read this table/file"); } + @Test + public void testRLEEncodingOnlySupportsBooleanDataPage() { + MessageType schema = + new MessageType( + "test", + primitive(PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).id(1).named("int_col")); + ColumnDescriptor intColumnDesc = schema.getColumnDescription(new String[] {"int_col"}); + ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.allocate(0)); + + String expectedMessage = + "Cannot support vectorized reads for column " + + intColumnDesc + + " with encoding " + + Encoding.RLE + + ". Disable vectorized reads to read this table/file"; + + assertThatThrownBy( + () -> + new VectorizedPageIterator(intColumnDesc, "parquet-mr", false) { + { + initDataReader(Encoding.RLE, stream, 0); + } + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage(expectedMessage); + } + @Test public void testUuidReads() throws Exception { // Just one row to maintain dictionary encoding From 41286c494d6875f0ea8475dc21c3177cf03deacb Mon Sep 17 00:00:00 2001 From: Becker Ewing Date: Tue, 20 Jan 2026 22:13:44 -0500 Subject: [PATCH 4/4] Run spotless for Spark 3.5 & 3.4 --- .../data/parquet/vectorized/TestParquetVectorizedReads.java | 2 +- .../data/vectorized/parquet/TestParquetVectorizedReads.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java index ed55c6577609..aaf6c6e15f8b 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java @@ -31,9 +31,9 @@ import java.util.function.Consumer; import org.apache.arrow.memory.BufferAllocator; import org.apache.avro.generic.GenericData; -import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.ArrowAllocation; +import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator; import org.apache.iceberg.inmemory.InMemoryOutputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java index de50cf12905f..e9fe199abd3b 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java @@ -40,10 +40,10 @@ import java.util.function.Consumer; import java.util.stream.Stream; import org.apache.arrow.memory.BufferAllocator; -import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.ArrowAllocation; +import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders;