diff --git a/futures-util/src/stream/try_stream/into_async_read.rs b/futures-util/src/stream/try_stream/into_async_read.rs index 914b277a02..ffbfc7eae9 100644 --- a/futures-util/src/stream/try_stream/into_async_read.rs +++ b/futures-util/src/stream/try_stream/into_async_read.rs @@ -1,30 +1,26 @@ -use crate::stream::TryStreamExt; use core::pin::Pin; use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use pin_project_lite::pin_project; use std::cmp; use std::io::{Error, Result}; -/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. -#[derive(Debug)] -#[must_use = "readers do nothing unless polled"] -#[cfg_attr(docsrs, doc(cfg(feature = "io")))] -pub struct IntoAsyncRead -where - St: TryStream + Unpin, - St::Ok: AsRef<[u8]>, -{ - stream: St, - state: ReadState, -} - -impl Unpin for IntoAsyncRead -where - St: TryStream + Unpin, - St::Ok: AsRef<[u8]>, -{ +pin_project! { + /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. + #[derive(Debug)] + #[must_use = "readers do nothing unless polled"] + #[cfg_attr(docsrs, doc(cfg(feature = "io")))] + pub struct IntoAsyncRead + where + St: TryStream, + St::Ok: AsRef<[u8]>, + { + #[pin] + stream: St, + state: ReadState, + } } #[derive(Debug)] @@ -36,7 +32,7 @@ enum ReadState> { impl IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { pub(super) fn new(stream: St) -> Self { @@ -46,16 +42,18 @@ where impl AsyncRead for IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { + let mut this = self.project(); + loop { - match &mut self.state { + match this.state { ReadState::Ready { chunk, chunk_start } => { let chunk = chunk.as_ref(); let len = cmp::min(buf.len(), chunk.len() - *chunk_start); @@ -64,23 +62,23 @@ where *chunk_start += len; if chunk.len() == *chunk_start { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } return Poll::Ready(Ok(len)); } - ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) { + ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(0)); } }, @@ -94,51 +92,52 @@ where impl AsyncWrite for IntoAsyncRead where - St: TryStream + AsyncWrite + Unpin, + St: TryStream + AsyncWrite, St::Ok: AsRef<[u8]>, { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.stream).poll_write(cx, buf) + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let this = self.project(); + this.stream.poll_write(cx, buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.stream.poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.stream.poll_close(cx) } } impl AsyncBufRead for IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - while let ReadState::PendingChunk = self.state { - match ready!(self.stream.try_poll_next_unpin(cx)) { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + while let ReadState::PendingChunk = this.state { + match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(&[])); } } } - if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state { + if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state { let chunk = chunk.as_ref(); return Poll::Ready(Ok(&chunk[chunk_start..])); } @@ -147,16 +146,18 @@ where Poll::Ready(Ok(&[])) } - fn consume(mut self: Pin<&mut Self>, amount: usize) { + fn consume(self: Pin<&mut Self>, amount: usize) { + let this = self.project(); + // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 if amount == 0 { return; } - if let ReadState::Ready { chunk, chunk_start } = &mut self.state { + if let ReadState::Ready { chunk, chunk_start } = this.state { *chunk_start += amount; debug_assert!(*chunk_start <= chunk.as_ref().len()); if *chunk_start >= chunk.as_ref().len() { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } } else { debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk"); diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 0df1218a10..dcfdc70a3b 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -985,12 +985,7 @@ pub trait TryStreamExt: TryStream { Compat::new(self) } - /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead). - /// - /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be - /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll - /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`] - /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate. + /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead). /// /// This method is only available when the `std` feature of this /// library is activated, and it is activated by default. @@ -1002,12 +997,12 @@ pub trait TryStreamExt: TryStream { /// use futures::stream::{self, TryStreamExt}; /// use futures::io::AsyncReadExt; /// - /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]); + /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]); /// let mut reader = stream.into_async_read(); - /// let mut buf = Vec::new(); /// - /// assert!(reader.read_to_end(&mut buf).await.is_ok()); - /// assert_eq!(buf, &[1, 2, 3, 4, 5]); + /// let mut buf = Vec::new(); + /// reader.read_to_end(&mut buf).await.unwrap(); + /// assert_eq!(buf, [1, 2, 3, 4, 5]); /// # }) /// ``` #[cfg(feature = "io")] @@ -1015,7 +1010,7 @@ pub trait TryStreamExt: TryStream { #[cfg(feature = "std")] fn into_async_read(self) -> IntoAsyncRead where - Self: Sized + TryStreamExt + Unpin, + Self: Sized + TryStreamExt, Self::Ok: AsRef<[u8]>, { crate::io::assert_read(IntoAsyncRead::new(self))