diff --git a/Cargo.lock b/Cargo.lock
index 0ecac209e..4e281cb42 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4217,6 +4217,7 @@ dependencies = [
  "base64 0.22.1",
  "bincode",
  "bitstream-io",
+ "bytes",
  "chrono",
  "crc32fast",
  "criterion",
diff --git a/Cargo.toml b/Cargo.toml
index f09eeddc1..a0b849901 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,6 +28,7 @@ async-trait = "0.1"
 backon = { version = "1.5" }
 bincode = { version = "2", features = ["serde"] }
 bitstream-io = "4.5"
+bytes = "1.0"
 chrono = { version = "0.4", default-features = false }
 clap = { version = "4", features = ["derive"] }
 crc32fast = "1"
diff --git a/src/moonlink/Cargo.toml b/src/moonlink/Cargo.toml
index 22ef99040..f41b78c76 100644
--- a/src/moonlink/Cargo.toml
+++ b/src/moonlink/Cargo.toml
@@ -47,6 +47,7 @@ backon = { workspace = true }
 base64 = { version = "0.22", optional = true }
 bincode = { workspace = true }
 bitstream-io = { workspace = true }
+bytes = { workspace = true }
 chrono = { workspace = true }
 crc32fast = { workspace = true }
 fastbloom = { workspace = true }
diff --git a/src/moonlink/src/storage/mooncake_table.rs b/src/moonlink/src/storage/mooncake_table.rs
index 439182566..959b763bb 100644
--- a/src/moonlink/src/storage/mooncake_table.rs
+++ b/src/moonlink/src/storage/mooncake_table.rs
@@ -1,4 +1,5 @@
 mod batch_id_counter;
+mod batch_ingestion;
 mod data_batches;
 pub(crate) mod delete_vector;
 mod disk_slice;
@@ -324,6 +325,8 @@ impl SnapshotTask {
         // If mooncake has new transaction commits.
         (self.commit_lsn_baseline > 0 && self.commit_lsn_baseline != self.prev_commit_lsn_baseline)
             || self.force_empty_iceberg_payload
+            // If mooncake table has completed streaming transactions.
+            || !self.new_streaming_xact.is_empty()
             // If mooncake table accumulated large enough writes.
             || !self.new_disk_slices.is_empty()
             || self.new_deletions.len()
diff --git a/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs b/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs
new file mode 100644
index 000000000..8b5a50009
--- /dev/null
+++ b/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs
@@ -0,0 +1,61 @@
+use crate::{
+    create_data_file, storage::mooncake_table::transaction_stream::TransactionStreamCommit,
+};
+
+use super::*;
+
+use bytes::Bytes;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+
+impl MooncakeTable {
+    /// Batch ingest the given [`parquet_files`] into the mooncake table.
+    ///
+    /// TODO(hjiang):
+    /// 1. Record table events.
+    /// 2. This involves IO operations, which should be placed on a background thread.
+    pub(crate) async fn batch_ingest(&mut self, parquet_files: Vec<String>, lsn: u64) {
+        let mut disk_files = hashbrown::HashMap::with_capacity(parquet_files.len());
+
+        // TODO(hjiang): Parallel IO and error handling.
+        // Construct disk file entries from the given parquet files.
+        for cur_file in parquet_files.into_iter() {
+            let content = tokio::fs::read(&cur_file).await.unwrap();
+            let content = Bytes::from(content);
+            let file_size = content.len();
+            let builder = ParquetRecordBatchReaderBuilder::try_new(content).unwrap();
+            let num_rows: usize = builder.metadata().file_metadata().num_rows() as usize;
+
+            let cur_file_id = self.next_file_id;
+            self.next_file_id += 1;
+            let mooncake_data_file = create_data_file(cur_file_id as u64, cur_file);
+            let disk_file_entry = DiskFileEntry {
+                cache_handle: None,
+                num_rows,
+                file_size,
+                batch_deletion_vector: BatchDeletionVector::new(num_rows),
+                puffin_deletion_blob: None,
+            };
+
+            assert!(disk_files
+                .insert(mooncake_data_file, disk_file_entry)
+                .is_none());
+        }
+
+        // let mut stream_state = TransactionStreamState::new(
+        //     self.metadata.schema.clone(),
+        //     self.metadata.config.batch_size,
+        //     self.metadata.identity.clone(),
+        //     Arc::clone(&self.streaming_batch_id_counter),
+        // );
+        // stream_state.flushed_files = disk_files;
+        // stream_state.commit_lsn = Some(lsn);
+
+        // Commit the crafted streaming transaction.
+        let commit = TransactionStreamCommit::from_disk_files(disk_files, lsn);
+        self.next_snapshot_task
+            .new_streaming_xact
+            .push(TransactionStreamOutput::Commit(commit));
+        self.next_snapshot_task.new_flush_lsn = Some(lsn);
+        self.next_snapshot_task.commit_lsn_baseline = lsn;
+    }
+}
diff --git a/src/moonlink/src/storage/mooncake_table/snapshot_validation.rs b/src/moonlink/src/storage/mooncake_table/snapshot_validation.rs
index 66404a468..9ef96d4a4 100644
--- a/src/moonlink/src/storage/mooncake_table/snapshot_validation.rs
+++ b/src/moonlink/src/storage/mooncake_table/snapshot_validation.rs
@@ -72,7 +72,7 @@ impl SnapshotTableState {
     /// Util function to validate data files and file indices match each other.
     #[cfg(any(test, debug_assertions))]
     fn assert_data_files_and_file_indices_match(&self) {
-        // Skip validation for append-only tables since they don't have file indices
+        // Skip validation for append-only tables since they don't have file indices.
         if matches!(self.mooncake_table_metadata.identity, IdentityProp::None) {
             return;
         }
diff --git a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs
index f5c4ddd00..c84ae0219 100644
--- a/src/moonlink/src/storage/mooncake_table/transaction_stream.rs
+++ b/src/moonlink/src/storage/mooncake_table/transaction_stream.rs
@@ -28,7 +28,7 @@ pub(crate) struct TransactionStreamState {
     /// Only safe to remove transaction stream state when there are no pending flushes.
     ongoing_flush_count: u32,
     /// Commit LSN for this transaction, set when transaction is committed.
-    commit_lsn: Option<u64>,
+    pub(crate) commit_lsn: Option<u64>,
 }
 
 /// Determines the state of a transaction stream.
@@ -65,6 +65,24 @@ pub struct TransactionStreamCommit {
 }
 
 impl TransactionStreamCommit {
+    /// Create a stream commit from disk files.
+    pub(crate) fn from_disk_files(
+        disk_files: hashbrown::HashMap<MooncakeDataFileRef, DiskFileEntry>,
+        lsn: u64,
+    ) -> Self {
+        Self {
+            xact_id: 0, // Unused.
+            commit_lsn: lsn,
+            flushed_file_index: MooncakeIndex {
+                in_memory_index: HashSet::new(),
+                file_indices: Vec::new(),
+            },
+            flushed_files: disk_files,
+            local_deletions: Vec::new(),
+            pending_deletions: Vec::new(),
+        }
+    }
+
     /// Get flushed data files for the current streaming commit.
     pub(crate) fn get_flushed_data_files(&self) -> Vec<MooncakeDataFileRef> {
         self.flushed_files.keys().cloned().collect::<Vec<_>>()
@@ -91,7 +109,7 @@ impl TransactionStreamCommit {
 }
 
 impl TransactionStreamState {
-    fn new(
+    pub(crate) fn new(
         schema: Arc<Schema>,
         batch_size: usize,
         identity: IdentityProp,
diff --git a/src/moonlink/src/table_handler.rs b/src/moonlink/src/table_handler.rs
index e5b27875f..7f6fff4e2 100644
--- a/src/moonlink/src/table_handler.rs
+++ b/src/moonlink/src/table_handler.rs
@@ -213,6 +213,13 @@ impl TableHandler {
                     Self::process_cdc_table_event(event, &mut table, &mut table_handler_state)
                         .await;
                 }
+                // ==============================
+                // Bulk ingestion events
+                // ==============================
+                TableEvent::LoadFiles { files, lsn } => {
+                    table.batch_ingest(files, lsn).await;
+                }
+
                 // ==============================
                 // Interactive blocking events
                 // ==============================
diff --git a/src/moonlink/src/table_handler/test_utils.rs b/src/moonlink/src/table_handler/test_utils.rs
index 3d55930cc..c64001bc3 100644
--- a/src/moonlink/src/table_handler/test_utils.rs
+++ b/src/moonlink/src/table_handler/test_utils.rs
@@ -22,12 +22,13 @@ use crate::{
     WalConfig, WalTransactionState,
 };
 
-use arrow_array::RecordBatch;
+use arrow_array::{Int32Array, RecordBatch, StringArray};
 use futures::StreamExt;
 use iceberg::io::FileIOBuilder;
 use iceberg::io::FileRead;
 use more_asserts as ma;
 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::arrow::AsyncArrowWriter;
 use std::sync::Arc;
 use tempfile::{tempdir, TempDir};
 use tokio::sync::{mpsc, watch};
@@ -353,6 +354,10 @@ impl TestEnvironment {
             .await;
     }
 
+    pub async fn bulk_upload_files(&self, files: Vec<String>, lsn: u64) {
+        self.send_event(TableEvent::LoadFiles { files, lsn }).await;
+    }
+
     // --- LSN and Verification Helpers ---
 
     /// Sets both table commit and replication LSN to the same value.
@@ -629,3 +634,24 @@ pub(crate) async fn load_one_arrow_batch(filepath: &str) -> RecordBatch {
         .unwrap()
         .expect("Should have one batch")
 }
+
+/// Test util function to generate a parquet file under the given [`tempdir`].
+pub(crate) async fn generate_parquet_file(tempdir: &TempDir) -> String {
+    let schema = create_test_arrow_schema();
+    let ids = Int32Array::from(vec![1, 2, 3]);
+    let names = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
+    let ages = Int32Array::from(vec![10, 20, 30]);
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(ids), Arc::new(names), Arc::new(ages)],
+    )
+    .unwrap();
+    let file_path = tempdir.path().join("test.parquet");
+    let file_path_str = file_path.to_str().unwrap().to_string();
+    let file = tokio::fs::File::create(file_path).await.unwrap();
+    let mut writer: AsyncArrowWriter<tokio::fs::File> =
+        AsyncArrowWriter::try_new(file, schema, /*props=*/ None).unwrap();
+    writer.write(&batch).await.unwrap();
+    writer.close().await.unwrap();
+    file_path_str
+}
diff --git a/src/moonlink/src/table_handler/tests.rs b/src/moonlink/src/table_handler/tests.rs
index 870854693..a1fbd749c 100644
--- a/src/moonlink/src/table_handler/tests.rs
+++ b/src/moonlink/src/table_handler/tests.rs
@@ -2345,3 +2345,28 @@ async fn test_append_only_table_basic() {
     env.shutdown().await;
     println!("Basic append-only table test passed!");
 }
+
+/// Testing scenario: batch upload parquet files into the mooncake table.
+#[tokio::test]
+async fn test_batch_ingestion() {
+    let temp_dir = tempdir().unwrap();
+    let mooncake_table_config = MooncakeTableConfig {
+        append_only: true, // Enable append-only mode.
+        batch_size: 2,
+        mem_slice_size: 1000,
+        snapshot_deletion_record_count: 1000,
+        temp_files_directory: temp_dir.path().to_str().unwrap().to_string(),
+        disk_slice_writer_config: DiskSliceWriterConfig::default(),
+        data_compaction_config: DataCompactionConfig::default(),
+        file_index_config: FileIndexMergeConfig::default(),
+        persistence_config: IcebergPersistenceConfig::default(),
+    };
+    let env = TestEnvironment::new(temp_dir, mooncake_table_config).await;
+
+    let disk_file = generate_parquet_file(&env.temp_dir).await;
+    env.bulk_upload_files(vec![disk_file], /*lsn=*/ 10).await;
+
+    // Validate bulk ingestion result.
+    env.set_readable_lsn(10);
+    env.verify_snapshot(10, &[1, 2, 3]).await;
+}
diff --git a/src/moonlink/src/table_notify.rs b/src/moonlink/src/table_notify.rs
index f04f7d485..1a58ee35a 100644
--- a/src/moonlink/src/table_notify.rs
+++ b/src/moonlink/src/table_notify.rs
@@ -103,6 +103,18 @@ pub enum TableEvent {
         /// Result for mem slice flush.
         flush_result: Option<Result<DiskSliceWriter>>,
     },
+
+    /// ==============================
+    /// Bulk ingestion events
+    /// ==============================
+    ///
+    LoadFiles {
+        /// Parquet files to load directly into the mooncake table, without schema validation, index construction, etc.
+        files: Vec<String>,
+        /// LSN for the bulk upload operation.
+        lsn: u64,
+    },
+
     /// ==============================
     /// Test events
     /// ==============================
@@ -118,6 +130,7 @@ pub enum TableEvent {
         xact_id: u32,
         is_recovery: bool,
     },
+
     /// ==============================
     /// Interactive blocking events
     /// ==============================
@@ -149,6 +162,7 @@ pub enum TableEvent {
     FinishInitialCopy {
         start_lsn: u64,
     },
+
     /// ==============================
     /// Table internal events
     /// ==============================
diff --git a/src/moonlink_connectors/Cargo.toml b/src/moonlink_connectors/Cargo.toml
index 6b168068e..9cfa5005a 100644
--- a/src/moonlink_connectors/Cargo.toml
+++ b/src/moonlink_connectors/Cargo.toml
@@ -17,7 +17,7 @@ arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 bigdecimal = { version = "0.4", default-features = false, features = ["std"] }
 byteorder = "1.5"
-bytes = "1.0"
+bytes = { workspace = true }
 chrono = { workspace = true }
 chrono-tz = "0.10"
 futures = { workspace = true }
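
Usage sketch (not part of the diff): the only surface this change adds is the `TableEvent::LoadFiles` variant; everything else is plumbing behind it. The sketch below assumes a handle to the table handler's `tokio::sync::mpsc::Sender<TableEvent>` (the same channel that `TestEnvironment::send_event` wraps in the tests above); the `bulk_ingest_parquet` helper and the `moonlink::TableEvent` import path are illustrative names, not in-tree APIs.

    use tokio::sync::mpsc;

    use moonlink::TableEvent; // Assumed re-export path; adjust to the crate's actual layout.

    /// Hypothetical helper: ask a running TableHandler to bulk-ingest
    /// pre-written parquet files at the given LSN. `LoadFiles` deliberately
    /// skips schema validation and index construction: each file becomes a
    /// `DiskFileEntry`, and the whole batch is committed as a synthetic
    /// streaming transaction (see `TransactionStreamCommit::from_disk_files`).
    async fn bulk_ingest_parquet(
        event_sender: &mpsc::Sender<TableEvent>,
        files: Vec<String>,
        lsn: u64,
    ) {
        event_sender
            .send(TableEvent::LoadFiles { files, lsn })
            .await
            .expect("table handler event loop has shut down");
    }

Because the commit is pushed onto `next_snapshot_task.new_streaming_xact` and `commit_lsn_baseline` is advanced to `lsn`, the ingested files become visible once a snapshot at or beyond that LSN is created, which is what the new `!self.new_streaming_xact.is_empty()` clause in `SnapshotTask` guarantees.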