Skip to content

Commit

Permalink
Add AsyncReadExt::read_to_string
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Jul 7, 2019
1 parent b9ba5d4 commit 303406c
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 3 deletions.
30 changes: 30 additions & 0 deletions futures-util/src/io/mod.rs
Expand Up @@ -60,6 +60,9 @@ pub use self::read_line::ReadLine;
mod read_to_end;
pub use self::read_to_end::ReadToEnd;

mod read_to_string;
pub use self::read_to_string::ReadToString;

mod read_until;
pub use self::read_until::ReadUntil;

Expand Down Expand Up @@ -241,6 +244,33 @@ pub trait AsyncReadExt: AsyncRead {
ReadToEnd::new(self, buf)
}

/// Creates a future which will read all the bytes from this `AsyncRead`.
///
/// # Examples
///
/// ```
/// #![feature(async_await)]
/// # futures::executor::block_on(async {
/// use futures::io::AsyncReadExt;
/// use std::io::Cursor;
///
/// let mut reader = Cursor::new(&b"1234"[..]);
/// let mut buffer = String::with_capacity(4);
///
/// reader.read_to_string(&mut buffer).await?;
///
/// assert_eq!(buffer, String::from("1234"));
/// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
/// ```
fn read_to_string<'a>(
&'a mut self,
buf: &'a mut String,
) -> ReadToString<'a, Self>
where Self: Unpin,
{
ReadToString::new(self, buf)
}

/// Helper method for splitting this read/write object into two halves.
///
/// The two halves returned implement the `AsyncRead` and `AsyncWrite`
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/io/read_to_end.rs
Expand Up @@ -38,7 +38,7 @@ impl Drop for Guard<'_> {
//
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
fn read_to_end_internal<R: AsyncRead + ?Sized>(
pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
mut rd: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut Vec<u8>,
Expand Down
62 changes: 62 additions & 0 deletions futures-util/src/io/read_to_string.rs
@@ -0,0 +1,62 @@
use super::read_to_end::read_to_end_internal;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::AsyncRead;
use std::pin::Pin;
use std::vec::Vec;
use std::{io, mem, str};

/// Future for the [`read_to_string`](super::AsyncReadExt::read_to_string) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadToString<'a, R: ?Sized + Unpin> {
reader: &'a mut R,
buf: &'a mut String,
bytes: Vec<u8>,
}

impl<R: ?Sized + Unpin> Unpin for ReadToString<'_, R> {}

impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
Self {
reader,
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
buf,
}
}
}

fn read_to_string_internal<R: AsyncRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut String,
bytes: &mut Vec<u8>,
) -> Poll<io::Result<()>> {
let ret = ready!(read_to_end_internal(reader, cx, bytes));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}

impl<A> Future for ReadToString<'_, A>
where
A: AsyncRead + ?Sized + Unpin,
{
type Output = io::Result<()>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf, bytes } = &mut *self;
read_to_string_internal(Pin::new(reader), cx, buf, bytes)
}
}
4 changes: 2 additions & 2 deletions futures/src/lib.rs
Expand Up @@ -299,8 +299,8 @@ pub mod io {
pub use futures_util::io::{
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
BufReader, BufWriter, Close, CopyInto, CopyBufInto, Flush, Lines, Read,
ReadExact, ReadHalf, ReadLine, ReadToEnd, ReadUntil, ReadVectored, Seek,
Window, Write, WriteAll, WriteHalf, WriteVectored,
ReadExact, ReadHalf, ReadLine, ReadToEnd, ReadToString, ReadUntil,
ReadVectored, Seek, Window, Write, WriteAll, WriteHalf, WriteVectored,
};
}

Expand Down
46 changes: 46 additions & 0 deletions futures/tests/io_read_to_string.rs
@@ -0,0 +1,46 @@
use futures::executor::block_on;
use futures::future::{Future, FutureExt};
use futures::stream::{self, StreamExt, TryStreamExt};
use futures::io::AsyncReadExt;
use futures::task::Poll;
use futures_test::io::AsyncReadTestExt;
use futures_test::task::noop_context;
use std::io::Cursor;

#[test]
fn read_to_string() {
let mut c = Cursor::new(&b""[..]);
let mut v = String::new();
assert!(block_on(c.read_to_string(&mut v)).is_ok());
assert_eq!(v, "");

let mut c = Cursor::new(&b"1"[..]);
let mut v = String::new();
assert!(block_on(c.read_to_string(&mut v)).is_ok());
assert_eq!(v, "1");

let mut c = Cursor::new(&b"\xff"[..]);
let mut v = String::new();
assert!(block_on(c.read_to_string(&mut v)).is_err());
}

fn run<F: Future + Unpin>(mut f: F) -> F::Output {
let mut cx = noop_context();
loop {
if let Poll::Ready(x) = f.poll_unpin(&mut cx) {
return x;
}
}
}

#[test]
fn interleave_pending() {
let mut buf = stream::iter(vec![&b"12"[..], &b"33"[..], &b"3"[..]])
.map(Ok)
.into_async_read()
.interleave_pending();

let mut v = String::new();
assert!(run(buf.read_to_string(&mut v)).is_ok());
assert_eq!(v, "12333");
}

0 comments on commit 303406c

Please sign in to comment.