From ac5c49f707b755e05fdfc99ce5687f06f17af924 Mon Sep 17 00:00:00 2001
From: dentiny
Date: Tue, 19 Aug 2025 19:22:05 +0000
Subject: [PATCH 1/4] load files

---
 src/moonlink/src/table_handler.rs |  7 +++++++
 src/moonlink/src/table_notify.rs  | 14 ++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/src/moonlink/src/table_handler.rs b/src/moonlink/src/table_handler.rs
index e5b27875f..cda601f36 100644
--- a/src/moonlink/src/table_handler.rs
+++ b/src/moonlink/src/table_handler.rs
@@ -213,6 +213,13 @@ impl TableHandler {
                     Self::process_cdc_table_event(event, &mut table, &mut table_handler_state)
                         .await;
                 }
+                // ==============================
+                // Bulk ingestion events
+                // ==============================
+                TableEvent::LoadFiles { files, lsn } => {
+
+                }
+
                 // ==============================
                 // Interactive blocking events
                 // ==============================
diff --git a/src/moonlink/src/table_notify.rs b/src/moonlink/src/table_notify.rs
index f04f7d485..5762aa12b 100644
--- a/src/moonlink/src/table_notify.rs
+++ b/src/moonlink/src/table_notify.rs
@@ -103,6 +103,18 @@ pub enum TableEvent {
         /// Result for mem slice flush.
         flush_result: Option<Result<DiskSliceWriter>>,
     },
+
+    /// ==============================
+    /// Bulk ingestion events
+    /// ==============================
+    /// 
+    LoadFiles {
+        /// Parquet files to directly load into mooncake table, without schema validation, index construction, etc.
+        files: Vec<String>,
+        /// LSN for the bulk upload operation.
+        lsn: u64,
+    },
+
     /// ==============================
     /// Test events
     /// ==============================
@@ -118,6 +130,7 @@ pub enum TableEvent {
         xact_id: u32,
         is_recovery: bool,
     },
+
     /// ==============================
     /// Interactive blocking events
     /// ==============================
@@ -149,6 +162,7 @@ pub enum TableEvent {
     FinishInitialCopy {
         start_lsn: u64,
     },
+
     /// ==============================
     /// Table internal events
     /// ==============================

From 143268ffa9242f39040267f1f47bf39184e18030 Mon Sep 17 00:00:00 2001
From: dentiny
Date: Tue, 19 Aug 2025 22:27:03 +0000
Subject: [PATCH 2/4] initial implementation

---
 Cargo.lock                                     |  1 +
 Cargo.toml                                     |  1 +
 src/moonlink/Cargo.toml                        |  1 +
 src/moonlink/src/storage/mooncake_table.rs     |  1 +
 .../storage/mooncake_table/batch_ingestion.rs  | 59 +++++++++++++++++++
 .../mooncake_table/transaction_stream.rs       | 22 ++++++-
 src/moonlink/src/table_handler.rs              |  4 +-
 src/moonlink/src/table_handler/test_utils.rs   | 28 ++++++++-
 src/moonlink/src/table_handler/tests.rs        | 11 ++++
 src/moonlink/src/table_notify.rs               |  2 +-
 src/moonlink_connectors/Cargo.toml             |  2 +-
 11 files changed, 125 insertions(+), 7 deletions(-)
 create mode 100644 src/moonlink/src/storage/mooncake_table/batch_ingestion.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1ee9b6580..da376c9b2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4214,6 +4214,7 @@ dependencies = [
  "base64 0.22.1",
  "bincode",
  "bitstream-io",
+ "bytes",
  "chrono",
  "crc32fast",
  "criterion",
diff --git a/Cargo.toml b/Cargo.toml
index f2c73c206..b57bffee7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,6 +28,7 @@ async-trait = "0.1"
 backon = { version = "1.5" }
 bincode = { version = "2", features = ["serde"] }
 bitstream-io = "4.5"
+bytes = "1.0"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "4", features = ["derive"] }
 crc32fast = "1"
diff --git a/src/moonlink/Cargo.toml b/src/moonlink/Cargo.toml
index 22ef99040..f41b78c76 100644
--- a/src/moonlink/Cargo.toml
+++ b/src/moonlink/Cargo.toml
@@ -47,6 +47,7 @@ backon = { workspace = true }
 base64 = { version = "0.22", optional = true }
 bincode = { workspace = true }
 bitstream-io = { workspace = true }
+bytes = { workspace = true }
 chrono = { workspace = true }
 crc32fast = { workspace = true }
 fastbloom = { workspace = true }
diff --git a/src/moonlink/src/storage/mooncake_table.rs b/src/moonlink/src/storage/mooncake_table.rs
index 519445153..6286186c2 100644
--- a/src/moonlink/src/storage/mooncake_table.rs
+++ b/src/moonlink/src/storage/mooncake_table.rs
@@ -1,4 +1,5 @@
 mod batch_id_counter;
+mod batch_ingestion;
 mod data_batches;
 pub(crate) mod delete_vector;
 mod disk_slice;
diff --git a/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs b/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs
new file mode 100644
index 000000000..aed6f2381
--- /dev/null
+++ b/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs
@@ -0,0 +1,59 @@
+use crate::{
+    create_data_file, storage::mooncake_table::transaction_stream::TransactionStreamCommit,
+};
+
+use super::*;
+
+use bytes::Bytes;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+
+impl MooncakeTable {
+    /// Batch ingest the given [`parquet_files`] into the mooncake table.
+    ///
+    /// TODO(hjiang):
+    /// 1. Record table events.
+    /// 2. This involves IO operations, which should be placed on a background thread.
+    pub(crate) async fn batch_ingest(&mut self, parquet_files: Vec<String>, lsn: u64) {
+        let mut disk_files = hashbrown::HashMap::with_capacity(parquet_files.len());
+
+        // TODO(hjiang): Parallel IO and error handling.
+        // Construct a disk file entry for each of the given parquet files.
+        for cur_file in parquet_files.into_iter() {
+            // Read the file once to get its size, and its row count from the parquet footer.
+            let content = tokio::fs::read(&cur_file).await.unwrap();
+            let content = Bytes::from(content);
+            let file_size = content.len();
+            let builder = ParquetRecordBatchReaderBuilder::try_new(content).unwrap();
+            let num_rows: usize = builder.metadata().file_metadata().num_rows() as usize;
+
+            let cur_file_id = self.next_file_id;
+            self.next_file_id += 1;
+            let mooncake_data_file = create_data_file(cur_file_id as u64, cur_file);
+            let disk_file_entry = DiskFileEntry {
+                cache_handle: None,
+                num_rows,
+                file_size,
+                batch_deletion_vector: BatchDeletionVector::new(num_rows),
+                puffin_deletion_blob: None,
+            };
+
+            assert!(disk_files
+                .insert(mooncake_data_file, disk_file_entry)
+                .is_none());
+        }
+
+        // Commit the crafted streaming transaction so the files become visible
+        // at the requested LSN via the normal snapshot path.
+        let commit = TransactionStreamCommit::from_disk_files(disk_files, lsn);
+        self.next_snapshot_task
+            .new_streaming_xact
+            .push(TransactionStreamOutput::Commit(commit));
+    }
+}
diff --git a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs
index 2ea6a131c..5c197f300 100644
--- a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs
+++ b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs
@@ -28,7 +28,7 @@ pub(crate) struct TransactionStreamState {
     /// Only safe to remove transaction stream state when there are no pending flushes.
     ongoing_flush_count: u32,
     /// Commit LSN for this transaction, set when transaction is committed.
-    commit_lsn: Option<u64>,
+    pub(crate) commit_lsn: Option<u64>,
 }
 
 /// Determines the state of a transaction stream.
@@ -65,6 +65,24 @@ pub struct TransactionStreamCommit {
 }
 
 impl TransactionStreamCommit {
+    /// Create a stream commit directly from already-persisted disk files (used by batch ingestion).
+    pub(crate) fn from_disk_files(
+        disk_files: hashbrown::HashMap<MooncakeDataFileRef, DiskFileEntry>,
+        lsn: u64,
+    ) -> Self {
+        Self {
+            xact_id: 0, // Unused for batch ingestion.
+            commit_lsn: lsn,
+            // Batch ingestion skips index construction, so the file index starts out empty.
+            flushed_file_index: MooncakeIndex {
+                in_memory_index: HashSet::new(),
+                file_indices: Vec::new(),
+            },
+            flushed_files: disk_files,
+            local_deletions: Vec::new(),
+            pending_deletions: Vec::new(),
+        }
+    }
+
     /// Get flushed data files for the current streaming commit.
     pub(crate) fn get_flushed_data_files(&self) -> Vec<MooncakeDataFileRef> {
         self.flushed_files.keys().cloned().collect::<Vec<_>>()
@@ -91,7 +109,7 @@ impl TransactionStreamCommit {
 }
 
 impl TransactionStreamState {
-    fn new(
+    pub(crate) fn new(
         schema: Arc<Schema>,
         batch_size: usize,
         identity: IdentityProp,
diff --git a/src/moonlink/src/table_handler.rs b/src/moonlink/src/table_handler.rs
index cda601f36..7f6fff4e2 100644
--- a/src/moonlink/src/table_handler.rs
+++ b/src/moonlink/src/table_handler.rs
@@ -217,9 +217,9 @@ impl TableHandler {
                 // Bulk ingestion events
                 // ==============================
                 TableEvent::LoadFiles { files, lsn } => {
-
+                    table.batch_ingest(files, lsn).await;
                 }
-
+
                 // ==============================
                 // Interactive blocking events
                 // ==============================
diff --git a/src/moonlink/src/table_handler/test_utils.rs b/src/moonlink/src/table_handler/test_utils.rs
index 9b22b960b..603e22fd7 100644
--- a/src/moonlink/src/table_handler/test_utils.rs
+++ b/src/moonlink/src/table_handler/test_utils.rs
@@ -22,12 +22,13 @@ use crate::{
     WalConfig, WalTransactionState,
 };
 
-use arrow_array::RecordBatch;
+use arrow_array::{Int32Array, RecordBatch, StringArray};
 use futures::StreamExt;
 use iceberg::io::FileIOBuilder;
 use iceberg::io::FileRead;
 use more_asserts as ma;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::arrow::AsyncArrowWriter;
 use std::sync::Arc;
 use tempfile::{tempdir, TempDir};
 use tokio::sync::{mpsc, watch};
@@ -337,6 +338,10 @@ impl TestEnvironment {
             .await;
     }
 
+    pub async fn bulk_upload_files(&self, files: Vec<String>, lsn: u64) {
+        self.send_event(TableEvent::LoadFiles { files, lsn }).await;
+    }
+
     // --- LSN and Verification Helpers ---
 
     /// Sets both table commit and replication LSN to the same value.
@@ -613,3 +618,24 @@ pub(crate) async fn load_one_arrow_batch(filepath: &str) -> RecordBatch {
         .unwrap()
         .expect("Should have one batch")
 }
+
+/// Test util function to generate a parquet file under the given [`tempdir`].
+pub(crate) async fn generate_parquet_file(tempdir: &TempDir) -> String {
+    let schema = create_test_arrow_schema();
+    let ids = Int32Array::from(vec![1, 2, 3]);
+    let names = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
+    let ages = Int32Array::from(vec![10, 20, 30]);
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(ids), Arc::new(names), Arc::new(ages)],
+    )
+    .unwrap();
+    let file_path = tempdir.path().join("test.parquet");
+    let file_path_str = file_path.to_str().unwrap().to_string();
+    let file = tokio::fs::File::create(file_path).await.unwrap();
+    let mut writer: AsyncArrowWriter<tokio::fs::File> =
+        AsyncArrowWriter::try_new(file, schema, /*props=*/ None).unwrap();
+    writer.write(&batch).await.unwrap();
+    writer.close().await.unwrap();
+    file_path_str
+}
diff --git a/src/moonlink/src/table_handler/tests.rs b/src/moonlink/src/table_handler/tests.rs
index b650c2004..c72d9d0ac 100644
--- a/src/moonlink/src/table_handler/tests.rs
+++ b/src/moonlink/src/table_handler/tests.rs
@@ -2114,3 +2114,14 @@ async fn test_commit_flush_streaming_transaction_with_deletes() {
 
     env.shutdown().await;
 }
+
+/// Testing scenario: batch upload parquet files into mooncake table.
+#[tokio::test]
+async fn test_batch_ingestion() {
+    let env = TestEnvironment::default().await;
+    let disk_file = generate_parquet_file(&env.temp_dir).await;
+    env.bulk_upload_files(vec![disk_file], /*lsn=*/ 10).await;
+
+    env.set_readable_lsn(10);
+    env.verify_snapshot(101, &[1, 2, 3]).await;
+}
diff --git a/src/moonlink/src/table_notify.rs b/src/moonlink/src/table_notify.rs
index 5762aa12b..1a58ee35a 100644
--- a/src/moonlink/src/table_notify.rs
+++ b/src/moonlink/src/table_notify.rs
@@ -107,7 +107,7 @@ pub enum TableEvent {
     /// ==============================
     /// Bulk ingestion events
     /// ==============================
-    /// 
+    ///
     LoadFiles {
         /// Parquet files to directly load into mooncake table, without schema validation, index construction, etc.
         files: Vec<String>,
diff --git a/src/moonlink_connectors/Cargo.toml b/src/moonlink_connectors/Cargo.toml
index 6b168068e..9cfa5005a 100644
--- a/src/moonlink_connectors/Cargo.toml
+++ b/src/moonlink_connectors/Cargo.toml
@@ -17,7 +17,7 @@ arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 bigdecimal = { version = "0.4", default-features = false, features = ["std"] }
 byteorder = "1.5"
-bytes = "1.0"
+bytes = { workspace = true }
 chrono = { workspace = true }
 chrono-tz = "0.10"
 futures = { workspace = true }

From b3d8cbfce5cccf7983b41d42783cd556d993d39a Mon Sep 17 00:00:00 2001
From: dentiny
Date: Tue, 19 Aug 2025 22:36:00 +0000
Subject: [PATCH 3/4] WIP

---
 src/moonlink/src/storage/mooncake_table.rs                     |  2 ++
 .../storage/mooncake_table/batch_ingestion.rs                  |  2 ++
 src/moonlink/src/storage/mooncake_table/snapshot.rs            | 10 ++++++++++
 .../src/storage/mooncake_table/snapshot_validation.rs          |  2 ++
 4 files changed, 16 insertions(+)

diff --git a/src/moonlink/src/storage/mooncake_table.rs b/src/moonlink/src/storage/mooncake_table.rs
index 6286186c2..979fb8079 100644
--- a/src/moonlink/src/storage/mooncake_table.rs
+++ b/src/moonlink/src/storage/mooncake_table.rs
@@ -325,6 +325,8 @@ impl SnapshotTask {
         // If mooncake has new transaction commits.
         (self.commit_lsn_baseline > 0 && self.commit_lsn_baseline != self.prev_commit_lsn_baseline)
             || self.force_empty_iceberg_payload
+            // If mooncake table has completed streaming transactions.
+            || !self.new_streaming_xact.is_empty()
             // If mooncake table accumulated large enough writes.
             || !self.new_disk_slices.is_empty()
             || self.new_deletions.len()
diff --git a/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs b/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs
index aed6f2381..8b5a50009 100644
--- a/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs
+++ b/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs
@@ -55,5 +55,7 @@ impl MooncakeTable {
         self.next_snapshot_task
             .new_streaming_xact
             .push(TransactionStreamOutput::Commit(commit));
+        self.next_snapshot_task.new_flush_lsn = Some(lsn);
+        self.next_snapshot_task.commit_lsn_baseline = lsn;
     }
 }
diff --git a/src/moonlink/src/storage/mooncake_table/snapshot.rs b/src/moonlink/src/storage/mooncake_table/snapshot.rs
index 5eed577a2..ddd76d519 100644
--- a/src/moonlink/src/storage/mooncake_table/snapshot.rs
+++ b/src/moonlink/src/storage/mooncake_table/snapshot.rs
@@ -484,6 +484,9 @@ impl SnapshotTableState {
         mut task: SnapshotTask,
         opt: SnapshotOption,
     ) -> MooncakeSnapshotOutput {
+
+        println!("update snapshot, streaming state # ? {}", task.new_streaming_xact.len());
+
         // Validate event id is assigned.
         assert!(opt.id.is_some());
         // Validate mooncake table operation invariants.
@@ -568,6 +571,8 @@ impl SnapshotTableState {
             self.last_commit = cp;
         }
 
+        println!("new flush lsn = {:?}, commit lsn = {:?}", self.current_snapshot.flush_lsn, self.current_snapshot.snapshot_version);
+
         // Till this point, committed changes have been reflected to current snapshot; sync the latest change to iceberg.
         // To reduce iceberg persistence overhead, there're certain cases an iceberg snapshot will be triggered:
         // (1) there're persisted data files
@@ -603,6 +608,9 @@ impl SnapshotTableState {
             && (force_empty_iceberg_payload || flush_by_table_write)
             && flush_lsn < task.min_ongoing_flush_lsn
         {
+
+            println!("create iceberg snapshot");
+
             // Getting persistable committed deletion logs is not cheap, which requires iterating through all logs,
             // so we only aggregate when there's committed deletion.
             let committed_deletion_logs = self.aggregate_committed_deletion_logs(flush_lsn);
@@ -616,6 +624,8 @@ impl SnapshotTableState {
                 iceberg_snapshot_payload =
                     Some(self.get_iceberg_snapshot_payload(flush_lsn, committed_deletion_logs));
             }
+        } else {
+            println!("no create snapshot");
         }
 
         // Validate disk files count is as expected.
diff --git a/src/moonlink/src/storage/mooncake_table/snapshot_validation.rs b/src/moonlink/src/storage/mooncake_table/snapshot_validation.rs
index 14a9bbf0e..e3adf9521 100644
--- a/src/moonlink/src/storage/mooncake_table/snapshot_validation.rs
+++ b/src/moonlink/src/storage/mooncake_table/snapshot_validation.rs
@@ -71,6 +71,8 @@ impl SnapshotTableState {
     /// Util function to validate data files and file indices match each other.
     #[cfg(any(test, debug_assertions))]
     fn assert_data_files_and_file_indices_match(&self) {
+        // Append-only tables populated via batch ingestion skip index construction,
+        // so data files and file indices are not expected to match there.
+        if self.mooncake_table_metadata.config.append_only {
+            return;
+        }
+
         let mut all_data_files_1 = HashSet::new();
         let mut all_data_files_2 = HashSet::new();
         for (cur_data_file, _) in self.current_snapshot.disk_files.iter() {

From 3fc440877209fc6a298399fced27a5d49fa104c3 Mon Sep 17 00:00:00 2001
From: dentiny
Date: Tue, 19 Aug 2025 22:40:08 +0000
Subject: [PATCH 4/4] unit test

---
 .../src/storage/mooncake_table/snapshot.rs | 10 ----------
 src/moonlink/src/table_handler/tests.rs    | 18 ++++++++++++++--
 2 files changed, 16 insertions(+), 12 deletions(-)

diff --git a/src/moonlink/src/storage/mooncake_table/snapshot.rs b/src/moonlink/src/storage/mooncake_table/snapshot.rs
index ddd76d519..5eed577a2 100644
--- a/src/moonlink/src/storage/mooncake_table/snapshot.rs
+++ b/src/moonlink/src/storage/mooncake_table/snapshot.rs
@@ -484,9 +484,6 @@ impl SnapshotTableState {
         mut task: SnapshotTask,
         opt: SnapshotOption,
     ) -> MooncakeSnapshotOutput {
-
-        println!("update snapshot, streaming state # ? {}", task.new_streaming_xact.len());
-
         // Validate event id is assigned.
         assert!(opt.id.is_some());
         // Validate mooncake table operation invariants.
@@ -571,8 +568,6 @@ impl SnapshotTableState {
             self.last_commit = cp;
         }
 
-        println!("new flush lsn = {:?}, commit lsn = {:?}", self.current_snapshot.flush_lsn, self.current_snapshot.snapshot_version);
-
         // Till this point, committed changes have been reflected to current snapshot; sync the latest change to iceberg.
         // To reduce iceberg persistence overhead, there're certain cases an iceberg snapshot will be triggered:
         // (1) there're persisted data files
@@ -608,9 +603,6 @@ impl SnapshotTableState {
             && (force_empty_iceberg_payload || flush_by_table_write)
             && flush_lsn < task.min_ongoing_flush_lsn
         {
-
-            println!("create iceberg snapshot");
-
             // Getting persistable committed deletion logs is not cheap, which requires iterating through all logs,
             // so we only aggregate when there's committed deletion.
             let committed_deletion_logs = self.aggregate_committed_deletion_logs(flush_lsn);
@@ -624,8 +616,6 @@ impl SnapshotTableState {
                 iceberg_snapshot_payload =
                     Some(self.get_iceberg_snapshot_payload(flush_lsn, committed_deletion_logs));
             }
-        } else {
-            println!("no create snapshot");
         }
diff --git a/src/moonlink/src/table_handler/tests.rs b/src/moonlink/src/table_handler/tests.rs
index 50ded525f..a1fbd749c 100644
--- a/src/moonlink/src/table_handler/tests.rs
+++ b/src/moonlink/src/table_handler/tests.rs
@@ -2349,10 +2349,24 @@ async fn test_append_only_table_basic() {
 
     env.shutdown().await;
 }
 
 /// Testing scenario: batch upload parquet files into mooncake table.
 #[tokio::test]
 async fn test_batch_ingestion() {
-    let env = TestEnvironment::default().await;
+    let temp_dir = tempdir().unwrap();
+    let mooncake_table_config = MooncakeTableConfig {
+        append_only: true, // Enable append-only mode
+        batch_size: 2,
+        mem_slice_size: 1000,
+        snapshot_deletion_record_count: 1000,
+        temp_files_directory: temp_dir.path().to_str().unwrap().to_string(),
+        disk_slice_writer_config: DiskSliceWriterConfig::default(),
+        data_compaction_config: DataCompactionConfig::default(),
+        file_index_config: FileIndexMergeConfig::default(),
+        persistence_config: IcebergPersistenceConfig::default(),
+    };
+    let env = TestEnvironment::new(temp_dir, mooncake_table_config).await;
+
     let disk_file = generate_parquet_file(&env.temp_dir).await;
     env.bulk_upload_files(vec![disk_file], /*lsn=*/ 10).await;
 
+    // Validate bulk ingestion result.
     env.set_readable_lsn(10);
-    env.verify_snapshot(101, &[1, 2, 3]).await;
+    env.verify_snapshot(10, &[1, 2, 3]).await;
 }
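
Usage sketch: after these patches, bulk ingestion is driven end to end by sending a single TableEvent::LoadFiles to the table handler's event loop; no other API surface is involved. Below is a minimal caller, assuming access to an mpsc::Sender<TableEvent> for the target table (the handle that TestEnvironment::send_event wraps in the tests); the helper name ingest_parquet_files and its error handling are illustrative, not part of these patches.

use tokio::sync::mpsc;

/// Sketch only: enqueue already-written parquet files for bulk ingestion at `lsn`.
/// The files are loaded as-is, with no schema validation or index construction,
/// so this path is only safe for append-only tables.
async fn ingest_parquet_files(
    event_sender: &mpsc::Sender<TableEvent>,
    files: Vec<String>,
    lsn: u64,
) {
    event_sender
        .send(TableEvent::LoadFiles { files, lsn })
        .await
        .expect("table handler event loop has shut down");
}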