@@ -23,7 +23,12 @@ use crate::column::page::PageIterator;
2323use crate :: data_type:: { DataType , Int96 } ;
2424use crate :: errors:: Result ;
2525use crate :: schema:: types:: ColumnDescPtr ;
26- use arrow_array:: { Array , ArrayRef , Date64Array , Decimal64Array , Decimal128Array , Decimal256Array , Int8Array , Int16Array , Int32Array , Int64Array , PrimitiveArray , UInt8Array , UInt16Array , builder:: PrimitiveDictionaryBuilder , cast:: AsArray , downcast_integer, types:: * , BooleanArray , Decimal32Array , Float64Array , Float32Array } ;
26+ use arrow_array:: {
27+ Array , ArrayRef , BooleanArray , Date64Array , Decimal64Array , Decimal128Array ,
28+ Decimal256Array , Float32Array , Float64Array , Int8Array , Int16Array , Int32Array , Int64Array ,
29+ PrimitiveArray , UInt8Array , UInt16Array , builder:: PrimitiveDictionaryBuilder , cast:: AsArray ,
30+ downcast_integer, types:: * ,
31+ } ;
2732use arrow_array:: {
2833 TimestampMicrosecondArray , TimestampMillisecondArray , TimestampNanosecondArray ,
2934 TimestampSecondArray , UInt32Array , UInt64Array ,
@@ -146,19 +151,9 @@ where
146151
147152 fn consume_batch ( & mut self ) -> Result < ArrayRef > {
148153 let target_type = & self . data_type ;
149- let arrow_data_type = match T :: get_physical_type ( ) {
150- PhysicalType :: BOOLEAN => ArrowType :: Boolean ,
151- PhysicalType :: INT32 => ArrowType :: Int32 ,
152- PhysicalType :: INT64 => ArrowType :: Int64 ,
153- PhysicalType :: FLOAT => ArrowType :: Float32 ,
154- PhysicalType :: DOUBLE => ArrowType :: Float64 ,
155- PhysicalType :: INT96 => ArrowType :: Int64 ,
156- PhysicalType :: BYTE_ARRAY | PhysicalType :: FIXED_LEN_BYTE_ARRAY => {
157- unreachable ! ( "PrimitiveArrayReaders don't support complex physical types" ) ;
158- }
159- } ;
160154
161- // Convert to equivalent arrow type to parquet physical type
155+ // Convert physical data to equivalent arrow type, and then perform
156+ // coercion as needed
162157 let record_data = self
163158 . record_reader
164159 . consume_record_data ( )
@@ -175,36 +170,14 @@ where
175170 BooleanBuffer :: new ( record_data, 0 , len) ,
176171 nulls,
177172 ) ) ,
178- PhysicalType :: INT32 => match arrow_data_type {
179- ArrowType :: UInt32 => Arc :: new ( UInt32Array :: new (
180- ScalarBuffer :: new ( record_data, 0 , len) ,
181- nulls,
182- ) ) ,
183- ArrowType :: Int32 => Arc :: new ( Int32Array :: new (
184- ScalarBuffer :: new ( record_data, 0 , len) ,
185- nulls,
186- ) ) ,
187- ArrowType :: Decimal32 ( _, _) => Arc :: new ( Decimal32Array :: new (
188- ScalarBuffer :: new ( record_data, 0 , len) ,
189- nulls,
190- ) ) ,
191- _ => unreachable ! ( ) ,
192- } ,
193- PhysicalType :: INT64 => match arrow_data_type {
194- ArrowType :: UInt64 => Arc :: new ( UInt64Array :: new (
195- ScalarBuffer :: new ( record_data, 0 , len) ,
196- nulls,
197- ) ) ,
198- ArrowType :: Int64 => Arc :: new ( Int64Array :: new (
199- ScalarBuffer :: new ( record_data, 0 , len) ,
200- nulls,
201- ) ) ,
202- ArrowType :: Decimal64 ( _, _) => Arc :: new ( Decimal64Array :: new (
203- ScalarBuffer :: new ( record_data, 0 , len) ,
204- nulls,
205- ) ) ,
206- _ => unreachable ! ( ) ,
207- } ,
173+ PhysicalType :: INT32 => Arc :: new ( Int32Array :: new (
174+ ScalarBuffer :: new ( record_data, 0 , len) ,
175+ nulls,
176+ ) ) ,
177+ PhysicalType :: INT64 => Arc :: new ( Int64Array :: new (
178+ ScalarBuffer :: new ( record_data, 0 , len) ,
179+ nulls,
180+ ) ) ,
208181 PhysicalType :: FLOAT => Arc :: new ( Float32Array :: new (
209182 ScalarBuffer :: new ( record_data, 0 , len) ,
210183 nulls,
@@ -213,22 +186,10 @@ where
213186 ScalarBuffer :: new ( record_data, 0 , len) ,
214187 nulls,
215188 ) ) ,
216- PhysicalType :: INT96 => match target_type {
217- ArrowType :: Timestamp ( TimeUnit :: Second , _) => Arc :: new ( TimestampSecondArray :: new (
218- ScalarBuffer :: new ( record_data, 0 , len) ,
219- nulls,
220- ) ) ,
221- ArrowType :: Timestamp ( TimeUnit :: Millisecond , _) => Arc :: new (
222- TimestampMillisecondArray :: new ( ScalarBuffer :: new ( record_data, 0 , len) , nulls) ,
223- ) ,
224- ArrowType :: Timestamp ( TimeUnit :: Microsecond , _) => Arc :: new (
225- TimestampMicrosecondArray :: new ( ScalarBuffer :: new ( record_data, 0 , len) , nulls) ,
226- ) ,
227- ArrowType :: Timestamp ( TimeUnit :: Nanosecond , _) => Arc :: new (
228- TimestampNanosecondArray :: new ( ScalarBuffer :: new ( record_data, 0 , len) , nulls) ,
229- ) ,
230- _ => unreachable ! ( "INT96 must be a timestamp." ) ,
231- } ,
189+ PhysicalType :: INT96 => Arc :: new ( Int64Array :: new (
190+ ScalarBuffer :: new ( record_data, 0 , len) ,
191+ nulls,
192+ ) ) ,
232193 PhysicalType :: BYTE_ARRAY | PhysicalType :: FIXED_LEN_BYTE_ARRAY => {
233194 unreachable ! ( "PrimitiveArrayReaders don't support complex physical types" ) ;
234195 }
@@ -275,7 +236,7 @@ fn coerce_array(array: ArrayRef, target_type: &ArrowType) -> Result<ArrayRef> {
275236 ArrowType :: Int32 => coerce_i32 ( array. as_primitive ( ) , target_type) ,
276237 ArrowType :: Int64 => coerce_i64 ( array. as_primitive ( ) , target_type) ,
277238 ArrowType :: Boolean | ArrowType :: Float32 | ArrowType :: Float64 => Ok ( array) ,
278- _ => unreachable ! ( ) ,
239+ _ => unreachable ! ( "Cannot coerce array of type {}" , array . data_type ( ) ) ,
279240 }
280241}
281242
0 commit comments