diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index 11bdfa31..bf398391 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -344,10 +344,7 @@ impl LogFetcher { if let Error::RpcError { source, .. } = &e && matches!(source, RpcError::ConnectionError(_) | RpcError::Poisoned(_)) { - warn!( - "Retrying after encountering error while updating table metadata: {}", - e - ); + warn!("Retrying after encountering error while updating table metadata: {e}"); Ok(()) } else { Err(e) @@ -395,7 +392,7 @@ impl LogFetcher { let server_node = match cluster.get_tablet_server(leader) { Some(node) => node, None => { - warn!("No server node found for leader {}, retrying", leader); + warn!("No server node found for leader {leader}, retrying"); Self::handle_fetch_failure(metadata, &leader, &fetch_request).await; return; } diff --git a/crates/fluss/src/compression/arrow_compression.rs b/crates/fluss/src/compression/arrow_compression.rs index 32dfadb4..8121a512 100644 --- a/crates/fluss/src/compression/arrow_compression.rs +++ b/crates/fluss/src/compression/arrow_compression.rs @@ -17,6 +17,7 @@ use crate::error::{Error, Result}; use arrow::ipc::CompressionType; +use arrow_schema::ArrowError; use std::collections::HashMap; pub const TABLE_LOG_ARROW_COMPRESSION_ZSTD_LEVEL: &str = "table.log.arrow.compression.zstd.level"; @@ -71,21 +72,31 @@ impl ArrowCompressionInfo { { Some(Ok(level)) if !(1..=22).contains(&level) => Err(Error::IllegalArgument { message: format!( - "Invalid ZSTD compression level: {}. Expected a value between 1 and 22.", - level + "Invalid ZSTD compression level: {level}. Expected a value between 1 and 22." ), }), Some(Err(e)) => Err(Error::IllegalArgument { message: format!( - "Invalid ZSTD compression level. Expected a value between 1 and 22. {}", - e + "Invalid ZSTD compression level. Expected a value between 1 and 22. {e}" ), }), - - Some(Ok(level)) => Ok(Self { - compression_type, - compression_level: level, - }), + Some(Ok(level)) => { + // TODO Remove once non-default ZSTD compression level is implemented https://github.com/apache/fluss-rust/issues/109 + if level != DEFAULT_ZSTD_COMPRESSION_LEVEL { + return Err(Error::ArrowError { + message: format!( + "Rust client currently only implements default ZSTD compression level {DEFAULT_ZSTD_COMPRESSION_LEVEL}. Got: {level}." + ), + source: ArrowError::NotYetImplemented(format!( + "zstd compression level {level}." + )), + }); + } + Ok(Self { + compression_type, + compression_level: level, + }) + } None => Ok(Self { compression_type, compression_level: DEFAULT_ZSTD_COMPRESSION_LEVEL, @@ -171,11 +182,19 @@ mod tests { "ZSTD", )])); assert_eq!(compression_info.unwrap().compression_level, 3); - let compression_info = ArrowCompressionInfo::from_conf(&mk_map(&[ + } + + // TODO Remove once non-default ZSTD compression level is implemented https://github.com/apache/fluss-rust/issues/109 + #[test] + fn test_from_conf_zstd_compression_level_error_when_non_default() { + let result = ArrowCompressionInfo::from_conf(&mk_map(&[ ("table.log.arrow.compression.type", "ZSTD"), ("table.log.arrow.compression.zstd.level", "1"), ])); - assert_eq!(compression_info.unwrap().compression_level, 1); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains( + "Rust client currently only implements default ZSTD compression level 3. Got: 1." + )); } #[test]