diff --git a/.travis.yml b/.travis.yml index 99b369bfbe..2f02f2cfd6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -128,14 +128,23 @@ matrix: --no-default-features --features alloc - - name: cargo check (futures-util) + - name: cargo check (sub crates) rust: nightly script: - cargo run --manifest-path ci/remove-dev-dependencies/Cargo.toml */Cargo.toml + # futures-io + # Check default-features, all-features + - cargo check --manifest-path futures-io/Cargo.toml + - cargo check --manifest-path futures-io/Cargo.toml --all-features + # Check each features + - cargo check --manifest-path futures-io/Cargo.toml --features bytes,unstable + + # futures-util + # Check default-features, all-features - cargo check --manifest-path futures-util/Cargo.toml - cargo check --manifest-path futures-util/Cargo.toml --all-features - + # Check each features - cargo check --manifest-path futures-util/Cargo.toml --features sink - cargo check --manifest-path futures-util/Cargo.toml --features io - cargo check --manifest-path futures-util/Cargo.toml --features channel @@ -150,6 +159,7 @@ matrix: - cargo check --manifest-path futures-util/Cargo.toml --features sink,bilock,unstable - cargo check --manifest-path futures-util/Cargo.toml --features io,bilock,unstable - cargo check --manifest-path futures-util/Cargo.toml --features sink,io + - cargo check --manifest-path futures-util/Cargo.toml --features bytes,unstable - cargo check --manifest-path futures-util/Cargo.toml --no-default-features - cargo check --manifest-path futures-util/Cargo.toml --no-default-features --features sink diff --git a/futures-io/Cargo.toml b/futures-io/Cargo.toml index 8b5c4a1ff5..4867088cef 100644 --- a/futures-io/Cargo.toml +++ b/futures-io/Cargo.toml @@ -18,7 +18,13 @@ name = "futures_io" default = ["std"] std = [] +# Unstable features +# `bytes` feature is outside of the normal semver guarantees and require the +# `unstable` feature as an explicit opt-in to unstable API. +unstable = [] + [dependencies] +bytes = { version = "0.4.7", optional = true } [dev-dependencies] futures-preview = { path = "../futures", version = "=0.3.0-alpha.18" } diff --git a/futures-io/src/lib.rs b/futures-io/src/lib.rs index ff2aff9f46..f657ce20e7 100644 --- a/futures-io/src/lib.rs +++ b/futures-io/src/lib.rs @@ -19,6 +19,9 @@ #![doc(html_root_url = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_io")] +#[cfg(all(feature = "bytes", not(feature = "unstable")))] +compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features"); + #[cfg(feature = "std")] mod if_std { use std::cmp; @@ -40,6 +43,10 @@ mod if_std { SeekFrom as SeekFrom, }; + #[cfg(feature = "bytes")] + #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 + pub use bytes::{Buf, BufMut}; + /// A type used to conditionally initialize buffers passed to `AsyncRead` /// methods, modeled after `std`. #[derive(Debug)] @@ -154,6 +161,42 @@ mod if_std { Poll::Ready(Ok(0)) } } + + /// Pull some bytes from this source into the specified `BufMut`, returning + /// how many bytes were read. + /// + /// The `buf` provided will have bytes read into it and the internal cursor + /// will be advanced if any bytes were read. Note that this method typically + /// will not reallocate the buffer provided. + #[cfg(feature = "bytes")] + fn poll_read_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll> + where + Self: Sized, + { + if !buf.has_remaining_mut() { + return Poll::Ready(Ok(0)); + } + + unsafe { + let n = { + let mut b = buf.bytes_mut(); + + self.initializer().initialize(&mut b); + + match self.poll_read(cx, b)? { + Poll::Ready(n) => n, + Poll::Pending => return Poll::Pending, + } + }; + + buf.advance_mut(n); + Poll::Ready(Ok(n)) + } + } } /// Write bytes asynchronously. @@ -250,6 +293,31 @@ mod if_std { /// `Poll::Pending` and either internally retry or convert /// `Interrupted` into another error kind. fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + /// Write a `Buf` into this value, returning how many bytes were written. + /// + /// Note that this method will advance the `buf` provided automatically by + /// the number of bytes written. + #[cfg(feature = "bytes")] + fn poll_write_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut B, + ) -> Poll> + where + Self: Sized, + { + if !buf.has_remaining() { + return Poll::Ready(Ok(0)); + } + + let n = match self.poll_write(cx, buf.bytes())? { + Poll::Ready(n) => n, + Poll::Pending => return Poll::Pending, + }; + buf.advance(n); + Poll::Ready(Ok(n)) + } } /// Seek bytes asynchronously. diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 360dd3ec3f..3a0c65ba1b 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -33,6 +33,7 @@ select-macro = ["async-await", "futures-select-macro-preview", "proc-macro-hack" unstable = ["futures-core-preview/unstable"] cfg-target-has-atomic = ["futures-core-preview/cfg-target-has-atomic"] bilock = [] +bytes = ["io", "futures-io-preview/bytes", "futures-io-preview/unstable"] [dependencies] futures-core-preview = { path = "../futures-core", version = "=0.3.0-alpha.18", default-features = false } diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index 3f2e6ba271..35d9c175ad 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -13,6 +13,8 @@ pub use futures_io::{ AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom, }; +#[cfg(feature = "bytes")] +pub use futures_io::{Buf, BufMut}; #[cfg(feature = "io-compat")] use crate::compat::Compat; @@ -61,6 +63,11 @@ pub use self::read::Read; mod read_vectored; pub use self::read_vectored::ReadVectored; +#[cfg(feature = "bytes")] +mod read_buf; +#[cfg(feature = "bytes")] +pub use self::read_buf::ReadBuf; + mod read_exact; pub use self::read_exact::ReadExact; @@ -100,6 +107,11 @@ pub use self::write::Write; mod write_vectored; pub use self::write_vectored::WriteVectored; +#[cfg(feature = "bytes")] +mod write_buf; +#[cfg(feature = "bytes")] +pub use self::write_buf::WriteBuf; + mod write_all; pub use self::write_all::WriteAll; @@ -213,6 +225,17 @@ pub trait AsyncReadExt: AsyncRead { ReadVectored::new(self, bufs) } + /// Creates a future which will read from the `AsyncRead` into `buf`. + /// + /// The returned future will resolve to the number of bytes read once the read + /// operation is completed. + #[cfg(feature = "bytes")] + fn read_buf<'a, B: BufMut>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B> + where Self: Unpin, + { + ReadBuf::new(self, buf) + } + /// Creates a future which will read exactly enough bytes to fill `buf`, /// returning an error if end of file (EOF) is hit sooner. /// @@ -455,6 +478,17 @@ pub trait AsyncWriteExt: AsyncWrite { WriteVectored::new(self, bufs) } + /// Creates a future which will write bytes from `buf` into the object. + /// + /// The returned future will resolve to the number of bytes written once the write + /// operation is completed. + #[cfg(feature = "bytes")] + fn write_buf<'a, B: Buf>(&'a mut self, buf: &'a mut B) -> WriteBuf<'a, Self, B> + where Self: Unpin, + { + WriteBuf::new(self, buf) + } + /// Write data into this object. /// /// Creates a future that will write the entire contents of the buffer `buf` into diff --git a/futures-util/src/io/read_buf.rs b/futures-util/src/io/read_buf.rs new file mode 100644 index 0000000000..c7fda8a66b --- /dev/null +++ b/futures-util/src/io/read_buf.rs @@ -0,0 +1,30 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncRead, BufMut}; +use std::io; +use std::pin::Pin; + +/// Future for the [`read_buf`](super::AsyncReadExt::read_buf) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ReadBuf<'a, R: ?Sized + Unpin, B> { + reader: &'a mut R, + buf: &'a mut B, +} + +impl Unpin for ReadBuf<'_, R, B> {} + +impl<'a, R: AsyncRead + ?Sized + Unpin, B: BufMut> ReadBuf<'a, R, B> { + pub(super) fn new(reader: &'a mut R, buf: &'a mut B) -> Self { + Self { reader, buf } + } +} + +impl Future for ReadBuf<'_, R, B> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + Pin::new(&mut this.reader).poll_read_buf(cx, this.buf) + } +} diff --git a/futures-util/src/io/write_buf.rs b/futures-util/src/io/write_buf.rs new file mode 100644 index 0000000000..1653fb99d5 --- /dev/null +++ b/futures-util/src/io/write_buf.rs @@ -0,0 +1,30 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncWrite, Buf}; +use std::io; +use std::pin::Pin; + +/// Future for the [`write_buf`](super::AsyncWriteExt::write_buf) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct WriteBuf<'a, W: ?Sized + Unpin, B> { + writer: &'a mut W, + buf: &'a mut B, +} + +impl Unpin for WriteBuf<'_, W, B> {} + +impl<'a, W: AsyncWrite + ?Sized + Unpin, B: Buf> WriteBuf<'a, W, B> { + pub(super) fn new(writer: &'a mut W, buf: &'a mut B) -> Self { + Self { writer, buf } + } +} + +impl Future for WriteBuf<'_, W, B> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + Pin::new(&mut this.writer).poll_write_buf(cx, this.buf) + } +} diff --git a/futures-util/src/lib.rs b/futures-util/src/lib.rs index ef216e650e..aec34b6f4d 100644 --- a/futures-util/src/lib.rs +++ b/futures-util/src/lib.rs @@ -19,6 +19,9 @@ compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feat #[cfg(all(feature = "bilock", not(feature = "unstable")))] compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); +#[cfg(all(feature = "bytes", not(feature = "unstable")))] +compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features"); + #[cfg(feature = "alloc")] extern crate alloc; diff --git a/futures/Cargo.toml b/futures/Cargo.toml index e17f4833d1..2231a6ef75 100644 --- a/futures/Cargo.toml +++ b/futures/Cargo.toml @@ -34,6 +34,7 @@ pin-utils = "0.1.0-alpha.4" futures-test-preview = { path = "../futures-test", version = "=0.3.0-alpha.18" } tokio = "0.1.11" assert_matches = "1.3.0" +bytes_crate = { version = "0.4.7", package = "bytes" } [features] default = ["std"] @@ -46,9 +47,10 @@ io-compat = ["compat", "futures-util-preview/io-compat"] # Unstable features # These features are outside of the normal semver guarantees and require the # `unstable` feature as an explicit opt-in to unstable API. -unstable = ["futures-core-preview/unstable", "futures-channel-preview/unstable", "futures-util-preview/unstable"] +unstable = ["futures-core-preview/unstable", "futures-channel-preview/unstable", "futures-io-preview/unstable", "futures-util-preview/unstable"] cfg-target-has-atomic = ["futures-core-preview/cfg-target-has-atomic", "futures-channel-preview/cfg-target-has-atomic", "futures-util-preview/cfg-target-has-atomic"] bilock = ["futures-util-preview/bilock"] +bytes = ["futures-io-preview/bytes", "futures-util-preview/bytes"] [package.metadata.docs.rs] all-features = true diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 3cfef84425..aee351cf2d 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -40,6 +40,9 @@ compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feat #[cfg(all(feature = "bilock", not(feature = "unstable")))] compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features"); +#[cfg(all(feature = "bytes", not(feature = "unstable")))] +compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features"); + #[doc(hidden)] pub use futures_core::core_reexport; #[doc(hidden)] pub use futures_core::future::Future; @@ -308,6 +311,12 @@ pub mod io { ReadToString, ReadUntil, ReadVectored, repeat, Repeat, Seek, sink, Sink, Take, Window, Write, WriteAll, WriteHalf, WriteVectored, }; + + #[cfg(feature = "bytes")] + pub use futures_io::{Buf, BufMut}; + + #[cfg(feature = "bytes")] + pub use futures_util::io::{ReadBuf, WriteBuf}; } #[cfg_attr( diff --git a/futures/tests/io_bytes.rs b/futures/tests/io_bytes.rs new file mode 100644 index 0000000000..60fa19d630 --- /dev/null +++ b/futures/tests/io_bytes.rs @@ -0,0 +1,102 @@ +#![cfg(feature = "bytes")] + +use bytes_crate::BytesMut; +use futures::executor::block_on; +use futures::io::{AsyncRead, AsyncReadExt, BufMut, Initializer}; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[test] +fn read_buf() { + let mut buf = BytesMut::with_capacity(65); + + let mut reader = &b"hello world"[..]; + + let n = block_on(reader.read_buf(&mut buf)).unwrap(); + assert_eq!(11, n); + assert_eq!(buf[..], b"hello world"[..]); +} + +#[test] +fn read_buf_no_capacity() { + struct Reader; + + impl AsyncRead for Reader { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &mut [u8], + ) -> Poll> { + unimplemented!(); + } + } + + // Can't create BytesMut w/ zero capacity, so fill it up + let mut buf = BytesMut::with_capacity(64); + + buf.put(&[0; 64][..]); + + let mut reader = Reader; + + let n = block_on(reader.read_buf(&mut buf)).unwrap(); + assert_eq!(0, n); +} + +#[test] +fn read_buf_no_uninitialized() { + struct Reader; + + impl AsyncRead for Reader { + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + for b in buf { + assert_eq!(0, *b); + } + + Poll::Ready(Ok(0)) + } + } + + let mut buf = BytesMut::with_capacity(64); + + let mut reader = Reader; + + let n = block_on(reader.read_buf(&mut buf)).unwrap(); + assert_eq!(0, n); +} + +#[test] +fn read_buf_uninitialized_ok() { + struct Reader; + + impl AsyncRead for Reader { + unsafe fn initializer(&self) -> Initializer { + Initializer::nop() + } + + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + assert_eq!(buf[0..11], b"hello world"[..]); + Poll::Ready(Ok(0)) + } + } + + // Can't create BytesMut w/ zero capacity, so fill it up + let mut buf = BytesMut::with_capacity(64); + + unsafe { + buf.bytes_mut()[0..11].copy_from_slice(b"hello world"); + } + + let mut reader = Reader; + + let n = block_on(reader.read_buf(&mut buf)).unwrap(); + assert_eq!(0, n); +}