7971: stream: add StreamExt::filter_map_async#78
7971: stream: add StreamExt::filter_map_async#78martin-augment wants to merge 1 commit intomasterfrom
StreamExt::filter_map_async#78Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
WalkthroughThis pull request adds a new asynchronous filter-map stream combinator to tokio-stream. The change introduces the FilterMapAsync type and a corresponding filter_map_async method to the StreamExt trait. The implementation includes a new module containing the FilterMapAsync struct, which applies an async function to each stream item, maintaining a single in-flight future at a time and emitting Some values while skipping None results. The new combinator is re-exported in the adapters module, making it part of the public API. ✨ Finishing Touches
🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces StreamExt::filter_map_async, a useful addition for asynchronous stream processing. The implementation is mostly solid. I've pointed out a couple of issues: one is a high-severity problem in the documentation example which uses syntax not available in the crate's MSRV, and the other is a medium-severity suggestion to improve code clarity in the implementation of poll_next. Once these are addressed, this will be a great contribution.
| /// let odds = stream.filter_map_async(async |x| { | ||
| /// if x % 2 == 0 { Some(x + 1) } else { None } | ||
| /// }); |
There was a problem hiding this comment.
The async |x| { ... } syntax for async closures was stabilized in Rust 1.79. Given that tokio-stream has a Minimum Supported Rust Version (MSRV) of 1.71 (as per Cargo.toml), this example will fail to compile for users on the MSRV. Please use the |x| async move { ... } syntax which is compatible with older Rust versions.
| /// let odds = stream.filter_map_async(async |x| { | |
| /// if x % 2 == 0 { Some(x + 1) } else { None } | |
| /// }); | |
| /// let odds = stream.filter_map_async(|x| async move { | |
| /// if x % 2 == 0 { Some(x + 1) } else { None } | |
| /// }); |
There was a problem hiding this comment.
value:valid-but-wont-fix; category:bug; feedback: The Gemini AI reviewer is correct but since this is a doctest it is executed only with the nightly toolchain, so the build passes! Most of the Tokio users will use a recent toolchain so the example will work as is. The users who has to use older toolchain than 1.79 could easily adapt it
| if let Some(item) = ready!(future.poll(cx)) { | ||
| me.future.set(None); | ||
| return Poll::Ready(Some(item)); | ||
| } else { | ||
| me.future.set(None); | ||
| } |
There was a problem hiding this comment.
The call to me.future.set(None) is duplicated in both branches of the if-else statement. This can be simplified by hoisting the call out to improve readability and reduce redundancy.
let result = ready!(future.poll(cx));
me.future.set(None);
if let Some(item) = result {
return Poll::Ready(Some(item));
}There was a problem hiding this comment.
value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The same statement is executed in both 'if' and 'else' bodies, so it could be extracted before the if. It won't fix any bug but it will be easier to read and maintain
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
|
|
||
| fn size_hint(&self) -> (usize, Option<usize>) { | ||
| (0, self.stream.size_hint().1) | ||
| } |
There was a problem hiding this comment.
size_hint upper bound ignores in-flight future
Low Severity
The size_hint upper bound doesn't account for an in-flight future. When an item has been consumed from the inner stream and a future is pending, self.stream.size_hint().1 no longer counts that item, but FilterMapAsync could still yield it if the future resolves to Some. This makes the reported upper bound potentially one less than the true maximum, violating the size_hint contract. The analogous Then adapter correctly adds future_len to the bounds.
There was a problem hiding this comment.
value:useful; category:bug; feedback: The Bugbot AI reviewer is correct! The logic should take into account also the current future as the others combinators do. Since this is hint it does not need to be exact but it would be nice to use a more correct value if it is not too complex to calculate it.
| /// | ||
| /// let stream = stream::iter(0..=7); | ||
| /// let odds = stream.filter_map_async(async |x| { | ||
| /// if x % 2 == 0 { Some(x + 1) } else { None } |
There was a problem hiding this comment.
Doc example uses async closure syntax incompatible with MSRV
Medium Severity
The doc example uses async |x| { ... } (async closure syntax), which was stabilized in Rust 1.85. The crate declares rust-version = "1.71" in its Cargo.toml. This is the only use of async closure syntax in the entire tokio-stream crate. The analogous then method's doc example uses a named async fn instead. The idiomatic pre-1.85 pattern is |x| async move { ... }.
There was a problem hiding this comment.
value:valid-but-wont-fix; category:bug; feedback: The Bugbot AI reviewer is correct but since this is a doctest it is executed only with the nightly toolchain, so the build passes! Most of the Tokio users will use a recent toolchain so the example will work as is. The users who has to use older toolchain than 1.79 could easily adapt it
🤖 Augment PR SummarySummary: Adds a new Changes:
Technical Notes: The adapter runs at most one future at a time and may be 🤖 Was this summary useful? React with 👍 or 👎 |
| /// use tokio_stream::{self as stream, StreamExt}; | ||
| /// | ||
| /// let stream = stream::iter(0..=7); | ||
| /// let odds = stream.filter_map_async(async |x| { |
There was a problem hiding this comment.
value:valid-but-wont-fix; category:bug; feedback: The Augment AI reviewer is correct but since this is a doctest it is executed only with the nightly toolchain, so the build passes! Most of the Tokio users will use a recent toolchain so the example will work as is. The users who has to use older toolchain than 1.79 could easily adapt it
| } | ||
|
|
||
| fn size_hint(&self) -> (usize, Option<usize>) { | ||
| (0, self.stream.size_hint().1) |
There was a problem hiding this comment.
size_hint()’s upper bound ignores whether self.future is currently Some, so it can under-report (e.g., stream upper is Some(0) but an in-flight future may still yield one item). That violates the size_hint upper-bound contract for streams.
Severity: medium
🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.
There was a problem hiding this comment.
value:useful; category:bug; feedback: The Augment AI reviewer is correct! The logic should take into account also the current future as the others combinators do. Since this is hint it does not need to be exact but it would be nice to use a more correct value if it is not too complex to calculate it.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tokio-stream/src/stream_ext.rs`:
- Around line 510-512: The doctest snippet for filter_map_async uses the
unstable `async |x|` closure syntax; update the example to MSRV-compatible
syntax by replacing the `async |x| { ... }` closure with `|x| async move { ...
}` so code in the doctest (the example under stream.filter_map_async) compiles
on Rust 1.71–1.85; ensure you keep the same body logic and include `move` to
avoid borrow issues.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5467696f-7f8c-4561-a145-2d3db34d73a4
📒 Files selected for processing (3)
tokio-stream/src/lib.rstokio-stream/src/stream_ext.rstokio-stream/src/stream_ext/filter_map_async.rs
| /// let odds = stream.filter_map_async(async |x| { | ||
| /// if x % 2 == 0 { Some(x + 1) } else { None } | ||
| /// }); |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== rust-version declarations =="
fd '^Cargo.toml$' -a | while read -r f; do
if rg -n '^\s*rust-version\s*=' "$f" >/dev/null; then
echo "-- $f"
rg -n '^\s*(name|version|rust-version)\s*=' "$f"
fi
done
echo
echo "== async-closure syntax occurrences in tokio-stream =="
rg -nP '\basync\s*\|' tokio-stream/srcRepository: martin-augment/tokio
Length of output: 835
🌐 Web query:
Rust async closure syntax stabilization version
💡 Result:
Rust’s async closure syntax (e.g., async |args| { ... } / async move |args| { ... }) was stabilized in Rust 1.85.0, released February 20, 2025. [1], [2]
Before 1.85, it was only available on nightly behind #![feature(async_closure)] (tracking issue: rust-lang/rust #62290). [3]
Sources:
[1] Rust Blog: “Announcing Rust 1.85.0 and Rust 2024” (2025-02-20)
[2] rust-lang/rust RELEASES.md (1.85.0): “Stabilize async closures”
[3] rust-lang/rust GitHub: Tracking issue #62290 (feature(async_closure))
Use MSRV-compatible closure syntax in the doctest example.
The async |x| syntax at line 510 is incompatible with tokio-stream's MSRV of 1.71. This syntax was only stabilized in Rust 1.85.0; it is not available in the supported range. Use |x| async move { ... } instead, which is supported across all MSRV-compatible versions.
Suggested fix
-/// let odds = stream.filter_map_async(async |x| {
+/// let odds = stream.filter_map_async(|x| async move {
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// let odds = stream.filter_map_async(async |x| { | |
| /// if x % 2 == 0 { Some(x + 1) } else { None } | |
| /// }); | |
| /// let odds = stream.filter_map_async(|x| async move { | |
| /// if x % 2 == 0 { Some(x + 1) } else { None } | |
| /// }); |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tokio-stream/src/stream_ext.rs` around lines 510 - 512, The doctest snippet
for filter_map_async uses the unstable `async |x|` closure syntax; update the
example to MSRV-compatible syntax by replacing the `async |x| { ... }` closure
with `|x| async move { ... }` so code in the doctest (the example under
stream.filter_map_async) compiles on Rust 1.71–1.85; ensure you keep the same
body logic and include `move` to avoid borrow issues.
There was a problem hiding this comment.
value:valid-but-wont-fix; category:bug; feedback: The CodeRabbit AI reviewer is correct but since this is a doctest it is executed only with the nightly toolchain, so the build passes! Most of the Tokio users will use a recent toolchain so the example will work as is. The users who has to use older toolchain than 1.79 could easily adapt it


7971: To review by AI