Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/main/kotlin/at/ac/uibk/dps/cirrina/Cirrina.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ class Cirrina {
}

companion object {
const val ETCD_CONNECTION_TIMEOUT = 1000L

init {
configureLogging()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@ object EnvironmentVariables {
val influxMetricOrg = EnvironmentVariable("INFLUX_METRIC_ORG", "org")
val influxMetricBucket = EnvironmentVariable("INFLUX_METRIC_BUCKET", "bucket")
val influxMetricToken = EnvironmentVariable("INFLUX_METRIC_TOKEN", "bzO10KmR8x")
val influxMetricStep = EnvironmentVariable("INFLUX_METRIC_STEP", 5000L)
val influxMetricStep = EnvironmentVariable("INFLUX_METRIC_STEP", 5000L, { it.toLong() })

val zipkinTraceUrl = EnvironmentVariable<String?>("ZIPKIN_TRACE_URL", null)

val csmMainUri = EnvironmentVariable("CSM_MAIN_URI", "file:///app/main.pkl")

val csmBarrier = EnvironmentVariable<String?>("CSM_BARRIER", null)
val csmParties = EnvironmentVariable("CSM_PARTIES", null, { it.toInt() })

val eventProvider =
EnvironmentVariable(
"EVENT_PROVIDER",
Expand Down
14 changes: 14 additions & 0 deletions src/main/kotlin/at/ac/uibk/dps/cirrina/Runtime.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package at.ac.uibk.dps.cirrina

import at.ac.uibk.dps.cirrina.cirrina.di.CsmMain
import at.ac.uibk.dps.cirrina.cirrina.di.Identifier
import at.ac.uibk.dps.cirrina.execution.`object`.Context
import at.ac.uibk.dps.cirrina.execution.`object`.ContextVariable
import at.ac.uibk.dps.cirrina.execution.`object`.Event
Expand Down Expand Up @@ -34,6 +35,7 @@ private val logger = KotlinLogging.logger {}
class Runtime
@Inject
constructor(
@Identifier private val identifier: String,
private val eventHandler: EventHandler,
private val stateMachineFactory: StateMachine.Factory,
persistentContext: Context?,
Expand Down Expand Up @@ -122,6 +124,18 @@ constructor(
stateMachineInstances[stateMachineObjectName]

fun run() = runBlocking {
val barrier = EnvironmentVariables.csmBarrier.get()
val parties = EnvironmentVariables.csmParties.get()

if (barrier != null && parties != null) {
logger.info { "waiting for barrier '$barrier' with '$parties' parties" }

eventHandler.register(barrier, identifier)
eventHandler.wait(barrier, parties)

logger.info { "barrier reached" }
}

measureTime {
stateMachineInstances.values.forEach { it.start() }

Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/at/ac/uibk/dps/cirrina/di/CirrinaComponent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package at.ac.uibk.dps.cirrina.di

import at.ac.uibk.dps.cirrina.Runtime
import at.ac.uibk.dps.cirrina.cirrina.di.CirrinaModule
import at.ac.uibk.dps.cirrina.cirrina.di.Identifier
import at.ac.uibk.dps.cirrina.execution.`object`.Context
import at.ac.uibk.dps.cirrina.execution.`object`.EventHandler
import dagger.Component
import io.micrometer.core.instrument.MeterRegistry
import jakarta.inject.Named
import jakarta.inject.Singleton

@Singleton
Expand All @@ -21,5 +21,5 @@ interface CirrinaComponent {

fun runtime(): Runtime

@Named("identifier") fun identifier(): String
@Identifier fun identifier(): String
}
14 changes: 6 additions & 8 deletions src/main/kotlin/at/ac/uibk/dps/cirrina/di/CirrinaModule.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@ import io.opentelemetry.sdk.resources.Resource
import io.opentelemetry.sdk.trace.SdkTracerProvider
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor
import io.opentelemetry.semconv.ServiceAttributes
import jakarta.inject.Named
import jakarta.inject.Qualifier
import jakarta.inject.Singleton
import java.net.URI
import java.time.Duration
import java.util.UUID
import mu.KotlinLogging

private val logger = KotlinLogging.logger {}
@Qualifier @Retention(AnnotationRetention.RUNTIME) annotation class Identifier

@Qualifier @Retention(AnnotationRetention.RUNTIME) annotation class CsmMain

Expand Down Expand Up @@ -97,7 +95,7 @@ class CirrinaModule {
@Provides
@Singleton
fun provideObservationRegistry(
@Named("identifier") identifier: String,
@Identifier identifier: String,
meterRegistry: MeterRegistry,
): ObservationRegistry {
val observationRegistry = ObservationRegistry.create()
Expand Down Expand Up @@ -141,12 +139,12 @@ class CirrinaModule {
}
}

@Provides @Singleton @Identifier fun provideIdentifier(): String = "cirrina.${UUID.randomUUID()}"

@Provides
@Singleton
@Named("identifier")
fun provideIdentifier(): String = "cirrina.${UUID.randomUUID()}"

@Provides @CsmMain fun provideCsmMain(): URI = URI(EnvironmentVariables.csmMainUri.get())
@CsmMain
fun provideCsmMain(): URI = URI(EnvironmentVariables.csmMainUri.get())

@Provides
@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ abstract class EventHandler() : AutoCloseable {

abstract fun unsubscribe(source: String)

abstract fun register(group: String, member: String)

abstract fun wait(group: String, parties: Int)

fun registerHandler(handler: PropagationHandler) {
handlers.add(handler)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,9 @@ class EventHandlerInMemory : EventHandler() {

override fun unsubscribe(source: String) {}

override fun register(barrier: String, member: String) {}

override fun wait(barrier: String, n: Int) {}

override fun close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import io.zenoh.pubsub.Subscriber
import io.zenoh.sample.Sample
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

class EventHandlerZenoh() : EventHandler() {
private val session: Session
Expand All @@ -26,7 +28,7 @@ class EventHandlerZenoh() : EventHandler() {
Config.fromFile(File(uri)).getOrThrow()
} ?: Config.default()

this.session = Zenoh.open(config).getOrThrow()
session = Zenoh.open(config).getOrThrow()
}

override fun send(event: Event) {
Expand Down Expand Up @@ -66,6 +68,54 @@ class EventHandlerZenoh() : EventHandler() {
activeSubscriptions.remove(selectorString)?.close()
}

override fun register(group: String, member: String) {
val key = "liveness/$group/$member"
val keyExpr = KeyExpr.tryFrom(key).getOrThrow()

session.liveliness().declareToken(keyExpr).onFailure {
error("failed to register liveness '$group/$member'")
}
}

override fun wait(group: String, parties: Int) {
val key = "liveness/$group/**"
val keyExpr = KeyExpr.tryFrom(key).getOrThrow()
val discoveredMembers = ConcurrentHashMap.newKeySet<String>()
val latch = CountDownLatch(parties)

val sub =
session
.liveliness()
.declareSubscriber(
KeyExpr.tryFrom(key).getOrThrow(),
callback = { sample ->
if (discoveredMembers.add(sample.keyExpr.toString())) {
latch.countDown()
}
},
)
.getOrElse({ error("failed to subscribe to liveness '$group'") })

session
.liveliness()
.get(
keyExpr,
callback = { reply ->
reply.result.onSuccess { sample ->
if (discoveredMembers.add(sample.keyExpr.toString())) {
latch.countDown()
}
}
},
)
.onFailure { error("failed to get liveness '$group'") }

if (!latch.await(30, TimeUnit.SECONDS)) {
error("timeout: ${discoveredMembers.size}/$parties members.")
}
sub.close()
}

private fun getZenohPath(event: Event): String? {
return when (event.channel) {
Csml.EventChannel.EXTERNAL -> event.source.let { "$it/${event.topic}" }
Expand Down
4 changes: 4 additions & 0 deletions src/test/kotlin/at/ac/uibk/dps/cirrina/di/TestModule.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package at.ac.uibk.dps.cirrina.di

import at.ac.uibk.dps.cirrina.cirrina.di.CsmMain
import at.ac.uibk.dps.cirrina.cirrina.di.Identifier
import at.ac.uibk.dps.cirrina.execution.`object`.ActionCommandFactory
import at.ac.uibk.dps.cirrina.execution.`object`.ActionCommandFactoryImpl
import at.ac.uibk.dps.cirrina.execution.`object`.Context
Expand All @@ -12,6 +13,7 @@ import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import io.micrometer.observation.ObservationRegistry
import jakarta.inject.Singleton
import java.net.URI
import java.util.UUID

@Module
class TestModule(
Expand All @@ -28,6 +30,8 @@ class TestModule(

@Provides fun provideObservationRegistry(): ObservationRegistry = ObservationRegistry.create()

@Provides @Singleton @Identifier fun provideIdentifier(): String = "cirrina.${UUID.randomUUID()}"

@Provides @CsmMain fun provideCsmMain() = mainUri

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,43 @@ package at.ac.uibk.dps.cirrina.execution.provider

import at.ac.uibk.dps.cirrina.execution.`object`.EventHandler
import at.ac.uibk.dps.cirrina.execution.`object`.EventHandlerTest
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

class EventHandlerZenohTest : EventHandlerTest() {
override fun createEventHandler(): EventHandler = EventHandlerZenoh()

@Test
fun testRegisterWait() = runBlocking {
val parties = 10
val group = "group"
val completedParties = AtomicInteger(0)

val jobs =
(1..parties).map { i ->
launch(Dispatchers.IO) {
createEventHandler().use { handler ->
val memberName = "member-$i"

if (i > parties / 2) delay(100)

handler.register(group, memberName)
handler.wait(group, parties)

completedParties.incrementAndGet()
}
}
}

withTimeout(35000) { jobs.joinAll() }

assertEquals(parties, completedParties.get())
}
}