From 6b0bdcae5dd24389817c9659708a51f41fa7f9f2 Mon Sep 17 00:00:00 2001 From: Hamdan Javeed Date: Mon, 15 Apr 2024 17:02:54 -0400 Subject: [PATCH 1/2] Make AwsSqsJobHandlerSubscriptionService public... ... to allow usage in the service dependency graph Users may wish to let other Services depend on the AWS JobHandler Subscription service in cases where you want to ensure that SQS job handling has begun before your service has started. --- .../src/main/kotlin/misk/jobqueue/sqs/AwsSqsJobHandlerModule.kt | 2 +- .../kotlin/misk/jobqueue/sqs/AwsSqsQueueAttributeImporter.kt | 2 +- misk-aws/src/main/kotlin/misk/jobqueue/sqs/QueueResolver.kt | 2 +- misk-aws/src/main/kotlin/misk/jobqueue/sqs/ResolvedQueue.kt | 2 +- misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt | 2 +- misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsMetrics.kt | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/AwsSqsJobHandlerModule.kt b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/AwsSqsJobHandlerModule.kt index 38b933103c2..53b9d1716a5 100644 --- a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/AwsSqsJobHandlerModule.kt +++ b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/AwsSqsJobHandlerModule.kt @@ -72,7 +72,7 @@ class AwsSqsJobHandlerModule private constructor( } @Singleton -internal class AwsSqsJobHandlerSubscriptionService @Inject constructor( +class AwsSqsJobHandlerSubscriptionService @Inject internal constructor( private val attributeImporter: AwsSqsQueueAttributeImporter, private val consumer: SqsJobConsumer, private val consumerMapping: Map, diff --git a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/AwsSqsQueueAttributeImporter.kt b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/AwsSqsQueueAttributeImporter.kt index b792271f575..1c14c272c0d 100644 --- a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/AwsSqsQueueAttributeImporter.kt +++ b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/AwsSqsQueueAttributeImporter.kt @@ -11,7 +11,7 @@ import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean import jakarta.inject.Inject -internal class AwsSqsQueueAttributeImporter @Inject constructor( +class AwsSqsQueueAttributeImporter @Inject internal constructor( private val config: AwsSqsJobQueueConfig, private val leaseManager: LeaseManager, private val metrics: SqsMetrics, diff --git a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/QueueResolver.kt b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/QueueResolver.kt index ec47e9a3906..a88019e84b7 100644 --- a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/QueueResolver.kt +++ b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/QueueResolver.kt @@ -13,7 +13,7 @@ import jakarta.inject.Inject import jakarta.inject.Singleton @Singleton -internal class QueueResolver @Inject internal constructor( +class QueueResolver @Inject internal constructor( private val currentRegion: AwsRegion, private val currentAccount: AwsAccountId, private val defaultSQS: AmazonSQS, diff --git a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/ResolvedQueue.kt b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/ResolvedQueue.kt index a2ff768636d..f1a51c5e113 100644 --- a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/ResolvedQueue.kt +++ b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/ResolvedQueue.kt @@ -7,7 +7,7 @@ import misk.cloud.aws.AwsRegion import misk.jobqueue.QueueName /** [ResolvedQueue] provides information needed to reach an SQS queue */ -internal class ResolvedQueue( +class ResolvedQueue internal constructor( val name: QueueName, val sqsQueueName: QueueName, val url: String, diff --git a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt index 0cd7480edf5..5613557efc8 100644 --- a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt +++ b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsJobConsumer.kt @@ -31,7 +31,7 @@ import com.google.inject.Provider import jakarta.inject.Singleton @Singleton -internal class SqsJobConsumer @Inject internal constructor( +class SqsJobConsumer @Inject internal constructor( @ForSqsHandling private val handlingThreads: ExecutorService, @ForSqsHandling private val taskQueue: RepeatedTaskQueue, @ForSqsReceiving private val receivingThreads: ExecutorService, diff --git a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsMetrics.kt b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsMetrics.kt index 6b48f1b0490..748df7b3da4 100644 --- a/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsMetrics.kt +++ b/misk-aws/src/main/kotlin/misk/jobqueue/sqs/SqsMetrics.kt @@ -11,7 +11,7 @@ import jakarta.inject.Singleton * for queues both client-side and on SQS with the same label. */ @Singleton -internal class SqsMetrics @Inject internal constructor(metrics: Metrics) { +class SqsMetrics @Inject internal constructor(metrics: Metrics) { val jobsEnqueued = metrics.counter( "jobs_enqueued_total", "total # of jobs sent to a queueName", From 8e32fde56092f83104713cbc9b153c987d92e29b Mon Sep 17 00:00:00 2001 From: Hamdan Javeed Date: Mon, 15 Apr 2024 17:15:44 -0400 Subject: [PATCH 2/2] fix: include public API changes to misk-aws --- misk-aws/api/misk-aws.api | 66 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/misk-aws/api/misk-aws.api b/misk-aws/api/misk-aws.api index bb57330f3f9..bc5cef9393e 100644 --- a/misk-aws/api/misk-aws.api +++ b/misk-aws/api/misk-aws.api @@ -56,6 +56,9 @@ public final class misk/jobqueue/sqs/AwsSqsJobHandlerModule$Companion { public static synthetic fun create$default (Lmisk/jobqueue/sqs/AwsSqsJobHandlerModule$Companion;Lmisk/jobqueue/QueueName;Lkotlin/reflect/KClass;ZLjava/util/List;ILjava/lang/Object;)Lmisk/jobqueue/sqs/AwsSqsJobHandlerModule; } +public final class misk/jobqueue/sqs/AwsSqsJobHandlerSubscriptionService : com/google/common/util/concurrent/AbstractIdleService { +} + public final class misk/jobqueue/sqs/AwsSqsJobQueueConfig : wisp/config/Config { public fun ()V public fun (Ljava/util/Map;)V @@ -104,6 +107,16 @@ public final class misk/jobqueue/sqs/AwsSqsJobReceiverPolicy : java/lang/Enum { public static fun values ()[Lmisk/jobqueue/sqs/AwsSqsJobReceiverPolicy; } +public final class misk/jobqueue/sqs/AwsSqsQueueAttributeImporter { + public static final field Companion Lmisk/jobqueue/sqs/AwsSqsQueueAttributeImporter$Companion; + public final fun getRunning ()Ljava/util/concurrent/atomic/AtomicBoolean; + public final fun import (Lmisk/jobqueue/QueueName;)V + public final fun shutDown ()V +} + +public final class misk/jobqueue/sqs/AwsSqsQueueAttributeImporter$Companion { +} + public final class misk/jobqueue/sqs/AwsSqsQueueConfig { public fun ()V public fun (Ljava/lang/String;)V @@ -187,12 +200,38 @@ public final class misk/jobqueue/sqs/FlaggedBufferedSqsClient$Companion { public final fun getFEATURE ()Lmisk/feature/Feature; } +public final class misk/jobqueue/sqs/QueueResolver { + public static final field Companion Lmisk/jobqueue/sqs/QueueResolver$Companion; + public final fun getDeadLetter (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/sqs/ResolvedQueue; + public final fun getForReceiving (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/sqs/ResolvedQueue; + public final fun getForSending (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/sqs/ResolvedQueue; +} + +public final class misk/jobqueue/sqs/QueueResolver$Companion { + public final fun getLog ()Lmu/KLogger; +} + public final class misk/jobqueue/sqs/QueueResolverKt { public static final fun getParentQueue (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/QueueName; public static final fun getRetryQueue (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/QueueName; public static final fun isRetryQueue (Lmisk/jobqueue/QueueName;)Z } +public final class misk/jobqueue/sqs/ResolvedQueue { + public final fun call (Lkotlin/jvm/functions/Function1;)Ljava/lang/Object; + public final fun getAccountId ()Lmisk/cloud/aws/AwsAccountId; + public final fun getClient ()Lcom/amazonaws/services/sqs/AmazonSQS; + public final fun getName ()Lmisk/jobqueue/QueueName; + public final fun getQueueName ()Ljava/lang/String; + public final fun getRegion ()Lmisk/cloud/aws/AwsRegion; + public final fun getSqsQueueName ()Lmisk/jobqueue/QueueName; + public final fun getUrl ()Ljava/lang/String; +} + +public final class misk/jobqueue/sqs/ResolvedQueue$SQSException : java/lang/RuntimeException { + public fun (Lcom/amazonaws/AmazonClientException;Lmisk/jobqueue/sqs/ResolvedQueue;)V +} + public final class misk/jobqueue/sqs/SqsConsumerAllocator { public static final field Companion Lmisk/jobqueue/sqs/SqsConsumerAllocator$Companion; public fun (Lwisp/lease/LeaseManager;Lmisk/feature/FeatureFlags;)V @@ -203,6 +242,33 @@ public final class misk/jobqueue/sqs/SqsConsumerAllocator$Companion { public final fun leaseName (Lmisk/jobqueue/QueueName;I)Ljava/lang/String; } +public final class misk/jobqueue/sqs/SqsJobConsumer : misk/jobqueue/JobConsumer { + public static final field Companion Lmisk/jobqueue/sqs/SqsJobConsumer$Companion; + public final fun shutDown ()V + public fun subscribe (Lmisk/jobqueue/QueueName;Lmisk/jobqueue/JobHandler;)V + public fun unsubscribe (Lmisk/jobqueue/QueueName;)V +} + +public final class misk/jobqueue/sqs/SqsJobConsumer$Companion { +} + +public final class misk/jobqueue/sqs/SqsMetrics { + public final fun getHandlerDispatchTime ()Lmisk/metrics/Histogram; + public final fun getHandlerFailures ()Lio/prometheus/client/Counter; + public final fun getJobEnqueueFailures ()Lio/prometheus/client/Counter; + public final fun getJobsAcknowledged ()Lio/prometheus/client/Counter; + public final fun getJobsDeadLettered ()Lio/prometheus/client/Counter; + public final fun getJobsEnqueued ()Lio/prometheus/client/Counter; + public final fun getJobsReceived ()Lio/prometheus/client/Counter; + public final fun getQueueProcessingLag ()Lmisk/metrics/Histogram; + public final fun getSqsApproxNumberOfMessages ()Lio/prometheus/client/Gauge; + public final fun getSqsApproxNumberOfMessagesNotVisible ()Lio/prometheus/client/Gauge; + public final fun getSqsDeleteTime ()Lmisk/metrics/Histogram; + public final fun getSqsReceiveTime ()Lmisk/metrics/Histogram; + public final fun getSqsSendTime ()Lmisk/metrics/Histogram; + public final fun getVisibilityTime ()Lio/prometheus/client/Gauge; +} + public final class misk/jobqueue/sqs/StaticDeadLetterQueueProvider : misk/jobqueue/sqs/DeadLetterQueueProvider { public fun (Ljava/lang/String;)V public fun deadLetterQueueFor (Lmisk/jobqueue/QueueName;)Lmisk/jobqueue/QueueName;