From 6de5ac4be76afc1a3106eec9510b5c3911d2a358 Mon Sep 17 00:00:00 2001 From: Anilcan Cakir Date: Sun, 5 Apr 2026 23:34:58 +0300 Subject: [PATCH 1/3] feat(broadcasting): Echo facade, BroadcastManager, Reverb driver, and full testing support (#37) Laravel Echo equivalent for real-time WebSocket channels. Pusher-compatible ReverbBroadcastDriver with automatic reconnection (exponential backoff), event deduplication (ring buffer), application-level heartbeat, and interceptor pipeline. NullBroadcastDriver for local dev. Echo.fake() for testing with assertion helpers. BroadcastServiceProvider (opt-in). CLI install stubs with --without-broadcasting flag. --- .claude/rules/broadcasting.md | 104 +++ CHANGELOG.md | 5 + README.md | 3 +- doc/digging-deeper/broadcasting.md | 511 ++++++++++ doc/getting-started/installation.md | 4 +- doc/packages/magic-cli.md | 1 + example/pubspec.lock | 16 + lib/config/broadcasting.dart | 43 + lib/magic.dart | 15 + .../broadcast_connection_state.dart | 18 + lib/src/broadcasting/broadcast_event.dart | 34 + lib/src/broadcasting/broadcast_manager.dart | 89 ++ .../broadcast_service_provider.dart | 57 ++ .../contracts/broadcast_channel.dart | 39 + .../contracts/broadcast_driver.dart | 75 ++ .../contracts/broadcast_interceptor.dart | 39 + .../contracts/broadcast_presence_channel.dart | 31 + .../drivers/null_broadcast_driver.dart | 129 +++ .../drivers/reverb_broadcast_driver.dart | 819 ++++++++++++++++ lib/src/facades/echo.dart | 195 ++++ lib/src/foundation/application.dart | 4 +- lib/src/testing/fake_broadcast_manager.dart | 211 +++++ pubspec.yaml | 1 + skills/magic-framework/SKILL.md | 5 +- .../references/cli-commands.md | 2 + .../references/secondary-systems.md | 159 ++++ test/broadcasting/broadcast_event_test.dart | 98 ++ test/broadcasting/broadcast_manager_test.dart | 279 ++++++ .../broadcast_service_provider_test.dart | 129 +++ .../drivers/null_broadcast_driver_test.dart | 121 +++ .../drivers/reverb_broadcast_driver_test.dart | 874 ++++++++++++++++++ test/broadcasting/echo_facade_test.dart | 108 +++ test/testing/fake_broadcast_manager_test.dart | 378 ++++++++ 33 files changed, 4591 insertions(+), 5 deletions(-) create mode 100644 .claude/rules/broadcasting.md create mode 100644 doc/digging-deeper/broadcasting.md create mode 100644 lib/config/broadcasting.dart create mode 100644 lib/src/broadcasting/broadcast_connection_state.dart create mode 100644 lib/src/broadcasting/broadcast_event.dart create mode 100644 lib/src/broadcasting/broadcast_manager.dart create mode 100644 lib/src/broadcasting/broadcast_service_provider.dart create mode 100644 lib/src/broadcasting/contracts/broadcast_channel.dart create mode 100644 lib/src/broadcasting/contracts/broadcast_driver.dart create mode 100644 lib/src/broadcasting/contracts/broadcast_interceptor.dart create mode 100644 lib/src/broadcasting/contracts/broadcast_presence_channel.dart create mode 100644 lib/src/broadcasting/drivers/null_broadcast_driver.dart create mode 100644 lib/src/broadcasting/drivers/reverb_broadcast_driver.dart create mode 100644 lib/src/facades/echo.dart create mode 100644 lib/src/testing/fake_broadcast_manager.dart create mode 100644 test/broadcasting/broadcast_event_test.dart create mode 100644 test/broadcasting/broadcast_manager_test.dart create mode 100644 test/broadcasting/broadcast_service_provider_test.dart create mode 100644 test/broadcasting/drivers/null_broadcast_driver_test.dart create mode 100644 test/broadcasting/drivers/reverb_broadcast_driver_test.dart create mode 100644 test/broadcasting/echo_facade_test.dart create mode 100644 test/testing/fake_broadcast_manager_test.dart diff --git a/.claude/rules/broadcasting.md b/.claude/rules/broadcasting.md new file mode 100644 index 0000000..6579bf5 --- /dev/null +++ b/.claude/rules/broadcasting.md @@ -0,0 +1,104 @@ +# Broadcasting Domain + +- `Echo` facade proxies to `BroadcastManager` bound at `'broadcasting'` in the IoC container +- `BroadcastServiceProvider` is **NOT** auto-registered — add explicitly to `providers` list, same as `EncryptionServiceProvider` +- Bootstrap: `BroadcastServiceProvider.register()` binds `BroadcastManager` singleton; `boot()` auto-connects unless default is `'null'` + +## Echo Facade API + +- `Echo.channel(name)` — public channel (no auth) +- `Echo.private(name)` — private channel (driver adds `private-` prefix, performs HTTP auth) +- `Echo.join(name)` — presence channel (driver adds `presence-` prefix, auth + member tracking) +- `Echo.listen(channel, event, callback)` — shorthand for `channel().listen()` +- `Echo.leave(name)` — unsubscribe from channel +- `Echo.connect()` / `Echo.disconnect()` — connection lifecycle +- `Echo.socketId` — server-assigned ID; `null` when disconnected +- `Echo.connectionState` — `Stream` (connecting/connected/disconnected/reconnecting) +- `Echo.onReconnect` — `Stream` emits once per successful reconnect +- `Echo.addInterceptor(interceptor)` — attach interceptor to default connection +- `Echo.manager` — direct `BroadcastManager` access (for `extend()`) +- `Echo.fake()` / `Echo.unfake()` — test double swap + +## BroadcastChannel Contract + +- `channel.listen(event, callback)` — returns `this` (chainable) +- `channel.stopListening(event)` — remove named listener +- `channel.events` — raw `Stream` for all events +- `channel.name` — fully-qualified name as sent to server + +## BroadcastPresenceChannel Contract + +Extends `BroadcastChannel` with: +- `channel.members` — `List>` (immutable snapshot) +- `channel.onJoin` — `Stream>` member join events +- `channel.onLeave` — `Stream>` member leave events + +## BroadcastManager + +- `BroadcastManager.extend(name, factory)` — register custom driver; `factory` receives `Map` config +- `BroadcastManager.resetDrivers()` — clear custom drivers (testing only) +- `manager.connection([name])` — resolve named or default driver; default is cached after first resolution +- Config key: `broadcasting.default` for default connection name, `broadcasting.connections.{name}` for per-connection config + +## BroadcastDriver Contract + +Abstract interface all drivers must implement: `connect()`, `disconnect()`, `socketId`, `isConnected`, `connectionState`, `onReconnect`, `channel(name)`, `private(name)`, `join(name)`, `leave(name)`, `addInterceptor(interceptor)`. + +## BroadcastInterceptor Contract + +All hooks have pass-through defaults — override only what you need: +- `onSend(Map message) => message` — called before outbound message; return empty map to suppress +- `onReceive(BroadcastEvent event) => event` — called on inbound event; return modified event +- `onError(dynamic error) => error` — called on driver error; return replacement to recover + +Register via `Echo.addInterceptor()` or `driver.addInterceptor()` in a ServiceProvider `boot()`. + +## ReverbBroadcastDriver + +- Implements Pusher-compatible WebSocket protocol (Laravel Reverb, Soketi, etc.) +- `channelFactory` constructor DI parameter overrides WebSocket creation — use for testing without a real server +- Auto-reconnection: exponential backoff `min(500ms × 2^attempt, max_reconnect_delay)` — set `reconnect: false` to disable +- Pusher error codes: 4000–4099 = fatal (no reconnect), 4100–4199 = immediate, 4200–4299 = backoff +- Deduplication: ring buffer of size `dedup_buffer_size` (default 100) fingerprints — suppresses duplicate events on reconnect +- Heartbeat: responds to `pusher:ping` with `pusher:pong` automatically +- Private/presence auth: HTTP POST to `auth_endpoint` with `{socket_id, channel_name}` — expects `{auth: '...'}` response + +## NullBroadcastDriver + +- Silently no-ops all operations — used for local dev or when `broadcasting.default` is `'null'` +- `BroadcastServiceProvider.boot()` skips `connect()` when default connection is `'null'` + +## FakeBroadcastManager (Testing) + +- `Echo.fake()` — binds `FakeBroadcastManager` in container; returns the fake for assertions +- `Echo.unfake()` — removes fake binding (or use `MagicApp.reset()` + `Magic.flush()` in `setUp()`) +- Assertions: `assertConnected()`, `assertDisconnected()`, `assertSubscribed(channel)`, `assertNotSubscribed(channel)`, `assertInterceptorAdded()` — all throw `AssertionError` with descriptive messages +- `fake.reset()` — clear all recorded state +- `fake.driver` — access underlying `FakeBroadcastDriver` for low-level inspection (`.subscribedChannels`, `.addedInterceptors`, `.isConnected`) + +## Config + +```dart +// config/broadcasting.dart +final broadcastingConfig = { + 'broadcasting': { + 'default': Env.get('BROADCAST_CONNECTION', 'null'), + 'connections': { + 'reverb': { + 'driver': 'reverb', + 'host': Env.get('REVERB_HOST', 'localhost'), + 'port': int.parse(Env.get('REVERB_PORT', '8080')!), + 'scheme': Env.get('REVERB_SCHEME', 'ws'), + 'app_key': Env.get('REVERB_APP_KEY', ''), + 'auth_endpoint': '/broadcasting/auth', + 'reconnect': true, + 'max_reconnect_delay': 30000, + 'dedup_buffer_size': 100, + }, + 'null': {'driver': 'null'}, + }, + }, +}; +``` + +Use `configFactories` (not `configs`) when config values depend on `Env.get()`. diff --git a/CHANGELOG.md b/CHANGELOG.md index 915a464..6b47a58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ All notable changes to this project will be documented in this file. +## [Unreleased] + +### ✨ Features +- **Broadcasting**: `Echo` facade, `BroadcastManager`, `ReverbBroadcastDriver` (Pusher-compatible WebSocket with reconnection, dedup, heartbeat), `NullBroadcastDriver`, `BroadcastInterceptor` pipeline, `FakeBroadcastManager`, `BroadcastServiceProvider`. Laravel Echo equivalent for real-time channels. (#37) + ## [1.0.0-alpha.6] - 2026-04-05 ### ✨ Features diff --git a/README.md b/README.md index fd311bf..430525d 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,8 @@ final user = await User.find(1); | 💾 | **Caching** | Memory and file drivers with TTL and `remember()` | | 🌍 | **Localization** | JSON-based i18n with `:attribute` placeholders | | 🎨 | **Wind UI** | Built-in Tailwind CSS-like styling with `className` syntax | -| 🧪 | **Testing** | Laravel-style `Http.fake()`, `Auth.fake()`, `Cache.fake()`, `Vault.fake()`, `Log.fake()` — no mockito needed | +| 📡 | **Broadcasting** | Laravel Echo equivalent — real-time WebSocket channels via `Echo` facade, `ReverbBroadcastDriver`, presence channels, and `Echo.fake()` for testing | +| 🧪 | **Testing** | Laravel-style `Http.fake()`, `Auth.fake()`, `Cache.fake()`, `Vault.fake()`, `Log.fake()`, `Echo.fake()` — no mockito needed | | 🧰 | **Magic CLI** | Artisan-style code generation: `magic make:model`, `magic make:controller` | ## Quick Start diff --git a/doc/digging-deeper/broadcasting.md b/doc/digging-deeper/broadcasting.md new file mode 100644 index 0000000..58f90a7 --- /dev/null +++ b/doc/digging-deeper/broadcasting.md @@ -0,0 +1,511 @@ +# Broadcasting + +- [Introduction](#introduction) +- [Configuration](#configuration) + - [Connection Options](#connection-options) + - [Environment Variables](#environment-variables) +- [Echo Facade](#echo-facade) + - [API Reference](#api-reference) +- [Channels](#channels) + - [Public Channels](#public-channels) + - [Private Channels](#private-channels) + - [Presence Channels](#presence-channels) +- [Interceptors](#interceptors) + - [Creating a Custom Interceptor](#creating-a-custom-interceptor) + - [Registering Interceptors](#registering-interceptors) +- [Custom Drivers](#custom-drivers) + - [Implementing a Custom Driver](#implementing-a-custom-driver) + - [Registering the Custom Driver](#registering-the-custom-driver) +- [Testing](#testing) + - [Using FakeBroadcastManager](#using-fakebroadcastmanager) + - [Assertion Helpers](#assertion-helpers) +- [Connection](#connection) + - [Connection Lifecycle](#connection-lifecycle) + - [Reconnection and Heartbeat](#reconnection-and-heartbeat) + - [Deduplication](#deduplication) + + +## Introduction + +Magic provides a Laravel Echo-equivalent broadcasting system that lets your Flutter app receive real-time events over WebSocket connections. The `Echo` facade mirrors the Laravel Echo JavaScript client API — if you know how to use Laravel Echo, you already know how to use this. + +The broadcasting system is: +- **Pusher-compatible**: Works with Laravel Reverb, Soketi, and any Pusher-protocol server out of the box. +- **Resilient**: Automatic reconnection with exponential backoff, application-level heartbeat, and event deduplication. +- **Extensible**: Register custom drivers via `BroadcastManager.extend()`. +- **Testable**: `Echo.fake()` swaps the real driver for an in-memory fake with assertion helpers. + +The `BroadcastServiceProvider` is **not** auto-registered. You must add it explicitly to your providers list. + + +## Configuration + +Copy `defaultBroadcastingConfig` into your application config and register `BroadcastServiceProvider`: + +```dart +// lib/config/broadcasting.dart +import 'package:magic/magic.dart'; + +final Map broadcastingConfig = { + 'broadcasting': { + 'default': Env.get('BROADCAST_CONNECTION', 'null'), + + 'connections': { + 'reverb': { + 'driver': 'reverb', + 'host': Env.get('REVERB_HOST', 'localhost'), + 'port': int.parse(Env.get('REVERB_PORT', '8080')!), + 'scheme': Env.get('REVERB_SCHEME', 'ws'), + 'app_key': Env.get('REVERB_APP_KEY', ''), + 'auth_endpoint': '/broadcasting/auth', + 'reconnect': true, + 'max_reconnect_delay': 30000, + 'activity_timeout': 120, + 'dedup_buffer_size': 100, + }, + 'null': {'driver': 'null'}, + }, + }, +}; +``` + +Register the provider in `Magic.init()`: + +```dart +await Magic.init( + configFactories: [ + () => appConfig, + () => broadcastingConfig, + ], + configs: [ + { + 'app': { + 'providers': [ + (app) => BroadcastServiceProvider(app), + ], + }, + }, + ], +); +``` + + +### Connection Options + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `driver` | `String` | — | Driver name (`reverb`, `null`, or custom) | +| `host` | `String` | `'localhost'` | WebSocket server hostname | +| `port` | `int` | `8080` | WebSocket server port | +| `scheme` | `String` | `'ws'` | Connection scheme (`ws` or `wss`) | +| `app_key` | `String` | `''` | Reverb/Pusher application key | +| `auth_endpoint` | `String` | `'/broadcasting/auth'` | HTTP endpoint for private/presence channel auth | +| `reconnect` | `bool` | `true` | Whether to auto-reconnect on unexpected disconnect | +| `max_reconnect_delay` | `int` | `30000` | Maximum backoff delay in milliseconds | +| `activity_timeout` | `int` | `120` | Seconds before a heartbeat ping is expected | +| `dedup_buffer_size` | `int` | `100` | Number of recent event fingerprints kept for deduplication | + + +### Environment Variables + +```dotenv +BROADCAST_CONNECTION=reverb +REVERB_HOST=localhost +REVERB_PORT=8080 +REVERB_SCHEME=ws +REVERB_APP_KEY=your-app-key +``` + + +## Echo Facade + +The `Echo` facade provides static access to the broadcasting system, proxying all calls to the bound `BroadcastManager`. + + +### API Reference + +| Method / Property | Returns | Description | +|:------------------|:--------|:------------| +| `Echo.channel(name)` | `BroadcastChannel` | Subscribe to a public channel | +| `Echo.private(name)` | `BroadcastChannel` | Subscribe to a private channel (auth required) | +| `Echo.join(name)` | `BroadcastPresenceChannel` | Join a presence channel (auth + member tracking) | +| `Echo.listen(channel, event, callback)` | `BroadcastChannel` | Shorthand: subscribe + listen in one call | +| `Echo.leave(name)` | `void` | Unsubscribe from a channel | +| `Echo.connect()` | `Future` | Establish the WebSocket connection | +| `Echo.disconnect()` | `Future` | Close the connection and release resources | +| `Echo.connection` | `BroadcastDriver` | The resolved default driver instance | +| `Echo.socketId` | `String?` | Server-assigned socket identifier, or `null` when disconnected | +| `Echo.connectionState` | `Stream` | Stream of connection lifecycle state changes | +| `Echo.onReconnect` | `Stream` | Emits once each time the driver successfully reconnects | +| `Echo.addInterceptor(interceptor)` | `void` | Register an interceptor on the default connection | +| `Echo.manager` | `BroadcastManager` | The underlying manager (for `extend()` and advanced use) | +| `Echo.fake()` | `FakeBroadcastManager` | Swap to in-memory fake for testing | +| `Echo.unfake()` | `void` | Restore the real manager binding | + + +## Channels + + +### Public Channels + +Public channels require no authentication. Any connected client may subscribe. + +```dart +// Subscribe and listen for a specific event +Echo.channel('orders').listen('OrderShipped', (event) { + final orderId = event.data['id']; + print('Order $orderId has shipped!'); +}); + +// Fluent chaining for multiple events on the same channel +Echo.channel('orders') + .listen('OrderShipped', onShipped) + .listen('OrderCancelled', onCancelled); + +// Stop listening to a specific event +Echo.channel('orders').stopListening('OrderShipped'); + +// Leave the channel entirely +Echo.leave('orders'); +``` + +The `BroadcastEvent` envelope provides full context for each received message: + +| Property | Type | Description | +|:---------|:-----|:------------| +| `event` | `String` | Event name (e.g. `'App\\Events\\OrderShipped'`) | +| `channel` | `String` | Channel name the event arrived on | +| `data` | `Map` | Decoded JSON payload | +| `receivedAt` | `DateTime` | Local timestamp of receipt | + + +### Private Channels + +Private channels perform an HTTP auth handshake at `auth_endpoint` before subscribing. The driver sends the `socket_id` and `channel_name` to your server, which validates the request and returns an auth token. + +```dart +// Subscribe to a private channel (driver adds 'private-' prefix automatically) +Echo.private('user.${Auth.user()!.id}') + .listen('ProfileUpdated', (event) { + print('Profile updated: ${event.data}'); + }); + +// Convenience shorthand +Echo.listen('user.1', 'ProfileUpdated', (event) { + print(event.data); +}); +``` + +On the Laravel server side, the channel authorization lives in `routes/channels.php`: + +```php +Broadcast::channel('user.{id}', function ($user, $id) { + return (int) $user->id === (int) $id; +}); +``` + + +### Presence Channels + +Presence channels extend private channels with real-time member tracking. The server returns member data on subscription success, and the driver emits `onJoin`/`onLeave` streams as membership changes. + +```dart +final channel = Echo.join('room.1'); + +// Members currently in the channel +print('Online: ${channel.members.length}'); + +// React to member join/leave +channel.onJoin.listen((member) { + print('${member['name']} joined the room'); +}); + +channel.onLeave.listen((member) { + print('${member['name']} left the room'); +}); + +// Presence channels also support event listening +channel.listen('MessagePosted', (event) { + print('New message: ${event.data['body']}'); +}); +``` + +`BroadcastPresenceChannel` API: + +| Property | Type | Description | +|:---------|:-----|:------------| +| `members` | `List>` | Current member list (immutable snapshot) | +| `onJoin` | `Stream>` | Emits member payload on each new join | +| `onLeave` | `Stream>` | Emits member payload on each leave | + + +## Interceptors + +`BroadcastInterceptor` hooks into the driver message pipeline — identical in spirit to `MagicNetworkInterceptor` in the HTTP layer. All three hook methods have pass-through default implementations; subclass only what you need. + +| Method | Parameters | Returns | Description | +|:-------|:-----------|:--------|:------------| +| `onSend(message)` | `Map` | `Map` | Called before an outbound message is sent. Return modified map or empty map to suppress. | +| `onReceive(event)` | `BroadcastEvent` | `BroadcastEvent` | Called when an event arrives from the server. Return modified event to pass downstream. | +| `onError(error)` | `dynamic` | `dynamic` | Called when the driver encounters an error. Return original to propagate or a replacement to recover. | + + +### Creating a Custom Interceptor + +```dart +// lib/app/broadcasting/logging_broadcast_interceptor.dart +import 'package:magic/magic.dart'; + +class LoggingBroadcastInterceptor extends BroadcastInterceptor { + @override + BroadcastEvent onReceive(BroadcastEvent event) { + Log.debug('Broadcast received', { + 'event': event.event, + 'channel': event.channel, + 'data': event.data, + }); + return event; + } + + @override + dynamic onError(dynamic error) { + Log.error('Broadcast error', {'error': error.toString()}); + return error; + } +} +``` + + +### Registering Interceptors + +Register interceptors in a Service Provider's `boot()` phase, after the connection is established: + +```dart +class BroadcastingServiceProvider extends ServiceProvider { + BroadcastingServiceProvider(super.app); + + @override + Future boot() async { + Echo.addInterceptor(LoggingBroadcastInterceptor()); + } +} +``` + + +## Custom Drivers + + +### Implementing a Custom Driver + +Implement the `BroadcastDriver` abstract class: + +```dart +// lib/app/broadcasting/pusher_broadcast_driver.dart +import 'package:magic/magic.dart'; + +class PusherBroadcastDriver implements BroadcastDriver { + PusherBroadcastDriver(this._config); + + final Map _config; + + @override + Future connect() async { + // Establish connection to Pusher. + } + + @override + Future disconnect() async { + // Close the connection. + } + + @override + String? get socketId => /* ... */; + + @override + bool get isConnected => /* ... */; + + @override + Stream get connectionState => /* ... */; + + @override + Stream get onReconnect => /* ... */; + + @override + BroadcastChannel channel(String name) => /* ... */; + + @override + BroadcastChannel private(String name) => /* ... */; + + @override + BroadcastPresenceChannel join(String name) => /* ... */; + + @override + void leave(String name) { /* ... */ } + + @override + void addInterceptor(BroadcastInterceptor interceptor) { /* ... */ } +} +``` + + +### Registering the Custom Driver + +Register your driver via `BroadcastManager.extend()` in a Service Provider's `boot()` phase: + +```dart +class AppServiceProvider extends ServiceProvider { + AppServiceProvider(super.app); + + @override + Future boot() async { + BroadcastManager.extend('pusher', (config) => PusherBroadcastDriver(config)); + } +} +``` + +Then reference the driver name in config: + +```dart +'connections': { + 'pusher': { + 'driver': 'pusher', + 'app_key': Env.get('PUSHER_APP_KEY'), + 'cluster': Env.get('PUSHER_CLUSTER', 'mt1'), + }, +}, +``` + +This follows the same `extend()` pattern used by `Auth.manager.extend(...)` for custom auth guards and `LogManager.extend(...)` for custom log drivers. + + +## Testing + + +### Using FakeBroadcastManager + +`Echo.fake()` swaps the real `BroadcastManager` binding with an in-memory `FakeBroadcastManager`. All channel operations are recorded but no WebSocket connection is opened. + +```dart +import 'package:flutter_test/flutter_test.dart'; +import 'package:magic/magic.dart'; +import 'package:magic/testing.dart'; + +void main() { + MagicTest.init(); + + test('subscribes to orders channel on init', () async { + final fake = Echo.fake(); + + // Exercise code under test + final controller = OrderController(); + await controller.onInit(); + + // Assert + fake.assertSubscribed('orders'); + fake.assertConnected(); + }); +} +``` + +Always call `Echo.unfake()` in tearDown — or use `MagicTest.init()` which resets the container automatically. + + +### Assertion Helpers + +`FakeBroadcastManager` exposes assertion methods that throw `AssertionError` with descriptive messages on failure: + +| Method | Description | +|:-------|:------------| +| `assertConnected()` | Assert the fake driver is in a connected state | +| `assertDisconnected()` | Assert the fake driver is disconnected | +| `assertSubscribed(channel)` | Assert a channel name is in the subscribed list | +| `assertNotSubscribed(channel)` | Assert a channel name is NOT subscribed | +| `assertInterceptorAdded()` | Assert at least one interceptor has been registered | +| `reset()` | Clear all recorded state on the fake driver | + +Access the underlying `FakeBroadcastDriver` via `fake.driver` for low-level inspection: + +```dart +final fake = Echo.fake(); + +Echo.channel('orders'); +Echo.private('user.1'); + +expect(fake.driver.subscribedChannels, contains('orders')); +expect(fake.driver.subscribedChannels, contains('private-user.1')); +expect(fake.driver.isConnected, isFalse); +``` + +Simulate received events by publishing directly to a channel in tests: + +```dart +// Inject a fake event into the channel stream +final channel = Echo.channel('orders') as _FakeBroadcastChannel; +// ... or use fake.driver to inspect subscriptions and simulate state changes +``` + + +## Connection + + +### Connection Lifecycle + +`BroadcastConnectionState` tracks the lifecycle of the WebSocket connection: + +| State | Description | +|:------|:------------| +| `connecting` | Establishing the connection | +| `connected` | Active, healthy connection | +| `disconnected` | Not connected, not attempting reconnect | +| `reconnecting` | Lost connection, attempting to re-establish | + +Subscribe to `Echo.connectionState` to react to transitions in your UI: + +```dart +Echo.connectionState.listen((state) { + switch (state) { + case BroadcastConnectionState.connected: + showOnlineBadge(); + case BroadcastConnectionState.reconnecting: + showReconnectingBanner(); + case BroadcastConnectionState.disconnected: + showOfflineBanner(); + default: + break; + } +}); +``` + +Re-subscribe to channels after a reconnect using `Echo.onReconnect`: + +```dart +Echo.onReconnect.listen((_) { + Echo.channel('orders').listen('OrderShipped', onShipped); +}); +``` + + +### Reconnection and Heartbeat + +`ReverbBroadcastDriver` implements automatic reconnection with **exponential backoff**: + +- Formula: `min(500ms × 2^attempt, max_reconnect_delay)` +- Default `max_reconnect_delay` is 30,000 ms (30 seconds) +- Set `reconnect: false` in config to disable auto-reconnect + +Pusher protocol error codes determine the reconnect strategy: + +| Code Range | Action | +|:-----------|:-------| +| 4000–4099 | Fatal — do not reconnect | +| 4100–4199 | Reconnect immediately without backoff | +| 4200–4299 | Reconnect with exponential backoff | + +The driver handles `pusher:ping` frames automatically, responding with `pusher:pong` to satisfy the server keepalive requirement. + + +### Deduplication + +The Reverb driver maintains a ring buffer of recently seen event fingerprints (channel + event name + raw data). Duplicate messages — which can arrive during reconnection — are silently dropped. + +Configure the buffer size with `dedup_buffer_size` (default: `100`). A larger buffer consumes more memory but reduces false duplicate detection during high-throughput scenarios. diff --git a/doc/getting-started/installation.md b/doc/getting-started/installation.md index 22121b8..093d440 100644 --- a/doc/getting-started/installation.md +++ b/doc/getting-started/installation.md @@ -115,7 +115,7 @@ dart run magic:magic install ``` This command creates everything you need: -- `lib/config/` — Configuration files (app, auth, cache, database, network, logging, view) +- `lib/config/` — Configuration files (app, auth, broadcasting, cache, database, network, logging, view) - `lib/app/` — Controllers, models, providers, middleware, policies - `lib/routes/` — Route definitions - `lib/resources/views/` — UI view classes @@ -129,6 +129,8 @@ You can exclude features you don't need: dart run magic:magic install --without-database --without-auth --without-cache ``` +Available flags: `--without-auth`, `--without-database`, `--without-network`, `--without-cache`, `--without-events`, `--without-localization`, `--without-logging`, `--without-broadcasting`. See [Magic CLI](/doc/packages/magic-cli.md#install) for details. + > [!TIP] > For convenience, you can also activate the CLI globally: `dart pub global activate magic_cli`. This lets you use the shorter `magic install` syntax instead of `dart run magic:magic install`. diff --git a/doc/packages/magic-cli.md b/doc/packages/magic-cli.md index 0e81b1a..46e5e3d 100644 --- a/doc/packages/magic-cli.md +++ b/doc/packages/magic-cli.md @@ -89,6 +89,7 @@ dart run magic:magic install --without-auth --without-events | `--without-events` | `lib/app/events/` and `lib/app/listeners/` directories | | `--without-localization` | `assets/lang/` directory, `LocalizationServiceProvider` | | `--without-logging` | `config/logging.dart` | +| `--without-broadcasting` | `config/broadcasting.dart`, `BroadcastServiceProvider` | ### key:generate diff --git a/example/pubspec.lock b/example/pubspec.lock index b17ee10..42a7aa4 100644 --- a/example/pubspec.lock +++ b/example/pubspec.lock @@ -953,6 +953,22 @@ packages: url: "https://pub.dev" source: hosted version: "1.1.1" + web_socket: + dependency: transitive + description: + name: web_socket + sha256: "34d64019aa8e36bf9842ac014bb5d2f5586ca73df5e4d9bf5c936975cae6982c" + url: "https://pub.dev" + source: hosted + version: "1.0.1" + web_socket_channel: + dependency: transitive + description: + name: web_socket_channel + sha256: d645757fb0f4773d602444000a8131ff5d48c9e47adfe9772652dd1a4f2d45c8 + url: "https://pub.dev" + source: hosted + version: "3.0.3" win32: dependency: transitive description: diff --git a/lib/config/broadcasting.dart b/lib/config/broadcasting.dart new file mode 100644 index 0000000..d665ee1 --- /dev/null +++ b/lib/config/broadcasting.dart @@ -0,0 +1,43 @@ +/// Broadcasting Configuration. +/// +/// ## Connections +/// +/// - `reverb` — Laravel Reverb WebSocket server (default driver) +/// - `null` — No-op driver; drops all broadcast events silently +/// +/// ## Reverb Options +/// +/// - `host` / `port` / `scheme` — WebSocket endpoint coordinates +/// - `app_key` — Reverb application key (matches server config) +/// - `auth_endpoint` — HTTP endpoint for private/presence channel auth +/// - `reconnect` — Auto-reconnect on unexpected disconnect +/// - `max_reconnect_delay` — Maximum back-off delay in milliseconds +/// - `activity_timeout` — Seconds of inactivity before ping is sent +/// - `dedup_buffer_size` — Number of recent event IDs kept for deduplication +final Map defaultBroadcastingConfig = { + 'broadcasting': { + // ------------------------------------------------------------------------- + // Default Connection + // ------------------------------------------------------------------------- + 'default': 'null', + + // ------------------------------------------------------------------------- + // Connections + // ------------------------------------------------------------------------- + 'connections': { + 'reverb': { + 'driver': 'reverb', + 'host': 'localhost', + 'port': 8080, + 'scheme': 'ws', + 'app_key': '', + 'auth_endpoint': '/broadcasting/auth', + 'reconnect': true, + 'max_reconnect_delay': 30000, + 'activity_timeout': 120, + 'dedup_buffer_size': 100, + }, + 'null': {'driver': 'null'}, + }, + }, +}; diff --git a/lib/magic.dart b/lib/magic.dart index 3177620..7054a37 100644 --- a/lib/magic.dart +++ b/lib/magic.dart @@ -100,6 +100,7 @@ export 'src/testing/fake_auth_manager.dart'; export 'src/testing/fake_cache_manager.dart'; export 'src/testing/fake_vault_service.dart'; export 'src/testing/fake_log_manager.dart'; +export 'src/testing/fake_broadcast_manager.dart'; // Database export 'src/database/database_manager.dart'; @@ -193,3 +194,17 @@ export 'src/launch/launch_adapter.dart'; export 'src/launch/launch_service.dart'; export 'src/launch/launch_service_provider.dart'; export 'src/facades/launch.dart'; + +// Broadcasting +export 'config/broadcasting.dart'; +export 'src/broadcasting/broadcast_connection_state.dart'; +export 'src/broadcasting/broadcast_event.dart'; +export 'src/broadcasting/broadcast_manager.dart'; +export 'src/broadcasting/broadcast_service_provider.dart'; +export 'src/broadcasting/contracts/broadcast_channel.dart'; +export 'src/broadcasting/contracts/broadcast_driver.dart'; +export 'src/broadcasting/contracts/broadcast_interceptor.dart'; +export 'src/broadcasting/contracts/broadcast_presence_channel.dart'; +export 'src/broadcasting/drivers/null_broadcast_driver.dart'; +export 'src/broadcasting/drivers/reverb_broadcast_driver.dart'; +export 'src/facades/echo.dart'; diff --git a/lib/src/broadcasting/broadcast_connection_state.dart b/lib/src/broadcasting/broadcast_connection_state.dart new file mode 100644 index 0000000..9bc7d0c --- /dev/null +++ b/lib/src/broadcasting/broadcast_connection_state.dart @@ -0,0 +1,18 @@ +/// The possible connection states for a broadcast driver. +/// +/// Represents the lifecycle of a WebSocket (or equivalent) connection managed +/// by a [BroadcastDriver]. Consumers can react to state transitions by +/// subscribing to [BroadcastDriver.connectionState]. +enum BroadcastConnectionState { + /// The driver is in the process of establishing a connection. + connecting, + + /// The driver has an active, healthy connection to the broadcast server. + connected, + + /// The driver is not connected and is not attempting to reconnect. + disconnected, + + /// The driver lost its connection and is attempting to re-establish it. + reconnecting, +} diff --git a/lib/src/broadcasting/broadcast_event.dart b/lib/src/broadcasting/broadcast_event.dart new file mode 100644 index 0000000..e52cd16 --- /dev/null +++ b/lib/src/broadcasting/broadcast_event.dart @@ -0,0 +1,34 @@ +/// An event received from the broadcast server. +/// +/// Encapsulates the raw event name, the channel it arrived on, the decoded +/// payload, and the local timestamp at which it was received. Instances are +/// immutable and created by drivers when messages arrive. +class BroadcastEvent { + /// Creates a [BroadcastEvent]. + /// + /// All fields are required so that consumers always have full context about + /// the origin of the event. + const BroadcastEvent({ + required this.event, + required this.channel, + required this.data, + required this.receivedAt, + }); + + /// The event name as broadcast by the server (e.g. `'App\\Events\\OrderShipped'`). + final String event; + + /// The channel name the event was received on (e.g. `'orders'`, `'private-inbox.1'`). + final String channel; + + /// The decoded JSON payload sent with the event. + final Map data; + + /// The local [DateTime] at which this event was received by the driver. + final DateTime receivedAt; + + @override + String toString() => + 'BroadcastEvent(event: $event, channel: $channel, ' + 'data: $data, receivedAt: $receivedAt)'; +} diff --git a/lib/src/broadcasting/broadcast_manager.dart b/lib/src/broadcasting/broadcast_manager.dart new file mode 100644 index 0000000..0c16a24 --- /dev/null +++ b/lib/src/broadcasting/broadcast_manager.dart @@ -0,0 +1,89 @@ +import '../facades/config.dart'; +import 'contracts/broadcast_driver.dart'; +import 'drivers/null_broadcast_driver.dart'; +import 'drivers/reverb_broadcast_driver.dart'; + +/// The Broadcast Manager. +/// +/// Resolves the configured broadcasting connection and returns the appropriate +/// driver. Follows the same manager pattern as [LogManager] and [CacheManager]. +/// +/// ```dart +/// final driver = BroadcastManager().connection(); +/// await driver.connect(); +/// ``` +class BroadcastManager { + BroadcastDriver? _cachedConnection; + + static final Map)> + _customDrivers = {}; + + /// Register a custom broadcast driver factory. + /// + /// ```dart + /// BroadcastManager.extend( + /// 'pusher', + /// (config) => PusherBroadcastDriver(config), + /// ); + /// ``` + static void extend( + String name, + BroadcastDriver Function(Map config) factory, + ) { + _customDrivers[name] = factory; + } + + /// Reset all custom drivers (for testing). + static void resetDrivers() { + _customDrivers.clear(); + } + + /// Get the broadcast driver for the given [name], or the default connection. + /// + /// Calling without arguments returns (and caches) the default connection + /// defined by `broadcasting.default` in config. Passing an explicit [name] + /// resolves that named connection without affecting the cache. + /// + /// ```dart + /// final driver = BroadcastManager().connection(); // default + /// final reverb = BroadcastManager().connection('reverb'); // named + /// ``` + BroadcastDriver connection([String? name]) { + if (_cachedConnection != null && name == null) { + return _cachedConnection!; + } + + final connectionName = + name ?? Config.get('broadcasting.default', 'null')!; + final resolved = _resolveConnection(connectionName); + + if (name == null) { + _cachedConnection = resolved; + } + + return resolved; + } + + /// Resolve a connection by name from config. + BroadcastDriver _resolveConnection(String name) { + final connections = + Config.get>('broadcasting.connections') ?? {}; + final connectionConfig = connections[name] as Map? ?? {}; + final driverName = connectionConfig['driver'] as String? ?? 'null'; + + if (_customDrivers.containsKey(driverName)) { + return _customDrivers[driverName]!(connectionConfig); + } + + switch (driverName) { + case 'reverb': + return ReverbBroadcastDriver(connectionConfig); + case 'null': + default: + return _createNullDriver(); + } + } + + /// Create a [NullBroadcastDriver] instance. + BroadcastDriver _createNullDriver() => NullBroadcastDriver(); +} diff --git a/lib/src/broadcasting/broadcast_service_provider.dart b/lib/src/broadcasting/broadcast_service_provider.dart new file mode 100644 index 0000000..7d6f956 --- /dev/null +++ b/lib/src/broadcasting/broadcast_service_provider.dart @@ -0,0 +1,57 @@ +import '../facades/config.dart'; +import '../support/service_provider.dart'; +import 'broadcast_manager.dart'; + +/// Broadcast Service Provider. +/// +/// Registers the [BroadcastManager] as a singleton in the service container +/// and, during the boot phase, auto-connects the default driver — unless the +/// configured default connection is `'null'`, in which case the connect step +/// is intentionally skipped. +/// +/// This provider is **not** auto-registered. Add it explicitly to your +/// application's `providers` list, the same way as [EncryptionServiceProvider]: +/// +/// ```dart +/// await Magic.init( +/// configs: [ +/// { +/// 'app': { +/// 'providers': [ +/// (app) => BroadcastServiceProvider(app), +/// ], +/// }, +/// }, +/// ], +/// ); +/// ``` +class BroadcastServiceProvider extends ServiceProvider { + /// Creates the provider with a reference to the application container. + BroadcastServiceProvider(super.app); + + /// Bind the [BroadcastManager] singleton into the service container. + /// + /// After this phase any other provider or service can resolve the manager via + /// `app.make('broadcasting')` or the `Broadcast` facade. + @override + void register() { + app.singleton('broadcasting', () => BroadcastManager()); + } + + /// Auto-connect the default broadcast connection. + /// + /// Skips the connection attempt when `broadcasting.default` is `'null'` + /// to avoid unnecessary work during local development or testing. + @override + Future boot() async { + final manager = app.make('broadcasting'); + final defaultConnection = Config.get( + 'broadcasting.default', + 'null', + ); + + if (defaultConnection != 'null') { + await manager.connection().connect(); + } + } +} diff --git a/lib/src/broadcasting/contracts/broadcast_channel.dart b/lib/src/broadcasting/contracts/broadcast_channel.dart new file mode 100644 index 0000000..401ecf5 --- /dev/null +++ b/lib/src/broadcasting/contracts/broadcast_channel.dart @@ -0,0 +1,39 @@ +import '../broadcast_event.dart'; + +/// The Broadcast Channel contract. +/// +/// Represents a public broadcast channel. Consumers subscribe to events by +/// calling [listen] with an event name and a callback. The raw [events] stream +/// emits every event received on this channel regardless of registration. +/// +/// ```dart +/// final channel = Broadcast.channel('orders'); +/// channel.listen('OrderShipped', (event) { +/// print('Order shipped: ${event.data}'); +/// }); +/// ``` +abstract class BroadcastChannel { + /// The fully-qualified channel name as sent to the server (e.g. `'orders'`). + String get name; + + /// A broadcast stream of every [BroadcastEvent] received on this channel. + /// + /// The stream is multi-subscription — multiple listeners may be attached + /// simultaneously without replaying prior events. + Stream get events; + + /// Registers a [callback] to be invoked whenever [event] is received. + /// + /// Returns `this` to allow fluent chaining: + /// ```dart + /// channel + /// .listen('OrderShipped', onShipped) + /// .listen('OrderCancelled', onCancelled); + /// ``` + BroadcastChannel listen(String event, void Function(BroadcastEvent) callback); + + /// Removes the listener previously registered for [event]. + /// + /// Has no effect if no listener was registered for [event]. + void stopListening(String event); +} diff --git a/lib/src/broadcasting/contracts/broadcast_driver.dart b/lib/src/broadcasting/contracts/broadcast_driver.dart new file mode 100644 index 0000000..1bab82e --- /dev/null +++ b/lib/src/broadcasting/contracts/broadcast_driver.dart @@ -0,0 +1,75 @@ +import '../broadcast_connection_state.dart'; +import 'broadcast_channel.dart'; +import 'broadcast_interceptor.dart'; +import 'broadcast_presence_channel.dart'; + +/// The Broadcast Driver contract. +/// +/// All transport implementations (Pusher, Ably, Soketi, etc.) must implement +/// this interface. The driver owns the underlying connection lifecycle and +/// vends typed channel objects to consumers. +/// +/// ```dart +/// await Broadcast.driver().connect(); +/// final channel = Broadcast.driver().channel('orders'); +/// channel.listen('OrderShipped', (event) => print(event.data)); +/// ``` +abstract class BroadcastDriver { + /// Establishes the connection to the broadcast server. + /// + /// Resolves once the connection is ready. Throws on unrecoverable failure. + Future connect(); + + /// Closes the connection and releases all resources. + /// + /// After calling [disconnect] the driver should transition to + /// [BroadcastConnectionState.disconnected]. + Future disconnect(); + + /// The socket identifier assigned by the server, or `null` when not connected. + /// + /// Required for server-side auth endpoints that validate channel subscriptions. + String? get socketId; + + /// Whether the driver currently has an active connection. + bool get isConnected; + + /// A broadcast stream of [BroadcastConnectionState] transitions. + /// + /// Emits the new state each time the connection lifecycle changes. + Stream get connectionState; + + /// A stream that emits once each time the driver successfully reconnects. + /// + /// Useful for re-subscribing to channels after a connection drop. + Stream get onReconnect; + + /// Returns a public [BroadcastChannel] for [name]. + /// + /// Subscribes to the channel on first call; subsequent calls for the same + /// [name] should return the cached instance. + BroadcastChannel channel(String name); + + /// Returns a private [BroadcastChannel] for [name]. + /// + /// The driver will prefix the name (e.g. `'private-'`) as required by the + /// server and perform the appropriate auth handshake. + BroadcastChannel private(String name); + + /// Joins a presence channel for [name] and returns it. + /// + /// The driver will prefix the name (e.g. `'presence-'`) and perform the + /// presence auth handshake to obtain membership data. + BroadcastPresenceChannel join(String name); + + /// Unsubscribes from the channel identified by [name]. + /// + /// Has no effect if the channel was not previously subscribed. + void leave(String name); + + /// Registers an [interceptor] to be applied to all outbound and inbound + /// messages processed by this driver. + /// + /// Interceptors are invoked in the order they are added. + void addInterceptor(BroadcastInterceptor interceptor); +} diff --git a/lib/src/broadcasting/contracts/broadcast_interceptor.dart b/lib/src/broadcasting/contracts/broadcast_interceptor.dart new file mode 100644 index 0000000..17d4c94 --- /dev/null +++ b/lib/src/broadcasting/contracts/broadcast_interceptor.dart @@ -0,0 +1,39 @@ +import '../broadcast_event.dart'; + +/// The Broadcast Interceptor contract. +/// +/// Provides hooks for inspecting and mutating messages at the driver level, +/// mirroring the [MagicNetworkInterceptor] pattern. All methods have +/// pass-through default implementations so subclasses only override what they +/// need. +/// +/// ```dart +/// class LoggingBroadcastInterceptor extends BroadcastInterceptor { +/// @override +/// BroadcastEvent onReceive(BroadcastEvent event) { +/// print('Received ${event.event} on ${event.channel}'); +/// return event; +/// } +/// } +/// ``` +/// +/// Register interceptors via [BroadcastDriver.addInterceptor]. +abstract class BroadcastInterceptor { + /// Called before an outbound message is sent to the server. + /// + /// [message] is the raw payload map. Return the (optionally modified) map + /// to allow the message to proceed, or an empty map to suppress it. + Map onSend(Map message) => message; + + /// Called when a [BroadcastEvent] is received from the server. + /// + /// Return the (optionally modified) event to pass it downstream to channel + /// listeners. + BroadcastEvent onReceive(BroadcastEvent event) => event; + + /// Called when the driver encounters an error. + /// + /// Return the original [error] to propagate it, or return a replacement + /// value to recover (e.g. a fallback [BroadcastEvent]). + dynamic onError(dynamic error) => error; +} diff --git a/lib/src/broadcasting/contracts/broadcast_presence_channel.dart b/lib/src/broadcasting/contracts/broadcast_presence_channel.dart new file mode 100644 index 0000000..cf7b203 --- /dev/null +++ b/lib/src/broadcasting/contracts/broadcast_presence_channel.dart @@ -0,0 +1,31 @@ +import 'broadcast_channel.dart'; + +/// The Broadcast Presence Channel contract. +/// +/// Extends [BroadcastChannel] with membership awareness. Presence channels +/// expose the list of currently-connected members and emit streams when members +/// join or leave, enabling features such as online indicators and typing +/// notifications. +/// +/// ```dart +/// final channel = Broadcast.join('presence-room.1'); +/// channel.onJoin.listen((member) => print('${member['name']} joined')); +/// channel.onLeave.listen((member) => print('${member['name']} left')); +/// ``` +abstract class BroadcastPresenceChannel extends BroadcastChannel { + /// The current list of members connected to this presence channel. + /// + /// Each entry is the member payload returned by the server's auth endpoint. + /// The list is updated automatically as members join and leave. + List> get members; + + /// A stream that emits a member payload each time a new member joins. + /// + /// The emitted map contains the same fields as entries in [members]. + Stream> get onJoin; + + /// A stream that emits a member payload each time a member leaves. + /// + /// The emitted map contains the same fields as entries in [members]. + Stream> get onLeave; +} diff --git a/lib/src/broadcasting/drivers/null_broadcast_driver.dart b/lib/src/broadcasting/drivers/null_broadcast_driver.dart new file mode 100644 index 0000000..71e3d53 --- /dev/null +++ b/lib/src/broadcasting/drivers/null_broadcast_driver.dart @@ -0,0 +1,129 @@ +import '../broadcast_connection_state.dart'; +import '../broadcast_event.dart'; +import '../contracts/broadcast_channel.dart'; +import '../contracts/broadcast_driver.dart'; +import '../contracts/broadcast_interceptor.dart'; +import '../contracts/broadcast_presence_channel.dart'; + +/// A no-op [BroadcastDriver] for local development and testing environments +/// where no WebSocket server is available. +/// +/// All connection methods resolve immediately without side-effects. Channel +/// methods return silent [_NullBroadcastChannel] instances whose event streams +/// never emit. This allows application code that depends on broadcasting to +/// compile and run without a real server. +/// +/// ```dart +/// Magic.init( +/// providers: [BroadcastServiceProvider()], +/// config: {'broadcast': {'default': 'null'}}, +/// ); +/// ``` +class NullBroadcastDriver implements BroadcastDriver { + @override + /// Resolves immediately — no connection is established. + Future connect() => Future.value(); + + @override + /// Resolves immediately — nothing to close. + Future disconnect() => Future.value(); + + @override + /// Always `null` — no server assigns a socket identifier to a null driver. + String? get socketId => null; + + @override + /// Always `false` — the null driver never establishes a real connection. + bool get isConnected => false; + + @override + /// Never emits — the null driver has no connection lifecycle transitions. + Stream get connectionState => + const Stream.empty(); + + @override + /// Never emits — the null driver never reconnects. + Stream get onReconnect => const Stream.empty(); + + @override + /// Returns a [_NullBroadcastChannel] for [name]. + /// + /// The returned channel accepts listener registrations but never delivers + /// events. + BroadcastChannel channel(String name) => _NullBroadcastChannel(name); + + @override + /// Returns a [_NullBroadcastChannel] for [name]. + /// + /// Behaves identically to [channel] — no auth handshake is performed. + BroadcastChannel private(String name) => _NullBroadcastChannel(name); + + @override + /// Returns a [_NullBroadcastPresenceChannel] for [name] with no members. + /// + /// No presence auth handshake is performed. + BroadcastPresenceChannel join(String name) => + _NullBroadcastPresenceChannel(name); + + @override + /// No-op — the null driver holds no subscriptions to unsubscribe from. + void leave(String name) {} + + @override + /// No-op — the null driver processes no messages for interceptors to act on. + void addInterceptor(BroadcastInterceptor interceptor) {} +} + +/// A no-op [BroadcastChannel] whose event streams never emit. +/// +/// Listener registrations are accepted without error but never invoked. +/// The [events] stream is a permanent empty broadcast stream — no +/// [StreamController] is allocated, so there is no resource to leak. +class _NullBroadcastChannel implements BroadcastChannel { + /// Creates a [_NullBroadcastChannel] with the given [name]. + const _NullBroadcastChannel(this.name); + + @override + final String name; + + @override + /// A stream that never emits any [BroadcastEvent]. + Stream get events => const Stream.empty(); + + @override + /// Registers [callback] for [event] and returns `this` for fluent chaining. + /// + /// The callback will never be invoked because this driver receives no events. + BroadcastChannel listen( + String event, + void Function(BroadcastEvent) callback, + ) => this; + + @override + /// No-op — no listener was ever registered. + void stopListening(String event) {} +} + +/// A no-op [BroadcastPresenceChannel] with no members and silent streams. +/// +/// Extends [_NullBroadcastChannel] and satisfies the [BroadcastPresenceChannel] +/// interface with empty membership data and never-emitting join/leave streams. +class _NullBroadcastPresenceChannel extends _NullBroadcastChannel + implements BroadcastPresenceChannel { + /// Creates a [_NullBroadcastPresenceChannel] with the given [name]. + const _NullBroadcastPresenceChannel(super.name); + + @override + /// Always empty — no members are present when there is no real server. + List> get members => const >[]; + + @override + /// Never emits — no members join on a null driver. + Stream> get onJoin => + const Stream>.empty(); + + @override + /// Never emits — no members leave on a null driver. + Stream> get onLeave => + const Stream>.empty(); +} diff --git a/lib/src/broadcasting/drivers/reverb_broadcast_driver.dart b/lib/src/broadcasting/drivers/reverb_broadcast_driver.dart new file mode 100644 index 0000000..d2c361b --- /dev/null +++ b/lib/src/broadcasting/drivers/reverb_broadcast_driver.dart @@ -0,0 +1,819 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:convert'; +import 'dart:math'; + +import 'package:web_socket_channel/web_socket_channel.dart'; + +import '../../facades/http.dart'; +import '../broadcast_connection_state.dart'; +import '../broadcast_event.dart'; +import '../contracts/broadcast_channel.dart'; +import '../contracts/broadcast_driver.dart'; +import '../contracts/broadcast_interceptor.dart'; +import '../contracts/broadcast_presence_channel.dart'; + +/// Pusher error code classification for reconnection strategy. +/// +/// Pusher protocol defines three error ranges that dictate how the client +/// should respond after receiving an error frame. +enum PusherErrorAction { + /// 4000–4099: Do not reconnect — the error is permanent. + fatal, + + /// 4100–4199: Reconnect immediately without backoff. + reconnectImmediate, + + /// 4200–4299: Reconnect with exponential backoff. + reconnectBackoff, +} + +/// A [BroadcastDriver] that connects to Laravel Reverb (Pusher-compatible) +/// via a pure-Dart WebSocket client. +/// +/// Handles the full Pusher protocol: connection handshake, application-level +/// ping/pong, channel subscriptions (public, private, presence), event +/// deduplication, interceptor chains, and automatic reconnection with +/// exponential backoff. +/// +/// ## Usage +/// ```dart +/// final driver = ReverbBroadcastDriver({ +/// 'host': 'localhost', +/// 'port': 8080, +/// 'scheme': 'ws', +/// 'app_key': 'my-app-key', +/// 'auth_endpoint': '/broadcasting/auth', +/// }); +/// await driver.connect(); +/// driver.channel('orders').listen('OrderShipped', (event) { +/// print(event.data); +/// }); +/// ``` +class ReverbBroadcastDriver implements BroadcastDriver { + // --------------------------------------------------------------------------- + // Constructor + // --------------------------------------------------------------------------- + + /// Creates a [ReverbBroadcastDriver]. + /// + /// [config] contains connection parameters read from + /// `broadcasting.connections.reverb`. [channelFactory] overrides WebSocket + /// creation for testing — defaults to [WebSocketChannel.connect]. + ReverbBroadcastDriver( + this._config, { + WebSocketChannel Function(Uri uri)? channelFactory, + }) : _channelFactory = channelFactory ?? WebSocketChannel.connect; + + // --------------------------------------------------------------------------- + // Dependencies + // --------------------------------------------------------------------------- + + final Map _config; + final WebSocketChannel Function(Uri uri) _channelFactory; + + // --------------------------------------------------------------------------- + // Connection state + // --------------------------------------------------------------------------- + + WebSocketChannel? _channel; + StreamSubscription? _streamSubscription; + bool _isConnected = false; + String? _socketId; + + /// The activity timeout in seconds reported by the server. + /// + /// Parsed from the `pusher:connection_established` frame. Used to determine + /// how frequently the server expects keepalive traffic. + int activityTimeout = 30; + Completer? _connectionCompleter; + + /// Broadcast controller that re-exposes the single-subscription + /// [WebSocketChannel.stream] as a multi-subscriber stream for internal + /// multi-channel routing. + StreamController? _broadcastStreamController; + + final StreamController _connectionStateController = + StreamController.broadcast(); + + final StreamController _onReconnectController = + StreamController.broadcast(); + + // --------------------------------------------------------------------------- + // Channel management + // --------------------------------------------------------------------------- + + final Map _channels = + {}; + + // --------------------------------------------------------------------------- + // Subscription queue — buffers subscribe calls during reconnection + // --------------------------------------------------------------------------- + + final List _subscriptionQueue = []; + + // --------------------------------------------------------------------------- + // Deduplication — ring buffer + // --------------------------------------------------------------------------- + + final Queue _dedupQueue = Queue(); + final Set _dedupSet = {}; + + // --------------------------------------------------------------------------- + // Reconnection + // --------------------------------------------------------------------------- + + Timer? _reconnectTimer; + int _attempt = 0; + + // --------------------------------------------------------------------------- + // Interceptors + // --------------------------------------------------------------------------- + + final List _interceptors = []; + + // --------------------------------------------------------------------------- + // BroadcastDriver — getters + // --------------------------------------------------------------------------- + + @override + String? get socketId => _socketId; + + @override + bool get isConnected => _isConnected; + + @override + Stream get connectionState => + _connectionStateController.stream; + + @override + Stream get onReconnect => _onReconnectController.stream; + + // --------------------------------------------------------------------------- + // BroadcastDriver — connection lifecycle + // --------------------------------------------------------------------------- + + @override + Future connect() async { + _connectionStateController.add(BroadcastConnectionState.connecting); + + final host = _config['host'] as String; + final port = _config['port'] as int; + final scheme = _config['scheme'] as String; + final appKey = _config['app_key'] as String; + + final uri = Uri.parse( + '$scheme://$host:$port/app/$appKey' + '?protocol=7&client=dart&version=1.0.0', + ); + + _channel = _channelFactory(uri); + await _channel!.ready; + + _connectionCompleter = Completer(); + + // Wrap the single-subscription stream as a broadcast stream. + _broadcastStreamController = StreamController.broadcast(); + _streamSubscription = _channel!.stream.listen( + _broadcastStreamController!.add, + onDone: () { + _broadcastStreamController?.close(); + _onDone(); + }, + onError: (Object error) { + _broadcastStreamController?.addError(error); + _onError(error); + }, + ); + + _broadcastStreamController!.stream.listen( + (raw) { + try { + _onMessage(raw); + } on StateError catch (error) { + // Protocol violation (e.g. missing 'event' key). Route through + // interceptor error chain rather than crashing the subscription. + dynamic processed = error; + for (final interceptor in _interceptors) { + processed = interceptor.onError(processed); + } + } + }, + onError: (Object error) { + // Route stream-level errors through interceptors. + dynamic processed = error; + for (final interceptor in _interceptors) { + processed = interceptor.onError(processed); + } + }, + ); + + return _connectionCompleter!.future; + } + + @override + Future disconnect() async { + _reconnectTimer?.cancel(); + _reconnectTimer = null; + _attempt = 0; + + _subscriptionQueue.clear(); + + _streamSubscription?.cancel(); + _streamSubscription = null; + + _broadcastStreamController?.close(); + _broadcastStreamController = null; + + _channel?.sink.close(); + _channel = null; + + _isConnected = false; + _socketId = null; + + // Dispose all channels. + for (final channel in _channels.values) { + channel.dispose(); + } + _channels.clear(); + + _dedupQueue.clear(); + _dedupSet.clear(); + + if (_connectionCompleter != null && !_connectionCompleter!.isCompleted) { + _connectionCompleter!.complete(); + } + _connectionCompleter = null; + + _connectionStateController.add(BroadcastConnectionState.disconnected); + } + + // --------------------------------------------------------------------------- + // BroadcastDriver — channel operations + // --------------------------------------------------------------------------- + + @override + BroadcastChannel channel(String name) { + if (_channels.containsKey(name)) { + return _channels[name]!; + } + + final ch = ReverbBroadcastChannel(name); + _channels[name] = ch; + _subscribePublic(name); + return ch; + } + + @override + BroadcastChannel private(String name) { + final prefixed = name.startsWith('private-') ? name : 'private-$name'; + + if (_channels.containsKey(prefixed)) { + return _channels[prefixed]!; + } + + final ch = ReverbBroadcastChannel(prefixed); + _channels[prefixed] = ch; + _subscribePrivate(prefixed); + return ch; + } + + @override + BroadcastPresenceChannel join(String name) { + final prefixed = name.startsWith('presence-') ? name : 'presence-$name'; + + if (_channels.containsKey(prefixed)) { + return _channels[prefixed]! as ReverbBroadcastPresenceChannel; + } + + final ch = ReverbBroadcastPresenceChannel(prefixed); + _channels[prefixed] = ch; + _subscribePresence(prefixed); + return ch; + } + + @override + void leave(String name) { + final ch = _channels.remove(name); + if (ch == null) return; + + _send({ + 'event': 'pusher:unsubscribe', + 'data': {'channel': name}, + }); + + ch.dispose(); + } + + @override + void addInterceptor(BroadcastInterceptor interceptor) { + _interceptors.add(interceptor); + } + + // --------------------------------------------------------------------------- + // Backoff delay — pure function, exposed for testing + // --------------------------------------------------------------------------- + + /// Computes the reconnect delay for [attempt] using exponential backoff. + /// + /// Formula: `min(500 * 2^attempt, maxReconnectDelay)` milliseconds. + Duration backoffDelay(int attempt) { + final maxDelay = _config['max_reconnect_delay'] as int? ?? 30000; + final ms = min(500 * pow(2, attempt).toInt(), maxDelay); + return Duration(milliseconds: ms); + } + + /// Classifies a Pusher error [code] into a reconnection action. + /// + /// - 4000–4099: [PusherErrorAction.fatal] — do not reconnect. + /// - 4100–4199: [PusherErrorAction.reconnectImmediate] — reconnect without delay. + /// - 4200–4299: [PusherErrorAction.reconnectBackoff] — reconnect with backoff. + /// - Other: defaults to [PusherErrorAction.reconnectBackoff]. + PusherErrorAction classifyErrorCode(int code) { + if (code >= 4000 && code <= 4099) { + return PusherErrorAction.fatal; + } + if (code >= 4100 && code <= 4199) { + return PusherErrorAction.reconnectImmediate; + } + return PusherErrorAction.reconnectBackoff; + } + + // --------------------------------------------------------------------------- + // Pusher protocol handling (private) + // --------------------------------------------------------------------------- + + void _onMessage(dynamic raw) { + final json = jsonDecode(raw as String) as Map; + final event = json['event'] as String?; + + if (event == null) { + throw StateError( + 'Received WebSocket frame without required "event" key: $json', + ); + } + + switch (event) { + case 'pusher:connection_established': + _handleConnectionEstablished(json); + case 'pusher:ping': + _sendPong(); + case 'pusher:subscription_succeeded': + _handleSubscriptionSucceeded(json); + case 'pusher:error': + _handlePusherError(json); + case 'pusher_internal:member_added': + _handlePresenceEvent(json); + case 'pusher_internal:member_removed': + _handlePresenceEvent(json); + default: + _handleApplicationEvent(json); + } + } + + void _handleConnectionEstablished(Map json) { + final data = jsonDecode(json['data'] as String) as Map; + _socketId = data['socket_id'] as String; + activityTimeout = data['activity_timeout'] as int? ?? 30; + _isConnected = true; + _attempt = 0; + + _connectionStateController.add(BroadcastConnectionState.connected); + + // Flush subscription queue. + if (_subscriptionQueue.isNotEmpty) { + for (final action in _subscriptionQueue) { + action(); + } + _subscriptionQueue.clear(); + } + + if (_connectionCompleter != null && !_connectionCompleter!.isCompleted) { + _connectionCompleter!.complete(); + } + } + + void _handleSubscriptionSucceeded(Map json) { + final channelName = json['channel'] as String?; + if (channelName == null) return; + + final ch = _channels[channelName]; + if (ch is ReverbBroadcastPresenceChannel) { + final rawData = json['data']; + final Map data; + if (rawData is String) { + data = jsonDecode(rawData) as Map; + } else if (rawData is Map) { + data = rawData; + } else { + return; + } + ch.handlePresenceEvent('pusher:subscription_succeeded', data); + } + } + + void _handlePusherError(Map json) { + final rawData = json['data']; + + // Route through interceptor chain. + dynamic error = StateError('Pusher error: $rawData'); + for (final interceptor in _interceptors) { + error = interceptor.onError(error); + } + + if (rawData is String) { + try { + final data = jsonDecode(rawData) as Map; + final code = data['code'] as int?; + if (code != null) { + final action = classifyErrorCode(code); + switch (action) { + case PusherErrorAction.fatal: + // Do not reconnect. + return; + case PusherErrorAction.reconnectImmediate: + _scheduleReconnect(immediate: true); + case PusherErrorAction.reconnectBackoff: + _scheduleReconnect(); + } + } + } catch (_) { + // Malformed data. + } + } + } + + void _handlePresenceEvent(Map json) { + final channelName = json['channel'] as String?; + if (channelName == null) return; + + final ch = _channels[channelName]; + if (ch is! ReverbBroadcastPresenceChannel) return; + + final event = json['event'] as String; + final rawData = json['data']; + final Map data; + if (rawData is String) { + data = jsonDecode(rawData) as Map; + } else if (rawData is Map) { + data = rawData; + } else { + return; + } + + ch.handlePresenceEvent(event, data); + } + + void _handleApplicationEvent(Map json) { + final channelName = json['channel'] as String?; + if (channelName == null) return; + + final ch = _channels[channelName]; + if (ch == null) return; + + final eventName = json['event'] as String; + final rawData = json['data']; + + // Deduplication. + final rawDataString = rawData is String ? rawData : jsonEncode(rawData); + final dedupKey = '$channelName:$eventName:$rawDataString'; + final maxDedupSize = _config['dedup_buffer_size'] as int? ?? 100; + + if (_dedupSet.contains(dedupKey)) return; + _addToDedupBuffer(dedupKey, maxDedupSize); + + // Double-JSON decode: if data is a String, decode it. If already a Map, + // use directly. + final Map data; + if (rawData is String) { + data = jsonDecode(rawData) as Map; + } else if (rawData is Map) { + data = rawData; + } else { + data = {}; + } + + var event = BroadcastEvent( + event: eventName, + channel: channelName, + data: data, + receivedAt: DateTime.now(), + ); + + // Interceptor chain — onReceive. + for (final interceptor in _interceptors) { + event = interceptor.onReceive(event); + } + + ch.addEvent(event); + } + + // --------------------------------------------------------------------------- + // Deduplication ring buffer + // --------------------------------------------------------------------------- + + void _addToDedupBuffer(String key, int maxSize) { + _dedupQueue.add(key); + _dedupSet.add(key); + + if (_dedupQueue.length > maxSize) { + final evicted = _dedupQueue.removeFirst(); + _dedupSet.remove(evicted); + } + } + + // --------------------------------------------------------------------------- + // Subscription helpers + // --------------------------------------------------------------------------- + + void _subscribePublic(String name) { + if (!_isConnected) { + _subscriptionQueue.add(() => _subscribePublic(name)); + return; + } + + _send({ + 'event': 'pusher:subscribe', + 'data': {'channel': name}, + }); + } + + void _subscribePrivate(String name) { + if (!_isConnected) { + _subscriptionQueue.add(() => _subscribePrivate(name)); + return; + } + + _authenticateAndSubscribe(name); + } + + void _subscribePresence(String name) { + if (!_isConnected) { + _subscriptionQueue.add(() => _subscribePresence(name)); + return; + } + + _authenticateAndSubscribe(name); + } + + /// Authenticates a private or presence channel via HTTP POST and then sends + /// the `pusher:subscribe` frame with the auth token. + Future _authenticateAndSubscribe(String channelName) async { + if (_socketId == null) return; + + try { + final authEndpoint = + _config['auth_endpoint'] as String? ?? '/broadcasting/auth'; + + final response = await Http.post( + authEndpoint, + data: { + 'socket_id': _socketId, + 'channel_name': channelName, + }, + ); + + final authData = response.data as Map?; + if (authData == null || authData['auth'] == null) return; + + final subscribeData = { + 'channel': channelName, + 'auth': authData['auth'], + }; + + // Presence channels include channel_data. + if (authData.containsKey('channel_data')) { + subscribeData['channel_data'] = authData['channel_data']; + } + + _send({'event': 'pusher:subscribe', 'data': subscribeData}); + } catch (_) { + // Auth failure — channel will not be subscribed. + } + } + + // --------------------------------------------------------------------------- + // Reconnection + // --------------------------------------------------------------------------- + + void _onDone() { + if (_connectionCompleter != null && !_connectionCompleter!.isCompleted) { + _connectionCompleter!.completeError( + StateError('WebSocket closed before connection established'), + ); + _connectionCompleter = null; + } + + if (!_isConnected) return; + _isConnected = false; + _socketId = null; + _connectionStateController.add(BroadcastConnectionState.reconnecting); + _scheduleReconnect(); + } + + void _onError(Object error) { + // Route through interceptor chain. + dynamic processed = error; + for (final interceptor in _interceptors) { + processed = interceptor.onError(processed); + } + + if (_connectionCompleter != null && !_connectionCompleter!.isCompleted) { + _connectionCompleter!.completeError(error); + _connectionCompleter = null; + } + + if (!_isConnected) return; + _isConnected = false; + _socketId = null; + _connectionStateController.add(BroadcastConnectionState.reconnecting); + _scheduleReconnect(); + } + + void _scheduleReconnect({bool immediate = false}) { + final shouldReconnect = _config['reconnect'] as bool? ?? true; + if (!shouldReconnect) return; + + _reconnectTimer?.cancel(); + + final delay = immediate ? Duration.zero : backoffDelay(_attempt); + _attempt++; + + _reconnectTimer = Timer(delay, () async { + try { + _streamSubscription?.cancel(); + _streamSubscription = null; + _broadcastStreamController?.close(); + _broadcastStreamController = null; + _channel = null; + _isConnected = false; + + await connect(); + + // Resubscribe all channels. + for (final entry in _channels.entries) { + final name = entry.key; + if (name.startsWith('presence-') || name.startsWith('private-')) { + _authenticateAndSubscribe(name); + } else { + _send({ + 'event': 'pusher:subscribe', + 'data': {'channel': name}, + }); + } + } + + _onReconnectController.add(null); + } catch (_) { + _scheduleReconnect(); + } + }); + } + + // --------------------------------------------------------------------------- + // Wire protocol helpers + // --------------------------------------------------------------------------- + + void _sendPong() { + _send({'event': 'pusher:pong', 'data': {}}); + } + + void _send(Map payload) { + // Interceptor chain — onSend. + var message = payload; + for (final interceptor in _interceptors) { + message = interceptor.onSend(message); + } + + final encoded = jsonEncode(message); + _channel?.sink.add(encoded); + } +} + +// --------------------------------------------------------------------------- +// ReverbBroadcastChannel +// --------------------------------------------------------------------------- + +/// A [BroadcastChannel] implementation backed by a [StreamController] that +/// receives events routed by the [ReverbBroadcastDriver]. +class ReverbBroadcastChannel implements BroadcastChannel { + /// Creates a [ReverbBroadcastChannel] for [name]. + ReverbBroadcastChannel(this.name); + + @override + final String name; + + final StreamController _controller = + StreamController.broadcast(); + + final Map> _listeners = + >{}; + + @override + Stream get events => _controller.stream; + + @override + BroadcastChannel listen( + String event, + void Function(BroadcastEvent) callback, + ) { + // Cancel any existing listener for this event name. + _listeners[event]?.cancel(); + + _listeners[event] = _controller.stream + .where((e) => e.event == event) + .listen(callback); + + return this; + } + + @override + void stopListening(String event) { + _listeners[event]?.cancel(); + _listeners.remove(event); + } + + /// Adds an [event] to this channel's stream (internal, called by driver). + void addEvent(BroadcastEvent event) { + if (!_controller.isClosed) { + _controller.add(event); + } + } + + /// Disposes this channel, cancelling all listeners and closing the stream. + void dispose() { + for (final sub in _listeners.values) { + sub.cancel(); + } + _listeners.clear(); + _controller.close(); + } +} + +// --------------------------------------------------------------------------- +// ReverbBroadcastPresenceChannel +// --------------------------------------------------------------------------- + +/// A presence-aware [BroadcastChannel] that tracks connected members and +/// emits join/leave events. +class ReverbBroadcastPresenceChannel extends ReverbBroadcastChannel + implements BroadcastPresenceChannel { + /// Creates a [ReverbBroadcastPresenceChannel] for [name]. + ReverbBroadcastPresenceChannel(super.name); + + final List> _members = >[]; + + final StreamController> _onJoinController = + StreamController>.broadcast(); + + final StreamController> _onLeaveController = + StreamController>.broadcast(); + + @override + List> get members => + List>.unmodifiable(_members); + + @override + Stream> get onJoin => _onJoinController.stream; + + @override + Stream> get onLeave => _onLeaveController.stream; + + /// Handles a Pusher internal presence event. + /// + /// Supports `pusher_internal:member_added`, `pusher_internal:member_removed`, + /// and `pusher:subscription_succeeded`. + void handlePresenceEvent(String event, Map data) { + switch (event) { + case 'pusher_internal:member_added': + _members.add(data); + _onJoinController.add(data); + case 'pusher_internal:member_removed': + _members.removeWhere((m) => m['user_id'] == data['user_id']); + _onLeaveController.add(data); + case 'pusher:subscription_succeeded': + _parseInitialMembers(data); + } + } + + void _parseInitialMembers(Map data) { + final presence = data['presence'] as Map?; + if (presence == null) return; + + final hash = presence['hash'] as Map? ?? {}; + _members.clear(); + for (final entry in hash.entries) { + _members.add({ + 'user_id': entry.key, + 'user_info': entry.value, + }); + } + } + + @override + void dispose() { + _onJoinController.close(); + _onLeaveController.close(); + super.dispose(); + } +} diff --git a/lib/src/facades/echo.dart b/lib/src/facades/echo.dart new file mode 100644 index 0000000..be702b8 --- /dev/null +++ b/lib/src/facades/echo.dart @@ -0,0 +1,195 @@ +import '../broadcasting/broadcast_connection_state.dart'; +import '../broadcasting/broadcast_event.dart'; +import '../broadcasting/broadcast_manager.dart'; +import '../broadcasting/contracts/broadcast_channel.dart'; +import '../broadcasting/contracts/broadcast_driver.dart'; +import '../broadcasting/contracts/broadcast_interceptor.dart'; +import '../broadcasting/contracts/broadcast_presence_channel.dart'; +import '../foundation/magic.dart'; +import '../testing/fake_broadcast_manager.dart'; + +/// The Echo Facade. +/// +/// Provides static access to the broadcasting system, proxying all calls to +/// the bound [BroadcastManager]. Modeled after the Laravel Echo API. +/// +/// ```dart +/// // Subscribe to a public channel +/// Echo.channel('orders').listen('OrderShipped', (event) { +/// print('Order shipped: ${event.data}'); +/// }); +/// +/// // Subscribe to a private channel +/// Echo.private('user.1').listen('ProfileUpdated', (event) { +/// print('Profile updated: ${event.data}'); +/// }); +/// +/// // Join a presence channel +/// final presence = Echo.join('room.1'); +/// presence.onJoin.listen((member) => print('${member['name']} joined')); +/// ``` +class Echo { + Echo._(); + + static BroadcastManager get _manager => + Magic.make('broadcasting'); + + // --------------------------------------------------------------------------- + // Channel subscriptions + // --------------------------------------------------------------------------- + + /// Returns a public [BroadcastChannel] for [name]. + /// + /// Subscribes to the channel on the underlying driver. Subsequent calls for + /// the same [name] return the cached instance from the driver. + /// + /// ```dart + /// Echo.channel('orders').listen('OrderShipped', (e) => print(e.data)); + /// ``` + static BroadcastChannel channel(String name) => + _manager.connection().channel(name); + + /// Returns a private [BroadcastChannel] for [name]. + /// + /// The driver handles the `private-` prefix and auth handshake as required + /// by the server configuration. + /// + /// ```dart + /// Echo.private('user.1').listen('ProfileUpdated', (e) => print(e.data)); + /// ``` + static BroadcastChannel private(String name) => + _manager.connection().private(name); + + /// Joins a presence channel for [name] and returns it. + /// + /// The driver handles the `presence-` prefix, auth handshake, and member + /// list tracking. + /// + /// ```dart + /// final ch = Echo.join('room.1'); + /// ch.onJoin.listen((m) => print('${m['name']} joined')); + /// ``` + static BroadcastPresenceChannel join(String name) => + _manager.connection().join(name); + + /// Listens for [event] on [channelName], invoking [callback] on each receipt. + /// + /// Convenience shorthand for `Echo.channel(name).listen(event, callback)`. + /// Returns the underlying [BroadcastChannel] for fluent chaining. + /// + /// ```dart + /// Echo.listen('orders', 'OrderShipped', (e) => print(e.data)); + /// ``` + static BroadcastChannel listen( + String channelName, + String event, + void Function(BroadcastEvent) callback, + ) => _manager.connection().channel(channelName).listen(event, callback); + + /// Unsubscribes from the channel identified by [name]. + /// + /// Has no effect if the channel was not previously subscribed. + /// + /// ```dart + /// Echo.leave('orders'); + /// ``` + static void leave(String name) => _manager.connection().leave(name); + + // --------------------------------------------------------------------------- + // Connection lifecycle + // --------------------------------------------------------------------------- + + /// Establishes the connection to the broadcast server. + /// + /// Resolves once the connection is ready. Throws on unrecoverable failure. + /// + /// ```dart + /// await Echo.connect(); + /// ``` + static Future connect() => _manager.connection().connect(); + + /// Closes the connection and releases all resources. + /// + /// ```dart + /// await Echo.disconnect(); + /// ``` + static Future disconnect() => _manager.connection().disconnect(); + + // --------------------------------------------------------------------------- + // Driver / connection accessors + // --------------------------------------------------------------------------- + + /// Returns the default [BroadcastDriver] resolved by the manager. + /// + /// Use this when you need direct access to driver-level capabilities not + /// exposed by the facade. + static BroadcastDriver get connection => _manager.connection(); + + /// The socket identifier assigned by the server, or `null` when not connected. + /// + /// Required for server-side auth endpoints that validate channel subscriptions. + static String? get socketId => _manager.connection().socketId; + + /// A broadcast stream of [BroadcastConnectionState] transitions. + /// + /// Emits the new state each time the connection lifecycle changes. + static Stream get connectionState => + _manager.connection().connectionState; + + /// A stream that emits once each time the driver successfully reconnects. + /// + /// Useful for re-subscribing to channels after a connection drop. + static Stream get onReconnect => _manager.connection().onReconnect; + + // --------------------------------------------------------------------------- + // Interceptors + // --------------------------------------------------------------------------- + + /// Registers an [interceptor] to be applied to all messages on the connection. + /// + /// Interceptors are invoked in the order they are added. + /// + /// ```dart + /// Echo.addInterceptor(LoggingBroadcastInterceptor()); + /// ``` + static void addInterceptor(BroadcastInterceptor interceptor) => + _manager.connection().addInterceptor(interceptor); + + // --------------------------------------------------------------------------- + // Manager access + // --------------------------------------------------------------------------- + + /// Returns the underlying [BroadcastManager]. + /// + /// Use for advanced configuration such as registering custom drivers via + /// [BroadcastManager.extend]. + static BroadcastManager get manager => _manager; + + // --------------------------------------------------------------------------- + // Testing + // --------------------------------------------------------------------------- + + /// Replace the bound [BroadcastManager] with a [FakeBroadcastManager] for testing. + /// + /// Returns the [FakeBroadcastManager] so callers can make assertions. + /// + /// ```dart + /// final fake = Echo.fake(); + /// + /// Echo.channel('orders'); + /// + /// fake.assertSubscribed('orders'); + /// ``` + static FakeBroadcastManager fake() { + final manager = FakeBroadcastManager(); + Magic.app.setInstance('broadcasting', manager); + return manager; + } + + /// Restore the real [BroadcastManager] binding, removing the fake. + /// + /// ```dart + /// Echo.unfake(); + /// ``` + static void unfake() => Magic.app.removeInstance('broadcasting'); +} diff --git a/lib/src/foundation/application.dart b/lib/src/foundation/application.dart index c3951aa..3febf4d 100644 --- a/lib/src/foundation/application.dart +++ b/lib/src/foundation/application.dart @@ -2,6 +2,7 @@ import '../../config/app.dart'; import '../../config/auth.dart'; import '../../config/cache.dart'; import '../../config/database.dart'; +import '../../config/broadcasting.dart'; import '../../config/localization.dart'; import '../../config/logging.dart'; import '../../config/network.dart'; @@ -136,7 +137,8 @@ class MagicApp { ..addAll(defaultLoggingConfig) ..addAll(defaultNetworkConfig) ..addAll(defaultViewConfig) - ..addAll(defaultLocalizationConfig); + ..addAll(defaultLocalizationConfig) + ..addAll(defaultBroadcastingConfig); } // --------------------------------------------------------------------------- diff --git a/lib/src/testing/fake_broadcast_manager.dart b/lib/src/testing/fake_broadcast_manager.dart new file mode 100644 index 0000000..0fa35f1 --- /dev/null +++ b/lib/src/testing/fake_broadcast_manager.dart @@ -0,0 +1,211 @@ +import '../broadcasting/broadcast_connection_state.dart'; +import '../broadcasting/broadcast_event.dart'; +import '../broadcasting/broadcast_manager.dart'; +import '../broadcasting/contracts/broadcast_channel.dart'; +import '../broadcasting/contracts/broadcast_driver.dart'; +import '../broadcasting/contracts/broadcast_interceptor.dart'; +import '../broadcasting/contracts/broadcast_presence_channel.dart'; + +/// A fake [BroadcastManager] for testing. +/// +/// Routes all broadcasting operations through an in-memory fake driver instead +/// of a real WebSocket connection. Provides assertion helpers for verifying +/// expected broadcasting activity. +/// +/// ```dart +/// final fake = Echo.fake(); +/// +/// Echo.channel('orders'); +/// +/// fake.assertSubscribed('orders'); +/// fake.assertConnected(); +/// ``` +class FakeBroadcastManager extends BroadcastManager { + final FakeBroadcastDriver _driver = FakeBroadcastDriver(); + + /// The underlying fake driver, exposed for direct inspection in tests. + FakeBroadcastDriver get driver => _driver; + + @override + BroadcastDriver connection([String? name]) => _driver; + + // --------------------------------------------------------------------------- + // Assertions + // --------------------------------------------------------------------------- + + /// Assert that the fake driver is currently connected. + /// + /// Throws [AssertionError] if the driver is not connected. + void assertConnected() { + if (!_driver._connected) { + throw AssertionError( + 'Expected the broadcast driver to be connected but it was disconnected.', + ); + } + } + + /// Assert that the fake driver is currently disconnected. + /// + /// Throws [AssertionError] if the driver is connected. + void assertDisconnected() { + if (_driver._connected) { + throw AssertionError( + 'Expected the broadcast driver to be disconnected but it was connected.', + ); + } + } + + /// Assert that [channel] is currently in the subscribed channels list. + /// + /// Throws [AssertionError] if [channel] was not subscribed. + void assertSubscribed(String channel) { + if (!_driver._subscribedChannels.contains(channel)) { + throw AssertionError( + 'Expected channel "$channel" to be subscribed but it was not found. ' + 'Subscribed channels: ${_driver._subscribedChannels}', + ); + } + } + + /// Assert that [channel] is NOT in the subscribed channels list. + /// + /// Throws [AssertionError] if [channel] is currently subscribed. + void assertNotSubscribed(String channel) { + if (_driver._subscribedChannels.contains(channel)) { + throw AssertionError( + 'Expected channel "$channel" to not be subscribed but it was found.', + ); + } + } + + /// Assert that at least one interceptor has been added to the driver. + /// + /// Throws [AssertionError] if no interceptors have been added. + void assertInterceptorAdded() { + if (_driver._addedInterceptors.isEmpty) { + throw AssertionError( + 'Expected at least one interceptor to have been added but none were recorded.', + ); + } + } + + // --------------------------------------------------------------------------- + // Reset + // --------------------------------------------------------------------------- + + /// Clear all recorded state on the fake driver. + void reset() => _driver._reset(); +} + +// --------------------------------------------------------------------------- +// Fake driver — records operations, no real connection +// --------------------------------------------------------------------------- + +/// An in-memory [BroadcastDriver] that records all operations for assertions. +/// +/// Accessed via [FakeBroadcastManager.driver] in tests. +class FakeBroadcastDriver implements BroadcastDriver { + bool _connected = false; + final List _subscribedChannels = []; + final List _addedInterceptors = []; + + /// Whether the driver is currently connected. + @override + bool get isConnected => _connected; + + /// The list of channel names currently recorded as subscribed. + List get subscribedChannels => List.unmodifiable(_subscribedChannels); + + /// The list of interceptors that have been added. + List get addedInterceptors => + List.unmodifiable(_addedInterceptors); + + @override + Future connect() async => _connected = true; + + @override + Future disconnect() async => _connected = false; + + @override + String? get socketId => _connected ? 'fake-socket-id' : null; + + @override + Stream get connectionState => const Stream.empty(); + + @override + Stream get onReconnect => const Stream.empty(); + + @override + BroadcastChannel channel(String name) { + _subscribedChannels.add(name); + return _FakeBroadcastChannel(name); + } + + @override + BroadcastChannel private(String name) { + _subscribedChannels.add('private-$name'); + return _FakeBroadcastChannel('private-$name'); + } + + @override + BroadcastPresenceChannel join(String name) { + _subscribedChannels.add('presence-$name'); + return _FakeBroadcastPresenceChannel('presence-$name'); + } + + @override + void leave(String name) => _subscribedChannels.remove(name); + + @override + void addInterceptor(BroadcastInterceptor interceptor) => + _addedInterceptors.add(interceptor); + + void _reset() { + _connected = false; + _subscribedChannels.clear(); + _addedInterceptors.clear(); + } +} + +// --------------------------------------------------------------------------- +// Internal fake channel — stub with empty events stream +// --------------------------------------------------------------------------- + +class _FakeBroadcastChannel implements BroadcastChannel { + _FakeBroadcastChannel(this._name); + + final String _name; + + @override + String get name => _name; + + @override + Stream get events => const Stream.empty(); + + @override + BroadcastChannel listen( + String event, + void Function(BroadcastEvent) callback, + ) => this; + + @override + void stopListening(String event) {} +} + +// --------------------------------------------------------------------------- +// Internal fake presence channel — empty members and streams +// --------------------------------------------------------------------------- + +class _FakeBroadcastPresenceChannel extends _FakeBroadcastChannel + implements BroadcastPresenceChannel { + _FakeBroadcastPresenceChannel(super.name); + + @override + List> get members => const []; + + @override + Stream> get onJoin => const Stream.empty(); + + @override + Stream> get onLeave => const Stream.empty(); +} diff --git a/pubspec.yaml b/pubspec.yaml index 9430eeb..bf440f2 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -45,6 +45,7 @@ dependencies: file_picker: ^10.3.10 image_picker: ^1.1.2 faker: ^2.2.0 + web_socket_channel: ^3.0.0 magic_cli: ^0.0.1-alpha.3 flutter_test: sdk: flutter diff --git a/skills/magic-framework/SKILL.md b/skills/magic-framework/SKILL.md index 77970b0..55a2789 100644 --- a/skills/magic-framework/SKILL.md +++ b/skills/magic-framework/SKILL.md @@ -64,7 +64,7 @@ Use `configFactories` (not `configs`) when any value depends on `Env.get()`. The | `Magic.flush()` | Clear all controllers (testing) | | `MagicApp.reset()` | Full container reset (testing) | -### Facade Summary (16 Facades) +### Facade Summary (17 Facades) | Facade | Purpose | Key Methods | |--------|---------|-------------| @@ -76,6 +76,7 @@ Use `configFactories` (not `configs`) when any value depends on `Env.get()`. The | `Schema` | Migrations | `create()`, `drop()`, `hasTable()` | | `Log` | Logging | `info()`, `error()`, `warning()`, `debug()` | | `Event` | Events | `dispatch(event)` | +| `Echo` | Broadcasting | `channel()`, `private()`, `join()`, `listen()`, `leave()`, `connect()`, `disconnect()`, `socketId`, `fake()` | | `MagicRoute` | Routing | `page()`, `group()`, `layout()`, `to()`, `back({fallback?})`, `replace()`, `push()`, `toNamed()` | | `Gate` | Authorization | `allows()`, `denies()`, `define()`, `policy()` | | `Lang` | Localization | `get()`, `locale()` | @@ -623,7 +624,7 @@ Official plugins extending Magic Framework. Each has its own package, service pr | `references/routing-navigation.md` | MagicRoute.page(), group(), layout(), navigation (to/back/replace/push/toNamed), middleware, transitions, MagicRouterOutlet, path/query parameters | Defining routes, navigation, or middleware | | `references/http-network.md` | Http facade (get/post/put/delete/upload + RESTful resource methods), MagicResponse API, interceptors, network config | Making HTTP requests, handling responses, or configuring network layer | | `references/auth-system.md` | Auth facade, AuthManager, guards (Bearer, BasicAuth, ApiKey), token refresh, setUserFactory, restore, policies, Gate, MagicCan | Implementing authentication, authorization, or token management | -| `references/secondary-systems.md` | Cache, Events (EventDispatcher, register listeners), Logging, Localization (trans()), Storage, Encryption, Vault, Carbon date helper, Launch, Policies | Using caching, events, logging, i18n, file storage, encryption, or URL launching | +| `references/secondary-systems.md` | Cache, Events (EventDispatcher, register listeners), Logging, Localization (trans()), Storage, Encryption, Vault, Carbon date helper, Launch, Policies, Broadcasting (Echo facade, BroadcastManager, channels, interceptors) | Using caching, events, logging, i18n, file storage, encryption, URL launching, or real-time broadcasting | | `references/testing-patterns.md` | Test setup (MagicApp.reset + Magic.flush), mocking via contracts, controller/model/middleware testing patterns | Writing tests for any Magic framework code | | `references/cli-commands.md` | Full CLI reference: install, all make:* generators with flags, key:generate | Scaffolding code, initializing projects, or generating files with the CLI | | `references/plugin-deeplink.md` | DeeplinkManager, handlers, drivers, config, RouteDeeplinkHandler, OneSignalDeeplinkHandler | Working with deep links, universal links, or app links | diff --git a/skills/magic-framework/references/cli-commands.md b/skills/magic-framework/references/cli-commands.md index 0bf582c..3b72db0 100644 --- a/skills/magic-framework/references/cli-commands.md +++ b/skills/magic-framework/references/cli-commands.md @@ -51,6 +51,7 @@ dart run magic:magic install --without-database --without-events | `--without-localization` | Skip i18n/translator and `assets/lang/` directory | | `--without-logging` | Skip logging channels and `config/logging.dart` | | `--without-network` | Skip HTTP/Dio network layer and `config/network.dart` | +| `--without-broadcasting` | Skip broadcasting/WebSocket setup and `config/broadcasting.dart` | **Generated structure:** @@ -59,6 +60,7 @@ lib/ ├── config/ │ ├── app.dart # App name, env, providers list │ ├── auth.dart # Guard config, token endpoints +│ ├── broadcasting.dart # Broadcasting connections (Reverb, null) │ ├── cache.dart # Cache driver, TTL │ ├── database.dart # SQLite connection │ ├── logging.dart # Log channels diff --git a/skills/magic-framework/references/secondary-systems.md b/skills/magic-framework/references/secondary-systems.md index 702b337..bc15470 100644 --- a/skills/magic-framework/references/secondary-systems.md +++ b/skills/magic-framework/references/secondary-systems.md @@ -786,6 +786,165 @@ await file.storeAs(diskName); // Store with disk selection ⚠️ Desktop camera requires custom delegate setup. +## Broadcasting + +Laravel Echo-equivalent real-time channel system over WebSockets. Accessed via the `Echo` facade backed by `BroadcastManager`. + +> **Important**: `BroadcastServiceProvider` is NOT auto-registered. Add it explicitly to the `providers` list in config. + +### Echo Facade API + +| Method / Property | Returns | Description | +|:------------------|:--------|:------------| +| `Echo.channel(name)` | `BroadcastChannel` | Subscribe to a public channel | +| `Echo.private(name)` | `BroadcastChannel` | Subscribe to a private channel (auth handshake required) | +| `Echo.join(name)` | `BroadcastPresenceChannel` | Join a presence channel (auth + member tracking) | +| `Echo.listen(channel, event, callback)` | `BroadcastChannel` | Shorthand: subscribe + listen in one call | +| `Echo.leave(name)` | `void` | Unsubscribe from a channel | +| `Echo.connect()` | `Future` | Establish the WebSocket connection | +| `Echo.disconnect()` | `Future` | Close the connection | +| `Echo.connection` | `BroadcastDriver` | The resolved default driver instance | +| `Echo.socketId` | `String?` | Server-assigned socket ID, or `null` when disconnected | +| `Echo.connectionState` | `Stream` | Stream of connection lifecycle state changes | +| `Echo.onReconnect` | `Stream` | Emits once each time the driver reconnects | +| `Echo.addInterceptor(interceptor)` | `void` | Register a `BroadcastInterceptor` on the connection | +| `Echo.manager` | `BroadcastManager` | The underlying manager (for `extend()` and advanced use) | +| `Echo.fake()` | `FakeBroadcastManager` | Swap to in-memory fake for testing | +| `Echo.unfake()` | `void` | Restore the real manager binding | + +### Channel Types + +- **Public** (`Echo.channel('name')`): No auth. Any connected client may subscribe. +- **Private** (`Echo.private('name')`): Driver adds `private-` prefix, performs HTTP auth via `auth_endpoint`. +- **Presence** (`Echo.join('name')`): Driver adds `presence-` prefix, auth + member list tracking. Returns `BroadcastPresenceChannel` with `members`, `onJoin`, `onLeave`. + +### BroadcastChannel API + +| Method | Returns | Description | +|:-------|:--------|:------------| +| `channel.listen(event, callback)` | `BroadcastChannel` | Register a listener for an event name (chainable) | +| `channel.stopListening(event)` | `void` | Remove a listener | +| `channel.events` | `Stream` | Raw stream of all events on this channel | +| `channel.name` | `String` | Fully-qualified channel name | + +### BroadcastEvent Fields + +| Property | Type | Description | +|:---------|:-----|:------------| +| `event` | `String` | Event name (e.g. `'App\\Events\\OrderShipped'`) | +| `channel` | `String` | Channel name the event arrived on | +| `data` | `Map` | Decoded JSON payload | +| `receivedAt` | `DateTime` | Local timestamp of receipt | + +### BroadcastConnectionState + +Values: `connecting`, `connected`, `disconnected`, `reconnecting`. + +### BroadcastManager + +| Method | Description | +|:-------|:------------| +| `BroadcastManager.extend(name, factory)` | Register a custom driver factory `(Map) => BroadcastDriver` | +| `BroadcastManager.resetDrivers()` | Clear all custom driver registrations (testing) | +| `manager.connection([name])` | Resolve named or default driver (result cached for default) | + +### BroadcastInterceptor Contract + +All methods have pass-through default implementations; override only what you need: + +```dart +abstract class BroadcastInterceptor { + Map onSend(Map message) => message; + BroadcastEvent onReceive(BroadcastEvent event) => event; + dynamic onError(dynamic error) => error; +} +``` + +Register interceptors via `Echo.addInterceptor(interceptor)` or `driver.addInterceptor(interceptor)`. + +### ReverbBroadcastDriver (Pusher Protocol) + +Handles the full Pusher protocol over WebSocket: connection handshake, ping/pong keepalive, public/private/presence subscriptions, event deduplication via ring buffer, and automatic reconnection with exponential backoff. + +Config keys under `broadcasting.connections.reverb`: + +| Key | Default | Description | +|:----|:--------|:------------| +| `host` | `'localhost'` | WebSocket server host | +| `port` | `8080` | WebSocket server port | +| `scheme` | `'ws'` | `ws` or `wss` | +| `app_key` | `''` | Reverb/Pusher application key | +| `auth_endpoint` | `'/broadcasting/auth'` | HTTP endpoint for private/presence auth | +| `reconnect` | `true` | Auto-reconnect on disconnect | +| `max_reconnect_delay` | `30000` | Max backoff delay in ms | +| `dedup_buffer_size` | `100` | Ring buffer size for deduplication | + +The `channelFactory` constructor parameter overrides WebSocket creation for testing (dependency injection). + +### NullBroadcastDriver + +Silently drops all broadcast operations. Used for local development or when `broadcasting.default` is `'null'`. `BroadcastServiceProvider` skips `connect()` when the default connection is `null`. + +### FakeBroadcastManager (Testing) + +```dart +final fake = Echo.fake(); + +// Trigger some code that uses Echo... +Echo.channel('orders'); +Echo.private('user.1'); + +// Assert +fake.assertConnected(); +fake.assertSubscribed('orders'); +fake.assertSubscribed('private-user.1'); +fake.assertNotSubscribed('presence-room.1'); +fake.assertInterceptorAdded(); + +// Inspect driver directly +expect(fake.driver.subscribedChannels, hasLength(2)); + +// Reset between test cases +fake.reset(); +``` + +### Usage Examples + +```dart +import 'package:magic/magic.dart'; + +// Public channel +Echo.channel('orders').listen('OrderShipped', (event) { + print('Order ${event.data['id']} shipped'); +}); + +// Private channel (auth required) +Echo.private('user.${userId}').listen('ProfileUpdated', (event) { + print('Profile updated: ${event.data}'); +}); + +// Presence channel +final room = Echo.join('room.1'); +room.onJoin.listen((member) => print('${member['name']} joined')); +room.onLeave.listen((member) => print('${member['name']} left')); +room.listen('MessagePosted', (event) => print(event.data['body'])); + +// React to connection state +Echo.connectionState.listen((state) { + if (state == BroadcastConnectionState.reconnecting) { + showReconnectingBanner(); + } +}); + +// Re-subscribe after reconnect +Echo.onReconnect.listen((_) { + Echo.channel('orders').listen('OrderShipped', onShipped); +}); + +// Custom driver +BroadcastManager.extend('pusher', (config) => PusherBroadcastDriver(config)); +``` + ## Key Gotchas - **Cache**: `remember()` returns cached value directly (not awaited) if it exists; only awaits callback on miss. diff --git a/test/broadcasting/broadcast_event_test.dart b/test/broadcasting/broadcast_event_test.dart new file mode 100644 index 0000000..161a4ae --- /dev/null +++ b/test/broadcasting/broadcast_event_test.dart @@ -0,0 +1,98 @@ +import 'package:flutter_test/flutter_test.dart'; +import 'package:magic/src/broadcasting/broadcast_connection_state.dart'; +import 'package:magic/src/broadcasting/broadcast_event.dart'; + +void main() { + group('BroadcastEvent', () { + test('constructs with required fields', () { + final now = DateTime.now(); + final event = BroadcastEvent( + event: 'OrderShipped', + channel: 'orders', + data: {'orderId': 42}, + receivedAt: now, + ); + + expect(event.event, 'OrderShipped'); + expect(event.channel, 'orders'); + expect(event.data, {'orderId': 42}); + expect(event.receivedAt, now); + }); + + test('data defaults to empty map when not provided', () { + final event = BroadcastEvent( + event: 'Ping', + channel: 'public', + data: const {}, + receivedAt: DateTime.now(), + ); + + expect(event.data, isEmpty); + }); + + test('toString includes event name and channel', () { + final event = BroadcastEvent( + event: 'UserJoined', + channel: 'presence-room.1', + data: const {'userId': 7}, + receivedAt: DateTime(2024, 1, 15, 10, 30), + ); + + final str = event.toString(); + expect(str, contains('UserJoined')); + expect(str, contains('presence-room.1')); + }); + + test('data map is preserved as-is', () { + final data = { + 'nested': {'key': 'value'}, + 'list': [1, 2, 3], + 'flag': true, + }; + final event = BroadcastEvent( + event: 'Complex', + channel: 'test', + data: data, + receivedAt: DateTime.now(), + ); + + expect(event.data['nested'], {'key': 'value'}); + expect(event.data['list'], [1, 2, 3]); + expect(event.data['flag'], true); + }); + }); + + group('BroadcastConnectionState', () { + test('has exactly 4 values', () { + expect(BroadcastConnectionState.values, hasLength(4)); + }); + + test('contains connecting state', () { + expect( + BroadcastConnectionState.values, + contains(BroadcastConnectionState.connecting), + ); + }); + + test('contains connected state', () { + expect( + BroadcastConnectionState.values, + contains(BroadcastConnectionState.connected), + ); + }); + + test('contains disconnected state', () { + expect( + BroadcastConnectionState.values, + contains(BroadcastConnectionState.disconnected), + ); + }); + + test('contains reconnecting state', () { + expect( + BroadcastConnectionState.values, + contains(BroadcastConnectionState.reconnecting), + ); + }); + }); +} diff --git a/test/broadcasting/broadcast_manager_test.dart b/test/broadcasting/broadcast_manager_test.dart new file mode 100644 index 0000000..e70ccdc --- /dev/null +++ b/test/broadcasting/broadcast_manager_test.dart @@ -0,0 +1,279 @@ +import 'package:flutter_test/flutter_test.dart'; +import 'package:magic/magic.dart'; + +// --------------------------------------------------------------------------- +// Test doubles +// --------------------------------------------------------------------------- + +/// Minimal [BroadcastDriver] stub — records which config it was created with. +class _StubDriver implements BroadcastDriver { + final Map config; + + _StubDriver(this.config); + + @override + Future connect() async {} + + @override + Future disconnect() async {} + + @override + String? get socketId => null; + + @override + bool get isConnected => false; + + @override + Stream get connectionState => const Stream.empty(); + + @override + Stream get onReconnect => const Stream.empty(); + + @override + BroadcastChannel channel(String name) => throw UnimplementedError(); + + @override + BroadcastChannel private(String name) => throw UnimplementedError(); + + @override + BroadcastPresenceChannel join(String name) => throw UnimplementedError(); + + @override + void leave(String name) {} + + @override + void addInterceptor(BroadcastInterceptor interceptor) {} +} + +/// A second distinct stub — lets tests distinguish between two custom drivers. +class _AnotherStubDriver extends _StubDriver { + _AnotherStubDriver(super.config); +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Registers a minimal broadcasting config so [BroadcastManager] can resolve. +void _setConfig({ + String defaultConnection = 'null', + Map? connections, +}) { + Config.set('broadcasting.default', defaultConnection); + Config.set( + 'broadcasting.connections', + connections ?? + { + 'null': {'driver': 'null'}, + }, + ); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +void main() { + setUp(() { + MagicApp.reset(); + Magic.flush(); + BroadcastManager.resetDrivers(); + }); + + group('BroadcastManager — custom driver registration', () { + test('extend() registers a custom driver factory', () { + BroadcastManager.extend( + 'stub', + (Map config) => _StubDriver(config), + ); + + _setConfig( + defaultConnection: 'my_conn', + connections: { + 'my_conn': {'driver': 'stub'}, + }, + ); + + final manager = BroadcastManager(); + final driver = manager.connection(); + + expect(driver, isA<_StubDriver>()); + }); + + test('extend() with different names registers independently', () { + BroadcastManager.extend( + 'stub_a', + (Map config) => _StubDriver(config), + ); + BroadcastManager.extend( + 'stub_b', + (Map config) => _AnotherStubDriver(config), + ); + + _setConfig( + connections: { + 'conn_a': {'driver': 'stub_a'}, + 'conn_b': {'driver': 'stub_b'}, + }, + ); + + final manager = BroadcastManager(); + + expect(manager.connection('conn_a'), isA<_StubDriver>()); + expect(manager.connection('conn_b'), isA<_AnotherStubDriver>()); + }); + }); + + group('BroadcastManager — resetDrivers()', () { + test('resetDrivers() clears all registered custom drivers', () { + BroadcastManager.extend( + 'stub', + (Map config) => _StubDriver(config), + ); + + BroadcastManager.resetDrivers(); + + // After reset, 'stub' is no longer registered — resolving falls through + // to the default (NullBroadcastDriver). + _setConfig( + defaultConnection: 'my_conn', + connections: { + 'my_conn': {'driver': 'stub'}, + }, + ); + + final manager = BroadcastManager(); + final driver = manager.connection(); + + // Falls back to default — should NOT be _StubDriver. + expect(driver, isNot(isA<_StubDriver>())); + }); + + test('resetDrivers() is idempotent on empty registry', () { + expect(() => BroadcastManager.resetDrivers(), returnsNormally); + }); + }); + + group('BroadcastManager — config forwarding', () { + test('custom driver factory receives full connection config', () { + Map? capturedConfig; + + BroadcastManager.extend('spy', (Map config) { + capturedConfig = config; + return _StubDriver(config); + }); + + _setConfig( + defaultConnection: 'spy_conn', + connections: { + 'spy_conn': { + 'driver': 'spy', + 'host': 'ws.example.com', + 'port': 6001, + 'app_key': 'abc123', + }, + }, + ); + + BroadcastManager().connection(); + + expect(capturedConfig, isNotNull); + expect(capturedConfig!['host'], equals('ws.example.com')); + expect(capturedConfig!['port'], equals(6001)); + expect(capturedConfig!['app_key'], equals('abc123')); + }); + }); + + group('BroadcastManager — default connection caching', () { + test( + 'connection() without args returns same instance on repeated calls', + () { + _setConfig(); + + final manager = BroadcastManager(); + final first = manager.connection(); + final second = manager.connection(); + + expect(identical(first, second), isTrue); + }, + ); + + test('named connection() is NOT cached as default', () { + _setConfig( + connections: { + 'null': {'driver': 'null'}, + 'other': {'driver': 'null'}, + }, + ); + + final manager = BroadcastManager(); + final named = manager.connection('other'); + final defaultConn = manager.connection(); + + // They are separate instances because named was never set as cached. + expect(identical(named, defaultConn), isFalse); + }); + + test('second manager instance resolves fresh (no shared cache)', () { + _setConfig(); + + final a = BroadcastManager().connection(); + final b = BroadcastManager().connection(); + + // Different manager instances → different cached objects. + expect(identical(a, b), isFalse); + }); + }); + + group('BroadcastManager — named connection resolution', () { + test('connection(name) resolves the named connection', () { + BroadcastManager.extend( + 'stub', + (Map config) => _StubDriver(config), + ); + + _setConfig( + connections: { + 'null': {'driver': 'null'}, + 'custom': {'driver': 'stub'}, + }, + ); + + final manager = BroadcastManager(); + final driver = manager.connection('custom'); + + expect(driver, isA<_StubDriver>()); + }); + + test('connection() falls back to broadcasting.default when no arg', () { + BroadcastManager.extend( + 'stub', + (Map config) => _StubDriver(config), + ); + + _setConfig( + defaultConnection: 'my_default', + connections: { + 'my_default': {'driver': 'stub'}, + }, + ); + + final manager = BroadcastManager(); + final driver = manager.connection(); + + expect(driver, isA<_StubDriver>()); + }); + + test('unknown driver name falls back to NullBroadcastDriver', () { + _setConfig( + defaultConnection: 'weird', + connections: { + 'weird': {'driver': 'totally_unknown'}, + }, + ); + + // Should not throw — falls back gracefully. + expect(() => BroadcastManager().connection(), returnsNormally); + }); + }); +} diff --git a/test/broadcasting/broadcast_service_provider_test.dart b/test/broadcasting/broadcast_service_provider_test.dart new file mode 100644 index 0000000..2f0f12b --- /dev/null +++ b/test/broadcasting/broadcast_service_provider_test.dart @@ -0,0 +1,129 @@ +import 'package:flutter_test/flutter_test.dart'; +import 'package:magic/magic.dart'; + +void main() { + setUp(() { + MagicApp.reset(); + Magic.flush(); + BroadcastManager.resetDrivers(); + }); + + group('BroadcastServiceProvider — registration', () { + test('extends ServiceProvider', () { + final provider = BroadcastServiceProvider(MagicApp.instance); + + expect(provider, isA()); + }); + + test( + 'register() binds BroadcastManager singleton under "broadcasting"', + () { + final app = MagicApp.instance; + final provider = BroadcastServiceProvider(app); + + provider.register(); + + expect( + app.make('broadcasting'), + isA(), + ); + }, + ); + + test( + 'register() binds a true singleton — same instance on repeated resolution', + () { + final app = MagicApp.instance; + final provider = BroadcastServiceProvider(app); + + provider.register(); + + final first = app.make('broadcasting'); + final second = app.make('broadcasting'); + + expect(identical(first, second), isTrue); + }, + ); + }); + + group('BroadcastServiceProvider — boot with null driver', () { + test('boot() completes without error when default is "null"', () async { + Config.set('broadcasting.default', 'null'); + Config.set('broadcasting.connections', { + 'null': {'driver': 'null'}, + }); + + final app = MagicApp.instance; + final provider = BroadcastServiceProvider(app); + + provider.register(); + + await expectLater(provider.boot(), completes); + }); + + test('boot() skips connect() when default is "null"', () async { + var connectCalled = false; + + BroadcastManager.extend('null', (Map config) { + return _SpyNullDriver(onConnect: () => connectCalled = true); + }); + + Config.set('broadcasting.default', 'null'); + Config.set('broadcasting.connections', { + 'null': {'driver': 'null'}, + }); + + final app = MagicApp.instance; + final provider = BroadcastServiceProvider(app); + + provider.register(); + await provider.boot(); + + expect(connectCalled, isFalse); + }); + }); +} + +// --------------------------------------------------------------------------- +// Test doubles +// --------------------------------------------------------------------------- + +/// Spy driver that records whether [connect] was invoked. +class _SpyNullDriver implements BroadcastDriver { + final void Function() onConnect; + + _SpyNullDriver({required this.onConnect}); + + @override + Future connect() async => onConnect(); + + @override + Future disconnect() async {} + + @override + String? get socketId => null; + + @override + bool get isConnected => false; + + @override + Stream get connectionState => const Stream.empty(); + + @override + Stream get onReconnect => const Stream.empty(); + + @override + BroadcastChannel channel(String name) => throw UnimplementedError(); + + @override + BroadcastChannel private(String name) => throw UnimplementedError(); + + @override + BroadcastPresenceChannel join(String name) => throw UnimplementedError(); + + @override + void leave(String name) {} + + @override + void addInterceptor(BroadcastInterceptor interceptor) {} +} diff --git a/test/broadcasting/drivers/null_broadcast_driver_test.dart b/test/broadcasting/drivers/null_broadcast_driver_test.dart new file mode 100644 index 0000000..0c6720f --- /dev/null +++ b/test/broadcasting/drivers/null_broadcast_driver_test.dart @@ -0,0 +1,121 @@ +import 'package:flutter_test/flutter_test.dart'; +import 'package:magic/magic.dart'; + +void main() { + setUp(() { + MagicApp.reset(); + Magic.flush(); + }); + + group('NullBroadcastDriver', () { + late NullBroadcastDriver driver; + + setUp(() { + driver = NullBroadcastDriver(); + }); + + test('connect() completes without error', () async { + await expectLater(driver.connect(), completes); + }); + + test('disconnect() completes without error', () async { + await expectLater(driver.disconnect(), completes); + }); + + test('isConnected is false', () { + expect(driver.isConnected, isFalse); + }); + + test('socketId is null', () { + expect(driver.socketId, isNull); + }); + + test('connectionState emits no events', () async { + final events = await driver.connectionState.toList(); + expect(events, isEmpty); + }); + + test('onReconnect emits no events', () async { + final events = await driver.onReconnect.toList(); + expect(events, isEmpty); + }); + + test('channel() returns a BroadcastChannel with correct name', () { + final ch = driver.channel('orders'); + expect(ch, isA()); + expect(ch.name, equals('orders')); + }); + + test('private() returns a BroadcastChannel with correct name', () { + final ch = driver.private('inbox'); + expect(ch, isA()); + expect(ch.name, equals('inbox')); + }); + + test( + 'join() returns a BroadcastPresenceChannel with correct name and empty members', + () { + final ch = driver.join('room.1'); + expect(ch, isA()); + expect(ch.name, equals('room.1')); + expect(ch.members, isEmpty); + }, + ); + + test('leave() does not throw', () { + expect(() => driver.leave('orders'), returnsNormally); + }); + + test('addInterceptor() does not throw', () { + final interceptor = _NoOpInterceptor(); + expect(() => driver.addInterceptor(interceptor), returnsNormally); + }); + }); + + group('_NullBroadcastChannel (via channel())', () { + late NullBroadcastDriver driver; + + setUp(() { + driver = NullBroadcastDriver(); + }); + + test('events stream emits no events', () async { + final ch = driver.channel('test'); + final events = await ch.events.toList(); + expect(events, isEmpty); + }); + + test('listen() returns this for fluent chaining', () { + final ch = driver.channel('test'); + final result = ch.listen('SomeEvent', (_) {}); + expect(result, same(ch)); + }); + + test('stopListening() does not throw', () { + final ch = driver.channel('test'); + expect(() => ch.stopListening('SomeEvent'), returnsNormally); + }); + }); + + group('_NullBroadcastPresenceChannel (via join())', () { + late NullBroadcastDriver driver; + + setUp(() { + driver = NullBroadcastDriver(); + }); + + test('onJoin emits no events', () async { + final ch = driver.join('room.1'); + final events = await ch.onJoin.toList(); + expect(events, isEmpty); + }); + + test('onLeave emits no events', () async { + final ch = driver.join('room.1'); + final events = await ch.onLeave.toList(); + expect(events, isEmpty); + }); + }); +} + +class _NoOpInterceptor extends BroadcastInterceptor {} diff --git a/test/broadcasting/drivers/reverb_broadcast_driver_test.dart b/test/broadcasting/drivers/reverb_broadcast_driver_test.dart new file mode 100644 index 0000000..a368b9f --- /dev/null +++ b/test/broadcasting/drivers/reverb_broadcast_driver_test.dart @@ -0,0 +1,874 @@ +import 'dart:async'; +import 'dart:convert'; + +import 'package:flutter_test/flutter_test.dart'; +import 'package:magic/magic.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; + +// --------------------------------------------------------------------------- +// Mock WebSocket infrastructure +// --------------------------------------------------------------------------- + +/// A mock [WebSocketSink] that captures all sent messages. +class _MockWebSocketSink implements WebSocketSink { + final List messages = []; + bool isClosed = false; + int? closeCode; + String? closeReason; + + @override + void add(dynamic data) { + messages.add(data); + } + + @override + void addError(Object error, [StackTrace? stackTrace]) {} + + @override + Future addStream(Stream stream) => stream.drain(); + + @override + Future close([int? closeCode, String? closeReason]) { + isClosed = true; + this.closeCode = closeCode; + this.closeReason = closeReason; + return Future.value(); + } + + @override + Future get done => Future.value(); + + /// Returns all sent messages decoded as JSON maps. + List> get sentFrames => messages + .map((m) => jsonDecode(m as String) as Map) + .toList(); +} + +/// A mock [WebSocketChannel] backed by a [StreamController] for incoming +/// messages and a [_MockWebSocketSink] for outgoing messages. +class _MockWebSocketChannel implements WebSocketChannel { + _MockWebSocketChannel() + : _incomingController = StreamController.broadcast(); + + final StreamController _incomingController; + final _MockWebSocketSink _sink = _MockWebSocketSink(); + + @override + Stream get stream => _incomingController.stream; + + @override + WebSocketSink get sink => _sink; + + @override + Future get ready => Future.value(); + + @override + int? get closeCode => null; + + @override + String? get closeReason => null; + + @override + String? get protocol => null; + + @override + dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation); + + /// Simulates a message arriving from the server. + void simulateMessage(Map payload) { + _incomingController.add(jsonEncode(payload)); + } + + /// Simulates the server closing the connection. + void simulateClose() { + _incomingController.close(); + } + + /// Simulates a stream error. + void simulateError(Object error) { + _incomingController.addError(error); + } + + /// Returns decoded sent frames from the sink. + List> get sentFrames => _sink.sentFrames; + + /// Closes the incoming controller for cleanup. + void dispose() { + _incomingController.close(); + } +} + +// --------------------------------------------------------------------------- +// Test interceptor +// --------------------------------------------------------------------------- + +class _TestInterceptor extends BroadcastInterceptor { + final List> sentMessages = []; + final List receivedEvents = []; + final List errors = []; + + @override + Map onSend(Map message) { + sentMessages.add(message); + return message; + } + + @override + BroadcastEvent onReceive(BroadcastEvent event) { + receivedEvents.add(event); + return event; + } + + @override + dynamic onError(dynamic error) { + errors.add(error); + return error; + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +Map _defaultConfig({Map? overrides}) { + final config = { + 'host': 'localhost', + 'port': 8080, + 'scheme': 'ws', + 'app_key': 'test-app-key', + 'auth_endpoint': '/broadcasting/auth', + 'reconnect': true, + 'max_reconnect_delay': 30000, + 'activity_timeout': 30, + 'dedup_buffer_size': 100, + }; + if (overrides != null) { + config.addAll(overrides); + } + return config; +} + +/// Schedules the Pusher `connection_established` handshake on [mock] after a +/// microtask, allowing [connect] to finish setting up stream listeners. +void _simulateConnectionEstablished( + _MockWebSocketChannel mock, { + String socketId = 'test-socket-id', + int activityTimeout = 30, +}) { + Future.delayed(Duration.zero, () { + mock.simulateMessage({ + 'event': 'pusher:connection_established', + 'data': jsonEncode({ + 'socket_id': socketId, + 'activity_timeout': activityTimeout, + }), + }); + }); +} + +/// Creates a driver connected to a mock channel with handshake complete. +Future<(ReverbBroadcastDriver, _MockWebSocketChannel)> _createConnectedDriver({ + Map? configOverrides, +}) async { + final mock = _MockWebSocketChannel(); + final driver = ReverbBroadcastDriver( + _defaultConfig(overrides: configOverrides), + channelFactory: (_) => mock, + ); + + _simulateConnectionEstablished(mock); + await driver.connect(); + return (driver, mock); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +void main() { + setUp(() { + MagicApp.reset(); + Magic.flush(); + }); + + group('ReverbBroadcastDriver — connection lifecycle', () { + test( + 'transitions from connecting to connected on connection_established', + () async { + final mock = _MockWebSocketChannel(); + final driver = ReverbBroadcastDriver( + _defaultConfig(), + channelFactory: (_) => mock, + ); + + final states = []; + driver.connectionState.listen(states.add); + + _simulateConnectionEstablished(mock); + await driver.connect(); + + expect(states, contains(BroadcastConnectionState.connecting)); + expect(states, contains(BroadcastConnectionState.connected)); + expect(driver.isConnected, isTrue); + + await driver.disconnect(); + }, + ); + + test('parses socketId from connection_established event', () async { + final (driver, _) = await _createConnectedDriver(); + + expect(driver.socketId, equals('test-socket-id')); + + await driver.disconnect(); + }); + + test('disconnect transitions to disconnected state', () async { + final (driver, _) = await _createConnectedDriver(); + + final states = []; + driver.connectionState.listen(states.add); + + await driver.disconnect(); + + expect(driver.isConnected, isFalse); + expect(driver.socketId, isNull); + expect(states, contains(BroadcastConnectionState.disconnected)); + }); + }); + + group('ReverbBroadcastDriver — ping/pong', () { + test('responds to pusher:ping with pusher:pong', () async { + final (driver, mock) = await _createConnectedDriver(); + + // Clear frames from handshake. + mock.sentFrames; + mock._sink.messages.clear(); + + mock.simulateMessage({ + 'event': 'pusher:ping', + 'data': {}, + }); + + // Allow microtask to process. + await Future.delayed(Duration.zero); + + final frames = mock.sentFrames; + expect( + frames, + contains( + predicate>((f) => f['event'] == 'pusher:pong'), + ), + ); + + await driver.disconnect(); + }); + }); + + group('ReverbBroadcastDriver — channel subscription', () { + test('channel() sends pusher:subscribe frame with correct name', () async { + final (driver, mock) = await _createConnectedDriver(); + + mock._sink.messages.clear(); + + driver.channel('orders'); + + await Future.delayed(Duration.zero); + + final frames = mock.sentFrames; + expect( + frames, + contains( + predicate>( + (f) => + f['event'] == 'pusher:subscribe' && + (f['data'] as Map)['channel'] == 'orders', + ), + ), + ); + + await driver.disconnect(); + }); + + test('channel() returns same instance on repeated calls', () async { + final (driver, _) = await _createConnectedDriver(); + + final ch1 = driver.channel('orders'); + final ch2 = driver.channel('orders'); + + expect(identical(ch1, ch2), isTrue); + + await driver.disconnect(); + }); + + test('private() sends subscribe frame with private- prefix', () async { + final (driver, mock) = await _createConnectedDriver(); + + mock._sink.messages.clear(); + + // private() will attempt HTTP auth which will fail in unit test — + // but we can verify the channel name prefix is correct. + driver.private('orders'); + + // The channel object should be created with the prefixed name. + expect(driver.private('orders').name, equals('private-orders')); + + await driver.disconnect(); + }); + + test( + 'join() returns BroadcastPresenceChannel with presence- prefix', + () async { + final (driver, _) = await _createConnectedDriver(); + + final ch = driver.join('room.1'); + + expect(ch, isA()); + expect(ch.name, equals('presence-room.1')); + + await driver.disconnect(); + }, + ); + }); + + group('ReverbBroadcastDriver — event parsing', () { + test('routes parsed BroadcastEvent to correct channel', () async { + final (driver, mock) = await _createConnectedDriver(); + + final ch = driver.channel('orders'); + final events = []; + ch.listen('OrderShipped', events.add); + + mock.simulateMessage({ + 'event': 'OrderShipped', + 'channel': 'orders', + 'data': jsonEncode({'order_id': 42}), + }); + + await Future.delayed(Duration.zero); + + expect(events, hasLength(1)); + expect(events.first.event, equals('OrderShipped')); + expect(events.first.channel, equals('orders')); + expect(events.first.data, equals({'order_id': 42})); + + await driver.disconnect(); + }); + + test('double-JSON decodes string data field to Map', () async { + final (driver, mock) = await _createConnectedDriver(); + + final ch = driver.channel('orders'); + final events = []; + ch.listen('OrderShipped', events.add); + + // Data is a JSON-encoded string (Pusher standard). + mock.simulateMessage({ + 'event': 'OrderShipped', + 'channel': 'orders', + 'data': jsonEncode({'item': 'widget'}), + }); + + await Future.delayed(Duration.zero); + + expect(events, hasLength(1)); + expect(events.first.data, isA>()); + expect(events.first.data['item'], equals('widget')); + + await driver.disconnect(); + }); + + test('uses Map data directly when already decoded', () async { + final (driver, mock) = await _createConnectedDriver(); + + final ch = driver.channel('orders'); + final events = []; + ch.events.listen(events.add); + + // Simulate a message where data is already a map (non-standard but defensive). + mock.simulateMessage({ + 'event': 'OrderShipped', + 'channel': 'orders', + 'data': {'item': 'widget'}, + }); + + await Future.delayed(Duration.zero); + + expect(events, hasLength(1)); + expect(events.first.data['item'], equals('widget')); + + await driver.disconnect(); + }); + + test( + 'routes StateError through interceptor when event key is missing', + () async { + final (driver, mock) = await _createConnectedDriver(); + + final interceptor = _TestInterceptor(); + driver.addInterceptor(interceptor); + + mock.simulateMessage({'channel': 'orders', 'data': '{}'}); + + await Future.delayed(Duration.zero); + + // _onMessage throws StateError, caught by listener, routed to interceptor. + expect(interceptor.errors.whereType(), isNotEmpty); + + await driver.disconnect(); + }, + ); + }); + + group('ReverbBroadcastDriver — deduplication', () { + test('drops duplicate events', () async { + final (driver, mock) = await _createConnectedDriver(); + + final ch = driver.channel('orders'); + final events = []; + ch.events.listen(events.add); + + final payload = { + 'event': 'OrderShipped', + 'channel': 'orders', + 'data': jsonEncode({'order_id': 1}), + }; + + mock.simulateMessage(payload); + await Future.delayed(Duration.zero); + mock.simulateMessage(payload); + await Future.delayed(Duration.zero); + + expect(events, hasLength(1)); + + await driver.disconnect(); + }); + + test('evicts oldest entry when buffer is full', () async { + final (driver, mock) = await _createConnectedDriver( + configOverrides: {'dedup_buffer_size': 3}, + ); + + final ch = driver.channel('orders'); + final events = []; + ch.events.listen(events.add); + + // Fill buffer with 3 unique events. + for (var i = 0; i < 3; i++) { + mock.simulateMessage({ + 'event': 'Event$i', + 'channel': 'orders', + 'data': jsonEncode({'i': i}), + }); + await Future.delayed(Duration.zero); + } + expect(events, hasLength(3)); + + // Add one more — evicts event 0. + mock.simulateMessage({ + 'event': 'Event3', + 'channel': 'orders', + 'data': jsonEncode({'i': 3}), + }); + await Future.delayed(Duration.zero); + expect(events, hasLength(4)); + + // Replay event 0 — should pass through because it was evicted. + // Buffer is now [E1, E2, E3]. + mock.simulateMessage({ + 'event': 'Event0', + 'channel': 'orders', + 'data': jsonEncode({'i': 0}), + }); + await Future.delayed(Duration.zero); + expect(events, hasLength(5)); + + // Replay event 2 — still in buffer [E2, E3, E0], so dropped. + mock.simulateMessage({ + 'event': 'Event2', + 'channel': 'orders', + 'data': jsonEncode({'i': 2}), + }); + await Future.delayed(Duration.zero); + expect(events, hasLength(5)); + + await driver.disconnect(); + }); + + test('different events with same channel are not deduplicated', () async { + final (driver, mock) = await _createConnectedDriver(); + + final ch = driver.channel('orders'); + final events = []; + ch.events.listen(events.add); + + mock.simulateMessage({ + 'event': 'OrderShipped', + 'channel': 'orders', + 'data': jsonEncode({'id': 1}), + }); + await Future.delayed(Duration.zero); + + mock.simulateMessage({ + 'event': 'OrderCancelled', + 'channel': 'orders', + 'data': jsonEncode({'id': 1}), + }); + await Future.delayed(Duration.zero); + + expect(events, hasLength(2)); + + await driver.disconnect(); + }); + }); + + group('ReverbBroadcastDriver — exponential backoff', () { + test('computes correct backoff delays capped at max', () { + final driver = ReverbBroadcastDriver( + _defaultConfig(overrides: {'max_reconnect_delay': 16000}), + ); + + // Formula: min(500 * 2^attempt, maxDelay) + expect(driver.backoffDelay(0), equals(const Duration(milliseconds: 500))); + expect( + driver.backoffDelay(1), + equals(const Duration(milliseconds: 1000)), + ); + expect( + driver.backoffDelay(2), + equals(const Duration(milliseconds: 2000)), + ); + expect( + driver.backoffDelay(3), + equals(const Duration(milliseconds: 4000)), + ); + expect( + driver.backoffDelay(4), + equals(const Duration(milliseconds: 8000)), + ); + expect( + driver.backoffDelay(5), + equals(const Duration(milliseconds: 16000)), + ); + // Capped at max. + expect( + driver.backoffDelay(6), + equals(const Duration(milliseconds: 16000)), + ); + expect( + driver.backoffDelay(10), + equals(const Duration(milliseconds: 16000)), + ); + }); + }); + + group('ReverbBroadcastDriver — interceptor chain', () { + test('invokes onSend for outbound messages', () async { + final (driver, mock) = await _createConnectedDriver(); + + final interceptor = _TestInterceptor(); + driver.addInterceptor(interceptor); + + mock._sink.messages.clear(); + + driver.channel('orders'); + + await Future.delayed(Duration.zero); + + expect(interceptor.sentMessages, isNotEmpty); + expect( + interceptor.sentMessages.first['event'], + equals('pusher:subscribe'), + ); + + await driver.disconnect(); + }); + + test('invokes onReceive for inbound events', () async { + final (driver, mock) = await _createConnectedDriver(); + + final interceptor = _TestInterceptor(); + driver.addInterceptor(interceptor); + + driver.channel('orders'); + + mock.simulateMessage({ + 'event': 'OrderShipped', + 'channel': 'orders', + 'data': jsonEncode({'id': 1}), + }); + + await Future.delayed(Duration.zero); + + expect(interceptor.receivedEvents, hasLength(1)); + expect(interceptor.receivedEvents.first.event, equals('OrderShipped')); + + await driver.disconnect(); + }); + }); + + group('ReverbBroadcastDriver — leave', () { + test('sends pusher:unsubscribe frame and removes channel', () async { + final (driver, mock) = await _createConnectedDriver(); + + driver.channel('orders'); + await Future.delayed(Duration.zero); + + mock._sink.messages.clear(); + + driver.leave('orders'); + + await Future.delayed(Duration.zero); + + final frames = mock.sentFrames; + expect( + frames, + contains( + predicate>( + (f) => + f['event'] == 'pusher:unsubscribe' && + (f['data'] as Map)['channel'] == 'orders', + ), + ), + ); + + await driver.disconnect(); + }); + }); + + group('ReverbBroadcastDriver — reconnection', () { + test('reconnect resubscribes all active channels', () async { + final mock1 = _MockWebSocketChannel(); + var connectionCount = 0; + _MockWebSocketChannel? currentMock; + + final driver = ReverbBroadcastDriver( + _defaultConfig(overrides: {'reconnect': true}), + channelFactory: (_) { + connectionCount++; + if (connectionCount == 1) { + currentMock = mock1; + return mock1; + } + // Second connection (reconnect) — new mock. + final mock2 = _MockWebSocketChannel(); + currentMock = mock2; + // Simulate handshake on reconnect. + Future.delayed(Duration.zero, () { + mock2.simulateMessage({ + 'event': 'pusher:connection_established', + 'data': jsonEncode({ + 'socket_id': 'reconnected-socket-id', + 'activity_timeout': 30, + }), + }); + }); + return mock2; + }, + ); + + // Connect and subscribe to two channels. + _simulateConnectionEstablished(mock1); + await driver.connect(); + + driver.channel('orders'); + driver.channel('notifications'); + await Future.delayed(Duration.zero); + + // Simulate server closing the connection — triggers reconnect. + mock1.simulateClose(); + + // Allow reconnect timer (immediate=false, attempt 0 = 500ms, but in tests + // we can advance with a short wait and fakeAsync is not needed because + // the _scheduleReconnect uses Timer which runs in the test event loop). + // For a unit test we just need to verify the reconnection logic sends + // subscribe frames on the new connection. + await Future.delayed(const Duration(milliseconds: 600)); + + // After reconnect, the new mock should have received subscribe frames. + if (currentMock != null && currentMock != mock1) { + final frames = currentMock!.sentFrames; + final subscribeChannels = frames + .where((f) => f['event'] == 'pusher:subscribe') + .map( + (f) => (f['data'] as Map)['channel'] as String, + ) + .toSet(); + + expect(subscribeChannels, contains('orders')); + expect(subscribeChannels, contains('notifications')); + } + + await driver.disconnect(); + }); + }); + + group('ReverbBroadcastDriver — Pusher error codes', () { + test('4000-4099 are fatal (no reconnect)', () { + final driver = ReverbBroadcastDriver(_defaultConfig()); + + expect(driver.classifyErrorCode(4000), equals(PusherErrorAction.fatal)); + expect(driver.classifyErrorCode(4050), equals(PusherErrorAction.fatal)); + expect(driver.classifyErrorCode(4099), equals(PusherErrorAction.fatal)); + }); + + test('4100-4199 reconnect immediately', () { + final driver = ReverbBroadcastDriver(_defaultConfig()); + + expect( + driver.classifyErrorCode(4100), + equals(PusherErrorAction.reconnectImmediate), + ); + expect( + driver.classifyErrorCode(4150), + equals(PusherErrorAction.reconnectImmediate), + ); + expect( + driver.classifyErrorCode(4199), + equals(PusherErrorAction.reconnectImmediate), + ); + }); + + test('4200-4299 reconnect with backoff', () { + final driver = ReverbBroadcastDriver(_defaultConfig()); + + expect( + driver.classifyErrorCode(4200), + equals(PusherErrorAction.reconnectBackoff), + ); + expect( + driver.classifyErrorCode(4250), + equals(PusherErrorAction.reconnectBackoff), + ); + expect( + driver.classifyErrorCode(4299), + equals(PusherErrorAction.reconnectBackoff), + ); + }); + + test('unknown codes default to backoff', () { + final driver = ReverbBroadcastDriver(_defaultConfig()); + + expect( + driver.classifyErrorCode(4300), + equals(PusherErrorAction.reconnectBackoff), + ); + expect( + driver.classifyErrorCode(1000), + equals(PusherErrorAction.reconnectBackoff), + ); + }); + }); + + group('ReverbBroadcastChannel', () { + test('listen() filters events by name and returns this', () async { + final ch = ReverbBroadcastChannel('test'); + final events = []; + + final result = ch.listen('OrderShipped', events.add); + expect(result, same(ch)); + + ch.addEvent( + BroadcastEvent( + event: 'OrderShipped', + channel: 'test', + data: {'id': 1}, + receivedAt: DateTime.now(), + ), + ); + + ch.addEvent( + BroadcastEvent( + event: 'OrderCancelled', + channel: 'test', + data: {'id': 2}, + receivedAt: DateTime.now(), + ), + ); + + await Future.delayed(Duration.zero); + + expect(events, hasLength(1)); + expect(events.first.data['id'], equals(1)); + + ch.dispose(); + }); + + test('stopListening() removes listener for event', () async { + final ch = ReverbBroadcastChannel('test'); + final events = []; + + ch.listen('OrderShipped', events.add); + ch.stopListening('OrderShipped'); + + ch.addEvent( + BroadcastEvent( + event: 'OrderShipped', + channel: 'test', + data: {'id': 1}, + receivedAt: DateTime.now(), + ), + ); + + await Future.delayed(Duration.zero); + + expect(events, isEmpty); + + ch.dispose(); + }); + }); + + group('ReverbBroadcastPresenceChannel', () { + test('handles member_added and member_removed', () async { + final ch = ReverbBroadcastPresenceChannel('presence-room.1'); + + final joinedMembers = >[]; + final leftMembers = >[]; + ch.onJoin.listen(joinedMembers.add); + ch.onLeave.listen(leftMembers.add); + + ch.handlePresenceEvent('pusher_internal:member_added', { + 'user_id': '1', + 'user_info': {'name': 'Alice'}, + }); + + await Future.delayed(Duration.zero); + + expect(ch.members, hasLength(1)); + expect(joinedMembers, hasLength(1)); + + ch.handlePresenceEvent('pusher_internal:member_removed', { + 'user_id': '1', + 'user_info': {'name': 'Alice'}, + }); + + await Future.delayed(Duration.zero); + + expect(ch.members, isEmpty); + expect(leftMembers, hasLength(1)); + + ch.dispose(); + }); + + test('handles subscription_succeeded with member list', () async { + final ch = ReverbBroadcastPresenceChannel('presence-room.1'); + + ch.handlePresenceEvent('pusher:subscription_succeeded', { + 'presence': { + 'count': 2, + 'ids': ['1', '2'], + 'hash': { + '1': {'name': 'Alice'}, + '2': {'name': 'Bob'}, + }, + }, + }); + + await Future.delayed(Duration.zero); + + expect(ch.members, hasLength(2)); + + ch.dispose(); + }); + }); +} diff --git a/test/broadcasting/echo_facade_test.dart b/test/broadcasting/echo_facade_test.dart new file mode 100644 index 0000000..a886737 --- /dev/null +++ b/test/broadcasting/echo_facade_test.dart @@ -0,0 +1,108 @@ +import 'package:flutter_test/flutter_test.dart'; +import 'package:magic/magic.dart'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Registers the minimal broadcasting config that resolves to NullBroadcastDriver. +void _setNullConfig() { + Config.set('broadcasting.default', 'null'); + Config.set('broadcasting.connections', { + 'null': {'driver': 'null'}, + }); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +void main() { + setUp(() { + MagicApp.reset(); + Magic.flush(); + BroadcastManager.resetDrivers(); + Magic.app.singleton('broadcasting', () => BroadcastManager()); + _setNullConfig(); + }); + + group('Echo facade — channel methods', () { + test('channel() returns a BroadcastChannel', () { + final result = Echo.channel('orders'); + expect(result, isA()); + }); + + test('private() returns a BroadcastChannel', () { + final result = Echo.private('user.1'); + expect(result, isA()); + }); + + test('join() returns a BroadcastPresenceChannel', () { + final result = Echo.join('room.1'); + expect(result, isA()); + }); + }); + + group('Echo facade — connection lifecycle', () { + test('connect() completes without error', () async { + await expectLater(Echo.connect(), completes); + }); + + test('disconnect() completes without error', () async { + await expectLater(Echo.disconnect(), completes); + }); + }); + + group('Echo facade — driver accessors', () { + test('connection getter returns a BroadcastDriver', () { + expect(Echo.connection, isA()); + }); + + test('socketId returns null for NullBroadcastDriver', () { + expect(Echo.socketId, isNull); + }); + + test('connectionState returns a stream', () { + expect(Echo.connectionState, isA()); + }); + + test('onReconnect returns a stream', () { + expect(Echo.onReconnect, isA()); + }); + }); + + group('Echo facade — interceptors', () { + test('addInterceptor() does not throw', () { + final interceptor = _NoOpInterceptor(); + expect(() => Echo.addInterceptor(interceptor), returnsNormally); + }); + }); + + group('Echo facade — manager access', () { + test('manager getter returns the BroadcastManager instance', () { + expect(Echo.manager, isA()); + }); + }); + + group('Echo facade — fake/unfake', () { + test('fake() replaces manager with FakeBroadcastManager', () { + final fake = Echo.fake(); + expect(fake, isA()); + expect(Echo.manager, same(fake)); + Echo.unfake(); + }); + + test('unfake() restores real manager resolution', () { + Echo.fake(); + Echo.unfake(); + expect(Echo.manager, isA()); + }); + }); +} + +// --------------------------------------------------------------------------- +// Test doubles +// --------------------------------------------------------------------------- + +/// No-op interceptor used to verify [Echo.addInterceptor] accepts an instance. +class _NoOpInterceptor extends BroadcastInterceptor {} diff --git a/test/testing/fake_broadcast_manager_test.dart b/test/testing/fake_broadcast_manager_test.dart new file mode 100644 index 0000000..2bb12a0 --- /dev/null +++ b/test/testing/fake_broadcast_manager_test.dart @@ -0,0 +1,378 @@ +import 'package:flutter_test/flutter_test.dart'; +import 'package:magic/magic.dart'; + +// --------------------------------------------------------------------------- +// Test stub interceptor +// --------------------------------------------------------------------------- + +class _TestInterceptor extends BroadcastInterceptor {} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +void main() { + setUp(() { + MagicApp.reset(); + Magic.flush(); + BroadcastManager.resetDrivers(); + }); + + tearDown(() { + Echo.unfake(); + }); + + // --------------------------------------------------------------------------- + // 1. Basic operations + // --------------------------------------------------------------------------- + + group('basic operations', () { + test('driver is disconnected by default', () { + final fake = FakeBroadcastManager(); + expect(fake.driver.isConnected, isFalse); + }); + + test('connect() sets isConnected to true', () async { + final fake = FakeBroadcastManager(); + await fake.connection().connect(); + expect(fake.driver.isConnected, isTrue); + }); + + test('disconnect() sets isConnected to false', () async { + final fake = FakeBroadcastManager(); + await fake.connection().connect(); + await fake.connection().disconnect(); + expect(fake.driver.isConnected, isFalse); + }); + + test('channel() returns a BroadcastChannel with correct name', () { + final fake = FakeBroadcastManager(); + final ch = fake.connection().channel('orders'); + expect(ch.name, equals('orders')); + }); + + test('private() returns a channel with private- prefix recorded', () { + final fake = FakeBroadcastManager(); + fake.connection().private('user.1'); + expect(fake.driver.subscribedChannels, contains('private-user.1')); + }); + + test( + 'join() returns a presence channel with presence- prefix recorded', + () { + final fake = FakeBroadcastManager(); + fake.connection().join('room.1'); + expect(fake.driver.subscribedChannels, contains('presence-room.1')); + }, + ); + + test('leave() removes channel from subscribed list', () { + final fake = FakeBroadcastManager(); + fake.connection().channel('orders'); + fake.connection().leave('orders'); + expect(fake.driver.subscribedChannels, isNot(contains('orders'))); + }); + + test('socketId returns fake-socket-id when connected', () async { + final fake = FakeBroadcastManager(); + await fake.connection().connect(); + expect(fake.driver.socketId, equals('fake-socket-id')); + }); + + test('socketId returns null when disconnected', () { + final fake = FakeBroadcastManager(); + expect(fake.driver.socketId, isNull); + }); + + test('connectionState returns empty stream', () { + final fake = FakeBroadcastManager(); + expect(fake.driver.connectionState, isA()); + }); + + test('onReconnect returns empty stream', () { + final fake = FakeBroadcastManager(); + expect(fake.driver.onReconnect, isA()); + }); + }); + + // --------------------------------------------------------------------------- + // 2. Assertion helpers — assertConnected + // --------------------------------------------------------------------------- + + group('assertConnected', () { + test('passes when connected', () async { + final fake = FakeBroadcastManager(); + await fake.connection().connect(); + expect(() => fake.assertConnected(), returnsNormally); + }); + + test('throws AssertionError when not connected', () { + final fake = FakeBroadcastManager(); + expect(() => fake.assertConnected(), throwsA(isA())); + }); + }); + + // --------------------------------------------------------------------------- + // 3. Assertion helpers — assertDisconnected + // --------------------------------------------------------------------------- + + group('assertDisconnected', () { + test('passes when not connected', () { + final fake = FakeBroadcastManager(); + expect(() => fake.assertDisconnected(), returnsNormally); + }); + + test('throws AssertionError when connected', () async { + final fake = FakeBroadcastManager(); + await fake.connection().connect(); + expect(() => fake.assertDisconnected(), throwsA(isA())); + }); + }); + + // --------------------------------------------------------------------------- + // 4. Assertion helpers — assertSubscribed + // --------------------------------------------------------------------------- + + group('assertSubscribed', () { + test('passes when channel is in subscribed list', () { + final fake = FakeBroadcastManager(); + fake.connection().channel('orders'); + expect(() => fake.assertSubscribed('orders'), returnsNormally); + }); + + test('throws AssertionError when channel is not in subscribed list', () { + final fake = FakeBroadcastManager(); + expect( + () => fake.assertSubscribed('orders'), + throwsA(isA()), + ); + }); + + test('passes for private- prefixed channel', () { + final fake = FakeBroadcastManager(); + fake.connection().private('user.1'); + expect(() => fake.assertSubscribed('private-user.1'), returnsNormally); + }); + + test('passes for presence- prefixed channel', () { + final fake = FakeBroadcastManager(); + fake.connection().join('room.1'); + expect(() => fake.assertSubscribed('presence-room.1'), returnsNormally); + }); + }); + + // --------------------------------------------------------------------------- + // 5. Assertion helpers — assertNotSubscribed + // --------------------------------------------------------------------------- + + group('assertNotSubscribed', () { + test('passes when channel is not in subscribed list', () { + final fake = FakeBroadcastManager(); + expect(() => fake.assertNotSubscribed('orders'), returnsNormally); + }); + + test('throws AssertionError when channel IS in subscribed list', () { + final fake = FakeBroadcastManager(); + fake.connection().channel('orders'); + expect( + () => fake.assertNotSubscribed('orders'), + throwsA(isA()), + ); + }); + }); + + // --------------------------------------------------------------------------- + // 6. Interceptor tracking + // --------------------------------------------------------------------------- + + group('interceptor tracking', () { + test('addInterceptor records the interceptor', () { + final fake = FakeBroadcastManager(); + final interceptor = _TestInterceptor(); + fake.connection().addInterceptor(interceptor); + expect(fake.driver.addedInterceptors, contains(interceptor)); + }); + + test('assertInterceptorAdded passes after adding interceptor', () { + final fake = FakeBroadcastManager(); + fake.connection().addInterceptor(_TestInterceptor()); + expect(() => fake.assertInterceptorAdded(), returnsNormally); + }); + + test('assertInterceptorAdded throws when no interceptors added', () { + final fake = FakeBroadcastManager(); + expect( + () => fake.assertInterceptorAdded(), + throwsA(isA()), + ); + }); + + test('multiple interceptors can be added', () { + final fake = FakeBroadcastManager(); + fake.connection().addInterceptor(_TestInterceptor()); + fake.connection().addInterceptor(_TestInterceptor()); + expect(fake.driver.addedInterceptors, hasLength(2)); + }); + }); + + // --------------------------------------------------------------------------- + // 7. Recording — driver tracks subscriptions + // --------------------------------------------------------------------------- + + group('recording', () { + test('channel() records channel name', () { + final fake = FakeBroadcastManager(); + fake.connection().channel('notifications'); + expect(fake.driver.subscribedChannels, contains('notifications')); + }); + + test('multiple channel() calls accumulate', () { + final fake = FakeBroadcastManager(); + fake.connection().channel('orders'); + fake.connection().channel('notifications'); + expect( + fake.driver.subscribedChannels, + containsAll(['orders', 'notifications']), + ); + }); + + test('leave() removes only the matching channel', () { + final fake = FakeBroadcastManager(); + fake.connection().channel('orders'); + fake.connection().channel('notifications'); + fake.connection().leave('orders'); + expect(fake.driver.subscribedChannels, isNot(contains('orders'))); + expect(fake.driver.subscribedChannels, contains('notifications')); + }); + }); + + // --------------------------------------------------------------------------- + // 8. reset() + // --------------------------------------------------------------------------- + + group('reset()', () { + test('clears subscribed channels', () { + final fake = FakeBroadcastManager(); + fake.connection().channel('orders'); + fake.reset(); + expect(fake.driver.subscribedChannels, isEmpty); + }); + + test('clears connected state', () async { + final fake = FakeBroadcastManager(); + await fake.connection().connect(); + fake.reset(); + expect(fake.driver.isConnected, isFalse); + }); + + test('clears added interceptors', () { + final fake = FakeBroadcastManager(); + fake.connection().addInterceptor(_TestInterceptor()); + fake.reset(); + expect(fake.driver.addedInterceptors, isEmpty); + }); + + test('after reset assertNothingSubscribed passes', () { + final fake = FakeBroadcastManager(); + fake.connection().channel('orders'); + fake.reset(); + expect(() => fake.assertNotSubscribed('orders'), returnsNormally); + }); + }); + + // --------------------------------------------------------------------------- + // 9. Facade integration — Echo.fake() + // --------------------------------------------------------------------------- + + group('Echo.fake()', () { + test('returns a FakeBroadcastManager instance', () { + final fake = Echo.fake(); + expect(fake, isA()); + }); + + test('Echo.channel() routes through the fake', () { + final fake = Echo.fake(); + Config.set('broadcasting.default', 'null'); + + Echo.channel('orders'); + + expect(fake.driver.subscribedChannels, contains('orders')); + }); + + test('Echo.private() routes through the fake', () { + final fake = Echo.fake(); + Config.set('broadcasting.default', 'null'); + + Echo.private('user.1'); + + fake.assertSubscribed('private-user.1'); + }); + + test('Echo.join() routes through the fake', () { + final fake = Echo.fake(); + Config.set('broadcasting.default', 'null'); + + Echo.join('room.1'); + + fake.assertSubscribed('presence-room.1'); + }); + + test('Echo.connect() routes through the fake', () async { + final fake = Echo.fake(); + Config.set('broadcasting.default', 'null'); + + await Echo.connect(); + + fake.assertConnected(); + }); + + test('Echo.disconnect() routes through the fake', () async { + final fake = Echo.fake(); + Config.set('broadcasting.default', 'null'); + + await Echo.connect(); + await Echo.disconnect(); + + fake.assertDisconnected(); + }); + }); + + // --------------------------------------------------------------------------- + // 10. Echo.unfake() + // --------------------------------------------------------------------------- + + group('Echo.unfake()', () { + test('can be called without throwing', () { + Echo.fake(); + expect(() => Echo.unfake(), returnsNormally); + }); + + test('can be called when not faked without throwing', () { + expect(() => Echo.unfake(), returnsNormally); + }); + }); + + // --------------------------------------------------------------------------- + // 11. Presence channel + // --------------------------------------------------------------------------- + + group('presence channel', () { + test('join() returns an object with empty members list', () { + final fake = FakeBroadcastManager(); + final presence = fake.connection().join('room.1'); + expect(presence.members, isEmpty); + }); + + test('join() onJoin stream is empty', () { + final fake = FakeBroadcastManager(); + final presence = fake.connection().join('room.1'); + expect(presence.onJoin, isA()); + }); + + test('join() onLeave stream is empty', () { + final fake = FakeBroadcastManager(); + final presence = fake.connection().join('room.1'); + expect(presence.onLeave, isA()); + }); + }); +} From 6de7ab92bd3f5dc7a01c89ec8d1b54da9b761058 Mon Sep 17 00:00:00 2001 From: Anilcan Cakir Date: Mon, 6 Apr 2026 00:01:01 +0300 Subject: [PATCH 2/3] =?UTF-8?q?test(broadcasting):=20improve=20coverage=20?= =?UTF-8?q?=E2=80=94=20onDone,=20onError,=20Pusher=20errors,=20presence,?= =?UTF-8?q?=20interceptor=20defaults?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds 29 tests covering previously uncovered paths: - _onDone/_onError reconnection and interceptor routing - _handlePusherError with fatal/immediate/backoff/malformed codes - Presence events routed through driver (member_added/removed, subscription_succeeded) - Subscription queue for private/presence channels before connect - BroadcastInterceptor base class pass-through defaults (onSend/onReceive/onError) - Echo.listen() and Echo.leave() facade methods - BroadcastManager reverb driver resolution - BroadcastServiceProvider boot with non-null driver - Application event edge cases (unknown channel, missing channel, non-Map data) --- test/broadcasting/broadcast_manager_test.dart | 25 + .../broadcast_service_provider_test.dart | 23 + .../drivers/reverb_broadcast_driver_test.dart | 436 ++++++++++++++++++ test/broadcasting/echo_facade_test.dart | 38 ++ 4 files changed, 522 insertions(+) diff --git a/test/broadcasting/broadcast_manager_test.dart b/test/broadcasting/broadcast_manager_test.dart index e70ccdc..a012cc6 100644 --- a/test/broadcasting/broadcast_manager_test.dart +++ b/test/broadcasting/broadcast_manager_test.dart @@ -264,6 +264,31 @@ void main() { expect(driver, isA<_StubDriver>()); }); + test('reverb driver name resolves to ReverbBroadcastDriver', () { + _setConfig( + defaultConnection: 'reverb', + connections: { + 'reverb': { + 'driver': 'reverb', + 'host': 'localhost', + 'port': 8080, + 'scheme': 'ws', + 'app_key': 'test', + 'auth_endpoint': '/broadcasting/auth', + 'reconnect': false, + 'max_reconnect_delay': 30000, + 'activity_timeout': 30, + 'dedup_buffer_size': 100, + }, + }, + ); + + final manager = BroadcastManager(); + final driver = manager.connection(); + + expect(driver, isA()); + }); + test('unknown driver name falls back to NullBroadcastDriver', () { _setConfig( defaultConnection: 'weird', diff --git a/test/broadcasting/broadcast_service_provider_test.dart b/test/broadcasting/broadcast_service_provider_test.dart index 2f0f12b..411959a 100644 --- a/test/broadcasting/broadcast_service_provider_test.dart +++ b/test/broadcasting/broadcast_service_provider_test.dart @@ -46,6 +46,29 @@ void main() { ); }); + group('BroadcastServiceProvider — boot with non-null driver', () { + test('boot() calls connect() when default is not "null"', () async { + var connectCalled = false; + + BroadcastManager.extend('spy', (Map config) { + return _SpyNullDriver(onConnect: () => connectCalled = true); + }); + + Config.set('broadcasting.default', 'my_conn'); + Config.set('broadcasting.connections', { + 'my_conn': {'driver': 'spy'}, + }); + + final app = MagicApp.instance; + final provider = BroadcastServiceProvider(app); + + provider.register(); + await provider.boot(); + + expect(connectCalled, isTrue); + }); + }); + group('BroadcastServiceProvider — boot with null driver', () { test('boot() completes without error when default is "null"', () async { Config.set('broadcasting.default', 'null'); diff --git a/test/broadcasting/drivers/reverb_broadcast_driver_test.dart b/test/broadcasting/drivers/reverb_broadcast_driver_test.dart index ee30528..e857098 100644 --- a/test/broadcasting/drivers/reverb_broadcast_driver_test.dart +++ b/test/broadcasting/drivers/reverb_broadcast_driver_test.dart @@ -759,6 +759,442 @@ void main() { }); }); + group('ReverbBroadcastDriver — onDone and onError', () { + test('onDone triggers reconnecting state when connected', () async { + final (driver, mock) = await _createConnectedDriver(); + + final states = []; + driver.connectionState.listen(states.add); + + // Simulate server closing the connection. + mock.simulateClose(); + + await Future.delayed(const Duration(milliseconds: 50)); + + expect(states, contains(BroadcastConnectionState.reconnecting)); + + await driver.disconnect(); + }); + + test('onError routes through interceptor chain', () async { + final (driver, mock) = await _createConnectedDriver(); + + final interceptor = _TestInterceptor(); + driver.addInterceptor(interceptor); + + mock.simulateError(Exception('test error')); + + await Future.delayed(const Duration(milliseconds: 50)); + + expect(interceptor.errors, isNotEmpty); + + await driver.disconnect(); + }); + + test( + 'onDone completes connection completer with error if not yet connected', + () async { + final mock = _MockWebSocketChannel(); + final driver = ReverbBroadcastDriver( + _defaultConfig(overrides: {'reconnect': false}), + channelFactory: (_) => mock, + ); + + // Close before handshake completes. + Future.delayed(const Duration(milliseconds: 10), () { + mock.simulateClose(); + }); + + await expectLater(driver.connect(), throwsA(isA())); + }, + ); + }); + + group('ReverbBroadcastDriver — Pusher error handling', () { + test('fatal error (4000-4099) does not reconnect', () async { + final (driver, mock) = await _createConnectedDriver( + configOverrides: {'reconnect': true}, + ); + + final states = []; + driver.connectionState.listen(states.add); + + mock.simulateMessage({ + 'event': 'pusher:error', + 'data': jsonEncode({'code': 4001, 'message': 'App disabled'}), + }); + + await Future.delayed(const Duration(milliseconds: 100)); + + // Fatal error should NOT trigger reconnecting state. + expect(states, isNot(contains(BroadcastConnectionState.reconnecting))); + + await driver.disconnect(); + }); + + test('reconnectImmediate error (4100-4199) schedules reconnect', () async { + final (driver, mock) = await _createConnectedDriver( + configOverrides: {'reconnect': true}, + ); + + final interceptor = _TestInterceptor(); + driver.addInterceptor(interceptor); + + mock.simulateMessage({ + 'event': 'pusher:error', + 'data': jsonEncode({'code': 4100, 'message': 'Over capacity'}), + }); + + await Future.delayed(const Duration(milliseconds: 50)); + + // Error should be routed through interceptor. + expect(interceptor.errors, isNotEmpty); + + await driver.disconnect(); + }); + + test('reconnectBackoff error (4200-4299) schedules reconnect', () async { + final (driver, mock) = await _createConnectedDriver( + configOverrides: {'reconnect': true}, + ); + + final interceptor = _TestInterceptor(); + driver.addInterceptor(interceptor); + + mock.simulateMessage({ + 'event': 'pusher:error', + 'data': jsonEncode({'code': 4200, 'message': 'Rate limited'}), + }); + + await Future.delayed(const Duration(milliseconds: 50)); + + expect(interceptor.errors, isNotEmpty); + + await driver.disconnect(); + }); + + test('malformed error data does not crash', () async { + final (driver, mock) = await _createConnectedDriver(); + + mock.simulateMessage({'event': 'pusher:error', 'data': 'not-json'}); + + await Future.delayed(const Duration(milliseconds: 50)); + + // Should not throw — malformed data is caught. + expect(driver.isConnected, isTrue); + + await driver.disconnect(); + }); + + test('non-string error data is handled', () async { + final (driver, mock) = await _createConnectedDriver(); + + mock.simulateMessage({'event': 'pusher:error', 'data': 42}); + + await Future.delayed(const Duration(milliseconds: 50)); + + expect(driver.isConnected, isTrue); + + await driver.disconnect(); + }); + }); + + group('ReverbBroadcastDriver — presence events via driver', () { + test('routes member_added through driver to presence channel', () async { + final (driver, mock) = await _createConnectedDriver(); + + final ch = driver.join('room.1'); + final joined = >[]; + ch.onJoin.listen(joined.add); + + mock.simulateMessage({ + 'event': 'pusher_internal:member_added', + 'channel': 'presence-room.1', + 'data': jsonEncode({ + 'user_id': '1', + 'user_info': {'name': 'Alice'}, + }), + }); + + await Future.delayed(Duration.zero); + + expect(joined, hasLength(1)); + expect(ch.members, hasLength(1)); + + await driver.disconnect(); + }); + + test('routes member_removed through driver to presence channel', () async { + final (driver, mock) = await _createConnectedDriver(); + + final ch = driver.join('room.1'); + + // Add a member first. + mock.simulateMessage({ + 'event': 'pusher_internal:member_added', + 'channel': 'presence-room.1', + 'data': jsonEncode({ + 'user_id': '1', + 'user_info': {'name': 'Alice'}, + }), + }); + await Future.delayed(Duration.zero); + + final left = >[]; + ch.onLeave.listen(left.add); + + mock.simulateMessage({ + 'event': 'pusher_internal:member_removed', + 'channel': 'presence-room.1', + 'data': jsonEncode({ + 'user_id': '1', + 'user_info': {'name': 'Alice'}, + }), + }); + await Future.delayed(Duration.zero); + + expect(left, hasLength(1)); + expect(ch.members, isEmpty); + + await driver.disconnect(); + }); + + test('subscription_succeeded populates members via driver', () async { + final (driver, mock) = await _createConnectedDriver(); + + final ch = driver.join('room.1'); + + mock.simulateMessage({ + 'event': 'pusher:subscription_succeeded', + 'channel': 'presence-room.1', + 'data': jsonEncode({ + 'presence': { + 'count': 2, + 'ids': ['1', '2'], + 'hash': { + '1': {'name': 'Alice'}, + '2': {'name': 'Bob'}, + }, + }, + }), + }); + + await Future.delayed(Duration.zero); + + expect(ch.members, hasLength(2)); + + await driver.disconnect(); + }); + + test('presence event on non-presence channel is ignored', () async { + final (driver, mock) = await _createConnectedDriver(); + + // Subscribe to a regular (non-presence) channel. + driver.channel('orders'); + + // Send a presence event to it — should be silently ignored. + mock.simulateMessage({ + 'event': 'pusher_internal:member_added', + 'channel': 'orders', + 'data': jsonEncode({'user_id': '1'}), + }); + + await Future.delayed(Duration.zero); + + // No crash — test passes if no exception. + await driver.disconnect(); + }); + + test('presence event without channel name is ignored', () async { + final (driver, mock) = await _createConnectedDriver(); + + mock.simulateMessage({ + 'event': 'pusher_internal:member_added', + 'data': jsonEncode({'user_id': '1'}), + }); + + await Future.delayed(Duration.zero); + + await driver.disconnect(); + }); + + test( + 'subscription_succeeded with Map data (not String) is handled', + () async { + final (driver, mock) = await _createConnectedDriver(); + + final ch = driver.join('room.1'); + + mock.simulateMessage({ + 'event': 'pusher:subscription_succeeded', + 'channel': 'presence-room.1', + 'data': { + 'presence': { + 'count': 1, + 'ids': ['1'], + 'hash': { + '1': {'name': 'Alice'}, + }, + }, + }, + }); + + await Future.delayed(Duration.zero); + + expect(ch.members, hasLength(1)); + + await driver.disconnect(); + }, + ); + + test('subscription_succeeded without channel is ignored', () async { + final (driver, mock) = await _createConnectedDriver(); + + mock.simulateMessage({ + 'event': 'pusher:subscription_succeeded', + 'data': jsonEncode({'presence': {}}), + }); + + await Future.delayed(Duration.zero); + + await driver.disconnect(); + }); + + test('subscription_succeeded on non-presence channel is ignored', () async { + final (driver, mock) = await _createConnectedDriver(); + + driver.channel('orders'); + + mock.simulateMessage({ + 'event': 'pusher:subscription_succeeded', + 'channel': 'orders', + 'data': jsonEncode({'presence': {}}), + }); + + await Future.delayed(Duration.zero); + + await driver.disconnect(); + }); + }); + + group('ReverbBroadcastDriver — subscription queue', () { + test('queues private subscribe when not connected', () async { + final mock = _MockWebSocketChannel(); + final driver = ReverbBroadcastDriver( + _defaultConfig(), + channelFactory: (_) => mock, + ); + + // Call private() before connect — should queue. + driver.private('secret'); + + // Now connect — queued subscription should flush. + _simulateConnectionEstablished(mock); + await driver.connect(); + + await Future.delayed(Duration.zero); + + // Channel should exist with private- prefix. + expect(driver.private('secret').name, equals('private-secret')); + + await driver.disconnect(); + }); + + test('queues presence subscribe when not connected', () async { + final mock = _MockWebSocketChannel(); + final driver = ReverbBroadcastDriver( + _defaultConfig(), + channelFactory: (_) => mock, + ); + + // Call join() before connect — should queue. + driver.join('room.1'); + + _simulateConnectionEstablished(mock); + await driver.connect(); + + await Future.delayed(Duration.zero); + + expect(driver.join('room.1').name, equals('presence-room.1')); + + await driver.disconnect(); + }); + }); + + group('ReverbBroadcastDriver — disconnect cleanup', () { + test('disconnect clears channels and dedup state', () async { + final (driver, mock) = await _createConnectedDriver(); + + // Subscribe to channels. + driver.channel('orders'); + driver.channel('notifications'); + + // Send events to populate dedup buffer. + mock.simulateMessage({ + 'event': 'Test', + 'channel': 'orders', + 'data': jsonEncode({'id': 1}), + }); + await Future.delayed(Duration.zero); + + await driver.disconnect(); + + expect(driver.isConnected, isFalse); + expect(driver.socketId, isNull); + }); + }); + + group('ReverbBroadcastDriver — application event edge cases', () { + test('event for unknown channel is ignored', () async { + final (driver, mock) = await _createConnectedDriver(); + + mock.simulateMessage({ + 'event': 'SomeEvent', + 'channel': 'nonexistent', + 'data': jsonEncode({'key': 'value'}), + }); + + await Future.delayed(Duration.zero); + + // No crash — passes if no exception. + await driver.disconnect(); + }); + + test('event without channel is ignored', () async { + final (driver, mock) = await _createConnectedDriver(); + + mock.simulateMessage({ + 'event': 'SomeEvent', + 'data': jsonEncode({'key': 'value'}), + }); + + await Future.delayed(Duration.zero); + + await driver.disconnect(); + }); + + test('event with non-Map non-String data uses empty map', () async { + final (driver, mock) = await _createConnectedDriver(); + + final ch = driver.channel('orders'); + final events = []; + ch.events.listen(events.add); + + mock.simulateMessage({ + 'event': 'OrderShipped', + 'channel': 'orders', + 'data': 42, + }); + + await Future.delayed(Duration.zero); + + expect(events, hasLength(1)); + expect(events.first.data, isEmpty); + + await driver.disconnect(); + }); + }); + group('ReverbBroadcastChannel', () { test('listen() filters events by name and returns this', () async { final ch = ReverbBroadcastChannel('test'); diff --git a/test/broadcasting/echo_facade_test.dart b/test/broadcasting/echo_facade_test.dart index a886737..59f0d58 100644 --- a/test/broadcasting/echo_facade_test.dart +++ b/test/broadcasting/echo_facade_test.dart @@ -84,6 +84,44 @@ void main() { }); }); + group('Echo facade — listen and leave', () { + test('listen() subscribes to channel event', () { + final events = []; + final channel = Echo.listen('orders', 'OrderShipped', events.add); + expect(channel, isA()); + }); + + test('leave() does not throw', () { + Echo.channel('orders'); + expect(() => Echo.leave('orders'), returnsNormally); + }); + }); + + group('BroadcastInterceptor — default pass-through', () { + test('onSend returns message unchanged', () { + final interceptor = _NoOpInterceptor(); + final message = {'event': 'test'}; + expect(interceptor.onSend(message), same(message)); + }); + + test('onReceive returns event unchanged', () { + final interceptor = _NoOpInterceptor(); + final event = BroadcastEvent( + event: 'test', + channel: 'ch', + data: const {}, + receivedAt: DateTime.now(), + ); + expect(interceptor.onReceive(event), same(event)); + }); + + test('onError returns error unchanged', () { + final interceptor = _NoOpInterceptor(); + final error = Exception('test'); + expect(interceptor.onError(error), same(error)); + }); + }); + group('Echo facade — fake/unfake', () { test('fake() replaces manager with FakeBroadcastManager', () { final fake = Echo.fake(); From 4e819d0784e26d8499adb11e06af79812c4d720b Mon Sep 17 00:00:00 2001 From: Anilcan Cakir Date: Mon, 6 Apr 2026 00:59:57 +0300 Subject: [PATCH 3/3] chore(deps): bump magic_cli to ^0.0.1-alpha.4 Published version includes broadcasting config stub, env var fixes, and install command --without-broadcasting support. --- example/pubspec.lock | 4 ++-- pubspec.yaml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/example/pubspec.lock b/example/pubspec.lock index 42a7aa4..8074922 100644 --- a/example/pubspec.lock +++ b/example/pubspec.lock @@ -512,10 +512,10 @@ packages: dependency: transitive description: name: magic_cli - sha256: "4101d1523015be47e9a91e3610f9a719cb04840623dd125fc2af1f6b0c4ec9e3" + sha256: a7190d3aac4e028feb6d68342dc5db3e4e62b5a340dbfc27181198daac297223 url: "https://pub.dev" source: hosted - version: "0.0.1-alpha.3" + version: "0.0.1-alpha.4" matcher: dependency: transitive description: diff --git a/pubspec.yaml b/pubspec.yaml index bf440f2..7fbec24 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -46,7 +46,7 @@ dependencies: image_picker: ^1.1.2 faker: ^2.2.0 web_socket_channel: ^3.0.0 - magic_cli: ^0.0.1-alpha.3 + magic_cli: ^0.0.1-alpha.4 flutter_test: sdk: flutter