diff --git a/README.md b/README.md index e6274b4..22b6b25 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,8 @@ Check out [the example database](test/utils/test_database.dart) for a complete c final syncManager = SyncManager( localDatabase: localDatabase, supabaseClient: supabaseClient, + onSyncStarted: (event) => showLoadingIndicator(), + onSyncCompleted: (event) => hideLoadingIndicator(), ); ``` @@ -190,6 +192,70 @@ It goes through the local tables for all registered syncables and sets the user ID for all items that don't have a user ID yet. If syncing is enabled, those items will then get synced to the backend automatically. +### Monitoring Sync State 📢 + +The SyncManager provides two ways to monitor sync state: + +#### Simple Approach (Recommended) + +For most use cases, use the built-in `ChangeNotifier` interface: + +```dart +final syncManager = SyncManager( + localDatabase: localDatabase, + supabaseClient: supabaseClient, +); + +// Listen to sync state changes +syncManager.addListener(() { + if (syncManager.syncInProgress) { + showLoadingIndicator(); + } else { + hideLoadingIndicator(); + } +}); + +// Or use with ValueListenableBuilder in Flutter +ValueListenableBuilder( + valueListenable: syncManager, + builder: (context, syncInProgress, child) { + return syncInProgress + ? CircularProgressIndicator() + : Icon(Icons.check); + }, +); +``` + +#### Advanced Event Notifications + +For advanced use cases requiring detailed information per syncable type, enable detailed events: + +```dart +final syncManager = SyncManager( + localDatabase: localDatabase, + supabaseClient: supabaseClient, + enableDetailedEvents: true, // Enable detailed events + onSyncStarted: (event) { + print('Sync started for ${event.syncableType} from ${event.source}'); + // event.source can be SyncEventSource.fullSync or SyncEventSource.realtime + }, + onSyncCompleted: (event) { + print('Sync completed for ${event.syncableType}: ${event.itemsReceived} items received'); + // Access detailed statistics: itemsReceived, itemsUpdated, itemsDeleted + }, +); +``` + +**When to use detailed events:** +- You need to track sync progress per table/syncable type +- You want to distinguish between full sync and real-time sync events +- You need detailed statistics about sync operations + +**When to use simple approach:** +- You just want to show a loading indicator during sync +- You want minimal complexity and overhead +- You don't need per-table sync information + ### Optimizations ⚡ There are a few mechanisms that can drastically reduce the ongoing data diff --git a/devtools_options.yaml b/devtools_options.yaml new file mode 100644 index 0000000..fa0b357 --- /dev/null +++ b/devtools_options.yaml @@ -0,0 +1,3 @@ +description: This file stores settings for Dart & Flutter DevTools. +documentation: https://docs.flutter.dev/tools/devtools/extensions#configure-extension-enablement-states +extensions: diff --git a/justfile b/justfile index c421119..7f0ca31 100644 --- a/justfile +++ b/justfile @@ -19,7 +19,7 @@ generate-test-entrypoints: # Runs all tests (with coverage) test: generate-test-entrypoints start-supabase supabase db test - dart test test/_test.dart --test-randomize-ordering-seed=random --coverage coverage + flutter test test/_test.dart --test-randomize-ordering-seed=random --coverage coverage dart run coverage:format_coverage --lcov --report-on lib --check-ignore -i coverage/test/_test.dart.vm.json -o coverage/lcov.info # Start Supabase for local development or testing diff --git a/lib/src/sync_event.dart b/lib/src/sync_event.dart new file mode 100644 index 0000000..cf78f00 --- /dev/null +++ b/lib/src/sync_event.dart @@ -0,0 +1,85 @@ +/// The source of a sync event. +enum SyncEventSource { + /// Data received from a realtime subscription to the backend. + realtime, + + /// Data received from a full sync with the backend. + fullSync, +} + +/// The type of sync event. +enum SyncEventType { + /// A sync operation started. + syncStarted, + + /// A sync operation completed. + syncCompleted, +} + +/// An event that occurs during data synchronization. +abstract class SyncEvent { + const SyncEvent({ + required this.type, + required this.syncableType, + required this.source, + required this.timestamp, + }); + + /// The type of sync event. + final SyncEventType type; + + /// The type of syncable that was affected. + final Type syncableType; + + /// The source of the sync event. + final SyncEventSource source; + + /// When the event occurred. + final DateTime timestamp; +} + +/// An event for when a sync operation starts. +class SyncStartedEvent extends SyncEvent { + const SyncStartedEvent({ + required super.syncableType, + required super.source, + required super.timestamp, + required this.reason, + }) : super(type: SyncEventType.syncStarted); + + /// The reason why the sync started. + final String reason; +} + +/// An event for when a sync operation completes. +class SyncCompletedEvent extends SyncEvent { + const SyncCompletedEvent({ + required super.syncableType, + required super.source, + required super.timestamp, + required this.itemsReceived, + required this.itemsUpdated, + required this.itemsDeleted, + }) : super(type: SyncEventType.syncCompleted); + + /// The number of items that were received (inserted) during this sync. + final int itemsReceived; + + /// The number of items that were updated during this sync. + final int itemsUpdated; + + /// The number of items that were deleted during this sync. + final int itemsDeleted; + + /// The total number of items processed during this sync. + int get totalItemsProcessed => itemsReceived + itemsUpdated + itemsDeleted; +} + +/// Callback function for sync events. +typedef SyncEventCallback = void Function(SyncEvent event); + +/// Callback function specifically for sync started events. +typedef SyncStartedEventCallback = void Function(SyncStartedEvent event); + +/// Callback function specifically for sync completed events. +typedef SyncCompletedEventCallback = void Function(SyncCompletedEvent event); diff --git a/lib/src/sync_manager.dart b/lib/src/sync_manager.dart index bd2211e..7aeb585 100644 --- a/lib/src/sync_manager.dart +++ b/lib/src/sync_manager.dart @@ -1,15 +1,36 @@ import 'dart:async'; +import 'dart:io'; import 'package:collection/collection.dart'; import 'package:drift/drift.dart'; +import 'package:flutter/foundation.dart'; import 'package:logging/logging.dart'; import 'package:supabase/supabase.dart'; import 'package:syncable/src/supabase_names.dart'; +import 'package:syncable/src/sync_event.dart'; import 'package:syncable/src/sync_timestamp_storage.dart'; import 'package:syncable/src/syncable.dart'; import 'package:syncable/src/syncable_database.dart'; import 'package:syncable/src/syncable_table.dart'; +/// Sync mode based on user activity patterns. +/// +/// This is used to adaptively adjust sync intervals to balance +/// battery consumption and responsiveness. +enum SyncMode { + /// User is actively modifying data (last change < 10 seconds). + /// Sync interval: 5 seconds for maximum responsiveness. + active, + + /// Recent modifications but user is no longer actively editing (10s - 2min). + /// Sync interval: 15 seconds for good balance. + recent, + + /// No modifications for > 2 minutes. + /// Sync interval: 30 seconds for battery conservation. + idle, +} + /// The [SyncManager] is the main class for syncing data between a local Drift /// database and a Supabase backend. /// @@ -19,7 +40,10 @@ import 'package:syncable/src/syncable_table.dart'; /// /// The [SyncManager] is designed to be used with the [SyncableDatabase] /// class, which provides the local database functionality. -class SyncManager { +/// +/// The [SyncManager] extends [ChangeNotifier] to provide a simple way to listen +/// for sync state changes via the [syncInProgress] property. +class SyncManager extends ChangeNotifier { /// Creates a new [SyncManager] instance. /// /// The [localDatabase] parameter is required and must be an instance of @@ -49,6 +73,14 @@ class SyncManager { /// in combination with [lastTimeOtherDeviceWasActive] to determine whether /// other devices are currently active or not. A real-time subscription to /// the backend is only created if other devices are considered active. + /// + /// For simple use cases, you can listen to sync state changes using the + /// [syncInProgress] property and [ChangeNotifier] interface. + /// + /// For advanced use cases, set [enableDetailedEvents] to `true` and provide + /// [onSyncStarted] and [onSyncCompleted] callbacks. These will be called when + /// sync events occur, allowing your application to respond to detailed + /// synchronization events per syncable type. SyncManager({ required T localDatabase, required SupabaseClient supabaseClient, @@ -56,12 +88,17 @@ class SyncManager { int maxRows = 1000, SyncTimestampStorage? syncTimestampStorage, Duration otherDevicesConsideredInactiveAfter = const Duration(minutes: 2), + bool enableDetailedEvents = false, + SyncStartedEventCallback? onSyncStarted, + SyncCompletedEventCallback? onSyncCompleted, }) : _localDb = localDatabase, _supabaseClient = supabaseClient, - _syncInterval = syncInterval, _maxRows = maxRows, _syncTimestampStorage = syncTimestampStorage, _devicesConsideredInactiveAfter = otherDevicesConsideredInactiveAfter, + _enableDetailedEvents = enableDetailedEvents, + _onSyncStarted = enableDetailedEvents ? onSyncStarted : null, + _onSyncCompleted = enableDetailedEvents ? onSyncCompleted : null, assert( syncInterval.inMilliseconds > 0, 'Sync interval must be positive', @@ -72,10 +109,16 @@ class SyncManager { final T _localDb; final SupabaseClient _supabaseClient; final SyncTimestampStorage? _syncTimestampStorage; - final Duration _syncInterval; final int _maxRows; final Duration _devicesConsideredInactiveAfter; + // Enable detailed events (disabled by default for simplicity) + final bool _enableDetailedEvents; + + // Callback functions for sync events + final SyncStartedEventCallback? _onSyncStarted; + final SyncCompletedEventCallback? _onSyncCompleted; + /// This is what gets set when [enableSync] gets called. Internally, whether /// the syncing is enabled or not is determined by [_syncingEnabled]. bool __syncingEnabled = false; @@ -83,6 +126,37 @@ class SyncManager { bool get _syncingEnabled => __syncingEnabled && !_disposed && userId.isNotEmpty; + /// Whether a sync is currently in progress. + /// + /// This is a simple boolean that tracks if any sync operation is currently + /// running. Use this with [ChangeNotifier] to listen for sync state changes. + /// For more granular sync events per syncable type, set [enableDetailedEvents] + /// to `true` and use the callback functions. + bool _syncInProgress = false; + bool get syncInProgress => _syncInProgress; + + /// Sets the sync in progress state and notifies listeners. + void _setSyncInProgress(bool inProgress) { + if (_syncInProgress != inProgress) { + _syncInProgress = inProgress; + notifyListeners(); + } + } + + // ============= ADAPTIVE SYNC INTERVAL FIELDS ============= + + /// Timestamp of the last detected local change. + /// Used to determine the current sync mode (active/recent/idle). + DateTime? _lastChangeDetected; + + /// Completer used to interrupt the sync loop sleep when immediate sync is needed. + /// This allows waking up the loop before the normal interval expires. + Completer? _syncTrigger; + + /// Counter for sync loop iterations. + /// Used to trigger periodic safety checks (e.g., re-sync from Drift every N iterations). + int _loopIterationCounter = 0; + /// Enables syncing for all registered syncables. /// /// This method will throw an exception if no syncables are registered. @@ -160,13 +234,16 @@ class SyncManager { final Map> _inQueues = {}; final Map> _outQueues = {}; + // Track sync source for incoming items + final Map> _incomingSources = {}; + final Map> _sentItems = {}; final Map> _receivedItems = {}; final Map>> _localSubscriptions = {}; - RealtimeChannel? _backendSubscription; - bool get isSubscribedToBackend => _backendSubscription != null; + final Map _backendSubscriptions = {}; + bool get isSubscribedToBackend => _backendSubscriptions.isNotEmpty; /// The number of items of type [syncable] that have been synced to the /// backend. @@ -181,12 +258,18 @@ class SyncManager { int _nFullSyncs = 0; int get nFullSyncs => _nFullSyncs; + @override void dispose() { _disposed = true; for (final subscription in _localSubscriptions.values) { subscription.cancel(); } - _backendSubscription?.unsubscribe(); + // Unsubscribe from all channels + for (final subscription in _backendSubscriptions.values) { + subscription.unsubscribe(); + } + _backendSubscriptions.clear(); + super.dispose(); } /// Registers a syncable table with the sync manager. @@ -230,15 +313,46 @@ class SyncManager { _companions[S] = companionConstructor; _inQueues[S] = {}; _outQueues[S] = {}; + _incomingSources[S] = {}; _sentItems[S] = {}; _receivedItems[S] = {}; } Future _startLoop() async { + if (_loopRunning) { + _logger.warning('Sync loop already running, skipping start'); + return; + } _loopRunning = true; - _logger.info('Sync loop started'); + _logger.info('Sync loop started with adaptive intervals'); while (!_disposed) { + final iterationStart = DateTime.now(); + _loopIterationCounter++; + _logger.fine('Sync loop iteration #$_loopIterationCounter starting'); + + // ✅ SÉCURITÉ : Tous les 20 loops, resynchroniser depuis Drift + // Cela capture tout item qui aurait pu être perdu de la RAM + if (_loopIterationCounter % 20 == 0) { + _logger.info('🔄 Periodic safety check (iteration #$_loopIterationCounter): re-syncing from Drift'); + try { + for (final syncable in _syncables) { + if (_disposed) break; + + // Récupérer les items locaux depuis Drift + final localItems = await _localDb.select(_localTables[syncable]!).get(); + + // Pousser vers outQueue seulement les items du user actuel + _pushLocalChangesToOutQueue( + syncable, + localItems.where((i) => i.userId == _userId), + ); + } + } catch (e, s) { + _logger.severe('Error during periodic Drift re-sync: $e\n$s'); + } + } + try { for (final syncable in _syncables) { if (_disposed) break; @@ -262,7 +376,24 @@ class SyncManager { } if (_disposed) break; - await Future.delayed(_syncInterval); + + // ✨ ADAPTIVE SYNC: Determine current mode and interval + final currentMode = _getCurrentMode(); + final interval = _getIntervalForMode(currentMode); + + print('💤 [Syncable] Loop iteration complete, sleeping for ${interval.inSeconds}s (mode: $currentMode)'); + + // Create a new completer for the next potential interruption + _syncTrigger = Completer(); + + // Wait for EITHER the timeout OR an immediate sync trigger + await Future.any([ + Future.delayed(interval), + _syncTrigger!.future, + ]); + + final actualWaitTime = DateTime.now().difference(iterationStart); + print('⏰ [Syncable] Woke up after ${actualWaitTime.inSeconds}s (expected: ${interval.inSeconds}s)'); } _loopRunning = false; @@ -347,11 +478,29 @@ class SyncManager { row.updatedAt.isAfter(outQueue[row.id]?.updatedAt ?? DateTime(0)) && row.updatedAt.isAfter(_lastPushedTimestamp(syncable) ?? DateTime(0)); + bool hasNewItems = false; + for (final row in rows .where((r) => !receivedItems.contains(r)) .where(updateHasNotBeenSentYet)) { outQueue[row.id] = row; + hasNewItems = true; + } + + // ✨ ADAPTIVE SYNC: Detect changes and potentially wake up the loop + if (hasNewItems) { + // Update the last change timestamp + _lastChangeDetected = DateTime.now(); + + // If we're in IDLE or RECENT mode, wake up the loop immediately for faster sync + final currentMode = _getCurrentMode(); + if (currentMode == SyncMode.idle || currentMode == SyncMode.recent) { + print('⚡ [Syncable] Local changes detected in $currentMode mode - triggering immediate sync'); + _triggerImmediateSync(); + } else { + print('📝 [Syncable] Local changes detected in $currentMode mode - will sync at next iteration'); + } } } @@ -359,13 +508,13 @@ class SyncManager { final otherDevicesActive = _otherDevicesActive(); if (!_syncingEnabled || !otherDevicesActive) { - if (_backendSubscription != null) { - _backendSubscription?.unsubscribe(); - _backendSubscription = null; + // Unsubscribe from all existing channels + for (final subscription in _backendSubscriptions.values) { + subscription.unsubscribe(); } + _backendSubscriptions.clear(); String reason; - if (!__syncingEnabled) { reason = 'syncing is disabled'; } else if (userId.isEmpty) { @@ -377,26 +526,57 @@ class SyncManager { } _logger.warning('Not subscribed to backend changes because $reason'); - return; } - if (_backendSubscription != null) { - return; + if (_backendSubscriptions.isNotEmpty) { + return; // Already subscribed } - _backendSubscription = _supabaseClient.channel('backend_changes'); - + // Create a dedicated channel for each table for (final syncable in _syncables) { - _backendSubscription?.onPostgresChanges( + final tableName = _backendTables[syncable]!; + final channelName = 'sync_$tableName'; + + _logger.info('Creating Realtime subscription for table: $tableName'); + + final channel = _supabaseClient.channel(channelName); + + channel.onPostgresChanges( schema: publicSchema, - table: _backendTables[syncable], + table: tableName, event: PostgresChangeEvent.all, callback: (p) { - if (_disposed) return; + if (_disposed) { + return; + } + if (p.newRecord.isNotEmpty) { - final item = _fromJsons[syncable]!(p.newRecord); - _inQueues[syncable]!.add(item); + try { + final item = _fromJsons[syncable]!(p.newRecord); + final timestamp = DateTime.now().millisecondsSinceEpoch; + print('🔔 [Syncable] REALTIME [$timestamp]: Received item ${item.id} for ${syncable.toString()}'); + + _inQueues[syncable]!.add(item); + print('🔔 [Syncable] REALTIME: Queue size after add: ${_inQueues[syncable]!.length}'); + + if (_enableDetailedEvents) { + _incomingSources[syncable]![item.id] = SyncEventSource.realtime; + } + + // ⚡ NOUVEAU : Traiter immédiatement au lieu d'attendre le sync loop + // Utilise unawaited pour ne pas bloquer le callback Realtime + _processIncomingImmediate(syncable).then((_) { + final endTimestamp = DateTime.now().millisecondsSinceEpoch; + final latency = endTimestamp - timestamp; + print('✅ [Syncable] REALTIME: Processed in ${latency}ms'); + }).catchError((e, stackTrace) { + _logger.severe('Error in immediate processing for $tableName: $e', e, stackTrace as StackTrace?); + }); + + } catch (e, stack) { + _logger.severe('Error processing Realtime event for $tableName: $e', e, stack); + } } }, filter: PostgresChangeFilter( @@ -405,17 +585,21 @@ class SyncManager { value: _userId, ), ); - } - _backendSubscription?.subscribe((status, error) { - if (error != null) { - // coverage:ignore-start - _logger.severe('Backend subscription error: $error'); - // coverage:ignore-end - } - }); + // Subscribe to the channel + channel.subscribe((status, error) { + if (error != null) { + _logger.severe('Realtime subscription error for $tableName: $error'); + } else if (status == RealtimeSubscribeStatus.subscribed) { + _logger.info('Realtime subscription active for $tableName'); + } + }); + + // Store the channel for later cleanup + _backendSubscriptions[tableName] = channel; + } - _logger.info('Subscribed to backend changes'); + _logger.info('Subscribed to backend changes for ${_syncables.length} tables'); } /// Syncs all tables registered with the sync manager. @@ -430,24 +614,108 @@ class SyncManager { await _syncTables('Manual sync'); } + /// Process all pending incoming data immediately. + /// + /// This method processes any data that has been fetched from the backend + /// but not yet written to the local database. This is useful for ensuring + /// data is immediately available in the UI after a sync operation. + /// + /// Normally, incoming data is processed in the main sync loop which runs + /// at regular intervals. This method allows you to bypass that wait. + Future processIncomingImmediately() async { + if (!__syncingEnabled) { + _logger.warning('Cannot process incoming data - syncing is disabled'); + return; + } + + if (userId.isEmpty) { + _logger.warning('Cannot process incoming data - user ID is empty'); + return; + } + + _logger.info('Processing incoming data immediately'); + + // Process all pending incoming data for each syncable + for (final syncable in _syncables) { + await _processIncoming(syncable); + } + + _logger.info('Immediate incoming data processing complete'); + } + Future _syncTables(String reason) async { + print('🎯 [Syncable] _syncTables called - reason: $reason'); + if (!__syncingEnabled) { + print('❌ [Syncable] _syncTables aborted - syncing disabled'); _logger.warning('Tables not getting synced because syncing is disabled'); return; } if (userId.isEmpty) { + print('❌ [Syncable] _syncTables aborted - userId empty'); _logger.warning('Tables not getting synced because user ID is empty'); return; } _logger.info('Syncing all tables. Reason: $reason'); - for (final syncable in _syncables) { - await _syncTable(syncable); - } + // Set sync in progress and notify listeners + _setSyncInProgress(true); + + try { + // Emit sync started events (only if detailed events are enabled) + if (_enableDetailedEvents) { + for (final syncable in _syncables) { + if (_onSyncStarted != null) { + final event = SyncStartedEvent( + syncableType: syncable, + source: SyncEventSource.fullSync, + timestamp: DateTime.now().toUtc(), + reason: reason, + ); + _onSyncStarted(event); + } + } + } - _nFullSyncs++; + // Track initial queue sizes to detect if items were added during sync + final initialQueueSizes = {}; + for (final syncable in _syncables) { + initialQueueSizes[syncable] = _inQueues[syncable]!.length; + } + + for (final syncable in _syncables) { + await _syncTable(syncable); + } + + // Emit fallback sync completed events for tables that didn't get any new items + // Real events with statistics are emitted in _processIncoming + if (_enableDetailedEvents) { + for (final syncable in _syncables) { + final initialSize = initialQueueSizes[syncable]!; + final currentSize = _inQueues[syncable]!.length; + + // Only emit fallback event if no items were added during sync + if (currentSize == initialSize && _onSyncCompleted != null) { + final event = SyncCompletedEvent( + syncableType: syncable, + source: SyncEventSource.fullSync, + timestamp: DateTime.now().toUtc(), + itemsReceived: 0, + itemsUpdated: 0, + itemsDeleted: 0, + ); + _onSyncCompleted(event); + } + } + } + + _nFullSyncs++; + } finally { + // Clear sync in progress and notify listeners + _setSyncInProgress(false); + } } Future _syncTable(Type syncable) async { @@ -491,14 +759,40 @@ class SyncManager { // Use batches because all the UUIDs make the URI become too long otherwise. for (final batch in itemsToPull.slices(100)) { if (!_syncingEnabled) return; - final pulledBatch = await _supabaseClient - .from(_backendTables[syncable]!) - .select() - .eq(userIdKey, _userId) - .inFilter(idKey, batch) - .then((data) => data.map(_fromJsons[syncable]!)); - _inQueues[syncable]!.addAll(pulledBatch); + try { + final pulledBatch = await _supabaseClient + .from(_backendTables[syncable]!) + .select() + .eq(userIdKey, _userId) + .inFilter(idKey, batch) + .then((data) => data.map(_fromJsons[syncable]!)); + + print('📦 [Syncable] Adding ${pulledBatch.length} items to queue for ${syncable.toString()}'); + _inQueues[syncable]!.addAll(pulledBatch); + print('📦 [Syncable] Queue size after add: ${_inQueues[syncable]!.length} for ${syncable.toString()}'); + // Mark these as full sync items (only if detailed events are enabled) + if (_enableDetailedEvents) { + for (final item in pulledBatch) { + _incomingSources[syncable]![item.id] = SyncEventSource.fullSync; + } + } + } on SocketException catch (e) { + _logger.warning( + 'Network error during item pull for ${_backendTables[syncable]}: ${e.message}', + ); + break; // Exit loop on network error + } on HttpException catch (e) { + _logger.warning( + 'HTTP error during item pull for ${_backendTables[syncable]}: ${e.message}', + ); + break; // Exit loop on HTTP error + } catch (e) { + _logger.severe( + 'Unexpected error during item pull for ${_backendTables[syncable]}: $e', + ); + break; // Exit loop on any other error + } } _updateLastPulledTimeStamp(syncable, DateTime.now().toUtc()); @@ -520,35 +814,76 @@ class SyncManager { return false; } + /// Quick connectivity check to prevent network calls when offline + Future _hasNetworkConnectivity() async { + try { + final result = await InternetAddress.lookup('google.com') + .timeout(const Duration(seconds: 2)); + return result.isNotEmpty && result[0].rawAddress.isNotEmpty; + } on SocketException catch (_) { + return false; + } catch (_) { + return false; + } + } + /// Retrieves the IDs and `lastUpdatedAt` timestamps for all rows of a /// syncable in the backend. These can be used to determine which items need /// to be synced from the backend. Future>> _fetchBackendItemMetadata( Type syncable, ) async { + // 🛡️ OFFLINE PROTECTION: Check network connectivity before making requests + // This prevents ClientSocketException crashes when offline + final hasNetwork = await _hasNetworkConnectivity(); + if (!hasNetwork) { + _logger.warning( + 'Skipping backend metadata fetch for ${_backendTables[syncable]} - ' + 'no network connectivity detected', + ); + return []; // Return empty list instead of crashing + } + final List> backendItems = []; int offset = 0; bool hasMore = true; while (hasMore && _syncingEnabled) { - final batch = await _supabaseClient - .from(_backendTables[syncable]!) - .select('$idKey,$updatedAtKey') - .eq(userIdKey, _userId) - .range(offset, offset + _maxRows - 1) - // Use consistent ordering to prevent duplicates - .order(idKey, ascending: true); - - backendItems.addAll(batch); - hasMore = batch.length == _maxRows; - offset += _maxRows; - - if (batch.isNotEmpty) { - _logger.info( - 'Fetched batch of ${batch.length} metadata items for table ' - "'${_backendTables[syncable]!}', total so far: ${backendItems.length}", + try { + final batch = await _supabaseClient + .from(_backendTables[syncable]!) + .select('$idKey,$updatedAtKey') + .eq(userIdKey, _userId) + .range(offset, offset + _maxRows - 1) + // Use consistent ordering to prevent duplicates + .order(idKey, ascending: true); + + backendItems.addAll(batch); + hasMore = batch.length == _maxRows; + offset += _maxRows; + + if (batch.isNotEmpty) { + _logger.info( + 'Fetched batch of ${batch.length} metadata items for table ' + "'${_backendTables[syncable]!}', total so far: ${backendItems.length}", + ); + } + } on SocketException catch (e) { + _logger.warning( + 'Network error during metadata fetch for ${_backendTables[syncable]}: ${e.message}', ); + break; // Exit loop on network error to prevent crashes + } on HttpException catch (e) { + _logger.warning( + 'HTTP error during metadata fetch for ${_backendTables[syncable]}: ${e.message}', + ); + break; // Exit loop on HTTP error + } catch (e) { + _logger.severe( + 'Unexpected error during metadata fetch for ${_backendTables[syncable]}: $e', + ); + break; // Exit loop on any other error } } @@ -578,11 +913,30 @@ class SyncManager { final backendTable = _backendTables[syncable]!; final sentItems = _sentItems[syncable]!; + // ✅ NOUVEAU : Log si on retry des items + if (outQueue.isNotEmpty) { + final itemCount = outQueue.length; + _logger.info('📤 Processing $itemCount outgoing items for $backendTable'); + + // Détecter si c'est un retry (items plus vieux que 30 secondes) + final now = DateTime.now(); + final hasOldItems = outQueue.values.any((item) => + now.difference(item.updatedAt) > const Duration(seconds: 30) + ); + + if (hasOldItems) { + _logger.warning('⚠️ Retrying items from previous failed sync attempt for $backendTable'); + } + } + while (_syncingEnabled && outQueue.isNotEmpty) { final outgoing = Set.from( outQueue.values.where((f) => f.userId == _userId), ); - outQueue.clear(); + + // ✅ PROTECTION DONNÉES : Ne PAS vider immédiatement + // Queue sera vidée SEULEMENT après succès de l'upsert + // outQueue.clear(); ← SUPPRIMÉ pour éviter perte de données if (outgoing.isEmpty) continue; @@ -592,12 +946,36 @@ class SyncManager { assert(!outgoing.any((s) => s.userId?.isEmpty ?? true)); - await _supabaseClient - .from(backendTable) - .upsert( - outgoing.map((x) => x.toJson()).toList(), - onConflict: '$idKey,$userIdKey', - ); + try { + await _supabaseClient + .from(backendTable) + .upsert( + outgoing.map((x) => x.toJson()).toList(), + onConflict: idKey, + ); + + // ✅ NOUVEAU : Vider la queue SEULEMENT après succès de l'upsert + for (final item in outgoing) { + outQueue.remove(item.id); + } + + } on SocketException catch (e) { + _logger.warning( + 'Network error during upsert to $backendTable: ${e.message}', + ); + // ✅ Queue intacte, items seront retentés au prochain loop + break; + } on HttpException catch (e) { + _logger.warning( + 'HTTP error during upsert to $backendTable: ${e.message}', + ); + break; + } catch (e) { + _logger.severe( + 'Unexpected error during upsert to $backendTable: $e', + ); + break; + } sentItems.addAll(outgoing); @@ -616,35 +994,147 @@ class SyncManager { Future _processIncoming(Type syncable) async { final inQueue = _inQueues[syncable]!; - if (inQueue.isEmpty) return; + print('🔍 [Syncable] _processIncoming called for ${syncable.toString()} - queue size: ${inQueue.length}'); + if (inQueue.isEmpty) { + print('❌ [Syncable] Queue is empty for ${syncable.toString()}, skipping processing'); + return; + } final sentItems = _sentItems[syncable]!; final receivedItems = _receivedItems[syncable]!; final itemsToWrite = {}; + var syncSource = SyncEventSource.fullSync; // Default for (final item in inQueue) { - // Skip if already processed - if (sentItems.contains(item) || receivedItems.contains(item)) { + // Skip only if already received from backend + // Items that were sent locally should still be processed when they come back from the server + if (receivedItems.contains(item)) { + print('❌ [Syncable] SKIP - already received: ${item.id}'); continue; } + + // Log if item was sent locally but is now being received from backend + if (sentItems.contains(item)) { + print('🔄 [Syncable] Processing server confirmation for locally sent item: ${item.id}'); + } else { + print('✅ [Syncable] Adding new item from backend: ${item.id}'); + } + itemsToWrite[item.id] = item; + + // Use the first item's source as the batch source (only if detailed events are enabled) + if (_enableDetailedEvents && itemsToWrite.length == 1) { + final incomingSources = _incomingSources[syncable]!; + syncSource = incomingSources[item.id] ?? SyncEventSource.fullSync; + } } inQueue.clear(); - await _batchWriteIncoming(syncable, itemsToWrite); + // Clean up source tracking for processed items (only if detailed events are enabled) + if (_enableDetailedEvents) { + final incomingSources = _incomingSources[syncable]!; + for (final itemId in itemsToWrite.keys) { + incomingSources.remove(itemId); + } + } - receivedItems.addAll(itemsToWrite.values); - _nSyncedFromBackend[syncable] = - nSyncedFromBackend(syncable) + itemsToWrite.length; + if (itemsToWrite.isNotEmpty) { + final writeStats = await _batchWriteIncoming(syncable, itemsToWrite); + + receivedItems.addAll(itemsToWrite.values); + _nSyncedFromBackend[syncable] = + nSyncedFromBackend(syncable) + itemsToWrite.length; + + // Emit sync completed event with real statistics (only if detailed events are enabled) + if (_enableDetailedEvents && _onSyncCompleted != null) { + final event = SyncCompletedEvent( + syncableType: syncable, + source: syncSource, + timestamp: DateTime.now().toUtc(), + itemsReceived: writeStats.itemsInserted, + itemsUpdated: writeStats.itemsUpdated, + itemsDeleted: 0, // Not implemented yet + ); + _onSyncCompleted(event); + } + } + } + + /// Processes incoming items immediately for a specific syncable type. + /// + /// This method is called when items arrive via Realtime subscriptions to + /// provide instant UI updates instead of waiting for the next sync loop iteration. + /// + /// Unlike [_processIncoming], this only processes items for one syncable type + /// and is designed to be called asynchronously without blocking the Realtime callback. + Future _processIncomingImmediate(Type syncable) async { + if (!_syncingEnabled) { + print('⚠️ [Syncable] Skipping immediate processing - syncing disabled'); + return; + } + + print('⚡ [Syncable] IMMEDIATE processing triggered for ${syncable.toString()}'); + + // Process this specific syncable immediately + await _processIncoming(syncable); + + print('✅ [Syncable] IMMEDIATE processing completed for ${syncable.toString()}'); } - Future _batchWriteIncoming( + // ============= ADAPTIVE SYNC HELPER METHODS ============= + + /// Determines the current sync mode based on the time since last change. + /// + /// - ACTIVE: Last change < 10 seconds (sync every 5s) + /// - RECENT: Last change between 10s and 2 minutes (sync every 15s) + /// - IDLE: No change for > 2 minutes (sync every 30s) + SyncMode _getCurrentMode() { + if (_lastChangeDetected == null) { + return SyncMode.idle; + } + + final timeSinceLastChange = DateTime.now().difference(_lastChangeDetected!); + + if (timeSinceLastChange < const Duration(seconds: 10)) { + return SyncMode.active; + } else if (timeSinceLastChange < const Duration(minutes: 2)) { + return SyncMode.recent; + } else { + return SyncMode.idle; + } + } + + /// Returns the sync interval for the given mode. + Duration _getIntervalForMode(SyncMode mode) { + switch (mode) { + case SyncMode.active: + return const Duration(seconds: 5); + case SyncMode.recent: + return const Duration(seconds: 15); + case SyncMode.idle: + return const Duration(seconds: 30); + } + } + + /// Triggers an immediate sync by completing the sync trigger. + /// + /// This wakes up the sync loop before the normal interval expires, + /// allowing for faster sync when changes are detected. + void _triggerImmediateSync() { + if (_syncTrigger != null && !_syncTrigger!.isCompleted) { + _syncTrigger!.complete(); + } + } + + Future _batchWriteIncoming( Type syncable, Map incomingItems, ) async { - if (incomingItems.isEmpty) return; + if (incomingItems.isEmpty) { + return const WriteStats(itemsInserted: 0, itemsUpdated: 0); + } final table = _localTables[syncable]! as TableInfo; @@ -662,9 +1152,13 @@ class SyncManager { for (final incomingItem in incomingItems.values) { final existingUpdatedAt = existingItems[incomingItem.id]; if (existingUpdatedAt == null) { + print('🔧 [Syncable] Inserting new item: ${incomingItem.id}'); itemsToInsert.add(incomingItem.toCompanion()); } else if (incomingItem.updatedAt.isAfter(existingUpdatedAt)) { + print('🔧 [Syncable] Updating item: ${incomingItem.id} (${incomingItem.updatedAt} > $existingUpdatedAt)'); itemsToReplace.add(incomingItem.toCompanion()); + } else { + print('❌ [Syncable] SKIPPING item: ${incomingItem.id} - incoming: ${incomingItem.updatedAt}, existing: $existingUpdatedAt'); } } @@ -672,6 +1166,11 @@ class SyncManager { batch.insertAll(table, itemsToInsert); batch.replaceAll(table, itemsToReplace); }); + + return WriteStats( + itemsInserted: itemsToInsert.length, + itemsUpdated: itemsToReplace.length, + ); } DateTime? _lastPushedTimestamp(Type syncable) { @@ -724,6 +1223,30 @@ class SyncManager { return DateTime.now().difference(lastTimeOtherDeviceWasActive!) < _devicesConsideredInactiveAfter; } + + /// Clears all internal sync state collections to ensure a clean sync state. + /// + /// This should be called during user authentication changes to prevent + /// newly synchronized items from being treated as "already processed" + /// due to persistent state from previous sync sessions. + void clearSyncState() { + _logger.info('Clearing sync state collections for clean authentication'); + + // Clear tracking collections for all syncable types + for (final syncable in _syncables) { + _inQueues[syncable]?.clear(); + _outQueues[syncable]?.clear(); + _incomingSources[syncable]?.clear(); + _sentItems[syncable]?.clear(); + _receivedItems[syncable]?.clear(); + } + + // Reset sync counters + _nSyncedToBackend.clear(); + _nSyncedFromBackend.clear(); + + _logger.info('Sync state cleared successfully'); + } } typedef CompanionConstructor = @@ -742,3 +1265,11 @@ enum TimestampType { const TimestampType(this.name); final String name; } + +/// Statistics returned by batch write operations. +class WriteStats { + const WriteStats({required this.itemsInserted, required this.itemsUpdated}); + + final int itemsInserted; + final int itemsUpdated; +} diff --git a/lib/syncable.dart b/lib/syncable.dart index 11f9ca3..4f5fbe0 100644 --- a/lib/syncable.dart +++ b/lib/syncable.dart @@ -1,6 +1,7 @@ /// Syncable is a library for offline-first multi-device data synchronization in Flutter apps./// library; +export 'package:syncable/src/sync_event.dart'; export 'package:syncable/src/sync_manager.dart'; export 'package:syncable/src/sync_timestamp_storage.dart'; export 'package:syncable/src/syncable.dart'; diff --git a/pubspec.yaml b/pubspec.yaml index c802bfa..f8768c9 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -10,6 +10,8 @@ environment: dependencies: collection: ^1.19.0 drift: ^2.26.0 + flutter: + sdk: flutter logging: ^1.3.0 supabase: ^2.6.3 @@ -17,6 +19,8 @@ dev_dependencies: build_runner: ^2.4.15 drift_dev: ^2.26.0 equatable: ^2.0.7 + flutter_test: + sdk: flutter http: ^1.3.0 json_annotation: ^4.9.0 json_serializable: ^6.9.4 diff --git a/test/integration_test.dart b/test/integration_test.dart index a3526f1..83b5af5 100644 --- a/test/integration_test.dart +++ b/test/integration_test.dart @@ -5,7 +5,7 @@ import 'package:drift/native.dart'; import 'package:supabase/supabase.dart'; import 'package:syncable/src/supabase_names.dart'; import 'package:syncable/syncable.dart'; -import 'package:test/test.dart'; +import 'package:flutter_test/flutter_test.dart'; import 'package:uuid/uuid.dart'; import 'utils/test_database.dart'; diff --git a/test/sync_events_test.dart b/test/sync_events_test.dart new file mode 100644 index 0000000..8bf46cd --- /dev/null +++ b/test/sync_events_test.dart @@ -0,0 +1,253 @@ +import 'dart:convert'; + +import 'package:drift/drift.dart' as drift; +import 'package:drift/native.dart' as drift_native; +import 'package:http/http.dart'; +import 'package:mockito/mockito.dart'; +import 'package:supabase/supabase.dart'; +import 'package:syncable/src/supabase_names.dart'; +import 'package:syncable/syncable.dart'; +import 'package:flutter_test/flutter_test.dart'; +import 'package:uuid/uuid.dart'; + +import 'utils/test_database.dart'; +import 'utils/test_mocks.mocks.dart'; +import 'utils/test_supabase_names.dart'; +import 'utils/wait_for_function_to_pass.dart'; + +void main() { + late TestDatabase testDb; + late MockSupabaseClient mockSupabaseClient; + late MockSupabaseQueryBuilder mockQueryBuilder; + late MockClient mockHttpClient; + + setUp(() { + testDb = TestDatabase( + drift.DatabaseConnection( + drift_native.NativeDatabase.memory(), + closeStreamsSynchronously: true, + ), + ); + + // Set up mocks for Supabase + mockSupabaseClient = MockSupabaseClient(); + mockQueryBuilder = MockSupabaseQueryBuilder(); + mockHttpClient = MockClient(); + + when( + mockSupabaseClient.from(itemsTable), + ).thenAnswer((_) => mockQueryBuilder); + when( + mockQueryBuilder.upsert(any, onConflict: anyNamed('onConflict')), + ).thenAnswer( + (_) => PostgrestFilterBuilder( + PostgrestBuilder( + url: Uri(), + headers: {}, + method: 'POST', + httpClient: mockHttpClient, + ), + ), + ); + when( + mockHttpClient.post( + any, + headers: anyNamed('headers'), + body: anyNamed('body'), + ), + ).thenAnswer( + (_) async => Response( + jsonEncode([ + {idKey: 'abc'}, + ]), + 200, + request: Request('POST', Uri()), + ), + ); + when(mockQueryBuilder.select(any)).thenAnswer( + (_) => PostgrestFilterBuilder( + PostgrestBuilder( + url: Uri(), + headers: {}, + method: 'GET', + httpClient: mockHttpClient, + ), + ), + ); + when(mockHttpClient.get(any, headers: anyNamed('headers'))).thenAnswer( + (_) async => + Response(jsonEncode([]), 200, request: Request('GET', Uri())), + ); + + // Set up mocks for real-time + final mockRealtimeChannel = MockRealtimeChannel(); + when(mockSupabaseClient.channel(any)).thenReturn(mockRealtimeChannel); + when( + mockRealtimeChannel.onPostgresChanges( + schema: anyNamed('schema'), + table: anyNamed('table'), + event: anyNamed('event'), + callback: anyNamed('callback'), + ), + ).thenReturn(mockRealtimeChannel); + when( + mockRealtimeChannel.subscribe(), + ).thenAnswer((_) => mockRealtimeChannel); + }); + + tearDown(() async { + await testDb.close(); + }); + + test('SyncManager calls onSyncStarted callback when sync begins', () async { + final syncStartedEvents = []; + + final syncManager = SyncManager( + localDatabase: testDb, + supabaseClient: mockSupabaseClient, + syncInterval: const Duration(milliseconds: 1), + enableDetailedEvents: true, + onSyncStarted: (event) { + syncStartedEvents.add(event); + }, + ); + + syncManager.registerSyncable( + backendTable: itemsTable, + fromJson: Item.fromJson, + companionConstructor: ItemsCompanion.new, + ); + + final userId = const Uuid().v4(); + + syncManager.enableSync(); + syncManager.setUserId(userId); + + await waitForFunctionToPass(() async { + expect(syncStartedEvents.length, greaterThanOrEqualTo(1)); + }); + + final event = syncStartedEvents.first; + expect(event.type, equals(SyncEventType.syncStarted)); + expect(event.syncableType, equals(Item)); + expect(event.source, equals(SyncEventSource.fullSync)); + expect(event.reason, isNotEmpty); + }); + + test( + 'SyncManager calls onSyncCompleted callback when sync completes', + () async { + final syncCompletedEvents = []; + + final syncManager = SyncManager( + localDatabase: testDb, + supabaseClient: mockSupabaseClient, + syncInterval: const Duration(milliseconds: 1), + enableDetailedEvents: true, + onSyncCompleted: (event) { + syncCompletedEvents.add(event); + }, + ); + + syncManager.registerSyncable( + backendTable: itemsTable, + fromJson: Item.fromJson, + companionConstructor: ItemsCompanion.new, + ); + + final userId = const Uuid().v4(); + + syncManager.enableSync(); + syncManager.setUserId(userId); + + await waitForFunctionToPass(() async { + expect(syncCompletedEvents.length, greaterThanOrEqualTo(1)); + }); + + final event = syncCompletedEvents.first; + expect(event.type, equals(SyncEventType.syncCompleted)); + expect(event.syncableType, equals(Item)); + expect(event.source, equals(SyncEventSource.fullSync)); + }, + ); + + test('Callback parameters are correctly typed', () async { + // Test that callback parameter types are working correctly + SyncStartedEvent? startedEvent; + SyncCompletedEvent? completedEvent; + + final syncManager = SyncManager( + localDatabase: testDb, + supabaseClient: mockSupabaseClient, + syncInterval: const Duration(milliseconds: 1), + enableDetailedEvents: true, + onSyncStarted: (event) { + startedEvent = event; + }, + onSyncCompleted: (event) { + completedEvent = event; + }, + ); + + syncManager.registerSyncable( + backendTable: itemsTable, + fromJson: Item.fromJson, + companionConstructor: ItemsCompanion.new, + ); + + final userId = const Uuid().v4(); + + syncManager.enableSync(); + syncManager.setUserId(userId); + + await waitForFunctionToPass(() async { + expect(startedEvent, isNotNull); + expect(completedEvent, isNotNull); + }); + + expect(startedEvent?.reason, isNotEmpty); + expect( + completedEvent?.totalItemsProcessed, + equals(0), + ); // No items from mock + }); + + test('Sync events contain proper timestamps', () async { + final events = []; + final testStartTime = DateTime.now().toUtc(); + + final syncManager = SyncManager( + localDatabase: testDb, + supabaseClient: mockSupabaseClient, + syncInterval: const Duration(milliseconds: 1), + enableDetailedEvents: true, + onSyncStarted: (event) => events.add(event), + onSyncCompleted: (event) => events.add(event), + ); + + syncManager.registerSyncable( + backendTable: itemsTable, + fromJson: Item.fromJson, + companionConstructor: ItemsCompanion.new, + ); + + final userId = const Uuid().v4(); + + syncManager.enableSync(); + syncManager.setUserId(userId); + + await waitForFunctionToPass(() async { + expect(events.length, greaterThanOrEqualTo(1)); + }); + + for (final event in events) { + expect(event.timestamp.isAfter(testStartTime), isTrue); + expect( + event.timestamp.isBefore( + DateTime.now().toUtc().add(const Duration(seconds: 1)), + ), + isTrue, + ); + } + }); +} diff --git a/test/sync_manager_test.dart b/test/sync_manager_test.dart index cb8f294..7731ede 100644 --- a/test/sync_manager_test.dart +++ b/test/sync_manager_test.dart @@ -7,7 +7,7 @@ import 'package:mockito/mockito.dart'; import 'package:supabase/supabase.dart'; import 'package:syncable/src/supabase_names.dart'; import 'package:syncable/syncable.dart'; -import 'package:test/test.dart'; +import 'package:flutter_test/flutter_test.dart'; import 'package:uuid/uuid.dart'; import 'utils/test_database.dart'; diff --git a/test/syncable_database_test.dart b/test/syncable_database_test.dart index 2d559c0..f3afa87 100644 --- a/test/syncable_database_test.dart +++ b/test/syncable_database_test.dart @@ -2,7 +2,7 @@ import 'dart:async'; import 'package:drift/drift.dart' as drift; import 'package:drift/native.dart' as drift_native; -import 'package:test/test.dart'; +import 'package:flutter_test/flutter_test.dart'; import 'package:uuid/uuid.dart'; import 'utils/test_database.dart'; diff --git a/test/utils/test_database.g.dart b/test/utils/test_database.g.dart index 4e82331..4aee127 100644 --- a/test/utils/test_database.g.dart +++ b/test/utils/test_database.g.dart @@ -125,7 +125,7 @@ class $ItemsTable extends Items with TableInfo<$ItemsTable, Item> { @override Item map(Map data, {String? tablePrefix}) { final effectivePrefix = tablePrefix != null ? '$tablePrefix.' : ''; - return Item( + return Item.new( id: attachedDatabase.typeMapping.read( DriftSqlType.string, data['${effectivePrefix}id'],