From 2e16284d02df9df75df15f75d9c633f3b498cb7b Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sat, 27 Dec 2025 20:55:28 +0000 Subject: [PATCH 01/10] Support put KV: Add HashBucketAssigner --- crates/fluss/src/bucketing/mod.rs | 213 ++++++++++++++++++ .../fluss/src/client/write/bucket_assigner.rs | 31 +++ crates/fluss/src/lib.rs | 1 + crates/fluss/src/metadata/data_lake_format.rs | 22 ++ crates/fluss/src/metadata/mod.rs | 2 + crates/fluss/src/util/mod.rs | 2 + crates/fluss/src/util/murmur_hash.rs | 211 +++++++++++++++++ 7 files changed, 482 insertions(+) create mode 100644 crates/fluss/src/bucketing/mod.rs create mode 100644 crates/fluss/src/metadata/data_lake_format.rs create mode 100644 crates/fluss/src/util/murmur_hash.rs diff --git a/crates/fluss/src/bucketing/mod.rs b/crates/fluss/src/bucketing/mod.rs new file mode 100644 index 0000000..edcb210 --- /dev/null +++ b/crates/fluss/src/bucketing/mod.rs @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::metadata::DataLakeFormat; +use crate::util::murmur_hash; + +pub trait BucketingFunction: Sync + Send { + fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> i32; +} + +#[allow(dead_code)] +impl dyn BucketingFunction { + pub fn of(lake_format: Option<&DataLakeFormat>) -> Box { + match lake_format { + None => Box::new(FlussBucketingFunction), + Some(DataLakeFormat::PAIMON) => Box::new(PaimonBucketingFunction), + Some(DataLakeFormat::LANCE) => Box::new(FlussBucketingFunction), + Some(DataLakeFormat::ICEBERG) => Box::new(IcebergBucketingFunction), + } + } +} + +struct FlussBucketingFunction; +impl BucketingFunction for FlussBucketingFunction { + fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> i32 { + assert!(!bucket_key.is_empty(), "Assigned key must not be empty!"); + let key_hash = murmur_hash::flink_hash_bytes(bucket_key); + murmur_hash::flink_hash_i32(key_hash) % num_buckets + } +} + +struct PaimonBucketingFunction; +impl BucketingFunction for PaimonBucketingFunction { + fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> i32 { + assert!(!bucket_key.is_empty(), "Assigned key must not be empty!"); + let key_hash = murmur_hash::flink_hash_bytes(bucket_key); + (key_hash % num_buckets).abs() + } +} + +struct IcebergBucketingFunction; +impl BucketingFunction for IcebergBucketingFunction { + fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> i32 { + if bucket_key.is_empty() { + panic!("bucket_key must not be empty!"); + } + + if num_buckets <= 0 { + panic!("num_buckets must be positive!"); + } + + (murmur_hash::hash_bytes(bucket_key) & i32::MAX) % num_buckets + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default_bucketing() { + let default_bucketing = ::of(None); + + let expected = 1; + let actual = default_bucketing.bucketing(&[00u8, 10u8], 7); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected = 0; + let actual = 
default_bucketing.bucketing(&[00u8, 10u8, 10u8, 10u8], 12); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected = 6; + let actual = + default_bucketing.bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected = 6; + let actual = default_bucketing + .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + } + + #[test] + fn test_paimon_bucketing() { + let paimon_bucketing = ::of(Some(&DataLakeFormat::PAIMON)); + + let expected = 1; + let actual = paimon_bucketing.bucketing(&[00u8, 10u8], 7); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected = 11; + let actual = paimon_bucketing.bucketing(&[00u8, 10u8, 10u8, 10u8], 12); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected = 12; + let actual = + paimon_bucketing.bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected = 0; + let actual = + paimon_bucketing.bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + } + + #[test] + fn test_lance_bucketing() { + let lance_bucketing = ::of(Some(&DataLakeFormat::LANCE)); + + let expected = 1; + let actual = lance_bucketing.bucketing(&[00u8, 10u8], 7); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected = 0; + let actual = lance_bucketing.bucketing(&[00u8, 10u8, 10u8, 10u8], 12); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected 
= 6; + let actual = + lance_bucketing.bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected = 6; + let actual = + lance_bucketing.bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + } + + #[test] + fn test_iceberg_bucketing() { + let iceberg_bucketing = ::of(Some(&DataLakeFormat::ICEBERG)); + + let expected = 3; + let actual = iceberg_bucketing.bucketing(&[00u8, 10u8], 7); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected = 4; + let actual = iceberg_bucketing.bucketing(&[00u8, 10u8, 10u8, 10u8], 12); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected = 12; + let actual = + iceberg_bucketing.bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + + let expected = 3; + let actual = iceberg_bucketing + .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8); + assert_eq!( + expected, actual, + "Expecting bucket to be {expected} but got {actual}" + ); + } +} diff --git a/crates/fluss/src/client/write/bucket_assigner.rs b/crates/fluss/src/client/write/bucket_assigner.rs index 991c5f9..f5d2ffd 100644 --- a/crates/fluss/src/client/write/bucket_assigner.rs +++ b/crates/fluss/src/client/write/bucket_assigner.rs @@ -19,6 +19,7 @@ use crate::cluster::Cluster; use crate::metadata::TablePath; use rand::Rng; use std::sync::atomic::{AtomicI32, Ordering}; +use crate::bucketing::BucketingFunction; pub trait BucketAssigner: Sync + Send { fn abort_if_batch_full(&self) -> bool; @@ -100,3 +101,33 @@ impl BucketAssigner for StickyBucketAssigner { } } } + +pub struct HashBucketAssigner { + num_buckets: i32, + 
bucketing_function: Box, +} + +#[allow(dead_code)] +impl HashBucketAssigner { + pub fn new(num_buckets: i32, bucketing_function: Box) -> Self { + HashBucketAssigner { + num_buckets, + bucketing_function, + } + } +} + +impl BucketAssigner for HashBucketAssigner { + fn abort_if_batch_full(&self) -> bool { + false + } + + fn on_new_batch(&self, _: &Cluster, _: i32) { + // do nothing + } + + fn assign_bucket(&self, bucket_key: Option<&[u8]>, _: &Cluster) -> i32 { + let key = bucket_key.expect("no bucket key provided"); + self.bucketing_function.bucketing(key, self.num_buckets) + } +} diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs index 25978ce..1bd72a4 100644 --- a/crates/fluss/src/lib.rs +++ b/crates/fluss/src/lib.rs @@ -26,6 +26,7 @@ mod cluster; pub mod config; pub mod error; +mod bucketing; mod compression; pub mod io; mod util; diff --git a/crates/fluss/src/metadata/data_lake_format.rs b/crates/fluss/src/metadata/data_lake_format.rs new file mode 100644 index 0000000..f78cac2 --- /dev/null +++ b/crates/fluss/src/metadata/data_lake_format.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +pub enum DataLakeFormat { + PAIMON, + LANCE, + ICEBERG, +} diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs index 8754007..9c0b1b4 100644 --- a/crates/fluss/src/metadata/mod.rs +++ b/crates/fluss/src/metadata/mod.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. +mod data_lake_format; mod database; mod datatype; mod json_serde; mod table; +pub use data_lake_format::*; pub use database::*; pub use datatype::*; pub use json_serde::*; diff --git a/crates/fluss/src/util/mod.rs b/crates/fluss/src/util/mod.rs index d8c0db5..5f67290 100644 --- a/crates/fluss/src/util/mod.rs +++ b/crates/fluss/src/util/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +pub mod murmur_hash; + use crate::metadata::TableBucket; use linked_hash_map::LinkedHashMap; use std::collections::{HashMap, HashSet}; diff --git a/crates/fluss/src/util/murmur_hash.rs b/crates/fluss/src/util/murmur_hash.rs new file mode 100644 index 0000000..d79cb70 --- /dev/null +++ b/crates/fluss/src/util/murmur_hash.rs @@ -0,0 +1,211 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +/* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +pub const MURMUR3_DEFAULT_SEED: i32 = 0; +pub const FLINK_MURMUR3_DEFAULT_SEED: i32 = 42; + +const C1: i32 = 0xCC9E_2D51_u32 as i32; +const C2: i32 = 0x1B87_3593; +const R1: u32 = 15; +const R2: u32 = 13; +const M: i32 = 5; +const N: i32 = 0xE654_6B64_u32 as i32; +const CHUNK_SIZE: usize = 4; + +/// Hashes the data using 32-bit Murmur3 hash with 0 as seed +/// +/// # Arguments +/// * `data` - byte array containing data to be hashed +/// +/// # Returns +/// Returns hash value +pub fn hash_bytes(data: &[u8]) -> i32 { + hash_bytes_with_seed(data, MURMUR3_DEFAULT_SEED) +} + +#[inline(always)] +fn hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { + let length = data.len(); + let chunks = length / CHUNK_SIZE; + let length_aligned = chunks * CHUNK_SIZE; + + let mut h1 = hash_full_chunks(data, seed, length_aligned); + let mut k1 = 0i32; + + for (shift, &b) in data[length_aligned..].iter().enumerate() { + k1 |= (b as i32) << (8 * shift); + } + + h1 ^= k1.wrapping_mul(C1).rotate_left(R1).wrapping_mul(C2); + + fmix(h1, length) +} + +/// Hashes the data using Flink's variant of 32-bit Murmur hash with 42 as seed and tail bytes mixed into hash byte-by-byte +/// +/// # Arguments +/// * `data` - byte array containing data to be hashed +/// +/// # Returns +/// Returns hash value +pub fn flink_hash_bytes(data: &[u8]) -> i32 { + flink_hash_bytes_with_seed(data, FLINK_MURMUR3_DEFAULT_SEED) +} +#[inline(always)] +fn flink_hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { + let length = data.len(); + let chunks = length / CHUNK_SIZE; + let length_aligned = chunks * CHUNK_SIZE; + + let mut h1 = hash_full_chunks(data, seed, length_aligned); + + #[allow(clippy::needless_range_loop)] + 
for index in length_aligned..length { + let byte = i32::from(data[index]); + let k1 = mix_k1(byte); + h1 = mix_h1(h1, k1); + } + + fmix(h1, length) +} + +#[inline(always)] +fn hash_full_chunks(data: &[u8], seed: i32, length_aligned: usize) -> i32 { + let mut h1 = seed; + + for i in 0..length_aligned / CHUNK_SIZE { + let offset = i * 4; + let block = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()); + let k1 = mix_k1(block); + h1 = mix_h1(h1, k1); + } + h1 +} + +#[inline(always)] +fn mix_k1(k1: i32) -> i32 { + k1.wrapping_mul(C1).rotate_left(R1).wrapping_mul(C2) +} + +#[inline(always)] +fn mix_h1(h1: i32, k1: i32) -> i32 { + (h1 ^ k1).rotate_left(R2).wrapping_mul(M).wrapping_add(N) +} + +// Finalization mix - force all bits of a hash block to avalanche +#[inline(always)] +fn fmix(mut h1: i32, length: usize) -> i32 { + h1 ^= length as i32; + bit_mix(h1) +} + +/// Hashes an i32 using Flink's variant of Murmur +/// +/// # Arguments +/// * `code` - byte array containing data to be hashed +/// +/// # Returns +/// Returns hash value +pub fn flink_hash_i32(code: i32) -> i32 { + let mut code = code.wrapping_mul(C1); + code = code.rotate_left(R1); + code = code.wrapping_mul(C2); + code = code.rotate_left(R2); + + code = code.wrapping_mul(M).wrapping_add(N); + code ^= CHUNK_SIZE as i32; + code = bit_mix(code); + + if code >= 0 { + code + } else if code != i32::MIN { + -code + } else { + 0 + } +} + +const BIT_MIX_A: i32 = 0x85EB_CA6Bu32 as i32; +const BIT_MIX_B: i32 = 0xC2B2_AE35u32 as i32; + +#[inline(always)] +fn bit_mix(mut input: i32) -> i32 { + input = input ^ ((input as u32) >> 16) as i32; + input = input.wrapping_mul(BIT_MIX_A); + input = input ^ ((input as u32) >> 13) as i32; + input = input.wrapping_mul(BIT_MIX_B); + input = input ^ ((input as u32) >> 16) as i32; + input +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_murmur3() { + // + let empty_data_hash = hash_bytes(&[]); + assert_eq!(empty_data_hash, 0); + + let empty_data_hash 
= hash_bytes_with_seed(&[], 1); + assert_eq!(0x514E_28B7, empty_data_hash); + + let empty_data_hash = hash_bytes_with_seed(&[], 0xFFFF_FFFFu32 as i32); + assert_eq!(0x81F1_6F39u32 as i32, empty_data_hash); + + let hash = hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes()); + assert_eq!(0x2E4F_F723, hash); + + let hash = hash_bytes_with_seed( + "The quick brown fox jumps over the lazy dog".as_bytes(), + 0x9747_B28Cu32 as i32, + ); + assert_eq!(0x2FA8_26CD, hash); + } + + #[test] + fn test_flink_murmur() { + let empty_data_hash = flink_hash_bytes_with_seed(&[], 0); + assert_eq!(empty_data_hash, 0); + + let empty_data_hash = flink_hash_bytes(&[]); + assert_eq!(0x087F_CD5C, empty_data_hash); + + let empty_data_hash = flink_hash_bytes_with_seed(&[], 0xFFFF_FFFFu32 as i32); + assert_eq!(0x81F1_6F39u32 as i32, empty_data_hash); + + let hash = + flink_hash_bytes_with_seed("The quick brown fox jumps over the lazy dog".as_bytes(), 0); + assert_eq!(0x5FD2_0A20, hash); + + let hash = flink_hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes()); + assert_eq!(0x1BC6_F880, hash); + + let hash = flink_hash_i32(0); + assert_eq!(0x2362_F9DE, hash); + + let hash = flink_hash_i32(42); + assert_eq!(0x43A4_6E1D, hash); + + let hash = flink_hash_i32(-77); + assert_eq!(0x2EEB_27DE, hash); + } +} From b8a689a86acd6ba88212f6ac95535d55f634b7e7 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 28 Dec 2025 09:48:24 +0000 Subject: [PATCH 02/10] Use Result<> for BucketAssigner, add documentation to HashBucketAssigner related classes --- crates/fluss/src/bucketing/mod.rs | 121 +++++++++++++----- .../fluss/src/client/write/bucket_assigner.rs | 29 ++++- .../fluss/src/client/write/writer_client.rs | 13 +- 3 files changed, 117 insertions(+), 46 deletions(-) diff --git a/crates/fluss/src/bucketing/mod.rs b/crates/fluss/src/bucketing/mod.rs index edcb210..5ad8b0c 100644 --- a/crates/fluss/src/bucketing/mod.rs +++ b/crates/fluss/src/bucketing/mod.rs @@ -15,15 +15,24 @@ 
// specific language governing permissions and limitations // under the License. +use crate::error::Error::IllegalArgument; +use crate::error::Result; use crate::metadata::DataLakeFormat; use crate::util::murmur_hash; pub trait BucketingFunction: Sync + Send { - fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> i32; + fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result; } #[allow(dead_code)] impl dyn BucketingFunction { + /// Provides the bucketing function for a given [DataLakeFormat] + /// + /// # Arguments + /// * `lake_format` - Data lake format or none + /// + /// # Returns + /// Returns BucketingFunction pub fn of(lake_format: Option<&DataLakeFormat>) -> Box { match lake_format { None => Box::new(FlussBucketingFunction), @@ -36,34 +45,62 @@ impl dyn BucketingFunction { struct FlussBucketingFunction; impl BucketingFunction for FlussBucketingFunction { - fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> i32 { - assert!(!bucket_key.is_empty(), "Assigned key must not be empty!"); + fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result { + if bucket_key.is_empty() { + return Err(IllegalArgument { + message: "bucket_key must not be empty!".to_string(), + }); + } + + if num_buckets <= 0 { + return Err(IllegalArgument { + message: "num_buckets must be positive!".to_string(), + }); + } + let key_hash = murmur_hash::flink_hash_bytes(bucket_key); - murmur_hash::flink_hash_i32(key_hash) % num_buckets + + Ok(murmur_hash::flink_hash_i32(key_hash) % num_buckets) } } struct PaimonBucketingFunction; impl BucketingFunction for PaimonBucketingFunction { - fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> i32 { - assert!(!bucket_key.is_empty(), "Assigned key must not be empty!"); + fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result { + if bucket_key.is_empty() { + return Err(IllegalArgument { + message: "bucket_key must not be empty!".to_string(), + }); + } + + if num_buckets <= 0 { + return 
Err(IllegalArgument { + message: "num_buckets must be positive!".to_string(), + }); + } + let key_hash = murmur_hash::flink_hash_bytes(bucket_key); - (key_hash % num_buckets).abs() + + Ok((key_hash % num_buckets).abs()) } } struct IcebergBucketingFunction; impl BucketingFunction for IcebergBucketingFunction { - fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> i32 { + fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result { if bucket_key.is_empty() { - panic!("bucket_key must not be empty!"); + return Err(IllegalArgument { + message: "bucket_key must not be empty!".to_string(), + }); } if num_buckets <= 0 { - panic!("num_buckets must be positive!"); - } + return Err(IllegalArgument { + message: "num_buckets must be positive!".to_string(), + }); + }; - (murmur_hash::hash_bytes(bucket_key) & i32::MAX) % num_buckets + Ok((murmur_hash::hash_bytes(bucket_key) & i32::MAX) % num_buckets) } } @@ -76,22 +113,25 @@ mod tests { let default_bucketing = ::of(None); let expected = 1; - let actual = default_bucketing.bucketing(&[00u8, 10u8], 7); + let actual = default_bucketing.bucketing(&[00u8, 10u8], 7).unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" ); let expected = 0; - let actual = default_bucketing.bucketing(&[00u8, 10u8, 10u8, 10u8], 12); + let actual = default_bucketing + .bucketing(&[00u8, 10u8, 10u8, 10u8], 12) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" ); let expected = 6; - let actual = - default_bucketing.bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16); + let actual = default_bucketing + .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" @@ -99,7 +139,8 @@ mod tests { let expected = 6; let actual = default_bucketing - .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8); + .bucketing("The quick brown fox 
jumps over the lazy dog".as_bytes(), 8) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" @@ -111,30 +152,34 @@ mod tests { let paimon_bucketing = ::of(Some(&DataLakeFormat::PAIMON)); let expected = 1; - let actual = paimon_bucketing.bucketing(&[00u8, 10u8], 7); + let actual = paimon_bucketing.bucketing(&[00u8, 10u8], 7).unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" ); let expected = 11; - let actual = paimon_bucketing.bucketing(&[00u8, 10u8, 10u8, 10u8], 12); + let actual = paimon_bucketing + .bucketing(&[00u8, 10u8, 10u8, 10u8], 12) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" ); let expected = 12; - let actual = - paimon_bucketing.bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16); + let actual = paimon_bucketing + .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" ); let expected = 0; - let actual = - paimon_bucketing.bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8); + let actual = paimon_bucketing + .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" @@ -146,30 +191,34 @@ mod tests { let lance_bucketing = ::of(Some(&DataLakeFormat::LANCE)); let expected = 1; - let actual = lance_bucketing.bucketing(&[00u8, 10u8], 7); + let actual = lance_bucketing.bucketing(&[00u8, 10u8], 7).unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" ); let expected = 0; - let actual = lance_bucketing.bucketing(&[00u8, 10u8, 10u8, 10u8], 12); + let actual = lance_bucketing + .bucketing(&[00u8, 10u8, 10u8, 10u8], 12) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" ); let expected = 6; - let actual = 
- lance_bucketing.bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16); + let actual = lance_bucketing + .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" ); let expected = 6; - let actual = - lance_bucketing.bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8); + let actual = lance_bucketing + .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" @@ -181,22 +230,25 @@ mod tests { let iceberg_bucketing = ::of(Some(&DataLakeFormat::ICEBERG)); let expected = 3; - let actual = iceberg_bucketing.bucketing(&[00u8, 10u8], 7); + let actual = iceberg_bucketing.bucketing(&[00u8, 10u8], 7).unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" ); let expected = 4; - let actual = iceberg_bucketing.bucketing(&[00u8, 10u8, 10u8, 10u8], 12); + let actual = iceberg_bucketing + .bucketing(&[00u8, 10u8, 10u8, 10u8], 12) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" ); let expected = 12; - let actual = - iceberg_bucketing.bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16); + let actual = iceberg_bucketing + .bucketing("2bb87d68-baf9-4e64-90f9-f80910419fa6".as_bytes(), 16) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" @@ -204,7 +256,8 @@ mod tests { let expected = 3; let actual = iceberg_bucketing - .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8); + .bucketing("The quick brown fox jumps over the lazy dog".as_bytes(), 8) + .unwrap(); assert_eq!( expected, actual, "Expecting bucket to be {expected} but got {actual}" diff --git a/crates/fluss/src/client/write/bucket_assigner.rs b/crates/fluss/src/client/write/bucket_assigner.rs index f5d2ffd..c4240e5 100644 --- 
a/crates/fluss/src/client/write/bucket_assigner.rs +++ b/crates/fluss/src/client/write/bucket_assigner.rs @@ -15,18 +15,20 @@ // specific language governing permissions and limitations // under the License. +use crate::bucketing::BucketingFunction; use crate::cluster::Cluster; +use crate::error::Error::IllegalArgument; +use crate::error::Result; use crate::metadata::TablePath; use rand::Rng; use std::sync::atomic::{AtomicI32, Ordering}; -use crate::bucketing::BucketingFunction; pub trait BucketAssigner: Sync + Send { fn abort_if_batch_full(&self) -> bool; fn on_new_batch(&self, cluster: &Cluster, prev_bucket_id: i32); - fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) -> i32; + fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) -> Result; } #[derive(Debug)] @@ -92,16 +94,17 @@ impl BucketAssigner for StickyBucketAssigner { self.next_bucket(cluster, prev_bucket_id); } - fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) -> i32 { + fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) -> Result { let bucket_id = self.current_bucket_id.load(Ordering::Relaxed); if bucket_id < 0 { - self.next_bucket(cluster, bucket_id) + Ok(self.next_bucket(cluster, bucket_id)) } else { - bucket_id + Ok(bucket_id) } } } +// A [BucketAssigner] which assigns based on a modulo hashing function pub struct HashBucketAssigner { num_buckets: i32, bucketing_function: Box, @@ -109,6 +112,16 @@ pub struct HashBucketAssigner { #[allow(dead_code)] impl HashBucketAssigner { + /// Creates a new [HashBucketAssigner] based on the given [BucketingFunction]. + /// See [BucketingFunction.of(Option<&DataLakeFormat>)] for bucketing functions. 
+ /// + /// + /// # Arguments + /// * `num_buckets` - The number of buckets + /// * `bucketing_function` - The bucketing function + /// + /// # Returns + /// * [HashBucketAssigner] - The hash bucket assigner pub fn new(num_buckets: i32, bucketing_function: Box) -> Self { HashBucketAssigner { num_buckets, @@ -126,8 +139,10 @@ impl BucketAssigner for HashBucketAssigner { // do nothing } - fn assign_bucket(&self, bucket_key: Option<&[u8]>, _: &Cluster) -> i32 { - let key = bucket_key.expect("no bucket key provided"); + fn assign_bucket(&self, bucket_key: Option<&[u8]>, _: &Cluster) -> Result { + let key = bucket_key.ok_or_else(|| IllegalArgument { + message: "no bucket key provided".to_string(), + })?; self.bucketing_function.bucketing(key, self.num_buckets) } } diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs index 042859a..22e0397 100644 --- a/crates/fluss/src/client/write/writer_client.rs +++ b/crates/fluss/src/client/write/writer_client.rs @@ -91,7 +91,7 @@ impl WriterClient { let table_path = &record.table_path; let cluster = self.metadata.get_cluster(); - let (bucket_assigner, bucket_id) = self.assign_bucket(table_path); + let (bucket_assigner, bucket_id) = self.assign_bucket(table_path)?; let mut result = self .accumulate @@ -101,7 +101,7 @@ impl WriterClient { if result.abort_record_for_new_batch { let prev_bucket_id = bucket_id; bucket_assigner.on_new_batch(&cluster, prev_bucket_id); - let bucket_id = bucket_assigner.assign_bucket(None, &cluster); + let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?; result = self .accumulate .append(record, bucket_id, &cluster, false) @@ -114,7 +114,10 @@ impl WriterClient { Ok(result.result_handle.expect("result_handle should exist")) } - fn assign_bucket(&self, table_path: &Arc) -> (Arc>, i32) { + fn assign_bucket( + &self, + table_path: &Arc, + ) -> Result<(Arc>, i32)> { let cluster = self.metadata.get_cluster(); let bucket_assigner = { if let 
Some(assigner) = self.bucket_assigners.get(table_path) { @@ -126,8 +129,8 @@ impl WriterClient { assigner } }; - let bucket_id = bucket_assigner.assign_bucket(None, &cluster); - (bucket_assigner, bucket_id) + let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?; + Ok((bucket_assigner, bucket_id)) } pub async fn close(self) -> Result<()> { From da3daf078971a699498aa9c844750b33000925a4 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 28 Dec 2025 09:50:31 +0000 Subject: [PATCH 03/10] DataLakeFormat documentation and pascal casing. --- crates/fluss/src/bucketing/mod.rs | 12 ++++++------ crates/fluss/src/metadata/data_lake_format.rs | 14 +++++++++++--- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/crates/fluss/src/bucketing/mod.rs b/crates/fluss/src/bucketing/mod.rs index 5ad8b0c..84a0503 100644 --- a/crates/fluss/src/bucketing/mod.rs +++ b/crates/fluss/src/bucketing/mod.rs @@ -36,9 +36,9 @@ impl dyn BucketingFunction { pub fn of(lake_format: Option<&DataLakeFormat>) -> Box { match lake_format { None => Box::new(FlussBucketingFunction), - Some(DataLakeFormat::PAIMON) => Box::new(PaimonBucketingFunction), - Some(DataLakeFormat::LANCE) => Box::new(FlussBucketingFunction), - Some(DataLakeFormat::ICEBERG) => Box::new(IcebergBucketingFunction), + Some(DataLakeFormat::Paimon) => Box::new(PaimonBucketingFunction), + Some(DataLakeFormat::Lance) => Box::new(FlussBucketingFunction), + Some(DataLakeFormat::Iceberg) => Box::new(IcebergBucketingFunction), } } } @@ -149,7 +149,7 @@ mod tests { #[test] fn test_paimon_bucketing() { - let paimon_bucketing = ::of(Some(&DataLakeFormat::PAIMON)); + let paimon_bucketing = ::of(Some(&DataLakeFormat::Paimon)); let expected = 1; let actual = paimon_bucketing.bucketing(&[00u8, 10u8], 7).unwrap(); @@ -188,7 +188,7 @@ mod tests { #[test] fn test_lance_bucketing() { - let lance_bucketing = ::of(Some(&DataLakeFormat::LANCE)); + let lance_bucketing = ::of(Some(&DataLakeFormat::Lance)); let expected = 1; let actual = 
lance_bucketing.bucketing(&[00u8, 10u8], 7).unwrap(); @@ -227,7 +227,7 @@ mod tests { #[test] fn test_iceberg_bucketing() { - let iceberg_bucketing = ::of(Some(&DataLakeFormat::ICEBERG)); + let iceberg_bucketing = ::of(Some(&DataLakeFormat::Iceberg)); let expected = 3; let actual = iceberg_bucketing.bucketing(&[00u8, 10u8], 7).unwrap(); diff --git a/crates/fluss/src/metadata/data_lake_format.rs b/crates/fluss/src/metadata/data_lake_format.rs index f78cac2..76a23f8 100644 --- a/crates/fluss/src/metadata/data_lake_format.rs +++ b/crates/fluss/src/metadata/data_lake_format.rs @@ -15,8 +15,16 @@ // specific language governing permissions and limitations // under the License. +/// Identifies the logical format of a data lake table supported by Fluss. +/// +/// This enum is typically used in metadata and configuration to distinguish +/// between different table formats so that the appropriate integration and +/// semantics can be applied. pub enum DataLakeFormat { - PAIMON, - LANCE, - ICEBERG, + /// Apache Paimon data lake table format. + Paimon, + /// Lance columnar data format / lakehouse table format. + Lance, + /// Apache Iceberg data lake table format. 
+ Iceberg, } From 1d1fdf0696b420a57830ca64e139d943215507f0 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 28 Dec 2025 09:52:31 +0000 Subject: [PATCH 04/10] Rename flink_hash_bytes to fluss_hash_bytes --- crates/fluss/src/bucketing/mod.rs | 4 ++-- crates/fluss/src/util/murmur_hash.rs | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/crates/fluss/src/bucketing/mod.rs b/crates/fluss/src/bucketing/mod.rs index 84a0503..cc3fd02 100644 --- a/crates/fluss/src/bucketing/mod.rs +++ b/crates/fluss/src/bucketing/mod.rs @@ -58,7 +58,7 @@ impl BucketingFunction for FlussBucketingFunction { }); } - let key_hash = murmur_hash::flink_hash_bytes(bucket_key); + let key_hash = murmur_hash::fluss_hash_bytes(bucket_key); Ok(murmur_hash::flink_hash_i32(key_hash) % num_buckets) } @@ -79,7 +79,7 @@ impl BucketingFunction for PaimonBucketingFunction { }); } - let key_hash = murmur_hash::flink_hash_bytes(bucket_key); + let key_hash = murmur_hash::fluss_hash_bytes(bucket_key); Ok((key_hash % num_buckets).abs()) } diff --git a/crates/fluss/src/util/murmur_hash.rs b/crates/fluss/src/util/murmur_hash.rs index d79cb70..3cce0e3 100644 --- a/crates/fluss/src/util/murmur_hash.rs +++ b/crates/fluss/src/util/murmur_hash.rs @@ -59,18 +59,18 @@ fn hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { fmix(h1, length) } -/// Hashes the data using Flink's variant of 32-bit Murmur hash with 42 as seed and tail bytes mixed into hash byte-by-byte +/// Hashes the data using Fluss'/Flink's variant of 32-bit Murmur hash with 42 as seed and tail bytes mixed into hash byte-by-byte /// /// # Arguments /// * `data` - byte array containing data to be hashed /// /// # Returns -/// Returns hash value -pub fn flink_hash_bytes(data: &[u8]) -> i32 { - flink_hash_bytes_with_seed(data, FLINK_MURMUR3_DEFAULT_SEED) +/// * hash value +pub fn fluss_hash_bytes(data: &[u8]) -> i32 { + fluss_hash_bytes_with_seed(data, FLINK_MURMUR3_DEFAULT_SEED) } #[inline(always)] -fn 
flink_hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { +fn fluss_hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { let length = data.len(); let chunks = length / CHUNK_SIZE; let length_aligned = chunks * CHUNK_SIZE; @@ -183,20 +183,20 @@ mod tests { #[test] fn test_flink_murmur() { - let empty_data_hash = flink_hash_bytes_with_seed(&[], 0); + let empty_data_hash = fluss_hash_bytes_with_seed(&[], 0); assert_eq!(empty_data_hash, 0); - let empty_data_hash = flink_hash_bytes(&[]); + let empty_data_hash = fluss_hash_bytes(&[]); assert_eq!(0x087F_CD5C, empty_data_hash); - let empty_data_hash = flink_hash_bytes_with_seed(&[], 0xFFFF_FFFFu32 as i32); + let empty_data_hash = fluss_hash_bytes_with_seed(&[], 0xFFFF_FFFFu32 as i32); assert_eq!(0x81F1_6F39u32 as i32, empty_data_hash); let hash = - flink_hash_bytes_with_seed("The quick brown fox jumps over the lazy dog".as_bytes(), 0); + fluss_hash_bytes_with_seed("The quick brown fox jumps over the lazy dog".as_bytes(), 0); assert_eq!(0x5FD2_0A20, hash); - let hash = flink_hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes()); + let hash = fluss_hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes()); assert_eq!(0x1BC6_F880, hash); let hash = flink_hash_i32(0); From 9da56c2ecbe668554eff6cf1abc95154bcdc178c Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 28 Dec 2025 09:53:17 +0000 Subject: [PATCH 05/10] Rename flink_hash_i32 to fluss_hash_i32 --- crates/fluss/src/bucketing/mod.rs | 2 +- crates/fluss/src/util/murmur_hash.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/fluss/src/bucketing/mod.rs b/crates/fluss/src/bucketing/mod.rs index cc3fd02..7360636 100644 --- a/crates/fluss/src/bucketing/mod.rs +++ b/crates/fluss/src/bucketing/mod.rs @@ -60,7 +60,7 @@ impl BucketingFunction for FlussBucketingFunction { let key_hash = murmur_hash::fluss_hash_bytes(bucket_key); - Ok(murmur_hash::flink_hash_i32(key_hash) % num_buckets) + 
Ok(murmur_hash::fluss_hash_i32(key_hash) % num_buckets) } } diff --git a/crates/fluss/src/util/murmur_hash.rs b/crates/fluss/src/util/murmur_hash.rs index 3cce0e3..f546c87 100644 --- a/crates/fluss/src/util/murmur_hash.rs +++ b/crates/fluss/src/util/murmur_hash.rs @@ -117,14 +117,14 @@ fn fmix(mut h1: i32, length: usize) -> i32 { bit_mix(h1) } -/// Hashes an i32 using Flink's variant of Murmur +/// Hashes an i32 using Fluss'/Flink's variant of Murmur /// /// # Arguments /// * `code` - byte array containing data to be hashed /// /// # Returns /// Returns hash value -pub fn flink_hash_i32(code: i32) -> i32 { +pub fn fluss_hash_i32(code: i32) -> i32 { let mut code = code.wrapping_mul(C1); code = code.rotate_left(R1); code = code.wrapping_mul(C2); @@ -199,13 +199,13 @@ mod tests { let hash = fluss_hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes()); assert_eq!(0x1BC6_F880, hash); - let hash = flink_hash_i32(0); + let hash = fluss_hash_i32(0); assert_eq!(0x2362_F9DE, hash); - let hash = flink_hash_i32(42); + let hash = fluss_hash_i32(42); assert_eq!(0x43A4_6E1D, hash); - let hash = flink_hash_i32(-77); + let hash = fluss_hash_i32(-77); assert_eq!(0x2EEB_27DE, hash); } } From 858770b53d4c5dd90d220f5d9c2a287707fb02c2 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 28 Dec 2025 12:27:15 +0000 Subject: [PATCH 06/10] Simplify hash_full_chunks. Documentation update. 
--- crates/fluss/src/util/murmur_hash.rs | 57 +++++++++++++--------------- 1 file changed, 26 insertions(+), 31 deletions(-) diff --git a/crates/fluss/src/util/murmur_hash.rs b/crates/fluss/src/util/murmur_hash.rs index f546c87..4c2b8dc 100644 --- a/crates/fluss/src/util/murmur_hash.rs +++ b/crates/fluss/src/util/murmur_hash.rs @@ -47,7 +47,7 @@ fn hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { let chunks = length / CHUNK_SIZE; let length_aligned = chunks * CHUNK_SIZE; - let mut h1 = hash_full_chunks(data, seed, length_aligned); + let mut h1 = hash_full_chunks(data, seed); let mut k1 = 0i32; for (shift, &b) in data[length_aligned..].iter().enumerate() { @@ -75,12 +75,10 @@ fn fluss_hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { let chunks = length / CHUNK_SIZE; let length_aligned = chunks * CHUNK_SIZE; - let mut h1 = hash_full_chunks(data, seed, length_aligned); + let mut h1 = hash_full_chunks(data, seed); - #[allow(clippy::needless_range_loop)] - for index in length_aligned..length { - let byte = i32::from(data[index]); - let k1 = mix_k1(byte); + for byte in data.iter().take(length).skip(length_aligned) { + let k1 = mix_k1(*byte as i32); h1 = mix_h1(h1, k1); } @@ -88,16 +86,13 @@ fn fluss_hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { } #[inline(always)] -fn hash_full_chunks(data: &[u8], seed: i32, length_aligned: usize) -> i32 { - let mut h1 = seed; - - for i in 0..length_aligned / CHUNK_SIZE { - let offset = i * 4; - let block = i32::from_le_bytes(data[offset..offset + 4].try_into().unwrap()); - let k1 = mix_k1(block); - h1 = mix_h1(h1, k1); - } - h1 +fn hash_full_chunks(data: &[u8], seed: i32) -> i32 { + data.chunks_exact(CHUNK_SIZE) + .fold(seed, |h1, chunk| { + let block = i32::from_le_bytes(chunk.try_into().unwrap()); + let k1 = mix_k1(block); + mix_h1(h1, k1) + }) } #[inline(always)] @@ -120,24 +115,24 @@ fn fmix(mut h1: i32, length: usize) -> i32 { /// Hashes an i32 using Fluss'/Flink's variant of Murmur /// /// # Arguments -/// * 
`code` - byte array containing data to be hashed +/// * `input` - i32 value to be hashed /// /// # Returns /// Returns hash value -pub fn fluss_hash_i32(code: i32) -> i32 { - let mut code = code.wrapping_mul(C1); - code = code.rotate_left(R1); - code = code.wrapping_mul(C2); - code = code.rotate_left(R2); - - code = code.wrapping_mul(M).wrapping_add(N); - code ^= CHUNK_SIZE as i32; - code = bit_mix(code); - - if code >= 0 { - code - } else if code != i32::MIN { - -code +pub fn fluss_hash_i32(mut input: i32) -> i32 { + input = input.wrapping_mul(C1); + input = input.rotate_left(R1); + input = input.wrapping_mul(C2); + input = input.rotate_left(R2); + + input = input.wrapping_mul(M).wrapping_add(N); + input ^= CHUNK_SIZE as i32; + input = bit_mix(input); + + if input >= 0 { + input + } else if input != i32::MIN { + -input } else { 0 } From eb204c12fd244a64d61ac8d2a760aba389d2b7fe Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 28 Dec 2025 12:30:05 +0000 Subject: [PATCH 07/10] Formatting --- crates/fluss/src/util/murmur_hash.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/crates/fluss/src/util/murmur_hash.rs b/crates/fluss/src/util/murmur_hash.rs index 4c2b8dc..9b7c3e9 100644 --- a/crates/fluss/src/util/murmur_hash.rs +++ b/crates/fluss/src/util/murmur_hash.rs @@ -87,12 +87,11 @@ fn fluss_hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { #[inline(always)] fn hash_full_chunks(data: &[u8], seed: i32) -> i32 { - data.chunks_exact(CHUNK_SIZE) - .fold(seed, |h1, chunk| { - let block = i32::from_le_bytes(chunk.try_into().unwrap()); - let k1 = mix_k1(block); - mix_h1(h1, k1) - }) + data.chunks_exact(CHUNK_SIZE).fold(seed, |h1, chunk| { + let block = i32::from_le_bytes(chunk.try_into().unwrap()); + let k1 = mix_k1(block); + mix_h1(h1, k1) + }) } #[inline(always)] From 07b436c0f16303c5ce023bb195fb178c03aa6bcb Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 28 Dec 2025 12:31:04 +0000 Subject: [PATCH 08/10] Doc for hash bucket 
assigner. --- crates/fluss/src/client/write/bucket_assigner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/fluss/src/client/write/bucket_assigner.rs b/crates/fluss/src/client/write/bucket_assigner.rs index c4240e5..44b2673 100644 --- a/crates/fluss/src/client/write/bucket_assigner.rs +++ b/crates/fluss/src/client/write/bucket_assigner.rs @@ -104,7 +104,7 @@ impl BucketAssigner for StickyBucketAssigner { } } -// A [BucketAssigner] which assigns based on a modulo hashing function +/// A [BucketAssigner] which assigns based on a modulo hashing function pub struct HashBucketAssigner { num_buckets: i32, bucketing_function: Box, From 66d299d98c01eb6ca61a7598dd7472e68657f40c Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Sun, 28 Dec 2025 12:46:16 +0000 Subject: [PATCH 09/10] Use u32 for murmur3 hash --- crates/fluss/src/bucketing/mod.rs | 2 +- crates/fluss/src/util/murmur_hash.rs | 69 ++++++++++++++-------------- 2 files changed, 36 insertions(+), 35 deletions(-) diff --git a/crates/fluss/src/bucketing/mod.rs b/crates/fluss/src/bucketing/mod.rs index 7360636..71db0e0 100644 --- a/crates/fluss/src/bucketing/mod.rs +++ b/crates/fluss/src/bucketing/mod.rs @@ -100,7 +100,7 @@ impl BucketingFunction for IcebergBucketingFunction { }); }; - Ok((murmur_hash::hash_bytes(bucket_key) & i32::MAX) % num_buckets) + Ok((murmur_hash::hash_bytes(bucket_key) as i32 & i32::MAX) % num_buckets) } } diff --git a/crates/fluss/src/util/murmur_hash.rs b/crates/fluss/src/util/murmur_hash.rs index 9b7c3e9..611226c 100644 --- a/crates/fluss/src/util/murmur_hash.rs +++ b/crates/fluss/src/util/murmur_hash.rs @@ -19,15 +19,15 @@ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. 
*/ -pub const MURMUR3_DEFAULT_SEED: i32 = 0; +pub const MURMUR3_DEFAULT_SEED: u32 = 0; pub const FLINK_MURMUR3_DEFAULT_SEED: i32 = 42; -const C1: i32 = 0xCC9E_2D51_u32 as i32; -const C2: i32 = 0x1B87_3593; +const C1: u32 = 0xCC9E_2D51; +const C2: u32 = 0x1B87_3593; const R1: u32 = 15; const R2: u32 = 13; -const M: i32 = 5; -const N: i32 = 0xE654_6B64_u32 as i32; +const M: u32 = 5; +const N: u32 = 0xE654_6B64; const CHUNK_SIZE: usize = 4; /// Hashes the data using 32-bit Murmur3 hash with 0 as seed @@ -37,21 +37,21 @@ const CHUNK_SIZE: usize = 4; /// /// # Returns /// Returns hash value -pub fn hash_bytes(data: &[u8]) -> i32 { +pub fn hash_bytes(data: &[u8]) -> u32 { hash_bytes_with_seed(data, MURMUR3_DEFAULT_SEED) } #[inline(always)] -fn hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { +fn hash_bytes_with_seed(data: &[u8], seed: u32) -> u32 { let length = data.len(); let chunks = length / CHUNK_SIZE; let length_aligned = chunks * CHUNK_SIZE; let mut h1 = hash_full_chunks(data, seed); - let mut k1 = 0i32; + let mut k1 = 0u32; for (shift, &b) in data[length_aligned..].iter().enumerate() { - k1 |= (b as i32) << (8 * shift); + k1 |= (b as u32) << (8 * shift); } h1 ^= k1.wrapping_mul(C1).rotate_left(R1).wrapping_mul(C2); @@ -75,39 +75,39 @@ fn fluss_hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { let chunks = length / CHUNK_SIZE; let length_aligned = chunks * CHUNK_SIZE; - let mut h1 = hash_full_chunks(data, seed); + let mut h1 = hash_full_chunks(data, seed as u32); for byte in data.iter().take(length).skip(length_aligned) { - let k1 = mix_k1(*byte as i32); + let k1 = mix_k1(*byte as u32); h1 = mix_h1(h1, k1); } - fmix(h1, length) + fmix(h1, length) as i32 } #[inline(always)] -fn hash_full_chunks(data: &[u8], seed: i32) -> i32 { +fn hash_full_chunks(data: &[u8], seed: u32) -> u32 { data.chunks_exact(CHUNK_SIZE).fold(seed, |h1, chunk| { - let block = i32::from_le_bytes(chunk.try_into().unwrap()); + let block = u32::from_le_bytes(chunk.try_into().unwrap()); let 
k1 = mix_k1(block); mix_h1(h1, k1) }) } #[inline(always)] -fn mix_k1(k1: i32) -> i32 { +fn mix_k1(k1: u32) -> u32 { k1.wrapping_mul(C1).rotate_left(R1).wrapping_mul(C2) } #[inline(always)] -fn mix_h1(h1: i32, k1: i32) -> i32 { +fn mix_h1(h1: u32, k1: u32) -> u32 { (h1 ^ k1).rotate_left(R2).wrapping_mul(M).wrapping_add(N) } // Finalization mix - force all bits of a hash block to avalanche #[inline(always)] -fn fmix(mut h1: i32, length: usize) -> i32 { - h1 ^= length as i32; +fn fmix(mut h1: u32, length: usize) -> u32 { + h1 ^= length as u32; bit_mix(h1) } @@ -118,35 +118,36 @@ fn fmix(mut h1: i32, length: usize) -> i32 { /// /// # Returns /// Returns hash value -pub fn fluss_hash_i32(mut input: i32) -> i32 { +pub fn fluss_hash_i32(input: i32) -> i32 { + let mut input = input as u32; input = input.wrapping_mul(C1); input = input.rotate_left(R1); input = input.wrapping_mul(C2); input = input.rotate_left(R2); input = input.wrapping_mul(M).wrapping_add(N); - input ^= CHUNK_SIZE as i32; - input = bit_mix(input); + input ^= CHUNK_SIZE as u32; + let output = bit_mix(input) as i32; - if input >= 0 { - input - } else if input != i32::MIN { - -input + if output >= 0 { + output + } else if output != i32::MIN { + -output } else { 0 } } -const BIT_MIX_A: i32 = 0x85EB_CA6Bu32 as i32; -const BIT_MIX_B: i32 = 0xC2B2_AE35u32 as i32; +const BIT_MIX_A: u32 = 0x85EB_CA6B; +const BIT_MIX_B: u32 = 0xC2B2_AE35; #[inline(always)] -fn bit_mix(mut input: i32) -> i32 { - input = input ^ ((input as u32) >> 16) as i32; +fn bit_mix(mut input: u32) -> u32 { + input = input ^ (input >> 16); input = input.wrapping_mul(BIT_MIX_A); - input = input ^ ((input as u32) >> 13) as i32; + input = input ^ (input >> 13); input = input.wrapping_mul(BIT_MIX_B); - input = input ^ ((input as u32) >> 16) as i32; + input = input ^ (input >> 16); input } @@ -162,15 +163,15 @@ mod tests { let empty_data_hash = hash_bytes_with_seed(&[], 1); assert_eq!(0x514E_28B7, empty_data_hash); - let empty_data_hash = 
hash_bytes_with_seed(&[], 0xFFFF_FFFFu32 as i32); - assert_eq!(0x81F1_6F39u32 as i32, empty_data_hash); + let empty_data_hash = hash_bytes_with_seed(&[], 0xFFFF_FFFF); + assert_eq!(0x81F1_6F39, empty_data_hash); let hash = hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes()); assert_eq!(0x2E4F_F723, hash); let hash = hash_bytes_with_seed( "The quick brown fox jumps over the lazy dog".as_bytes(), - 0x9747_B28Cu32 as i32, + 0x9747_B28C, ); assert_eq!(0x2FA8_26CD, hash); } From 84a6eaa84b98c14e7919e5b9c8620354f01b52fc Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Mon, 29 Dec 2025 09:45:48 +0000 Subject: [PATCH 10/10] Error when using fluss_hash_bytes with data larger than 2GB --- crates/fluss/src/bucketing/mod.rs | 6 ++--- crates/fluss/src/util/murmur_hash.rs | 34 ++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/crates/fluss/src/bucketing/mod.rs b/crates/fluss/src/bucketing/mod.rs index 71db0e0..2611ac7 100644 --- a/crates/fluss/src/bucketing/mod.rs +++ b/crates/fluss/src/bucketing/mod.rs @@ -32,7 +32,7 @@ impl dyn BucketingFunction { /// * `lake_format` - Data lake format or none /// /// # Returns - /// Returns BucketingFunction + /// * BucketingFunction pub fn of(lake_format: Option<&DataLakeFormat>) -> Box { match lake_format { None => Box::new(FlussBucketingFunction), @@ -58,7 +58,7 @@ impl BucketingFunction for FlussBucketingFunction { }); } - let key_hash = murmur_hash::fluss_hash_bytes(bucket_key); + let key_hash = murmur_hash::fluss_hash_bytes(bucket_key)?; Ok(murmur_hash::fluss_hash_i32(key_hash) % num_buckets) } @@ -79,7 +79,7 @@ impl BucketingFunction for PaimonBucketingFunction { }); } - let key_hash = murmur_hash::fluss_hash_bytes(bucket_key); + let key_hash = murmur_hash::fluss_hash_bytes(bucket_key)?; Ok((key_hash % num_buckets).abs()) } diff --git a/crates/fluss/src/util/murmur_hash.rs b/crates/fluss/src/util/murmur_hash.rs index 611226c..12229c7 100644 --- 
a/crates/fluss/src/util/murmur_hash.rs +++ b/crates/fluss/src/util/murmur_hash.rs @@ -18,6 +18,8 @@ /* This file is based on source code of Apache Flink Project (https://flink.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. */ +use crate::error::Error::IllegalArgument; +use crate::error::Result; pub const MURMUR3_DEFAULT_SEED: u32 = 0; pub const FLINK_MURMUR3_DEFAULT_SEED: i32 = 42; @@ -60,18 +62,29 @@ fn hash_bytes_with_seed(data: &[u8], seed: u32) -> u32 { } /// Hashes the data using Fluss'/Flink's variant of 32-bit Murmur hash with 42 as seed and tail bytes mixed into hash byte-by-byte +/// Maximum data array size supported is 2GB /// /// # Arguments /// * `data` - byte array containing data to be hashed /// /// # Returns -/// * hash value -pub fn fluss_hash_bytes(data: &[u8]) -> i32 { +/// * result of hashing, `Ok(hash_value)` +/// +/// # Error +/// Returns `Err(IllegalArgument)` if byte array is larger than 2GB +pub fn fluss_hash_bytes(data: &[u8]) -> Result { fluss_hash_bytes_with_seed(data, FLINK_MURMUR3_DEFAULT_SEED) } #[inline(always)] -fn fluss_hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { +fn fluss_hash_bytes_with_seed(data: &[u8], seed: i32) -> Result { let length = data.len(); + + if length >= i32::MAX as usize { + return Err(IllegalArgument { + message: "data array size {length} is bigger than supported".to_string(), + }); + } + let chunks = length / CHUNK_SIZE; let length_aligned = chunks * CHUNK_SIZE; @@ -82,7 +95,7 @@ fn fluss_hash_bytes_with_seed(data: &[u8], seed: i32) -> i32 { h1 = mix_h1(h1, k1); } - fmix(h1, length) as i32 + Ok(fmix(h1, length) as i32) } #[inline(always)] @@ -178,20 +191,23 @@ mod tests { #[test] fn test_flink_murmur() { - let empty_data_hash = fluss_hash_bytes_with_seed(&[], 0); + let empty_data_hash = fluss_hash_bytes_with_seed(&[], 0).expect("Failed to 
hash"); assert_eq!(empty_data_hash, 0); - let empty_data_hash = fluss_hash_bytes(&[]); + let empty_data_hash = fluss_hash_bytes(&[]).expect("Failed to hash"); assert_eq!(0x087F_CD5C, empty_data_hash); - let empty_data_hash = fluss_hash_bytes_with_seed(&[], 0xFFFF_FFFFu32 as i32); + let empty_data_hash = + fluss_hash_bytes_with_seed(&[], 0xFFFF_FFFFu32 as i32).expect("Failed to hash"); assert_eq!(0x81F1_6F39u32 as i32, empty_data_hash); let hash = - fluss_hash_bytes_with_seed("The quick brown fox jumps over the lazy dog".as_bytes(), 0); + fluss_hash_bytes_with_seed("The quick brown fox jumps over the lazy dog".as_bytes(), 0) + .expect("Failed to hash"); assert_eq!(0x5FD2_0A20, hash); - let hash = fluss_hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes()); + let hash = fluss_hash_bytes("The quick brown fox jumps over the lazy dog".as_bytes()) + .expect("Failed to hash"); assert_eq!(0x1BC6_F880, hash); let hash = fluss_hash_i32(0);