Skip to content

Commit

Permalink
Implement AsyncRead for BufWriter and vice versa
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka authored and cramertj committed Jan 21, 2020
1 parent 58c3d98 commit fd62b0b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 4 deletions.
32 changes: 30 additions & 2 deletions futures-util/src/io/buf_reader.rs
@@ -1,7 +1,7 @@
use futures_core::task::{Context, Poll};
#[cfg(feature = "read-initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, IoSliceMut, SeekFrom};
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::io::{self, Read};
use std::pin::Pin;
Expand Down Expand Up @@ -34,11 +34,13 @@ pub struct BufReader<R> {
cap: usize,
}

impl<R: AsyncRead> BufReader<R> {
impl<R> BufReader<R> {
unsafe_pinned!(inner: R);
unsafe_unpinned!(pos: usize);
unsafe_unpinned!(cap: usize);
}

impl<R: AsyncRead> BufReader<R> {
/// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
/// but may change in the future.
pub fn new(inner: R) -> Self {
Expand Down Expand Up @@ -172,6 +174,32 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
}
}

impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner().poll_write(cx, buf)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.inner().poll_write_vectored(cx, bufs)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.inner().poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.inner().poll_close(cx)
}
}

impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufReader")
Expand Down
45 changes: 43 additions & 2 deletions futures-util/src/io/buf_writer.rs
@@ -1,5 +1,7 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncSeek, AsyncWrite, IoSlice, SeekFrom};
#[cfg(feature = "read-initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::fmt;
use std::io::{self, Write};
Expand Down Expand Up @@ -33,10 +35,12 @@ pub struct BufWriter<W> {
written: usize,
}

impl<W: AsyncWrite> BufWriter<W> {
impl<W> BufWriter<W> {
unsafe_pinned!(inner: W);
unsafe_unpinned!(buf: Vec<u8>);
}

impl<W: AsyncWrite> BufWriter<W> {
/// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB,
/// but may change in the future.
pub fn new(inner: W) -> Self {
Expand Down Expand Up @@ -156,6 +160,43 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
}
}

impl<W: AsyncRead> AsyncRead for BufWriter<W> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.inner().poll_read(cx, buf)
}

fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
self.inner().poll_read_vectored(cx, bufs)
}

// we can't skip unconditionally because of the large buffer case in read.
#[cfg(feature = "read-initializer")]
unsafe fn initializer(&self) -> Initializer {
self.inner.initializer()
}
}

impl<W: AsyncBufRead> AsyncBufRead for BufWriter<W> {
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
self.inner().poll_fill_buf(cx)
}

fn consume(self: Pin<&mut Self>, amt: usize) {
self.inner().consume(amt)
}
}

impl<W: fmt::Debug> fmt::Debug for BufWriter<W> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufWriter")
Expand Down

0 comments on commit fd62b0b

Please sign in to comment.