From 8472da905bc7b7f269b7452e7ce7c8487c8196e8 Mon Sep 17 00:00:00 2001 From: Nemo157 Date: Sat, 3 Oct 2020 13:43:12 +0200 Subject: [PATCH] Add AsyncBufReadExt::fill_buf (#2225) --- futures-util/src/io/fill_buf.rs | 50 +++++++++++++++++++++++++++++ futures-util/src/io/mod.rs | 57 ++++++++++++++++++++++++++++++++- futures/src/lib.rs | 2 +- 3 files changed, 107 insertions(+), 2 deletions(-) create mode 100644 futures-util/src/io/fill_buf.rs diff --git a/futures-util/src/io/fill_buf.rs b/futures-util/src/io/fill_buf.rs new file mode 100644 index 0000000000..015547e0bf --- /dev/null +++ b/futures-util/src/io/fill_buf.rs @@ -0,0 +1,50 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncBufRead; +use std::io; +use std::pin::Pin; + +/// Future for the [`fill_buf`](super::AsyncBufReadExt::fill_buf) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct FillBuf<'a, R: ?Sized> { + reader: Option<&'a mut R>, +} + +impl Unpin for FillBuf<'_, R> {} + +impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> { + pub(super) fn new(reader: &'a mut R) -> Self { + FillBuf { reader: Some(reader) } + } +} + +impl<'a, R> Future for FillBuf<'a, R> + where R: AsyncBufRead + ?Sized + Unpin, +{ + type Output = io::Result<&'a [u8]>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + let reader = this.reader.take().expect("Polled FillBuf after completion"); + + match Pin::new(&mut *reader).poll_fill_buf(cx) { + // With polinius it is possible to remove this inner match and just have the correct + // lifetime of the reference inferred based on which branch is taken + Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) { + Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)), + Poll::Ready(Err(err)) => { + unreachable!("reader indicated readiness but then returned an error: {:?}", err) + } + Poll::Pending => { + unreachable!("reader indicated readiness but then returned pending") + } + }, + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Pending => { + this.reader = Some(reader); + Poll::Pending + } + } + } +} diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 41d5a863c6..51ee995d11 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -12,7 +12,7 @@ #[cfg(feature = "io-compat")] #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))] use crate::compat::Compat; -use std::ptr; +use std::{ptr, pin::Pin}; pub use futures_io::{ AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind, @@ -67,6 +67,9 @@ pub use self::cursor::Cursor; mod empty; pub use self::empty::{empty, Empty}; +mod fill_buf; +pub use self::fill_buf::FillBuf; + mod flush; pub use self::flush::Flush; @@ -591,6 +594,58 @@ impl AsyncSeekExt for S {} /// An extension trait which adds utility methods to `AsyncBufRead` types. pub trait AsyncBufReadExt: AsyncBufRead { + /// Creates a future which will wait for a non-empty buffer to be available from this I/O + /// object or EOF to be reached. + /// + /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf). + /// + /// ```rust + /// # futures::executor::block_on(async { + /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}}; + /// + /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read(); + /// + /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]); + /// stream.consume_unpin(2); + /// + /// assert_eq!(stream.fill_buf().await?, vec![3]); + /// stream.consume_unpin(1); + /// + /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]); + /// stream.consume_unpin(3); + /// + /// assert_eq!(stream.fill_buf().await?, vec![]); + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + fn fill_buf(&mut self) -> FillBuf<'_, Self> + where Self: Unpin, + { + FillBuf::new(self) + } + + /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types. + /// + /// ```rust + /// # futures::executor::block_on(async { + /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}}; + /// + /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read(); + /// + /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]); + /// stream.consume_unpin(2); + /// + /// assert_eq!(stream.fill_buf().await?, vec![3]); + /// stream.consume_unpin(1); + /// + /// assert_eq!(stream.fill_buf().await?, vec![]); + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + fn consume_unpin(&mut self, amt: usize) + where Self: Unpin, + { + Pin::new(self).consume(amt) + } + /// Creates a future which will read all the bytes associated with this I/O /// object into `buf` until the delimiter `byte` or EOF is reached. /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until). diff --git a/futures/src/lib.rs b/futures/src/lib.rs index c85a483818..48f2ee971e 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -343,7 +343,7 @@ pub mod io { pub use futures_util::io::{ AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, BufReader, BufWriter, Cursor, Chain, Close, copy, Copy, copy_buf, CopyBuf, - empty, Empty, Flush, IntoSink, Lines, Read, ReadExact, ReadHalf, + empty, Empty, FillBuf, Flush, IntoSink, Lines, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd, ReadToString, ReadUntil, ReadVectored, repeat, Repeat, ReuniteError, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf, WriteVectored,