Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions packages/ndk/lib/domain_layer/entities/broadcast_state.dart
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class BroadcastState {
Nip01Event? event;

/// stream controller for state updates
final BehaviorSubject<BroadcastState> _stateUpdatesController =
BehaviorSubject<BroadcastState>();
final ReplaySubject<BroadcastState> _stateUpdatesController =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be simplified

  • BehaviorSubject -> ReplaySubject(maxSize: 1) in broadcast_state.dart:49 is likely optional. The real fix is emitting a final state before dispose (broadcast_state.dart:120) and handling empty relay set.
    • If there isn’t a specific replay semantic requirement, this swap may be unnecessary churn.
  • The “if empty -> close networkController” logic is duplicated across engines. Could be centralized into a small helper (engine utility or method on BroadcastState) to reduce branching and future drift.

ReplaySubject<BroadcastState>(maxSize: 1);

/// [networkController] used by relay manger to write responses
StreamController<RelayBroadcastResponse> networkController =
Expand All @@ -65,11 +65,16 @@ class BroadcastState {
/// completes when all relays have responded or timed out
/// first string is the relay url, second is the response
bool get publishDone {
// If no relays were registered and the engine closed the controller, it's done
if (broadcasts.isEmpty) {
return networkController.isClosed;
}

// Check if all relays have responded (success or failure)
final allResponded = broadcasts.values.every(
(element) => element.okReceived || element.msg.isNotEmpty,
);
if (allResponded && broadcasts.isNotEmpty) {
if (allResponded) {
return true;
}

Expand Down Expand Up @@ -98,6 +103,8 @@ class BroadcastState {
_stateUpdatesController.add(this);
// check if all relays responded
_checkBroadcastDone();
}, onDone: () {
_checkBroadcastDone();
});

Future.delayed(timeout, () {
Expand All @@ -110,6 +117,7 @@ class BroadcastState {

void _checkBroadcastDone() {
if (publishDone) {
_stateUpdatesController.add(this);
_dispose();
}
}
Expand Down
13 changes: 10 additions & 3 deletions packages/ndk/lib/domain_layer/usecases/jit_engine/jit_engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,22 @@ class JitEngine with Logger implements NetworkEngine {

if (specificRelays != null) {
final cleanedSpecificRelays = cleanRelayUrls(specificRelays.toList());
return RelayJitBroadcastSpecificRelaysStrategy.broadcast(
await RelayJitBroadcastSpecificRelaysStrategy.broadcast(
specificRelays: cleanedSpecificRelays,
relayManager: relayManagerLight,
eventToPublish: workingNostrEvent,
connectedRelays: relayManagerLight.connectedRelays
.whereType<RelayConnectivity<JitEngineRelayConnectivityData>>()
.toList(),
);
if (broadcastState.broadcasts.isEmpty) {
broadcastState.networkController.close();
}
return;
}

// default publish to own outbox
RelayJitBroadcastOutboxStrategy.broadcast(
await RelayJitBroadcastOutboxStrategy.broadcast(
eventToPublish: workingNostrEvent,
connectedRelays: relayManagerLight.connectedRelays
.whereType<RelayConnectivity<JitEngineRelayConnectivityData>>()
Expand All @@ -209,7 +213,7 @@ class JitEngine with Logger implements NetworkEngine {
// check if we need to publish to others inboxes
if (workingNostrEvent.pTags.isNotEmpty &&
workingNostrEvent.kind != ContactList.kKind) {
RelayJitBroadcastOtherReadStrategy.broadcast(
await RelayJitBroadcastOtherReadStrategy.broadcast(
eventToPublish: workingNostrEvent,
connectedRelays: relayManagerLight.connectedRelays
.whereType<RelayConnectivity<JitEngineRelayConnectivityData>>()
Expand All @@ -219,6 +223,9 @@ class JitEngine with Logger implements NetworkEngine {
pubkeysOfInbox: workingNostrEvent.pTags,
);
}
if (broadcastState.broadcasts.isEmpty) {
broadcastState.networkController.close();
}
}

asyncStuff();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,22 @@ class RelayJitBroadcastOtherReadStrategy {
continue;
}

relayManager
.connectRelay(
final success = await relayManager.connectRelay(
dirtyUrl: relayUrl,
connectionSource: ConnectionSource.broadcastOther,
)
.then((success) {
if (!success.first) {
relayManager.failBroadcast(
eventToPublish.id,
relayUrl,
"connection failed",
);
return;
}
final relay = relayManager.connectedRelays
.firstWhere((element) => element.url == relayUrl);
);
if (!success.first) {
relayManager.failBroadcast(
eventToPublish.id,
relayUrl,
"connection failed",
);
continue;
}
final relay = relayManager.connectedRelays
.firstWhere((element) => element.url == relayUrl);

sendToRelay(relay: relay);
});
sendToRelay(relay: relay);
Comment on lines +91 to +94
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing error handling for relay lookup after connection.

Same issue as relay_jit_broadcast_own.dart: firstWhere can throw StateError if the relay isn't found in connectedRelays. This should be wrapped in try-catch for consistency with relay_jit_broadcast_specific.dart.

Proposed fix: wrap in try-catch for consistency
       if (!success.first) {
         relayManager.failBroadcast(
           eventToPublish.id,
           relayUrl,
           "connection failed",
         );
         continue;
       }
-      final relay = relayManager.connectedRelays
-          .firstWhere((element) => element.url == relayUrl);
-
-      sendToRelay(relay: relay);
+      try {
+        final relay = relayManager.connectedRelays
+            .firstWhere((element) => element.url == relayUrl);
+        sendToRelay(relay: relay);
+      } catch (e) {
+        relayManager.failBroadcast(
+          eventToPublish.id,
+          relayUrl,
+          "relay not found after connection",
+        );
+      }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
final relay = relayManager.connectedRelays
.firstWhere((element) => element.url == relayUrl);
sendToRelay(relay: relay);
});
sendToRelay(relay: relay);
if (!success.first) {
relayManager.failBroadcast(
eventToPublish.id,
relayUrl,
"connection failed",
);
continue;
}
try {
final relay = relayManager.connectedRelays
.firstWhere((element) => element.url == relayUrl);
sendToRelay(relay: relay);
} catch (e) {
relayManager.failBroadcast(
eventToPublish.id,
relayUrl,
"relay not found after connection",
);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_other_read.dart`
around lines 91 - 94, The code uses
relayManager.connectedRelays.firstWhere((element) => element.url == relayUrl)
which can throw a StateError if no relay matches; wrap that lookup in a
try-catch (or use firstWhere with orElse) to handle the missing-relay case, and
only call sendToRelay(relay: relay) when a relay is found; ensure the catch logs
or handles the error similarly to relay_jit_broadcast_specific.dart so behavior
is consistent with other strategies.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,25 +67,22 @@ class RelayJitBroadcastOutboxStrategy {
);
continue;
}
relayManager
.connectRelay(
final success = await relayManager.connectRelay(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to do await here?
Before we were not blocking on a relay connect, since it was using .then
That means the code kicked off many relay connection attempts and callbacks quickly, instead of waiting relay-by-relay.

AI says:
I’d parallelize JIT at the per-relay level, same idea as relay-sets, while keeping per-relay failure isolated.
Core pattern

  • Keep one async function per relay (_sendToRelayUrl) that:
    • registers broadcast
    • checks connected / reconnects
    • sends or calls failBroadcast
    • catches its own errors so one relay never aborts others
  • Run all relay tasks with Future.wait(...).
    Example for relay_jit_broadcast_own.dart:
    static Future broadcast({
    required Nip01Event eventToPublish,
    required CacheManager cacheManager,
    required RelayManager relayManager,
    required List bootstrapRelays,
    // connectedRelays can be removed; use relayManager lookup
    }) async {
    final nip65Data = await UserRelayLists.getUserRelayListCacheLatestSingle(
    pubkey: eventToPublish.pubKey,
    cacheManager: cacheManager,
    );
    final urls = (nip65Data == null)
    ? bootstrapRelays
    : nip65Data.relays.entries
    .where((e) => e.value.isWrite)
    .map((e) => e.key)
    .toList();
    Future sendToUrl(String relayUrl) async {
    relayManager.registerRelayBroadcast(
    eventToPublish: eventToPublish,
    relayUrl: relayUrl,
    );
    try {
    final isConnected = relayManager.isRelayConnected(relayUrl);
    if (!isConnected) {
    final ok = await relayManager.connectRelay(
    dirtyUrl: relayUrl,
    connectionSource: ConnectionSource.broadcastOwn,
    );
    if (!ok.first) {
    relayManager.failBroadcast(eventToPublish.id, relayUrl, "connection failed");
    return;
    }
    }
    final relay = relayManager.getRelayConnectivity(relayUrl);
    if (relay == null) {
    relayManager.failBroadcast(eventToPublish.id, relayUrl, "relay not found");
    return;
    }
    relayManager.send(
    relay,
    ClientMsg(ClientMsgType.kEvent, event: eventToPublish),
    );
    } catch (e) {
    relayManager.failBroadcast(eventToPublish.id, relayUrl, "broadcast error: $e");
    }
    }
    await Future.wait(urls.map(sendToUrl), eagerError: false);
    }
    Apply same shape to:
  • relay_jit_broadcast_other_read.dart
  • relay_jit_broadcast_specific.dart
    Two extra improvements I’d do while there:
  • Deduplicate relay URLs before Future.wait (urls.toSet().toList()), to avoid duplicate work.
  • Optionally add bounded parallelism (batch size like 5–10) if opening too many sockets at once is a concern.

dirtyUrl: relayUrl,
connectionSource: ConnectionSource.broadcastOwn,
)
.then((success) {
if (!success.first) {
relayManager.failBroadcast(
eventToPublish.id,
relayUrl,
"connection failed",
);
return;
}
final relay = relayManager.connectedRelays
.firstWhere((element) => element.url == relayUrl);
);
if (!success.first) {
relayManager.failBroadcast(
eventToPublish.id,
relayUrl,
"connection failed",
);
continue;
}
final relay = relayManager.connectedRelays
.firstWhere((element) => element.url == relayUrl);

sendToRelay(relay: relay);
});
sendToRelay(relay: relay);
Comment on lines +82 to +85
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing error handling for relay lookup after connection.

firstWhere throws StateError if no matching element is found. Unlike relay_jit_broadcast_specific.dart (lines 64-74), this code lacks a try-catch wrapper. If connectRelay succeeds but the relay isn't in connectedRelays, the loop will throw instead of continuing gracefully.

Proposed fix: wrap in try-catch for consistency
       final success = await relayManager.connectRelay(
         dirtyUrl: relayUrl,
         connectionSource: ConnectionSource.broadcastOwn,
       );
       if (!success.first) {
         relayManager.failBroadcast(
           eventToPublish.id,
           relayUrl,
           "connection failed",
         );
         continue;
       }
-      final relay = relayManager.connectedRelays
-          .firstWhere((element) => element.url == relayUrl);
-
-      sendToRelay(relay: relay);
+      try {
+        final relay = relayManager.connectedRelays
+            .firstWhere((element) => element.url == relayUrl);
+        sendToRelay(relay: relay);
+      } catch (e) {
+        relayManager.failBroadcast(
+          eventToPublish.id,
+          relayUrl,
+          "relay not found after connection",
+        );
+      }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@packages/ndk/lib/domain_layer/usecases/jit_engine/relay_jit_broadcast_strategies/relay_jit_broadcast_own.dart`
around lines 82 - 85, The relay lookup using
relayManager.connectedRelays.firstWhere (the block that finds a relay by
relayUrl before calling sendToRelay) can throw StateError if no match is found;
wrap that lookup in a try-catch (matching the pattern used in
relay_jit_broadcast_specific.dart) so if the relay isn't present after
connectRelay we catch the error, skip/continue the loop and do not call
sendToRelay, and optionally log the missing-relay case; update the code around
firstWhere and sendToRelay to handle the exception gracefully.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,32 +49,29 @@ class RelayJitBroadcastSpecificRelaysStrategy {
continue;
}

relayManager
.reconnectRelay(
final success = await relayManager.reconnectRelay(
relayUrl,
connectionSource: ConnectionSource.broadcastSpecific,
)
.then((success) {
if (!success) {
relayManager.failBroadcast(
eventToPublish.id,
relayUrl,
"connection failed",
);
return;
}
try {
final relay = relayManager.connectedRelays
.firstWhere((element) => element.url == relayUrl);
sendToRelay(relay: relay);
} catch (e) {
relayManager.failBroadcast(
eventToPublish.id,
relayUrl,
"relay not found after connection",
);
}
});
);
if (!success) {
relayManager.failBroadcast(
eventToPublish.id,
relayUrl,
"connection failed",
);
continue;
}
try {
final relay = relayManager.connectedRelays
.firstWhere((element) => element.url == relayUrl);
sendToRelay(relay: relay);
} catch (e) {
relayManager.failBroadcast(
eventToPublish.id,
relayUrl,
"relay not found after connection",
);
}
}
}
}
7 changes: 3 additions & 4 deletions packages/ndk/lib/domain_layer/usecases/relay_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,6 @@ class RelayManager<T> {
});
await relayConnectivity.relayTransport!.ready.timeout(
Duration(seconds: connectTimeout),
onTimeout: () {
Logger.log.w(() => "timed out connecting to relay $url");
},
);

_startListeningToSocket(relayConnectivity);
Expand All @@ -221,7 +218,7 @@ class RelayManager<T> {
return Tuple(true, "");
} catch (e) {
Logger.log.e(() => "!! could not connect to $url -> $e");
relayConnectivity!.relayTransport == null;
relayConnectivity!.relayTransport = null;
}
relayConnectivity.relay.failedToConnect();
relayConnectivity.stats.connectionErrors++;
Expand All @@ -246,6 +243,7 @@ class RelayManager<T> {
);
}
if (relayConnectivity == null ||
relayConnectivity.relayTransport == null ||
!relayConnectivity.relayTransport!.isOpen()) {
if (!force &&
(relayConnectivity != null &&
Expand All @@ -267,6 +265,7 @@ class RelayManager<T> {
}
relayConnectivity = globalState.relays[url];
if (relayConnectivity == null ||
relayConnectivity.relayTransport == null ||
!relayConnectivity.relayTransport!.isOpen()) {
// web socket is not open
return false;
Expand Down
Loading
Loading