Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions tokio-stream/src/stream_ext/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use core::mem;
use core::pin::Pin;
use core::task::{ready, Context, Poll};
use pin_project_lite::pin_project;
use std::collections::BTreeSet;

// Do not export this struct until `FromStream` can be unsealed.
pin_project! {
Expand Down Expand Up @@ -135,6 +136,25 @@ impl<T> sealed::FromStreamPriv<T> for Vec<T> {
}
}

impl<T: Ord> FromStream<T> for BTreeSet<T> {}

impl<T: Ord> sealed::FromStreamPriv<T> for BTreeSet<T> {
type InternalCollection = BTreeSet<T>;

fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BTreeSet<T> {
BTreeSet::new()
}

fn extend(_: sealed::Internal, collection: &mut BTreeSet<T>, item: T) -> bool {
collection.insert(item);
true
}

fn finalize(_: sealed::Internal, collection: &mut BTreeSet<T>) -> BTreeSet<T> {
mem::take(collection)
}
}

impl<T> FromStream<T> for Box<[T]> {}

impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
Expand Down
23 changes: 23 additions & 0 deletions tokio-stream/tests/stream_collect.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::BTreeSet;

use tokio_stream::{self as stream, StreamExt};
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};

Expand Down Expand Up @@ -61,6 +63,27 @@ async fn collect_vec_items() {
assert_eq!(vec![1, 2], coll);
}

#[tokio::test]
async fn collect_btreeset_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<BTreeSet<i32>>());

assert_pending!(fut.poll());

tx.send(2).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
Comment on lines +77 to +79
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This test correctly verifies ordering. To make it more comprehensive, consider also verifying that BTreeSet's uniqueness property is handled correctly by sending a duplicate item.

Suggested change
tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
// Send a duplicate item.
tx.send(2).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! Currently the test verifies that the items in the BTreeSet are ordered (2 is sent first but it is printed after 1 in the final result). It would be good to extend the test with sending a duplicate (either 1 or 2) and verifying that there is only one occurrence of it in the result.


drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(BTreeSet::from([1, 2]), coll);
}

#[tokio::test]
async fn collect_string_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
Expand Down