From 7a43d9ca31400a02f21cc93ae6f615c0ec6c2958 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 11 Sep 2020 17:06:17 -0700 Subject: [PATCH] Put readiness stuff in io-readiness feature --- .github/workflows/ci.yml | 4 +- tokio/Cargo.toml | 8 +- tokio/src/io/driver/scheduled_io.rs | 307 +++++++++++++++------------- tokio/src/io/poll_evented.rs | 17 +- tokio/src/io/registration.rs | 14 +- tokio/src/lib.rs | 4 +- tokio/src/macros/cfg.rs | 5 +- tokio/src/util/linked_list.rs | 4 +- tokio/src/util/mod.rs | 2 +- 9 files changed, 196 insertions(+), 169 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0ab36cd9a11..3e721f520c5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -150,11 +150,11 @@ jobs: run: cargo install cargo-hack - name: check --each-feature - run: cargo hack check --all --each-feature --skip io-driver -Z avoid-dev-deps + run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps # Try with unstable feature flags - name: check --each-feature --unstable - run: cargo hack check --all --each-feature --skip io-driver -Z avoid-dev-deps + run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps env: RUSTFLAGS: --cfg tokio_unstable diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index b1d943e3119..6df368ebd80 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -33,7 +33,6 @@ full = [ "blocking", "dns", "fs", - "io-driver", "io-util", "io-std", "macros", @@ -51,7 +50,8 @@ full = [ blocking = ["rt-core"] dns = ["rt-core"] fs = ["rt-core", "io-util"] -io-driver = ["mio", "lazy_static"] +io-driver = ["mio", "lazy_static"] # internal only +io-readiness = [] # internal only io-util = ["memchr"] # stdin, stdout, stderr io-std = ["rt-core"] @@ -85,8 +85,8 @@ sync = ["fnv"] test-util = [] tcp = ["io-driver", "iovec"] time = ["slab"] -udp = ["io-driver"] -uds = ["io-driver", "mio-uds", "libc"] +udp = ["io-driver", "io-readiness"] +uds = ["io-driver", "io-readiness", "mio-uds", "libc"] [dependencies] tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true } diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 2f1d3316ede..7a33d39bab0 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -2,17 +2,21 @@ use super::{platform, Direction, ReadyEvent, Tick}; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Mutex; use crate::util::bit; -use crate::util::linked_list::{self, LinkedList}; use crate::util::slab::Entry; -use std::cell::UnsafeCell; -use std::future::Future; -use std::marker::PhantomPinned; -use std::pin::Pin; -use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::task::{Context, Poll, Waker}; +cfg_io_readiness! { + use crate::util::linked_list::{self, LinkedList}; + + use std::cell::UnsafeCell; + use std::future::Future; + use std::marker::PhantomPinned; + use std::pin::Pin; + use std::ptr::NonNull; +} + /// Stored in the I/O driver resource slab. #[derive(Debug)] pub(crate) struct ScheduledIo { @@ -22,10 +26,12 @@ pub(crate) struct ScheduledIo { waiters: Mutex, } +#[cfg(feature = "io-readiness")] type WaitList = LinkedList::Target>; #[derive(Debug, Default)] struct Waiters { + #[cfg(feature = "io-readiness")] /// List of all current waiters list: WaitList, @@ -36,36 +42,38 @@ struct Waiters { writer: Option, } -#[derive(Debug)] -struct Waiter { - pointers: linked_list::Pointers, +cfg_io_readiness! { + #[derive(Debug)] + struct Waiter { + pointers: linked_list::Pointers, - /// The waker for this task - waker: Option, + /// The waker for this task + waker: Option, - /// The interest this waiter is waiting on - interest: mio::Ready, + /// The interest this waiter is waiting on + interest: mio::Ready, - notified: bool, + notified: bool, - /// Should never be `!Unpin` - _p: PhantomPinned, -} + /// Should never be `!Unpin` + _p: PhantomPinned, + } -/// Future returned by `readiness()` -struct Readiness<'a> { - scheduled_io: &'a ScheduledIo, + /// Future returned by `readiness()` + struct Readiness<'a> { + scheduled_io: &'a ScheduledIo, - state: State, + state: State, - /// Entry in the waiter `LinkedList`. - waiter: UnsafeCell, -} + /// Entry in the waiter `LinkedList`. + waiter: UnsafeCell, + } -enum State { - Init, - Waiting, - Done, + enum State { + Init, + Waiting, + Done, + } } // The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. @@ -198,15 +206,18 @@ impl ScheduledIo { } } - // check list of waiters - for waiter in waiters - .list - .drain_filter(|w| !(w.interest & ready).is_empty()) + #[cfg(feature = "io-readiness")] { - let waiter = unsafe { &mut *waiter.as_ptr() }; - if let Some(waker) = waiter.waker.take() { - waiter.notified = true; - waker.wake(); + // check list of waiters + for waiter in waiters + .list + .drain_filter(|w| !(w.interest & ready).is_empty()) + { + let waiter = unsafe { &mut *waiter.as_ptr() }; + if let Some(waker) = waiter.waker.take() { + waiter.notified = true; + waker.wake(); + } } } } @@ -254,29 +265,6 @@ impl ScheduledIo { } } - /// An async version of `poll_readiness` which uses a linked list of wakers - pub(crate) async fn readiness(&self, interest: mio::Ready) -> ReadyEvent { - self.readiness_fut(interest).await - } - - // This is in a separate function so that the borrow checker doesn't think - // we are borrowing the `UnsafeCell` possibly over await boundaries. - // - // Go figure. - fn readiness_fut(&self, interest: mio::Ready) -> Readiness<'_> { - Readiness { - scheduled_io: self, - state: State::Init, - waiter: UnsafeCell::new(Waiter { - pointers: linked_list::Pointers::new(), - waker: None, - notified: false, - interest, - _p: PhantomPinned, - }), - } - } - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { // This consumes the current readiness state **except** for HUP and // error. HUP and error are excluded because a) they are final states @@ -310,114 +298,141 @@ impl Drop for ScheduledIo { unsafe impl Send for ScheduledIo {} unsafe impl Sync for ScheduledIo {} -unsafe impl linked_list::Link for Waiter { - type Handle = NonNull; - type Target = Waiter; +cfg_io_readiness! { + impl ScheduledIo { + /// An async version of `poll_readiness` which uses a linked list of wakers + pub(crate) async fn readiness(&self, interest: mio::Ready) -> ReadyEvent { + self.readiness_fut(interest).await + } - fn as_raw(handle: &NonNull) -> NonNull { - *handle + // This is in a separate function so that the borrow checker doesn't think + // we are borrowing the `UnsafeCell` possibly over await boundaries. + // + // Go figure. + fn readiness_fut(&self, interest: mio::Ready) -> Readiness<'_> { + Readiness { + scheduled_io: self, + state: State::Init, + waiter: UnsafeCell::new(Waiter { + pointers: linked_list::Pointers::new(), + waker: None, + notified: false, + interest, + _p: PhantomPinned, + }), + } + } } - unsafe fn from_raw(ptr: NonNull) -> NonNull { - ptr - } + unsafe impl linked_list::Link for Waiter { + type Handle = NonNull; + type Target = Waiter; - unsafe fn pointers(mut target: NonNull) -> NonNull> { - NonNull::from(&mut target.as_mut().pointers) - } -} + fn as_raw(handle: &NonNull) -> NonNull { + *handle + } -// ===== impl Readiness ===== + unsafe fn from_raw(ptr: NonNull) -> NonNull { + ptr + } -impl Future for Readiness<'_> { - type Output = ReadyEvent; + unsafe fn pointers(mut target: NonNull) -> NonNull> { + NonNull::from(&mut target.as_mut().pointers) + } + } - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let (scheduled_io, state, waiter) = unsafe { - let me = self.get_unchecked_mut(); - (&me.scheduled_io, &mut me.state, &me.waiter) - }; + // ===== impl Readiness ===== - loop { - match *state { - State::Init => { - let mut waiters = scheduled_io.waiters.lock().unwrap(); + impl Future for Readiness<'_> { + type Output = ReadyEvent; - // Safety: called while locked - unsafe { - (*waiter.get()).waker = Some(cx.waker().clone()); - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (scheduled_io, state, waiter) = unsafe { + let me = self.get_unchecked_mut(); + (&me.scheduled_io, &mut me.state, &me.waiter) + }; - // Insert the waiter into the linked list - // - // safety: pointers from `UnsafeCell` are never null. - waiters - .list - .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); - *state = State::Waiting; - } - State::Waiting => { - // Currently in the "Waiting" state, implying the caller has - // a waiter stored in the waiter list (guarded by - // `notify.waiters`). In order to access the waker fields, - // we must hold the lock. - - let waiters = scheduled_io.waiters.lock().unwrap(); - - // Safety: called while locked - let w = unsafe { &mut *waiter.get() }; - - if w.notified { - // Our waker has been notified. Reset the fields and - // remove it from the list. - w.waker = None; - w.notified = false; - - *state = State::Done; - } else { - // Update the waker, if necessary. - if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { - w.waker = Some(cx.waker().clone()); + loop { + match *state { + State::Init => { + let mut waiters = scheduled_io.waiters.lock().unwrap(); + + // Safety: called while locked + unsafe { + (*waiter.get()).waker = Some(cx.waker().clone()); } - return Poll::Pending; + // Insert the waiter into the linked list + // + // safety: pointers from `UnsafeCell` are never null. + waiters + .list + .push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); + *state = State::Waiting; } + State::Waiting => { + // Currently in the "Waiting" state, implying the caller has + // a waiter stored in the waiter list (guarded by + // `notify.waiters`). In order to access the waker fields, + // we must hold the lock. + + let waiters = scheduled_io.waiters.lock().unwrap(); + + // Safety: called while locked + let w = unsafe { &mut *waiter.get() }; + + if w.notified { + // Our waker has been notified. Reset the fields and + // remove it from the list. + w.waker = None; + w.notified = false; + + *state = State::Done; + } else { + // Update the waker, if necessary. + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); + } + + return Poll::Pending; + } - // Explicit drop of the lock to indicate the scope that the - // lock is held. Because holding the lock is required to - // ensure safe access to fields not held within the lock, it - // is helpful to visualize the scope of the critical - // section. - drop(waiters); - } - State::Done => { - let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; + // Explicit drop of the lock to indicate the scope that the + // lock is held. Because holding the lock is required to + // ensure safe access to fields not held within the lock, it + // is helpful to visualize the scope of the critical + // section. + drop(waiters); + } + State::Done => { + let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; - // Safety: State::Done means it is no longer shared - let w = unsafe { &mut *waiter.get() }; + // Safety: State::Done means it is no longer shared + let w = unsafe { &mut *waiter.get() }; - return Poll::Ready(ReadyEvent { - tick, - readiness: w.interest, - }); + return Poll::Ready(ReadyEvent { + tick, + readiness: w.interest, + }); + } } } } } -} -impl Drop for Readiness<'_> { - fn drop(&mut self) { - let mut waiters = self.scheduled_io.waiters.lock().unwrap(); + impl Drop for Readiness<'_> { + fn drop(&mut self) { + let mut waiters = self.scheduled_io.waiters.lock().unwrap(); - // Safety: `waiter` is only ever stored in `waiters` - unsafe { - waiters - .list - .remove(NonNull::new_unchecked(self.waiter.get())) - }; + // Safety: `waiter` is only ever stored in `waiters` + unsafe { + waiters + .list + .remove(NonNull::new_unchecked(self.waiter.get())) + }; + } } -} -unsafe impl Send for Readiness<'_> {} -unsafe impl Sync for Readiness<'_> {} + unsafe impl Send for Readiness<'_> {} + unsafe impl Sync for Readiness<'_> {} +} diff --git a/tokio/src/io/poll_evented.rs b/tokio/src/io/poll_evented.rs index 64d70ceae34..cfcbae506cd 100644 --- a/tokio/src/io/poll_evented.rs +++ b/tokio/src/io/poll_evented.rs @@ -128,6 +128,7 @@ where /// Returns a shared reference to the underlying I/O object this readiness /// stream is wrapping. + #[cfg(any(feature = "process", feature = "tcp", feature = "udp", feature = "uds",))] pub(crate) fn get_ref(&self) -> &E { self.io.as_ref().unwrap() } @@ -146,16 +147,13 @@ where /// Note that deregistering does not guarantee that the I/O resource can be /// registered with a different reactor. Some I/O resource types can only be /// associated with a single reactor instance for their lifetime. + #[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] pub(crate) fn into_inner(mut self) -> io::Result { let io = self.io.take().unwrap(); self.registration.deregister(&io)?; Ok(io) } - pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result { - self.registration.readiness(interest).await - } - pub(crate) fn clear_readiness(&self, event: ReadyEvent) { self.registration.clear_readiness(event); } @@ -219,6 +217,17 @@ where } } +cfg_io_readiness! { + impl PollEvented + where + E: Evented, + { + pub(crate) async fn readiness(&self, interest: mio::Ready) -> io::Result { + self.registration.readiness(interest).await + } + } +} + // ===== Read / Write impls ===== impl AsyncRead for PollEvented diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 259b2d47c3a..e7adfc3c6d1 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -127,11 +127,6 @@ impl Registration { inner.deregister_source(io) } - pub(super) async fn readiness(&self, interest: mio::Ready) -> io::Result { - // TODO: does this need to return a `Result`? - Ok(self.shared.readiness(interest).await) - } - pub(super) fn clear_readiness(&self, event: ReadyEvent) { self.shared.clear_readiness(event); } @@ -156,3 +151,12 @@ impl Registration { Poll::Ready(Ok(ev)) } } + +cfg_io_readiness! { + impl Registration { + pub(super) async fn readiness(&self, interest: mio::Ready) -> io::Result { + // TODO: does this need to return a `Result`? + Ok(self.shared.readiness(interest).await) + } + } +} diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index b31b478dcf1..f057cedc9c9 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -77,7 +77,6 @@ //! - `rt-core`: Enables `tokio::spawn` and the basic (single-threaded) scheduler. //! - `rt-threaded`: Enables the heavier, multi-threaded, work-stealing scheduler. //! - `rt-util`: Enables non-scheduler utilities. -//! - `io-driver`: Enables the `mio` based IO driver. //! - `io-util`: Enables the IO based `Ext` traits. //! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types. //! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`. @@ -269,8 +268,7 @@ //! the [`AsyncRead`], [`AsyncWrite`], and [`AsyncBufRead`] traits. In addition, //! when the "io-util" feature flag is enabled, it also provides combinators and //! functions for working with these traits, forming as an asynchronous -//! counterpart to [`std::io`]. When the "io-driver" feature flag is enabled, it -//! also provides utilities for library authors implementing I/O resources. +//! counterpart to [`std::io`]. //! //! Tokio also includes APIs for performing various kinds of I/O and interacting //! with the operating system asynchronously. These include: diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index ff9f9481952..01a59c88191 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -129,16 +129,15 @@ macro_rules! cfg_io_driver { ($($item:item)*) => { $( #[cfg(feature = "io-driver")] - #[cfg_attr(docsrs, doc(cfg(feature = "io-driver")))] $item )* } } -macro_rules! cfg_not_io_driver { +macro_rules! cfg_io_readiness { ($($item:item)*) => { $( - #[cfg(not(feature = "io-driver"))] + #[cfg(feature = "io-readiness")] $item )* } diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index e296fb41a95..9549f07c56e 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -107,6 +107,7 @@ impl LinkedList { /// Removes the last element from a list and returns it, or None if it is /// empty. + #[cfg_attr(any(feature = "udp", feature = "uds"), allow(unused))] pub(crate) fn pop_back(&mut self) -> Option { unsafe { let last = self.tail?; @@ -126,6 +127,7 @@ impl LinkedList { } /// Returns whether the linked list doesn not contain any node + #[cfg_attr(any(feature = "udp", feature = "uds"), allow(unused))] pub(crate) fn is_empty(&self) -> bool { if self.head.is_some() { return false; @@ -231,7 +233,7 @@ cfg_rt_threaded! { // ===== impl DrainFilter ===== -cfg_io_driver! { +cfg_io_readiness! { pub(crate) struct DrainFilter<'a, T: Link, F> { list: &'a mut LinkedList, filter: F, diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 9fe4d70a456..d966c0f6afb 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -3,7 +3,7 @@ cfg_io_driver! { pub(crate) mod slab; } -#[cfg(any(feature = "io-driver", feature = "sync", feature = "rt-core"))] +#[cfg(any(feature = "io-readiness", feature = "sync", feature = "rt-core"))] pub(crate) mod linked_list; #[cfg(any(feature = "rt-threaded", feature = "macros", feature = "stream"))]