Skip to content

Commit

Permalink
net: add TcpStream::ready and non-blocking ops (#3130)
Browse files Browse the repository at this point in the history
Adds function to await for readiness on the TcpStream and non-blocking read/write functions.

`async fn TcpStream::ready(Interest)` waits for socket readiness satisfying **any** of the specified
interest. There are also two shorthand functions, `readable()` and `writable()`.

Once the stream is in a ready state, the caller may perform non-blocking operations on it using
`try_read()` and `try_write()`. These function return `WouldBlock` if the stream is not, in fact, ready.

The await readiness function are similar to `AsyncFd`, but do not require a guard. The guard in
`AsyncFd` protect against a potential race between receiving the readiness notification and clearing
it. The guard is needed as Tokio does not control the operations. With `TcpStream`, the `try_read()`
and `try_write()` function handle clearing stream readiness as needed.

This also exposes `Interest` and `Ready`, both defined in Tokio as wrappers for Mio types. These
types will also be useful for fixing #3072 .

Other I/O types, such as `TcpListener`, `UdpSocket`, `Unix*` should get similar functions, but this
is left for later PRs.

Refs: #3130
  • Loading branch information
carllerche committed Nov 13, 2020
1 parent 685da8d commit 02b1117
Show file tree
Hide file tree
Showing 10 changed files with 576 additions and 54 deletions.
70 changes: 62 additions & 8 deletions tokio/src/io/driver/interest.rs
@@ -1,38 +1,92 @@
#![cfg_attr(not(feature = "net"), allow(unreachable_pub))]

use crate::io::driver::Ready;

use std::fmt;
use std::ops;

/// Readiness event interest
///
/// Specifies the readiness events the caller is interested in when awaiting on
/// I/O resource readiness states.
#[derive(Clone, Copy)]
pub(crate) struct Interest(mio::Interest);
#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct Interest(mio::Interest);

impl Interest {
/// Interest in all readable events
pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE);
/// Interest in all readable events.
///
/// Readable interest includes read-closed events.
pub const READABLE: Interest = Interest(mio::Interest::READABLE);

/// Interest in all writable events
pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
///
/// Writable interest includes write-closed events.
pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);

/// Returns true if the value includes readable interest.
pub(crate) const fn is_readable(self) -> bool {
///
/// # Examples
///
/// ```
/// use tokio::io::Interest;
///
/// assert!(Interest::READABLE.is_readable());
/// assert!(!Interest::WRITABLE.is_readable());
///
/// let both = Interest::READABLE | Interest::WRITABLE;
/// assert!(both.is_readable());
/// ```
pub const fn is_readable(self) -> bool {
self.0.is_readable()
}

/// Returns true if the value includes writable interest.
pub(crate) const fn is_writable(self) -> bool {
///
/// # Examples
///
/// ```
/// use tokio::io::Interest;
///
/// assert!(!Interest::READABLE.is_writable());
/// assert!(Interest::WRITABLE.is_writable());
///
/// let both = Interest::READABLE | Interest::WRITABLE;
/// assert!(both.is_writable());
/// ```
pub const fn is_writable(self) -> bool {
self.0.is_writable()
}

/// Add together two `Interst` values.
pub(crate) const fn add(self, other: Interest) -> Interest {
///
/// This function works from a `const` context.
///
/// # Examples
///
/// ```
/// use tokio::io::Interest;
///
/// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
///
/// assert!(BOTH.is_readable());
/// assert!(BOTH.is_writable());
pub const fn add(self, other: Interest) -> Interest {
Interest(self.0.add(other.0))
}

// This function must be crate-private to avoid exposing a `mio` dependency.
pub(crate) const fn to_mio(self) -> mio::Interest {
self.0
}

pub(super) fn mask(self) -> Ready {
match self {
Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED,
Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED,
_ => Ready::EMPTY,
}
}
}

impl ops::BitOr for Interest {
Expand Down
8 changes: 5 additions & 3 deletions tokio/src/io/driver/mod.rs
@@ -1,10 +1,12 @@
#![cfg_attr(not(feature = "rt"), allow(dead_code))]

mod interest;
pub(crate) use interest::Interest;
#[allow(unreachable_pub)]
pub use interest::Interest;

mod ready;
use ready::Ready;
#[allow(unreachable_pub)]
pub use ready::Ready;

mod registration;
pub(crate) use registration::Registration;
Expand Down Expand Up @@ -51,7 +53,7 @@ pub(crate) struct Handle {

pub(crate) struct ReadyEvent {
tick: u8,
ready: Ready,
pub(crate) ready: Ready,
}

pub(super) struct Inner {
Expand Down
120 changes: 85 additions & 35 deletions tokio/src/io/driver/ready.rs
@@ -1,3 +1,5 @@
#![cfg_attr(not(feature = "net"), allow(unreachable_pub))]

use std::fmt;
use std::ops;

Expand All @@ -6,36 +8,33 @@ const WRITABLE: usize = 0b0_10;
const READ_CLOSED: usize = 0b0_0100;
const WRITE_CLOSED: usize = 0b0_1000;

/// A set of readiness event kinds.
///
/// `Ready` is set of operation descriptors indicating which kind of an
/// operation is ready to be performed.
/// Describes the readiness state of an I/O resources.
///
/// This struct only represents portable event kinds. Portable events are
/// events that can be raised on any platform while guaranteeing no false
/// positives.
/// `Ready` tracks which operation an I/O resource is ready to perform.
#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
#[derive(Clone, Copy, PartialEq, PartialOrd)]
pub(crate) struct Ready(usize);
pub struct Ready(usize);

impl Ready {
/// Returns the empty `Ready` set.
pub(crate) const EMPTY: Ready = Ready(0);
pub const EMPTY: Ready = Ready(0);

/// Returns a `Ready` representing readable readiness.
pub(crate) const READABLE: Ready = Ready(READABLE);
pub const READABLE: Ready = Ready(READABLE);

/// Returns a `Ready` representing writable readiness.
pub(crate) const WRITABLE: Ready = Ready(WRITABLE);
pub const WRITABLE: Ready = Ready(WRITABLE);

/// Returns a `Ready` representing read closed readiness.
pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED);
pub const READ_CLOSED: Ready = Ready(READ_CLOSED);

/// Returns a `Ready` representing write closed readiness.
pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);
pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);

/// Returns a `Ready` representing readiness for all operations.
pub(crate) const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);

// Must remain crate-private to avoid adding a public dependency on Mio.
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
let mut ready = Ready::EMPTY;

Expand All @@ -59,27 +58,78 @@ impl Ready {
}

/// Returns true if `Ready` is the empty set
pub(crate) fn is_empty(self) -> bool {
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(Ready::EMPTY.is_empty());
/// assert!(!Ready::READABLE.is_empty());
/// ```
pub fn is_empty(self) -> bool {
self == Ready::EMPTY
}

/// Returns true if the value includes readable readiness
pub(crate) fn is_readable(self) -> bool {
/// Returns `true` if the value includes `readable`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_readable());
/// assert!(Ready::READABLE.is_readable());
/// assert!(Ready::READ_CLOSED.is_readable());
/// assert!(!Ready::WRITABLE.is_readable());
/// ```
pub fn is_readable(self) -> bool {
self.contains(Ready::READABLE) || self.is_read_closed()
}

/// Returns true if the value includes writable readiness
pub(crate) fn is_writable(self) -> bool {
/// Returns `true` if the value includes writable `readiness`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_writable());
/// assert!(!Ready::READABLE.is_writable());
/// assert!(Ready::WRITABLE.is_writable());
/// assert!(Ready::WRITE_CLOSED.is_writable());
/// ```
pub fn is_writable(self) -> bool {
self.contains(Ready::WRITABLE) || self.is_write_closed()
}

/// Returns true if the value includes read closed readiness
pub(crate) fn is_read_closed(self) -> bool {
/// Returns `true` if the value includes read-closed `readiness`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_read_closed());
/// assert!(!Ready::READABLE.is_read_closed());
/// assert!(Ready::READ_CLOSED.is_read_closed());
/// ```
pub fn is_read_closed(self) -> bool {
self.contains(Ready::READ_CLOSED)
}

/// Returns true if the value includes write closed readiness
pub(crate) fn is_write_closed(self) -> bool {
/// Returns `true` if the value includes write-closed `readiness`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_write_closed());
/// assert!(!Ready::WRITABLE.is_write_closed());
/// assert!(Ready::WRITE_CLOSED.is_write_closed());
/// ```
pub fn is_write_closed(self) -> bool {
self.contains(Ready::WRITE_CLOSED)
}

Expand Down Expand Up @@ -143,37 +193,37 @@ cfg_io_readiness! {
}
}

impl<T: Into<Ready>> ops::BitOr<T> for Ready {
impl ops::BitOr<Ready> for Ready {
type Output = Ready;

#[inline]
fn bitor(self, other: T) -> Ready {
Ready(self.0 | other.into().0)
fn bitor(self, other: Ready) -> Ready {
Ready(self.0 | other.0)
}
}

impl<T: Into<Ready>> ops::BitOrAssign<T> for Ready {
impl ops::BitOrAssign<Ready> for Ready {
#[inline]
fn bitor_assign(&mut self, other: T) {
self.0 |= other.into().0;
fn bitor_assign(&mut self, other: Ready) {
self.0 |= other.0;
}
}

impl<T: Into<Ready>> ops::BitAnd<T> for Ready {
impl ops::BitAnd<Ready> for Ready {
type Output = Ready;

#[inline]
fn bitand(self, other: T) -> Ready {
Ready(self.0 & other.into().0)
fn bitand(self, other: Ready) -> Ready {
Ready(self.0 & other.0)
}
}

impl<T: Into<Ready>> ops::Sub<T> for Ready {
impl ops::Sub<Ready> for Ready {
type Output = Ready;

#[inline]
fn sub(self, other: T) -> Ready {
Ready(self.0 & !other.into().0)
fn sub(self, other: Ready) -> Ready {
Ready(self.0 & !other.0)
}
}

Expand Down
21 changes: 21 additions & 0 deletions tokio/src/io/driver/registration.rs
Expand Up @@ -182,6 +182,27 @@ impl Registration {
}
}
}

pub(crate) fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
let ev = self.shared.ready_event(interest);

// Don't attempt the operation if the resource is not ready.
if ev.ready.is_empty() {
return Err(io::ErrorKind::WouldBlock.into());
}

match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(ev);
Err(io::ErrorKind::WouldBlock.into())
}
res => res,
}
}
}

fn gone() -> io::Error {
Expand Down
13 changes: 10 additions & 3 deletions tokio/src/io/driver/scheduled_io.rs
@@ -1,4 +1,4 @@
use super::{Ready, ReadyEvent, Tick};
use super::{Interest, Ready, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
Expand Down Expand Up @@ -49,8 +49,6 @@ struct Waiters {
}

cfg_io_readiness! {
use crate::io::Interest;

#[derive(Debug)]
struct Waiter {
pointers: linked_list::Pointers<Waiter>,
Expand Down Expand Up @@ -280,6 +278,15 @@ impl ScheduledIo {
}
}

pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
let curr = self.readiness.load(Acquire);

ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
}
}

/// Poll version of checking readiness for a certain direction.
///
/// These are to support `AsyncRead` and `AsyncWrite` polling methods,
Expand Down
7 changes: 5 additions & 2 deletions tokio/src/io/mod.rs
Expand Up @@ -204,9 +204,12 @@ pub use self::read_buf::ReadBuf;
#[doc(no_inline)]
pub use std::io::{Error, ErrorKind, Result, SeekFrom};

cfg_io_driver! {
cfg_io_driver_impl! {
pub(crate) mod driver;
pub(crate) use driver::Interest;

cfg_net! {
pub use driver::{Interest, Ready};
}

mod poll_evented;

Expand Down

0 comments on commit 02b1117

Please sign in to comment.