Skip to content

Commit

Permalink
Add AsyncRead::poll_read_buf and AsyncWrite::poll_write_buf
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Aug 24, 2019
1 parent 775e267 commit df29530
Show file tree
Hide file tree
Showing 11 changed files with 299 additions and 4 deletions.
16 changes: 13 additions & 3 deletions .travis.yml
Expand Up @@ -128,14 +128,23 @@ matrix:
--no-default-features
--features alloc

- name: cargo check (futures-util)
- name: cargo check (sub crates)
rust: nightly
script:
- cargo run --manifest-path ci/remove-dev-dependencies/Cargo.toml */Cargo.toml

# futures-io
# Check default-features, all-features
- cargo check --manifest-path futures-io/Cargo.toml
- cargo check --manifest-path futures-io/Cargo.toml --all-features
# Check each features
- cargo check --manifest-path futures-io/Cargo.toml --features bytes,unstable

# futures-util
# Check default-features, all-features
- cargo check --manifest-path futures-util/Cargo.toml
- cargo check --manifest-path futures-util/Cargo.toml --all-features

# Check each features
- cargo check --manifest-path futures-util/Cargo.toml --features sink
- cargo check --manifest-path futures-util/Cargo.toml --features io
- cargo check --manifest-path futures-util/Cargo.toml --features channel
Expand All @@ -146,7 +155,8 @@ matrix:
- cargo check --manifest-path futures-util/Cargo.toml --features io-compat
- cargo check --manifest-path futures-util/Cargo.toml --features sink,compat
- cargo check --manifest-path futures-util/Cargo.toml --features sink,channel

- cargo check --manifest-path futures-util/Cargo.toml --features bytes,unstable
# Check no-default-features
- cargo check --manifest-path futures-util/Cargo.toml --no-default-features
- cargo check --manifest-path futures-util/Cargo.toml --no-default-features --features sink
- cargo check --manifest-path futures-util/Cargo.toml --no-default-features --features alloc,sink
Expand Down
6 changes: 6 additions & 0 deletions futures-io/Cargo.toml
Expand Up @@ -18,7 +18,13 @@ name = "futures_io"
default = ["std"]
std = []

# Unstable features
# `bytes` feature is outside of the normal semver guarantees and require the
# `unstable` feature as an explicit opt-in to unstable API.
unstable = []

[dependencies]
bytes = { version = "0.4.7", optional = true }

[dev-dependencies]
futures-preview = { path = "../futures", version = "=0.3.0-alpha.18" }
68 changes: 68 additions & 0 deletions futures-io/src/lib.rs
Expand Up @@ -19,6 +19,9 @@

#![doc(html_root_url = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_io")]

#[cfg(all(feature = "bytes", not(feature = "unstable")))]
compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[cfg(feature = "std")]
mod if_std {
use std::cmp;
Expand All @@ -40,6 +43,10 @@ mod if_std {
SeekFrom as SeekFrom,
};

#[cfg(feature = "bytes")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use bytes::{Buf, BufMut};

/// A type used to conditionally initialize buffers passed to `AsyncRead`
/// methods, modeled after `std`.
#[derive(Debug)]
Expand Down Expand Up @@ -154,6 +161,42 @@ mod if_std {
Poll::Ready(Ok(0))
}
}

/// Pull some bytes from this source into the specified `BufMut`, returning
/// how many bytes were read.
///
/// The `buf` provided will have bytes read into it and the internal cursor
/// will be advanced if any bytes were read. Note that this method typically
/// will not reallocate the buffer provided.
#[cfg(feature = "bytes")]
fn poll_read_buf<B: BufMut>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<Result<usize>>
where
Self: Sized,
{
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

unsafe {
let n = {
let mut b = buf.bytes_mut();

self.initializer().initialize(&mut b);

match self.poll_read(cx, b)? {
Poll::Ready(n) => n,
Poll::Pending => return Poll::Pending,
}
};

buf.advance_mut(n);
Poll::Ready(Ok(n))
}
}
}

/// Write bytes asynchronously.
Expand Down Expand Up @@ -250,6 +293,31 @@ mod if_std {
/// `Poll::Pending` and either internally retry or convert
/// `Interrupted` into another error kind.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>>;

/// Write a `Buf` into this value, returning how many bytes were written.
///
/// Note that this method will advance the `buf` provided automatically by
/// the number of bytes written.
#[cfg(feature = "bytes")]
fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<Result<usize>>
where
Self: Sized,
{
if !buf.has_remaining() {
return Poll::Ready(Ok(0));
}

let n = match self.poll_write(cx, buf.bytes())? {
Poll::Ready(n) => n,
Poll::Pending => return Poll::Pending,
};
buf.advance(n);
Poll::Ready(Ok(n))
}
}

/// Seek bytes asynchronously.
Expand Down
1 change: 1 addition & 0 deletions futures-util/Cargo.toml
Expand Up @@ -33,6 +33,7 @@ select-macro = ["async-await", "futures-select-macro-preview", "proc-macro-hack"
unstable = ["futures-core-preview/unstable"]
cfg-target-has-atomic = ["futures-core-preview/cfg-target-has-atomic"]
bench = []
bytes = ["io", "futures-io-preview/bytes", "futures-io-preview/unstable"]

[dependencies]
futures-core-preview = { path = "../futures-core", version = "=0.3.0-alpha.18", default-features = false }
Expand Down
34 changes: 34 additions & 0 deletions futures-util/src/io/mod.rs
Expand Up @@ -13,6 +13,8 @@ pub use futures_io::{
AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
IoSlice, IoSliceMut, Result, SeekFrom,
};
#[cfg(feature = "bytes")]
pub use futures_io::{Buf, BufMut};

#[cfg(feature = "io-compat")] use crate::compat::Compat;

Expand Down Expand Up @@ -58,6 +60,11 @@ pub use self::read::Read;
mod read_vectored;
pub use self::read_vectored::ReadVectored;

#[cfg(feature = "bytes")]
mod read_buf;
#[cfg(feature = "bytes")]
pub use self::read_buf::ReadBuf;

mod read_exact;
pub use self::read_exact::ReadExact;

Expand Down Expand Up @@ -91,6 +98,11 @@ pub use self::write::Write;
mod write_vectored;
pub use self::write_vectored::WriteVectored;

#[cfg(feature = "bytes")]
mod write_buf;
#[cfg(feature = "bytes")]
pub use self::write_buf::WriteBuf;

mod write_all;
pub use self::write_all::WriteAll;

Expand Down Expand Up @@ -204,6 +216,17 @@ pub trait AsyncReadExt: AsyncRead {
ReadVectored::new(self, bufs)
}

/// Creates a future which will read from the `AsyncRead` into `buf`.
///
/// The returned future will resolve to the number of bytes read once the read
/// operation is completed.
#[cfg(feature = "bytes")]
fn read_buf<'a, B: BufMut>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
where Self: Unpin,
{
ReadBuf::new(self, buf)
}

/// Creates a future which will read exactly enough bytes to fill `buf`,
/// returning an error if end of file (EOF) is hit sooner.
///
Expand Down Expand Up @@ -446,6 +469,17 @@ pub trait AsyncWriteExt: AsyncWrite {
WriteVectored::new(self, bufs)
}

/// Creates a future which will write bytes from `buf` into the object.
///
/// The returned future will resolve to the number of bytes written once the write
/// operation is completed.
#[cfg(feature = "bytes")]
fn write_buf<'a, B: Buf>(&'a mut self, buf: &'a mut B) -> WriteBuf<'a, Self, B>
where Self: Unpin,
{
WriteBuf::new(self, buf)
}

/// Write data into this object.
///
/// Creates a future that will write the entire contents of the buffer `buf` into
Expand Down
30 changes: 30 additions & 0 deletions futures-util/src/io/read_buf.rs
@@ -0,0 +1,30 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncRead, BufMut};
use std::io;
use std::pin::Pin;

/// Future for the [`read_buf`](super::AsyncReadExt::read_buf) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadBuf<'a, R: ?Sized + Unpin, B> {
reader: &'a mut R,
buf: &'a mut B,
}

impl<R: ?Sized + Unpin, B> Unpin for ReadBuf<'_, R, B> {}

impl<'a, R: AsyncRead + ?Sized + Unpin, B: BufMut> ReadBuf<'a, R, B> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut B) -> Self {
Self { reader, buf }
}
}

impl<R: AsyncRead + ?Sized + Unpin, B: BufMut> Future for ReadBuf<'_, R, B> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
Pin::new(&mut this.reader).poll_read_buf(cx, this.buf)
}
}
30 changes: 30 additions & 0 deletions futures-util/src/io/write_buf.rs
@@ -0,0 +1,30 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncWrite, Buf};
use std::io;
use std::pin::Pin;

/// Future for the [`write_buf`](super::AsyncWriteExt::write_buf) method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct WriteBuf<'a, W: ?Sized + Unpin, B> {
writer: &'a mut W,
buf: &'a mut B,
}

impl<W: ?Sized + Unpin, B> Unpin for WriteBuf<'_, W, B> {}

impl<'a, W: AsyncWrite + ?Sized + Unpin, B: Buf> WriteBuf<'a, W, B> {
pub(super) fn new(writer: &'a mut W, buf: &'a mut B) -> Self {
Self { writer, buf }
}
}

impl<W: AsyncWrite + ?Sized + Unpin, B: Buf> Future for WriteBuf<'_, W, B> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
Pin::new(&mut this.writer).poll_write_buf(cx, this.buf)
}
}
3 changes: 3 additions & 0 deletions futures-util/src/lib.rs
Expand Up @@ -19,6 +19,9 @@ compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feat
#[cfg(all(feature = "bench", not(feature = "unstable")))]
compile_error!("The `bench` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[cfg(all(feature = "bytes", not(feature = "unstable")))]
compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[cfg(feature = "alloc")]
extern crate alloc;

Expand Down
4 changes: 3 additions & 1 deletion futures/Cargo.toml
Expand Up @@ -34,6 +34,7 @@ pin-utils = "0.1.0-alpha.4"
futures-test-preview = { path = "../futures-test", version = "=0.3.0-alpha.18" }
tokio = "0.1.11"
assert_matches = "1.3.0"
bytes_crate = { version = "0.4.7", package = "bytes" }

[features]
default = ["std"]
Expand All @@ -46,8 +47,9 @@ io-compat = ["compat", "futures-util-preview/io-compat"]
# Unstable features
# These features are outside of the normal semver guarantees and require the
# `unstable` feature as an explicit opt-in to unstable API.
unstable = ["futures-core-preview/unstable", "futures-channel-preview/unstable", "futures-util-preview/unstable"]
unstable = ["futures-core-preview/unstable", "futures-channel-preview/unstable", "futures-io-preview/unstable", "futures-util-preview/unstable"]
cfg-target-has-atomic = ["futures-core-preview/cfg-target-has-atomic", "futures-channel-preview/cfg-target-has-atomic", "futures-util-preview/cfg-target-has-atomic"]
bytes = ["futures-io-preview/bytes", "futures-util-preview/bytes"]

[package.metadata.docs.rs]
all-features = true
9 changes: 9 additions & 0 deletions futures/src/lib.rs
Expand Up @@ -37,6 +37,9 @@
#[cfg(all(feature = "cfg-target-has-atomic", not(feature = "unstable")))]
compile_error!("The `cfg-target-has-atomic` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[cfg(all(feature = "bytes", not(feature = "unstable")))]
compile_error!("The `bytes` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[doc(hidden)] pub use futures_core::core_reexport;

#[doc(hidden)] pub use futures_core::future::Future;
Expand Down Expand Up @@ -305,6 +308,12 @@ pub mod io {
ReadToString, ReadUntil, ReadVectored, Seek, Take, Window, Write,
WriteAll, WriteHalf, WriteVectored,
};

#[cfg(feature = "bytes")]
pub use futures_io::{Buf, BufMut};

#[cfg(feature = "bytes")]
pub use futures_util::io::{ReadBuf, WriteBuf};
}

#[cfg(feature = "std")]
Expand Down

0 comments on commit df29530

Please sign in to comment.