From 647bdbfd78200f1229b37a52213e6d8abe899523 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 10 Jan 2026 08:10:10 +0100 Subject: [PATCH 01/10] Avoid overallocating arrays --- arrow-select/src/coalesce.rs | 9 +++++++++ arrow-select/src/coalesce/byte_view.rs | 27 +++++++++++++------------- arrow-select/src/coalesce/primitive.rs | 18 ++++++++--------- 3 files changed, 31 insertions(+), 23 deletions(-) diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index ddb1c41c8c79..2dd676f2598c 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -470,6 +470,10 @@ impl BatchCoalescer { // If pushing this batch would exceed the target batch size, // finish the current batch and start a new one let mut offset = 0; + for in_progress in self.in_progress_arrays.iter_mut() { + in_progress.ensure_capacity(); + } + while num_rows > (self.target_batch_size - self.buffered_rows) { let remaining_rows = self.target_batch_size - self.buffered_rows; debug_assert!(remaining_rows > 0); @@ -607,6 +611,11 @@ trait InProgressArray: std::fmt::Debug + Send + Sync { /// Finish the currently in-progress array and return it as an `ArrayRef` fn finish(&mut self) -> Result; + + /// Ensure the in-progress array has enough capacity to hold more rows + fn ensure_capacity(&mut self) { + // Default implementation does nothing + } } #[cfg(test)] diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index 6d3bcc8ae04c..f3d94770fd79 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -37,7 +37,7 @@ use std::sync::Arc; /// [`BinaryViewArray`]: arrow_array::BinaryViewArray pub(crate) struct InProgressByteViewArray { /// The source array and information - source: Option, + source: Option>, /// the target batch size (and thus size for views allocation) batch_size: usize, /// The in progress views @@ -55,9 +55,9 @@ pub(crate) struct InProgressByteViewArray { _phantom: PhantomData, } -struct Source { +struct Source { /// The array to copy form - array: ArrayRef, + array: GenericByteViewArray, /// Should the strings from the source array be copied into new buffers? need_gc: bool, /// How many bytes were actually used in the source array's buffers? @@ -93,14 +93,6 @@ impl InProgressByteViewArray { } } - /// Allocate space for output views and nulls if needed - /// - /// This is done on write (when we know it is necessary) rather than - /// eagerly to avoid allocations that are not used. - fn ensure_capacity(&mut self) { - self.views.reserve(self.batch_size); - } - /// Finishes in progress buffer, if any fn finish_current(&mut self) { let Some(next_buffer) = self.current.take() else { @@ -296,15 +288,22 @@ impl InProgressArray for InProgressByteViewArray { }; Source { - array, + array: s.clone(), need_gc, ideal_buffer_size, } }) } + /// Allocate space for output views and nulls if needed + /// + /// This is done on write (when we know it is necessary) rather than + /// eagerly to avoid allocations that are not used. + fn ensure_capacity(&mut self) { + self.views.reserve(self.batch_size - self.views.len()); + } + fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> { - self.ensure_capacity(); let source = self.source.take().ok_or_else(|| { ArrowError::InvalidArgumentError( "Internal Error: InProgressByteViewArray: source not set".to_string(), @@ -312,7 +311,7 @@ impl InProgressArray for InProgressByteViewArray { })?; // If creating StringViewArray output, ensure input was valid utf8 too - let s = source.array.as_byte_view::(); + let s = &source.array; // add any nulls, as necessary if let Some(nulls) = s.nulls().as_ref() { diff --git a/arrow-select/src/coalesce/primitive.rs b/arrow-select/src/coalesce/primitive.rs index 85b653357b54..bb86602b617a 100644 --- a/arrow-select/src/coalesce/primitive.rs +++ b/arrow-select/src/coalesce/primitive.rs @@ -50,13 +50,7 @@ impl InProgressPrimitiveArray { } } - /// Allocate space for output values if necessary. - /// - /// This is done on write (when we know it is necessary) rather than - /// eagerly to avoid allocations that are not used. - fn ensure_capacity(&mut self) { - self.current.reserve(self.batch_size); - } + } impl InProgressArray for InProgressPrimitiveArray { @@ -64,9 +58,15 @@ impl InProgressArray for InProgressPrimitiveArray self.source = source; } - fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> { - self.ensure_capacity(); + /// Allocate space for output values if necessary. + /// + /// This is done on write (when we know it is necessary) rather than + /// eagerly to avoid allocations that are not used. + fn ensure_capacity(&mut self) { + self.current.reserve(self.batch_size - self.current.len()); + } + fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> { let s = self .source .as_ref() From 046f52d60e8929be3b2bd3f8dc2c54b1000fb8b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 10 Jan 2026 08:15:14 +0100 Subject: [PATCH 02/10] Avoid overallocating arrays --- arrow-select/src/coalesce/byte_view.rs | 11 +++++------ arrow-select/src/coalesce/primitive.rs | 2 -- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index f3d94770fd79..731a36a2ab74 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -37,7 +37,7 @@ use std::sync::Arc; /// [`BinaryViewArray`]: arrow_array::BinaryViewArray pub(crate) struct InProgressByteViewArray { /// The source array and information - source: Option>, + source: Option, /// the target batch size (and thus size for views allocation) batch_size: usize, /// The in progress views @@ -55,9 +55,9 @@ pub(crate) struct InProgressByteViewArray { _phantom: PhantomData, } -struct Source { +struct Source { /// The array to copy form - array: GenericByteViewArray, + array: ArrayRef, /// Should the strings from the source array be copied into new buffers? need_gc: bool, /// How many bytes were actually used in the source array's buffers? @@ -288,7 +288,7 @@ impl InProgressArray for InProgressByteViewArray { }; Source { - array: s.clone(), + array, need_gc, ideal_buffer_size, } @@ -311,8 +311,7 @@ impl InProgressArray for InProgressByteViewArray { })?; // If creating StringViewArray output, ensure input was valid utf8 too - let s = &source.array; - + let s = source.array.as_byte_view::(); // add any nulls, as necessary if let Some(nulls) = s.nulls().as_ref() { let nulls = nulls.slice(offset, len); diff --git a/arrow-select/src/coalesce/primitive.rs b/arrow-select/src/coalesce/primitive.rs index bb86602b617a..84075d08d25b 100644 --- a/arrow-select/src/coalesce/primitive.rs +++ b/arrow-select/src/coalesce/primitive.rs @@ -49,8 +49,6 @@ impl InProgressPrimitiveArray { current: vec![], } } - - } impl InProgressArray for InProgressPrimitiveArray { From 01cc8833a167e9450dec11a10a61e4080986b522 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 10 Jan 2026 08:21:04 +0100 Subject: [PATCH 03/10] Avoid overallocating arrays --- arrow-select/src/coalesce.rs | 8 -------- arrow-select/src/coalesce/byte_view.rs | 17 +++++++++-------- arrow-select/src/coalesce/primitive.rs | 13 +++++++------ 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index 2dd676f2598c..e711685a68e3 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -470,9 +470,6 @@ impl BatchCoalescer { // If pushing this batch would exceed the target batch size, // finish the current batch and start a new one let mut offset = 0; - for in_progress in self.in_progress_arrays.iter_mut() { - in_progress.ensure_capacity(); - } while num_rows > (self.target_batch_size - self.buffered_rows) { let remaining_rows = self.target_batch_size - self.buffered_rows; @@ -611,11 +608,6 @@ trait InProgressArray: std::fmt::Debug + Send + Sync { /// Finish the currently in-progress array and return it as an `ArrayRef` fn finish(&mut self) -> Result; - - /// Ensure the in-progress array has enough capacity to hold more rows - fn ensure_capacity(&mut self) { - // Default implementation does nothing - } } #[cfg(test)] diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index 731a36a2ab74..6efcb62b0697 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -93,6 +93,14 @@ impl InProgressByteViewArray { } } + /// Allocate space for output views and nulls if needed + /// + /// This is done on write (when we know it is necessary) rather than + /// eagerly to avoid allocations that are not used. + fn ensure_capacity(&mut self) { + self.views.reserve(self.batch_size - self.views.len()); + } + /// Finishes in progress buffer, if any fn finish_current(&mut self) { let Some(next_buffer) = self.current.take() else { @@ -295,15 +303,8 @@ impl InProgressArray for InProgressByteViewArray { }) } - /// Allocate space for output views and nulls if needed - /// - /// This is done on write (when we know it is necessary) rather than - /// eagerly to avoid allocations that are not used. - fn ensure_capacity(&mut self) { - self.views.reserve(self.batch_size - self.views.len()); - } - fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> { + self.ensure_capacity(); let source = self.source.take().ok_or_else(|| { ArrowError::InvalidArgumentError( "Internal Error: InProgressByteViewArray: source not set".to_string(), diff --git a/arrow-select/src/coalesce/primitive.rs b/arrow-select/src/coalesce/primitive.rs index 84075d08d25b..e20826725e9e 100644 --- a/arrow-select/src/coalesce/primitive.rs +++ b/arrow-select/src/coalesce/primitive.rs @@ -49,12 +49,6 @@ impl InProgressPrimitiveArray { current: vec![], } } -} - -impl InProgressArray for InProgressPrimitiveArray { - fn set_source(&mut self, source: Option) { - self.source = source; - } /// Allocate space for output values if necessary. /// @@ -63,8 +57,15 @@ impl InProgressArray for InProgressPrimitiveArray fn ensure_capacity(&mut self) { self.current.reserve(self.batch_size - self.current.len()); } +} + +impl InProgressArray for InProgressPrimitiveArray { + fn set_source(&mut self, source: Option) { + self.source = source; + } fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> { + self.ensure_capacity(); let s = self .source .as_ref() From e4615e08d25df6a9175b2df0f8a78c3a082a1ab7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 10 Jan 2026 08:22:27 +0100 Subject: [PATCH 04/10] Avoid overallocating arrays --- arrow-select/src/coalesce.rs | 1 - arrow-select/src/coalesce/byte_view.rs | 1 + arrow-select/src/coalesce/primitive.rs | 1 + 3 files changed, 2 insertions(+), 1 deletion(-) diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index e711685a68e3..ddb1c41c8c79 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -470,7 +470,6 @@ impl BatchCoalescer { // If pushing this batch would exceed the target batch size, // finish the current batch and start a new one let mut offset = 0; - while num_rows > (self.target_batch_size - self.buffered_rows) { let remaining_rows = self.target_batch_size - self.buffered_rows; debug_assert!(remaining_rows > 0); diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index 6efcb62b0697..edc496e086a0 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -305,6 +305,7 @@ impl InProgressArray for InProgressByteViewArray { fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> { self.ensure_capacity(); + let source = self.source.take().ok_or_else(|| { ArrowError::InvalidArgumentError( "Internal Error: InProgressByteViewArray: source not set".to_string(), diff --git a/arrow-select/src/coalesce/primitive.rs b/arrow-select/src/coalesce/primitive.rs index e20826725e9e..c36303c0aa3f 100644 --- a/arrow-select/src/coalesce/primitive.rs +++ b/arrow-select/src/coalesce/primitive.rs @@ -66,6 +66,7 @@ impl InProgressArray for InProgressPrimitiveArray fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> { self.ensure_capacity(); + let s = self .source .as_ref() From 831321eac199e4ec62fa0eb335d7390a2c573714 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 10 Jan 2026 08:23:21 +0100 Subject: [PATCH 05/10] Avoid overallocating arrays --- arrow-select/src/coalesce/byte_view.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index edc496e086a0..0bfa244f410b 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -314,6 +314,7 @@ impl InProgressArray for InProgressByteViewArray { // If creating StringViewArray output, ensure input was valid utf8 too let s = source.array.as_byte_view::(); + // add any nulls, as necessary if let Some(nulls) = s.nulls().as_ref() { let nulls = nulls.slice(offset, len); From 974232e46f20c77b791cbafcd2919917710d6aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 10 Jan 2026 08:24:24 +0100 Subject: [PATCH 06/10] Avoid overallocating arrays --- arrow-select/src/coalesce/byte_view.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index 0bfa244f410b..47196f1fa891 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -305,7 +305,6 @@ impl InProgressArray for InProgressByteViewArray { fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError> { self.ensure_capacity(); - let source = self.source.take().ok_or_else(|| { ArrowError::InvalidArgumentError( "Internal Error: InProgressByteViewArray: source not set".to_string(), From 80b6c2bd620ab0377781135b76b7005f1deea33f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 10 Jan 2026 11:22:20 +0100 Subject: [PATCH 07/10] Simplify --- arrow-select/src/coalesce/byte_view.rs | 4 +++- arrow-select/src/coalesce/primitive.rs | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index 47196f1fa891..6edd82c0cea2 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -98,7 +98,9 @@ impl InProgressByteViewArray { /// This is done on write (when we know it is necessary) rather than /// eagerly to avoid allocations that are not used. fn ensure_capacity(&mut self) { - self.views.reserve(self.batch_size - self.views.len()); + if self.views.capacity() == 0 { + self.views = Vec::with_capacity(self.batch_size); + } } /// Finishes in progress buffer, if any diff --git a/arrow-select/src/coalesce/primitive.rs b/arrow-select/src/coalesce/primitive.rs index c36303c0aa3f..9a30a6e042c6 100644 --- a/arrow-select/src/coalesce/primitive.rs +++ b/arrow-select/src/coalesce/primitive.rs @@ -55,7 +55,9 @@ impl InProgressPrimitiveArray { /// This is done on write (when we know it is necessary) rather than /// eagerly to avoid allocations that are not used. fn ensure_capacity(&mut self) { - self.current.reserve(self.batch_size - self.current.len()); + if self.current.capacity() == 0 { + self.current = Vec::with_capacity(self.batch_size); + } } } From 0f3a8abac423105083ff203225df1e7a9168d3ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 10 Jan 2026 11:28:00 +0100 Subject: [PATCH 08/10] Add test --- arrow-select/src/coalesce.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index ddb1c41c8c79..6bee7bc7dd5c 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -615,6 +615,7 @@ mod tests { use crate::concat::concat_batches; use arrow_array::builder::StringViewBuilder; use arrow_array::cast::AsArray; + use arrow_array::types::Int32Type; use arrow_array::{ BinaryViewArray, Int32Array, Int64Array, RecordBatchOptions, StringArray, StringViewArray, TimestampNanosecondArray, UInt32Array, UInt64Array, @@ -1625,6 +1626,8 @@ mod tests { // Now should have a completed batch (100 rows total) assert!(coalescer.has_completed_batch()); let output_batch = coalescer.next_completed_batch().unwrap(); + let size = output_batch.column(0).as_primitive::().get_buffer_memory_size(); + assert_eq!(size, 400); // 100 rows * 4 bytes each assert_eq!(output_batch.num_rows(), 100); assert_eq!(coalescer.get_buffered_rows(), 0); From 77e5e88d08faacd668edca23c07318a8a78e28bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 10 Jan 2026 13:42:06 +0100 Subject: [PATCH 09/10] Fmt --- arrow-select/src/coalesce.rs | 5 ++++- arrow-select/src/coalesce/byte_view.rs | 7 ++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/arrow-select/src/coalesce.rs b/arrow-select/src/coalesce.rs index 6bee7bc7dd5c..8cf1f024c3ff 100644 --- a/arrow-select/src/coalesce.rs +++ b/arrow-select/src/coalesce.rs @@ -1626,7 +1626,10 @@ mod tests { // Now should have a completed batch (100 rows total) assert!(coalescer.has_completed_batch()); let output_batch = coalescer.next_completed_batch().unwrap(); - let size = output_batch.column(0).as_primitive::().get_buffer_memory_size(); + let size = output_batch + .column(0) + .as_primitive::() + .get_buffer_memory_size(); assert_eq!(size, 400); // 100 rows * 4 bytes each assert_eq!(output_batch.num_rows(), 100); diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index 6edd82c0cea2..f187617e5b0c 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -98,9 +98,10 @@ impl InProgressByteViewArray { /// This is done on write (when we know it is necessary) rather than /// eagerly to avoid allocations that are not used. fn ensure_capacity(&mut self) { - if self.views.capacity() == 0 { - self.views = Vec::with_capacity(self.batch_size); - } + // if self.views.capacity() == 0 { + // self.views = Vec::with_capacity(self.batch_size); + // } + self.views.reserve(self.batch_size); } /// Finishes in progress buffer, if any From 36149ffffaf39cbe712e06916f6eeffbe629ab8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dani=C3=ABl=20Heres?= Date: Sat, 10 Jan 2026 13:45:03 +0100 Subject: [PATCH 10/10] Feedback --- arrow-select/src/coalesce/byte_view.rs | 8 ++++---- arrow-select/src/coalesce/primitive.rs | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/arrow-select/src/coalesce/byte_view.rs b/arrow-select/src/coalesce/byte_view.rs index f187617e5b0c..bca811fff1c6 100644 --- a/arrow-select/src/coalesce/byte_view.rs +++ b/arrow-select/src/coalesce/byte_view.rs @@ -98,10 +98,10 @@ impl InProgressByteViewArray { /// This is done on write (when we know it is necessary) rather than /// eagerly to avoid allocations that are not used. fn ensure_capacity(&mut self) { - // if self.views.capacity() == 0 { - // self.views = Vec::with_capacity(self.batch_size); - // } - self.views.reserve(self.batch_size); + if self.views.capacity() == 0 { + self.views.reserve(self.batch_size); + } + debug_assert_eq!(self.views.capacity(), self.batch_size); } /// Finishes in progress buffer, if any diff --git a/arrow-select/src/coalesce/primitive.rs b/arrow-select/src/coalesce/primitive.rs index 9a30a6e042c6..69dad221bd52 100644 --- a/arrow-select/src/coalesce/primitive.rs +++ b/arrow-select/src/coalesce/primitive.rs @@ -56,8 +56,9 @@ impl InProgressPrimitiveArray { /// eagerly to avoid allocations that are not used. fn ensure_capacity(&mut self) { if self.current.capacity() == 0 { - self.current = Vec::with_capacity(self.batch_size); + self.current.reserve(self.batch_size); } + debug_assert_eq!(self.current.capacity(), self.batch_size); } }