Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/stream_video/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Upcoming

### 🐞 Fixed
* Added handling for SFU `iceRestart` event — the client now correctly performs ICE restart and renegotiation when instructed by the SFU, improving fast reconnect reliability.
* Added PeerConnection SDP rollback on failed remote answer to prevent the publisher from getting stuck in an inconsistent signaling state.

## 1.2.4

### 🐞 Fixed
Expand Down
102 changes: 83 additions & 19 deletions packages/stream_video/lib/src/call/session/call_session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,12 @@ class CallSession extends Disposable {
_logger.v(() => '[fastReconnect] restarting ICE');
await rtcManager?.publisher?.pc.restartIce();

final publisher = rtcManager?.publisher;
if (publisher != null) {
_logger.v(() => '[fastReconnect] triggering publisher renegotiation');
await _onRenegotiationNeeded(publisher);
}

final remoteTracks = rtcManager!.tracks.values
.whereType<RtcRemoteTrack>()
.toList();
Expand Down Expand Up @@ -620,6 +626,8 @@ class CallSession extends Disposable {
} else if (event is SfuChangePublishOptionsEvent) {
_tracer.trace('PublishOptionsChanged', event.toJson());
await _onPublishOptionsChanged(event);
} else if (event is SfuIceRestartEvent) {
await _onIceRestart(event);
} else if (event is SfuGoAwayEvent) {
_tracer.trace('GoAway', event.toJson());
} else if (event is SfuErrorEvent) {
Expand Down Expand Up @@ -767,6 +775,13 @@ class CallSession extends Disposable {
}

Future<void> _onSubscriberOffer(SfuSubscriberOfferEvent event) async {
if (_isLeavingOrClosed || stateManager.callState.status.isDisconnected) {
_logger.w(
() => '[onSubscriberOffer] rejected (session is disconnecting)',
);
return;
}

final offerSdp = event.sdp;
_logger.v(() => '[onSubscriberOffer] event: $event');

Expand All @@ -786,6 +801,42 @@ class CallSession extends Disposable {
_logger.v(() => '[onSubscriberOffer] result: $result');
}

Future<void> _onIceRestart(SfuIceRestartEvent event) async {
_logger.d(() => '[onIceRestart] event: $event');

final peerType = event.peerType;

if (peerType == StreamPeerType.publisher) {
final publisher = rtcManager?.publisher;
if (publisher == null) {
_logger.w(() => '[onIceRestart] publisher peer connection is null');
return;
}

_logger.i(() => '[onIceRestart] restarting ICE for publisher');
final result = await publisher.restartIce();
_logger.d(() => '[onIceRestart] publisher ICE restart result: $result');

// After marking ICE restart, explicitly trigger renegotiation to create
// a new offer with fresh ICE credentials and send it to the SFU.
// pc.restartIce() only sets a flag; we must create and send the offer.
_logger.i(() => '[onIceRestart] triggering publisher renegotiation');
await _onRenegotiationNeeded(publisher);
} else if (peerType == StreamPeerType.subscriber) {
final subscriber = rtcManager?.subscriber;
if (subscriber == null) {
_logger.w(() => '[onIceRestart] subscriber peer connection is null');
return;
}

_logger.i(() => '[onIceRestart] restarting ICE for subscriber');
final result = await subscriber.restartIce();
_logger.d(() => '[onIceRestart] subscriber ICE restart result: $result');
} else {
_logger.w(() => '[onIceRestart] unknown peer type: $peerType');
}
}

void _onLocalTrackMuted(RtcLocalTrack track, bool muted) {
_logger.d(() => '[onPublisherTrackMuted] track: $track');

Expand Down Expand Up @@ -863,17 +914,21 @@ class CallSession extends Disposable {
_logger.v(() => '[onLocalIceCandidate] result: $result');
}

Future<void> _onRenegotiationNeeded(StreamPeerConnection pc) async {
Future<Result<void>> _onRenegotiationNeeded(StreamPeerConnection pc) async {
if (_isLeavingOrClosed || stateManager.callState.status.isDisconnected) {
_logger.w(() => '[negotiate] call is disconnected');
return;
return Result.error('Call is disconnected');
}

await _negotiationLock.synchronized(() async {
_logger.d(() => '[negotiate] type: ${pc.type}');

final offer = await pc.createOffer();
if (offer is! Success<rtc.RTCSessionDescription>) return;
if (offer is! Success<rtc.RTCSessionDescription>) {
return Result<void>.error(
'Failed to create offer: ${offer.getErrorOrNull()}',
);
}

final sdp = offer.data.sdp;
final tracksInfo = await rtcManager?.getAnnouncedTracks(sdp: sdp) ?? [];
Expand All @@ -882,31 +937,40 @@ class CallSession extends Disposable {
_logger.w(
() => '[negotiate] rejected(tracksInfo is empty): $tracksInfo',
);
return;
return pc.rollbackLocalDescription();
}

_logger.v(() => '[negotiate] announcing tracks: $tracksInfo');

final pubResult = await sfuClient.setPublisher(
sfu.SetPublisherRequest(
sdp: sdp,
sessionId: sessionId,
tracks: tracksInfo.toDTO(),
),
);
try {
final pubResult = await sfuClient.setPublisher(
sfu.SetPublisherRequest(
sdp: sdp,
sessionId: sessionId,
tracks: tracksInfo.toDTO(),
),
);

if (pubResult is! Success<sfu.SetPublisherResponse>) {
_logger.w(() => '[negotiate] #setPublisher; failed: $pubResult');
return;
}
if (pubResult is! Success<sfu.SetPublisherResponse>) {
_logger.w(() => '[negotiate] #setPublisher; failed: $pubResult');
return pc.rollbackLocalDescription();
}

if (pubResult.data.hasSdp()) {
final ansResult = await pc.setRemoteAnswer(pubResult.data.sdp);
if (ansResult is! Success<void>) {
_logger.w(() => '[negotiate] #setRemoteAnswer; failed: $ansResult');
if (pubResult.data.hasSdp()) {
final ansResult = await pc.setRemoteAnswer(pubResult.data.sdp);
if (ansResult is! Success<void>) {
_logger.w(
() => '[negotiate] #setRemoteAnswer; failed: $ansResult',
);
}
}
} catch (e, stk) {
_logger.e(() => '[negotiate] failed: $e\n$stk');
return pc.rollbackLocalDescription();
}
});

return const Result.success(null);
}

Future<void> _onRemoteTrackReceived(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ extension SfuEventMapper on sfu_events.SfuEvent {
);
case sfu_events.SfuEvent_EventPayload.participantMigrationComplete:
return const SfuParticipantMigrationCompleteEvent();
case sfu_events.SfuEvent_EventPayload.iceRestart:
final payload = iceRestart;
return SfuIceRestartEvent(
peerType: payload.peerType.toDomain(),
);
default:
return const SfuUnknownEvent();
}
Expand Down
10 changes: 10 additions & 0 deletions packages/stream_video/lib/src/sfu/data/events/sfu_events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ class SfuUnknownEvent extends SfuEvent {
const SfuUnknownEvent();
}

@internal
class SfuIceRestartEvent extends SfuEvent {
const SfuIceRestartEvent({required this.peerType});

final StreamPeerType peerType;

@override
List<Object?> get props => [peerType];
}

@internal
class SfuJoinResponseEvent extends SfuEvent {
const SfuJoinResponseEvent({
Expand Down
54 changes: 52 additions & 2 deletions packages/stream_video/lib/src/webrtc/peer_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,19 @@ class StreamPeerConnection extends Disposable {
try {
final localOffer = await pc.createOffer(mediaConstraints);
final modifiedSdp = sdpEditor.edit(localOffer.sdp?.let(Sdp.localOffer));

if (modifiedSdp == null || modifiedSdp.isEmpty) {
_logger.w(() => '[createOffer] rejected (SDP is null/empty)');
return Result.error('createOffer produced null/empty SDP');
}

final modifiedOffer = localOffer.copyWith(sdp: modifiedSdp);

await setLocalDescription(modifiedOffer);
final setResult = await setLocalDescription(modifiedOffer);
if (setResult is Failure) {
return Result.failure(setResult.error);
}

return Result.success(modifiedOffer);
} catch (e, stk) {
return Result.failure(VideoErrors.compose(e, stk));
Expand All @@ -200,11 +210,23 @@ class StreamPeerConnection extends Disposable {
);
final localAnswer = await pc.createAnswer(mediaConstraints);
final modifiedSdp = sdpEditor.edit(localAnswer.sdp?.let(Sdp.localAnswer));

if (modifiedSdp == null || modifiedSdp.isEmpty) {
_logger.w(
() => '[createLocalAnswer] #$type; rejected (SDP is null/empty)',
);
return Result.error('createAnswer produced null/empty SDP');
}

final modifiedAnswer = localAnswer.copyWith(sdp: modifiedSdp);
_logger.v(
() => '[createLocalAnswer] #$type; sdp:\n${modifiedAnswer.sdp}',
);
await setLocalDescription(modifiedAnswer);

final setResult = await setLocalDescription(modifiedAnswer);
if (setResult is Failure) {
return Result.failure(setResult.error);
}
return Result.success(modifiedAnswer);
} catch (e, stk) {
return Result.failure(VideoErrors.compose(e, stk));
Expand Down Expand Up @@ -261,6 +283,29 @@ class StreamPeerConnection extends Disposable {
}
}

/// Rolls back the local description to the stable state if the peer
/// connection is currently in the `have-local-offer` signaling state.
Future<Result<void>> rollbackLocalDescription() async {
try {
final state = pc.signalingState;
if (state != rtc.RTCSignalingState.RTCSignalingStateHaveLocalOffer) {
return const Result.success(null);
}

await pc.setLocalDescription(
rtc.RTCSessionDescription('', 'rollback'),
);

return const Result.success(null);
} catch (e, stk) {
_logger.w(
() => '[rollbackLocalDescription] #$type; failed: $e',
);

return Result.failure(VideoErrors.compose(e, stk));
}
}

/// Adds an ice candidate to the peer connection.
///
/// If the peer connection is not yet ready, the candidate is added to a list
Expand Down Expand Up @@ -317,6 +362,11 @@ class StreamPeerConnection extends Disposable {
),
);

final params = transceiver.sender.parameters;
params.degradationPreference =
rtc.RTCDegradationPreference.MAINTAIN_FRAMERATE;
await transceiver.sender.setParameters(params);

return Result.success(transceiver);
} catch (e, stk) {
return Result.failure(VideoErrors.compose(e, stk));
Expand Down
4 changes: 2 additions & 2 deletions packages/stream_video/lib/src/webrtc/transceiver_cache.dart
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ class TransceiverManager {
}

/// Gets the last transceiver for the given track type and publish option id.
RTCRtpTransceiver? getWith(SfuTrackType trackType, int id) {
return _findTransceiver(trackType, id)?.transceiver;
RTCRtpTransceiver? getWith(SfuTrackType trackType, int publishOptionId) {
return _findTransceiver(trackType, publishOptionId)?.transceiver;
}

/// Checks if the cache has the given publish option.
Expand Down