diff --git a/tokio-util/CHANGELOG.md b/tokio-util/CHANGELOG.md index 64228895200..48dfe778e26 100644 --- a/tokio-util/CHANGELOG.md +++ b/tokio-util/CHANGELOG.md @@ -1,11 +1,11 @@ +### Added +- io: `poll_read_buf` util fn (#2972). + # 0.5.0 (October 30, 2020) ### Changed - io: update `bytes` to 0.6 (#3071). -### Added -- io: `poll_read_buf` util fn (#2972). - # 0.4.0 (October 15, 2020) ### Added diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index c161808f66e..e8b29999e10 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -150,7 +150,7 @@ where // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF state.buffer.reserve(1); - let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? { + let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? { Poll::Ready(ct) => ct, Poll::Pending => return Poll::Pending, }; diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs index 6f181ab1703..eefd65a5e83 100644 --- a/tokio-util/src/io/mod.rs +++ b/tokio-util/src/io/mod.rs @@ -13,3 +13,4 @@ mod stream_reader; pub use self::read_buf::read_buf; pub use self::reader_stream::ReaderStream; pub use self::stream_reader::StreamReader; +pub use crate::util::poll_read_buf; diff --git a/tokio-util/src/io/read_buf.rs b/tokio-util/src/io/read_buf.rs index 5bc0d5866f4..cc3c505f934 100644 --- a/tokio-util/src/io/read_buf.rs +++ b/tokio-util/src/io/read_buf.rs @@ -59,7 +59,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; - crate::util::poll_read_buf(cx, Pin::new(this.0), this.1) + crate::util::poll_read_buf(Pin::new(this.0), cx, this.1) } } } diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index ab0c22fba73..3e6a05eff4d 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -83,7 +83,7 @@ impl Stream for ReaderStream { this.buf.reserve(CAPACITY); } - match poll_read_buf(cx, reader, &mut this.buf) { + match poll_read_buf(reader, cx, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { self.project().reader.set(None); diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 253f4437aac..09dd5a1071c 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -69,10 +69,49 @@ mod util { use std::pin::Pin; use std::task::{Context, Poll}; - pub(crate) fn poll_read_buf( - cx: &mut Context<'_>, + /// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait. + /// + /// [`Buf`]: bytes::Buf + /// + /// # Example + /// + /// ``` + /// use bytes::{Bytes, BytesMut}; + /// use tokio::stream; + /// use tokio::io::Result; + /// use tokio_util::io::{StreamReader, poll_read_buf}; + /// use futures::future::poll_fn; + /// use std::pin::Pin; + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// + /// // Create a reader from an iterator. This particular reader will always be + /// // ready. + /// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))])); + /// + /// let mut buf = BytesMut::new(); + /// let mut reads = 0; + /// + /// loop { + /// reads += 1; + /// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?; + /// + /// if n == 0 { + /// break; + /// } + /// } + /// + /// // one or more reads might be necessary. + /// assert!(reads >= 1); + /// assert_eq!(&buf[..], &[0, 1, 2, 3]); + /// # Ok(()) + /// # } + /// ``` + #[cfg_attr(not(feature = "io"), allow(unreachable_pub))] + pub fn poll_read_buf( io: Pin<&mut T>, - buf: &mut impl BufMut, + cx: &mut Context<'_>, + buf: &mut B, ) -> Poll> { if !buf.has_remaining_mut() { return Poll::Ready(Ok(0));