From 931cde71a82e001b3e8614b23087e72012a3b4df Mon Sep 17 00:00:00 2001
From: Sean McArthur
Date: Thu, 13 Aug 2020 14:00:11 -0700
Subject: [PATCH] address review comments

---
 tokio-test/src/io.rs             |  1 -
 tokio-util/src/compat.rs         |  2 ++
 tokio/src/io/async_read.rs       | 13 ++++----
 tokio/src/io/read_buf.rs         | 19 +++++++++---
 tokio/src/io/util/read_to_end.rs | 53 ++++++--------------------
 tokio/src/io/util/take.rs        |  1 +
 tokio/src/net/tcp/stream.rs      |  3 ++
 tokio/src/net/unix/stream.rs     |  3 ++
 8 files changed, 39 insertions(+), 56 deletions(-)

diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs
index bf81630f0a1..f1ce77aa248 100644
--- a/tokio-test/src/io.rs
+++ b/tokio-test/src/io.rs
@@ -216,7 +216,6 @@ impl Inner {
                     // Drain the data from the source
                     data.drain(..n);
 
-                    // Return the number of bytes read
                     Ok(())
                 }
                 Some(&mut Action::ReadError(ref mut err)) => {
diff --git a/tokio-util/src/compat.rs b/tokio-util/src/compat.rs
index eb90b09f612..34120d43a34 100644
--- a/tokio-util/src/compat.rs
+++ b/tokio-util/src/compat.rs
@@ -110,6 +110,8 @@ where
         cx: &mut Context<'_>,
         buf: &mut tokio::io::ReadBuf<'_>,
     ) -> Poll<io::Result<()>> {
+        // We can't trust the inner type to not peek at the bytes,
+        // so we must defensively initialize the buffer.
         let slice = buf.initialize_unfilled();
         let n = ready!(futures_io::AsyncRead::poll_read(
             self.project().inner,
diff --git a/tokio/src/io/async_read.rs b/tokio/src/io/async_read.rs
index 282748678e3..d341b63d41a 100644
--- a/tokio/src/io/async_read.rs
+++ b/tokio/src/io/async_read.rs
@@ -73,17 +73,16 @@ pub trait AsyncRead {
             return Poll::Ready(Ok(0));
         }
 
-        unsafe {
-            let n = {
-                let mut b = ReadBuf::uninit(buf.bytes_mut());
+        let mut b = ReadBuf::uninit(buf.bytes_mut());
 
-                ready!(self.poll_read(cx, &mut b))?;
-                b.filled().len()
-            };
+        ready!(self.poll_read(cx, &mut b))?;
+        let n = b.filled().len();
 
+        // Safety: we can assume `n` bytes were read, since they are in `filled`.
+        unsafe {
             buf.advance_mut(n);
-            Poll::Ready(Ok(n))
         }
+        Poll::Ready(Ok(n))
     }
 }
diff --git a/tokio/src/io/read_buf.rs b/tokio/src/io/read_buf.rs
index add00457f25..212d20fc5e1 100644
--- a/tokio/src/io/read_buf.rs
+++ b/tokio/src/io/read_buf.rs
@@ -1,5 +1,5 @@
 // This lint claims ugly casting is somehow safer than transmute, but there's
-// evidence that is the case. Shush.
+// no evidence that is the case. Shush.
 #![allow(clippy::transmute_ptr_to_ptr)]
 
 use std::fmt;
@@ -63,6 +63,7 @@ impl<'a> ReadBuf<'a> {
         let slice = &self.buf[..self.filled];
         // safety: filled describes how far into the buffer that the
         // user has filled with bytes, so it's been initialized.
+        // TODO: This could use `MaybeUninit::slice_get_ref` when it is stable.
         unsafe { mem::transmute::<&[MaybeUninit<u8>], &[u8]>(slice) }
     }
 
@@ -72,6 +73,7 @@ impl<'a> ReadBuf<'a> {
         let slice = &mut self.buf[..self.filled];
         // safety: filled describes how far into the buffer that the
         // user has filled with bytes, so it's been initialized.
+        // TODO: This could use `MaybeUninit::slice_get_mut` when it is stable.
         unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) }
     }
 
@@ -83,6 +85,7 @@ impl<'a> ReadBuf<'a> {
         let slice = &self.buf[..self.initialized];
         // safety: initialized describes how far into the buffer that the
         // user has at some point initialized with bytes.
+        // TODO: This could use `MaybeUninit::slice_get_ref` when it is stable.
         unsafe { mem::transmute::<&[MaybeUninit<u8>], &[u8]>(slice) }
     }
 
@@ -94,6 +97,7 @@ impl<'a> ReadBuf<'a> {
         let slice = &mut self.buf[..self.initialized];
         // safety: initialized describes how far into the buffer that the
         // user has at some point initialized with bytes.
+        // TODO: This could use `MaybeUninit::slice_get_mut` when it is stable.
         unsafe { mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(slice) }
     }
 
@@ -125,8 +129,9 @@ impl<'a> ReadBuf<'a> {
     /// Panics if `self.remaining()` is less than `n`.
     #[inline]
     pub fn initialize_unfilled_to(&mut self, n: usize) -> &mut [u8] {
-        assert!(self.remaining() >= n, "n overflows remaining");
+        assert!(self.remaining() >= n, "n ({:?}) overflows remaining ({:?})", n, self.remaining());
 
+        // This can't overflow, otherwise the assert above would have failed.
         let end = self.filled + n;
 
         if self.initialized < end {
@@ -200,7 +205,7 @@ impl<'a> ReadBuf<'a> {
     /// The caller must ensure that `n` unfilled bytes of the buffer have already been initialized.
     #[inline]
     pub unsafe fn assume_init(&mut self, n: usize) {
-        let new = self.filled.checked_add(n).expect("filled overflow");
+        let new = self.filled + n;
         if new > self.initialized {
             self.initialized = new;
         }
@@ -212,7 +217,7 @@ impl<'a> ReadBuf<'a> {
     ///
     /// Panics if `self.remaining()` is less than `buf.len()`.
     #[inline]
-    pub fn append(&mut self, buf: &[u8]) {
+    pub fn put_slice(&mut self, buf: &[u8]) {
         assert!(
             self.remaining() >= buf.len(),
             "buf.len() must fit in remaining()"
@@ -239,6 +244,10 @@ impl<'a> ReadBuf<'a> {
 
 impl fmt::Debug for ReadBuf<'_> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("ReadBuf").finish()
+        f.debug_struct("ReadBuf")
+            .field("filled", &self.filled)
+            .field("initialized", &self.initialized)
+            .field("capacity", &self.capacity())
+            .finish()
     }
 }
diff --git a/tokio/src/io/util/read_to_end.rs b/tokio/src/io/util/read_to_end.rs
index 91906ffe75d..609af28e9fb 100644
--- a/tokio/src/io/util/read_to_end.rs
+++ b/tokio/src/io/util/read_to_end.rs
@@ -28,12 +28,7 @@ where
     }
 }
 
-/// # Safety
-///
-/// Before first calling this method, the unused capacity must have been
-/// prepared for use with the provided AsyncRead. This can be done using the
-/// `prepare_buffer` function later in this file.
-pub(super) unsafe fn read_to_end_internal<R: AsyncRead + ?Sized>(
+pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
     buf: &mut Vec<u8>,
     mut reader: Pin<&mut R>,
     num_read: &mut usize,
@@ -55,13 +50,7 @@ pub(super) unsafe fn read_to_end_internal<R: AsyncRead + ?Sized>(
 /// Tries to read from the provided AsyncRead.
 ///
 /// The length of the buffer is increased by the number of bytes read.
-///
-/// # Safety
-///
-/// The caller ensures that the buffer has been prepared for use with the
-/// AsyncRead before calling this function. This can be done using the
-/// `prepare_buffer` function later in this file.
-unsafe fn poll_read_to_end<R: AsyncRead + ?Sized>(
+fn poll_read_to_end<R: AsyncRead + ?Sized>(
     buf: &mut Vec<u8>,
     read: Pin<&mut R>,
     cx: &mut Context<'_>,
@@ -78,37 +67,16 @@ unsafe fn poll_read_to_end<R: AsyncRead + ?Sized>(
 
     ready!(read.poll_read(cx, &mut unused_capacity))?;
 
-    // safety: There are two situations:
-    //
-    // 1. The AsyncRead has not overriden `prepare_uninitialized_buffer`.
-    //
-    // In this situation, the default implementation of that method will have
-    // zeroed the unused capacity. This means that setting the length will
-    // never expose uninitialized memory in the vector.
-    //
-    // Note that the assert! below ensures that we don't set the length to
-    // something larger than the capacity, which malicious implementors might
-    // try to have us do.
-    //
-    // 2. The AsyncRead has overriden `prepare_uninitialized_buffer`.
-    //
-    // In this case, the safety of the `set_len` call below relies on this
-    // guarantee from the documentation on `prepare_uninitialized_buffer`:
-    //
-    // > This function isn't actually unsafe to call but unsafe to implement.
-    // > The implementer must ensure that either the whole buf has been zeroed
-    // > or poll_read() overwrites the buffer without reading it and returns
-    // > correct value.
-    //
-    // Note that `prepare_uninitialized_buffer` is unsafe to implement, so this
-    // is a guarantee we can rely on in unsafe code.
-    //
-    // The assert!() is technically only necessary in the first case.
     let n = unused_capacity.filled().len();
     let new_len = buf.len() + n;
 
-    assert!(new_len <= buf.capacity());
-    buf.set_len(new_len);
+    // This should no longer even be possible in safe Rust. An implementor
+    // would need to have unsafely *replaced* the buffer inside `ReadBuf`,
+    // which... yolo?
+    assert!(new_len <= buf.capacity());
+    unsafe {
+        buf.set_len(new_len);
+    }
 
     Poll::Ready(Ok(n))
 }
@@ -135,8 +103,7 @@ where
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let Self { reader, buf, read } = &mut *self;
 
-        // safety: The constructor of ReadToEnd calls `prepare_buffer`
-        unsafe { read_to_end_internal(buf, Pin::new(*reader), read, cx) }
+        read_to_end_internal(buf, Pin::new(*reader), read, cx)
     }
 }
 
diff --git a/tokio/src/io/util/take.rs b/tokio/src/io/util/take.rs
index 665875bdf78..2abc7693172 100644
--- a/tokio/src/io/util/take.rs
+++ b/tokio/src/io/util/take.rs
@@ -87,6 +87,7 @@ impl<R: AsyncRead> AsyncRead for Take<R> {
         let me = self.project();
         let max = std::cmp::min(buf.remaining() as u64, *me.limit_) as usize;
         // Make a ReadBuf of the unfulled section up to max
+        // Safety: we don't set any of the bytes in `unfilled_mut` to `MaybeUninit::uninit`.
         let mut b = unsafe { ReadBuf::uninit(&mut buf.unfilled_mut()[..max]) };
         ready!(me.inner.poll_read(cx, &mut b))?;
         let n = b.filled().len();
diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs
index 1095e8f56c4..e624fb9d954 100644
--- a/tokio/src/net/tcp/stream.rs
+++ b/tokio/src/net/tcp/stream.rs
@@ -705,6 +705,7 @@ impl TcpStream {
     ) -> Poll<io::Result<()>> {
         ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
 
+        // Safety: `TcpStream::read` will not peek at the maybe uninitialized bytes.
         let b = unsafe {
             &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
         };
         match self.io.get_ref().read(b) {
@@ -713,6 +714,8 @@ impl TcpStream {
                 Poll::Pending
             }
             Ok(n) => {
+                // Safety: We trust `TcpStream::read` to have filled up `n` bytes
+                // in the buffer.
                 unsafe {
                     buf.assume_init(n);
                 }
diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs
index a87e9439340..559fe02a625 100644
--- a/tokio/src/net/unix/stream.rs
+++ b/tokio/src/net/unix/stream.rs
@@ -213,6 +213,7 @@ impl UnixStream {
     ) -> Poll<io::Result<()>> {
         ready!(self.io.poll_read_ready(cx, mio::Ready::readable()))?;
 
+        // Safety: `UnixStream::read` will not peek at the maybe uninitialized bytes.
         let b = unsafe {
             &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
         };
         match self.io.get_ref().read(b) {
@@ -221,6 +222,8 @@ impl UnixStream {
                 Poll::Pending
             }
             Ok(n) => {
+                // Safety: We trust `UnixStream::read` to have filled up `n` bytes
+                // in the buffer.
                 unsafe {
                     buf.assume_init(n);
                 }
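
For reference, the shape of the `ReadBuf` API that these hunks converge on (`remaining`, the renamed `put_slice`, `assume_init`, and `poll_read` returning `Poll<io::Result<()>>`) lets most implementors avoid unsafe code entirely. The sketch below only illustrates that API as it appears in this patch; `SliceReader` is a hypothetical type invented for the example and is not part of the change.

```rust
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::io::{AsyncRead, ReadBuf};

/// Hypothetical in-memory reader, used only to illustrate the `ReadBuf` API.
struct SliceReader {
    data: Vec<u8>,
    pos: usize,
}

impl AsyncRead for SliceReader {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // `SliceReader` is `Unpin`, so its fields are directly accessible.
        let this = self.get_mut();
        let unread = &this.data[this.pos..];

        // Copy at most `buf.remaining()` bytes. `put_slice` (the new name for
        // `append`) both initializes and fills the copied region, so no unsafe
        // code is needed on the implementor's side.
        let n = unread.len().min(buf.remaining());
        buf.put_slice(&unread[..n]);
        this.pos += n;

        Poll::Ready(Ok(()))
    }
}
```

Readers that hand out genuinely uninitialized memory, as the `TcpStream` and `UnixStream` hunks above do, still pair `unsafe` with `assume_init(n)`, which is what the added safety comments there justify.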