diff --git a/tokio-test/src/io.rs b/tokio-test/src/io.rs index ac70a06f1aa..4f0b589743b 100644 --- a/tokio-test/src/io.rs +++ b/tokio-test/src/io.rs @@ -22,7 +22,6 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::sync::mpsc; use tokio::time::{self, Delay, Duration, Instant}; -use bytes::Buf; use futures_core::ready; use std::collections::VecDeque; use std::future::Future; @@ -439,16 +438,6 @@ impl AsyncWrite for Mock { } } - fn poll_write_buf( - self: Pin<&mut Self>, - cx: &mut task::Context<'_>, - buf: &mut B, - ) -> Poll> { - let n = ready!(self.poll_write(cx, buf.bytes()))?; - buf.advance(n); - Poll::Ready(Ok(n)) - } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { Poll::Ready(Ok(())) } diff --git a/tokio-util/src/codec/framed_impl.rs b/tokio-util/src/codec/framed_impl.rs index eb2e0d38c6d..c161808f66e 100644 --- a/tokio-util/src/codec/framed_impl.rs +++ b/tokio-util/src/codec/framed_impl.rs @@ -118,6 +118,8 @@ where type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use crate::util::poll_read_buf; + let mut pinned = self.project(); let state: &mut ReadFrame = pinned.state.borrow_mut(); loop { @@ -148,7 +150,7 @@ where // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF state.buffer.reserve(1); - let bytect = match pinned.inner.as_mut().poll_read_buf(cx, &mut state.buffer)? { + let bytect = match poll_read_buf(cx, pinned.inner.as_mut(), &mut state.buffer)? { Poll::Ready(ct) => ct, Poll::Pending => return Poll::Pending, }; diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index bde7cceec5c..ab0c22fba73 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -70,6 +70,8 @@ impl ReaderStream { impl Stream for ReaderStream { type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use crate::util::poll_read_buf; + let mut this = self.as_mut().project(); let reader = match this.reader.as_pin_mut() { @@ -81,7 +83,7 @@ impl Stream for ReaderStream { this.buf.reserve(CAPACITY); } - match reader.poll_read_buf(cx, &mut this.buf) { + match poll_read_buf(cx, reader, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { self.project().reader.set(None); diff --git a/tokio-util/src/io/stream_reader.rs b/tokio-util/src/io/stream_reader.rs index 5c3ab019eba..def843b1ddd 100644 --- a/tokio-util/src/io/stream_reader.rs +++ b/tokio-util/src/io/stream_reader.rs @@ -1,4 +1,4 @@ -use bytes::{Buf, BufMut}; +use bytes::Buf; use futures_core::stream::Stream; use pin_project_lite::pin_project; use std::io; @@ -119,29 +119,6 @@ where self.consume(len); Poll::Ready(Ok(())) } - fn poll_read_buf( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut BM, - ) -> Poll> - where - Self: Sized, - { - if !buf.has_remaining_mut() { - return Poll::Ready(Ok(0)); - } - - let inner_buf = match self.as_mut().poll_fill_buf(cx) { - Poll::Ready(Ok(buf)) => buf, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending, - }; - let len = std::cmp::min(inner_buf.len(), buf.remaining_mut()); - buf.put_slice(&inner_buf[..len]); - - self.consume(len); - Poll::Ready(Ok(len)) - } } impl AsyncBufRead for StreamReader diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index b96d9044ea1..eb35345e796 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -52,3 +52,37 @@ pub mod context; pub mod sync; pub mod either; + +#[cfg(any(feature = "io", feature = "codec"))] +mod util { + use tokio::io::{AsyncRead, ReadBuf}; + + use bytes::BufMut; + use futures_core::ready; + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll}; + + pub(crate) fn poll_read_buf( + cx: &mut Context<'_>, + io: Pin<&mut T>, + buf: &mut impl BufMut, + ) -> Poll> { + if !buf.has_remaining_mut() { + return Poll::Ready(Ok(0)); + } + + let orig = buf.bytes_mut().as_ptr() as *const u8; + let mut b = ReadBuf::uninit(buf.bytes_mut()); + + ready!(io.poll_read(cx, &mut b))?; + let n = b.filled().len(); + + // Safety: we can assume `n` bytes were read, since they are in`filled`. + assert_eq!(orig, b.filled().as_ptr()); + unsafe { + buf.advance_mut(n); + } + Poll::Ready(Ok(n)) + } +} diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 9e7ebf12911..0b2017958f1 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -82,7 +82,7 @@ signal = [ stream = ["futures-core"] sync = ["fnv"] test-util = [] -tcp = ["iovec", "lazy_static", "mio"] +tcp = ["lazy_static", "mio"] time = ["slab"] udp = ["lazy_static", "mio"] uds = ["lazy_static", "libc", "mio", "mio-uds"] @@ -99,7 +99,6 @@ futures-core = { version = "0.3.0", optional = true } lazy_static = { version = "1.0.2", optional = true } memchr = { version = "2.2", optional = true } mio = { version = "0.6.20", optional = true } -iovec = { version = "0.1.4", optional = true } num_cpus = { version = "1.8.0", optional = true } parking_lot = { version = "0.11.0", optional = true } # Not in full slab = { version = "0.4.1", optional = true } # Backs `DelayQueue` diff --git a/tokio/docs/reactor-refactor.md b/tokio/docs/reactor-refactor.md new file mode 100644 index 00000000000..a0b54474b71 --- /dev/null +++ b/tokio/docs/reactor-refactor.md @@ -0,0 +1,276 @@ +# Refactor I/O driver + +Describes changes to the I/O driver for the Tokio 0.3 release. + +## Goals + +* Support `async fn` on I/O types with `&self`. +* Refine the `Registration` API. + +### Non-goals + +* Implement `AsyncRead` / `AsyncWrite` for `&TcpStream` or other reference type. + +## Overview + +Currently, I/O types require `&mut self` for `async` functions. The reason for +this is the task's waker is stored in the I/O resource's internal state +(`ScheduledIo`) instead of in the future returned by the `async` function. +Because of this limitation, I/O types limit the number of wakers to one per +direction (a direction is either read-related events or write-related events). + +Moving the waker from the internal I/O resource's state to the operation's +future enables multiple wakers to be registered per operation. The "intrusive +wake list" strategy used by `Notify` applies to this case, though there are some +concerns unique to the I/O driver. + +## Reworking the `Registration` type + +While `Registration` is made private (per #2728), it remains in Tokio as an +implementation detail backing I/O resources such as `TcpStream`. The API of +`Registration` is updated to support waiting for an arbitrary interest set with +`&self`. This supports concurrent waiters with a different readiness interest. + +```rust +struct Registration { ... } + +// TODO: naming +struct ReadyEvent { + tick: u32, + ready: mio::Ready, +} + +impl Registration { + /// `interest` must be a super set of **all** interest sets specified in + /// the other methods. This is the interest set passed to `mio`. + pub fn new(io: &T, interest: mio::Ready) -> io::Result + where T: mio::Evented; + + /// Awaits for any readiness event included in `interest`. Returns a + /// `ReadyEvent` representing the received readiness event. + async fn readiness(&self, interest: mio::Ready) -> io::Result; + + /// Clears resource level readiness represented by the specified `ReadyEvent` + async fn clear_readiness(&self, ready_event: ReadyEvent); +``` + +A new registration is created for a `T: mio::Evented` and a `interest`. This +creates a `ScheduledIo` entry with the I/O driver and registers the resource +with `mio`. + +Because Tokio uses **edge-triggered** notifications, the I/O driver only +receives readiness from the OS once the ready state **changes**. The I/O driver +must track each resource's known readiness state. This helps prevent syscalls +when the process knows the syscall should return with `EWOULDBLOCK`. + +A call to `readiness()` checks if the currently known resource readiness +overlaps with `interest`. If it does, then the `readiness()` immediately +returns. If it does not, then the task waits until the I/O driver receives a +readiness event. + +The pseudocode to perform a TCP read is as follows. + +```rust +async fn read(&self, buf: &mut [u8]) -> io::Result { + loop { + // Await readiness + let event = self.readiness(interest).await?; + + match self.mio_socket.read(buf) { + Ok(v) => return Ok(v), + Err(ref e) if e.kind() == WouldBlock => { + self.clear_readiness(event); + } + Err(e) => return Err(e), + } + } +} +``` + +## Reworking the `ScheduledIo` type + +The `ScheduledIo` type is switched to use an intrusive waker linked list. Each +entry in the linked list includes the `interest` set passed to `readiness()`. + +```rust +#[derive(Debug)] +pub(crate) struct ScheduledIo { + /// Resource's known state packed with other state that must be + /// atomically updated. + readiness: AtomicUsize, + + /// Tracks tasks waiting on the resource + waiters: Mutex, +} + +#[derive(Debug)] +struct Waiters { + // List of intrusive waiters. + list: LinkedList, + + /// Waiter used by `AsyncRead` implementations. + reader: Option, + + /// Waiter used by `AsyncWrite` implementations. + writer: Option, +} + +// This struct is contained by the **future** returned by `readiness()`. +#[derive(Debug)] +struct Waiter { + /// Intrusive linked-list pointers + pointers: linked_list::Pointers, + + /// Waker for task waiting on I/O resource + waiter: Option, + + /// Readiness events being waited on. This is + /// the value passed to `readiness()` + interest: mio::Ready, + + /// Should not be `Unpin`. + _p: PhantomPinned, +} +``` + +When an I/O event is received from `mio`, the associated resources' readiness is +updated and the waiter list is iterated. All waiters with `interest` that +overlap the received readiness event are notified. Any waiter with an `interest` +that does not overlap the readiness event remains in the list. + +## Cancel interest on drop + +The future returned by `readiness()` uses an intrusive linked list to store the +waker with `ScheduledIo`. Because `readiness()` can be called concurrently, many +wakers may be stored simultaneously in the list. If the `readiness()` future is +dropped early, it is essential that the waker is removed from the list. This +prevents leaking memory. + +## Race condition + +Consider how many tasks may concurrently attempt I/O operations. This, combined +with how Tokio uses edge-triggered events, can result in a race condition. Let's +revisit the TCP read function: + +```rust +async fn read(&self, buf: &mut [u8]) -> io::Result { + loop { + // Await readiness + let event = self.readiness(interest).await?; + + match self.mio_socket.read(buf) { + Ok(v) => return Ok(v), + Err(ref e) if e.kind() == WouldBlock => { + self.clear_readiness(event); + } + Err(e) => return Err(e), + } + } +} +``` + +If care is not taken, if between `mio_socket.read(buf)` returning and +`clear_readiness(event)` is called, a readiness event arrives, the `read()` +function could deadlock. This happens because the readiness event is received, +`clear_readiness()` unsets the readiness event, and on the next iteration, +`readiness().await` will block forever as a new readiness event is not received. + +The current I/O driver handles this condition by always registering the task's +waker before performing the operation. This is not ideal as it will result in +unnecessary task notification. + +Instead, we will use a strategy to prevent clearing readiness if an "unseen" +readiness event has been received. The I/O driver will maintain a "tick" value. +Every time the `mio` `poll()` function is called, the tick is incremented. Each +readiness event has an associated tick. When the I/O driver sets the resource's +readiness, the driver's tick is packed into the atomic `usize`. + +The `ScheduledIo` readiness `AtomicUsize` is structured as: + +``` +| reserved | generation | driver tick | readinesss | +|----------+------------+--------------+------------| +| 1 bit | 7 bits + 8 bits + 16 bits | +``` + +The `reserved` and `generation` components exist today. + +The `readiness()` function returns a `ReadyEvent` value. This value includes the +`tick` component read with the resource's readiness value. When +`clear_readiness()` is called, the `ReadyEvent` is provided. Readiness is only +cleared if the current `tick` matches the `tick` included in the `ReadyEvent`. +If the tick values do not match, the call to `readiness()` on the next iteration +will not block and the new `tick` is included in the new `ReadyToken.` + +TODO + +## Implementing `AsyncRead` / `AsyncWrite` + +The `AsyncRead` and `AsyncWrite` traits use a "poll" based API. This means that +it is not possible to use an intrusive linked list to track the waker. +Additionally, there is no future associated with the operation which means it is +not possible to cancel interest in the readiness events. + +To implement `AsyncRead` and `AsyncWrite`, `ScheduledIo` includes dedicated +waker values for the read direction and the write direction. These values are +used to store the waker. Specific `interest` is not tracked for `AsyncRead` and +`AsyncWrite` implementations. It is assumed that only events of interest are: + +* Read ready +* Read closed +* Write ready +* Write closed + +Note that "read closed" and "write closed" are only available with Mio 0.7. With +Mio 0.6, things were a bit messy. + +It is only possible to implement `AsyncRead` and `AsyncWrite` for resource types +themselves and not for `&Resource`. Implementing the traits for `&Resource` +would permit concurrent operations to the resource. Because only a single waker +is stored per direction, any concurrent usage would result in deadlocks. An +alterate implementation would call for a `Vec` but this would result in +memory leaks. + +## Enabling reads and writes for `&TcpStream` + +Instead of implementing `AsyncRead` and `AsyncWrite` for `&TcpStream`, a new +function is added to `TcpStream`. + +```rust +impl TcpStream { + /// Naming TBD + fn by_ref(&self) -> TcpStreamRef<'_>; +} + +struct TcpStreamRef<'a> { + stream: &'a TcpStream, + + // `Waiter` is the node in the intrusive waiter linked-list + read_waiter: Waiter, + write_waiter: Waiter, +} +``` + +Now, `AsyncRead` and `AsyncWrite` can be implemented on `TcpStreamRef<'a>`. When +the `TcpStreamRef` is dropped, all associated waker resources are cleaned up. + +### Removing all the `split()` functions + +With `TcpStream::by_ref()`, `TcpStream::split()` is no longer needed. Instead, +it is possible to do something as follows. + +```rust +let rd = my_stream.by_ref(); +let wr = my_stream.by_ref(); + +select! { + // use `rd` and `wr` in separate branches. +} +``` + +It is also possible to sotre a `TcpStream` in an `Arc`. + +```rust +let arc_stream = Arc::new(my_tcp_stream); +let n = arc_stream.by_ref().read(buf).await?; +``` \ No newline at end of file diff --git a/tokio/src/io/async_read.rs b/tokio/src/io/async_read.rs index d341b63d41a..ba2303d1d2f 100644 --- a/tokio/src/io/async_read.rs +++ b/tokio/src/io/async_read.rs @@ -1,5 +1,4 @@ use super::ReadBuf; -use bytes::BufMut; use std::io; use std::ops::DerefMut; use std::pin::Pin; @@ -54,36 +53,6 @@ pub trait AsyncRead { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll>; - - /// Pulls some bytes from this source into the specified `BufMut`, returning - /// how many bytes were read. - /// - /// The `buf` provided will have bytes read into it and the internal cursor - /// will be advanced if any bytes were read. Note that this method typically - /// will not reallocate the buffer provided. - fn poll_read_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> - where - Self: Sized, - { - if !buf.has_remaining_mut() { - return Poll::Ready(Ok(0)); - } - - let mut b = ReadBuf::uninit(buf.bytes_mut()); - - ready!(self.poll_read(cx, &mut b))?; - let n = b.filled().len(); - - // Safety: we can assume `n` bytes were read, since they are in`filled`. - unsafe { - buf.advance_mut(n); - } - Poll::Ready(Ok(n)) - } } macro_rules! deref_async_read { diff --git a/tokio/src/io/async_write.rs b/tokio/src/io/async_write.rs index ecf7575b128..66ba4bf3b27 100644 --- a/tokio/src/io/async_write.rs +++ b/tokio/src/io/async_write.rs @@ -1,4 +1,3 @@ -use bytes::Buf; use std::io; use std::ops::DerefMut; use std::pin::Pin; @@ -128,27 +127,6 @@ pub trait AsyncWrite { /// This function will panic if not called within the context of a future's /// task. fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - - /// Writes a `Buf` into this value, returning how many bytes were written. - /// - /// Note that this method will advance the `buf` provided automatically by - /// the number of bytes written. - fn poll_write_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> - where - Self: Sized, - { - if !buf.has_remaining() { - return Poll::Ready(Ok(0)); - } - - let n = ready!(self.poll_write(cx, buf.bytes()))?; - buf.advance(n); - Poll::Ready(Ok(n)) - } } macro_rules! deref_async_write { diff --git a/tokio/src/io/split.rs b/tokio/src/io/split.rs index dcd3da2032b..fd3273ee28a 100644 --- a/tokio/src/io/split.rs +++ b/tokio/src/io/split.rs @@ -6,7 +6,6 @@ use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; -use bytes::{Buf, BufMut}; use std::cell::UnsafeCell; use std::fmt; use std::io; @@ -107,15 +106,6 @@ impl AsyncRead for ReadHalf { let mut inner = ready!(self.inner.poll_lock(cx)); inner.stream_pin().poll_read(cx, buf) } - - fn poll_read_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> { - let mut inner = ready!(self.inner.poll_lock(cx)); - inner.stream_pin().poll_read_buf(cx, buf) - } } impl AsyncWrite for WriteHalf { @@ -137,15 +127,6 @@ impl AsyncWrite for WriteHalf { let mut inner = ready!(self.inner.poll_lock(cx)); inner.stream_pin().poll_shutdown(cx) } - - fn poll_write_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> { - let mut inner = ready!(self.inner.poll_lock(cx)); - inner.stream_pin().poll_write_buf(cx, buf) - } } impl Inner { diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs index 0ab66c286d3..d631bd7e423 100644 --- a/tokio/src/io/util/async_read_ext.rs +++ b/tokio/src/io/util/async_read_ext.rs @@ -1,6 +1,5 @@ use crate::io::util::chain::{chain, Chain}; use crate::io::util::read::{read, Read}; -use crate::io::util::read_buf::{read_buf, ReadBuf}; use crate::io::util::read_exact::{read_exact, ReadExact}; use crate::io::util::read_int::{ ReadI128, ReadI128Le, ReadI16, ReadI16Le, ReadI32, ReadI32Le, ReadI64, ReadI64Le, ReadI8, @@ -13,8 +12,6 @@ use crate::io::util::read_to_string::{read_to_string, ReadToString}; use crate::io::util::take::{take, Take}; use crate::io::AsyncRead; -use bytes::BufMut; - cfg_io_util! { /// Defines numeric reader macro_rules! read_impl { @@ -166,71 +163,6 @@ cfg_io_util! { read(self, buf) } - /// Pulls some bytes from this source into the specified buffer, - /// advancing the buffer's internal cursor. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn read_buf(&mut self, buf: &mut B) -> io::Result; - /// ``` - /// - /// Usually, only a single `read` syscall is issued, even if there is - /// more space in the supplied buffer. - /// - /// This function does not provide any guarantees about whether it - /// completes immediately or asynchronously - /// - /// # Return - /// - /// On a successful read, the number of read bytes is returned. If the - /// supplied buffer is not empty and the function returns `Ok(0)` then - /// the source as reached an "end-of-file" event. - /// - /// # Errors - /// - /// If this function encounters any form of I/O or other error, an error - /// variant will be returned. If an error is returned then it must be - /// guaranteed that no bytes were read. - /// - /// # Examples - /// - /// [`File`] implements `Read` and [`BytesMut`] implements [`BufMut`]: - /// - /// [`File`]: crate::fs::File - /// [`BytesMut`]: bytes::BytesMut - /// [`BufMut`]: bytes::BufMut - /// - /// ```no_run - /// use tokio::fs::File; - /// use tokio::io::{self, AsyncReadExt}; - /// - /// use bytes::BytesMut; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let mut f = File::open("foo.txt").await?; - /// let mut buffer = BytesMut::with_capacity(10); - /// - /// assert!(buffer.is_empty()); - /// - /// // read up to 10 bytes, note that the return value is not needed - /// // to access the data that was read as `buffer`'s internal - /// // cursor is updated. - /// f.read_buf(&mut buffer).await?; - /// - /// println!("The bytes: {:?}", &buffer[..]); - /// Ok(()) - /// } - /// ``` - fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B> - where - Self: Sized + Unpin, - B: BufMut, - { - read_buf(self, buf) - } - /// Reads the exact number of bytes required to fill `buf`. /// /// Equivalent to: diff --git a/tokio/src/io/util/async_write_ext.rs b/tokio/src/io/util/async_write_ext.rs index 321301e2897..5c6187b7880 100644 --- a/tokio/src/io/util/async_write_ext.rs +++ b/tokio/src/io/util/async_write_ext.rs @@ -2,7 +2,6 @@ use crate::io::util::flush::{flush, Flush}; use crate::io::util::shutdown::{shutdown, Shutdown}; use crate::io::util::write::{write, Write}; use crate::io::util::write_all::{write_all, WriteAll}; -use crate::io::util::write_buf::{write_buf, WriteBuf}; use crate::io::util::write_int::{ WriteI128, WriteI128Le, WriteI16, WriteI16Le, WriteI32, WriteI32Le, WriteI64, WriteI64Le, WriteI8, @@ -13,8 +12,6 @@ use crate::io::util::write_int::{ }; use crate::io::AsyncWrite; -use bytes::Buf; - cfg_io_util! { /// Defines numeric writer macro_rules! write_impl { @@ -119,79 +116,6 @@ cfg_io_util! { write(self, src) } - /// Writes a buffer into this writer, advancing the buffer's internal - /// cursor. - /// - /// Equivalent to: - /// - /// ```ignore - /// async fn write_buf(&mut self, buf: &mut B) -> io::Result; - /// ``` - /// - /// This function will attempt to write the entire contents of `buf`, but - /// the entire write may not succeed, or the write may also generate an - /// error. After the operation completes, the buffer's - /// internal cursor is advanced by the number of bytes written. A - /// subsequent call to `write_buf` using the **same** `buf` value will - /// resume from the point that the first call to `write_buf` completed. - /// A call to `write` represents *at most one* attempt to write to any - /// wrapped object. - /// - /// # Return - /// - /// If the return value is `Ok(n)` then it must be guaranteed that `n <= - /// buf.len()`. A return value of `0` typically means that the - /// underlying object is no longer able to accept bytes and will likely - /// not be able to in the future as well, or that the buffer provided is - /// empty. - /// - /// # Errors - /// - /// Each call to `write` may generate an I/O error indicating that the - /// operation could not be completed. If an error is returned then no bytes - /// in the buffer were written to this writer. - /// - /// It is **not** considered an error if the entire buffer could not be - /// written to this writer. - /// - /// # Examples - /// - /// [`File`] implements `Read` and [`Cursor<&[u8]>`] implements [`Buf`]: - /// - /// [`File`]: crate::fs::File - /// [`Buf`]: bytes::Buf - /// - /// ```no_run - /// use tokio::io::{self, AsyncWriteExt}; - /// use tokio::fs::File; - /// - /// use bytes::Buf; - /// use std::io::Cursor; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let mut file = File::create("foo.txt").await?; - /// let mut buffer = Cursor::new(b"data to write"); - /// - /// // Loop until the entire contents of the buffer are written to - /// // the file. - /// while buffer.has_remaining() { - /// // Writes some prefix of the byte string, not necessarily - /// // all of it. - /// file.write_buf(&mut buffer).await?; - /// } - /// - /// Ok(()) - /// } - /// ``` - fn write_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteBuf<'a, Self, B> - where - Self: Sized + Unpin, - B: Buf, - { - write_buf(self, src) - } - /// Attempts to write an entire buffer into this writer. /// /// Equivalent to: diff --git a/tokio/src/io/util/buf_reader.rs b/tokio/src/io/util/buf_reader.rs index 3ab78f0eb8d..9264ca59e2a 100644 --- a/tokio/src/io/util/buf_reader.rs +++ b/tokio/src/io/util/buf_reader.rs @@ -1,7 +1,6 @@ use crate::io::util::DEFAULT_BUF_SIZE; use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf}; -use bytes::Buf; use pin_project_lite::pin_project; use std::io; use std::pin::Pin; @@ -151,14 +150,6 @@ impl AsyncWrite for BufReader { self.get_pin_mut().poll_write(cx, buf) } - fn poll_write_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> { - self.get_pin_mut().poll_write_buf(cx, buf) - } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.get_pin_mut().poll_flush(cx) } diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 1bd0a3f87b4..52dab990c62 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -39,7 +39,6 @@ cfg_io_util! { pub use mem::{duplex, DuplexStream}; mod read; - mod read_buf; mod read_exact; mod read_int; mod read_line; @@ -68,7 +67,6 @@ cfg_io_util! { mod write; mod write_all; - mod write_buf; mod write_int; diff --git a/tokio/src/io/util/read_buf.rs b/tokio/src/io/util/read_buf.rs deleted file mode 100644 index 6ee3d249f82..00000000000 --- a/tokio/src/io/util/read_buf.rs +++ /dev/null @@ -1,38 +0,0 @@ -use crate::io::AsyncRead; - -use bytes::BufMut; -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -pub(crate) fn read_buf<'a, R, B>(reader: &'a mut R, buf: &'a mut B) -> ReadBuf<'a, R, B> -where - R: AsyncRead + Unpin, - B: BufMut, -{ - ReadBuf { reader, buf } -} - -cfg_io_util! { - /// Future returned by [`read_buf`](crate::io::AsyncReadExt::read_buf). - #[derive(Debug)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct ReadBuf<'a, R, B> { - reader: &'a mut R, - buf: &'a mut B, - } -} - -impl Future for ReadBuf<'_, R, B> -where - R: AsyncRead + Unpin, - B: BufMut, -{ - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let me = &mut *self; - Pin::new(&mut *me.reader).poll_read_buf(cx, me.buf) - } -} diff --git a/tokio/src/io/util/write_buf.rs b/tokio/src/io/util/write_buf.rs deleted file mode 100644 index cedfde64e6e..00000000000 --- a/tokio/src/io/util/write_buf.rs +++ /dev/null @@ -1,40 +0,0 @@ -use crate::io::AsyncWrite; - -use bytes::Buf; -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -cfg_io_util! { - /// A future to write some of the buffer to an `AsyncWrite`. - #[derive(Debug)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct WriteBuf<'a, W, B> { - writer: &'a mut W, - buf: &'a mut B, - } -} - -/// Tries to write some bytes from the given `buf` to the writer in an -/// asynchronous manner, returning a future. -pub(crate) fn write_buf<'a, W, B>(writer: &'a mut W, buf: &'a mut B) -> WriteBuf<'a, W, B> -where - W: AsyncWrite + Unpin, - B: Buf, -{ - WriteBuf { writer, buf } -} - -impl Future for WriteBuf<'_, W, B> -where - W: AsyncWrite + Unpin, - B: Buf, -{ - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let me = &mut *self; - Pin::new(&mut *me.writer).poll_write_buf(cx, me.buf) - } -} diff --git a/tokio/src/net/tcp/split.rs b/tokio/src/net/tcp/split.rs index 9d99d7bdfbf..6e927f05855 100644 --- a/tokio/src/net/tcp/split.rs +++ b/tokio/src/net/tcp/split.rs @@ -12,7 +12,6 @@ use crate::future::poll_fn; use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::net::TcpStream; -use bytes::Buf; use std::io; use std::net::Shutdown; use std::pin::Pin; @@ -148,14 +147,6 @@ impl AsyncWrite for WriteHalf<'_> { self.0.poll_write_priv(cx, buf) } - fn poll_write_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> { - self.0.poll_write_buf_priv(cx, buf) - } - #[inline] fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { // tcp flush is a no-op diff --git a/tokio/src/net/tcp/split_owned.rs b/tokio/src/net/tcp/split_owned.rs index 87be6efd8a1..2f35f495ca2 100644 --- a/tokio/src/net/tcp/split_owned.rs +++ b/tokio/src/net/tcp/split_owned.rs @@ -12,7 +12,6 @@ use crate::future::poll_fn; use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::net::TcpStream; -use bytes::Buf; use std::error::Error; use std::net::Shutdown; use std::pin::Pin; @@ -230,14 +229,6 @@ impl AsyncWrite for OwnedWriteHalf { self.inner.poll_write_priv(cx, buf) } - fn poll_write_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> { - self.inner.poll_write_buf_priv(cx, buf) - } - #[inline] fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { // tcp flush is a no-op diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 467d09fc7f1..4bf17449170 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -4,8 +4,6 @@ use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::ToSocketAddrs; -use bytes::Buf; -use iovec::IoVec; use std::convert::TryFrom; use std::fmt; use std::io::{self, Read, Write}; @@ -745,44 +743,6 @@ impl TcpStream { } } } - - pub(super) fn poll_write_buf_priv( - &self, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> { - use std::io::IoSlice; - - loop { - let ev = ready!(self.io.poll_write_ready(cx))?; - - // The `IoVec` (v0.1.x) type can't have a zero-length size, so create - // a dummy version from a 1-length slice which we'll overwrite with - // the `bytes_vectored` method. - static S: &[u8] = &[0]; - const MAX_BUFS: usize = 64; - - let mut slices: [IoSlice<'_>; MAX_BUFS] = [IoSlice::new(S); 64]; - let cnt = buf.bytes_vectored(&mut slices); - - let iovec = <&IoVec>::from(S); - let mut vecs = [iovec; MAX_BUFS]; - for i in 0..cnt { - vecs[i] = (*slices[i]).into(); - } - - match self.io.get_ref().write_bufs(&vecs[..cnt]) { - Ok(n) => { - buf.advance(n); - return Poll::Ready(Ok(n)); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - self.io.clear_readiness(ev); - } - Err(e) => return Poll::Ready(Err(e)), - } - } - } } impl TryFrom for mio::net::TcpStream { @@ -827,14 +787,6 @@ impl AsyncWrite for TcpStream { self.poll_write_priv(cx, buf) } - fn poll_write_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut B, - ) -> Poll> { - self.poll_write_buf_priv(cx, buf) - } - #[inline] fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { // tcp flush is a no-op diff --git a/tokio/tests/io_async_read.rs b/tokio/tests/io_async_read.rs index d1aae9a1a7f..aaeadfa4c11 100644 --- a/tokio/tests/io_async_read.rs +++ b/tokio/tests/io_async_read.rs @@ -1,113 +1,10 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::io::{AsyncRead, ReadBuf}; -use tokio_test::task; -use tokio_test::{assert_ready_err, assert_ready_ok}; - -use bytes::BytesMut; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; +use tokio::io::AsyncRead; #[test] fn assert_obj_safe() { fn _assert() {} _assert::>(); } - -#[test] -fn read_buf_success() { - struct Rd; - - impl AsyncRead for Rd { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - buf.append(b"hello world"); - Poll::Ready(Ok(())) - } - } - - let mut buf = BytesMut::with_capacity(65); - - task::spawn(Rd).enter(|cx, rd| { - let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf)); - - assert_eq!(11, n); - assert_eq!(buf[..], b"hello world"[..]); - }); -} - -#[test] -fn read_buf_error() { - struct Rd; - - impl AsyncRead for Rd { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - _buf: &mut ReadBuf<'_>, - ) -> Poll> { - let err = io::ErrorKind::Other.into(); - Poll::Ready(Err(err)) - } - } - - let mut buf = BytesMut::with_capacity(65); - - task::spawn(Rd).enter(|cx, rd| { - let err = assert_ready_err!(rd.poll_read_buf(cx, &mut buf)); - assert_eq!(err.kind(), io::ErrorKind::Other); - }); -} - -#[test] -fn read_buf_no_capacity() { - struct Rd; - - impl AsyncRead for Rd { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - _buf: &mut ReadBuf<'_>, - ) -> Poll> { - unimplemented!(); - } - } - - let mut buf = [0u8; 0]; - - task::spawn(Rd).enter(|cx, rd| { - let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut &mut buf[..])); - assert_eq!(0, n); - }); -} - -#[test] -fn read_buf_uninitialized_ok() { - struct Rd; - - impl AsyncRead for Rd { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - assert_eq!(buf.remaining(), 64); - assert_eq!(buf.filled().len(), 0); - assert_eq!(buf.initialized().len(), 0); - Poll::Ready(Ok(())) - } - } - - // Can't create BytesMut w/ zero capacity, so fill it up - let mut buf = BytesMut::with_capacity(64); - - task::spawn(Rd).enter(|cx, rd| { - let n = assert_ready_ok!(rd.poll_read_buf(cx, &mut buf)); - assert_eq!(0, n); - }); -}