diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
index 3f6e92240..f0c4704a4 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java
@@ -9,6 +9,7 @@
 import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.Producer;
@@ -19,6 +20,7 @@
 import static io.confluent.csid.utils.StringUtils.msg;
 import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.DLQ;
 
 /**
  * The options for the {@link AbstractParallelEoSStreamProcessor} system.
@@ -43,6 +45,11 @@ public class ParallelConsumerOptions {
      */
     private final Producer<K, V> producer;
 
+    /**
+     * Supplying an admin client is only needed for some functions, which will cause an error if used when it's missing.
+     */
+    private final AdminClient adminClient;
+
     /**
      * Path to Managed executor service for Java EE
      */
@@ -228,7 +235,7 @@ public boolean isUsingBatching() {
     public enum TerminalFailureReaction {
         SHUTDOWN,
         SKIP,
-        // DLQ, TODO
+        DLQ
     }
 
     /**
@@ -248,8 +255,17 @@ public void validate() {
         // WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay());
+
+        //
+        // todo should admin be required?
+        if (!isProducerSupplied() && getAdminClient() == null && getTerminalFailureReaction() == DLQ) {
+            throw new IllegalArgumentException(msg("Cannot use DLQ failure mode ({}) without supplying a Producer and an AdminClient (both are needed)",
+                    getTerminalFailureReaction()));
+        }
+
     }
+
     public boolean isUsingTransactionalProducer() {
         return commitMode.equals(PERIODIC_TRANSACTIONAL_PRODUCER);
     }
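For orientation, the new reaction could be switched on roughly as follows. This is an illustrative sketch only, not part of the patch: it assumes the Lombok-generated builder setters for the new adminClient field and the existing terminalFailureReaction option, and the kafkaConsumer, kafkaProducer and kafkaAdminClient variables are placeholder client instances built elsewhere.

    var options = ParallelConsumerOptions.<String, String>builder()
            .consumer(kafkaConsumer)
            .producer(kafkaProducer)        // writes the failed record to the dead letter topic
            .adminClient(kafkaAdminClient)  // lets the runner create the dead letter topic if it is missing
            .terminalFailureReaction(ParallelConsumerOptions.TerminalFailureReaction.DLQ)
            .build();
    // validate() now rejects the DLQ reaction when neither a Producer nor an AdminClient is supplied
    options.validate();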
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
index bbfb14866..040adf266 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java
@@ -4,19 +4,14 @@
  * Copyright (C) 2020-2022 Confluent, Inc.
  */
 
-import io.confluent.csid.utils.TimeUtils;
 import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
-import io.confluent.parallelconsumer.internal.InternalRuntimeError;
+import io.confluent.parallelconsumer.internal.ProducerManager;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import pl.tlinkowski.unij.api.UniLists;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -68,22 +63,9 @@ public void pollAndProduceMany(Function, List>
-            List<ConsumeProduceResult<K, V, K, V>> results = new ArrayList<>();
             log.trace("Producing {} messages in result...", recordListToProduce.size());
-
-            var futures = super.getProducerManager().get().produceMessages(recordListToProduce);
-            try {
-                for (Tuple<ProducerRecord<K, V>, Future<RecordMetadata>> future : futures) {
-                    var recordMetadata = TimeUtils.time(() ->
-                            future.getRight().get(options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS));
-                    var result = new ConsumeProduceResult<>(context.getPollContext(), future.getLeft(), recordMetadata);
-                    results.add(result);
-                }
-            } catch (Exception e) {
-                throw new InternalRuntimeError("Error while waiting for produce results", e);
-            }
-
-            return results;
+            ProducerManager<K, V> producer = super.getProducerManager().get();
+            return producer.produceMessagesSyncWithContext(context.getPollContext(), recordListToProduce);
         };
 
         supervisorLoop(wrappedUserFunc, callback);
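The public pollAndProduceMany contract is unchanged by the rewrite above; the produce-and-wait loop simply moves into ProducerManager. A usage sketch, not part of the patch (imports elided), assuming a running ParallelEoSStreamProcessor<String, String> called processor, an existing "output" topic, an slf4j logger named log, and the Lombok-style getOut() accessor on ConsumeProduceResult:

    processor.pollAndProduceMany(
            context -> context.stream()
                    // derive one outgoing record per consumed record
                    .map(record -> new ProducerRecord<String, String>("output", record.key(), record.value()))
                    .collect(Collectors.toList()),
            // the callback still receives the (input context, output record, send metadata) triple
            result -> log.debug("Produced {}", result.getOut()));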
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
index 6ce59ffcf..3d43b03b2 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
@@ -104,6 +104,8 @@ public abstract class AbstractParallelEoSStreamProcessor implements Parall
     @Getter(PROTECTED)
     private final BlockingQueue<ControllerEventMessage<K, V>> workMailBox = new LinkedBlockingQueue<>(); // Thread safe, highly performant, non blocking
 
+    private final UserFunctionRunner<K, V> runner;
+
     /**
      * An inbound message to the controller.
      * <p>
@@ -111,6 +113,7 @@
      */
     @Value
     @RequiredArgsConstructor(access = PRIVATE)
+    // todo refactor - this has been extracted in PR#270
     private static class ControllerEventMessage<K, V> {
         WorkContainer<K, V> workContainer;
         EpochAndRecordsMap<K, V> consumerRecords;
@@ -256,6 +259,8 @@ public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOptio
             this.producerManager = Optional.empty();
             this.committer = this.brokerPollSubsystem;
         }
+
+        this.runner = new UserFunctionRunner<>(this, clock, getProducerManager(), options.getAdminClient());
     }
 
     private void checkGroupIdConfigured(final org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
@@ -771,8 +776,7 @@ private void submitWorkToPoolInner(final Function,
         log.trace("Sending work ({}) to pool", batch);
         Future outputRecordFuture = workerThreadPool.submit(() -> {
             addInstanceMDC();
-            UserFunctionRunner<K, V> runner = new UserFunctionRunner<>(this);
-            return runner.runUserFunction(usersFunction, callback, batch);
+            return this.runner.runUserFunction(usersFunction, callback, batch);
         });
         // for a batch, each message in the batch shares the same result
         for (final WorkContainer<K, V> workContainer : batch) {
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java
index 93f49ea5e..05ce77d61 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java
@@ -5,8 +5,10 @@
  */
 
 import io.confluent.parallelconsumer.ParallelConsumer;
+import io.confluent.parallelconsumer.ParallelConsumer.Tuple;
 import io.confluent.parallelconsumer.ParallelConsumerOptions;
 import io.confluent.parallelconsumer.ParallelStreamProcessor;
+import io.confluent.parallelconsumer.PollContext;
 import io.confluent.parallelconsumer.state.WorkManager;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -20,15 +22,24 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static io.confluent.csid.utils.StringUtils.msg;
 
+/**
+ * Central control point for {@link KafkaProducer} where it's needed in some scenarios, like transactions.
+ * <p>
+ * Also has useful wrapping methods.
+ * <p>
+ * todo docs
+ */
 @Slf4j
 public class ProducerManager<K, V> extends AbstractOffsetCommitter<K, V> implements OffsetCommitter {
@@ -38,9 +49,8 @@ public class ProducerManager extends AbstractOffsetCommitter impleme
     private final boolean producerIsConfiguredForTransactions;
-
     /**
-     * The {@link KafkaProducer) isn't actually completely thread safe, at least when using it transactionally. We must
+     * The {@link KafkaProducer} isn't actually completely thread safe, at least when using it transactionally. We must
      * be careful not to send messages to the producer, while we are committing a transaction - "Cannot call send in
      * state COMMITTING_TRANSACTION".
      */
@@ -123,7 +133,7 @@ private TransactionManager getTransactionManager() {
      * @see ParallelConsumer#poll
      * @see ParallelStreamProcessor#pollAndProduceMany
      */
-    public List<ParallelConsumer.Tuple<ProducerRecord<K, V>, Future<RecordMetadata>>> produceMessages(List<ProducerRecord<K, V>> outMsgs) {
+    public List<Tuple<ProducerRecord<K, V>, Future<RecordMetadata>>> produceMessages(List<ProducerRecord<K, V>> outMsgs) {
         // only needed if not using tx
         Callback callback = (RecordMetadata metadata, Exception exception) -> {
             if (exception != null) {
@@ -134,18 +144,42 @@ public List, Future>
         ReentrantReadWriteLock.ReadLock readLock = producerTransactionLock.readLock();
         readLock.lock();
-        List<ParallelConsumer.Tuple<ProducerRecord<K, V>, Future<RecordMetadata>>> futures = new ArrayList<>(outMsgs.size());
         try {
-            for (ProducerRecord<K, V> rec : outMsgs) {
+            return outMsgs.stream().map(rec -> {
                 log.trace("Producing {}", rec);
                 var future = producer.send(rec, callback);
-                futures.add(ParallelConsumer.Tuple.pairOf(rec, future));
-            }
+                return Tuple.pairOf(rec, future);
+            }).collect(Collectors.toList());
         } finally {
             readLock.unlock();
         }
+    }
+
+    public List<ParallelStreamProcessor.ConsumeProduceResult<K, V, K, V>> produceMessagesSyncWithContext(PollContext<K, V> context, List<ProducerRecord<K, V>> outMsgs) {
+        return produceMessagesSync(outMsgs).map(tuple -> {
+            RecordMetadata recordMetadata = tuple.getRight();
+            ProducerRecord<K, V> producerRecord = tuple.getLeft();
+            return new ParallelStreamProcessor.ConsumeProduceResult<>(context, producerRecord, recordMetadata);
+        }).collect(Collectors.toList());
+    }
+
+    public Stream<Tuple<ProducerRecord<K, V>, RecordMetadata>> produceMessagesSync(List<ProducerRecord<K, V>> outMsgs) {
+        var futures = produceMessages(outMsgs);
 
-        return futures;
+        try {
+            return futures.stream().map(tuple -> {
+                Future<RecordMetadata> futureSendResult = tuple.getRight();
+                ProducerRecord<K, V> producerRecord = tuple.getLeft();
+                try {
+                    RecordMetadata recordMetadata = futureSendResult.get(options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS);
+                    return new Tuple<>(producerRecord, recordMetadata);
+                } catch (Exception e) {
+                    throw new RuntimeException(msg("Error while waiting on individual produce result {}", producerRecord), e);
+                }
+            });
+        } catch (Exception e) {
+            throw new InternalRuntimeError(msg("Error while waiting for produce results {}", outMsgs), e);
+        }
     }
 
     @Override
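In outline, the new synchronous wrappers above send each record under the existing producer read lock, block on every send Future for options.getSendTimeout(), and optionally attach the originating poll context. An illustrative sketch, not part of the patch; producerManager, pollContext, the topic name and the log variable are placeholders:

    List<ProducerRecord<String, String>> outgoing =
            UniLists.of(new ProducerRecord<>("output", "key", "value"));

    // record/acknowledgement pairs, each awaited for options.getSendTimeout()
    producerManager.produceMessagesSync(outgoing)
            .forEach(tuple -> log.debug("{} acked at offset {}", tuple.getLeft(), tuple.getRight().offset()));

    // the same, but each acknowledgement wrapped with the poll context that triggered the produce
    var results = producerManager.produceMessagesSyncWithContext(pollContext, outgoing);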
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java
index 4964b52f4..af3fab38c 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java
@@ -9,26 +9,62 @@
 import io.confluent.parallelconsumer.state.WorkContainer;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.slf4j.MDC;
 import pl.tlinkowski.unij.api.UniLists;
 
+import java.time.Clock;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static io.confluent.csid.utils.StringUtils.msg;
-import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN;
-import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP;
+import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.*;
 import static io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.MDC_WORK_CONTAINER_DESCRIPTOR;
+import static java.util.Optional.empty;
 
 @AllArgsConstructor
 @Slf4j
 public class UserFunctionRunner<K, V> {
 
+    public static final String HEADER_PREFIX = "pc-";
+
+    private static final String FAILURE_COUNT_KEY = HEADER_PREFIX + "failure-count";
+
+    private static final String LAST_FAILURE_KEY = HEADER_PREFIX + "last-failure-at";
+
+    private static final String FAILURE_CAUSE_KEY = HEADER_PREFIX + "last-failure-cause";
+
+    private static final String PARTITION_KEY = HEADER_PREFIX + "partition";
+
+    private static final String OFFSET_KEY = HEADER_PREFIX + "offset";
+
+    private static final Serializer<String> serializer = Serdes.String().serializer();
+
+    private static final String DLQ_SUFFIX = ".DLQ";
+
     private AbstractParallelEoSStreamProcessor<K, V> pc;
 
+    private final Clock clock;
+
+    private final Optional<ProducerManager<K, V>> producer;
+
+    private final AdminClient adminClient;
+
+
     /**
      * Run the supplied function.
      */
@@ -113,7 +149,8 @@ private List, R>> handleUserSuccess(Consumer c
     }
 
     private <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> handleUserTerminalFailure(PollContextInternal<K, V> context,
-                                                                                                PCTerminalException e, Consumer<R> callback) {
+                                                                                                PCTerminalException e,
+                                                                                                Consumer<R> callback) {
         var reaction = pc.getOptions().getTerminalFailureReaction();
 
         if (reaction == SKIP) {
@@ -134,11 +171,98 @@ private List, R>> handleUserTerminalFailure(PollC
             // throw again to make the future failed
             throw e;
+        } else if (reaction == DLQ) {
+            handleDlqReaction(context, e);
+            // otherwise pretend to succeed
+            return handleUserSuccess(callback, context.getWorkContainers(), UniLists.of());
         } else {
             throw new InternalRuntimeError(msg("Unsupported reaction config ({}) - submit a bug report.", reaction));
         }
     }
 
+    private void handleDlqReaction(PollContextInternal<K, V> context, PCTerminalException userError) {
+        try {
+            var producerRecords = prepareDlqMsgs(context, userError);
+            //noinspection OptionalGetWithoutIsPresent - presence handled in Options verifier
+            try {
+                producer.get().produceMessagesSync(producerRecords);
+            } catch (TopicExistsException e) {
+                // only do this on exception, otherwise we have to check for presence every time we try to send
+                tryEnsureTopicExists(context);
+                // send again
+                producer.get().produceMessagesSync(producerRecords);
+            }
+        } catch (Exception sendError) {
+            InternalRuntimeError multiRoot = new InternalRuntimeError(
+                    msg("Error sending record to DLQ, while reacting to the user failure: {}", userError.getMessage()),
+                    sendError);
+            attachRootCause(sendError, userError);
+            multiRoot.addSuppressed(userError);
+            throw multiRoot;
+        }
+    }
+
+    private void tryEnsureTopicExists(PollContextInternal<K, V> context) {
+        var topics = context.getByTopicPartitionMap().keySet().stream()
+                .map(TopicPartition::topic)
+                .distinct()
+                .map(name -> new NewTopic(name + DLQ_SUFFIX, empty(), empty()))
+                .collect(Collectors.toList());
+        this.adminClient.createTopics(topics);
+    }
+
+    private void attachRootCause(final Exception sendError, final PCTerminalException userError) {
+        //noinspection ThrowableNotThrown
+        Throwable root = getRootCause(sendError);
+        root.initCause(userError);
+    }
+
+    private Throwable getRootCause(Throwable sendError) {
+        Throwable cause = sendError.getCause();
+        if (cause == null) return sendError;
+        else return getRootCause(cause);
+    }
+
+    @SuppressWarnings("FeatureEnvy")
+    private List<ProducerRecord<K, V>> prepareDlqMsgs(PollContextInternal<K, V> context, final PCTerminalException userError) {
+        return context.stream().map(recordContext -> {
+            Iterable<Header> headers = makeDlqHeaders(recordContext, userError);
+            Integer partition = null;
+            // route to the corresponding dead letter topic
+            return new ProducerRecord<>(recordContext.topic() + DLQ_SUFFIX,
+                    partition,
+                    recordContext.key(),
+                    recordContext.value(),
+                    headers);
+        }).collect(Collectors.toList());
+    }
+
+    @SuppressWarnings("FeatureEnvy")
+    private Iterable<Header> makeDlqHeaders(RecordContext<K, V> recordContext, final PCTerminalException userError) {
+        int numberOfFailedAttempts = recordContext.getNumberOfFailedAttempts();
+        Optional<Instant> lastFailureAt = recordContext.getLastFailureAt();
+
+        String topic = recordContext.topic();
+
+        var failures = serializer.serialize(topic, Integer.toString(numberOfFailedAttempts));
+
+        Instant resolvedInstant = lastFailureAt.orElse(clock.instant());
+        var last = serializer.serialize(topic, resolvedInstant.toString());
+
+        var cause = serializer.serialize(topic, userError.getMessage());
+
+        var offset = serializer.serialize(topic, String.valueOf(recordContext.offset()));
+
+        var partition = serializer.serialize(topic, String.valueOf(recordContext.partition()));
+
+        return UniLists.of(
+                new RecordHeader(FAILURE_COUNT_KEY, failures),
+                new RecordHeader(LAST_FAILURE_KEY, last),
+                new RecordHeader(FAILURE_CAUSE_KEY, cause),
+                new RecordHeader(PARTITION_KEY, partition),
+                new RecordHeader(OFFSET_KEY, offset)
+        );
+    }
+
     private void handleExplicitUserRetriableFailure(PollContextInternal<K, V> context, PCRetriableException e) {
         logUserFunctionException(e);
         markRecordsFailed(context.getWorkContainers(), e);
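Downstream, dead-lettered records can be inspected with a plain consumer by reading the pc-* headers written above. A sketch only, not part of the patch: it assumes the dead letter topic name is the source topic plus the ".DLQ" suffix, that the header values are the UTF-8 strings produced by Serdes.String(), and that consumerProps and log are placeholders (imports elided).

    try (var dlqConsumer = new KafkaConsumer<String, String>(consumerProps)) {
        dlqConsumer.subscribe(UniLists.of("input" + ".DLQ")); // hypothetical source topic "input"
        for (ConsumerRecord<String, String> dead : dlqConsumer.poll(Duration.ofSeconds(1))) {
            // note: lastHeader() returns null when a header is absent; a real consumer should check for that
            var failureCount = new String(dead.headers().lastHeader("pc-failure-count").value(), StandardCharsets.UTF_8);
            var lastCause = new String(dead.headers().lastHeader("pc-last-failure-cause").value(), StandardCharsets.UTF_8);
            log.warn("Record {} dead lettered after {} attempts, last cause: {}", dead.key(), failureCount, lastCause);
        }
    }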
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java
index 84d928b27..ab90eec1d 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java
@@ -10,9 +10,12 @@
 import io.confluent.parallelconsumer.PollContextInternal;
 import io.confluent.parallelconsumer.state.ModelUtils;
 import io.confluent.parallelconsumer.state.WorkManager;
+import org.apache.kafka.clients.admin.AdminClient;
 import org.junit.jupiter.api.Test;
 
+import java.time.Clock;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Function;
 
 import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN;
@@ -39,7 +42,7 @@ private void run(TerminalFailureReaction shutdown, Function
-        UserFunctionRunner<String, String> r = new UserFunctionRunner<>(mock);
+        UserFunctionRunner<String, String> r = new UserFunctionRunner<>(mock, Clock.systemUTC(), Optional.empty(), mock(AdminClient.class));
         var workFor = ModelUtils.createWorkFor(0);
         r.runUserFunction(fake, o -> {