From 42a1c662322e7801f974295991dcf6d6fba9b7fc Mon Sep 17 00:00:00 2001 From: dentiny Date: Fri, 29 Aug 2025 09:41:29 +0000 Subject: [PATCH] repro --- .../src/storage/compaction/compactor.rs | 20 +++ src/moonlink/src/storage/iceberg/tests.rs | 153 ++++++++++++++++++ .../src/storage/mooncake_table/snapshot.rs | 11 ++ .../table_operation_test_utils.rs | 17 +- 4 files changed, 200 insertions(+), 1 deletion(-) diff --git a/src/moonlink/src/storage/compaction/compactor.rs b/src/moonlink/src/storage/compaction/compactor.rs index b69a6db88..c2b614f29 100644 --- a/src/moonlink/src/storage/compaction/compactor.rs +++ b/src/moonlink/src/storage/compaction/compactor.rs @@ -218,13 +218,33 @@ impl CompactionBuilder { .unwrap() }; + if data_file_to_compact.file_id.file_id.0 == 10000000000014100 { + println!("10000000000014100 row num = {}, puffin deleted row number = {}", total_num_rows, deleted_rows_num); + } + if data_file_to_compact.file_id.file_id.0 == 10000000000014300 { + println!("10000000000014300 row num = {}, puffin deleted row number = {}", total_num_rows, deleted_rows_num); + } + let mut old_start_row_idx = 0; let mut old_to_new_remap = HashMap::new(); while let Some(cur_record_batch) = reader.try_next().await? { + // If all rows have been deleted for the old data file, do nothing. let cur_num_rows = cur_record_batch.num_rows(); let filtered_record_batch = get_filtered_record_batch(cur_record_batch, old_start_row_idx); + + if data_file_to_compact.file_id.file_id.0 == 10000000000014100 { + println!("10000000000014100 filtered record batch = {:?}", filtered_record_batch); + } + if data_file_to_compact.file_id.file_id.0 == 10000000000014300 { + println!("10000000000014300 filtered record batch = {:?}", filtered_record_batch); + } + if data_file_to_compact.file_id.file_id.0 == 10000000000059500 { + println!("10000000000059500 filtered record batch = {:?}", filtered_record_batch); + } + + if filtered_record_batch.num_rows() == 0 { old_start_row_idx += cur_num_rows; continue; diff --git a/src/moonlink/src/storage/iceberg/tests.rs b/src/moonlink/src/storage/iceberg/tests.rs index 68139c73a..6b65d4d4b 100644 --- a/src/moonlink/src/storage/iceberg/tests.rs +++ b/src/moonlink/src/storage/iceberg/tests.rs @@ -27,7 +27,9 @@ use crate::storage::mooncake_table::table_operation_test_utils::*; use crate::storage::mooncake_table::test_utils_commons::ICEBERG_TEST_NAMESPACE; use crate::storage::mooncake_table::test_utils_commons::ICEBERG_TEST_TABLE; use crate::storage::mooncake_table::validation_test_utils::*; +use crate::storage::mooncake_table::DataCompactionResult; use crate::storage::mooncake_table::IcebergSnapshotPayload; +use crate::storage::mooncake_table::IcebergSnapshotResult; use crate::storage::mooncake_table::{ IcebergSnapshotDataCompactionPayload, IcebergSnapshotImportPayload, IcebergSnapshotIndexMergePayload, @@ -46,6 +48,7 @@ use crate::storage::wal::test_utils::WAL_TEST_TABLE_ID; use crate::storage::MooncakeTable; use crate::DataCompactionConfig; use crate::FileSystemAccessor; +use crate::TableEvent; use crate::WalConfig; use crate::WalManager; @@ -3017,3 +3020,153 @@ async fn test_schema_update_with_gcs() { // Common testing logic. test_schema_update_impl(iceberg_table_config.clone()).await; } + +#[tokio::test] +async fn test_debug() { + // three disk files + // two of them in iceberg already. one of them in mooncake snapshot + + // Local filesystem for iceberg. + let iceberg_temp_dir = tempdir().unwrap(); + let iceberg_table_config = get_iceberg_table_config(&iceberg_temp_dir); + + // Local filesystem to store write-through cache. + let table_temp_dir = tempdir().unwrap(); + let local_table_directory = table_temp_dir.path().to_str().unwrap().to_string(); + + // Create mooncake metadata. + let file_index_config = FileIndexMergeConfig::disabled(); + let data_compaction_config = DataCompactionConfig { + min_data_file_to_compact: 2, + max_data_file_to_compact: u32::MAX, + data_file_final_size: u64::MAX, + data_file_deletion_percentage: 0, + }; + let iceberg_persistence_config = IcebergPersistenceConfig { + new_data_file_count: 1, + new_committed_deletion_log: 1, + new_compacted_data_file_count: 1, + old_compacted_data_file_count: 1, + old_merged_file_indices_count: usize::MAX, + }; + let mut config = MooncakeTableConfig::new(table_temp_dir.path().to_str().unwrap().to_string()); + config.file_index_config = file_index_config; + config.data_compaction_config = data_compaction_config; + config.persistence_config = iceberg_persistence_config; + let mooncake_table_metadata = create_test_table_metadata_with_config( + local_table_directory, + config, + ); + + // Local filesystem to store read-through cache. + let cache_temp_dir = tempdir().unwrap(); + let object_storage_cache = create_test_object_storage_cache(&cache_temp_dir); + + let (mut table, mut notify_rx) = create_mooncake_table_and_notify( + mooncake_table_metadata.clone(), + iceberg_table_config.clone(), + object_storage_cache.clone(), + ) + .await; + let row = test_row_1(); + + // disk file 1: + // one row, in mooncake and iceberg + // deleted, in batch deletion vector and puffin + // + // disk file 2: + // one row, in mooncake and iceberg + // deleted, in batch deletion vector but not puffin + // + // disk file 3: + // one row, not in iceberg + // no deletion + // + // Initial append. + table.append(row.clone()).unwrap(); + table.commit(/*lsn=*/ 1); + flush_table_and_sync(&mut table, &mut notify_rx, /*lsn=*/ 1) + .await + .unwrap(); + create_mooncake_and_persist_for_test(&mut table, &mut notify_rx).await; + + // Overwrite. + table.delete(row.clone(), /*lsn=*/ 2).await; + table.append(row.clone()).unwrap(); + table.commit(/*lsn=*/ 3); + flush_table_and_sync(&mut table, &mut notify_rx, /*lsn=*/ 3) + .await + .unwrap(); + create_mooncake_and_persist_for_test(&mut table, &mut notify_rx).await; + + // Overwrite. + table.delete(row.clone(), /*lsn=*/ 4).await; + table.append(row.clone()).unwrap(); + table.commit(/*lsn=*/ 5); + flush_table_and_sync(&mut table, &mut notify_rx, /*lsn=*/ 5) + .await + .unwrap(); + let (_, iceberg_payload, _, data_compaction_payload, _) = create_mooncake_snapshot_for_test(&mut table, &mut notify_rx).await; + // checkpoint-1 + + // Initiate an iceberg snapshot. + table.persist_iceberg_snapshot(iceberg_payload.unwrap()); + + // Perform data compaction. + let data_compaction_payload = data_compaction_payload.take_payload().unwrap(); + table.perform_data_compaction(data_compaction_payload); + + // Block wait both operations to finish. + let mut stored_data_compaction_result : Option = None; + let mut stored_iceberg_snapshot_result : Option = None; + + for _ in 0..2 { + let notification = notify_rx.recv().await.unwrap(); + if let TableEvent::DataCompactionResult { data_compaction_result } = notification { + assert!(stored_data_compaction_result.is_none()); + stored_data_compaction_result = Some(data_compaction_result.unwrap()); + } else if let TableEvent::IcebergSnapshotResult { iceberg_snapshot_result } = notification { + assert!(stored_iceberg_snapshot_result.is_none()); + stored_iceberg_snapshot_result = Some(iceberg_snapshot_result.unwrap()); + } else { + panic!("Expect either iceberg snapshot result and data compaction result but get {:?}", notification); + } + } + assert!(stored_data_compaction_result.is_some()); + assert!(stored_iceberg_snapshot_result.is_some()); + + // Reflect iceberg snapshot result to mooncake snapshot. + table.set_iceberg_snapshot_res(stored_iceberg_snapshot_result.unwrap()); + // checkpoint-2 + + // Create mooncake snapshot and sync. + create_mooncake_snapshot_for_test(&mut table, &mut notify_rx).await; + + // Reflect data compaction result. + table.set_data_compaction_res(stored_data_compaction_result.unwrap()); + + // Create mooncake snapshot and sync. + let (_, iceberg_payload, _, _, _) = create_mooncake_snapshot_for_test(&mut table, &mut notify_rx).await; + assert!(iceberg_payload.is_some()); + + // Create iceberg snapshot and sync. + let iceberg_snapshot_result = create_iceberg_snapshot(&mut table, iceberg_payload, &mut notify_rx).await.unwrap(); // <---- + table.set_iceberg_snapshot_res(iceberg_snapshot_result); + + // Validate iceberg snapshot content. + let filesystem_accessor = create_test_filesystem_accessor(&iceberg_table_config); + let mut iceberg_table_manager_for_recovery = IcebergTableManager::new( + mooncake_table_metadata.clone(), + create_test_object_storage_cache(&cache_temp_dir), // Use separate cache for each table. + filesystem_accessor.clone(), + iceberg_table_config.clone(), + ) + .unwrap(); + let (next_file_id, snapshot) = iceberg_table_manager_for_recovery + .load_snapshot_from_table() + .await + .unwrap(); + + // Validate iceberg snapshot. + verify_recovered_mooncake_snapshot(&snapshot, /*expected_ids=*/ &[1]).await; +} diff --git a/src/moonlink/src/storage/mooncake_table/snapshot.rs b/src/moonlink/src/storage/mooncake_table/snapshot.rs index 5e4347ae7..11ea3e732 100644 --- a/src/moonlink/src/storage/mooncake_table/snapshot.rs +++ b/src/moonlink/src/storage/mooncake_table/snapshot.rs @@ -578,6 +578,14 @@ impl SnapshotTableState { // TODO(hjiang): When there's only schema evolution, we should also flush even no flush. let flush_lsn = self.current_snapshot.flush_lsn.unwrap_or(0); + + println!("option = {:?}, flush by table write {}, flush lsn = {}, min ongoing {}", + opt.iceberg_snapshot_option, + flush_by_table_write, + flush_lsn, + task.min_ongoing_flush_lsn, + ); + if opt.iceberg_snapshot_option != IcebergSnapshotOption::Skip && (force_empty_iceberg_payload || flush_by_table_write) && flush_lsn < task.min_ongoing_flush_lsn @@ -592,6 +600,9 @@ impl SnapshotTableState { || flush_by_new_files_or_maintenance || force_empty_iceberg_payload { + + println!("should have iceberg payload?"); + iceberg_snapshot_payload = Some(self.get_iceberg_snapshot_payload( &opt.iceberg_snapshot_option, flush_lsn, diff --git a/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs b/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs index b0ead4b04..3e1a234c7 100644 --- a/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs +++ b/src/moonlink/src/storage/mooncake_table/table_operation_test_utils.rs @@ -344,6 +344,21 @@ async fn sync_data_compaction(receiver: &mut Receiver) -> DataCompac /// Composite util functions /// =================================== /// +/// Test util function, but not block wait its completion. +pub(crate) async fn create_mooncake_snapshot_no_sync( + table: &mut MooncakeTable, +) { + let mooncake_snapshot_created = table.create_snapshot(SnapshotOption { + uuid: uuid::Uuid::new_v4(), + force_create: true, + dump_snapshot: false, + iceberg_snapshot_option: IcebergSnapshotOption::BestEffort(uuid::Uuid::new_v4()), + data_compaction_option: MaintenanceOption::BestEffort(uuid::Uuid::new_v4()), + index_merge_option: MaintenanceOption::BestEffort(uuid::Uuid::new_v4()), + }); + assert!(mooncake_snapshot_created); +} + // Test util function, which creates mooncake snapshot for testing. pub(crate) async fn create_mooncake_snapshot_for_test( table: &mut MooncakeTable, @@ -390,7 +405,7 @@ pub(crate) async fn create_mooncake_and_persist_for_test( } // Test util to block wait current mooncake snapshot completion, get the iceberg persistence payload, and perform a new mooncake snapshot and wait completion. -async fn sync_mooncake_snapshot_and_create_new_by_iceberg_payload( +pub(crate) async fn sync_mooncake_snapshot_and_create_new_by_iceberg_payload( table: &mut MooncakeTable, receiver: &mut Receiver, ) {