@@ -24,16 +24,16 @@ use crate::data_type::{DataType, Int96};
2424use crate :: errors:: Result ;
2525use crate :: schema:: types:: ColumnDescPtr ;
2626use arrow_array:: {
27- Array , ArrayRef , Date64Array , Decimal64Array , Decimal128Array , Decimal256Array , Int8Array ,
28- Int16Array , Int32Array , Int64Array , PrimitiveArray , UInt8Array , UInt16Array ,
29- builder:: PrimitiveDictionaryBuilder , cast:: AsArray , downcast_integer, make_array, types:: * ,
27+ Array , ArrayRef , BooleanArray , Date64Array , Decimal64Array , Decimal128Array , Decimal256Array ,
28+ Float32Array , Float64Array , Int8Array , Int16Array , Int32Array , Int64Array , PrimitiveArray ,
29+ UInt8Array , UInt16Array , builder:: PrimitiveDictionaryBuilder , cast:: AsArray , downcast_integer,
30+ types:: * ,
3031} ;
3132use arrow_array:: {
3233 TimestampMicrosecondArray , TimestampMillisecondArray , TimestampNanosecondArray ,
3334 TimestampSecondArray , UInt32Array , UInt64Array ,
3435} ;
35- use arrow_buffer:: { BooleanBuffer , Buffer , i256} ;
36- use arrow_data:: ArrayDataBuilder ;
36+ use arrow_buffer:: { BooleanBuffer , Buffer , NullBuffer , ScalarBuffer , i256} ;
3737use arrow_schema:: { DataType as ArrowType , TimeUnit } ;
3838use std:: any:: Any ;
3939use std:: sync:: Arc ;
@@ -151,31 +151,49 @@ where
151151
152152 fn consume_batch ( & mut self ) -> Result < ArrayRef > {
153153 let target_type = & self . data_type ;
154- let arrow_data_type = match T :: get_physical_type ( ) {
155- PhysicalType :: BOOLEAN => ArrowType :: Boolean ,
156- PhysicalType :: INT32 => ArrowType :: Int32 ,
157- PhysicalType :: INT64 => ArrowType :: Int64 ,
158- PhysicalType :: FLOAT => ArrowType :: Float32 ,
159- PhysicalType :: DOUBLE => ArrowType :: Float64 ,
160- PhysicalType :: INT96 => ArrowType :: Int64 ,
161- PhysicalType :: BYTE_ARRAY | PhysicalType :: FIXED_LEN_BYTE_ARRAY => {
162- unreachable ! ( "PrimitiveArrayReaders don't support complex physical types" ) ;
163- }
164- } ;
165154
166- // Convert to equivalent arrow type to parquet physical type
155+ // Convert physical data to equivalent arrow type, and then perform
156+ // coercion as needed
167157 let record_data = self
168158 . record_reader
169159 . consume_record_data ( )
170160 . into_buffer ( target_type) ;
171161
172- let array_data = ArrayDataBuilder :: new ( arrow_data_type)
173- . len ( self . record_reader . num_values ( ) )
174- . add_buffer ( record_data)
175- . null_bit_buffer ( self . record_reader . consume_bitmap_buffer ( ) ) ;
176-
177- let array_data = unsafe { array_data. build_unchecked ( ) } ;
178- let array: ArrayRef = make_array ( array_data) ;
162+ let len = self . record_reader . num_values ( ) ;
163+ let nulls = self
164+ . record_reader
165+ . consume_bitmap_buffer ( )
166+ . map ( |b| NullBuffer :: new ( BooleanBuffer :: new ( b, 0 , len) ) ) ;
167+
168+ let array: ArrayRef = match T :: get_physical_type ( ) {
169+ PhysicalType :: BOOLEAN => Arc :: new ( BooleanArray :: new (
170+ BooleanBuffer :: new ( record_data, 0 , len) ,
171+ nulls,
172+ ) ) ,
173+ PhysicalType :: INT32 => Arc :: new ( Int32Array :: new (
174+ ScalarBuffer :: new ( record_data, 0 , len) ,
175+ nulls,
176+ ) ) ,
177+ PhysicalType :: INT64 => Arc :: new ( Int64Array :: new (
178+ ScalarBuffer :: new ( record_data, 0 , len) ,
179+ nulls,
180+ ) ) ,
181+ PhysicalType :: FLOAT => Arc :: new ( Float32Array :: new (
182+ ScalarBuffer :: new ( record_data, 0 , len) ,
183+ nulls,
184+ ) ) ,
185+ PhysicalType :: DOUBLE => Arc :: new ( Float64Array :: new (
186+ ScalarBuffer :: new ( record_data, 0 , len) ,
187+ nulls,
188+ ) ) ,
189+ PhysicalType :: INT96 => Arc :: new ( Int64Array :: new (
190+ ScalarBuffer :: new ( record_data, 0 , len) ,
191+ nulls,
192+ ) ) ,
193+ PhysicalType :: BYTE_ARRAY | PhysicalType :: FIXED_LEN_BYTE_ARRAY => {
194+ unreachable ! ( "PrimitiveArrayReaders don't support complex physical types" ) ;
195+ }
196+ } ;
179197
180198 // Coerce the arrow type to the desired array type
181199 let array = coerce_array ( array, target_type) ?;
@@ -218,7 +236,7 @@ fn coerce_array(array: ArrayRef, target_type: &ArrowType) -> Result<ArrayRef> {
218236 ArrowType :: Int32 => coerce_i32 ( array. as_primitive ( ) , target_type) ,
219237 ArrowType :: Int64 => coerce_i64 ( array. as_primitive ( ) , target_type) ,
220238 ArrowType :: Boolean | ArrowType :: Float32 | ArrowType :: Float64 => Ok ( array) ,
221- _ => unreachable ! ( ) ,
239+ _ => unreachable ! ( "Cannot coerce array of type {}" , array . data_type ( ) ) ,
222240 }
223241}
224242
0 commit comments