From f80d99a886682060025d2b943e39ac025c5edbda Mon Sep 17 00:00:00 2001 From: Yue Yuan Date: Fri, 19 Dec 2025 01:15:19 -0500 Subject: [PATCH] cache number of consumers in QueueReceiver --- .../main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt index d889c44204b..e4339041527 100644 --- a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt +++ b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt @@ -116,6 +116,8 @@ internal class SqsJobConsumer @Inject internal constructor( internal abstract inner class QueueReceiver(queueName: QueueName) { val queue = queues.getForReceiving(queueName) protected val shouldKeepRunning = AtomicBoolean(true) + private var cachedConsumerCount: Int? = null + private var cacheExpiry: Instant = Instant.MIN protected abstract fun receive(): List @@ -130,7 +132,15 @@ internal class SqsJobConsumer @Inject internal constructor( } return Status.NO_RESCHEDULE } - val size = sqsConsumerAllocator.computeSqsConsumersForPod(queue.name, receiverPolicy) + val now = clock.instant() + val size = if (cachedConsumerCount == null || now.isAfter(cacheExpiry)) { + sqsConsumerAllocator.computeSqsConsumersForPod(queue.name, receiverPolicy).also { + cachedConsumerCount = it + cacheExpiry = now.plusSeconds(10) + } + } else { + cachedConsumerCount!! + } val futures = List(size) { CompletableFuture.supplyAsync({ receive() }, receivingThreads) }