Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/moonlink/src/storage/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,33 @@ impl CompactionBuilder {
.unwrap()
};

if data_file_to_compact.file_id.file_id.0 == 10000000000014100 {
println!("10000000000014100 row num = {}, puffin deleted row number = {}", total_num_rows, deleted_rows_num);
}
if data_file_to_compact.file_id.file_id.0 == 10000000000014300 {
println!("10000000000014300 row num = {}, puffin deleted row number = {}", total_num_rows, deleted_rows_num);
}

Comment on lines +221 to +227

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block of println! statements appears to be for debugging. Please remove it before merging.

let mut old_start_row_idx = 0;
let mut old_to_new_remap = HashMap::new();
while let Some(cur_record_batch) = reader.try_next().await? {

// If all rows have been deleted for the old data file, do nothing.
let cur_num_rows = cur_record_batch.num_rows();
let filtered_record_batch =
get_filtered_record_batch(cur_record_batch, old_start_row_idx);

if data_file_to_compact.file_id.file_id.0 == 10000000000014100 {
println!("10000000000014100 filtered record batch = {:?}", filtered_record_batch);
}
if data_file_to_compact.file_id.file_id.0 == 10000000000014300 {
println!("10000000000014300 filtered record batch = {:?}", filtered_record_batch);
}
if data_file_to_compact.file_id.file_id.0 == 10000000000059500 {
println!("10000000000059500 filtered record batch = {:?}", filtered_record_batch);
}


Comment on lines +237 to +247

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block of println! statements also appears to be for debugging and should be removed.

if filtered_record_batch.num_rows() == 0 {
old_start_row_idx += cur_num_rows;
continue;
Expand Down
153 changes: 153 additions & 0 deletions src/moonlink/src/storage/iceberg/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use crate::storage::mooncake_table::table_operation_test_utils::*;
use crate::storage::mooncake_table::test_utils_commons::ICEBERG_TEST_NAMESPACE;
use crate::storage::mooncake_table::test_utils_commons::ICEBERG_TEST_TABLE;
use crate::storage::mooncake_table::validation_test_utils::*;
use crate::storage::mooncake_table::DataCompactionResult;
use crate::storage::mooncake_table::IcebergSnapshotPayload;
use crate::storage::mooncake_table::IcebergSnapshotResult;
use crate::storage::mooncake_table::{
IcebergSnapshotDataCompactionPayload, IcebergSnapshotImportPayload,
IcebergSnapshotIndexMergePayload,
Expand All @@ -46,6 +48,7 @@ use crate::storage::wal::test_utils::WAL_TEST_TABLE_ID;
use crate::storage::MooncakeTable;
use crate::DataCompactionConfig;
use crate::FileSystemAccessor;
use crate::TableEvent;
use crate::WalConfig;
use crate::WalManager;

Expand Down Expand Up @@ -3017,3 +3020,153 @@ async fn test_schema_update_with_gcs() {
// Common testing logic.
test_schema_update_impl(iceberg_table_config.clone()).await;
}

#[tokio::test]
async fn test_debug() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The test name test_debug is not descriptive. Please rename it to reflect the scenario being tested, for example, test_concurrent_compaction_and_iceberg_snapshot or something similar that captures the essence of this complex interaction test.

Suggested change
async fn test_debug() {
async fn test_concurrent_compaction_and_iceberg_snapshot() {

// Scenario under test: three disk files are produced in total.
// Two of them are already persisted in iceberg; the third one only exists in the mooncake snapshot.

// Local filesystem for iceberg.
let iceberg_temp_dir = tempdir().unwrap();
let iceberg_table_config = get_iceberg_table_config(&iceberg_temp_dir);

// Local filesystem to store write-through cache.
let table_temp_dir = tempdir().unwrap();
let local_table_directory = table_temp_dir.path().to_str().unwrap().to_string();

// Create mooncake metadata.
// Index merge is disabled, so only data compaction and iceberg persistence are configured below.
let file_index_config = FileIndexMergeConfig::disabled();
// NOTE(review): presumably makes any two or more data files eligible for compaction,
// regardless of file size or deletion ratio — confirm against `DataCompactionConfig`.
let data_compaction_config = DataCompactionConfig {
min_data_file_to_compact: 2,
max_data_file_to_compact: u32::MAX,
data_file_final_size: u64::MAX,
data_file_deletion_percentage: 0,
};
// NOTE(review): thresholds of 1 presumably let a single new data file / deletion log / compaction
// trigger iceberg persistence, while `usize::MAX` keeps index merge from triggering it — confirm.
let iceberg_persistence_config = IcebergPersistenceConfig {
new_data_file_count: 1,
new_committed_deletion_log: 1,
new_compacted_data_file_count: 1,
old_compacted_data_file_count: 1,
old_merged_file_indices_count: usize::MAX,
};
let mut config = MooncakeTableConfig::new(table_temp_dir.path().to_str().unwrap().to_string());
config.file_index_config = file_index_config;
config.data_compaction_config = data_compaction_config;
config.persistence_config = iceberg_persistence_config;
let mooncake_table_metadata = create_test_table_metadata_with_config(
local_table_directory,
config,
);

// Local filesystem to store read-through cache.
let cache_temp_dir = tempdir().unwrap();
let object_storage_cache = create_test_object_storage_cache(&cache_temp_dir);

let (mut table, mut notify_rx) = create_mooncake_table_and_notify(
mooncake_table_metadata.clone(),
iceberg_table_config.clone(),
object_storage_cache.clone(),
)
.await;
// The same row is appended, then deleted and re-appended twice below.
let row = test_row_1();

// Intended table state once the three append / overwrite rounds below have run:
//
// disk file 1:
// one row, in mooncake and iceberg
// deleted, in batch deletion vector and puffin
//
// disk file 2:
// one row, in mooncake and iceberg
// deleted, in batch deletion vector but not puffin
//
// disk file 3:
// one row, not in iceberg
// no deletion
//
// Initial append.
table.append(row.clone()).unwrap();
table.commit(/*lsn=*/ 1);
flush_table_and_sync(&mut table, &mut notify_rx, /*lsn=*/ 1)
.await
.unwrap();
create_mooncake_and_persist_for_test(&mut table, &mut notify_rx).await;

// Overwrite: delete the row and append it again, then flush and persist.
table.delete(row.clone(), /*lsn=*/ 2).await;
table.append(row.clone()).unwrap();
table.commit(/*lsn=*/ 3);
flush_table_and_sync(&mut table, &mut notify_rx, /*lsn=*/ 3)
.await
.unwrap();
create_mooncake_and_persist_for_test(&mut table, &mut notify_rx).await;

// Overwrite again, but this time only create a mooncake snapshot (no iceberg persistence yet).
table.delete(row.clone(), /*lsn=*/ 4).await;
table.append(row.clone()).unwrap();
table.commit(/*lsn=*/ 5);
flush_table_and_sync(&mut table, &mut notify_rx, /*lsn=*/ 5)
.await
.unwrap();
// Keep the iceberg snapshot payload and the data compaction payload produced by this snapshot.
let (_, iceberg_payload, _, data_compaction_payload, _) = create_mooncake_snapshot_for_test(&mut table, &mut notify_rx).await;
// checkpoint-1

// Initiate an iceberg snapshot.
table.persist_iceberg_snapshot(iceberg_payload.unwrap());

// Perform data compaction.
let data_compaction_payload = data_compaction_payload.take_payload().unwrap();
table.perform_data_compaction(data_compaction_payload);

// Block wait for both operations to finish; their notifications are accepted in either order,
// and each kind of result is asserted to arrive at most once.
let mut stored_data_compaction_result : Option<DataCompactionResult> = None;
let mut stored_iceberg_snapshot_result : Option<IcebergSnapshotResult> = None;

for _ in 0..2 {
let notification = notify_rx.recv().await.unwrap();
if let TableEvent::DataCompactionResult { data_compaction_result } = notification {
assert!(stored_data_compaction_result.is_none());
stored_data_compaction_result = Some(data_compaction_result.unwrap());
} else if let TableEvent::IcebergSnapshotResult { iceberg_snapshot_result } = notification {
assert!(stored_iceberg_snapshot_result.is_none());
stored_iceberg_snapshot_result = Some(iceberg_snapshot_result.unwrap());
} else {
panic!("Expect either iceberg snapshot result and data compaction result but get {:?}", notification);
}
}
assert!(stored_data_compaction_result.is_some());
assert!(stored_iceberg_snapshot_result.is_some());

// Reflect iceberg snapshot result to mooncake snapshot.
table.set_iceberg_snapshot_res(stored_iceberg_snapshot_result.unwrap());
// checkpoint-2

// Create mooncake snapshot and sync.
create_mooncake_snapshot_for_test(&mut table, &mut notify_rx).await;

// Reflect data compaction result, only after the iceberg snapshot result has been applied above.
table.set_data_compaction_res(stored_data_compaction_result.unwrap());

// Create mooncake snapshot and sync; an iceberg payload is expected this time.
let (_, iceberg_payload, _, _, _) = create_mooncake_snapshot_for_test(&mut table, &mut notify_rx).await;
assert!(iceberg_payload.is_some());

// Create iceberg snapshot and sync.
let iceberg_snapshot_result = create_iceberg_snapshot(&mut table, iceberg_payload, &mut notify_rx).await.unwrap(); // NOTE(review): author flagged this call with `<----`; presumably where the issue under investigation surfaces — confirm.
table.set_iceberg_snapshot_res(iceberg_snapshot_result);

// Validate iceberg snapshot content by loading it back through a fresh table manager.
let filesystem_accessor = create_test_filesystem_accessor(&iceberg_table_config);
let mut iceberg_table_manager_for_recovery = IcebergTableManager::new(
mooncake_table_metadata.clone(),
create_test_object_storage_cache(&cache_temp_dir), // Use separate cache for each table.
filesystem_accessor.clone(),
iceberg_table_config.clone(),
)
.unwrap();
// NOTE(review): `next_file_id` is never read afterwards in this test; consider `_next_file_id`.
let (next_file_id, snapshot) = iceberg_table_manager_for_recovery
.load_snapshot_from_table()
.await
.unwrap();

// Validate iceberg snapshot.
// NOTE(review): `&[1]` presumably is the id of the single surviving row (`test_row_1`) — confirm.
verify_recovered_mooncake_snapshot(&snapshot, /*expected_ids=*/ &[1]).await;
}
11 changes: 11 additions & 0 deletions src/moonlink/src/storage/mooncake_table/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,14 @@ impl SnapshotTableState {

// TODO(hjiang): When there's only schema evolution, we should also flush even no flush.
let flush_lsn = self.current_snapshot.flush_lsn.unwrap_or(0);

println!("option = {:?}, flush by table write {}, flush lsn = {}, min ongoing {}",
opt.iceberg_snapshot_option,
flush_by_table_write,
flush_lsn,
task.min_ongoing_flush_lsn,
);

Comment on lines +582 to +588

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This println! statement appears to be for debugging. Please remove it before merging.

if opt.iceberg_snapshot_option != IcebergSnapshotOption::Skip
&& (force_empty_iceberg_payload || flush_by_table_write)
&& flush_lsn < task.min_ongoing_flush_lsn
Expand All @@ -592,6 +600,9 @@ impl SnapshotTableState {
|| flush_by_new_files_or_maintenance
|| force_empty_iceberg_payload
{

println!("should have iceberg payload?");

Comment on lines +603 to +605

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This println! statement also seems to be for debugging and should be removed.

iceberg_snapshot_payload = Some(self.get_iceberg_snapshot_payload(
&opt.iceberg_snapshot_option,
flush_lsn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,21 @@ async fn sync_data_compaction(receiver: &mut Receiver<TableEvent>) -> DataCompac
/// Composite util functions
/// ===================================
///
/// Test util function, which requests a mooncake snapshot but does not block wait for its completion.
///
/// The snapshot is force-created without dumping, and iceberg snapshot, data compaction and index
/// merge are all requested in best-effort mode, each with a freshly generated UUID.
/// Panics if `create_snapshot` returns `false`.
pub(crate) async fn create_mooncake_snapshot_no_sync(table: &mut MooncakeTable) {
    // Fields are initialized in the same order as before, so UUIDs are generated in the same sequence.
    let snapshot_option = SnapshotOption {
        uuid: uuid::Uuid::new_v4(),
        force_create: true,
        dump_snapshot: false,
        iceberg_snapshot_option: IcebergSnapshotOption::BestEffort(uuid::Uuid::new_v4()),
        data_compaction_option: MaintenanceOption::BestEffort(uuid::Uuid::new_v4()),
        index_merge_option: MaintenanceOption::BestEffort(uuid::Uuid::new_v4()),
    };

    // No notification is awaited here; the caller is responsible for syncing on completion.
    let snapshot_requested = table.create_snapshot(snapshot_option);
    assert!(snapshot_requested);
}

// Test util function, which creates mooncake snapshot for testing.
pub(crate) async fn create_mooncake_snapshot_for_test(
table: &mut MooncakeTable,
Expand Down Expand Up @@ -390,7 +405,7 @@ pub(crate) async fn create_mooncake_and_persist_for_test(
}

// Test util which block waits for the current mooncake snapshot to complete, gets the iceberg persistence payload, then performs a new mooncake snapshot and waits for its completion.
async fn sync_mooncake_snapshot_and_create_new_by_iceberg_payload(
pub(crate) async fn sync_mooncake_snapshot_and_create_new_by_iceberg_payload(
table: &mut MooncakeTable,
receiver: &mut Receiver<TableEvent>,
) {
Expand Down