From ef7390bc91aae5c7e6a6fa26f1a2188afee20896 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 16 Jan 2020 16:16:42 +0100 Subject: [PATCH] Implement AsyncRead for BufWriter and vice versa --- futures-util/src/io/buf_reader.rs | 32 ++++++++++++++++++++-- futures-util/src/io/buf_writer.rs | 45 +++++++++++++++++++++++++++++-- 2 files changed, 73 insertions(+), 4 deletions(-) diff --git a/futures-util/src/io/buf_reader.rs b/futures-util/src/io/buf_reader.rs index d4d54ae3eb..96d3f2815e 100644 --- a/futures-util/src/io/buf_reader.rs +++ b/futures-util/src/io/buf_reader.rs @@ -1,7 +1,7 @@ use futures_core::task::{Context, Poll}; #[cfg(feature = "read-initializer")] use futures_io::Initializer; -use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, IoSliceMut, SeekFrom}; +use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::io::{self, Read}; use std::pin::Pin; @@ -34,11 +34,13 @@ pub struct BufReader { cap: usize, } -impl BufReader { +impl BufReader { unsafe_pinned!(inner: R); unsafe_unpinned!(pos: usize); unsafe_unpinned!(cap: usize); +} +impl BufReader { /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, /// but may change in the future. pub fn new(inner: R) -> Self { @@ -172,6 +174,32 @@ impl AsyncBufRead for BufReader { } } +impl AsyncWrite for BufReader { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.inner().poll_write(cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + self.inner().poll_write_vectored(cx, bufs) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner().poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner().poll_close(cx) + } +} + impl fmt::Debug for BufReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufReader") diff --git a/futures-util/src/io/buf_writer.rs b/futures-util/src/io/buf_writer.rs index 535b0a4a5f..b0afbd81f2 100644 --- a/futures-util/src/io/buf_writer.rs +++ b/futures-util/src/io/buf_writer.rs @@ -1,5 +1,7 @@ use futures_core::task::{Context, Poll}; -use futures_io::{AsyncSeek, AsyncWrite, IoSlice, SeekFrom}; +#[cfg(feature = "read-initializer")] +use futures_io::Initializer; +use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; use std::fmt; use std::io::{self, Write}; @@ -33,10 +35,12 @@ pub struct BufWriter { written: usize, } -impl BufWriter { +impl BufWriter { unsafe_pinned!(inner: W); unsafe_unpinned!(buf: Vec); +} +impl BufWriter { /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, /// but may change in the future. pub fn new(inner: W) -> Self { @@ -156,6 +160,43 @@ impl AsyncWrite for BufWriter { } } +impl AsyncRead for BufWriter { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + self.inner().poll_read(cx, buf) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + self.inner().poll_read_vectored(cx, bufs) + } + + // we can't skip unconditionally because of the large buffer case in read. + #[cfg(feature = "read-initializer")] + unsafe fn initializer(&self) -> Initializer { + self.inner.initializer() + } +} + +impl AsyncBufRead for BufWriter { + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.inner().poll_fill_buf(cx) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.inner().consume(amt) + } +} + impl fmt::Debug for BufWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufWriter")