Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/moonlink/src/table_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ impl TableHandler {
});
};

let mut ingest_event_count = 0;

// Process events until the receiver is closed or a Shutdown event is received
while let Some(event) = event_receiver.recv().await {
// Record event if requested.
Expand All @@ -193,6 +195,25 @@ impl TableHandler {

match event {
event if event.is_ingest_event() => {
ingest_event_count += 1;
if ingest_event_count % 10_000_000 == 0 {
println!("ingest event count: {}", ingest_event_count);
}
if ingest_event_count == 100_000_000 {
println!("ingest event count reached 100,000,000, stopping replication");
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs_f64();
let now = chrono::Utc::now().to_rfc3339();
println!(
"ingest event count reached 1,000,000, stopping replication at {:?}",
now
);
break;
}
// dont acutally process table event
continue;
Self::process_cdc_table_event(event, &mut table, &mut table_handler_state)
.await;
}
Expand Down
Loading