tokio-stream: add StreamExt::map_while #4351

Merged
merged 5 commits into from Dec 31, 2021

BraulioVM
Contributor

Fixes #4337

Rust 1.57 stabilized the Iterator::map_while API. This PR adds the same functionality to the StreamExt trait, to keep parity.

Motivation

Keep parity with the standard iterator trait. There are many APIs that are available in std::iter::Iterator that aren't in StreamExt, but I guess it wouldn't harm to add this one?

Solution

The change is straightforward and is heavily inspired by the implementations of map and take_while.
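
For reference, a minimal sketch of the std behaviour this PR mirrors. It uses only `Iterator::map_while` from the standard library; the helper name `leading_numbers` is made up for illustration and is not part of this PR.

```rust
/// Collects the leading tokens that parse as integers and stops at the
/// first token that does not. (Hypothetical helper, for illustration only.)
fn leading_numbers(tokens: &[&str]) -> Vec<i32> {
    tokens
        .iter()
        // `parse` fails on a non-numeric token; `.ok()` turns that failure
        // into `None`, which ends the adapted iterator.
        .map_while(|t| t.parse::<i32>().ok())
        .collect()
}
```

Called as `leading_numbers(&["1", "2", "x", "3"])`, the trailing "3" is never mapped because the adapter ends at "x". The stream version added here is intended to behave the same way, with the items arriving asynchronously.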

@Darksonn Darksonn added the A-tokio-stream Area: The tokio-stream crate label Dec 28, 2021
Comment on lines 12 to 15
    #[pin]
    stream: St,
    f: F,
    done: bool
Contributor

This lets the Option optimization make the struct smaller.

Suggested change
-    #[pin]
-    stream: St,
-    f: F,
-    done: bool
+    #[pin]
+    stream: Option<St>,
+    f: F,

Contributor Author

How do you know the option optimization applies here? Wouldn't that depend on the monomorphized St type? Is it common for the optimization to apply to Stream implementations?

Contributor

Yes, it depends on the underlying Stream, but it includes any stream containing pretty much any kind of pointer, and that's very common.
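
A quick way to see the effect, as a sketch using only `std::mem::size_of`. `PointerBacked` is a hypothetical stand-in for a stream that owns a heap buffer; it is not a tokio-stream type. The language only guarantees the equal-size result for a few types such as `Box<T>` and references; for structs that merely contain a pointer it is what rustc does in practice.

```rust
use std::mem::size_of;

/// Returns (size of `T`, size of `Option<T>`) in bytes.
fn sizes<T>() -> (usize, usize) {
    (size_of::<T>(), size_of::<Option<T>>())
}

/// Hypothetical stand-in for a stream that owns a heap buffer.
#[allow(dead_code)]
struct PointerBacked {
    buf: Box<[u8]>,
    pos: usize,
}

/// True when wrapping `T` in `Option` costs no extra space, i.e. `None`
/// can be encoded in a bit pattern that `T` never uses (such as a null pointer).
fn option_is_free<T>() -> bool {
    let (plain, wrapped) = sizes::<T>();
    plain == wrapped
}
```

`option_is_free::<Box<u8>>()` holds because a `Box` is never null, while `option_is_free::<u64>()` does not, since every bit pattern of a `u64` is a valid value. A separate `done: bool` field, by contrast, always needs at least one more byte plus padding.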

Contributor Author

Thanks!

A perhaps interesting aspect of this change is that the St object will be dropped as soon as the f returns None. I believe this is an acceptable aspect of this API, but should that be documented explicitly? The same does not apply to take_while, for example, where the St is not dropped until the TakeWhile is dropped, even if the TakeWhile predicate returns false.
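
To make the drop timing concrete, here is a synchronous sketch that uses a plain iterator in place of a stream. `Tracked` and `EagerMapWhile` are hypothetical names; the adapter only imitates the `Option<St>` layout discussed above and is not the code in this PR.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical source that records when it is dropped.
struct Tracked {
    items: std::vec::IntoIter<i32>,
    dropped: Rc<Cell<bool>>,
}

impl Iterator for Tracked {
    type Item = i32;
    fn next(&mut self) -> Option<i32> {
        self.items.next()
    }
}

impl Drop for Tracked {
    fn drop(&mut self) {
        self.dropped.set(true);
    }
}

/// Iterator analogue of the `Option<St>` layout: the source lives in an
/// `Option` that is cleared once the adapter finishes.
struct EagerMapWhile<I, F> {
    iter: Option<I>,
    f: F,
}

impl<I, T, F> Iterator for EagerMapWhile<I, F>
where
    I: Iterator,
    F: FnMut(I::Item) -> Option<T>,
{
    type Item = T;
    fn next(&mut self) -> Option<T> {
        let out = self.iter.as_mut()?.next().and_then(&mut self.f);
        if out.is_none() {
            // Clearing the slot drops the source right here, not when the
            // adapter itself is dropped.
            self.iter = None;
        }
        out
    }
}

/// Returns whether the source had been dropped (after the first item,
/// after the adapter returned `None`), with the adapter still alive.
fn drop_timing() -> (bool, bool) {
    let dropped = Rc::new(Cell::new(false));
    let source = Tracked {
        items: vec![1, -1, 2].into_iter(),
        dropped: Rc::clone(&dropped),
    };
    let mut adapter = EagerMapWhile {
        iter: Some(source),
        f: |x: i32| if x > 0 { Some(x) } else { None },
    };
    let _ = adapter.next(); // yields 1
    let after_first = dropped.get();
    let _ = adapter.next(); // closure returns None for -1
    let after_none = dropped.get();
    (after_first, after_none)
}
```

With a `done: bool` flag instead of the `Option`, the second flag would stay `false` until the adapter went out of scope, which is the take_while behaviour described above.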

Contributor

I don't think it's necessary to document that it happens.

Comment on lines 48 to 60
if !*self.as_mut().project().done {
    self.as_mut().project().stream.poll_next(cx).map(|ready| {
        let ready = ready.and_then(self.as_mut().project().f);

        if ready.is_none() {
            *self.as_mut().project().done = true;
        }

        ready
    })
} else {
    Poll::Ready(None)
}
Contributor

The code here is quite noisy with all the self.as_mut().project() repetitions. Consider something like this:

Suggested change
-if !*self.as_mut().project().done {
-    self.as_mut().project().stream.poll_next(cx).map(|ready| {
-        let ready = ready.and_then(self.as_mut().project().f);
-        if ready.is_none() {
-            *self.as_mut().project().done = true;
-        }
-        ready
-    })
-} else {
-    Poll::Ready(None)
-}
+let me = self.project();
+match me.stream.as_pin_mut() {
+    Some(stream) => match stream.poll_next(cx).map(|opt| opt.and_then(me.f)) {
+        Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
+        Poll::Ready(None) => {
+            me.stream.set(None);
+            Poll::Ready(None)
+        },
+        Poll::Pending => Poll::Pending,
+    },
+    None => Poll::Ready(None),
+}

Contributor Author

I agree with reducing the noise in the implementation, but I'm not sure this will work. In particular, me.stream.as_pin_mut will consume the Pin<&mut Option<St>>, and then we won't be able to call set on it. I can't find any other way of accessing the Pin<&mut St> we need in order to call poll_next on the underlying stream.

On the other hand, we could leave St as it is and make F optional instead (f: Option<F>). Because pinning is not structural for f, we can empty out the Option<F> whenever we want. I am not sure whether the Option optimization is less likely to apply to F than to St, though. For what it's worth, the skip_while combinator also makes its closure optional rather than the St.

Would it make sense to make the F optional instead of the St?

I'm not super comfortable with pinning so maybe there's something obvious I'm missing.

Contributor

You can do me.stream.as_mut().as_pin_mut().
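
A small std-only sketch of why the extra `as_mut()` helps. `step` and `drain` are hypothetical helpers, and a `Vec<u32>` stands in for the inner stream, so this shows only the borrowing pattern and not the actual `poll_next` body.

```rust
use std::pin::Pin;

/// Takes one item from a pinned optional slot and clears the slot once the
/// inner value is exhausted. (Hypothetical stand-in for a `poll_next` body.)
fn step(mut slot: Pin<&mut Option<Vec<u32>>>) -> Option<u32> {
    // `as_mut()` reborrows `slot`, producing a shorter-lived
    // `Pin<&mut Option<_>>` for `as_pin_mut()` to consume. `slot` itself
    // is not moved, so it can still be used after the match.
    let next = match slot.as_mut().as_pin_mut() {
        Some(mut inner) => inner.pop(),
        None => return None,
    };
    if next.is_none() {
        // Possible only because `slot` was reborrowed above, not consumed.
        slot.set(None);
    }
    next
}

/// Drives `step` to completion; returns the items seen and whether the
/// slot ended up cleared.
fn drain(mut slot: Option<Vec<u32>>) -> (Vec<u32>, bool) {
    let mut seen = Vec::new();
    let mut pinned = Pin::new(&mut slot);
    while let Some(v) = step(pinned.as_mut()) {
        seen.push(v);
    }
    (seen, slot.is_none())
}
```

Calling `slot.as_pin_mut()` directly would move `slot` into the call, and the later `slot.set(None)` would fail to compile, which is the problem raised above.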

Contributor Author

!! Thanks, that's useful.

Contributor

@Darksonn Darksonn left a comment

Thanks.

#[must_use = "streams do nothing unless polled"]
pub struct MapWhile<St, F> {
    #[pin]
    stream: Option<St>,

Member

I don't think this Option is needed. Users who need this behavior can use .fuse() explicitly.
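
The same trade-off can be seen with the std iterator adapter. A sketch, with hypothetical helper names; what the unfused adapter returns on the third call depends on the underlying iterator and on std's current implementation, so the comments stay deliberately vague about it.

```rust
/// Calls `next` three times on an unfused `map_while` adapter.
fn unfused_calls(input: Vec<i32>) -> [Option<i32>; 3] {
    let mut it = input
        .into_iter()
        .map_while(|x| if x > 0 { Some(x) } else { None });
    // After the first `None`, the adapter keeps no "done" state, so a
    // further call may resume pulling from the underlying iterator.
    [it.next(), it.next(), it.next()]
}

/// Same calls, but with `.fuse()` added by the caller.
fn fused_calls(input: Vec<i32>) -> [Option<i32>; 3] {
    let mut it = input
        .into_iter()
        .map_while(|x| if x > 0 { Some(x) } else { None })
        .fuse();
    // `Fuse` remembers the first `None` and keeps returning `None`.
    [it.next(), it.next(), it.next()]
}
```

For an input such as `vec![1, -1, 2]`, the fused variant returns `None` for both the second and third call. Callers who need that guarantee pay for the extra state; everyone else does not.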

See also implementation of Iterator::map_while

Contributor Author

That's a very good point. Will address

Contributor Author

There are a few combinators (skip_while and take_while at least) where we keep an extra piece of state (like the Option here) to remember whether we were done. I guess we don't need those either! They can be fixed in a different PR though

@Darksonn
Contributor

Also, there's a merge conflict preventing this from getting merged.

Fixes tokio-rs#4337

Rust 1.57 stabilized the `Iterator::map_while` API. This PR adds the
same functionality to the `StreamExt` trait, to keep parity.

This way we can leverage the Option optimization, where for non-null
types we don't need any extra space to represent the `None`-ness.
Whether this applies will depend on the specific `St` type, but it's
very common.

`MapWhile` doesn't need to remember whether it's done. As per the contract of the `Stream` trait:

> Once a stream has finished (returned Ready(None) from poll_next), calling its poll_next method again may panic, block forever, or cause other kinds of problems; the Stream trait places no requirements on the effects of such a call.

This allows us to save the space that may be needed by the boolean,
but also lets us simplify the implementation. People can use `fuse`
if they so need.
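
A std-only sketch of the stateless design described in the last commit message. Everything here is an assumption made for illustration: `MiniStream` is a cut-down stand-in for the real `Stream` trait, `FromVec`, `NoopWake`, `collect_ready` and `positive_prefix` are hypothetical helpers, and both fields are required to be `Unpin` so the sketch can avoid pin projection. The merged code may differ in its details.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical, cut-down stand-in for the `Stream` trait.
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Always-ready source backed by a `Vec`.
struct FromVec(std::vec::IntoIter<i32>);

impl MiniStream for FromVec {
    type Item = i32;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
        Poll::Ready(self.0.next())
    }
}

/// No `done` flag and no `Option` wrapper: only the stream and the closure.
struct MapWhile<St, F> {
    stream: St,
    f: F,
}

impl<St, F, T> MiniStream for MapWhile<St, F>
where
    St: MiniStream + Unpin,
    F: FnMut(St::Item) -> Option<T> + Unpin,
{
    type Item = T;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // `get_mut` is allowed because both fields are `Unpin` in this sketch.
        let me = self.get_mut();
        Pin::new(&mut me.stream)
            .poll_next(cx)
            .map(|opt| opt.and_then(&mut me.f))
    }
}

/// Waker that does nothing; enough to drive always-ready streams.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls until the stream reports the end (or is pending) and collects the items.
fn collect_ready<S: MiniStream + Unpin>(mut s: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(item)) = Pin::new(&mut s).poll_next(&mut cx) {
        out.push(item);
    }
    out
}

/// Keeps the leading positive values of `input`.
fn positive_prefix(input: Vec<i32>) -> Vec<i32> {
    let stream = MapWhile {
        stream: FromVec(input.into_iter()),
        f: |x: i32| if x > 0 { Some(x) } else { None },
    };
    collect_ready(stream)
}
```

The whole `poll_next` body reduces to one `poll_next` call on the inner stream followed by `and_then`. Nothing is remembered after the first `Ready(None)`, which is exactly what the quoted `Stream` contract permits.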
Member

@taiki-e taiki-e left a comment

LGTM aside from a nit.

tokio-stream/src/stream_ext.rs (outdated, resolved)
Co-authored-by: Taiki Endo <te316e89@gmail.com>
@taiki-e
Member

taiki-e commented Dec 31, 2021

(CI failure will be fixed in #4361)

@Darksonn
Contributor

You can merge in master to your branch to fix the CI failure.

@taiki-e taiki-e merged commit fb35c83 into tokio-rs:master Dec 31, 2021
Labels
A-tokio-stream Area: The tokio-stream crate
Development

Successfully merging this pull request may close these issues.

Add tokio_stream::StreamExt::map_while
3 participants