Skip to content

Commit 670bb5e

Browse files
chore: add integration tests for subscribe_batch and project_by_name (#116)
--------- Co-authored-by: luoyuxia <luoyuxia@alumni.sjtu.edu.cn>
1 parent 0f43550 commit 670bb5e

File tree

3 files changed

+169
-82
lines changed

3 files changed

+169
-82
lines changed

bindings/cpp/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,7 @@ impl LogScanner {
625625
bucket_offsets.insert(sub.bucket_id, sub.offset);
626626
}
627627

628-
let result = RUNTIME.block_on(async { self.inner.subscribe_batch(bucket_offsets).await });
628+
let result = RUNTIME.block_on(async { self.inner.subscribe_batch(&bucket_offsets).await });
629629

630630
match result {
631631
Ok(_) => ok_result(),

crates/fluss/src/client/table/scanner.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ impl LogScanner {
223223
Ok(())
224224
}
225225

226-
pub async fn subscribe_batch(&self, bucket_offsets: HashMap<i32, i64>) -> Result<()> {
226+
pub async fn subscribe_batch(&self, bucket_offsets: &HashMap<i32, i64>) -> Result<()> {
227227
self.metadata
228228
.check_and_update_table_metadata(from_ref(&self.table_path))
229229
.await?;
@@ -236,8 +236,8 @@ impl LogScanner {
236236

237237
let mut scan_bucket_offsets = HashMap::new();
238238
for (bucket_id, offset) in bucket_offsets {
239-
let table_bucket = TableBucket::new(self.table_id, bucket_id);
240-
scan_bucket_offsets.insert(table_bucket, offset);
239+
let table_bucket = TableBucket::new(self.table_id, *bucket_id);
240+
scan_bucket_offsets.insert(table_bucket, *offset);
241241
}
242242

243243
self.log_scanner_status

crates/fluss/tests/integration/table.rs

Lines changed: 165 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,16 @@ mod table_test {
3636
use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
3737
use crate::integration::utils::create_table;
3838
use arrow::array::record_batch;
39+
use fluss::client::{FlussTable, TableScan};
3940
use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath};
41+
use fluss::record::ScanRecord;
4042
use fluss::row::InternalRow;
4143
use fluss::rpc::message::OffsetSpec;
4244
use jiff::Timestamp;
45+
use std::collections::HashMap;
4346
use std::sync::Arc;
4447
use std::thread;
48+
use std::time::Duration;
4549

4650
fn before_all() {
4751
// Create a new tokio runtime in a separate thread
@@ -137,6 +141,11 @@ mod table_test {
137141

138142
append_writer.flush().await.expect("Failed to flush");
139143

144+
// Create scanner to verify appended records
145+
let table = connection
146+
.get_table(&table_path)
147+
.await
148+
.expect("Failed to get table");
140149
let num_buckets = table.table_info().get_num_buckets();
141150
let log_scanner = table
142151
.new_scan()
@@ -149,84 +158,6 @@ mod table_test {
149158
.expect("Failed to subscribe");
150159
}
151160

152-
let scan_records = log_scanner
153-
.poll(std::time::Duration::from_secs(60))
154-
.await
155-
.expect("Failed to poll");
156-
157-
let mut records: Vec<_> = scan_records.into_iter().collect();
158-
records.sort_by_key(|r| r.offset());
159-
160-
assert_eq!(records.len(), 6, "Should have 6 records");
161-
for (i, record) in records.iter().enumerate() {
162-
let row = record.row();
163-
let expected_c1 = (i + 1) as i32;
164-
let expected_c2 = format!("a{}", i + 1);
165-
assert_eq!(row.get_int(0), expected_c1, "c1 mismatch at index {}", i);
166-
assert_eq!(row.get_string(1), expected_c2, "c2 mismatch at index {}", i);
167-
}
168-
169-
let log_scanner_projected = table
170-
.new_scan()
171-
.project(&[1, 0])
172-
.expect("Failed to project")
173-
.create_log_scanner()
174-
.expect("Failed to create log scanner");
175-
for bucket_id in 0..num_buckets {
176-
log_scanner_projected
177-
.subscribe(bucket_id, 0)
178-
.await
179-
.expect("Failed to subscribe");
180-
}
181-
182-
let scan_records_projected = log_scanner_projected
183-
.poll(std::time::Duration::from_secs(10))
184-
.await
185-
.expect("Failed to poll");
186-
187-
let mut records_projected: Vec<_> = scan_records_projected.into_iter().collect();
188-
records_projected.sort_by_key(|r| r.offset());
189-
190-
assert_eq!(
191-
records_projected.len(),
192-
6,
193-
"Should have 6 records with projection"
194-
);
195-
for (i, record) in records_projected.iter().enumerate() {
196-
let row = record.row();
197-
let expected_c2 = format!("a{}", i + 1);
198-
let expected_c1 = (i + 1) as i32;
199-
assert_eq!(
200-
row.get_string(0),
201-
expected_c2,
202-
"Projected c2 (first column) mismatch at index {}",
203-
i
204-
);
205-
assert_eq!(
206-
row.get_int(1),
207-
expected_c1,
208-
"Projected c1 (second column) mismatch at index {}",
209-
i
210-
);
211-
}
212-
213-
// Create scanner to verify appended records
214-
let table = connection
215-
.get_table(&table_path)
216-
.await
217-
.expect("Failed to get table");
218-
219-
let table_scan = table.new_scan();
220-
let log_scanner = table_scan
221-
.create_log_scanner()
222-
.expect("Failed to create log scanner");
223-
224-
// Subscribe to bucket 0 starting from offset 0
225-
log_scanner
226-
.subscribe(0, 0)
227-
.await
228-
.expect("Failed to subscribe to bucket");
229-
230161
// Poll for records
231162
let scan_records = log_scanner
232163
.poll(tokio::time::Duration::from_secs(10))
@@ -382,4 +313,160 @@ mod table_test {
382313
"Timestamp after append should resolve to offset 0 (no newer records)"
383314
);
384315
}
316+
317+
#[tokio::test]
318+
async fn test_project() {
319+
let cluster = get_fluss_cluster();
320+
let connection = cluster.get_fluss_connection().await;
321+
322+
let admin = connection.get_admin().await.expect("Failed to get admin");
323+
324+
let table_path = TablePath::new("fluss".to_string(), "test_project".to_string());
325+
326+
let table_descriptor = TableDescriptor::builder()
327+
.schema(
328+
Schema::builder()
329+
.column("col_a", DataTypes::int())
330+
.column("col_b", DataTypes::string())
331+
.column("col_c", DataTypes::int())
332+
.build()
333+
.expect("Failed to build schema"),
334+
)
335+
.build()
336+
.expect("Failed to build table");
337+
338+
create_table(&admin, &table_path, &table_descriptor).await;
339+
340+
let table = connection
341+
.get_table(&table_path)
342+
.await
343+
.expect("Failed to get table");
344+
345+
// Append 3 records
346+
let append_writer = table
347+
.new_append()
348+
.expect("Failed to create append")
349+
.create_writer();
350+
351+
let batch = record_batch!(
352+
("col_a", Int32, [1, 2, 3]),
353+
("col_b", Utf8, ["x", "y", "z"]),
354+
("col_c", Int32, [10, 20, 30])
355+
)
356+
.unwrap();
357+
append_writer
358+
.append_arrow_batch(batch)
359+
.await
360+
.expect("Failed to append batch");
361+
append_writer.flush().await.expect("Failed to flush");
362+
363+
// Test project_by_name: select col_b and col_c only
364+
let records = scan_table(&table, |scan| {
365+
scan.project_by_name(&["col_b", "col_c"])
366+
.expect("Failed to project by name")
367+
})
368+
.await;
369+
370+
assert_eq!(
371+
records.len(),
372+
3,
373+
"Should have 3 records with project_by_name"
374+
);
375+
376+
// Verify projected columns are in the correct order (col_b, col_c)
377+
let expected_col_b = ["x", "y", "z"];
378+
let expected_col_c = [10, 20, 30];
379+
380+
for (i, record) in records.iter().enumerate() {
381+
let row = record.row();
382+
// col_b is now at index 0, col_c is at index 1
383+
assert_eq!(
384+
row.get_string(0),
385+
expected_col_b[i],
386+
"col_b mismatch at index {}",
387+
i
388+
);
389+
assert_eq!(
390+
row.get_int(1),
391+
expected_col_c[i],
392+
"col_c mismatch at index {}",
393+
i
394+
);
395+
}
396+
397+
// test project by column indices
398+
let records = scan_table(&table, |scan| {
399+
scan.project(&[1, 0]).expect("Failed to project by indices")
400+
})
401+
.await;
402+
403+
assert_eq!(
404+
records.len(),
405+
3,
406+
"Should have 3 records with project_by_name"
407+
);
408+
// Verify projected columns are in the correct order (col_b, col_a)
409+
let expected_col_b = ["x", "y", "z"];
410+
let expected_col_a = [1, 2, 3];
411+
412+
for (i, record) in records.iter().enumerate() {
413+
let row = record.row();
414+
// col_b is now at index 0, col_c is at index 1
415+
assert_eq!(
416+
row.get_string(0),
417+
expected_col_b[i],
418+
"col_b mismatch at index {}",
419+
i
420+
);
421+
assert_eq!(
422+
row.get_int(1),
423+
expected_col_a[i],
424+
"col_c mismatch at index {}",
425+
i
426+
);
427+
}
428+
429+
// Test error case: empty column names should fail
430+
let result = table.new_scan().project_by_name(&[]);
431+
assert!(
432+
result.is_err(),
433+
"project_by_name with empty names should fail"
434+
);
435+
436+
// Test error case: non-existent column should fail
437+
let result = table.new_scan().project_by_name(&["nonexistent_column"]);
438+
assert!(
439+
result.is_err(),
440+
"project_by_name with non-existent column should fail"
441+
);
442+
}
443+
444+
async fn scan_table<'a>(
445+
table: &FlussTable<'a>,
446+
setup_scan: impl FnOnce(TableScan) -> TableScan,
447+
) -> Vec<ScanRecord> {
448+
// 1. build log scanner
449+
let log_scanner = setup_scan(table.new_scan())
450+
.create_log_scanner()
451+
.expect("Failed to create log scanner");
452+
453+
// 2. subscribe
454+
let mut bucket_offsets = HashMap::new();
455+
bucket_offsets.insert(0, 0);
456+
log_scanner
457+
.subscribe_batch(&bucket_offsets)
458+
.await
459+
.expect("Failed to subscribe");
460+
461+
// 3. poll records
462+
let scan_records = log_scanner
463+
.poll(Duration::from_secs(10))
464+
.await
465+
.expect("Failed to poll");
466+
467+
// 4. collect and sort
468+
let mut records: Vec<_> = scan_records.into_iter().collect();
469+
records.sort_by_key(|r| r.offset());
470+
records
471+
}
385472
}

0 commit comments

Comments
 (0)