Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 15 additions & 17 deletions rtc-datachannel/src/data_channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,23 +71,21 @@ impl DataChannel {
) -> Result<Self> {
let mut data_channel = DataChannel::new(config.clone(), association_handle, stream_id);

if !config.negotiated {
let msg = Message::DataChannelOpen(DataChannelOpen {
channel_type: config.channel_type,
priority: config.priority,
reliability_parameter: config.reliability_parameter,
label: config.label.bytes().collect(),
protocol: config.protocol.bytes().collect(),
})
.marshal()?;

data_channel.write_outs.push_back(DataChannelMessage {
association_handle,
stream_id,
ppi: PayloadProtocolIdentifier::Dcep,
payload: msg,
});
}
let msg = Message::DataChannelOpen(DataChannelOpen {
channel_type: config.channel_type,
priority: config.priority,
reliability_parameter: config.reliability_parameter,
label: config.label.bytes().collect(),
protocol: config.protocol.bytes().collect(),
})
.marshal()?;

data_channel.write_outs.push_back(DataChannelMessage {
association_handle,
stream_id,
ppi: PayloadProtocolIdentifier::Dcep,
payload: msg,
});

Ok(data_channel)
}
Expand Down
60 changes: 54 additions & 6 deletions rtc-sctp/src/association/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ use crate::param::{
use crate::queue::{payload_queue::PayloadQueue, pending_queue::PendingQueue};
use crate::shared::{AssociationEventInner, AssociationId, EndpointEvent, EndpointEventInner};
use crate::util::{sna16lt, sna32gt, sna32gte, sna32lt, sna32lte};
use crate::{AssociationEvent, Payload, Side};
use crate::{AssociationEvent, FlushIds, Payload, Side};
use shared::error::{Error, Result};
use shared::{TransportContext, TransportMessage, TransportProtocol};
use stream::{ReliabilityType, Stream, StreamEvent, StreamId, StreamState};
use timer::{ACK_INTERVAL, RtoManager, Timer, TimerTable};

use crate::association::stream::RecvSendState;
use crate::queue::pending_queue::{FlushEntry, QueueEntry};
use bytes::Bytes;
use log::{debug, error, trace, warn};
use rand::random;
Expand Down Expand Up @@ -429,6 +430,22 @@ impl Association {
/// - a call was made to `handle_timeout`
#[must_use]
pub fn poll_transmit(&mut self, now: Instant) -> Option<TransportMessage<Payload>> {

// first, see if the next queue entry is a flush signal
if let Some(ids) = self.pop_pending_flush() {
trace!("polled flush({})", ids.flush_id);
return Some(TransportMessage {
now,
transport: TransportContext {
local_addr: self.local_addr,
peer_addr: self.remote_addr,
ecn: None,
transport_protocol: Default::default(),
},
message: Payload::Flush(ids),
});
}

let (contents, _) = self.gather_outbound(now);
if contents.is_empty() {
None
Expand Down Expand Up @@ -2375,6 +2392,24 @@ impl Association {
self.bundle_data_chunks_into_packets(chunks)
}

fn pop_pending_flush(&mut self) -> Option<FlushIds> {

// if the first queue entry is a flush signal, pop it off
if let Some(QueueEntry::Flush(e)) = self.pending_queue.peek() {
let unordered = e.unordered;
match self.pending_queue.pop(true, unordered) {
Some(QueueEntry::Flush(e)) => Some(e.ids),
_ => None
}
} else {
None
}

// TODO: is popping off the pending queue enough to guarantee all the previous messages
// have been written to the final output queue?
// TODO: pop multiple consecutive flush signals?
}

/// pop_pending_data_chunks_to_send pops chunks from the pending queues as many as
/// the cwnd and rwnd allows to send.
fn pop_pending_data_chunks_to_send(
Expand All @@ -2392,7 +2427,7 @@ impl Association {
// is 0), the data sender can always have one DATA chunk in flight to
// the receiver if allowed by cwnd (see rule B, below).

while let Some(c) = self.pending_queue.peek() {
while let Some(QueueEntry::Payload(c)) = self.pending_queue.peek() {
let (beginning_fragment, unordered, data_len, stream_identifier) = (
c.beginning_fragment,
c.unordered,
Expand Down Expand Up @@ -2434,7 +2469,7 @@ impl Association {
// the data sender can always have one DATA chunk in flight to the receiver
if chunks.is_empty() && self.inflight_queue.is_empty() {
// Send zero window probe
if let Some(c) = self.pending_queue.peek() {
if let Some(QueueEntry::Payload(c)) = self.pending_queue.peek() {
let (beginning_fragment, unordered) = (c.beginning_fragment, c.unordered);

if let Some(chunk) = self.move_pending_data_chunk_to_inflight_queue(
Expand Down Expand Up @@ -2608,7 +2643,7 @@ impl Association {
unordered: bool,
now: Instant,
) -> Option<ChunkPayloadData> {
if let Some(mut c) = self.pending_queue.pop(beginning_fragment, unordered) {
if let Some(QueueEntry::Payload(mut c)) = self.pending_queue.pop(beginning_fragment, unordered) {
// Mark all fragements are in-flight now
if c.ending_fragment {
c.set_all_inflight();
Expand Down Expand Up @@ -2665,7 +2700,7 @@ impl Association {
..Default::default()
};

self.pending_queue.push(c);
self.pending_queue.push(QueueEntry::Payload(c));
self.awake_write_loop();

Ok(())
Expand All @@ -2680,9 +2715,22 @@ impl Association {

// Push the chunks into the pending queue first.
for c in chunks {
self.pending_queue.push(c);
self.pending_queue.push(QueueEntry::Payload(c));
}

self.awake_write_loop();
Ok(())
}

pub(crate) fn send_flush(&mut self, ids: FlushIds, unordered: bool) -> Result<()> {

let state = self.state();
if state != AssociationState::Established {
return Err(Error::ErrPayloadDataStateNotExist);
}

self.pending_queue.push(QueueEntry::Flush(FlushEntry { ids, unordered }));

self.awake_write_loop();
Ok(())
}
Expand Down
18 changes: 17 additions & 1 deletion rtc-sctp/src/association/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::association::Association;
use crate::association::state::AssociationState;
use crate::chunk::chunk_payload_data::{ChunkPayloadData, PayloadProtocolIdentifier};
use crate::queue::reassembly_queue::{Chunks, ReassemblyQueue};
use crate::{ErrorCauseCode, Event, Side};
use crate::{ErrorCauseCode, Event, FlushIds, Side};
use shared::error::{Error, Result};

use crate::util::{ByteSlice, BytesArray, BytesSource};
Expand Down Expand Up @@ -204,6 +204,22 @@ impl Stream<'_> {
}
}

/// Pushes a flush signal into the stream, which can be collected later by polling
/// the connection after all previous messages are processed.
/// The flush signal is not sent to the remote peer.
pub fn flush(&mut self, ids: FlushIds) -> Result<()> {

if !self.is_writable() {
return Err(Error::ErrStreamClosed);
}

let Some(s) = self.association.streams.get_mut(&self.stream_identifier)
else { return Err(Error::ErrStreamClosed); };
let unordered = s.unordered;

self.association.send_flush(ids, unordered)
}

pub fn is_readable(&self) -> bool {
if let Some(s) = self.association.streams.get(&self.stream_identifier) {
s.state == RecvSendState::Readable || s.state == RecvSendState::ReadWritable
Expand Down
10 changes: 10 additions & 0 deletions rtc-sctp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,14 @@ use crate::packet::PartialDecode;
pub enum Payload {
PartialDecode(PartialDecode),
RawEncode(Vec<Bytes>),
Flush(FlushIds)
}


#[derive(Debug, Clone)]
pub struct FlushIds {
pub flush_id: i64,
pub data_channel_id: u16,
pub association_handle: usize,
pub stream_id: u16
}
91 changes: 72 additions & 19 deletions rtc-sctp/src/queue/pending_queue.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::chunk::chunk_payload_data::ChunkPayloadData;
use crate::FlushIds;

use std::collections::VecDeque;

/// pendingBaseQueue
pub(crate) type PendingBaseQueue = VecDeque<ChunkPayloadData>;
pub(crate) type PendingBaseQueue = VecDeque<QueueEntry>;

/// pendingQueue
#[derive(Debug, Default)]
Expand All @@ -21,17 +22,17 @@ impl PendingQueue {
PendingQueue::default()
}

pub(crate) fn push(&mut self, c: ChunkPayloadData) {
self.n_bytes += c.user_data.len();
if c.unordered {
self.unordered_queue.push_back(c);
pub(crate) fn push(&mut self, e: QueueEntry) {
self.n_bytes += e.len();
if e.unordered() {
self.unordered_queue.push_back(e);
} else {
self.ordered_queue.push_back(c);
self.ordered_queue.push_back(e);
}
self.queue_len += 1;
}

pub(crate) fn peek(&self) -> Option<&ChunkPayloadData> {
pub(crate) fn peek(&self) -> Option<&QueueEntry> {
if self.selected {
if self.unordered_is_selected {
return self.unordered_queue.front();
Expand All @@ -40,10 +41,10 @@ impl PendingQueue {
}
}

let c = self.unordered_queue.front();
let e = self.unordered_queue.front();

if c.is_some() {
return c;
if e.is_some() {
return e;
}

self.ordered_queue.front()
Expand All @@ -53,15 +54,15 @@ impl PendingQueue {
&mut self,
beginning_fragment: bool,
unordered: bool,
) -> Option<ChunkPayloadData> {
) -> Option<QueueEntry> {
let popped = if self.selected {
let popped = if self.unordered_is_selected {
self.unordered_queue.pop_front()
} else {
self.ordered_queue.pop_front()
};
if let Some(p) = &popped
&& p.ending_fragment
if let Some(e) = &popped
&& e.ending_fragment() == Some(true)
{
self.selected = false;
}
Expand All @@ -72,17 +73,17 @@ impl PendingQueue {
}
if unordered {
let popped = { self.unordered_queue.pop_front() };
if let Some(p) = &popped
&& !p.ending_fragment
if let Some(e) = &popped
&& e.ending_fragment() == Some(false)
{
self.selected = true;
self.unordered_is_selected = true;
}
popped
} else {
let popped = { self.ordered_queue.pop_front() };
if let Some(p) = &popped
&& !p.ending_fragment
if let Some(e) = &popped
&& e.ending_fragment() == Some(false)
{
self.selected = true;
self.unordered_is_selected = false;
Expand All @@ -91,8 +92,8 @@ impl PendingQueue {
}
};

if let Some(p) = &popped {
self.n_bytes -= p.user_data.len();
if let Some(e) = &popped {
self.n_bytes -= e.len();
self.queue_len -= 1;
}

Expand All @@ -111,3 +112,55 @@ impl PendingQueue {
self.len() == 0
}
}


#[derive(Debug)]
pub(crate) struct FlushEntry {
pub(crate) ids: FlushIds,
pub(crate) unordered: bool
}

/// A queue entry can either be a chunk payload, or a flush signal
#[derive(Debug)]
pub(crate) enum QueueEntry {
Payload(ChunkPayloadData),
Flush(FlushEntry)
}

impl QueueEntry {

fn len(&self) -> usize {
match self {
Self::Payload(data) => data.user_data.len(),
Self::Flush(_) => 0
}
}

fn unordered(&self) -> bool {
match self {
Self::Payload(data) => data.unordered,
Self::Flush(flush) => flush.unordered
}
}

fn ending_fragment(&self) -> Option<bool> {
match self {
Self::Payload(data) => Some(data.ending_fragment),
Self::Flush(_) => None
}
}

pub fn as_payload(&self) -> &ChunkPayloadData {
match self {
Self::Payload(data) => data,
Self::Flush(_) => panic!("Expected QueueEntry::Payload, but was QueueEntry::Flush instead")
}
}

pub fn into_payload(self) -> ChunkPayloadData {
match self {
Self::Payload(data) => data,
Self::Flush(_) => panic!("Expected QueueEntry::Payload, but was QueueEntry::Flush instead")
}
}
}
Loading