Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: add back public poll_read_buf() function #3079

Merged
merged 5 commits into from Nov 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions tokio-util/CHANGELOG.md
@@ -1,11 +1,11 @@
### Added
- io: `poll_read_buf` util fn (#2972).

# 0.5.0 (October 30, 2020)

### Changed
- io: update `bytes` to 0.6 (#3071).

### Added
- io: `poll_read_buf` util fn (#2972).

# 0.4.0 (October 15, 2020)

### Added
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/src/codec/framed_impl.rs
Expand Up @@ -150,7 +150,7 @@ where
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
state.buffer.reserve(1);
let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? {
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? {
Poll::Ready(ct) => ct,
Poll::Pending => return Poll::Pending,
};
Expand Down
1 change: 1 addition & 0 deletions tokio-util/src/io/mod.rs
Expand Up @@ -13,3 +13,4 @@ mod stream_reader;
pub use self::read_buf::read_buf;
pub use self::reader_stream::ReaderStream;
pub use self::stream_reader::StreamReader;
pub use crate::util::poll_read_buf;
2 changes: 1 addition & 1 deletion tokio-util/src/io/read_buf.rs
Expand Up @@ -59,7 +59,7 @@ where

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
crate::util::poll_read_buf(cx, Pin::new(this.0), this.1)
crate::util::poll_read_buf(Pin::new(this.0), cx, this.1)
}
}
}
2 changes: 1 addition & 1 deletion tokio-util/src/io/reader_stream.rs
Expand Up @@ -83,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> {
this.buf.reserve(CAPACITY);
}

match poll_read_buf(cx, reader, &mut this.buf) {
match poll_read_buf(reader, cx, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
Expand Down
45 changes: 42 additions & 3 deletions tokio-util/src/lib.rs
Expand Up @@ -69,10 +69,49 @@ mod util {
use std::pin::Pin;
use std::task::{Context, Poll};

pub(crate) fn poll_read_buf<T: AsyncRead>(
cx: &mut Context<'_>,
/// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait.
///
/// [`Buf`]: bytes::Buf
///
/// # Example
///
/// ```
/// use bytes::{Bytes, BytesMut};
/// use tokio::stream;
/// use tokio::io::Result;
/// use tokio_util::io::{StreamReader, poll_read_buf};
/// use futures::future::poll_fn;
/// use std::pin::Pin;
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
///
/// // Create a reader from an iterator. This particular reader will always be
/// // ready.
/// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))]));
///
/// let mut buf = BytesMut::new();
/// let mut reads = 0;
///
/// loop {
/// reads += 1;
/// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?;
///
/// if n == 0 {
/// break;
/// }
/// }
///
/// // one or more reads might be necessary.
/// assert!(reads >= 1);
/// assert_eq!(&buf[..], &[0, 1, 2, 3]);
/// # Ok(())
/// # }
/// ```
#[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
pub fn poll_read_buf<T: AsyncRead, B: BufMut>(
io: Pin<&mut T>,
buf: &mut impl BufMut,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
Expand Down