Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tokio-stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ pub use stream_ext::{collect::FromStream, StreamExt};
/// Adapters for [`Stream`]s created by methods in [`StreamExt`].
pub mod adapters {
pub use crate::stream_ext::{
Chain, Filter, FilterMap, Fuse, Map, MapWhile, Merge, Peekable, Skip, SkipWhile, Take,
TakeWhile, Then,
Chain, Filter, FilterMap, FilterMapAsync, Fuse, Map, MapWhile, Merge, Peekable, Skip,
SkipWhile, Take, TakeWhile, Then,
};
cfg_time! {
pub use crate::stream_ext::{ChunksTimeout, Timeout, TimeoutRepeating};
Expand Down
50 changes: 50 additions & 0 deletions tokio-stream/src/stream_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use collect::{Collect, FromStream};
mod filter;
pub use filter::Filter;

mod filter_map_async;
pub use filter_map_async::FilterMapAsync;

mod filter_map;
pub use filter_map::FilterMap;

Expand Down Expand Up @@ -479,6 +482,53 @@ pub trait StreamExt: Stream {
FilterMap::new(self, f)
}

/// Filters the values produced by this stream asynchronously while
/// simultaneously mapping them to a different type according to the
/// provided async closure.
///
/// The provided closure is executed over all elements of this stream as
/// they are made available, and the returned future is executed. Only one
/// future is executed at the time. If the returned future resolves to
/// [`Some(item)`](Some) then the stream will yield the value `item`, but if
/// it resolves to [`None`], then the value will be skipped.
///
/// Note that this function consumes the stream passed into it and returns a
/// wrapped version of it, similar to [`Iterator::filter_map`] method in the
/// standard library.
///
/// Be aware that if the future is not `Unpin`, then neither is the `Stream`
/// returned by this method. To handle this, you can use `tokio::pin!` as in
/// the example below or put the stream in a `Box` with `Box::pin(stream)`.
///
/// # Examples
/// ```
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// use tokio_stream::{self as stream, StreamExt};
///
/// let stream = stream::iter(0..=7);
/// let odds = stream.filter_map_async(async |x| {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The example uses async |x| { ... } syntax, which won’t compile on Tokio’s MSRV (Rust 1.71). This can break rustdoc/doctests for filter_map_async’s docs.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:valid-but-wont-fix; category:bug; feedback: The Augment AI reviewer is correct but since this is a doctest it is executed only with the nightly toolchain, so the build passes! Most of the Tokio users will use a recent toolchain so the example will work as is. The users who has to use older toolchain than 1.79 could easily adapt it

/// if x % 2 == 0 { Some(x + 1) } else { None }
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc example uses async closure syntax incompatible with MSRV

Medium Severity

The doc example uses async |x| { ... } (async closure syntax), which was stabilized in Rust 1.85. The crate declares rust-version = "1.71" in its Cargo.toml. This is the only use of async closure syntax in the entire tokio-stream crate. The analogous then method's doc example uses a named async fn instead. The idiomatic pre-1.85 pattern is |x| async move { ... }.

Fix in Cursor Fix in Web

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:valid-but-wont-fix; category:bug; feedback: The Bugbot AI reviewer is correct but since this is a doctest it is executed only with the nightly toolchain, so the build passes! Most of the Tokio users will use a recent toolchain so the example will work as is. The users who has to use older toolchain than 1.79 could easily adapt it

/// });
Comment on lines +510 to +512
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The async |x| { ... } syntax for async closures was stabilized in Rust 1.79. Given that tokio-stream has a Minimum Supported Rust Version (MSRV) of 1.71 (as per Cargo.toml), this example will fail to compile for users on the MSRV. Please use the |x| async move { ... } syntax which is compatible with older Rust versions.

Suggested change
/// let odds = stream.filter_map_async(async |x| {
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });
/// let odds = stream.filter_map_async(|x| async move {
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:valid-but-wont-fix; category:bug; feedback: The Gemini AI reviewer is correct but since this is a doctest it is executed only with the nightly toolchain, so the build passes! Most of the Tokio users will use a recent toolchain so the example will work as is. The users who has to use older toolchain than 1.79 could easily adapt it

Comment on lines +510 to +512
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== rust-version declarations =="
fd '^Cargo.toml$' -a | while read -r f; do
  if rg -n '^\s*rust-version\s*=' "$f" >/dev/null; then
    echo "-- $f"
    rg -n '^\s*(name|version|rust-version)\s*=' "$f"
  fi
done

echo
echo "== async-closure syntax occurrences in tokio-stream =="
rg -nP '\basync\s*\|' tokio-stream/src

Repository: martin-augment/tokio

Length of output: 835


🌐 Web query:

Rust async closure syntax stabilization version

💡 Result:

Rust’s async closure syntax (e.g., async |args| { ... } / async move |args| { ... }) was stabilized in Rust 1.85.0, released February 20, 2025. [1], [2]

Before 1.85, it was only available on nightly behind #![feature(async_closure)] (tracking issue: rust-lang/rust #62290). [3]

Sources:
[1] Rust Blog: “Announcing Rust 1.85.0 and Rust 2024” (2025-02-20)
[2] rust-lang/rust RELEASES.md (1.85.0): “Stabilize async closures”
[3] rust-lang/rust GitHub: Tracking issue #62290 (feature(async_closure))


Use MSRV-compatible closure syntax in the doctest example.

The async |x| syntax at line 510 is incompatible with tokio-stream's MSRV of 1.71. This syntax was only stabilized in Rust 1.85.0; it is not available in the supported range. Use |x| async move { ... } instead, which is supported across all MSRV-compatible versions.

Suggested fix
-/// let odds = stream.filter_map_async(async |x| {
+/// let odds = stream.filter_map_async(|x| async move {
 ///     if x % 2 == 0 { Some(x + 1) } else { None }
 /// });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// let odds = stream.filter_map_async(async |x| {
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });
/// let odds = stream.filter_map_async(|x| async move {
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tokio-stream/src/stream_ext.rs` around lines 510 - 512, The doctest snippet
for filter_map_async uses the unstable `async |x|` closure syntax; update the
example to MSRV-compatible syntax by replacing the `async |x| { ... }` closure
with `|x| async move { ... }` so code in the doctest (the example under
stream.filter_map_async) compiles on Rust 1.71–1.85; ensure you keep the same
body logic and include `move` to avoid borrow issues.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:valid-but-wont-fix; category:bug; feedback: The CodeRabbit AI reviewer is correct but since this is a doctest it is executed only with the nightly toolchain, so the build passes! Most of the Tokio users will use a recent toolchain so the example will work as is. The users who has to use older toolchain than 1.79 could easily adapt it

///
/// tokio::pin!(odds);
///
/// assert_eq!(Some(1), odds.next().await);
/// assert_eq!(Some(3), odds.next().await);
/// assert_eq!(Some(5), odds.next().await);
/// assert_eq!(Some(7), odds.next().await);
/// assert_eq!(None, odds.next().await);
/// # }
/// ```
fn filter_map_async<T, F, Fut>(self, f: F) -> FilterMapAsync<Self, Fut, F>
where
F: FnMut(Self::Item) -> Fut,
Fut: Future<Output = Option<T>>,
Self: Sized,
{
FilterMapAsync::new(self, f)
}

/// Creates a stream which ends after the first `None`.
///
/// After a stream returns `None`, behavior is undefined. Future calls to
Expand Down
75 changes: 75 additions & 0 deletions tokio-stream/src/stream_ext/filter_map_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use crate::Stream;

use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::task::{ready, Context, Poll};
use pin_project_lite::pin_project;

pin_project! {
/// Stream for the [`filter_map_async`](super::StreamExt::filter_map_async) method.
#[must_use = "streams do nothing unless polled"]
pub struct FilterMapAsync<St, Fut, F> {
#[pin]
stream: St,
#[pin]
future: Option<Fut>,
f: F,
}
}

impl<St, Fut, F> fmt::Debug for FilterMapAsync<St, Fut, F>
where
St: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FilterMapAsync")
.field("stream", &self.stream)
.finish()
}
}

impl<St, Fut, F> FilterMapAsync<St, Fut, F> {
pub(super) fn new(stream: St, f: F) -> Self {
FilterMapAsync {
stream,
future: None,
f,
}
}
}

impl<T, St, F, Fut> Stream for FilterMapAsync<St, Fut, F>
where
St: Stream,
Fut: Future<Output = Option<T>>,
F: FnMut(St::Item) -> Fut,
{
type Item = T;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let mut me = self.project();

loop {
if let Some(future) = me.future.as_mut().as_pin_mut() {
if let Some(item) = ready!(future.poll(cx)) {
me.future.set(None);
return Poll::Ready(Some(item));
} else {
me.future.set(None);
}
Comment on lines +55 to +60
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The call to me.future.set(None) is duplicated in both branches of the if-else statement. This can be simplified by hoisting the call out to improve readability and reduce redundancy.

                let result = ready!(future.poll(cx));
                me.future.set(None);
                if let Some(item) = result {
                    return Poll::Ready(Some(item));
                }

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The same statement is executed in both 'if' and 'else' bodies, so it could be extracted before the if. It won't fix any bug but it will be easier to read and maintain

}

match ready!(me.stream.as_mut().poll_next(cx)) {
Some(item) => {
me.future.set(Some((me.f)(item)));
}
None => return Poll::Ready(None),
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(0, self.stream.size_hint().1)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

size_hint()’s upper bound ignores whether self.future is currently Some, so it can under-report (e.g., stream upper is Some(0) but an in-flight future may still yield one item). That violates the size_hint upper-bound contract for streams.

Severity: medium

Fix This in Augment

🤖 Was this useful? React with 👍 or 👎, or 🚀 if it prevented an incident/outage.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback: The Augment AI reviewer is correct! The logic should take into account also the current future as the others combinators do. Since this is hint it does not need to be exact but it would be nice to use a more correct value if it is not too complex to calculate it.

}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

size_hint upper bound ignores in-flight future

Low Severity

The size_hint upper bound doesn't account for an in-flight future. When an item has been consumed from the inner stream and a future is pending, self.stream.size_hint().1 no longer counts that item, but FilterMapAsync could still yield it if the future resolves to Some. This makes the reported upper bound potentially one less than the true maximum, violating the size_hint contract. The analogous Then adapter correctly adds future_len to the bounds.

Fix in Cursor Fix in Web

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value:useful; category:bug; feedback: The Bugbot AI reviewer is correct! The logic should take into account also the current future as the others combinators do. Since this is hint it does not need to be exact but it would be nice to use a more correct value if it is not too complex to calculate it.

}
Loading