-
Notifications
You must be signed in to change notification settings - Fork 2
[MS-1278] Spike: Sync architecture revamp, part 1 #1537
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…tSyncState - tests
…untEventsUseCase; crude replacements of counter usage
…e counter to be included)
Note: reactive count is candidate record data sources are not implemented yet.
…ocal data sources
…rd data sources. CommCare limitations outlined in a comment.
…n points eliminated in its main block
…nstream users in SyncInfoViewModel
… records from CommCare candidate data source
…rker, to prevent event sync flow from getting stuck
…ount details commented
| override suspend fun getNumberOfImagesToUpload(projectId: String): Int = localDataSource.listImages(projectId).count() | ||
|
|
||
| override suspend fun observeNumberOfImagesToUpload(projectId: String): Flow<Int> = | ||
| localDataSource.observeImageCounts(projectId).distinctUntilChanged() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not that it hurts but the local data source already has .distinctUntilChanged()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just a guarantee on this level of abstraction.
| import kotlinx.coroutines.flow.map | ||
| import javax.inject.Inject | ||
|
|
||
| internal class CountEnrolmentRecordsUseCase @Inject constructor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CountEnrolmentRecordsUseCase() suggests it does counting on demand, not that it creates a flow.
Do we even need a dedicated use case for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed to ObserveEnrolmentRecordsCountUseCase. And yes - its specialized purpose is to flatMapLatest the counts for a project ID when the project ID changes. It abstracts the query away from the callers.
| ) { | ||
| internal operator fun invoke(): Flow<Int> = configRepository | ||
| .observeProjectConfiguration() | ||
| .map { it.projectId } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above.
Also, projectId is unused, do we need to "track" it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same thing as above. The image repository's function takes Project ID, but we are hiding it from the caller's concern, and switching automatically when it changes (tracking).
| import kotlinx.coroutines.flow.distinctUntilChanged | ||
| import javax.inject.Inject | ||
|
|
||
| internal class EventSyncUseCase @Inject constructor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the above - name would suggest something else (I'd think it starts syncing). And here, too - I'm not convinced the one-liner merits extraction in a use case.
| import kotlinx.coroutines.flow.transformLatest | ||
| import javax.inject.Inject | ||
|
|
||
| internal class ImageSyncUseCase @Inject constructor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like ObserveImageSyncStatus would convey the job it does better IMHO.
| import javax.inject.Singleton | ||
|
|
||
| @Singleton | ||
| class EventDownSyncCountsRepository @Inject internal constructor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a repository. More like CountEventsToDownloadUseCase()
| * together in a reactive way. | ||
| */ | ||
| @Singleton | ||
| class CountSyncableUseCase @Inject internal constructor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe ObserveSyncableCountsUseCase would better convey the meaning?
| @@ -0,0 +1,3 @@ | |||
| package com.simprints.infra.sync | |||
|
|
|||
| enum class SyncCommand { ObserveOnly } | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have messed up and gave you the wrong info on this one, your initial code was correct. I have confused sealed object hierarchy with enums, which have basically identical behaviour and therefore I assumed that they should be styled similarly.
Feel free to leave this as-is to avoid unnecessary back-n-forth in this PR and fix it when new commands are added, tho.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are both variants in the codebase, for example ScreenOrientation also has non all-uppercase values. Anyway it's going to be overwritten indeed, I'll use upper case in phase 2 then.
| combine( | ||
| observeEnrolmentRecordsCount(), | ||
| eventDownSyncCount(), | ||
| flow { // recordEventsToDownload |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it would be better finally fix the "suspending function should not return flow" error in the data layer. If we are moving to use flows more, we should do it correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made the ones that are part of this branch changes, non-suspending. There are other suspending ones that return flows, already in main - won't fix, out of scope. Added a "housekeeping" ticket MS-1309 for that instead.
| authStore.observeSignedInProjectId(), | ||
| eventSyncStateFlow, | ||
| imageSyncStatusFlow, | ||
| sync(eventSync = SyncCommand.OBSERVE_ONLY, imageSync = SyncCommand.OBSERVE_ONLY), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Arguably, this means that the naming should be better here or you are planning to put too much into a "use case". I would suggest rethinking the "use case" into some kind of "manager"/"helper"/"doer" with dedicated methods that hide all of that complexity and provide simpler API:
class SyncSomething constructor(...) {
fun observe() = executeCommands(
eventSync = SyncCommand.OBSERVE_ONLY,
imageSync = SyncCommand.OBSERVE_ONLY
)
// ...
fun executeCommands(...) {
// ...
}
// ...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 61 out of 61 changed files in this pull request and generated 5 comments.
| flow { // recordEventsToDownload | ||
| emitAll(eventRepository.observeEventCount(type = null)) | ||
| }, | ||
| flow { // eventsToUpload | ||
| emitAll( | ||
| combine( | ||
| eventRepository.observeEventCount(EventType.ENROLMENT_V2), | ||
| eventRepository.observeEventCount(EventType.ENROLMENT_V4), | ||
| ) { countV2, countV4 -> | ||
| countV2 + countV4 | ||
| } | ||
| ) | ||
| }, |
Copilot
AI
Jan 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The eventsToUpload parameter in line 38 is being set to the total count of all events (type = null), but line 40-48 shows that enrolmentsToUpload is specifically counting ENROLMENT_V2 and ENROLMENT_V4 types. This means enrolments are being counted twice: once in the total eventsToUpload count and again separately in enrolmentsToUpload. This double-counting will lead to incorrect sync counters displayed in the UI.
| flow { // recordEventsToDownload | ||
| emitAll(eventRepository.observeEventCount(type = null)) | ||
| }, | ||
| flow { // eventsToUpload | ||
| emitAll( | ||
| combine( | ||
| eventRepository.observeEventCount(EventType.ENROLMENT_V2), | ||
| eventRepository.observeEventCount(EventType.ENROLMENT_V4), | ||
| ) { countV2, countV4 -> | ||
| countV2 + countV4 | ||
| } | ||
| ) | ||
| }, | ||
| observeSamplesToUploadCount(), | ||
| ) { totalRecords, recordEventsToDownload, eventsToUpload, enrolmentsToUpload, samplesToUpload -> |
Copilot
AI
Jan 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The parameter name recordEventsToDownload (line 37) is confusing because it's actually representing a flow that observes total events to upload (with type = null), not events to download. Based on the combine parameters and the final SyncableCounts construction, it appears the parameter on line 51 should actually be named eventsToUpload and the one on line 40-48 should be enrolmentsToUpload. The ordering and naming is inconsistent with the final data class, making this code difficult to understand and maintain.
|
|
||
| val syncReporterStates = syncStartReporterStates(syncWorkers) + syncEndReporterStates(syncWorkers) | ||
|
|
||
| val lastSyncTime = eventSyncCache.readLastSuccessfulSyncTime() |
Copilot
AI
Jan 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lastSyncTime is read from cache on every flow emission (line 50), which happens whenever worker info changes. This means the flow will call eventSyncCache.readLastSuccessfulSyncTime() potentially many times during a sync. For consistency and performance, consider reading the lastSyncTime once when the sync completes (in the END_SYNC_REPORTER worker) or caching it within this flow's scope to avoid repeated cache reads.
| flow { | ||
| while (true) { | ||
| delay(CASE_COUNT_FALLBACK_POLL_INTERVAL_MILLIS) | ||
| emit(Unit) | ||
| } | ||
| }, | ||
| ) |
Copilot
AI
Jan 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The periodic polling flow (lines 320-325) uses an infinite while (true) loop that continuously emits after delays. This coroutine is not properly scoped to the callbackFlow lifecycle. If the flow is collected and then cancelled, the periodic polling coroutine in lines 320-325 will continue running indefinitely because it's merged independently. The periodic polling should be part of the callbackFlow block so it gets cancelled when the flow is cancelled, or it should check isActive in the while loop condition.
| override fun observeImageCounts(projectId: String): Flow<Int> = imageRefChanges | ||
| .onStart { | ||
| emit(Unit) | ||
| } // initial listing | ||
| .mapLatest { listImages(projectId).size } | ||
| .distinctUntilChanged() |
Copilot
AI
Jan 26, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The observeImageCounts flow uses mapLatest which cancels the previous listImages call when a new emission arrives. However, listImages performs filesystem I/O (line 84-91 walking the directory tree), which is not cancellable. If multiple image operations happen in quick succession, this could lead to multiple concurrent filesystem walks that can't be cancelled, potentially causing performance issues. Consider using map instead of mapLatest, or adding cancellation checks within listImages if it becomes a bottleneck.
|
|
As the release/2026.1.0 branch is now created, this (to target 2026.2.0) is ready to merge in |



JIRA ticket
Will be released in: 2026.2.0
Notable changes
Goal: sync revamp, with reactive usecases.
Reasons:
ObserveSyncInfoUseCase(sync info UI computation) is convoluted and has many suspension points, despite being driven by combined flows. This also adds jitter to visualized sync process.EventSyncManagerandSyncOrchestratorare large, broadly scoped, essentially each contain loosely related use cases. They are neither repositories nor usecases.Boundaries for the changes - what's intentionally out of scope:
WorkManagerand workers. This is already the optimal foundation for sync.Main changes:
ObserveSyncInfoUseCasemade fully reactive - without suspension points.SyncUseCase, a unified (events + images) sync status reactive observation use case introduced. It is aStateFlow, with up-to-date sync status available as.valuesyncronously from anywhere.CountSyncableUseCase, a unified (events + images) counters reactive observation use case introduced.Additional effort (ongoing, partial):
EventSyncManagerandSyncOrchestratorslimmed down as some of their functions were moved to usecases.ObserveSyncInfoUseCasecomputation logic is simplified only as far as direct application of the new usecases goes. This is intentional as a waypoint in this ongoing work, to limit the size of changes and for easier code review.SyncUseCasenow only observes the sync process. This is intentional, to limit the size of changes. Sync controls are still in the SyncOrchestrator.Special note about CommCare:
CommCareCandidateRecordDataSourcecode. A fix PR can be offered on the CommCare repo to have it notify the observer abount changes, as one of the options. This may need further evaluation of importance, considering thatCommCareCandidateRecordDataSourceisn't used anyway for UI-visible counters right now.Not yet done (keeping scope limited) - also marked as
todo MS-1278in the codebase:SyncOrchestratortoSyncUseCase(with helper internal usecases where they fit)SyncOrchestratorinto more specific and focused use cases.EventSyncManagerinto more specific and focused use cases.Testing guidance
Additional work checklist