Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package eu.darken.bluemusic.monitor.core.service

import eu.darken.bluemusic.bluetooth.core.BluetoothRepo
import eu.darken.bluemusic.bluetooth.core.currentState
import eu.darken.bluemusic.common.debug.logging.Logging.Priority.INFO
import eu.darken.bluemusic.common.debug.logging.Logging.Priority.VERBOSE
import eu.darken.bluemusic.common.debug.logging.Logging.Priority.WARN
import eu.darken.bluemusic.common.debug.logging.asLog
import eu.darken.bluemusic.common.debug.logging.log
import eu.darken.bluemusic.common.debug.logging.logTag
import eu.darken.bluemusic.common.flow.setupCommonEventHandlers
import eu.darken.bluemusic.common.flow.throttleLatest
import eu.darken.bluemusic.devices.core.DeviceRepo
import eu.darken.bluemusic.devices.core.ManagedDevice
import eu.darken.bluemusic.devices.core.currentDevices
import eu.darken.bluemusic.monitor.core.audio.RingerModeObserver
import eu.darken.bluemusic.monitor.core.audio.VolumeObserver
import eu.darken.bluemusic.monitor.core.ownership.AudioStreamOwnerRegistry
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import java.time.Duration
import javax.inject.Inject
import javax.inject.Singleton

@Singleton
class MonitorOrchestrator @Inject constructor(
private val bluetoothRepo: BluetoothRepo,
private val deviceRepo: DeviceRepo,
private val volumeObserver: VolumeObserver,
private val ringerModeObserver: RingerModeObserver,
private val bluetoothEventQueue: BluetoothEventQueue,
private val eventDispatcher: EventDispatcher,
private val ringerModeTransitionHandler: RingerModeTransitionHandler,
private val ownerRegistry: AudioStreamOwnerRegistry,
private val volumeEventDispatcher: VolumeEventDispatcher,
) {

/**
* Runs the monitoring pipeline. Suspends until a shutdown condition is met, then returns normally.
* The caller is responsible for acting on the return (e.g., stopping the service).
*
* @param scope the scope in which observer flows are launched. An internal child [Job] is used
* for lifecycle control — the caller's scope is never cancelled by this method.
* @param onActiveDevicesChanged called whenever the active device list changes, including the
* initial snapshot. The caller typically updates a notification.
*/
suspend fun monitor(
scope: CoroutineScope,
onActiveDevicesChanged: suspend (List<ManagedDevice>) -> Unit,
) {
val bluetoothState = bluetoothRepo.currentState()
if (!bluetoothState.isReady) {
log(TAG, WARN) { "Aborting, Bluetooth state is not ready: $bluetoothState" }
return
}

val initialDevices = deviceRepo.currentDevices()
ownerRegistry.reset()
ownerRegistry.bootstrap(initialDevices)

onActiveDevicesChanged(initialDevices.filter { it.isActive })

val monitorJob = Job(scope.coroutineContext[Job])
val monitorScope = CoroutineScope(scope.coroutineContext + monitorJob)

ringerModeObserver.ringerMode
.setupCommonEventHandlers(TAG) { "RingerMode monitor" }
.distinctUntilChanged()
.onEach { ringerModeTransitionHandler.handle(it) }
.catch { log(TAG, WARN) { "RingerMode monitor flow failed:\n${it.asLog()}" } }
.launchIn(monitorScope)

volumeObserver.volumes
.setupCommonEventHandlers(TAG) { "Volume monitor" }
.distinctUntilChanged()
.onEach { volumeEventDispatcher.dispatch(it) }
.catch { log(TAG, WARN) { "Volume monitor flow failed:\n${it.asLog()}" } }
.launchIn(monitorScope)

bluetoothEventQueue.events
.setupCommonEventHandlers(TAG) { "Event monitor" }
.onEach { event ->
log(TAG, INFO) { "START Handling bluetooth event: $event" }
eventDispatcher.dispatch(event)
log(TAG, INFO) { "STOP Handling bluetooth event: $event" }
}
.catch { log(TAG, WARN) { "Event monitor flow failed:\n${it.asLog()}" } }
.launchIn(monitorScope)

val deviceMonitorJob = deviceRepo.devices
.setupCommonEventHandlers(TAG) { "Devices monitor" }
.distinctUntilChanged()
.throttleLatest(3000)
.flatMapLatest { devices ->
val activeDevices = devices.filter { it.isActive }

log(TAG) { "monitor: Currently active devices: ${activeDevices.map { "${it.address}/${it.label}" }}" }
onActiveDevicesChanged(activeDevices)

val stayActive = activeDevices.any { it.requiresMonitor }

when {
activeDevices.isNotEmpty() && stayActive -> {
log(TAG) { "Staying connected for active devices." }
emptyFlow()
}

activeDevices.isNotEmpty() -> flow {
log(TAG) { "There are active devices but we don't need to stay active for them." }
val maxMonitoringDuration = activeDevices.maxOf { it.monitoringDuration }
log(TAG) { "Maximum monitoring duration: $maxMonitoringDuration" }
val toDelay = Duration.ofSeconds(15) + maxMonitoringDuration
delay(toDelay.toMillis())
log(TAG) { "Stopping now, nothing changed." }
monitorJob.cancel()
}

else -> flow<Unit> {
log(TAG) { "No devices connected, stopping soon" }
delay(15 * 1000)
log(TAG) { "Stopping now, still no devices connected." }
monitorJob.cancel()
}
}
}
.catch { log(TAG, WARN) { "Monitor flow failed:\n${it.asLog()}" } }
.launchIn(monitorScope)

try {
log(TAG, VERBOSE) { "Monitor job is active" }
deviceMonitorJob.join()
log(TAG, VERBOSE) { "Monitor job quit" }
} finally {
monitorJob.cancel()
}
}

companion object {
private val TAG = logTag("Monitor", "Orchestrator")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,51 +8,27 @@ import android.content.Intent
import android.content.IntentFilter
import android.content.pm.ServiceInfo
import android.os.IBinder
import android.util.SparseArray
import androidx.core.content.ContextCompat
import androidx.core.util.size
import dagger.hilt.android.AndroidEntryPoint
import eu.darken.bluemusic.bluetooth.core.BluetoothRepo
import eu.darken.bluemusic.bluetooth.core.currentState
import eu.darken.bluemusic.common.coroutine.DispatcherProvider
import eu.darken.bluemusic.common.debug.logging.Logging.Priority.ERROR
import eu.darken.bluemusic.common.debug.logging.Logging.Priority.INFO
import eu.darken.bluemusic.common.debug.logging.Logging.Priority.VERBOSE
import eu.darken.bluemusic.common.debug.logging.Logging.Priority.WARN
import eu.darken.bluemusic.common.debug.logging.asLog
import eu.darken.bluemusic.common.debug.logging.log
import eu.darken.bluemusic.common.debug.logging.logTag
import eu.darken.bluemusic.common.flow.setupCommonEventHandlers
import eu.darken.bluemusic.common.flow.throttleLatest
import eu.darken.bluemusic.common.hasApiLevel
import eu.darken.bluemusic.devices.core.ManagedDevice
import eu.darken.bluemusic.common.ui.Service2
import eu.darken.bluemusic.devices.core.DeviceRepo
import eu.darken.bluemusic.devices.core.currentDevices
import eu.darken.bluemusic.monitor.core.audio.RingerModeObserver
import eu.darken.bluemusic.monitor.core.audio.VolumeEvent
import eu.darken.bluemusic.monitor.core.audio.VolumeObserver
import eu.darken.bluemusic.monitor.core.modules.VolumeModule
import eu.darken.bluemusic.monitor.core.ownership.AudioStreamOwnerRegistry
import eu.darken.bluemusic.monitor.ui.MonitorNotifications
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import java.time.Duration
import javax.inject.Inject

@AndroidEntryPoint
Expand All @@ -61,14 +37,8 @@ class MonitorService : Service2() {
@Inject lateinit var dispatcherProvider: DispatcherProvider
@Inject lateinit var notifications: MonitorNotifications
@Inject lateinit var notificationManager: NotificationManager
@Inject lateinit var deviceRepo: DeviceRepo
@Inject lateinit var bluetoothRepo: BluetoothRepo
@Inject lateinit var volumeModuleMap: Set<@JvmSuppressWildcards VolumeModule>
@Inject lateinit var volumeObserver: VolumeObserver
@Inject lateinit var ringerModeObserver: RingerModeObserver
@Inject lateinit var bluetoothEventQueue: BluetoothEventQueue
@Inject lateinit var orchestrator: MonitorOrchestrator
@Inject lateinit var eventDispatcher: EventDispatcher
@Inject lateinit var ringerModeTransitionHandler: RingerModeTransitionHandler
@Inject lateinit var ownerRegistry: AudioStreamOwnerRegistry

private val serviceScope by lazy {
Expand Down Expand Up @@ -141,7 +111,7 @@ class MonitorService : Service2() {

monitoringJob = serviceScope.launch {
try {
startMonitoring()
orchestrator.monitor(serviceScope, ::updateNotification)
} catch (_: CancellationException) {
log(TAG) { "Monitor cancelled." }
} catch (e: Exception) {
Expand Down Expand Up @@ -176,90 +146,11 @@ class MonitorService : Service2() {
super.onDestroy()
}

private suspend fun startMonitoring() {
val bluetoothState = bluetoothRepo.currentState()
if (!bluetoothState.isReady) {
log(TAG, WARN) { "Aborting, Bluetooth state is not ready: $bluetoothState" }
return
}

ownerRegistry.reset()
ownerRegistry.bootstrap(deviceRepo.currentDevices())

private suspend fun updateNotification(activeDevices: List<ManagedDevice>) {
notificationManager.notify(
MonitorNotifications.NOTIFICATION_ID,
notifications.getDevicesNotification(deviceRepo.currentDevices().filter { it.isActive }),
notifications.getDevicesNotification(activeDevices),
)

ringerModeObserver.ringerMode
.setupCommonEventHandlers(TAG) { "RingerMode monitor" }
.distinctUntilChanged()
.onEach { ringerModeTransitionHandler.handle(it) }
.catch { log(TAG, WARN) { "RingerMode monitor flow failed:\n${it.asLog()}" } }
.launchIn(serviceScope)

volumeObserver.volumes
.setupCommonEventHandlers(TAG) { "Volume monitor" }
.distinctUntilChanged()
.onEach { handleVolumeChange(it) }
.catch { log(TAG, WARN) { "Volume monitor flow failed:\n${it.asLog()}" } }
.launchIn(serviceScope)

bluetoothEventQueue.events
.setupCommonEventHandlers(TAG) { "Event monitor" }
.onEach { event ->
log(TAG, INFO) { "START Handling bluetooth event: $event" }
eventDispatcher.dispatch(event)
log(TAG, INFO) { "STOP Handling bluetooth event: $event" }
}
.catch { log(TAG, WARN) { "Event monitor flow failed:\n${it.asLog()}" } }
.launchIn(serviceScope)

val monitorJob = deviceRepo.devices
.setupCommonEventHandlers(TAG) { "Devices monitor" }
.distinctUntilChanged()
.throttleLatest(3000)
.flatMapLatest { devices ->
val activeDevices = devices.filter { it.isActive }

log(TAG) { "monitorJob: Currently active devices: ${activeDevices.map { "${it.address}/${it.label}" }}" }
notificationManager.notify(
MonitorNotifications.NOTIFICATION_ID,
notifications.getDevicesNotification(activeDevices),
)

val stayActive = activeDevices.any { it.requiresMonitor }

when {
activeDevices.isNotEmpty() && stayActive -> {
log(TAG) { "Staying connected for active devices." }
emptyFlow()
}

activeDevices.isNotEmpty() -> flow {
log(TAG) { "There are active devices but we don't need to stay active for them." }
val maxMonitoringDuration = activeDevices.maxOf { it.monitoringDuration }
log(TAG) { "Maximum monitoring duration: $maxMonitoringDuration" }
val toDelay = Duration.ofSeconds(15) + maxMonitoringDuration
delay(toDelay.toMillis())
log(TAG) { "Stopping service now, nothing changed." }
serviceScope.coroutineContext.cancelChildren()
}

else -> flow<Unit> {
log(TAG) { "No devices connected, stopping soon" }
delay(15 * 1000)
log(TAG) { "Stopping service now, still no devices connected." }
serviceScope.coroutineContext.cancelChildren()
}
}
}
.catch { log(TAG, WARN) { "Monitor flow failed:\n${it.asLog()}" } }
.launchIn(serviceScope)

log(TAG, VERBOSE) { "Monitor job is active" }
monitorJob.join()
log(TAG, VERBOSE) { "Monitor job quit" }
}

private val stopMonitorReceiver = object : BroadcastReceiver() {
Expand All @@ -269,41 +160,6 @@ class MonitorService : Service2() {
}
}

private suspend fun handleVolumeChange(event: VolumeEvent) {
val priorityArray = SparseArray<MutableList<VolumeModule>>()

for (module in volumeModuleMap) {
val priority = module.priority
var list = priorityArray.get(priority)
if (list == null) {
list = ArrayList()
priorityArray.put(priority, list)
}
list.add(module)
}

for (i in 0 until priorityArray.size) {
val currentPriorityModules = priorityArray.get(priorityArray.keyAt(i))
log(TAG, VERBOSE) {
"handleVolume: ${currentPriorityModules.size} modules at priority ${priorityArray.keyAt(i)}"
}

coroutineScope {
currentPriorityModules.map { module ->
async {
try {
log(TAG, VERBOSE) { "handleVolume: ${module.tag} HANDLE-START" }
module.handle(event)
log(TAG, VERBOSE) { "handleVolume: ${module.tag} HANDLE-STOP" }
} catch (e: Exception) {
log(TAG, ERROR) { "handleVolume: error: ${module.tag}: ${e.asLog()}" }
}
}
}.awaitAll()
}
}
}

companion object {
val TAG = logTag("Monitor", "Service")
private const val EXTRA_FORCE_START = "extra.force_start"
Expand Down
Loading