Commit 711fac8

feat(parquet): add push_decoder benchmark for PushBuffers overhead (#9696)
# Which issue does this PR close?

- None, but relates to #9695.

# Rationale for this change

This PR is meant to document and measure the quadratic behavior reported in the above issue.

# What changes are included in this PR?

Add a benchmark that isolates `PushBuffers` overhead during row group construction, independent of page decoding. It calls `try_next_reader` to build row group readers without consuming any pages, so the measured cost is purely buffer lookup, stitching, and release.

Two benchmark groups exercise different scaling axes:

- `1buf`: pushes the entire file as a single buffer, varying column count (100 to 50k). This isolates the per-range cost of `has_range`/`get_bytes` lookups and `release_through`.
- `Nbuf`: pushes one buffer per requested range, varying column count (100 to 10k). This isolates the cost when buffer count equals range count.

Baseline results (Apple M1 Max):

```
push_decoder/1buf/1000ranges     323.5 µs
push_decoder/1buf/10000ranges     3.25 ms
push_decoder/1buf/100000ranges   34.6  ms
push_decoder/1buf/500000ranges  185.3  ms
push_decoder/Nbuf/1000ranges     437.2 µs
push_decoder/Nbuf/10000ranges    10.7  ms
push_decoder/Nbuf/100000ranges  711.6  ms
```

# Are these changes tested?

# Are there any user-facing changes?

N/A

Signed-off-by: Hippolyte Barraud <hippolyte.barraud@datadoghq.com>
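Read against each other, the two groups separate linear from superlinear cost: `1buf` grows roughly in proportion to range count (3.25 ms → 34.6 ms, about 10.6× for the 10× step from 10k to 100k ranges), while `Nbuf` grows far faster over the same step (10.7 ms → 711.6 ms, roughly 66×), the quadratic-looking shape reported in #9695.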
Parent: d69c604

File tree: 2 files changed, +175 −0 lines

parquet/Cargo.toml

Lines changed: 5 additions & 0 deletions
```diff
@@ -221,6 +221,11 @@ name = "arrow_writer"
 required-features = ["arrow"]
 harness = false
 
+[[bench]]
+name = "push_decoder"
+required-features = ["arrow"]
+harness = false
+
 [[bench]]
 name = "arrow_reader"
 required-features = ["arrow", "test_common", "experimental"]
```

parquet/benches/push_decoder.rs

Lines changed: 170 additions & 0 deletions
```rust
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmarks for the push-based decoder measuring PushBuffers overhead.
//!
//! Uses `try_next_reader` to build row group readers without decoding any
//! pages, isolating PushBuffers operations (has_range, get_bytes, clearing).

use std::hint::black_box;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow_array::{Float32Array, RecordBatch};
use bytes::Bytes;
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use parquet::DecodeResult;
use parquet::arrow::ArrowWriter;
use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
use parquet::file::metadata::ParquetMetaDataPushDecoder;
use parquet::file::properties::WriterProperties;

fn make_wide_schema(num_columns: usize) -> SchemaRef {
    let fields: Vec<Field> = (0..num_columns)
        .map(|i| Field::new(format!("c{i}"), DataType::Float32, false))
        .collect();
    Arc::new(Schema::new(fields))
}

/// Write a Parquet file with `num_columns` columns, 10 row groups of 100 rows.
fn make_test_file(num_columns: usize) -> Bytes {
    let num_rows = 1_000;
    let rows_per_rg = 100;
    let schema = make_wide_schema(num_columns);
    let columns: Vec<Arc<dyn arrow_array::Array>> = (0..num_columns)
        .map(|_| Arc::new(Float32Array::from(vec![0.0f32; num_rows])) as _)
        .collect();
    let batch = RecordBatch::try_new(schema.clone(), columns).unwrap();

    let mut buf = Vec::new();
    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(rows_per_rg))
        .build();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props)).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
    Bytes::from(buf)
}

fn decode_metadata(file_data: &Bytes) -> Arc<parquet::file::metadata::ParquetMetaData> {
    let file_len = file_data.len() as u64;
    let mut dec = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
    dec.push_range(0..file_len, file_data.clone()).unwrap();
    match dec.try_decode().unwrap() {
        DecodeResult::Data(m) => Arc::new(m),
        other => panic!("expected metadata, got {other:?}"),
    }
}

/// Push the entire file as one buffer, then build all row group readers.
fn build_readers_single_buffer(
    file_data: &Bytes,
    metadata: &Arc<parquet::file::metadata::ParquetMetaData>,
) {
    let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata.clone())
        .unwrap()
        .build()
        .unwrap();

    decoder
        .push_range(0..file_data.len() as u64, file_data.clone())
        .unwrap();

    loop {
        match decoder.try_next_reader().unwrap() {
            DecodeResult::Data(reader) => {
                black_box(reader);
            }
            DecodeResult::Finished => break,
            DecodeResult::NeedsData(r) => panic!("unexpected NeedsData: {r:?}"),
        }
    }
}

/// Push one buffer per requested range, then build all row group readers.
fn build_readers_exact_ranges(
    file_data: &Bytes,
    metadata: &Arc<parquet::file::metadata::ParquetMetaData>,
) {
    let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata.clone())
        .unwrap()
        .build()
        .unwrap();

    loop {
        match decoder.try_next_reader().unwrap() {
            DecodeResult::Data(reader) => {
                black_box(reader);
            }
            DecodeResult::Finished => break,
            DecodeResult::NeedsData(ranges) => {
                let buffers: Vec<Bytes> = ranges
                    .iter()
                    .map(|r| file_data.slice(r.start as usize..r.end as usize))
                    .collect();
                decoder.push_ranges(ranges, buffers).unwrap();
            }
        }
    }
}

fn bench_1buf(c: &mut Criterion) {
    let mut group = c.benchmark_group("push_decoder/1buf");

    for num_cols in [100, 1_000, 10_000, 50_000] {
        let file_data = make_test_file(num_cols);
        let metadata = decode_metadata(&file_data);
        let num_ranges: usize = metadata
            .row_groups()
            .iter()
            .map(|rg| rg.columns().len())
            .sum();

        group.bench_with_input(
            BenchmarkId::from_parameter(format!("{num_ranges}ranges")),
            &(&file_data, &metadata),
            |b, &(data, meta)| b.iter(|| build_readers_single_buffer(data, meta)),
        );
    }

    group.finish();
}

fn bench_nbuf(c: &mut Criterion) {
    let mut group = c.benchmark_group("push_decoder/Nbuf");

    for num_cols in [100, 1_000, 10_000] {
        let file_data = make_test_file(num_cols);
        let metadata = decode_metadata(&file_data);
        let num_ranges: usize = metadata
            .row_groups()
            .iter()
            .map(|rg| rg.columns().len())
            .sum();

        group.bench_with_input(
            BenchmarkId::from_parameter(format!("{num_ranges}ranges")),
            &(&file_data, &metadata),
            |b, &(data, meta)| b.iter(|| build_readers_exact_ranges(data, meta)),
        );
    }

    group.finish();
}

criterion_group!(benches, bench_1buf, bench_nbuf);
criterion_main!(benches);
```
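To make the superlinear `Nbuf` scaling noted above concrete, here is a hypothetical toy model, not arrow-rs code: all names (`ToyBuffers`, its `Vec` layout, its `get_bytes`) are invented for illustration. It shows how a buffer store that answers each range lookup with a linear scan over its buffer list turns N lookups over N buffers into O(N²) work, which is the shape the `Nbuf` group stresses.

```rust
use std::ops::Range;

/// Hypothetical stand-in for a buffer store; the field layout is an
/// assumption for illustration, not taken from `PushBuffers`.
struct ToyBuffers {
    /// (byte range, payload) pairs in push order
    buffers: Vec<(Range<u64>, Vec<u8>)>,
}

impl ToyBuffers {
    /// Linear scan per lookup: O(number of buffers) for each call.
    fn get_bytes(&self, range: &Range<u64>) -> Option<&[u8]> {
        self.buffers
            .iter()
            .find(|(r, _)| r.start <= range.start && range.end <= r.end)
            .map(|(r, data)| {
                let off = (range.start - r.start) as usize;
                let len = (range.end - range.start) as usize;
                &data[off..off + len]
            })
    }
}

fn main() {
    // One buffer per range, mirroring the Nbuf benchmark group.
    let n: u64 = 10_000;
    let store = ToyBuffers {
        buffers: (0..n).map(|i| (i..i + 1, vec![0u8])).collect(),
    };
    // N lookups, each scanning up to N buffers: O(N^2) comparisons total.
    let hits = (0..n)
        .filter(|i| store.get_bytes(&(*i..i + 1)).is_some())
        .count();
    assert_eq!(hits, n as usize);
}
```

Under this model, keeping per-lookup cost at O(1) or O(log N), for example with a map keyed by range start, would make the same workload linear; whether that is the right fix for `PushBuffers` is the subject of #9695, which this benchmark only measures.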
