Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net: add TcpStream::ready and non-blocking ops #3130

Merged
merged 12 commits into from Nov 13, 2020
67 changes: 59 additions & 8 deletions tokio/src/io/driver/interest.rs
@@ -1,38 +1,89 @@
use crate::io::Ready;

use std::fmt;
use std::ops;

/// Readiness event interest
///
/// Specifies the readiness events the caller is interested in when awaiting on
/// I/O resource readiness states.
#[derive(Clone, Copy)]
pub(crate) struct Interest(mio::Interest);
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct Interest(mio::Interest);
carllerche marked this conversation as resolved.
Show resolved Hide resolved

impl Interest {
/// Interest in all readable events
pub(crate) const READABLE: Interest = Interest(mio::Interest::READABLE);
/// Interest in all readable events.
///
/// Readable interest includes read-closed events.
pub const READABLE: Interest = Interest(mio::Interest::READABLE);

/// Interest in all writable events
pub(crate) const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
///
/// Writable interest includes write-closed events.
pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);

/// Returns true if the value includes readable interest.
pub(crate) const fn is_readable(self) -> bool {
///
/// # Examples
///
/// ```
/// use tokio::io::Interest;
///
/// assert!(Interest::READABLE.is_readable());
/// assert!(!Interest::WRITABLE.is_readable());
///
/// let both = Interest::READABLE | Interest::WRITABLE;
/// assert!(both.is_readable());
/// ```
pub const fn is_readable(self) -> bool {
self.0.is_readable()
}

/// Returns true if the value includes writable interest.
pub(crate) const fn is_writable(self) -> bool {
///
/// # Examples
///
/// ```
/// use tokio::io::Interest;
///
/// assert!(!Interest::READABLE.is_writable());
/// assert!(Interest::WRITABLE.is_writable());
///
/// let both = Interest::READABLE | Interest::WRITABLE;
/// assert!(both.is_writable());
/// ```
pub const fn is_writable(self) -> bool {
self.0.is_writable()
}

/// Add together two `Interst` values.
pub(crate) const fn add(self, other: Interest) -> Interest {
///
/// This function works from a `const` context.
carllerche marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Examples
///
/// ```
/// use tokio::io::Interest;
///
/// const BOTH: Interest = Interest::READABLE.add(Interest::WRITABLE);
///
/// assert!(BOTH.is_readable());
/// assert!(BOTH.is_writable());
pub const fn add(self, other: Interest) -> Interest {
Interest(self.0.add(other.0))
}

// This function must be crate-private to avoid exposing a `mio` dependency.
pub(crate) const fn to_mio(self) -> mio::Interest {
self.0
}

pub(super) fn mask(self) -> Ready {
match self {
Interest::READABLE => Ready::READABLE | Ready::READ_CLOSED,
Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED,
_ => Ready::EMPTY,
}
}
}

impl ops::BitOr for Interest {
Expand Down
8 changes: 5 additions & 3 deletions tokio/src/io/driver/mod.rs
@@ -1,10 +1,12 @@
#![cfg_attr(not(feature = "rt"), allow(dead_code))]

mod interest;
pub(crate) use interest::Interest;
#[allow(unreachable_pub)]
pub use interest::Interest;

mod ready;
use ready::Ready;
#[allow(unreachable_pub)]
pub use ready::Ready;

mod registration;
pub(crate) use registration::Registration;
Expand Down Expand Up @@ -51,7 +53,7 @@ pub(crate) struct Handle {

pub(crate) struct ReadyEvent {
tick: u8,
ready: Ready,
pub(crate) ready: Ready,
}

pub(super) struct Inner {
Expand Down
85 changes: 66 additions & 19 deletions tokio/src/io/driver/ready.rs
Expand Up @@ -6,36 +6,32 @@ const WRITABLE: usize = 0b0_10;
const READ_CLOSED: usize = 0b0_0100;
const WRITE_CLOSED: usize = 0b0_1000;

/// A set of readiness event kinds.
/// Describes the readiness state of an I/O resources.
///
/// `Ready` is set of operation descriptors indicating which kind of an
/// operation is ready to be performed.
///
/// This struct only represents portable event kinds. Portable events are
/// events that can be raised on any platform while guaranteeing no false
/// positives.
/// `Ready` tracks which operation an I/O resource is ready to perform.
#[derive(Clone, Copy, PartialEq, PartialOrd)]
pub(crate) struct Ready(usize);
pub struct Ready(usize);
carllerche marked this conversation as resolved.
Show resolved Hide resolved

impl Ready {
/// Returns the empty `Ready` set.
pub(crate) const EMPTY: Ready = Ready(0);
pub const EMPTY: Ready = Ready(0);

/// Returns a `Ready` representing readable readiness.
pub(crate) const READABLE: Ready = Ready(READABLE);
pub const READABLE: Ready = Ready(READABLE);

/// Returns a `Ready` representing writable readiness.
pub(crate) const WRITABLE: Ready = Ready(WRITABLE);
pub const WRITABLE: Ready = Ready(WRITABLE);

/// Returns a `Ready` representing read closed readiness.
pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED);
pub const READ_CLOSED: Ready = Ready(READ_CLOSED);

/// Returns a `Ready` representing write closed readiness.
pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);
pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);

/// Returns a `Ready` representing readiness for all operations.
pub(crate) const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);

// Must remain crate-private to avoid adding a public dependency on Mio.
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
let mut ready = Ready::EMPTY;

Expand All @@ -59,26 +55,77 @@ impl Ready {
}

/// Returns true if `Ready` is the empty set
pub(crate) fn is_empty(self) -> bool {
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(Ready::EMPTY.is_empty());
/// assert!(!Ready::READABLE.is_empty());
/// ```
pub fn is_empty(self) -> bool {
self == Ready::EMPTY
}

/// Returns true if the value includes readable readiness
/// Returns `true` if the value includes `readable`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_readable());
/// assert!(Ready::READABLE.is_readable());
/// assert!(Ready::READ_CLOSED.is_readable());
/// assert!(!Ready::WRITABLE.is_readable());
/// ```
pub(crate) fn is_readable(self) -> bool {
self.contains(Ready::READABLE) || self.is_read_closed()
}

/// Returns true if the value includes writable readiness
/// Returns `true` if the value includes writable `readiness`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_writable());
/// assert!(!Ready::READABLE.is_writable());
/// assert!(Ready::WRITABLE.is_writable());
/// assert!(Ready::WRITE_CLOSED.is_writable());
/// ```
pub(crate) fn is_writable(self) -> bool {
self.contains(Ready::WRITABLE) || self.is_write_closed()
}

/// Returns true if the value includes read closed readiness
/// Returns `true` if the value includes read-closed `readiness`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_read_closed());
/// assert!(!Ready::READABLE.is_read_closed());
/// assert!(Ready::READ_CLOSED.is_read_closed());
/// ```
pub(crate) fn is_read_closed(self) -> bool {
self.contains(Ready::READ_CLOSED)
}

/// Returns true if the value includes write closed readiness
/// Returns `true` if the value includes write-closed `readiness`
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_write_closed());
/// assert!(!Ready::WRITABLE.is_write_closed());
/// assert!(Ready::WRITE_CLOSED.is_write_closed());
/// ```
pub(crate) fn is_write_closed(self) -> bool {
self.contains(Ready::WRITE_CLOSED)
}
Expand Down
21 changes: 21 additions & 0 deletions tokio/src/io/driver/registration.rs
Expand Up @@ -182,6 +182,27 @@ impl Registration {
}
}
}

pub(crate) fn try_io<R>(
&self,
interest: Interest,
f: impl FnOnce() -> io::Result<R>,
) -> io::Result<R> {
let ev = self.shared.ready_event(interest);

// Don't attempt the operation if the resource is not ready.
if ev.ready.is_empty() {
return Err(io::ErrorKind::WouldBlock.into());
}

match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(ev);
Err(io::ErrorKind::WouldBlock.into())
}
res => res,
}
}
}

fn gone() -> io::Error {
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/io/driver/scheduled_io.rs
Expand Up @@ -280,6 +280,15 @@ impl ScheduledIo {
}
}

pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
let curr = self.readiness.load(Acquire);

ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
}
}

/// Poll version of checking readiness for a certain direction.
///
/// These are to support `AsyncRead` and `AsyncWrite` polling methods,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/mod.rs
Expand Up @@ -206,7 +206,7 @@ pub use std::io::{Error, ErrorKind, Result, SeekFrom};

cfg_io_driver! {
pub(crate) mod driver;
pub(crate) use driver::Interest;
pub use driver::{Interest, Ready};

mod poll_evented;

Expand Down
29 changes: 28 additions & 1 deletion tokio/src/net/tcp/stream.rs
@@ -1,5 +1,5 @@
use crate::future::poll_fn;
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf};
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::{to_socket_addrs, ToSocketAddrs};
Expand Down Expand Up @@ -264,6 +264,33 @@ impl TcpStream {
}
}

/// Wait for any of the requested ready states.
pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
carllerche marked this conversation as resolved.
Show resolved Hide resolved
let event = self.io.registration().readiness(interest).await?;
Ok(event.ready)
}

/// Attempt a non-blocking read.
pub async fn try_read(&self, buf: &mut ReadBuf<'_>) -> io::Result<()> {
carllerche marked this conversation as resolved.
Show resolved Hide resolved
use std::io::Read;

let n = self.io.registration().try_io(Interest::READABLE, || {
// Safety: respecting the ReadBuf contract
let b = unsafe {
&mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
};
(&*self.io).read(b)
})?;

// Safety: `TcpStream::read` initializes the memory when reading into the buffer.
unsafe {
buf.assume_init(n);
buf.advance(n);
}

Ok(())
}

/// Receives data on the socket from the remote address to which it is
/// connected, without removing that data from the queue. On success,
/// returns the number of bytes peeked.
Expand Down