Skip to content

Commit

Permalink
Rollup merge of #81797 - yoshuawuyts:stream_from_iter, r=dtolnay
Browse files Browse the repository at this point in the history
Add `core::stream::from_iter`

_Tracking issue: https://github.com/rust-lang/rust/issues/81798_

This_ PR implements `std::stream::from_iter`, as outlined in the _"Converting an Iterator to a Stream"_ section of the [Stream RFC](https://github.com/nellshamrell/rfcs/blob/add-async-stream-rfc/text/0000-async-stream.md#converting-an-iterator-to-a-stream). This function enables converting an `Iterator` to a `Stream` by wrapping each item in the iterator with a `Poll::Ready` instance.

r? `@tmandry`

cc/ `@rust-lang/libs` `@rust-lang/wg-async-foundations`

## Example

Being able to convert from an iterator into a stream is useful when refactoring from iterative loops into a more functional adapter-based style. This is fairly common when using more complex `filter` / `map` / `find` chains. In its basic form this conversion looks like this:

**before**
```rust
let mut output = vec![];
for item in my_vec {
    let out = do_io(item).await?;
    output.push(out);
}
```
**after**
```rust
use std::stream;

let output = stream::from_iter(my_vec.iter())
    .map(async |item| do_io(item).await)
    .collect()?;
```

Having a way to convert an `Iterator` to a `Stream` is essential in enabling this flow.

## Implementation Notes

This PR makes use of `unsafe {}` to pin an item. Currently we're having conversations on the libs stream in Zulip how to bring `pin-project` in as a dependency to `core` so we can omit the `unsafe {}`.

This PR also includes a documentation block which references `Stream::next` which currently doesn't exist in the stdlib (originally included in the RFC and PR, but later omitted because of an unresolved issue). `stream::from_iter` can't stabilize before `Stream` does, and there's still a chance we may stabilize `Stream` with a `next` method. So this PR includes documentation referencing that method, which we can remove as part of stabilization if by any chance we don't have `Stream::next`.

## Alternatives Considered

### `impl IntoStream for T: IntoIterator`

An obvious question would be whether we could make it so every iterator can automatically be converted into a stream by calling `into_stream` on it. The answer is: "perhaps, but it could cause type issues". Types like `std::collections` may want to opt to create manual implementations for `IntoStream` and `IntoIter`, which wouldn't be possible if it was implemented through a catch-all trait.

Possibly an alternative such as `impl IntoStream for T: Iterator` could work, but it feels somewhat restrictive. In the end, converting an iterator to a stream is likely to be a bit of a niche case. And even then, **adding a standalone function to convert an `Iterator` into a `Stream` would not be mutually exclusive with a blanket implementation**.

### Naming

The exact name can be debated in the period before stabilization. But I've chosen `stream::from_iter` rather than `stream::iter` because we are _creating a stream from an iterator_ rather than _iterating a stream_. We also expect to add a stream counterpart to `iter::from_fn` later on (blocked on async closures), and having `stream::from_fn` and `stream::from_iter` would feel like a consistent pair. It also has prior art in `async_std::stream::from_iter`.

## Future Directions
### Stream conversions for collections

This is a building block towards implementing `stream/stream_mut/into_stream` methods for `std::collections`, `std::vec`, and more. This would allow even quicker refactorings from using loops to using iterator adapters by omitting the import altogether:

**before**
```rust
use std::stream;

let output = stream::from_iter(my_vec.iter())
    .map(async |item| do_io(item).await)
    .collect()?;
```
**after**
```rust
let output = my_vec
    .stream()
    .map(async |item| do_io(item).await)
    .collect()?;
```
  • Loading branch information
JohnTitor committed Aug 3, 2021
2 parents a6ece56 + 660f585 commit ad74828
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
38 changes: 38 additions & 0 deletions library/core/src/stream/from_iter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use crate::pin::Pin;

use crate::stream::Stream;
use crate::task::{Context, Poll};

/// A stream that was created from iterator.
///
/// This stream is created by the [`from_iter`] function.
/// See it documentation for more.
///
/// [`from_iter`]: fn.from_iter.html
#[unstable(feature = "stream_from_iter", issue = "81798")]
#[derive(Clone, Debug)]
pub struct FromIter<I> {
iter: I,
}

#[unstable(feature = "stream_from_iter", issue = "81798")]
impl<I> Unpin for FromIter<I> {}

/// Converts an iterator into a stream.
#[unstable(feature = "stream_from_iter", issue = "81798")]
pub fn from_iter<I: IntoIterator>(iter: I) -> FromIter<I::IntoIter> {
FromIter { iter: iter.into_iter() }
}

#[unstable(feature = "stream_from_iter", issue = "81798")]
impl<I: Iterator> Stream for FromIter<I> {
type Item = I::Item;

fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.iter.next())
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.iter.size_hint()
}
}
2 changes: 2 additions & 0 deletions library/core/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@
//! warning: unused result that must be used: streams do nothing unless polled
//! ```

mod from_iter;
mod stream;

pub use from_iter::{from_iter, FromIter};
pub use stream::Stream;

0 comments on commit ad74828

Please sign in to comment.