Skip to content

Commit

Permalink
Use our own io::{Empy, Repeat, Sink}
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Aug 27, 2019
1 parent cde791c commit 7c0aea0
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 20 deletions.
16 changes: 0 additions & 16 deletions futures-io/src/lib.rs
Expand Up @@ -416,14 +416,6 @@ mod if_std {
unsafe_delegate_async_read_to_stdio!();
}

impl AsyncRead for io::Repeat {
unsafe_delegate_async_read_to_stdio!();
}

impl AsyncRead for io::Empty {
unsafe_delegate_async_read_to_stdio!();
}

impl<T: AsRef<[u8]> + Unpin> AsyncRead for io::Cursor<T> {
unsafe_delegate_async_read_to_stdio!();
}
Expand Down Expand Up @@ -547,10 +539,6 @@ mod if_std {
delegate_async_write_to_stdio!();
}

impl AsyncWrite for io::Sink {
delegate_async_write_to_stdio!();
}

macro_rules! deref_async_seek {
() => {
fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom)
Expand Down Expand Up @@ -651,10 +639,6 @@ mod if_std {
delegate_async_buf_read_to_stdio!();
}

impl AsyncBufRead for io::Empty {
delegate_async_buf_read_to_stdio!();
}

impl<T: AsRef<[u8]> + Unpin> AsyncBufRead for io::Cursor<T> {
delegate_async_buf_read_to_stdio!();
}
Expand Down
63 changes: 63 additions & 0 deletions futures-util/src/io/empty.rs
@@ -0,0 +1,63 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, Initializer};
use std::fmt;
use std::io;
use std::pin::Pin;

/// Stream for the [`empty()`] function.
#[must_use = "streams do nothing unless polled"]
pub struct Empty {
_priv: (),
}

/// Constructs a new handle to an empty reader.
///
/// All reads from the returned reader will return `Poll::Ready(Ok(0))`.
///
/// # Examples
///
/// A slightly sad example of not reading anything into a buffer:
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::io::{self, AsyncReadExt};
///
/// let mut buffer = String::new();
/// io::empty().read_to_string(&mut buffer).await?;
/// assert!(buffer.is_empty());
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
pub fn empty() -> Empty {
Empty { _priv: () }
}

impl AsyncRead for Empty {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut [u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(0))
}

#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
}

impl AsyncBufRead for Empty {
#[inline]
fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
Poll::Ready(Ok(&[]))
}
#[inline]
fn consume(self: Pin<&mut Self>, _: usize) {}
}

impl fmt::Debug for Empty {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Empty { .. }")
}
}
9 changes: 9 additions & 0 deletions futures-util/src/io/mod.rs
Expand Up @@ -41,6 +41,9 @@ pub use self::copy_into::CopyInto;
mod copy_buf_into;
pub use self::copy_buf_into::CopyBufInto;

mod empty;
pub use self::empty::{empty, Empty};

mod flush;
pub use self::flush::Flush;

Expand Down Expand Up @@ -73,9 +76,15 @@ pub use self::read_to_string::ReadToString;
mod read_until;
pub use self::read_until::ReadUntil;

mod repeat;
pub use self::repeat::{repeat, Repeat};

mod seek;
pub use self::seek::Seek;

mod sink;
pub use self::sink::{sink, Sink};

mod split;
pub use self::split::{ReadHalf, WriteHalf};

Expand Down
69 changes: 69 additions & 0 deletions futures-util/src/io/repeat.rs
@@ -0,0 +1,69 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncRead, Initializer, IoSliceMut};
use std::fmt;
use std::io;
use std::pin::Pin;

/// Stream for the [`repeat()`] function.
#[must_use = "streams do nothing unless polled"]
pub struct Repeat {
byte: u8,
}

/// Creates an instance of a reader that infinitely repeats one byte.
///
/// All reads from this reader will succeed by filling the specified buffer with
/// the given byte.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::io::{self, AsyncReadExt};
///
/// let mut buffer = [0; 3];
/// io::repeat(0b101).read_exact(&mut buffer).await.unwrap();
/// assert_eq!(buffer, [0b101, 0b101, 0b101]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
pub fn repeat(byte: u8) -> Repeat {
Repeat { byte }
}

impl AsyncRead for Repeat {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
for slot in &mut *buf {
*slot = self.byte;
}
Poll::Ready(Ok(buf.len()))
}

#[inline]
fn poll_read_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
let mut nwritten = 0;
for buf in bufs {
nwritten += ready!(self.as_mut().poll_read(cx, buf))?;
}
Poll::Ready(Ok(nwritten))
}

#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
}

impl fmt::Debug for Repeat {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Repeat { .. }")
}
}
66 changes: 66 additions & 0 deletions futures-util/src/io/sink.rs
@@ -0,0 +1,66 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncWrite, IoSlice};
use std::fmt;
use std::io;
use std::pin::Pin;

/// Stream for the [`sink()`] function.
#[must_use = "streams do nothing unless polled"]
pub struct Sink {
_priv: (),
}

/// Creates an instance of a writer which will successfully consume all data.
///
/// All calls to `poll_write` on the returned instance will return `Poll::Ready(Ok(buf.len()))`
/// and the contents of the buffer will not be inspected.
///
/// # Examples
///
/// ```rust
/// # futures::executor::block_on(async {
/// use futures::io::{self, AsyncWriteExt};
///
/// let buffer = vec![1, 2, 3, 5, 8];
/// let num_bytes = io::sink().write(&buffer).await?;
/// assert_eq!(num_bytes, 5);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
pub fn sink() -> Sink {
Sink { _priv: () }
}

impl AsyncWrite for Sink {
#[inline]
fn poll_write(
self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(buf.len()))
}

#[inline]
fn poll_write_vectored(
self: Pin<&mut Self>,
_: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(bufs.iter().map(|b| b.len()).sum()))
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
#[inline]
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

impl fmt::Debug for Sink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Sink { .. }")
}
}
8 changes: 4 additions & 4 deletions futures/src/lib.rs
Expand Up @@ -300,10 +300,10 @@ pub mod io {

pub use futures_util::io::{
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
BufReader, BufWriter, Chain, Close, CopyInto, CopyBufInto, Flush,
IntoSink, Lines, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd,
ReadToString, ReadUntil, ReadVectored, Seek, Take, Window, Write,
WriteAll, WriteHalf, WriteVectored,
BufReader, BufWriter, Chain, Close, CopyInto, CopyBufInto, empty, Empty,
Flush, IntoSink, Lines, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd,
ReadToString, ReadUntil, ReadVectored, repeat, Repeat, Seek, sink, Sink,
Take, Window, Write, WriteAll, WriteHalf, WriteVectored,
};
}

Expand Down

0 comments on commit 7c0aea0

Please sign in to comment.