Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.first
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant

@Mockable
internal interface IncrementalSyncRepository {
Expand All @@ -47,6 +50,21 @@ internal interface IncrementalSyncRepository {

suspend fun updateIncrementalSyncState(newState: IncrementalSyncStatus)

/**
* Returns the timestamp of the last received WebSocket event.
* Used to detect stale WebSocket connections that stopped receiving events
* without proper disconnection notification.
*
* @return The [Instant] when the last WebSocket event was received, or null if no events were received yet.
*/
fun lastWebSocketEventInstant(): Instant?

/**
* Records the current timestamp as the last WebSocket event time.
* Should be called whenever a WebSocket event is received.
*/
fun recordLastWebSocketEvent()

companion object {
// The same default buffer size used by Coroutines channels
const val BUFFER_SIZE = 64
Expand All @@ -65,6 +83,8 @@ internal class InMemoryIncrementalSyncRepository(
onBufferOverflow = BufferOverflow.DROP_OLDEST
)

private val _lastWebSocketEventInstant = MutableStateFlow<Instant?>(null)

override val incrementalSyncState = _syncState
.asSharedFlow()
.distinctUntilChanged()
Expand All @@ -78,4 +98,9 @@ internal class InMemoryIncrementalSyncRepository(
_syncState.emit(newState)
}

override fun lastWebSocketEventInstant(): Instant? = _lastWebSocketEventInstant.value

override fun recordLastWebSocketEvent() {
_lastWebSocketEventInstant.value = Clock.System.now()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ import com.wire.kalium.logic.feature.user.screenshotCensoring.ObserveScreenshotC
import com.wire.kalium.logic.feature.user.screenshotCensoring.ObserveScreenshotCensoringConfigUseCaseImpl
import com.wire.kalium.logic.feature.user.screenshotCensoring.PersistScreenshotCensoringConfigUseCase
import com.wire.kalium.logic.feature.user.screenshotCensoring.PersistScreenshotCensoringConfigUseCaseImpl
import com.wire.kalium.logic.feature.user.webSocketStatus.GetLastWebSocketEventInstantUseCase
import com.wire.kalium.logic.feature.user.webSocketStatus.GetLastWebSocketEventInstantUseCaseImpl
import com.wire.kalium.logic.feature.user.webSocketStatus.GetPersistentWebSocketStatus
import com.wire.kalium.logic.feature.user.webSocketStatus.GetPersistentWebSocketStatusImpl
import com.wire.kalium.logic.feature.user.webSocketStatus.PersistPersistentWebSocketConnectionStatusUseCase
Expand Down Expand Up @@ -1352,6 +1354,7 @@ public class UserSessionScope internal constructor(
cryptoTransactionProvider,
userStorage.database,
eventRepository,
incrementalSyncRepository,
userScopedLogger,
)
}
Expand Down Expand Up @@ -2350,6 +2353,9 @@ public class UserSessionScope internal constructor(
public val getPersistentWebSocketStatus: GetPersistentWebSocketStatus
get() = GetPersistentWebSocketStatusImpl(userId, globalScope.sessionRepository)

public val getLastWebSocketEventInstant: GetLastWebSocketEventInstantUseCase
get() = GetLastWebSocketEventInstantUseCaseImpl(incrementalSyncRepository)

private val featureConfigRepository: FeatureConfigRepository
get() = FeatureConfigDataSource(
featureConfigApi = authenticatedNetworkContainer.featureConfigApi
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Wire
* Copyright (C) 2026 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/

package com.wire.kalium.logic.feature.user.webSocketStatus

import com.wire.kalium.logic.data.sync.IncrementalSyncRepository
import kotlinx.datetime.Instant

/**
* Use case to get the timestamp of the last WebSocket event received.
* This can be used to detect stale WebSocket connections that stopped receiving events
* without proper disconnection notification.
*/
public interface GetLastWebSocketEventInstantUseCase {
/**
* Returns the timestamp when the last WebSocket event was received.
*
* @return The [Instant] when the last WebSocket event was received, or null if no events were received yet.
*/
public operator fun invoke(): Instant?
}

internal class GetLastWebSocketEventInstantUseCaseImpl(
private val incrementalSyncRepository: IncrementalSyncRepository
) : GetLastWebSocketEventInstantUseCase {
override fun invoke(): Instant? = incrementalSyncRepository.lastWebSocketEventInstant()
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ internal fun IncrementalSyncManager(
incrementalSyncRepository.updateIncrementalSyncState(newState)

// when the source is LIVE, we need to generate a new syncId since it means the previous one is done
if (eventSource == EventSource.LIVE) Uuid.random().toString() to Clock.System.now() else syncData
if (eventSource == EventSource.LIVE) {
Uuid.random().toString() to Clock.System.now()
} else {
syncData
}
}.collect()
incrementalSyncRepository.updateIncrementalSyncState(IncrementalSyncStatus.Pending)
logger.i("IncrementalSync stopped.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logger.KaliumLogger.Companion.ApplicationFlow.SYNC
import com.wire.kalium.logic.data.client.CryptoTransactionProvider
import com.wire.kalium.logic.data.event.EventRepository
import com.wire.kalium.logic.data.sync.IncrementalSyncRepository
import com.wire.kalium.logic.sync.KaliumSyncException
import com.wire.kalium.persistence.db.UserDatabaseBuilder
import io.mockative.Mockable
Expand All @@ -48,12 +49,14 @@ internal interface IncrementalSyncWorker {
suspend fun processEventsFlow(): Flow<EventSource>
}

@Suppress("LongParameterList")
internal class IncrementalSyncWorkerImpl(
private val eventGatherer: EventGatherer,
private val eventProcessor: EventProcessor,
private val transactionProvider: CryptoTransactionProvider,
private val databaseBuilder: UserDatabaseBuilder,
private val eventRepository: EventRepository,
private val incrementalSyncRepository: IncrementalSyncRepository,
logger: KaliumLogger = kaliumLogger,
) : IncrementalSyncWorker {

Expand All @@ -62,17 +65,22 @@ internal class IncrementalSyncWorkerImpl(
override suspend fun processEventsFlow(): Flow<EventSource> = channelFlow {
// We start as PENDING
send(EventSource.PENDING)
var isLive = false

kaliumLogger.d("$TAG gatherEvents starting...")
eventGatherer.gatherEvents()
// If we ever become Up-To-Date, move to LIVE
.onEach { eventStreamData ->
if (eventStreamData is EventStreamData.IsUpToDate) {
isLive = true
send(EventSource.LIVE) // We are LIVE!!!!!!
}
}
.filterIsInstance<EventStreamData.NewEvents>()
.collect { streamData ->
if (isLive) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to record only when we are live or for each event we receive via socket?
I mean again the async notifications - what if there is a scenario that we only got some messages when we were offline - in that case when we connect the socket we do receive these messages as pending non-live events so it means that at this moment the socket is healthy, but with this if (isLive) we don't record that for these non-live events, so if we don't receive any live events after pending ones then the LastWebSocketEvent will not update and will be outdated. 🤔

incrementalSyncRepository.recordLastWebSocketEvent()
}
val envelopes = streamData.eventList
kaliumLogger.d("$TAG Received ${envelopes.size} events to process")
transactionProvider.transaction("processEvents") { context ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlinx.datetime.Clock
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
import kotlin.time.Duration
import kotlin.time.Duration.Companion.days
import kotlin.time.Duration.Companion.seconds
Expand Down Expand Up @@ -162,4 +166,45 @@ internal class IncrementalSyncRepositoryTest {
slowCollectionJob.cancel()
}
}

@Test
fun givenNoWebSocketEventsRecorded_whenGettingLastWebSocketEventInstant_thenReturnsNull() = runTest {
// Given - fresh repository with no events recorded

// When
val result = incrementalSyncRepository.lastWebSocketEventInstant()

// Then
assertNull(result)
}

@Test
fun givenWebSocketEventRecorded_whenGettingLastWebSocketEventInstant_thenReturnsTimestamp() = runTest {
// Given
val beforeRecord = Clock.System.now()
incrementalSyncRepository.recordLastWebSocketEvent()
val afterRecord = Clock.System.now()

// When
val result = incrementalSyncRepository.lastWebSocketEventInstant()

// Then
assertNotNull(result)
assertTrue(result >= beforeRecord && result <= afterRecord)
}

@Test
fun givenMultipleWebSocketEventsRecorded_whenGettingLastWebSocketEventInstant_thenReturnsLatestTimestamp() = runTest {
// Given
incrementalSyncRepository.recordLastWebSocketEvent()
val firstTimestamp = incrementalSyncRepository.lastWebSocketEventInstant()
delay(10) // Small delay to ensure different timestamps
incrementalSyncRepository.recordLastWebSocketEvent()
val secondTimestamp = incrementalSyncRepository.lastWebSocketEventInstant()

// Then
assertNotNull(firstTimestamp)
assertNotNull(secondTimestamp)
assertTrue(secondTimestamp >= firstTimestamp)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.wire.kalium.common.functional.Either
import com.wire.kalium.logic.data.event.EventRepository
import com.wire.kalium.logic.data.id.QualifiedID
import com.wire.kalium.logic.data.id.toDao
import com.wire.kalium.logic.data.sync.IncrementalSyncRepository
import com.wire.kalium.logic.framework.TestEvent
import com.wire.kalium.logic.framework.TestEvent.wrapInEnvelope
import com.wire.kalium.logic.sync.KaliumSyncException
Expand Down Expand Up @@ -180,6 +181,7 @@ class IncrementalSyncWorkerTest {
val eventProcessor: EventProcessor = mock(EventProcessor::class)
val eventGatherer: EventGatherer = mock(EventGatherer::class)
val eventRepository: EventRepository = mock(EventRepository::class)
val incrementalSyncRepository: IncrementalSyncRepository = mock(IncrementalSyncRepository::class)
val database = TestUserDatabase(
userId = QualifiedID("value", "domain").toDao(),
dispatcher = TestKaliumDispatcher.default
Expand Down Expand Up @@ -216,7 +218,7 @@ class IncrementalSyncWorkerTest {
block()
withTransactionReturning(Either.Right(Unit))
this to IncrementalSyncWorkerImpl(
eventGatherer, eventProcessor, cryptoTransactionProvider, database.builder, eventRepository
eventGatherer, eventProcessor, cryptoTransactionProvider, database.builder, eventRepository, incrementalSyncRepository
)
}
}
Expand Down
Loading