Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AsyncReadExt::read_to_string #1721

Merged
merged 3 commits into from Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion futures-util/src/io/lines.rs
Expand Up @@ -35,7 +35,7 @@ impl<R: AsyncBufRead> Stream for Lines<R> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self { reader, buf, bytes, read } = unsafe { self.get_unchecked_mut() };
let reader = unsafe { Pin::new_unchecked(reader) };
let n = ready!(read_line_internal(reader, buf, bytes, read, cx))?;
let n = ready!(read_line_internal(reader, cx, buf, bytes, read))?;
if n == 0 && buf.is_empty() {
return Poll::Ready(None)
}
Expand Down
38 changes: 37 additions & 1 deletion futures-util/src/io/mod.rs
Expand Up @@ -60,6 +60,9 @@ pub use self::read_line::ReadLine;
mod read_to_end;
pub use self::read_to_end::ReadToEnd;

mod read_to_string;
pub use self::read_to_string::ReadToString;

mod read_until;
pub use self::read_until::ReadUntil;

Expand Down Expand Up @@ -216,6 +219,8 @@ pub trait AsyncReadExt: AsyncRead {

/// Creates a future which will read all the bytes from this `AsyncRead`.
///
/// On success the total number of bytes read is returned.
///
/// # Examples
///
/// ```
Expand All @@ -227,8 +232,9 @@ pub trait AsyncReadExt: AsyncRead {
/// let mut reader = Cursor::new([1, 2, 3, 4]);
/// let mut output = Vec::with_capacity(4);
///
/// reader.read_to_end(&mut output).await?;
/// let bytes = reader.read_to_end(&mut output).await?;
///
/// assert_eq!(bytes, 4);
/// assert_eq!(output, vec![1, 2, 3, 4]);
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
Expand All @@ -241,6 +247,36 @@ pub trait AsyncReadExt: AsyncRead {
ReadToEnd::new(self, buf)
}

/// Creates a future which will read all the bytes from this `AsyncRead`.
///
/// On success the total number of bytes read is returned.
///
/// # Examples
///
/// ```
/// #![feature(async_await)]
/// # futures::executor::block_on(async {
/// use futures::io::AsyncReadExt;
/// use std::io::Cursor;
///
/// let mut reader = Cursor::new(&b"1234"[..]);
/// let mut buffer = String::with_capacity(4);
///
/// let bytes = reader.read_to_string(&mut buffer).await?;
///
/// assert_eq!(bytes, 4);
/// assert_eq!(buffer, String::from("1234"));
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
fn read_to_string<'a>(
&'a mut self,
buf: &'a mut String,
) -> ReadToString<'a, Self>
where Self: Unpin,
{
ReadToString::new(self, buf)
}

/// Helper method for splitting this read/write object into two halves.
///
/// The two halves returned implement the `AsyncRead` and `AsyncWrite`
Expand Down
6 changes: 3 additions & 3 deletions futures-util/src/io/read_line.rs
Expand Up @@ -32,12 +32,12 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> {

pub(super) fn read_line_internal<R: AsyncBufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut String,
bytes: &mut Vec<u8>,
read: &mut usize,
cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
let ret = ready!(read_until_internal(reader, b'\n', bytes, read, cx));
let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(io::ErrorKind::InvalidData, "stream did not contain valid UTF-8"))
Expand All @@ -56,6 +56,6 @@ impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf, bytes, read } = &mut *self;
read_line_internal(Pin::new(reader), buf, bytes, read, cx)
read_line_internal(Pin::new(reader), cx, buf, bytes, read)
}
}
19 changes: 13 additions & 6 deletions futures-util/src/io/read_to_end.rs
Expand Up @@ -11,13 +11,19 @@ use std::vec::Vec;
pub struct ReadToEnd<'a, R: ?Sized + Unpin> {
reader: &'a mut R,
buf: &'a mut Vec<u8>,
start_len: usize,
}

impl<R: ?Sized + Unpin> Unpin for ReadToEnd<'_, R> {}

impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToEnd<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut Vec<u8>) -> Self {
ReadToEnd { reader, buf }
let start_len = buf.len();
Self {
reader,
buf,
start_len,
}
}
}

Expand All @@ -38,11 +44,12 @@ impl Drop for Guard<'_> {
//
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
fn read_to_end_internal<R: AsyncRead + ?Sized>(
pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
mut rd: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut Vec<u8>,
) -> Poll<io::Result<()>> {
start_len: usize,
) -> Poll<io::Result<usize>> {
let mut g = Guard { len: buf.len(), buf };
let ret;
loop {
Expand All @@ -57,7 +64,7 @@ fn read_to_end_internal<R: AsyncRead + ?Sized>(

match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
Ok(0) => {
ret = Poll::Ready(Ok(()));
ret = Poll::Ready(Ok(g.len - start_len));
break;
}
Ok(n) => g.len += n,
Expand All @@ -74,10 +81,10 @@ fn read_to_end_internal<R: AsyncRead + ?Sized>(
impl<A> Future for ReadToEnd<'_, A>
where A: AsyncRead + ?Sized + Unpin,
{
type Output = io::Result<()>;
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf)
read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len)
}
}
66 changes: 66 additions & 0 deletions futures-util/src/io/read_to_string.rs
@@ -0,0 +1,66 @@
use super::read_to_end::read_to_end_internal;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::AsyncRead;
use std::pin::Pin;
use std::vec::Vec;
use std::{io, mem, str};

/// Future for the [`read_to_string`](super::AsyncReadExt::read_to_string) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadToString<'a, R: ?Sized + Unpin> {
reader: &'a mut R,
buf: &'a mut String,
bytes: Vec<u8>,
start_len: usize,
}

impl<R: ?Sized + Unpin> Unpin for ReadToString<'_, R> {}

impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
let start_len = buf.len();
Self {
reader,
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
buf,
start_len,
}
}
}

fn read_to_string_internal<R: AsyncRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut String,
bytes: &mut Vec<u8>,
start_len: usize,
) -> Poll<io::Result<usize>> {
let ret = ready!(read_to_end_internal(reader, cx, bytes, start_len));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}

impl<A> Future for ReadToString<'_, A>
where
A: AsyncRead + ?Sized + Unpin,
{
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf, bytes, start_len } = &mut *self;
read_to_string_internal(Pin::new(reader), cx, buf, bytes, *start_len)
}
}
4 changes: 2 additions & 2 deletions futures-util/src/io/read_until.rs
Expand Up @@ -25,10 +25,10 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {

pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized>(
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
byte: u8,
buf: &mut Vec<u8>,
read: &mut usize,
cx: &mut Context<'_>,
) -> Poll<io::Result<usize>> {
loop {
let (done, used) = {
Expand All @@ -54,6 +54,6 @@ impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadUntil<'_, R> {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, byte, buf, read } = &mut *self;
read_until_internal(Pin::new(reader), *byte, buf, read, cx)
read_until_internal(Pin::new(reader), cx, *byte, buf, read)
}
}
4 changes: 2 additions & 2 deletions futures/src/lib.rs
Expand Up @@ -299,8 +299,8 @@ pub mod io {
pub use futures_util::io::{
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
BufReader, BufWriter, Close, CopyInto, CopyBufInto, Flush, Lines, Read,
ReadExact, ReadHalf, ReadLine, ReadToEnd, ReadUntil, ReadVectored, Seek,
Window, Write, WriteAll, WriteHalf, WriteVectored,
ReadExact, ReadHalf, ReadLine, ReadToEnd, ReadToString, ReadUntil,
ReadVectored, Seek, Window, Write, WriteAll, WriteHalf, WriteVectored,
};
}

Expand Down
46 changes: 46 additions & 0 deletions futures/tests/io_read_to_string.rs
@@ -0,0 +1,46 @@
use futures::executor::block_on;
use futures::future::{Future, FutureExt};
use futures::stream::{self, StreamExt, TryStreamExt};
use futures::io::AsyncReadExt;
use futures::task::Poll;
use futures_test::io::AsyncReadTestExt;
use futures_test::task::noop_context;
use std::io::Cursor;

#[test]
fn read_to_string() {
let mut c = Cursor::new(&b""[..]);
let mut v = String::new();
assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 0);
assert_eq!(v, "");

let mut c = Cursor::new(&b"1"[..]);
let mut v = String::new();
assert_eq!(block_on(c.read_to_string(&mut v)).unwrap(), 1);
assert_eq!(v, "1");

let mut c = Cursor::new(&b"\xff"[..]);
let mut v = String::new();
assert!(block_on(c.read_to_string(&mut v)).is_err());
}

fn run<F: Future + Unpin>(mut f: F) -> F::Output {
let mut cx = noop_context();
loop {
if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
return x;
}
}
}

#[test]
fn interleave_pending() {
let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]])
.map(Ok)
.into_async_read()
.interleave_pending();

let mut v = String::new();
assert_eq!(run(buf.read_to_string(&mut v)).unwrap(), 5);
assert_eq!(v, "12333");
}