Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,34 @@ producer.send(
Setup a listener to process the events

```kotlin
/**
* Process messages one by one
*/
class TestListener : EventListener {
@EventHandler
fun on(event: RecordCreated) {
logger.info { "handle record created $event" }
}

@EventHandler
fun on(event: RecordCreated) {
logger.info { "handle record created $event" }
}
}

/**
* Batch process messages
*/
class BatchListener : EventListener {
@EventHandler
fun on(event: List<RecordCreated>) {
logger.info { "handle record created ${event.size}" }
}
}

val listeners = listOf(
TestListener()
TestListener(),
BatchListener(),
)

val consumer = PgConsumer(
Expand Down
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ allprojects {
val detekt = rootProject.libs.versions.detekt.get()
detektPlugins("io.gitlab.arturbosch.detekt:detekt-formatting:$detekt")
implementation(platform("org.jetbrains.kotlin:kotlin-bom"))
testImplementation(kotlin("test"))
testImplementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
testImplementation(rootProject.libs.junit)
}
Expand Down
6 changes: 2 additions & 4 deletions kueue-core/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
plugins {
`java-library`
}

dependencies {
implementation(libs.kotlinReflect)
implementation(libs.kotlinxJson)
testImplementation(libs.kotlinxCoroutinesTest)
testImplementation(libs.kotlinReflect)
}
3 changes: 1 addition & 2 deletions kueue-core/src/main/kotlin/eu/kueue/Consumer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,4 @@ suspend inline fun <reified T : Message> Consumer.subscribe(
topic: String,
batchSize: Int,
listeners: List<EventListener>,
) =
subscribe(topic, batchSize, listeners, T::class)
) = subscribe(topic, batchSize, listeners, T::class)
46 changes: 28 additions & 18 deletions kueue-core/src/main/kotlin/eu/kueue/EventListener.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package eu.kueue

import kotlin.reflect.KClass
import kotlin.reflect.KFunction
import kotlin.reflect.KParameter
import kotlin.reflect.*
import kotlin.reflect.full.callSuspend
import kotlin.reflect.full.declaredFunctions
import kotlin.reflect.jvm.jvmErasure
Expand All @@ -12,29 +10,41 @@ interface EventListener
data class CallableListener(
val listener: EventListener,
val method: KFunction<*>,
val firstArgumentType: KClass<*>,
val firstParameter: Parameter,
) {
suspend fun processMessage(message: Message) {
method.callSuspend(listener, message)
}

suspend fun processMessages(messages: List<Message>) {
method.callSuspend(listener, messages)
}
}

@JvmInline
value class Parameter(val value: KType) {
fun type(): KClassifier? = value.classifier
fun isList(): Boolean = value.classifier == List::class
fun listType(): KClassifier? = value.arguments.firstOrNull()?.type?.classifier
}

fun List<EventListener>.eventHandlers() =
this.flatMap { listener ->
listener::class.declaredFunctions.filter { func ->
func.annotations.any { annotation ->
annotation is EventHandler
}
}.map { method ->
CallableListener(
listener = listener,
method = method,
firstArgumentType = method.firstArgumentType()
)
listener.eventHandlers()
}

fun EventListener.eventHandlers(): List<CallableListener> =
this::class.declaredFunctions.filter { func ->
func.annotations.any { annotation ->
annotation is EventHandler
}
}.map { method ->
CallableListener(
listener = this,
method = method,
firstParameter = Parameter(method.firstParameterType()),
)
}

private fun KFunction<*>.firstArgumentType(): KClass<*> =
parameters.first {
it.kind == KParameter.Kind.VALUE
}.type.jvmErasure
fun KFunction<*>.firstParameterType(): KType =
parameters.first { it.kind == KParameter.Kind.VALUE }.type
9 changes: 3 additions & 6 deletions kueue-core/src/main/kotlin/eu/kueue/RetryStrategy.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package eu.kueue

typealias MessageProcessor<T> = suspend (message: T) -> Unit

typealias RetryPredicate = (cause: Throwable) -> Boolean

interface RetryStrategy<T> {
fun interface RetryStrategy<T> {
suspend fun runWithRetry(
message: T,
processMessage: MessageProcessor<T>,
)
action: suspend () -> T
): Result<T>
}
43 changes: 43 additions & 0 deletions kueue-core/src/test/kotlin/eu/kueue/EventListenerTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package eu.kueue

import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue

class EventListenerTest {

@Test
fun `get parameter type of listener`() {
val listener = SingleEventListener()
assertEquals(RecordCreated::class, listener.eventHandlers().first().firstParameter.type())
}

@Test
fun `get parameter type of batch listener`() {
val listener = BatchEventListener()
val handler = listener.eventHandlers().first()
assertTrue {
handler.firstParameter.isList()
}

assertEquals(RecordCreated::class, handler.firstParameter.listType())
}
}

data class RecordCreated(
val id: Int,
) : Message

class SingleEventListener : EventListener {
@EventHandler
suspend fun handle(event: RecordCreated) {
println(event.id)
}
}

class BatchEventListener : EventListener {
@EventHandler
suspend fun handle(event: List<RecordCreated>) {
println(event)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package eu.kueue.example.pg.integration

import eu.kueue.*
import eu.kueue.example.pg.SerializerType
import eu.kueue.example.pg.message.RecordCreated
import eu.kueue.example.pg.message.RecordUpdated
import eu.kueue.example.pg.serializer
import eu.kueue.pg.vertx.PgConsumer
import eu.kueue.pg.vertx.PgProducer
import io.vertx.pgclient.PgBuilder
import io.vertx.pgclient.PgConnectOptions
import io.vertx.sqlclient.PoolOptions
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import mu.KotlinLogging
import org.junit.jupiter.api.*
import org.junit.jupiter.api.Assertions.assertEquals
Expand All @@ -19,7 +22,9 @@ import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.lang.System.getenv
import kotlin.concurrent.timer
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

private const val PG_USER = "test"
private const val PG_PASS = "t3st"
Expand Down Expand Up @@ -87,11 +92,6 @@ class IntegrationTest {
serializer = serializer,
)

private val listener = CountListener(
consumer = consumer,
stopOnCount = messageCount,
)

@Test
@Order(1)
fun `test producer send`() = runBlocking {
Expand All @@ -115,28 +115,94 @@ class IntegrationTest {
@Test
@Order(2)
fun `test consumer receive`() = runBlocking {
val listener = CountListener(
consumer = consumer,
stopOnCount = messageCount,
)
consumer.subscribe<Message>(
topic = TOPIC,
batchSize = 8,
listeners = listOf(listener)
)
consumer.start()
assertEquals(messageCount, listener.totalReceived)
consumeWithTimeout(consumer) {
assertEquals(messageCount, listener.totalReceived)
}
}

@Test
@Order(3)
fun `test consumer receive bulk`() = runBlocking {
val sendMessageCount = 100
val producer = PgProducer(
client = pool,
serializer = serializer,
)

assertDoesNotThrow {
repeat(sendMessageCount) {
val message = RecordCreated(
id = it,
title = "test $sendMessageCount"
)
producer.send(TOPIC, message)
}
}

val listener = CountListener(
consumer = consumer,
stopOnCount = messageCount,
)
consumer.subscribe<Message>(
topic = TOPIC,
batchSize = 10,
listeners = listOf(listener)
)
consumeWithTimeout(consumer) {
assertEquals(sendMessageCount, listener.totalReceived)
assertEquals(10, listener.listenerCalled)
}
}

class CountListener(
private val consumer: Consumer,
private val stopOnCount: Int,
) : EventListener {
var totalReceived = 0
var listenerCalled = 0

@EventHandler
suspend fun on(event: RecordUpdated) {
totalReceived++
listenerCalled++
logger.info { "received $event" }
if (totalReceived == stopOnCount) {
consumer.stop()
}
}

@EventHandler
suspend fun on(event: List<RecordCreated>) {
totalReceived += event.size
listenerCalled++
logger.info { "received $event" }
if (totalReceived == stopOnCount) {
consumer.stop()
}
}
}
}

suspend fun consumeWithTimeout(
consumer: Consumer,
timeout: Duration = 15.seconds,
block: suspend () -> Unit
) =
try {
withTimeout(timeout) {
consumer.start()
block()
}
} catch (e: TimeoutCancellationException) {
consumer.stop()
throw e
}
Loading