Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make AsyncRead::initializer unstable API #1845

Merged
merged 1 commit into from Sep 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 12 additions & 2 deletions .travis.yml
Expand Up @@ -136,14 +136,23 @@ matrix:
--no-default-features
--features async-await

- name: cargo check (futures-util)
- name: cargo check (sub crates)
rust: nightly
script:
- cargo run --manifest-path ci/remove-dev-dependencies/Cargo.toml */Cargo.toml

# futures-io
# Check default-features, all-features
- cargo check --manifest-path futures-io/Cargo.toml
- cargo check --manifest-path futures-io/Cargo.toml --all-features
# Check each features
- cargo check --manifest-path futures-io/Cargo.toml --features read_initializer,unstable

# futures-util
# Check default-features, all-features
- cargo check --manifest-path futures-util/Cargo.toml
- cargo check --manifest-path futures-util/Cargo.toml --all-features

# Check each features
- cargo check --manifest-path futures-util/Cargo.toml --features sink
- cargo check --manifest-path futures-util/Cargo.toml --features io
- cargo check --manifest-path futures-util/Cargo.toml --features channel
Expand All @@ -158,6 +167,7 @@ matrix:
- cargo check --manifest-path futures-util/Cargo.toml --features sink,bilock,unstable
- cargo check --manifest-path futures-util/Cargo.toml --features io,bilock,unstable
- cargo check --manifest-path futures-util/Cargo.toml --features sink,io
- cargo check --manifest-path futures-util/Cargo.toml --features read_initializer,unstable

- cargo check --manifest-path futures-util/Cargo.toml --no-default-features
- cargo check --manifest-path futures-util/Cargo.toml --no-default-features --features sink
Expand Down
6 changes: 6 additions & 0 deletions futures-io/Cargo.toml
Expand Up @@ -18,6 +18,12 @@ name = "futures_io"
default = ["std"]
std = []

# Unstable features
# These features are outside of the normal semver guarantees and require the
# `unstable` feature as an explicit opt-in to unstable API.
unstable = []
read_initializer = []

[dependencies]

[dev-dependencies]
Expand Down
62 changes: 16 additions & 46 deletions futures-io/src/lib.rs
Expand Up @@ -8,6 +8,8 @@
//! All items of this library are only available when the `std` feature of this
//! library is activated, and it is activated by default.

#![cfg_attr(feature = "read_initializer", feature(read_initializer))]

#![cfg_attr(not(feature = "std"), no_std)]

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms, unreachable_pub)]
Expand All @@ -19,13 +21,15 @@

#![doc(html_root_url = "https://rust-lang-nursery.github.io/futures-api-docs/0.3.0-alpha.18/futures_io")]

#[cfg(all(feature = "read_initializer", not(feature = "unstable")))]
compile_error!("The `read_initializer` feature requires the `unstable` feature as an explicit opt-in to unstable features");

#[cfg(feature = "std")]
mod if_std {
use std::cmp;
use std::io;
use std::ops::DerefMut;
use std::pin::Pin;
use std::ptr;
use std::task::{Context, Poll};

// Re-export some types from `std::io` so that users don't have to deal
Expand All @@ -40,45 +44,9 @@ mod if_std {
SeekFrom as SeekFrom,
};

/// A type used to conditionally initialize buffers passed to `AsyncRead`
/// methods, modeled after `std`.
#[derive(Debug)]
pub struct Initializer(bool);

impl Initializer {
/// Returns a new `Initializer` which will zero out buffers.
#[inline]
pub fn zeroing() -> Initializer {
Initializer(true)
}

/// Returns a new `Initializer` which will not zero out buffers.
///
/// # Safety
///
/// This method may only be called by `AsyncRead`ers which guarantee
/// that they will not read from the buffers passed to `AsyncRead`
/// methods, and that the return value of the method accurately reflects
/// the number of bytes that have been written to the head of the buffer.
#[inline]
pub unsafe fn nop() -> Initializer {
Initializer(false)
}

/// Indicates if a buffer should be initialized.
#[inline]
pub fn should_initialize(&self) -> bool {
self.0
}

/// Initializes a buffer if necessary.
#[inline]
pub fn initialize(&self, buf: &mut [u8]) {
if self.should_initialize() {
unsafe { ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len()) }
}
}
}
#[cfg(feature = "read_initializer")]
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use io::Initializer as Initializer;

/// Read bytes asynchronously.
///
Expand All @@ -99,6 +67,7 @@ mod if_std {
/// This method is `unsafe` because and `AsyncRead`er could otherwise
/// return a non-zeroing `Initializer` from another `AsyncRead` type
/// without an `unsafe` block.
#[cfg(feature = "read_initializer")]
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::zeroing()
Expand Down Expand Up @@ -342,6 +311,7 @@ mod if_std {

macro_rules! deref_async_read {
() => {
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
(**self).initializer()
}
Expand Down Expand Up @@ -373,6 +343,7 @@ mod if_std {
P: DerefMut + Unpin,
P::Target: AsyncRead,
{
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
(**self).initializer()
}
Expand All @@ -390,12 +361,11 @@ mod if_std {
}
}

/// `unsafe` because the `io::Read` type must not access the buffer
/// before reading data into it.
macro_rules! unsafe_delegate_async_read_to_stdio {
macro_rules! delegate_async_read_to_stdio {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now safe.

() => {
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
io::Read::initializer(self)
}

fn poll_read(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8])
Expand All @@ -413,11 +383,11 @@ mod if_std {
}

impl AsyncRead for &[u8] {
unsafe_delegate_async_read_to_stdio!();
delegate_async_read_to_stdio!();
}

impl<T: AsRef<[u8]> + Unpin> AsyncRead for io::Cursor<T> {
unsafe_delegate_async_read_to_stdio!();
delegate_async_read_to_stdio!();
}

macro_rules! deref_async_write {
Expand Down
1 change: 1 addition & 0 deletions futures-util/Cargo.toml
Expand Up @@ -33,6 +33,7 @@ select-macro = ["async-await", "futures-select-macro-preview", "proc-macro-hack"
unstable = ["futures-core-preview/unstable"]
cfg-target-has-atomic = ["futures-core-preview/cfg-target-has-atomic"]
bilock = []
read_initializer = ["io", "futures-io-preview/read_initializer", "futures-io-preview/unstable"]

[dependencies]
futures-core-preview = { path = "../futures-core", version = "=0.3.0-alpha.18", default-features = false }
Expand Down
7 changes: 4 additions & 3 deletions futures-util/src/compat/compat01as03.rs
Expand Up @@ -368,9 +368,9 @@ unsafe impl UnsafeNotify01 for NotifyWaker {
#[cfg(feature = "io-compat")]
mod io {
use super::*;
use futures_io::{
AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03, Initializer,
};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncRead as AsyncRead03, AsyncWrite as AsyncWrite03};
use std::io::Error;
use tokio_io::{AsyncRead as AsyncRead01, AsyncWrite as AsyncWrite01};

Expand Down Expand Up @@ -433,6 +433,7 @@ mod io {
impl<W: AsyncWrite01> AsyncWrite01CompatExt for W {}

impl<R: AsyncRead01> AsyncRead03 for Compat01As03<R> {
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
// check if `prepare_uninitialized_buffer` needs zeroing
if self.inner.get_ref().prepare_uninitialized_buffer(&mut [1]) {
Expand Down
1 change: 1 addition & 0 deletions futures-util/src/compat/compat03as01.rs
Expand Up @@ -262,6 +262,7 @@ mod io {
}

impl<R: AsyncRead03 + Unpin> AsyncRead01 for Compat<R> {
#[cfg(feature = "read_initializer")]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
let initializer = self.inner.initializer();
let does_init = initializer.should_initialize();
Expand Down
6 changes: 4 additions & 2 deletions futures-util/src/future/either.rs
Expand Up @@ -160,16 +160,18 @@ mod if_std {
use super::Either;
use core::pin::Pin;
use core::task::{Context, Poll};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{
AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, Initializer, IoSlice, IoSliceMut, Result,
SeekFrom,
AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, Result, SeekFrom,
};

impl<A, B> AsyncRead for Either<A, B>
where
A: AsyncRead,
B: AsyncRead,
{
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
match self {
Either::Left(x) => x.initializer(),
Expand Down
13 changes: 11 additions & 2 deletions futures-util/src/io/allow_std.rs
@@ -1,4 +1,6 @@
use futures_core::task::{Context, Poll};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, IoSlice, IoSliceMut, SeekFrom};
use std::{fmt, io};
use std::pin::Pin;
Expand Down Expand Up @@ -106,8 +108,10 @@ impl<T> io::Read for AllowStdIo<T> where T: io::Read {
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.0.read_vectored(bufs)
}
// TODO: implement the `initializer` fn when it stabilizes.
// See rust-lang/rust #42788
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
self.0.initializer()
}
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
self.0.read_to_end(buf)
}
Expand All @@ -131,6 +135,11 @@ impl<T> AsyncRead for AllowStdIo<T> where T: io::Read {
{
Poll::Ready(Ok(try_with_interrupt!(self.0.read_vectored(bufs))))
}

#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
self.0.initializer()
}
}

impl<T> io::Seek for AllowStdIo<T> where T: io::Seek {
Expand Down
7 changes: 5 additions & 2 deletions futures-util/src/io/buf_reader.rs
@@ -1,5 +1,7 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer, IoSliceMut, SeekFrom};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, IoSliceMut, SeekFrom};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::io::{self, Read};
use std::pin::Pin;
Expand Down Expand Up @@ -48,7 +50,7 @@ impl<R: AsyncRead> BufReader<R> {
unsafe {
let mut buffer = Vec::with_capacity(capacity);
buffer.set_len(capacity);
inner.initializer().initialize(&mut buffer);
super::initialize(&inner, &mut buffer);
Self {
inner,
buf: buffer.into_boxed_slice(),
Expand Down Expand Up @@ -166,6 +168,7 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> {
}

// we can't skip unconditionally because of the large buffer case in read.
#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
self.inner.initializer()
}
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/io/chain.rs
@@ -1,5 +1,7 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, Initializer, IoSliceMut};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, IoSliceMut};
use pin_utils::{unsafe_pinned, unsafe_unpinned};
use std::fmt;
use std::io;
Expand Down Expand Up @@ -117,6 +119,7 @@ where
self.second().poll_read_vectored(cx, bufs)
}

#[cfg(feature = "read_initializer")]
unsafe fn initializer(&self) -> Initializer {
let initializer = self.first.initializer();
if initializer.should_initialize() {
Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/io/empty.rs
@@ -1,5 +1,7 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, Initializer};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead};
use std::fmt;
use std::io;
use std::pin::Pin;
Expand Down Expand Up @@ -42,6 +44,7 @@ impl AsyncRead for Empty {
Poll::Ready(Ok(0))
}

#[cfg(feature = "read_initializer")]
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
Expand Down
22 changes: 20 additions & 2 deletions futures-util/src/io/mod.rs
Expand Up @@ -9,17 +9,35 @@
//! This module is only available when the `io` and `std` features of this
//! library is activated, and it is activated by default.

#[cfg(feature = "io-compat")]
use crate::compat::Compat;
use std::ptr;

pub use futures_io::{
AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, Error, ErrorKind,
IoSlice, IoSliceMut, Result, SeekFrom,
};

#[cfg(feature = "io-compat")] use crate::compat::Compat;
#[cfg(feature = "read_initializer")]
pub use futures_io::Initializer;

// used by `BufReader` and `BufWriter`
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
const DEFAULT_BUF_SIZE: usize = 8 * 1024;

/// Initializes a buffer if necessary.
///
/// A buffer is always initialized if `read_initializer` feature is disabled.
#[inline]
unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
#[cfg(feature = "read_initializer")]
{
if !_reader.initializer().should_initialize() {
return;
}
}
ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
}

mod allow_std;
pub use self::allow_std::AllowStdIo;

Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/io/read_to_end.rs
Expand Up @@ -58,7 +58,7 @@ pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
g.buf.reserve(32);
let capacity = g.buf.capacity();
g.buf.set_len(capacity);
rd.initializer().initialize(&mut g.buf[g.len..]);
super::initialize(&rd, &mut g.buf[g.len..]);
}
}

Expand Down
5 changes: 4 additions & 1 deletion futures-util/src/io/repeat.rs
@@ -1,5 +1,7 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncRead, Initializer, IoSliceMut};
#[cfg(feature = "read_initializer")]
use futures_io::Initializer;
use futures_io::{AsyncRead, IoSliceMut};
use std::fmt;
use std::io;
use std::pin::Pin;
Expand Down Expand Up @@ -57,6 +59,7 @@ impl AsyncRead for Repeat {
Poll::Ready(Ok(nwritten))
}

#[cfg(feature = "read_initializer")]
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
Expand Down