diff --git a/crates/fluss/src/bucketing/mod.rs b/crates/fluss/src/bucketing/mod.rs
new file mode 100644
index 0000000..2611ac7
--- /dev/null
+++ b/crates/fluss/src/bucketing/mod.rs
@@ -0,0 +1,266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+use crate::metadata::DataLakeFormat;
+use crate::util::murmur_hash;
+
+pub trait BucketingFunction: Sync + Send {
+    fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32>;
+}
+
+#[allow(dead_code)]
+impl dyn BucketingFunction {
+    /// Provides the bucketing function for a given [DataLakeFormat]
+    ///
+    /// # Arguments
+    /// * `lake_format` - The data lake format, or `None` for the Fluss default
+    ///
+    /// # Returns
+    /// * The [BucketingFunction] matching the given format
+    pub fn of(lake_format: Option<&DataLakeFormat>) -> Box<dyn BucketingFunction> {
+        match lake_format {
+            None => Box::new(FlussBucketingFunction),
+            Some(DataLakeFormat::Paimon) => Box::new(PaimonBucketingFunction),
+            Some(DataLakeFormat::Lance) => Box::new(FlussBucketingFunction),
+            Some(DataLakeFormat::Iceberg) => Box::new(IcebergBucketingFunction),
+        }
+    }
+}
+
+struct FlussBucketingFunction;
+impl BucketingFunction for FlussBucketingFunction {
+    fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
+        if bucket_key.is_empty() {
+            return Err(IllegalArgument {
+                message: "bucket_key must not be empty!".to_string(),
+            });
+        }
+
+        if num_buckets <= 0 {
+            return Err(IllegalArgument {
+                message: "num_buckets must be positive!".to_string(),
+            });
+        }
+
+        let key_hash = murmur_hash::fluss_hash_bytes(bucket_key)?;
+
+        Ok(murmur_hash::fluss_hash_i32(key_hash) % num_buckets)
+    }
+}
+
+struct PaimonBucketingFunction;
+impl BucketingFunction for PaimonBucketingFunction {
+    fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
+        if bucket_key.is_empty() {
+            return Err(IllegalArgument {
+                message: "bucket_key must not be empty!".to_string(),
+            });
+        }
+
+        if num_buckets <= 0 {
+            return Err(IllegalArgument {
+                message: "num_buckets must be positive!".to_string(),
+            });
+        }
+
+        let key_hash = murmur_hash::fluss_hash_bytes(bucket_key)?;
+
+        Ok((key_hash % num_buckets).abs())
+    }
+}
+
+struct IcebergBucketingFunction;
+impl BucketingFunction for IcebergBucketingFunction {
+    fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
+        if bucket_key.is_empty() {
+            return Err(IllegalArgument {
+                message: "bucket_key must not be empty!".to_string(),
+            });
+        }
+
+        if num_buckets <= 0 {
+            return Err(IllegalArgument {
+                message: "num_buckets must be positive!".to_string(),
+            });
+        }
+
+        Ok((murmur_hash::hash_bytes(bucket_key) as i32 & i32::MAX) % num_buckets)
+    }
+}
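+
+// A quick comparison of the three strategies above, plus a usage sketch (the key
+// bytes and bucket count are arbitrary, and `?` assumes a `Result`-returning caller):
+//
+//     Fluss:   fluss_hash_i32(fluss_hash_bytes(key)?) % num_buckets
+//     Paimon:  (fluss_hash_bytes(key)? % num_buckets).abs()
+//     Iceberg: (murmur3_32(key) as i32 & i32::MAX) % num_buckets
+//
+//     let function = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Paimon));
+//     let bucket = function.bucketing(b"order-42", 16)?;
+//     assert!((0..16).contains(&bucket));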
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_default_bucketing() {
+        let default_bucketing = <dyn BucketingFunction>::of(None);
+
+        let expected = 1;
+        let actual = default_bucketing.bucketing(&[00u8, 10u8], 7).unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 0;
+        let actual = default_bucketing
+            .bucketing(&[00u8, 10u8, 10u8, 10u8], 12)
+            .unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 6;
+        let actual = default_bucketing
+            .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16)
+            .unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 6;
+        let actual = default_bucketing
+            .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8)
+            .unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+    }
+
+    #[test]
+    fn test_paimon_bucketing() {
+        let paimon_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Paimon));
+
+        let expected = 1;
+        let actual = paimon_bucketing.bucketing(&[00u8, 10u8], 7).unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 11;
+        let actual = paimon_bucketing
+            .bucketing(&[00u8, 10u8, 10u8, 10u8], 12)
+            .unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 12;
+        let actual = paimon_bucketing
+            .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16)
+            .unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 0;
+        let actual = paimon_bucketing
+            .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8)
+            .unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+    }
+
+    #[test]
+    fn test_lance_bucketing() {
+        let lance_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Lance));
+
+        let expected = 1;
+        let actual = lance_bucketing.bucketing(&[00u8, 10u8], 7).unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 0;
+        let actual = lance_bucketing
+            .bucketing(&[00u8, 10u8, 10u8, 10u8], 12)
+            .unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 6;
+        let actual = lance_bucketing
+            .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16)
+            .unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 6;
+        let actual = lance_bucketing
+            .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8)
+            .unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+    }
+
+    #[test]
+    fn test_iceberg_bucketing() {
+        let iceberg_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Iceberg));
+
+        let expected = 3;
+        let actual = iceberg_bucketing.bucketing(&[00u8, 10u8], 7).unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 4;
+        let actual = iceberg_bucketing
+            .bucketing(&[00u8, 10u8, 10u8, 10u8], 12)
+            .unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 12;
+        let actual = iceberg_bucketing
+            .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16)
+            .unwrap();
+        assert_eq!(
+            expected, actual,
+            "Expecting bucket to be {expected} but got {actual}"
+        );
+
+        let expected = 3;
+        let actual = iceberg_bucketing
.bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8) + .unwrap(); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + } +} diff --git a/crates/fluss/src/client/write/bucket_assigner.rs b/crates/fluss/src/client/write/bucket_assigner.rs index 991c5f9..44b2673 100644 --- a/crates/fluss/src/client/write/bucket_assigner.rs +++ b/crates/fluss/src/client/write/bucket_assigner.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. +use crate::bucketing::BucketingFunction; use crate::cluster::Cluster; +use crate::error::Error::IllegalArgument; +use crate::error::Result; use crate::metadata::TablePath; use rand::Rng; use std::sync::atomic::{AtomicI32, Ordering}; @@ -25,7 +28,7 @@ pub trait BucketAssigner: Sync + Send { fn on_new_batch(&self, cluster: &Cluster, prev_bucket_id: i32); - fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) -> i32; + fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) -> Result; } #[derive(Debug)] @@ -91,12 +94,55 @@ impl BucketAssigner for StickyBucketAssigner { self.next_bucket(cluster, prev_bucket_id); } - fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) -> i32 { + fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) -> Result { let bucket_id = self.current_bucket_id.load(Ordering::Relaxed); if bucket_id < 0 { - self.next_bucket(cluster, bucket_id) + Ok(self.next_bucket(cluster, bucket_id)) } else { - bucket_id + Ok(bucket_id) } } } + +/// A [BucketAssigner] which assigns based on a modulo hashing function +pub struct HashBucketAssigner { + num_buckets: i32, + bucketing_function: Box, +} + +#[allow(dead_code)] +impl HashBucketAssigner { + /// Creates a new [HashBucketAssigner] based on the given [BucketingFunction]. + /// See [BucketingFunction.of(Option<&DataLakeFormat>)] for bucketing functions. 
+    ///
+    /// # Arguments
+    /// * `num_buckets` - The number of buckets
+    /// * `bucketing_function` - The bucketing function
+    ///
+    /// # Returns
+    /// * [HashBucketAssigner] - The hash bucket assigner
+    pub fn new(num_buckets: i32, bucketing_function: Box<dyn BucketingFunction>) -> Self {
+        HashBucketAssigner {
+            num_buckets,
+            bucketing_function,
+        }
+    }
+}
+
+impl BucketAssigner for HashBucketAssigner {
+    fn abort_if_batch_full(&self) -> bool {
+        false
+    }
+
+    fn on_new_batch(&self, _: &Cluster, _: i32) {
+        // do nothing
+    }
+
+    fn assign_bucket(&self, bucket_key: Option<&[u8]>, _: &Cluster) -> Result<i32> {
+        let key = bucket_key.ok_or_else(|| IllegalArgument {
+            message: "no bucket key provided".to_string(),
+        })?;
+        self.bucketing_function.bucketing(key, self.num_buckets)
+    }
+}
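+
+// Usage sketch (assumes a `Cluster` value `cluster` in scope; key bytes and bucket
+// count are arbitrary). Unlike StickyBucketAssigner, the bucket depends only on the
+// key, so identical keys always map to the same bucket:
+//
+//     let assigner = HashBucketAssigner::new(
+//         16,
+//         <dyn BucketingFunction>::of(Some(&DataLakeFormat::Paimon)),
+//     );
+//     let bucket = assigner.assign_bucket(Some(b"order-42"), &cluster)?;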
diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs
index 042859a..22e0397 100644
--- a/crates/fluss/src/client/write/writer_client.rs
+++ b/crates/fluss/src/client/write/writer_client.rs
@@ -91,7 +91,7 @@ impl WriterClient {
         let table_path = &record.table_path;
         let cluster = self.metadata.get_cluster();
 
-        let (bucket_assigner, bucket_id) = self.assign_bucket(table_path);
+        let (bucket_assigner, bucket_id) = self.assign_bucket(table_path)?;
 
         let mut result = self
             .accumulate
@@ -101,7 +101,7 @@ impl WriterClient {
         if result.abort_record_for_new_batch {
             let prev_bucket_id = bucket_id;
             bucket_assigner.on_new_batch(&cluster, prev_bucket_id);
-            let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
+            let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
             result = self
                 .accumulate
                 .append(record, bucket_id, &cluster, false)
@@ -114,7 +114,10 @@ impl WriterClient {
         Ok(result.result_handle.expect("result_handle should exist"))
     }
 
-    fn assign_bucket(&self, table_path: &Arc<TablePath>) -> (Arc<Box<dyn BucketAssigner>>, i32) {
+    fn assign_bucket(
+        &self,
+        table_path: &Arc<TablePath>,
+    ) -> Result<(Arc<Box<dyn BucketAssigner>>, i32)> {
         let cluster = self.metadata.get_cluster();
         let bucket_assigner = {
             if let Some(assigner) = self.bucket_assigners.get(table_path) {
@@ -126,8 +129,8 @@ impl WriterClient {
                 assigner
             }
         };
-        let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
-        (bucket_assigner, bucket_id)
+        let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
+        Ok((bucket_assigner, bucket_id))
     }
 
     pub async fn close(self) -> Result<()> {
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs
index 25978ce..1bd72a4 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/lib.rs
@@ -26,6 +26,7 @@ mod cluster;
 pub mod config;
 pub mod error;
 
+mod bucketing;
 mod compression;
 pub mod io;
 mod util;
diff --git a/crates/fluss/src/metadata/data_lake_format.rs b/crates/fluss/src/metadata/data_lake_format.rs
new file mode 100644
index 0000000..76a23f8
--- /dev/null
+++ b/crates/fluss/src/metadata/data_lake_format.rs
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Identifies the logical format of a data lake table supported by Fluss.
+///
+/// This enum is typically used in metadata and configuration to distinguish
+/// between different table formats so that the appropriate integration and
+/// semantics can be applied.
+pub enum DataLakeFormat {
+    /// Apache Paimon data lake table format.
+    Paimon,
+    /// Lance columnar data format / lakehouse table format.
+    Lance,
+    /// Apache Iceberg data lake table format.
+    Iceberg,
+}
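+
+// Usage sketch: callers match on the variants to select per-format behavior, as the
+// bucketing module above does. A minimal dispatch, using a hypothetical `lake_name`
+// helper, looks like:
+//
+//     fn lake_name(format: &DataLakeFormat) -> &'static str {
+//         match format {
+//             DataLakeFormat::Paimon => "paimon",
+//             DataLakeFormat::Lance => "lance",
+//             DataLakeFormat::Iceberg => "iceberg",
+//         }
+//     }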
diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs
index 8754007..9c0b1b4 100644
--- a/crates/fluss/src/metadata/mod.rs
+++ b/crates/fluss/src/metadata/mod.rs
@@ -15,11 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+mod data_lake_format;
 mod database;
 mod datatype;
 mod json_serde;
 mod table;
 
+pub use data_lake_format::*;
 pub use database::*;
 pub use datatype::*;
 pub use json_serde::*;
diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs
index d8c0db5..5f67290 100644
--- a/crates/fluss/src/util/mod.rs
+++ b/crates/fluss/src/util/mod.rs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub mod murmur_hash;
+
 use crate::metadata::TableBucket;
 use linked_hash_map::LinkedHashMap;
 use std::collections::{HashMap, HashSet};
diff --git a/crates/fluss/src/util/murmur_hash.rs b/crates/fluss/src/util/murmur_hash.rs
new file mode 100644
index 0000000..12229c7
--- /dev/null
+++ b/crates/fluss/src/util/murmur_hash.rs
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/* This file is based on source code of the Apache Flink Project (https://flink.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+use crate::error::Error::IllegalArgument;
+use crate::error::Result;
+
+pub const MURMUR3_DEFAULT_SEED: u32 = 0;
+pub const FLINK_MURMUR3_DEFAULT_SEED: i32 = 42;
+
+const C1: u32 = 0xCC9E_2D51;
+const C2: u32 = 0x1B87_3593;
+const R1: u32 = 15;
+const R2: u32 = 13;
+const M: u32 = 5;
+const N: u32 = 0xE654_6B64;
+const CHUNK_SIZE: usize = 4;
+
+/// Hashes the data using the 32-bit Murmur3 hash with 0 as the seed
+///
+/// # Arguments
+/// * `data` - Byte array containing the data to be hashed
+///
+/// # Returns
+/// The hash value
+pub fn hash_bytes(data: &[u8]) -> u32 {
+    hash_bytes_with_seed(data, MURMUR3_DEFAULT_SEED)
+}
+
+#[inline(always)]
+fn hash_bytes_with_seed(data: &[u8], seed: u32) -> u32 {
+    let length = data.len();
+    let chunks = length / CHUNK_SIZE;
+    let length_aligned = chunks * CHUNK_SIZE;
+
+    let mut h1 = hash_full_chunks(data, seed);
+    let mut k1 = 0u32;
+
+    for (shift, &b) in data[length_aligned..].iter().enumerate() {
+        k1 |= (b as u32) << (8 * shift);
+    }
+
+    h1 ^= k1.wrapping_mul(C1).rotate_left(R1).wrapping_mul(C2);
+
+    fmix(h1, length)
+}
+
+/// Hashes the data using Fluss'/Flink's variant of the 32-bit Murmur hash, with 42 as
+/// the seed and tail bytes mixed into the hash byte by byte.
+/// The maximum supported data array size is 2 GB.
+///
+/// # Arguments
+/// * `data` - Byte array containing the data to be hashed
+///
+/// # Returns
+/// * The result of hashing, `Ok(hash_value)`
+///
+/// # Errors
+/// Returns `Err(IllegalArgument)` if the byte array is larger than 2 GB
+pub fn fluss_hash_bytes(data: &[u8]) -> Result<i32> {
+    fluss_hash_bytes_with_seed(data, FLINK_MURMUR3_DEFAULT_SEED)
+}
+
+#[inline(always)]
+fn fluss_hash_bytes_with_seed(data: &[u8], seed: i32) -> Result<i32> {
+    let length = data.len();
+
+    if length >= i32::MAX as usize {
+        return Err(IllegalArgument {
+            message: format!("data array size {length} is bigger than supported"),
+        });
+    }
+
+    let chunks = length / CHUNK_SIZE;
+    let length_aligned = chunks * CHUNK_SIZE;
+
+    let mut h1 = hash_full_chunks(data, seed as u32);
+
+    for byte in data.iter().take(length).skip(length_aligned) {
+        let k1 = mix_k1(*byte as u32);
+        h1 = mix_h1(h1, k1);
+    }
+
+    Ok(fmix(h1, length) as i32)
+}
+
+#[inline(always)]
+fn hash_full_chunks(data: &[u8], seed: u32) -> u32 {
+    data.chunks_exact(CHUNK_SIZE).fold(seed, |h1, chunk| {
+        let block = u32::from_le_bytes(chunk.try_into().unwrap());
+        let k1 = mix_k1(block);
+        mix_h1(h1, k1)
+    })
+}
+
+#[inline(always)]
+fn mix_k1(k1: u32) -> u32 {
+    k1.wrapping_mul(C1).rotate_left(R1).wrapping_mul(C2)
+}
+
+#[inline(always)]
+fn mix_h1(h1: u32, k1: u32) -> u32 {
+    (h1 ^ k1).rotate_left(R2).wrapping_mul(M).wrapping_add(N)
+}
+
+// Finalization mix - force all bits of a hash block to avalanche
+#[inline(always)]
+fn fmix(mut h1: u32, length: usize) -> u32 {
+    h1 ^= length as u32;
+    bit_mix(h1)
+}
+
+/// Hashes an i32 using Fluss'/Flink's variant of Murmur
+///
+/// # Arguments
+/// * `input` - i32 value to be hashed
+///
+/// # Returns
+/// The hash value
+pub fn fluss_hash_i32(input: i32) -> i32 {
+    let mut input = input as u32;
+    input = input.wrapping_mul(C1);
+    input = input.rotate_left(R1);
+    input = input.wrapping_mul(C2);
+    input = input.rotate_left(R2);
+
+    input = input.wrapping_mul(M).wrapping_add(N);
+    input ^= CHUNK_SIZE as u32;
+    let output = bit_mix(input) as i32;
+
+    if output >= 0 {
+        output
+    } else if output != i32::MIN {
+        -output
+    } else {
+        0
+    }
+}
+
+const BIT_MIX_A: u32 = 0x85EB_CA6B;
+const BIT_MIX_B: u32 = 0xC2B2_AE35;
+
+#[inline(always)]
+fn bit_mix(mut input: u32) -> u32 {
+    input ^= input >> 16;
+    input = input.wrapping_mul(BIT_MIX_A);
+    input ^= input >> 13;
+    input = input.wrapping_mul(BIT_MIX_B);
+    input ^= input >> 16;
+    input
+}
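+
+// Worked examples, taken from the tests below. Both byte-hashing functions process
+// full 4-byte chunks identically; they differ in seed (0 vs. 42) and in the tail:
+// hash_bytes folds the unaligned tail into one little-endian word, while
+// fluss_hash_bytes mixes the tail bytes in one at a time.
+//
+//     hash_bytes(b"The quick brown fox jumps over the lazy dog")       == 0x2E4F_F723
+//     fluss_hash_bytes(b"The quick brown fox jumps over the lazy dog") == Ok(0x1BC6_F880)
+//     fluss_hash_i32(42)                                               == 0x43A4_6E1D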
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_murmur3() {
+        let empty_data_hash = hash_bytes(&[]);
+        assert_eq!(empty_data_hash, 0);
+
+        let empty_data_hash = hash_bytes_with_seed(&[], 1);
+        assert_eq!(0x514E_28B7, empty_data_hash);
+
+        let empty_data_hash = hash_bytes_with_seed(&[], 0xFFFF_FFFF);
+        assert_eq!(0x81F1_6F39, empty_data_hash);
+
+        let hash = hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes());
+        assert_eq!(0x2E4F_F723, hash);
+
+        let hash = hash_bytes_with_seed(
+            "The quick brown fox jumps over the lazy dog".as_bytes(),
+            0x9747_B28C,
+        );
+        assert_eq!(0x2FA8_26CD, hash);
+    }
+
+    #[test]
+    fn test_flink_murmur() {
+        let empty_data_hash = fluss_hash_bytes_with_seed(&[], 0).expect("Failed to hash");
+        assert_eq!(empty_data_hash, 0);
+
+        let empty_data_hash = fluss_hash_bytes(&[]).expect("Failed to hash");
+        assert_eq!(0x087F_CD5C, empty_data_hash);
+
+        let empty_data_hash =
+            fluss_hash_bytes_with_seed(&[], 0xFFFF_FFFFu32 as i32).expect("Failed to hash");
+        assert_eq!(0x81F1_6F39u32 as i32, empty_data_hash);
+
+        let hash =
+            fluss_hash_bytes_with_seed("The quick brown fox jumps over the lazy dog".as_bytes(), 0)
+                .expect("Failed to hash");
+        assert_eq!(0x5FD2_0A20, hash);
+
+        let hash = fluss_hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes())
+            .expect("Failed to hash");
+        assert_eq!(0x1BC6_F880, hash);
+
+        let hash = fluss_hash_i32(0);
+        assert_eq!(0x2362_F9DE, hash);
+
+        let hash = fluss_hash_i32(42);
+        assert_eq!(0x43A4_6E1D, hash);
+
+        let hash = fluss_hash_i32(-77);
+        assert_eq!(0x2EEB_27DE, hash);
+    }
+}