Skip to content

Commit 1b6dd37

Browse files
committed
test: compression fallback wins
Modify the compression fallback test to illustrate the benefit in a different — though admittedly still contrived — case. The heuristic borrowed from parquet-java is still not ideal for all cases, so we'll need more configurability.
1 parent 701ff2b commit 1b6dd37

File tree

1 file changed

+42
-37
lines changed
  • parquet/src/arrow/arrow_writer

1 file changed

+42
-37
lines changed

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 42 additions & 37 deletions
Original file line number · Diff line number · Diff line change
@@ -4857,6 +4857,15 @@ mod tests {
48574857
assert_eq!(chunk_page_stats, file_page_stats);
48584858
}
48594859

4860+
fn get_dict_page_size(meta: &ColumnChunkMetaData, data: Bytes) -> usize {
4861+
let mut reader = SerializedPageReader::new(Arc::new(data), meta, 0, None).unwrap();
4862+
let page = reader.get_next_page().unwrap().unwrap();
4863+
match page {
4864+
Page::DictionaryPage { buf, .. } => buf.len(),
4865+
_ => panic!("expected DictionaryPage"),
4866+
}
4867+
}
4868+
48604869
#[test]
48614870
fn test_different_dict_page_size_limit() {
48624871
let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
@@ -4881,60 +4890,56 @@ mod tests {
48814890
let col0_meta = metadata.row_group(0).column(0);
48824891
let col1_meta = metadata.row_group(0).column(1);
48834892

4884-
let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
4885-
let mut reader =
4886-
SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
4887-
let page = reader.get_next_page().unwrap().unwrap();
4888-
match page {
4889-
Page::DictionaryPage { buf, .. } => buf.len(),
4890-
_ => panic!("expected DictionaryPage"),
4891-
}
4892-
};
4893-
4894-
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
4895-
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
4893+
assert_eq!(get_dict_page_size(col0_meta, data.clone()), 1024 * 1024);
4894+
assert_eq!(get_dict_page_size(col1_meta, data.clone()), 1024 * 1024 * 4);
48964895
}
48974896

48984897
#[test]
48994898
fn test_dict_page_size_decided_by_compression_fallback() {
4900-
let array = Arc::new(Int64Array::from_iter(0..1024 * 1024));
4901-
let schema = Arc::new(Schema::new(vec![
4902-
Field::new("col0", arrow_schema::DataType::Int64, false),
4903-
Field::new("col1", arrow_schema::DataType::Int64, false),
4904-
]));
4905-
let batch =
4906-
arrow_array::RecordBatch::try_new(schema.clone(), vec![array.clone(), array]).unwrap();
4899+
// Generate values that are well dispersed across a range approximating (0..256 * 1024)
4900+
let array = Arc::new(Int32Array::from_iter(
4901+
(0i32..1024 * 1024).map(|x| x.wrapping_mul(163019) % 262139),
4902+
));
4903+
let schema = Arc::new(Schema::new(vec![Field::new(
4904+
"col0",
4905+
arrow_schema::DataType::Int32,
4906+
false,
4907+
)]));
4908+
let batch = arrow_array::RecordBatch::try_new(schema.clone(), vec![array]).unwrap();
4909+
4910+
let props = WriterProperties::builder()
4911+
.set_dictionary_page_size_limit(1024 * 1024)
4912+
.build();
4913+
let mut writer = ArrowWriter::try_new(Vec::new(), schema.clone(), Some(props)).unwrap();
4914+
writer.write(&batch).unwrap();
4915+
let data = Bytes::from(writer.into_inner().unwrap());
4916+
4917+
// println!("file length, dictionary: {}", data.len());
4918+
4919+
let mut metadata = ParquetMetaDataReader::new();
4920+
metadata.try_parse(&data).unwrap();
4921+
let metadata = metadata.finish().unwrap();
4922+
let full_dict_meta = metadata.row_group(0).column(0);
4923+
assert_eq!(get_dict_page_size(full_dict_meta, data.clone()), 1_048_576);
49074924

49084925
let props = WriterProperties::builder()
49094926
.set_dictionary_page_size_limit(1024 * 1024)
4910-
.set_column_dictionary_page_size_limit(ColumnPath::from("col1"), 1024 * 1024 * 4)
49114927
.set_column_dictionary_fallback(
4912-
ColumnPath::from("col1"),
4928+
ColumnPath::from("col0"),
49134929
DictionaryFallback::OnUnfavorableCompression,
49144930
)
49154931
.build();
4916-
let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
4932+
let mut writer = ArrowWriter::try_new(Vec::new(), schema.clone(), Some(props)).unwrap();
49174933
writer.write(&batch).unwrap();
49184934
let data = Bytes::from(writer.into_inner().unwrap());
49194935

4936+
// println!("file length, fallback: {}", data.len());
4937+
49204938
let mut metadata = ParquetMetaDataReader::new();
49214939
metadata.try_parse(&data).unwrap();
49224940
let metadata = metadata.finish().unwrap();
4923-
let col0_meta = metadata.row_group(0).column(0);
4924-
let col1_meta = metadata.row_group(0).column(1);
4925-
4926-
let get_dict_page_size = move |meta: &ColumnChunkMetaData| {
4927-
let mut reader =
4928-
SerializedPageReader::new(Arc::new(data.clone()), meta, 0, None).unwrap();
4929-
let page = reader.get_next_page().unwrap().unwrap();
4930-
match page {
4931-
Page::DictionaryPage { buf, .. } => buf.len(),
4932-
_ => panic!("expected DictionaryPage"),
4933-
}
4934-
};
4935-
4936-
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
4937-
assert_eq!(get_dict_page_size(col1_meta), 8192);
4941+
let fallback_meta = metadata.row_group(0).column(0);
4942+
assert_eq!(get_dict_page_size(fallback_meta, data.clone()), 4096);
49384943
}
49394944

49404945
struct WriteBatchesShape {

0 commit comments

Comments (0)