From d6e13db721d735c297100414b0882007be639562 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 20 May 2022 12:57:17 +0100 Subject: [PATCH 1/7] START: DLQ draft --- .../ParallelConsumerOptions.java | 34 +++-- .../ParallelEoSStreamProcessor.java | 24 +-- .../AbstractParallelEoSStreamProcessor.java | 143 +++++++++--------- .../internal/ProducerManager.java | 41 ++++- .../internal/UserFunctionRunner.java | 86 ++++++++++- .../internal/UserFunctionRunnerTest.java | 4 +- 6 files changed, 221 insertions(+), 111 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index 3f6e92240..a3753707b 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -19,6 +19,7 @@ import static io.confluent.csid.utils.StringUtils.msg; import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.DLQ; /** * The options for the {@link AbstractParallelEoSStreamProcessor} system. @@ -225,19 +226,6 @@ public boolean isUsingBatching() { private final TerminalFailureReaction terminalFailureReaction; - public enum TerminalFailureReaction { - SHUTDOWN, - SKIP, - // DLQ, TODO - } - - /** - * @return the combined target of the desired concurrency by the configured batch size - */ - public int getTargetAmountOfRecordsInFlight() { - return getMaxConcurrency() * getBatchSize(); - } - public void validate() { Objects.requireNonNull(consumer, "A consumer must be supplied"); @@ -248,6 +236,26 @@ public void validate() { // WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay()); + + // + if (!isProducerSupplied() && getTerminalFailureReaction() == DLQ) { + throw new IllegalArgumentException(msg("Wanting to use DQL failure mode ({}) without supplying a Producer instance", + getTerminalFailureReaction())); + } + + } + + /** + * @return the combined target of the desired concurrency by the configured batch size + */ + public int getTargetAmountOfRecordsInFlight() { + return getMaxConcurrency() * getBatchSize(); + } + + public enum TerminalFailureReaction { + SHUTDOWN, + SKIP, + DLQ } public boolean isUsingTransactionalProducer() { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java index bbfb14866..040adf266 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java @@ -4,19 +4,14 @@ * Copyright (C) 2020-2022 Confluent, Inc. 
*/ -import io.confluent.csid.utils.TimeUtils; import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor; -import io.confluent.parallelconsumer.internal.InternalRuntimeError; +import io.confluent.parallelconsumer.internal.ProducerManager; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import pl.tlinkowski.unij.api.UniLists; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; @@ -68,22 +63,9 @@ public void pollAndProduceMany(Function, List> results = new ArrayList<>(); log.trace("Producing {} messages in result...", recordListToProduce.size()); - - var futures = super.getProducerManager().get().produceMessages(recordListToProduce); - try { - for (Tuple, Future> future : futures) { - var recordMetadata = TimeUtils.time(() -> - future.getRight().get(options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS)); - var result = new ConsumeProduceResult<>(context.getPollContext(), future.getLeft(), recordMetadata); - results.add(result); - } - } catch (Exception e) { - throw new InternalRuntimeError("Error while waiting for produce results", e); - } - - return results; + ProducerManager producer = super.getProducerManager().get(); + return producer.produceMessagesSyncWithContext(context.getPollContext(), recordListToProduce); }; supervisorLoop(wrappedUserFunc, callback); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 6ce59ffcf..a4c8cd3d6 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -104,32 +104,49 @@ public abstract class AbstractParallelEoSStreamProcessor implements Parall @Getter(PROTECTED) private final BlockingQueue> workMailBox = new LinkedBlockingQueue<>(); // Thread safe, highly performant, non blocking + private final UserFunctionRunner runner; + /** - * An inbound message to the controller. - *
<p>
- * Currently, an Either type class, representing either newly polled records to ingest, or a work result. + * Construct the AsyncConsumer by wrapping this passed in conusmer and producer, which can be configured any which + * way as per normal. + * + * @see ParallelConsumerOptions */ - @Value - @RequiredArgsConstructor(access = PRIVATE) - private static class ControllerEventMessage { - WorkContainer workContainer; - EpochAndRecordsMap consumerRecords; + public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) { + Objects.requireNonNull(newOptions, "Options must be supplied"); - private boolean isWorkResult() { - return workContainer != null; - } + log.info("Confluent Parallel Consumer initialise... Options: {}", newOptions); - private boolean isNewConsumerRecords() { - return !isWorkResult(); - } + options = newOptions; + options.validate(); - private static ControllerEventMessage of(EpochAndRecordsMap polledRecords) { - return new ControllerEventMessage<>(null, polledRecords); - } + this.dynamicExtraLoadFactor = new DynamicLoadFactor(); + this.consumer = options.getConsumer(); - public static ControllerEventMessage of(WorkContainer work) { - return new ControllerEventMessage(work, null); + checkGroupIdConfigured(consumer); + checkNotSubscribed(consumer); + checkAutoCommitIsDisabled(consumer); + + workerThreadPool = setupWorkerPool(newOptions.getMaxConcurrency()); + + this.wm = new WorkManager(newOptions, consumer, dynamicExtraLoadFactor, TimeUtils.getClock()); + + ConsumerManager consumerMgr = new ConsumerManager<>(consumer); + + this.brokerPollSubsystem = new BrokerPollSystem<>(consumerMgr, wm, this, newOptions); + + if (options.isProducerSupplied()) { + this.producerManager = Optional.of(new ProducerManager<>(options.getProducer(), consumerMgr, this.wm, options)); + if (options.isUsingTransactionalProducer()) + this.committer = this.producerManager.get(); + else + this.committer = this.brokerPollSubsystem; + } else { + this.producerManager = Optional.empty(); + this.committer = this.brokerPollSubsystem; } + + this.runner = new UserFunctionRunner(this, clock, getProducerManager()); } private final BrokerPollSystem brokerPollSubsystem; @@ -217,44 +234,18 @@ public Exception getFailureCause() { */ private boolean lastWorkRequestWasFulfilled = false; - /** - * Construct the AsyncConsumer by wrapping this passed in conusmer and producer, which can be configured any which - * way as per normal. - * - * @see ParallelConsumerOptions - */ - public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) { - Objects.requireNonNull(newOptions, "Options must be supplied"); - - log.info("Confluent Parallel Consumer initialise... 
Options: {}", newOptions); - - options = newOptions; - options.validate(); - - this.dynamicExtraLoadFactor = new DynamicLoadFactor(); - this.consumer = options.getConsumer(); - - checkGroupIdConfigured(consumer); - checkNotSubscribed(consumer); - checkAutoCommitIsDisabled(consumer); - - workerThreadPool = setupWorkerPool(newOptions.getMaxConcurrency()); - - this.wm = new WorkManager(newOptions, consumer, dynamicExtraLoadFactor, TimeUtils.getClock()); - - ConsumerManager consumerMgr = new ConsumerManager<>(consumer); - - this.brokerPollSubsystem = new BrokerPollSystem<>(consumerMgr, wm, this, newOptions); - - if (options.isProducerSupplied()) { - this.producerManager = Optional.of(new ProducerManager<>(options.getProducer(), consumerMgr, this.wm, options)); - if (options.isUsingTransactionalProducer()) - this.committer = this.producerManager.get(); - else - this.committer = this.brokerPollSubsystem; - } else { - this.producerManager = Optional.empty(); - this.committer = this.brokerPollSubsystem; + private void submitWorkToPoolInner(final Function, List> usersFunction, + final Consumer callback, + final List> batch) { + // for each record, construct dispatch to the executor and capture a Future + log.trace("Sending work ({}) to pool", batch); + Future outputRecordFuture = workerThreadPool.submit(() -> { + addInstanceMDC(); + return this.runner.runUserFunction(usersFunction, callback, batch); + }); + // for a batch, each message in the batch shares the same result + for (final WorkContainer workContainer : batch) { + workContainer.setFuture(outputRecordFuture); } } @@ -763,20 +754,32 @@ protected void submitWorkToPool(Function, List> } } + /** + * An inbound message to the controller. + *
<p>
+ * Currently, an Either type class, representing either newly polled records to ingest, or a work result. + */ + @Value + @RequiredArgsConstructor(access = PRIVATE) + // todo refactor - this has been extracted in #270 + private static class ControllerEventMessage { + WorkContainer workContainer; + EpochAndRecordsMap consumerRecords; + + private boolean isWorkResult() { + return workContainer != null; + } - private void submitWorkToPoolInner(final Function, List> usersFunction, - final Consumer callback, - final List> batch) { - // for each record, construct dispatch to the executor and capture a Future - log.trace("Sending work ({}) to pool", batch); - Future outputRecordFuture = workerThreadPool.submit(() -> { - addInstanceMDC(); - UserFunctionRunner runner = new UserFunctionRunner<>(this); - return runner.runUserFunction(usersFunction, callback, batch); - }); - // for a batch, each message in the batch shares the same result - for (final WorkContainer workContainer : batch) { - workContainer.setFuture(outputRecordFuture); + private boolean isNewConsumerRecords() { + return !isWorkResult(); + } + + private static ControllerEventMessage of(EpochAndRecordsMap polledRecords) { + return new ControllerEventMessage<>(null, polledRecords); + } + + public static ControllerEventMessage of(WorkContainer work) { + return new ControllerEventMessage(work, null); } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java index 93f49ea5e..c2487efcb 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java @@ -5,8 +5,10 @@ */ import io.confluent.parallelconsumer.ParallelConsumer; +import io.confluent.parallelconsumer.ParallelConsumer.Tuple; import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.PollContext; import io.confluent.parallelconsumer.state.WorkManager; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -25,10 +27,16 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static io.confluent.csid.utils.StringUtils.msg; +/** + * todo docs + */ @Slf4j public class ProducerManager extends AbstractOffsetCommitter implements OffsetCommitter { @@ -123,7 +131,7 @@ private TransactionManager getTransactionManager() { * @see ParallelConsumer#poll * @see ParallelStreamProcessor#pollAndProduceMany */ - public List, Future>> produceMessages(List> outMsgs) { + public List, Future>> produceMessages(List> outMsgs) { // only needed if not using tx Callback callback = (RecordMetadata metadata, Exception exception) -> { if (exception != null) { @@ -134,12 +142,12 @@ public List, Future> ReentrantReadWriteLock.ReadLock readLock = producerTransactionLock.readLock(); readLock.lock(); - List, Future>> futures = new ArrayList<>(outMsgs.size()); + List, Future>> futures = new ArrayList<>(outMsgs.size()); try { for (ProducerRecord rec : outMsgs) { log.trace("Producing {}", rec); var future = producer.send(rec, callback); - futures.add(ParallelConsumer.Tuple.pairOf(rec, future)); + 
futures.add(Tuple.pairOf(rec, future)); } } finally { readLock.unlock(); @@ -148,6 +156,33 @@ public List, Future> return futures; } + public List> produceMessagesSyncWithContext(PollContext context, List> outMsgs) { + return produceMessagesSync(outMsgs).map(tuple -> { + RecordMetadata recordMetadata = tuple.getRight(); + ProducerRecord producerRecord = tuple.getLeft(); + return new ParallelStreamProcessor.ConsumeProduceResult<>(context, producerRecord, recordMetadata); + }).collect(Collectors.toList()); + } + + public Stream, RecordMetadata>> produceMessagesSync(List> outMsgs) { + var futures = produceMessages(outMsgs); + + try { + return futures.stream().map(tuple -> { + Future futureSendResult = tuple.getRight(); + ProducerRecord producerRecord = tuple.getLeft(); + try { + RecordMetadata recordMetadata = futureSendResult.get(options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS); + return new Tuple<>(producerRecord, recordMetadata); + } catch (Exception e) { + throw new RuntimeException(msg("Error while waiting on individual produce result {}", producerRecord), e); + } + }); + } catch (Exception e) { + throw new InternalRuntimeError(msg("Error while waiting for produce results {}", outMsgs), e); + } + } + @Override protected void preAcquireWork() { acquireCommitLock(); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java index 4964b52f4..e42690128 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java @@ -8,27 +8,46 @@ import io.confluent.parallelconsumer.ParallelConsumer.Tuple; import io.confluent.parallelconsumer.state.WorkContainer; import lombok.AllArgsConstructor; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.slf4j.MDC; import pl.tlinkowski.unij.api.UniLists; +import java.time.Clock; +import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Collectors; import static io.confluent.csid.utils.StringUtils.msg; -import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN; -import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SKIP; +import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.*; import static io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.MDC_WORK_CONTAINER_DESCRIPTOR; @AllArgsConstructor @Slf4j public class UserFunctionRunner { + private static final String FAILURE_COUNT_KEY = "pc-failure-count"; + + private static final String LAST_FAILURE_KEY = "pc-last-failure-at"; + private static final Serializer serializer = Serdes.String().serializer(); + private AbstractParallelEoSStreamProcessor pc; + private final Clock clock; + + private final Optional> producer; + + /** * Run the supplied function. 
*/ @@ -113,7 +132,8 @@ private List, R>> handleUserSuccess(Consumer c } private List, R>> handleUserTerminalFailure(PollContextInternal context, - PCTerminalException e, Consumer callback) { + PCTerminalException e, + Consumer callback) { var reaction = pc.getOptions().getTerminalFailureReaction(); if (reaction == SKIP) { @@ -134,11 +154,71 @@ private List, R>> handleUserTerminalFailure(PollC // throw again to make the future failed throw e; + } else if (reaction == DLQ) { + handleDlqReaction(context, e); + // othewise pretend to succeed + return handleUserSuccess(callback, context.getWorkContainers(), UniLists.of()); } else { throw new InternalRuntimeError(msg("Unsupported reaction config ({}) - submit a bug report.", reaction)); } } + private void handleDlqReaction(PollContextInternal context, PCTerminalException userError) { + try { + var producerRecords = prepareDlqMsgs(context); + //noinspection OptionalGetWithoutIsPresent - presence handled in Options verifier + producer.get().produceMessagesSync(producerRecords); + } catch (Exception sendError) { + InternalRuntimeError multiRoot = new InternalRuntimeError( + msg("Error sending record to DLQ, while reacting to the user failure: {}", userError.getMessage()), + sendError); + attachRootCause(sendError, userError); + multiRoot.addSuppressed(userError); + throw multiRoot; + } + } + + private void attachRootCause(final Exception sendError, final PCTerminalException userError) { + //noinspection ThrowableNotThrown + Throwable root = getRootCause(sendError); + root.initCause(userError); + } + + @NonNull + private Throwable getRootCause(final Exception sendError) { + if (sendError.getCause() == null) return sendError; + else return getRootCause(sendError); + } + + @SuppressWarnings("FeatureEnvy") + private List> prepareDlqMsgs(PollContextInternal context) { + return context.stream().map(recordContext -> { + Iterable
<Header> headers = makeDlqHeaders(recordContext);
+            Integer partition = null;
+            return new ProducerRecord<>(recordContext.topic(),
+                    partition,
+                    recordContext.key(),
+                    recordContext.value(),
+                    headers);
+        }).collect(Collectors.toList());
+    }
+
+    @SuppressWarnings("FeatureEnvy")
+    private Iterable<Header>
makeDlqHeaders(RecordContext recordContext) { + int numberOfFailedAttempts = recordContext.getNumberOfFailedAttempts(); + Optional lastFailureAt = recordContext.getLastFailureAt(); + + byte[] failures = serializer.serialize(recordContext.topic(), Integer.toString(numberOfFailedAttempts)); + + Instant resolvedInstant = lastFailureAt.orElse(clock.instant()); + byte[] last = serializer.serialize(recordContext.topic(), resolvedInstant.toString()); + + return UniLists.of( + new RecordHeader(FAILURE_COUNT_KEY, failures), + new RecordHeader(LAST_FAILURE_KEY, last) + ); + } + private void handleExplicitUserRetriableFailure(PollContextInternal context, PCRetriableException e) { logUserFunctionException(e); markRecordsFailed(context.getWorkContainers(), e); diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java index 84d928b27..4c9134791 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java @@ -12,7 +12,9 @@ import io.confluent.parallelconsumer.state.WorkManager; import org.junit.jupiter.api.Test; +import java.time.Clock; import java.util.List; +import java.util.Optional; import java.util.function.Function; import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.SHUTDOWN; @@ -39,7 +41,7 @@ private void run(TerminalFailureReaction shutdown, Function r = new UserFunctionRunner<>(mock); + UserFunctionRunner r = new UserFunctionRunner<>(mock, Clock.systemUTC(), Optional.empty()); var workFor = ModelUtils.createWorkFor(0); r.runUserFunction(fake, o -> { From 901baf5de61062b9698ae1fc35aeb110b1ac00f4 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 20 May 2022 13:14:16 +0100 Subject: [PATCH 2/7] START: DLQ draft --- .../ParallelConsumerOptions.java | 25 +-- .../AbstractParallelEoSStreamProcessor.java | 143 +++++++++--------- 2 files changed, 85 insertions(+), 83 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index a3753707b..d875d70c4 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -226,6 +226,19 @@ public boolean isUsingBatching() { private final TerminalFailureReaction terminalFailureReaction; + public enum TerminalFailureReaction { + SHUTDOWN, + SKIP, + DLQ + } + + /** + * @return the combined target of the desired concurrency by the configured batch size + */ + public int getTargetAmountOfRecordsInFlight() { + return getMaxConcurrency() * getBatchSize(); + } + public void validate() { Objects.requireNonNull(consumer, "A consumer must be supplied"); @@ -245,18 +258,6 @@ public void validate() { } - /** - * @return the combined target of the desired concurrency by the configured batch size - */ - public int getTargetAmountOfRecordsInFlight() { - return getMaxConcurrency() * getBatchSize(); - } - - public enum TerminalFailureReaction { - SHUTDOWN, - SKIP, - DLQ - } public boolean isUsingTransactionalProducer() { return commitMode.equals(PERIODIC_TRANSACTIONAL_PRODUCER); diff --git 
a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index a4c8cd3d6..a54f867bb 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -107,46 +107,32 @@ public abstract class AbstractParallelEoSStreamProcessor implements Parall private final UserFunctionRunner runner; /** - * Construct the AsyncConsumer by wrapping this passed in conusmer and producer, which can be configured any which - * way as per normal. - * - * @see ParallelConsumerOptions + * An inbound message to the controller. + *
<p>
+ * Currently, an Either type class, representing either newly polled records to ingest, or a work result. */ - public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) { - Objects.requireNonNull(newOptions, "Options must be supplied"); - - log.info("Confluent Parallel Consumer initialise... Options: {}", newOptions); - - options = newOptions; - options.validate(); - - this.dynamicExtraLoadFactor = new DynamicLoadFactor(); - this.consumer = options.getConsumer(); - - checkGroupIdConfigured(consumer); - checkNotSubscribed(consumer); - checkAutoCommitIsDisabled(consumer); - - workerThreadPool = setupWorkerPool(newOptions.getMaxConcurrency()); - - this.wm = new WorkManager(newOptions, consumer, dynamicExtraLoadFactor, TimeUtils.getClock()); + @Value + @RequiredArgsConstructor(access = PRIVATE) + // todo refactor - this has been extracted in #270 + private static class ControllerEventMessage { + WorkContainer workContainer; + EpochAndRecordsMap consumerRecords; - ConsumerManager consumerMgr = new ConsumerManager<>(consumer); + private boolean isWorkResult() { + return workContainer != null; + } - this.brokerPollSubsystem = new BrokerPollSystem<>(consumerMgr, wm, this, newOptions); + private boolean isNewConsumerRecords() { + return !isWorkResult(); + } - if (options.isProducerSupplied()) { - this.producerManager = Optional.of(new ProducerManager<>(options.getProducer(), consumerMgr, this.wm, options)); - if (options.isUsingTransactionalProducer()) - this.committer = this.producerManager.get(); - else - this.committer = this.brokerPollSubsystem; - } else { - this.producerManager = Optional.empty(); - this.committer = this.brokerPollSubsystem; + private static ControllerEventMessage of(EpochAndRecordsMap polledRecords) { + return new ControllerEventMessage<>(null, polledRecords); } - this.runner = new UserFunctionRunner(this, clock, getProducerManager()); + public static ControllerEventMessage of(WorkContainer work) { + return new ControllerEventMessage(work, null); + } } private final BrokerPollSystem brokerPollSubsystem; @@ -234,19 +220,47 @@ public Exception getFailureCause() { */ private boolean lastWorkRequestWasFulfilled = false; - private void submitWorkToPoolInner(final Function, List> usersFunction, - final Consumer callback, - final List> batch) { - // for each record, construct dispatch to the executor and capture a Future - log.trace("Sending work ({}) to pool", batch); - Future outputRecordFuture = workerThreadPool.submit(() -> { - addInstanceMDC(); - return this.runner.runUserFunction(usersFunction, callback, batch); - }); - // for a batch, each message in the batch shares the same result - for (final WorkContainer workContainer : batch) { - workContainer.setFuture(outputRecordFuture); + /** + * Construct the AsyncConsumer by wrapping this passed in conusmer and producer, which can be configured any which + * way as per normal. + * + * @see ParallelConsumerOptions + */ + public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) { + Objects.requireNonNull(newOptions, "Options must be supplied"); + + log.info("Confluent Parallel Consumer initialise... 
Options: {}", newOptions); + + options = newOptions; + options.validate(); + + this.dynamicExtraLoadFactor = new DynamicLoadFactor(); + this.consumer = options.getConsumer(); + + checkGroupIdConfigured(consumer); + checkNotSubscribed(consumer); + checkAutoCommitIsDisabled(consumer); + + workerThreadPool = setupWorkerPool(newOptions.getMaxConcurrency()); + + this.wm = new WorkManager(newOptions, consumer, dynamicExtraLoadFactor, TimeUtils.getClock()); + + ConsumerManager consumerMgr = new ConsumerManager<>(consumer); + + this.brokerPollSubsystem = new BrokerPollSystem<>(consumerMgr, wm, this, newOptions); + + if (options.isProducerSupplied()) { + this.producerManager = Optional.of(new ProducerManager<>(options.getProducer(), consumerMgr, this.wm, options)); + if (options.isUsingTransactionalProducer()) + this.committer = this.producerManager.get(); + else + this.committer = this.brokerPollSubsystem; + } else { + this.producerManager = Optional.empty(); + this.committer = this.brokerPollSubsystem; } + + this.runner = new UserFunctionRunner(this, clock, getProducerManager()); } private void checkGroupIdConfigured(final org.apache.kafka.clients.consumer.Consumer consumer) { @@ -754,32 +768,19 @@ protected void submitWorkToPool(Function, List> } } - /** - * An inbound message to the controller. - *
<p>
- * Currently, an Either type class, representing either newly polled records to ingest, or a work result. - */ - @Value - @RequiredArgsConstructor(access = PRIVATE) - // todo refactor - this has been extracted in #270 - private static class ControllerEventMessage { - WorkContainer workContainer; - EpochAndRecordsMap consumerRecords; - private boolean isWorkResult() { - return workContainer != null; - } - - private boolean isNewConsumerRecords() { - return !isWorkResult(); - } - - private static ControllerEventMessage of(EpochAndRecordsMap polledRecords) { - return new ControllerEventMessage<>(null, polledRecords); - } - - public static ControllerEventMessage of(WorkContainer work) { - return new ControllerEventMessage(work, null); + private void submitWorkToPoolInner(final Function, List> usersFunction, + final Consumer callback, + final List> batch) { + // for each record, construct dispatch to the executor and capture a Future + log.trace("Sending work ({}) to pool", batch); + Future outputRecordFuture = workerThreadPool.submit(() -> { + addInstanceMDC(); + return this.runner.runUserFunction(usersFunction, callback, batch); + }); + // for a batch, each message in the batch shares the same result + for (final WorkContainer workContainer : batch) { + workContainer.setFuture(outputRecordFuture); } } From b130f5a9cb2a1f0cc9ac8e09d2740458e63c1459 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 20 May 2022 14:13:36 +0100 Subject: [PATCH 3/7] add failure cause to headers --- .../internal/UserFunctionRunner.java | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java index e42690128..4d5262759 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java @@ -36,9 +36,14 @@ @Slf4j public class UserFunctionRunner { - private static final String FAILURE_COUNT_KEY = "pc-failure-count"; + public static final String HEADER_PREFIX = "pc-"; + + private static final String FAILURE_COUNT_KEY = HEADER_PREFIX + "failure-count"; + + private static final String LAST_FAILURE_KEY = HEADER_PREFIX + "last-failure-at"; + + private static final String FAILURE_CAUSE_KEY = HEADER_PREFIX + "last-failure-cause"; - private static final String LAST_FAILURE_KEY = "pc-last-failure-at"; private static final Serializer serializer = Serdes.String().serializer(); private AbstractParallelEoSStreamProcessor pc; @@ -165,7 +170,7 @@ private List, R>> handleUserTerminalFailure(PollC private void handleDlqReaction(PollContextInternal context, PCTerminalException userError) { try { - var producerRecords = prepareDlqMsgs(context); + var producerRecords = prepareDlqMsgs(context, userError); //noinspection OptionalGetWithoutIsPresent - presence handled in Options verifier producer.get().produceMessagesSync(producerRecords); } catch (Exception sendError) { @@ -191,9 +196,9 @@ private Throwable getRootCause(final Exception sendError) { } @SuppressWarnings("FeatureEnvy") - private List> prepareDlqMsgs(PollContextInternal context) { + private List> prepareDlqMsgs(PollContextInternal context, final PCTerminalException userError) { return context.stream().map(recordContext -> { - Iterable
<Header> headers = makeDlqHeaders(recordContext);
+            Iterable<Header> headers = makeDlqHeaders(recordContext, userError);
             Integer partition = null;
             return new ProducerRecord<>(recordContext.topic(),
                     partition,
                     recordContext.key(),
                     recordContext.value(),
                     headers);
         }).collect(Collectors.toList());
     }
 
     @SuppressWarnings("FeatureEnvy")
-    private Iterable<Header> makeDlqHeaders(RecordContext recordContext) {
+    private Iterable<Header>
makeDlqHeaders(RecordContext recordContext, final PCTerminalException userError) { int numberOfFailedAttempts = recordContext.getNumberOfFailedAttempts(); Optional lastFailureAt = recordContext.getLastFailureAt(); - byte[] failures = serializer.serialize(recordContext.topic(), Integer.toString(numberOfFailedAttempts)); + String topic = recordContext.topic(); + + byte[] failures = serializer.serialize(topic, Integer.toString(numberOfFailedAttempts)); Instant resolvedInstant = lastFailureAt.orElse(clock.instant()); - byte[] last = serializer.serialize(recordContext.topic(), resolvedInstant.toString()); + byte[] last = serializer.serialize(topic, resolvedInstant.toString()); + + byte[] cause = serializer.serialize(topic, userError.getMessage()); return UniLists.of( new RecordHeader(FAILURE_COUNT_KEY, failures), - new RecordHeader(LAST_FAILURE_KEY, last) + new RecordHeader(LAST_FAILURE_KEY, last), + new RecordHeader(FAILURE_CAUSE_KEY, cause) ); } From a91da4c6d9cdf2e3d553c9de7a6aac9fb566c662 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 20 May 2022 14:28:37 +0100 Subject: [PATCH 4/7] try to create dlq topics if missing, with server defaults --- .../internal/ProducerManager.java | 10 +++---- .../internal/UserFunctionRunner.java | 27 ++++++++++++++++++- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java index c2487efcb..1f5fc31eb 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java @@ -22,7 +22,6 @@ import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; -import java.util.ArrayList; import java.util.ConcurrentModificationException; import java.util.List; import java.util.Map; @@ -142,18 +141,15 @@ public List, Future>> produceMessages ReentrantReadWriteLock.ReadLock readLock = producerTransactionLock.readLock(); readLock.lock(); - List, Future>> futures = new ArrayList<>(outMsgs.size()); try { - for (ProducerRecord rec : outMsgs) { + return outMsgs.stream().map(rec -> { log.trace("Producing {}", rec); var future = producer.send(rec, callback); - futures.add(Tuple.pairOf(rec, future)); - } + return Tuple.pairOf(rec, future); + }).collect(Collectors.toList()); } finally { readLock.unlock(); } - - return futures; } public List> produceMessagesSyncWithContext(PollContext context, List> outMsgs) { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java index 4d5262759..3a0b48d7d 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java @@ -10,8 +10,12 @@ import lombok.AllArgsConstructor; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicExistsException; import 
org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.serialization.Serdes; @@ -31,6 +35,7 @@ import static io.confluent.csid.utils.StringUtils.msg; import static io.confluent.parallelconsumer.ParallelConsumerOptions.TerminalFailureReaction.*; import static io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.MDC_WORK_CONTAINER_DESCRIPTOR; +import static java.util.Optional.empty; @AllArgsConstructor @Slf4j @@ -46,12 +51,16 @@ public class UserFunctionRunner { private static final Serializer serializer = Serdes.String().serializer(); + private static final String DLQ_SUFFIX = ".DLQ"; + private AbstractParallelEoSStreamProcessor pc; private final Clock clock; private final Optional> producer; + private final AdminClient adminClient; + /** * Run the supplied function. @@ -172,7 +181,14 @@ private void handleDlqReaction(PollContextInternal context, PCTerminalExce try { var producerRecords = prepareDlqMsgs(context, userError); //noinspection OptionalGetWithoutIsPresent - presence handled in Options verifier - producer.get().produceMessagesSync(producerRecords); + try { + producer.get().produceMessagesSync(producerRecords); + } catch (TopicExistsException e) { + // only do this on exception, otherwise we have to check for presence every time we try to send + tryEnsureTopicExists(context); + // send again + producer.get().produceMessagesSync(producerRecords); + } } catch (Exception sendError) { InternalRuntimeError multiRoot = new InternalRuntimeError( msg("Error sending record to DLQ, while reacting to the user failure: {}", userError.getMessage()), @@ -183,6 +199,15 @@ private void handleDlqReaction(PollContextInternal context, PCTerminalExce } } + private void tryEnsureTopicExists(PollContextInternal context) { + var topics = context.getByTopicPartitionMap().keySet().stream() + .map(TopicPartition::topic) + .distinct() + .map(name -> new NewTopic(DLQ_SUFFIX + name, empty(), empty())) + .collect(Collectors.toList()); + this.adminClient.createTopics(topics); + } + private void attachRootCause(final Exception sendError, final PCTerminalException userError) { //noinspection ThrowableNotThrown Throwable root = getRootCause(sendError); From 5d517bb6f3345cb88e2c93acaf17646089f44d45 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 20 May 2022 14:41:39 +0100 Subject: [PATCH 5/7] try to create dlq topics if missing, with server defaults --- .../ParallelConsumerOptions.java | 11 +++++++++-- .../AbstractParallelEoSStreamProcessor.java | 2 +- .../internal/UserFunctionRunner.java | 18 ++++++++++++++---- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java index d875d70c4..f0c4704a4 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java @@ -9,6 +9,7 @@ import lombok.Builder; import lombok.Getter; import lombok.ToString; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.Producer; @@ -44,6 +45,11 @@ public class ParallelConsumerOptions { */ private final Producer producer; + /** + * Supplying a producer 
is only needed for some functions, which will cause an error if used when it's missing. + */ + private final AdminClient adminClient; + /** * Path to Managed executor service for Java EE */ @@ -251,8 +257,9 @@ public void validate() { WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay()); // - if (!isProducerSupplied() && getTerminalFailureReaction() == DLQ) { - throw new IllegalArgumentException(msg("Wanting to use DQL failure mode ({}) without supplying a Producer instance", + // todo should admin be required? + if (!isProducerSupplied() && getAdminClient() == null && getTerminalFailureReaction() == DLQ) { + throw new IllegalArgumentException(msg("Wanting to use DQL failure mode ({}) without supplying a either a Producer or Admin client (both are needed) instance", getTerminalFailureReaction())); } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index a54f867bb..471e8ac12 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -260,7 +260,7 @@ public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOptio this.committer = this.brokerPollSubsystem; } - this.runner = new UserFunctionRunner(this, clock, getProducerManager()); + this.runner = new UserFunctionRunner<>(this, clock, getProducerManager(), options.getAdminClient()); } private void checkGroupIdConfigured(final org.apache.kafka.clients.consumer.Consumer consumer) { diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java index 3a0b48d7d..01e5749b5 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java @@ -49,6 +49,10 @@ public class UserFunctionRunner { private static final String FAILURE_CAUSE_KEY = HEADER_PREFIX + "last-failure-cause"; + private static final String PARTITION_KEY = HEADER_PREFIX + "partition"; + + private static final String OFFSET_KEY = HEADER_PREFIX + "offset"; + private static final Serializer serializer = Serdes.String().serializer(); private static final String DLQ_SUFFIX = ".DLQ"; @@ -240,17 +244,23 @@ private Iterable
makeDlqHeaders(RecordContext recordContext, final String topic = recordContext.topic(); - byte[] failures = serializer.serialize(topic, Integer.toString(numberOfFailedAttempts)); + var failures = serializer.serialize(topic, Integer.toString(numberOfFailedAttempts)); Instant resolvedInstant = lastFailureAt.orElse(clock.instant()); - byte[] last = serializer.serialize(topic, resolvedInstant.toString()); + var last = serializer.serialize(topic, resolvedInstant.toString()); + + var cause = serializer.serialize(topic, userError.getMessage()); + + var offset = serializer.serialize(topic, String.valueOf(recordContext.offset())); - byte[] cause = serializer.serialize(topic, userError.getMessage()); + var partition = serializer.serialize(topic, String.valueOf(recordContext.partition())); return UniLists.of( new RecordHeader(FAILURE_COUNT_KEY, failures), new RecordHeader(LAST_FAILURE_KEY, last), - new RecordHeader(FAILURE_CAUSE_KEY, cause) + new RecordHeader(FAILURE_CAUSE_KEY, cause), + new RecordHeader(PARTITION_KEY, partition), + new RecordHeader(OFFSET_KEY, offset) ); } From 0175ea9173ba11dda1916936e3ba12e4b8dab613 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Fri, 20 May 2022 15:04:26 +0100 Subject: [PATCH 6/7] step --- .../parallelconsumer/internal/ProducerManager.java | 7 +++++-- .../parallelconsumer/internal/UserFunctionRunner.java | 9 ++++----- .../internal/UserFunctionRunnerTest.java | 3 ++- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java index 1f5fc31eb..05ce77d61 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java @@ -34,6 +34,10 @@ import static io.confluent.csid.utils.StringUtils.msg; /** + * Central control point for {@link KafkaProducer} where it's needed in some scenarios, like transactions. + *
<p>
+ * Also has useful wrapping methods.
+ *
<p>
* todo docs */ @Slf4j @@ -45,9 +49,8 @@ public class ProducerManager extends AbstractOffsetCommitter impleme private final boolean producerIsConfiguredForTransactions; - /** - * The {@link KafkaProducer) isn't actually completely thread safe, at least when using it transactionally. We must + * The {@link KafkaProducer} isn't actually completely thread safe, at least when using it transactionally. We must * be careful not to send messages to the producer, while we are committing a transaction - "Cannot call send in * state COMMITTING_TRANSACTION". */ diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java index 01e5749b5..af3fab38c 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/UserFunctionRunner.java @@ -8,7 +8,6 @@ import io.confluent.parallelconsumer.ParallelConsumer.Tuple; import io.confluent.parallelconsumer.state.WorkContainer; import lombok.AllArgsConstructor; -import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; @@ -218,10 +217,10 @@ private void attachRootCause(final Exception sendError, final PCTerminalExceptio root.initCause(userError); } - @NonNull - private Throwable getRootCause(final Exception sendError) { - if (sendError.getCause() == null) return sendError; - else return getRootCause(sendError); + private Throwable getRootCause(Throwable sendError) { + Throwable cause = sendError.getCause(); + if (cause == null) return sendError; + else return getRootCause(cause); } @SuppressWarnings("FeatureEnvy") diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java index 4c9134791..ab90eec1d 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/internal/UserFunctionRunnerTest.java @@ -10,6 +10,7 @@ import io.confluent.parallelconsumer.PollContextInternal; import io.confluent.parallelconsumer.state.ModelUtils; import io.confluent.parallelconsumer.state.WorkManager; +import org.apache.kafka.clients.admin.AdminClient; import org.junit.jupiter.api.Test; import java.time.Clock; @@ -41,7 +42,7 @@ private void run(TerminalFailureReaction shutdown, Function r = new UserFunctionRunner<>(mock, Clock.systemUTC(), Optional.empty()); + UserFunctionRunner r = new UserFunctionRunner<>(mock, Clock.systemUTC(), Optional.empty(), mock(AdminClient.class)); var workFor = ModelUtils.createWorkFor(0); r.runUserFunction(fake, o -> { From a656b775d08517f036e1aad7b4504a6087b6c4ad Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Sat, 21 May 2022 21:57:46 +0100 Subject: [PATCH 7/7] step --- .../internal/AbstractParallelEoSStreamProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index a54f867bb..a405109f0 100644 --- 
a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
@@ -113,7 +113,7 @@ public abstract class AbstractParallelEoSStreamProcessor implements Parall
      */
     @Value
     @RequiredArgsConstructor(access = PRIVATE)
-    // todo refactor - this has been extracted in #270
+    // todo refactor - this has been extracted in PR#270
     private static class ControllerEventMessage<K, V> {
         WorkContainer<K, V> workContainer;
         EpochAndRecordsMap<K, V> consumerRecords;
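
For illustration, a minimal usage sketch of the DLQ reaction drafted in this series. It assumes the Lombok builder on ParallelConsumerOptions shown in the diffs above; the consumer, producer and adminClient variables are hypothetical, pre-built Kafka clients that are not part of these patches:

    // Usage sketch only: the DLQ reaction is still a draft in this series.
    // `consumer`, `producer` and `adminClient` are assumed pre-configured Kafka clients.
    ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
            .consumer(consumer)
            .producer(producer)       // validate() rejects DLQ mode without a producer
            .adminClient(adminClient) // used to create missing DLQ topics with server defaults
            .terminalFailureReaction(ParallelConsumerOptions.TerminalFailureReaction.DLQ)
            .build();

With this configuration, a user function that throws PCTerminalException has its records forwarded to a dead letter topic with the pc-* failure headers attached, and the records are then treated as succeeded rather than shutting the processor down.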