Skip to content

Commit

Permalink
io: remove poll_{read,write}_buf from traits (#2882)
Browse files Browse the repository at this point in the history
These functions have object safety issues. It also has been decided to
avoid vectored operations on the I/O traits. A later PR will bring back
vectored operations on specific types that support them.

Refs: #2879, #2716
  • Loading branch information
carllerche committed Sep 25, 2020
1 parent 760ae89 commit 4186b0a
Show file tree
Hide file tree
Showing 20 changed files with 319 additions and 514 deletions.
11 changes: 0 additions & 11 deletions tokio-test/src/io.rs
Expand Up @@ -22,7 +22,6 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::sync::mpsc;
use tokio::time::{self, Delay, Duration, Instant};

use bytes::Buf;
use futures_core::ready;
use std::collections::VecDeque;
use std::future::Future;
Expand Down Expand Up @@ -439,16 +438,6 @@ impl AsyncWrite for Mock {
}
}

fn poll_write_buf<B: Buf>(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
let n = ready!(self.poll_write(cx, buf.bytes()))?;
buf.advance(n);
Poll::Ready(Ok(n))
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
Expand Down
4 changes: 3 additions & 1 deletion tokio-util/src/codec/framed_impl.rs
Expand Up @@ -118,6 +118,8 @@ where
type Item = Result<U::Item, U::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use crate::util::poll_read_buf;

let mut pinned = self.project();
let state: &mut ReadFrame = pinned.state.borrow_mut();
loop {
Expand Down Expand Up @@ -148,7 +150,7 @@ where
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
state.buffer.reserve(1);
let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? {
let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? {
Poll::Ready(ct) => ct,
Poll::Pending => return Poll::Pending,
};
Expand Down
4 changes: 3 additions & 1 deletion tokio-util/src/io/reader_stream.rs
Expand Up @@ -70,6 +70,8 @@ impl<R: AsyncRead> ReaderStream<R> {
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use crate::util::poll_read_buf;

let mut this = self.as_mut().project();

let reader = match this.reader.as_pin_mut() {
Expand All @@ -81,7 +83,7 @@ impl<R: AsyncRead> Stream for ReaderStream<R> {
this.buf.reserve(CAPACITY);
}

match reader.poll_read_buf(cx, &mut this.buf) {
match poll_read_buf(cx, reader, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
Expand Down
25 changes: 1 addition & 24 deletions tokio-util/src/io/stream_reader.rs
@@ -1,4 +1,4 @@
use bytes::{Buf, BufMut};
use bytes::Buf;
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use std::io;
Expand Down Expand Up @@ -119,29 +119,6 @@ where
self.consume(len);
Poll::Ready(Ok(()))
}
fn poll_read_buf<BM: BufMut>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut BM,
) -> Poll<io::Result<usize>>
where
Self: Sized,
{
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

let inner_buf = match self.as_mut().poll_fill_buf(cx) {
Poll::Ready(Ok(buf)) => buf,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
};
let len = std::cmp::min(inner_buf.len(), buf.remaining_mut());
buf.put_slice(&inner_buf[..len]);

self.consume(len);
Poll::Ready(Ok(len))
}
}

impl<S, B, E> AsyncBufRead for StreamReader<S, B>
Expand Down
34 changes: 34 additions & 0 deletions tokio-util/src/lib.rs
Expand Up @@ -52,3 +52,37 @@ pub mod context;
pub mod sync;

pub mod either;

#[cfg(any(feature = "io", feature = "codec"))]
mod util {
use tokio::io::{AsyncRead, ReadBuf};

use bytes::BufMut;
use futures_core::ready;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

pub(crate) fn poll_read_buf<T: AsyncRead>(
cx: &mut Context<'_>,
io: Pin<&mut T>,
buf: &mut impl BufMut,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

let orig = buf.bytes_mut().as_ptr() as *const u8;
let mut b = ReadBuf::uninit(buf.bytes_mut());

ready!(io.poll_read(cx, &mut b))?;
let n = b.filled().len();

// Safety: we can assume `n` bytes were read, since they are in`filled`.
assert_eq!(orig, b.filled().as_ptr());
unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}
}
3 changes: 1 addition & 2 deletions tokio/Cargo.toml
Expand Up @@ -82,7 +82,7 @@ signal = [
stream = ["futures-core"]
sync = ["fnv"]
test-util = []
tcp = ["iovec", "lazy_static", "mio"]
tcp = ["lazy_static", "mio"]
time = ["slab"]
udp = ["lazy_static", "mio"]
uds = ["lazy_static", "libc", "mio", "mio-uds"]
Expand All @@ -99,7 +99,6 @@ futures-core = { version = "0.3.0", optional = true }
lazy_static = { version = "1.0.2", optional = true }
memchr = { version = "2.2", optional = true }
mio = { version = "0.6.20", optional = true }
iovec = { version = "0.1.4", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.11.0", optional = true } # Not in full
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
Expand Down

0 comments on commit 4186b0a

Please sign in to comment.