Skip to content
266 changes: 266 additions & 0 deletions crates/fluss/src/bucketing/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::metadata::DataLakeFormat;
use crate::util::murmur_hash;

/// Maps a bucket key to a bucket index in `[0, num_buckets)`.
///
/// Implementations are pure hash computations and therefore deterministic:
/// the same key with the same `num_buckets` always yields the same bucket.
/// `Sync + Send` is required because the function is shared across writer
/// threads behind a `Box<dyn BucketingFunction>`.
pub trait BucketingFunction: Sync + Send {
    /// Computes the bucket for `bucket_key`.
    ///
    /// # Arguments
    /// * `bucket_key` - serialized bucket key bytes; must not be empty
    /// * `num_buckets` - total number of buckets; must be positive
    ///
    /// # Errors
    /// Implementations in this module return `IllegalArgument` when
    /// `bucket_key` is empty or `num_buckets` is not positive.
    fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32>;
}

#[allow(dead_code)]
impl dyn BucketingFunction {
/// Provides the bucketing function for a given [DataLakeFormat]
///
/// # Arguments
/// * `lake_format` - Data lake format or none
///
/// # Returns
/// * BucketingFunction
pub fn of(lake_format: Option<&DataLakeFormat>) -> Box<dyn BucketingFunction> {
match lake_format {
None => Box::new(FlussBucketingFunction),
Some(DataLakeFormat::Paimon) => Box::new(PaimonBucketingFunction),
Some(DataLakeFormat::Lance) => Box::new(FlussBucketingFunction),
Some(DataLakeFormat::Iceberg) => Box::new(IcebergBucketingFunction),
}
}
}

struct FlussBucketingFunction;
impl BucketingFunction for FlussBucketingFunction {
fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
if bucket_key.is_empty() {
return Err(IllegalArgument {
message: "bucket_key must not be empty!".to_string(),
});
}

if num_buckets <= 0 {
return Err(IllegalArgument {
message: "num_buckets must be positive!".to_string(),
});
}

let key_hash = murmur_hash::fluss_hash_bytes(bucket_key)?;

Ok(murmur_hash::fluss_hash_i32(key_hash) % num_buckets)
}
}

struct PaimonBucketingFunction;
impl BucketingFunction for PaimonBucketingFunction {
fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
if bucket_key.is_empty() {
return Err(IllegalArgument {
message: "bucket_key must not be empty!".to_string(),
});
}

if num_buckets <= 0 {
return Err(IllegalArgument {
message: "num_buckets must be positive!".to_string(),
});
}

let key_hash = murmur_hash::fluss_hash_bytes(bucket_key)?;

Ok((key_hash % num_buckets).abs())
}
}

struct IcebergBucketingFunction;
impl BucketingFunction for IcebergBucketingFunction {
fn bucketing(&self, bucket_key: &[u8], num_buckets: i32) -> Result<i32> {
if bucket_key.is_empty() {
return Err(IllegalArgument {
message: "bucket_key must not be empty!".to_string(),
});
}

if num_buckets <= 0 {
return Err(IllegalArgument {
message: "num_buckets must be positive!".to_string(),
});
};

Ok((murmur_hash::hash_bytes(bucket_key) as i32 & i32::MAX) % num_buckets)
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Runs each `(bucket_key, num_buckets, expected)` case against `function`.
    fn assert_buckets(function: &dyn BucketingFunction, cases: &[(&[u8], i32, i32)]) {
        for (bucket_key, num_buckets, expected) in cases {
            let actual = function.bucketing(bucket_key, *num_buckets).unwrap();
            assert_eq!(
                *expected, actual,
                "Expecting bucket to be {expected} but got {actual}"
            );
        }
    }

    #[test]
    fn test_default_bucketing() {
        let default_bucketing = <dyn BucketingFunction>::of(None);
        assert_buckets(
            default_bucketing.as_ref(),
            &[
                (&[0u8, 10u8], 7, 1),
                (&[0u8, 10u8, 10u8, 10u8], 12, 0),
                (b"2bb87d68-baf9-4e64-90f9-f80910419fa6", 16, 6),
                (b"The quick brown fox jumps over the lazy dog", 8, 6),
            ],
        );
    }

    #[test]
    fn test_paimon_bucketing() {
        let paimon_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Paimon));
        assert_buckets(
            paimon_bucketing.as_ref(),
            &[
                (&[0u8, 10u8], 7, 1),
                (&[0u8, 10u8, 10u8, 10u8], 12, 11),
                (b"2bb87d68-baf9-4e64-90f9-f80910419fa6", 16, 12),
                (b"The quick brown fox jumps over the lazy dog", 8, 0),
            ],
        );
    }

    #[test]
    fn test_lance_bucketing() {
        // Lance falls back to the default Fluss hashing, so the expected
        // buckets match test_default_bucketing.
        let lance_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Lance));
        assert_buckets(
            lance_bucketing.as_ref(),
            &[
                (&[0u8, 10u8], 7, 1),
                (&[0u8, 10u8, 10u8, 10u8], 12, 0),
                (b"2bb87d68-baf9-4e64-90f9-f80910419fa6", 16, 6),
                (b"The quick brown fox jumps over the lazy dog", 8, 6),
            ],
        );
    }

    #[test]
    fn test_iceberg_bucketing() {
        let iceberg_bucketing = <dyn BucketingFunction>::of(Some(&DataLakeFormat::Iceberg));
        assert_buckets(
            iceberg_bucketing.as_ref(),
            &[
                (&[0u8, 10u8], 7, 3),
                (&[0u8, 10u8, 10u8, 10u8], 12, 4),
                (b"2bb87d68-baf9-4e64-90f9-f80910419fa6", 16, 12),
                (b"The quick brown fox jumps over the lazy dog", 8, 3),
            ],
        );
    }
}
54 changes: 50 additions & 4 deletions crates/fluss/src/client/write/bucket_assigner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use crate::bucketing::BucketingFunction;
use crate::cluster::Cluster;
use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::metadata::TablePath;
use rand::Rng;
use std::sync::atomic::{AtomicI32, Ordering};
Expand All @@ -25,7 +28,7 @@ pub trait BucketAssigner: Sync + Send {

fn on_new_batch(&self, cluster: &Cluster, prev_bucket_id: i32);

fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) -> i32;
fn assign_bucket(&self, bucket_key: Option<&[u8]>, cluster: &Cluster) -> Result<i32>;
}

#[derive(Debug)]
Expand Down Expand Up @@ -91,12 +94,55 @@ impl BucketAssigner for StickyBucketAssigner {
self.next_bucket(cluster, prev_bucket_id);
}

fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) -> i32 {
fn assign_bucket(&self, _bucket_key: Option<&[u8]>, cluster: &Cluster) -> Result<i32> {
let bucket_id = self.current_bucket_id.load(Ordering::Relaxed);
if bucket_id < 0 {
self.next_bucket(cluster, bucket_id)
Ok(self.next_bucket(cluster, bucket_id))
} else {
bucket_id
Ok(bucket_id)
}
}
}

/// A [BucketAssigner] which assigns based on a modulo hashing function
pub struct HashBucketAssigner {
    // Total number of buckets of the target table; forwarded to the
    // bucketing function on every assignment.
    num_buckets: i32,
    // Hash function used to map a bucket key to a bucket id.
    bucketing_function: Box<dyn BucketingFunction>,
}

#[allow(dead_code)]
impl HashBucketAssigner {
    /// Creates a new [HashBucketAssigner] based on the given [BucketingFunction].
    /// See [`<dyn BucketingFunction>::of`] for obtaining a bucketing function
    /// from an optional `DataLakeFormat`.
    ///
    /// # Arguments
    /// * `num_buckets` - The number of buckets
    /// * `bucketing_function` - The bucketing function
    ///
    /// # Returns
    /// * [HashBucketAssigner] - The hash bucket assigner
    pub fn new(num_buckets: i32, bucketing_function: Box<dyn BucketingFunction>) -> Self {
        HashBucketAssigner {
            num_buckets,
            bucketing_function,
        }
    }
}

impl BucketAssigner for HashBucketAssigner {
    /// Hash-based assignment is keyed, so a full batch is never abandoned.
    fn abort_if_batch_full(&self) -> bool {
        false
    }

    /// Bucket choice depends only on the key, so a new batch changes nothing.
    fn on_new_batch(&self, _cluster: &Cluster, _prev_bucket_id: i32) {}

    /// Hashes the provided bucket key into `[0, num_buckets)` via the
    /// configured bucketing function.
    ///
    /// # Errors
    /// Returns [IllegalArgument] when no bucket key is provided.
    fn assign_bucket(&self, bucket_key: Option<&[u8]>, _cluster: &Cluster) -> Result<i32> {
        match bucket_key {
            Some(key) => self.bucketing_function.bucketing(key, self.num_buckets),
            None => Err(IllegalArgument {
                message: "no bucket key provided".to_string(),
            }),
        }
    }
}
13 changes: 8 additions & 5 deletions crates/fluss/src/client/write/writer_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl WriterClient {
let table_path = &record.table_path;
let cluster = self.metadata.get_cluster();

let (bucket_assigner, bucket_id) = self.assign_bucket(table_path);
let (bucket_assigner, bucket_id) = self.assign_bucket(table_path)?;

let mut result = self
.accumulate
Expand All @@ -101,7 +101,7 @@ impl WriterClient {
if result.abort_record_for_new_batch {
let prev_bucket_id = bucket_id;
bucket_assigner.on_new_batch(&cluster, prev_bucket_id);
let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
result = self
.accumulate
.append(record, bucket_id, &cluster, false)
Expand All @@ -114,7 +114,10 @@ impl WriterClient {

Ok(result.result_handle.expect("result_handle should exist"))
}
fn assign_bucket(&self, table_path: &Arc<TablePath>) -> (Arc<Box<dyn BucketAssigner>>, i32) {
fn assign_bucket(
&self,
table_path: &Arc<TablePath>,
) -> Result<(Arc<Box<dyn BucketAssigner>>, i32)> {
let cluster = self.metadata.get_cluster();
let bucket_assigner = {
if let Some(assigner) = self.bucket_assigners.get(table_path) {
Expand All @@ -126,8 +129,8 @@ impl WriterClient {
assigner
}
};
let bucket_id = bucket_assigner.assign_bucket(None, &cluster);
(bucket_assigner, bucket_id)
let bucket_id = bucket_assigner.assign_bucket(None, &cluster)?;
Ok((bucket_assigner, bucket_id))
}

pub async fn close(self) -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions crates/fluss/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod cluster;
pub mod config;
pub mod error;

mod bucketing;
mod compression;
pub mod io;
mod util;
Expand Down
Loading
Loading