Skip to content

Commit

Permalink
Add AsyncBufReadExt::fill_buf (#2225)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nemo157 committed Oct 3, 2020
1 parent 8f20432 commit 8472da9
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 2 deletions.
50 changes: 50 additions & 0 deletions futures-util/src/io/fill_buf.rs
@@ -0,0 +1,50 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::AsyncBufRead;
use std::io;
use std::pin::Pin;

/// Future for the [`fill_buf`](super::AsyncBufReadExt::fill_buf) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FillBuf<'a, R: ?Sized> {
reader: Option<&'a mut R>,
}

impl<R: ?Sized> Unpin for FillBuf<'_, R> {}

impl<'a, R: AsyncBufRead + ?Sized + Unpin> FillBuf<'a, R> {
pub(super) fn new(reader: &'a mut R) -> Self {
FillBuf { reader: Some(reader) }
}
}

impl<'a, R> Future for FillBuf<'a, R>
where R: AsyncBufRead + ?Sized + Unpin,
{
type Output = io::Result<&'a [u8]>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let reader = this.reader.take().expect("Polled FillBuf after completion");

match Pin::new(&mut *reader).poll_fill_buf(cx) {
// With polinius it is possible to remove this inner match and just have the correct
// lifetime of the reference inferred based on which branch is taken
Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
Poll::Ready(Err(err)) => {
unreachable!("reader indicated readiness but then returned an error: {:?}", err)
}
Poll::Pending => {
unreachable!("reader indicated readiness but then returned pending")
}
},
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => {
this.reader = Some(reader);
Poll::Pending
}
}
}
}
57 changes: 56 additions & 1 deletion futures-util/src/io/mod.rs
Expand Up @@ -12,7 +12,7 @@
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
use crate::compat::Compat;
use std::ptr;
use std::{ptr, pin::Pin};

pub use futures_io::{
AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
Expand Down Expand Up @@ -67,6 +67,9 @@ pub use self::cursor::Cursor;
mod empty;
pub use self::empty::{empty, Empty};

mod fill_buf;
pub use self::fill_buf::FillBuf;

mod flush;
pub use self::flush::Flush;

Expand Down Expand Up @@ -591,6 +594,58 @@ impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}

/// An extension trait which adds utility methods to `AsyncBufRead` types.
pub trait AsyncBufReadExt: AsyncBufRead {
/// Creates a future which will wait for a non-empty buffer to be available from this I/O
/// object or EOF to be reached.
///
/// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
///
/// ```rust
/// # futures::executor::block_on(async {
/// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
///
/// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
///
/// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
/// stream.consume_unpin(2);
///
/// assert_eq!(stream.fill_buf().await?, vec![3]);
/// stream.consume_unpin(1);
///
/// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
/// stream.consume_unpin(3);
///
/// assert_eq!(stream.fill_buf().await?, vec![]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
fn fill_buf(&mut self) -> FillBuf<'_, Self>
where Self: Unpin,
{
FillBuf::new(self)
}

/// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
///
/// ```rust
/// # futures::executor::block_on(async {
/// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
///
/// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
///
/// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
/// stream.consume_unpin(2);
///
/// assert_eq!(stream.fill_buf().await?, vec![3]);
/// stream.consume_unpin(1);
///
/// assert_eq!(stream.fill_buf().await?, vec![]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
fn consume_unpin(&mut self, amt: usize)
where Self: Unpin,
{
Pin::new(self).consume(amt)
}

/// Creates a future which will read all the bytes associated with this I/O
/// object into `buf` until the delimiter `byte` or EOF is reached.
/// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
Expand Down
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Expand Up @@ -343,7 +343,7 @@ pub mod io {
pub use futures_util::io::{
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
BufReader, BufWriter, Cursor, Chain, Close, copy, Copy, copy_buf, CopyBuf,
empty, Empty, Flush, IntoSink, Lines, Read, ReadExact, ReadHalf,
empty, Empty, FillBuf, Flush, IntoSink, Lines, Read, ReadExact, ReadHalf,
ReadLine, ReadToEnd, ReadToString, ReadUntil, ReadVectored, repeat,
Repeat, ReuniteError, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf,
WriteVectored,
Expand Down

0 comments on commit 8472da9

Please sign in to comment.