diff --git a/kueue-example-pg/.env b/kueue-example-pg/.env index a359ebd..c09fcfe 100644 --- a/kueue-example-pg/.env +++ b/kueue-example-pg/.env @@ -1,5 +1,5 @@ PG_USER=test PG_PASS=t3st PG_NAME=kueue -PG_PORT=5432 +PG_PORT=65432 PG_POOL=10 diff --git a/kueue-example-pg/src/main/kotlin/eu/kueue/example/pg/command/CommandProducer.kt b/kueue-example-pg/src/main/kotlin/eu/kueue/example/pg/command/CommandProducer.kt index 40d3463..0aee402 100644 --- a/kueue-example-pg/src/main/kotlin/eu/kueue/example/pg/command/CommandProducer.kt +++ b/kueue-example-pg/src/main/kotlin/eu/kueue/example/pg/command/CommandProducer.kt @@ -40,10 +40,8 @@ class CommandProducer : CliktCommand(name = "producer") { ) } - @OptIn(ExperimentalCoroutinesApi::class) private val dispatcher = Dispatchers.Default.limitedParallelism(6) - @OptIn(ExperimentalTime::class) override fun run() = runBlocking { val duration = measureTime { @@ -55,8 +53,9 @@ class CommandProducer : CliktCommand(name = "producer") { } private suspend fun sendData() = withContext(dispatcher) { + val jobs = mutableListOf() repeat(amount) { - async(dispatcher) { + launch(dispatcher) { val (topic, message) = listOf( DEFAULT_TOPIC to RecordCreated( id = it, @@ -77,7 +76,8 @@ class CommandProducer : CliktCommand(name = "producer") { ).also { logger.info { "send $message" } } - } + }.also { jobs.add(it) } } + jobs.joinAll() } } diff --git a/kueue-example-pg/src/test/kotlin/eu/kueue/example/pg/integration/IntegrationTest.kt b/kueue-example-pg/src/test/kotlin/eu/kueue/example/pg/integration/IntegrationTest.kt index e74f96f..4605ccc 100644 --- a/kueue-example-pg/src/test/kotlin/eu/kueue/example/pg/integration/IntegrationTest.kt +++ b/kueue-example-pg/src/test/kotlin/eu/kueue/example/pg/integration/IntegrationTest.kt @@ -133,7 +133,7 @@ class IntegrationTest { @EventHandler suspend fun on(event: RecordUpdated) { totalReceived++ - logger.info { "recevied $event" } + logger.info { "received $event" } if (totalReceived == stopOnCount) { consumer.stop() }