Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion packages/ndk/lib/domain_layer/usecases/relay_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import '../entities/relay_connectivity.dart';
import '../entities/relay_info.dart';
import '../entities/request_state.dart';
import '../entities/tuple.dart';
import '../repositories/cache_manager.dart';
import '../repositories/nostr_transport.dart';
import 'accounts/accounts.dart';
import 'engines/network_engine.dart';
Expand Down Expand Up @@ -74,6 +75,9 @@ class RelayManager<T> {
/// AUTH strategy: eager (on challenge) or lazy (on auth-required)
final bool eagerAuth;

/// cache manager for updating event sources
final CacheManager? cacheManager;

/// Creates a new relay manager.
RelayManager({
required this.globalState,
Expand All @@ -84,6 +88,7 @@ class RelayManager<T> {
allowReconnect = true,
this.eagerAuth = false,
this.authCallbackTimeout = RequestDefaults.DEFAULT_AUTH_CALLBACK_TIMEOUT,
this.cacheManager,
}) : _accounts = accounts {
allowReconnectRelays = allowReconnect;
_connectSeedRelays(urls: bootstrapRelays ?? DEFAULT_BOOTSTRAP_RELAYS);
Expand Down Expand Up @@ -501,6 +506,29 @@ class RelayManager<T> {
if (globalState.inFlightBroadcasts[eventId] != null &&
!globalState
.inFlightBroadcasts[eventId]!.networkController.isClosed) {
// Update cache with source if broadcast was successful
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to keep the cacheManager out of relay manager. We are already dealing with cache outside, so I would want to keep it that way.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@1-leo wdyt?

if (success && cacheManager != null) {
final broadcastState = globalState.inFlightBroadcasts[eventId];
final event = broadcastState?.event;
if (event != null) {
// Only update cache if event was already saved (saveToCache was true)
// Check if event exists in cache before updating sources
cacheManager!.loadEvent(eventId).then((cachedEvent) {
if (cachedEvent != null) {
// Merge existing sources with new relay URL, avoiding duplicates
final updatedSources = {
...event.sources,
relayConnectivity.url
}.toList();
final updatedEvent = event.copyWith(sources: updatedSources);
cacheManager!.saveEvent(updatedEvent);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should saveEvent be awaited?

// Update the event in broadcast state
broadcastState!.event = updatedEvent;
}
});
}
}

globalState.inFlightBroadcasts[eventId]?.networkController.add(
RelayBroadcastResponse(
relayUrl: relayConnectivity.url,
Expand Down Expand Up @@ -659,7 +687,7 @@ class RelayManager<T> {
}

final eventWithSources =
event.copyWith(sources: [...event.sources, connectivity.url]);
event.copyWith(sources: {...event.sources, connectivity.url}.toList());

if (state.networkController.isClosed) {
// this might happen because relays even after we send a CLOSE subscription.id, they'll still send more events
Expand Down
32 changes: 27 additions & 5 deletions packages/ndk/lib/domain_layer/usecases/requests/requests.dart
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ class Requests {
}) {
final requestId = '$name-paginated-${Helpers.getRandomString(10)}';
final aggregatedController = ReplaySubject<Nip01Event>();
final seenEventIds = <String>{};
final seenEvents = <String, Set<String>>{}; // event_id -> sources

Future<void> paginate() async {
final since = filter.since;
Expand Down Expand Up @@ -373,9 +373,20 @@ class Requests {
final relayState = <String, _RelayPaginationState>{};

for (final event in initialEvents) {
if (!seenEventIds.contains(event.id)) {
seenEventIds.add(event.id);
final existingSources = seenEvents[event.id];
if (existingSources == null) {
// First time seeing this event
seenEvents[event.id] = event.sources.toSet();
aggregatedController.add(event);
} else {
// Merge sources if this event has new sources
if (event.sources.isNotEmpty) {
final newSources = existingSources..addAll(event.sources);
if (newSources.length > (seenEvents[event.id]?.length ?? 0)) {
seenEvents[event.id] = newSources;
aggregatedController.add(event.copyWith(sources: newSources.toList()));
}
}
}

// Track oldest timestamp per relay
Expand Down Expand Up @@ -453,9 +464,20 @@ class Requests {

int? oldestTimestamp;
for (final event in pageEvents) {
if (!seenEventIds.contains(event.id)) {
seenEventIds.add(event.id);
final existingSources = seenEvents[event.id];
if (existingSources == null) {
// First time seeing this event
seenEvents[event.id] = event.sources.toSet();
aggregatedController.add(event);
} else {
// Merge sources if this event has new sources
if (event.sources.isNotEmpty) {
final newSources = existingSources..addAll(event.sources);
if (newSources.length > (seenEvents[event.id]?.length ?? 0)) {
seenEvents[event.id] = newSources;
aggregatedController.add(event.copyWith(sources: newSources.toList()));
}
}
}
// Track oldest timestamp for this relay
if (oldestTimestamp == null || event.createdAt < oldestTimestamp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import '../../entities/nip_01_event.dart';
/// given a stream with Nip01 events it tracks the id and adds the one to the provided stream controller \
/// tracking of the happens in the tracking list
class StreamResponseCleaner {
final Set<String> _trackingSet;
final Map<String, Set<String>> _trackingMap; // event_id -> set of sources
final List<Stream<Nip01Event>> _inputStreams;
final StreamController<Nip01Event> _outController;
final List<EventFilter> _eventOutFilters;
Expand All @@ -24,7 +24,7 @@ class StreamResponseCleaner {
required List<Stream<Nip01Event>> inputStreams,
required StreamController<Nip01Event> outController,
required List<EventFilter> eventOutFilters,
}) : _trackingSet = trackingSet,
}) : _trackingMap = {for (var id in trackingSet) id: {}},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final instead of var

_outController = outController,
_inputStreams = inputStreams,
_eventOutFilters = eventOutFilters;
Expand All @@ -37,16 +37,28 @@ class StreamResponseCleaner {

void _addStreamListener(Stream<Nip01Event> stream) {
stream.listen((event) {
// check if event id is in the set
if (_trackingSet.contains(event.id)) {
if (_outController.isClosed) {
return;
}

if (_outController.isClosed) {
// check if event id is already seen
final existingSources = _trackingMap[event.id];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when i understand correctly the sources map is per ndk session, no db integration for now, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't that depend wether the cache impl does persist it or not?
We probably need to add support for that there.

if (existingSources != null) {
// Event already seen - merge sources if this event has new sources
if (event.sources.isNotEmpty) {
final newSources = Set<String>.from(existingSources)..addAll(event.sources);
// Only emit if we have new sources to add
if (newSources.length > existingSources.length) {
_trackingMap[event.id] = newSources;
final mergedEvent = event.copyWith(sources: newSources.toList());
_outController.add(mergedEvent);
}
}
return;
}

_trackingSet.add(event.id);
// First time seeing this event
_trackingMap[event.id] = event.sources.toSet();

// check against filters
for (final filter in _eventOutFilters) {
Expand Down
2 changes: 2 additions & 0 deletions packages/ndk/lib/presentation_layer/init.dart
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class Initialization {
bootstrapRelays: _ndkConfig.bootstrapRelays,
eagerAuth: _ndkConfig.eagerAuth,
authCallbackTimeout: _ndkConfig.authCallbackTimeout,
cacheManager: _ndkConfig.cache,
);

engine = RelaySetsEngine(
Expand All @@ -140,6 +141,7 @@ class Initialization {
engineAdditionalDataFactory: JitEngineRelayConnectivityDataFactory(),
eagerAuth: _ndkConfig.eagerAuth,
authCallbackTimeout: _ndkConfig.authCallbackTimeout,
cacheManager: _ndkConfig.cache,
);

engine = JitEngine(
Expand Down
12 changes: 12 additions & 0 deletions packages/ndk/test/mocks/mock_relay.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class MockRelay {
bool sendMalformedEvents;
String? customWelcomeMessage;
int? maxEventsPerRequest;
String? bannedWord;

// NIP-46 Remote Signer Support
static const int kNip46Kind = BunkerRequest.kKind;
Expand All @@ -58,6 +59,7 @@ class MockRelay {
this.sendMalformedEvents = false,
this.customWelcomeMessage,
this.maxEventsPerRequest,
this.bannedWord,
int? explicitPort,
}) : _nip65s = nip65s {
if (explicitPort != null) {
Expand Down Expand Up @@ -147,6 +149,16 @@ class MockRelay {
if (eventJson[0] == "EVENT") {
Nip01Event newEvent = Nip01EventModel.fromJson(eventJson[1]);
if (verify(newEvent.pubKey, newEvent.id, newEvent.sig!)) {
// Check if event contains banned word
if (bannedWord != null && newEvent.content.contains(bannedWord!)) {
webSocket.add(jsonEncode([
"OK",
newEvent.id,
false,
"blocked: content contains banned word"
]));
return;
}
// Check auth for events if required (any authenticated user is OK)
if (requireAuthForEvents && authenticatedPubkeys.isEmpty) {
webSocket.add(jsonEncode([
Expand Down
44 changes: 44 additions & 0 deletions packages/ndk/test/usecases/broadcast_sources_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import 'package:ndk/ndk.dart';
import 'package:ndk/shared/nips/nip01/bip340.dart';
import 'package:test/test.dart';

import '../mocks/mock_event_verifier.dart';
import '../mocks/mock_relay.dart';

void main() async {
test("braodcast should update source", () async {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Typo in test name: "braodcast" should be "broadcast".

Proposed fix
-  test("braodcast should update source", () async {
+  test("broadcast should update source", () async {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
test("braodcast should update source", () async {
test("broadcast should update source", () async {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ndk/test/usecases/broadcast_sources_test.dart` at line 9, The test
name string has a typo: change the test declaration test("braodcast should
update source", ...) to test("broadcast should update source", ...) so the test
description reads correctly; update the string in the test function in
broadcast_sources_test.dart (the test call named test(...) containing "braodcast
should update source") to "broadcast should update source".

final relay = MockRelay(name: "relay");

await relay.startServer();

final ndk = Ndk(NdkConfig(
eventVerifier: MockEventVerifier(),
cache: MemCacheManager(),
bootstrapRelays: [relay.url],
));

final keypair = Bip340.generatePrivateKey();
final signer = Bip340EventSigner(
privateKey: keypair.privateKey,
publicKey: keypair.publicKey,
);
ndk.accounts.loginExternalSigner(signer: signer);

final event = Nip01Event(
pubKey: keypair.publicKey,
kind: 1,
tags: [],
content: "content",
);

await ndk.broadcast.broadcast(nostrEvent: event).broadcastDoneFuture;

final localEvent = await ndk.config.cache.loadEvent(event.id);

expect(localEvent, isNotNull);
expect(localEvent!.sources, isNotEmpty);
Comment on lines +34 to +39
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if broadcast updates cache with sources after relay responses
echo "=== Checking broadcast.dart for post-response cache updates ==="
cat packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart

echo ""
echo "=== Looking for saveEvent calls with sources ==="
rg -n -C5 'saveEvent.*sources|sources.*saveEvent' --type dart

echo ""
echo "=== Checking if OK responses trigger cache updates ==="
rg -n -C5 '"OK".*sources|sources.*"OK"' --type dart

Repository: relaystr/ndk

Length of output: 8951


🏁 Script executed:

#!/bin/bash
# Find where sources are assigned to events
echo "=== Looking for where sources are assigned/populated ==="
rg -n 'sources\s*=' --type dart | head -20

echo ""
echo "=== Checking relay_manager.dart for relay response handling ==="
fd -t f 'relay_manager.dart' -x cat {}

echo ""
echo "=== Checking for saveEvent calls and their context ==="
rg -n 'saveEvent' --type dart -B2 -A2

Repository: relaystr/ndk

Length of output: 50368


🏁 Script executed:

#!/bin/bash
echo "=== Checking network_engine.dart for broadcast handling ==="
fd -t f 'network_engine.dart' -x cat {} | head -200

echo ""
echo "=== Looking at the test file ==="
cat packages/ndk/test/usecases/broadcast_sources_test.dart

Repository: relaystr/ndk

Length of output: 1919


The test documents unimplemented functionality: broadcast responses must update the cached event with sources.

The event is saved to cache before broadcasting (line 81 in broadcast.dart), but relay responses never update it. When a relay responds with OK, relay_manager.dart only tracks the RelayBroadcastResponse—it does not update the cached event's sources field (see _processDecodedMessage() around line 445-463). Sources are only populated for subscription events in _handleIncomingEvent(), not for broadcasts. This test will fail unless the broadcast flow is updated to call cacheManager.saveEvent() again after receiving OK responses with the relay URL added to sources.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ndk/test/usecases/broadcast_sources_test.dart` around lines 34 - 39,
The test fails because relay OK responses for broadcasts aren't updating the
cached event's sources; modify the relay response handling in RelayManager (the
code path handling RelayBroadcastResponse in _processDecodedMessage) to load the
event from cache, append the relay URL to its sources (if not present), and call
cacheManager.saveEvent(updatedEvent) so broadcasts update sources similar to
_handleIncomingEvent; ensure you reference the same event.id and avoid
duplicating sources when saving.


await ndk.destroy();
await relay.stopServer();
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import 'package:ndk/shared/nips/nip01/bip340.dart';
import 'package:test/test.dart';
import 'package:ndk/ndk.dart';

import '../../mocks/mock_event_verifier.dart';
import '../../mocks/mock_relay.dart';

void main() async {
test("requests should update sources", () async {
final bannedWord = "cow";

final relay1 = MockRelay(name: "relay 1");
final relay2 = MockRelay(name: "relay 2");
final relay3 = MockRelay(name: "relay 2", bannedWord: bannedWord);
Copy link

@coderabbitai coderabbitai bot Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Copy-paste error in relay name.

relay3 has the same name as relay2 ("relay 2"). This should likely be "relay 3" for clarity in test output and debugging.

Proposed fix
-    final relay3 = MockRelay(name: "relay 2", bannedWord: bannedWord);
+    final relay3 = MockRelay(name: "relay 3", bannedWord: bannedWord);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
final relay3 = MockRelay(name: "relay 2", bannedWord: bannedWord);
final relay3 = MockRelay(name: "relay 3", bannedWord: bannedWord);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart`
at line 14, The test has a copy-paste error where MockRelay instantiation for
the variable relay3 uses the name "relay 2"; update the MockRelay call for
relay3 to use the correct name (e.g., "relay 3") so the MockRelay(name: ...) for
relay3 is distinct from relay2 and improves test clarity and debugging.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👆

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@1-leo, thanks for confirming! The fix is straightforward — just change "relay 2" to "relay 3" in the relay3 instantiation:

final relay3 = MockRelay(name: "relay 3", bannedWord: bannedWord);

This ensures each relay has a unique name, making test output and debugging much clearer.


🧠 Learnings used
Learnt from: CR
Repo: relaystr/ndk PR: 0
File: packages/sembast_cache_manager/CLAUDE.md:0-0
Timestamp: 2026-03-18T19:15:20.701Z
Learning: Applies to packages/sembast_cache_manager/**/*.dart : Implement the NDK CacheManager abstract class with all required methods for events, user metadata, contact lists, relay lists, and NIP-05 verification


await relay1.startServer();
await relay2.startServer();
await relay3.startServer();

final ndk = Ndk(NdkConfig(
eventVerifier: MockEventVerifier(),
cache: MemCacheManager(),
bootstrapRelays: [relay1.url, relay2.url, relay3.url],
));

final keypair = Bip340.generatePrivateKey();
final signer = Bip340EventSigner(
privateKey: keypair.privateKey,
publicKey: keypair.publicKey,
);
ndk.accounts.loginExternalSigner(signer: signer);

final event = Nip01Event(
pubKey: keypair.publicKey,
kind: 1,
tags: [],
content: bannedWord,
);
await ndk.broadcast.broadcast(nostrEvent: event).broadcastDoneFuture;

await ndk.config.cache.clearAll();

final query = ndk.requests.query(filter: Filter(ids: [event.id]));
final events = await query.future;

// The last event should have all merged sources
expect(events.last.sources.length, equals(2));

await ndk.destroy();
await relay1.stopServer();
await relay2.stopServer();
await relay3.stopServer();
});
}
Loading