bindings/cpp/src/lib.rs (2 changes: 1 addition & 1 deletion)
@@ -625,7 +625,7 @@ impl LogScanner
bucket_offsets.insert(sub.bucket_id, sub.offset);
}

-let result = RUNTIME.block_on(async { self.inner.subscribe_batch(bucket_offsets).await });
+let result = RUNTIME.block_on(async { self.inner.subscribe_batch(&bucket_offsets).await });

match result {
Ok(_) => ok_result(),
crates/fluss/src/client/table/scanner.rs (6 changes: 3 additions & 3 deletions)
@@ -223,7 +223,7 @@ impl LogScanner
Ok(())
}

-pub async fn subscribe_batch(&self, bucket_offsets: HashMap<i32, i64>) -> Result<()> {
+pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
.await?;
@@ -236,8 +236,8 @@

let mut scan_bucket_offsets = HashMap::new();
for (bucket_id, offset) in bucket_offsets {
-let table_bucket = TableBucket::new(self.table_id, bucket_id);
-scan_bucket_offsets.insert(table_bucket, offset);
+let table_bucket = TableBucket::new(self.table_id, *bucket_id);
+scan_bucket_offsets.insert(table_bucket, *offset);
}

self.log_scanner_status
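Note on the API change above: subscribe_batch now borrows the offsets map
instead of consuming it, so the caller keeps ownership and can reuse the map
after the call. A minimal sketch of the resulting call pattern (illustrative
only, not part of this diff; assumes fluss's LogScanner and Result types are
in scope):

    use std::collections::HashMap;

    // Subscribe two scanners from the same offsets map. Under the old
    // by-value signature the second call would need a clone of the map.
    async fn subscribe_both(a: &LogScanner, b: &LogScanner) -> Result<()> {
        let mut bucket_offsets: HashMap<i32, i64> = HashMap::new();
        bucket_offsets.insert(0, 0); // bucket 0, start from offset 0
        a.subscribe_batch(&bucket_offsets).await?;
        // The map was only borrowed above, so it is still available here:
        b.subscribe_batch(&bucket_offsets).await?;
        Ok(())
    }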
crates/fluss/tests/integration/table.rs (243 changes: 165 additions & 78 deletions)
@@ -36,12 +36,16 @@ mod table_test
use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
use crate::integration::utils::create_table;
use arrow::array::record_batch;
+use fluss::client::{FlussTable, TableScan};
use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath};
+use fluss::record::ScanRecord;
use fluss::row::InternalRow;
use fluss::rpc::message::OffsetSpec;
use jiff::Timestamp;
+use std::collections::HashMap;
use std::sync::Arc;
use std::thread;
+use std::time::Duration;

fn before_all() {
// Create a new tokio runtime in a separate thread
@@ -137,6 +141,11 @@

append_writer.flush().await.expect("Failed to flush");

+// Create scanner to verify appended records
+let table = connection
+.get_table(&table_path)
+.await
+.expect("Failed to get table");
let num_buckets = table.table_info().get_num_buckets();
let log_scanner = table
.new_scan()
@@ -149,84 +158,6 @@
.expect("Failed to subscribe");
}

-let scan_records = log_scanner
-.poll(std::time::Duration::from_secs(60))
-.await
-.expect("Failed to poll");
-
-let mut records: Vec<_> = scan_records.into_iter().collect();
-records.sort_by_key(|r| r.offset());
-
-assert_eq!(records.len(), 6, "Should have 6 records");
-for (i, record) in records.iter().enumerate() {
-let row = record.row();
-let expected_c1 = (i + 1) as i32;
-let expected_c2 = format!("a{}", i + 1);
-assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", i);
-assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index {}", i);
-}
-
-let log_scanner_projected = table
-.new_scan()
-.project(&[1, 0])
-.expect("Failed to project")
-.create_log_scanner()
-.expect("Failed to create log scanner");
-for bucket_id in 0..num_buckets {
-log_scanner_projected
-.subscribe(bucket_id, 0)
-.await
-.expect("Failed to subscribe");
-}
-
-let scan_records_projected = log_scanner_projected
-.poll(std::time::Duration::from_secs(10))
-.await
-.expect("Failed to poll");
-
-let mut records_projected: Vec<_> = scan_records_projected.into_iter().collect();
-records_projected.sort_by_key(|r| r.offset());
-
-assert_eq!(
-records_projected.len(),
-6,
-"Should have 6 records with projection"
-);
-for (i, record) in records_projected.iter().enumerate() {
-let row = record.row();
-let expected_c2 = format!("a{}", i + 1);
-let expected_c1 = (i + 1) as i32;
-assert_eq!(
-row.get_string(0),
-expected_c2,
-"Projected c2 (first column) mismatch at index {}",
-i
-);
-assert_eq!(
-row.get_int(1),
-expected_c1,
-"Projected c1 (second column) mismatch at index {}",
-i
-);
-}
-
-// Create scanner to verify appended records
-let table = connection
-.get_table(&table_path)
-.await
-.expect("Failed to get table");
-
-let table_scan = table.new_scan();
-let log_scanner = table_scan
-.create_log_scanner()
-.expect("Failed to create log scanner");
-
-// Subscribe to bucket 0 starting from offset 0
-log_scanner
-.subscribe(0, 0)
-.await
-.expect("Failed to subscribe to bucket");
-
// Poll for records
let scan_records = log_scanner
.poll(tokio::time::Duration::from_secs(10))
@@ -382,4 +313,160 @@
"Timestamp after append should resolve to offset 0 (no newer records)"
);
}

+#[tokio::test]
+async fn test_project() {
+let cluster = get_fluss_cluster();
+let connection = cluster.get_fluss_connection().await;
+
+let admin = connection.get_admin().await.expect("Failed to get admin");
+
+let table_path = TablePath::new("fluss".to_string(), "test_project".to_string());
+
+let table_descriptor = TableDescriptor::builder()
+.schema(
+Schema::builder()
+.column("col_a", DataTypes::int())
+.column("col_b", DataTypes::string())
+.column("col_c", DataTypes::int())
+.build()
+.expect("Failed to build schema"),
+)
+.build()
+.expect("Failed to build table");
+
+create_table(&admin, &table_path, &table_descriptor).await;
+
+let table = connection
+.get_table(&table_path)
+.await
+.expect("Failed to get table");
+
+// Append 3 records
+let append_writer = table
+.new_append()
+.expect("Failed to create append")
+.create_writer();
+
+let batch = record_batch!(
+("col_a", Int32, [1, 2, 3]),
+("col_b", Utf8, ["x", "y", "z"]),
+("col_c", Int32, [10, 20, 30])
+)
+.unwrap();
+append_writer
+.append_arrow_batch(batch)
+.await
+.expect("Failed to append batch");
+append_writer.flush().await.expect("Failed to flush");
+
+// Test project_by_name: select col_b and col_c only
+let records = scan_table(&table, |scan| {
+scan.project_by_name(&["col_b", "col_c"])
+.expect("Failed to project by name")
+})
+.await;
+
+assert_eq!(
+records.len(),
+3,
+"Should have 3 records with project_by_name"
+);
+
+// Verify projected columns are in the correct order (col_b, col_c)
+let expected_col_b = ["x", "y", "z"];
+let expected_col_c = [10, 20, 30];
+
+for (i, record) in records.iter().enumerate() {
+let row = record.row();
+// col_b is now at index 0, col_c is at index 1
+assert_eq!(
+row.get_string(0),
+expected_col_b[i],
+"col_b mismatch at index {}",
+i
+);
+assert_eq!(
+row.get_int(1),
+expected_col_c[i],
+"col_c mismatch at index {}",
+i
+);
+}
+
+// Test project by column indices
+let records = scan_table(&table, |scan| {
+scan.project(&[1, 0]).expect("Failed to project by indices")
+})
+.await;
+
+assert_eq!(
+records.len(),
+3,
+"Should have 3 records with project by indices"
+);
+// Verify projected columns are in the correct order (col_b, col_a)
+let expected_col_b = ["x", "y", "z"];
+let expected_col_a = [1, 2, 3];
+
+for (i, record) in records.iter().enumerate() {
+let row = record.row();
+// col_b is now at index 0, col_a is at index 1
+assert_eq!(
+row.get_string(0),
+expected_col_b[i],
+"col_b mismatch at index {}",
+i
+);
+assert_eq!(
+row.get_int(1),
+expected_col_a[i],
+"col_a mismatch at index {}",
+i
+);
+}

+// Test error case: empty column names should fail
+let result = table.new_scan().project_by_name(&[]);
+assert!(
+result.is_err(),
+"project_by_name with empty names should fail"
+);
+
+// Test error case: non-existent column should fail
+let result = table.new_scan().project_by_name(&["nonexistent_column"]);
+assert!(
+result.is_err(),
+"project_by_name with non-existent column should fail"
+);
+}
+
+async fn scan_table<'a>(
+table: &FlussTable<'a>,
+setup_scan: impl FnOnce(TableScan) -> TableScan,
+) -> Vec<ScanRecord> {
+// 1. build log scanner
+let log_scanner = setup_scan(table.new_scan())
+.create_log_scanner()
+.expect("Failed to create log scanner");
+
+// 2. subscribe
+let mut bucket_offsets = HashMap::new();
+bucket_offsets.insert(0, 0);
+log_scanner
+.subscribe_batch(&bucket_offsets)
+.await
+.expect("Failed to subscribe");
+
+// 3. poll records
+let scan_records = log_scanner
+.poll(Duration::from_secs(10))
+.await
+.expect("Failed to poll");
+
+// 4. collect and sort
+let mut records: Vec<_> = scan_records.into_iter().collect();
+records.sort_by_key(|r| r.offset());
+records
+}
}
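A contract the new test_project test pins down: after a projection, row
accessors index into the projected column order, not the original schema.
A short sketch of reading projected rows (illustrative, reusing the helper
and table from the test above):

    // Schema is (col_a INT, col_b STRING, col_c INT); project(&[1, 0])
    // reorders the output to (col_b, col_a).
    let records = scan_table(&table, |scan| {
        scan.project(&[1, 0]).expect("Failed to project")
    })
    .await;
    for record in &records {
        let row = record.row();
        let b = row.get_string(0); // col_b, originally at schema index 1
        let a = row.get_int(1); // col_a, originally at schema index 0
        println!("col_b={}, col_a={}", b, a);
    }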