Skip to content

7966: impl Stream for std::collections::*#77

Open
martin-augment wants to merge 6 commits intomasterfrom
pr-7966-2026-03-12-06-22-02
Open

7966: impl Stream for std::collections::*#77
martin-augment wants to merge 6 commits intomasterfrom
pr-7966-2026-03-12-06-22-02

Conversation

@martin-augment
Copy link
Copy Markdown
Owner

7966: To review by AI

@gemini-code-assist
Copy link
Copy Markdown

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the tokio-stream crate by extending its StreamExt::collect functionality. It allows for seamless conversion of stream data into a broader range of standard Rust collection types, making stream processing more flexible and convenient for developers working with asynchronous data flows.

Highlights

  • New FromStream Implementations: The FromStream trait has been implemented for several additional std::collections types, including VecDeque, LinkedList, BTreeMap, HashSet, HashMap, and BinaryHeap.
  • Expanded Collection Support: Users can now directly collect items from a Stream into these new collection types using StreamExt::collect().
  • Comprehensive Testing: New asynchronous tests have been added to verify the correct functionality of collect() for each of the newly supported collection types.
Changelog
  • tokio-stream/src/stream_ext/collect.rs
    • Added necessary std::collections and std::hash::Hash imports.
    • Implemented FromStream and sealed::FromStreamPriv for VecDeque<T>.
    • Implemented FromStream and sealed::FromStreamPriv for LinkedList<T>.
    • Implemented FromStream and sealed::FromStreamPriv for BTreeMap<K, V>.
    • Implemented FromStream and sealed::FromStreamPriv for HashSet<T>.
    • Implemented FromStream and sealed::FromStreamPriv for HashMap<K, V>.
    • Implemented FromStream and sealed::FromStreamPriv for BinaryHeap<T>.
  • tokio-stream/tests/stream_collect.rs
    • Updated std::collections imports to include new types.
    • Added collect_vecdeque_items test.
    • Added collect_linkedlist_items test.
    • Added collect_btreemap_items test.
    • Added collect_hashset_items test.
    • Added collect_hashmap_items test.
    • Added collect_binaryheap_items test.
Activity
  • The pull request was opened by martin-augment with the title "7966: impl Stream for std::collections::*".
  • The author has requested review by AI.
  • No human review activity or comments have been recorded yet.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 12, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bc2b9b7c-eab4-45f8-b93a-3a9a0d2b6454

📥 Commits

Reviewing files that changed from the base of the PR and between a0ba4e8 and 236154d.

📒 Files selected for processing (2)
  • tokio-stream/src/stream_ext/collect.rs
  • tokio-stream/tests/stream_collect.rs

Walkthrough

The pull request adds support for collecting streams into additional standard library container types. Implementations of FromStream and corresponding sealed trait methods are added for VecDeque<T>, LinkedList<T>, BTreeMap<K, V>, HashSet<T>, HashMap<K, V>, and BinaryHeap<T>, extending the existing Vec<T> and BTreeSet<T> support. Each container implements initialize, extend, and finalize methods, with map-type containers providing key/value pair handling. Updated imports include the new collection types and Hash trait. Test coverage is expanded with new test cases validating collection functionality for each new type.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch pr-7966-2026-03-12-06-22-02

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request adds FromStream implementations for several standard library collections, allowing them to be used with StreamExt::collect. The new collections supported are VecDeque, LinkedList, BTreeMap, HashSet, HashMap, and BinaryHeap. The implementations correctly use size hints for allocation where possible and are accompanied by a comprehensive set of tests. My feedback focuses on improving the maintainability of the newly added test code by reducing duplication.

Comment on lines +67 to +106
async fn collect_vecdeque_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<VecDeque<i32>>());

assert_pending!(fut.poll());

tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send(2).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(VecDeque::from([1, 2]), coll);
}

#[tokio::test]
async fn collect_linkedlist_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<LinkedList<i32>>());

assert_pending!(fut.poll());

tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send(2).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(LinkedList::from([1, 2]), coll);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These new tests for VecDeque and LinkedList are very similar and contain a lot of boilerplate, a pattern that is repeated for the other collections added in this PR. To improve maintainability and reduce this duplication, consider using a macro to generate the test cases.

This would make the test suite more concise and easier to extend. I've added a similar comment on the other block of new tests with a concrete example.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! There is a lot of duplicated logic that could be extracted into a macro and reused for all collection types. This would prevent fixing/improving the same thing multiple times.

Comment on lines +130 to +211
async fn collect_btreemap_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<BTreeMap<i32, i32>>());

assert_pending!(fut.poll());

tx.send((3, 4)).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send((1, 2)).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(BTreeMap::from([(1, 2), (3, 4)]), coll);
}

#[tokio::test]
async fn collect_hashset_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<HashSet<i32>>());

assert_pending!(fut.poll());

tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send(2).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(HashSet::from([1, 2]), coll);
}

#[tokio::test]
async fn collect_hashmap_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<HashMap<i32, i32>>());

assert_pending!(fut.poll());

tx.send((1, 2)).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send((3, 4)).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(HashMap::from([(1, 2), (3, 4)]), coll);
}

#[tokio::test]
async fn collect_binaryheap_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<BinaryHeap<i32>>());

assert_pending!(fut.poll());

tx.send(2).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());

drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(vec![1, 2], coll.into_sorted_vec());
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

As mentioned for the other new tests, there is significant code duplication here. Using a macro would greatly improve the maintainability of these tests.

Here is an example of how such a macro could look and be used:

macro_rules! test_collect {
    ($name:ident, $ty:ty, $items:expr, $expected:expr) => {
        #[tokio::test]
        async fn $name() {
            let (tx, rx) = mpsc::unbounded_channel_stream();
            let mut fut = task::spawn(rx.collect::<$ty>());

            assert_pending!(fut.poll());

            for item in $items {
                tx.send(item).unwrap();
                assert!(fut.is_woken());
                assert_pending!(fut.poll());
            }

            drop(tx);
            assert!(fut.is_woken());
            let coll = assert_ready!(fut.poll());
            assert_eq!($expected, coll);
        }
    };
}

// Usage:
test_collect!(
    collect_btreemap_items,
    BTreeMap<i32, i32>,
    vec![(3, 4), (1, 2)],
    BTreeMap::from([(1, 2), (3, 4)])
);

Tests with special assertion logic, like for BinaryHeap, might need to remain as separate functions.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! There is a lot of duplicated logic that could be extracted into a macro and reused for all collection types. This would prevent fixing/improving the same thing multiple times.

@augmentcode
Copy link
Copy Markdown

augmentcode bot commented Mar 12, 2026

🤖 Augment PR Summary

Summary: This PR expands StreamExt::collect() support to more standard library collection types.

Changes:

  • Added FromStream / FromStreamPriv implementations for VecDeque, LinkedList, BTreeMap, HashSet, HashMap, and BinaryHeap.
  • Implemented per-collection initialization and extension logic (e.g., with_capacity(lower) where applicable, push_back for VecDeque, insert for maps/sets, push for BinaryHeap).
  • Updated test coverage to validate collecting into each newly supported collection type using the existing mpsc stream harness.

Technical Notes: Map/set implementations use insert semantics (matching typical iterator collection behavior), and capacity preallocation is based on the stream’s size_hint() lower bound where the collection supports it.

🤖 Was this summary useful? React with 👍 or 👎

Copy link
Copy Markdown

@augmentcode augmentcode bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review completed. No suggestions at this time.

Comment augment review to trigger a new review at any time.

@claude
Copy link
Copy Markdown

claude bot commented Mar 12, 2026

Code Review

Overall this is a clean, well-structured PR that extends FromStream to the remaining standard library collection types. The implementations are correct, consistent with existing patterns, and well-tested.

PR Title Misnomer: The title says impl Stream for std::collections but the trait being implemented is FromStream, not Stream. Stream is a producer, FromStream is a consumer/collector. Consider updating the PR title accordingly.

Missing CHANGELOG Entry: tokio-stream/Cargo.toml explicitly notes Update CHANGELOG.md as a pre-release step. This PR adds six new FromStream impls and should include a CHANGELOG entry.

Undocumented Duplicate Key Behavior: For HashMap and BTreeMap, inserting a duplicate key silently overwrites the previous value (last write wins). A test covering the duplicate-key case would make the behavior clear to future readers.

Missing Empty-Stream Tests (non-blocking): The existing suite has empty_vec, empty_box_slice, empty_string, and empty_result. A few empty-stream tests for the new types would align with the existing test style.

What Looks Good: All six implementations correctly follow the sealed FromStreamPriv pattern. with_capacity(lower) is used where available and omitted where not. The BinaryHeap test uses into_sorted_vec() for deterministic assertion. The BTreeMap test sends keys out-of-order and asserts the sorted result. std::hash::Hash is correctly added for HashSet and HashMap impls. Tests for all six new types follow the exact polling/wakeup pattern used by existing tests.

Summary: The implementation is correct and well-structured. The main items to address are the PR title and the missing CHANGELOG entry. Documenting or testing the duplicate-key behavior for map types would also be a worthwhile improvement.

Repository owner deleted a comment from claude bot Mar 12, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants