Skip to content

Commit f4f92e6

Browse files
Add row group index virual column
1 parent 67e04e7 commit f4f92e6

File tree

6 files changed

+521
-8
lines changed

6 files changed

+521
-8
lines changed

parquet/src/arrow/array_reader/builder.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use crate::arrow::array_reader::cached_array_reader::CachedArrayReader;
2626
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
2727
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
2828
use crate::arrow::array_reader::row_group_cache::RowGroupCache;
29+
use crate::arrow::array_reader::row_group_index::RowGroupIndexReader;
2930
use crate::arrow::array_reader::row_number::RowNumberReader;
3031
use crate::arrow::array_reader::{
3132
ArrayReader, FixedSizeListArrayReader, ListArrayReader, MapArrayReader, NullArrayReader,
@@ -169,6 +170,7 @@ impl<'a> ArrayReaderBuilder<'a> {
169170
// They need to be built by specialized readers
170171
match virtual_type {
171172
VirtualColumnType::RowNumber => Ok(Some(self.build_row_number_reader()?)),
173+
VirtualColumnType::RowGroupIndex => Ok(Some(self.build_row_group_index_reader()?)),
172174
}
173175
}
174176
ParquetFieldType::Group { .. } => match &field.arrow_type {
@@ -194,6 +196,18 @@ impl<'a> ArrayReaderBuilder<'a> {
194196
)?))
195197
}
196198

199+
fn build_row_group_index_reader(&self) -> Result<Box<dyn ArrayReader>> {
200+
let parquet_metadata = self.parquet_metadata.ok_or_else(|| {
201+
ParquetError::General(
202+
"ParquetMetaData is required to read virtual row group index columns.".to_string(),
203+
)
204+
})?;
205+
Ok(Box::new(RowGroupIndexReader::try_new(
206+
parquet_metadata,
207+
self.row_groups.row_groups(),
208+
)?))
209+
}
210+
197211
/// Build array reader for map type.
198212
fn build_map_reader(
199213
&self,

parquet/src/arrow/array_reader/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ mod map_array;
4343
mod null_array;
4444
mod primitive_array;
4545
mod row_group_cache;
46+
mod row_group_index;
4647
mod row_number;
4748
mod struct_array;
4849

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::arrow::array_reader::ArrayReader;
19+
use crate::errors::{ParquetError, Result};
20+
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
21+
use arrow_array::{ArrayRef, Int64Array};
22+
use arrow_schema::DataType;
23+
use std::any::Any;
24+
use std::collections::HashMap;
25+
use std::sync::Arc;
26+
27+
pub(crate) struct RowGroupIndexReader {
28+
buffered_indices: Vec<i64>,
29+
remaining_indices: std::iter::Flatten<std::vec::IntoIter<std::iter::RepeatN<i64>>>,
30+
}
31+
32+
impl RowGroupIndexReader {
33+
pub(crate) fn try_new<'a>(
34+
parquet_metadata: &'a ParquetMetaData,
35+
row_groups: impl Iterator<Item = &'a RowGroupMetaData>,
36+
) -> Result<Self> {
37+
// build mapping from ordinal to row group index
38+
// this is O(M) where M is the total number of row groups in the file
39+
let mut ordinal_to_index: HashMap<i16, i64> = HashMap::new();
40+
41+
for (row_group_index, rg) in parquet_metadata.row_groups().iter().enumerate() {
42+
if let Some(ordinal) = rg.ordinal() {
43+
ordinal_to_index.insert(ordinal, row_group_index as i64);
44+
}
45+
}
46+
47+
// build repeating iterators in the order specified by the row_groups iterator
48+
// this is O(n) where n is the number of selected row groups
49+
let repeated_indices: Vec<_> = row_groups
50+
.map(|rg| {
51+
let ordinal = rg.ordinal().ok_or_else(|| {
52+
ParquetError::General(
53+
"Row group missing ordinal field, required to compute row group indices"
54+
.to_string(),
55+
)
56+
})?;
57+
58+
let row_group_index = ordinal_to_index.get(&ordinal).ok_or_else(|| {
59+
ParquetError::General(format!(
60+
"Row group with ordinal {} not found in metadata",
61+
ordinal
62+
))
63+
})?;
64+
65+
// repeat row group index for each row in this row group
66+
Ok(std::iter::repeat_n(*row_group_index, rg.num_rows() as usize))
67+
})
68+
.collect::<Result<_>>()?;
69+
70+
Ok(Self {
71+
buffered_indices: Vec::new(),
72+
remaining_indices: repeated_indices.into_iter().flatten(),
73+
})
74+
}
75+
}
76+
77+
impl ArrayReader for RowGroupIndexReader {
78+
fn read_records(&mut self, batch_size: usize) -> Result<usize> {
79+
let starting_len = self.buffered_indices.len();
80+
self.buffered_indices
81+
.extend((&mut self.remaining_indices).take(batch_size));
82+
Ok(self.buffered_indices.len() - starting_len)
83+
}
84+
85+
fn skip_records(&mut self, num_records: usize) -> Result<usize> {
86+
// TODO: Use advance_by when it stabilizes to improve performance
87+
Ok((&mut self.remaining_indices).take(num_records).count())
88+
}
89+
90+
fn as_any(&self) -> &dyn Any {
91+
self
92+
}
93+
94+
fn get_data_type(&self) -> &DataType {
95+
&DataType::Int64
96+
}
97+
98+
fn consume_batch(&mut self) -> Result<ArrayRef> {
99+
Ok(Arc::new(Int64Array::from_iter(
100+
self.buffered_indices.drain(..),
101+
)))
102+
}
103+
104+
fn get_def_levels(&self) -> Option<&[i16]> {
105+
None
106+
}
107+
108+
fn get_rep_levels(&self) -> Option<&[i16]> {
109+
None
110+
}
111+
}
112+
113+
#[cfg(test)]
114+
mod tests {
115+
use super::*;
116+
use crate::basic::Type as PhysicalType;
117+
use crate::file::metadata::{
118+
ColumnChunkMetaData, FileMetaData, ParquetMetaData, RowGroupMetaData,
119+
};
120+
use crate::schema::types::{SchemaDescriptor, Type as SchemaType};
121+
use std::sync::Arc;
122+
123+
fn create_test_schema() -> Arc<SchemaDescriptor> {
124+
let schema = SchemaType::group_type_builder("schema")
125+
.with_fields(vec![Arc::new(
126+
SchemaType::primitive_type_builder("test_col", PhysicalType::INT32)
127+
.build()
128+
.unwrap(),
129+
)])
130+
.build()
131+
.unwrap();
132+
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
133+
}
134+
135+
fn create_test_parquet_metadata(row_groups: Vec<(i16, i64)>) -> ParquetMetaData {
136+
let schema_descr = create_test_schema();
137+
138+
let mut row_group_metas = vec![];
139+
for (ordinal, num_rows) in row_groups {
140+
let columns: Vec<_> = schema_descr
141+
.columns()
142+
.iter()
143+
.map(|col| ColumnChunkMetaData::builder(col.clone()).build().unwrap())
144+
.collect();
145+
146+
let row_group = RowGroupMetaData::builder(schema_descr.clone())
147+
.set_num_rows(num_rows)
148+
.set_ordinal(ordinal)
149+
.set_total_byte_size(100)
150+
.set_column_metadata(columns)
151+
.build()
152+
.unwrap();
153+
row_group_metas.push(row_group);
154+
}
155+
156+
let total_rows: i64 = row_group_metas.iter().map(|rg| rg.num_rows()).sum();
157+
let file_metadata = FileMetaData::new(1, total_rows, None, None, schema_descr, None);
158+
159+
ParquetMetaData::new(file_metadata, row_group_metas)
160+
}
161+
162+
#[test]
163+
fn test_row_group_index_reader_basic() {
164+
// create metadata with 3 row groups, each with varying number of rows
165+
let metadata = create_test_parquet_metadata(vec![
166+
(0, 2), // rg: 0, ordinal: 0, 2 rows
167+
(1, 3), // rg: 1, ordinal: 1, 3 rows
168+
(2, 1), // rg: 2, ordinal: 2, 1 row
169+
]);
170+
171+
let selected_row_groups: Vec<_> = metadata.row_groups().iter().collect();
172+
173+
let mut reader =
174+
RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
175+
176+
// 2 rows + 3 rows + 1 row
177+
let num_read = reader.read_records(6).unwrap();
178+
assert_eq!(num_read, 6);
179+
180+
let array = reader.consume_batch().unwrap();
181+
let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
182+
183+
let actual: Vec<i64> = indices.iter().map(|v| v.unwrap()).collect();
184+
assert_eq!(actual, [0, 0, 1, 1, 1, 2],);
185+
}
186+
187+
#[test]
188+
fn test_row_group_index_reader_reverse_order() {
189+
// create metadata with 3 row groups, each rg has 2 rows
190+
let metadata = create_test_parquet_metadata(vec![(0, 2), (1, 2), (2, 2)]);
191+
192+
// select only rgs with ordinals 2 and 0 (in that order)
193+
// means select row group 2 first, then row group 0, skipping row group 1
194+
let selected_row_groups: Vec<_> =
195+
vec![&metadata.row_groups()[2], &metadata.row_groups()[0]];
196+
197+
let mut reader =
198+
RowGroupIndexReader::try_new(&metadata, selected_row_groups.into_iter()).unwrap();
199+
200+
let num_read = reader.read_records(6).unwrap();
201+
// 2 rgs * 2 rows each
202+
assert_eq!(num_read, 4);
203+
204+
let array = reader.consume_batch().unwrap();
205+
let indices = array.as_any().downcast_ref::<Int64Array>().unwrap();
206+
207+
let actual: Vec<i64> = indices.iter().map(|v| v.unwrap()).collect();
208+
209+
assert_eq!(actual, [2, 2, 0, 0],);
210+
}
211+
}

0 commit comments

Comments
 (0)