Skip to content

Commit

Permalink
Make AsyncRead::initializer unstable API
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Aug 30, 2019
1 parent 90c83b8 commit cff39cb
Show file tree
Hide file tree
Showing 18 changed files with 111 additions and 66 deletions.
14 changes: 12 additions & 2 deletions .travis.yml
Expand Up @@ -136,14 +136,23 @@ matrix:
--no-default-features
--features async-await

- name: cargo check (futures-util)
- name: cargo check (sub crates)
rust: nightly
script:
- cargo run --manifest-path ci/remove-dev-dependencies/Cargo.toml */Cargo.toml

# futures-io
# Check default-features, all-features
- cargo check --manifest-path futures-io/Cargo.toml
- cargo check --manifest-path futures-io/Cargo.toml --all-features
# Check each features
- cargo check --manifest-path futures-io/Cargo.toml --features read_initializer,unstable

# futures-util
# Check default-features, all-features
- cargo check --manifest-path futures-util/Cargo.toml
- cargo check --manifest-path futures-util/Cargo.toml --all-features

# Check each features
- cargo check --manifest-path futures-util/Cargo.toml --features sink
- cargo check --manifest-path futures-util/Cargo.toml --features io
- cargo check --manifest-path futures-util/Cargo.toml --features channel
Expand All @@ -158,6 +167,7 @@ matrix:
- cargo check --manifest-path futures-util/Cargo.toml --features sink,bilock,unstable
- cargo check --manifest-path futures-util/Cargo.toml --features io,bilock,unstable
- cargo check --manifest-path futures-util/Cargo.toml --features sink,io
- cargo check --manifest-path futures-util/Cargo.toml --features read_initializer,unstable

- cargo check --manifest-path futures-util/Cargo.toml --no-default-features
- cargo check --manifest-path futures-util/Cargo.toml --no-default-features --features sink
Expand Down
6 changes: 6 additions & 0 deletions futures-io/Cargo.toml
Expand Up @@ -18,6 +18,12 @@ name = "futures_io"
default = ["std"]
std = []

# Unstable features
# These features are outside of the normal semver guarantees and require the
# `unstable` feature as an explicit opt-in to unstable API.
unstable = []
read_initializer = []

[dependencies]

[dev-dependencies]
Expand Down
62 changes: 16 additions & 46 deletions futures-io/src/lib.rs
Expand Up @@ -8,6 +8,8 @@
//! All items of this library are only available when the `std` feature of this
//! library is activated, and it is activated by default.

#![cfg_attr(feature = "read_initializer", feature(read_initializer))]

#![cfg_attr(not(feature = "std"), no_std)]

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
Expand All @@ -19,13 +21,15 @@

#![doc(html_root_url = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_io")]

#[cfg(all(feature = "read_initializer", not(feature = "unstable")))]
compile_error!("The `read_initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[cfg(feature = "std")]
mod if_std {
use std::cmp;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll};

// Re-export some types from `std::io` so that users don't have to deal
Expand All @@ -40,45 +44,9 @@ mod if_std {
SeekFrom as SeekFrom,
};

/// A type used to conditionally initialize buffers passed to `AsyncRead`
/// methods, modeled after `std`.
#[derive(Debug)]
pub struct Initializer(bool);

impl Initializer {
/// Returns a new `Initializer` which will zero out buffers.
#[inline]
pub fn zeroing() -> Initializer {
Initializer(true)
}

/// Returns a new `Initializer` which will not zero out buffers.
///
/// # Safety
///
/// This method may only be called by `AsyncRead`ers which guarantee
/// that they will not read from the buffers passed to `AsyncRead`
/// methods, and that the return value of the method accurately reflects
/// the number of bytes that have been written to the head of the buffer.
#[inline]
pub unsafe fn nop() -> Initializer {
Initializer(false)
}

/// Indicates if a buffer should be initialized.
#[inline]
pub fn should_initialize(&self) -> bool {
self.0
}

/// Initializes a buffer if necessary.
#[inline]
pub fn initialize(&self, buf: &mut [u8]) {
if self.should_initialize() {
unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) }
}
}
}
#[cfg(feature = "read_initializer")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use io::Initializer as Initializer;

/// Read bytes asynchronously.
///
Expand All @@ -99,6 +67,7 @@ mod if_std {
/// This method is `unsafe` because and `AsyncRead`er could otherwise
/// return a non-zeroing `Initializer` from another `AsyncRead` type
/// without an `unsafe` block.
#[cfg(feature = "read_initializer")]
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::zeroing()
Expand Down Expand Up @@ -342,6 +311,7 @@ mod if_std {

macro_rules! deref_async_read {
() => {
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
(**self).initializer()
}
Expand Down Expand Up @@ -373,6 +343,7 @@ mod if_std {
P: DerefMut + Unpin,
P::Target: AsyncRead,
{
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
(**self).initializer()
}
Expand All @@ -390,12 +361,11 @@ mod if_std {
}
}

/// `unsafe` because the `io::Read` type must not access the buffer
/// before reading data into it.
macro_rules! unsafe_delegate_async_read_to_stdio {
macro_rules! delegate_async_read_to_stdio {
() => {
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
io::Read::initializer(self)
}

fn poll_read(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8])
Expand All @@ -413,11 +383,11 @@ mod if_std {
}

impl AsyncRead for &[u8] {
unsafe_delegate_async_read_to_stdio!();
delegate_async_read_to_stdio!();
}

impl<T: AsRef<[u8]> + Unpin> AsyncRead for io::Cursor<T> {
unsafe_delegate_async_read_to_stdio!();
delegate_async_read_to_stdio!();
}

macro_rules! deref_async_write {
Expand Down
1 change: 1 addition & 0 deletions futures-util/Cargo.toml
Expand Up @@ -33,6 +33,7 @@ select-macro = ["async-await", "futures-select-macro-preview", "proc-macro-hack"
unstable = ["futures-core-preview/unstable"]
cfg-target-has-atomic = ["futures-core-preview/cfg-target-has-atomic"]
bilock = []
read_initializer = ["io", "futures-io-preview/read_initializer", "futures-io-preview/unstable"]

[dependencies]
futures-core-preview = { path = "../futures-core", version = "=0.3.0-alpha.18", default-features = false }
Expand Down
7 changes: 4 additions & 3 deletions futures-util/src/compat/compat01as03.rs
Expand Up @@ -368,9 +368,9 @@ unsafe impl UnsafeNotify01 for NotifyWaker {
#[cfg(feature = "io-compat")]
mod io {
use super::*;
use futures_io::{
AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03, Initializer,
};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
use std::io::Error;
use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

Expand Down Expand Up @@ -433,6 +433,7 @@ mod io {
impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}

impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
// check if `prepare_uninitialized_buffer` needs zeroing
if self.inner.get_ref().prepare_uninitialized_buffer(&mut [1]) {
Expand Down
1 change: 1 addition & 0 deletions futures-util/src/compat/compat03as01.rs
Expand Up @@ -262,6 +262,7 @@ mod io {
}

impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {
#[cfg(feature = "read_initializer")]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
let initializer = self.inner.initializer();
let does_init = initializer.should_initialize();
Expand Down
6 changes: 4 additions & 2 deletions futures-util/src/future/either.rs
Expand Up @@ -160,16 +160,18 @@ mod if_std {
use super::Either;
use core::pin::Pin;
use core::task::{Context, Poll};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{
AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, Initializer, IoSlice, IoSliceMut, Result,
SeekFrom,
AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom,
};

impl<A, B> AsyncRead for Either<A, B>
where
A: AsyncRead,
B: AsyncRead,
{
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
match self {
Either::Left(x) => x.initializer(),
Expand Down
13 changes: 11 additions & 2 deletions futures-util/src/io/allow_std.rs
@@ -1,4 +1,6 @@
use futures_core::task::{Context, Poll};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, IoSlice, IoSliceMut, SeekFrom};
use std::{fmt, io};
use std::pin::Pin;
Expand Down Expand Up @@ -106,8 +108,10 @@ impl<T> io::Read for AllowStdIo<T> where T: io::Read {
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.0.read_vectored(bufs)
}
// TODO: implement the `initializer` fn when it stabilizes.
// See rust-lang/rust #42788
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
self.0.initializer()
}
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
self.0.read_to_end(buf)
}
Expand All @@ -131,6 +135,11 @@ impl<T> AsyncRead for AllowStdIo<T> where T: io::Read {
{
Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs))))
}

#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
self.0.initializer()
}
}

impl<T> io::Seek for AllowStdIo<T> where T: io::Seek {
Expand Down
7 changes: 5 additions & 2 deletions futures-util/src/io/buf_reader.rs
@@ -1,5 +1,7 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer, IoSliceMut, SeekFrom};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, IoSliceMut, SeekFrom};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::io::{self, Read};
use std::pin::Pin;
Expand Down Expand Up @@ -48,7 +50,7 @@ impl<R: AsyncRead> BufReader<R> {
unsafe {
let mut buffer = Vec::with_capacity(capacity);
buffer.set_len(capacity);
inner.initializer().initialize(&mut buffer);
super::initialize(&inner, &mut buffer);
Self {
inner,
buf: buffer.into_boxed_slice(),
Expand Down Expand Up @@ -166,6 +168,7 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> {
}

// we can't skip unconditionally because of the large buffer case in read.
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
self.inner.initializer()
}
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/io/chain.rs
@@ -1,5 +1,7 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, Initializer, IoSliceMut};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::fmt;
use std::io;
Expand Down Expand Up @@ -117,6 +119,7 @@ where
self.second().poll_read_vectored(cx, bufs)
}

#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
let initializer = self.first.initializer();
if initializer.should_initialize() {
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/io/empty.rs
@@ -1,5 +1,7 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, Initializer};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead};
use std::fmt;
use std::io;
use std::pin::Pin;
Expand Down Expand Up @@ -42,6 +44,7 @@ impl AsyncRead for Empty {
Poll::Ready(Ok(0))
}

#[cfg(feature = "read_initializer")]
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
Expand Down
22 changes: 20 additions & 2 deletions futures-util/src/io/mod.rs
Expand Up @@ -9,17 +9,35 @@
//! This module is only available when the `io` and `std` features of this
//! library is activated, and it is activated by default.

#[cfg(feature = "io-compat")]
use crate::compat::Compat;
use std::ptr;

pub use futures_io::{
AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
IoSlice, IoSliceMut, Result, SeekFrom,
};

#[cfg(feature = "io-compat")] use crate::compat::Compat;
#[cfg(feature = "read_initializer")]
pub use futures_io::Initializer;

// used by `BufReader` and `BufWriter`
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
const DEFAULT_BUF_SIZE: usize = 8 * 1024;

/// Initializes a buffer if necessary.
///
/// Always initialize when `read_initializer` feature is disabled.
#[inline]
unsafe fn initialize<R: AsyncRead>(reader: &R, buf: &mut [u8]) {
#[cfg(feature = "read_initializer")]
{
if !reader.initializer().should_initialize() {
return;
}
}
ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
}

mod allow_std;
pub use self::allow_std::AllowStdIo;

Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/io/read_to_end.rs
Expand Up @@ -58,7 +58,7 @@ pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
g.buf.reserve(32);
let capacity = g.buf.capacity();
g.buf.set_len(capacity);
rd.initializer().initialize(&mut g.buf[g.len..]);
super::initialize(&rd, &mut g.buf[g.len..]);
}
}

Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/io/repeat.rs
@@ -1,5 +1,7 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncRead, Initializer, IoSliceMut};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncRead, IoSliceMut};
use std::fmt;
use std::io;
use std::pin::Pin;
Expand Down Expand Up @@ -57,6 +59,7 @@ impl AsyncRead for Repeat {
Poll::Ready(Ok(nwritten))
}

#[cfg(feature = "read_initializer")]
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
Expand Down

0 comments on commit cff39cb

Please sign in to comment.