From e909f21ad9991eb10a4c0c4a7a5ca8fdcc8c5eab Mon Sep 17 00:00:00 2001 From: AndreaBozzo Date: Fri, 26 Dec 2025 12:51:07 +0100 Subject: [PATCH 1/5] chore: add integration tests for subscribe_batch and project_by_name Add two new integration tests to improve API coverage: - test_subscribe_batch: tests LogScanner::subscribe_batch() with HashMap - test_project_by_name: tests TableScan::project_by_name() with column names Both tests include error case validation. --- crates/fluss/tests/integration/table.rs | 204 ++++++++++++++++++++++++ 1 file changed, 204 insertions(+) diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index 006adcc..dd841a5 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -40,6 +40,7 @@ mod table_test { use fluss::row::InternalRow; use fluss::rpc::message::OffsetSpec; use jiff::Timestamp; + use std::collections::HashMap; use std::sync::Arc; use std::thread; @@ -382,4 +383,207 @@ mod table_test { "Timestamp after append should resolve to offset 0 (no newer records)" ); } + + #[tokio::test] + async fn test_subscribe_batch() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = TablePath::new("fluss".to_string(), "test_subscribe_batch".to_string()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("value", DataTypes::string()) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + // Append 6 records + let append_writer = table + .new_append() + .expect("Failed to create append") + .create_writer(); + + let batch = record_batch!( + ("id", 
Int32, [1, 2, 3, 4, 5, 6]), + ("value", Utf8, ["a", "b", "c", "d", "e", "f"]) + ) + .unwrap(); + append_writer + .append_arrow_batch(batch) + .await + .expect("Failed to append batch"); + append_writer.flush().await.expect("Failed to flush"); + + // Test subscribe_batch with HashMap + let log_scanner = table + .new_scan() + .create_log_scanner() + .expect("Failed to create log scanner"); + + let mut bucket_offsets = HashMap::new(); + bucket_offsets.insert(0, 0i64); + log_scanner + .subscribe_batch(bucket_offsets) + .await + .expect("Failed to subscribe batch"); + + let scan_records = log_scanner + .poll(std::time::Duration::from_secs(60)) + .await + .expect("Failed to poll"); + + let records: Vec<_> = scan_records.into_iter().collect(); + assert_eq!( + records.len(), + 6, + "Should have 6 records via subscribe_batch" + ); + + // Verify record contents + for record in records.iter() { + let row = record.row(); + let id = row.get_int(0); + let value = row.get_string(1); + assert!(id >= 1 && id <= 6, "id should be between 1 and 6"); + assert!( + ["a", "b", "c", "d", "e", "f"].contains(&value.as_str()), + "value should be one of a-f" + ); + } + + // Test error case: empty HashMap should fail + let log_scanner_empty = table + .new_scan() + .create_log_scanner() + .expect("Failed to create log scanner"); + + let result = log_scanner_empty.subscribe_batch(HashMap::new()).await; + assert!( + result.is_err(), + "subscribe_batch with empty HashMap should fail" + ); + } + + #[tokio::test] + async fn test_project_by_name() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = TablePath::new("fluss".to_string(), "test_project_by_name".to_string()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("col_a", DataTypes::int()) + .column("col_b", DataTypes::string()) + .column("col_c", 
DataTypes::int()) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + // Append 3 records + let append_writer = table + .new_append() + .expect("Failed to create append") + .create_writer(); + + let batch = record_batch!( + ("col_a", Int32, [1, 2, 3]), + ("col_b", Utf8, ["x", "y", "z"]), + ("col_c", Int32, [10, 20, 30]) + ) + .unwrap(); + append_writer + .append_arrow_batch(batch) + .await + .expect("Failed to append batch"); + append_writer.flush().await.expect("Failed to flush"); + + // Test project_by_name: select col_b and col_c only + let log_scanner = table + .new_scan() + .project_by_name(&["col_b", "col_c"]) + .expect("Failed to project by name") + .create_log_scanner() + .expect("Failed to create log scanner"); + + log_scanner + .subscribe(0, 0) + .await + .expect("Failed to subscribe"); + + let scan_records = log_scanner + .poll(std::time::Duration::from_secs(60)) + .await + .expect("Failed to poll"); + + let mut records: Vec<_> = scan_records.into_iter().collect(); + records.sort_by_key(|r| r.offset()); + + assert_eq!( + records.len(), + 3, + "Should have 3 records with project_by_name" + ); + + // Verify projected columns are in the correct order (col_b, col_c) + let expected_col_b = vec!["x", "y", "z"]; + let expected_col_c = vec![10, 20, 30]; + + for (i, record) in records.iter().enumerate() { + let row = record.row(); + // col_b is now at index 0, col_c is at index 1 + assert_eq!( + row.get_string(0), + expected_col_b[i], + "col_b mismatch at index {}", + i + ); + assert_eq!( + row.get_int(1), + expected_col_c[i], + "col_c mismatch at index {}", + i + ); + } + + // Test error case: empty column names should fail + let result = table.new_scan().project_by_name(&[]); + assert!( + result.is_err(), + "project_by_name with empty names should fail" 
+ ); + + // Test error case: non-existent column should fail + let result = table.new_scan().project_by_name(&["nonexistent_column"]); + assert!( + result.is_err(), + "project_by_name with non-existent column should fail" + ); + } } From eec27f3d48463b02c6ddeb6558ea461d6008f61b Mon Sep 17 00:00:00 2001 From: AndreaBozzo Date: Sat, 27 Dec 2025 10:21:22 +0100 Subject: [PATCH 2/5] fix: update assertion to use value directly in test_subscribe_batch --- crates/fluss/tests/integration/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index dd841a5..3ff34d9 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -460,7 +460,7 @@ mod table_test { let value = row.get_string(1); assert!(id >= 1 && id <= 6, "id should be between 1 and 6"); assert!( - ["a", "b", "c", "d", "e", "f"].contains(&value.as_str()), + ["a", "b", "c", "d", "e", "f"].contains(&value), "value should be one of a-f" ); } From a6a1df43d23bad19348dded46155151e21f5e05a Mon Sep 17 00:00:00 2001 From: AndreaBozzo Date: Sat, 27 Dec 2025 10:29:01 +0100 Subject: [PATCH 3/5] fix: clippy --- crates/fluss/tests/integration/table.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index 3ff34d9..654f17a 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -458,7 +458,7 @@ mod table_test { let row = record.row(); let id = row.get_int(0); let value = row.get_string(1); - assert!(id >= 1 && id <= 6, "id should be between 1 and 6"); + assert!((1..=6).contains(&id), "id should be between 1 and 6"); assert!( ["a", "b", "c", "d", "e", "f"].contains(&value), "value should be one of a-f" @@ -552,8 +552,8 @@ mod table_test { ); // Verify projected columns are in the correct order (col_b, col_c) - let expected_col_b = vec!["x", 
"y", "z"]; - let expected_col_c = vec![10, 20, 30]; + let expected_col_b = ["x", "y", "z"]; + let expected_col_c = [10, 20, 30]; for (i, record) in records.iter().enumerate() { let row = record.row(); From ddc3dda6c51b5f9c9361ad96af0b6aed6678267f Mon Sep 17 00:00:00 2001 From: AndreaBozzo Date: Sun, 28 Dec 2025 14:24:24 +0100 Subject: [PATCH 4/5] fix: verify record ordering in test_subscribe_batch sort records by offset and verify exact ordering. --- crates/fluss/tests/integration/table.rs | 28 +++++++++++++++++-------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index 654f17a..c4525ae 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -446,22 +446,32 @@ mod table_test { .await .expect("Failed to poll"); - let records: Vec<_> = scan_records.into_iter().collect(); + let mut records: Vec<_> = scan_records.into_iter().collect(); + records.sort_by_key(|r| r.offset()); + assert_eq!( records.len(), 6, "Should have 6 records via subscribe_batch" ); - // Verify record contents - for record in records.iter() { + // Verify record contents and ordering + let expected_ids = [1, 2, 3, 4, 5, 6]; + let expected_values = ["a", "b", "c", "d", "e", "f"]; + + for (i, record) in records.iter().enumerate() { let row = record.row(); - let id = row.get_int(0); - let value = row.get_string(1); - assert!((1..=6).contains(&id), "id should be between 1 and 6"); - assert!( - ["a", "b", "c", "d", "e", "f"].contains(&value), - "value should be one of a-f" + assert_eq!( + row.get_int(0), + expected_ids[i], + "id mismatch at index {}", + i + ); + assert_eq!( + row.get_string(1), + expected_values[i], + "value mismatch at index {}", + i ); } From 2774867ccb6f1b4735f723fef6298b10bb2c03c6 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Fri, 2 Jan 2026 16:45:28 +0800 Subject: [PATCH 5/5] improve test --- bindings/cpp/src/lib.rs | 2 +- 
crates/fluss/src/client/table/scanner.rs | 6 +- crates/fluss/tests/integration/table.rs | 271 ++++++----------------- 3 files changed, 76 insertions(+), 203 deletions(-) diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index cd1803b..2d37763 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -625,7 +625,7 @@ impl LogScanner { bucket_offsets.insert(sub.bucket_id, sub.offset); } - let result = RUNTIME.block_on(async { self.inner.subscribe_batch(bucket_offsets).await }); + let result = RUNTIME.block_on(async { self.inner.subscribe_batch(&bucket_offsets).await }); match result { Ok(_) => ok_result(), diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index bf39839..0acaac8 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -223,7 +223,7 @@ impl LogScanner { Ok(()) } - pub async fn subscribe_batch(&self, bucket_offsets: HashMap) -> Result<()> { + pub async fn subscribe_batch(&self, bucket_offsets: &HashMap) -> Result<()> { self.metadata .check_and_update_table_metadata(from_ref(&self.table_path)) .await?; @@ -236,8 +236,8 @@ impl LogScanner { let mut scan_bucket_offsets = HashMap::new(); for (bucket_id, offset) in bucket_offsets { - let table_bucket = TableBucket::new(self.table_id, bucket_id); - scan_bucket_offsets.insert(table_bucket, offset); + let table_bucket = TableBucket::new(self.table_id, *bucket_id); + scan_bucket_offsets.insert(table_bucket, *offset); } self.log_scanner_status diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index c4525ae..0ac34c7 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -36,13 +36,16 @@ mod table_test { use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; use crate::integration::utils::create_table; use arrow::array::record_batch; + use fluss::client::{FlussTable, 
TableScan}; use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath}; + use fluss::record::ScanRecord; use fluss::row::InternalRow; use fluss::rpc::message::OffsetSpec; use jiff::Timestamp; use std::collections::HashMap; use std::sync::Arc; use std::thread; + use std::time::Duration; fn before_all() { // Create a new tokio runtime in a separate thread @@ -138,6 +141,11 @@ mod table_test { append_writer.flush().await.expect("Failed to flush"); + // Create scanner to verify appended records + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); let num_buckets = table.table_info().get_num_buckets(); let log_scanner = table .new_scan() @@ -150,84 +158,6 @@ mod table_test { .expect("Failed to subscribe"); } - let scan_records = log_scanner - .poll(std::time::Duration::from_secs(60)) - .await - .expect("Failed to poll"); - - let mut records: Vec<_> = scan_records.into_iter().collect(); - records.sort_by_key(|r| r.offset()); - - assert_eq!(records.len(), 6, "Should have 6 records"); - for (i, record) in records.iter().enumerate() { - let row = record.row(); - let expected_c1 = (i + 1) as i32; - let expected_c2 = format!("a{}", i + 1); - assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", i); - assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index {}", i); - } - - let log_scanner_projected = table - .new_scan() - .project(&[1, 0]) - .expect("Failed to project") - .create_log_scanner() - .expect("Failed to create log scanner"); - for bucket_id in 0..num_buckets { - log_scanner_projected - .subscribe(bucket_id, 0) - .await - .expect("Failed to subscribe"); - } - - let scan_records_projected = log_scanner_projected - .poll(std::time::Duration::from_secs(10)) - .await - .expect("Failed to poll"); - - let mut records_projected: Vec<_> = scan_records_projected.into_iter().collect(); - records_projected.sort_by_key(|r| r.offset()); - - assert_eq!( - records_projected.len(), - 6, - "Should 
have 6 records with projection" - ); - for (i, record) in records_projected.iter().enumerate() { - let row = record.row(); - let expected_c2 = format!("a{}", i + 1); - let expected_c1 = (i + 1) as i32; - assert_eq!( - row.get_string(0), - expected_c2, - "Projected c2 (first column) mismatch at index {}", - i - ); - assert_eq!( - row.get_int(1), - expected_c1, - "Projected c1 (second column) mismatch at index {}", - i - ); - } - - // Create scanner to verify appended records - let table = connection - .get_table(&table_path) - .await - .expect("Failed to get table"); - - let table_scan = table.new_scan(); - let log_scanner = table_scan - .create_log_scanner() - .expect("Failed to create log scanner"); - - // Subscribe to bucket 0 starting from offset 0 - log_scanner - .subscribe(0, 0) - .await - .expect("Failed to subscribe to bucket"); - // Poll for records let scan_records = log_scanner .poll(tokio::time::Duration::from_secs(10)) @@ -385,19 +315,20 @@ mod table_test { } #[tokio::test] - async fn test_subscribe_batch() { + async fn test_project() { let cluster = get_fluss_cluster(); let connection = cluster.get_fluss_connection().await; let admin = connection.get_admin().await.expect("Failed to get admin"); - let table_path = TablePath::new("fluss".to_string(), "test_subscribe_batch".to_string()); + let table_path = TablePath::new("fluss".to_string(), "test_project".to_string()); let table_descriptor = TableDescriptor::builder() .schema( Schema::builder() - .column("id", DataTypes::int()) - .column("value", DataTypes::string()) + .column("col_a", DataTypes::int()) + .column("col_b", DataTypes::string()) + .column("col_c", DataTypes::int()) .build() .expect("Failed to build schema"), ) @@ -411,15 +342,16 @@ mod table_test { .await .expect("Failed to get table"); - // Append 6 records + // Append 3 records let append_writer = table .new_append() .expect("Failed to create append") .create_writer(); let batch = record_batch!( - ("id", Int32, [1, 2, 3, 4, 5, 6]), - 
("value", Utf8, ["a", "b", "c", "d", "e", "f"]) + ("col_a", Int32, [1, 2, 3]), + ("col_b", Utf8, ["x", "y", "z"]), + ("col_c", Int32, [10, 20, 30]) ) .unwrap(); append_writer @@ -428,142 +360,54 @@ mod table_test { .expect("Failed to append batch"); append_writer.flush().await.expect("Failed to flush"); - // Test subscribe_batch with HashMap - let log_scanner = table - .new_scan() - .create_log_scanner() - .expect("Failed to create log scanner"); - - let mut bucket_offsets = HashMap::new(); - bucket_offsets.insert(0, 0i64); - log_scanner - .subscribe_batch(bucket_offsets) - .await - .expect("Failed to subscribe batch"); - - let scan_records = log_scanner - .poll(std::time::Duration::from_secs(60)) - .await - .expect("Failed to poll"); - - let mut records: Vec<_> = scan_records.into_iter().collect(); - records.sort_by_key(|r| r.offset()); + // Test project_by_name: select col_b and col_c only + let records = scan_table(&table, |scan| { + scan.project_by_name(&["col_b", "col_c"]) + .expect("Failed to project by name") + }) + .await; assert_eq!( records.len(), - 6, - "Should have 6 records via subscribe_batch" + 3, + "Should have 3 records with project_by_name" ); - // Verify record contents and ordering - let expected_ids = [1, 2, 3, 4, 5, 6]; - let expected_values = ["a", "b", "c", "d", "e", "f"]; + // Verify projected columns are in the correct order (col_b, col_c) + let expected_col_b = ["x", "y", "z"]; + let expected_col_c = [10, 20, 30]; for (i, record) in records.iter().enumerate() { let row = record.row(); + // col_b is now at index 0, col_c is at index 1 assert_eq!( - row.get_int(0), - expected_ids[i], - "id mismatch at index {}", + row.get_string(0), + expected_col_b[i], + "col_b mismatch at index {}", i ); assert_eq!( - row.get_string(1), - expected_values[i], - "value mismatch at index {}", + row.get_int(1), + expected_col_c[i], + "col_c mismatch at index {}", i ); } - // Test error case: empty HashMap should fail - let log_scanner_empty = table - 
.new_scan() - .create_log_scanner() - .expect("Failed to create log scanner"); - - let result = log_scanner_empty.subscribe_batch(HashMap::new()).await; - assert!( - result.is_err(), - "subscribe_batch with empty HashMap should fail" - ); - } - - #[tokio::test] - async fn test_project_by_name() { - let cluster = get_fluss_cluster(); - let connection = cluster.get_fluss_connection().await; - - let admin = connection.get_admin().await.expect("Failed to get admin"); - - let table_path = TablePath::new("fluss".to_string(), "test_project_by_name".to_string()); - - let table_descriptor = TableDescriptor::builder() - .schema( - Schema::builder() - .column("col_a", DataTypes::int()) - .column("col_b", DataTypes::string()) - .column("col_c", DataTypes::int()) - .build() - .expect("Failed to build schema"), - ) - .build() - .expect("Failed to build table"); - - create_table(&admin, &table_path, &table_descriptor).await; - - let table = connection - .get_table(&table_path) - .await - .expect("Failed to get table"); - - // Append 3 records - let append_writer = table - .new_append() - .expect("Failed to create append") - .create_writer(); - - let batch = record_batch!( - ("col_a", Int32, [1, 2, 3]), - ("col_b", Utf8, ["x", "y", "z"]), - ("col_c", Int32, [10, 20, 30]) - ) - .unwrap(); - append_writer - .append_arrow_batch(batch) - .await - .expect("Failed to append batch"); - append_writer.flush().await.expect("Failed to flush"); - - // Test project_by_name: select col_b and col_c only - let log_scanner = table - .new_scan() - .project_by_name(&["col_b", "col_c"]) - .expect("Failed to project by name") - .create_log_scanner() - .expect("Failed to create log scanner"); - - log_scanner - .subscribe(0, 0) - .await - .expect("Failed to subscribe"); - - let scan_records = log_scanner - .poll(std::time::Duration::from_secs(60)) - .await - .expect("Failed to poll"); - - let mut records: Vec<_> = scan_records.into_iter().collect(); - records.sort_by_key(|r| r.offset()); + // test 
project by column indices + let records = scan_table(&table, |scan| { + scan.project(&[1, 0]).expect("Failed to project by indices") + }) + .await; assert_eq!( records.len(), 3, "Should have 3 records with project_by_name" ); - - // Verify projected columns are in the correct order (col_b, col_c) + // Verify projected columns are in the correct order (col_b, col_a) let expected_col_b = ["x", "y", "z"]; - let expected_col_c = [10, 20, 30]; + let expected_col_a = [1, 2, 3]; for (i, record) in records.iter().enumerate() { let row = record.row(); @@ -576,7 +420,7 @@ mod table_test { ); assert_eq!( row.get_int(1), - expected_col_c[i], + expected_col_a[i], "col_c mismatch at index {}", i ); @@ -596,4 +440,33 @@ mod table_test { "project_by_name with non-existent column should fail" ); } + + async fn scan_table<'a>( + table: &FlussTable<'a>, + setup_scan: impl FnOnce(TableScan) -> TableScan, + ) -> Vec<ScanRecord> { + // 1. build log scanner + let log_scanner = setup_scan(table.new_scan()) + .create_log_scanner() + .expect("Failed to create log scanner"); + + // 2. subscribe + let mut bucket_offsets = HashMap::new(); + bucket_offsets.insert(0, 0); + log_scanner + .subscribe_batch(&bucket_offsets) + .await + .expect("Failed to subscribe"); + + // 3. poll records + let scan_records = log_scanner + .poll(Duration::from_secs(10)) + .await + .expect("Failed to poll"); + + // 4. collect and sort + let mut records: Vec<_> = scan_records.into_iter().collect(); + records.sort_by_key(|r| r.offset()); + records + } }