From 303406cbae0379bd8216fca643fcf329460fe7b1 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 7 Jul 2019 16:53:15 +0900 Subject: [PATCH 1/3] Add AsyncReadExt::read_to_string --- futures-util/src/io/mod.rs | 30 +++++++++++++ futures-util/src/io/read_to_end.rs | 2 +- futures-util/src/io/read_to_string.rs | 62 +++++++++++++++++++++++++++ futures/src/lib.rs | 4 +- futures/tests/io_read_to_string.rs | 46 ++++++++++++++++++++ 5 files changed, 141 insertions(+), 3 deletions(-) create mode 100644 futures-util/src/io/read_to_string.rs create mode 100644 futures/tests/io_read_to_string.rs diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 16272b8f81..337b3fda93 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -60,6 +60,9 @@ pub use self::read_line::ReadLine; mod read_to_end; pub use self::read_to_end::ReadToEnd; +mod read_to_string; +pub use self::read_to_string::ReadToString; + mod read_until; pub use self::read_until::ReadUntil; @@ -241,6 +244,33 @@ pub trait AsyncReadExt: AsyncRead { ReadToEnd::new(self, buf) } + /// Creates a future which will read all the bytes from this `AsyncRead`. + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await)] + /// # futures::executor::block_on(async { + /// use futures::io::AsyncReadExt; + /// use std::io::Cursor; + /// + /// let mut reader = Cursor::new(&b"1234"[..]); + /// let mut buffer = String::with_capacity(4); + /// + /// reader.read_to_string(&mut buffer).await?; + /// + /// assert_eq!(buffer, String::from("1234")); + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + fn read_to_string<'a>( + &'a mut self, + buf: &'a mut String, + ) -> ReadToString<'a, Self> + where Self: Unpin, + { + ReadToString::new(self, buf) + } + /// Helper method for splitting this read/write object into two halves. /// /// The two halves returned implement the `AsyncRead` and `AsyncWrite` diff --git a/futures-util/src/io/read_to_end.rs b/futures-util/src/io/read_to_end.rs index 691e66dd09..afccbcb549 100644 --- a/futures-util/src/io/read_to_end.rs +++ b/futures-util/src/io/read_to_end.rs @@ -38,7 +38,7 @@ impl Drop for Guard<'_> { // // Because we're extending the buffer with uninitialized data for trusted // readers, we need to make sure to truncate that if any of this panics. -fn read_to_end_internal( +pub(super) fn read_to_end_internal( mut rd: Pin<&mut R>, cx: &mut Context<'_>, buf: &mut Vec, diff --git a/futures-util/src/io/read_to_string.rs b/futures-util/src/io/read_to_string.rs new file mode 100644 index 0000000000..625fd4ec56 --- /dev/null +++ b/futures-util/src/io/read_to_string.rs @@ -0,0 +1,62 @@ +use super::read_to_end::read_to_end_internal; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncRead; +use std::pin::Pin; +use std::vec::Vec; +use std::{io, mem, str}; + +/// Future for the [`read_to_string`](super::AsyncReadExt::read_to_string) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReadToString<'a, R: ?Sized + Unpin> { + reader: &'a mut R, + buf: &'a mut String, + bytes: Vec, +} + +impl Unpin for ReadToString<'_, R> {} + +impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> { + pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { + Self { + reader, + bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) }, + buf, + } + } +} + +fn read_to_string_internal( + reader: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut String, + bytes: &mut Vec, +) -> Poll> { + let ret = ready!(read_to_end_internal(reader, cx, bytes)); + if str::from_utf8(&bytes).is_err() { + Poll::Ready(ret.and_then(|_| { + Err(io::Error::new( + io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + )) + })) + } else { + debug_assert!(buf.is_empty()); + // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. + mem::swap(unsafe { buf.as_mut_vec() }, bytes); + Poll::Ready(ret) + } +} + +impl Future for ReadToString<'_, A> +where + A: AsyncRead + ?Sized + Unpin, +{ + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { reader, buf, bytes } = &mut *self; + read_to_string_internal(Pin::new(reader), cx, buf, bytes) + } +} diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 065546eb70..529af6a922 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -299,8 +299,8 @@ pub mod io { pub use futures_util::io::{ AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, BufReader, BufWriter, Close, CopyInto, CopyBufInto, Flush, Lines, Read, - ReadExact, ReadHalf, ReadLine, ReadToEnd, ReadUntil, ReadVectored, Seek, - Window, Write, WriteAll, WriteHalf, WriteVectored, + ReadExact, ReadHalf, ReadLine, ReadToEnd, ReadToString, ReadUntil, + ReadVectored, Seek, Window, Write, WriteAll, WriteHalf, WriteVectored, }; } diff --git a/futures/tests/io_read_to_string.rs b/futures/tests/io_read_to_string.rs new file mode 100644 index 0000000000..b1efb0ea40 --- /dev/null +++ b/futures/tests/io_read_to_string.rs @@ -0,0 +1,46 @@ +use futures::executor::block_on; +use futures::future::{Future, FutureExt}; +use futures::stream::{self, StreamExt, TryStreamExt}; +use futures::io::AsyncReadExt; +use futures::task::Poll; +use futures_test::io::AsyncReadTestExt; +use futures_test::task::noop_context; +use std::io::Cursor; + +#[test] +fn read_to_string() { + let mut c = Cursor::new(&b""[..]); + let mut v = String::new(); + assert!(block_on(c.read_to_string(&mut v)).is_ok()); + assert_eq!(v, ""); + + let mut c = Cursor::new(&b"1"[..]); + let mut v = String::new(); + assert!(block_on(c.read_to_string(&mut v)).is_ok()); + assert_eq!(v, "1"); + + let mut c = Cursor::new(&b"\xff"[..]); + let mut v = String::new(); + assert!(block_on(c.read_to_string(&mut v)).is_err()); +} + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = f.poll_unpin(&mut cx) { + return x; + } + } +} + +#[test] +fn interleave_pending() { + let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]]) + .map(Ok) + .into_async_read() + .interleave_pending(); + + let mut v = String::new(); + assert!(run(buf.read_to_string(&mut v)).is_ok()); + assert_eq!(v, "12333"); +} From a152e6f8c69a807dff37232e1837e70a747b7784 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 7 Jul 2019 17:07:48 +0900 Subject: [PATCH 2/3] Unify the order of the arguments of read_*_internal functions Unify to the same order as public read_* methods. --- futures-util/src/io/lines.rs | 2 +- futures-util/src/io/read_line.rs | 6 +++--- futures-util/src/io/read_until.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/futures-util/src/io/lines.rs b/futures-util/src/io/lines.rs index b2933fcb66..2e1261689b 100644 --- a/futures-util/src/io/lines.rs +++ b/futures-util/src/io/lines.rs @@ -35,7 +35,7 @@ impl Stream for Lines { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let Self { reader, buf, bytes, read } = unsafe { self.get_unchecked_mut() }; let reader = unsafe { Pin::new_unchecked(reader) }; - let n = ready!(read_line_internal(reader, buf, bytes, read, cx))?; + let n = ready!(read_line_internal(reader, cx, buf, bytes, read))?; if n == 0 && buf.is_empty() { return Poll::Ready(None) } diff --git a/futures-util/src/io/read_line.rs b/futures-util/src/io/read_line.rs index 9517e4e4eb..23e8bb74ce 100644 --- a/futures-util/src/io/read_line.rs +++ b/futures-util/src/io/read_line.rs @@ -32,12 +32,12 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> { pub(super) fn read_line_internal( reader: Pin<&mut R>, + cx: &mut Context<'_>, buf: &mut String, bytes: &mut Vec, read: &mut usize, - cx: &mut Context<'_>, ) -> Poll> { - let ret = ready!(read_until_internal(reader, b'\n', bytes, read, cx)); + let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read)); if str::from_utf8(&bytes).is_err() { Poll::Ready(ret.and_then(|_| { Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8")) @@ -56,6 +56,6 @@ impl Future for ReadLine<'_, R> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let Self { reader, buf, bytes, read } = &mut *self; - read_line_internal(Pin::new(reader), buf, bytes, read, cx) + read_line_internal(Pin::new(reader), cx, buf, bytes, read) } } diff --git a/futures-util/src/io/read_until.rs b/futures-util/src/io/read_until.rs index 01e1ad827d..4cb1036670 100644 --- a/futures-util/src/io/read_until.rs +++ b/futures-util/src/io/read_until.rs @@ -25,10 +25,10 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> { pub(super) fn read_until_internal( mut reader: Pin<&mut R>, + cx: &mut Context<'_>, byte: u8, buf: &mut Vec, read: &mut usize, - cx: &mut Context<'_>, ) -> Poll> { loop { let (done, used) = { @@ -54,6 +54,6 @@ impl Future for ReadUntil<'_, R> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let Self { reader, byte, buf, read } = &mut *self; - read_until_internal(Pin::new(reader), *byte, buf, read, cx) + read_until_internal(Pin::new(reader), cx, *byte, buf, read) } } From c98b68b0593967287298c3f75fc402ae9646bebb Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 7 Jul 2019 18:01:17 +0900 Subject: [PATCH 3/3] Change read_to_{end, string} to return the total number of bytes read --- futures-util/src/io/mod.rs | 10 ++++++++-- futures-util/src/io/read_to_end.rs | 17 ++++++++++++----- futures-util/src/io/read_to_string.rs | 14 +++++++++----- futures/tests/io_read_to_string.rs | 6 +++--- 4 files changed, 32 insertions(+), 15 deletions(-) diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 337b3fda93..7f7e029516 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -219,6 +219,8 @@ pub trait AsyncReadExt: AsyncRead { /// Creates a future which will read all the bytes from this `AsyncRead`. /// + /// On success the total number of bytes read is returned. + /// /// # Examples /// /// ``` @@ -230,8 +232,9 @@ pub trait AsyncReadExt: AsyncRead { /// let mut reader = Cursor::new([1, 2, 3, 4]); /// let mut output = Vec::with_capacity(4); /// - /// reader.read_to_end(&mut output).await?; + /// let bytes = reader.read_to_end(&mut output).await?; /// + /// assert_eq!(bytes, 4); /// assert_eq!(output, vec![1, 2, 3, 4]); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` @@ -246,6 +249,8 @@ pub trait AsyncReadExt: AsyncRead { /// Creates a future which will read all the bytes from this `AsyncRead`. /// + /// On success the total number of bytes read is returned. + /// /// # Examples /// /// ``` @@ -257,8 +262,9 @@ pub trait AsyncReadExt: AsyncRead { /// let mut reader = Cursor::new(&b"1234"[..]); /// let mut buffer = String::with_capacity(4); /// - /// reader.read_to_string(&mut buffer).await?; + /// let bytes = reader.read_to_string(&mut buffer).await?; /// + /// assert_eq!(bytes, 4); /// assert_eq!(buffer, String::from("1234")); /// # Ok::<(), Box>(()) }).unwrap(); /// ``` diff --git a/futures-util/src/io/read_to_end.rs b/futures-util/src/io/read_to_end.rs index afccbcb549..4e342e0574 100644 --- a/futures-util/src/io/read_to_end.rs +++ b/futures-util/src/io/read_to_end.rs @@ -11,13 +11,19 @@ use std::vec::Vec; pub struct ReadToEnd<'a, R: ?Sized + Unpin> { reader: &'a mut R, buf: &'a mut Vec, + start_len: usize, } impl Unpin for ReadToEnd<'_, R> {} impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToEnd<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut Vec) -> Self { - ReadToEnd { reader, buf } + let start_len = buf.len(); + Self { + reader, + buf, + start_len, + } } } @@ -42,7 +48,8 @@ pub(super) fn read_to_end_internal( mut rd: Pin<&mut R>, cx: &mut Context<'_>, buf: &mut Vec, -) -> Poll> { + start_len: usize, +) -> Poll> { let mut g = Guard { len: buf.len(), buf }; let ret; loop { @@ -57,7 +64,7 @@ pub(super) fn read_to_end_internal( match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { Ok(0) => { - ret = Poll::Ready(Ok(())); + ret = Poll::Ready(Ok(g.len - start_len)); break; } Ok(n) => g.len += n, @@ -74,10 +81,10 @@ pub(super) fn read_to_end_internal( impl Future for ReadToEnd<'_, A> where A: AsyncRead + ?Sized + Unpin, { - type Output = io::Result<()>; + type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; - read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf) + read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len) } } diff --git a/futures-util/src/io/read_to_string.rs b/futures-util/src/io/read_to_string.rs index 625fd4ec56..93ace40b68 100644 --- a/futures-util/src/io/read_to_string.rs +++ b/futures-util/src/io/read_to_string.rs @@ -13,16 +13,19 @@ pub struct ReadToString<'a, R: ?Sized + Unpin> { reader: &'a mut R, buf: &'a mut String, bytes: Vec, + start_len: usize, } impl Unpin for ReadToString<'_, R> {} impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { + let start_len = buf.len(); Self { reader, bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) }, buf, + start_len, } } } @@ -32,8 +35,9 @@ fn read_to_string_internal( cx: &mut Context<'_>, buf: &mut String, bytes: &mut Vec, -) -> Poll> { - let ret = ready!(read_to_end_internal(reader, cx, bytes)); + start_len: usize, +) -> Poll> { + let ret = ready!(read_to_end_internal(reader, cx, bytes, start_len)); if str::from_utf8(&bytes).is_err() { Poll::Ready(ret.and_then(|_| { Err(io::Error::new( @@ -53,10 +57,10 @@ impl Future for ReadToString<'_, A> where A: AsyncRead + ?Sized + Unpin, { - type Output = io::Result<()>; + type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { reader, buf, bytes } = &mut *self; - read_to_string_internal(Pin::new(reader), cx, buf, bytes) + let Self { reader, buf, bytes, start_len } = &mut *self; + read_to_string_internal(Pin::new(reader), cx, buf, bytes, *start_len) } } diff --git a/futures/tests/io_read_to_string.rs b/futures/tests/io_read_to_string.rs index b1efb0ea40..a31c5609e3 100644 --- a/futures/tests/io_read_to_string.rs +++ b/futures/tests/io_read_to_string.rs @@ -11,12 +11,12 @@ use std::io::Cursor; fn read_to_string() { let mut c = Cursor::new(&b""[..]); let mut v = String::new(); - assert!(block_on(c.read_to_string(&mut v)).is_ok()); + assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0); assert_eq!(v, ""); let mut c = Cursor::new(&b"1"[..]); let mut v = String::new(); - assert!(block_on(c.read_to_string(&mut v)).is_ok()); + assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 1); assert_eq!(v, "1"); let mut c = Cursor::new(&b"\xff"[..]); @@ -41,6 +41,6 @@ fn interleave_pending() { .interleave_pending(); let mut v = String::new(); - assert!(run(buf.read_to_string(&mut v)).is_ok()); + assert_eq!(run(buf.read_to_string(&mut v)).unwrap(), 5); assert_eq!(v, "12333"); }