Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 116 additions & 1 deletion tokio-stream/src/stream_ext/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use core::mem;
use core::pin::Pin;
use core::task::{ready, Context, Poll};
use pin_project_lite::pin_project;
use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque};
use std::hash::Hash;

// Do not export this struct until `FromStream` can be unsealed.
pin_project! {
Expand Down Expand Up @@ -136,6 +137,44 @@ impl<T> sealed::FromStreamPriv<T> for Vec<T> {
}
}

impl<T> FromStream<T> for VecDeque<T> {}

impl<T> sealed::FromStreamPriv<T> for VecDeque<T> {
type InternalCollection = VecDeque<T>;

fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> VecDeque<T> {
VecDeque::with_capacity(lower)
}

fn extend(_: sealed::Internal, collection: &mut VecDeque<T>, item: T) -> bool {
collection.push_back(item);
true
}

fn finalize(_: sealed::Internal, collection: &mut VecDeque<T>) -> VecDeque<T> {
mem::take(collection)
}
}

impl<T> FromStream<T> for LinkedList<T> {}

impl<T> sealed::FromStreamPriv<T> for LinkedList<T> {
type InternalCollection = LinkedList<T>;

fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> LinkedList<T> {
LinkedList::new()
}

fn extend(_: sealed::Internal, collection: &mut LinkedList<T>, item: T) -> bool {
collection.push_back(item);
true
}

fn finalize(_: sealed::Internal, collection: &mut LinkedList<T>) -> LinkedList<T> {
mem::take(collection)
}
}

impl<T: Ord> FromStream<T> for BTreeSet<T> {}

impl<T: Ord> sealed::FromStreamPriv<T> for BTreeSet<T> {
Expand All @@ -155,6 +194,82 @@ impl<T: Ord> sealed::FromStreamPriv<T> for BTreeSet<T> {
}
}

impl<K: Ord, V> FromStream<(K, V)> for BTreeMap<K, V> {}

impl<K: Ord, V> sealed::FromStreamPriv<(K, V)> for BTreeMap<K, V> {
type InternalCollection = BTreeMap<K, V>;

fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> BTreeMap<K, V> {
BTreeMap::new()
}

fn extend(_: sealed::Internal, collection: &mut BTreeMap<K, V>, (key, value): (K, V)) -> bool {
collection.insert(key, value);
true
}

fn finalize(_: sealed::Internal, collection: &mut BTreeMap<K, V>) -> BTreeMap<K, V> {
mem::take(collection)
}
}

impl<T: Eq + Hash> FromStream<T> for HashSet<T> {}

impl<T: Eq + Hash> sealed::FromStreamPriv<T> for HashSet<T> {
type InternalCollection = HashSet<T>;

fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> HashSet<T> {
HashSet::with_capacity(lower)
}

fn extend(_: sealed::Internal, collection: &mut HashSet<T>, item: T) -> bool {
collection.insert(item);
true
}

fn finalize(_: sealed::Internal, collection: &mut HashSet<T>) -> HashSet<T> {
mem::take(collection)
}
}

impl<K: Eq + Hash, V> FromStream<(K, V)> for HashMap<K, V> {}

impl<K: Eq + Hash, V> sealed::FromStreamPriv<(K, V)> for HashMap<K, V> {
type InternalCollection = HashMap<K, V>;

fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> HashMap<K, V> {
HashMap::with_capacity(lower)
}

fn extend(_: sealed::Internal, collection: &mut HashMap<K, V>, (key, value): (K, V)) -> bool {
collection.insert(key, value);
true
}

fn finalize(_: sealed::Internal, collection: &mut HashMap<K, V>) -> HashMap<K, V> {
mem::take(collection)
}
}

impl<T: Ord> FromStream<T> for BinaryHeap<T> {}

impl<T: Ord> sealed::FromStreamPriv<T> for BinaryHeap<T> {
type InternalCollection = BinaryHeap<T>;

fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> BinaryHeap<T> {
BinaryHeap::with_capacity(lower)
}

fn extend(_: sealed::Internal, collection: &mut BinaryHeap<T>, item: T) -> bool {
collection.push(item);
true
}

fn finalize(_: sealed::Internal, collection: &mut BinaryHeap<T>) -> BinaryHeap<T> {
mem::take(collection)
}
}

impl<T> FromStream<T> for Box<[T]> {}

impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
Expand Down
128 changes: 127 additions & 1 deletion tokio-stream/tests/stream_collect.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque};

use tokio_stream::{self as stream, StreamExt};
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
Expand Down Expand Up @@ -63,6 +63,48 @@ async fn collect_vec_items() {
assert_eq!(vec![1, 2], coll);
}

#[tokio::test]
async fn collect_vecdeque_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<VecDeque<i32>>());

assert_pending!(fut.poll());

tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send(2).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(VecDeque::from([1, 2]), coll);
}

#[tokio::test]
async fn collect_linkedlist_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<LinkedList<i32>>());

assert_pending!(fut.poll());

tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send(2).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(LinkedList::from([1, 2]), coll);
}
Comment on lines +67 to +106
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These new tests for VecDeque and LinkedList are very similar and contain a lot of boilerplate, a pattern that is repeated for the other collections added in this PR. To improve maintainability and reduce this duplication, consider using a macro to generate the test cases.

This would make the test suite more concise and easier to extend. I've added a similar comment on the other block of new tests with a concrete example.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! There is a lot of duplicated logic that could be extracted into a macro and reused for all collection types. This would prevent fixing/improving the same thing multiple times.


#[tokio::test]
async fn collect_btreeset_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
Expand All @@ -84,6 +126,90 @@ async fn collect_btreeset_items() {
assert_eq!(BTreeSet::from([1, 2]), coll);
}

#[tokio::test]
async fn collect_btreemap_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<BTreeMap<i32, i32>>());

assert_pending!(fut.poll());

tx.send((3, 4)).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send((1, 2)).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(BTreeMap::from([(1, 2), (3, 4)]), coll);
}

#[tokio::test]
async fn collect_hashset_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<HashSet<i32>>());

assert_pending!(fut.poll());

tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send(2).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(HashSet::from([1, 2]), coll);
}

#[tokio::test]
async fn collect_hashmap_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<HashMap<i32, i32>>());

assert_pending!(fut.poll());

tx.send((1, 2)).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send((3, 4)).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(HashMap::from([(1, 2), (3, 4)]), coll);
}

#[tokio::test]
async fn collect_binaryheap_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<BinaryHeap<i32>>());

assert_pending!(fut.poll());

tx.send(2).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(vec![1, 2], coll.into_sorted_vec());
}
Comment on lines +130 to +211
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

As mentioned for the other new tests, there is significant code duplication here. Using a macro would greatly improve the maintainability of these tests.

Here is an example of how such a macro could look and be used:

macro_rules! test_collect {
    ($name:ident, $ty:ty, $items:expr, $expected:expr) => {
        #[tokio::test]
        async fn $name() {
            let (tx, rx) = mpsc::unbounded_channel_stream();
            let mut fut = task::spawn(rx.collect::<$ty>());

            assert_pending!(fut.poll());

            for item in $items {
                tx.send(item).unwrap();
                assert!(fut.is_woken());
                assert_pending!(fut.poll());
            }

            drop(tx);
            assert!(fut.is_woken());
            let coll = assert_ready!(fut.poll());
            assert_eq!($expected, coll);
        }
    };
}

// Usage:
test_collect!(
    collect_btreemap_items,
    BTreeMap<i32, i32>,
    vec![(3, 4), (1, 2)],
    BTreeMap::from([(1, 2), (3, 4)])
);

Tests with special assertion logic, like for BinaryHeap, might need to remain as separate functions.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! There is a lot of duplicated logic that could be extracted into a macro and reused for all collection types. This would prevent fixing/improving the same thing multiple times.


#[tokio::test]
async fn collect_string_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
Expand Down
Loading