diff --git a/bindings/cpp/src/lib.rs b/bindings/cpp/src/lib.rs index cd1803b..2d37763 100644 --- a/bindings/cpp/src/lib.rs +++ b/bindings/cpp/src/lib.rs @@ -625,7 +625,7 @@ impl LogScanner { bucket_offsets.insert(sub.bucket_id, sub.offset); } - let result = RUNTIME.block_on(async { self.inner.subscribe_batch(bucket_offsets).await }); + let result = RUNTIME.block_on(async { self.inner.subscribe_batch(&bucket_offsets).await }); match result { Ok(_) => ok_result(), diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index bf39839..0acaac8 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -223,7 +223,7 @@ impl LogScanner { Ok(()) } - pub async fn subscribe_batch(&self, bucket_offsets: HashMap) -> Result<()> { + pub async fn subscribe_batch(&self, bucket_offsets: &HashMap) -> Result<()> { self.metadata .check_and_update_table_metadata(from_ref(&self.table_path)) .await?; @@ -236,8 +236,8 @@ impl LogScanner { let mut scan_bucket_offsets = HashMap::new(); for (bucket_id, offset) in bucket_offsets { - let table_bucket = TableBucket::new(self.table_id, bucket_id); - scan_bucket_offsets.insert(table_bucket, offset); + let table_bucket = TableBucket::new(self.table_id, *bucket_id); + scan_bucket_offsets.insert(table_bucket, *offset); } self.log_scanner_status diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index 006adcc..0ac34c7 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -36,12 +36,16 @@ mod table_test { use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; use crate::integration::utils::create_table; use arrow::array::record_batch; + use fluss::client::{FlussTable, TableScan}; use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath}; + use fluss::record::ScanRecord; use fluss::row::InternalRow; use 
fluss::rpc::message::OffsetSpec; use jiff::Timestamp; + use std::collections::HashMap; use std::sync::Arc; use std::thread; + use std::time::Duration; fn before_all() { // Create a new tokio runtime in a separate thread @@ -137,6 +141,11 @@ mod table_test { append_writer.flush().await.expect("Failed to flush"); + // Create scanner to verify appended records + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); let num_buckets = table.table_info().get_num_buckets(); let log_scanner = table .new_scan() @@ -149,84 +158,6 @@ mod table_test { .expect("Failed to subscribe"); } - let scan_records = log_scanner - .poll(std::time::Duration::from_secs(60)) - .await - .expect("Failed to poll"); - - let mut records: Vec<_> = scan_records.into_iter().collect(); - records.sort_by_key(|r| r.offset()); - - assert_eq!(records.len(), 6, "Should have 6 records"); - for (i, record) in records.iter().enumerate() { - let row = record.row(); - let expected_c1 = (i + 1) as i32; - let expected_c2 = format!("a{}", i + 1); - assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", i); - assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index {}", i); - } - - let log_scanner_projected = table - .new_scan() - .project(&[1, 0]) - .expect("Failed to project") - .create_log_scanner() - .expect("Failed to create log scanner"); - for bucket_id in 0..num_buckets { - log_scanner_projected - .subscribe(bucket_id, 0) - .await - .expect("Failed to subscribe"); - } - - let scan_records_projected = log_scanner_projected - .poll(std::time::Duration::from_secs(10)) - .await - .expect("Failed to poll"); - - let mut records_projected: Vec<_> = scan_records_projected.into_iter().collect(); - records_projected.sort_by_key(|r| r.offset()); - - assert_eq!( - records_projected.len(), - 6, - "Should have 6 records with projection" - ); - for (i, record) in records_projected.iter().enumerate() { - let row = record.row(); - let expected_c2 = format!("a{}", i 
+ 1); - let expected_c1 = (i + 1) as i32; - assert_eq!( - row.get_string(0), - expected_c2, - "Projected c2 (first column) mismatch at index {}", - i - ); - assert_eq!( - row.get_int(1), - expected_c1, - "Projected c1 (second column) mismatch at index {}", - i - ); - } - - // Create scanner to verify appended records - let table = connection - .get_table(&table_path) - .await - .expect("Failed to get table"); - - let table_scan = table.new_scan(); - let log_scanner = table_scan - .create_log_scanner() - .expect("Failed to create log scanner"); - - // Subscribe to bucket 0 starting from offset 0 - log_scanner - .subscribe(0, 0) - .await - .expect("Failed to subscribe to bucket"); - // Poll for records let scan_records = log_scanner .poll(tokio::time::Duration::from_secs(10)) @@ -382,4 +313,160 @@ mod table_test { "Timestamp after append should resolve to offset 0 (no newer records)" ); } + + #[tokio::test] + async fn test_project() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = TablePath::new("fluss".to_string(), "test_project".to_string()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("col_a", DataTypes::int()) + .column("col_b", DataTypes::string()) + .column("col_c", DataTypes::int()) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + // Append 3 records + let append_writer = table + .new_append() + .expect("Failed to create append") + .create_writer(); + + let batch = record_batch!( + ("col_a", Int32, [1, 2, 3]), + ("col_b", Utf8, ["x", "y", "z"]), + ("col_c", Int32, [10, 20, 30]) + ) + .unwrap(); + append_writer + .append_arrow_batch(batch) + .await + 
.expect("Failed to append batch"); + append_writer.flush().await.expect("Failed to flush"); + + // Test project_by_name: select col_b and col_c only + let records = scan_table(&table, |scan| { + scan.project_by_name(&["col_b", "col_c"]) + .expect("Failed to project by name") + }) + .await; + + assert_eq!( + records.len(), + 3, + "Should have 3 records with project_by_name" + ); + + // Verify projected columns are in the correct order (col_b, col_c) + let expected_col_b = ["x", "y", "z"]; + let expected_col_c = [10, 20, 30]; + + for (i, record) in records.iter().enumerate() { + let row = record.row(); + // col_b is now at index 0, col_c is at index 1 + assert_eq!( + row.get_string(0), + expected_col_b[i], + "col_b mismatch at index {}", + i + ); + assert_eq!( + row.get_int(1), + expected_col_c[i], + "col_c mismatch at index {}", + i + ); + } + + // test project by column indices + let records = scan_table(&table, |scan| { + scan.project(&[1, 0]).expect("Failed to project by indices") + }) + .await; + + assert_eq!( + records.len(), + 3, + "Should have 3 records with project by indices" + ); + // Verify projected columns are in the correct order (col_b, col_a) + let expected_col_b = ["x", "y", "z"]; + let expected_col_a = [1, 2, 3]; + + for (i, record) in records.iter().enumerate() { + let row = record.row(); + // col_b is now at index 0, col_a is at index 1 + assert_eq!( + row.get_string(0), + expected_col_b[i], + "col_b mismatch at index {}", + i + ); + assert_eq!( + row.get_int(1), + expected_col_a[i], + "col_a mismatch at index {}", + i + ); + } + + // Test error case: empty column names should fail + let result = table.new_scan().project_by_name(&[]); + assert!( + result.is_err(), + "project_by_name with empty names should fail" + ); + + // Test error case: non-existent column should fail + let result = table.new_scan().project_by_name(&["nonexistent_column"]); + assert!( + result.is_err(), + "project_by_name with non-existent column should fail" + ); + } + + async 
fn scan_table<'a>( + table: &FlussTable<'a>, + setup_scan: impl FnOnce(TableScan) -> TableScan, + ) -> Vec<ScanRecord> { + // 1. build log scanner + let log_scanner = setup_scan(table.new_scan()) + .create_log_scanner() + .expect("Failed to create log scanner"); + + // 2. subscribe + let mut bucket_offsets = HashMap::new(); + bucket_offsets.insert(0, 0); + log_scanner + .subscribe_batch(&bucket_offsets) + .await + .expect("Failed to subscribe"); + + // 3. poll records + let scan_records = log_scanner + .poll(Duration::from_secs(10)) + .await + .expect("Failed to poll"); + + // 4. collect and sort + let mut records: Vec<_> = scan_records.into_iter().collect(); + records.sort_by_key(|r| r.offset()); + records + } }