Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .cursor/rules.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The AI agents should never link to any issue or a pull request
in any GitHub repository in the code reviews!

The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md!

5 changes: 5 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,8 @@ When creating a PR, you MUST follow the [PR template](.github/pull_request_templ

See the [Testing Quick Start](docs/source/contributor-guide/testing.md#testing-quick-start)
for the recommended pre-PR test commands.
The AI agents should never link to any issue or a pull request
in any GitHub repository in the code reviews!

The AI agents should not review AI agents' config files like CLAUDE.md or AGENTS.md!

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ use std::sync::Arc;

const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024;

#[derive(Clone, Copy)]
struct PendingByteViewCopy {
dest_index: usize,
source: ByteView,
}

/// An implementation of [`GroupColumn`] for binary view and utf8 view types.
///
/// Stores a collection of binary view or utf8 view group values in a buffer
Expand Down Expand Up @@ -159,16 +165,12 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {

match all_null_or_non_null {
Nulls::Some => {
for &row in rows {
self.append_val_inner(array, row);
}
self.vectorized_append_views_with_nulls(arr, rows);
}

Nulls::None => {
self.nulls.append_n(rows.len(), false);
for &row in rows {
self.do_append_val_inner(arr, row);
}
self.vectorized_append_non_null_views(arr, rows);
}

Nulls::All => {
Expand All @@ -179,6 +181,134 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
}
}

fn vectorized_append_non_null_views(
&mut self,
array: &GenericByteViewArray<B>,
rows: &[usize],
) {
let source_views = array.views();
self.views.reserve(rows.len());

if array.data_buffers().is_empty() {
self.views.extend(rows.iter().map(|&row| source_views[row]));
return;
}

let start_idx = self.views.len();
self.views.extend(rows.iter().map(|&row| source_views[row]));

let mut pending = Vec::with_capacity(rows.len());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The pending vector is allocated and its capacity is reserved to rows.len() in every call to vectorized_append_non_null_views. For large batches where most strings are short (<= 12 bytes), this results in significant over-allocation of memory that is only partially used. Consider using a smaller initial capacity or a reusable buffer to reduce allocation overhead.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! Most probably many of the rows won't be long (longer than 12 bytes), so the pending Vec won't use all of the pre-allocated slots. It would be better to allocate less entries, e.g. half of the rows' length, and let it resize on append if needed.

for (idx, &row) in rows.iter().enumerate() {
let view = source_views[row];
if (view as u32) > 12 {
pending.push(PendingByteViewCopy {
dest_index: start_idx + idx,
source: ByteView::from(view),
});
}
}

self.batch_copy_long_views(array.data_buffers(), &pending);
}

fn vectorized_append_views_with_nulls(
&mut self,
array: &GenericByteViewArray<B>,
rows: &[usize],
) {
let source_views = array.views();
let mut pending = Vec::with_capacity(rows.len());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the non-null case, the pending vector here is allocated with rows.len() capacity. This can be wasteful if the number of 'long' views is small relative to the total number of rows. Reusing a buffer or using a more conservative initial capacity would be more efficient.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! Most probably many of the rows won't be long (longer than 12 bytes), so the pending Vec won't use all of the pre-allocated slots. It would be better to allocate less entries, e.g. half of the rows' length, and let it resize on append if needed.

self.views.reserve(rows.len());

for &row in rows {
if array.is_null(row) {
self.nulls.append(true);
self.views.push(0);
continue;
}

self.nulls.append(false);

let view = source_views[row];
let dest_index = self.views.len();
self.views.push(view);

if (view as u32) > 12 {
pending.push(PendingByteViewCopy {
dest_index,
source: ByteView::from(view),
});
}
}

self.batch_copy_long_views(array.data_buffers(), &pending);
}

fn batch_copy_long_views(
&mut self,
source_buffers: &[Buffer],
pending: &[PendingByteViewCopy],
) {
let mut batch_start = 0;
while batch_start < pending.len() {
let first = pending[batch_start].source;
let first_len = first.length as usize;

if self.in_progress.len() + first_len > self.max_block_size
&& !self.in_progress.is_empty()
{
self.flush_in_progress();
}

let max_batch_len = if self.in_progress.is_empty() {
self.max_block_size.max(first_len)
} else {
self.max_block_size - self.in_progress.len()
};

let source_buffer_index = first.buffer_index as usize;
let batch_source_start = first.offset as usize;
let mut batch_source_end = batch_source_start + first_len;
let mut batch_end = batch_start + 1;

while batch_end < pending.len() {
let next = pending[batch_end].source;
let next_start = next.offset as usize;
let next_end = next_start + next.length as usize;

if next.buffer_index as usize != source_buffer_index
|| next_start != batch_source_end
|| next_end - batch_source_start > max_batch_len
{
break;
}

batch_source_end = next_end;
batch_end += 1;
}

let buffer_index = self.completed.len();
let dest_batch_start = self.in_progress.len();
let batch_len = batch_source_end - batch_source_start;
self.in_progress.reserve(batch_len);

let source_buffer = &source_buffers[source_buffer_index];
self.in_progress.extend_from_slice(
&source_buffer.as_slice()[batch_source_start..batch_source_end],
);

for pending_copy in &pending[batch_start..batch_end] {
let mut view = pending_copy.source;
view.buffer_index = buffer_index as u32;
view.offset = (dest_batch_start + pending_copy.source.offset as usize
- batch_source_start) as u32;
self.views[pending_copy.dest_index] = view.as_u128();
}

batch_start = batch_end;
}
}

fn do_append_val_inner(&mut self, array: &GenericByteViewArray<B>, row: usize)
where
B: ByteViewType,
Expand Down Expand Up @@ -584,7 +714,7 @@ mod tests {
use std::sync::Arc;

use crate::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder;
use arrow::array::{ArrayRef, AsArray, NullBufferBuilder, StringViewArray};
use arrow::array::{ArrayRef, AsArray, ByteView, NullBufferBuilder, StringViewArray};
use arrow::datatypes::StringViewType;

use super::GroupColumn;
Expand Down Expand Up @@ -724,6 +854,169 @@ mod tests {
assert!(equal_to_results[4]);
}

#[test]
fn test_byte_view_vectorized_append_subset_and_repeated_rows() {
let mut builder =
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(256);

let input_array = Arc::new(StringViewArray::from(vec![
Some("this string is definitely long 0"),
Some("tiny"),
Some("this string is definitely long 1"),
None,
Some("this string is definitely long 2"),
Some("bar"),
Some("this string is definitely long 3"),
])) as ArrayRef;

let rows = [0, 1, 4, 3, 2, 4, 6];
builder.vectorized_append(&input_array, &rows).unwrap();

let output = Box::new(builder).build();
let expected = Arc::new(StringViewArray::from(vec![
Some("this string is definitely long 0"),
Some("tiny"),
Some("this string is definitely long 2"),
None,
Some("this string is definitely long 1"),
Some("this string is definitely long 2"),
Some("this string is definitely long 3"),
])) as ArrayRef;

assert_eq!(&output, &expected);
}

#[test]
fn test_byte_view_take_n_after_vectorized_append() {
let long0 = "a".repeat(24);
let long1 = "b".repeat(24);
let long2 = "c".repeat(24);
let long3 = "d".repeat(24);

let mut builder =
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(50);
let input_array = Arc::new(StringViewArray::from(vec![
None,
Some("foo"),
Some(long0.as_str()),
Some(long1.as_str()),
None,
Some(long2.as_str()),
Some(long3.as_str()),
Some("bar"),
])) as ArrayRef;
let rows = (0..input_array.len()).collect::<Vec<_>>();

builder.vectorized_append(&input_array, &rows).unwrap();

assert_eq!(builder.completed.len(), 1);
assert_eq!(builder.in_progress.len(), long2.len() + long3.len());

let taken_array = builder.take_n(4);
assert_eq!(&taken_array, &input_array.slice(0, 4));

let taken_array = builder.take_n(4);
assert_eq!(&taken_array, &input_array.slice(4, 4));

assert!(builder.completed.is_empty());
assert!(builder.in_progress.is_empty());
assert!(builder.views.is_empty());
}

#[test]
fn test_byte_view_vectorized_append_multiple_long_batches() {
let long0 = "a".repeat(24);
let long1 = "b".repeat(24);
let long2 = "c".repeat(24);
let long3 = "d".repeat(24);

let mut builder =
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(50);

let first_batch = Arc::new(StringViewArray::from(vec![
Some(long0.as_str()),
Some(long1.as_str()),
])) as ArrayRef;
builder.vectorized_append(&first_batch, &[0, 1]).unwrap();

assert!(builder.completed.is_empty());
assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[1]).buffer_index, 0);

let second_batch = Arc::new(StringViewArray::from(vec![
Some(long2.as_str()),
Some(long3.as_str()),
])) as ArrayRef;
builder.vectorized_append(&second_batch, &[0, 1]).unwrap();

assert_eq!(builder.completed.len(), 1);
assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[1]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[2]).buffer_index, 1);
assert_eq!(ByteView::from(builder.views[3]).buffer_index, 1);

let output = Box::new(builder).build();
let expected = Arc::new(StringViewArray::from(vec![
Some(long0.as_str()),
Some(long1.as_str()),
Some(long2.as_str()),
Some(long3.as_str()),
])) as ArrayRef;
assert_eq!(&output, &expected);
}

#[test]
fn test_byte_view_vectorized_append_flushes_mid_batch() {
let long0 = "a".repeat(24);
let long1 = "b".repeat(24);
let long2 = "c".repeat(24);
let long3 = "d".repeat(24);

let mut builder =
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(50);
let input_array = Arc::new(StringViewArray::from(vec![
Some(long0.as_str()),
Some(long1.as_str()),
Some(long2.as_str()),
Some(long3.as_str()),
])) as ArrayRef;

builder
.vectorized_append(&input_array, &[0, 1, 2, 3])
.unwrap();

assert_eq!(builder.completed.len(), 1);
assert_eq!(builder.completed[0].len(), long0.len() + long1.len());
assert_eq!(builder.in_progress.len(), long2.len() + long3.len());
assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[1]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[2]).buffer_index, 1);
assert_eq!(ByteView::from(builder.views[3]).buffer_index, 1);

let output = Box::new(builder).build();
assert_eq!(&output, &input_array);
}

#[test]
fn test_byte_view_vectorized_append_single_oversized_value() {
let oversized = "z".repeat(32);

let mut builder =
ByteViewGroupValueBuilder::<StringViewType>::new().with_max_block_size(16);
let input_array =
Arc::new(StringViewArray::from(vec![Some(oversized.as_str())])) as ArrayRef;

builder.vectorized_append(&input_array, &[0]).unwrap();

assert!(builder.completed.is_empty());
assert_eq!(builder.in_progress.len(), oversized.len());
assert_eq!(ByteView::from(builder.views[0]).buffer_index, 0);
assert_eq!(ByteView::from(builder.views[0]).offset, 0);

let output = Box::new(builder).build();
assert_eq!(&output, &input_array);
}

fn test_byte_view_equal_to_internal<A, E>(mut append: A, mut equal_to: E)
where
A: FnMut(&mut ByteViewGroupValueBuilder<StringViewType>, &ArrayRef, &[usize]),
Expand Down
Loading