diff --git a/packages/stream_video/CHANGELOG.md b/packages/stream_video/CHANGELOG.md index be9741d16..2be2d570c 100644 --- a/packages/stream_video/CHANGELOG.md +++ b/packages/stream_video/CHANGELOG.md @@ -1,3 +1,9 @@ +## Upcoming + +### 🐞 Fixed +* Added handling for SFU `iceRestart` event — the client now correctly performs ICE restart and renegotiation when instructed by the SFU, improving fast reconnect reliability. +* Added PeerConnection SDP rollback on failed remote answer to prevent the publisher from getting stuck in an inconsistent signaling state. + ## 1.2.4 ### 🐞 Fixed diff --git a/packages/stream_video/lib/src/call/session/call_session.dart b/packages/stream_video/lib/src/call/session/call_session.dart index 68f3075c2..875fc46a0 100644 --- a/packages/stream_video/lib/src/call/session/call_session.dart +++ b/packages/stream_video/lib/src/call/session/call_session.dart @@ -491,6 +491,12 @@ class CallSession extends Disposable { _logger.v(() => '[fastReconnect] restarting ICE'); await rtcManager?.publisher?.pc.restartIce(); + final publisher = rtcManager?.publisher; + if (publisher != null) { + _logger.v(() => '[fastReconnect] triggering publisher renegotiation'); + await _onRenegotiationNeeded(publisher); + } + final remoteTracks = rtcManager!.tracks.values .whereType() .toList(); @@ -620,6 +626,8 @@ class CallSession extends Disposable { } else if (event is SfuChangePublishOptionsEvent) { _tracer.trace('PublishOptionsChanged', event.toJson()); await _onPublishOptionsChanged(event); + } else if (event is SfuIceRestartEvent) { + await _onIceRestart(event); } else if (event is SfuGoAwayEvent) { _tracer.trace('GoAway', event.toJson()); } else if (event is SfuErrorEvent) { @@ -767,6 +775,13 @@ class CallSession extends Disposable { } Future _onSubscriberOffer(SfuSubscriberOfferEvent event) async { + if (_isLeavingOrClosed || stateManager.callState.status.isDisconnected) { + _logger.w( + () => '[onSubscriberOffer] rejected (session is disconnecting)', + ); + return; + } + final offerSdp = event.sdp; _logger.v(() => '[onSubscriberOffer] event: $event'); @@ -786,6 +801,42 @@ class CallSession extends Disposable { _logger.v(() => '[onSubscriberOffer] result: $result'); } + Future _onIceRestart(SfuIceRestartEvent event) async { + _logger.d(() => '[onIceRestart] event: $event'); + + final peerType = event.peerType; + + if (peerType == StreamPeerType.publisher) { + final publisher = rtcManager?.publisher; + if (publisher == null) { + _logger.w(() => '[onIceRestart] publisher peer connection is null'); + return; + } + + _logger.i(() => '[onIceRestart] restarting ICE for publisher'); + final result = await publisher.restartIce(); + _logger.d(() => '[onIceRestart] publisher ICE restart result: $result'); + + // After marking ICE restart, explicitly trigger renegotiation to create + // a new offer with fresh ICE credentials and send it to the SFU. + // pc.restartIce() only sets a flag; we must create and send the offer. + _logger.i(() => '[onIceRestart] triggering publisher renegotiation'); + await _onRenegotiationNeeded(publisher); + } else if (peerType == StreamPeerType.subscriber) { + final subscriber = rtcManager?.subscriber; + if (subscriber == null) { + _logger.w(() => '[onIceRestart] subscriber peer connection is null'); + return; + } + + _logger.i(() => '[onIceRestart] restarting ICE for subscriber'); + final result = await subscriber.restartIce(); + _logger.d(() => '[onIceRestart] subscriber ICE restart result: $result'); + } else { + _logger.w(() => '[onIceRestart] unknown peer type: $peerType'); + } + } + void _onLocalTrackMuted(RtcLocalTrack track, bool muted) { _logger.d(() => '[onPublisherTrackMuted] track: $track'); @@ -863,17 +914,21 @@ class CallSession extends Disposable { _logger.v(() => '[onLocalIceCandidate] result: $result'); } - Future _onRenegotiationNeeded(StreamPeerConnection pc) async { + Future> _onRenegotiationNeeded(StreamPeerConnection pc) async { if (_isLeavingOrClosed || stateManager.callState.status.isDisconnected) { _logger.w(() => '[negotiate] call is disconnected'); - return; + return Result.error('Call is disconnected'); } await _negotiationLock.synchronized(() async { _logger.d(() => '[negotiate] type: ${pc.type}'); final offer = await pc.createOffer(); - if (offer is! Success) return; + if (offer is! Success) { + return Result.error( + 'Failed to create offer: ${offer.getErrorOrNull()}', + ); + } final sdp = offer.data.sdp; final tracksInfo = await rtcManager?.getAnnouncedTracks(sdp: sdp) ?? []; @@ -882,31 +937,40 @@ class CallSession extends Disposable { _logger.w( () => '[negotiate] rejected(tracksInfo is empty): $tracksInfo', ); - return; + return pc.rollbackLocalDescription(); } _logger.v(() => '[negotiate] announcing tracks: $tracksInfo'); - final pubResult = await sfuClient.setPublisher( - sfu.SetPublisherRequest( - sdp: sdp, - sessionId: sessionId, - tracks: tracksInfo.toDTO(), - ), - ); + try { + final pubResult = await sfuClient.setPublisher( + sfu.SetPublisherRequest( + sdp: sdp, + sessionId: sessionId, + tracks: tracksInfo.toDTO(), + ), + ); - if (pubResult is! Success) { - _logger.w(() => '[negotiate] #setPublisher; failed: $pubResult'); - return; - } + if (pubResult is! Success) { + _logger.w(() => '[negotiate] #setPublisher; failed: $pubResult'); + return pc.rollbackLocalDescription(); + } - if (pubResult.data.hasSdp()) { - final ansResult = await pc.setRemoteAnswer(pubResult.data.sdp); - if (ansResult is! Success) { - _logger.w(() => '[negotiate] #setRemoteAnswer; failed: $ansResult'); + if (pubResult.data.hasSdp()) { + final ansResult = await pc.setRemoteAnswer(pubResult.data.sdp); + if (ansResult is! Success) { + _logger.w( + () => '[negotiate] #setRemoteAnswer; failed: $ansResult', + ); + } } + } catch (e, stk) { + _logger.e(() => '[negotiate] failed: $e\n$stk'); + return pc.rollbackLocalDescription(); } }); + + return const Result.success(null); } Future _onRemoteTrackReceived( diff --git a/packages/stream_video/lib/src/sfu/data/events/sfu_event_mapper_extensions.dart b/packages/stream_video/lib/src/sfu/data/events/sfu_event_mapper_extensions.dart index 198e2dd72..2012db2ee 100644 --- a/packages/stream_video/lib/src/sfu/data/events/sfu_event_mapper_extensions.dart +++ b/packages/stream_video/lib/src/sfu/data/events/sfu_event_mapper_extensions.dart @@ -202,6 +202,11 @@ extension SfuEventMapper on sfu_events.SfuEvent { ); case sfu_events.SfuEvent_EventPayload.participantMigrationComplete: return const SfuParticipantMigrationCompleteEvent(); + case sfu_events.SfuEvent_EventPayload.iceRestart: + final payload = iceRestart; + return SfuIceRestartEvent( + peerType: payload.peerType.toDomain(), + ); default: return const SfuUnknownEvent(); } diff --git a/packages/stream_video/lib/src/sfu/data/events/sfu_events.dart b/packages/stream_video/lib/src/sfu/data/events/sfu_events.dart index 387fd152e..24fcd736b 100644 --- a/packages/stream_video/lib/src/sfu/data/events/sfu_events.dart +++ b/packages/stream_video/lib/src/sfu/data/events/sfu_events.dart @@ -34,6 +34,16 @@ class SfuUnknownEvent extends SfuEvent { const SfuUnknownEvent(); } +@internal +class SfuIceRestartEvent extends SfuEvent { + const SfuIceRestartEvent({required this.peerType}); + + final StreamPeerType peerType; + + @override + List get props => [peerType]; +} + @internal class SfuJoinResponseEvent extends SfuEvent { const SfuJoinResponseEvent({ diff --git a/packages/stream_video/lib/src/webrtc/peer_connection.dart b/packages/stream_video/lib/src/webrtc/peer_connection.dart index f1423b9f9..97e99d97f 100644 --- a/packages/stream_video/lib/src/webrtc/peer_connection.dart +++ b/packages/stream_video/lib/src/webrtc/peer_connection.dart @@ -179,9 +179,19 @@ class StreamPeerConnection extends Disposable { try { final localOffer = await pc.createOffer(mediaConstraints); final modifiedSdp = sdpEditor.edit(localOffer.sdp?.let(Sdp.localOffer)); + + if (modifiedSdp == null || modifiedSdp.isEmpty) { + _logger.w(() => '[createOffer] rejected (SDP is null/empty)'); + return Result.error('createOffer produced null/empty SDP'); + } + final modifiedOffer = localOffer.copyWith(sdp: modifiedSdp); - await setLocalDescription(modifiedOffer); + final setResult = await setLocalDescription(modifiedOffer); + if (setResult is Failure) { + return Result.failure(setResult.error); + } + return Result.success(modifiedOffer); } catch (e, stk) { return Result.failure(VideoErrors.compose(e, stk)); @@ -200,11 +210,23 @@ class StreamPeerConnection extends Disposable { ); final localAnswer = await pc.createAnswer(mediaConstraints); final modifiedSdp = sdpEditor.edit(localAnswer.sdp?.let(Sdp.localAnswer)); + + if (modifiedSdp == null || modifiedSdp.isEmpty) { + _logger.w( + () => '[createLocalAnswer] #$type; rejected (SDP is null/empty)', + ); + return Result.error('createAnswer produced null/empty SDP'); + } + final modifiedAnswer = localAnswer.copyWith(sdp: modifiedSdp); _logger.v( () => '[createLocalAnswer] #$type; sdp:\n${modifiedAnswer.sdp}', ); - await setLocalDescription(modifiedAnswer); + + final setResult = await setLocalDescription(modifiedAnswer); + if (setResult is Failure) { + return Result.failure(setResult.error); + } return Result.success(modifiedAnswer); } catch (e, stk) { return Result.failure(VideoErrors.compose(e, stk)); @@ -261,6 +283,29 @@ class StreamPeerConnection extends Disposable { } } + /// Rolls back the local description to the stable state if the peer + /// connection is currently in the `have-local-offer` signaling state. + Future> rollbackLocalDescription() async { + try { + final state = pc.signalingState; + if (state != rtc.RTCSignalingState.RTCSignalingStateHaveLocalOffer) { + return const Result.success(null); + } + + await pc.setLocalDescription( + rtc.RTCSessionDescription('', 'rollback'), + ); + + return const Result.success(null); + } catch (e, stk) { + _logger.w( + () => '[rollbackLocalDescription] #$type; failed: $e', + ); + + return Result.failure(VideoErrors.compose(e, stk)); + } + } + /// Adds an ice candidate to the peer connection. /// /// If the peer connection is not yet ready, the candidate is added to a list @@ -317,6 +362,11 @@ class StreamPeerConnection extends Disposable { ), ); + final params = transceiver.sender.parameters; + params.degradationPreference = + rtc.RTCDegradationPreference.MAINTAIN_FRAMERATE; + await transceiver.sender.setParameters(params); + return Result.success(transceiver); } catch (e, stk) { return Result.failure(VideoErrors.compose(e, stk)); diff --git a/packages/stream_video/lib/src/webrtc/transceiver_cache.dart b/packages/stream_video/lib/src/webrtc/transceiver_cache.dart index fa02ace64..1f246c595 100644 --- a/packages/stream_video/lib/src/webrtc/transceiver_cache.dart +++ b/packages/stream_video/lib/src/webrtc/transceiver_cache.dart @@ -62,8 +62,8 @@ class TransceiverManager { } /// Gets the last transceiver for the given track type and publish option id. - RTCRtpTransceiver? getWith(SfuTrackType trackType, int id) { - return _findTransceiver(trackType, id)?.transceiver; + RTCRtpTransceiver? getWith(SfuTrackType trackType, int publishOptionId) { + return _findTransceiver(trackType, publishOptionId)?.transceiver; } /// Checks if the cache has the given publish option.