From 8b1235905225bd35b4f53fd08af5babeb4ea0f33 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Fri, 20 Feb 2026 18:11:04 -0500 Subject: [PATCH] Fix DebeziumIO resuming from worker restart * Move startTime recording into setup to fix NPE in restarted worker * Fix DebeziumIO poll loop not exiting when record list isn't empty * Make pollTimeout configurable --- sdks/java/io/debezium/src/README.md | 7 +- .../apache/beam/io/debezium/DebeziumIO.java | 28 ++-- .../io/debezium/KafkaSourceConsumerFn.java | 127 +++++++++--------- .../debezium/KafkaSourceConsumerFnTest.java | 89 ++++++++++-- .../beam/io/debezium/OffsetTrackerTest.java | 25 +--- 5 files changed, 159 insertions(+), 117 deletions(-) diff --git a/sdks/java/io/debezium/src/README.md b/sdks/java/io/debezium/src/README.md index e56ac370b705..535213218856 100644 --- a/sdks/java/io/debezium/src/README.md +++ b/sdks/java/io/debezium/src/README.md @@ -155,12 +155,7 @@ There are two ways of initializing KSC: * Restricted by number of records * Restricted by amount of time (minutes) -By default, DebeziumIO initializes it with the former, though user may choose the latter by setting the amount of minutes as a parameter: - -|Function|Param|Description| -|-|-|-| -|`KafkaSourceConsumerFn(connectorClass, recordMapper, maxRecords)`|_Class, SourceRecordMapper, Int_|Restrict run by number of records (Default).| -|`KafkaSourceConsumerFn(connectorClass, recordMapper, timeToRun)`|_Class, SourceRecordMapper, Long_|Restrict run by amount of time (in minutes).| +By default, DebeziumIO initializes it with the former, though user may choose the latter by setting the amount of minutes as a parameter for DebeziumIO.Read transform. 
### Requirements and Supported versions diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java index b38c035adf2d..ebf91a4a0957 100644 --- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java @@ -63,11 +63,6 @@ * *

Usage example

* - *

Support is currently experimental. One of the known issues is that the connector does not - * preserve the offset on a worker crash or restart, causing it to retrieve all the data from the - * beginning again. See Issue #28248 for - * details. - * *

Connect to a Debezium - MySQL database and run a Pipeline * *

@@ -147,6 +142,8 @@ public abstract static class Read extends PTransform>
 
     abstract @Nullable Long getMaxTimeToRun();
 
+    abstract @Nullable Long getPollingTimeout();
+
     abstract @Nullable Coder getCoder();
 
     abstract Builder toBuilder();
@@ -163,6 +160,8 @@ abstract static class Builder {
 
       abstract Builder setMaxTimeToRun(Long miliseconds);
 
+      abstract Builder setPollingTimeout(Long miliseconds);
+
       abstract Read build();
     }
 
@@ -222,12 +221,18 @@ public Read withMaxTimeToRun(Long miliseconds) {
       return toBuilder().setMaxTimeToRun(miliseconds).build();
     }
 
+    /**
+     * Sets the timeout in milliseconds for consumer polling request in the {@link
+     * KafkaSourceConsumerFn}. A lower timeout optimizes for latency. Increase the timeout if the
+     * consumer is not fetching any records. The default is 1000 milliseconds.
+     */
+    public Read withPollingTimeout(Long miliseconds) {
+      return toBuilder().setPollingTimeout(miliseconds).build();
+    }
+
     protected Schema getRecordSchema() {
       KafkaSourceConsumerFn fn =
-          new KafkaSourceConsumerFn<>(
-              getConnectorConfiguration().getConnectorClass().get(),
-              getFormatFunction(),
-              getMaxNumberOfRecords());
+          new KafkaSourceConsumerFn<>(getConnectorConfiguration().getConnectorClass().get(), this);
       fn.register(
           new KafkaSourceConsumerFn.OffsetTracker(
               new KafkaSourceConsumerFn.OffsetHolder(null, null, 0)));
@@ -267,10 +272,7 @@ public PCollection expand(PBegin input) {
           .apply(
               ParDo.of(
                   new KafkaSourceConsumerFn<>(
-                      getConnectorConfiguration().getConnectorClass().get(),
-                      getFormatFunction(),
-                      getMaxNumberOfRecords(),
-                      getMaxTimeToRun())))
+                      getConnectorConfiguration().getConnectorClass().get(), this)))
           .setCoder(getCoder());
     }
   }
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
index 00d7e6ac7411..62cb555b8af9 100644
--- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
@@ -29,6 +29,7 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -48,6 +49,8 @@
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
@@ -60,7 +63,6 @@
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,54 +92,37 @@
 public class KafkaSourceConsumerFn extends DoFn, T> {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceConsumerFn.class);
   public static final String BEAM_INSTANCE_PROPERTY = "beam.parent.instance";
+  private static final Long DEFAULT_POLLING_TIMEOUT = 1000L;
 
   private final Class connectorClass;
+  private final DebeziumIO.Read spec;
   private final SourceRecordMapper fn;
+  private final Long pollingTimeOut;
 
-  private final Long millisecondsToRun;
-  private final Integer maxRecords;
-
-  private static DateTime startTime;
+  private transient DateTime startTime;
   private static final Map>>
       restrictionTrackers = new ConcurrentHashMap<>();
 
-  /**
-   * Initializes the SDF with a time limit.
-   *
-   * @param connectorClass Supported Debezium connector class
-   * @param fn a SourceRecordMapper
-   * @param maxRecords Maximum number of records to fetch before finishing.
-   * @param millisecondsToRun Maximum time to run (in milliseconds)
-   */
-  @SuppressWarnings("unchecked")
-  KafkaSourceConsumerFn(
-      Class connectorClass,
-      SourceRecordMapper fn,
-      Integer maxRecords,
-      Long millisecondsToRun) {
-    this.connectorClass = (Class) connectorClass;
-    this.fn = fn;
-    this.maxRecords = maxRecords;
-    this.millisecondsToRun = millisecondsToRun;
-  }
-
   /**
    * Initializes the SDF to be run indefinitely.
    *
    * @param connectorClass Supported Debezium connector class
-   * @param fn a SourceRecordMapper
-   * @param maxRecords Maximum number of records to fetch before finishing.
+   * @param spec a DebeziumIO.Read transform
    */
-  KafkaSourceConsumerFn(Class connectorClass, SourceRecordMapper fn, Integer maxRecords) {
-    this(connectorClass, fn, maxRecords, null);
+  KafkaSourceConsumerFn(Class connectorClass, DebeziumIO.Read spec) {
+    // Fields are initialized directly from the Read spec (replaces the removed multi-arg constructors).
+    this.connectorClass = (Class) connectorClass;
+    this.spec = spec;
+    this.fn = spec.getFormatFunction();
+    this.pollingTimeOut =
+        MoreObjects.firstNonNull(spec.getPollingTimeout(), DEFAULT_POLLING_TIMEOUT);
   }
 
   @SuppressFBWarnings("ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD")
   @GetInitialRestriction
   public OffsetHolder getInitialRestriction(@Element Map unused)
       throws IOException {
-    KafkaSourceConsumerFn.startTime = new DateTime();
-    return new OffsetHolder(null, null, null, this.maxRecords, this.millisecondsToRun);
+    return new OffsetHolder(null, null, null, spec.getMaxNumberOfRecords(), spec.getMaxTimeToRun());
   }
 
   @NewTracker
@@ -211,6 +196,11 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
     return timestamp;
   }
 
+  @Setup
+  public void setup() {
+    startTime = DateTime.now();
+  }
+
   /**
    * Process the retrieved element and format it for output. Update all pending
    *
@@ -222,28 +212,36 @@ private static Instant ensureTimestampWithinBounds(Instant timestamp) {
    *     continue processing after 1 second. Otherwise, if we've reached a limit of elements, to
    *     stop processing.
    */
-  @DoFn.ProcessElement
+  @ProcessElement
   public ProcessContinuation process(
       @Element Map element,
       RestrictionTracker> tracker,
-      OutputReceiver receiver)
-      throws Exception {
+      OutputReceiver receiver) {
     Map configuration = new HashMap<>(element);
 
     // Adding the current restriction to the class object to be found by the database history
     register(tracker);
     configuration.put(BEAM_INSTANCE_PROPERTY, this.getHashCode());
 
-    SourceConnector connector = connectorClass.getDeclaredConstructor().newInstance();
-    connector.start(configuration);
-
-    SourceTask task = (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance();
+    SourceConnector connector;
+    SourceTask task;
+    try {
+      connector = connectorClass.getDeclaredConstructor().newInstance();
+      connector.start(configuration);
+      task = (SourceTask) connector.taskClass().getDeclaredConstructor().newInstance();
+    } catch (InvocationTargetException
+        | InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException e) {
+      throw new RuntimeException(e);
+    }
 
+    Duration remainingTimeout = Duration.ofMillis(pollingTimeOut);
     try {
       Map consumerOffset = tracker.currentRestriction().offset;
       LOG.debug("--------- Consumer offset from Debezium Tracker: {}", consumerOffset);
 
-      task.initialize(new BeamSourceTaskContext(tracker.currentRestriction().offset));
+      task.initialize(new BeamSourceTaskContext(consumerOffset));
       task.start(connector.taskConfigs(1).get(0));
 
       List records = task.poll();
@@ -254,7 +252,11 @@ public ProcessContinuation process(
       }
 
       LOG.debug("-------- {} records found", records.size());
-      while (records != null && !records.isEmpty()) {
+      final Stopwatch pollTimer = Stopwatch.createUnstarted();
+
+      while (Duration.ZERO.compareTo(remainingTimeout) < 0
+          && records != null
+          && !records.isEmpty()) {
         for (SourceRecord record : records) {
           LOG.debug("-------- Record found: {}", record);
 
@@ -271,8 +273,15 @@ public ProcessContinuation process(
           Instant recordInstant = debeziumRecordInstant(record);
           receiver.outputWithTimestamp(json, recordInstant);
         }
+        pollTimer.reset().start();
         task.commit();
         records = task.poll();
+        final Duration elapsed = pollTimer.elapsed();
+        try {
+          remainingTimeout = remainingTimeout.minus(elapsed);
+        } catch (ArithmeticException e) {
+          remainingTimeout = Duration.ZERO;
+        }
       }
     } catch (Exception ex) {
       throw new RuntimeException("Error occurred when consuming changes from Database. ", ex);
@@ -283,12 +292,14 @@ public ProcessContinuation process(
       task.stop();
     }
 
-    long elapsedTime = System.currentTimeMillis() - KafkaSourceConsumerFn.startTime.getMillis();
-    if (millisecondsToRun != null && millisecondsToRun > 0 && elapsedTime >= millisecondsToRun) {
-      return ProcessContinuation.stop();
-    } else {
-      return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(1));
+    if (spec.getMaxTimeToRun() != null && spec.getMaxTimeToRun() > 0) {
+      long elapsedTime = System.currentTimeMillis() - startTime.getMillis();
+      if (elapsedTime >= spec.getMaxTimeToRun()) {
+        return ProcessContinuation.stop();
+      }
     }
+    return ProcessContinuation.resume()
+        .withResumeDelay(org.joda.time.Duration.millis(remainingTimeout.toMillis()));
   }
 
   public String getHashCode() {
@@ -418,17 +429,8 @@ static class OffsetTracker extends RestrictionTrackerThis works on two different ways:
-     *
-     * 

Number of records

- * - *

This is the default behavior. Once the specified number of records has been reached, it - * will stop fetching them. - * - *

Time based

- * - * User may specify the amount of time the connector to be kept alive. Please see {@link - * KafkaSourceConsumerFn} for more details on this. + *

If number of record has been set, once the specified number of records has been reached, + * it will stop fetching them. * * @param position Currently not used * @return boolean @@ -436,23 +438,16 @@ static class OffsetTracker extends RestrictionTracker position) { LOG.debug("-------------- Claiming {} used to have: {}", position, restriction.offset); - long elapsedTime = System.currentTimeMillis() - startTime.getMillis(); int fetchedRecords = this.restriction.fetchedRecords == null ? 0 : this.restriction.fetchedRecords + 1; LOG.debug("------------Fetched records {} / {}", fetchedRecords, this.restriction.maxRecords); - LOG.debug( - "-------------- Time running: {} / {}", elapsedTime, (this.restriction.millisToRun)); this.restriction.offset = position; this.restriction.fetchedRecords = fetchedRecords; LOG.debug("-------------- History: {}", this.restriction.history); - // If we've reached the maximum number of records OR the maximum time, we reject - // the attempt to claim. - // If we've reached neither, then we continue approve the claim. - return (this.restriction.maxRecords == null || fetchedRecords < this.restriction.maxRecords) - && (this.restriction.millisToRun == null - || this.restriction.millisToRun == -1 - || elapsedTime < this.restriction.millisToRun); + // If we've reached the maximum number of records, we reject the attempt to claim. + // Otherwise, we approve the claim. 
+ return (this.restriction.maxRecords == null || fetchedRecords < this.restriction.maxRecords); } @Override diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java index 1df50b5e9acd..354e2589753d 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaSourceConsumerFnTest.java @@ -18,21 +18,27 @@ package org.apache.beam.io.debezium; import com.google.common.testing.EqualsTester; +import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.ListIterator; import java.util.Map; +import javax.annotation.concurrent.NotThreadSafe; import org.apache.beam.io.debezium.KafkaSourceConsumerFn.OffsetHolder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestOutputReceiver; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -48,6 +54,7 @@ import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.source.SourceTaskContext; import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.After; import org.junit.Assert; import 
org.junit.Test; import org.junit.runner.RunWith; @@ -56,6 +63,19 @@ @RunWith(JUnit4.class) public class KafkaSourceConsumerFnTest implements Serializable { + static DebeziumIO.Read getSpec(SourceRecordMapper fn, Integer maxRecords) { + DebeziumIO.Read transform = DebeziumIO.read().withFormatFunction(fn); + if (maxRecords > 0) { + transform = transform.withMaxNumberOfRecords(maxRecords); + } + return transform; + } + + @After + public void cleanUp() { + CounterTask.resetCountTask(); + } + @Test public void testKafkaSourceConsumerFn() { Map config = @@ -76,9 +96,10 @@ public void testKafkaSourceConsumerFn() { ParDo.of( new KafkaSourceConsumerFn<>( CounterSourceConnector.class, - sourceRecord -> - ((Struct) sourceRecord.value()).getInt64("value").intValue(), - 10))) + getSpec( + sourceRecord -> + ((Struct) sourceRecord.value()).getInt64("value").intValue(), + 10)))) .setCoder(VarIntCoder.of()); PAssert.that(counts).containsInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); @@ -104,8 +125,10 @@ public void testStoppableKafkaSourceConsumerFn() { ParDo.of( new KafkaSourceConsumerFn<>( CounterSourceConnector.class, - sourceRecord -> ((Struct) sourceRecord.value()).getInt64("value").intValue(), - 1))) + getSpec( + sourceRecord -> + ((Struct) sourceRecord.value()).getInt64("value").intValue(), + 1)))) .setCoder(VarIntCoder.of()); pipeline.run().waitUntilFinish(); @@ -159,6 +182,36 @@ public void testKafkaOffsetHolderEquality() { null)); tester.testEquals(); } + + @Test(timeout = 2000) + public void testMaxTimeToRun() throws IOException { + KafkaSourceConsumerFn kafkaSourceConsumerFn = + new KafkaSourceConsumerFn<>( + CounterSourceConnector.class, + KafkaSourceConsumerFnTest.getSpec( + sourceRecord -> ((Struct) sourceRecord.value()).getInt64("value").intValue(), 0) + .withPollingTimeout(100L) + .withMaxTimeToRun(500L)); // Run for 0.5 s + kafkaSourceConsumerFn.setup(); + OffsetHolder initialRestriction = kafkaSourceConsumerFn.getInitialRestriction(null); + RestrictionTracker> 
tracker = + kafkaSourceConsumerFn.newTracker(initialRestriction); + Map config = + ImmutableMap.of("from", "1", "delay", "0.4", "sleep", "1", "topic", "any"); + TestOutputReceiver receiver = new TestOutputReceiver<>(); + while (true) { + DoFn.ProcessContinuation continuation = + kafkaSourceConsumerFn.process(config, tracker, receiver); + if (continuation == DoFn.ProcessContinuation.stop()) { + break; + } + } + // Check results are in order + ListIterator it = receiver.getOutputs().listIterator(); + while (it.hasNext()) { + Assert.assertEquals(it.nextIndex(), it.next() - 1); + } + } } class CounterSourceConnector extends SourceConnector { @@ -173,9 +226,15 @@ public static class CounterSourceConnectorConfig extends AbstractConfig { protected static ConfigDef configDef() { return new ConfigDef() .define("from", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "Number to start from") - .define("to", ConfigDef.Type.INT, ConfigDef.Importance.HIGH, "Number to go to") + .define("to", ConfigDef.Type.INT, -1, ConfigDef.Importance.HIGH, "Number to go to") .define( "delay", ConfigDef.Type.DOUBLE, ConfigDef.Importance.HIGH, "Time between each event") + .define( + "sleep", + ConfigDef.Type.INT, + 0, + ConfigDef.Importance.MEDIUM, + "Millis to sleep in each poll") .define( "topic", ConfigDef.Type.STRING, @@ -205,8 +264,9 @@ public List> taskConfigs(int maxTasks) { return Collections.singletonList( ImmutableMap.of( "from", this.connectorConfig.props.get("from"), - "to", this.connectorConfig.props.get("to"), + "to", this.connectorConfig.props.getOrDefault("to", "-1"), "delay", this.connectorConfig.props.get("delay"), + "sleep", this.connectorConfig.props.getOrDefault("sleep", "0"), "topic", this.connectorConfig.props.get("topic"))); } @@ -224,11 +284,13 @@ public String version() { } } +@NotThreadSafe class CounterTask extends SourceTask { private static int countStopTasks = 0; private String topic = ""; private Integer from = 0; private Integer to = 0; + private Integer sleep = 0; 
private Double delay = 0.0; private Long start = System.currentTimeMillis(); @@ -266,8 +328,9 @@ public void initialize(SourceTaskContext context) { public void start(Map props) { this.topic = props.getOrDefault("topic", ""); this.from = Integer.parseInt(props.getOrDefault("from", "0")); - this.to = Integer.parseInt(props.getOrDefault("to", "0")); + this.to = Integer.parseInt(props.getOrDefault("to", "-1")); this.delay = Double.parseDouble(props.getOrDefault("delay", "0")); + this.sleep = Integer.parseInt(props.getOrDefault("sleep", "0")); if (this.lastOffset != null) { return; @@ -296,7 +359,7 @@ public List poll() throws InterruptedException { Long secondsSinceStart = (callTime - this.start) / 1000; Long recordsToOutput = Math.round(Math.floor(secondsSinceStart / this.delay)); - while (this.last < this.to) { + while (this.to == -1 || this.last < this.to) { this.last = this.last + 1; Map sourcePartition = Collections.singletonMap(PARTITION_FIELD, 1); Map sourceOffset = @@ -316,7 +379,9 @@ public List poll() throws InterruptedException { break; } } - + if (this.sleep > 0) { + Thread.sleep(this.sleep); + } return records; } @@ -328,4 +393,8 @@ public void stop() { public static int getCountTasks() { return CounterTask.countStopTasks; } + + public static void resetCountTask() { + CounterTask.countStopTasks = 0; + } } diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java index dc4338ac0485..c4ddfbef4f2b 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/OffsetTrackerTest.java @@ -37,7 +37,9 @@ public void testRestrictByNumberOfRecords() throws IOException { Map position = new HashMap<>(); KafkaSourceConsumerFn kafkaSourceConsumerFn = new KafkaSourceConsumerFn( - MySqlConnector.class, new 
SourceRecordJson.SourceRecordJsonMapper(), maxNumRecords); + MySqlConnector.class, + KafkaSourceConsumerFnTest.getSpec( + new SourceRecordJson.SourceRecordJsonMapper(), maxNumRecords)); KafkaSourceConsumerFn.OffsetHolder restriction = kafkaSourceConsumerFn.getInitialRestriction(new HashMap<>()); KafkaSourceConsumerFn.OffsetTracker tracker = @@ -48,25 +50,4 @@ public void testRestrictByNumberOfRecords() throws IOException { } assertFalse("OffsetTracker should stop", tracker.tryClaim(position)); } - - @Test - public void testRestrictByAmountOfTime() throws IOException, InterruptedException { - Map position = new HashMap<>(); - KafkaSourceConsumerFn kafkaSourceConsumerFn = - new KafkaSourceConsumerFn( - MySqlConnector.class, - new SourceRecordJson.SourceRecordJsonMapper(), - 100000, - 500L); // Run for 500 ms - KafkaSourceConsumerFn.OffsetHolder restriction = - kafkaSourceConsumerFn.getInitialRestriction(new HashMap<>()); - KafkaSourceConsumerFn.OffsetTracker tracker = - new KafkaSourceConsumerFn.OffsetTracker(restriction); - - assertTrue(tracker.tryClaim(position)); - - Thread.sleep(1000); // Sleep for a whole 2 seconds - - assertFalse(tracker.tryClaim(position)); - } }