diff --git a/LICENSE b/LICENSE index 80cfd3652e69..f0b85316cd0b 100644 --- a/LICENSE +++ b/LICENSE @@ -229,6 +229,7 @@ This product includes code from Apache Parquet. * DynConstructors.java * IOUtil.java readFully and tests * ByteBufferInputStream implementations and tests +* ByteStreamSplitValuesReader implementation Copyright: 2014-2017 The Apache Software Foundation. Home page: https://parquet.apache.org/ @@ -289,6 +290,8 @@ This product includes code from Apache Spark. * implementation of SetAccumulator. * Connector expressions. * implementation of VectorizedDeltaEncodedValuesReader +* implementation of VectorizedDeltaLengthByteArrayValuesReader +* implementation of VectorizedDeltaByteArrayReader Copyright: 2011-2018 The Apache Software Foundation Home page: https://spark.apache.org/ @@ -336,4 +339,4 @@ This product includes code from Apache Flink. Copyright: 1999-2022 The Apache Software Foundation. Home page: https://flink.apache.org/ -License: https://www.apache.org/licenses/LICENSE-2.0 \ No newline at end of file +License: https://www.apache.org/licenses/LICENSE-2.0 diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java new file mode 100644 index 000000000000..5c3b812446f7 --- /dev/null +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedByteStreamSplitValuesReader.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.arrow.vectorized.parquet; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.arrow.vector.FieldVector; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.api.Binary; + +/** + * A {@link VectorizedValuesReader} implementation for the encoding type BYTE_STREAM_SPLIT. This is + * adapted from Parquet's ByteStreamSplitValuesReader. 
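For reference, BYTE_STREAM_SPLIT stores the k-th byte of every value contiguously: a page of n 4-byte floats is laid out as n first bytes, then n second bytes, and so on, which compresses well because neighboring floats tend to share sign/exponent bytes. Decoding transposes the streams back into little-endian values, which is the loop the decodeDataFromStream method below performs. A minimal standalone sketch of that transpose, with hypothetical names and independent of the class in this patch:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ByteStreamSplitSketch {
  /** Transposes a BYTE_STREAM_SPLIT page of 4-byte floats back into values. */
  static float[] decodeFloats(byte[] encoded) {
    int valueCount = encoded.length / Float.BYTES;
    byte[] decoded = new byte[encoded.length];
    int dest = 0;
    for (int value = 0; value < valueCount; value++) {
      for (int stream = 0; stream < Float.BYTES; stream++) {
        // Byte `stream` of value `value` lives at position value + stream * valueCount.
        decoded[dest++] = encoded[value + stream * valueCount];
      }
    }
    ByteBuffer buffer = ByteBuffer.wrap(decoded).order(ByteOrder.LITTLE_ENDIAN);
    float[] out = new float[valueCount];
    for (int i = 0; i < valueCount; i++) {
      out[i] = buffer.getFloat();
    }
    return out;
  }

  public static void main(String[] args) {
    // Hand-encoded {1.0f, 2.0f}: little-endian bytes of 1.0f are 00 00 80 3F, of 2.0f are 00 00 00 40.
    // Stream layout: [b0 of v0, b0 of v1, b1 of v0, b1 of v1, ...]
    byte[] encoded = {
      0x00, 0x00, // stream 0
      0x00, 0x00, // stream 1
      (byte) 0x80, 0x00, // stream 2
      0x3F, 0x40 // stream 3
    };
    float[] values = decodeFloats(encoded);
    System.out.println(values[0] + ", " + values[1]); // 1.0, 2.0
  }
}

Running main prints "1.0, 2.0", confirming the interleaved byte streams reassemble into the original values.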
+ * + * @see + * Parquet format encodings: BYTE_STREAM_SPLIT + */ +public class VectorizedByteStreamSplitValuesReader extends ValuesReader + implements VectorizedValuesReader { + + private int totalBytesInStream; + private ByteBufferInputStream in; + private ByteBuffer decodedDataStream; + + public VectorizedByteStreamSplitValuesReader() {} + + @Override + public void initFromPage(int ignoredValueCount, ByteBufferInputStream inputStream) + throws IOException { + totalBytesInStream = inputStream.available(); + this.in = inputStream; + } + + @Override + public float readFloat() { + ensureDecodedBufferIsInitializedForElementSize(FLOAT_SIZE); + return decodedDataStream.getFloat(); + } + + @Override + public double readDouble() { + ensureDecodedBufferIsInitializedForElementSize(DOUBLE_SIZE); + return decodedDataStream.getDouble(); + } + + @Override + public void readFloats(int total, FieldVector vec, int rowId) { + readValues( + FLOAT_SIZE, + total, + rowId, + offset -> vec.getDataBuffer().setFloat(offset, decodedDataStream.getFloat())); + } + + @Override + public void readDoubles(int total, FieldVector vec, int rowId) { + readValues( + DOUBLE_SIZE, + total, + rowId, + offset -> vec.getDataBuffer().setDouble(offset, decodedDataStream.getDouble())); + } + + private void ensureDecodedBufferIsInitializedForElementSize(int elementSizeInBytes) { + if (decodedDataStream == null) { + decodedDataStream = + decodeDataFromStream(totalBytesInStream / elementSizeInBytes, elementSizeInBytes); + } + } + + private void readValues(int elementSizeInBytes, int total, int rowId, OutputWriter outputWriter) { + ensureDecodedBufferIsInitializedForElementSize(elementSizeInBytes); + decodedDataStream.position(rowId * elementSizeInBytes); + for (int i = 0; i < total; i++) { + int offset = (rowId + i) * elementSizeInBytes; + outputWriter.writeToOutput(offset); + } + } + + @FunctionalInterface + interface OutputWriter { + void writeToOutput(int offset); + } + + private ByteBuffer decodeDataFromStream(int valuesCount, int elementSizeInBytes) { + ByteBuffer encoded; + try { + encoded = in.slice(totalBytesInStream).slice(); + } catch (EOFException e) { + throw new RuntimeException("Failed to read bytes from stream", e); + } + byte[] decoded = new byte[encoded.limit()]; + int destByteIndex = 0; + for (int srcValueIndex = 0; srcValueIndex < valuesCount; ++srcValueIndex) { + for (int stream = 0; stream < elementSizeInBytes; ++stream, ++destByteIndex) { + decoded[destByteIndex] = encoded.get(srcValueIndex + stream * valuesCount); + } + } + return ByteBuffer.wrap(decoded).order(ByteOrder.LITTLE_ENDIAN); + } + + /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */ + @Override + public boolean readBoolean() { + throw new UnsupportedOperationException("readBoolean is not supported"); + } + + /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */ + @Override + public byte readByte() { + throw new UnsupportedOperationException("readByte is not supported"); + } + + /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */ + @Override + public short readShort() { + throw new UnsupportedOperationException("readShort is not supported"); + } + + /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */ + @Override + public int readInteger() { + throw new UnsupportedOperationException("readInteger is not supported"); + } + + /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */ + @Override + public long readLong() { + throw new UnsupportedOperationException("readLong is not supported"); + } + + /** BYTE_STREAM_SPLIT only supports FLOAT 
and DOUBLE */ + @Override + public Binary readBinary(int len) { + throw new UnsupportedOperationException("readBinary is not supported"); + } + + /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */ + @Override + public void readIntegers(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readIntegers is not supported"); + } + + /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */ + @Override + public void readLongs(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readLongs is not supported"); + } + + /** BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE */ + @Override + public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) { + throw new UnsupportedOperationException("readBinary is not supported"); + } + + /** The Iceberg reader currently does not do skipping */ + @Override + public void skip() { + throw new UnsupportedOperationException("skip is not supported"); + } +} diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaByteArrayValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaByteArrayValuesReader.java new file mode 100644 index 000000000000..d2834493ce27 --- /dev/null +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaByteArrayValuesReader.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.arrow.vectorized.parquet; + +import java.io.IOException; +import org.apache.arrow.vector.BaseFixedWidthVector; +import org.apache.arrow.vector.BaseVariableWidthVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedWidthVector; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.api.Binary; + +/** + * A {@link VectorizedValuesReader} implementation for the encoding type DELTA_BYTE_ARRAY. This is + * adapted from Spark's VectorizedDeltaByteArrayReader. 
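For reference, DELTA_BYTE_ARRAY stores for each value the length of the prefix it shares with the previous value (itself DELTA_BINARY_PACKED) plus the remaining suffix (DELTA_LENGTH_BYTE_ARRAY); reconstruction concatenates the previous value's prefix with the new suffix, which is the loop in readValues below. A standalone sketch of that reconstruction, assuming prefix lengths and suffixes are already decoded; names are hypothetical:

import java.nio.charset.StandardCharsets;

public class DeltaByteArraySketch {
  /** Rebuilds values from pre-decoded prefix lengths and suffixes. */
  static byte[][] decode(int[] prefixLengths, byte[][] suffixes) {
    byte[][] out = new byte[prefixLengths.length][];
    byte[] previous = new byte[0];
    for (int i = 0; i < prefixLengths.length; i++) {
      int prefixLength = prefixLengths[i];
      byte[] value = new byte[prefixLength + suffixes[i].length];
      // The first prefixLength bytes are shared with the previous value.
      System.arraycopy(previous, 0, value, 0, prefixLength);
      System.arraycopy(suffixes[i], 0, value, prefixLength, suffixes[i].length);
      out[i] = value;
      previous = value;
    }
    return out;
  }

  public static void main(String[] args) {
    // "Hello", "Helicopter", "Help": prefixes shared with the previous value are 0, 3 ("Hel"), 3.
    int[] prefixLengths = {0, 3, 3};
    byte[][] suffixes = {
      "Hello".getBytes(StandardCharsets.UTF_8),
      "icopter".getBytes(StandardCharsets.UTF_8),
      "p".getBytes(StandardCharsets.UTF_8)
    };
    for (byte[] value : decode(prefixLengths, suffixes)) {
      System.out.println(new String(value, StandardCharsets.UTF_8));
    }
  }
}

The sorted-ish input shows why the encoding is effective for keys: each value stores only the bytes that differ from its predecessor.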
+ * + * @see + * Parquet format encodings: DELTA_BYTE_ARRAY + */ +public class VectorizedDeltaByteArrayValuesReader extends ValuesReader + implements VectorizedValuesReader { + + private final VectorizedDeltaEncodedValuesReader prefixLengthReader; + private final VectorizedDeltaLengthByteArrayValuesReader suffixReader; + + private int[] prefixLengths; + private Binary previous; + private int currentIndex; + + public VectorizedDeltaByteArrayValuesReader() { + prefixLengthReader = new VectorizedDeltaEncodedValuesReader(); + suffixReader = new VectorizedDeltaLengthByteArrayValuesReader(); + } + + @Override + public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException { + prefixLengthReader.initFromPage(valueCount, in); + // actual number of elements in the page may be less than the passed valueCount here due to + // nulls + prefixLengths = prefixLengthReader.readIntegers(prefixLengthReader.getTotalValueCount(), 0); + suffixReader.initFromPage(valueCount, in); + previous = Binary.EMPTY; + currentIndex = 0; + } + + @Override + public Binary readBinary(int len) { + throw new UnsupportedOperationException(); + } + + @Override + public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) { + if (vec instanceof BaseVariableWidthVector) { + BaseVariableWidthVector vector = (BaseVariableWidthVector) vec; + readValues(total, rowId, vector::setSafe); + } else if (vec instanceof FixedWidthVector) { + BaseFixedWidthVector vector = (BaseFixedWidthVector) vec; + readValues(total, rowId, (index, value) -> vector.setSafe(index, value, 0, value.length)); + } + } + + private void readValues(int total, int rowId, BinaryOutputWriter outputWriter) { + for (int i = 0; i < total; i++) { + int prefixLength = prefixLengths[currentIndex]; + Binary suffix = suffixReader.readBinaryForRow(rowId + i); + int length = prefixLength + suffix.length(); + + if (prefixLength != 0) { + byte[] out = new byte[length]; + System.arraycopy(previous.getBytesUnsafe(), 0, out, 0, prefixLength); + System.arraycopy(suffix.getBytesUnsafe(), 0, out, prefixLength, suffix.length()); + outputWriter.write(rowId + i, out); + previous = Binary.fromConstantByteArray(out); + } else { + outputWriter.write(rowId + i, suffix.getBytesUnsafe()); + previous = suffix; + } + + currentIndex++; + } + } + + /** DELTA_BYTE_ARRAY only supports BINARY */ + @Override + public boolean readBoolean() { + throw new UnsupportedOperationException("readBoolean is not supported"); + } + + /** DELTA_BYTE_ARRAY only supports BINARY */ + @Override + public byte readByte() { + throw new UnsupportedOperationException("readByte is not supported"); + } + + /** DELTA_BYTE_ARRAY only supports BINARY */ + @Override + public short readShort() { + throw new UnsupportedOperationException("readShort is not supported"); + } + + /** DELTA_BYTE_ARRAY only supports BINARY */ + @Override + public int readInteger() { + throw new UnsupportedOperationException("readInteger is not supported"); + } + + /** DELTA_BYTE_ARRAY only supports BINARY */ + @Override + public long readLong() { + throw new UnsupportedOperationException("readLong is not supported"); + } + + /** DELTA_BYTE_ARRAY only supports BINARY */ + @Override + public float readFloat() { + throw new UnsupportedOperationException("readFloat is not supported"); + } + + /** DELTA_BYTE_ARRAY only supports BINARY */ + @Override + public double readDouble() { + throw new UnsupportedOperationException("readDouble is not supported"); + } + + /** DELTA_BYTE_ARRAY only supports BINARY 
*/ + @Override + public void readIntegers(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readIntegers is not supported"); + } + + /** DELTA_BYTE_ARRAY only supports BINARY */ + @Override + public void readLongs(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readLongs is not supported"); + } + + /** DELTA_BYTE_ARRAY only supports BINARY */ + @Override + public void readFloats(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readFloats is not supported"); + } + + /** DELTA_BYTE_ARRAY only supports BINARY */ + @Override + public void readDoubles(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readDoubles is not supported"); + } + + /** The Iceberg reader currently does not do skipping */ + @Override + public void skip() { + throw new UnsupportedOperationException("skip is not supported"); + } + + /** A functional interface to write binary values into a FieldVector */ + @FunctionalInterface + interface BinaryOutputWriter { + + /** + * A functional interface that can be used to write a binary value to a specified row + * + * @param index The offset to write to + * @param val value to write + */ + void write(int index, byte[] val); + } +} diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java index 115518e1fb50..c803b58fb53b 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaEncodedValuesReader.java @@ -91,6 +91,11 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce firstValue = BytesUtils.readZigZagVarLong(this.inputStream); } + // True value count. 
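The reconstruction arithmetic behind this reader, for context: the page header carries a zig-zag varint first value, and each block carries a zig-zag min delta plus bit-packed per-value deltas relative to that minimum, so every value is previous + minDelta + unpackedDelta (the expression in loadMiniBlockToOutput below). A sketch of just that arithmetic, assuming the deltas are already bit-unpacked; names are hypothetical:

public class DeltaBinaryPackedSketch {
  /** Zig-zag decoding, as used for the header's first value and each block's min delta. */
  static long zigzagDecode(long n) {
    return (n >>> 1) ^ -(n & 1);
  }

  /**
   * Reconstructs one block of values from the first value, the block's min delta,
   * and the bit-unpacked deltas (each stored relative to minDelta).
   */
  static long[] decodeBlock(long firstValue, long minDelta, long[] unpackedDeltas) {
    long[] out = new long[unpackedDeltas.length + 1];
    out[0] = firstValue;
    long last = firstValue;
    for (int i = 0; i < unpackedDeltas.length; i++) {
      // Same reconstruction as the reader: previous value plus min delta plus packed delta.
      last = last + minDelta + unpackedDeltas[i];
      out[i + 1] = last;
    }
    return out;
  }

  public static void main(String[] args) {
    // Values 7, 5, 3, 1 have deltas -2, -2, -2; minDelta is -2, so every packed delta is 0.
    // On the wire, minDelta -2 arrives zig-zag encoded as 3.
    long minDelta = zigzagDecode(3); // -2
    long[] values = decodeBlock(7, minDelta, new long[] {0, 0, 0});
    for (long v : values) {
      System.out.print(v + " "); // 7 5 3 1
    }
  }
}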
May be less than valueCount because of nulls + int getTotalValueCount() { + return totalValueCount; + } + /** DELTA_BINARY_PACKED only supports INT32 and INT64 */ @Override public byte readByte() { @@ -105,13 +110,13 @@ public short readShort() { @Override public int readInteger() { - readValues(1, null, 0, INT_SIZE, (f, i, v) -> intVal = (int) v); + readValues(1, 0, (i, v) -> intVal = (int) v); return intVal; } @Override public long readLong() { - readValues(1, null, 0, LONG_SIZE, (f, i, v) -> longVal = v); + readValues(1, 0, (i, v) -> longVal = v); return longVal; } @@ -129,12 +134,18 @@ public Binary readBinary(int len) { @Override public void readIntegers(int total, FieldVector vec, int rowId) { - readValues(total, vec, rowId, INT_SIZE, (f, i, v) -> f.getDataBuffer().setInt(i, (int) v)); + readValues(total, rowId, (i, v) -> vec.getDataBuffer().setInt(((long) i) * INT_SIZE, (int) v)); + } + + public int[] readIntegers(int total, int rowId) { + int[] outputBuffer = new int[total]; + readValues(total, rowId, (i, v) -> outputBuffer[i] = (int) v); + return outputBuffer; } @Override public void readLongs(int total, FieldVector vec, int rowId) { - readValues(total, vec, rowId, LONG_SIZE, (f, i, v) -> f.getDataBuffer().setLong(i, v)); + readValues(total, rowId, (i, v) -> vec.getDataBuffer().setLong(((long) i) * LONG_SIZE, v)); } /** DELTA_BINARY_PACKED only supports INT32 and INT64 */ @@ -149,8 +160,13 @@ public void readDoubles(int total, FieldVector vec, int rowId) { throw new UnsupportedOperationException("readDoubles is not supported"); } - private void readValues( - int total, FieldVector vec, int rowId, int typeWidth, IntegerOutputWriter outputWriter) { + /** DELTA_BINARY_PACKED only supports INT32 and INT64 */ + @Override + public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) { + throw new UnsupportedOperationException("readBinary is not supported"); + } + + private void readValues(int total, int rowId, IntegerOutputWriter outputWriter) { if (valuesRead + total > totalValueCount) { throw new ParquetDecodingException( "No more values to read. 
Total values read: " @@ -166,7 +182,7 @@ private void readValues( int currentRowId = rowId; // First value if (valuesRead == 0) { - outputWriter.write(vec, ((long) (currentRowId + valuesRead) * typeWidth), firstValue); + outputWriter.write(currentRowId + valuesRead, firstValue); lastValueRead = firstValue; currentRowId++; remaining--; @@ -175,7 +191,7 @@ private void readValues( while (remaining > 0) { int loadedRows; try { - loadedRows = loadMiniBlockToOutput(remaining, vec, currentRowId, typeWidth, outputWriter); + loadedRows = loadMiniBlockToOutput(remaining, currentRowId, outputWriter); } catch (IOException e) { throw new ParquetDecodingException("Error reading mini block.", e); } @@ -190,8 +206,7 @@ private void readValues( * * @return the number of values read into output */ - private int loadMiniBlockToOutput( - int remaining, FieldVector vec, int rowId, int typeWidth, IntegerOutputWriter outputWriter) + private int loadMiniBlockToOutput(int remaining, int rowId, IntegerOutputWriter outputWriter) throws IOException { // new block; read the block header @@ -212,7 +227,7 @@ private int loadMiniBlockToOutput( // calculate values from deltas unpacked for current block long outValue = lastValueRead + minDeltaInCurrentBlock + unpackedValuesBuffer[i]; lastValueRead = outValue; - outputWriter.write(vec, ((long) (rowId + valuesReadInMiniBlock) * typeWidth), outValue); + outputWriter.write(rowId + valuesReadInMiniBlock, outValue); remainingInBlock--; remainingInMiniBlock--; valuesReadInMiniBlock++; @@ -266,18 +281,17 @@ private void readBitWidthsForMiniBlocks() { } } - /** A functional interface to write long values to into a FieldVector */ + /** A functional interface to write long values to into a destination buffer */ @FunctionalInterface interface IntegerOutputWriter { /** * A functional interface that can be used to write a long value to a specified row in a - * FieldVector + * destination buffer * - * @param vec a FieldVector to write the value into * @param index The offset to write to * @param val value to write */ - void write(FieldVector vec, long index, long val); + void write(int index, long val); } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaLengthByteArrayValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaLengthByteArrayValuesReader.java new file mode 100644 index 000000000000..4afd2b9b58b2 --- /dev/null +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDeltaLengthByteArrayValuesReader.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
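For context on the file that follows: DELTA_LENGTH_BYTE_ARRAY writes all value lengths up front (DELTA_BINARY_PACKED), followed by the values' bytes back to back; the reader below decodes the lengths with VectorizedDeltaEncodedValuesReader and then slices each value out of the remaining stream. A standalone sketch of the slicing step, assuming the lengths are already decoded; names are hypothetical:

import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DeltaLengthByteArraySketch {
  /** Slices each value out of the concatenated bytes using pre-decoded lengths. */
  static byte[][] decode(int[] lengths, ByteBuffer data) throws EOFException {
    byte[][] out = new byte[lengths.length][];
    for (int i = 0; i < lengths.length; i++) {
      if (lengths[i] < 0 || lengths[i] > data.remaining()) {
        throw new EOFException("Cannot read " + lengths[i] + " bytes");
      }
      out[i] = new byte[lengths[i]];
      data.get(out[i]); // advances past this value to the start of the next one
    }
    return out;
  }

  public static void main(String[] args) throws EOFException {
    // Lengths {5, 3} over the concatenated bytes of "Hello" and "abc".
    ByteBuffer data = ByteBuffer.wrap("Helloabc".getBytes(StandardCharsets.UTF_8));
    for (byte[] value : decode(new int[] {5, 3}, data)) {
      System.out.println(new String(value, StandardCharsets.UTF_8));
    }
  }
}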
+ */ +package org.apache.iceberg.arrow.vectorized.parquet; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.IntUnaryOperator; +import org.apache.arrow.vector.BaseVariableWidthVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.api.Binary; + +/** + * A {@link VectorizedValuesReader} implementation for the encoding type DELTA_LENGTH_BYTE_ARRAY. + * This is adapted from Spark's VectorizedDeltaLengthByteArrayReader. + * + * @see + * Parquet format encodings: DELTA_LENGTH_BYTE_ARRAY + */ +public class VectorizedDeltaLengthByteArrayValuesReader extends ValuesReader + implements VectorizedValuesReader { + + private final VectorizedDeltaEncodedValuesReader lengthReader; + + private ByteBufferInputStream in; + private int[] lengths; + private ByteBuffer byteBuffer; + private int currentIndex; + + VectorizedDeltaLengthByteArrayValuesReader() { + lengthReader = new VectorizedDeltaEncodedValuesReader(); + } + + @Override + public void initFromPage(int valueCount, ByteBufferInputStream inputStream) throws IOException { + lengthReader.initFromPage(valueCount, inputStream); + // actual number of elements in the page may be less than the passed valueCount here due to + // nulls + lengths = lengthReader.readIntegers(lengthReader.getTotalValueCount(), 0); + + in = inputStream.remainingStream(); + currentIndex = 0; + } + + @Override + public Binary readBinary(int len) { + readValues(1, null, 0, x -> len, (f, i, v) -> byteBuffer = v); + return Binary.fromReusedByteBuffer(byteBuffer); + } + + Binary readBinaryForRow(int rowId) { + if (lengths[currentIndex] == 0) { + currentIndex++; + return Binary.EMPTY; + } + readValues(1, null, rowId, ignored -> lengths[currentIndex], (f, i, v) -> byteBuffer = v); + return Binary.fromReusedByteBuffer(byteBuffer); + } + + @Override + public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) { + readValues( + total, + vec, + rowId, + x -> lengths[currentIndex], + (f, i, v) -> + ((BaseVariableWidthVector) vec) + .setSafe( + (int) i, v.array(), v.position() + v.arrayOffset(), v.limit() - v.position())); + } + + @SuppressWarnings("UnusedVariable") + private void readValues( + int total, + FieldVector vec, + int rowId, + IntUnaryOperator getLength, + BinaryOutputWriter outputWriter) { + ByteBuffer buffer; + for (int i = 0; i < total; i++) { + int length = getLength.applyAsInt(rowId + i); + try { + if (length < 0) { + throw new IllegalStateException("Invalid length: " + length); + } + buffer = in.slice(length); + } catch (EOFException e) { + throw new ParquetDecodingException("Failed to read " + length + " bytes"); + } + outputWriter.write(vec, rowId + i, buffer); + currentIndex++; + } + } + + /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */ + @Override + public boolean readBoolean() { + throw new UnsupportedOperationException("readBoolean is not supported"); + } + + /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */ + @Override + public byte readByte() { + throw new UnsupportedOperationException("readByte is not supported"); + } + + /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */ + @Override + public short readShort() { + throw new UnsupportedOperationException("readShort is not supported"); + } + + /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */ + @Override + public int 
readInteger() { + throw new UnsupportedOperationException("readInteger is not supported"); + } + + /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */ + @Override + public long readLong() { + throw new UnsupportedOperationException("readLong is not supported"); + } + + /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */ + @Override + public float readFloat() { + throw new UnsupportedOperationException("readFloat is not supported"); + } + + /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */ + @Override + public double readDouble() { + throw new UnsupportedOperationException("readDouble is not supported"); + } + + /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */ + @Override + public void readIntegers(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readIntegers is not supported"); + } + + /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */ + @Override + public void readLongs(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readLongs is not supported"); + } + + /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */ + @Override + public void readFloats(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readFloats is not supported"); + } + + /** DELTA_LENGTH_BYTE_ARRAY only supports BINARY */ + @Override + public void readDoubles(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readDoubles is not supported"); + } + + /** The Iceberg reader currently does not do skipping */ + @Override + public void skip() { + throw new UnsupportedOperationException("skip is not supported"); + } + + /** A functional interface to write binary values into a FieldVector */ + @FunctionalInterface + interface BinaryOutputWriter { + + /** + * A functional interface that can be used to write a binary value to a specified row in a + * FieldVector + * + * @param vec a FieldVector to write the value into + * @param index The offset to write to + * @param val value to write + */ + void write(FieldVector vec, long index, ByteBuffer val); + } +} diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java index be1a3324ae43..b3518e58cbda 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java @@ -34,6 +34,7 @@ import org.apache.parquet.column.values.RequiresPreviousReader; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.schema.PrimitiveType; public class VectorizedPageIterator extends BasePageIterator { private final boolean setArrowValidityVector; @@ -100,6 +101,23 @@ protected void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, i case DELTA_BINARY_PACKED: valuesReader = new VectorizedDeltaEncodedValuesReader(); break; + case DELTA_LENGTH_BYTE_ARRAY: + valuesReader = new VectorizedDeltaLengthByteArrayValuesReader(); + break; + case DELTA_BYTE_ARRAY: + valuesReader = new VectorizedDeltaByteArrayValuesReader(); + break; + case BYTE_STREAM_SPLIT: + valuesReader = new VectorizedByteStreamSplitValuesReader(); + break; + case RLE: + if (desc.getPrimitiveType().getPrimitiveTypeName() + == PrimitiveType.PrimitiveTypeName.BOOLEAN) { + valuesReader = + new 
VectorizedRunLengthEncodedParquetValuesReader(setArrowValidityVector); + break; + } + // fall through default: throw new UnsupportedOperationException( "Cannot support vectorized reads for column " diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java index 1ca3bfe809c0..a07961fb14bf 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java @@ -591,20 +591,7 @@ protected void nextVal( VectorizedValuesReader valuesReader, int typeWidth, byte[] byteArray) { - int len = valuesReader.readInteger(); - ByteBuffer buffer = valuesReader.readBinary(len).toByteBuffer(); - // Calling setValueLengthSafe takes care of allocating a larger buffer if - // running out of space. - ((BaseVariableWidthVector) vector).setValueLengthSafe(idx, len); - int startOffset = ((BaseVariableWidthVector) vector).getStartOffset(idx); - // It is possible that the data buffer was reallocated. So it is important to - // not cache the data buffer reference but instead use vector.getDataBuffer(). - vector.getDataBuffer().setBytes(startOffset, buffer); - // Similarly, we need to get the latest reference to the validity buffer as well - // since reallocation changes reference of the validity buffers as well. - if (setArrowValidityVector) { - BitVectorHelper.setBit(vector.getValidityBuffer(), idx); - } + valuesReader.readBinary(1, vector, idx, setArrowValidityVector); } @Override diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java index 764b2fc353e3..56bcfe3ff8e2 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPlainValuesReader.java @@ -19,6 +19,8 @@ package org.apache.iceberg.arrow.vectorized.parquet; import java.nio.ByteBuffer; +import org.apache.arrow.vector.BaseVariableWidthVector; +import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.FieldVector; import org.apache.iceberg.parquet.ValuesAsBytesReader; import org.apache.parquet.io.api.Binary; @@ -74,4 +76,22 @@ public void readFloats(int total, FieldVector vec, int rowId) { public void readDoubles(int total, FieldVector vec, int rowId) { readValues(total, vec, rowId, DOUBLE_SIZE); } + + @Override + public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) { + int len = readInteger(); + ByteBuffer buffer = readBinary(len).toByteBuffer(); + // Calling setValueLengthSafe takes care of allocating a larger buffer if + // running out of space. + ((BaseVariableWidthVector) vec).setValueLengthSafe(rowId, len); + int startOffset = ((BaseVariableWidthVector) vec).getStartOffset(rowId); + // It is possible that the data buffer was reallocated. So it is important to + // not cache the data buffer reference but instead use vector.getDataBuffer(). + vec.getDataBuffer().setBytes(startOffset, buffer); + // Similarly, we need to get the latest reference to the validity buffer as well + // since reallocation changes reference of the validity buffers as well. 
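The comments carried over with this readBinary logic are worth illustrating: setValueLengthSafe may reallocate the Arrow data buffer, so a cached ArrowBuf reference can end up pointing at the old buffer. A small hypothetical check of why the code re-reads vec.getDataBuffer() after the safe call instead of caching it, assuming a recent Arrow release where these classes live under org.apache.arrow.memory:

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;

public class ReallocationSketch {
  public static void main(String[] args) {
    try (BufferAllocator allocator = new RootAllocator();
        VarCharVector vector = new VarCharVector("col", allocator)) {
      vector.allocateNew(8, 1); // deliberately tiny data buffer
      ArrowBuf cached = vector.getDataBuffer(); // goes stale if any reallocation happens
      vector.setValueLengthSafe(0, 64); // "safe" call: may grow (reallocate) the data buffer
      // Correct pattern: re-read the buffer reference after the potentially reallocating call.
      System.out.println("reallocated: " + (cached != vector.getDataBuffer()));
    }
  }
}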
+ if (setArrowValidityVector) { + BitVectorHelper.setBit(vec.getValidityBuffer(), rowId); + } + } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedRunLengthEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedRunLengthEncodedParquetValuesReader.java new file mode 100644 index 000000000000..f324aa37efbf --- /dev/null +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedRunLengthEncodedParquetValuesReader.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.arrow.vectorized.parquet; + +import org.apache.arrow.vector.FieldVector; +import org.apache.parquet.io.api.Binary; + +/** + * A {@link VectorizedValuesReader} implementation for the encoding type Run Length Encoding / RLE. + * + * @see + * Parquet format encodings: RLE + */ +public class VectorizedRunLengthEncodedParquetValuesReader extends BaseVectorizedParquetValuesReader + implements VectorizedValuesReader { + + // Since we can only read booleans, bit-width is always 1 + private static final int BOOLEAN_BIT_WIDTH = 1; + // Since this can only be used in the context of a data page, the definition level can be set to + // anything, and it doesn't really matter + private static final int IRRELEVANT_MAX_DEFINITION_LEVEL = 1; + // For boolean values in data page v1 & v2, length is always prepended to the encoded data + // See + // https://parquet.apache.org/docs/file-format/data-pages/encodings/#run-length-encoding--bit-packing-hybrid-rle--3 + private static final boolean ALWAYS_READ_LENGTH = true; + + public VectorizedRunLengthEncodedParquetValuesReader(boolean setArrowValidityVector) { + super( + BOOLEAN_BIT_WIDTH, + IRRELEVANT_MAX_DEFINITION_LEVEL, + ALWAYS_READ_LENGTH, + setArrowValidityVector); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public byte readByte() { + throw new UnsupportedOperationException("readByte is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public short readShort() { + throw new UnsupportedOperationException("readShort is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public long readLong() { + throw new UnsupportedOperationException("readLong is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public float readFloat() { + throw new UnsupportedOperationException("readFloat is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public double readDouble() { + throw new UnsupportedOperationException("readDouble is not supported"); + } + + /** RLE only supports BOOLEAN as a 
data page encoding */ + @Override + public Binary readBinary(int len) { + throw new UnsupportedOperationException("readBinary is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public void readIntegers(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readIntegers is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public void readLongs(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readLongs is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public void readFloats(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readFloats is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public void readDoubles(int total, FieldVector vec, int rowId) { + throw new UnsupportedOperationException("readDoubles is not supported"); + } + + /** RLE only supports BOOLEAN as a data page encoding */ + @Override + public void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector) { + throw new UnsupportedOperationException("readBinary is not supported"); + } +} diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java index 7c23149b18ab..48f118f387e9 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedValuesReader.java @@ -58,7 +58,7 @@ interface VectorizedValuesReader { double readDouble(); /** - * Read binary data of some length + * Read a single binary value of some length * * @param len The number of bytes to read */ @@ -76,6 +76,9 @@ interface VectorizedValuesReader { /** Read `total` doubles into `vec` starting at `vec[rowId]` */ void readDoubles(int total, FieldVector vec, int rowId); + /** Read `total` binary values into `vec` starting at `vec[rowId]` */ + void readBinary(int total, FieldVector vec, int rowId, boolean setArrowValidityVector); + /** * Initialize the reader from a page. See {@link ValuesReader#initFromPage(int, * ByteBufferInputStream)}. 
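The RLE reader above delegates to the existing hybrid decoder with bit width 1, since a boolean data page encoded as RLE is the standard RLE/bit-packing hybrid behind a 4-byte length prefix. In the hybrid, each run starts with an unsigned varint header whose low bit selects the run type: 0 means an RLE run of (header >>> 1) repetitions of one value stored in ceil(bitWidth/8) bytes, 1 means a bit-packed run. A sketch decoding RLE runs of booleans, assuming the length prefix has already been consumed and skipping bit-packed runs for brevity; names are hypothetical:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class RleBooleanSketch {
  /** Decodes RLE runs (header low bit = 0) of bit-width-1 values. */
  static List<Boolean> decode(InputStream in, int valueCount) throws IOException {
    List<Boolean> out = new ArrayList<>();
    while (out.size() < valueCount) {
      int header = readUnsignedVarInt(in);
      if ((header & 1) == 0) {
        int runLength = header >>> 1;
        boolean value = in.read() != 0; // bit width 1 still occupies one whole byte
        for (int i = 0; i < runLength; i++) {
          out.add(value);
        }
      } else {
        throw new IOException("bit-packed runs not handled in this sketch");
      }
    }
    return out;
  }

  static int readUnsignedVarInt(InputStream in) throws IOException {
    int value = 0;
    int shift = 0;
    int b;
    while (((b = in.read()) & 0x80) != 0) {
      value |= (b & 0x7F) << shift;
      shift += 7;
    }
    return value | (b << shift);
  }

  public static void main(String[] args) throws IOException {
    // Two runs: 3 x true (header = 3 << 1 = 6, value byte 1), then 2 x false (header = 4, value byte 0).
    byte[] page = {6, 1, 4, 0};
    System.out.println(decode(new ByteArrayInputStream(page), 5)); // [true, true, true, false, false]
  }
}

Long runs of identical booleans collapse to a few bytes here, which is why the page iterator now accepts RLE as a data encoding for BOOLEAN columns only.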
diff --git a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet new file mode 100644 index 000000000000..3e0edd2627ab Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet new file mode 100644 index 000000000000..d23c1e6b7387 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/double_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet new file mode 100644 index 000000000000..8ba32a302f08 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet new file mode 100644 index 000000000000..c16e10e6804e Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/BYTE_STREAM_SPLIT/float_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/DELTA_BINARY_PACKED/int32_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_BINARY_PACKED/int32_with_nulls.parquet new file mode 100644 index 000000000000..358a9792a4a9 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/DELTA_BINARY_PACKED/int32_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/DELTA_BINARY_PACKED/int64_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_BINARY_PACKED/int64_with_nulls.parquet new file mode 100644 index 000000000000..23270e0bf0f5 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/DELTA_BINARY_PACKED/int64_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/binary.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/binary.parquet new file mode 100644 index 000000000000..6d01839cd4aa Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/binary.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/binary_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/binary_with_nulls.parquet new file mode 100644 index 000000000000..b252f642ed6e Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/binary_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/string.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/string.parquet new file mode 100644 index 000000000000..204719bc072b Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/string.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/string_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/string_with_nulls.parquet new file mode 100644 index 000000000000..599e72bac021 Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/DELTA_BYTE_ARRAY/string_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/binary.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/binary.parquet new file mode 100644 index 000000000000..926d02835f77 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/binary.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/binary_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/binary_with_nulls.parquet new file mode 100644 index 000000000000..82697f40aafa Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/binary_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/string.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/string.parquet new file mode 100644 index 000000000000..686bf91ff076 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/string.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/string_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/string_with_nulls.parquet new file mode 100644 index 000000000000..b33d5602c823 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/DELTA_LENGTH_BYTE_ARRAY/string_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/binary_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/binary_with_nulls.parquet new file mode 100644 index 000000000000..80ce3ee71ecc Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN/binary_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/boolean_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/boolean_with_nulls.parquet new file mode 100644 index 000000000000..4678e31da179 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN/boolean_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet new file mode 100644 index 000000000000..edd614c66af6 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN/double.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet new file mode 100644 index 000000000000..3d4f64baa722 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN/double_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/float_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/float_with_nulls.parquet new file mode 100644 index 000000000000..c9abb9b98def Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN/float_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/int32_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/int32_with_nulls.parquet new file mode 100644 index 000000000000..3e1dc514c725 Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/PLAIN/int32_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/int64_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/int64_with_nulls.parquet new file mode 100644 index 000000000000..1e6e36e54195 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN/int64_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN/string_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN/string_with_nulls.parquet new file mode 100644 index 000000000000..9beedb2be5d0 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN/string_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/binary_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/binary_with_nulls.parquet new file mode 100644 index 000000000000..42fceed1261e Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/binary_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/double.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/double.parquet new file mode 100644 index 000000000000..bf9f55dd91e6 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/double.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/double_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/double_with_nulls.parquet new file mode 100644 index 000000000000..537f13cbce46 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/double_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/float_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/float_with_nulls.parquet new file mode 100644 index 000000000000..f34232e39cb1 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/float_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/int32_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/int32_with_nulls.parquet new file mode 100644 index 000000000000..c7d637a07ff4 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/int32_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/int64_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/int64_with_nulls.parquet new file mode 100644 index 000000000000..e7bd5ec07b60 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/int64_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/string_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/string_with_nulls.parquet new file mode 100644 index 000000000000..bbdc391c68f5 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/PLAIN_DICTIONARY/string_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/RLE/boolean.parquet b/parquet/src/testFixtures/resources/encodings/RLE/boolean.parquet new file mode 100644 index 000000000000..bbc11933919f Binary files /dev/null and 
b/parquet/src/testFixtures/resources/encodings/RLE/boolean.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/RLE/boolean_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/RLE/boolean_with_nulls.parquet new file mode 100644 index 000000000000..6eed9b602c68 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/RLE/boolean_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/binary_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/binary_with_nulls.parquet new file mode 100644 index 000000000000..42fceed1261e Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/binary_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/double.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/double.parquet new file mode 100644 index 000000000000..bf9f55dd91e6 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/double.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/double_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/double_with_nulls.parquet new file mode 100644 index 000000000000..537f13cbce46 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/double_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/float_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/float_with_nulls.parquet new file mode 100644 index 000000000000..f34232e39cb1 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/float_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/int32_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/int32_with_nulls.parquet new file mode 100644 index 000000000000..c7d637a07ff4 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/int32_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/int64_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/int64_with_nulls.parquet new file mode 100644 index 000000000000..e7bd5ec07b60 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/int64_with_nulls.parquet differ diff --git a/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/string_with_nulls.parquet b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/string_with_nulls.parquet new file mode 100644 index 000000000000..bbdc391c68f5 Binary files /dev/null and b/parquet/src/testFixtures/resources/encodings/RLE_DICTIONARY/string_with_nulls.parquet differ diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java index 6bff9010eb4f..d428fae70718 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java @@ -20,17 +20,20 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static 
org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Iterator; import java.util.function.Consumer; import org.apache.arrow.memory.BufferAllocator; import org.apache.avro.generic.GenericData; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.ArrowAllocation; +import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator; import org.apache.iceberg.inmemory.InMemoryOutputFile; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; @@ -48,9 +51,13 @@ import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import org.apache.spark.sql.vectorized.ColumnarBatch; import org.junit.jupiter.api.Test; @@ -296,11 +303,7 @@ public void testSupportedReadsForParquetV2() throws Exception { // Float and double column types are written using plain encoding with Parquet V2, // also Parquet V2 will dictionary encode decimals that use fixed length binary // (i.e. decimals > 8 bytes) - Schema schema = - new Schema( - optional(102, "float_data", Types.FloatType.get()), - optional(103, "double_data", Types.DoubleType.get()), - optional(104, "decimal_data", Types.DecimalType.of(25, 5))); + Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields()); OutputFile outputFile = new InMemoryOutputFile(); Iterable data = @@ -311,26 +314,6 @@ public void testSupportedReadsForParquetV2() throws Exception { assertRecordsMatch(schema, 30000, data, outputFile.toInputFile(), false, BATCH_SIZE); } - @Test - public void testUnsupportedReadsForParquetV2() throws Exception { - // Longs, ints, string types etc use delta encoding and which are not supported for vectorized - // reads - Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields()); - OutputFile outputFile = new InMemoryOutputFile(); - Iterable data = - generateData(schema, 30000, 0L, RandomData.DEFAULT_NULL_PERCENTAGE, IDENTITY); - try (FileAppender writer = getParquetV2Writer(schema, outputFile)) { - writer.addAll(data); - } - assertThatThrownBy( - () -> - assertRecordsMatch( - schema, 30000, data, outputFile.toInputFile(), false, BATCH_SIZE)) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageStartingWith("Cannot support vectorized reads for column") - .hasMessageEndingWith("Disable vectorized reads to read this table/file"); - } - @Test public void testUuidReads() throws Exception { // Just one row to maintain dictionary encoding @@ -345,6 +328,33 @@ public void testUuidReads() throws Exception { assertRecordsMatch(schema, numRows, data, dataFile.toInputFile(), false, BATCH_SIZE); } + @Test + public void testRLEEncodingOnlySupportsBooleanDataPage() { + MessageType schema = + new MessageType( + "test", + primitive(PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).id(1).named("int_col")); + 
ColumnDescriptor intColumnDesc = schema.getColumnDescription(new String[] {"int_col"}); + ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.allocate(0)); + + String expectedMessage = + "Cannot support vectorized reads for column " + + intColumnDesc + + " with encoding " + + Encoding.RLE + + ". Disable vectorized reads to read this table/file"; + + assertThatThrownBy( + () -> + new VectorizedPageIterator(intColumnDesc, "parquet-mr", false) { + { + initDataReader(Encoding.RLE, stream, 0); + } + }) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage(expectedMessage); + } + protected void assertNoLeak(String testName, Consumer testFunction) { BufferAllocator allocator = ArrowAllocation.rootAllocator().newChildAllocator(testName, 0, Long.MAX_VALUE); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java index 6e823a8bfc05..d0d0aabaf532 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java @@ -21,6 +21,7 @@ import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.parquet.schema.Types.primitive; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assumptions.assumeThat; @@ -30,6 +31,7 @@ import java.io.InputStream; import java.net.URISyntaxException; import java.net.URL; +import java.nio.ByteBuffer; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Iterator; @@ -41,6 +43,7 @@ import org.apache.iceberg.Files; import org.apache.iceberg.Schema; import org.apache.iceberg.arrow.ArrowAllocation; +import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; @@ -64,9 +67,13 @@ import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Type; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -81,7 +88,14 @@ public class TestParquetVectorizedReads extends AvroDataTestBase { private static final String PLAIN = "PLAIN"; private static final List GOLDEN_FILE_ENCODINGS = - ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED"); + ImmutableList.of( + "PLAIN_DICTIONARY", + "RLE_DICTIONARY", + "DELTA_BINARY_PACKED", + "DELTA_LENGTH_BYTE_ARRAY", + "DELTA_BYTE_ARRAY", + "BYTE_STREAM_SPLIT", + "RLE"); private static final Map GOLDEN_FILE_TYPES = ImmutableMap.of( "string", Types.StringType.get(), @@ -89,7 +103,8 @@ public class 
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 6e823a8bfc05..d0d0aabaf532 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -21,6 +21,7 @@
 import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.parquet.schema.Types.primitive;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assumptions.assumeThat;
@@ -30,6 +31,7 @@
 import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Iterator;
@@ -41,6 +43,7 @@
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.ArrowAllocation;
+import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -64,9 +67,13 @@
 import org.apache.iceberg.types.Type.PrimitiveType;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -81,7 +88,14 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
   private static final String PLAIN = "PLAIN";
 
   private static final List<String> GOLDEN_FILE_ENCODINGS =
-      ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED");
+      ImmutableList.of(
+          "PLAIN_DICTIONARY",
+          "RLE_DICTIONARY",
+          "DELTA_BINARY_PACKED",
+          "DELTA_LENGTH_BYTE_ARRAY",
+          "DELTA_BYTE_ARRAY",
+          "BYTE_STREAM_SPLIT",
+          "RLE");
 
   private static final Map<String, PrimitiveType> GOLDEN_FILE_TYPES =
       ImmutableMap.of(
           "string", Types.StringType.get(),
@@ -89,7 +103,8 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
           "int32", Types.IntegerType.get(),
           "int64", Types.LongType.get(),
           "binary", Types.BinaryType.get(),
-          "boolean", Types.BooleanType.get());
+          "boolean", Types.BooleanType.get(),
+          "double", Types.DoubleType.get());
 
   static final Function<GenericData.Record, GenericData.Record> IDENTITY = record -> record;
 
@@ -404,13 +419,7 @@ public void testSupportedReadsForParquetV2() throws Exception {
     // Float and double column types are written using plain encoding with Parquet V2,
     // also Parquet V2 will dictionary encode decimals that use fixed length binary
     // (i.e. decimals > 8 bytes). Int and long types use DELTA_BINARY_PACKED.
-    Schema schema =
-        new Schema(
-            optional(102, "float_data", Types.FloatType.get()),
-            optional(103, "double_data", Types.DoubleType.get()),
-            optional(104, "decimal_data", Types.DecimalType.of(25, 5)),
-            optional(105, "int_data", Types.IntegerType.get()),
-            optional(106, "long_data", Types.LongType.get()));
+    Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
 
     File dataFile = File.createTempFile("junit", null, temp.toFile());
     assertThat(dataFile.delete()).as("Delete should succeed").isTrue();
@@ -422,23 +431,6 @@ public void testSupportedReadsForParquetV2() throws Exception {
     assertRecordsMatch(schema, 30000, data, dataFile, false, BATCH_SIZE);
   }
 
-  @Test
-  public void testUnsupportedReadsForParquetV2() throws Exception {
-    // Some types use delta encoding and which are not supported for vectorized reads
-    Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
-    File dataFile = File.createTempFile("junit", null, temp.toFile());
-    assertThat(dataFile.delete()).as("Delete should succeed").isTrue();
-    Iterable<GenericData.Record> data =
-        generateData(schema, 30000, 0L, RandomData.DEFAULT_NULL_PERCENTAGE, IDENTITY);
-    try (FileAppender<GenericData.Record> writer = getParquetV2Writer(schema, dataFile)) {
-      writer.addAll(data);
-    }
-    assertThatThrownBy(() -> assertRecordsMatch(schema, 30000, data, dataFile, false, BATCH_SIZE))
-        .isInstanceOf(UnsupportedOperationException.class)
-        .hasMessageStartingWith("Cannot support vectorized reads for column")
-        .hasMessageEndingWith("Disable vectorized reads to read this table/file");
-  }
-
   @Test
   public void testUuidReads() throws Exception {
     // Just one row to maintain dictionary encoding
@@ -454,6 +446,33 @@ public void testUuidReads() throws Exception {
     assertRecordsMatch(schema, numRows, data, dataFile, false, BATCH_SIZE);
   }
 
+  @Test
+  public void testRLEEncodingOnlySupportsBooleanDataPage() {
+    MessageType schema =
+        new MessageType(
+            "test",
+            primitive(PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).id(1).named("int_col"));
+    ColumnDescriptor intColumnDesc = schema.getColumnDescription(new String[] {"int_col"});
+    ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.allocate(0));
+
+    String expectedMessage =
+        "Cannot support vectorized reads for column "
+            + intColumnDesc
+            + " with encoding "
+            + Encoding.RLE
+            + ". Disable vectorized reads to read this table/file";
+
+    assertThatThrownBy(
+            () ->
+                new VectorizedPageIterator(intColumnDesc, "parquet-mr", false) {
+                  {
+                    initDataReader(Encoding.RLE, stream, 0);
+                  }
+                })
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage(expectedMessage);
+  }
+
   protected void assertNoLeak(String testName, Consumer<BufferAllocator> testFunction) {
     BufferAllocator allocator =
         ArrowAllocation.rootAllocator().newChildAllocator(testName, 0, Long.MAX_VALUE);
@@ -504,10 +523,16 @@ static Stream<Arguments> goldenFilesAndEncodings() {
         .flatMap(
             e ->
                 Stream.of(true, false)
-                    .map(
+                    .flatMap(
                         vectorized ->
-                            Arguments.of(
-                                encoding, e.getKey(), e.getValue(), vectorized))));
+                            Stream.of(
+                                Arguments.of(
+                                    encoding, e.getKey(), e.getValue(), vectorized),
+                                Arguments.of(
+                                    encoding,
+                                    e.getKey() + "_with_nulls",
+                                    e.getValue(),
+                                    vectorized))))));
   }
 
   private File resourceUrlToLocalFile(URL url) throws IOException, URISyntaxException {
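One detail worth calling out from the hunk above before the equivalent Spark 4.0 changes: `goldenFilesAndEncodings()` switches the innermost `.map` to `.flatMap` so that every (encoding, type, vectorized) combination now fans out into two test cases, one for the base golden file and one for its `_with_nulls` variant. A self-contained sketch of that expansion pattern, with hypothetical names and plain strings standing in for the real `Arguments` tuples:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GoldenFileMatrixDemo {
  public static void main(String[] args) {
    List<String> cases =
        Stream.of("DELTA_BYTE_ARRAY", "BYTE_STREAM_SPLIT")
            .flatMap(
                encoding ->
                    Stream.of(true, false)
                        // flatMap (not map) lets one input tuple yield two cases
                        .flatMap(
                            vectorized ->
                                Stream.of(
                                    encoding + "/base/vectorized=" + vectorized,
                                    encoding + "/base_with_nulls/vectorized=" + vectorized)))
            .collect(Collectors.toList());
    cases.forEach(System.out::println); // 2 encodings x 2 modes x 2 files = 8 cases
  }
}
```

The same test changes are applied to the Spark 4.0 module below.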
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 46a6a302e1c4..a0cf877478cf 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -21,6 +21,7 @@
 import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.apache.parquet.schema.Types.primitive;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assumptions.assumeThat;
@@ -30,6 +31,7 @@
 import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Iterator;
@@ -41,6 +43,7 @@
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.arrow.ArrowAllocation;
+import org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -64,9 +67,13 @@
 import org.apache.iceberg.types.Type.PrimitiveType;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
@@ -81,7 +88,14 @@ public class TestParquetVectorizedReads extends AvroDataTestBase {
   private static final String PLAIN = "PLAIN";
 
   private static final List<String> GOLDEN_FILE_ENCODINGS =
-      ImmutableList.of("PLAIN_DICTIONARY", "RLE_DICTIONARY", "DELTA_BINARY_PACKED");
+      ImmutableList.of(
+          "PLAIN_DICTIONARY",
+          "RLE_DICTIONARY",
+          "DELTA_BINARY_PACKED",
+          "DELTA_LENGTH_BYTE_ARRAY",
+          "DELTA_BYTE_ARRAY",
"BYTE_STREAM_SPLIT", + "RLE"); private static final Map GOLDEN_FILE_TYPES = ImmutableMap.of( "string", Types.StringType.get(), @@ -89,7 +103,8 @@ public class TestParquetVectorizedReads extends AvroDataTestBase { "int32", Types.IntegerType.get(), "int64", Types.LongType.get(), "binary", Types.BinaryType.get(), - "boolean", Types.BooleanType.get()); + "boolean", Types.BooleanType.get(), + "double", Types.DoubleType.get()); static final Function IDENTITY = record -> record; @@ -404,13 +419,7 @@ public void testSupportedReadsForParquetV2() throws Exception { // Float and double column types are written using plain encoding with Parquet V2, // also Parquet V2 will dictionary encode decimals that use fixed length binary // (i.e. decimals > 8 bytes). Int and long types use DELTA_BINARY_PACKED. - Schema schema = - new Schema( - optional(102, "float_data", Types.FloatType.get()), - optional(103, "double_data", Types.DoubleType.get()), - optional(104, "decimal_data", Types.DecimalType.of(25, 5)), - optional(105, "int_data", Types.IntegerType.get()), - optional(106, "long_data", Types.LongType.get())); + Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields()); File dataFile = File.createTempFile("junit", null, temp.toFile()); assertThat(dataFile.delete()).as("Delete should succeed").isTrue(); @@ -422,23 +431,6 @@ public void testSupportedReadsForParquetV2() throws Exception { assertRecordsMatch(schema, 30000, data, dataFile, false, BATCH_SIZE); } - @Test - public void testUnsupportedReadsForParquetV2() throws Exception { - // Some types use delta encoding and which are not supported for vectorized reads - Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields()); - File dataFile = File.createTempFile("junit", null, temp.toFile()); - assertThat(dataFile.delete()).as("Delete should succeed").isTrue(); - Iterable data = - generateData(schema, 30000, 0L, RandomData.DEFAULT_NULL_PERCENTAGE, IDENTITY); - try (FileAppender writer = getParquetV2Writer(schema, dataFile)) { - writer.addAll(data); - } - assertThatThrownBy(() -> assertRecordsMatch(schema, 30000, data, dataFile, false, BATCH_SIZE)) - .isInstanceOf(UnsupportedOperationException.class) - .hasMessageStartingWith("Cannot support vectorized reads for column") - .hasMessageEndingWith("Disable vectorized reads to read this table/file"); - } - @Test public void testUuidReads() throws Exception { // Just one row to maintain dictionary encoding @@ -454,6 +446,33 @@ public void testUuidReads() throws Exception { assertRecordsMatch(schema, numRows, data, dataFile, false, BATCH_SIZE); } + @Test + public void testRLEEncodingOnlySupportsBooleanDataPage() { + MessageType schema = + new MessageType( + "test", + primitive(PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).id(1).named("int_col")); + ColumnDescriptor intColumnDesc = schema.getColumnDescription(new String[] {"int_col"}); + ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.allocate(0)); + + String expectedMessage = + "Cannot support vectorized reads for column " + + intColumnDesc + + " with encoding " + + Encoding.RLE + + ". 
+
+    assertThatThrownBy(
+            () ->
+                new VectorizedPageIterator(intColumnDesc, "parquet-mr", false) {
+                  {
+                    initDataReader(Encoding.RLE, stream, 0);
+                  }
+                })
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessage(expectedMessage);
+  }
+
   private void assertIdenticalFileContents(
       File actual, File expected, Schema schema, boolean vectorized) throws IOException {
     try (CloseableIterable<Record> expectedIterator =
@@ -490,10 +509,16 @@ static Stream<Arguments> goldenFilesAndEncodings() {
         .flatMap(
             e ->
                 Stream.of(true, false)
-                    .map(
+                    .flatMap(
                         vectorized ->
-                            Arguments.of(
-                                encoding, e.getKey(), e.getValue(), vectorized))));
+                            Stream.of(
+                                Arguments.of(
+                                    encoding, e.getKey(), e.getValue(), vectorized),
+                                Arguments.of(
+                                    encoding,
+                                    e.getKey() + "_with_nulls",
+                                    e.getValue(),
+                                    vectorized))))));
   }
 
   private File resourceUrlToLocalFile(URL url) throws IOException, URISyntaxException {
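For readers who hit the `UnsupportedOperationException` asserted above, the remedy named in the message ("Disable vectorized reads") corresponds to Iceberg's per-read Spark option. A hedged usage sketch, assuming a Spark session already configured with an Iceberg catalog and a table named `db.events` (both hypothetical); `vectorization-enabled` is the read-option key exposed by `SparkReadOptions`:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DisableVectorizationExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    // Fall back to the row-based reader for a file/encoding that the
    // vectorized path rejects, trading batch-read performance for coverage.
    Dataset<Row> df =
        spark.read().option("vectorization-enabled", "false").table("db.events");
    df.show();
  }
}
```

The same fallback can be made permanent for a table via the `read.parquet.vectorization.enabled` table property rather than a per-read option.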