diff --git a/app/src/main/java/eu/darken/bluemusic/monitor/core/service/MonitorOrchestrator.kt b/app/src/main/java/eu/darken/bluemusic/monitor/core/service/MonitorOrchestrator.kt new file mode 100644 index 00000000..93227f5d --- /dev/null +++ b/app/src/main/java/eu/darken/bluemusic/monitor/core/service/MonitorOrchestrator.kt @@ -0,0 +1,149 @@ +package eu.darken.bluemusic.monitor.core.service + +import eu.darken.bluemusic.bluetooth.core.BluetoothRepo +import eu.darken.bluemusic.bluetooth.core.currentState +import eu.darken.bluemusic.common.debug.logging.Logging.Priority.INFO +import eu.darken.bluemusic.common.debug.logging.Logging.Priority.VERBOSE +import eu.darken.bluemusic.common.debug.logging.Logging.Priority.WARN +import eu.darken.bluemusic.common.debug.logging.asLog +import eu.darken.bluemusic.common.debug.logging.log +import eu.darken.bluemusic.common.debug.logging.logTag +import eu.darken.bluemusic.common.flow.setupCommonEventHandlers +import eu.darken.bluemusic.common.flow.throttleLatest +import eu.darken.bluemusic.devices.core.DeviceRepo +import eu.darken.bluemusic.devices.core.ManagedDevice +import eu.darken.bluemusic.devices.core.currentDevices +import eu.darken.bluemusic.monitor.core.audio.RingerModeObserver +import eu.darken.bluemusic.monitor.core.audio.VolumeObserver +import eu.darken.bluemusic.monitor.core.ownership.AudioStreamOwnerRegistry +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import java.time.Duration +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class MonitorOrchestrator @Inject constructor( + private val bluetoothRepo: BluetoothRepo, + private val deviceRepo: DeviceRepo, + private val volumeObserver: VolumeObserver, + private val ringerModeObserver: RingerModeObserver, + private val bluetoothEventQueue: BluetoothEventQueue, + private val eventDispatcher: EventDispatcher, + private val ringerModeTransitionHandler: RingerModeTransitionHandler, + private val ownerRegistry: AudioStreamOwnerRegistry, + private val volumeEventDispatcher: VolumeEventDispatcher, +) { + + /** + * Runs the monitoring pipeline. Suspends until a shutdown condition is met, then returns normally. + * The caller is responsible for acting on the return (e.g., stopping the service). + * + * @param scope the scope in which observer flows are launched. An internal child [Job] is used + * for lifecycle control — the caller's scope is never cancelled by this method. + * @param onActiveDevicesChanged called whenever the active device list changes, including the + * initial snapshot. The caller typically updates a notification. + */ + suspend fun monitor( + scope: CoroutineScope, + onActiveDevicesChanged: suspend (List) -> Unit, + ) { + val bluetoothState = bluetoothRepo.currentState() + if (!bluetoothState.isReady) { + log(TAG, WARN) { "Aborting, Bluetooth state is not ready: $bluetoothState" } + return + } + + val initialDevices = deviceRepo.currentDevices() + ownerRegistry.reset() + ownerRegistry.bootstrap(initialDevices) + + onActiveDevicesChanged(initialDevices.filter { it.isActive }) + + val monitorJob = Job(scope.coroutineContext[Job]) + val monitorScope = CoroutineScope(scope.coroutineContext + monitorJob) + + ringerModeObserver.ringerMode + .setupCommonEventHandlers(TAG) { "RingerMode monitor" } + .distinctUntilChanged() + .onEach { ringerModeTransitionHandler.handle(it) } + .catch { log(TAG, WARN) { "RingerMode monitor flow failed:\n${it.asLog()}" } } + .launchIn(monitorScope) + + volumeObserver.volumes + .setupCommonEventHandlers(TAG) { "Volume monitor" } + .distinctUntilChanged() + .onEach { volumeEventDispatcher.dispatch(it) } + .catch { log(TAG, WARN) { "Volume monitor flow failed:\n${it.asLog()}" } } + .launchIn(monitorScope) + + bluetoothEventQueue.events + .setupCommonEventHandlers(TAG) { "Event monitor" } + .onEach { event -> + log(TAG, INFO) { "START Handling bluetooth event: $event" } + eventDispatcher.dispatch(event) + log(TAG, INFO) { "STOP Handling bluetooth event: $event" } + } + .catch { log(TAG, WARN) { "Event monitor flow failed:\n${it.asLog()}" } } + .launchIn(monitorScope) + + val deviceMonitorJob = deviceRepo.devices + .setupCommonEventHandlers(TAG) { "Devices monitor" } + .distinctUntilChanged() + .throttleLatest(3000) + .flatMapLatest { devices -> + val activeDevices = devices.filter { it.isActive } + + log(TAG) { "monitor: Currently active devices: ${activeDevices.map { "${it.address}/${it.label}" }}" } + onActiveDevicesChanged(activeDevices) + + val stayActive = activeDevices.any { it.requiresMonitor } + + when { + activeDevices.isNotEmpty() && stayActive -> { + log(TAG) { "Staying connected for active devices." } + emptyFlow() + } + + activeDevices.isNotEmpty() -> flow { + log(TAG) { "There are active devices but we don't need to stay active for them." } + val maxMonitoringDuration = activeDevices.maxOf { it.monitoringDuration } + log(TAG) { "Maximum monitoring duration: $maxMonitoringDuration" } + val toDelay = Duration.ofSeconds(15) + maxMonitoringDuration + delay(toDelay.toMillis()) + log(TAG) { "Stopping now, nothing changed." } + monitorJob.cancel() + } + + else -> flow { + log(TAG) { "No devices connected, stopping soon" } + delay(15 * 1000) + log(TAG) { "Stopping now, still no devices connected." } + monitorJob.cancel() + } + } + } + .catch { log(TAG, WARN) { "Monitor flow failed:\n${it.asLog()}" } } + .launchIn(monitorScope) + + try { + log(TAG, VERBOSE) { "Monitor job is active" } + deviceMonitorJob.join() + log(TAG, VERBOSE) { "Monitor job quit" } + } finally { + monitorJob.cancel() + } + } + + companion object { + private val TAG = logTag("Monitor", "Orchestrator") + } +} diff --git a/app/src/main/java/eu/darken/bluemusic/monitor/core/service/MonitorService.kt b/app/src/main/java/eu/darken/bluemusic/monitor/core/service/MonitorService.kt index a2c5b72c..af2f0eef 100644 --- a/app/src/main/java/eu/darken/bluemusic/monitor/core/service/MonitorService.kt +++ b/app/src/main/java/eu/darken/bluemusic/monitor/core/service/MonitorService.kt @@ -8,51 +8,27 @@ import android.content.Intent import android.content.IntentFilter import android.content.pm.ServiceInfo import android.os.IBinder -import android.util.SparseArray import androidx.core.content.ContextCompat -import androidx.core.util.size import dagger.hilt.android.AndroidEntryPoint -import eu.darken.bluemusic.bluetooth.core.BluetoothRepo -import eu.darken.bluemusic.bluetooth.core.currentState import eu.darken.bluemusic.common.coroutine.DispatcherProvider import eu.darken.bluemusic.common.debug.logging.Logging.Priority.ERROR -import eu.darken.bluemusic.common.debug.logging.Logging.Priority.INFO import eu.darken.bluemusic.common.debug.logging.Logging.Priority.VERBOSE import eu.darken.bluemusic.common.debug.logging.Logging.Priority.WARN import eu.darken.bluemusic.common.debug.logging.asLog import eu.darken.bluemusic.common.debug.logging.log import eu.darken.bluemusic.common.debug.logging.logTag -import eu.darken.bluemusic.common.flow.setupCommonEventHandlers -import eu.darken.bluemusic.common.flow.throttleLatest import eu.darken.bluemusic.common.hasApiLevel +import eu.darken.bluemusic.devices.core.ManagedDevice import eu.darken.bluemusic.common.ui.Service2 -import eu.darken.bluemusic.devices.core.DeviceRepo -import eu.darken.bluemusic.devices.core.currentDevices -import eu.darken.bluemusic.monitor.core.audio.RingerModeObserver -import eu.darken.bluemusic.monitor.core.audio.VolumeEvent -import eu.darken.bluemusic.monitor.core.audio.VolumeObserver -import eu.darken.bluemusic.monitor.core.modules.VolumeModule import eu.darken.bluemusic.monitor.core.ownership.AudioStreamOwnerRegistry import eu.darken.bluemusic.monitor.ui.MonitorNotifications import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll import kotlinx.coroutines.cancel import kotlinx.coroutines.cancelChildren -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.catch -import kotlinx.coroutines.flow.distinctUntilChanged -import kotlinx.coroutines.flow.emptyFlow -import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -import java.time.Duration import javax.inject.Inject @AndroidEntryPoint @@ -61,14 +37,8 @@ class MonitorService : Service2() { @Inject lateinit var dispatcherProvider: DispatcherProvider @Inject lateinit var notifications: MonitorNotifications @Inject lateinit var notificationManager: NotificationManager - @Inject lateinit var deviceRepo: DeviceRepo - @Inject lateinit var bluetoothRepo: BluetoothRepo - @Inject lateinit var volumeModuleMap: Set<@JvmSuppressWildcards VolumeModule> - @Inject lateinit var volumeObserver: VolumeObserver - @Inject lateinit var ringerModeObserver: RingerModeObserver - @Inject lateinit var bluetoothEventQueue: BluetoothEventQueue + @Inject lateinit var orchestrator: MonitorOrchestrator @Inject lateinit var eventDispatcher: EventDispatcher - @Inject lateinit var ringerModeTransitionHandler: RingerModeTransitionHandler @Inject lateinit var ownerRegistry: AudioStreamOwnerRegistry private val serviceScope by lazy { @@ -141,7 +111,7 @@ class MonitorService : Service2() { monitoringJob = serviceScope.launch { try { - startMonitoring() + orchestrator.monitor(serviceScope, ::updateNotification) } catch (_: CancellationException) { log(TAG) { "Monitor cancelled." } } catch (e: Exception) { @@ -176,90 +146,11 @@ class MonitorService : Service2() { super.onDestroy() } - private suspend fun startMonitoring() { - val bluetoothState = bluetoothRepo.currentState() - if (!bluetoothState.isReady) { - log(TAG, WARN) { "Aborting, Bluetooth state is not ready: $bluetoothState" } - return - } - - ownerRegistry.reset() - ownerRegistry.bootstrap(deviceRepo.currentDevices()) - + private suspend fun updateNotification(activeDevices: List) { notificationManager.notify( MonitorNotifications.NOTIFICATION_ID, - notifications.getDevicesNotification(deviceRepo.currentDevices().filter { it.isActive }), + notifications.getDevicesNotification(activeDevices), ) - - ringerModeObserver.ringerMode - .setupCommonEventHandlers(TAG) { "RingerMode monitor" } - .distinctUntilChanged() - .onEach { ringerModeTransitionHandler.handle(it) } - .catch { log(TAG, WARN) { "RingerMode monitor flow failed:\n${it.asLog()}" } } - .launchIn(serviceScope) - - volumeObserver.volumes - .setupCommonEventHandlers(TAG) { "Volume monitor" } - .distinctUntilChanged() - .onEach { handleVolumeChange(it) } - .catch { log(TAG, WARN) { "Volume monitor flow failed:\n${it.asLog()}" } } - .launchIn(serviceScope) - - bluetoothEventQueue.events - .setupCommonEventHandlers(TAG) { "Event monitor" } - .onEach { event -> - log(TAG, INFO) { "START Handling bluetooth event: $event" } - eventDispatcher.dispatch(event) - log(TAG, INFO) { "STOP Handling bluetooth event: $event" } - } - .catch { log(TAG, WARN) { "Event monitor flow failed:\n${it.asLog()}" } } - .launchIn(serviceScope) - - val monitorJob = deviceRepo.devices - .setupCommonEventHandlers(TAG) { "Devices monitor" } - .distinctUntilChanged() - .throttleLatest(3000) - .flatMapLatest { devices -> - val activeDevices = devices.filter { it.isActive } - - log(TAG) { "monitorJob: Currently active devices: ${activeDevices.map { "${it.address}/${it.label}" }}" } - notificationManager.notify( - MonitorNotifications.NOTIFICATION_ID, - notifications.getDevicesNotification(activeDevices), - ) - - val stayActive = activeDevices.any { it.requiresMonitor } - - when { - activeDevices.isNotEmpty() && stayActive -> { - log(TAG) { "Staying connected for active devices." } - emptyFlow() - } - - activeDevices.isNotEmpty() -> flow { - log(TAG) { "There are active devices but we don't need to stay active for them." } - val maxMonitoringDuration = activeDevices.maxOf { it.monitoringDuration } - log(TAG) { "Maximum monitoring duration: $maxMonitoringDuration" } - val toDelay = Duration.ofSeconds(15) + maxMonitoringDuration - delay(toDelay.toMillis()) - log(TAG) { "Stopping service now, nothing changed." } - serviceScope.coroutineContext.cancelChildren() - } - - else -> flow { - log(TAG) { "No devices connected, stopping soon" } - delay(15 * 1000) - log(TAG) { "Stopping service now, still no devices connected." } - serviceScope.coroutineContext.cancelChildren() - } - } - } - .catch { log(TAG, WARN) { "Monitor flow failed:\n${it.asLog()}" } } - .launchIn(serviceScope) - - log(TAG, VERBOSE) { "Monitor job is active" } - monitorJob.join() - log(TAG, VERBOSE) { "Monitor job quit" } } private val stopMonitorReceiver = object : BroadcastReceiver() { @@ -269,41 +160,6 @@ class MonitorService : Service2() { } } - private suspend fun handleVolumeChange(event: VolumeEvent) { - val priorityArray = SparseArray>() - - for (module in volumeModuleMap) { - val priority = module.priority - var list = priorityArray.get(priority) - if (list == null) { - list = ArrayList() - priorityArray.put(priority, list) - } - list.add(module) - } - - for (i in 0 until priorityArray.size) { - val currentPriorityModules = priorityArray.get(priorityArray.keyAt(i)) - log(TAG, VERBOSE) { - "handleVolume: ${currentPriorityModules.size} modules at priority ${priorityArray.keyAt(i)}" - } - - coroutineScope { - currentPriorityModules.map { module -> - async { - try { - log(TAG, VERBOSE) { "handleVolume: ${module.tag} HANDLE-START" } - module.handle(event) - log(TAG, VERBOSE) { "handleVolume: ${module.tag} HANDLE-STOP" } - } catch (e: Exception) { - log(TAG, ERROR) { "handleVolume: error: ${module.tag}: ${e.asLog()}" } - } - } - }.awaitAll() - } - } - } - companion object { val TAG = logTag("Monitor", "Service") private const val EXTRA_FORCE_START = "extra.force_start" diff --git a/app/src/main/java/eu/darken/bluemusic/monitor/core/service/VolumeEventDispatcher.kt b/app/src/main/java/eu/darken/bluemusic/monitor/core/service/VolumeEventDispatcher.kt new file mode 100644 index 00000000..01631522 --- /dev/null +++ b/app/src/main/java/eu/darken/bluemusic/monitor/core/service/VolumeEventDispatcher.kt @@ -0,0 +1,51 @@ +package eu.darken.bluemusic.monitor.core.service + +import eu.darken.bluemusic.common.debug.logging.Logging.Priority.ERROR +import eu.darken.bluemusic.common.debug.logging.Logging.Priority.VERBOSE +import eu.darken.bluemusic.common.debug.logging.asLog +import eu.darken.bluemusic.common.debug.logging.log +import eu.darken.bluemusic.common.debug.logging.logTag +import eu.darken.bluemusic.monitor.core.audio.VolumeEvent +import eu.darken.bluemusic.monitor.core.modules.VolumeModule +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.CancellationException +import javax.inject.Inject +import javax.inject.Singleton + +@Singleton +class VolumeEventDispatcher @Inject constructor( + private val volumeModuleMap: Set<@JvmSuppressWildcards VolumeModule>, +) { + + suspend fun dispatch(event: VolumeEvent) { + val modulesByPriority = volumeModuleMap + .groupBy { it.priority } + .toSortedMap() + + for ((priority, modules) in modulesByPriority) { + log(TAG, VERBOSE) { "dispatch: ${modules.size} modules at priority $priority" } + + coroutineScope { + modules.map { module -> + async { + try { + log(TAG, VERBOSE) { "dispatch: ${module.tag} HANDLE-START" } + module.handle(event) + log(TAG, VERBOSE) { "dispatch: ${module.tag} HANDLE-STOP" } + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + log(TAG, ERROR) { "dispatch: error: ${module.tag}: ${e.asLog()}" } + } + } + }.awaitAll() + } + } + } + + companion object { + private val TAG = logTag("Monitor", "Volume", "Dispatcher") + } +} diff --git a/app/src/test/java/eu/darken/bluemusic/monitor/core/service/MonitorOrchestratorTest.kt b/app/src/test/java/eu/darken/bluemusic/monitor/core/service/MonitorOrchestratorTest.kt new file mode 100644 index 00000000..606a396a --- /dev/null +++ b/app/src/test/java/eu/darken/bluemusic/monitor/core/service/MonitorOrchestratorTest.kt @@ -0,0 +1,298 @@ +package eu.darken.bluemusic.monitor.core.service + +import eu.darken.bluemusic.bluetooth.core.BluetoothRepo +import eu.darken.bluemusic.devices.core.DeviceRepo +import eu.darken.bluemusic.devices.core.ManagedDevice +import eu.darken.bluemusic.monitor.core.audio.RingerModeEvent +import eu.darken.bluemusic.monitor.core.audio.RingerModeObserver +import eu.darken.bluemusic.monitor.core.audio.VolumeEvent +import eu.darken.bluemusic.monitor.core.audio.VolumeObserver +import eu.darken.bluemusic.monitor.core.ownership.AudioStreamOwnerRegistry +import io.kotest.matchers.shouldBe +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.launch +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import testhelpers.BaseTest +import java.time.Duration + +class MonitorOrchestratorTest : BaseTest() { + + private lateinit var bluetoothRepo: BluetoothRepo + private lateinit var deviceRepo: DeviceRepo + private lateinit var volumeObserver: VolumeObserver + private lateinit var ringerModeObserver: RingerModeObserver + private lateinit var bluetoothEventQueue: BluetoothEventQueue + private lateinit var eventDispatcher: EventDispatcher + private lateinit var ringerModeTransitionHandler: RingerModeTransitionHandler + private lateinit var ownerRegistry: AudioStreamOwnerRegistry + private lateinit var volumeEventDispatcher: VolumeEventDispatcher + + private lateinit var devicesFlow: MutableStateFlow> + private lateinit var stateFlow: MutableStateFlow + + @BeforeEach + fun setup() { + stateFlow = MutableStateFlow( + BluetoothRepo.State(isEnabled = true, hasPermission = true, devices = emptySet()) + ) + bluetoothRepo = mockk { every { state } returns stateFlow } + + devicesFlow = MutableStateFlow(emptyList()) + deviceRepo = mockk(relaxed = true) { + every { devices } returns devicesFlow + } + + volumeObserver = mockk { every { volumes } returns MutableSharedFlow() } + ringerModeObserver = mockk { every { ringerMode } returns MutableSharedFlow() } + bluetoothEventQueue = mockk { every { events } returns MutableSharedFlow() } + eventDispatcher = mockk(relaxed = true) + ringerModeTransitionHandler = mockk(relaxed = true) + ownerRegistry = mockk(relaxed = true) + volumeEventDispatcher = mockk(relaxed = true) + } + + private fun createOrchestrator() = MonitorOrchestrator( + bluetoothRepo = bluetoothRepo, + deviceRepo = deviceRepo, + volumeObserver = volumeObserver, + ringerModeObserver = ringerModeObserver, + bluetoothEventQueue = bluetoothEventQueue, + eventDispatcher = eventDispatcher, + ringerModeTransitionHandler = ringerModeTransitionHandler, + ownerRegistry = ownerRegistry, + volumeEventDispatcher = volumeEventDispatcher, + ) + + private fun managedDevice( + address: String, + active: Boolean = true, + requiresMonitor: Boolean = false, + monitoringDuration: Duration = Duration.ZERO, + ): ManagedDevice = mockk(relaxed = true) { + every { this@mockk.address } returns address + every { isActive } returns active + every { this@mockk.requiresMonitor } returns requiresMonitor + every { this@mockk.monitoringDuration } returns monitoringDuration + every { label } returns address + } + + // --- Bootstrap --- + + @Test + fun `bluetooth not ready - returns immediately`() = runTest { + stateFlow.value = BluetoothRepo.State(isEnabled = false, hasPermission = true, devices = emptySet()) + + val callbackInvocations = mutableListOf>() + val orchestrator = createOrchestrator() + + orchestrator.monitor(this) { callbackInvocations.add(it) } + advanceUntilIdle() + + callbackInvocations shouldBe emptyList() + coVerify(exactly = 0) { ownerRegistry.reset() } + } + + @Test + fun `bootstrap uses single device snapshot for reset, bootstrap, and callback`() = runTest { + val device = managedDevice("AA:BB:CC:DD:EE:FF", active = true) + devicesFlow.value = listOf(device) + + val callbackInvocations = mutableListOf>() + val orchestrator = createOrchestrator() + + orchestrator.monitor(this) { callbackInvocations.add(it) } + advanceUntilIdle() + + coVerify(exactly = 1) { ownerRegistry.reset() } + coVerify(exactly = 1) { ownerRegistry.bootstrap(listOf(device)) } + callbackInvocations.first() shouldBe listOf(device) + } + + // --- Shutdown heuristics --- + + @Test + fun `requiresMonitor devices keep monitoring alive`() = runTest { + val device = managedDevice("AA:BB:CC:DD:EE:FF", active = true, requiresMonitor = true) + devicesFlow.value = listOf(device) + + val orchestrator = createOrchestrator() + var monitorReturned = false + + val job = launch { + orchestrator.monitor(this@runTest) {} + monitorReturned = true + } + + advanceTimeBy(60_000) + monitorReturned shouldBe false + + job.cancel() + } + + @Test + fun `active devices without requiresMonitor - stops after grace period`() = runTest { + val monitoringDuration = Duration.ofSeconds(5) + val device = managedDevice( + "AA:BB:CC:DD:EE:FF", + active = true, + requiresMonitor = false, + monitoringDuration = monitoringDuration, + ) + devicesFlow.value = listOf(device) + + val orchestrator = createOrchestrator() + var monitorReturned = false + + val job = launch { + orchestrator.monitor(this@runTest) {} + monitorReturned = true + } + + // 15s grace + 5s monitoring = 20s (throttleLatest emits first value immediately) + advanceTimeBy(19_000) + monitorReturned shouldBe false + + advanceTimeBy(2_000) + advanceUntilIdle() + monitorReturned shouldBe true + + job.cancel() + } + + @Test + fun `no devices connected - stops after 15s`() = runTest { + devicesFlow.value = emptyList() + + val orchestrator = createOrchestrator() + var monitorReturned = false + + val job = launch { + orchestrator.monitor(this@runTest) {} + monitorReturned = true + } + + // 15s grace (throttleLatest emits first value immediately) + advanceTimeBy(14_000) + monitorReturned shouldBe false + + advanceTimeBy(2_000) + advanceUntilIdle() + monitorReturned shouldBe true + + job.cancel() + } + + @Test + fun `device list changes mid-monitoring fires callback with updated list`() = runTest { + val device1 = managedDevice("AA:BB:CC:DD:EE:FF", active = true, requiresMonitor = true) + devicesFlow.value = listOf(device1) + + val callbackInvocations = mutableListOf>() + val orchestrator = createOrchestrator() + + val job = launch { + orchestrator.monitor(this@runTest) { callbackInvocations.add(it) } + } + + advanceTimeBy(5_000) + + val device2 = managedDevice("11:22:33:44:55:66", active = true, requiresMonitor = true) + devicesFlow.value = listOf(device1, device2) + advanceTimeBy(5_000) + + // Initial callback + at least one update + (callbackInvocations.size >= 2) shouldBe true + + job.cancel() + } + + // --- Failure isolation --- + + @Test + fun `ringerMode handler crash does not break device monitoring`() = runTest { + val ringerFlow = MutableSharedFlow() + ringerModeObserver = mockk { every { ringerMode } returns ringerFlow } + + coEvery { ringerModeTransitionHandler.handle(any()) } throws RuntimeException("boom") + + val device = managedDevice("AA:BB:CC:DD:EE:FF", active = true, requiresMonitor = true) + devicesFlow.value = listOf(device) + + val callbackInvocations = mutableListOf>() + val orchestrator = createOrchestrator() + + val job = launch { + orchestrator.monitor(this@runTest) { callbackInvocations.add(it) } + } + + advanceTimeBy(1_000) + ringerFlow.emit(mockk(relaxed = true)) + advanceTimeBy(5_000) + + // Device monitoring still worked (initial callback fired) + callbackInvocations.isNotEmpty() shouldBe true + + job.cancel() + } + + @Test + fun `volume dispatcher crash does not break device monitoring`() = runTest { + val volumeFlow = MutableSharedFlow() + volumeObserver = mockk { every { volumes } returns volumeFlow } + + coEvery { volumeEventDispatcher.dispatch(any()) } throws RuntimeException("boom") + + val device = managedDevice("AA:BB:CC:DD:EE:FF", active = true, requiresMonitor = true) + devicesFlow.value = listOf(device) + + val callbackInvocations = mutableListOf>() + val orchestrator = createOrchestrator() + + val job = launch { + orchestrator.monitor(this@runTest) { callbackInvocations.add(it) } + } + + advanceTimeBy(1_000) + volumeFlow.emit(mockk(relaxed = true)) + advanceTimeBy(5_000) + + callbackInvocations.isNotEmpty() shouldBe true + + job.cancel() + } + + @Test + fun `bluetooth event dispatcher crash does not break device monitoring`() = runTest { + val btEventFlow = MutableSharedFlow() + bluetoothEventQueue = mockk { every { events } returns btEventFlow } + + coEvery { eventDispatcher.dispatch(any()) } throws RuntimeException("boom") + + val device = managedDevice("AA:BB:CC:DD:EE:FF", active = true, requiresMonitor = true) + devicesFlow.value = listOf(device) + + val callbackInvocations = mutableListOf>() + val orchestrator = createOrchestrator() + + val job = launch { + orchestrator.monitor(this@runTest) { callbackInvocations.add(it) } + } + + advanceTimeBy(1_000) + btEventFlow.emit(mockk(relaxed = true)) + advanceTimeBy(5_000) + + callbackInvocations.isNotEmpty() shouldBe true + + job.cancel() + } +} diff --git a/app/src/test/java/eu/darken/bluemusic/monitor/core/service/VolumeEventDispatcherTest.kt b/app/src/test/java/eu/darken/bluemusic/monitor/core/service/VolumeEventDispatcherTest.kt new file mode 100644 index 00000000..8bc231d9 --- /dev/null +++ b/app/src/test/java/eu/darken/bluemusic/monitor/core/service/VolumeEventDispatcherTest.kt @@ -0,0 +1,92 @@ +package eu.darken.bluemusic.monitor.core.service + +import eu.darken.bluemusic.monitor.core.audio.AudioStream +import eu.darken.bluemusic.monitor.core.audio.VolumeEvent +import eu.darken.bluemusic.monitor.core.modules.VolumeModule +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.matchers.shouldBe +import io.mockk.coEvery +import io.mockk.coVerify +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import testhelpers.BaseTest + +class VolumeEventDispatcherTest : BaseTest() { + + private val testEvent = VolumeEvent( + streamId = AudioStream.Id.STREAM_MUSIC, + oldVolume = 5, + newVolume = 10, + self = false, + ) + + private fun module(priority: Int, tag: String): VolumeModule = mockk(relaxed = true) { + every { this@mockk.priority } returns priority + every { this@mockk.tag } returns tag + } + + @Test + fun `modules are executed in priority order`() = runTest { + val executionOrder = mutableListOf() + + val high = module(1, "High") + coEvery { high.handle(any()) } coAnswers { executionOrder.add("high") } + + val low = module(10, "Low") + coEvery { low.handle(any()) } coAnswers { executionOrder.add("low") } + + val dispatcher = VolumeEventDispatcher(setOf(low, high)) + + dispatcher.dispatch(testEvent) + + executionOrder shouldBe listOf("high", "low") + } + + @Test + fun `same-priority modules run in parallel`() = runTest { + val moduleA = module(10, "A") + val moduleB = module(10, "B") + + val dispatcher = VolumeEventDispatcher(setOf(moduleA, moduleB)) + + dispatcher.dispatch(testEvent) + + coVerify(exactly = 1) { moduleA.handle(testEvent) } + coVerify(exactly = 1) { moduleB.handle(testEvent) } + } + + @Test + fun `module exception does not break other modules at same priority`() = runTest { + val failing = module(10, "Failing") + coEvery { failing.handle(any()) } throws RuntimeException("boom") + + val healthy = module(10, "Healthy") + + val dispatcher = VolumeEventDispatcher(setOf(failing, healthy)) + + dispatcher.dispatch(testEvent) + + coVerify(exactly = 1) { healthy.handle(testEvent) } + } + + @Test + fun `CancellationException is rethrown`() = runTest { + val cancelling = module(10, "Cancelling") + coEvery { cancelling.handle(any()) } throws CancellationException("cancelled") + + val dispatcher = VolumeEventDispatcher(setOf(cancelling)) + + shouldThrow { + dispatcher.dispatch(testEvent) + } + } + + @Test + fun `empty module set is a no-op`() = runTest { + val dispatcher = VolumeEventDispatcher(emptySet()) + dispatcher.dispatch(testEvent) + } +}