diff --git a/README.md b/README.md index e6274b4..22b6b25 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,8 @@ Check out [the example database](test/utils/test_database.dart) for a complete c final syncManager = SyncManager( localDatabase: localDatabase, supabaseClient: supabaseClient, + onSyncStarted: (event) => showLoadingIndicator(), + onSyncCompleted: (event) => hideLoadingIndicator(), ); ``` @@ -190,6 +192,70 @@ It goes through the local tables for all registered syncables and sets the user ID for all items that don't have a user ID yet. If syncing is enabled, those items will then get synced to the backend automatically. +### Monitoring Sync State 📢 + +The SyncManager provides two ways to monitor sync state: + +#### Simple Approach (Recommended) + +For most use cases, use the built-in `ChangeNotifier` interface: + +```dart +final syncManager = SyncManager( + localDatabase: localDatabase, + supabaseClient: supabaseClient, +); + +// Listen to sync state changes +syncManager.addListener(() { + if (syncManager.syncInProgress) { + showLoadingIndicator(); + } else { + hideLoadingIndicator(); + } +}); + +// Or use with ValueListenableBuilder in Flutter +ValueListenableBuilder( + valueListenable: syncManager, + builder: (context, syncInProgress, child) { + return syncInProgress + ? 
CircularProgressIndicator() + : Icon(Icons.check); + }, +); +``` + +#### Advanced Event Notifications + +For advanced use cases requiring detailed information per syncable type, enable detailed events: + +```dart +final syncManager = SyncManager( + localDatabase: localDatabase, + supabaseClient: supabaseClient, + enableDetailedEvents: true, // Enable detailed events + onSyncStarted: (event) { + print('Sync started for ${event.syncableType} from ${event.source}'); + // event.source can be SyncEventSource.fullSync or SyncEventSource.realtime + }, + onSyncCompleted: (event) { + print('Sync completed for ${event.syncableType}: ${event.itemsReceived} items received'); + // Access detailed statistics: itemsReceived, itemsUpdated, itemsDeleted + }, +); +``` + +**When to use detailed events:** +- You need to track sync progress per table/syncable type +- You want to distinguish between full sync and real-time sync events +- You need detailed statistics about sync operations + +**When to use simple approach:** +- You just want to show a loading indicator during sync +- You want minimal complexity and overhead +- You don't need per-table sync information + ### Optimizations ⚡ There are a few mechanisms that can drastically reduce the ongoing data diff --git a/devtools_options.yaml b/devtools_options.yaml new file mode 100644 index 0000000..fa0b357 --- /dev/null +++ b/devtools_options.yaml @@ -0,0 +1,3 @@ +description: This file stores settings for Dart & Flutter DevTools. 
+documentation: https://docs.flutter.dev/tools/devtools/extensions#configure-extension-enablement-states +extensions: diff --git a/justfile b/justfile index c421119..7f0ca31 100644 --- a/justfile +++ b/justfile @@ -19,7 +19,7 @@ generate-test-entrypoints: # Runs all tests (with coverage) test: generate-test-entrypoints start-supabase supabase db test - dart test test/_test.dart --test-randomize-ordering-seed=random --coverage coverage + flutter test test/_test.dart --test-randomize-ordering-seed=random --coverage coverage dart run coverage:format_coverage --lcov --report-on lib --check-ignore -i coverage/test/_test.dart.vm.json -o coverage/lcov.info # Start Supabase for local development or testing diff --git a/lib/src/sync_dead_letter_queue.dart b/lib/src/sync_dead_letter_queue.dart new file mode 100644 index 0000000..bb13485 --- /dev/null +++ b/lib/src/sync_dead_letter_queue.dart @@ -0,0 +1,203 @@ +import 'dart:convert'; + +import 'package:drift/drift.dart'; +import 'package:logging/logging.dart'; + +/// Dead Letter Queue for sync errors that cannot be automatically resolved. +/// +/// Items are moved here after multiple failed sync attempts (application errors). +/// Provides persistence and allows manual intervention via admin UI. +class SyncDeadLetterQueue { + SyncDeadLetterQueue(this._database); + + final GeneratedDatabase _database; + final _logger = Logger('SyncDeadLetterQueue'); + + /// Saves a failed sync item to the dead letter queue. + Future saveFailedItem({ + required String tableName, + required String itemId, + required Map itemJson, + required String errorType, + required String errorMessage, + String? 
stackTrace, + required int retryCount, + }) async { + try { + final now = DateTime.now().millisecondsSinceEpoch; + + await _database.customInsert( + ''' + INSERT OR REPLACE INTO sync_dead_letter_queue_table ( + id, table_name, item_json, error_type, error_message, + retry_count, first_error_at, last_error_at, last_stack_trace, status + ) VALUES (?, ?, ?, ?, ?, ?, + COALESCE((SELECT first_error_at FROM sync_dead_letter_queue_table WHERE id = ?), ?), + ?, ?, 'pending') + ''', + variables: [ + Variable.withString('${tableName}__$itemId'), + Variable.withString(tableName), + Variable.withString(jsonEncode(itemJson)), + Variable.withString(errorType), + Variable.withString(errorMessage), + Variable.withInt(retryCount), + Variable.withString('${tableName}__$itemId'), + Variable.withInt(now), + Variable.withInt(now), + Variable.withString(stackTrace ?? ''), + ], + ); + + _logger.info('Saved failed item to DLQ: $tableName/$itemId'); + } catch (e, s) { + _logger.severe('Failed to save item to dead letter queue: $e\n$s'); + } + } + + /// Retrieves all pending items from the dead letter queue. + Future> getPendingItems() async { + try { + final result = await _database.customSelect(''' + SELECT id, table_name, item_json, error_type, error_message, + retry_count, first_error_at, last_error_at, last_stack_trace, status + FROM sync_dead_letter_queue_table + WHERE status = 'pending' + ORDER BY last_error_at DESC + ''').get(); + + return result.map((row) => DeadLetterItem.fromRow(row)).toList(); + } catch (e, s) { + _logger.severe('Failed to retrieve pending DLQ items: $e\n$s'); + return []; + } + } + + /// Gets count of pending items. 
+ Future getPendingCount() async { + try { + final result = await _database + .customSelect( + 'SELECT COUNT(*) as count FROM sync_dead_letter_queue_table WHERE status = \'pending\'', + ) + .getSingle(); + + return result.read('count'); + } catch (e) { + _logger.severe('Failed to get pending DLQ count: $e'); + return 0; + } + } + + /// Retrieves a failed item's JSON data for retry. + /// + /// ⚠️ IMPORTANT: Does NOT remove the item from DLQ. + /// The caller (SyncService) must call deleteItem() after successful retry + /// to avoid losing data if the retry fails. + /// + /// Returns the parsed item data to be re-queued by the caller. + Future?> retryItem(String itemId) async { + try { + // Retrieve item without deleting it + final result = await _database.customSelect(''' + SELECT item_json FROM sync_dead_letter_queue_table WHERE id = ? + ''', variables: [Variable.withString(itemId)]).getSingleOrNull(); + + if (result == null) { + _logger.warning('DLQ item not found for retry: $itemId'); + return null; + } + + final itemJson = jsonDecode(result.read('item_json')) as Map; + + _logger.info('Retrieved DLQ item for retry: $itemId'); + return itemJson; + } catch (e, s) { + _logger.severe('Failed to retrieve DLQ item for retry: $e\n$s'); + return null; + } + } + + /// Marks an item as ignored (status = 'ignored'). + /// The item stays in DLQ but won't be shown in pending list. + Future ignoreItem(String itemId) async { + try { + await _database.customUpdate( + 'UPDATE sync_dead_letter_queue_table SET status = ? WHERE id = ?', + variables: [ + Variable.withString('ignored'), + Variable.withString(itemId), + ], + ); + + _logger.info('Ignored DLQ item: $itemId'); + return true; + } catch (e, s) { + _logger.severe('Failed to ignore DLQ item: $e\n$s'); + return false; + } + } + + /// Permanently deletes an item from DLQ. + /// Use this when the error is understood and the item should be discarded. 
+ Future deleteItem(String itemId) async { + try { + await _database.customStatement( + 'DELETE FROM sync_dead_letter_queue_table WHERE id = ?', + [itemId], // customStatement expects raw values, not Variable + ); + + _logger.info('Deleted DLQ item: $itemId'); + return true; + } catch (e, s) { + _logger.severe('Failed to delete DLQ item: $e\n$s'); + return false; + } + } +} + +/// Represents an item in the dead letter queue. +class DeadLetterItem { + const DeadLetterItem({ + required this.id, + required this.tableName, + required this.itemJson, + required this.errorType, + required this.errorMessage, + required this.retryCount, + required this.firstErrorAt, + required this.lastErrorAt, + this.lastStackTrace, + required this.status, + }); + + final String id; + final String tableName; + final String itemJson; + final String errorType; + final String errorMessage; + final int retryCount; + final DateTime firstErrorAt; + final DateTime lastErrorAt; + final String? lastStackTrace; + final String status; + + factory DeadLetterItem.fromRow(QueryRow row) { + return DeadLetterItem( + id: row.read('id'), + tableName: row.read('table_name'), + itemJson: row.read('item_json'), + errorType: row.read('error_type'), + errorMessage: row.read('error_message'), + retryCount: row.read('retry_count'), + firstErrorAt: DateTime.fromMillisecondsSinceEpoch( + row.read('first_error_at'), + ), + lastErrorAt: DateTime.fromMillisecondsSinceEpoch( + row.read('last_error_at'), + ), + lastStackTrace: row.readNullable('last_stack_trace'), + status: row.read('status'), + ); + } +} diff --git a/lib/src/sync_error_classifier.dart b/lib/src/sync_error_classifier.dart new file mode 100644 index 0000000..f4e96f4 --- /dev/null +++ b/lib/src/sync_error_classifier.dart @@ -0,0 +1,184 @@ +import 'dart:async'; +import 'dart:io'; +import 'package:supabase/supabase.dart'; + +/// Synchronization error type +enum SyncErrorType { + /// Network error (no connection, timeout, backend down) + /// These errors 
should be retried indefinitely + network, + + /// Application error (validation, permissions, corrupted data) + /// These errors should be moved to an error queue after N attempts + application, +} + +/// Synchronization error classification +/// +/// This class distinguishes between network errors (temporary, retry indefinitely) +/// and application errors (bugs, validation, handle manually). +class SyncErrorClassifier { + /// Classifies an error as network or application type + /// + /// Network errors: + /// - SocketException + /// - HttpException (connection timeout, refused, etc.) + /// - ClientException with network message + /// - HTTP status codes: 502, 503, 504 (backend down) + /// - Timeout exceptions + /// + /// Application errors: + /// - HTTP status codes: 400, 422 (validation) + /// - HTTP status codes: 403, 401 (permissions) + /// - HTTP status codes: 500 with non-network message + /// - FormatException (JSON parsing) + /// - DatabaseException (local constraints) + static SyncErrorType classify(Object error) { + // 1. Obvious network errors + if (error is SocketException) { + return SyncErrorType.network; + } + + if (error is HttpException) { + return SyncErrorType.network; + } + + // 2. Timeout = network + if (error is TimeoutException) { + return SyncErrorType.network; + } + + // 3. Supabase exceptions + if (error is PostgrestException) { + return _classifyPostgrestException(error); + } + + // 4. Error message analysis + final errorMessage = error.toString().toLowerCase(); + + // Network error patterns + if (_isNetworkErrorMessage(errorMessage)) { + return SyncErrorType.network; + } + + // 5. 
By default, consider as application error + // (safer to have a false positive application error than to block indefinitely) + return SyncErrorType.application; + } + + /// Classifies a Supabase/Postgrest exception + static SyncErrorType _classifyPostgrestException(PostgrestException error) { + final code = error.code; + final message = error.message.toLowerCase(); + + // Network status codes + if (code == '502' || code == '503' || code == '504') { + return SyncErrorType.network; + } + + // Application status codes + if (code == '400' || code == '422' || code == '401' || code == '403') { + return SyncErrorType.application; + } + + // Message analysis for 500 + if (code == '500') { + if (_isNetworkErrorMessage(message)) { + return SyncErrorType.network; + } + return SyncErrorType.application; + } + + // Specific network messages + if (_isNetworkErrorMessage(message)) { + return SyncErrorType.network; + } + + // Default for Postgrest errors: application + return SyncErrorType.application; + } + + /// Detects if an error message indicates a network problem + static bool _isNetworkErrorMessage(String message) { + final networkPatterns = [ + 'network', + 'connection', + 'timeout', + 'unreachable', + 'refused', + 'socket', + 'dns', + 'host', + 'internet', + 'offline', + 'no connection', + 'could not connect', + 'failed to connect', + 'connection lost', + 'connection reset', + 'connection closed', + 'network is unreachable', + 'no route to host', + 'broken pipe', + 'connection timed out', + 'software caused connection abort', + ]; + + for (final pattern in networkPatterns) { + if (message.contains(pattern)) { + return true; + } + } + + return false; + } + + /// Gets a user-readable error message + static String getUserFriendlyMessage(Object error, SyncErrorType type) { + if (type == SyncErrorType.network) { + return 'Network connection problem. 
Synchronization will resume automatically.'; + } + + // Application error + if (error is PostgrestException) { + switch (error.code) { + case '400': + case '422': + return 'Invalid data. Please verify your changes.'; + case '401': + case '403': + return 'Access denied. Check your permissions.'; + default: + return 'Synchronization error. The administrator has been notified.'; + } + } + + return 'Synchronization error. The administrator has been notified.'; + } + + /// Gets a technical message for logs + static String getTechnicalMessage(Object error, StackTrace? stackTrace) { + final buffer = StringBuffer(); + + buffer.writeln('Error Type: ${error.runtimeType}'); + buffer.writeln('Error: $error'); + + if (error is PostgrestException) { + buffer.writeln('Code: ${error.code}'); + buffer.writeln('Message: ${error.message}'); + if (error.details != null) { + buffer.writeln('Details: ${error.details}'); + } + if (error.hint != null) { + buffer.writeln('Hint: ${error.hint}'); + } + } + + if (stackTrace != null) { + buffer.writeln('Stack Trace:'); + buffer.writeln(stackTrace.toString()); + } + + return buffer.toString(); + } +} diff --git a/lib/src/sync_event.dart b/lib/src/sync_event.dart new file mode 100644 index 0000000..cf78f00 --- /dev/null +++ b/lib/src/sync_event.dart @@ -0,0 +1,85 @@ +/// The source of a sync event. +enum SyncEventSource { + /// Data received from a realtime subscription to the backend. + realtime, + + /// Data received from a full sync with the backend. + fullSync, +} + +/// The type of sync event. +enum SyncEventType { + /// A sync operation started. + syncStarted, + + /// A sync operation completed. + syncCompleted, +} + +/// An event that occurs during data synchronization. +abstract class SyncEvent { + const SyncEvent({ + required this.type, + required this.syncableType, + required this.source, + required this.timestamp, + }); + + /// The type of sync event. + final SyncEventType type; + + /// The type of syncable that was affected. 
+ final Type syncableType; + + /// The source of the sync event. + final SyncEventSource source; + + /// When the event occurred. + final DateTime timestamp; +} + +/// An event for when a sync operation starts. +class SyncStartedEvent extends SyncEvent { + const SyncStartedEvent({ + required super.syncableType, + required super.source, + required super.timestamp, + required this.reason, + }) : super(type: SyncEventType.syncStarted); + + /// The reason why the sync started. + final String reason; +} + +/// An event for when a sync operation completes. +class SyncCompletedEvent extends SyncEvent { + const SyncCompletedEvent({ + required super.syncableType, + required super.source, + required super.timestamp, + required this.itemsReceived, + required this.itemsUpdated, + required this.itemsDeleted, + }) : super(type: SyncEventType.syncCompleted); + + /// The number of items that were received (inserted) during this sync. + final int itemsReceived; + + /// The number of items that were updated during this sync. + final int itemsUpdated; + + /// The number of items that were deleted during this sync. + final int itemsDeleted; + + /// The total number of items processed during this sync. + int get totalItemsProcessed => itemsReceived + itemsUpdated + itemsDeleted; +} + +/// Callback function for sync events. +typedef SyncEventCallback = void Function(SyncEvent event); + +/// Callback function specifically for sync started events. +typedef SyncStartedEventCallback = void Function(SyncStartedEvent event); + +/// Callback function specifically for sync completed events. 
+typedef SyncCompletedEventCallback = void Function(SyncCompletedEvent event); diff --git a/lib/src/sync_manager.dart b/lib/src/sync_manager.dart index bd2211e..228ace2 100644 --- a/lib/src/sync_manager.dart +++ b/lib/src/sync_manager.dart @@ -1,15 +1,186 @@ import 'dart:async'; +import 'dart:io'; import 'package:collection/collection.dart'; import 'package:drift/drift.dart'; +import 'package:flutter/foundation.dart'; import 'package:logging/logging.dart'; import 'package:supabase/supabase.dart'; import 'package:syncable/src/supabase_names.dart'; +import 'package:syncable/src/sync_dead_letter_queue.dart'; +import 'package:syncable/src/sync_error_classifier.dart'; +import 'package:syncable/src/sync_event.dart'; import 'package:syncable/src/sync_timestamp_storage.dart'; import 'package:syncable/src/syncable.dart'; import 'package:syncable/src/syncable_database.dart'; import 'package:syncable/src/syncable_table.dart'; +// ============= SYNC CONFIGURATION CONSTANTS ============= + +/// Interval at which to perform a safety re-sync from Drift database. +/// Every N loop iterations, all tables are re-fetched from local Drift DB +/// to capture any items that might have been lost from RAM. +const int _DRIFT_RESYNC_INTERVAL = 20; + +/// Interval at which to cleanup error queues to prevent memory leaks. +/// Every N loop iterations, error queues are cleared (errors already persisted in DLQ). +const int _ERROR_QUEUE_CLEANUP_INTERVAL = 100; + +/// Number of consecutive network errors before opening the circuit breaker. +/// After this many network failures, the circuit breaker opens and pauses sync. +const int _CIRCUIT_BREAKER_THRESHOLD = 5; + +/// Duration after which the circuit breaker auto-resets. +/// After this cooldown period, sync attempts resume. +const Duration _CIRCUIT_BREAKER_COOLDOWN = Duration(minutes: 2); + +/// Maximum number of retry attempts before moving an item to the Dead Letter Queue. 
+/// For application errors: item fails after this many attempts (0, 1, 2 = 3 total). +/// For network errors: retries continue indefinitely (this doesn't apply). +const int _MAX_RETRY_ATTEMPTS = 2; // 0, 1, 2 = 3 total attempts + +/// Callback to notify errors persisted in the Dead Letter Queue. +/// +/// Allows calling code (e.g., SyncService) to send these errors to +/// an external monitoring system (e.g., Sentry) without creating a direct +/// dependency in the syncable package. +/// +/// [tableName] Backend table name (e.g., "competitions", "photos") +/// [itemId] ID of the failed item +/// [itemJson] Complete JSON representation of the item +/// [errorType] Error type: 'network' or 'application' +/// [errorMessage] Readable error message +/// [stackTrace] Error stack trace (may be null) +/// [retryCount] Number of attempts before final failure +typedef OnDLQErrorCallback = + void Function({ + required String tableName, + required String itemId, + required Map itemJson, + required String errorType, + required String errorMessage, + String? stackTrace, + required int retryCount, + }); + +/// Callback to send synchronization breadcrumbs. +/// +/// Allows tracing important sync cycle events to an external monitoring +/// system (e.g., Sentry breadcrumbs). +/// +/// [message] Event description +/// [category] Category (e.g., "sync", "circuit_breaker", "error_recovery") +/// [level] Severity level: "debug", "info", "warning", "error" +/// [data] Additional contextual data +typedef OnSyncBreadcrumbCallback = + void Function({ + required String message, + required String category, + required String level, + Map? data, + }); + +/// Sync mode based on user activity patterns. +/// +/// This is used to adaptively adjust sync intervals to balance +/// battery consumption and responsiveness. +enum SyncMode { + /// User is actively modifying data (last change < 10 seconds). + /// Sync interval: 5 seconds for maximum responsiveness. 
+ active, + + /// Recent modifications but user is no longer actively editing (10s - 2min). + /// Sync interval: 15 seconds for good balance. + recent, + + /// No modifications for > 2 minutes. + /// Sync interval: 30 seconds for battery conservation. + idle, +} + +/// IMPORTANT: Maximum retry counter value to prevent integer overflow. +/// +/// For network errors that retry indefinitely, we cap the counter at this value. +/// This prevents memory issues while maintaining retry behavior. +const int _maxRetryCount = 10000; + +/// Circuit breaker state to prevent spamming the backend during network outages. +/// +/// After detecting multiple consecutive network errors, the circuit breaker +/// enters an "open" state and pauses sync attempts for a cooldown period. +class CircuitBreakerState { + CircuitBreakerState(); + + /// Number of consecutive network errors + int consecutiveNetworkErrors = 0; + + /// Timestamp when the circuit breaker was opened + DateTime? openedAt; + + /// Whether the circuit breaker is currently open (paused) + bool get isOpen { + if (openedAt == null) return false; + + // Auto-reset after cooldown period + const cooldownPeriod = _CIRCUIT_BREAKER_COOLDOWN; + final now = DateTime.now(); + + if (now.difference(openedAt!) > cooldownPeriod) { + // Reset the circuit breaker + reset(); + return false; + } + + return true; + } + + /// Opens the circuit breaker (pauses syncing) + void open({OnSyncBreadcrumbCallback? onBreadcrumb, Logger? 
logger}) { + openedAt = DateTime.now(); + + // IMPORTANT: Breadcrumb - Circuit breaker opened + try { + onBreadcrumb?.call( + message: + 'Circuit breaker opened after $consecutiveNetworkErrors consecutive network errors', + category: 'circuit_breaker', + level: 'warning', + data: { + 'consecutive_errors': consecutiveNetworkErrors, + 'cooldown_minutes': _CIRCUIT_BREAKER_COOLDOWN.inMinutes, + }, + ); + } catch (e, s) { + logger?.warning('Error in onSyncBreadcrumb callback: $e\n$s'); + } + } + + /// Resets the circuit breaker (resumes syncing) + void reset() { + consecutiveNetworkErrors = 0; + openedAt = null; + } + + /// Records a network error and opens circuit if threshold is reached + void recordNetworkError({ + OnSyncBreadcrumbCallback? onBreadcrumb, + Logger? logger, + }) { + consecutiveNetworkErrors++; + + // Open circuit after threshold is reached + if (consecutiveNetworkErrors >= _CIRCUIT_BREAKER_THRESHOLD) { + open(onBreadcrumb: onBreadcrumb, logger: logger); + } + } + + /// Records a successful sync (resets error counter and closes circuit breaker) + void recordSuccess() { + consecutiveNetworkErrors = 0; + openedAt = null; // IMPORTANT: Close circuit breaker on success + } +} + /// The [SyncManager] is the main class for syncing data between a local Drift /// database and a Supabase backend. /// @@ -19,7 +190,10 @@ import 'package:syncable/src/syncable_table.dart'; /// /// The [SyncManager] is designed to be used with the [SyncableDatabase] /// class, which provides the local database functionality. -class SyncManager { +/// +/// The [SyncManager] extends [ChangeNotifier] to provide a simple way to listen +/// for sync state changes via the [syncInProgress] property. +class SyncManager extends ChangeNotifier { /// Creates a new [SyncManager] instance. 
/// /// The [localDatabase] parameter is required and must be an instance of @@ -49,6 +223,14 @@ class SyncManager { /// in combination with [lastTimeOtherDeviceWasActive] to determine whether /// other devices are currently active or not. A real-time subscription to /// the backend is only created if other devices are considered active. + /// + /// For simple use cases, you can listen to sync state changes using the + /// [syncInProgress] property and [ChangeNotifier] interface. + /// + /// For advanced use cases, set [enableDetailedEvents] to `true` and provide + /// [onSyncStarted] and [onSyncCompleted] callbacks. These will be called when + /// sync events occur, allowing your application to respond to detailed + /// synchronization events per syncable type. SyncManager({ required T localDatabase, required SupabaseClient supabaseClient, @@ -56,12 +238,22 @@ class SyncManager { int maxRows = 1000, SyncTimestampStorage? syncTimestampStorage, Duration otherDevicesConsideredInactiveAfter = const Duration(minutes: 2), + bool enableDetailedEvents = false, + SyncStartedEventCallback? onSyncStarted, + SyncCompletedEventCallback? onSyncCompleted, + OnDLQErrorCallback? onDLQError, + OnSyncBreadcrumbCallback? onSyncBreadcrumb, }) : _localDb = localDatabase, _supabaseClient = supabaseClient, _syncInterval = syncInterval, _maxRows = maxRows, _syncTimestampStorage = syncTimestampStorage, _devicesConsideredInactiveAfter = otherDevicesConsideredInactiveAfter, + _enableDetailedEvents = enableDetailedEvents, + _onSyncStarted = enableDetailedEvents ? onSyncStarted : null, + _onSyncCompleted = enableDetailedEvents ? onSyncCompleted : null, + _onDLQError = onDLQError, + _onSyncBreadcrumb = onSyncBreadcrumb, assert( syncInterval.inMilliseconds > 0, 'Sync interval must be positive', @@ -72,10 +264,25 @@ class SyncManager { final T _localDb; final SupabaseClient _supabaseClient; final SyncTimestampStorage? 
_syncTimestampStorage; - final Duration _syncInterval; final int _maxRows; + final Duration _syncInterval; final Duration _devicesConsideredInactiveAfter; + // 🔴 NEW: Dead Letter Queue for persistent error storage + // Initialized in enableSync() to ensure database is ready + SyncDeadLetterQueue? _deadLetterQueue; + + // Enable detailed events (disabled by default for simplicity) + final bool _enableDetailedEvents; + + // Callback functions for sync events + final SyncStartedEventCallback? _onSyncStarted; + final SyncCompletedEventCallback? _onSyncCompleted; + + // Callback functions for monitoring integration (Sentry, etc.) + final OnDLQErrorCallback? _onDLQError; + final OnSyncBreadcrumbCallback? _onSyncBreadcrumb; + /// This is what gets set when [enableSync] gets called. Internally, whether /// the syncing is enabled or not is determined by [_syncingEnabled]. bool __syncingEnabled = false; @@ -83,6 +290,37 @@ class SyncManager { bool get _syncingEnabled => __syncingEnabled && !_disposed && userId.isNotEmpty; + /// Whether a sync is currently in progress. + /// + /// This is a simple boolean that tracks if any sync operation is currently + /// running. Use this with [ChangeNotifier] to listen for sync state changes. + /// For more granular sync events per syncable type, set [enableDetailedEvents] + /// to `true` and use the callback functions. + bool _syncInProgress = false; + bool get syncInProgress => _syncInProgress; + + /// Sets the sync in progress state and notifies listeners. + void _setSyncInProgress(bool inProgress) { + if (_syncInProgress != inProgress) { + _syncInProgress = inProgress; + notifyListeners(); + } + } + + // ============= ADAPTIVE SYNC INTERVAL FIELDS ============= + + /// Timestamp of the last detected local change. + /// Used to determine the current sync mode (active/recent/idle). + DateTime? _lastChangeDetected; + + /// Completer used to interrupt the sync loop sleep when immediate sync is needed. 
+ /// This allows waking up the loop before the normal interval expires. + Completer? _syncTrigger; + + /// Counter for sync loop iterations. + /// Used to trigger periodic safety checks (e.g., re-sync from Drift every N iterations). + int _loopIterationCounter = 0; + /// Enables syncing for all registered syncables. /// /// This method will throw an exception if no syncables are registered. @@ -100,6 +338,9 @@ class SyncManager { ); } + // IMPORTANT: Initialize Dead Letter Queue + _deadLetterQueue = SyncDeadLetterQueue(_localDb); + __syncingEnabled = true; _startLoop(); _onDependenciesChanged('syncing enabled'); @@ -160,13 +401,29 @@ class SyncManager { final Map> _inQueues = {}; final Map> _outQueues = {}; + // IMPORTANT: Error queues for application errors (moved after N retries) + final Map> _errorQueues = {}; + + // IMPORTANT: Permanent tracking of item IDs that failed with application errors + // This prevents re-injection after cleanup. Items stay here until manual resolution. + final Map> _permanentErrorItemIds = {}; + + // IMPORTANT: Retry counters for each item (key: "tableName:itemId") + final Map _retryCounters = {}; + + // IMPORTANT: Circuit breaker state (per table) + final Map _circuitBreakers = {}; + + // Track sync source for incoming items + final Map> _incomingSources = {}; + final Map> _sentItems = {}; final Map> _receivedItems = {}; final Map>> _localSubscriptions = {}; - RealtimeChannel? _backendSubscription; - bool get isSubscribedToBackend => _backendSubscription != null; + final Map _backendSubscriptions = {}; + bool get isSubscribedToBackend => _backendSubscriptions.isNotEmpty; /// The number of items of type [syncable] that have been synced to the /// backend. @@ -181,12 +438,78 @@ class SyncManager { int _nFullSyncs = 0; int get nFullSyncs => _nFullSyncs; + /// Provides access to the Dead Letter Queue for viewing/managing sync errors. + /// Returns null if sync is not enabled yet. + SyncDeadLetterQueue? 
get deadLetterQueue => _deadLetterQueue; + + /// Returns a map of syncable types to their backend table names. + /// Useful for displaying table names in the UI. + Map get backendTableNames => Map.unmodifiable(_backendTables); + + /// Returns a map of syncable types to their local table info. + /// Useful for getting detailed table metadata. + Map> get localTables => + Map.unmodifiable(_localTables); + + /// Returns the count of items waiting to be uploaded for each syncable type. + /// Key: Syncable type, Value: Number of pending items + Map get uploadQueueSizes { + final result = {}; + for (final type in _syncables) { + result[type] = _outQueues[type]?.length ?? 0; + } + return result; + } + + /// Returns the count of items in error state for each syncable type. + /// Key: Syncable type, Value: Number of items in error + Map get errorQueueSizes { + final result = {}; + for (final type in _syncables) { + result[type] = _errorQueues[type]?.length ?? 0; + } + return result; + } + + /// Returns the circuit breaker state for each syncable type. + /// Circuit breakers open after 5 consecutive network errors and close after 2 minutes. + Map get circuitBreakers => + Map.unmodifiable(_circuitBreakers); + + /// Returns whether each syncable type has an active realtime subscription. 
+ Map get hasActiveRealtimeSubscription { + final result = {}; + for (final type in _syncables) { + final tableName = _backendTables[type]; + if (tableName != null) { + result[type] = _backendSubscriptions.containsKey(tableName); + } else { + result[type] = false; + } + } + return result; + } + + @override void dispose() { _disposed = true; for (final subscription in _localSubscriptions.values) { subscription.cancel(); } - _backendSubscription?.unsubscribe(); + // Unsubscribe from all channels + for (final subscription in _backendSubscriptions.values) { + subscription.unsubscribe(); + } + _backendSubscriptions.clear(); + + // IMPORTANT: Cleanup error management structures + _errorQueues.clear(); + _permanentErrorItemIds.clear(); + _retryCounters.clear(); + _circuitBreakers.clear(); + _deadLetterQueue = null; // Allow GC to collect + + super.dispose(); } /// Registers a syncable table with the sync manager. @@ -230,15 +553,72 @@ class SyncManager { _companions[S] = companionConstructor; _inQueues[S] = {}; _outQueues[S] = {}; + _errorQueues[S] = {}; // IMPORTANT: Initialize error queue + _permanentErrorItemIds[S] = + {}; // IMPORTANT: Initialize permanent error tracking + _circuitBreakers[S] = + CircuitBreakerState(); // IMPORTANT: Initialize circuit breaker + _incomingSources[S] = {}; _sentItems[S] = {}; _receivedItems[S] = {}; } Future _startLoop() async { + if (_loopRunning) { + _logger.warning('Sync loop already running, skipping start'); + return; + } _loopRunning = true; - _logger.info('Sync loop started'); + _logger.info('Sync loop started with adaptive intervals'); + + // IMPORTANT: Breadcrumb - Sync loop started + try { + _onSyncBreadcrumb?.call( + message: 'Sync loop started with adaptive intervals', + category: 'sync', + level: 'info', + ); + } catch (e, s) { + _logger.warning('Error in onSyncBreadcrumb callback: $e\n$s'); + } while (!_disposed) { + final iterationStart = DateTime.now(); + _loopIterationCounter++; + _logger.fine('Sync loop iteration 
#$_loopIterationCounter starting'); + + // SUCCESS: Periodic safety check - Re-sync from Drift every N iterations + // This captures any items that might have been lost from RAM + if (_loopIterationCounter % _DRIFT_RESYNC_INTERVAL == 0) { + _logger.info( + 'Periodic safety check (iteration #$_loopIterationCounter): re-syncing from Drift', + ); + try { + for (final syncable in _syncables) { + if (_disposed) break; + + // Retrieve local items from Drift + final localItems = await _localDb + .select(_localTables[syncable]!) + .get(); + + // Push to outQueue only items from current user + _pushLocalChangesToOutQueue( + syncable, + localItems.where((i) => i.userId == _userId), + ); + } + } catch (e, s) { + _logger.severe('Error during periodic Drift re-sync: $e\n$s'); + } + } + + // IMPORTANT: Cleanup error queues periodically to prevent memory leak + // Errors are already persisted in DLQ (SQLite), we can clear RAM + if (_loopIterationCounter % _ERROR_QUEUE_CLEANUP_INTERVAL == 0) { + _cleanupErrorQueues(); + } + try { for (final syncable in _syncables) { if (_disposed) break; @@ -262,7 +642,25 @@ class SyncManager { } if (_disposed) break; - await Future.delayed(_syncInterval); + + // PERFORMANCE: Determine current mode and interval for adaptive sync + final currentMode = _getCurrentMode(); + final interval = _getIntervalForMode(currentMode); + + _logger.fine( + 'Loop iteration complete, sleeping for ${interval.inSeconds}s (mode: $currentMode)', + ); + + // Create a new completer for the next potential interruption + _syncTrigger = Completer(); + + // Wait for EITHER the timeout OR an immediate sync trigger + await Future.any([Future.delayed(interval), _syncTrigger!.future]); + + final actualWaitTime = DateTime.now().difference(iterationStart); + _logger.fine( + 'Woke up after ${actualWaitTime.inSeconds}s (expected: ${interval.inSeconds}s)', + ); } _loopRunning = false; @@ -342,16 +740,151 @@ class SyncManager { void _pushLocalChangesToOutQueue(Type syncable, Iterable 
rows) { final outQueue = _outQueues[syncable]!; final receivedItems = _receivedItems[syncable]!; + final errorQueue = _errorQueues[syncable]!; + final permanentErrorIds = _permanentErrorItemIds[syncable]!; bool updateHasNotBeenSentYet(Syncable row) => row.updatedAt.isAfter(outQueue[row.id]?.updatedAt ?? DateTime(0)) && row.updatedAt.isAfter(_lastPushedTimestamp(syncable) ?? DateTime(0)); + bool hasNewItems = false; + for (final row in rows .where((r) => !receivedItems.contains(r)) .where(updateHasNotBeenSentYet)) { + // IMPORTANT: Check permanent error tracking first (survives cleanup) + // This prevents re-injection of items that failed even after errorQueue is cleared + if (permanentErrorIds.contains(row.id)) { + // Check if user modified the item (updatedAt changed) + final errorItem = errorQueue[row.id]; // May be null if cleanup happened + + // If item is in errorQueue, check if it was modified + if (errorItem != null && row.updatedAt.isAfter(errorItem.updatedAt)) { + _logger.info( + 'Item ${row.id} was in permanent error tracking but has been modified locally ' + '(${errorItem.updatedAt} → ${row.updatedAt}) - giving it a second chance', + ); + + // IMPORTANT: Breadcrumb - Second chance for item + final backendTable = _backendTables[syncable]!; + try { + _onSyncBreadcrumb?.call( + message: 'Item modified after error - giving second chance', + category: 'error_recovery', + level: 'info', + data: { + 'table': backendTable, + 'item_id': row.id, + 'previous_update': errorItem.updatedAt.toIso8601String(), + 'new_update': row.updatedAt.toIso8601String(), + }, + ); + } catch (e, s) { + _logger.warning('Error in onSyncBreadcrumb callback: $e\n$s'); + } + + errorQueue.remove(row.id); + permanentErrorIds.remove(row.id); + // Also clear its retry counter to start fresh + final retryKey = '$backendTable:${row.id}'; + _retryCounters.remove(retryKey); + } else { + // Item still in permanent error tracking, skip it + _logger.fine( + 'Skipping item ${row.id} - in permanent 
error tracking (needs manual resolution or modification)', + ); + continue; + } + } + + // Legacy check for errorQueue (redundant but kept for safety) + if (errorQueue.containsKey(row.id)) { + final errorItem = errorQueue[row.id]!; + // If updatedAt changed, user made changes → give it a second chance + if (row.updatedAt.isAfter(errorItem.updatedAt)) { + _logger.info( + 'Item ${row.id} was in error queue but has been modified locally ' + '(${errorItem.updatedAt} → ${row.updatedAt}) - giving it a second chance', + ); + errorQueue.remove(row.id); + permanentErrorIds.remove( + row.id, + ); // Also remove from permanent tracking + // Also clear its retry counter to start fresh + final backendTable = _backendTables[syncable]!; + final retryKey = '$backendTable:${row.id}'; + _retryCounters.remove(retryKey); + } else { + // Same version still in error queue, skip it + _logger.fine( + 'Skipping item ${row.id} - still in error queue (unmodified)', + ); + continue; + } + } + outQueue[row.id] = row; + hasNewItems = true; + } + + // PERFORMANCE: Adaptive sync - detect changes and wake up the loop + if (hasNewItems) { + // Update the last change timestamp + _lastChangeDetected = DateTime.now(); + + // Always trigger immediate sync when local changes are detected + // This ensures tests and real-world usage get fast sync response + _logger.info('Local changes detected - triggering immediate sync'); + _triggerImmediateSync(); + } + } + + /// IMPORTANT: Cleanup error queues to prevent memory leak + /// + /// Error items are already persisted in Dead Letter Queue (SQLite). + /// We can safely clear them from RAM since they're not actively retrying. + /// Also cleans retry counters for cleaned items. + /// + /// Note: We do NOT reset circuit breakers here. Circuit breakers manage + /// their own state and auto-reset after cooldown period. Resetting them during + /// cleanup could cause issues if network errors are still ongoing in outQueue. 
+ void _cleanupErrorQueues() { + try { + int totalCleared = 0; + int totalRetryCountersCleared = 0; + + for (final syncable in _syncables) { + final backendTable = _backendTables[syncable]!; + final errorQueue = _errorQueues[syncable]; + + if (errorQueue != null && errorQueue.isNotEmpty) { + // Clean retry counters for items in error queue + for (final itemId in errorQueue.keys) { + final retryKey = '$backendTable:$itemId'; + if (_retryCounters.remove(retryKey) != null) { + totalRetryCountersCleared++; + } + } + + final count = errorQueue.length; + errorQueue.clear(); + totalCleared += count; + + // IMPORTANT: Do NOT reset circuit breaker here + // Circuit breaker has its own auto-reset logic (cooldown period) + // Resetting it here could interfere with network error handling + } + } + + if (totalCleared > 0) { + _logger.info( + 'Cleaned $totalCleared items from error queues and ' + '$totalRetryCountersCleared retry counters (persisted in DLQ)', + ); + } + } catch (e, s) { + _logger.warning('Failed to cleanup error queues: $e\n$s'); } } @@ -359,13 +892,13 @@ class SyncManager { final otherDevicesActive = _otherDevicesActive(); if (!_syncingEnabled || !otherDevicesActive) { - if (_backendSubscription != null) { - _backendSubscription?.unsubscribe(); - _backendSubscription = null; + // Unsubscribe from all existing channels + for (final subscription in _backendSubscriptions.values) { + subscription.unsubscribe(); } + _backendSubscriptions.clear(); String reason; - if (!__syncingEnabled) { + if (!_syncingEnabled) { reason = 'syncing is disabled'; } else if (userId.isEmpty) { @@ -377,26 +910,70 @@ } _logger.warning('Not subscribed to backend changes because $reason'); return; } - if (_backendSubscription != null) { - return; + if (_backendSubscriptions.isNotEmpty) { + return; // Already subscribed } - _backendSubscription = _supabaseClient.channel('backend_changes'); - + // Create a dedicated channel for each table for (final syncable in _syncables) { - 
_backendSubscription?.onPostgresChanges( + final tableName = _backendTables[syncable]!; + final channelName = 'sync_$tableName'; + + _logger.info('Creating Realtime subscription for table: $tableName'); + + final channel = _supabaseClient.channel(channelName); + + channel.onPostgresChanges( schema: publicSchema, - table: _backendTables[syncable], + table: tableName, event: PostgresChangeEvent.all, callback: (p) { - if (_disposed) return; + if (_disposed) { + return; + } + if (p.newRecord.isNotEmpty) { - final item = _fromJsons[syncable]!(p.newRecord); - _inQueues[syncable]!.add(item); + try { + final item = _fromJsons[syncable]!(p.newRecord); + final timestamp = DateTime.now().millisecondsSinceEpoch; + _logger.fine( + 'REALTIME [$timestamp]: Received item ${item.id} for $syncable', + ); + + _inQueues[syncable]!.add(item); + _logger.fine( + 'REALTIME: Queue size after add: ${_inQueues[syncable]!.length}', + ); + + if (_enableDetailedEvents) { + _incomingSources[syncable]![item.id] = SyncEventSource.realtime; + } + + // PERFORMANCE: Process immediately instead of waiting for sync loop + // Uses unawaited to not block the Realtime callback + _processIncomingImmediate(syncable) + .then((_) { + final endTimestamp = DateTime.now().millisecondsSinceEpoch; + final latency = endTimestamp - timestamp; + _logger.fine('REALTIME: Processed in ${latency}ms'); + }) + .catchError((e, stackTrace) { + _logger.severe( + 'Error in immediate processing for $tableName: $e', + e, + stackTrace as StackTrace?, + ); + }); + } catch (e, stack) { + _logger.severe( + 'Error processing Realtime event for $tableName: $e', + e, + stack, + ); + } } }, filter: PostgresChangeFilter( @@ -405,17 +982,23 @@ class SyncManager { value: _userId, ), ); - } - _backendSubscription?.subscribe((status, error) { - if (error != null) { - // coverage:ignore-start - _logger.severe('Backend subscription error: $error'); - // coverage:ignore-end - } - }); + // Subscribe to the channel + channel.subscribe((status, 
error) { + if (error != null) { + _logger.severe('Realtime subscription error for $tableName: $error'); + } else if (status == RealtimeSubscribeStatus.subscribed) { + _logger.info('Realtime subscription active for $tableName'); + } + }); - _logger.info('Subscribed to backend changes'); + // Store the channel for later cleanup + _backendSubscriptions[tableName] = channel; + } + + _logger.info( + 'Subscribed to backend changes for ${_syncables.length} tables', + ); } /// Syncs all tables registered with the sync manager. @@ -430,24 +1013,108 @@ class SyncManager { await _syncTables('Manual sync'); } + /// Process all pending incoming data immediately. + /// + /// This method processes any data that has been fetched from the backend + /// but not yet written to the local database. This is useful for ensuring + /// data is immediately available in the UI after a sync operation. + /// + /// Normally, incoming data is processed in the main sync loop which runs + /// at regular intervals. This method allows you to bypass that wait. 
+ Future processIncomingImmediately() async { + if (!__syncingEnabled) { + _logger.warning('Cannot process incoming data - syncing is disabled'); + return; + } + + if (userId.isEmpty) { + _logger.warning('Cannot process incoming data - user ID is empty'); + return; + } + + _logger.info('Processing incoming data immediately'); + + // Process all pending incoming data for each syncable + for (final syncable in _syncables) { + await _processIncoming(syncable); + } + + _logger.info('Immediate incoming data processing complete'); + } + Future _syncTables(String reason) async { + _logger.fine('_syncTables called - reason: $reason'); + if (!__syncingEnabled) { + _logger.warning('_syncTables aborted - syncing disabled'); _logger.warning('Tables not getting synced because syncing is disabled'); return; } if (userId.isEmpty) { + _logger.warning('_syncTables aborted - userId empty'); _logger.warning('Tables not getting synced because user ID is empty'); return; } _logger.info('Syncing all tables. Reason: $reason'); - for (final syncable in _syncables) { - await _syncTable(syncable); - } + // Set sync in progress and notify listeners + _setSyncInProgress(true); + + try { + // Emit sync started events (only if detailed events are enabled) + if (_enableDetailedEvents) { + for (final syncable in _syncables) { + if (_onSyncStarted != null) { + final event = SyncStartedEvent( + syncableType: syncable, + source: SyncEventSource.fullSync, + timestamp: DateTime.now().toUtc(), + reason: reason, + ); + _onSyncStarted(event); + } + } + } + + // Track initial queue sizes to detect if items were added during sync + final initialQueueSizes = {}; + for (final syncable in _syncables) { + initialQueueSizes[syncable] = _inQueues[syncable]!.length; + } + + for (final syncable in _syncables) { + await _syncTable(syncable); + } + + // Emit fallback sync completed events for tables that didn't get any new items + // Real events with statistics are emitted in _processIncoming + if 
(_enableDetailedEvents) { + for (final syncable in _syncables) { + final initialSize = initialQueueSizes[syncable]!; + final currentSize = _inQueues[syncable]!.length; + + // Only emit fallback event if no items were added during sync + if (currentSize == initialSize && _onSyncCompleted != null) { + final event = SyncCompletedEvent( + syncableType: syncable, + source: SyncEventSource.fullSync, + timestamp: DateTime.now().toUtc(), + itemsReceived: 0, + itemsUpdated: 0, + itemsDeleted: 0, + ); + _onSyncCompleted(event); + } + } + } - _nFullSyncs++; + _nFullSyncs++; + } finally { + // Clear sync in progress and notify listeners + _setSyncInProgress(false); + } } Future _syncTable(Type syncable) async { @@ -491,14 +1158,44 @@ class SyncManager { // Use batches because all the UUIDs make the URI become too long otherwise. for (final batch in itemsToPull.slices(100)) { if (!_syncingEnabled) return; - final pulledBatch = await _supabaseClient - .from(_backendTables[syncable]!) - .select() - .eq(userIdKey, _userId) - .inFilter(idKey, batch) - .then((data) => data.map(_fromJsons[syncable]!)); - _inQueues[syncable]!.addAll(pulledBatch); + try { + final pulledBatch = await _supabaseClient + .from(_backendTables[syncable]!) 
+ .select() + .eq(userIdKey, _userId) + .inFilter(idKey, batch) + .then((data) => data.map(_fromJsons[syncable]!)); + + _logger.fine( + 'Adding ${pulledBatch.length} items to queue for $syncable', + ); + _inQueues[syncable]!.addAll(pulledBatch); + _logger.fine( + 'Queue size after add: ${_inQueues[syncable]!.length} for $syncable', + ); + // Mark these as full sync items (only if detailed events are enabled) + if (_enableDetailedEvents) { + for (final item in pulledBatch) { + _incomingSources[syncable]![item.id] = SyncEventSource.fullSync; + } + } + } on SocketException catch (e) { + _logger.warning( + 'Network error during item pull for ${_backendTables[syncable]}: ${e.message}', + ); + break; // Exit loop on network error + } on HttpException catch (e) { + _logger.warning( + 'HTTP error during item pull for ${_backendTables[syncable]}: ${e.message}', + ); + break; // Exit loop on HTTP error + } catch (e) { + _logger.severe( + 'Unexpected error during item pull for ${_backendTables[syncable]}: $e', + ); + break; // Exit loop on any other error + } } _updateLastPulledTimeStamp(syncable, DateTime.now().toUtc()); @@ -520,35 +1217,77 @@ class SyncManager { return false; } + /// Quick connectivity check to prevent network calls when offline + Future _hasNetworkConnectivity() async { + try { + final result = await InternetAddress.lookup( + 'google.com', + ).timeout(const Duration(seconds: 2)); + return result.isNotEmpty && result[0].rawAddress.isNotEmpty; + } on SocketException catch (_) { + return false; + } catch (_) { + return false; + } + } + /// Retrieves the IDs and `lastUpdatedAt` timestamps for all rows of a /// syncable in the backend. These can be used to determine which items need /// to be synced from the backend. 
Future>> _fetchBackendItemMetadata( Type syncable, ) async { + // 🛡️ OFFLINE PROTECTION: Check network connectivity before making requests + // This prevents ClientSocketException crashes when offline + final hasNetwork = await _hasNetworkConnectivity(); + if (!hasNetwork) { + _logger.warning( + 'Skipping backend metadata fetch for ${_backendTables[syncable]} - ' + 'no network connectivity detected', + ); + return []; // Return empty list instead of crashing + } + final List> backendItems = []; int offset = 0; bool hasMore = true; while (hasMore && _syncingEnabled) { - final batch = await _supabaseClient - .from(_backendTables[syncable]!) - .select('$idKey,$updatedAtKey') - .eq(userIdKey, _userId) - .range(offset, offset + _maxRows - 1) - // Use consistent ordering to prevent duplicates - .order(idKey, ascending: true); - - backendItems.addAll(batch); - hasMore = batch.length == _maxRows; - offset += _maxRows; - - if (batch.isNotEmpty) { - _logger.info( - 'Fetched batch of ${batch.length} metadata items for table ' - "'${_backendTables[syncable]!}', total so far: ${backendItems.length}", + try { + final batch = await _supabaseClient + .from(_backendTables[syncable]!) 
+ .select('$idKey,$updatedAtKey') + .eq(userIdKey, _userId) + .range(offset, offset + _maxRows - 1) + // Use consistent ordering to prevent duplicates + .order(idKey, ascending: true); + + backendItems.addAll(batch); + hasMore = batch.length == _maxRows; + offset += _maxRows; + + if (batch.isNotEmpty) { + _logger.info( + 'Fetched batch of ${batch.length} metadata items for table ' + "'${_backendTables[syncable]!}', total so far: ${backendItems.length}", + ); + } + } on SocketException catch (e) { + _logger.warning( + 'Network error during metadata fetch for ${_backendTables[syncable]}: ${e.message}', + ); + break; // Exit loop on network error to prevent crashes + } on HttpException catch (e) { + _logger.warning( + 'HTTP error during metadata fetch for ${_backendTables[syncable]}: ${e.message}', + ); + break; // Exit loop on HTTP error + } catch (e) { + _logger.severe( + 'Unexpected error during metadata fetch for ${_backendTables[syncable]}: $e', ); + break; // Exit loop on any other error } } @@ -575,76 +1314,378 @@ class SyncManager { Future _processOutgoing(Type syncable) async { final outQueue = _outQueues[syncable]!; + final errorQueue = _errorQueues[syncable]!; final backendTable = _backendTables[syncable]!; final sentItems = _sentItems[syncable]!; + final circuitBreaker = _circuitBreakers[syncable]!; - while (_syncingEnabled && outQueue.isNotEmpty) { - final outgoing = Set.from( - outQueue.values.where((f) => f.userId == _userId), + // IMPORTANT: Check circuit breaker before attempting sync + if (circuitBreaker.isOpen) { + _logger.warning( + 'Circuit breaker OPEN for $backendTable - skipping sync ' + '(${circuitBreaker.consecutiveNetworkErrors} consecutive network errors)', ); - outQueue.clear(); + return; // Skip this table entirely when circuit is open + } - if (outgoing.isEmpty) continue; + if (outQueue.isEmpty) return; - _logger.info( - 'Syncing ${outgoing.length} items to backend table $backendTable', - ); + _logger.info( + 'Processing 
${outQueue.length} outgoing items for $backendTable', + ); - assert(!outgoing.any((s) => s.userId?.isEmpty ?? true)); + // IMPORTANT: Process items one by one to avoid blocking entire queue on single error + final itemsToProcess = outQueue.values.toList(); - await _supabaseClient - .from(backendTable) - .upsert( - outgoing.map((x) => x.toJson()).toList(), - onConflict: '$idKey,$userIdKey', - ); + for (final item in itemsToProcess) { + if (!_syncingEnabled) break; + if (item.userId != _userId) continue; // Skip items from other users + + final retryKey = '$backendTable:${item.id}'; + final retryCount = _retryCounters[retryKey] ?? 0; - sentItems.addAll(outgoing); + // IMPORTANT: Skip items that are in error queue (application errors) + if (errorQueue.containsKey(item.id)) { + _logger.fine('Skipping item ${item.id} - already in error queue'); + continue; + } - _nSyncedToBackend[syncable] = - nSyncedToBackend(syncable) + outgoing.length; + try { + // Try to upsert this single item + await _supabaseClient.from(backendTable).upsert([ + item.toJson(), + ], onConflict: idKey); + + // SUCCESS: Remove from queue and reset counters + outQueue.remove(item.id); + _retryCounters.remove(retryKey); + circuitBreaker.recordSuccess(); // Reset circuit breaker on success + + sentItems.add(item); + _nSyncedToBackend[syncable] = nSyncedToBackend(syncable) + 1; + + // Update timestamp + if (_lastPushedTimestamp(syncable) == null || + item.updatedAt.isAfter(_lastPushedTimestamp(syncable)!)) { + await _updateLastPushedTimestamp(syncable, item.updatedAt); + } - final lastUpdatedAtForThisBatch = outgoing.map((r) => r.updatedAt).max; + _logger.fine('Successfully synced item ${item.id} to $backendTable'); + } catch (e, stackTrace) { + // IMPORTANT: Classify the error + final errorType = SyncErrorClassifier.classify(e); - if (_lastPushedTimestamp(syncable) == null || - lastUpdatedAtForThisBatch.isAfter(_lastPushedTimestamp(syncable)!)) { - await _updateLastPushedTimestamp(syncable, 
lastUpdatedAtForThisBatch); + _logger.warning( + 'Error syncing item ${item.id} to $backendTable ' + '(retry #$retryCount, type: $errorType): $e', + ); + + if (errorType == SyncErrorType.network) { + // ============ NETWORK ERROR ============ + // Keep in queue, retry indefinitely, don't block other items + // IMPORTANT: Cap retry counter to prevent overflow + _retryCounters[retryKey] = (retryCount + 1).clamp(0, _maxRetryCount); + circuitBreaker.recordNetworkError( + onBreadcrumb: _onSyncBreadcrumb, + logger: _logger, + ); + + _logger.info( + 'Network error for item ${item.id} - keeping in queue (retry #${retryCount + 1})', + ); + + // CRITICAL: Use continue, NOT break! + // This allows processing other items even if one fails + continue; + } else { + // ============ APPLICATION ERROR ============ + // Move to error queue after N retries + // IMPORTANT: Cap retry counter to prevent overflow + _retryCounters[retryKey] = (retryCount + 1).clamp(0, _maxRetryCount); + + if (retryCount >= _MAX_RETRY_ATTEMPTS) { + // Max attempts reached (e.g., 0, 1, 2 = 3 total attempts) + // Move to error queue + errorQueue[item.id] = item; + outQueue.remove(item.id); + _retryCounters.remove(retryKey); + + // IMPORTANT: Track permanently to prevent re-injection after cleanup + final permanentErrorIds = _permanentErrorItemIds[syncable]!; + permanentErrorIds.add(item.id); + + _logger.severe( + 'APPLICATION ERROR for item ${item.id} after ${retryCount + 1} attempts - ' + 'moved to error queue and permanent error tracking. 
Error: $e\n' + 'Stack trace: $stackTrace', + ); + + // IMPORTANT: Save to Dead Letter Queue (SQLite) + if (_deadLetterQueue != null) { + await _deadLetterQueue!.saveFailedItem( + tableName: backendTable, + itemId: item.id, + itemJson: item.toJson(), + errorType: errorType.name, // Use .name instead of .toString() to get "application" or "network" + errorMessage: e.toString(), + stackTrace: stackTrace.toString(), + retryCount: retryCount + 1, + ); + } else { + _logger.warning( + 'Dead Letter Queue not initialized - error not persisted to DB', + ); + } + + // IMPORTANT: Breadcrumb - Item moved to DLQ + try { + _onSyncBreadcrumb?.call( + message: + 'Item moved to Dead Letter Queue after ${retryCount + 1} failed attempts', + category: 'sync', + level: 'error', + data: { + 'table': backendTable, + 'item_id': item.id, + 'error_type': errorType.toString(), + 'retry_count': retryCount + 1, + }, + ); + } catch (callbackError, callbackStack) { + _logger.warning( + 'Error in onSyncBreadcrumb callback: $callbackError\n$callbackStack', + ); + } + + // IMPORTANT: Notify monitoring system (Sentry) via callback + // Only notify for application errors (not network errors) + try { + _onDLQError?.call( + tableName: backendTable, + itemId: item.id, + itemJson: item.toJson(), + errorType: errorType.toString(), + errorMessage: e.toString(), + stackTrace: stackTrace.toString(), + retryCount: retryCount + 1, + ); + } catch (callbackError, callbackStack) { + _logger.severe( + 'Error in onDLQError callback: $callbackError\n$callbackStack', + ); + } + } else { + _logger.warning( + 'Application error for item ${item.id} - will retry (attempt ${retryCount + 1}/${_MAX_RETRY_ATTEMPTS + 1})', + ); + } + + // CRITICAL: Use continue, NOT break! 
+ continue; + } } } + + // Log final status + if (outQueue.isNotEmpty) { + _logger.info( + '📊 $backendTable: ${outQueue.length} items remaining in queue, ' + '${errorQueue.length} in error queue', + ); + } } Future _processIncoming(Type syncable) async { final inQueue = _inQueues[syncable]!; + final backendTable = _backendTables[syncable]!; if (inQueue.isEmpty) return; + _logger.fine( + '📥 Processing ${inQueue.length} incoming items for $backendTable', + ); + final sentItems = _sentItems[syncable]!; final receivedItems = _receivedItems[syncable]!; final itemsToWrite = {}; + var syncSource = SyncEventSource.fullSync; // Default for (final item in inQueue) { - // Skip if already processed - if (sentItems.contains(item) || receivedItems.contains(item)) { + // Skip only if already received from backend + if (receivedItems.contains(item)) { + _logger.fine('⏭️ Skipping already received item: ${item.id}'); continue; } + + // Log if item was sent locally but is now being received from backend + if (sentItems.contains(item)) { + _logger.fine( + '🔄 Processing server confirmation for locally sent item: ${item.id}', + ); + } else { + _logger.fine('✅ Adding new item from backend: ${item.id}'); + } + itemsToWrite[item.id] = item; + + // Use the first item's source as the batch source (only if detailed events are enabled) + if (_enableDetailedEvents && itemsToWrite.length == 1) { + final incomingSources = _incomingSources[syncable]!; + syncSource = incomingSources[item.id] ?? 
SyncEventSource.fullSync; + } } + // Clear queue BEFORE attempting writes + // If writes fail, items will need to be re-fetched from backend inQueue.clear(); - await _batchWriteIncoming(syncable, itemsToWrite); + // Clean up source tracking for processed items (only if detailed events are enabled) + if (_enableDetailedEvents) { + final incomingSources = _incomingSources[syncable]!; + for (final itemId in itemsToWrite.keys) { + incomingSources.remove(itemId); + } + } + + if (itemsToWrite.isEmpty) return; + + // IMPORTANT: Try to write items with error handling + try { + final writeStats = await _batchWriteIncoming(syncable, itemsToWrite); + + receivedItems.addAll(itemsToWrite.values); + _nSyncedFromBackend[syncable] = + nSyncedFromBackend(syncable) + itemsToWrite.length; + + _logger.info( + 'Successfully wrote ${itemsToWrite.length} items from backend to local DB ' + '(${writeStats.itemsInserted} inserted, ${writeStats.itemsUpdated} updated)', + ); + + // Emit sync completed event with real statistics (only if detailed events are enabled) + if (_enableDetailedEvents && _onSyncCompleted != null) { + final event = SyncCompletedEvent( + syncableType: syncable, + source: syncSource, + timestamp: DateTime.now().toUtc(), + itemsReceived: writeStats.itemsInserted, + itemsUpdated: writeStats.itemsUpdated, + itemsDeleted: 0, // Not implemented yet + ); + _onSyncCompleted(event); + } + } catch (e, stackTrace) { + // IMPORTANT: Handle errors during local database writes + final errorType = SyncErrorClassifier.classify(e); + + _logger.severe( + 'ERROR: Writing ${itemsToWrite.length} items to local DB for $backendTable ' + '(type: $errorType): $e\n' + 'Stack trace: $stackTrace', + ); + + // For incoming data errors, we log but don't retry automatically + // The data will be re-fetched on the next full sync + // This is safer than risking data corruption or infinite loops - receivedItems.addAll(itemsToWrite.values); - _nSyncedFromBackend[syncable] = - nSyncedFromBackend(syncable) 
+ itemsToWrite.length; + if (errorType == SyncErrorType.application) { + _logger.severe( + 'APPLICATION ERROR writing incoming data - this may indicate ' + 'database corruption or schema mismatch. Items will be re-fetched on next sync.', + ); + // Note: Monitoring systems should be configured to capture severe log errors + } + } + } + + /// Processes incoming items immediately for a specific syncable type. + /// + /// This method is called when items arrive via Realtime subscriptions to + /// provide instant UI updates instead of waiting for the next sync loop iteration. + /// + /// Unlike [_processIncoming], this only processes items for one syncable type + /// and is designed to be called asynchronously without blocking the Realtime callback. + Future _processIncomingImmediate(Type syncable) async { + if (!_syncingEnabled) { + _logger.fine('Skipping immediate processing - syncing disabled'); + return; + } + + _logger.fine( + 'IMMEDIATE processing triggered for $syncable', + ); + + // Process this specific syncable immediately + await _processIncoming(syncable); + + _logger.fine( + 'IMMEDIATE processing completed for $syncable', + ); + } + + // ============= ADAPTIVE SYNC HELPER METHODS ============= + + /// Determines the current sync mode based on the time since last change. + /// + /// - ACTIVE: Last change < 10 seconds (sync every 5s) + /// - RECENT: Last change between 10s and 2 minutes (sync every 15s) + /// - IDLE: No change for > 2 minutes (sync every 30s) + SyncMode _getCurrentMode() { + if (_lastChangeDetected == null) { + return SyncMode.idle; + } + + final timeSinceLastChange = DateTime.now().difference(_lastChangeDetected!); + + if (timeSinceLastChange < const Duration(seconds: 10)) { + return SyncMode.active; + } else if (timeSinceLastChange < const Duration(minutes: 2)) { + return SyncMode.recent; + } else { + return SyncMode.idle; + } } - Future _batchWriteIncoming( + /// Returns the sync interval for the given mode. 
+ /// + /// If a custom syncInterval was provided (not the default 1s), it will be used + /// instead of the adaptive intervals. This ensures backward compatibility with + /// tests and allows users to override the adaptive behavior. + Duration _getIntervalForMode(SyncMode mode) { + // Use custom interval if explicitly provided (not the default) + if (_syncInterval != const Duration(seconds: 1)) { + return _syncInterval; + } + + // Otherwise use adaptive intervals based on activity + switch (mode) { + case SyncMode.active: + return const Duration(seconds: 5); + case SyncMode.recent: + return const Duration(seconds: 15); + case SyncMode.idle: + return const Duration(seconds: 30); + } + } + + /// Triggers an immediate sync by completing the sync trigger. + /// + /// This wakes up the sync loop before the normal interval expires, + /// allowing for faster sync when changes are detected. + void _triggerImmediateSync() { + if (_syncTrigger != null && !_syncTrigger!.isCompleted) { + _syncTrigger!.complete(); + } + } + + Future _batchWriteIncoming( Type syncable, Map incomingItems, ) async { - if (incomingItems.isEmpty) return; + if (incomingItems.isEmpty) { + return const WriteStats(itemsInserted: 0, itemsUpdated: 0); + } final table = _localTables[syncable]! 
as TableInfo; @@ -662,9 +1703,17 @@ class SyncManager { for (final incomingItem in incomingItems.values) { final existingUpdatedAt = existingItems[incomingItem.id]; if (existingUpdatedAt == null) { + _logger.fine('Inserting new item: ${incomingItem.id}'); itemsToInsert.add(incomingItem.toCompanion()); } else if (incomingItem.updatedAt.isAfter(existingUpdatedAt)) { + _logger.fine( + 'Updating item: ${incomingItem.id} (${incomingItem.updatedAt} > $existingUpdatedAt)', + ); itemsToReplace.add(incomingItem.toCompanion()); + } else { + _logger.fine( + 'SKIPPING item: ${incomingItem.id} - incoming: ${incomingItem.updatedAt}, existing: $existingUpdatedAt', + ); } } @@ -672,6 +1721,11 @@ class SyncManager { batch.insertAll(table, itemsToInsert); batch.replaceAll(table, itemsToReplace); }); + + return WriteStats( + itemsInserted: itemsToInsert.length, + itemsUpdated: itemsToReplace.length, + ); } DateTime? _lastPushedTimestamp(Type syncable) { @@ -724,6 +1778,134 @@ class SyncManager { return DateTime.now().difference(lastTimeOtherDeviceWasActive!) < _devicesConsideredInactiveAfter; } + + /// Clears all internal sync state collections to ensure a clean sync state. + /// + /// This should be called during user authentication changes to prevent + /// newly synchronized items from being treated as "already processed" + /// due to persistent state from previous sync sessions. 
+ void clearSyncState() { + _logger.info('Clearing sync state collections for clean authentication'); + + // Clear tracking collections for all syncable types + for (final syncable in _syncables) { + _inQueues[syncable]?.clear(); + _outQueues[syncable]?.clear(); + _incomingSources[syncable]?.clear(); + _sentItems[syncable]?.clear(); + _receivedItems[syncable]?.clear(); + + // IMPORTANT: Clear error management structures + _errorQueues[syncable]?.clear(); + _permanentErrorItemIds[syncable]?.clear(); + _circuitBreakers[syncable]?.reset(); + } + + // Reset sync counters + _nSyncedToBackend.clear(); + _nSyncedFromBackend.clear(); + + // IMPORTANT: Clear retry counters + _retryCounters.clear(); + + _logger.info( + 'Sync state cleared successfully (including error queues, permanent error tracking, and retry counters)', + ); + } + + /// ======================================== + /// Dead Letter Queue (DLQ) Retry Method + /// ======================================== + + /// Retries a failed item by re-submitting it to the upload queue. + /// + /// [tableName] Backend table name (e.g., "sponsors", "competitions") + /// [itemJson] JSON representation of the item to retry + /// + /// This method is called by SyncService.retryDLQItem() after retrieving + /// the item from the Dead Letter Queue. + /// + /// The item is automatically parsed, added to the upload queue, and the + /// sync cycle will retry sending it to the backend. + Future retryDLQItem( + String tableName, + Map itemJson, + ) async { + try { + // Find the Syncable type corresponding to the backend table name + Type? 
syncableType; + for (final type in _syncables) { + if (_backendTables[type] == tableName) { + syncableType = type; + break; + } + } + + if (syncableType == null) { + _logger.warning( + 'Cannot retry DLQ item - table not registered: $tableName', + ); + return; + } + + // Parse JSON into typed Syncable object + final fromJson = _fromJsons[syncableType]; + if (fromJson == null) { + _logger.warning( + 'Cannot retry DLQ item - fromJson not found for type: $syncableType', + ); + return; + } + + final syncableItem = fromJson(itemJson); + + // Add item to upload queue + final outQueue = _outQueues[syncableType]; + if (outQueue == null) { + _logger.warning( + 'Cannot retry DLQ item - outQueue not found for type: $syncableType', + ); + return; + } + + // Clean previous errors for this item + _errorQueues[syncableType]?.remove(syncableItem.id); + _permanentErrorItemIds[syncableType]?.remove(syncableItem.id); + + // Reset retry counter + final retryKey = '$tableName:${syncableItem.id}'; + _retryCounters.remove(retryKey); + + // Add to upload queue + outQueue[syncableItem.id] = syncableItem; + + _logger.info( + 'DLQ item re-queued for upload: $tableName/${syncableItem.id}', + ); + + // Breadcrumb to trace the retry + try { + _onSyncBreadcrumb?.call( + message: 'DLQ item retried and re-queued', + category: 'dlq', + level: 'info', + data: { + 'table': tableName, + 'item_id': syncableItem.id, + }, + ); + } catch (e, s) { + _logger.warning('Error in onSyncBreadcrumb callback: $e\n$s'); + } + + // Trigger immediate synchronization + _lastChangeDetected = DateTime.now(); + notifyListeners(); + } catch (e, s) { + _logger.severe('Failed to retry DLQ item: $e\n$s'); + rethrow; + } + } } typedef CompanionConstructor = @@ -742,3 +1924,11 @@ enum TimestampType { const TimestampType(this.name); final String name; } + +/// Statistics returned by batch write operations. 
+class WriteStats { + const WriteStats({required this.itemsInserted, required this.itemsUpdated}); + + final int itemsInserted; + final int itemsUpdated; +} diff --git a/lib/syncable.dart b/lib/syncable.dart index 11f9ca3..201a8c2 100644 --- a/lib/syncable.dart +++ b/lib/syncable.dart @@ -1,6 +1,9 @@ /// Syncable is a library for offline-first multi-device data synchronization in Flutter apps./// library; +export 'package:syncable/src/sync_dead_letter_queue.dart'; +export 'package:syncable/src/sync_error_classifier.dart'; +export 'package:syncable/src/sync_event.dart'; export 'package:syncable/src/sync_manager.dart'; export 'package:syncable/src/sync_timestamp_storage.dart'; export 'package:syncable/src/syncable.dart'; diff --git a/pubspec.yaml b/pubspec.yaml index c802bfa..f8768c9 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -10,6 +10,8 @@ environment: dependencies: collection: ^1.19.0 drift: ^2.26.0 + flutter: + sdk: flutter logging: ^1.3.0 supabase: ^2.6.3 @@ -17,6 +19,8 @@ dev_dependencies: build_runner: ^2.4.15 drift_dev: ^2.26.0 equatable: ^2.0.7 + flutter_test: + sdk: flutter http: ^1.3.0 json_annotation: ^4.9.0 json_serializable: ^6.9.4 diff --git a/test/integration_test.dart b/test/integration_test.dart index a3526f1..669ced1 100644 --- a/test/integration_test.dart +++ b/test/integration_test.dart @@ -5,7 +5,7 @@ import 'package:drift/native.dart'; import 'package:supabase/supabase.dart'; import 'package:syncable/src/supabase_names.dart'; import 'package:syncable/syncable.dart'; -import 'package:test/test.dart'; +import 'package:flutter_test/flutter_test.dart'; import 'package:uuid/uuid.dart'; import 'utils/test_database.dart'; @@ -219,11 +219,14 @@ void main() { }); }); - test('Reading from backend uses paging', () async { - // The maximum number of rows returned from a query in Supabase is limited, - // so syncing more items than that requires paging. 
+ test( + 'Reading from backend uses paging', + () async { + // The maximum number of rows returned from a query in Supabase is limited, + // so syncing more items than that requires paging. + // This test creates 1001 items and can take time on slower machines - const maxRows = 1000; // Defined in `supabase/config.toml` + const maxRows = 1000; // Defined in `supabase/config.toml` await supabaseClient.auth.signInAnonymously(); @@ -276,6 +279,7 @@ void main() { syncManager.enableSync(); // Wait for items to sync to local database + // This test syncs 1001 items which can take time, especially on slower machines await waitForFunctionToPass(() async { await testDb.select(testDb.items).get().then((localItems) { expect( @@ -283,8 +287,10 @@ void main() { equals(List.generate(maxRows + 1, (i) => i.toString()).toSet()), ); }); - }, timeout: const Duration(seconds: 30)); - }); + }, timeout: const Duration(seconds: 60)); + }, + timeout: const Timeout(Duration(minutes: 2)), + ); test( 'Local database rejects items from backend with old modification dates', diff --git a/test/sync_events_test.dart b/test/sync_events_test.dart new file mode 100644 index 0000000..8bf46cd --- /dev/null +++ b/test/sync_events_test.dart @@ -0,0 +1,253 @@ +import 'dart:convert'; + +import 'package:drift/drift.dart' as drift; +import 'package:drift/native.dart' as drift_native; +import 'package:http/http.dart'; +import 'package:mockito/mockito.dart'; +import 'package:supabase/supabase.dart'; +import 'package:syncable/src/supabase_names.dart'; +import 'package:syncable/syncable.dart'; +import 'package:flutter_test/flutter_test.dart'; +import 'package:uuid/uuid.dart'; + +import 'utils/test_database.dart'; +import 'utils/test_mocks.mocks.dart'; +import 'utils/test_supabase_names.dart'; +import 'utils/wait_for_function_to_pass.dart'; + +void main() { + late TestDatabase testDb; + late MockSupabaseClient mockSupabaseClient; + late MockSupabaseQueryBuilder mockQueryBuilder; + late MockClient 
mockHttpClient; + + setUp(() { + testDb = TestDatabase( + drift.DatabaseConnection( + drift_native.NativeDatabase.memory(), + closeStreamsSynchronously: true, + ), + ); + + // Set up mocks for Supabase + mockSupabaseClient = MockSupabaseClient(); + mockQueryBuilder = MockSupabaseQueryBuilder(); + mockHttpClient = MockClient(); + + when( + mockSupabaseClient.from(itemsTable), + ).thenAnswer((_) => mockQueryBuilder); + when( + mockQueryBuilder.upsert(any, onConflict: anyNamed('onConflict')), + ).thenAnswer( + (_) => PostgrestFilterBuilder( + PostgrestBuilder( + url: Uri(), + headers: {}, + method: 'POST', + httpClient: mockHttpClient, + ), + ), + ); + when( + mockHttpClient.post( + any, + headers: anyNamed('headers'), + body: anyNamed('body'), + ), + ).thenAnswer( + (_) async => Response( + jsonEncode([ + {idKey: 'abc'}, + ]), + 200, + request: Request('POST', Uri()), + ), + ); + when(mockQueryBuilder.select(any)).thenAnswer( + (_) => PostgrestFilterBuilder( + PostgrestBuilder( + url: Uri(), + headers: {}, + method: 'GET', + httpClient: mockHttpClient, + ), + ), + ); + when(mockHttpClient.get(any, headers: anyNamed('headers'))).thenAnswer( + (_) async => + Response(jsonEncode([]), 200, request: Request('GET', Uri())), + ); + + // Set up mocks for real-time + final mockRealtimeChannel = MockRealtimeChannel(); + when(mockSupabaseClient.channel(any)).thenReturn(mockRealtimeChannel); + when( + mockRealtimeChannel.onPostgresChanges( + schema: anyNamed('schema'), + table: anyNamed('table'), + event: anyNamed('event'), + callback: anyNamed('callback'), + ), + ).thenReturn(mockRealtimeChannel); + when( + mockRealtimeChannel.subscribe(), + ).thenAnswer((_) => mockRealtimeChannel); + }); + + tearDown(() async { + await testDb.close(); + }); + + test('SyncManager calls onSyncStarted callback when sync begins', () async { + final syncStartedEvents = []; + + final syncManager = SyncManager( + localDatabase: testDb, + supabaseClient: mockSupabaseClient, + syncInterval: const 
Duration(milliseconds: 1), + enableDetailedEvents: true, + onSyncStarted: (event) { + syncStartedEvents.add(event); + }, + ); + + syncManager.registerSyncable( + backendTable: itemsTable, + fromJson: Item.fromJson, + companionConstructor: ItemsCompanion.new, + ); + + final userId = const Uuid().v4(); + + syncManager.enableSync(); + syncManager.setUserId(userId); + + await waitForFunctionToPass(() async { + expect(syncStartedEvents.length, greaterThanOrEqualTo(1)); + }); + + final event = syncStartedEvents.first; + expect(event.type, equals(SyncEventType.syncStarted)); + expect(event.syncableType, equals(Item)); + expect(event.source, equals(SyncEventSource.fullSync)); + expect(event.reason, isNotEmpty); + }); + + test( + 'SyncManager calls onSyncCompleted callback when sync completes', + () async { + final syncCompletedEvents = []; + + final syncManager = SyncManager( + localDatabase: testDb, + supabaseClient: mockSupabaseClient, + syncInterval: const Duration(milliseconds: 1), + enableDetailedEvents: true, + onSyncCompleted: (event) { + syncCompletedEvents.add(event); + }, + ); + + syncManager.registerSyncable( + backendTable: itemsTable, + fromJson: Item.fromJson, + companionConstructor: ItemsCompanion.new, + ); + + final userId = const Uuid().v4(); + + syncManager.enableSync(); + syncManager.setUserId(userId); + + await waitForFunctionToPass(() async { + expect(syncCompletedEvents.length, greaterThanOrEqualTo(1)); + }); + + final event = syncCompletedEvents.first; + expect(event.type, equals(SyncEventType.syncCompleted)); + expect(event.syncableType, equals(Item)); + expect(event.source, equals(SyncEventSource.fullSync)); + }, + ); + + test('Callback parameters are correctly typed', () async { + // Test that callback parameter types are working correctly + SyncStartedEvent? startedEvent; + SyncCompletedEvent? 
completedEvent; + + final syncManager = SyncManager( + localDatabase: testDb, + supabaseClient: mockSupabaseClient, + syncInterval: const Duration(milliseconds: 1), + enableDetailedEvents: true, + onSyncStarted: (event) { + startedEvent = event; + }, + onSyncCompleted: (event) { + completedEvent = event; + }, + ); + + syncManager.registerSyncable( + backendTable: itemsTable, + fromJson: Item.fromJson, + companionConstructor: ItemsCompanion.new, + ); + + final userId = const Uuid().v4(); + + syncManager.enableSync(); + syncManager.setUserId(userId); + + await waitForFunctionToPass(() async { + expect(startedEvent, isNotNull); + expect(completedEvent, isNotNull); + }); + + expect(startedEvent?.reason, isNotEmpty); + expect( + completedEvent?.totalItemsProcessed, + equals(0), + ); // No items from mock + }); + + test('Sync events contain proper timestamps', () async { + final events = []; + final testStartTime = DateTime.now().toUtc(); + + final syncManager = SyncManager( + localDatabase: testDb, + supabaseClient: mockSupabaseClient, + syncInterval: const Duration(milliseconds: 1), + enableDetailedEvents: true, + onSyncStarted: (event) => events.add(event), + onSyncCompleted: (event) => events.add(event), + ); + + syncManager.registerSyncable( + backendTable: itemsTable, + fromJson: Item.fromJson, + companionConstructor: ItemsCompanion.new, + ); + + final userId = const Uuid().v4(); + + syncManager.enableSync(); + syncManager.setUserId(userId); + + await waitForFunctionToPass(() async { + expect(events.length, greaterThanOrEqualTo(1)); + }); + + for (final event in events) { + expect(event.timestamp.isAfter(testStartTime), isTrue); + expect( + event.timestamp.isBefore( + DateTime.now().toUtc().add(const Duration(seconds: 1)), + ), + isTrue, + ); + } + }); +} diff --git a/test/sync_manager_test.dart b/test/sync_manager_test.dart index cb8f294..7731ede 100644 --- a/test/sync_manager_test.dart +++ b/test/sync_manager_test.dart @@ -7,7 +7,7 @@ import 
'package:mockito/mockito.dart'; import 'package:supabase/supabase.dart'; import 'package:syncable/src/supabase_names.dart'; import 'package:syncable/syncable.dart'; -import 'package:test/test.dart'; +import 'package:flutter_test/flutter_test.dart'; import 'package:uuid/uuid.dart'; import 'utils/test_database.dart'; diff --git a/test/syncable_database_test.dart b/test/syncable_database_test.dart index 2d559c0..f3afa87 100644 --- a/test/syncable_database_test.dart +++ b/test/syncable_database_test.dart @@ -2,7 +2,7 @@ import 'dart:async'; import 'package:drift/drift.dart' as drift; import 'package:drift/native.dart' as drift_native; -import 'package:test/test.dart'; +import 'package:flutter_test/flutter_test.dart'; import 'package:uuid/uuid.dart'; import 'utils/test_database.dart'; diff --git a/test/utils/test_database.g.dart b/test/utils/test_database.g.dart index 4e82331..4aee127 100644 --- a/test/utils/test_database.g.dart +++ b/test/utils/test_database.g.dart @@ -125,7 +125,7 @@ class $ItemsTable extends Items with TableInfo<$ItemsTable, Item> { @override Item map(Map data, {String? tablePrefix}) { final effectivePrefix = tablePrefix != null ? '$tablePrefix.' : ''; - return Item( + return Item.new( id: attachedDatabase.typeMapping.read( DriftSqlType.string, data['${effectivePrefix}id'],