
Commit 6af5aa0

add interoperability test
1 parent 87def04 commit 6af5aa0

File tree

parquet/Cargo.toml
parquet/tests/ieee754_nan_interop.rs

2 files changed: +287 -0 lines changed

parquet/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -170,6 +170,11 @@ name = "arrow_writer"
 required-features = ["arrow"]
 path = "./tests/arrow_writer.rs"
 
+[[test]]
+name = "ieee754_nan_interop"
+required-features = ["arrow"]
+path = "./tests/ieee754_nan_interop.rs"
+
 [[test]]
 name = "encryption"
 required-features = ["arrow"]
parquet/tests/ieee754_nan_interop.rs

Lines changed: 282 additions & 0 deletions
@@ -0,0 +1,282 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Interoperability test for https://github.com/apache/parquet-format/pull/514.
//! Demonstrates reading NaN statistics and counts from a file generated with
//! parquet-java, and shows that on write we produce the same statistics.

use bytes::Bytes;
use core::f32;
use half::f16;
use std::{fs, path::PathBuf, sync::Arc};

use arrow::util::test_util::parquet_test_data;
use arrow_array::{Array, Float16Array, Float32Array, Float64Array, UInt64Array};
use arrow_schema::Schema;
use parquet::{
    arrow::{
        ArrowWriter,
        arrow_reader::{ArrowReaderBuilder, ArrowReaderOptions, statistics::StatisticsConverter},
    },
    errors::Result,
    file::{metadata::ParquetMetaData, properties::WriterProperties},
    schema::types::SchemaDescriptor,
};

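// Expected NaN count for each of the file's five row groups; since each row
// group holds a single data page, the same values apply per data page.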
const NAN_COUNTS: [u64; 5] = [0, 4, 10, 0, 0];
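
// NaN bit patterns at the extremes of the payload range. The SMALL/LARGE
// suffixes refer to the values' position under an IEEE 754 total order,
// in which negative NaNs sort below -inf and positive NaNs above +inf.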
const FLOAT_NEG_NAN_SMALL: f32 = f32::from_bits(0xffffffff);
const FLOAT_NEG_NAN_LARGE: f32 = f32::from_bits(0xfff00001);
const FLOAT_NAN_SMALL: f32 = f32::from_bits(0x7fc00001);
const FLOAT_NAN_LARGE: f32 = f32::from_bits(0x7fffffff);

const FLOAT_MINS: [f32; 5] = [-2.0, -2.0, FLOAT_NEG_NAN_SMALL, 0.0, -5.0];
const FLOAT_MAXS: [f32; 5] = [5.0, 3.0, FLOAT_NAN_LARGE, 5.0, -0.0];

fn validate_float_metadata(
    metadata: &ParquetMetaData,
    arrow_schema: &Schema,
    parquet_schema: &SchemaDescriptor,
) -> Result<()> {
    let converter = StatisticsConverter::try_new("float_ieee754", arrow_schema, parquet_schema)?;
    let row_group_indices: Vec<_> = (0..metadata.num_row_groups()).collect();

    // verify column statistics mins
    let exp: Arc<dyn Array> = Arc::new(Float32Array::from(FLOAT_MINS.to_vec()));
    let mins = converter.row_group_mins(metadata.row_groups())?;
    assert_eq!(&mins, &exp);

    // verify page mins (should be 1 page per row group, so should be same)
    let page_mins = converter.data_page_mins(
        metadata.column_index().unwrap(),
        metadata.offset_index().unwrap(),
        &row_group_indices,
    )?;
    assert_eq!(&page_mins, &exp);

    let exp: Arc<dyn Array> = Arc::new(Float32Array::from(FLOAT_MAXS.to_vec()));
    let maxs = converter.row_group_maxes(metadata.row_groups())?;
    assert_eq!(&maxs, &exp);

    // verify page maxs (should be 1 page per row group, so should be same)
    let page_maxs = converter.data_page_maxes(
        metadata.column_index().unwrap(),
        metadata.offset_index().unwrap(),
        &row_group_indices,
    )?;
    assert_eq!(&page_maxs, &exp);

    let exp = UInt64Array::from(NAN_COUNTS.to_vec());
    let nans = converter.row_group_nan_counts(metadata.row_groups())?;
    assert_eq!(&nans, &exp);

    let page_nans = converter.data_page_nan_counts(
        metadata.column_index().unwrap(),
        metadata.offset_index().unwrap(),
        &row_group_indices,
    )?;
    assert_eq!(&page_nans, &exp);

    Ok(())
}

const DOUBLE_NEG_NAN_SMALL: f64 = f64::from_bits(0xffffffffffffffff);
const DOUBLE_NEG_NAN_LARGE: f64 = f64::from_bits(0xfff0000000000001);
const DOUBLE_NAN_SMALL: f64 = f64::from_bits(0x7ff0000000000001);
const DOUBLE_NAN_LARGE: f64 = f64::from_bits(0x7fffffffffffffff);

const DOUBLE_MINS: [f64; 5] = [-2.0, -2.0, DOUBLE_NEG_NAN_SMALL, 0.0, -5.0];
const DOUBLE_MAXS: [f64; 5] = [5.0, 3.0, DOUBLE_NAN_LARGE, 5.0, -0.0];

fn validate_double_metadata(
    metadata: &ParquetMetaData,
    arrow_schema: &Schema,
    parquet_schema: &SchemaDescriptor,
) -> Result<()> {
    let converter = StatisticsConverter::try_new("double_ieee754", arrow_schema, parquet_schema)?;
    let row_group_indices: Vec<_> = (0..metadata.num_row_groups()).collect();

    // verify column statistics mins
    let exp: Arc<dyn Array> = Arc::new(Float64Array::from(DOUBLE_MINS.to_vec()));
    let mins = converter.row_group_mins(metadata.row_groups())?;
    assert_eq!(&mins, &exp);

    // verify page mins (should be 1 page per row group, so should be same)
    let page_mins = converter.data_page_mins(
        metadata.column_index().unwrap(),
        metadata.offset_index().unwrap(),
        &row_group_indices,
    )?;
    assert_eq!(&page_mins, &exp);

    let exp: Arc<dyn Array> = Arc::new(Float64Array::from(DOUBLE_MAXS.to_vec()));
    let maxs = converter.row_group_maxes(metadata.row_groups())?;
    assert_eq!(&maxs, &exp);

    // verify page maxs (should be 1 page per row group, so should be same)
    let page_maxs = converter.data_page_maxes(
        metadata.column_index().unwrap(),
        metadata.offset_index().unwrap(),
        &row_group_indices,
    )?;
    assert_eq!(&page_maxs, &exp);

    let exp = UInt64Array::from(NAN_COUNTS.to_vec());
    let nans = converter.row_group_nan_counts(metadata.row_groups())?;
    assert_eq!(&nans, &exp);

    let page_nans = converter.data_page_nan_counts(
        metadata.column_index().unwrap(),
        metadata.offset_index().unwrap(),
        &row_group_indices,
    )?;
    assert_eq!(&page_nans, &exp);

    Ok(())
}

const FLOAT16_NEG_NAN_SMALL: f16 = f16::from_bits(0xffff);
const FLOAT16_NEG_NAN_LARGE: f16 = f16::from_bits(0xfc01);
const FLOAT16_NAN_SMALL: f16 = f16::from_bits(0x7c01);
const FLOAT16_NAN_LARGE: f16 = f16::from_bits(0x7fff);

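// The f16 bit patterns mirror the f32/f64 expectations above:
// 0xc000 = -2.0, 0x0000 = 0.0, 0xc500 = -5.0, 0x4500 = 5.0, 0x4200 = 3.0,
// 0x8000 = -0.0.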
const FLOAT16_MINS: [f16; 5] = [
    f16::from_bits(0xc000),
    f16::from_bits(0xc000),
    FLOAT16_NEG_NAN_SMALL,
    f16::from_bits(0x0000),
    f16::from_bits(0xc500),
];
const FLOAT16_MAXS: [f16; 5] = [
    f16::from_bits(0x4500),
    f16::from_bits(0x4200),
    FLOAT16_NAN_LARGE,
    f16::from_bits(0x4500),
    f16::from_bits(0x8000),
];

fn validate_float16_metadata(
    metadata: &ParquetMetaData,
    arrow_schema: &Schema,
    parquet_schema: &SchemaDescriptor,
) -> Result<()> {
    let converter = StatisticsConverter::try_new("float16_ieee754", arrow_schema, parquet_schema)?;
    let row_group_indices: Vec<_> = (0..metadata.num_row_groups()).collect();

    // verify column statistics mins
    let exp: Arc<dyn Array> = Arc::new(Float16Array::from(FLOAT16_MINS.to_vec()));
    let mins = converter.row_group_mins(metadata.row_groups())?;
    assert_eq!(&mins, &exp);

    // verify page mins (should be 1 page per row group, so should be same)
    let page_mins = converter.data_page_mins(
        metadata.column_index().unwrap(),
        metadata.offset_index().unwrap(),
        &row_group_indices,
    )?;
    assert_eq!(&page_mins, &exp);

    let exp: Arc<dyn Array> = Arc::new(Float16Array::from(FLOAT16_MAXS.to_vec()));
    let maxs = converter.row_group_maxes(metadata.row_groups())?;
    assert_eq!(&maxs, &exp);

    // verify page maxs (should be 1 page per row group, so should be same)
    let page_maxs = converter.data_page_maxes(
        metadata.column_index().unwrap(),
        metadata.offset_index().unwrap(),
        &row_group_indices,
    )?;
    assert_eq!(&page_maxs, &exp);

    let exp = UInt64Array::from(NAN_COUNTS.to_vec());
    let nans = converter.row_group_nan_counts(metadata.row_groups())?;
    assert_eq!(&nans, &exp);

    let page_nans = converter.data_page_nan_counts(
        metadata.column_index().unwrap(),
        metadata.offset_index().unwrap(),
        &row_group_indices,
    )?;
    assert_eq!(&page_nans, &exp);

    Ok(())
}

fn validate_metadata(
    metadata: &ParquetMetaData,
    arrow_schema: &Schema,
    parquet_schema: &SchemaDescriptor,
) -> Result<()> {
    validate_float_metadata(metadata, arrow_schema, parquet_schema)?;
    validate_double_metadata(metadata, arrow_schema, parquet_schema)?;
    validate_float16_metadata(metadata, arrow_schema, parquet_schema)
}

#[test]
fn test_ieee754_interop() {
    // 1) read the interop file
    // 2) validate its stats are as expected
    // 3) rewrite the file, and validate the metadata reported by the writer
    // 4) re-read what we've written, and again validate the metadata
    let parquet_testing_data = parquet_test_data();
    let path = PathBuf::from(parquet_testing_data).join("floating_orders_nan_count.parquet");
    println!("Reading file: {path:?}");

    let file = std::fs::File::open(&path).unwrap();
    let options = ArrowReaderOptions::new()
        .with_page_index_policy(parquet::file::metadata::PageIndexPolicy::Required);
    let builder = ArrowReaderBuilder::try_new_with_options(file, options).unwrap();
    let file_metadata = builder.metadata().clone();
    let schema = builder.schema().clone();
    let parquet_schema = builder.parquet_schema().clone();

    println!("validate interop file");
    validate_metadata(file_metadata.as_ref(), schema.as_ref(), &parquet_schema)
        .expect("validate read metadata");

    let reader = builder.build().unwrap();
    let mut outbuf = Vec::new();
    {
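        // Cap row groups at 10 rows so the rewritten file keeps the same
        // five-row-group layout the expected statistics assume.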
        let writer_options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(10))
            .build();
        let mut writer = ArrowWriter::try_new(&mut outbuf, schema.clone(), Some(writer_options))
            .expect("create arrow writer");
        for maybe_batch in reader {
            let batch = maybe_batch.expect("reading batch");
            writer.write(&batch).expect("writing data");
        }
        let write_meta = writer.close().expect("closing file");
        println!("validate writer output");
        validate_metadata(&write_meta, schema.as_ref(), &parquet_schema)
            .expect("validate written metadata");
    }

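    // Keep a copy of the rewritten file on disk, e.g. for inspection with
    // other Parquet readers.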
    fs::write("output.pq", outbuf.clone()).unwrap();

    // now re-validate the bytes we've written
    let options = ArrowReaderOptions::new()
        .with_page_index_policy(parquet::file::metadata::PageIndexPolicy::Required);
    let builder = ArrowReaderBuilder::try_new_with_options(Bytes::from(outbuf), options).unwrap();
    let file_metadata = builder.metadata().clone();
    let schema = builder.schema().clone();
    let parquet_schema = builder.parquet_schema().clone();

    println!("validate from rust output");
    validate_metadata(file_metadata.as_ref(), schema.as_ref(), &parquet_schema)
        .expect("validate re-read metadata");
}
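
To run the test locally, an invocation along these lines should work (the exact command is an assumption; the test locates the interop file via arrow::util::test_util::parquet_test_data(), which resolves the parquet-testing data directory from the PARQUET_TEST_DATA environment variable):

PARQUET_TEST_DATA=/path/to/parquet-testing/data \
  cargo test -p parquet --features arrow --test ieee754_nan_interop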
