Skip to content

Commit 073e3d5

Browse files
Add row group index virtual column
1 parent 67e04e7 commit 073e3d5

File tree

6 files changed

+526
-8
lines changed

6 files changed

+526
-8
lines changed

parquet/src/arrow/array_reader/builder.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
2626
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
2727
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
2828
use crate::arrow::array_reader::row_group_cache::RowGroupCache;
29+
use crate::arrow::array_reader::row_group_index::RowGroupIndexReader;
2930
use crate::arrow::array_reader::row_number::RowNumberReader;
3031
use crate::arrow::array_reader::{
3132
ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
@@ -169,6 +170,9 @@ impl<'a> ArrayReaderBuilder<'a> {
169170
// They need to be built by specialized readers
170171
match virtual_type {
171172
VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)),
173+
VirtualColumnType::RowGroupIndex => {
174+
+ Ok(Some(self.build_row_group_index_reader()?))
175+
+ }
172176
}
173177
}
174178
ParquetFieldType::Group { .. } => match &field.arrow_type {
@@ -194,6 +198,18 @@ impl<'a> ArrayReaderBuilder<'a> {
194198
)?))
195199
}
196200

201+
fn build_row_group_index_reader(&self) -> Result<Box<dyn ArrayReader>> {
202+
let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
203+
ParquetError::General(
204+
"ParquetMetaData is required to read virtual row group index columns.".to_string(),
205+
)
206+
})?;
207+
Ok(Box::new(RowGroupIndexReader::try_new(
208+
parquet_metadata,
209+
self.row_groups.row_groups(),
210+
)?))
211+
}
212+
197213
/// Build array reader for map type.
198214
fn build_map_reader(
199215
&self,

parquet/src/arrow/array_reader/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ mod map_array;
4343
mod null_array;
4444
mod primitive_array;
4545
mod row_group_cache;
46+
mod row_group_index;
4647
mod row_number;
4748
mod struct_array;
4849

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::arrow::array_reader::ArrayReader;
19+
use crate::errors::{ParquetError, Result};
20+
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
21+
use arrow_array::{ArrayRef, Int64Array};
22+
use arrow_schema::DataType;
23+
use std::any::Any;
24+
use std::collections::HashMap;
25+
use std::sync::Arc;
26+
27+
pub(crate) struct RowGroupIndexReader {
28+
buffered_indices: Vec<i64>,
29+
remaining_indices: std::iter::Flatten<std::vec::IntoIter<std::iter::RepeatN<i64>>>,
30+
}
31+
32+
impl RowGroupIndexReader {
33+
pub(crate) fn try_new<'a>(
34+
parquet_metadata: &'a ParquetMetaData,
35+
row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
36+
) -> Result<Self> {
37+
// build mapping from ordinal to row group index
38+
// this is O(M) where M is the total number of row groups in the file
39+
let ordinal_to_index: HashMap<i16, i64> =
40+
HashMap::from_iter(parquet_metadata.row_groups().iter().enumerate().filter_map(
41+
|(row_group_index, rg)| {
42+
rg.ordinal()
43+
.map(|ordinal| (ordinal, row_group_index as i64))
44+
},
45+
));
46+
47+
// build repeating iterators in the order specified by the row_groups iterator
48+
// this is O(n) where n is the number of selected row groups
49+
let repeated_indices: Vec<_> = row_groups
50+
.map(|rg| {
51+
let ordinal = rg.ordinal().ok_or_else(|| {
52+
ParquetError::General(
53+
"Row group missing ordinal field, required to compute row group indices"
54+
.to_string(),
55+
)
56+
})?;
57+
58+
let row_group_index = ordinal_to_index.get(&ordinal).ok_or_else(|| {
59+
ParquetError::General(format!(
60+
"Row group with ordinal {} not found in metadata",
61+
ordinal
62+
))
63+
})?;
64+
65+
// repeat row group index for each row in this row group
66+
Ok(std::iter::repeat_n(
67+
*row_group_index,
68+
rg.num_rows() as usize,
69+
))
70+
})
71+
.collect::<Result<_>>()?;
72+
73+
Ok(Self {
74+
buffered_indices: Vec::new(),
75+
remaining_indices: repeated_indices.into_iter().flatten(),
76+
})
77+
}
78+
}
79+
80+
impl ArrayReader for RowGroupIndexReader {
81+
fn read_records(&mut self, batch_size: usize) -> Result<usize> {
82+
let starting_len = self.buffered_indices.len();
83+
self.buffered_indices
84+
.extend((&mut self.remaining_indices).take(batch_size));
85+
Ok(self.buffered_indices.len() - starting_len)
86+
}
87+
88+
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
89+
// TODO: Use advance_by when it stabilizes to improve performance
90+
Ok((&mut self.remaining_indices).take(num_records).count())
91+
}
92+
93+
fn as_any(&self) -> &dyn Any {
94+
self
95+
}
96+
97+
fn get_data_type(&self) -> &DataType {
98+
&DataType::Int64
99+
}
100+
101+
fn consume_batch(&mut self) -> Result<ArrayRef> {
102+
Ok(Arc::new(Int64Array::from_iter(
103+
self.buffered_indices.drain(..),
104+
)))
105+
}
106+
107+
fn get_def_levels(&self) -> Option<&[i16]> {
108+
None
109+
}
110+
111+
fn get_rep_levels(&self) -> Option<&[i16]> {
112+
None
113+
}
114+
}
115+
116+
#[cfg(test)]
117+
mod tests {
118+
use super::*;
119+
use crate::basic::Type as PhysicalType;
120+
use crate::file::metadata::{
121+
ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData,
122+
};
123+
use crate::schema::types::{SchemaDescriptor, Type as SchemaType};
124+
use std::sync::Arc;
125+
126+
fn create_test_schema() -> Arc<SchemaDescriptor> {
127+
let schema = SchemaType::group_type_builder("schema")
128+
.with_fields(vec![Arc::new(
129+
SchemaType::primitive_type_builder("test_col", PhysicalType::INT32)
130+
.build()
131+
.unwrap(),
132+
)])
133+
.build()
134+
.unwrap();
135+
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
136+
}
137+
138+
fn create_test_parquet_metadata(row_groups: Vec<(i16, i64)>) -> ParquetMetaData {
139+
let schema_descr = create_test_schema();
140+
141+
let mut row_group_metas = vec![];
142+
for (ordinal, num_rows) in row_groups {
143+
let columns: Vec<_> = schema_descr
144+
.columns()
145+
.iter()
146+
.map(|col| ColumnChunkMetaData::builder(col.clone()).build().unwrap())
147+
.collect();
148+
149+
let row_group = RowGroupMetaData::builder(schema_descr.clone())
150+
.set_num_rows(num_rows)
151+
.set_ordinal(ordinal)
152+
.set_total_byte_size(100)
153+
.set_column_metadata(columns)
154+
.build()
155+
.unwrap();
156+
row_group_metas.push(row_group);
157+
}
158+
159+
let total_rows: i64 = row_group_metas.iter().map(|rg| rg.num_rows()).sum();
160+
let file_metadata = FileMetaData::new(1, total_rows, None, None, schema_descr, None);
161+
162+
ParquetMetaData::new(file_metadata, row_group_metas)
163+
}
164+
165+
#[test]
166+
fn test_row_group_index_reader_basic() {
167+
// create metadata with 3 row groups, each with varying number of rows
168+
let metadata = create_test_parquet_metadata(vec![
169+
(0, 2), // rg: 0, ordinal: 0, 2 rows
170+
(1, 3), // rg: 1, ordinal: 1, 3 rows
171+
(2, 1), // rg: 2, ordinal: 2, 1 row
172+
]);
173+
174+
let selected_row_groups: Vec<_> = metadata.row_groups().iter().collect();
175+
176+
let mut reader =
177+
RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
178+
179+
// 2 rows + 3 rows + 1 row
180+
let num_read = reader.read_records(6).unwrap();
181+
assert_eq!(num_read, 6);
182+
183+
let array = reader.consume_batch().unwrap();
184+
let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
185+
186+
let actual: Vec<i64> = indices.iter().map(|v| v.unwrap()).collect();
187+
assert_eq!(actual, [0, 0, 1, 1, 1, 2],);
188+
}
189+
190+
#[test]
191+
fn test_row_group_index_reader_reverse_order() {
192+
// create metadata with 3 row groups, each rg has 2 rows
193+
let metadata = create_test_parquet_metadata(vec![(0, 2), (1, 2), (2, 2)]);
194+
195+
// select only rgs with ordinals 2 and 0 (in that order)
196+
// means select row group 2 first, then row group 0, skipping row group 1
197+
let selected_row_groups: Vec<_> =
198+
vec![&metadata.row_groups()[2], &metadata.row_groups()[0]];
199+
200+
let mut reader =
201+
RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
202+
203+
let num_read = reader.read_records(6).unwrap();
204+
// 2 rgs * 2 rows each
205+
assert_eq!(num_read, 4);
206+
207+
let array = reader.consume_batch().unwrap();
208+
let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
209+
210+
let actual: Vec<i64> = indices.iter().map(|v| v.unwrap()).collect();
211+
212+
assert_eq!(actual, [2, 2, 0, 0],);
213+
}
214+
}

0 commit comments

Comments
 (0)