Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions rtc/src/peer_connection/handler/datachannel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::statistics::accumulator::RTCStatsAccumulator;
use log::{debug, warn};
use sctp::PayloadProtocolIdentifier;
use shared::TransportContext;
use shared::error::{Error, Result};
use shared::error::{Error, Result, flatten_errs};
use std::collections::{HashMap, VecDeque};
use std::time::Instant;

Expand Down Expand Up @@ -329,6 +329,13 @@ impl<'a> sansio::Protocol<TaggedRTCMessageInternal, TaggedRTCMessageInternal, RT
}

fn close(&mut self) -> Result<()> {
Ok(())
let mut errs = vec![];
for dc in self.data_channels.values_mut() {
if let Err(e) = dc.close() {
errs.push(e);
}
}
self.data_channels.clear();
flatten_errs(errs)
}
}
51 changes: 8 additions & 43 deletions rtc/src/peer_connection/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,51 +388,16 @@ where
handler.close()?;
}));

// W3C WebRTC §close steps #4–#10 are implemented in individual handler close() methods:
// InterceptorHandler::close() → interceptor.close()
// DataChannelHandler::close() → closes all RTCDataChannelInternal instances
// SctpHandler::close() → closes all SCTP associations + endpoint
// DtlsHandler::close() → dtls_transport.stop()
// IceHandler::close() → ice_transport.agent.close()
//
// The for_each_handler!(forward: ...) loop above invokes each handler's close() in order.
let close_errs: Vec<Error> = vec![];

/* TODO:
if let Err(err) = self.interceptor.close().await {
close_errs.push(Error::new(format!("interceptor: {err}")));
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4)
{
let mut rtp_transceivers = self.internal.rtp_transceivers.lock().await;
for t in &*rtp_transceivers {
if let Err(err) = t.stop().await {
close_errs.push(Error::new(format!("rtp_transceivers: {err}")));
}
}
rtp_transceivers.clear();
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #5)
{
let mut data_channels = self.internal.sctp_transport.data_channels.lock().await;
for d in &*data_channels {
if let Err(err) = d.close().await {
close_errs.push(Error::new(format!("data_channels: {err}")));
}
}
data_channels.clear();
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #6)
if let Err(err) = self.internal.sctp_transport.stop().await {
close_errs.push(Error::new(format!("sctp_transport: {err}")));
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #7)
if let Err(err) = self.internal.dtls_transport.stop().await {
close_errs.push(Error::new(format!("dtls_transport: {err}")));
}

// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
if let Err(err) = self.internal.ice_transport.stop().await {
close_errs.push(Error::new(format!("ice_transport: {err}")));
}
*/

self.update_connection_state(true);

flatten_errs(close_errs)
Expand Down
17 changes: 14 additions & 3 deletions rtc/src/peer_connection/handler/sctp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use datachannel::message::Message;
use datachannel::message::message_channel_threshold::DataChannelThreshold;
use log::{debug, warn};
use sctp::{
AssociationEvent, AssociationHandle, ClientConfig, DatagramEvent, EndpointEvent, Event,
Payload, PayloadProtocolIdentifier, StreamEvent,
AssociationError, AssociationEvent, AssociationHandle, ClientConfig, DatagramEvent,
EndpointEvent, Event, Payload, PayloadProtocolIdentifier, StreamEvent,
};
use shared::error::{Error, Result};
use shared::marshal::Unmarshal;
Expand Down Expand Up @@ -467,7 +467,18 @@ impl<'a> sansio::Protocol<TaggedRTCMessageInternal, TaggedRTCMessageInternal, RT
}

fn close(&mut self) -> Result<()> {
Ok(())
// Close all SCTP associations and drain the endpoint so state machines shut down cleanly.
let sctp = &mut self.ctx.sctp_transport;
let mut errs = vec![];
for (_, assoc) in sctp.sctp_associations.iter_mut() {
if let Err(e) = assoc.close(AssociationError::LocallyClosed) {
errs.push(e);
}
}
sctp.sctp_associations.clear();
sctp.sctp_endpoint = None;

shared::error::flatten_errs(errs)
}
}

Expand Down