From 16e646b49f5dd90a50050d01c8af998eae38d6db Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 19 Feb 2026 10:35:04 +0100 Subject: [PATCH 1/4] Adds triggering configuration to KafkaIO EOS to improve performance of the sink. The EOS sink is slow when sending transactions with a single element; to speed it up, multiple messages need to be triggered together. --- .../sdk/io/kafka/KafkaExactlyOnceSink.java | 22 ++++++++++++-- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 29 +++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java index 93e7ff2b663d..410ca1e50d5e 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java @@ -51,7 +51,9 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Window; @@ -84,6 +86,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; +import org.joda.time.Duration; import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,7 +162,8 @@ static void ensureEOSSupport() { @Override public PCollection expand(PCollection> input) { String topic = Preconditions.checkStateNotNull(spec.getTopic()); - + int numElements = 
spec.getNumElements(); + Duration timeout = spec.getTimeout(); int numShards = spec.getNumShards(); if (numShards <= 0) { try (Consumer consumer = openConsumer(spec)) { @@ -176,13 +180,27 @@ public PCollection expand(PCollection> input) { return input .apply( Window.>into(new GlobalWindows()) // Everything into global window. - .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + .triggering( + Repeatedly.forever( + AfterFirst.of( + AfterPane.elementCountAtLeast(numElements), + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(timeout)))) .discardingFiredPanes()) .apply( String.format("Shuffle across %d shards", numShards), ParDo.of(new Reshard<>(numShards))) .apply("Persist sharding", GroupByKey.create()) .apply("Assign sequential ids", ParDo.of(new Sequencer<>())) + .apply( + "Windowing", + Window.>>>>into( + new GlobalWindows()) // Everything into global window. + .triggering( + Repeatedly.forever( + AfterFirst.of( + AfterPane.elementCountAtLeast(numElements), + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(timeout)))) + .discardingFiredPanes()) .apply("Persist ids", GroupByKey.create()) .apply( String.format("Write to Kafka topic '%s'", spec.getTopic()), diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index ad5535517646..7570391be2e8 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -648,6 +648,8 @@ public static WriteRecords writeRecords() { return new AutoValue_KafkaIO_WriteRecords.Builder() .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES) .setEOS(false) + .setNumElements(100) + .setTimeout(Duration.standardSeconds(1)) .setNumShards(0) .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) @@ -3185,6 +3187,10 @@ public 
abstract static class WriteRecords @Pure public abstract boolean isEOS(); + public abstract int getNumElements(); + + public abstract Duration getTimeout(); + @Pure public abstract @Nullable String getSinkGroupId(); @@ -3221,6 +3227,10 @@ abstract Builder setPublishTimestampFunction( abstract Builder setEOS(boolean eosEnabled); + abstract Builder setNumElements(int numElements); + + abstract Builder setTimeout(Duration timeout); + abstract Builder setSinkGroupId(String sinkGroupId); abstract Builder setNumShards(int numShards); @@ -3368,6 +3378,12 @@ public WriteRecords withEOS(int numShards, String sinkGroupId) { return toBuilder().setEOS(true).setNumShards(numShards).setSinkGroupId(sinkGroupId).build(); } + public WriteRecords withEOSTriggerConfig(int numElements, Duration timeout) { + checkArgument(numElements >= 1, "numElements should be >= 1"); + checkArgument(timeout != null, "timeout is required for exactly-once sink"); + return toBuilder().setNumElements(numElements).setTimeout(timeout).build(); + } + /** * When exactly-once semantics are enabled (see {@link #withEOS(int, String)}), the sink needs * to fetch previously stored state with Kafka topic. Fetching the metadata requires a consumer. @@ -3653,6 +3669,19 @@ public Write withEOS(int numShards, String sinkGroupId) { return withWriteRecordsTransform(getWriteRecordsTransform().withEOS(numShards, sinkGroupId)); } + /** + * Set the frequency and numElements threshold at which messages are triggered. + * + *

This is only applicable when the write method is set to EOS. + * + *

Every timeout duration, or numElements (repeated, after first condition is met) collection + * of elements written. + */ + public Write withEOSTriggerConfig(int numElements, Duration timeout) { + return withWriteRecordsTransform( + getWriteRecordsTransform().withEOSTriggerConfig(numElements, timeout)); + } + /** * Wrapper method over {@link WriteRecords#withConsumerFactoryFn(SerializableFunction)}, used to * keep the compatibility with old API based on KV type of element. From c6ab2f5acc49a2ecc8faf2a02f384d73a7da36cd Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 19 Feb 2026 15:40:43 +0100 Subject: [PATCH 2/4] rename field --- .../beam/sdk/io/kafka/KafkaExactlyOnceSink.java | 4 ++-- .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 14 +++++++------- .../sdk/io/kafka/upgrade/KafkaIOTranslation.java | 5 +++++ .../io/kafka/upgrade/KafkaIOTranslationTest.java | 2 ++ 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java index 410ca1e50d5e..f4eb4c30a221 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java @@ -162,8 +162,8 @@ static void ensureEOSSupport() { @Override public PCollection expand(PCollection> input) { String topic = Preconditions.checkStateNotNull(spec.getTopic()); - int numElements = spec.getNumElements(); - Duration timeout = spec.getTimeout(); + int numElements = spec.getEosTriggerNumElements(); + Duration timeout = spec.getEosTriggerTimeout(); int numShards = spec.getNumShards(); if (numShards <= 0) { try (Consumer consumer = openConsumer(spec)) { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 
7570391be2e8..e717447efe36 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -648,8 +648,8 @@ public static WriteRecords writeRecords() { return new AutoValue_KafkaIO_WriteRecords.Builder() .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES) .setEOS(false) - .setNumElements(100) - .setTimeout(Duration.standardSeconds(1)) + .setEosTriggerNumElements(100) + .setEosTriggerTimeout(Duration.standardSeconds(1)) .setNumShards(0) .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) @@ -3187,9 +3187,9 @@ public abstract static class WriteRecords @Pure public abstract boolean isEOS(); - public abstract int getNumElements(); + public abstract int getEosTriggerNumElements(); - public abstract Duration getTimeout(); + public abstract Duration getEosTriggerTimeout(); @Pure public abstract @Nullable String getSinkGroupId(); @@ -3227,9 +3227,9 @@ abstract Builder setPublishTimestampFunction( abstract Builder setEOS(boolean eosEnabled); - abstract Builder setNumElements(int numElements); + abstract Builder setEosTriggerNumElements(int numElements); - abstract Builder setTimeout(Duration timeout); + abstract Builder setEosTriggerTimeout(Duration timeout); abstract Builder setSinkGroupId(String sinkGroupId); @@ -3381,7 +3381,7 @@ public WriteRecords withEOS(int numShards, String sinkGroupId) { public WriteRecords withEOSTriggerConfig(int numElements, Duration timeout) { checkArgument(numElements >= 1, "numElements should be >= 1"); checkArgument(timeout != null, "timeout is required for exactly-once sink"); - return toBuilder().setNumElements(numElements).setTimeout(timeout).build(); + return toBuilder().setEosTriggerNumElements(numElements).setEosTriggerTimeout(timeout).build(); } /** diff --git 
a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index 51d9b028bab0..05b9b296bc25 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -479,6 +479,8 @@ static class KafkaIOWriteTranslator implements TransformPayloadTranslator transform) { } fieldValues.put("eos", writeRecordsTransform.isEOS()); + fieldValues.put( + "eos_trigger_timeout_ms", writeRecordsTransform.getEosTriggerTimeout().getMillis()); + fieldValues.put("eos_trigger_num_elements", writeRecordsTransform.getEosTriggerNumElements()); fieldValues.put("num_shards", writeRecordsTransform.getNumShards()); if (writeRecordsTransform.getSinkGroupId() != null) { diff --git a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java index 845e89b3b659..205884b2cb60 100644 --- a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java +++ b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java @@ -94,6 +94,8 @@ public class KafkaIOTranslationTest { WRITE_TRANSFORM_SCHEMA_MAPPING.put("getValueSerializer", "value_serializer"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getPublishTimestampFunction", "publish_timestamp_fn"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("isEOS", "eos"); + WRITE_TRANSFORM_SCHEMA_MAPPING.put("getEosTriggerTimeout", "eos_trigger_timeout_ms"); + WRITE_TRANSFORM_SCHEMA_MAPPING.put("getEosTriggerNumElements", "eos_trigger_num_elements"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getSinkGroupId", "sink_group_id"); 
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getNumShards", "num_shards"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getConsumerFactoryFn", "consumer_factory_fn"); From 06cb13e9ef3ba7dd77074a77314ba16d5cfbb338 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Thu, 19 Feb 2026 16:29:37 +0100 Subject: [PATCH 3/4] spotless --- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index e717447efe36..176110b85766 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -3381,7 +3381,10 @@ public WriteRecords withEOS(int numShards, String sinkGroupId) { public WriteRecords withEOSTriggerConfig(int numElements, Duration timeout) { checkArgument(numElements >= 1, "numElements should be >= 1"); checkArgument(timeout != null, "timeout is required for exactly-once sink"); - return toBuilder().setEosTriggerNumElements(numElements).setEosTriggerTimeout(timeout).build(); + return toBuilder() + .setEosTriggerNumElements(numElements) + .setEosTriggerTimeout(timeout) + .build(); } /** From 7ac991fc7a04dcef4acff17e21c5a8773b3ad0e4 Mon Sep 17 00:00:00 2001 From: Radek Stankiewicz Date: Fri, 20 Feb 2026 14:50:52 +0100 Subject: [PATCH 4/4] fix compatibility spotless ds --- .../sdk/io/kafka/KafkaExactlyOnceSink.java | 26 +++++++++++-------- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 8 +++--- .../io/kafka/upgrade/KafkaIOTranslation.java | 8 +++--- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java index f4eb4c30a221..f34547bd2611 100644 --- 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java @@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.Preconditions; import org.apache.beam.sdk.values.KV; @@ -176,30 +177,33 @@ public PCollection expand(PCollection> input) { } } checkState(numShards > 0, "Could not set number of shards"); - + Trigger.OnceTrigger trigger = null; + if (timeout != null) { + trigger = + AfterFirst.of( + AfterPane.elementCountAtLeast(numElements), + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(timeout)); + } else { + // fallback to default + trigger = AfterPane.elementCountAtLeast(numElements); + } return input .apply( Window.>into(new GlobalWindows()) // Everything into global window. - .triggering( - Repeatedly.forever( - AfterFirst.of( - AfterPane.elementCountAtLeast(numElements), - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(timeout)))) + .triggering(Repeatedly.forever(trigger)) .discardingFiredPanes()) .apply( String.format("Shuffle across %d shards", numShards), ParDo.of(new Reshard<>(numShards))) .apply("Persist sharding", GroupByKey.create()) .apply("Assign sequential ids", ParDo.of(new Sequencer<>())) + // Reapply the windowing configuration as the continuation trigger doesn't maintain the + // desired batching. .apply( "Windowing", Window.>>>>into( new GlobalWindows()) // Everything into global window. 
- .triggering( - Repeatedly.forever( - AfterFirst.of( - AfterPane.elementCountAtLeast(numElements), - AfterProcessingTime.pastFirstElementInPane().plusDelayOf(timeout)))) + .triggering(Repeatedly.forever(trigger)) .discardingFiredPanes()) .apply("Persist ids", GroupByKey.create()) .apply( diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 176110b85766..02d14b745fe8 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -648,8 +648,8 @@ public static WriteRecords writeRecords() { return new AutoValue_KafkaIO_WriteRecords.Builder() .setProducerConfig(WriteRecords.DEFAULT_PRODUCER_PROPERTIES) .setEOS(false) - .setEosTriggerNumElements(100) - .setEosTriggerTimeout(Duration.standardSeconds(1)) + .setEosTriggerNumElements(1) // keep default numElements + .setEosTriggerTimeout(null) // keep default trigger (timeout) .setNumShards(0) .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) @@ -3189,7 +3189,7 @@ public abstract static class WriteRecords public abstract int getEosTriggerNumElements(); - public abstract Duration getEosTriggerTimeout(); + public abstract @Nullable Duration getEosTriggerTimeout(); @Pure public abstract @Nullable String getSinkGroupId(); @@ -3229,7 +3229,7 @@ abstract Builder setPublishTimestampFunction( abstract Builder setEosTriggerNumElements(int numElements); - abstract Builder setEosTriggerTimeout(Duration timeout); + abstract Builder setEosTriggerTimeout(@Nullable Duration timeout); abstract Builder setSinkGroupId(String sinkGroupId); diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java 
index 05b9b296bc25..a015d6d48f32 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -480,7 +480,7 @@ static class KafkaIOWriteTranslator implements TransformPayloadTranslator transform) { } fieldValues.put("eos", writeRecordsTransform.isEOS()); - fieldValues.put( - "eos_trigger_timeout_ms", writeRecordsTransform.getEosTriggerTimeout().getMillis()); + org.joda.time.Duration eosTriggerTimeout = writeRecordsTransform.getEosTriggerTimeout(); + if (eosTriggerTimeout != null) { + fieldValues.put("eos_trigger_timeout_ms", eosTriggerTimeout.getMillis()); + } fieldValues.put("eos_trigger_num_elements", writeRecordsTransform.getEosTriggerNumElements()); fieldValues.put("num_shards", writeRecordsTransform.getNumShards());