Replace Extend in Stream::collect with custom FromStream (a la FromIterator) #1833

Open
mehcode opened this issue Aug 28, 2019 · 5 comments
Labels
A-stream Area: futures::stream C-feature-request S-needs-api-design Status: Before implementing this, a discussion or decision on the new API is needed.

Comments

@mehcode
Contributor

mehcode commented Aug 28, 2019

I'm puzzled by this one. I could understand Rust not being able to infer the type given how generic Stream::collect is, but Rust somehow manages to infer it from the declaration if the type is not Copy.

use futures::executor::block_on;
use futures::future::ready;
use futures::stream::once;
use futures::Stream;
use futures::StreamExt;

fn defaults_stream<T: Default>() -> impl Stream<Item = T> {
    once(ready(T::default()))
}

fn defaults_iter<T: Default>() -> impl Iterator<Item = T> {
    vec![T::default()].into_iter()
}

fn main() {
    // iter / String : ok
    let _: Vec<String> = defaults_iter().collect();

    // iter / u32 : ok
    let _: Vec<u32> = defaults_iter().collect();

    // stream / String : ok
    let _: Vec<String> = block_on(defaults_stream().collect());

    // stream / u32 : err (?)
    let _: Vec<u32> = block_on(defaults_stream().collect());

    // stream / (u32, String) : ok (?)
    let _: Vec<(u32, String)> = block_on(defaults_stream().collect());

    // stream / (u32, u32) : err (?)
    let _: Vec<(u32, u32)> = block_on(defaults_stream().collect());
}

Any idea what's going on here?

I should note that I've tried and reproduced this in 1.36.0 and nightly 2019-08-28.


Update: After talking this through with a colleague, we figured out that the issue is that Stream::collect relies on Extend, and Vec<T> has two Extend implementations when T is Copy: Extend<T> and Extend<&'a T>.

Possible solution is to make a FromStream trait like Iterator has FromIterator.
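The diagnosis above can be reproduced without futures: a helper bounded like StreamExt::collect (`C: Default + Extend<T>`) leaves `T` ambiguous for `Vec<u32>`, while a FromIterator-style bound, which `Vec` implements only once, lets the target type drive inference. The helper names below are hypothetical.

```rust
/// Stand-in for `defaults_stream`/`defaults_iter`: yields one `T::default()`.
fn defaults<T: Default>() -> Vec<T> {
    vec![T::default()]
}

/// Mirrors the bound used by `StreamExt::collect`: `C: Default + Extend<T>`.
fn collect_via_extend<C: Default + Extend<T>, T>(items: Vec<T>) -> C {
    let mut out = C::default();
    out.extend(items);
    out
}

/// Mirrors `Iterator::collect`: `C: FromIterator<T>`.
fn collect_via_from_iter<C: FromIterator<T>, T>(items: Vec<T>) -> C {
    items.into_iter().collect()
}

fn main() {
    // Compiles: `Vec<u32>` has exactly one `FromIterator` impl, so `T = u32` is inferred.
    let a: Vec<u32> = collect_via_from_iter(defaults());

    // `let b: Vec<u32> = collect_via_extend(defaults());` fails with "type annotations
    // needed": `Vec<u32>` implements both `Extend<u32>` and `Extend<&'a u32>` (because
    // `u32: Copy`), so `T` is ambiguous. Naming `T` explicitly resolves it.
    let b: Vec<u32> = collect_via_extend::<_, u32>(defaults());

    // Compiles without annotation: `String` is not `Copy`, so only `Extend<String>` applies.
    let c: Vec<String> = collect_via_extend(defaults());

    assert_eq!(a, vec![0]);
    assert_eq!(b, vec![0]);
    assert_eq!(c, vec![String::new()]);
}
```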

@mehcode mehcode changed the title Type inference through Stream::collect breaks if the item type is not String Type inference through Stream::collect breaks if the item type is not a primitive Aug 28, 2019
@mehcode mehcode changed the title Type inference through Stream::collect breaks if the item type is not a primitive Type inference through Stream::collect breaks if the item type is Copy Aug 28, 2019
@cramertj cramertj changed the title Type inference through Stream::collect breaks if the item type is Copy Replace Extend in Stream::collect with custom FromStream (a la FromIterator) Oct 30, 2019
@taiki-e taiki-e added the A-stream Area: futures::stream label Dec 17, 2020
@taiki-e taiki-e added this to the futures-0.4 milestone Dec 17, 2020
@ibraheemdev
Member

ibraheemdev commented May 9, 2021

I think that this is a high priority issue, primarily because collect does not take the size hint into account: it cannot reserve capacity up front and instead extends the collection one item at a time (possibly reallocating several times as it grows), which is inefficient.

Ideally FromStream would look like this:

trait FromStream<A>: Sized {
    async fn from_stream<S: Stream<Item = A>>(stream: S) -> Self;
}

But of course, we don't have async fn in traits, so this wouldn't work. Instead, we have a couple of different options.
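async fn in traits has since been stabilized (Rust 1.75, as noted later in this thread), so the ideal signature above now compiles. A minimal sketch for `Vec`, assuming std-only stand-ins: `Stream`, `IterStream`, and `block_on` below are hypothetical local replacements for `futures::Stream` and `futures::executor::block_on`, not the crate's items.

```rust
// Requires Rust 1.75+ for `async fn` in traits.
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Local stand-in for `futures::Stream` (only the parts used here).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
    fn size_hint(&self) -> (usize, Option<usize>) { (0, None) }
}

/// Hypothetical always-ready stream over an iterator.
struct IterStream<I>(I);

impl<I: Iterator + Unpin> Stream for IterStream<I> {
    type Item = I::Item;
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.get_mut().0.next())
    }
    fn size_hint(&self) -> (usize, Option<usize>) { self.0.size_hint() }
}

fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Busy-polling stand-in for `futures::executor::block_on`; adequate only because
/// `IterStream` is always ready, so no wake-up is ever needed.
fn block_on<F: Future>(fut: F) -> F::Output {
    // SAFETY: every vtable function ignores the (null) data pointer.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

trait FromStream<A>: Sized {
    async fn from_stream<S: Stream<Item = A>>(stream: S) -> Self;
}

impl<A> FromStream<A> for Vec<A> {
    async fn from_stream<S: Stream<Item = A>>(stream: S) -> Self {
        let mut stream = pin!(stream);
        // The size hint is known before the first item, so capacity is reserved once.
        let mut out = Vec::with_capacity(stream.size_hint().0);
        while let Some(item) = poll_fn(|cx| stream.as_mut().poll_next(cx)).await {
            out.push(item);
        }
        out
    }
}

fn main() {
    let v = block_on(Vec::<u32>::from_stream(IterStream(vec![1, 2, 3].into_iter())));
    assert_eq!(v, vec![1, 2, 3]);
}
```

One known limitation of this form: the returned future's type cannot be named, so callers cannot require it to be `Send` without extra machinery.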

  1. Boxed Future
trait FromStream<A>: Sized {
    fn from_stream<'a, S: Stream<Item = A> + Send + 'a>(stream: S) -> BoxFuture<'a, Self>;
}

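This is what async-std does, and it is problematic because every collect pays for a heap allocation and dynamic dispatch, and the boxed future's Send bound is fixed by the trait.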
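A minimal sketch of option 1 for `Vec`. `BoxFuture` is defined locally the same way `futures::future::BoxFuture` is (a pinned, boxed `dyn Future + Send`); the extra `A: 'a` bound is an assumption so the boxed future may hold `A` values; `Stream`, `IterStream`, and `block_on` are hypothetical std-only stand-ins for the futures crate items.

```rust
use std::future::{poll_fn, Future};
use std::pin::{pin, Pin};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Local stand-in for `futures::Stream` (only the parts used here).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
    fn size_hint(&self) -> (usize, Option<usize>) { (0, None) }
}

/// Hypothetical always-ready stream over an iterator.
struct IterStream<I>(I);

impl<I: Iterator + Unpin> Stream for IterStream<I> {
    type Item = I::Item;
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.get_mut().0.next())
    }
    fn size_hint(&self) -> (usize, Option<usize>) { self.0.size_hint() }
}

fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Busy-polling stand-in for `futures::executor::block_on`; adequate only because
/// `IterStream` is always ready, so no wake-up is ever needed.
fn block_on<F: Future>(fut: F) -> F::Output {
    // SAFETY: every vtable function ignores the (null) data pointer.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Same shape as `futures::future::BoxFuture`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

trait FromStream<A>: Sized {
    fn from_stream<'a, S>(stream: S) -> BoxFuture<'a, Self>
    where
        S: Stream<Item = A> + Send + 'a,
        A: 'a;
}

impl<A: Send> FromStream<A> for Vec<A> {
    fn from_stream<'a, S>(stream: S) -> BoxFuture<'a, Self>
    where
        S: Stream<Item = A> + Send + 'a,
        A: 'a,
    {
        // Every call pays for one heap allocation, and the future is only
        // reachable through dynamic dispatch.
        Box::pin(async move {
            let mut stream = pin!(stream);
            let mut out = Vec::with_capacity(stream.size_hint().0);
            while let Some(item) = poll_fn(|cx| stream.as_mut().poll_next(cx)).await {
                out.push(item);
            }
            out
        })
    }
}

fn main() {
    let v = block_on(Vec::<u32>::from_stream(IterStream(vec![1, 2, 3].into_iter())));
    assert_eq!(v, vec![1, 2, 3]);
}
```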

  2. Poll based
trait FromStream<A>: Sized {
    type Buf;

    fn initialize(size_hint: (usize, Option<usize>)) -> Self::Buf;

    fn poll_from_stream<S: Stream<Item = A>>(
        buf: &mut Self::Buf,
        stream: Pin<&mut S>,
        cx: &mut Context<'_>,
    ) -> Poll<Self>;
}

The problem with this approach is that it is very limited. Control flow is hard because you can't store the stream anywhere. For example, when implementing it for Result, I had to hack around this by creating once streams to push to the underlying buffer, and empty streams to construct the final collection.
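A sketch of option 2 for `Vec` (with `Item` in the stream bound), plus a hypothetical `Collect` future that owns the stream and the buffer and requires `S: Unpin` to avoid pin projection. `Stream`, `IterStream`, and `block_on` are hypothetical std-only stand-ins for the futures crate items. Note that the impl only ever sees the stream through a borrowed `Pin<&mut S>` for a single poll, which is the limitation described above for wrappers like `Result`.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{ready, Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Local stand-in for `futures::Stream` (only the parts used here).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
    fn size_hint(&self) -> (usize, Option<usize>) { (0, None) }
}

/// Hypothetical always-ready stream over an iterator.
struct IterStream<I>(I);

impl<I: Iterator + Unpin> Stream for IterStream<I> {
    type Item = I::Item;
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.get_mut().0.next())
    }
    fn size_hint(&self) -> (usize, Option<usize>) { self.0.size_hint() }
}

fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Busy-polling stand-in for `futures::executor::block_on` (no real wake-ups).
fn block_on<F: Future>(fut: F) -> F::Output {
    // SAFETY: every vtable function ignores the (null) data pointer.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

trait FromStream<A>: Sized {
    type Buf;
    fn initialize(size_hint: (usize, Option<usize>)) -> Self::Buf;
    fn poll_from_stream<S: Stream<Item = A>>(
        buf: &mut Self::Buf,
        stream: Pin<&mut S>,
        cx: &mut Context<'_>,
    ) -> Poll<Self>;
}

impl<A> FromStream<A> for Vec<A> {
    type Buf = Vec<A>;
    fn initialize(size_hint: (usize, Option<usize>)) -> Vec<A> {
        Vec::with_capacity(size_hint.0)
    }
    fn poll_from_stream<S: Stream<Item = A>>(
        buf: &mut Vec<A>,
        mut stream: Pin<&mut S>,
        cx: &mut Context<'_>,
    ) -> Poll<Self> {
        // `ready!` returns `Poll::Pending` early; the stream is only borrowed here.
        while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
            buf.push(item);
        }
        Poll::Ready(std::mem::take(buf))
    }
}

struct Collect<S: Stream, C: FromStream<S::Item>> {
    stream: S,
    buf: C::Buf,
}

impl<S: Stream + Unpin, C: FromStream<S::Item>> Future for Collect<S, C> where C::Buf: Unpin {
    type Output = C;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
        let this = self.get_mut();
        C::poll_from_stream(&mut this.buf, Pin::new(&mut this.stream), cx)
    }
}

fn collect<C: FromStream<S::Item>, S: Stream>(stream: S) -> Collect<S, C> {
    let buf = C::initialize(stream.size_hint());
    Collect { stream, buf }
}

fn main() {
    // The target type alone drives inference, as with `Iterator::collect`.
    let v: Vec<u32> = block_on(collect(IterStream(vec![1, 2, 3].into_iter())));
    assert_eq!(v, vec![1, 2, 3]);
}
```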

  3. Extend based

This is what tokio-stream does (as private API):

pub trait FromStream<A>: Sized {
    type Buf;

    fn initialize(size_hint: (usize, Option<usize>)) -> Self::Buf;
    fn extend(buf: &mut Self::Buf, val: A) -> ControlFlow;
    fn finish(buf: Self::Buf) -> Self;
}

This is very similar to the current Default + Extend approach. Again, it is limited, but it would probably work for most implementations. It doesn't suffer as badly from the problem of the poll-based version, because extend and finish are separate methods that wrapper implementations can call directly.
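A std-only sketch of option 3: the trait shape from the comment above, with `ControlFlow<()>` assumed for the unparameterized `ControlFlow`, impls for `Vec` and a `Result` wrapper, and a hypothetical synchronous `collect_from` that stands in for the poll loop of a `Collect` future. The `Result` impl shows how a wrapper can call the inner `initialize`/`extend`/`finish` directly.

```rust
use std::ops::ControlFlow;

trait FromStream<A>: Sized {
    type Buf;
    fn initialize(size_hint: (usize, Option<usize>)) -> Self::Buf;
    fn extend(buf: &mut Self::Buf, val: A) -> ControlFlow<()>;
    fn finish(buf: Self::Buf) -> Self;
}

impl<A> FromStream<A> for Vec<A> {
    type Buf = Vec<A>;
    fn initialize(size_hint: (usize, Option<usize>)) -> Vec<A> {
        // The size hint is available before the first item, so capacity is reserved once.
        Vec::with_capacity(size_hint.0)
    }
    fn extend(buf: &mut Vec<A>, val: A) -> ControlFlow<()> {
        buf.push(val);
        ControlFlow::Continue(())
    }
    fn finish(buf: Vec<A>) -> Vec<A> {
        buf
    }
}

/// Wrapper impl: delegates to `V` and stops at the first `Err`.
impl<T, E, V: FromStream<T>> FromStream<Result<T, E>> for Result<V, E> {
    type Buf = Result<V::Buf, E>;
    fn initialize(size_hint: (usize, Option<usize>)) -> Self::Buf {
        // An early `Err` may cut the stream short, so only the upper bound is forwarded.
        Ok(V::initialize((0, size_hint.1)))
    }
    fn extend(buf: &mut Self::Buf, val: Result<T, E>) -> ControlFlow<()> {
        let inner = match buf.as_mut() {
            Ok(inner) => inner,
            Err(_) => return ControlFlow::Break(()),
        };
        match val {
            Ok(v) => V::extend(inner, v),
            Err(e) => {
                *buf = Err(e);
                ControlFlow::Break(())
            }
        }
    }
    fn finish(buf: Self::Buf) -> Self {
        buf.map(V::finish)
    }
}

/// Hypothetical synchronous stand-in for a `Collect` future's poll loop:
/// each ready item from the stream becomes one `extend` call.
fn collect_from<C: FromStream<I::Item>, I: Iterator>(items: I) -> C {
    let mut buf = C::initialize(items.size_hint());
    for item in items {
        if C::extend(&mut buf, item).is_break() {
            break;
        }
    }
    C::finish(buf)
}

fn main() {
    let v: Vec<u32> = collect_from(vec![1, 2, 3].into_iter());
    assert_eq!(v, vec![1, 2, 3]);
    let r: Result<Vec<u32>, String> =
        collect_from(vec![Ok(1), Err("bad".to_string()), Ok(3)].into_iter());
    assert_eq!(r, Err("bad".to_string()));
}
```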

All of these APIs feel subpar in comparison to async fn. The closest that we can get to what is desired is with a futures-based API:

  4. Future Based
trait FromStream<A>: Sized {
    type Output: Future<Output = Self>;

    fn from_stream<S: Stream<Item = A>>(stream: S) -> Self::Output;
}

The problem with this approach is that Output does not have access to the generic S, which would require GATs:

impl FromStream<Foo> for Bar {
    type Output<S: Stream<Item = Foo>> = BarFromStreamFut<S>;

    fn from_stream<S: Stream<Item = Foo>>(stream: S) -> Self::Output<S> { ... }
}

Instead, the future would have to box the stream, which suffers from the same issues as option 1.
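GATs have since been stabilized (Rust 1.65, as noted later in this thread), so option 4 can be expressed without boxing. A sketch under stated assumptions: `VecFromStream` is a hypothetical named future; `Stream`, `IterStream`, and `block_on` are std-only stand-ins for the futures crate items; and the pin projection is written by hand with `unsafe`, where real code would more likely use a crate such as `pin-project`.

```rust
// Requires Rust 1.65+ for generic associated types.
use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{ready, Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Local stand-in for `futures::Stream` (only the parts used here).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
    fn size_hint(&self) -> (usize, Option<usize>) { (0, None) }
}

/// Hypothetical always-ready stream over an iterator.
struct IterStream<I>(I);

impl<I: Iterator + Unpin> Stream for IterStream<I> {
    type Item = I::Item;
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.get_mut().0.next())
    }
    fn size_hint(&self) -> (usize, Option<usize>) { self.0.size_hint() }
}

fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Busy-polling stand-in for `futures::executor::block_on` (no real wake-ups).
fn block_on<F: Future>(fut: F) -> F::Output {
    // SAFETY: every vtable function ignores the (null) data pointer.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

trait FromStream<A>: Sized {
    // The GAT lets the future's type mention the concrete stream type `S`.
    type Output<S: Stream<Item = A>>: Future<Output = Self>;
    fn from_stream<S: Stream<Item = A>>(stream: S) -> Self::Output<S>;
}

struct VecFromStream<S: Stream> {
    stream: S,
    buf: Vec<S::Item>,
}

impl<S: Stream> Future for VecFromStream<S> {
    type Output = Vec<S::Item>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: hand-written pin projection. `stream` is structurally pinned and never
        // moved; `buf` is never pinned; there is no `Drop` or manual `Unpin` impl.
        let this = unsafe { self.get_unchecked_mut() };
        let mut stream = unsafe { Pin::new_unchecked(&mut this.stream) };
        while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
            this.buf.push(item);
        }
        Poll::Ready(std::mem::take(&mut this.buf))
    }
}

impl<A> FromStream<A> for Vec<A> {
    type Output<S: Stream<Item = A>> = VecFromStream<S>;
    fn from_stream<S: Stream<Item = A>>(stream: S) -> Self::Output<S> {
        let buf = Vec::with_capacity(stream.size_hint().0);
        VecFromStream { stream, buf }
    }
}

fn main() {
    let v = block_on(Vec::<u32>::from_stream(IterStream(vec![1, 2, 3].into_iter())));
    assert_eq!(v, vec![1, 2, 3]);
}
```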

  5. Future Based + Generic over stream

A solution to the above problem is to make FromStream generic over the actual stream, rather than the item:

trait FromStream<S: Stream> {
    type Output: Future<Output = Self>;

    fn from_stream(stream: S) -> Self::Output;
}

However, impls that differ only in the stream's Item type are still considered conflicting:

// ERROR: conflicting implementations

impl<S: Stream<Item = char>> FromStream<S> for String { ... }
impl<S: Stream<Item = String>> FromStream<S> for String { ... }

This means that FromStream would have to be generic over both the stream, and the item:

  6. Future Based + Generic over stream and item
trait FromStream<A, S>
where
    S: Stream<Item = A>,
{
    type Output: Future<Output = Self>;

    fn from_stream(stream: S) -> Self::Output;
}
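A sketch of option 6 showing that two `String` impls that differ only in the item type are accepted once the item is a trait parameter. `ExtendFut` is a hypothetical future that drives a stream into any `Default + Extend` collection; using `Extend` inside an impl is unambiguous because the item type is fixed there. `Stream`, `IterStream`, and `block_on` are std-only stand-ins for the futures crate items, and the pin projection is hand-written.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{ready, Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Local stand-in for `futures::Stream` (only `poll_next`).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical always-ready stream over an iterator.
struct IterStream<I>(I);

impl<I: Iterator + Unpin> Stream for IterStream<I> {
    type Item = I::Item;
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<I::Item>> {
        Poll::Ready(self.get_mut().0.next())
    }
}

fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker { noop_raw_waker() }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

/// Busy-polling stand-in for `futures::executor::block_on` (no real wake-ups).
fn block_on<F: Future>(fut: F) -> F::Output {
    // SAFETY: every vtable function ignores the (null) data pointer.
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

trait FromStream<A, S>: Sized
where
    S: Stream<Item = A>,
{
    type Output: Future<Output = Self>;
    fn from_stream(stream: S) -> Self::Output;
}

struct ExtendFut<S, C> {
    stream: S,
    out: C,
}

impl<S: Stream, C: Default + Extend<S::Item>> Future for ExtendFut<S, C> {
    type Output = C;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
        // SAFETY: `stream` is structurally pinned and never moved; `out` is never pinned.
        let this = unsafe { self.get_unchecked_mut() };
        let mut stream = unsafe { Pin::new_unchecked(&mut this.stream) };
        while let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
            this.out.extend(Some(item));
        }
        Poll::Ready(std::mem::take(&mut this.out))
    }
}

// These two impls would conflict in option 5; here the first trait parameter differs.
impl<S: Stream<Item = char>> FromStream<char, S> for String {
    type Output = ExtendFut<S, String>;
    fn from_stream(stream: S) -> Self::Output {
        ExtendFut { stream, out: String::new() }
    }
}

impl<S: Stream<Item = String>> FromStream<String, S> for String {
    type Output = ExtendFut<S, String>;
    fn from_stream(stream: S) -> Self::Output {
        ExtendFut { stream, out: String::new() }
    }
}

fn main() {
    let s = block_on(<String as FromStream<char, _>>::from_stream(IterStream("ab".chars())));
    assert_eq!(s, "ab");
    let words = vec!["x".to_string(), "y".to_string()];
    let t = block_on(<String as FromStream<String, _>>::from_stream(IterStream(words.into_iter())));
    assert_eq!(t, "xy");
}
```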

This solution works, and is forward compatible with type_alias_impl_trait, which will make implementation easy in the future. The main problem is the same one that the original FromIterator<A, S> suffered from:

If they are on the trait then it is extremely annoying to use them as generic parameters to a function, e.g. with the iterator param on the trait itself, if one was to pass an Extendable to a function that filled it either from a Range or a Map, one needs to write something like:

fn foo<E: Extendable<int, Range<int>> +
          Extendable<int, Map<&'self int, int, VecIterator<int>>>
      (e: &mut E, ...) { ... }

since using a generic, i.e. foo<E: Extendable<int, I>, I: Iterator> means that foo takes 2 type parameters, and the caller has to specify them (which doesn't work anyway, as they'll mismatch with the iterators used in foo itself).

I still think that this approach has merit, and it would be the least breaking to change in the future when we get GATs or async fn in traits. Those, however, are quite far from being stabilized, so it makes sense to add this definition to futures now. Users will not really interact with FromStream directly; its main purpose is for use with StreamExt::collect, so slightly more complex bounds are probably fine.

It's also worth noting that Extend::extend_reserve is a nightly feature that could be used to solve the issue of reserving space based on the size-hint, but this does not allow for things like collecting into a result.

I'm working on a PR for the 6th method I mentioned, and it should be opened soon. Any thoughts?

cc @Nemo157 @taiki-e

@ibraheemdev
Member

Hm... the problem with adding a second generic parameter is that you can't implement it for Result<V, E>: you have to introduce a second stream parameter VS for V: FromStream<A, VS>, and VS is unconstrained because there could potentially be multiple impls...

@ibraheemdev
Member

@taiki-e which API do you prefer?

@taiki-e
Member

taiki-e commented May 11, 2021

Hmm... My thought when I looked into this before was "there is no best way at this time". rust-lang/wg-async#15 (comment)

@taiki-e taiki-e added the S-needs-api-design Status: Before implementing this, a discussion or decision on the new API is needed. label Jul 19, 2023
@ebkalderon
Contributor

ebkalderon commented Feb 28, 2024

Regarding option 4 outlined above, GATs have been stabilized as of Rust 1.65. Not sure if there is any appetite in moving forward with this approach in a next major/minor release of the futures crate, or if alternative approaches have been discussed in the meantime, but I thought this would be worth bubbling to the top.

async fn in traits is now stable as well, but this feature arrived much more recently in Rust 1.75 and has some notable caveats. It may not be the best candidate to use for the time being until those are ironed out.
