diff --git a/src/moonlink/src/table_handler.rs b/src/moonlink/src/table_handler.rs index 1bb27ab5d..bca6c9514 100644 --- a/src/moonlink/src/table_handler.rs +++ b/src/moonlink/src/table_handler.rs @@ -182,6 +182,8 @@ impl TableHandler { }); }; + let mut ingest_event_count = 0; + // Process events until the receiver is closed or a Shutdown event is received while let Some(event) = event_receiver.recv().await { // Record event if requested. @@ -193,6 +195,25 @@ impl TableHandler { match event { event if event.is_ingest_event() => { + ingest_event_count += 1; + if ingest_event_count % 10_000_000 == 0 { + println!("ingest event count: {}", ingest_event_count); + } + if ingest_event_count == 100_000_000 { + println!("ingest event count reached 100,000,000, stopping replication"); + let now = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs_f64(); + let now = chrono::Utc::now().to_rfc3339(); + println!( + "ingest event count reached 1,000,000, stopping replication at {:?}", + now + ); + break; + } + // dont acutally process table event + continue; Self::process_cdc_table_event(event, &mut table, &mut table_handler_state) .await; }