diff --git a/packages/ndk/lib/domain_layer/entities/broadcast_state.dart b/packages/ndk/lib/domain_layer/entities/broadcast_state.dart index 6871772cb..5938dd14f 100644 --- a/packages/ndk/lib/domain_layer/entities/broadcast_state.dart +++ b/packages/ndk/lib/domain_layer/entities/broadcast_state.dart @@ -46,8 +46,8 @@ class BroadcastState { Nip01Event? event; /// stream controller for state updates - final BehaviorSubject _stateUpdatesController = - BehaviorSubject(); + final ReplaySubject _stateUpdatesController = + ReplaySubject(maxSize: 1); /// [networkController] used by relay manger to write responses StreamController networkController = @@ -65,11 +65,16 @@ class BroadcastState { /// completes when all relays have responded or timed out /// first string is the relay url, second is the response bool get publishDone { + // If no relays were registered and the engine closed the controller, it's done + if (broadcasts.isEmpty) { + return networkController.isClosed; + } + // Check if all relays have responded (success or failure) final allResponded = broadcasts.values.every( (element) => element.okReceived || element.msg.isNotEmpty, ); - if (allResponded && broadcasts.isNotEmpty) { + if (allResponded) { return true; } @@ -98,6 +103,8 @@ class BroadcastState { _stateUpdatesController.add(this); // check if all relays responded _checkBroadcastDone(); + }, onDone: () { + _checkBroadcastDone(); }); Future.delayed(timeout, () { @@ -110,6 +117,7 @@ class BroadcastState { void _checkBroadcastDone() { if (publishDone) { + _stateUpdatesController.add(this); _dispose(); } } diff --git a/packages/ndk/lib/domain_layer/usecases/jit_engine/jit_engine.dart b/packages/ndk/lib/domain_layer/usecases/jit_engine/jit_engine.dart index d444cd37e..2f6807d92 100644 --- a/packages/ndk/lib/domain_layer/usecases/jit_engine/jit_engine.dart +++ b/packages/ndk/lib/domain_layer/usecases/jit_engine/jit_engine.dart @@ -185,7 +185,7 @@ class JitEngine with Logger implements NetworkEngine { if (specificRelays != null) { final cleanedSpecificRelays = cleanRelayUrls(specificRelays.toList()); - return RelayJitBroadcastSpecificRelaysStrategy.broadcast( + await RelayJitBroadcastSpecificRelaysStrategy.broadcast( specificRelays: cleanedSpecificRelays, relayManager: relayManagerLight, eventToPublish: workingNostrEvent, @@ -193,10 +193,14 @@ class JitEngine with Logger implements NetworkEngine { .whereType>() .toList(), ); + if (broadcastState.broadcasts.isEmpty) { + broadcastState.networkController.close(); + } + return; } // default publish to own outbox - RelayJitBroadcastOutboxStrategy.broadcast( + await RelayJitBroadcastOutboxStrategy.broadcast( eventToPublish: workingNostrEvent, connectedRelays: relayManagerLight.connectedRelays .whereType>() @@ -209,7 +213,7 @@ class JitEngine with Logger implements NetworkEngine { // check if we need to publish to others inboxes if (workingNostrEvent.pTags.isNotEmpty && workingNostrEvent.kind != ContactList.kKind) { - RelayJitBroadcastOtherReadStrategy.broadcast( + await RelayJitBroadcastOtherReadStrategy.broadcast( eventToPublish: workingNostrEvent, connectedRelays: relayManagerLight.connectedRelays .whereType>() @@ -219,6 +223,9 @@ class JitEngine with Logger implements NetworkEngine { pubkeysOfInbox: workingNostrEvent.pTags, ); } + if (broadcastState.broadcasts.isEmpty) { + broadcastState.networkController.close(); + } } asyncStuff(); diff --git a/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_other_read.dart b/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_other_read.dart index c44f3a37a..b0f3cbab8 100644 --- a/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_other_read.dart +++ b/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_other_read.dart @@ -76,25 +76,22 @@ class RelayJitBroadcastOtherReadStrategy { continue; } - relayManager - .connectRelay( + final success = await relayManager.connectRelay( dirtyUrl: relayUrl, connectionSource: ConnectionSource.broadcastOther, - ) - .then((success) { - if (!success.first) { - relayManager.failBroadcast( - eventToPublish.id, - relayUrl, - "connection failed", - ); - return; - } - final relay = relayManager.connectedRelays - .firstWhere((element) => element.url == relayUrl); + ); + if (!success.first) { + relayManager.failBroadcast( + eventToPublish.id, + relayUrl, + "connection failed", + ); + continue; + } + final relay = relayManager.connectedRelays + .firstWhere((element) => element.url == relayUrl); - sendToRelay(relay: relay); - }); + sendToRelay(relay: relay); } } } diff --git a/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_own.dart b/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_own.dart index cc28d3901..6073fdb53 100644 --- a/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_own.dart +++ b/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_own.dart @@ -67,25 +67,22 @@ class RelayJitBroadcastOutboxStrategy { ); continue; } - relayManager - .connectRelay( + final success = await relayManager.connectRelay( dirtyUrl: relayUrl, connectionSource: ConnectionSource.broadcastOwn, - ) - .then((success) { - if (!success.first) { - relayManager.failBroadcast( - eventToPublish.id, - relayUrl, - "connection failed", - ); - return; - } - final relay = relayManager.connectedRelays - .firstWhere((element) => element.url == relayUrl); + ); + if (!success.first) { + relayManager.failBroadcast( + eventToPublish.id, + relayUrl, + "connection failed", + ); + continue; + } + final relay = relayManager.connectedRelays + .firstWhere((element) => element.url == relayUrl); - sendToRelay(relay: relay); - }); + sendToRelay(relay: relay); } } } diff --git a/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_specific.dart b/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_specific.dart index 914f01dcc..213226b02 100644 --- a/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_specific.dart +++ b/packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_specific.dart @@ -49,32 +49,29 @@ class RelayJitBroadcastSpecificRelaysStrategy { continue; } - relayManager - .reconnectRelay( + final success = await relayManager.reconnectRelay( relayUrl, connectionSource: ConnectionSource.broadcastSpecific, - ) - .then((success) { - if (!success) { - relayManager.failBroadcast( - eventToPublish.id, - relayUrl, - "connection failed", - ); - return; - } - try { - final relay = relayManager.connectedRelays - .firstWhere((element) => element.url == relayUrl); - sendToRelay(relay: relay); - } catch (e) { - relayManager.failBroadcast( - eventToPublish.id, - relayUrl, - "relay not found after connection", - ); - } - }); + ); + if (!success) { + relayManager.failBroadcast( + eventToPublish.id, + relayUrl, + "connection failed", + ); + continue; + } + try { + final relay = relayManager.connectedRelays + .firstWhere((element) => element.url == relayUrl); + sendToRelay(relay: relay); + } catch (e) { + relayManager.failBroadcast( + eventToPublish.id, + relayUrl, + "relay not found after connection", + ); + } } } } diff --git a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart index d3c013428..3ee3d0ba2 100644 --- a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart +++ b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart @@ -204,9 +204,6 @@ class RelayManager { }); await relayConnectivity.relayTransport!.ready.timeout( Duration(seconds: connectTimeout), - onTimeout: () { - Logger.log.w(() => "timed out connecting to relay $url"); - }, ); _startListeningToSocket(relayConnectivity); @@ -221,7 +218,7 @@ class RelayManager { return Tuple(true, ""); } catch (e) { Logger.log.e(() => "!! could not connect to $url -> $e"); - relayConnectivity!.relayTransport == null; + relayConnectivity!.relayTransport = null; } relayConnectivity.relay.failedToConnect(); relayConnectivity.stats.connectionErrors++; @@ -246,6 +243,7 @@ class RelayManager { ); } if (relayConnectivity == null || + relayConnectivity.relayTransport == null || !relayConnectivity.relayTransport!.isOpen()) { if (!force && (relayConnectivity != null && @@ -267,6 +265,7 @@ class RelayManager { } relayConnectivity = globalState.relays[url]; if (relayConnectivity == null || + relayConnectivity.relayTransport == null || !relayConnectivity.relayTransport!.isOpen()) { // web socket is not open return false; diff --git a/packages/ndk/lib/domain_layer/usecases/relay_sets_engine.dart b/packages/ndk/lib/domain_layer/usecases/relay_sets_engine.dart index d5e7978a7..6a672ab73 100644 --- a/packages/ndk/lib/domain_layer/usecases/relay_sets_engine.dart +++ b/packages/ndk/lib/domain_layer/usecases/relay_sets_engine.dart @@ -290,124 +290,70 @@ class RelaySetsEngine implements NetworkEngine { // specific relays // ===================================================================================== if (specificRelays != null) { - for (final relayUrl in specificRelays) { - // broadcast async - doRelayBroadcast(relayUrl, workingEvent); + if (specificRelays.isNotEmpty) { + await Future.wait(specificRelays.map((relayUrl) => + // broadcast async + doRelayBroadcast(relayUrl, workingEvent))); } - return; - } - // ===================================================================================== - // own outbox - // ===================================================================================== - // TODO should not only depend on cached, but go fetch it if not present in cache - final nip65List = (await UserRelayLists.getUserRelayListCacheLatest( - pubkeys: [workingEvent.pubKey], - cacheManager: _cacheManager, - )); - // make a copy of the keys since connectRelay may mutate the underlying map - List writeRelaysUrls = - _relayManager.globalState.relays.keys.toList(); - if (nip65List.isNotEmpty) { - writeRelaysUrls = nip65List.first.relays.entries - .where((element) => element.value.isWrite) - .map((e) => e.key) - .toList(); } else { - Logger.log.w(() => - "could not find user relay list from nip65, using default bootstrap relays"); - } - - for (final relayUrl in writeRelaysUrls) { - final isConnected = - _globalState.relays[relayUrl]?.relayTransport?.isOpen() ?? false; - if (isConnected) { - continue; - } - - await _relayManager.connectRelay( - dirtyUrl: relayUrl, - connectionSource: ConnectionSource.broadcastOwn, - ); - } - - for (final relayUrl in writeRelaysUrls) { - final relay = _globalState.relays[relayUrl]; - if (relay == null) { - Logger.log.w(() => "relay $relayUrl not found"); - continue; - } - - _relayManager.registerRelayBroadcast( - eventToPublish: workingEvent, - relayUrl: relayUrl, - ); - - _relayManager.send( - relay, - ClientMsg( - ClientMsgType.kEvent, - event: workingEvent, - )); - } - - // ===================================================================================== - // other inbox - // ===================================================================================== - if (workingEvent.pTags.isNotEmpty) { - final nip65Data = await UserRelayLists.getUserRelayListCacheLatest( - pubkeys: workingEvent.pTags, + // ===================================================================================== + // own outbox + // ===================================================================================== + // TODO should not only depend on cached, but go fetch it if not present in cache + final nip65List = (await UserRelayLists.getUserRelayListCacheLatest( + pubkeys: [workingEvent.pubKey], cacheManager: _cacheManager, - ); - - List myWriteRelayUrlsOthers = []; - - /// filter read relays - for (final userNip65 in nip65Data) { - final completeList = userNip65.relays.entries - .where((element) => element.value.isRead) + )); + // make a copy of the keys since connectRelay may mutate the underlying map + List writeRelaysUrls = + _relayManager.globalState.relays.keys.toList(); + if (nip65List.isNotEmpty) { + writeRelaysUrls = nip65List.first.relays.entries + .where((element) => element.value.isWrite) .map((e) => e.key) .toList(); - - // cut list of at a certain threshold - final maxList = completeList.sublist( - 0, - min(completeList.length, - BroadcastDefaults.MAX_INBOX_RELAYS_TO_BROADCAST), - ); - myWriteRelayUrlsOthers.addAll(maxList); - } - - for (final relayUrl in myWriteRelayUrlsOthers) { - final isConnected = - _globalState.relays[relayUrl]?.relayTransport?.isOpen() ?? false; - if (isConnected) { - continue; - } - await _relayManager.connectRelay( - dirtyUrl: relayUrl, - connectionSource: ConnectionSource.broadcastOther); + } else { + Logger.log.w(() => + "could not find user relay list from nip65, using default bootstrap relays"); } - for (final relayUrl in myWriteRelayUrlsOthers) { - final relay = _globalState.relays[relayUrl]; - if (relay == null) { - Logger.log.w(() => "relay $relayUrl not found"); - continue; - } + await Future.wait(writeRelaysUrls.map((relayUrl) => + doRelayBroadcast(relayUrl, workingEvent))); - _relayManager.registerRelayBroadcast( - eventToPublish: workingEvent, - relayUrl: relayUrl, + // ===================================================================================== + // other inbox + // ===================================================================================== + if (workingEvent.pTags.isNotEmpty) { + final nip65Data = await UserRelayLists.getUserRelayListCacheLatest( + pubkeys: workingEvent.pTags, + cacheManager: _cacheManager, ); - _relayManager.send( - relay, - ClientMsg( - ClientMsgType.kEvent, - event: workingEvent, - )); + List myWriteRelayUrlsOthers = []; + + /// filter read relays + for (final userNip65 in nip65Data) { + final completeList = userNip65.relays.entries + .where((element) => element.value.isRead) + .map((e) => e.key) + .toList(); + + // cut list of at a certain threshold + final maxList = completeList.sublist( + 0, + min(completeList.length, + BroadcastDefaults.MAX_INBOX_RELAYS_TO_BROADCAST), + ); + myWriteRelayUrlsOthers.addAll(maxList); + } + + await Future.wait(myWriteRelayUrlsOthers.map((relayUrl) => + doRelayBroadcast(relayUrl, workingEvent))); } } + if (broadcastState.broadcasts.isEmpty) { + broadcastState.networkController.close(); + } } asyncStuff(); diff --git a/packages/ndk/test/broadcast/broadcast_test.dart b/packages/ndk/test/broadcast/broadcast_test.dart new file mode 100644 index 000000000..be6139c32 --- /dev/null +++ b/packages/ndk/test/broadcast/broadcast_test.dart @@ -0,0 +1,82 @@ +import 'package:test/test.dart'; +import 'package:ndk/ndk.dart'; +import 'package:ndk/shared/nips/nip01/bip340.dart'; + +void main() { + test( + 'broadcast should complete quickly even with one offline relay', + () async { + final ndk = Ndk( + NdkConfig( + eventVerifier: Bip340EventVerifier(), + cache: MemCacheManager(), + bootstrapRelays: ["ws://localhost:25565"], + ), + ); + + final keyPair = Bip340.generatePrivateKey(); + ndk.accounts.loginPrivateKey( + privkey: keyPair.privateKey!, + pubkey: keyPair.publicKey, + ); + + final event = Nip01Event( + pubKey: keyPair.publicKey, + kind: 1, + content: '', + tags: [], + ); + + final stopwatch = Stopwatch()..start(); + + final broadcast = ndk.broadcast.broadcast(nostrEvent: event); + await broadcast.broadcastDoneFuture; + + stopwatch.stop(); + + expect( + stopwatch.elapsedMilliseconds, + lessThan(5000), + ); + + await ndk.destroy(); + }, + ); + + test( + 'broadcast to 0 relay should not time out', + () async { + final ndk = Ndk.emptyBootstrapRelaysConfig(); + + final keyPair = Bip340.generatePrivateKey(); + ndk.accounts.loginPrivateKey( + privkey: keyPair.privateKey!, + pubkey: keyPair.publicKey, + ); + + final event = Nip01Event( + pubKey: keyPair.publicKey, + kind: 1, + content: '', + tags: [], + ); + + final stopwatch = Stopwatch()..start(); + + final broadcast = ndk.broadcast.broadcast( + nostrEvent: event, + specificRelays: [], + ); + await broadcast.broadcastDoneFuture; + + stopwatch.stop(); + + expect( + stopwatch.elapsedMilliseconds, + lessThan(5000), + ); + + await ndk.destroy(); + }, + ); +}