diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouter.java index 6b4485db2167..a6c99f807d0a 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouter.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouter.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.components.deadletterqueue; - import java.util.HashMap; import java.util.Map; import org.apache.beam.sdk.transforms.PTransform; @@ -26,27 +25,33 @@ import org.apache.beam.sdk.values.TupleTag; import org.checkerframework.checker.nullness.qual.NonNull; -public class DLQRouter extends PTransform<@NonNull PCollectionTuple, @NonNull PCollection> { +public class DLQRouter + extends PTransform<@NonNull PCollectionTuple, @NonNull PCollection> { private final TupleTag goodMessages; private final TupleTag badMessages; - private final PTransform<@NonNull PCollection,?> errorSink; + private final PTransform<@NonNull PCollection, ?> errorSink; - public DLQRouter (TupleTag goodMessages, TupleTag badMessages, PTransform<@NonNull PCollection,?> errorSink){ + public DLQRouter( + TupleTag goodMessages, + TupleTag badMessages, + PTransform<@NonNull PCollection, ?> errorSink) { this.goodMessages = goodMessages; this.badMessages = badMessages; this.errorSink = errorSink; } + @Override public PCollection expand(@NonNull PCollectionTuple input) { - //validate no extra messages are dropped - Map,PCollection> pcollections = new HashMap<>(input.getAll()); + // validate no extra messages are dropped + Map, PCollection> pcollections = new HashMap<>(input.getAll()); pcollections.remove(goodMessages); pcollections.remove(badMessages); - if (pcollections.size() != 0){ - throw new IllegalArgumentException("DLQ Router only supports PCollectionTuples split between two message groupings"); + if (pcollections.size() != 0) { + throw new IllegalArgumentException( + "DLQ Router only supports PCollectionTuples split between two message groupings"); } input.get(badMessages).apply(errorSink); diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/sinks/ThrowingSink.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/sinks/ThrowingSink.java index 9869cd09da3b..73970d8a7c61 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/sinks/ThrowingSink.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/deadletterqueue/sinks/ThrowingSink.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.components.deadletterqueue.sinks; - import org.apache.beam.repackaged.core.org.apache.commons.lang3.ObjectUtils.Null; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; @@ -38,7 +37,7 @@ public PDone expand(@NonNull PCollection input) { public class ThrowingDoFn extends DoFn { @ProcessElement - public void processElement(@Element @NonNull T element){ + public void processElement(@Element @NonNull T element) { throw new RuntimeException(element.toString()); } } diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/ThrottlingSignaler.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/ThrottlingSignaler.java index 894c9294bed4..9bd4466997fd 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/ThrottlingSignaler.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/throttling/ThrottlingSignaler.java @@ -19,11 +19,11 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; + /** - * The ThrottlingSignaler is a utility class for IOs to signal to the runner - * that a process is being throttled, preventing autoscaling. This is primarily - * used when making calls to a remote service where quotas and rate limiting - * are reasonable considerations. + * The ThrottlingSignaler is a utility class for IOs to signal to the runner that a process is being + * throttled, preventing autoscaling. This is primarily used when making calls to a remote service + * where quotas and rate limiting are reasonable considerations. */ public class ThrottlingSignaler { private final Counter throttleCounter; @@ -37,8 +37,7 @@ public ThrottlingSignaler() { } /** - * Signal that a transform has been throttled for an amount of time - * represented in milliseconds. + * Signal that a transform has been throttled for an amount of time represented in milliseconds. */ public void signalThrottling(long milliseconds) { throttleCounter.inc(milliseconds); diff --git a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouterTest.java b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouterTest.java index 8d2e5d296f4f..09e886da97ba 100644 --- a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouterTest.java +++ b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/deadletterqueue/DLQRouterTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io.components.deadletterqueue; - import org.apache.beam.sdk.io.components.deadletterqueue.sinks.ThrowingSink; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -31,46 +30,45 @@ public class DLQRouterTest { - @Rule - public final transient TestPipeline p = TestPipeline.create(); + @Rule public final transient TestPipeline p = TestPipeline.create(); @Rule public ExpectedException thrown = ExpectedException.none(); - @Test - public void testExceptionWithInvalidConfiguration(){ + public void testExceptionWithInvalidConfiguration() { thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("DLQ Router only supports PCollectionTuples split between two message groupings"); + thrown.expectMessage( + "DLQ Router only supports PCollectionTuples split between two message groupings"); TupleTag tag1 = new TupleTag<>(); TupleTag tag2 = new TupleTag<>(); TupleTag tag3 = new TupleTag<>(); - PCollectionTuple tuple = PCollectionTuple.of(tag1, p.apply(Create.of("elem1"))) - .and(tag2, p.apply(Create.of("elem2"))) - .and(tag3, p.apply(Create.of("elem1"))); + PCollectionTuple tuple = + PCollectionTuple.of(tag1, p.apply(Create.of("elem1"))) + .and(tag2, p.apply(Create.of("elem2"))) + .and(tag3, p.apply(Create.of("elem1"))); tuple.apply(new DLQRouter<>(tag1, tag2, new ThrowingSink<>())); p.run(); - } @Test - public void testExpectCorrectRouting(){ + public void testExpectCorrectRouting() { thrown.expect(RuntimeException.class); thrown.expectMessage("elem2"); TupleTag tag1 = new TupleTag<>(); TupleTag tag2 = new TupleTag<>(); - PCollectionTuple tuple = PCollectionTuple.of(tag1, p.apply("create elem1", Create.of("elem1"))) - .and(tag2, p.apply("create elem2", Create.of("elem2"))); + PCollectionTuple tuple = + PCollectionTuple.of(tag1, p.apply("create elem1", Create.of("elem1"))) + .and(tag2, p.apply("create elem2", Create.of("elem2"))); - PCollection expectedElement = tuple.apply(new DLQRouter<>(tag1, tag2, new ThrowingSink<>())); + PCollection expectedElement = + tuple.apply(new DLQRouter<>(tag1, tag2, new ThrowingSink<>())); PAssert.thatSingleton(expectedElement).isEqualTo("elem1"); p.run(); } - - }