Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Aug 13, 2020
1 parent 022f49c commit d6086f9
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 54 deletions.
1 change: 0 additions & 1 deletion tokio-test/src/io.rs
Expand Up @@ -216,7 +216,6 @@ impl Inner {
// Drain the data from the source
data.drain(..n);

// Return the number of bytes read
Ok(())
}
Some(&mut Action::ReadError(ref mut err)) => {
Expand Down
2 changes: 2 additions & 0 deletions tokio-util/src/compat.rs
Expand Up @@ -110,6 +110,8 @@ where
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// We can't trust the inner type to not peak at the bytes,
// so we must defensively initialize the buffer.
let slice = buf.initialize_unfilled();
let n = ready!(futures_io::AsyncRead::poll_read(
self.project().inner,
Expand Down
13 changes: 6 additions & 7 deletions tokio/src/io/async_read.rs
Expand Up @@ -73,17 +73,16 @@ pub trait AsyncRead {
return Poll::Ready(Ok(0));
}

unsafe {
let n = {
let mut b = ReadBuf::uninit(buf.bytes_mut());
let mut b = ReadBuf::uninit(buf.bytes_mut());

ready!(self.poll_read(cx, &mut b))?;
b.filled().len()
};
ready!(self.poll_read(cx, &mut b))?;
let n = b.filled().len();

// Safety: we can assume `n` bytes were read, since they are in`filled`.
unsafe {
buf.advance_mut(n);
Poll::Ready(Ok(n))
}
Poll::Ready(Ok(n))
}
}

Expand Down
15 changes: 12 additions & 3 deletions tokio/src/io/read_buf.rs
@@ -1,5 +1,5 @@
// This lint claims ugly casting is somehow safer than transmute, but there's
// evidence that is the case. Shush.
// no evidence that is the case. Shush.
#![allow(clippy::transmute_ptr_to_ptr)]

use std::fmt;
Expand Down Expand Up @@ -63,6 +63,7 @@ impl<'a> ReadBuf<'a> {
let slice = &self.buf[..self.filled];
// safety: filled describes how far into the buffer that the
// user has filled with bytes, so it's been initialized.
// TODO: This could use `MaybeUninit::slice_get_ref` when it is stable.
unsafe { mem::transmute::<&[MaybeUninit<u8>], &[u8]>(slice) }
}

Expand All @@ -72,6 +73,7 @@ impl<'a> ReadBuf<'a> {
let slice = &mut self.buf[..self.filled];
// safety: filled describes how far into the buffer that the
// user has filled with bytes, so it's been initialized.
// TODO: This could use `MaybeUninit::slice_get_mut` when it is stable.
unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) }
}

Expand All @@ -83,6 +85,7 @@ impl<'a> ReadBuf<'a> {
let slice = &self.buf[..self.initialized];
// safety: initialized describes how far into the buffer that the
// user has at some point initialized with bytes.
// TODO: This could use `MaybeUninit::slice_get_ref` when it is stable.
unsafe { mem::transmute::<&[MaybeUninit<u8>], &[u8]>(slice) }
}

Expand All @@ -94,6 +97,7 @@ impl<'a> ReadBuf<'a> {
let slice = &mut self.buf[..self.initialized];
// safety: initialized describes how far into the buffer that the
// user has at some point initialized with bytes.
// TODO: This could use `MaybeUninit::slice_get_mut` when it is stable.
unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) }
}

Expand Down Expand Up @@ -127,6 +131,7 @@ impl<'a> ReadBuf<'a> {
pub fn initialize_unfilled_to(&mut self, n: usize) -> &mut [u8] {
assert!(self.remaining() >= n, "n overflows remaining");

// This can't overflow, otherwise the assert above would have failed.
let end = self.filled + n;

if self.initialized < end {
Expand Down Expand Up @@ -200,7 +205,7 @@ impl<'a> ReadBuf<'a> {
/// The caller must ensure that `n` unfilled bytes of the buffer have already been initialized.
#[inline]
pub unsafe fn assume_init(&mut self, n: usize) {
let new = self.filled.checked_add(n).expect("filled overflow");
let new = self.filled + n;
if new > self.initialized {
self.initialized = new;
}
Expand Down Expand Up @@ -239,6 +244,10 @@ impl<'a> ReadBuf<'a> {

impl fmt::Debug for ReadBuf<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReadBuf").finish()
f.debug_struct("ReadBuf")
.field("filled", &self.filled)
.field("initialized", &self.initialized)
.field("capacity", &self.capacity())
.finish()
}
}
53 changes: 10 additions & 43 deletions tokio/src/io/util/read_to_end.rs
Expand Up @@ -28,12 +28,7 @@ where
}
}

/// # Safety
///
/// Before first calling this method, the unused capacity must have been
/// prepared for use with the provided AsyncRead. This can be done using the
/// `prepare_buffer` function later in this file.
pub(super) unsafe fn read_to_end_internal<R: AsyncRead + ?Sized>(
pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
buf: &mut Vec<u8>,
mut reader: Pin<&mut R>,
num_read: &mut usize,
Expand All @@ -55,13 +50,7 @@ pub(super) unsafe fn read_to_end_internal<R: AsyncRead + ?Sized>(
/// Tries to read from the provided AsyncRead.
///
/// The length of the buffer is increased by the number of bytes read.
///
/// # Safety
///
/// The caller ensures that the buffer has been prepared for use with the
/// AsyncRead before calling this function. This can be done using the
/// `prepare_buffer` function later in this file.
unsafe fn poll_read_to_end<R: AsyncRead + ?Sized>(
fn poll_read_to_end<R: AsyncRead + ?Sized>(
buf: &mut Vec<u8>,
read: Pin<&mut R>,
cx: &mut Context<'_>,
Expand All @@ -78,37 +67,16 @@ unsafe fn poll_read_to_end<R: AsyncRead + ?Sized>(

ready!(read.poll_read(cx, &mut unused_capacity))?;

// safety: There are two situations:
//
// 1. The AsyncRead has not overriden `prepare_uninitialized_buffer`.
//
// In this situation, the default implementation of that method will have
// zeroed the unused capacity. This means that setting the length will
// never expose uninitialized memory in the vector.
//
// Note that the assert! below ensures that we don't set the length to
// something larger than the capacity, which malicious implementors might
// try to have us do.
//
// 2. The AsyncRead has overriden `prepare_uninitialized_buffer`.
//
// In this case, the safety of the `set_len` call below relies on this
// guarantee from the documentation on `prepare_uninitialized_buffer`:
//
// > This function isn't actually unsafe to call but unsafe to implement.
// > The implementer must ensure that either the whole buf has been zeroed
// > or poll_read() overwrites the buffer without reading it and returns
// > correct value.
//
// Note that `prepare_uninitialized_buffer` is unsafe to implement, so this
// is a guarantee we can rely on in unsafe code.
//
// The assert!() is technically only necessary in the first case.
let n = unused_capacity.filled().len();
let new_len = buf.len() + n;
assert!(new_len <= buf.capacity());

buf.set_len(new_len);
// This should no longer even be possible in safe Rust. An implementor
// would need to have unsafely *replaced* the buffer inside `ReadBuf`,
// which... yolo?
assert!(new_len <= buf.capacity());
unsafe {
buf.set_len(new_len);
}
Poll::Ready(Ok(n))
}

Expand All @@ -135,8 +103,7 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf, read } = &mut *self;

// safety: The constructor of ReadToEnd calls `prepare_buffer`
unsafe { read_to_end_internal(buf, Pin::new(*reader), read, cx) }
read_to_end_internal(buf, Pin::new(*reader), read, cx)
}
}

Expand Down
1 change: 1 addition & 0 deletions tokio/src/io/util/take.rs
Expand Up @@ -87,6 +87,7 @@ impl<R: AsyncRead> AsyncRead for Take<R> {
let me = self.project();
let max = std::cmp::min(buf.remaining() as u64, *me.limit_) as usize;
// Make a ReadBuf of the unfulled section up to max
// Saftey: We don't set any of the `unfilled_mut` with `MaybeUninit::uninit`.
let mut b = unsafe { ReadBuf::uninit(&mut buf.unfilled_mut()[..max]) };
ready!(me.inner.poll_read(cx, &mut b))?;
let n = b.filled().len();
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -705,6 +705,7 @@ impl TcpStream {
) -> Poll<io::Result<()>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;

// Safety: `TcpStream::read` will not peak at the maybe uinitialized bytes.
let b =
unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
match self.io.get_ref().read(b) {
Expand All @@ -713,6 +714,8 @@ impl TcpStream {
Poll::Pending
}
Ok(n) => {
// Safety: We trust `TcpStream::read` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
}
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/net/unix/stream.rs
Expand Up @@ -213,6 +213,7 @@ impl UnixStream {
) -> Poll<io::Result<()>> {
ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;

// Safety: `UnixStream::read` will not peak at the maybe uinitialized bytes.
let b =
unsafe { &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
match self.io.get_ref().read(b) {
Expand All @@ -221,6 +222,8 @@ impl UnixStream {
Poll::Pending
}
Ok(n) => {
// Safety: We trust `UnixStream::read` to have filled up `n` bytes
// in the buffer.
unsafe {
buf.assume_init(n);
}
Expand Down

0 comments on commit d6086f9

Please sign in to comment.