Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove TryStreamExt::into_async_read Unpin bound #2599

Merged
merged 1 commit into from May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
101 changes: 51 additions & 50 deletions futures-util/src/stream/try_stream/into_async_read.rs
@@ -1,30 +1,26 @@
use crate::stream::TryStreamExt;
use core::pin::Pin;
use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use pin_project_lite::pin_project;
use std::cmp;
use std::io::{Error, Result};

/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
#[derive(Debug)]
#[must_use = "readers do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
pub struct IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
St::Ok: AsRef<[u8]>,
{
stream: St,
state: ReadState<St::Ok>,
}

impl<St> Unpin for IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
St::Ok: AsRef<[u8]>,
{
pin_project! {
/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
#[derive(Debug)]
#[must_use = "readers do nothing unless polled"]
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
pub struct IntoAsyncRead<St>
where
St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
#[pin]
stream: St,
state: ReadState<St::Ok>,
}
}

#[derive(Debug)]
Expand All @@ -36,7 +32,7 @@ enum ReadState<T: AsRef<[u8]>> {

impl<St> IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
pub(super) fn new(stream: St) -> Self {
Expand All @@ -46,16 +42,18 @@ where

impl<St> AsyncRead for IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
fn poll_read(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
let mut this = self.project();

loop {
match &mut self.state {
match this.state {
ReadState::Ready { chunk, chunk_start } => {
let chunk = chunk.as_ref();
let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
Expand All @@ -64,23 +62,23 @@ where
*chunk_start += len;

if chunk.len() == *chunk_start {
self.state = ReadState::PendingChunk;
*this.state = ReadState::PendingChunk;
}

return Poll::Ready(Ok(len));
}
ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) {
ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
self.state = ReadState::Ready { chunk, chunk_start: 0 };
*this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
self.state = ReadState::Eof;
*this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
self.state = ReadState::Eof;
*this.state = ReadState::Eof;
return Poll::Ready(Ok(0));
}
},
Expand All @@ -94,51 +92,52 @@ where

impl<St> AsyncWrite for IntoAsyncRead<St>
where
St: TryStream<Error = Error> + AsyncWrite + Unpin,
St: TryStream<Error = Error> + AsyncWrite,
St::Ok: AsRef<[u8]>,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
let this = self.project();
this.stream.poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.stream).poll_flush(cx)
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.project();
this.stream.poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
Pin::new(&mut self.stream).poll_close(cx)
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.project();
this.stream.poll_close(cx)
}
}

impl<St> AsyncBufRead for IntoAsyncRead<St>
where
St: TryStream<Error = Error> + Unpin,
St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
while let ReadState::PendingChunk = self.state {
match ready!(self.stream.try_poll_next_unpin(cx)) {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
let mut this = self.project();

while let ReadState::PendingChunk = this.state {
match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
self.state = ReadState::Ready { chunk, chunk_start: 0 };
*this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
self.state = ReadState::Eof;
*this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
self.state = ReadState::Eof;
*this.state = ReadState::Eof;
return Poll::Ready(Ok(&[]));
}
}
}

if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state {
if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state {
let chunk = chunk.as_ref();
return Poll::Ready(Ok(&chunk[chunk_start..]));
}
Expand All @@ -147,16 +146,18 @@ where
Poll::Ready(Ok(&[]))
}

fn consume(mut self: Pin<&mut Self>, amount: usize) {
fn consume(self: Pin<&mut Self>, amount: usize) {
let this = self.project();

// https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
if amount == 0 {
return;
}
if let ReadState::Ready { chunk, chunk_start } = &mut self.state {
if let ReadState::Ready { chunk, chunk_start } = this.state {
*chunk_start += amount;
debug_assert!(*chunk_start <= chunk.as_ref().len());
if *chunk_start >= chunk.as_ref().len() {
self.state = ReadState::PendingChunk;
*this.state = ReadState::PendingChunk;
}
} else {
debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
Expand Down
17 changes: 6 additions & 11 deletions futures-util/src/stream/try_stream/mod.rs
Expand Up @@ -985,12 +985,7 @@ pub trait TryStreamExt: TryStream {
Compat::new(self)
}

/// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead).
///
/// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be
/// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll
/// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`]
/// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate.
/// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
///
/// This method is only available when the `std` feature of this
/// library is activated, and it is activated by default.
Expand All @@ -1002,20 +997,20 @@ pub trait TryStreamExt: TryStream {
/// use futures::stream::{self, TryStreamExt};
/// use futures::io::AsyncReadExt;
///
/// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]);
/// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
/// let mut reader = stream.into_async_read();
/// let mut buf = Vec::new();
///
/// assert!(reader.read_to_end(&mut buf).await.is_ok());
/// assert_eq!(buf, &[1, 2, 3, 4, 5]);
/// let mut buf = Vec::new();
/// reader.read_to_end(&mut buf).await.unwrap();
/// assert_eq!(buf, [1, 2, 3, 4, 5]);
/// # })
/// ```
#[cfg(feature = "io")]
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
#[cfg(feature = "std")]
fn into_async_read(self) -> IntoAsyncRead<Self>
where
Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin,
Self: Sized + TryStreamExt<Error = std::io::Error>,
Self::Ok: AsRef<[u8]>,
{
crate::io::assert_read(IntoAsyncRead::new(self))
Expand Down