Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ internal constructor(
internal abstract inner class QueueReceiver(queueName: QueueName) {
val queue = queues.getForReceiving(queueName)
protected val shouldKeepRunning = AtomicBoolean(true)
private var cachedConsumerCount: Int? = null
private var cacheExpiry: Instant = Instant.MIN

protected abstract fun receive(): List<Status>

Expand All @@ -122,8 +124,18 @@ internal constructor(
log.info { "shutting down receiver for ${queue.queueName}" }
return Status.NO_RESCHEDULE
}
val size = sqsConsumerAllocator.computeSqsConsumersForPod(queue.name, receiverPolicy)
val futures = List(size) { CompletableFuture.supplyAsync({ receive() }, receivingThreads) }
val now = clock.instant()
val size = if (cachedConsumerCount == null || now.isAfter(cacheExpiry)) {
sqsConsumerAllocator.computeSqsConsumersForPod(queue.name, receiverPolicy).also {
cachedConsumerCount = it
cacheExpiry = now.plusSeconds(10)
}
} else {
cachedConsumerCount!!
}
val futures = List(size) {
CompletableFuture.supplyAsync({ receive() }, receivingThreads)
}

// Either all messages are consumed and processed successfully, or we signal failure.
// If none of the received consume any messages, return NO_WORK for backoff.
Expand Down
Loading