diff --git a/README.md b/README.md index 6535b9a..149082f 100644 --- a/README.md +++ b/README.md @@ -61,15 +61,34 @@ producer.send( Setup a listener to process the events ```kotlin +/** + * Process messages one by one + */ class TestListener : EventListener { @EventHandler fun on(event: RecordCreated) { logger.info { "handle record created $event" } } + + @EventHandler + fun on(event: RecordCreated) { + logger.info { "handle record created $event" } + } +} + +/** + * Batch process messages + */ +class BatchListener : EventListener { + @EventHandler + fun on(event: List) { + logger.info { "handle record created ${event.size}" } + } } val listeners = listOf( - TestListener() + TestListener(), + BatchListener(), ) val consumer = PgConsumer( diff --git a/build.gradle.kts b/build.gradle.kts index 0908e79..d35fcf5 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -30,6 +30,7 @@ allprojects { val detekt = rootProject.libs.versions.detekt.get() detektPlugins("io.gitlab.arturbosch.detekt:detekt-formatting:$detekt") implementation(platform("org.jetbrains.kotlin:kotlin-bom")) + testImplementation(kotlin("test")) testImplementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") testImplementation(rootProject.libs.junit) } diff --git a/kueue-core/build.gradle.kts b/kueue-core/build.gradle.kts index 5c42d2e..aca1601 100644 --- a/kueue-core/build.gradle.kts +++ b/kueue-core/build.gradle.kts @@ -1,8 +1,6 @@ -plugins { - `java-library` -} - dependencies { implementation(libs.kotlinReflect) implementation(libs.kotlinxJson) + testImplementation(libs.kotlinxCoroutinesTest) + testImplementation(libs.kotlinReflect) } diff --git a/kueue-core/src/main/kotlin/eu/kueue/Consumer.kt b/kueue-core/src/main/kotlin/eu/kueue/Consumer.kt index 91cd746..3062d16 100644 --- a/kueue-core/src/main/kotlin/eu/kueue/Consumer.kt +++ b/kueue-core/src/main/kotlin/eu/kueue/Consumer.kt @@ -28,5 +28,4 @@ suspend inline fun Consumer.subscribe( topic: String, batchSize: Int, listeners: List, -) = - subscribe(topic, batchSize, listeners, T::class) +) = subscribe(topic, batchSize, listeners, T::class) diff --git a/kueue-core/src/main/kotlin/eu/kueue/EventListener.kt b/kueue-core/src/main/kotlin/eu/kueue/EventListener.kt index 7cb561a..b66a760 100644 --- a/kueue-core/src/main/kotlin/eu/kueue/EventListener.kt +++ b/kueue-core/src/main/kotlin/eu/kueue/EventListener.kt @@ -1,8 +1,6 @@ package eu.kueue -import kotlin.reflect.KClass -import kotlin.reflect.KFunction -import kotlin.reflect.KParameter +import kotlin.reflect.* import kotlin.reflect.full.callSuspend import kotlin.reflect.full.declaredFunctions import kotlin.reflect.jvm.jvmErasure @@ -12,29 +10,41 @@ interface EventListener data class CallableListener( val listener: EventListener, val method: KFunction<*>, - val firstArgumentType: KClass<*>, + val firstParameter: Parameter, ) { suspend fun processMessage(message: Message) { method.callSuspend(listener, message) } + + suspend fun processMessages(messages: List) { + method.callSuspend(listener, messages) + } +} + +@JvmInline +value class Parameter(val value: KType) { + fun type(): KClassifier? = value.classifier + fun isList(): Boolean = value.classifier == List::class + fun listType(): KClassifier? = value.arguments.firstOrNull()?.type?.classifier } fun List.eventHandlers() = this.flatMap { listener -> - listener::class.declaredFunctions.filter { func -> - func.annotations.any { annotation -> - annotation is EventHandler - } - }.map { method -> - CallableListener( - listener = listener, - method = method, - firstArgumentType = method.firstArgumentType() - ) + listener.eventHandlers() + } + +fun EventListener.eventHandlers(): List = + this::class.declaredFunctions.filter { func -> + func.annotations.any { annotation -> + annotation is EventHandler } + }.map { method -> + CallableListener( + listener = this, + method = method, + firstParameter = Parameter(method.firstParameterType()), + ) } -private fun KFunction<*>.firstArgumentType(): KClass<*> = - parameters.first { - it.kind == KParameter.Kind.VALUE - }.type.jvmErasure +fun KFunction<*>.firstParameterType(): KType = + parameters.first { it.kind == KParameter.Kind.VALUE }.type diff --git a/kueue-core/src/main/kotlin/eu/kueue/RetryStrategy.kt b/kueue-core/src/main/kotlin/eu/kueue/RetryStrategy.kt index cb6d7ad..e7c6b27 100644 --- a/kueue-core/src/main/kotlin/eu/kueue/RetryStrategy.kt +++ b/kueue-core/src/main/kotlin/eu/kueue/RetryStrategy.kt @@ -1,12 +1,9 @@ package eu.kueue -typealias MessageProcessor = suspend (message: T) -> Unit - typealias RetryPredicate = (cause: Throwable) -> Boolean -interface RetryStrategy { +fun interface RetryStrategy { suspend fun runWithRetry( - message: T, - processMessage: MessageProcessor, - ) + action: suspend () -> T + ): Result } diff --git a/kueue-core/src/test/kotlin/eu/kueue/EventListenerTest.kt b/kueue-core/src/test/kotlin/eu/kueue/EventListenerTest.kt new file mode 100644 index 0000000..ef8f1d4 --- /dev/null +++ b/kueue-core/src/test/kotlin/eu/kueue/EventListenerTest.kt @@ -0,0 +1,43 @@ +package eu.kueue + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class EventListenerTest { + + @Test + fun `get parameter type of listener`() { + val listener = SingleEventListener() + assertEquals(RecordCreated::class, listener.eventHandlers().first().firstParameter.type()) + } + + @Test + fun `get parameter type of batch listener`() { + val listener = BatchEventListener() + val handler = listener.eventHandlers().first() + assertTrue { + handler.firstParameter.isList() + } + + assertEquals(RecordCreated::class, handler.firstParameter.listType()) + } +} + +data class RecordCreated( + val id: Int, +) : Message + +class SingleEventListener : EventListener { + @EventHandler + suspend fun handle(event: RecordCreated) { + println(event.id) + } +} + +class BatchEventListener : EventListener { + @EventHandler + suspend fun handle(event: List) { + println(event) + } +} diff --git a/kueue-example-pg/src/test/kotlin/eu/kueue/example/pg/integration/IntegrationTest.kt b/kueue-example-pg/src/test/kotlin/eu/kueue/example/pg/integration/IntegrationTest.kt index 4605ccc..3d43934 100644 --- a/kueue-example-pg/src/test/kotlin/eu/kueue/example/pg/integration/IntegrationTest.kt +++ b/kueue-example-pg/src/test/kotlin/eu/kueue/example/pg/integration/IntegrationTest.kt @@ -2,6 +2,7 @@ package eu.kueue.example.pg.integration import eu.kueue.* import eu.kueue.example.pg.SerializerType +import eu.kueue.example.pg.message.RecordCreated import eu.kueue.example.pg.message.RecordUpdated import eu.kueue.example.pg.serializer import eu.kueue.pg.vertx.PgConsumer @@ -9,7 +10,9 @@ import eu.kueue.pg.vertx.PgProducer import io.vertx.pgclient.PgBuilder import io.vertx.pgclient.PgConnectOptions import io.vertx.sqlclient.PoolOptions +import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout import mu.KotlinLogging import org.junit.jupiter.api.* import org.junit.jupiter.api.Assertions.assertEquals @@ -19,7 +22,9 @@ import org.testcontainers.containers.wait.strategy.Wait import org.testcontainers.junit.jupiter.Container import org.testcontainers.junit.jupiter.Testcontainers import org.testcontainers.utility.DockerImageName -import java.lang.System.getenv +import kotlin.concurrent.timer +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds private const val PG_USER = "test" private const val PG_PASS = "t3st" @@ -87,11 +92,6 @@ class IntegrationTest { serializer = serializer, ) - private val listener = CountListener( - consumer = consumer, - stopOnCount = messageCount, - ) - @Test @Order(1) fun `test producer send`() = runBlocking { @@ -115,13 +115,52 @@ class IntegrationTest { @Test @Order(2) fun `test consumer receive`() = runBlocking { + val listener = CountListener( + consumer = consumer, + stopOnCount = messageCount, + ) consumer.subscribe( topic = TOPIC, batchSize = 8, listeners = listOf(listener) ) - consumer.start() - assertEquals(messageCount, listener.totalReceived) + consumeWithTimeout(consumer) { + assertEquals(messageCount, listener.totalReceived) + } + } + + @Test + @Order(3) + fun `test consumer receive bulk`() = runBlocking { + val sendMessageCount = 100 + val producer = PgProducer( + client = pool, + serializer = serializer, + ) + + assertDoesNotThrow { + repeat(sendMessageCount) { + val message = RecordCreated( + id = it, + title = "test $sendMessageCount" + ) + producer.send(TOPIC, message) + } + } + + val listener = CountListener( + consumer = consumer, + stopOnCount = messageCount, + ) + consumer.subscribe( + topic = TOPIC, + batchSize = 10, + listeners = listOf(listener) + ) + consumeWithTimeout(consumer) { + assertEquals(sendMessageCount, listener.totalReceived) + assertEquals(10, listener.listenerCalled) + } } class CountListener( @@ -129,10 +168,22 @@ class IntegrationTest { private val stopOnCount: Int, ) : EventListener { var totalReceived = 0 + var listenerCalled = 0 @EventHandler suspend fun on(event: RecordUpdated) { totalReceived++ + listenerCalled++ + logger.info { "received $event" } + if (totalReceived == stopOnCount) { + consumer.stop() + } + } + + @EventHandler + suspend fun on(event: List) { + totalReceived += event.size + listenerCalled++ logger.info { "received $event" } if (totalReceived == stopOnCount) { consumer.stop() @@ -140,3 +191,18 @@ class IntegrationTest { } } } + +suspend fun consumeWithTimeout( + consumer: Consumer, + timeout: Duration = 15.seconds, + block: suspend () -> Unit +) = + try { + withTimeout(timeout) { + consumer.start() + block() + } + } catch (e: TimeoutCancellationException) { + consumer.stop() + throw e + } diff --git a/kueue-pg-vertx/src/main/kotlin/eu/kueue/pg/vertx/PgConsumer.kt b/kueue-pg-vertx/src/main/kotlin/eu/kueue/pg/vertx/PgConsumer.kt index a41b7ca..81e7cf3 100644 --- a/kueue-pg-vertx/src/main/kotlin/eu/kueue/pg/vertx/PgConsumer.kt +++ b/kueue-pg-vertx/src/main/kotlin/eu/kueue/pg/vertx/PgConsumer.kt @@ -8,10 +8,7 @@ import io.vertx.sqlclient.Row import io.vertx.sqlclient.RowSet import io.vertx.sqlclient.SqlConnection import io.vertx.sqlclient.Tuple -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext +import kotlinx.coroutines.* import mu.KotlinLogging import kotlin.reflect.KClass import kotlin.time.Duration @@ -24,7 +21,7 @@ class PgConsumer( private val serializer: MessageSerializer, limitedParallelism: Int = 4, private val pollRetryDelay: Duration = 5.seconds, - private val retryStrategy: RetryStrategy = TimeoutRetryStrategy(), + private val retryStrategy: RetryStrategy = TimeoutRetryStrategy(), ) : Consumer { @OptIn(kotlinx.coroutines.ExperimentalCoroutinesApi::class) @@ -89,22 +86,48 @@ class PgConsumer( if (jobs.isEmpty()) { hasJobs = false } else { - jobs.forEach { message -> - subscription.listeners.forEach { callable -> - if (callable.firstArgumentType == message::class) { - retryStrategy.runWithRetry( - message, - callable::processMessage, - ) - } - } - } + runJobsWithRetry(subscription, jobs) } } } } while (hasJobs) } + private suspend fun runJobsWithRetry( + subscription: Subscription, + jobs: List, + ) { + val (batchListener, listener) = subscription.listeners.partition { + it.firstParameter.isList() + } + + // handle batch jobs + batchListener.forEach { batch -> + val type = batch.firstParameter.listType() + val batchJobs = jobs.filter { type == it::class } + if (batchJobs.isNotEmpty()) { + retryStrategy.runWithRetry { + batch::processMessages.invoke(batchJobs) + }.onFailure { + logger.error(it) { "failed to process batch job" } + } + } + } + + // handle single message processor + jobs.forEach { message -> + listener.forEach { callable -> + if (callable.firstParameter.type() == message::class) { + retryStrategy.runWithRetry { + callable::processMessage.invoke(message) + }.onFailure { + logger.error(it) { "failed to process message" } + } + } + } + } + } + private suspend fun pickJobs( topic: String, limit: Int, diff --git a/kueue-retry/src/main/kotlin/eu/kueue/retry/TimeoutRetryStrategy.kt b/kueue-retry/src/main/kotlin/eu/kueue/retry/TimeoutRetryStrategy.kt index c7535ad..1ec934f 100644 --- a/kueue-retry/src/main/kotlin/eu/kueue/retry/TimeoutRetryStrategy.kt +++ b/kueue-retry/src/main/kotlin/eu/kueue/retry/TimeoutRetryStrategy.kt @@ -2,7 +2,6 @@ package eu.kueue.retry -import eu.kueue.MessageProcessor import eu.kueue.RetryPredicate import eu.kueue.RetryStrategy import kotlinx.coroutines.delay @@ -17,41 +16,45 @@ class TimeoutRetryStrategy( private val retries: Int = 5, private val timeout: Duration = 5.seconds, ) : RetryStrategy { - override suspend fun runWithRetry(message: T, processMessage: MessageProcessor) = retryWithTimeOut( + override suspend fun runWithRetry(action: suspend () -> T) = retryWithTimeOut( predicate = predicate, retries = retries, timeout = timeout, - message = message, ) { - processMessage(message) + action() } } +@Suppress("ThrowsCount") suspend inline fun retryWithTimeOut( predicate: RetryPredicate = { it is Exception }, retries: Int = 5, timeout: Duration = 5.seconds, - message: T, - processor: MessageProcessor, -) { + action: () -> T, +): Result { for (i in 0..retries) { return try { - processor(message) + runCatching { + action() + }.onFailure { + throw it + } } catch (e: Throwable) { when { !predicate(e) -> throw e i < retries -> { val logger = KotlinLogging.logger { } - logger.error(e) { "retry attempt $i wait $timeout : $message" } + logger.error(e) { "retry attempt $i wait $timeout" } delay(timeout) continue } - else -> Unit + else -> Result.failure(e) } } } + error("Retry failure") } diff --git a/kueue-retry/src/test/kotlin/eu/kueue/retry/TimeoutRetryStrategyTest.kt b/kueue-retry/src/test/kotlin/eu/kueue/retry/TimeoutRetryStrategyTest.kt index 21490ae..22be848 100644 --- a/kueue-retry/src/test/kotlin/eu/kueue/retry/TimeoutRetryStrategyTest.kt +++ b/kueue-retry/src/test/kotlin/eu/kueue/retry/TimeoutRetryStrategyTest.kt @@ -2,7 +2,6 @@ package eu.kueue.retry -import eu.kueue.Message import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -10,25 +9,16 @@ import org.junit.jupiter.api.Test @kotlinx.coroutines.ExperimentalCoroutinesApi class TimeoutRetryStrategyTest { - data class TestMessage( - val id: Int, - ) : Message - - private val message = TestMessage(10) - @Test fun `test retry with timeout`() { runTest { val retry = 5 - var attemps = 0 - retryWithTimeOut( - message = message, - retries = retry, - ) { - attemps++ + var attempts = 0 + retryWithTimeOut(retries = retry) { + attempts++ throw Exception("oh no") } - assertEquals(retry + 1, attemps) + assertEquals(retry + 1, attempts) } } @@ -36,10 +26,7 @@ class TimeoutRetryStrategyTest { fun `test 0 retries`() { runTest { var attempts = 0 - retryWithTimeOut( - message = message, - retries = 0, - ) { + retryWithTimeOut(retries = 0) { attempts++ throw Exception("oh no") }