[Parquet] perf: preallocate capacity for ArrayReaderBuilder #9093
lyang24 wants to merge 5 commits into apache:main
Conversation
force-pushed from 132b247 to e2b2b8f
|
From experiment: pre-allocation overhead may offset the savings from avoiding incremental growth, so turning this into a draft |
|
Thank you @lyang24 -- I will look at this more carefully shortly |
|
I suspect we will not be able to detect the difference in an end to end test given how small of an overhead the allocation is compared to running the rest of the query |
@lyang24 -- I am not sure. Maybe we could pass in the batch size to the reader... Edit: one way we could find out would be to put a println to print out the number of rows that was reserved 🤔 |
|
Update: I made a small test program (below) and printed out the capacities:

```diff
diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs
index 0343047da6..d87b494b46 100644
--- a/parquet/src/arrow/buffer/view_buffer.rs
+++ b/parquet/src/arrow/buffer/view_buffer.rs
@@ -35,6 +35,7 @@ pub struct ViewBuffer {
 impl ViewBuffer {
     /// Create a new ViewBuffer with capacity for the specified number of views
     pub fn with_capacity(capacity: usize) -> Self {
+        println!("Creating ViewBuffer with capacity {}", capacity);
         Self {
             views: Vec::with_capacity(capacity),
             buffers: Vec::new(),
```

Here is what they are: I think those are the number of rows in the dictionary (not the views themselves). I also then printed out the actual capacities:

```diff
diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs
index 8e690c574d..3ea6f08a29 100644
--- a/parquet/src/arrow/array_reader/byte_view_array.rs
+++ b/parquet/src/arrow/array_reader/byte_view_array.rs
@@ -259,6 +259,8 @@ impl ByteViewArrayDecoder {
         len: usize,
         dict: Option<&ViewBuffer>,
     ) -> Result<usize> {
+        println!("ByteViewArrayDecoder::read called with len {}, current views capacity: {}", len, out.views.capacity());
+
         match self {
             ByteViewArrayDecoder::Plain(d) => d.read(out, len),
             ByteViewArrayDecoder::Dictionary(d) => {
```

You can actually see most of the reads have an empty buffer. I tracked it down in a debugger and the default buffer is being created here:

Whole test program:

```rust
use std::fs::File;
use std::io::Read;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, FieldRef, Schema};
use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};

fn main() {
    let file_name = "/Users/andrewlamb/Downloads/hits/hits_0.parquet";
    println!("Opening file: {file_name}");
    let mut file = File::open(file_name).unwrap();
    let mut bytes = Vec::new();
    file.read_to_end(&mut bytes).unwrap();
    let bytes = Bytes::from(bytes);

    let schema = string_to_view_types(
        ParquetRecordBatchReaderBuilder::try_new(bytes.clone())
            .unwrap()
            .schema(),
    );
    //println!("Schema: {:?}", schema);
    let options = ArrowReaderOptions::new().with_schema(schema);
    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, options)
        .unwrap()
        .with_batch_size(8192)
        .build()
        .unwrap();

    for batch in reader {
        let batch = batch.unwrap();
        println!(
            "Read batch with {} rows and {} columns",
            batch.num_rows(),
            batch.num_columns()
        );
    }
    println!("Done");
}

// Hack because the clickbench files were written with the wrong logical type for strings
fn string_to_view_types(schema: &Arc<Schema>) -> Arc<Schema> {
    let fields: Vec<FieldRef> = schema
        .fields()
        .iter()
        .map(|field| {
            let existing_type = field.data_type();
            if existing_type == &DataType::Utf8 || existing_type == &DataType::Binary {
                Arc::new(Field::new(
                    field.name(),
                    DataType::Utf8View,
                    field.is_nullable(),
                ))
            } else {
                Arc::clone(field)
            }
        })
        .collect();
    Arc::new(Schema::new(fields))
}
```
|
So, the TLDR of my analysis is that we aren't properly sizing the allocations. The good news is that we can fix this; the bad news is that it may be tricky. I will update the original ticket too |
force-pushed from 57a2d80 to 224798a
Thanks for the deep dive. I think passing down a capacity hint from ArrayReaderBuilder is the right way to go. I made an implementation attempt, and the results look promising.
With predicate pushdown the SQL runs so fast (7ms) that the difference becomes hard to tell |
force-pushed from 224798a to 72f35ad
force-pushed from 72f35ad to a4f04e7
|
Nice -- checking it out |
|
run benchmark arrow_reader arrow_reader_row_filter arrow_reader_clickbench |
|
🤖 |
alamb left a comment
Thanks @lyang24 -- this looks very nice. I kicked off some automated benchmarks and hopefully we can see the benefits reflected there (though the benchmarks are sometimes pretty noisy)
If this does work out, I would like to spend a bit more time trying to get this mechanism to work by removing Default from ValuesBuffer to ensure we got all the cases and that any future array readers work the same
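As an illustrative sketch of what removing `Default` could look like (names here are stand-ins, not the actual parquet crate definitions): a `ValuesBuffer`-style trait whose only constructor is `with_capacity`, so no reader can silently create an unsized buffer.

```rust
// Hypothetical sketch: a buffer trait with a required `with_capacity`
// constructor and no `Default`, so every caller must supply a size hint.
trait ValuesBuffer {
    /// Construct with room for `capacity` values already reserved.
    fn with_capacity(capacity: usize) -> Self;
    fn len(&self) -> usize;
}

struct ViewBuffer {
    views: Vec<u128>,
}

impl ValuesBuffer for ViewBuffer {
    fn with_capacity(capacity: usize) -> Self {
        Self {
            views: Vec::with_capacity(capacity),
        }
    }
    fn len(&self) -> usize {
        self.views.len()
    }
}

// Generic code can no longer "forget" the hint, because there is no
// `Default` fallback to reach for.
fn make_buffer<B: ValuesBuffer>(capacity_hint: usize) -> B {
    B::with_capacity(capacity_hint)
}

fn main() {
    let buf: ViewBuffer = make_buffer(8192);
    assert!(buf.views.capacity() >= 8192);
    assert_eq!(buf.len(), 0);
}
```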
|
🤖: Benchmark completed Details
|
|
🤖 |
|
🤖: Benchmark completed Details
|
|
🤖 |
|
🤖: Benchmark completed Details
|
|
run benchmark arrow_reader arrow_reader_row_filter arrow_reader_clickbench |
|
🤖 |
|
Rerunning the benchmarks to see if we can see a consistent pattern |
|
Implemented @Dandandan's suggestions from the issue, plus a subset of the benchmarks on Mac
|
force-pushed from 48ed3bb to d126a19
|
The optimization is "reserve the values buffer up front," but the fixed-length path can't actually use that reservation at the point where the generic code applies it, so fixed-length arrays get the overhead of this change but no benefit. |
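To illustrate the fixed-length problem (a sketch with made-up names, not the real parquet types): the useful capacity is `count * byte_length`, and `byte_length` is only known once decoding starts, so an element-count reservation at construction time cannot size the byte buffer. One way around this is to defer the allocation:

```rust
// Sketch: remember the element-count hint until the per-element byte
// length becomes known, then allocate the byte buffer in one shot.
struct FixedLenBuffer {
    buffer: Vec<u8>,
    byte_length: Option<usize>,
    // Deferred hint: element count held until byte_length is known.
    values_capacity: Option<usize>,
}

impl FixedLenBuffer {
    fn with_capacity(capacity: usize) -> Self {
        Self {
            buffer: Vec::new(),
            byte_length: None,
            values_capacity: Some(capacity),
        }
    }

    /// Called by the decoder once the per-element byte length is known.
    fn set_byte_length(&mut self, byte_length: usize) {
        self.byte_length = Some(byte_length);
        if self.buffer.is_empty() {
            if let Some(values) = self.values_capacity.take() {
                self.buffer = Vec::with_capacity(values.saturating_mul(byte_length));
            }
        }
    }
}

fn main() {
    let mut buf = FixedLenBuffer::with_capacity(8192);
    assert_eq!(buf.buffer.capacity(), 0); // nothing allocated up front
    buf.set_byte_length(16);
    assert!(buf.buffer.capacity() >= 8192 * 16); // exact byte reservation
}
```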
|
Ran a full-scale array reader benchmark on Ubuntu. I think it's a net gain overall, and the regression on FixedLenByteArray is confirmed.
```rust
pub fn new(
    row_groups: &'a dyn RowGroups,
    metrics: &'a ArrowReaderMetrics,
    batch_size: usize,
```
This is a public API and thus this change is a breaking API change.
Maybe we could avoid changing the API via a new `with_*` method instead, something like:

```rust
let reader = ArrayReaderBuilder::new(row_groups, metrics)
    .with_batch_size(batch_size)
```

🤔
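A rough sketch of such a non-breaking builder API (types reduced to stand-ins; not the real parquet signatures): `new` keeps its existing arguments and a setter supplies the batch size, defaulting when the caller does not.

```rust
// Hypothetical builder: existing callers of `new` keep compiling and
// get a default batch size; new callers opt in via `with_batch_size`.
struct ArrayReaderBuilder {
    batch_size: usize,
}

impl ArrayReaderBuilder {
    fn new() -> Self {
        // Default chosen for illustration only.
        Self { batch_size: 1024 }
    }

    fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}

fn main() {
    let builder = ArrayReaderBuilder::new().with_batch_size(8192);
    assert_eq!(builder.batch_size, 8192);
    // Old call sites still work unchanged:
    assert_eq!(ArrayReaderBuilder::new().batch_size, 1024);
}
```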
Agreed, will change it later this week
|
run benchmark arrow_reader |
|
run benchmark arrow_reader_clickbench |
|
🤖 Arrow criterion benchmark running (GKE) | Comparing pre_allocate_view_vec (d126a19) to 6471e9a (merge-base) |
|
🤖 Arrow criterion benchmark running (GKE) | Comparing pre_allocate_view_vec (d126a19) to 6471e9a (merge-base) |
|
🤖 Arrow criterion benchmark completed (GKE) | Details |
|
run benchmark arrow_reader_clickbench |
|
🤖 Arrow criterion benchmark running (GKE) | Comparing pre_allocate_view_vec (d126a19) to 6471e9a (merge-base) |
|
🤖 Arrow criterion benchmark completed (GKE) | Details |
|
I have been working on this PR in the background. I am trying to ensure it doesn't over-allocate on all paths, and I am currently arguing with codex about a few cases. The performance looks promising, I think |
- Optimize OffsetBuffer dict decode by inlining extend + offset push
- Use raw ptr writes in ByteViewArray dictionary decoder for better perf
- Remove unused append_raw_view_unchecked from ViewBuffer

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
force-pushed from d126a19 to 3d7ba2d
```rust
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
    // Update capacity hint to the largest batch size seen
    if batch_size > self.capacity_hint {
        self.capacity_hint = batch_size;
    }

    // Lazily initialize buffer on first read
    let capacity_hint = self.capacity_hint;
```
It would be nice to avoid the allocation on a zero sized read (end of stream)
Suggested change:

```rust
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
    if batch_size == 0 {
        return Ok(0);
    }
    // Update capacity hint to the largest batch size seen
    if batch_size > self.capacity_hint {
        self.capacity_hint = batch_size;
    }
```
```rust
        // The decoder will pre-allocate when it knows both capacity and byte_length
        Self {
            buffer: Vec::new(),
            byte_length: None,
        }
    }

    fn pad_nulls(
```
You could potentially improve the allocation in the fixed size buffer too by deferring the allocation until we know the value capacity:
```rust
fn with_capacity(capacity: usize) -> Self {
    // `byte_length` is not known initially, so preserve the value-count
    // hint so the first decode can allocate the exact byte capacity.
    Self {
        buffer: Vec::new(),
        byte_length: None,
        values_capacity: Some(capacity),
    }
}
```

And the matching use-site change would be:

```rust
None => {
    out.byte_length = Some(self.byte_length);
    if out.buffer.is_empty()
        && let Some(values_capacity) = out.values_capacity.take()
    {
        // now that the byte length per output element is known,
        // allocate the actual needed space.
        let byte_capacity = values_capacity.saturating_mul(self.byte_length);
        out.buffer = Vec::with_capacity(byte_capacity);
    }
}
```
We can do this as a follow on PR
|
The only API changes here are to the experimental feature (array-reader) but I think that is ok to merge |
alamb left a comment
TLDR is I think this is really nice -- thank you @lyang24
I went through it carefully and I think it really does a good job
I had two small suggestions, which I think we could also do as a follow on PR.
Please let me know your preference
Thank you for your patience. This is great
Which issue does this PR close?
Rationale for this change
Reduce the allocation cost mentioned in #9059. From experiment: pre-allocation overhead may offset the savings from avoiding incremental growth.
What changes are included in this PR?
Add a with_capacity method to the ValuesBuffer trait, and remove the Default impls so that a capacity hint is required by ArrayReaderBuilder.
The capacity hint will be passed down to GenericRecordReader to preallocate the buffer.
Are there any user-facing changes?
Yes. ArrayReaders need an extra capacity parameter to indicate the preferred batch size, and buffers will be provisioned with this capacity.
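For illustration, the hint flow described above can be modeled as follows (`GenericRecordReader` is reduced here to a stand-in over a plain `Vec`; this is not the real parquet type): the builder's batch size is threaded into the record reader, which sizes its values buffer once instead of growing it incrementally.

```rust
// Sketch: a capacity hint reserves the values buffer up front, so pushes
// within one batch never trigger a reallocation.
struct GenericRecordReader<B> {
    values: B,
}

impl GenericRecordReader<Vec<i64>> {
    fn new_with_capacity(capacity_hint: usize) -> Self {
        Self {
            values: Vec::with_capacity(capacity_hint),
        }
    }

    fn read_records(&mut self, n: usize) {
        // With a correct hint, these pushes stay within the reservation.
        for i in 0..n as i64 {
            self.values.push(i);
        }
    }
}

fn main() {
    let batch_size = 8192;
    let mut reader = GenericRecordReader::new_with_capacity(batch_size);
    let cap_before = reader.values.capacity();
    reader.read_records(batch_size);
    // Capacity unchanged: no incremental growth occurred.
    assert_eq!(reader.values.capacity(), cap_before);
    assert_eq!(reader.values.len(), batch_size);
}
```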