diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/sync/IncrementalSyncRepository.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/sync/IncrementalSyncRepository.kt index 7a39bb3a102..67e9aafff26 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/sync/IncrementalSyncRepository.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/data/sync/IncrementalSyncRepository.kt @@ -27,8 +27,11 @@ import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.first +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant @Mockable internal interface IncrementalSyncRepository { @@ -47,6 +50,21 @@ internal interface IncrementalSyncRepository { suspend fun updateIncrementalSyncState(newState: IncrementalSyncStatus) + /** + * Returns the timestamp of the last received WebSocket event. + * Used to detect stale WebSocket connections that stopped receiving events + * without proper disconnection notification. + * + * @return The [Instant] when the last WebSocket event was received, or null if no events were received yet. + */ + fun lastWebSocketEventInstant(): Instant? + + /** + * Records the current timestamp as the last WebSocket event time. + * Should be called whenever a WebSocket event is received. + */ + fun recordLastWebSocketEvent() + companion object { // The same default buffer size used by Coroutines channels const val BUFFER_SIZE = 64 @@ -65,6 +83,8 @@ internal class InMemoryIncrementalSyncRepository( onBufferOverflow = BufferOverflow.DROP_OLDEST ) + private val _lastWebSocketEventInstant = MutableStateFlow(null) + override val incrementalSyncState = _syncState .asSharedFlow() .distinctUntilChanged() @@ -78,4 +98,9 @@ internal class InMemoryIncrementalSyncRepository( _syncState.emit(newState) } + override fun lastWebSocketEventInstant(): Instant? = _lastWebSocketEventInstant.value + + override fun recordLastWebSocketEvent() { + _lastWebSocketEventInstant.value = Clock.System.now() + } } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt index 1460b30a35e..59883409586 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/UserSessionScope.kt @@ -394,6 +394,8 @@ import com.wire.kalium.logic.feature.user.screenshotCensoring.ObserveScreenshotC import com.wire.kalium.logic.feature.user.screenshotCensoring.ObserveScreenshotCensoringConfigUseCaseImpl import com.wire.kalium.logic.feature.user.screenshotCensoring.PersistScreenshotCensoringConfigUseCase import com.wire.kalium.logic.feature.user.screenshotCensoring.PersistScreenshotCensoringConfigUseCaseImpl +import com.wire.kalium.logic.feature.user.webSocketStatus.GetLastWebSocketEventInstantUseCase +import com.wire.kalium.logic.feature.user.webSocketStatus.GetLastWebSocketEventInstantUseCaseImpl import com.wire.kalium.logic.feature.user.webSocketStatus.GetPersistentWebSocketStatus import com.wire.kalium.logic.feature.user.webSocketStatus.GetPersistentWebSocketStatusImpl import com.wire.kalium.logic.feature.user.webSocketStatus.PersistPersistentWebSocketConnectionStatusUseCase @@ -1352,6 +1354,7 @@ public class UserSessionScope internal constructor( cryptoTransactionProvider, userStorage.database, eventRepository, + incrementalSyncRepository, userScopedLogger, ) } @@ -2350,6 +2353,9 @@ public class UserSessionScope internal constructor( public val getPersistentWebSocketStatus: GetPersistentWebSocketStatus get() = GetPersistentWebSocketStatusImpl(userId, globalScope.sessionRepository) + public val getLastWebSocketEventInstant: GetLastWebSocketEventInstantUseCase + get() = GetLastWebSocketEventInstantUseCaseImpl(incrementalSyncRepository) + private val featureConfigRepository: FeatureConfigRepository get() = FeatureConfigDataSource( featureConfigApi = authenticatedNetworkContainer.featureConfigApi diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/webSocketStatus/GetLastWebSocketEventInstantUseCase.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/webSocketStatus/GetLastWebSocketEventInstantUseCase.kt new file mode 100644 index 00000000000..5574774ef1f --- /dev/null +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/feature/user/webSocketStatus/GetLastWebSocketEventInstantUseCase.kt @@ -0,0 +1,42 @@ +/* + * Wire + * Copyright (C) 2026 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ + +package com.wire.kalium.logic.feature.user.webSocketStatus + +import com.wire.kalium.logic.data.sync.IncrementalSyncRepository +import kotlinx.datetime.Instant + +/** + * Use case to get the timestamp of the last WebSocket event received. + * This can be used to detect stale WebSocket connections that stopped receiving events + * without proper disconnection notification. + */ +public interface GetLastWebSocketEventInstantUseCase { + /** + * Returns the timestamp when the last WebSocket event was received. + * + * @return The [Instant] when the last WebSocket event was received, or null if no events were received yet. + */ + public operator fun invoke(): Instant? +} + +internal class GetLastWebSocketEventInstantUseCaseImpl( + private val incrementalSyncRepository: IncrementalSyncRepository +) : GetLastWebSocketEventInstantUseCase { + override fun invoke(): Instant? = incrementalSyncRepository.lastWebSocketEventInstant() +} diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt index a9a3126872e..df1f0f24767 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt @@ -166,7 +166,11 @@ internal fun IncrementalSyncManager( incrementalSyncRepository.updateIncrementalSyncState(newState) // when the source is LIVE, we need to generate a new syncId since it means the previous one is done - if (eventSource == EventSource.LIVE) Uuid.random().toString() to Clock.System.now() else syncData + if (eventSource == EventSource.LIVE) { + Uuid.random().toString() to Clock.System.now() + } else { + syncData + } }.collect() incrementalSyncRepository.updateIncrementalSyncState(IncrementalSyncStatus.Pending) logger.i("IncrementalSync stopped.") diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorker.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorker.kt index d528c00ebeb..e6e608a8822 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorker.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorker.kt @@ -27,6 +27,7 @@ import com.wire.kalium.logger.KaliumLogger import com.wire.kalium.logger.KaliumLogger.Companion.ApplicationFlow.SYNC import com.wire.kalium.logic.data.client.CryptoTransactionProvider import com.wire.kalium.logic.data.event.EventRepository +import com.wire.kalium.logic.data.sync.IncrementalSyncRepository import com.wire.kalium.logic.sync.KaliumSyncException import com.wire.kalium.persistence.db.UserDatabaseBuilder import io.mockative.Mockable @@ -48,12 +49,14 @@ internal interface IncrementalSyncWorker { suspend fun processEventsFlow(): Flow } +@Suppress("LongParameterList") internal class IncrementalSyncWorkerImpl( private val eventGatherer: EventGatherer, private val eventProcessor: EventProcessor, private val transactionProvider: CryptoTransactionProvider, private val databaseBuilder: UserDatabaseBuilder, private val eventRepository: EventRepository, + private val incrementalSyncRepository: IncrementalSyncRepository, logger: KaliumLogger = kaliumLogger, ) : IncrementalSyncWorker { @@ -62,17 +65,22 @@ internal class IncrementalSyncWorkerImpl( override suspend fun processEventsFlow(): Flow = channelFlow { // We start as PENDING send(EventSource.PENDING) + var isLive = false kaliumLogger.d("$TAG gatherEvents starting...") eventGatherer.gatherEvents() // If we ever become Up-To-Date, move to LIVE .onEach { eventStreamData -> if (eventStreamData is EventStreamData.IsUpToDate) { + isLive = true send(EventSource.LIVE) // We are LIVE!!!!!! } } .filterIsInstance() .collect { streamData -> + if (isLive) { + incrementalSyncRepository.recordLastWebSocketEvent() + } val envelopes = streamData.eventList kaliumLogger.d("$TAG Received ${envelopes.size} events to process") transactionProvider.transaction("processEvents") { context -> diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/sync/IncrementalSyncRepositoryTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/sync/IncrementalSyncRepositoryTest.kt index 3ef0e4e9be3..e6b0062de41 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/sync/IncrementalSyncRepositoryTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/data/sync/IncrementalSyncRepositoryTest.kt @@ -36,10 +36,14 @@ import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeout +import kotlinx.datetime.Clock import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertContentEquals import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue import kotlin.time.Duration import kotlin.time.Duration.Companion.days import kotlin.time.Duration.Companion.seconds @@ -162,4 +166,45 @@ internal class IncrementalSyncRepositoryTest { slowCollectionJob.cancel() } } + + @Test + fun givenNoWebSocketEventsRecorded_whenGettingLastWebSocketEventInstant_thenReturnsNull() = runTest { + // Given - fresh repository with no events recorded + + // When + val result = incrementalSyncRepository.lastWebSocketEventInstant() + + // Then + assertNull(result) + } + + @Test + fun givenWebSocketEventRecorded_whenGettingLastWebSocketEventInstant_thenReturnsTimestamp() = runTest { + // Given + val beforeRecord = Clock.System.now() + incrementalSyncRepository.recordLastWebSocketEvent() + val afterRecord = Clock.System.now() + + // When + val result = incrementalSyncRepository.lastWebSocketEventInstant() + + // Then + assertNotNull(result) + assertTrue(result >= beforeRecord && result <= afterRecord) + } + + @Test + fun givenMultipleWebSocketEventsRecorded_whenGettingLastWebSocketEventInstant_thenReturnsLatestTimestamp() = runTest { + // Given + incrementalSyncRepository.recordLastWebSocketEvent() + val firstTimestamp = incrementalSyncRepository.lastWebSocketEventInstant() + delay(10) // Small delay to ensure different timestamps + incrementalSyncRepository.recordLastWebSocketEvent() + val secondTimestamp = incrementalSyncRepository.lastWebSocketEventInstant() + + // Then + assertNotNull(firstTimestamp) + assertNotNull(secondTimestamp) + assertTrue(secondTimestamp >= firstTimestamp) + } } diff --git a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorkerTest.kt b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorkerTest.kt index 498ef1073cd..4407c1c257b 100644 --- a/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorkerTest.kt +++ b/logic/src/commonTest/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncWorkerTest.kt @@ -26,6 +26,7 @@ import com.wire.kalium.common.functional.Either import com.wire.kalium.logic.data.event.EventRepository import com.wire.kalium.logic.data.id.QualifiedID import com.wire.kalium.logic.data.id.toDao +import com.wire.kalium.logic.data.sync.IncrementalSyncRepository import com.wire.kalium.logic.framework.TestEvent import com.wire.kalium.logic.framework.TestEvent.wrapInEnvelope import com.wire.kalium.logic.sync.KaliumSyncException @@ -180,6 +181,7 @@ class IncrementalSyncWorkerTest { val eventProcessor: EventProcessor = mock(EventProcessor::class) val eventGatherer: EventGatherer = mock(EventGatherer::class) val eventRepository: EventRepository = mock(EventRepository::class) + val incrementalSyncRepository: IncrementalSyncRepository = mock(IncrementalSyncRepository::class) val database = TestUserDatabase( userId = QualifiedID("value", "domain").toDao(), dispatcher = TestKaliumDispatcher.default @@ -216,7 +218,7 @@ class IncrementalSyncWorkerTest { block() withTransactionReturning(Either.Right(Unit)) this to IncrementalSyncWorkerImpl( - eventGatherer, eventProcessor, cryptoTransactionProvider, database.builder, eventRepository + eventGatherer, eventProcessor, cryptoTransactionProvider, database.builder, eventRepository, incrementalSyncRepository ) } }