VectorizedPageIterator.java
@@ -34,6 +34,7 @@
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.PrimitiveType;

public class VectorizedPageIterator extends BasePageIterator {
private final boolean setArrowValidityVector;
@@ -100,6 +101,14 @@ protected void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, i
case DELTA_BINARY_PACKED:
valuesReader = new VectorizedDeltaEncodedValuesReader();
break;
case RLE:
if (desc.getPrimitiveType().getPrimitiveTypeName()
== PrimitiveType.PrimitiveTypeName.BOOLEAN) {
valuesReader =
new VectorizedRunLengthEncodedParquetValuesReader(setArrowValidityVector);
break;
}
// fall through
Contributor:

This is quite a serious fall-through here, given that the Parquet spec limits RLE to booleans, repetition and definition levels, and dictionary indices. Is it likely to occur in the wild?

If so, it probably merits a test case checking that if you create a reader over a column whose type != BOOLEAN, you can't init() it with the RLE data encoding.
Contributor (Author):

> given that the Parquet spec limits RLE to booleans, repetition and definition levels, and dictionary indices. Is it likely to occur in the wild?

Yeah, theoretically a malformed Parquet writer could produce this in the wild. That said, it wouldn't be to spec, since boolean is the only data page type that can be RLE encoded; dictionary RLE is handled up in VectorizedDictionaryEncodedParquetValuesReader (directly above here), and repetition levels are handled via VectorizedParquetDefinitionLevelReader.

All to say, I think this is impossible in practice. If a malformed writer does in fact write a file with a non-boolean RLE data page, it wouldn't be to spec, so we'd be correctly throwing here. I can add a negative test case for this, although I'd have to make a corrupt Parquet writer implementation to do so. Happy to do that if you think it adds value.

Also FWIW, the full Parquet v2 vectorized impl PR (that this PR was split out from) has quite a few production PBs read under its belt at this point and hasn't hit anything like this in the wild.
Contributor:

Well, nothing is going to handle it, so it's not much of a source of data, is it? The key thing is not to cause damage to the system beyond the specific query failing.
Contributor (Author):

Done in dda0185.
default:
throw new UnsupportedOperationException(
"Cannot support vectorized reads for column "
VectorizedRunLengthEncodedParquetValuesReader.java (new file)
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.arrow.vectorized.parquet;

import org.apache.arrow.vector.FieldVector;
import org.apache.parquet.io.api.Binary;

/**
* A {@link VectorizedValuesReader} implementation for the encoding type Run Length Encoding / RLE.
*
* @see <a
* href="https://parquet.apache.org/docs/file-format/data-pages/encodings/#run-length-encoding--bit-packing-hybrid-rle--3">
* Parquet format encodings: RLE</a>
*/
public class VectorizedRunLengthEncodedParquetValuesReader extends BaseVectorizedParquetValuesReader
implements VectorizedValuesReader {

// Since we can only read booleans, bit-width is always 1
private static final int BOOLEAN_BIT_WIDTH = 1;
// Since this can only be used in the context of a data page, the definition level can be set to
// anything, and it doesn't really matter
private static final int IRRELEVANT_MAX_DEFINITION_LEVEL = 1;
// For boolean values in data page v1 & v2, length is always prepended to the encoded data
// See
// https://parquet.apache.org/docs/file-format/data-pages/encodings/#run-length-encoding--bit-packing-hybrid-rle--3
private static final boolean ALWAYS_READ_LENGTH = true;

public VectorizedRunLengthEncodedParquetValuesReader(boolean setArrowValidityVector) {
super(
BOOLEAN_BIT_WIDTH,
IRRELEVANT_MAX_DEFINITION_LEVEL,
ALWAYS_READ_LENGTH,
setArrowValidityVector);
}

/** RLE only supports BOOLEAN as a data page encoding */
@Override
public byte readByte() {
throw new UnsupportedOperationException("readByte is not supported");
}

/** RLE only supports BOOLEAN as a data page encoding */
@Override
public short readShort() {
throw new UnsupportedOperationException("readShort is not supported");
}

/** RLE only supports BOOLEAN as a data page encoding */
@Override
public long readLong() {
throw new UnsupportedOperationException("readLong is not supported");
}

/** RLE only supports BOOLEAN as a data page encoding */
@Override
public float readFloat() {
throw new UnsupportedOperationException("readFloat is not supported");
}

/** RLE only supports BOOLEAN as a data page encoding */
@Override
public double readDouble() {
throw new UnsupportedOperationException("readDouble is not supported");
}

/** RLE only supports BOOLEAN as a data page encoding */
@Override
public Binary readBinary(int len) {
throw new UnsupportedOperationException("readBinary is not supported");
}

/** RLE only supports BOOLEAN as a data page encoding */
@Override
public void readIntegers(int total, FieldVector vec, int rowId) {
throw new UnsupportedOperationException("readIntegers is not supported");
}

/** RLE only supports BOOLEAN as a data page encoding */
@Override
public void readLongs(int total, FieldVector vec, int rowId) {
throw new UnsupportedOperationException("readLongs is not supported");
}

/** RLE only supports BOOLEAN as a data page encoding */
@Override
public void readFloats(int total, FieldVector vec, int rowId) {
throw new UnsupportedOperationException("readFloats is not supported");
}

/** RLE only supports BOOLEAN as a data page encoding */
@Override
public void readDoubles(int total, FieldVector vec, int rowId) {
throw new UnsupportedOperationException("readDoubles is not supported");
}
}
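
As background on what the base class decodes here: the spec link in the javadoc above defines the RLE / bit-packing hybrid layout. The following is a minimal, self-contained sketch of decoding a bit-width-1 (boolean) page under that layout, including the 4-byte little-endian length prefix that ALWAYS_READ_LENGTH refers to. It illustrates the wire format only; the real decoding lives in BaseVectorizedParquetValuesReader, and the class name below is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative decoder for Parquet's RLE / bit-packing hybrid at bit-width 1.
// Layout: <length (4 bytes, little-endian)> then runs, each headed by a ULEB128 varint:
//   header & 1 == 0 -> RLE run: (header >>> 1) repeats of one value byte
//   header & 1 == 1 -> bit-packed run: (header >>> 1) groups of 8 values, LSB first
class RleBooleanPageSketch {
  static List<Boolean> decode(byte[] page) {
    // 4-byte little-endian length prefix (prepended for BOOLEAN data pages)
    int len = (page[0] & 0xFF)
        | (page[1] & 0xFF) << 8
        | (page[2] & 0xFF) << 16
        | (page[3] & 0xFF) << 24;
    int pos = 4;
    int end = pos + len;
    List<Boolean> values = new ArrayList<>();
    while (pos < end) {
      // ULEB128 run header
      int header = 0;
      int shift = 0;
      int b;
      do {
        b = page[pos++] & 0xFF;
        header |= (b & 0x7F) << shift;
        shift += 7;
      } while ((b & 0x80) != 0);
      if ((header & 1) == 0) {
        // RLE run: a single value byte repeated (header >>> 1) times
        int count = header >>> 1;
        boolean value = page[pos++] != 0;
        for (int i = 0; i < count; i++) {
          values.add(value);
        }
      } else {
        // Bit-packed run: (header >>> 1) groups of 8 values, one bit each
        int groups = header >>> 1;
        for (int g = 0; g < groups; g++) {
          int packed = page[pos++] & 0xFF;
          for (int i = 0; i < 8; i++) {
            values.add(((packed >> i) & 1) != 0);
          }
        }
      }
    }
    return values;
  }
}
```

A bit-packed run always encodes a multiple of 8 values, so writers pad the final group; a real reader uses the page's value count (not part of this sketch) to drop the padding.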
3 binary files not shown.
---
@@ -55,6 +55,13 @@ public void testVectorizedReadsWithNewContainers() throws IOException {
// Disabled since this code path is already tested in TestParquetVectorizedReads
}

@Test
@Override
public void testUnsupportedReadsForParquetV2() throws Exception {
// Disabled since vectorized reads are now supported for Parquet v2-written,
// dictionary-encoded files
}
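
Note the two disabling styles in this PR: this file overrides the inherited test with an empty body, while the files later in the diff override it and annotate it with @Disabled so the skip is visible in test reports.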

@Test
public void testMixedDictionaryNonDictionaryReads() throws IOException {
Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
---
@@ -20,17 +20,20 @@

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.apache.parquet.schema.Types.primitive;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.function.Consumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.arrow.ArrowAllocation;
import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator;
import org.apache.iceberg.inmemory.InMemoryOutputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
@@ -48,9 +51,13 @@
import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.junit.jupiter.api.Test;
@@ -295,12 +302,16 @@ public void testReadsForTypePromotedColumns() throws Exception {
public void testSupportedReadsForParquetV2() throws Exception {
// Float and double column types are written using plain encoding with Parquet V2,
// also Parquet V2 will dictionary encode decimals that use fixed length binary
- // (i.e. decimals > 8 bytes)
+ // (i.e. decimals > 8 bytes). Int and long types use DELTA_BINARY_PACKED.
+ // Boolean types use RLE.
Schema schema =
new Schema(
optional(102, "float_data", Types.FloatType.get()),
optional(103, "double_data", Types.DoubleType.get()),
- optional(104, "decimal_data", Types.DecimalType.of(25, 5)));
+ optional(104, "decimal_data", Types.DecimalType.of(25, 5)),
+ optional(105, "int_data", Types.IntegerType.get()),
+ optional(106, "long_data", Types.LongType.get()),
+ optional(107, "boolean_data", Types.BooleanType.get()));

OutputFile outputFile = new InMemoryOutputFile();
Iterable<GenericData.Record> data =
@@ -331,6 +342,33 @@ public void testUnsupportedReadsForParquetV2() throws Exception {
.hasMessageEndingWith("Disable vectorized reads to read this table/file");
}

@Test
public void testRLEEncodingOnlySupportsBooleanDataPage() {
MessageType schema =
new MessageType(
"test",
primitive(PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).id(1).named("int_col"));
ColumnDescriptor intColumnDesc = schema.getColumnDescription(new String[] {"int_col"});
ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.allocate(0));

String expectedMessage =
"Cannot support vectorized reads for column "
+ intColumnDesc
+ " with encoding "
+ Encoding.RLE
+ ". Disable vectorized reads to read this table/file";

assertThatThrownBy(
() ->
new VectorizedPageIterator(intColumnDesc, "parquet-mr", false) {
{
initDataReader(Encoding.RLE, stream, 0);
}
})
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage(expectedMessage);
}
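
The anonymous subclass with an instance initializer is what lets the test reach initDataReader, which is protected; an empty input stream suffices because the encoding check in the switch throws before any page bytes are consumed.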

@Test
public void testUuidReads() throws Exception {
// Just one row to maintain dictionary encoding
---
@@ -94,6 +94,12 @@ Iterable<Record> generateData(
@Disabled // Ignored since this code path is already tested in TestParquetVectorizedReads
public void testVectorizedReadsWithNewContainers() throws IOException {}

@Test
@Override
@Disabled // Ignored since vectorized reads are now supported for Parquet v2-written,
// dictionary-encoded files
public void testUnsupportedReadsForParquetV2() throws IOException {}

@Test
public void testMixedDictionaryNonDictionaryReads() throws IOException {
Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
TestParquetVectorizedReads.java
@@ -21,6 +21,7 @@
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.apache.parquet.schema.Types.primitive;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
@@ -30,6 +31,7 @@
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
@@ -41,6 +43,7 @@
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.arrow.ArrowAllocation;
import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -64,9 +67,13 @@
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -81,7 +88,7 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {

private static final String PLAIN = "PLAIN";
private static final List<String> GOLDEN_FILE_ENCODINGS =
- ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED");
+ ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED", "RLE");
private static final Map<String, PrimitiveType> GOLDEN_FILE_TYPES =
ImmutableMap.of(
"string", Types.StringType.get(),
@@ -404,13 +411,15 @@ public void testSupportedReadsForParquetV2() throws Exception {
// Float and double column types are written using plain encoding with Parquet V2,
// also Parquet V2 will dictionary encode decimals that use fixed length binary
// (i.e. decimals > 8 bytes). Int and long types use DELTA_BINARY_PACKED.
+ // Boolean types use RLE.
Schema schema =
new Schema(
optional(102, "float_data", Types.FloatType.get()),
optional(103, "double_data", Types.DoubleType.get()),
optional(104, "decimal_data", Types.DecimalType.of(25, 5)),
optional(105, "int_data", Types.IntegerType.get()),
- optional(106, "long_data", Types.LongType.get()));
+ optional(106, "long_data", Types.LongType.get()),
+ optional(107, "boolean_data", Types.BooleanType.get()));

File dataFile = File.createTempFile("junit", null, temp.toFile());
assertThat(dataFile.delete()).as("Delete should succeed").isTrue();
@@ -439,6 +448,33 @@ public void testUnsupportedReadsForParquetV2() throws Exception {
.hasMessageEndingWith("Disable vectorized reads to read this table/file");
}

@Test
public void testRLEEncodingOnlySupportsBooleanDataPage() {
MessageType schema =
new MessageType(
"test",
primitive(PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).id(1).named("int_col"));
ColumnDescriptor intColumnDesc = schema.getColumnDescription(new String[] {"int_col"});
ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.allocate(0));

String expectedMessage =
"Cannot support vectorized reads for column "
+ intColumnDesc
+ " with encoding "
+ Encoding.RLE
+ ". Disable vectorized reads to read this table/file";

assertThatThrownBy(
() ->
new VectorizedPageIterator(intColumnDesc, "parquet-mr", false) {
{
initDataReader(Encoding.RLE, stream, 0);
}
})
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage(expectedMessage);
}

@Test
public void testUuidReads() throws Exception {
// Just one row to maintain dictionary encoding
@@ -504,10 +540,16 @@ static Stream<Arguments> goldenFilesAndEncodings() {
.flatMap(
e ->
Stream.of(true, false)
- .map(
+ .flatMap(
      vectorized ->
-         Arguments.of(
-             encoding, e.getKey(), e.getValue(), vectorized))));
+         Stream.of(
+             Arguments.of(
+                 encoding, e.getKey(), e.getValue(), vectorized),
+             Arguments.of(
+                 encoding,
+                 e.getKey() + "_with_nulls",
+                 e.getValue(),
+                 vectorized)))));
}
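
With .map changed to .flatMap, each (encoding, type) combination now yields the plain golden file plus a _with_nulls variant, each exercised with vectorization on and off, i.e. four parameter sets per combination instead of two.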

private File resourceUrlToLocalFile(URL url) throws IOException, URISyntaxException {
---
@@ -196,4 +196,10 @@ public void testDecimalNotAllPagesDictionaryEncoded() throws Exception {
}
});
}

@Test
@Override
@Disabled // Ignored since vectorized reads are now supported for Parquet v2-written,
// dictionary-encoded files
public void testUnsupportedReadsForParquetV2() throws IOException {}
}