diff --git a/sdks/java/io/debezium/src/README.md b/sdks/java/io/debezium/src/README.md
index e56ac370b705..535213218856 100644
--- a/sdks/java/io/debezium/src/README.md
+++ b/sdks/java/io/debezium/src/README.md
@@ -155,12 +155,7 @@ There are two ways of initializing KSC:
* Restricted by number of records
* Restricted by amount of time (minutes)
-By default, DebeziumIO initializes it with the former, though user may choose the latter by setting the amount of minutes as a parameter:
-
-|Function|Param|Description|
-|-|-|-|
-|`KafkaSourceConsumerFn(connectorClass, recordMapper, maxRecords)`|_Class, SourceRecordMapper, Int_|Restrict run by number of records (Default).|
-|`KafkaSourceConsumerFn(connectorClass, recordMapper, timeToRun)`|_Class, SourceRecordMapper, Long_|Restrict run by amount of time (in minutes).|
+By default, DebeziumIO initializes it with the former, though the user may choose the latter by setting the amount of minutes as a parameter of the DebeziumIO.Read transform.
### Requirements and Supported versions
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
index b38c035adf2d..ebf91a4a0957 100644
--- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
@@ -63,11 +63,6 @@
*
*
Usage example
*
- * Support is currently experimental. One of the known issues is that the connector does not
- * preserve the offset on a worker crash or restart, causing it to retrieve all the data from the
- * beginning again. See Issue #28248 for
- * details.
- *
*
Connect to a Debezium - MySQL database and run a Pipeline
*
*
@@ -147,6 +142,8 @@ public abstract static class Read extends PTransform>
abstract @Nullable Long getMaxTimeToRun();
+ abstract @Nullable Long getPollingTimeout();
+
abstract @Nullable Coder getCoder();
abstract Builder toBuilder();
@@ -163,6 +160,8 @@ abstract static class Builder {
abstract Builder setMaxTimeToRun(Long miliseconds);
+ abstract Builder setPollingTimeout(Long miliseconds);
+
abstract Read build();
}
@@ -222,12 +221,18 @@ public Read withMaxTimeToRun(Long miliseconds) {
return toBuilder().setMaxTimeToRun(miliseconds).build();
}
+ /**
+ * Sets the timeout in milliseconds for consumer polling request in the {@link
+ * KafkaSourceConsumerFn}. A lower timeout optimizes for latency. Increase the timeout if the
+ * consumer is not fetching any records. The default is 1000 milliseconds.
+ */
+ public Read withPollingTimeout(Long miliseconds) {
+ return toBuilder().setPollingTimeout(miliseconds).build();
+ }
+
protected Schema getRecordSchema() {
KafkaSourceConsumerFn fn =
- new KafkaSourceConsumerFn<>(
- getConnectorConfiguration().getConnectorClass().get(),
- getFormatFunction(),
- getMaxNumberOfRecords());
+ new KafkaSourceConsumerFn<>(getConnectorConfiguration().getConnectorClass().get(), this);
fn.register(
new KafkaSourceConsumerFn.OffsetTracker(
new KafkaSourceConsumerFn.OffsetHolder(null, null, 0)));
@@ -267,10 +272,7 @@ public PCollection expand(PBegin input) {
.apply(
ParDo.of(
new KafkaSourceConsumerFn<>(
- getConnectorConfiguration().getConnectorClass().get(),
- getFormatFunction(),
- getMaxNumberOfRecords(),
- getMaxTimeToRun())))
+ getConnectorConfiguration().getConnectorClass().get(), this)))
.setCoder(getCoder());
}
}
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
index 00d7e6ac7411..62cb555b8af9 100644
--- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
@@ -29,6 +29,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -48,6 +49,8 @@
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams;
@@ -60,7 +63,6 @@
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
-import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,54 +92,37 @@
public class KafkaSourceConsumerFn extends DoFn