Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.components.deadletterqueue;


import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -26,27 +25,33 @@
import org.apache.beam.sdk.values.TupleTag;
import org.checkerframework.checker.nullness.qual.NonNull;

public class DLQRouter<T, K> extends PTransform<@NonNull PCollectionTuple, @NonNull PCollection<T>> {
public class DLQRouter<T, K>
extends PTransform<@NonNull PCollectionTuple, @NonNull PCollection<T>> {

private final TupleTag<T> goodMessages;

private final TupleTag<K> badMessages;

private final PTransform<@NonNull PCollection<K>,?> errorSink;
private final PTransform<@NonNull PCollection<K>, ?> errorSink;

public DLQRouter (TupleTag<T> goodMessages, TupleTag<K> badMessages, PTransform<@NonNull PCollection<K>,?> errorSink){
public DLQRouter(
TupleTag<T> goodMessages,
TupleTag<K> badMessages,
PTransform<@NonNull PCollection<K>, ?> errorSink) {
this.goodMessages = goodMessages;
this.badMessages = badMessages;
this.errorSink = errorSink;
}

@Override
public PCollection<T> expand(@NonNull PCollectionTuple input) {
//validate no extra messages are dropped
Map<TupleTag<?>,PCollection<?>> pcollections = new HashMap<>(input.getAll());
// validate no extra messages are dropped
Map<TupleTag<?>, PCollection<?>> pcollections = new HashMap<>(input.getAll());
pcollections.remove(goodMessages);
pcollections.remove(badMessages);
if (pcollections.size() != 0){
throw new IllegalArgumentException("DLQ Router only supports PCollectionTuples split between two message groupings");
if (pcollections.size() != 0) {
throw new IllegalArgumentException(
"DLQ Router only supports PCollectionTuples split between two message groupings");
}

input.get(badMessages).apply(errorSink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.components.deadletterqueue.sinks;


import org.apache.beam.repackaged.core.org.apache.commons.lang3.ObjectUtils.Null;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -38,7 +37,7 @@ public PDone expand(@NonNull PCollection<T> input) {
public class ThrowingDoFn extends DoFn<T, Null> {

@ProcessElement
public void processElement(@Element @NonNull T element){
public void processElement(@Element @NonNull T element) {
throw new RuntimeException(element.toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;

/**
* The ThrottlingSignaler is a utility class for IOs to signal to the runner
* that a process is being throttled, preventing autoscaling. This is primarily
* used when making calls to a remote service where quotas and rate limiting
* are reasonable considerations.
* The ThrottlingSignaler is a utility class for IOs to signal to the runner that a process is being
* throttled, preventing autoscaling. This is primarily used when making calls to a remote service
* where quotas and rate limiting are reasonable considerations.
*/
public class ThrottlingSignaler {
private final Counter throttleCounter;
Expand All @@ -37,8 +37,7 @@ public ThrottlingSignaler() {
}

/**
* Signal that a transform has been throttled for an amount of time
* represented in milliseconds.
* Signal that a transform has been throttled for an amount of time represented in milliseconds.
*/
public void signalThrottling(long milliseconds) {
throttleCounter.inc(milliseconds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.components.deadletterqueue;


import org.apache.beam.sdk.io.components.deadletterqueue.sinks.ThrowingSink;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand All @@ -31,46 +30,45 @@

public class DLQRouterTest {

@Rule
public final transient TestPipeline p = TestPipeline.create();
@Rule public final transient TestPipeline p = TestPipeline.create();

@Rule public ExpectedException thrown = ExpectedException.none();


@Test
public void testExceptionWithInvalidConfiguration(){
public void testExceptionWithInvalidConfiguration() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("DLQ Router only supports PCollectionTuples split between two message groupings");
thrown.expectMessage(
"DLQ Router only supports PCollectionTuples split between two message groupings");

TupleTag<String> tag1 = new TupleTag<>();
TupleTag<String> tag2 = new TupleTag<>();
TupleTag<String> tag3 = new TupleTag<>();
PCollectionTuple tuple = PCollectionTuple.of(tag1, p.apply(Create.<String>of("elem1")))
.and(tag2, p.apply(Create.<String>of("elem2")))
.and(tag3, p.apply(Create.<String>of("elem1")));
PCollectionTuple tuple =
PCollectionTuple.of(tag1, p.apply(Create.<String>of("elem1")))
.and(tag2, p.apply(Create.<String>of("elem2")))
.and(tag3, p.apply(Create.<String>of("elem1")));
tuple.apply(new DLQRouter<>(tag1, tag2, new ThrowingSink<>()));

p.run();

}

@Test
public void testExpectCorrectRouting(){
public void testExpectCorrectRouting() {
thrown.expect(RuntimeException.class);
thrown.expectMessage("elem2");

TupleTag<String> tag1 = new TupleTag<>();
TupleTag<String> tag2 = new TupleTag<>();

PCollectionTuple tuple = PCollectionTuple.of(tag1, p.apply("create elem1", Create.<String>of("elem1")))
.and(tag2, p.apply("create elem2", Create.<String>of("elem2")));
PCollectionTuple tuple =
PCollectionTuple.of(tag1, p.apply("create elem1", Create.<String>of("elem1")))
.and(tag2, p.apply("create elem2", Create.<String>of("elem2")));

PCollection<String> expectedElement = tuple.apply(new DLQRouter<>(tag1, tag2, new ThrowingSink<>()));
PCollection<String> expectedElement =
tuple.apply(new DLQRouter<>(tag1, tag2, new ThrowingSink<>()));

PAssert.thatSingleton(expectedElement).isEqualTo("elem1");

p.run();
}


}
Loading