From 7e0cbbefcfbaa6e975c7f25cdb50ace730d1a38a Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 11 Nov 2020 11:09:35 -0800 Subject: [PATCH 01/12] net: add TcpStream::ready and try_read Sketch out splitting up awaiting for readiness and performing the operation. --- tokio/src/io/driver/interest.rs | 67 ++++++++++++++++++++--- tokio/src/io/driver/mod.rs | 8 ++- tokio/src/io/driver/ready.rs | 85 ++++++++++++++++++++++------- tokio/src/io/driver/registration.rs | 21 +++++++ tokio/src/io/driver/scheduled_io.rs | 9 +++ tokio/src/io/mod.rs | 2 +- tokio/src/net/tcp/stream.rs | 29 +++++++++- 7 files changed, 189 insertions(+), 32 deletions(-) diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs index f9887e86fc1..861ac7a32e4 100644 --- a/tokio/src/io/driver/interest.rs +++ b/tokio/src/io/driver/interest.rs @@ -1,3 +1,5 @@ +use crate::io::Ready; + use std::fmt; use std::ops; @@ -5,34 +7,83 @@ use std::ops; /// /// Specifies the readiness events the caller is interested in when awaiting on /// I/O resource readiness states. -#[derive(Clone, Copy)] -pub(crate) struct Interest(mio::Interest); +#[derive(Clone, Copy, Eq, PartialEq)] +pub struct Interest(mio::Interest); impl Interest { - /// Interest in all readable events - pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE); + /// Interest in all readable events. + /// + /// Readable interest includes read-closed events. + pub const READABLE: Interest = Interest(mio::Interest::READABLE); /// Interest in all writable events - pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); + /// + /// Writable interest includes write-closed events. + pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE); /// Returns true if the value includes readable interest. - pub(crate) const fn is_readable(self) -> bool { + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// assert!(Interest::READABLE.is_readable()); + /// assert!(!Interest::WRITABLE.is_readable()); + /// + /// let both = Interest::READABLE | Interest::WRITABLE; + /// assert!(both.is_readable()); + /// ``` + pub const fn is_readable(self) -> bool { self.0.is_readable() } /// Returns true if the value includes writable interest. - pub(crate) const fn is_writable(self) -> bool { + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// assert!(!Interest::READABLE.is_writable()); + /// assert!(Interest::WRITABLE.is_writable()); + /// + /// let both = Interest::READABLE | Interest::WRITABLE; + /// assert!(both.is_writable()); + /// ``` + pub const fn is_writable(self) -> bool { self.0.is_writable() } /// Add together two `Interst` values. - pub(crate) const fn add(self, other: Interest) -> Interest { + /// + /// This function works from a `const` context. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Interest; + /// + /// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE); + /// + /// assert!(BOTH.is_readable()); + /// assert!(BOTH.is_writable()); + pub const fn add(self, other: Interest) -> Interest { Interest(self.0.add(other.0)) } + // This function must be crate-private to avoid exposing a `mio` dependency. pub(crate) const fn to_mio(self) -> mio::Interest { self.0 } + + pub(super) fn mask(self) -> Ready { + match self { + Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED, + Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED, + _ => Ready::EMPTY, + } + } } impl ops::BitOr for Interest { diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index c494db41bad..a1784dff414 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -1,10 +1,12 @@ #![cfg_attr(not(feature = "rt"), allow(dead_code))] mod interest; -pub(crate) use interest::Interest; +#[allow(unreachable_pub)] +pub use interest::Interest; mod ready; -use ready::Ready; +#[allow(unreachable_pub)] +pub use ready::Ready; mod registration; pub(crate) use registration::Registration; @@ -51,7 +53,7 @@ pub(crate) struct Handle { pub(crate) struct ReadyEvent { tick: u8, - ready: Ready, + pub(crate) ready: Ready, } pub(super) struct Inner { diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index 2790cc13059..dd660405921 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -6,36 +6,32 @@ const WRITABLE: usize = 0b0_10; const READ_CLOSED: usize = 0b0_0100; const WRITE_CLOSED: usize = 0b0_1000; -/// A set of readiness event kinds. +/// Describes the readiness state of an I/O resources. /// -/// `Ready` is set of operation descriptors indicating which kind of an -/// operation is ready to be performed. -/// -/// This struct only represents portable event kinds. Portable events are -/// events that can be raised on any platform while guaranteeing no false -/// positives. +/// `Ready` tracks which operation an I/O resource is ready to perform. #[derive(Clone, Copy, PartialEq, PartialOrd)] -pub(crate) struct Ready(usize); +pub struct Ready(usize); impl Ready { /// Returns the empty `Ready` set. - pub(crate) const EMPTY: Ready = Ready(0); + pub const EMPTY: Ready = Ready(0); /// Returns a `Ready` representing readable readiness. - pub(crate) const READABLE: Ready = Ready(READABLE); + pub const READABLE: Ready = Ready(READABLE); /// Returns a `Ready` representing writable readiness. - pub(crate) const WRITABLE: Ready = Ready(WRITABLE); + pub const WRITABLE: Ready = Ready(WRITABLE); /// Returns a `Ready` representing read closed readiness. - pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED); + pub const READ_CLOSED: Ready = Ready(READ_CLOSED); /// Returns a `Ready` representing write closed readiness. - pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); + pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); /// Returns a `Ready` representing readiness for all operations. - pub(crate) const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + // Must remain crate-private to avoid adding a public dependency on Mio. pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { let mut ready = Ready::EMPTY; @@ -59,26 +55,77 @@ impl Ready { } /// Returns true if `Ready` is the empty set - pub(crate) fn is_empty(self) -> bool { + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(Ready::EMPTY.is_empty()); + /// assert!(!Ready::READABLE.is_empty()); + /// ``` + pub fn is_empty(self) -> bool { self == Ready::EMPTY } - /// Returns true if the value includes readable readiness + /// Returns `true` if the value includes `readable` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_readable()); + /// assert!(Ready::READABLE.is_readable()); + /// assert!(Ready::READ_CLOSED.is_readable()); + /// assert!(!Ready::WRITABLE.is_readable()); + /// ``` pub(crate) fn is_readable(self) -> bool { self.contains(Ready::READABLE) || self.is_read_closed() } - /// Returns true if the value includes writable readiness + /// Returns `true` if the value includes writable `readiness` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_writable()); + /// assert!(!Ready::READABLE.is_writable()); + /// assert!(Ready::WRITABLE.is_writable()); + /// assert!(Ready::WRITE_CLOSED.is_writable()); + /// ``` pub(crate) fn is_writable(self) -> bool { self.contains(Ready::WRITABLE) || self.is_write_closed() } - /// Returns true if the value includes read closed readiness + /// Returns `true` if the value includes read-closed `readiness` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_read_closed()); + /// assert!(!Ready::READABLE.is_read_closed()); + /// assert!(Ready::READ_CLOSED.is_read_closed()); + /// ``` pub(crate) fn is_read_closed(self) -> bool { self.contains(Ready::READ_CLOSED) } - /// Returns true if the value includes write closed readiness + /// Returns `true` if the value includes write-closed `readiness` + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_write_closed()); + /// assert!(!Ready::WRITABLE.is_write_closed()); + /// assert!(Ready::WRITE_CLOSED.is_write_closed()); + /// ``` pub(crate) fn is_write_closed(self) -> bool { self.contains(Ready::WRITE_CLOSED) } diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index db9afdd7c81..93125814f84 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -182,6 +182,27 @@ impl Registration { } } } + + pub(crate) fn try_io( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result, + ) -> io::Result { + let ev = self.shared.ready_event(interest); + + // Don't attempt the operation if the resource is not ready. + if ev.ready.is_empty() { + return Err(io::ErrorKind::WouldBlock.into()); + } + + match f() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + self.clear_readiness(ev); + Err(io::ErrorKind::WouldBlock.into()) + } + res => res, + } + } } fn gone() -> io::Error { diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index ed3adc39d71..99464a32fde 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -280,6 +280,15 @@ impl ScheduledIo { } } + pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent { + let curr = self.readiness.load(Acquire); + + ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), + } + } + /// Poll version of checking readiness for a certain direction. /// /// These are to support `AsyncRead` and `AsyncWrite` polling methods, diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 499633ee1b1..d965e61879f 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -206,7 +206,7 @@ pub use std::io::{Error, ErrorKind, Result, SeekFrom}; cfg_io_driver! { pub(crate) mod driver; - pub(crate) use driver::Interest; + pub use driver::{Interest, Ready}; mod poll_evented; diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 0a784b5f719..ab6f3fb8033 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -1,5 +1,5 @@ use crate::future::poll_fn; -use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf}; +use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; use crate::net::tcp::split::{split, ReadHalf, WriteHalf}; use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; use crate::net::{to_socket_addrs, ToSocketAddrs}; @@ -264,6 +264,33 @@ impl TcpStream { } } + /// Wait for any of the requested ready states. + pub async fn ready(&self, interest: Interest) -> io::Result { + let event = self.io.registration().readiness(interest).await?; + Ok(event.ready) + } + + /// Attempt a non-blocking read. + pub async fn try_read(&self, buf: &mut ReadBuf<'_>) -> io::Result<()> { + use std::io::Read; + + let n = self.io.registration().try_io(Interest::READABLE, || { + // Safety: respecting the ReadBuf contract + let b = unsafe { + &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) + }; + (&*self.io).read(b) + })?; + + // Safety: `TcpStream::read` initializes the memory when reading into the buffer. + unsafe { + buf.assume_init(n); + buf.advance(n); + } + + Ok(()) + } + /// Receives data on the socket from the remote address to which it is /// connected, without removing that data from the queue. On success, /// returns the number of bytes peeked. From e59e8263bb2bfd39ca407d0427fbff36667e0044 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 11 Nov 2020 11:42:41 -0800 Subject: [PATCH 02/12] tweak read, add write --- tokio/src/net/tcp/stream.rs | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index ab6f3fb8033..0d6d73464b6 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -271,24 +271,21 @@ impl TcpStream { } /// Attempt a non-blocking read. - pub async fn try_read(&self, buf: &mut ReadBuf<'_>) -> io::Result<()> { + pub fn try_read(&self, buf: &mut [u8]) -> io::Result { use std::io::Read; - let n = self.io.registration().try_io(Interest::READABLE, || { - // Safety: respecting the ReadBuf contract - let b = unsafe { - &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit] as *mut [u8]) - }; - (&*self.io).read(b) - })?; - - // Safety: `TcpStream::read` initializes the memory when reading into the buffer. - unsafe { - buf.assume_init(n); - buf.advance(n); - } + self.io.registration().try_io(Interest::READABLE, || { + (&*self.io).read(buf) + }) + } + + /// Attempt a non-blocking write. + pub fn try_write(&self, buf: &[u8]) -> io::Result { + use std::io::Write; - Ok(()) + self.io.registration().try_io(Interest::WRITABLE, || { + (&*self.io).write(buf) + }) } /// Receives data on the socket from the remote address to which it is From 588bd494d1160e803a0733a31970ff9aebe3138a Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 11 Nov 2020 15:48:42 -0800 Subject: [PATCH 03/12] expand functions and add a test --- tokio/src/io/driver/ready.rs | 8 ++-- tokio/src/net/tcp/stream.rs | 12 ++++++ tokio/tests/tcp_stream.rs | 82 ++++++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 tokio/tests/tcp_stream.rs diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index dd660405921..95925369120 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -80,7 +80,7 @@ impl Ready { /// assert!(Ready::READ_CLOSED.is_readable()); /// assert!(!Ready::WRITABLE.is_readable()); /// ``` - pub(crate) fn is_readable(self) -> bool { + pub fn is_readable(self) -> bool { self.contains(Ready::READABLE) || self.is_read_closed() } @@ -96,7 +96,7 @@ impl Ready { /// assert!(Ready::WRITABLE.is_writable()); /// assert!(Ready::WRITE_CLOSED.is_writable()); /// ``` - pub(crate) fn is_writable(self) -> bool { + pub fn is_writable(self) -> bool { self.contains(Ready::WRITABLE) || self.is_write_closed() } @@ -111,7 +111,7 @@ impl Ready { /// assert!(!Ready::READABLE.is_read_closed()); /// assert!(Ready::READ_CLOSED.is_read_closed()); /// ``` - pub(crate) fn is_read_closed(self) -> bool { + pub fn is_read_closed(self) -> bool { self.contains(Ready::READ_CLOSED) } @@ -126,7 +126,7 @@ impl Ready { /// assert!(!Ready::WRITABLE.is_write_closed()); /// assert!(Ready::WRITE_CLOSED.is_write_closed()); /// ``` - pub(crate) fn is_write_closed(self) -> bool { + pub fn is_write_closed(self) -> bool { self.contains(Ready::WRITE_CLOSED) } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 0d6d73464b6..f0e5d47c77d 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -270,6 +270,12 @@ impl TcpStream { Ok(event.ready) } + /// Wait for the socket to become readable. + pub async fn readable(&self) -> io::Result<()> { + self.ready(Interest::READABLE).await?; + Ok(()) + } + /// Attempt a non-blocking read. pub fn try_read(&self, buf: &mut [u8]) -> io::Result { use std::io::Read; @@ -279,6 +285,12 @@ impl TcpStream { }) } + /// Wait for the socket to become writable. + pub async fn writable(&self) -> io::Result<()> { + self.ready(Interest::WRITABLE).await?; + Ok(()) + } + /// Attempt a non-blocking write. pub fn try_write(&self, buf: &[u8]) -> io::Result { use std::io::Write; diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs new file mode 100644 index 00000000000..9b577377960 --- /dev/null +++ b/tokio/tests/tcp_stream.rs @@ -0,0 +1,82 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::io::Interest; +use tokio::net::{TcpListener, TcpStream}; +use tokio_test::{assert_pending, assert_ready_ok}; +use tokio_test::task; + +use std::io; + +#[tokio::test] +async fn try_read_write() { + const DATA: &[u8] = b"this is some data to write to the socket"; + + // Create listener + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + + // Create socket pair + let client = TcpStream::connect(listener.local_addr().unwrap()).await.unwrap(); + let (server, _) = listener.accept().await.unwrap(); + let mut written = DATA.to_vec(); + + // Track the server receiving data + let mut readable = task::spawn(server.readable()); + assert_pending!(readable.poll()); + + // Write data. + client.writable().await.unwrap(); + assert_eq!(DATA.len(), client.try_write(DATA).unwrap()); + + // The task should be notified + while !readable.is_woken() { + tokio::task::yield_now().await; + } + + // Fill the write buffer + loop { + // Still ready + let mut writable = task::spawn(client.writable()); + assert_ready_ok!(writable.poll()); + + match client.try_write(DATA) { + Ok(n) => written.extend(&DATA[..n]), + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + break; + } + Err(e) => panic!("error = {:?}", e), + } + } + + { + // Write buffer full + let mut writable = task::spawn(client.writable()); + assert_pending!(writable.poll()); + + // Drain the socket from the server end + let mut read = vec![0; written.len()]; + let mut i = 0; + + while i < read.len() { + server.readable().await.unwrap(); + + let n = server.try_read(&mut read[i..]).unwrap(); + i += n; + } + + assert_eq!(read, written); + } + + // Now, we listen for shutdown + drop(client); + + loop { + let ready = server.ready(Interest::READABLE).await.unwrap(); + + if ready.is_read_closed() { + return; + } else { + tokio::task::yield_now().await; + } + } +} From 12d636812d951587dd679f782656b9c9c3c5a9e3 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 11 Nov 2020 15:59:42 -0800 Subject: [PATCH 04/12] write some docs --- tokio/src/net/tcp/stream.rs | 97 ++++++++++++++++++++++++++++++++++--- tokio/tests/tcp_stream.rs | 8 +-- 2 files changed, 96 insertions(+), 9 deletions(-) diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index f0e5d47c77d..76ea8505b7f 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -271,18 +271,103 @@ impl TcpStream { } /// Wait for the socket to become readable. + /// + /// This function is usually paired with `try_read`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// let mut msg = vec![0; 1024]; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read(&mut msg) { + /// Ok(n) => { + /// msg.truncate(n); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// println!("GOT = {:?}", msg); + /// Ok(()) + /// } + /// ``` pub async fn readable(&self) -> io::Result<()> { self.ready(Interest::READABLE).await?; Ok(()) } /// Attempt a non-blocking read. + /// + /// Receives any pending data from the socket but does not wait for new data + /// to arrive. On success, returns the number of bytes read. + /// + /// Usually, [`readable()`] or [`ready()`] is used with this function. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// let mut msg = vec![0; 1024]; + /// + /// loop { + /// // Wait for the socket to be readable + /// stream.readable().await?; + /// + /// // Try to read data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_read(&mut msg) { + /// Ok(n) => { + /// msg.truncate(n); + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// println!("GOT = {:?}", msg); + /// Ok(()) + /// } + /// ``` pub fn try_read(&self, buf: &mut [u8]) -> io::Result { use std::io::Read; - self.io.registration().try_io(Interest::READABLE, || { - (&*self.io).read(buf) - }) + self.io + .registration() + .try_io(Interest::READABLE, || (&*self.io).read(buf)) } /// Wait for the socket to become writable. @@ -295,9 +380,9 @@ impl TcpStream { pub fn try_write(&self, buf: &[u8]) -> io::Result { use std::io::Write; - self.io.registration().try_io(Interest::WRITABLE, || { - (&*self.io).write(buf) - }) + self.io + .registration() + .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) } /// Receives data on the socket from the remote address to which it is diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs index 9b577377960..a2f606fe9d1 100644 --- a/tokio/tests/tcp_stream.rs +++ b/tokio/tests/tcp_stream.rs @@ -3,8 +3,8 @@ use tokio::io::Interest; use tokio::net::{TcpListener, TcpStream}; -use tokio_test::{assert_pending, assert_ready_ok}; use tokio_test::task; +use tokio_test::{assert_pending, assert_ready_ok}; use std::io; @@ -16,7 +16,9 @@ async fn try_read_write() { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); // Create socket pair - let client = TcpStream::connect(listener.local_addr().unwrap()).await.unwrap(); + let client = TcpStream::connect(listener.local_addr().unwrap()) + .await + .unwrap(); let (server, _) = listener.accept().await.unwrap(); let mut written = DATA.to_vec(); @@ -38,7 +40,7 @@ async fn try_read_write() { // Still ready let mut writable = task::spawn(client.writable()); assert_ready_ok!(writable.poll()); - + match client.try_write(DATA) { Ok(n) => written.extend(&DATA[..n]), Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { From 7266d3c3bbe72938187e5a728cbfe4603aa9be55 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 11 Nov 2020 22:13:25 -0800 Subject: [PATCH 05/12] more docs and tests --- tokio/src/net/tcp/stream.rs | 156 +++++++++++++++++++++++++++++++++--- tokio/tests/tcp_stream.rs | 25 ++++++ 2 files changed, 171 insertions(+), 10 deletions(-) diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 76ea8505b7f..b649bea0be7 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -265,6 +265,44 @@ impl TcpStream { } /// Wait for any of the requested ready states. + /// + /// This function is usually paired with `try_read()` or `try_write()`. It + /// can be used to concurrently read / write to the same socket on a single + /// task without splitting the socket. + /// + /// # Examples + /// + /// Concurrently read and write to the stream on the same task without + /// splitting. + /// + /// ```no_run + /// use tokio::io::Interest; + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?; + /// + /// if ready.is_readable() { + /// // The buffer is **not** included in the async task and will only exist + /// // on the stack. + /// let mut data = [0; 1024]; + /// let n = stream.try_read(&mut data[..]).unwrap(); + /// + /// println!("GOT {:?}", &data[..n]); + /// } + /// + /// if ready.is_writable() { + /// // Write some data + /// stream.try_write(b"hello world").unwrap(); + /// } + /// } + /// } + /// ``` pub async fn ready(&self, interest: Interest) -> io::Result { let event = self.io.registration().readiness(interest).await?; Ok(event.ready) @@ -272,7 +310,8 @@ impl TcpStream { /// Wait for the socket to become readable. /// - /// This function is usually paired with `try_read`. + /// This function is equivalent to `ready(Interest::READABLE)` is usually + /// paired with `try_read()`. /// /// # Examples /// @@ -317,13 +356,23 @@ impl TcpStream { Ok(()) } - /// Attempt a non-blocking read. + /// Try to read data from the stream into the provided buffer, returning how + /// many bytes were read. /// /// Receives any pending data from the socket but does not wait for new data - /// to arrive. On success, returns the number of bytes read. + /// to arrive. On success, returns the number of bytes read. Because + /// `try_read()` is non-blocking, the buffer does not have to be stored by + /// the async task and can exist entirely on the stack. /// /// Usually, [`readable()`] or [`ready()`] is used with this function. /// + /// # Return + /// + /// If data is successfully read, `Ok(n)` is returned, where `n` is the + /// number of bytes read. `Ok(n)` indicates the stream's read half is closed + /// and will no longer yield data. If the stream is not ready to read data + /// `Err(io::ErrorKinid::WouldBlock)` is returned. + /// /// # Examples /// /// ```no_run @@ -336,18 +385,20 @@ impl TcpStream { /// // Connect to a peer /// let stream = TcpStream::connect("127.0.0.1:8080").await?; /// - /// let mut msg = vec![0; 1024]; - /// /// loop { /// // Wait for the socket to be readable /// stream.readable().await?; /// + /// // Creating the buffer **after** the `await` prevents it from + /// // being stored in the async task. + /// let mut buf = [0; 4096]; + /// /// // Try to read data, this may still fail with `WouldBlock` /// // if the readiness event is a false positive. - /// match stream.try_read(&mut msg) { + /// match stream.try_read(&mut buf) { + /// Ok(0) => break, /// Ok(n) => { - /// msg.truncate(n); - /// break; + /// println!("read {} bytes", n); /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; @@ -358,7 +409,6 @@ impl TcpStream { /// } /// } /// - /// println!("GOT = {:?}", msg); /// Ok(()) /// } /// ``` @@ -371,12 +421,98 @@ impl TcpStream { } /// Wait for the socket to become writable. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` is usually + /// paired with `try_write()`. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` pub async fn writable(&self) -> io::Result<()> { self.ready(Interest::WRITABLE).await?; Ok(()) } - /// Attempt a non-blocking write. + /// Try to write a buffer to the stream, returning how many bytes were + /// written. + /// + /// The function will attempt to write the entire contents of `buf`, but + /// only part of the buffer may be written. + /// + /// This function is equivalent to `ready(Interest::WRITABLE)` is usually + /// paired with `try_write()`. + /// + /// # Return + /// + /// If data is successfully written, `Ok(n)` is returned, where `n` is the + /// number of bytes written. If the stream is not ready to write data, + /// `Err(io::ErrorKind::WouldBlock)` is returned. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::net::TcpStream; + /// use std::error::Error; + /// use std::io; + /// + /// #[tokio::main] + /// async fn main() -> Result<(), Box> { + /// // Connect to a peer + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// loop { + /// // Wait for the socket to be writable + /// stream.writable().await?; + /// + /// // Try to write data, this may still fail with `WouldBlock` + /// // if the readiness event is a false positive. + /// match stream.try_write(b"hello world") { + /// Ok(n) => { + /// break; + /// } + /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + /// continue; + /// } + /// Err(e) => { + /// return Err(e.into()); + /// } + /// } + /// } + /// + /// Ok(()) + /// } + /// ``` pub fn try_write(&self, buf: &[u8]) -> io::Result { use std::io::Write; diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs index a2f606fe9d1..8bd746d1a7f 100644 --- a/tokio/tests/tcp_stream.rs +++ b/tokio/tests/tcp_stream.rs @@ -82,3 +82,28 @@ async fn try_read_write() { } } } + +#[test] +fn buffer_not_included_in_future() { + use std::mem; + + const N: usize = 4096; + + let fut = async { + let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap(); + + loop { + stream.readable().await.unwrap(); + + let mut buf = [0; N]; + let n = stream.try_read(&mut buf[..]).unwrap(); + + if n == 0 { + break; + } + } + }; + + let n = mem::size_of_val(&fut); + assert!(n < 1000); +} From 79052f28a3f6eebf94115fbd3ce77e994e9d484a Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 11 Nov 2020 22:18:12 -0800 Subject: [PATCH 06/12] fix feature build --- tokio/src/io/driver/scheduled_io.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 99464a32fde..75d562327c1 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -1,4 +1,4 @@ -use super::{Ready, ReadyEvent, Tick}; +use super::{Interest, Ready, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; @@ -49,8 +49,6 @@ struct Waiters { } cfg_io_readiness! { - use crate::io::Interest; - #[derive(Debug)] struct Waiter { pointers: linked_list::Pointers, From c1dcc79f4d403056af9c89246ee985ae3bac3f5c Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 11 Nov 2020 22:23:10 -0800 Subject: [PATCH 07/12] fix docs build --- tokio/src/net/tcp/stream.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index b649bea0be7..2ac37a2bc85 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -366,6 +366,9 @@ impl TcpStream { /// /// Usually, [`readable()`] or [`ready()`] is used with this function. /// + /// [`readable()`]: TcpStream::readable() + /// [`ready()`]: TcpStream::ready() + /// /// # Return /// /// If data is successfully read, `Ok(n)` is returned, where `n` is the From 9853bf2e0ffb4dbf42105c7e189e65014c9f1798 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 11 Nov 2020 22:40:28 -0800 Subject: [PATCH 08/12] fix test --- tokio/tests/tcp_stream.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs index 8bd746d1a7f..784ade8a411 100644 --- a/tokio/tests/tcp_stream.rs +++ b/tokio/tests/tcp_stream.rs @@ -62,8 +62,11 @@ async fn try_read_write() { while i < read.len() { server.readable().await.unwrap(); - let n = server.try_read(&mut read[i..]).unwrap(); - i += n; + match server.try_read(&mut read[i..]) { + Ok(n) => i += n, + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(e) => panic!("error = {:?}", e), + } } assert_eq!(read, written); From 4ff52630f3dbeae87e7c7007dd37377cf7e45206 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 12 Nov 2020 08:43:19 -0800 Subject: [PATCH 09/12] fix doc feature flag --- tokio/src/io/driver/interest.rs | 18 +++++++++++------- tokio/src/io/driver/ready.rs | 14 +++++++++----- tokio/src/io/mod.rs | 7 +++++-- tokio/src/macros/cfg.rs | 13 +++++++++++++ tokio/src/signal/unix/driver.rs | 4 ++-- 5 files changed, 40 insertions(+), 16 deletions(-) diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs index 861ac7a32e4..4304050b123 100644 --- a/tokio/src/io/driver/interest.rs +++ b/tokio/src/io/driver/interest.rs @@ -1,14 +1,18 @@ -use crate::io::Ready; +#![cfg_attr(not(feature = "net"), allow(unreachable_pub))] + +use crate::io::driver::Ready; use std::fmt; use std::ops; -/// Readiness event interest -/// -/// Specifies the readiness events the caller is interested in when awaiting on -/// I/O resource readiness states. -#[derive(Clone, Copy, Eq, PartialEq)] -pub struct Interest(mio::Interest); +cfg_net! { + /// Readiness event interest + /// + /// Specifies the readiness events the caller is interested in when awaiting on + /// I/O resource readiness states. + #[derive(Clone, Copy, Eq, PartialEq)] + pub struct Interest(mio::Interest); +} impl Interest { /// Interest in all readable events. diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index 95925369120..72c0b13504a 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -1,3 +1,5 @@ +#![cfg_attr(not(feature = "net"), allow(unreachable_pub))] + use std::fmt; use std::ops; @@ -6,11 +8,13 @@ const WRITABLE: usize = 0b0_10; const READ_CLOSED: usize = 0b0_0100; const WRITE_CLOSED: usize = 0b0_1000; -/// Describes the readiness state of an I/O resources. -/// -/// `Ready` tracks which operation an I/O resource is ready to perform. -#[derive(Clone, Copy, PartialEq, PartialOrd)] -pub struct Ready(usize); +cfg_net! { + /// Describes the readiness state of an I/O resources. + /// + /// `Ready` tracks which operation an I/O resource is ready to perform. + #[derive(Clone, Copy, PartialEq, PartialOrd)] + pub struct Ready(usize); +} impl Ready { /// Returns the empty `Ready` set. diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index d965e61879f..14be3e06d83 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -204,9 +204,12 @@ pub use self::read_buf::ReadBuf; #[doc(no_inline)] pub use std::io::{Error, ErrorKind, Result, SeekFrom}; -cfg_io_driver! { +cfg_io_driver_impl! { pub(crate) mod driver; - pub use driver::{Interest, Ready}; + + cfg_net! { + pub use driver::{Interest, Ready}; + } mod poll_evented; diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index edf681a4b8c..15216560b61 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -79,6 +79,19 @@ macro_rules! cfg_io_driver { } } +macro_rules! cfg_io_driver_impl { + ( $( $item:item )* ) => { + $( + #[cfg(any( + feature = "net", + feature = "process", + all(unix, feature = "signal"), + ))] + $item + )* + } +} + macro_rules! cfg_not_io_driver { ($($item:item)*) => { $( diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs index 323bb9dff12..315f3bd5830 100644 --- a/tokio/src/signal/unix/driver.rs +++ b/tokio/src/signal/unix/driver.rs @@ -2,8 +2,8 @@ //! Signal driver -use crate::io::driver::Driver as IoDriver; -use crate::io::{Interest, PollEvented}; +use crate::io::driver::{Driver as IoDriver, Interest}; +use crate::io::PollEvented; use crate::park::Park; use crate::signal::registry::globals; From cf8840e5798b70b3c99d2e4ff0a22e6ffe24b3f5 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 12 Nov 2020 08:48:03 -0800 Subject: [PATCH 10/12] Apply suggestions from code review Co-authored-by: Alice Ryhl --- tokio/src/io/driver/interest.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs index 4304050b123..b80ec67bee3 100644 --- a/tokio/src/io/driver/interest.rs +++ b/tokio/src/io/driver/interest.rs @@ -61,7 +61,8 @@ impl Interest { /// Add together two `Interst` values. /// - /// This function works from a `const` context. + /// This method is an alias for the `|` operator that works from a + /// `const` context. /// /// # Examples /// From 06325326919db65aa3e1aebb4b66f204b9a7d850 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 12 Nov 2020 08:52:59 -0800 Subject: [PATCH 11/12] simplify Ready ops --- tokio/src/io/driver/ready.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index 72c0b13504a..c5173741ec8 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -194,37 +194,37 @@ cfg_io_readiness! { } } -impl> ops::BitOr for Ready { +impl ops::BitOr for Ready { type Output = Ready; #[inline] - fn bitor(self, other: T) -> Ready { - Ready(self.0 | other.into().0) + fn bitor(self, other: Ready) -> Ready { + Ready(self.0 | other.0) } } -impl> ops::BitOrAssign for Ready { +impl ops::BitOrAssign for Ready { #[inline] - fn bitor_assign(&mut self, other: T) { - self.0 |= other.into().0; + fn bitor_assign(&mut self, other: Ready) { + self.0 |= other.0; } } -impl> ops::BitAnd for Ready { +impl ops::BitAnd for Ready { type Output = Ready; #[inline] - fn bitand(self, other: T) -> Ready { - Ready(self.0 & other.into().0) + fn bitand(self, other: Ready) -> Ready { + Ready(self.0 & other.0) } } -impl> ops::Sub for Ready { +impl ops::Sub for Ready { type Output = Ready; #[inline] - fn sub(self, other: T) -> Ready { - Ready(self.0 & !other.into().0) + fn sub(self, other: Ready) -> Ready { + Ready(self.0 & !other.0) } } From c1f3da6b81754a579f72f65886f9001394f9b5ea Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 12 Nov 2020 11:08:18 -0800 Subject: [PATCH 12/12] fix doc build --- tokio/src/io/driver/interest.rs | 18 ++++++++---------- tokio/src/io/driver/ready.rs | 13 ++++++------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/tokio/src/io/driver/interest.rs b/tokio/src/io/driver/interest.rs index b80ec67bee3..8c8049dfb53 100644 --- a/tokio/src/io/driver/interest.rs +++ b/tokio/src/io/driver/interest.rs @@ -5,14 +5,13 @@ use crate::io::driver::Ready; use std::fmt; use std::ops; -cfg_net! { - /// Readiness event interest - /// - /// Specifies the readiness events the caller is interested in when awaiting on - /// I/O resource readiness states. - #[derive(Clone, Copy, Eq, PartialEq)] - pub struct Interest(mio::Interest); -} +/// Readiness event interest +/// +/// Specifies the readiness events the caller is interested in when awaiting on +/// I/O resource readiness states. +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +#[derive(Clone, Copy, Eq, PartialEq)] +pub struct Interest(mio::Interest); impl Interest { /// Interest in all readable events. @@ -61,8 +60,7 @@ impl Interest { /// Add together two `Interst` values. /// - /// This method is an alias for the `|` operator that works from a - /// `const` context. + /// This function works from a `const` context. /// /// # Examples /// diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index c5173741ec8..2ac01bdbec3 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -8,13 +8,12 @@ const WRITABLE: usize = 0b0_10; const READ_CLOSED: usize = 0b0_0100; const WRITE_CLOSED: usize = 0b0_1000; -cfg_net! { - /// Describes the readiness state of an I/O resources. - /// - /// `Ready` tracks which operation an I/O resource is ready to perform. - #[derive(Clone, Copy, PartialEq, PartialOrd)] - pub struct Ready(usize); -} +/// Describes the readiness state of an I/O resources. +/// +/// `Ready` tracks which operation an I/O resource is ready to perform. +#[cfg_attr(docsrs, doc(cfg(feature = "net")))] +#[derive(Clone, Copy, PartialEq, PartialOrd)] +pub struct Ready(usize); impl Ready { /// Returns the empty `Ready` set.