diff --git a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt index b17c8cb3dd1..94b24166d11 100644 --- a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt +++ b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt @@ -110,6 +110,8 @@ internal constructor( internal abstract inner class QueueReceiver(queueName: QueueName) { val queue = queues.getForReceiving(queueName) protected val shouldKeepRunning = AtomicBoolean(true) + private var cachedConsumerCount: Int? = null + private var cacheExpiry: Instant = Instant.MIN protected abstract fun receive(): List @@ -122,8 +124,18 @@ internal constructor( log.info { "shutting down receiver for ${queue.queueName}" } return Status.NO_RESCHEDULE } - val size = sqsConsumerAllocator.computeSqsConsumersForPod(queue.name, receiverPolicy) - val futures = List(size) { CompletableFuture.supplyAsync({ receive() }, receivingThreads) } + val now = clock.instant() + val size = if (cachedConsumerCount == null || now.isAfter(cacheExpiry)) { + sqsConsumerAllocator.computeSqsConsumersForPod(queue.name, receiverPolicy).also { + cachedConsumerCount = it + cacheExpiry = now.plusSeconds(10) + } + } else { + cachedConsumerCount!! + } + val futures = List(size) { + CompletableFuture.supplyAsync({ receive() }, receivingThreads) + } // Either all messages are consumed and processed successfully, or we signal failure. // If none of the received consume any messages, return NO_WORK for backoff.