Skip to content

Commit 0c17b73

Browse files
authored
Merge branch 'main' into alamb/test_bin_op5
2 parents d697357 + 9213ffd commit 0c17b73

File tree

8 files changed

+201
-138
lines changed

8 files changed

+201
-138
lines changed

arrow-row/src/lib.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1644,24 +1644,22 @@ fn encode_column(
16441644
}
16451645
}
16461646
DataType::Binary => {
1647-
variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
1647+
variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i32>(column), opts)
16481648
}
16491649
DataType::BinaryView => {
16501650
variable::encode(data, offsets, column.as_binary_view().iter(), opts)
16511651
}
16521652
DataType::LargeBinary => {
1653-
variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
1653+
variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i64>(column), opts)
16541654
}
1655-
DataType::Utf8 => variable::encode(
1655+
DataType::Utf8 => variable::encode_generic_byte_array(
16561656
data, offsets,
1657-
column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
1657+
column.as_string::<i32>(),
16581658
opts,
16591659
),
1660-
DataType::LargeUtf8 => variable::encode(
1660+
DataType::LargeUtf8 => variable::encode_generic_byte_array(
16611661
data, offsets,
1662-
column.as_string::<i64>()
1663-
.iter()
1664-
.map(|x| x.map(|x| x.as_bytes())),
1662+
column.as_string::<i64>(),
16651663
opts,
16661664
),
16671665
DataType::Utf8View => variable::encode(

arrow-row/src/run.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,11 @@ pub unsafe fn decode<R: RunEndIndexType>(
134134
run_ends.push(R::Native::usize_as(idx));
135135
}
136136
unique_row_indices.push(decoded_values.len());
137-
decoded_values.push(decoded_data.clone());
137+
let capacity = decoded_data.capacity();
138+
decoded_values.push(std::mem::replace(
139+
&mut decoded_data,
140+
Vec::with_capacity(capacity),
141+
));
138142
}
139143
}
140144
// Add the final run end

arrow-row/src/variable.rs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717

1818
use crate::null_sentinel;
1919
use arrow_array::builder::BufferBuilder;
20+
use arrow_array::types::ByteArrayType;
2021
use arrow_array::*;
21-
use arrow_buffer::MutableBuffer;
2222
use arrow_buffer::bit_util::ceil;
23+
use arrow_buffer::{ArrowNativeType, MutableBuffer};
2324
use arrow_data::{ArrayDataBuilder, MAX_INLINE_VIEW_LEN};
2425
use arrow_schema::{DataType, SortOptions};
2526
use builder::make_view;
@@ -84,6 +85,48 @@ pub fn encode<'a, I: Iterator<Item = Option<&'a [u8]>>>(
8485
}
8586
}
8687

88+
/// Calls [`encode`] with an optimized iterator for generic byte arrays.
///
/// Instead of going through the array's own iterator, the items are built
/// directly from the array's value offsets and value bytes: each adjacent
/// pair of offsets (`windows(2)`) becomes one byte slice of the values buffer.
///
/// * `data` / `offsets` / `opts` - forwarded unchanged to [`encode`].
/// * `input_array` - the byte array whose values are encoded.
///
/// When the array has a null buffer with a non-zero null count, slots marked
/// invalid are passed to [`encode`] as `None`; otherwise every slot is passed
/// as `Some(..)` and the validity information is not consulted.
89+
pub(crate) fn encode_generic_byte_array<T: ByteArrayType>(
90+
data: &mut [u8],
91+
offsets: &mut [usize],
92+
input_array: &GenericByteArray<T>,
93+
opts: SortOptions,
94+
) {
95+
let input_offsets = input_array.value_offsets();
96+
let bytes = input_array.values().as_slice();
97+
98+
// Only take the null-aware path when there is at least one actual null.
if let Some(null_buffer) = input_array.nulls().filter(|x| x.null_count() > 0) {
99+
let input_iter =
100+
input_offsets
101+
.windows(2)
102+
.zip(null_buffer.iter())
103+
.map(|(start_end, is_valid)| {
104+
if is_valid {
105+
let item_range = start_end[0].as_usize()..start_end[1].as_usize();
106+
// SAFETY: the offsets of the input are valid by construction,
107+
// so the range is in bounds of `bytes` (NOTE(review): relies on the array's invariants — confirm)
108+
let item = unsafe { bytes.get_unchecked(item_range) };
109+
Some(item)
110+
} else {
111+
None
112+
}
113+
});
114+
115+
encode(data, offsets, input_iter, opts);
116+
} else {
117+
// No nulls to report: skip the per-item validity check entirely
118+
let input_iter = input_offsets.windows(2).map(|start_end| {
119+
let item_range = start_end[0].as_usize()..start_end[1].as_usize();
120+
// SAFETY: the offsets of the input are valid by construction,
121+
// so the range is in bounds of `bytes` (NOTE(review): relies on the array's invariants — confirm)
122+
let item = unsafe { bytes.get_unchecked(item_range) };
123+
Some(item)
124+
});
125+
126+
encode(data, offsets, input_iter, opts);
127+
}
128+
}
129+
87130
pub fn encode_null(out: &mut [u8], opts: SortOptions) -> usize {
88131
out[0] = null_sentinel(opts);
89132
1
@@ -97,6 +140,7 @@ pub fn encode_empty(out: &mut [u8], opts: SortOptions) -> usize {
97140
1
98141
}
99142

143+
#[inline]
100144
pub fn encode_one(out: &mut [u8], val: Option<&[u8]>, opts: SortOptions) -> usize {
101145
match val {
102146
None => encode_null(out, opts),

arrow/src/util/data_gen.rs

Lines changed: 71 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -66,110 +66,72 @@ pub fn create_random_batch(
6666
pub fn create_random_array(
6767
field: &Field,
6868
size: usize,
69-
null_density: f32,
69+
mut null_density: f32,
7070
true_density: f32,
7171
) -> Result<ArrayRef> {
72-
// Override null density with 0.0 if the array is non-nullable
73-
// and a primitive type in case a nested field is nullable
74-
let primitive_null_density = match field.is_nullable() {
75-
true => null_density,
76-
false => 0.0,
77-
};
72+
// Only override the null density for types that are neither nested nor dictionary.
73+
// For nested types we don't override it, so that the children keep their own nullability.
74+
// For dictionary types, the nullability is handled internally.
75+
if !field.data_type().is_nested() && !matches!(field.data_type(), Dictionary(_, _)) {
76+
// Override null density with 0.0 if the array is non-nullable
77+
null_density = match field.is_nullable() {
78+
true => null_density,
79+
false => 0.0,
80+
};
81+
}
82+
7883
use DataType::*;
79-
Ok(match field.data_type() {
84+
let array = match field.data_type() {
8085
Null => Arc::new(NullArray::new(size)) as ArrayRef,
81-
Boolean => Arc::new(create_boolean_array(
82-
size,
83-
primitive_null_density,
84-
true_density,
85-
)),
86-
Int8 => Arc::new(create_primitive_array::<Int8Type>(
87-
size,
88-
primitive_null_density,
89-
)),
90-
Int16 => Arc::new(create_primitive_array::<Int16Type>(
91-
size,
92-
primitive_null_density,
93-
)),
94-
Int32 => Arc::new(create_primitive_array::<Int32Type>(
95-
size,
96-
primitive_null_density,
97-
)),
98-
Int64 => Arc::new(create_primitive_array::<Int64Type>(
99-
size,
100-
primitive_null_density,
101-
)),
102-
UInt8 => Arc::new(create_primitive_array::<UInt8Type>(
103-
size,
104-
primitive_null_density,
105-
)),
106-
UInt16 => Arc::new(create_primitive_array::<UInt16Type>(
107-
size,
108-
primitive_null_density,
109-
)),
110-
UInt32 => Arc::new(create_primitive_array::<UInt32Type>(
111-
size,
112-
primitive_null_density,
113-
)),
114-
UInt64 => Arc::new(create_primitive_array::<UInt64Type>(
115-
size,
116-
primitive_null_density,
117-
)),
86+
Boolean => Arc::new(create_boolean_array(size, null_density, true_density)),
87+
Int8 => Arc::new(create_primitive_array::<Int8Type>(size, null_density)),
88+
Int16 => Arc::new(create_primitive_array::<Int16Type>(size, null_density)),
89+
Int32 => Arc::new(create_primitive_array::<Int32Type>(size, null_density)),
90+
Int64 => Arc::new(create_primitive_array::<Int64Type>(size, null_density)),
91+
UInt8 => Arc::new(create_primitive_array::<UInt8Type>(size, null_density)),
92+
UInt16 => Arc::new(create_primitive_array::<UInt16Type>(size, null_density)),
93+
UInt32 => Arc::new(create_primitive_array::<UInt32Type>(size, null_density)),
94+
UInt64 => Arc::new(create_primitive_array::<UInt64Type>(size, null_density)),
11895
Float16 => {
11996
return Err(ArrowError::NotYetImplemented(
12097
"Float16 is not implemented".to_string(),
12198
));
12299
}
123-
Float32 => Arc::new(create_primitive_array::<Float32Type>(
124-
size,
125-
primitive_null_density,
126-
)),
127-
Float64 => Arc::new(create_primitive_array::<Float64Type>(
128-
size,
129-
primitive_null_density,
130-
)),
100+
Float32 => Arc::new(create_primitive_array::<Float32Type>(size, null_density)),
101+
Float64 => Arc::new(create_primitive_array::<Float64Type>(size, null_density)),
131102
Timestamp(unit, tz) => match unit {
132103
TimeUnit::Second => Arc::new(
133-
create_random_temporal_array::<TimestampSecondType>(size, primitive_null_density)
104+
create_random_temporal_array::<TimestampSecondType>(size, null_density)
134105
.with_timezone_opt(tz.clone()),
135-
),
106+
) as ArrayRef,
136107
TimeUnit::Millisecond => Arc::new(
137-
create_random_temporal_array::<TimestampMillisecondType>(
138-
size,
139-
primitive_null_density,
140-
)
141-
.with_timezone_opt(tz.clone()),
108+
create_random_temporal_array::<TimestampMillisecondType>(size, null_density)
109+
.with_timezone_opt(tz.clone()),
142110
),
143111
TimeUnit::Microsecond => Arc::new(
144-
create_random_temporal_array::<TimestampMicrosecondType>(
145-
size,
146-
primitive_null_density,
147-
)
148-
.with_timezone_opt(tz.clone()),
112+
create_random_temporal_array::<TimestampMicrosecondType>(size, null_density)
113+
.with_timezone_opt(tz.clone()),
149114
),
150115
TimeUnit::Nanosecond => Arc::new(
151-
create_random_temporal_array::<TimestampNanosecondType>(
152-
size,
153-
primitive_null_density,
154-
)
155-
.with_timezone_opt(tz.clone()),
116+
create_random_temporal_array::<TimestampNanosecondType>(size, null_density)
117+
.with_timezone_opt(tz.clone()),
156118
),
157119
},
158120
Date32 => Arc::new(create_random_temporal_array::<Date32Type>(
159121
size,
160-
primitive_null_density,
122+
null_density,
161123
)),
162124
Date64 => Arc::new(create_random_temporal_array::<Date64Type>(
163125
size,
164-
primitive_null_density,
126+
null_density,
165127
)),
166128
Time32(unit) => match unit {
167129
TimeUnit::Second => Arc::new(create_random_temporal_array::<Time32SecondType>(
168130
size,
169-
primitive_null_density,
131+
null_density,
170132
)) as ArrayRef,
171133
TimeUnit::Millisecond => Arc::new(
172-
create_random_temporal_array::<Time32MillisecondType>(size, primitive_null_density),
134+
create_random_temporal_array::<Time32MillisecondType>(size, null_density),
173135
),
174136
_ => {
175137
return Err(ArrowError::InvalidArgumentError(format!(
@@ -179,36 +141,31 @@ pub fn create_random_array(
179141
},
180142
Time64(unit) => match unit {
181143
TimeUnit::Microsecond => Arc::new(
182-
create_random_temporal_array::<Time64MicrosecondType>(size, primitive_null_density),
144+
create_random_temporal_array::<Time64MicrosecondType>(size, null_density),
183145
) as ArrayRef,
184146
TimeUnit::Nanosecond => Arc::new(create_random_temporal_array::<Time64NanosecondType>(
185147
size,
186-
primitive_null_density,
148+
null_density,
187149
)),
188150
_ => {
189151
return Err(ArrowError::InvalidArgumentError(format!(
190152
"Unsupported unit {unit:?} for Time64"
191153
)));
192154
}
193155
},
194-
Utf8 => Arc::new(create_string_array::<i32>(size, primitive_null_density)),
195-
LargeUtf8 => Arc::new(create_string_array::<i64>(size, primitive_null_density)),
156+
Utf8 => Arc::new(create_string_array::<i32>(size, null_density)),
157+
LargeUtf8 => Arc::new(create_string_array::<i64>(size, null_density)),
196158
Utf8View => Arc::new(create_string_view_array_with_len(
197159
size,
198-
primitive_null_density,
160+
null_density,
199161
4,
200162
false,
201163
)),
202-
Binary => Arc::new(create_binary_array::<i32>(size, primitive_null_density)),
203-
LargeBinary => Arc::new(create_binary_array::<i64>(size, primitive_null_density)),
204-
FixedSizeBinary(len) => Arc::new(create_fsb_array(
205-
size,
206-
primitive_null_density,
207-
*len as usize,
208-
)),
164+
Binary => Arc::new(create_binary_array::<i32>(size, null_density)),
165+
LargeBinary => Arc::new(create_binary_array::<i64>(size, null_density)),
166+
FixedSizeBinary(len) => Arc::new(create_fsb_array(size, null_density, *len as usize)),
209167
BinaryView => Arc::new(
210-
create_string_view_array_with_len(size, primitive_null_density, 4, false)
211-
.to_binary_view(),
168+
create_string_view_array_with_len(size, null_density, 4, false).to_binary_view(),
212169
),
213170
List(_) => create_random_list_array(field, size, null_density, true_density)?,
214171
LargeList(_) => create_random_list_array(field, size, null_density, true_density)?,
@@ -230,7 +187,13 @@ pub fn create_random_array(
230187
"Generating random arrays not yet implemented for {other:?}"
231188
)));
232189
}
233-
})
190+
};
191+
192+
if !field.is_nullable() {
193+
assert_eq!(array.null_count(), 0);
194+
}
195+
196+
Ok(array)
234197
}
235198

236199
#[inline]
@@ -812,4 +775,23 @@ mod tests {
812775
assert_eq!(array.len(), size);
813776
}
814777
}
778+
779+
// Checks that a random batch built from Decimal128 / Decimal256 fields
// contains no nulls in any column, even though non-zero densities are passed
// to `create_random_batch` (0.35 is presumably the null density, per the test
// name — confirm against the function signature).
#[test]
780+
fn create_non_nullable_decimal_array_with_null_density() {
781+
let size = 10;
782+
let fields = vec![
783+
Field::new("a", DataType::Decimal128(10, -2), false),
784+
Field::new("b", DataType::Decimal256(10, -2), false),
785+
];
786+
let schema = Schema::new(fields);
787+
let schema_ref = Arc::new(schema);
788+
let batch = create_random_batch(schema_ref.clone(), size, 0.35, 0.7).unwrap();
789+
790+
assert_eq!(batch.schema(), schema_ref);
791+
assert_eq!(batch.num_columns(), schema_ref.fields().len());
792+
// Every column must have the requested length and zero nulls.
for array in batch.columns() {
793+
assert_eq!(array.len(), size);
794+
assert_eq!(array.null_count(), 0);
795+
}
796+
}
815797
}

0 commit comments

Comments
 (0)