From b086c3ef51c9573b08c29f1b3eb928666d76d837 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Wed, 22 Oct 2025 17:08:50 +0200 Subject: [PATCH 1/2] Add FilterPredicate::filter_record_batch --- arrow-select/src/filter.rs | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index dace2bab728f..2eb0f5ddcee7 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -173,20 +173,14 @@ pub fn filter_record_batch( predicate: &BooleanArray, ) -> Result { let mut filter_builder = FilterBuilder::new(predicate); - if record_batch.num_columns() > 1 { + if record_batch.num_columns() > 1 || multiple_arrays(record_batch.schema().fields()[0].data_type()) { // Only optimize if filtering more than one column // Otherwise, the overhead of optimization can be more than the benefit filter_builder = filter_builder.optimize(); } let filter = filter_builder.build(); - let filtered_arrays = record_batch - .columns() - .iter() - .map(|a| filter_array(a, &filter)) - .collect::, _>>()?; - let options = RecordBatchOptions::default().with_row_count(Some(filter.count())); - RecordBatch::try_new_with_options(record_batch.schema(), filtered_arrays, &options) + filter.filter_record_batch(record_batch) } /// A builder to construct [`FilterPredicate`] @@ -300,6 +294,22 @@ impl FilterPredicate { filter_array(values, self) } + /// Returns a filtered [`RecordBatch`] containing only the rows that are selected by this + /// [`FilterPredicate`]. + /// + /// This is the equivalent of calling [filter] on each column of the [`RecordBatch`]. + pub fn filter_record_batch(&self, record_batch: &RecordBatch) -> Result { + let filtered_arrays = record_batch + .columns() + .iter() + .map(|a| filter_array(a, self)) + .collect::, _>>()?; + + // SAFETY: we know that the set of filtered arrays will match the schema of the original + // record batch + unsafe { Ok(RecordBatch::new_unchecked(record_batch.schema(), filtered_arrays, self.count)) } + } + /// Number of rows being selected based on this [`FilterPredicate`] pub fn count(&self) -> usize { self.count From c28e65e1970b0228f3d982f3657412fb78f81591 Mon Sep 17 00:00:00 2001 From: Pepijn Van Eeckhoudt Date: Wed, 22 Oct 2025 17:08:50 +0200 Subject: [PATCH 2/2] Add FilterPredicate::filter_record_batch --- arrow-select/src/filter.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index 2eb0f5ddcee7..fbbc1929107d 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -173,7 +173,10 @@ pub fn filter_record_batch( predicate: &BooleanArray, ) -> Result { let mut filter_builder = FilterBuilder::new(predicate); - if record_batch.num_columns() > 1 || multiple_arrays(record_batch.schema().fields()[0].data_type()) { + let num_cols = record_batch.num_columns(); + if num_cols > 1 + || (num_cols > 0 && multiple_arrays(record_batch.schema_ref().field(0).data_type())) + { // Only optimize if filtering more than one column // Otherwise, the overhead of optimization can be more than the benefit filter_builder = filter_builder.optimize(); @@ -298,7 +301,10 @@ impl FilterPredicate { /// [`FilterPredicate`]. /// /// This is the equivalent of calling [filter] on each column of the [`RecordBatch`]. - pub fn filter_record_batch(&self, record_batch: &RecordBatch) -> Result { + pub fn filter_record_batch( + &self, + record_batch: &RecordBatch, + ) -> Result { let filtered_arrays = record_batch .columns() .iter() @@ -307,7 +313,13 @@ impl FilterPredicate { // SAFETY: we know that the set of filtered arrays will match the schema of the original // record batch - unsafe { Ok(RecordBatch::new_unchecked(record_batch.schema(), filtered_arrays, self.count)) } + unsafe { + Ok(RecordBatch::new_unchecked( + record_batch.schema(), + filtered_arrays, + self.count, + )) + } } /// Number of rows being selected based on this [`FilterPredicate`]