Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update stream rfc #15

Merged
merged 26 commits into from
Jul 29, 2020
Merged
Changes from 14 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7ede7b7
update terminology to lending stream, rather than detached stream
nellshamrell Jun 30, 2020
3ccfcb0
adds more information on IntoStream trait
nellshamrell Jun 30, 2020
b5f7031
expands on FromStream trait
nellshamrell Jun 30, 2020
f6cd705
add more info on converting between lending streams and non-lending s…
nellshamrell Jun 30, 2020
a6a69a0
even more information about converting Streams to LendingStreams
nellshamrell Jun 30, 2020
81a2d51
adds in information about the next method
nellshamrell Jul 2, 2020
7c95855
Update rfc-drafts/stream.md
nellshamrell Jul 7, 2020
9f6aa05
Update rfc-drafts/stream.md
nellshamrell Jul 7, 2020
1750343
Update rfc-drafts/stream.md
nellshamrell Jul 7, 2020
0050a3e
Update rfc-drafts/stream.md
nellshamrell Jul 7, 2020
9d48ced
Update rfc-drafts/stream.md
nellshamrell Jul 7, 2020
8d19261
Update rfc-drafts/stream.md
nellshamrell Jul 7, 2020
06205c4
adds in info on IntoStream
nellshamrell Jul 7, 2020
d646a7e
adds in more info about generators
nellshamrell Jul 7, 2020
c0bbd43
Update rfc-drafts/stream.md
nellshamrell Jul 9, 2020
8cf7b37
Update rfc-drafts/stream.md
nellshamrell Jul 9, 2020
df39f12
Update rfc-drafts/stream.md
nellshamrell Jul 9, 2020
a13dd48
Update rfc-drafts/stream.md
nellshamrell Jul 9, 2020
e398679
Update rfc-drafts/stream.md
nellshamrell Jul 9, 2020
d41c12b
Update rfc-drafts/stream.md
nellshamrell Jul 9, 2020
ddec9e0
a few clarifications
nellshamrell Jul 9, 2020
4bcc194
fixes name of crate
nellshamrell Jul 10, 2020
6d06d3f
Update rfc-drafts/stream.md
nellshamrell Jul 10, 2020
35efa81
removes unnecessary bullet points
nellshamrell Jul 10, 2020
642935a
Update rfc-drafts/stream.md
nellshamrell Jul 14, 2020
b9c8bff
adds more info based on feedback
nellshamrell Jul 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
264 changes: 243 additions & 21 deletions rfc-drafts/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,52 @@ where
}
```

## Next method/struct

We should also implement a next method, similar to [the implementation in the futures crate](https://docs.rs/futures-util/0.3.5/src/futures_util/stream/stream/next.rs.html#10-12).
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved

```rust
/// A future that advances the stream and returns the next value.
///
/// This `struct` is created by the [`next`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`next`]: trait.Stream.html#method.next
/// [`Stream`]: trait.Stream.html
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Next<'a, S: ?Sized> {
stream: &'a mut S,
}

impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}

impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
pub(super) fn new(stream: &'a mut St) -> Self {
Next { stream }
}
}

impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
type Output = Option<St::Item>;

fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
Pin::new(&mut *self.stream).poll_next(cx)
}
}
```

This would allow a user to await on a future:

```rust
while let Some(v) = stream.next().await {

}
```

# Reference-level explanation
[reference-level-explanation]: #reference-level-explanation

Expand Down Expand Up @@ -235,27 +281,156 @@ Designing such a migration feature is out of scope for this RFC.

### IntoStream

**Iterators**

Iterators have an `IntoIterator` that is used with `for` loops to convert items of other types to an iterator.

```rust
pub trait IntoIterator where
<Self::IntoIter as Iterator>::Item == Self::Item,
{
type Item;

type IntoIter: Iterator;

fn into_iter(self) -> Self::IntoIter;
}
```

Examples taken from the Rust docs on [for loops and into_iter]](https://doc.rust-lang.org/std/iter/index.html#for-loops-and-intoiterator)
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved

* `for x in iter` uses `impl IntoIterator for T`

```rust
let values = vec![1, 2, 3, 4, 5];

for x in values {
println!("{}", x);
}
```

Desugars to:

```rust
let values = vec![1, 2, 3, 4, 5];
{
let result = match IntoIterator::into_iter(values) {
mut iter => loop {
let next;
match iter.next() {
Some(val) => next = val,
None => break,
};
let x = next;
let () = { println!("{}", x); };
},
};
result
}
```
* `for x in &iter` uses `impl IntoIterator for &T`
* `for x in &mut iter` uses `impl IntoIterator for &mut T`

**Streams**

We may want a trait similar to this for `Stream`. The `IntoStream` trait would provide a way to convert something into a `Stream`.

This trait could look like this:

[TO BE ADDED]
```rust
pub trait IntoStream where
<Self::IntoStream as Stream>::Item == Self::Item,
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved
{
type Item;

type IntoStream: Stream;

fn into_stream(self) -> Self::IntoStream;
}
```
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved

This trait (as expressed by @taiki-e in [a comment on a draft of this RFC](https://github.com/rust-lang/wg-async-foundations/pull/15/files#r449880986)) makes it easy to write streams in combination with [async stream](https://github.com/taiki-e/futures-async-stream).
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved

```rust
type S(usize);

impl IntoStream for S {
type Item = usize;
type IntoStream: impl Stream<Item = Self::Item>;

fn into_stream(self) -> Self::IntoStream {
#[stream]
async move {
for i in 0..self.0 {
yield i;
}
}
}
}
```

### FromStream

**Iterators**

Iterators have an `FromIterator` that is used to convert iterators into another type.

```rust
pub trait FromIterator<A> {

fn from_iter<T>(iter: T) -> Self
where
T: IntoIterator<Item = A>;
}
```

It should be noted that this trait is rarely used directly, instead used through Iterator's collect method ([source](https://doc.rust-lang.org/std/iter/trait.FromIterator.html)).

```rust
pub trait Interator {
fn collect<B>(self) -> B
where
B: FromIterator<Self::Item>,
{ ... }
}
```

Examples taken from the Rust docs on [iter and collect]](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.collect)
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved


```rust
let a = [1, 2, 3];

let doubled: Vec<i32> = a.iter()
.map(|&x| x * 2)
.collect();

```

**Streams**

We may want a trait similar to this for `Stream`. The `FromStream` trait would provide way to convert a `Stream` into another type.

This trait could look like this:

[TO BE ADDED]
```rust
pub trait FromStream<A> {
async fn from_stream<T>(stream: T) -> Self
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One very open-ended question I haven't thought too hard about yet: Would it ever be possible to automatically implement this for all T where T: FromIterator? It seems like, in most cases, the implementation should be logically equivalent, with the (rather large) wrinkle that now calling next() suspends you :).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I definitely think that's possible. To clarify, are you thinking of implementing this same trait for both Streams and Iterators, or are you thinking of having separate traits with very similar functionality?

Copy link
Member

@tmandry tmandry Jul 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm saying that FromStream (as defined here) and the existing FromIterator trait are separate, but with very similar functionality.

Thinking more about it, FromStream is actually more general, because an await point is allowed to suspend execution of the current function but doesn't actually have to. So many, but not all, existing impls of FromIterator would work for FromStream as well. If not for coherence with existing impls, we could do impl<T: FromStream<Item = I>> FromIterator<I> for T, and switch a bunch of FromIterator impls to be FromStream impls without having to repeat ourselves.

This is a somewhat similar problem to the LendingStream/Stream relationship below, but the relationship is reversed (the trait impls we're starting with are allowed to assume too much – that there are no suspend points – rather than too little).

This might be another possible bullet point for the lang team discussion, @nikomatsakis? (cc rust-lang/lang-team#34)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way we solved this in async-std is by manually copying all implementations of FromIterator from std (ref). When we choose to introduce std::stream::FromStream we could pretty much copy-paste all of the impls from async-std. I forget whether there were coherence issues (likely), but the main reason why we took this approach is because doing general conversions between sync and async code is hard to get right in practice:

  • a generic conversion from sync -> async means operations may quietly block in the background which can lead to perf issues in runtimes
  • a generic conversion from async -> sync means using methods such as task::block_on which is hard to tune, or risk deadlocks when nested (both futures-rs and async-std have had issues with this)

To help with the conversion from Iterator -> Stream we introduced async_std::stream::from_iter. To my knowledge all of this together has covered people's needs quite well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the way currently used in async-std is the ideal way. That way means that all third-party crates that provide the FromIterator implementation must be patched in order to fully support.

As far as I know, this is why futures implements collect using existing traits (Default, Extend) rather than its own. The way used in futures cannot currently optimize based on the size-hint of the stream, but this will be possible once rust-lang/rust#72631 stabilizes.

If we want the collection to extend asynchronously, you can use Sink or use a loop.

To help with the conversion from Iterator -> Stream we introduced async_std::stream::from_iter. To my knowledge all of this together has covered people's needs quite well.

This seems copy of futures::stream::iter?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want the collection to extend asynchronously, you can use Sink or use a loop.

I don't think that's a compelling outcome; this means that patterns from sync Rust wouldn't be transferable to async Rust. Where possible I think it's incredibly valuable to aim for parity between the two. Though this kind of gets into "design philosophy", and at least in the context of this PR we don't need to find consensus.

That way means that all third-party crates that provide the FromIterator implementation must be patched in order to fully support.

That's accurate, and I think that's the right outcome. In some cases FromIterator implementations perform blocking IO which I think is I think enough reason to not introduce a blanket conversion. For example if a function were to use impl FromStream<Result<fs::Metadata>> as an input, then both an iterator constructed from std::fs::ReadDir and a stream constructed from async_std::fs::ReadDir would be valid*. However the synchronous variant would likely interfere with the scheduler.

*Admittedly it's a bit of a contrived example, but hopefully it still serves to illustrate the point.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and at least in the context of this PR we don't need to find consensus.

Agreed, it actually requires real async trait method to add this to std, and afaik all of the existing approaches have some problems.

In some cases FromIterator implementations perform blocking IO which I think is I think enough reason to not introduce a blanket conversion.

Well, this problem also exists in futures::stream::iter/async_std::stream::from_iter.


Aside from it's preferable or not, even a crate that is obviously unrelated to async can't be used as a return type for collect without implementing it, which is necessary information when explaining/considering-adding it (I mainly think about collections).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside from it's preferable or not, even a crate that is obviously unrelated to async can't be used as a return type for collect without implementing it, which is necessary information when explaining/considering-adding it (I mainly think about collections).

One solution I was thinking about for this is, leave futures's current collect method (as collect2 or another name), even after the FromStream based collect is implemented.
(This is not an ideal solution, but it means third-party crates can provide mitigation for this, even if std can't solve this problem.)

where
T: IntoStream<Item = A>;
}
```

We could potentially include a collect method for Stream as well.

```rust
pub trait Stream {
async fn collect<B>(self) -> B
where
B: FromStream<Self::Item>,
{ ... }
}
```

## Other Traits

Expand Down Expand Up @@ -289,25 +464,29 @@ for elem in stream { ... }

Designing this extension is out of scope for this RFC. However, it could be prototyped using procedural macros today.

## "Attached" streams
## "Lending" streams

There has been much discussion around attached/detached streams.
There has been much discussion around lending streams (also referred to as attached streams).

### Definitions

[Source](https://smallcultfollowing.com/babysteps/blog/2019/12/10/async-interview-2-cramertj-part-2/#the-need-for-streaming-streams-and-iterators)

In a **detached** stream, the `Item` that gets returned by `Stream` is "detached" from self. This means it can be stored and moved about independently from `self`.

In an **attached** stream, the `Item` that gets returned by `Stream` may be borrowed from `self`. It can only be used as long as the `self` reference remains live.
In an **lending** stream (also known as an "attached" stream), the `Item` that gets returned by `Stream` may be borrowed from `self`. It can only be used as long as the `self` reference remains live.
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved

In a **non-lending** stream (also known as a "detached" stream), the `Item` that gets returned by `Stream` is "detached" from self. This means it can be stored and moved about independently from `self`.

This RFC does not cover the addition of lending streams (streams as implemented through this RFC are all non-lending streams).
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved

This RFC does not cover the addition of attached/detached owned/borrowed streams.
We can add the `Stream` trait to the standard library now and delay
adding in this distinction between two types of streams. The advantage of this
is it would allow us to copy the `Stream` trait from `futures` largely 'as is'.
The disadvantage of this is functions that consume streams would first be written
to work with `Stream`, and then potentially have to be rewritten later to work with
`AttachedStream`s.
adding in this distinction between the two types of streams - lending and
non-lending. The advantage of this is it would allow us to copy the `Stream`
trait from `futures` largely 'as is'.

The disadvantage of this is functions that consume streams would
first be written to work with `Stream`, and then potentially have
to be rewritten later to work with `LendingStream`s.

### Current Stream Trait

Expand All @@ -327,10 +506,10 @@ pub trait Stream {
This trait, like `Iterator`, always gives ownership of each item back to its caller. This offers flexibility -
such as the ability to spawn off futures processing each item in parallel.

### Potential Attached Stream Trait
### Potential Lending Stream Trait

```rust
impl<S> AttachedStream for S
impl<S> LendingStream for S
where
S: Stream,
{
Expand All @@ -346,19 +525,42 @@ where
```

This is a "conversion" trait such that anything which implements `Stream` can also implement
`Attached Stream`.
`Lending Stream`.

This trait captures the case we re-use internal buffers. This would be less flexible for
consumers, but potentially more efficient. Types could implement the `AttachedStream`
consumers, but potentially more efficient. Types could implement the `LendingStream`
where they need to re-use an internal buffer and `Stream` if they do not. There is room for both.

We would also need to pursue the same design for iterators - whether through adding two traits
or one new trait with a "conversion" from the old trait.

This also brings up the question of whether we should allow conversion in the opposite way - if
every "Detached" stream can become an attached one, should _some_ detached streams be able to
become attached ones? These use cases need more thought, which is part of the reason
it is out of the scope of this particular RFC.
every non-lending stream can become a lending one, should _some_ non-lending streams be able to
become lending ones?
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved

We can say that, as the Rust language stands today, we cannot cleanly convert impl Stream to impl LendingStream due to a coherence conflict.
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved

If you have other impls like:

```rust
impl<T> Stream for Box<T> where T: Stream
```

and

```rust
impl<T> LendingStream for Box<T> where T: LendingStream
```

There is a coherence conflict for `Box<impl Stream>`, so presumably it will fail the coherence rules.

[More examples are available here](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=a667a7560f8dc97ab82a780e27dfc9eb).
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved

Resolving this would require either an explicit “wrapper” step or else some form of language extension.

It should be noted that the same applies to Iterator, it is not unique to Stream.

These use cases for lending/non-lending streams need more thought, which is part of the reason it is out of the scope of this particular RFC.

## Generator syntax
[generator syntax]: #generator-syntax
Expand All @@ -373,9 +575,29 @@ or "owned" stream, the generator yield could return things
that you own or things that were borrowed from your caller.

```rust
gen async fn foo() -> X {
gen async fn foo() -> Value {
nellshamrell marked this conversation as resolved.
Show resolved Hide resolved
yield value;
}
```

Designing generator functions is out of the scope of this RFC.
After desugaring, this would result in a function like:

```rust
fn foo() -> impl Iterator<Item = Value>
```

```rust
async gen fn foo() -> Value
```

nellshamrell marked this conversation as resolved.
Show resolved Hide resolved
After desugaring would result in a function like:

```rust
fn foo() -> impl Stream<Item = Value>
```

If we introduce `-> Stream` first, we will have to permit `LendingStream` in the future.
Additionally, if we introduce `LendingStream` later, we'll have to figure out how
to convert a `LendingStream` into a `Stream` seamlessly.

Further designing generator functions is out of the scope of this RFC.