From 6172c9800dfced486348671fbbf9b8851f491804 Mon Sep 17 00:00:00 2001 From: Gringo Date: Thu, 26 Feb 2026 16:36:18 +0100 Subject: [PATCH 1/2] test: broadcast + requests --- packages/ndk/test/mocks/mock_relay.dart | 12 +++++ .../test/usecases/broadcast_sources_test.dart | 44 +++++++++++++++ .../event_sources_merge_test.dart | 53 +++++++++++++++++++ 3 files changed, 109 insertions(+) create mode 100644 packages/ndk/test/usecases/broadcast_sources_test.dart create mode 100644 packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart diff --git a/packages/ndk/test/mocks/mock_relay.dart b/packages/ndk/test/mocks/mock_relay.dart index 5bc37d565..b4700019e 100644 --- a/packages/ndk/test/mocks/mock_relay.dart +++ b/packages/ndk/test/mocks/mock_relay.dart @@ -33,6 +33,7 @@ class MockRelay { bool sendMalformedEvents; String? customWelcomeMessage; int? maxEventsPerRequest; + String? bannedWord; // NIP-46 Remote Signer Support static const int kNip46Kind = BunkerRequest.kKind; @@ -58,6 +59,7 @@ class MockRelay { this.sendMalformedEvents = false, this.customWelcomeMessage, this.maxEventsPerRequest, + this.bannedWord, int? explicitPort, }) : _nip65s = nip65s { if (explicitPort != null) { @@ -147,6 +149,16 @@ class MockRelay { if (eventJson[0] == "EVENT") { Nip01Event newEvent = Nip01EventModel.fromJson(eventJson[1]); if (verify(newEvent.pubKey, newEvent.id, newEvent.sig!)) { + // Check if event contains banned word + if (bannedWord != null && newEvent.content.contains(bannedWord!)) { + webSocket.add(jsonEncode([ + "OK", + newEvent.id, + false, + "blocked: content contains banned word" + ])); + return; + } // Check auth for events if required (any authenticated user is OK) if (requireAuthForEvents && authenticatedPubkeys.isEmpty) { webSocket.add(jsonEncode([ diff --git a/packages/ndk/test/usecases/broadcast_sources_test.dart b/packages/ndk/test/usecases/broadcast_sources_test.dart new file mode 100644 index 000000000..b788b6c4b --- /dev/null +++ b/packages/ndk/test/usecases/broadcast_sources_test.dart @@ -0,0 +1,44 @@ +import 'package:ndk/ndk.dart'; +import 'package:ndk/shared/nips/nip01/bip340.dart'; +import 'package:test/test.dart'; + +import '../mocks/mock_event_verifier.dart'; +import '../mocks/mock_relay.dart'; + +void main() async { + test("braodcast should update source", () async { + final relay = MockRelay(name: "relay"); + + await relay.startServer(); + + final ndk = Ndk(NdkConfig( + eventVerifier: MockEventVerifier(), + cache: MemCacheManager(), + bootstrapRelays: [relay.url], + )); + + final keypair = Bip340.generatePrivateKey(); + final signer = Bip340EventSigner( + privateKey: keypair.privateKey, + publicKey: keypair.publicKey, + ); + ndk.accounts.loginExternalSigner(signer: signer); + + final event = Nip01Event( + pubKey: keypair.publicKey, + kind: 1, + tags: [], + content: "content", + ); + + await ndk.broadcast.broadcast(nostrEvent: event).broadcastDoneFuture; + + final localEvent = await ndk.config.cache.loadEvent(event.id); + + expect(localEvent, isNotNull); + expect(localEvent!.sources, isNotEmpty); + + await ndk.destroy(); + await relay.stopServer(); + }); +} diff --git a/packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart b/packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart new file mode 100644 index 000000000..3a429357f --- /dev/null +++ b/packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart @@ -0,0 +1,53 @@ +import 'package:ndk/shared/nips/nip01/bip340.dart'; +import 'package:test/test.dart'; +import 'package:ndk/ndk.dart'; + +import '../../mocks/mock_event_verifier.dart'; +import '../../mocks/mock_relay.dart'; + +void main() async { + test("requests should update sources", () async { + final bannedWord = "cow"; + + final relay1 = MockRelay(name: "relay 1"); + final relay2 = MockRelay(name: "relay 2"); + final relay3 = MockRelay(name: "relay 2", bannedWord: bannedWord); + + await relay1.startServer(); + await relay2.startServer(); + await relay3.startServer(); + + final ndk = Ndk(NdkConfig( + eventVerifier: MockEventVerifier(), + cache: MemCacheManager(), + bootstrapRelays: [relay1.url, relay2.url, relay3.url], + )); + + final keypair = Bip340.generatePrivateKey(); + final signer = Bip340EventSigner( + privateKey: keypair.privateKey, + publicKey: keypair.publicKey, + ); + ndk.accounts.loginExternalSigner(signer: signer); + + final event = Nip01Event( + pubKey: keypair.publicKey, + kind: 1, + tags: [], + content: bannedWord, + ); + await ndk.broadcast.broadcast(nostrEvent: event).broadcastDoneFuture; + + await ndk.config.cache.clearAll(); + + final query = ndk.requests.query(filter: Filter(ids: [event.id])); + final events = await query.future; + + expect(events.first.sources.length, equals(2)); + + await ndk.destroy(); + await relay1.stopServer(); + await relay2.stopServer(); + await relay3.stopServer(); + }); +} From f38e5fb2e033f0334395d10911a5229c8f2decf9 Mon Sep 17 00:00:00 2001 From: Gringo Date: Wed, 25 Mar 2026 14:10:40 +0100 Subject: [PATCH 2/2] fix: merge event sources from multiple relays --- .../domain_layer/usecases/relay_manager.dart | 30 ++++++++++++++++- .../usecases/requests/requests.dart | 32 ++++++++++++++++--- .../stream_response_cleaner.dart | 24 ++++++++++---- packages/ndk/lib/presentation_layer/init.dart | 2 ++ .../event_sources_merge_test.dart | 3 +- 5 files changed, 78 insertions(+), 13 deletions(-) diff --git a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart index fb155bb56..9f6a4136a 100644 --- a/packages/ndk/lib/domain_layer/usecases/relay_manager.dart +++ b/packages/ndk/lib/domain_layer/usecases/relay_manager.dart @@ -23,6 +23,7 @@ import '../entities/relay_connectivity.dart'; import '../entities/relay_info.dart'; import '../entities/request_state.dart'; import '../entities/tuple.dart'; +import '../repositories/cache_manager.dart'; import '../repositories/nostr_transport.dart'; import 'accounts/accounts.dart'; import 'engines/network_engine.dart'; @@ -74,6 +75,9 @@ class RelayManager { /// AUTH strategy: eager (on challenge) or lazy (on auth-required) final bool eagerAuth; + /// cache manager for updating event sources + final CacheManager? cacheManager; + /// Creates a new relay manager. RelayManager({ required this.globalState, @@ -84,6 +88,7 @@ class RelayManager { allowReconnect = true, this.eagerAuth = false, this.authCallbackTimeout = RequestDefaults.DEFAULT_AUTH_CALLBACK_TIMEOUT, + this.cacheManager, }) : _accounts = accounts { allowReconnectRelays = allowReconnect; _connectSeedRelays(urls: bootstrapRelays ?? DEFAULT_BOOTSTRAP_RELAYS); @@ -501,6 +506,29 @@ class RelayManager { if (globalState.inFlightBroadcasts[eventId] != null && !globalState .inFlightBroadcasts[eventId]!.networkController.isClosed) { + // Update cache with source if broadcast was successful + if (success && cacheManager != null) { + final broadcastState = globalState.inFlightBroadcasts[eventId]; + final event = broadcastState?.event; + if (event != null) { + // Only update cache if event was already saved (saveToCache was true) + // Check if event exists in cache before updating sources + cacheManager!.loadEvent(eventId).then((cachedEvent) { + if (cachedEvent != null) { + // Merge existing sources with new relay URL, avoiding duplicates + final updatedSources = { + ...event.sources, + relayConnectivity.url + }.toList(); + final updatedEvent = event.copyWith(sources: updatedSources); + cacheManager!.saveEvent(updatedEvent); + // Update the event in broadcast state + broadcastState!.event = updatedEvent; + } + }); + } + } + globalState.inFlightBroadcasts[eventId]?.networkController.add( RelayBroadcastResponse( relayUrl: relayConnectivity.url, @@ -659,7 +687,7 @@ class RelayManager { } final eventWithSources = - event.copyWith(sources: [...event.sources, connectivity.url]); + event.copyWith(sources: {...event.sources, connectivity.url}.toList()); if (state.networkController.isClosed) { // this might happen because relays even after we send a CLOSE subscription.id, they'll still send more events diff --git a/packages/ndk/lib/domain_layer/usecases/requests/requests.dart b/packages/ndk/lib/domain_layer/usecases/requests/requests.dart index 946eaed06..e0fd2b2a3 100644 --- a/packages/ndk/lib/domain_layer/usecases/requests/requests.dart +++ b/packages/ndk/lib/domain_layer/usecases/requests/requests.dart @@ -345,7 +345,7 @@ class Requests { }) { final requestId = '$name-paginated-${Helpers.getRandomString(10)}'; final aggregatedController = ReplaySubject(); - final seenEventIds = {}; + final seenEvents = >{}; // event_id -> sources Future paginate() async { final since = filter.since; @@ -373,9 +373,20 @@ class Requests { final relayState = {}; for (final event in initialEvents) { - if (!seenEventIds.contains(event.id)) { - seenEventIds.add(event.id); + final existingSources = seenEvents[event.id]; + if (existingSources == null) { + // First time seeing this event + seenEvents[event.id] = event.sources.toSet(); aggregatedController.add(event); + } else { + // Merge sources if this event has new sources + if (event.sources.isNotEmpty) { + final newSources = existingSources..addAll(event.sources); + if (newSources.length > (seenEvents[event.id]?.length ?? 0)) { + seenEvents[event.id] = newSources; + aggregatedController.add(event.copyWith(sources: newSources.toList())); + } + } } // Track oldest timestamp per relay @@ -453,9 +464,20 @@ class Requests { int? oldestTimestamp; for (final event in pageEvents) { - if (!seenEventIds.contains(event.id)) { - seenEventIds.add(event.id); + final existingSources = seenEvents[event.id]; + if (existingSources == null) { + // First time seeing this event + seenEvents[event.id] = event.sources.toSet(); aggregatedController.add(event); + } else { + // Merge sources if this event has new sources + if (event.sources.isNotEmpty) { + final newSources = existingSources..addAll(event.sources); + if (newSources.length > (seenEvents[event.id]?.length ?? 0)) { + seenEvents[event.id] = newSources; + aggregatedController.add(event.copyWith(sources: newSources.toList())); + } + } } // Track oldest timestamp for this relay if (oldestTimestamp == null || event.createdAt < oldestTimestamp) { diff --git a/packages/ndk/lib/domain_layer/usecases/stream_response_cleaner/stream_response_cleaner.dart b/packages/ndk/lib/domain_layer/usecases/stream_response_cleaner/stream_response_cleaner.dart index 7a296f9eb..845be9a26 100644 --- a/packages/ndk/lib/domain_layer/usecases/stream_response_cleaner/stream_response_cleaner.dart +++ b/packages/ndk/lib/domain_layer/usecases/stream_response_cleaner/stream_response_cleaner.dart @@ -6,7 +6,7 @@ import '../../entities/nip_01_event.dart'; /// given a stream with Nip01 events it tracks the id and adds the one to the provided stream controller \ /// tracking of the happens in the tracking list class StreamResponseCleaner { - final Set _trackingSet; + final Map> _trackingMap; // event_id -> set of sources final List> _inputStreams; final StreamController _outController; final List _eventOutFilters; @@ -24,7 +24,7 @@ class StreamResponseCleaner { required List> inputStreams, required StreamController outController, required List eventOutFilters, - }) : _trackingSet = trackingSet, + }) : _trackingMap = {for (var id in trackingSet) id: {}}, _outController = outController, _inputStreams = inputStreams, _eventOutFilters = eventOutFilters; @@ -37,16 +37,28 @@ class StreamResponseCleaner { void _addStreamListener(Stream stream) { stream.listen((event) { - // check if event id is in the set - if (_trackingSet.contains(event.id)) { + if (_outController.isClosed) { return; } - if (_outController.isClosed) { + // check if event id is already seen + final existingSources = _trackingMap[event.id]; + if (existingSources != null) { + // Event already seen - merge sources if this event has new sources + if (event.sources.isNotEmpty) { + final newSources = Set.from(existingSources)..addAll(event.sources); + // Only emit if we have new sources to add + if (newSources.length > existingSources.length) { + _trackingMap[event.id] = newSources; + final mergedEvent = event.copyWith(sources: newSources.toList()); + _outController.add(mergedEvent); + } + } return; } - _trackingSet.add(event.id); + // First time seeing this event + _trackingMap[event.id] = event.sources.toSet(); // check against filters for (final filter in _eventOutFilters) { diff --git a/packages/ndk/lib/presentation_layer/init.dart b/packages/ndk/lib/presentation_layer/init.dart index bb743bb9c..5b38f665f 100644 --- a/packages/ndk/lib/presentation_layer/init.dart +++ b/packages/ndk/lib/presentation_layer/init.dart @@ -122,6 +122,7 @@ class Initialization { bootstrapRelays: _ndkConfig.bootstrapRelays, eagerAuth: _ndkConfig.eagerAuth, authCallbackTimeout: _ndkConfig.authCallbackTimeout, + cacheManager: _ndkConfig.cache, ); engine = RelaySetsEngine( @@ -140,6 +141,7 @@ class Initialization { engineAdditionalDataFactory: JitEngineRelayConnectivityDataFactory(), eagerAuth: _ndkConfig.eagerAuth, authCallbackTimeout: _ndkConfig.authCallbackTimeout, + cacheManager: _ndkConfig.cache, ); engine = JitEngine( diff --git a/packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart b/packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart index 3a429357f..0a31508a0 100644 --- a/packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart +++ b/packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart @@ -43,7 +43,8 @@ void main() async { final query = ndk.requests.query(filter: Filter(ids: [event.id])); final events = await query.future; - expect(events.first.sources.length, equals(2)); + // The last event should have all merged sources + expect(events.last.sources.length, equals(2)); await ndk.destroy(); await relay1.stopServer();