Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 112 additions & 70 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

//! Functionality used both on logical and physical plans

#[cfg(not(feature = "force_hash_collisions"))]
use std::sync::Arc;

use ahash::RandomState;
use arrow::array::types::{IntervalDayTime, IntervalMonthDayNano};
use arrow::array::*;
Expand Down Expand Up @@ -215,12 +212,11 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
// Hash each dictionary value once, and then use that computed
// hash for each key value to avoid a potentially expensive
// redundant hashing for large dictionary elements (e.g. strings)
let dict_values = Arc::clone(array.values());
let dict_values = array.values();
let mut dict_hashes = vec![0; dict_values.len()];
create_hashes(&[dict_values], random_state, &mut dict_hashes)?;
create_hashes_from_arrays(&[dict_values.as_ref()], random_state, &mut dict_hashes)?;

// combine hash for each index in values
let dict_values = array.values();
for (hash, key) in hashes_buffer.iter_mut().zip(array.keys().iter()) {
if let Some(key) = key {
let idx = key.as_usize();
Expand Down Expand Up @@ -308,11 +304,11 @@ fn hash_list_array<OffsetSize>(
where
OffsetSize: OffsetSizeTrait,
{
let values = Arc::clone(array.values());
let values = array.values();
let offsets = array.value_offsets();
let nulls = array.nulls();
let mut values_hashes = vec![0u64; values.len()];
create_hashes(&[values], random_state, &mut values_hashes)?;
create_hashes_from_arrays(&[values.as_ref()], random_state, &mut values_hashes)?;
if let Some(nulls) = nulls {
for (i, (start, stop)) in offsets.iter().zip(offsets.iter().skip(1)).enumerate() {
if nulls.is_valid(i) {
Expand All @@ -339,11 +335,11 @@ fn hash_fixed_list_array(
random_state: &RandomState,
hashes_buffer: &mut [u64],
) -> Result<()> {
let values = Arc::clone(array.values());
let values = array.values();
let value_length = array.value_length() as usize;
let nulls = array.nulls();
let mut values_hashes = vec![0u64; values.len()];
create_hashes(&[values], random_state, &mut values_hashes)?;
create_hashes_from_arrays(&[values.as_ref()], random_state, &mut values_hashes)?;
if let Some(nulls) = nulls {
for i in 0..array.len() {
if nulls.is_valid(i) {
Expand All @@ -366,83 +362,113 @@ fn hash_fixed_list_array(
Ok(())
}

/// Test version of `create_hashes` that produces the same value for
/// all hashes (to test collisions)
///
/// See comments on `hashes_buffer` for more details
/// Internal helper function that hashes a single array and either initializes or combines
/// the hash values in the buffer.
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_single_array(
array: &dyn Array,
random_state: &RandomState,
hashes_buffer: &mut [u64],
rehash: bool,
) -> Result<()> {
downcast_primitive_array! {
array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash),
DataType::Binary => hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
DataType::FixedSizeBinary(_) => {
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
hash_array(array, random_state, hashes_buffer, rehash)
}
DataType::Dictionary(_, _) => downcast_dictionary_array! {
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
_ => unreachable!()
}
DataType::Struct(_) => {
let array = as_struct_array(array)?;
hash_struct_array(array, random_state, hashes_buffer)?;
}
DataType::List(_) => {
let array = as_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::LargeList(_) => {
let array = as_large_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::Map(_, _) => {
let array = as_map_array(array)?;
hash_map_array(array, random_state, hashes_buffer)?;
}
DataType::FixedSizeList(_,_) => {
let array = as_fixed_size_list_array(array)?;
hash_fixed_list_array(array, random_state, hashes_buffer)?;
}
_ => {
// This is internal because we should have caught this before.
return _internal_err!(
"Unsupported data type in hasher: {}",
array.data_type()
);
}
}
Ok(())
}

/// Test version of `hash_single_array` that forces all hashes to collide to zero.
#[cfg(feature = "force_hash_collisions")]
pub fn create_hashes<'a>(
_arrays: &[ArrayRef],
fn hash_single_array(
_array: &dyn Array,
_random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
hashes_buffer: &mut [u64],
_rehash: bool,
) -> Result<()> {
for hash in hashes_buffer.iter_mut() {
*hash = 0
}
Ok(hashes_buffer)
Ok(())
}

/// Creates hash values for every row, based on the values in the
/// columns.
/// Creates hash values for every row, based on the values in the columns.
///
/// The number of rows to hash is determined by `hashes_buffer.len()`.
/// `hashes_buffer` should be pre-sized appropriately
#[cfg(not(feature = "force_hash_collisions"))]
///
/// This is the same as [`create_hashes`] but accepts `&dyn Array`s instead of requiring
/// `ArrayRef`s.
pub fn create_hashes_from_arrays<'a>(
arrays: &[&dyn Array],
random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
for (i, &array) in arrays.iter().enumerate() {
// combine hashes with `combine_hashes` for all columns besides the first
let rehash = i >= 1;
hash_single_array(array, random_state, hashes_buffer, rehash)?;
}
Ok(hashes_buffer)
}

/// Creates hash values for every row, based on the values in the columns.
///
/// The number of rows to hash is determined by `hashes_buffer.len()`.
/// `hashes_buffer` should be pre-sized appropriately.
///
/// This is the same as [`create_hashes_from_arrays`] but accepts `ArrayRef`s.
pub fn create_hashes<'a>(
arrays: &[ArrayRef],
random_state: &RandomState,
hashes_buffer: &'a mut Vec<u64>,
) -> Result<&'a mut Vec<u64>> {
for (i, col) in arrays.iter().enumerate() {
let array = col.as_ref();
for (i, array) in arrays.iter().enumerate() {
// combine hashes with `combine_hashes` for all columns besides the first
let rehash = i >= 1;
downcast_primitive_array! {
array => hash_array_primitive(array, random_state, hashes_buffer, rehash),
DataType::Null => hash_null(random_state, hashes_buffer, rehash),
DataType::Boolean => hash_array(as_boolean_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8 => hash_array(as_string_array(array)?, random_state, hashes_buffer, rehash),
DataType::Utf8View => hash_array(as_string_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeUtf8 => hash_array(as_largestring_array(array), random_state, hashes_buffer, rehash),
DataType::Binary => hash_array(as_generic_binary_array::<i32>(array)?, random_state, hashes_buffer, rehash),
DataType::BinaryView => hash_array(as_binary_view_array(array)?, random_state, hashes_buffer, rehash),
DataType::LargeBinary => hash_array(as_generic_binary_array::<i64>(array)?, random_state, hashes_buffer, rehash),
DataType::FixedSizeBinary(_) => {
let array: &FixedSizeBinaryArray = array.as_any().downcast_ref().unwrap();
hash_array(array, random_state, hashes_buffer, rehash)
}
DataType::Dictionary(_, _) => downcast_dictionary_array! {
array => hash_dictionary(array, random_state, hashes_buffer, rehash)?,
_ => unreachable!()
}
DataType::Struct(_) => {
let array = as_struct_array(array)?;
hash_struct_array(array, random_state, hashes_buffer)?;
}
DataType::List(_) => {
let array = as_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::LargeList(_) => {
let array = as_large_list_array(array)?;
hash_list_array(array, random_state, hashes_buffer)?;
}
DataType::Map(_, _) => {
let array = as_map_array(array)?;
hash_map_array(array, random_state, hashes_buffer)?;
}
DataType::FixedSizeList(_,_) => {
let array = as_fixed_size_list_array(array)?;
hash_fixed_list_array(array, random_state, hashes_buffer)?;
}
_ => {
// This is internal because we should have caught this before.
return _internal_err!(
"Unsupported data type in hasher: {}",
col.data_type()
);
}
}
hash_single_array(array.as_ref(), random_state, hashes_buffer, rehash)?;
}
Ok(hashes_buffer)
}
Expand Down Expand Up @@ -896,4 +922,20 @@ mod tests {

assert_ne!(one_col_hashes, two_col_hashes);
}

#[test]
fn test_create_hashes_from_arrays() {
let int_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
let float_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; int_array.len()];
let hashes = create_hashes_from_arrays(
&[int_array.as_ref(), float_array.as_ref()],
&random_state,
hashes_buff,
)
.unwrap();
assert_eq!(hashes.len(), 4,);
}
}
6 changes: 3 additions & 3 deletions datafusion/common/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::cast::{
};
use crate::error::{DataFusionError, Result, _exec_err, _internal_err, _not_impl_err};
use crate::format::DEFAULT_CAST_OPTIONS;
use crate::hash_utils::create_hashes;
use crate::hash_utils::create_hashes_from_arrays;
use crate::utils::SingleRowListArrayBuilder;
use crate::{_internal_datafusion_err, arrow_datafusion_err};
use arrow::array::{
Expand Down Expand Up @@ -878,10 +878,10 @@ impl Hash for ScalarValue {

fn hash_nested_array<H: Hasher>(arr: ArrayRef, state: &mut H) {
let len = arr.len();
let arrays = vec![arr];
let hashes_buffer = &mut vec![0; len];
let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
let hashes = create_hashes(&arrays, &random_state, hashes_buffer).unwrap();
let hashes =
create_hashes_from_arrays(&[arr.as_ref()], &random_state, hashes_buffer).unwrap();
// Hash back to std::hash::Hasher
hashes.hash(state);
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr-common/src/binary_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use arrow::array::{
};
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::DataType;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::hash_utils::create_hashes_from_arrays;
use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
use std::any::type_name;
use std::fmt::Debug;
Expand Down Expand Up @@ -349,7 +349,7 @@ where
let batch_hashes = &mut self.hashes_buffer;
batch_hashes.clear();
batch_hashes.resize(values.len(), 0);
create_hashes(&[Arc::clone(values)], &self.random_state, batch_hashes)
create_hashes_from_arrays(&[values.as_ref()], &self.random_state, batch_hashes)
// hash is supported for all types and create_hashes only
// returns errors for unsupported types
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions datafusion/physical-expr-common/src/binary_view_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use ahash::RandomState;
use arrow::array::cast::AsArray;
use arrow::array::{Array, ArrayBuilder, ArrayRef, GenericByteViewBuilder};
use arrow::datatypes::{BinaryViewType, ByteViewType, DataType, StringViewType};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::hash_utils::create_hashes_from_arrays;
use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -243,7 +243,7 @@ where
let batch_hashes = &mut self.hashes_buffer;
batch_hashes.clear();
batch_hashes.resize(values.len(), 0);
create_hashes(&[Arc::clone(values)], &self.random_state, batch_hashes)
create_hashes_from_arrays(&[values.as_ref()], &self.random_state, batch_hashes)
// hash is supported for all types and create_hashes only
// returns errors for unsupported types
.unwrap();
Expand Down
18 changes: 9 additions & 9 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ mod tests {
use arrow::buffer::NullBuffer;
use arrow::datatypes::{DataType, Field};
use arrow_schema::Schema;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::hash_utils::create_hashes_from_arrays;
use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
use datafusion_common::{
assert_batches_eq, assert_batches_sorted_eq, assert_contains, exec_err,
Expand Down Expand Up @@ -3454,8 +3454,8 @@ mod tests {

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; left.num_rows()];
let hashes = create_hashes(
&[Arc::clone(&left.columns()[0])],
let hashes = create_hashes_from_arrays(
&[left.columns()[0].as_ref()],
&random_state,
hashes_buff,
)?;
Expand Down Expand Up @@ -3487,8 +3487,8 @@ mod tests {
let right_keys_values =
key_column.evaluate(&right)?.into_array(right.num_rows())?;
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes(
&[Arc::clone(&right_keys_values)],
create_hashes_from_arrays(
&[right_keys_values.as_ref()],
&random_state,
&mut hashes_buffer,
)?;
Expand Down Expand Up @@ -3525,8 +3525,8 @@ mod tests {

let random_state = RandomState::with_seeds(0, 0, 0, 0);
let hashes_buff = &mut vec![0; left.num_rows()];
let hashes = create_hashes(
&[Arc::clone(&left.columns()[0])],
let hashes = create_hashes_from_arrays(
&[left.columns()[0].as_ref()],
&random_state,
hashes_buff,
)?;
Expand All @@ -3552,8 +3552,8 @@ mod tests {
let right_keys_values =
key_column.evaluate(&right)?.into_array(right.num_rows())?;
let mut hashes_buffer = vec![0; right.num_rows()];
create_hashes(
&[Arc::clone(&right_keys_values)],
create_hashes_from_arrays(
&[right_keys_values.as_ref()],
&random_state,
&mut hashes_buffer,
)?;
Expand Down