diff --git a/.cursor/rules.md b/.cursor/rules.md new file mode 100644 index 0000000000000..00385e6f5f759 --- /dev/null +++ b/.cursor/rules.md @@ -0,0 +1,5 @@ +The AI agents should never link to any issue or a pull request +in any GitHub repository in the code reviews! + +The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! + diff --git a/AGENTS.md b/AGENTS.md index 9dff7f6f1ffd1..74b77ec968fe1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -39,3 +39,8 @@ When creating a PR, you MUST follow the [PR template](.github/pull_request_templ See the [Testing Quick Start](docs/source/contributor-guide/testing.md#testing-quick-start) for the recommended pre-PR test commands. +The AI agents should never link to any issue or a pull request +in any GitHub repository in the code reviews! + +The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md! + diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index a91dd3115d879..5134b699b6a0c 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -30,6 +30,12 @@ use std::sync::Arc; const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024; +#[derive(Clone, Copy)] +struct PendingByteViewCopy { + dest_index: usize, + source: ByteView, +} + /// An implementation of [`GroupColumn`] for binary view and utf8 view types. /// /// Stores a collection of binary view or utf8 view group values in a buffer @@ -159,16 +165,12 @@ impl ByteViewGroupValueBuilder { match all_null_or_non_null { Nulls::Some => { - for &row in rows { - self.append_val_inner(array, row); - } + self.vectorized_append_views_with_nulls(arr, rows); } Nulls::None => { self.nulls.append_n(rows.len(), false); - for &row in rows { - self.do_append_val_inner(arr, row); - } + self.vectorized_append_non_null_views(arr, rows); } Nulls::All => { @@ -179,6 +181,134 @@ impl ByteViewGroupValueBuilder { } } + fn vectorized_append_non_null_views( + &mut self, + array: &GenericByteViewArray, + rows: &[usize], + ) { + let source_views = array.views(); + self.views.reserve(rows.len()); + + if array.data_buffers().is_empty() { + self.views.extend(rows.iter().map(|&row| source_views[row])); + return; + } + + let start_idx = self.views.len(); + self.views.extend(rows.iter().map(|&row| source_views[row])); + + let mut pending = Vec::with_capacity(rows.len()); + for (idx, &row) in rows.iter().enumerate() { + let view = source_views[row]; + if (view as u32) > 12 { + pending.push(PendingByteViewCopy { + dest_index: start_idx + idx, + source: ByteView::from(view), + }); + } + } + + self.batch_copy_long_views(array.data_buffers(), &pending); + } + + fn vectorized_append_views_with_nulls( + &mut self, + array: &GenericByteViewArray, + rows: &[usize], + ) { + let source_views = array.views(); + let mut pending = Vec::with_capacity(rows.len()); + self.views.reserve(rows.len()); + + for &row in rows { + if array.is_null(row) { + self.nulls.append(true); + self.views.push(0); + continue; + } + + self.nulls.append(false); + + let view = source_views[row]; + let dest_index = self.views.len(); + self.views.push(view); + + if (view as u32) > 12 { + pending.push(PendingByteViewCopy { + dest_index, + source: ByteView::from(view), + }); + } + } + + self.batch_copy_long_views(array.data_buffers(), &pending); + } + + fn batch_copy_long_views( + &mut self, + source_buffers: &[Buffer], + pending: &[PendingByteViewCopy], + ) { + let mut batch_start = 0; + while batch_start < pending.len() { + let first = pending[batch_start].source; + let first_len = first.length as usize; + + if self.in_progress.len() + first_len > self.max_block_size + && !self.in_progress.is_empty() + { + self.flush_in_progress(); + } + + let max_batch_len = if self.in_progress.is_empty() { + self.max_block_size.max(first_len) + } else { + self.max_block_size - self.in_progress.len() + }; + + let source_buffer_index = first.buffer_index as usize; + let batch_source_start = first.offset as usize; + let mut batch_source_end = batch_source_start + first_len; + let mut batch_end = batch_start + 1; + + while batch_end < pending.len() { + let next = pending[batch_end].source; + let next_start = next.offset as usize; + let next_end = next_start + next.length as usize; + + if next.buffer_index as usize != source_buffer_index + || next_start != batch_source_end + || next_end - batch_source_start > max_batch_len + { + break; + } + + batch_source_end = next_end; + batch_end += 1; + } + + let buffer_index = self.completed.len(); + let dest_batch_start = self.in_progress.len(); + let batch_len = batch_source_end - batch_source_start; + self.in_progress.reserve(batch_len); + + let source_buffer = &source_buffers[source_buffer_index]; + self.in_progress.extend_from_slice( + &source_buffer.as_slice()[batch_source_start..batch_source_end], + ); + + for pending_copy in &pending[batch_start..batch_end] { + let mut view = pending_copy.source; + view.buffer_index = buffer_index as u32; + view.offset = (dest_batch_start + pending_copy.source.offset as usize + - batch_source_start) as u32; + self.views[pending_copy.dest_index] = view.as_u128(); + } + + batch_start = batch_end; + } + } + fn do_append_val_inner(&mut self, array: &GenericByteViewArray, row: usize) where B: ByteViewType, @@ -584,7 +714,7 @@ mod tests { use std::sync::Arc; use crate::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder; - use arrow::array::{ArrayRef, AsArray, NullBufferBuilder, StringViewArray}; + use arrow::array::{ArrayRef, AsArray, ByteView, NullBufferBuilder, StringViewArray}; use arrow::datatypes::StringViewType; use super::GroupColumn; @@ -724,6 +854,169 @@ mod tests { assert!(equal_to_results[4]); } + #[test] + fn test_byte_view_vectorized_append_subset_and_repeated_rows() { + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(256); + + let input_array = Arc::new(StringViewArray::from(vec![ + Some("this string is definitely long 0"), + Some("tiny"), + Some("this string is definitely long 1"), + None, + Some("this string is definitely long 2"), + Some("bar"), + Some("this string is definitely long 3"), + ])) as ArrayRef; + + let rows = [0, 1, 4, 3, 2, 4, 6]; + builder.vectorized_append(&input_array, &rows).unwrap(); + + let output = Box::new(builder).build(); + let expected = Arc::new(StringViewArray::from(vec![ + Some("this string is definitely long 0"), + Some("tiny"), + Some("this string is definitely long 2"), + None, + Some("this string is definitely long 1"), + Some("this string is definitely long 2"), + Some("this string is definitely long 3"), + ])) as ArrayRef; + + assert_eq!(&output, &expected); + } + + #[test] + fn test_byte_view_take_n_after_vectorized_append() { + let long0 = "a".repeat(24); + let long1 = "b".repeat(24); + let long2 = "c".repeat(24); + let long3 = "d".repeat(24); + + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(50); + let input_array = Arc::new(StringViewArray::from(vec![ + None, + Some("foo"), + Some(long0.as_str()), + Some(long1.as_str()), + None, + Some(long2.as_str()), + Some(long3.as_str()), + Some("bar"), + ])) as ArrayRef; + let rows = (0..input_array.len()).collect::>(); + + builder.vectorized_append(&input_array, &rows).unwrap(); + + assert_eq!(builder.completed.len(), 1); + assert_eq!(builder.in_progress.len(), long2.len() + long3.len()); + + let taken_array = builder.take_n(4); + assert_eq!(&taken_array, &input_array.slice(0, 4)); + + let taken_array = builder.take_n(4); + assert_eq!(&taken_array, &input_array.slice(4, 4)); + + assert!(builder.completed.is_empty()); + assert!(builder.in_progress.is_empty()); + assert!(builder.views.is_empty()); + } + + #[test] + fn test_byte_view_vectorized_append_multiple_long_batches() { + let long0 = "a".repeat(24); + let long1 = "b".repeat(24); + let long2 = "c".repeat(24); + let long3 = "d".repeat(24); + + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(50); + + let first_batch = Arc::new(StringViewArray::from(vec![ + Some(long0.as_str()), + Some(long1.as_str()), + ])) as ArrayRef; + builder.vectorized_append(&first_batch, &[0, 1]).unwrap(); + + assert!(builder.completed.is_empty()); + assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0); + assert_eq!(ByteView::from(builder.views[1]).buffer_index, 0); + + let second_batch = Arc::new(StringViewArray::from(vec![ + Some(long2.as_str()), + Some(long3.as_str()), + ])) as ArrayRef; + builder.vectorized_append(&second_batch, &[0, 1]).unwrap(); + + assert_eq!(builder.completed.len(), 1); + assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0); + assert_eq!(ByteView::from(builder.views[1]).buffer_index, 0); + assert_eq!(ByteView::from(builder.views[2]).buffer_index, 1); + assert_eq!(ByteView::from(builder.views[3]).buffer_index, 1); + + let output = Box::new(builder).build(); + let expected = Arc::new(StringViewArray::from(vec![ + Some(long0.as_str()), + Some(long1.as_str()), + Some(long2.as_str()), + Some(long3.as_str()), + ])) as ArrayRef; + assert_eq!(&output, &expected); + } + + #[test] + fn test_byte_view_vectorized_append_flushes_mid_batch() { + let long0 = "a".repeat(24); + let long1 = "b".repeat(24); + let long2 = "c".repeat(24); + let long3 = "d".repeat(24); + + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(50); + let input_array = Arc::new(StringViewArray::from(vec![ + Some(long0.as_str()), + Some(long1.as_str()), + Some(long2.as_str()), + Some(long3.as_str()), + ])) as ArrayRef; + + builder + .vectorized_append(&input_array, &[0, 1, 2, 3]) + .unwrap(); + + assert_eq!(builder.completed.len(), 1); + assert_eq!(builder.completed[0].len(), long0.len() + long1.len()); + assert_eq!(builder.in_progress.len(), long2.len() + long3.len()); + assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0); + assert_eq!(ByteView::from(builder.views[1]).buffer_index, 0); + assert_eq!(ByteView::from(builder.views[2]).buffer_index, 1); + assert_eq!(ByteView::from(builder.views[3]).buffer_index, 1); + + let output = Box::new(builder).build(); + assert_eq!(&output, &input_array); + } + + #[test] + fn test_byte_view_vectorized_append_single_oversized_value() { + let oversized = "z".repeat(32); + + let mut builder = + ByteViewGroupValueBuilder::::new().with_max_block_size(16); + let input_array = + Arc::new(StringViewArray::from(vec![Some(oversized.as_str())])) as ArrayRef; + + builder.vectorized_append(&input_array, &[0]).unwrap(); + + assert!(builder.completed.is_empty()); + assert_eq!(builder.in_progress.len(), oversized.len()); + assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0); + assert_eq!(ByteView::from(builder.views[0]).offset, 0); + + let output = Box::new(builder).build(); + assert_eq!(&output, &input_array); + } + fn test_byte_view_equal_to_internal(mut append: A, mut equal_to: E) where A: FnMut(&mut ByteViewGroupValueBuilder, &ArrayRef, &[usize]),