diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3e721f520c5..16da8c95e41 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ name: CI env: RUSTFLAGS: -Dwarnings RUST_BACKTRACE: 1 - nightly: nightly-2020-07-12 + nightly: nightly-2020-09-21 minrust: 1.45.2 jobs: @@ -91,7 +91,7 @@ jobs: run: cargo test --features full working-directory: tokio env: - RUSTFLAGS: '--cfg tokio_unstable' + RUSTFLAGS: --cfg tokio_unstable -Dwarnings miri: name: miri @@ -110,7 +110,7 @@ jobs: rm -rf tokio/tests - name: miri - run: cargo miri test --features rt-core,rt-threaded,rt-util,sync -- -- task + run: cargo miri test --features rt-core,rt-threaded,rt-util,sync task working-directory: tokio cross: @@ -156,7 +156,7 @@ jobs: - name: check --each-feature --unstable run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps env: - RUSTFLAGS: --cfg tokio_unstable + RUSTFLAGS: --cfg tokio_unstable -Dwarnings minrust: name: minrust @@ -239,6 +239,6 @@ jobs: run: cargo test --lib --release --features full -- --nocapture $SCOPE working-directory: tokio env: - RUSTFLAGS: --cfg loom --cfg tokio_unstable + RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings LOOM_MAX_PREEMPTIONS: 2 SCOPE: ${{ matrix.scope }} diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 3a90943f059..cca0ece5eb8 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -8,6 +8,9 @@ edition = "2018" tokio = { version = "0.3.0", path = "../tokio", features = ["full"] } bencher = "0.1.5" +[target.'cfg(unix)'.dependencies] +libc = "0.2.42" + [[bench]] name = "spawn" path = "spawn.rs" @@ -33,3 +36,8 @@ harness = false name = "sync_semaphore" path = "sync_semaphore.rs" harness = false + +[[bench]] +name = "signal" +path = "signal.rs" +harness = false diff --git a/benches/signal.rs b/benches/signal.rs new file mode 100644 index 00000000000..d891326de0d --- /dev/null +++ b/benches/signal.rs @@ -0,0 +1,96 @@ +//! Benchmark the delay in propagating OS signals to any listeners. +#![cfg(unix)] + +use bencher::{benchmark_group, benchmark_main, Bencher}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::runtime; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::mpsc; + +struct Spinner { + count: usize, +} + +impl Future for Spinner { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.count > 3 { + Poll::Ready(()) + } else { + self.count += 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +impl Spinner { + fn new() -> Self { + Self { count: 0 } + } +} + +pub fn send_signal(signal: libc::c_int) { + use libc::{getpid, kill}; + + unsafe { + assert_eq!(kill(getpid(), signal), 0); + } +} + +fn many_signals(bench: &mut Bencher) { + let num_signals = 10; + let (tx, mut rx) = mpsc::channel(num_signals); + + let rt = runtime::Builder::new() + // Intentionally single threaded to measure delays in propagating wakes + .basic_scheduler() + .enable_all() + .build() + .unwrap(); + + let spawn_signal = |kind| { + let mut tx = tx.clone(); + rt.spawn(async move { + let mut signal = signal(kind).expect("failed to create signal"); + + while signal.recv().await.is_some() { + if tx.send(()).await.is_err() { + break; + } + } + }); + }; + + for _ in 0..num_signals { + // Pick some random signals which don't terminate the test harness + spawn_signal(SignalKind::child()); + spawn_signal(SignalKind::io()); + } + drop(tx); + + // Turn the runtime for a while to ensure that all the spawned + // tasks have been polled at least once + rt.block_on(Spinner::new()); + + bench.iter(|| { + rt.block_on(async { + send_signal(libc::SIGCHLD); + for _ in 0..num_signals { + rx.recv().await.expect("channel closed"); + } + + send_signal(libc::SIGIO); + for _ in 0..num_signals { + rx.recv().await.expect("channel closed"); + } + }); + }); +} + +benchmark_group!(signal_group, many_signals,); + +benchmark_main!(signal_group); diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 0f8cfcd4cf8..b96d9044ea1 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -6,7 +6,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(intra_doc_link_resolution_failure)] +#![cfg_attr(docsrs, deny(broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/tokio-util/tests/framed_read.rs b/tokio-util/tests/framed_read.rs index da38c432326..52c30ef980e 100644 --- a/tokio-util/tests/framed_read.rs +++ b/tokio-util/tests/framed_read.rs @@ -185,8 +185,8 @@ fn read_partial_would_block_then_err() { #[test] fn huge_size() { let mut task = task::spawn(()); - let data = [0; 32 * 1024]; - let mut framed = FramedRead::new(Slice(&data[..]), BigDecoder); + let data = &[0; 32 * 1024][..]; + let mut framed = FramedRead::new(data, BigDecoder); task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); @@ -212,7 +212,7 @@ fn huge_size() { #[test] fn data_remaining_is_error() { let mut task = task::spawn(()); - let slice = Slice(&[0; 5]); + let slice = &[0; 5][..]; let mut framed = FramedRead::new(slice, U32Decoder); task.enter(|cx, _| { @@ -280,16 +280,3 @@ impl AsyncRead for Mock { } } } - -// TODO this newtype is necessary because `&[u8]` does not currently implement `AsyncRead` -struct Slice<'a>(&'a [u8]); - -impl AsyncRead for Slice<'_> { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) - } -} diff --git a/tokio/src/future/pending.rs b/tokio/src/future/pending.rs deleted file mode 100644 index 287e836fd3c..00000000000 --- a/tokio/src/future/pending.rs +++ /dev/null @@ -1,44 +0,0 @@ -use sdt::pin::Pin; -use std::future::Future; -use std::marker; -use std::task::{Context, Poll}; - -/// Future for the [`pending()`] function. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -struct Pending { - _data: marker::PhantomData, -} - -/// Creates a future which never resolves, representing a computation that never -/// finishes. -/// -/// The returned future will forever return [`Poll::Pending`]. -/// -/// # Examples -/// -/// ```no_run -/// use tokio::future; -/// -/// #[tokio::main] -/// async fn main { -/// future::pending().await; -/// unreachable!(); -/// } -/// ``` -pub async fn pending() -> ! { - Pending { - _data: marker::PhantomData, - } - .await -} - -impl Future for Pending { - type Output = !; - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - Poll::Pending - } -} - -impl Unpin for Pending {} diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index e7adfc3c6d1..7396c4b87f0 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -87,7 +87,18 @@ impl Registration { where T: Evented, { - let handle = Handle::current(); + Self::new_with_ready_and_handle(io, ready, Handle::current()) + } + + /// Same as `new_with_ready` but also accepts an explicit handle. + pub(crate) fn new_with_ready_and_handle( + io: &T, + ready: mio::Ready, + handle: Handle, + ) -> io::Result + where + T: Evented, + { let shared = if let Some(inner) = handle.inner() { inner.add_source(io, ready)? } else { diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs index e3b5bf6b6f4..8f071167f6c 100644 --- a/tokio/src/io/seek.rs +++ b/tokio/src/io/seek.rs @@ -4,12 +4,14 @@ use std::io::{self, SeekFrom}; use std::pin::Pin; use std::task::{Context, Poll}; -/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Seek<'a, S: ?Sized> { - seek: &'a mut S, - pos: Option, +cfg_io_util! { + /// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Seek<'a, S: ?Sized> { + seek: &'a mut S, + pos: Option, + } } pub(crate) fn seek(seek: &mut S, pos: SeekFrom) -> Seek<'_, S> diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs index dd280a188d7..0ab66c286d3 100644 --- a/tokio/src/io/util/async_read_ext.rs +++ b/tokio/src/io/util/async_read_ext.rs @@ -1067,7 +1067,7 @@ cfg_io_util! { /// (See also the [`crate::fs::read_to_string`] convenience function for /// reading from a file.) /// - /// [`crate::fs::read_to_string`]: crate::fs::read_to_string::read_to_string + /// [`crate::fs::read_to_string`]: fn@crate::fs::read_to_string fn read_to_string<'a>(&'a mut self, dst: &'a mut String) -> ReadToString<'a, Self> where Self: Unpin, diff --git a/tokio/src/io/util/async_seek_ext.rs b/tokio/src/io/util/async_seek_ext.rs index c7a0f72fb81..35bc94ee34d 100644 --- a/tokio/src/io/util/async_seek_ext.rs +++ b/tokio/src/io/util/async_seek_ext.rs @@ -2,65 +2,67 @@ use crate::io::seek::{seek, Seek}; use crate::io::AsyncSeek; use std::io::SeekFrom; -/// An extension trait which adds utility methods to [`AsyncSeek`] types. -/// -/// As a convenience, this trait may be imported using the [`prelude`]: -/// -/// # Examples -/// -/// ``` -/// use std::io::{Cursor, SeekFrom}; -/// use tokio::prelude::*; -/// -/// #[tokio::main] -/// async fn main() -> io::Result<()> { -/// let mut cursor = Cursor::new(b"abcdefg"); -/// -/// // the `seek` method is defined by this trait -/// cursor.seek(SeekFrom::Start(3)).await?; -/// -/// let mut buf = [0; 1]; -/// let n = cursor.read(&mut buf).await?; -/// assert_eq!(n, 1); -/// assert_eq!(buf, [b'd']); -/// -/// Ok(()) -/// } -/// ``` -/// -/// See [module][crate::io] documentation for more details. -/// -/// [`AsyncSeek`]: AsyncSeek -/// [`prelude`]: crate::prelude -pub trait AsyncSeekExt: AsyncSeek { - /// Creates a future which will seek an IO object, and then yield the - /// new position in the object and the object itself. +cfg_io_util! { + /// An extension trait which adds utility methods to [`AsyncSeek`] types. /// - /// In the case of an error the buffer and the object will be discarded, with - /// the error yielded. + /// As a convenience, this trait may be imported using the [`prelude`]: /// /// # Examples /// - /// ```no_run - /// use tokio::fs::File; + /// ``` + /// use std::io::{Cursor, SeekFrom}; /// use tokio::prelude::*; /// - /// use std::io::SeekFrom; + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut cursor = Cursor::new(b"abcdefg"); + /// + /// // the `seek` method is defined by this trait + /// cursor.seek(SeekFrom::Start(3)).await?; /// - /// # async fn dox() -> std::io::Result<()> { - /// let mut file = File::open("foo.txt").await?; - /// file.seek(SeekFrom::Start(6)).await?; + /// let mut buf = [0; 1]; + /// let n = cursor.read(&mut buf).await?; + /// assert_eq!(n, 1); + /// assert_eq!(buf, [b'd']); /// - /// let mut contents = vec![0u8; 10]; - /// file.read_exact(&mut contents).await?; - /// # Ok(()) - /// # } + /// Ok(()) + /// } /// ``` - fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> - where - Self: Unpin, - { - seek(self, pos) + /// + /// See [module][crate::io] documentation for more details. + /// + /// [`AsyncSeek`]: AsyncSeek + /// [`prelude`]: crate::prelude + pub trait AsyncSeekExt: AsyncSeek { + /// Creates a future which will seek an IO object, and then yield the + /// new position in the object and the object itself. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::prelude::*; + /// + /// use std::io::SeekFrom; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// file.seek(SeekFrom::Start(6)).await?; + /// + /// let mut contents = vec![0u8; 10]; + /// file.read_exact(&mut contents).await?; + /// # Ok(()) + /// # } + /// ``` + fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> + where + Self: Unpin, + { + seek(self, pos) + } } } diff --git a/tokio/src/loom/std/atomic_u16.rs b/tokio/src/loom/std/atomic_u16.rs index 70390972b4b..c1c531208c2 100644 --- a/tokio/src/loom/std/atomic_u16.rs +++ b/tokio/src/loom/std/atomic_u16.rs @@ -11,7 +11,7 @@ unsafe impl Send for AtomicU16 {} unsafe impl Sync for AtomicU16 {} impl AtomicU16 { - pub(crate) fn new(val: u16) -> AtomicU16 { + pub(crate) const fn new(val: u16) -> AtomicU16 { let inner = UnsafeCell::new(std::sync::atomic::AtomicU16::new(val)); AtomicU16 { inner } } diff --git a/tokio/src/loom/std/atomic_u32.rs b/tokio/src/loom/std/atomic_u32.rs index 6f786c519f1..61f95fb30ce 100644 --- a/tokio/src/loom/std/atomic_u32.rs +++ b/tokio/src/loom/std/atomic_u32.rs @@ -11,7 +11,7 @@ unsafe impl Send for AtomicU32 {} unsafe impl Sync for AtomicU32 {} impl AtomicU32 { - pub(crate) fn new(val: u32) -> AtomicU32 { + pub(crate) const fn new(val: u32) -> AtomicU32 { let inner = UnsafeCell::new(std::sync::atomic::AtomicU32::new(val)); AtomicU32 { inner } } diff --git a/tokio/src/loom/std/atomic_u8.rs b/tokio/src/loom/std/atomic_u8.rs index 4fcd0df3d45..408aea338c6 100644 --- a/tokio/src/loom/std/atomic_u8.rs +++ b/tokio/src/loom/std/atomic_u8.rs @@ -11,7 +11,7 @@ unsafe impl Send for AtomicU8 {} unsafe impl Sync for AtomicU8 {} impl AtomicU8 { - pub(crate) fn new(val: u8) -> AtomicU8 { + pub(crate) const fn new(val: u8) -> AtomicU8 { let inner = UnsafeCell::new(std::sync::atomic::AtomicU8::new(val)); AtomicU8 { inner } } diff --git a/tokio/src/loom/std/atomic_usize.rs b/tokio/src/loom/std/atomic_usize.rs index 0fe998f1f9e..0d5f36e4310 100644 --- a/tokio/src/loom/std/atomic_usize.rs +++ b/tokio/src/loom/std/atomic_usize.rs @@ -11,7 +11,7 @@ unsafe impl Send for AtomicUsize {} unsafe impl Sync for AtomicUsize {} impl AtomicUsize { - pub(crate) fn new(val: usize) -> AtomicUsize { + pub(crate) const fn new(val: usize) -> AtomicUsize { let inner = UnsafeCell::new(std::sync::atomic::AtomicUsize::new(val)); AtomicUsize { inner } } diff --git a/tokio/src/loom/std/parking_lot.rs b/tokio/src/loom/std/parking_lot.rs index 25d94af44f5..41c47ddd31e 100644 --- a/tokio/src/loom/std/parking_lot.rs +++ b/tokio/src/loom/std/parking_lot.rs @@ -26,6 +26,13 @@ impl Mutex { Mutex(parking_lot::Mutex::new(t)) } + #[inline] + #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] + #[cfg_attr(docsrs, doc(cfg(all(feature = "parking_lot",))))] + pub(crate) const fn const_new(t: T) -> Mutex { + Mutex(parking_lot::const_mutex(t)) + } + #[inline] pub(crate) fn lock(&self) -> LockResult> { Ok(self.0.lock()) diff --git a/tokio/src/loom/std/unsafe_cell.rs b/tokio/src/loom/std/unsafe_cell.rs index f2b03d8dc2a..66c1d7943e0 100644 --- a/tokio/src/loom/std/unsafe_cell.rs +++ b/tokio/src/loom/std/unsafe_cell.rs @@ -2,7 +2,7 @@ pub(crate) struct UnsafeCell(std::cell::UnsafeCell); impl UnsafeCell { - pub(crate) fn new(data: T) -> UnsafeCell { + pub(crate) const fn new(data: T) -> UnsafeCell { UnsafeCell(std::cell::UnsafeCell::new(data)) } diff --git a/tokio/src/macros/select.rs b/tokio/src/macros/select.rs index 6497a510e98..ffca0027269 100644 --- a/tokio/src/macros/select.rs +++ b/tokio/src/macros/select.rs @@ -404,6 +404,7 @@ macro_rules! select { // The future returned a value, check if matches // the specified pattern. #[allow(unused_variables)] + #[allow(unused_mut)] match &out { $bind => {} _ => continue, diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index 937045bd0e4..c9703764e3c 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -116,7 +116,7 @@ impl UdpSocket { /// should ensure that when the remote cannot receive, the /// [`ErrorKind::WouldBlock`] is properly handled. /// - /// [`ErrorKind::WouldBlock`]: std::io::error::ErrorKind::WouldBlock + /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send(&self, buf: &[u8]) -> io::Result { self.io.get_ref().send(buf) } @@ -166,7 +166,7 @@ impl UdpSocket { /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur /// if the IP version of the socket does not match that of `target`. /// - /// [`ErrorKind::WouldBlock`]: std::io::error::ErrorKind::WouldBlock + /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result { self.io.get_ref().send_to(buf, &target) } diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index 9c19c72ea97..7da9783be34 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -103,7 +103,7 @@ //! paradigm of dropping-implies-cancellation, a spawned process will, by //! default, continue to execute even after the `Child` handle has been dropped. //! -//! The `Command::kill_on_drop` method can be used to modify this behavior +//! The [`Command::kill_on_drop`] method can be used to modify this behavior //! and kill the child process if the `Child` wrapper is dropped before it //! has exited. //! @@ -581,8 +581,10 @@ impl Command { /// All I/O this child does will be associated with the current default /// event loop. /// - /// If this future is dropped before the future resolves, then - /// the child will be killed, if it was spawned. + /// The destructor of the future returned by this function will kill + /// the child if [`kill_on_drop`] is set to true. + /// + /// [`kill_on_drop`]: fn@Self::kill_on_drop /// /// # Errors /// @@ -602,6 +604,7 @@ impl Command { /// .await /// .expect("ls command failed to run") /// } + /// ``` pub fn status(&mut self) -> impl Future> { let child = self.spawn(); @@ -637,8 +640,10 @@ impl Command { /// All I/O this child does will be associated with the current default /// event loop. /// - /// If this future is dropped before the future resolves, then - /// the child will be killed, if it was spawned. + /// The destructor of the future returned by this function will kill + /// the child if [`kill_on_drop`] is set to true. + /// + /// [`kill_on_drop`]: fn@Self::kill_on_drop /// /// # Examples /// @@ -654,6 +659,7 @@ impl Command { /// .expect("ls command failed to run"); /// println!("stderr of ls: {:?}", output.stderr); /// } + /// ``` pub fn output(&mut self) -> impl Future> { self.std.stdout(Stdio::piped()); self.std.stderr(Stdio::piped()); diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index ed2cd251c35..4072b04e465 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,7 +1,7 @@ -use crate::loom::sync::{Arc, Mutex}; +use crate::loom::sync::Mutex; use crate::runtime::handle::Handle; use crate::runtime::shell::Shell; -use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; +use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner}; use std::fmt; #[cfg(feature = "blocking")] @@ -330,7 +330,7 @@ impl Builder { where F: Fn() + Send + Sync + 'static, { - self.before_stop = Some(Arc::new(f)); + self.before_stop = Some(std::sync::Arc::new(f)); self } @@ -359,14 +359,17 @@ impl Builder { } } + fn get_cfg(&self) -> driver::Cfg { + driver::Cfg { + enable_io: self.enable_io, + enable_time: self.enable_time, + } + } + fn build_shell_runtime(&mut self) -> io::Result { use crate::runtime::Kind; - let clock = time::create_clock(); - - // Create I/O driver - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + let (driver, resources) = driver::Driver::new(self.get_cfg())?; let spawner = Spawner::Shell; @@ -377,9 +380,10 @@ impl Builder { kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))), handle: Handle { spawner, - io_handle, - time_handle, - clock, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, blocking_spawner, }, blocking_pool, @@ -478,12 +482,7 @@ cfg_rt_core! { fn build_basic_runtime(&mut self) -> io::Result { use crate::runtime::{BasicScheduler, Kind}; - let clock = time::create_clock(); - - // Create I/O driver - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + let (driver, resources) = driver::Driver::new(self.get_cfg())?; // And now put a single-threaded scheduler on top of the timer. When // there are no futures ready to do something, it'll let the timer or @@ -500,9 +499,10 @@ cfg_rt_core! { kind: Kind::Basic(Mutex::new(Some(scheduler))), handle: Handle { spawner, - io_handle, - time_handle, - clock, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, blocking_spawner, }, blocking_pool, @@ -533,10 +533,8 @@ cfg_rt_threaded! { let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus())); assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit"); - let clock = time::create_clock(); + let (driver, resources) = driver::Driver::new(self.get_cfg())?; - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); @@ -547,9 +545,10 @@ cfg_rt_threaded! { // Create the runtime handle let handle = Handle { spawner, - io_handle, - time_handle, - clock, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, blocking_spawner, }; diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index c42b3432dea..7498175f3a7 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -14,7 +14,7 @@ cfg_blocking_impl! { } cfg_io_driver! { - pub(crate) fn io_handle() -> crate::runtime::io::Handle { + pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => ctx.io_handle.clone(), None => Default::default(), @@ -22,8 +22,18 @@ cfg_io_driver! { } } +cfg_signal! { + #[cfg(unix)] + pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => ctx.signal_handle.clone(), + None => Default::default(), + }) + } +} + cfg_time! { - pub(crate) fn time_handle() -> crate::runtime::time::Handle { + pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => ctx.time_handle.clone(), None => Default::default(), @@ -31,7 +41,7 @@ cfg_time! { } cfg_test_util! { - pub(crate) fn clock() -> Option { + pub(crate) fn clock() -> Option { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => Some(ctx.clock.clone()), None => None, diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs new file mode 100644 index 00000000000..2ea5089022d --- /dev/null +++ b/tokio/src/runtime/driver.rs @@ -0,0 +1,196 @@ +//! Abstracts out the entire chain of runtime sub-drivers into common types. +use crate::park::{Park, ParkThread}; +use std::io; +use std::time::Duration; + +// ===== io driver ===== + +cfg_io_driver! { + type IoDriver = crate::park::Either; + pub(crate) type IoHandle = Option; + + fn create_io_driver(enable: bool) -> io::Result<(IoDriver, IoHandle)> { + use crate::park::Either; + + #[cfg(loom)] + assert!(!enable); + + if enable { + let driver = crate::io::driver::Driver::new()?; + let handle = driver.handle(); + + Ok((Either::A(driver), Some(handle))) + } else { + let driver = ParkThread::new(); + Ok((Either::B(driver), None)) + } + } +} + +cfg_not_io_driver! { + type IoDriver = ParkThread; + pub(crate) type IoHandle = (); + + fn create_io_driver(_enable: bool) -> io::Result<(IoDriver, IoHandle)> { + let driver = ParkThread::new(); + Ok((driver, ())) + } +} + +// ===== signal driver ===== + +macro_rules! cfg_unix_and_signal { + ($($item:item)*) => { + $( + #[cfg(all(not(loom), unix, feature = "signal"))] + $item + )* + } +} + +macro_rules! cfg_neither_unix_nor_windows { + ($($item:item)*) => { + $( + #[cfg(any(loom, not(all(unix, feature = "signal"))))] + $item + )* + } +} + +cfg_unix_and_signal! { + type SignalDriver = crate::park::Either; + pub(crate) type SignalHandle = Option; + + fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { + use crate::park::Either; + + // Enable the signal driver if IO is also enabled + match io_driver { + Either::A(io_driver) => { + let driver = crate::signal::unix::driver::Driver::new(io_driver)?; + let handle = driver.handle(); + Ok((Either::A(driver), Some(handle))) + } + Either::B(_) => Ok((Either::B(io_driver), None)), + } + } +} + +cfg_neither_unix_nor_windows! { + type SignalDriver = IoDriver; + pub(crate) type SignalHandle = (); + + fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { + Ok((io_driver, ())) + } +} + +// ===== time driver ===== + +cfg_time! { + type TimeDriver = crate::park::Either, SignalDriver>; + + pub(crate) type Clock = crate::time::Clock; + pub(crate) type TimeHandle = Option; + + fn create_clock() -> Clock { + crate::time::Clock::new() + } + + fn create_time_driver( + enable: bool, + signal_driver: SignalDriver, + clock: Clock, + ) -> (TimeDriver, TimeHandle) { + use crate::park::Either; + + if enable { + let driver = crate::time::driver::Driver::new(signal_driver, clock); + let handle = driver.handle(); + + (Either::A(driver), Some(handle)) + } else { + (Either::B(signal_driver), None) + } + } +} + +cfg_not_time! { + type TimeDriver = SignalDriver; + + pub(crate) type Clock = (); + pub(crate) type TimeHandle = (); + + fn create_clock() -> Clock { + () + } + + fn create_time_driver( + _enable: bool, + signal_driver: SignalDriver, + _clock: Clock, + ) -> (TimeDriver, TimeHandle) { + (signal_driver, ()) + } +} + +// ===== runtime driver ===== + +#[derive(Debug)] +pub(crate) struct Driver { + inner: TimeDriver, +} + +pub(crate) struct Resources { + pub(crate) io_handle: IoHandle, + pub(crate) signal_handle: SignalHandle, + pub(crate) time_handle: TimeHandle, + pub(crate) clock: Clock, +} + +pub(crate) struct Cfg { + pub(crate) enable_io: bool, + pub(crate) enable_time: bool, +} + +impl Driver { + pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> { + let clock = create_clock(); + + let (io_driver, io_handle) = create_io_driver(cfg.enable_io)?; + let (signal_driver, signal_handle) = create_signal_driver(io_driver)?; + let (time_driver, time_handle) = + create_time_driver(cfg.enable_time, signal_driver, clock.clone()); + + Ok(( + Self { inner: time_driver }, + Resources { + io_handle, + signal_handle, + time_handle, + clock, + }, + )) + } +} + +impl Park for Driver { + type Unpark = ::Unpark; + type Error = ::Error; + + fn unpark(&self) -> Self::Unpark { + self.inner.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park() + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.inner.park_timeout(duration) + } + + fn shutdown(&mut self) { + self.inner.shutdown() + } +} diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 516ad4b3aad..dfcc5e97921 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,4 +1,4 @@ -use crate::runtime::{blocking, context, io, time, Spawner}; +use crate::runtime::{blocking, context, driver, Spawner}; /// Handle to the runtime. /// @@ -11,13 +11,16 @@ pub(crate) struct Handle { pub(super) spawner: Spawner, /// Handles to the I/O drivers - pub(super) io_handle: io::Handle, + pub(super) io_handle: driver::IoHandle, + + /// Handles to the signal drivers + pub(super) signal_handle: driver::SignalHandle, /// Handles to the time drivers - pub(super) time_handle: time::Handle, + pub(super) time_handle: driver::TimeHandle, /// Source of `Instant::now()` - pub(super) clock: time::Clock, + pub(super) clock: driver::Clock, /// Blocking pool spawner pub(super) blocking_spawner: blocking::Spawner, diff --git a/tokio/src/runtime/io.rs b/tokio/src/runtime/io.rs deleted file mode 100644 index 6a0953af851..00000000000 --- a/tokio/src/runtime/io.rs +++ /dev/null @@ -1,63 +0,0 @@ -//! Abstracts out the APIs necessary to `Runtime` for integrating the I/O -//! driver. When the `time` feature flag is **not** enabled. These APIs are -//! shells. This isolates the complexity of dealing with conditional -//! compilation. - -/// Re-exported for convenience. -pub(crate) use std::io::Result; - -pub(crate) use variant::*; - -#[cfg(feature = "io-driver")] -mod variant { - use crate::io::driver; - use crate::park::{Either, ParkThread}; - - use std::io; - - /// The driver value the runtime passes to the `timer` layer. - /// - /// When the `io-driver` feature is enabled, this is the "real" I/O driver - /// backed by Mio. Without the `io-driver` feature, this is a thread parker - /// backed by a condition variable. - pub(crate) type Driver = Either; - - /// The handle the runtime stores for future use. - /// - /// When the `io-driver` feature is **not** enabled, this is `()`. - pub(crate) type Handle = Option; - - pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> { - #[cfg(loom)] - assert!(!enable); - - if enable { - let driver = driver::Driver::new()?; - let handle = driver.handle(); - - Ok((Either::A(driver), Some(handle))) - } else { - let driver = ParkThread::new(); - Ok((Either::B(driver), None)) - } - } -} - -#[cfg(not(feature = "io-driver"))] -mod variant { - use crate::park::ParkThread; - - use std::io; - - /// I/O is not enabled, use a condition variable based parker - pub(crate) type Driver = ParkThread; - - /// There is no handle - pub(crate) type Handle = (); - - pub(crate) fn create_driver(_enable: bool) -> io::Result<(Driver, Handle)> { - let driver = ParkThread::new(); - - Ok((driver, ())) - } -} diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index bec0ecd5949..884c2b46fa4 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -208,13 +208,18 @@ cfg_blocking_impl! { mod builder; pub use self::builder::Builder; +pub(crate) mod driver; + pub(crate) mod enter; use self::enter::enter; mod handle; use handle::Handle; -mod io; +mod io { + /// Re-exported for convenience. + pub(crate) use std::io::Result; +} cfg_rt_threaded! { mod park; @@ -227,8 +232,6 @@ use self::shell::Shell; mod spawner; use self::spawner::Spawner; -mod time; - cfg_rt_threaded! { mod queue; @@ -293,7 +296,7 @@ enum Kind { /// Execute all tasks on the current-thread. #[cfg(feature = "rt-core")] - Basic(Mutex>>), + Basic(Mutex>>), /// Execute tasks across multiple threads. #[cfg(feature = "rt-threaded")] diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs index 1dcf65af91b..c994c935ce2 100644 --- a/tokio/src/runtime/park.rs +++ b/tokio/src/runtime/park.rs @@ -6,7 +6,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; use crate::park::{Park, Unpark}; -use crate::runtime::time; +use crate::runtime::driver::Driver; use crate::util::TryLock; use std::sync::atomic::Ordering::SeqCst; @@ -42,14 +42,14 @@ const NOTIFIED: usize = 3; /// Shared across multiple Parker handles struct Shared { /// Shared driver. Only one thread at a time can use this - driver: TryLock, + driver: TryLock, /// Unpark handle - handle: ::Unpark, + handle: ::Unpark, } impl Parker { - pub(crate) fn new(driver: time::Driver) -> Parker { + pub(crate) fn new(driver: Driver) -> Parker { let handle = driver.unpark(); Parker { @@ -180,7 +180,7 @@ impl Inner { } } - fn park_driver(&self, driver: &mut time::Driver) { + fn park_driver(&self, driver: &mut Driver) { match self .state .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs index a65869d0de2..3d631239bf8 100644 --- a/tokio/src/runtime/shell.rs +++ b/tokio/src/runtime/shell.rs @@ -1,8 +1,8 @@ #![allow(clippy::redundant_clone)] use crate::park::{Park, Unpark}; +use crate::runtime::driver::Driver; use crate::runtime::enter; -use crate::runtime::time; use crate::util::{waker_ref, Wake}; use std::future::Future; @@ -12,17 +12,17 @@ use std::task::Poll::Ready; #[derive(Debug)] pub(super) struct Shell { - driver: time::Driver, + driver: Driver, /// TODO: don't store this unpark: Arc, } #[derive(Debug)] -struct Handle(::Unpark); +struct Handle(::Unpark); impl Shell { - pub(super) fn new(driver: time::Driver) -> Shell { + pub(super) fn new(driver: Driver) -> Shell { let unpark = Arc::new(Handle(driver.unpark())); Shell { driver, unpark } diff --git a/tokio/src/runtime/tests/loom_pool.rs b/tokio/src/runtime/tests/loom_pool.rs index ed484846311..47ee1981086 100644 --- a/tokio/src/runtime/tests/loom_pool.rs +++ b/tokio/src/runtime/tests/loom_pool.rs @@ -209,7 +209,7 @@ mod group_b { #[test] fn complete_block_on_under_load() { loom::model(|| { - let mut pool = mk_pool(1); + let pool = mk_pool(1); pool.block_on(async { // Trigger a re-schedule diff --git a/tokio/src/runtime/time.rs b/tokio/src/runtime/time.rs deleted file mode 100644 index c623d9641a1..00000000000 --- a/tokio/src/runtime/time.rs +++ /dev/null @@ -1,59 +0,0 @@ -//! Abstracts out the APIs necessary to `Runtime` for integrating the time -//! driver. When the `time` feature flag is **not** enabled. These APIs are -//! shells. This isolates the complexity of dealing with conditional -//! compilation. - -pub(crate) use variant::*; - -#[cfg(feature = "time")] -mod variant { - use crate::park::Either; - use crate::runtime::io; - use crate::time::{self, driver}; - - pub(crate) type Clock = time::Clock; - pub(crate) type Driver = Either, io::Driver>; - pub(crate) type Handle = Option; - - pub(crate) fn create_clock() -> Clock { - Clock::new() - } - - /// Create a new timer driver / handle pair - pub(crate) fn create_driver( - enable: bool, - io_driver: io::Driver, - clock: Clock, - ) -> (Driver, Handle) { - if enable { - let driver = driver::Driver::new(io_driver, clock); - let handle = driver.handle(); - - (Either::A(driver), Some(handle)) - } else { - (Either::B(io_driver), None) - } - } -} - -#[cfg(not(feature = "time"))] -mod variant { - use crate::runtime::io; - - pub(crate) type Clock = (); - pub(crate) type Driver = io::Driver; - pub(crate) type Handle = (); - - pub(crate) fn create_clock() -> Clock { - () - } - - /// Create a new timer driver / handle pair - pub(crate) fn create_driver( - _enable: bool, - io_driver: io::Driver, - _clock: Clock, - ) -> (Driver, Handle) { - (io_driver, ()) - } -} diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index bc48bdfaa64..45a091d76a6 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -5,7 +5,6 @@ #![cfg(unix)] -use crate::io::{AsyncRead, PollEvented, ReadBuf}; use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; use crate::sync::mpsc::{channel, Receiver}; @@ -17,6 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Once; use std::task::{Context, Poll}; +pub(crate) mod driver; + pub(crate) type OsStorage = Vec; // Number of different unix signals @@ -202,9 +203,9 @@ impl Default for SignalInfo { /// The purpose of this signal handler is to primarily: /// /// 1. Flag that our specific signal was received (e.g. store an atomic flag) -/// 2. Wake up driver tasks by writing a byte to a pipe +/// 2. Wake up the driver by writing a byte to a pipe /// -/// Those two operations shoudl both be async-signal safe. +/// Those two operations should both be async-signal safe. fn action(globals: Pin<&'static Globals>, signal: c_int) { globals.record_event(signal as EventId); @@ -227,6 +228,9 @@ fn signal_enable(signal: c_int) -> io::Result<()> { )); } + // Check that we have a signal driver running + driver::Handle::current().check_inner()?; + let globals = globals(); let siginfo = match globals.storage().get(signal as EventId) { Some(slot) => slot, @@ -254,69 +258,6 @@ fn signal_enable(signal: c_int) -> io::Result<()> { } } -#[derive(Debug)] -struct Driver { - wakeup: PollEvented, -} - -impl Driver { - fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // Drain the data from the pipe and maintain interest in getting more - self.drain(cx); - // Broadcast any signals which were received - globals().broadcast(); - - Poll::Pending - } -} - -impl Driver { - fn new() -> io::Result { - // NB: We give each driver a "fresh" reciever file descriptor to avoid - // the issues described in alexcrichton/tokio-process#42. - // - // In the past we would reuse the actual receiver file descriptor and - // swallow any errors around double registration of the same descriptor. - // I'm not sure if the second (failed) registration simply doesn't end up - // receiving wake up notifications, or there could be some race condition - // when consuming readiness events, but having distinct descriptors for - // distinct PollEvented instances appears to mitigate this. - // - // Unfortunately we cannot just use a single global PollEvented instance - // either, since we can't compare Handles or assume they will always - // point to the exact same reactor. - let stream = globals().receiver.try_clone()?; - let wakeup = PollEvented::new(stream)?; - - Ok(Driver { wakeup }) - } - - /// Drain all data in the global receiver, ensuring we'll get woken up when - /// there is a write on the other end. - /// - /// We do *NOT* use the existence of any read bytes as evidence a signal was - /// received since the `pending` flags would have already been set if that - /// was the case. See - /// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more - /// info. - fn drain(&mut self, cx: &mut Context<'_>) { - let mut buf = [0; 128]; - let mut buf = ReadBuf::new(&mut buf); - loop { - match Pin::new(&mut self.wakeup).poll_read(cx, &mut buf) { - Poll::Ready(Ok(())) => { - if buf.filled().is_empty() { - panic!("EOF on self-pipe") - } - buf.clear(); - } - Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e), - Poll::Pending => break, - } - } - } -} - /// A stream of events for receiving a particular type of OS signal. /// /// In general signal handling on Unix is a pretty tricky topic, and this @@ -382,7 +323,6 @@ impl Driver { #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub struct Signal { - driver: Driver, rx: Receiver<()>, } @@ -414,16 +354,12 @@ pub fn signal(kind: SignalKind) -> io::Result { // Turn the signal delivery on once we are ready for it signal_enable(signal)?; - // Ensure there's a driver for our associated event loop processing - // signals. - let driver = Driver::new()?; - // One wakeup in a queue is enough, no need for us to buffer up any // more. let (tx, rx) = channel(1); globals().register_listener(signal as EventId, tx); - Ok(Signal { driver, rx }) + Ok(Signal { rx }) } impl Signal { @@ -484,7 +420,6 @@ impl Signal { /// } /// ``` pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - let _ = self.driver.poll(cx); self.rx.poll_recv(cx) } } diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs new file mode 100644 index 00000000000..0458893cce4 --- /dev/null +++ b/tokio/src/signal/unix/driver.rs @@ -0,0 +1,153 @@ +//! Signal driver + +use crate::io::driver::Driver as IoDriver; +use crate::io::Registration; +use crate::park::Park; +use crate::runtime::context; +use crate::signal::registry::globals; +use mio_uds::UnixStream; +use std::io::{self, Read}; +use std::sync::{Arc, Weak}; +use std::time::Duration; + +/// Responsible for registering wakeups when an OS signal is received, and +/// subsequently dispatching notifications to any signal listeners as appropriate. +/// +/// Note: this driver relies on having an enabled IO driver in order to listen to +/// pipe write wakeups. +#[derive(Debug)] +pub(crate) struct Driver { + /// Thread parker. The `Driver` park implementation delegates to this. + park: IoDriver, + + /// A pipe for receiving wake events from the signal handler + receiver: UnixStream, + + /// The actual registraiton for `receiver` when active. + /// Lazily bound at the first signal registration. + registration: Registration, + + /// Shared state + inner: Arc, +} + +#[derive(Clone, Debug)] +pub(crate) struct Handle { + inner: Weak, +} + +#[derive(Debug)] +pub(super) struct Inner(()); + +// ===== impl Driver ===== + +impl Driver { + /// Creates a new signal `Driver` instance that delegates wakeups to `park`. + pub(crate) fn new(park: IoDriver) -> io::Result { + // NB: We give each driver a "fresh" reciever file descriptor to avoid + // the issues described in alexcrichton/tokio-process#42. + // + // In the past we would reuse the actual receiver file descriptor and + // swallow any errors around double registration of the same descriptor. + // I'm not sure if the second (failed) registration simply doesn't end up + // receiving wake up notifications, or there could be some race condition + // when consuming readiness events, but having distinct descriptors for + // distinct PollEvented instances appears to mitigate this. + // + // Unfortunately we cannot just use a single global PollEvented instance + // either, since we can't compare Handles or assume they will always + // point to the exact same reactor. + let receiver = globals().receiver.try_clone()?; + let registration = + Registration::new_with_ready_and_handle(&receiver, mio::Ready::all(), park.handle())?; + + Ok(Self { + park, + receiver, + registration, + inner: Arc::new(Inner(())), + }) + } + + /// Returns a handle to this event loop which can be sent across threads + /// and can be used as a proxy to the event loop itself. + pub(crate) fn handle(&self) -> Handle { + Handle { + inner: Arc::downgrade(&self.inner), + } + } + + fn process(&self) { + // Check if the pipe is ready to read and therefore has "woken" us up + match self.registration.take_read_ready() { + Ok(Some(ready)) => assert!(ready.is_readable()), + Ok(None) => return, // No wake has arrived, bail + Err(e) => panic!("reactor gone: {}", e), + } + + // Drain the pipe completely so we can receive a new readiness event + // if another signal has come in. + let mut buf = [0; 128]; + loop { + match (&self.receiver).read(&mut buf) { + Ok(0) => panic!("EOF on self-pipe"), + Ok(_) => continue, // Keep reading + Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(e) => panic!("Bad read on self-pipe: {}", e), + } + } + + // Broadcast any signals which were received + globals().broadcast(); + } +} + +// ===== impl Park for Driver ===== + +impl Park for Driver { + type Unpark = ::Unpark; + type Error = io::Error; + + fn unpark(&self) -> Self::Unpark { + self.park.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.park.park()?; + self.process(); + Ok(()) + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.park.park_timeout(duration)?; + self.process(); + Ok(()) + } + + fn shutdown(&mut self) { + self.park.shutdown() + } +} + +// ===== impl Handle ===== + +impl Handle { + /// Returns a handle to the current driver + /// + /// # Panics + /// + /// This function panics if there is no current signal driver set. + pub(super) fn current() -> Self { + context::signal_handle().expect( + "there is no signal driver running, must be called from the context of Tokio runtime", + ) + } + + pub(super) fn check_inner(&self) -> io::Result<()> { + if self.inner.strong_count() > 0 { + Ok(()) + } else { + Err(io::Error::new(io::ErrorKind::Other, "signal driver gone")) + } + } +} diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index 23713251e9d..fddb3a5b794 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -110,9 +110,11 @@ impl Barrier { let mut wait = self.wait.clone(); loop { + let _ = wait.changed().await; + // note that the first time through the loop, this _will_ yield a generation // immediately, since we cloned a receiver that has never seen any values. - if wait.recv().await >= generation { + if *wait.borrow() >= generation { break; } } diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index c48a911c8f3..a1048ca3734 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -123,6 +123,27 @@ impl Semaphore { } } + /// Creates a new semaphore with the initial number of permits + /// + /// Maximum number of permits on 32-bit platforms is `1<<29`. + /// + /// If the specified number of permits exceeds the maximum permit amount + /// Then the value will get clamped to the maximum number of permits. + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] + pub(crate) const fn const_new(mut permits: usize) -> Self { + // NOTE: assertions and by extension panics are still being worked on: https://github.com/rust-lang/rust/issues/74925 + // currently we just clamp the permit count when it exceeds the max + permits = permits & Self::MAX_PERMITS; + + Self { + permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), + waiters: Mutex::const_new(Waitlist { + queue: LinkedList::new(), + closed: false, + }), + } + } + /// Returns the current number of available permits pub(crate) fn available_permits(&self) -> usize { self.permits.load(Acquire) >> Self::PERMIT_SHIFT diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 9b3de5309ac..9484e1307b6 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -793,7 +793,7 @@ where /// This is useful for a flavor of "optimistic check" before deciding to /// await on a receiver. /// - /// Compared with [`recv`], this function has three failure cases instead of one + /// Compared with [`recv`], this function has three failure cases instead of two /// (one for closed, one for an empty buffer, one for a lagging receiver). /// /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 5d66512d5ad..1a584383b83 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -20,7 +20,7 @@ //! few flavors of channels provided by Tokio. Each channel flavor supports //! different message passing patterns. When a channel supports multiple //! producers, many separate tasks may **send** messages. When a channel -//! supports muliple consumers, many different separate tasks may **receive** +//! supports multiple consumers, many different separate tasks may **receive** //! messages. //! //! Tokio provides many different channel flavors as different message passing @@ -355,10 +355,8 @@ //! let op = my_async_operation(); //! tokio::pin!(op); //! -//! // Receive the **initial** configuration value. As this is the -//! // first time the config is received from the watch, it will -//! // always complete immediatedly. -//! let mut conf = rx.recv().await; +//! // Get the initial config value +//! let mut conf = rx.borrow().clone(); //! //! let mut op_start = Instant::now(); //! let mut delay = time::delay_until(op_start + conf.timeout); @@ -375,8 +373,8 @@ //! // Restart the timeout //! delay = time::delay_until(op_start + conf.timeout); //! } -//! new_conf = rx.recv() => { -//! conf = new_conf; +//! _ = rx.changed() => { +//! conf = rx.borrow().clone(); //! //! // The configuration has been updated. Update the //! // `delay` using the new `timeout` value. @@ -399,14 +397,14 @@ //! } //! ``` //! -//! [`watch` channel]: crate::sync::watch -//! [`broadcast` channel]: crate::sync::broadcast +//! [`watch` channel]: mod@crate::sync::watch +//! [`broadcast` channel]: mod@crate::sync::broadcast //! //! # State synchronization //! //! The remaining synchronization primitives focus on synchronizing state. //! These are asynchronous equivalents to versions provided by `std`. They -//! operate in a similar way as their `std` counterparts parts but will wait +//! operate in a similar way as their `std` counterparts but will wait //! asynchronously instead of blocking the thread. //! //! * [`Barrier`](Barrier) Ensures multiple tasks will wait for each other to diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 148ee3ad766..0a53cda2038 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -75,7 +75,11 @@ pub(crate) trait Semaphore { /// The permit is dropped without a value being sent. In this case, the /// permit must be returned to the semaphore. - fn drop_permit(&self, permit: &mut Self::Permit); + /// + /// # Return + /// + /// Returns true if the permit was acquired. + fn drop_permit(&self, permit: &mut Self::Permit) -> bool; fn is_idle(&self) -> bool; @@ -192,7 +196,7 @@ where pub(crate) fn disarm(&mut self) { // TODO: should this error if not acquired? - self.inner.semaphore.drop_permit(&mut self.permit) + self.inner.semaphore.drop_permit(&mut self.permit); } /// Send a message and notify the receiver. @@ -234,7 +238,11 @@ where S: Semaphore, { fn drop(&mut self) { - self.inner.semaphore.drop_permit(&mut self.permit); + let notify = self.inner.semaphore.drop_permit(&mut self.permit); + + if notify && self.inner.semaphore.is_idle() { + self.inner.rx_waker.wake(); + } if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { return; @@ -424,8 +432,10 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { Permit::new() } - fn drop_permit(&self, permit: &mut Permit) { + fn drop_permit(&self, permit: &mut Permit) -> bool { + let ret = permit.is_acquired(); permit.release(1, &self.0); + ret } fn add_permit(&self) { @@ -477,7 +487,9 @@ impl Semaphore for AtomicUsize { fn new_permit() {} - fn drop_permit(&self, _permit: &mut ()) {} + fn drop_permit(&self, _permit: &mut ()) -> bool { + false + } fn add_permit(&self) { let prev = self.fetch_sub(2, Release); diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index df348457fec..71f0136f005 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -219,6 +219,27 @@ impl Mutex { } } + /// Creates a new lock in an unlocked state ready for use. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// + /// static LOCK: Mutex = Mutex::const_new(5); + /// ``` + #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn const_new(t: T) -> Self + where + T: Sized, + { + Self { + c: UnsafeCell::new(t), + s: semaphore::Semaphore::const_new(1), + } + } + /// Locks this mutex, causing the current task /// to yield until the lock has been acquired. /// When the lock has been acquired, function returns a [`MutexGuard`]. diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 4321c974608..56bbc51bf28 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -106,6 +106,14 @@ pub struct Notify { waiters: Mutex, } +#[derive(Debug, Clone, Copy)] +enum NotificationType { + // Notification triggered by calling `notify_waiters` + AllWaiters, + // Notification triggered by calling `notify_one` + OneWaiter, +} + #[derive(Debug)] struct Waiter { /// Intrusive linked-list pointers @@ -115,7 +123,7 @@ struct Waiter { waker: Option, /// `true` if the notification has been assigned to this waiter. - notified: bool, + notified: Option, /// Should not be `Unpin`. _p: PhantomPinned, @@ -123,7 +131,7 @@ struct Waiter { /// Future returned from `notified()` #[derive(Debug)] -struct Notified<'a> { +pub struct Notified<'a> { /// The `Notify` being received on. notify: &'a Notify, @@ -170,8 +178,32 @@ impl Notify { } } + /// Create a new `Notify`, initialized without a permit. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Notify; + /// + /// static NOTIFY: Notify = Notify::const_new(); + /// ``` + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn const_new() -> Notify { + Notify { + state: AtomicU8::new(0), + waiters: Mutex::const_new(LinkedList::new()), + } + } + /// Wait for a notification. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn notified(&self); + /// ``` + /// /// Each `Notify` value holds a single permit. If a permit is available from /// an earlier call to [`notify_one()`], then `notified().await` will complete /// immediately, consuming that permit. Otherwise, `notified().await` waits @@ -199,18 +231,17 @@ impl Notify { /// notify.notify_one(); /// } /// ``` - pub async fn notified(&self) { + pub fn notified(&self) -> Notified<'_> { Notified { notify: self, state: State::Init, waiter: UnsafeCell::new(Waiter { pointers: linked_list::Pointers::new(), waker: None, - notified: false, + notified: None, _p: PhantomPinned, }), } - .await } /// Notifies a waiting task @@ -279,6 +310,45 @@ impl Notify { waker.wake(); } } + + /// Notifies all waiting tasks + pub(crate) fn notify_waiters(&self) { + // There are waiters, the lock must be acquired to notify. + let mut waiters = self.waiters.lock().unwrap(); + + // The state must be reloaded while the lock is held. The state may only + // transition out of WAITING while the lock is held. + let curr = self.state.load(SeqCst); + + if let EMPTY | NOTIFIED = curr { + // There are no waiting tasks. In this case, no synchronization is + // established between `notify` and `notified().await`. + return; + } + + // At this point, it is guaranteed that the state will not + // concurrently change, as holding the lock is required to + // transition **out** of `WAITING`. + // + // Get pending waiters + while let Some(mut waiter) = waiters.pop_back() { + // Safety: `waiters` lock is still held. + let waiter = unsafe { waiter.as_mut() }; + + assert!(waiter.notified.is_none()); + + waiter.notified = Some(NotificationType::AllWaiters); + + if let Some(waker) = waiter.waker.take() { + waker.wake(); + } + } + + // All waiters have been notified, the state must be transitioned to + // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be + // held, a `store` is sufficient. + self.state.store(EMPTY, SeqCst); + } } impl Default for Notify { @@ -313,9 +383,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option { waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); *state = Waiting; + + return Poll::Pending; } Waiting => { // Currently in the "Waiting" state, implying the caller has @@ -442,11 +514,11 @@ impl Future for Notified<'_> { // Safety: called while locked let w = unsafe { &mut *waiter.get() }; - if w.notified { + if w.notified.is_some() { // Our waker has been notified. Reset the fields and // remove it from the list. w.waker = None; - w.notified = false; + w.notified = None; *state = Done; } else { @@ -518,14 +590,13 @@ impl Drop for Notified<'_> { notify.state.store(EMPTY, SeqCst); } - // See if the node was notified but not received. In this case, the - // notification must be sent to another waiter. + // See if the node was notified but not received. In this case, if + // the notification was triggered via `notify_one`, it must be sent + // to the next waiter. // // Safety: with the entry removed from the linked list, there can be // no concurrent access to the entry - let notified = unsafe { (*waiter.get()).notified }; - - if notified { + if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } { if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { drop(waiters); waker.wake(); diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 1bb579319d0..840889bace8 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -11,7 +11,7 @@ const MAX_READS: usize = 32; #[cfg(loom)] const MAX_READS: usize = 10; -/// An asynchronous reader-writer lock +/// An asynchronous reader-writer lock. /// /// This type of lock allows a number of readers or at most one writer at any /// point in time. The write portion of this lock typically allows modification @@ -344,6 +344,58 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { marker: marker::PhantomData, }) } + + /// Atomically downgrades a write lock into a read lock without allowing + /// any writers to take exclusive access of the lock in the meantime. + /// + /// **Note:** This won't *necessarily* allow any additional readers to acquire + /// locks, since [`RwLock`] is fair and it is possible that a writer is next + /// in line. + /// + /// Returns an RAII guard which will drop the read access of this rwlock + /// when dropped. + /// + /// # Examples + /// + /// ``` + /// # use tokio::sync::RwLock; + /// # use std::sync::Arc; + /// # + /// # #[tokio::main] + /// # async fn main() { + /// let lock = Arc::new(RwLock::new(1)); + /// + /// let n = lock.write().await; + /// + /// let cloned_lock = lock.clone(); + /// let handle = tokio::spawn(async move { + /// *cloned_lock.write().await = 2; + /// }); + /// + /// let n = n.downgrade(); + /// assert_eq!(*n, 1, "downgrade is atomic"); + /// + /// assert_eq!(*lock.read().await, 1, "additional readers can obtain locks"); + /// + /// drop(n); + /// handle.await.unwrap(); + /// assert_eq!(*lock.read().await, 2, "second writer obtained write lock"); + /// # } + /// ``` + /// + /// [`RwLock`]: struct@RwLock + pub fn downgrade(self) -> RwLockReadGuard<'a, T> { + let RwLockWriteGuard { s, data, .. } = self; + + // Release all but one of the permits held by the write guard + s.release(MAX_READS - 1); + + RwLockReadGuard { + s, + data, + marker: marker::PhantomData, + } + } } impl<'a, T: ?Sized> fmt::Debug for RwLockWriteGuard<'a, T> @@ -433,6 +485,27 @@ impl RwLock { } } + /// Creates a new instance of an `RwLock` which is unlocked. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::RwLock; + /// + /// static LOCK: RwLock = RwLock::const_new(5); + /// ``` + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn const_new(value: T) -> RwLock + where + T: Sized, + { + RwLock { + c: UnsafeCell::new(value), + s: Semaphore::const_new(MAX_READS), + } + } + /// Locks this rwlock with shared read access, causing the current task /// to yield until the lock has been acquired. /// diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 2489d34aaaf..4f8b002656b 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -1,7 +1,7 @@ use super::batch_semaphore as ll; // low level implementation use std::sync::Arc; -/// Counting semaphore performing asynchronous permit aquisition. +/// Counting semaphore performing asynchronous permit acquisition. /// /// A semaphore maintains a set of permits. Permits are used to synchronize /// access to a shared resource. A semaphore differs from a mutex in that it @@ -74,6 +74,15 @@ impl Semaphore { } } + /// Creates a new semaphore with the initial number of permits. + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn const_new(permits: usize) -> Self { + Self { + ll_sem: ll::Semaphore::const_new(permits), + } + } + /// Returns the current number of available permits. pub fn available_permits(&self) -> usize { self.ll_sem.available_permits() diff --git a/tokio/src/sync/semaphore_ll.rs b/tokio/src/sync/semaphore_ll.rs index 25d25ac88ab..f044095f8fc 100644 --- a/tokio/src/sync/semaphore_ll.rs +++ b/tokio/src/sync/semaphore_ll.rs @@ -910,7 +910,7 @@ impl Waiter { } /// Try to decrement the number of permits to acquire. This returns the - /// actual number of permits that were decremented. The delta betweeen `n` + /// actual number of permits that were decremented. The delta between `n` /// and the return has been assigned to the permit and the caller must /// assign these back to the semaphore. fn try_dec_permits_to_acquire(&self, n: usize) -> usize { diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs new file mode 100644 index 00000000000..c575b5b66c5 --- /dev/null +++ b/tokio/src/sync/tests/loom_watch.rs @@ -0,0 +1,36 @@ +use crate::sync::watch; + +use loom::future::block_on; +use loom::thread; + +#[test] +fn smoke() { + loom::model(|| { + let (tx, mut rx1) = watch::channel(1); + let mut rx2 = rx1.clone(); + let mut rx3 = rx1.clone(); + let mut rx4 = rx1.clone(); + let mut rx5 = rx1.clone(); + + let th = thread::spawn(move || { + tx.send(2).unwrap(); + }); + + block_on(rx1.changed()).unwrap(); + assert_eq!(*rx1.borrow(), 2); + + block_on(rx2.changed()).unwrap(); + assert_eq!(*rx2.borrow(), 2); + + block_on(rx3.changed()).unwrap(); + assert_eq!(*rx3.borrow(), 2); + + block_on(rx4.changed()).unwrap(); + assert_eq!(*rx4.borrow(), 2); + + block_on(rx5.changed()).unwrap(); + assert_eq!(*rx5.borrow(), 2); + + th.join().unwrap(); + }) +} diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs index d571754c011..c637cb6b816 100644 --- a/tokio/src/sync/tests/mod.rs +++ b/tokio/src/sync/tests/mod.rs @@ -13,4 +13,5 @@ cfg_loom! { mod loom_oneshot; mod loom_semaphore_batch; mod loom_semaphore_ll; + mod loom_watch; } diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index f6660b6eae9..7d1ac9e8fdc 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -6,13 +6,11 @@ //! //! # Usage //! -//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are -//! the producer and sender halves of the channel. The channel is -//! created with an initial value. [`Receiver::recv`] will always -//! be ready upon creation and will yield either this initial value or -//! the latest value that has been sent by `Sender`. -//! -//! Calls to [`Receiver::recv`] will always yield the latest value. +//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer +//! and sender halves of the channel. The channel is created with an initial +//! value. The **latest** value stored in the channel is accessed with +//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new +//! value to sent by the [`Sender`] half. //! //! # Examples //! @@ -23,8 +21,8 @@ //! let (tx, mut rx) = watch::channel("hello"); //! //! tokio::spawn(async move { -//! while let Some(value) = Some(rx.recv().await) { -//! println!("received = {:?}", value); +//! while rx.changed().await.is_ok() { +//! println!("received = {:?}", *rx.borrow()); //! } //! }); //! @@ -47,20 +45,17 @@ //! //! [`Sender`]: crate::sync::watch::Sender //! [`Receiver`]: crate::sync::watch::Receiver -//! [`Receiver::recv`]: crate::sync::watch::Receiver::recv +//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed +//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow //! [`channel`]: crate::sync::watch::channel //! [`Sender::closed`]: crate::sync::watch::Sender::closed -use crate::future::poll_fn; -use crate::sync::task::AtomicWaker; +use crate::sync::Notify; -use fnv::FnvHashSet; use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; -use std::task::Poll::{Pending, Ready}; -use std::task::{Context, Poll}; +use std::sync::{Arc, RwLock, RwLockReadGuard}; /// Receives values from the associated [`Sender`](struct@Sender). /// @@ -70,8 +65,8 @@ pub struct Receiver { /// Pointer to the shared state shared: Arc>, - /// Pointer to the watcher's internal state - inner: Watcher, + /// Last observed version + version: usize, } /// Sends values to the associated [`Receiver`](struct@Receiver). @@ -79,7 +74,7 @@ pub struct Receiver { /// Instances are created by the [`channel`](fn@channel) function. #[derive(Debug)] pub struct Sender { - shared: Weak>, + shared: Arc>, } /// Returns a reference to the inner value @@ -92,6 +87,27 @@ pub struct Ref<'a, T> { inner: RwLockReadGuard<'a, T>, } +#[derive(Debug)] +struct Shared { + /// The most recent value + value: RwLock, + + /// The current version + /// + /// The lowest bit represents a "closed" state. The rest of the bits + /// represent the current version. + version: AtomicUsize, + + /// Tracks the number of `Receiver` instances + ref_count_rx: AtomicUsize, + + /// Notifies waiting receivers that the value changed. + notify_rx: Notify, + + /// Notifies any task listening for `Receiver` dropped events + notify_tx: Notify, +} + pub mod error { //! Watch error types @@ -112,37 +128,20 @@ pub mod error { } impl std::error::Error for SendError {} -} - -#[derive(Debug)] -struct Shared { - /// The most recent value - value: RwLock, - - /// The current version - /// - /// The lowest bit represents a "closed" state. The rest of the bits - /// represent the current version. - version: AtomicUsize, - /// All watchers - watchers: Mutex, - - /// Task to notify when all watchers drop - cancel: AtomicWaker, -} + /// Error produced when receiving a change notification. + #[derive(Debug)] + pub struct RecvError(pub(super) ()); -type Watchers = FnvHashSet; + // ===== impl RecvError ===== -/// The watcher's ID is based on the Arc's pointer. -#[derive(Clone, Debug)] -struct Watcher(Arc); + impl fmt::Display for RecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } + } -#[derive(Debug)] -struct WatchInner { - /// Last observed version - version: AtomicUsize, - waker: AtomicWaker, + impl std::error::Error for RecvError {} } const CLOSED: usize = 1; @@ -162,8 +161,8 @@ const CLOSED: usize = 1; /// let (tx, mut rx) = watch::channel("hello"); /// /// tokio::spawn(async move { -/// while let Some(value) = Some(rx.recv().await) { -/// println!("received = {:?}", value); +/// while rx.changed().await.is_ok() { +/// println!("received = {:?}", *rx.borrow()); /// } /// }); /// @@ -174,29 +173,20 @@ const CLOSED: usize = 1; /// /// [`Sender`]: struct@Sender /// [`Receiver`]: struct@Receiver -pub fn channel(init: T) -> (Sender, Receiver) { - const VERSION_0: usize = 0; - const VERSION_1: usize = 2; - - // We don't start knowing VERSION_1 - let inner = Watcher::new_version(VERSION_0); - - // Insert the watcher - let mut watchers = FnvHashSet::with_capacity_and_hasher(0, Default::default()); - watchers.insert(inner.clone()); - +pub fn channel(init: T) -> (Sender, Receiver) { let shared = Arc::new(Shared { value: RwLock::new(init), - version: AtomicUsize::new(VERSION_1), - watchers: Mutex::new(watchers), - cancel: AtomicWaker::new(), + version: AtomicUsize::new(0), + ref_count_rx: AtomicUsize::new(1), + notify_rx: Notify::new(), + notify_tx: Notify::new(), }); let tx = Sender { - shared: Arc::downgrade(&shared), + shared: shared.clone(), }; - let rx = Receiver { shared, inner }; + let rx = Receiver { shared, version: 0 }; (tx, rx) } @@ -221,41 +211,13 @@ impl Receiver { Ref { inner } } - // TODO: document - #[doc(hidden)] - pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> { - // Make sure the task is up to date - self.inner.waker.register_by_ref(cx.waker()); - - let state = self.shared.version.load(SeqCst); - let version = state & !CLOSED; - - if self.inner.version.swap(version, Relaxed) != version { - let inner = self.shared.value.read().unwrap(); - - return Ready(Ref { inner }); - } - - if CLOSED == state & CLOSED { - // The `Store` handle has been dropped. - let inner = self.shared.value.read().unwrap(); - - return Ready(Ref { inner }); - } - - Pending - } -} - -impl Receiver { - /// Attempts to clone the latest value sent via the channel. + /// Wait for a change notification /// - /// If this is the first time the function is called on a `Receiver` - /// instance, then the function completes immediately with the **current** - /// value held by the channel. On the next call, the function waits until - /// a new value is sent in the channel. + /// Returns when a new value has been sent by the [`Sender`] since the last + /// time `changed()` was called. When the `Sender` half is dropped, `Err` is + /// returned. /// - /// `None` is returned if the `Sender` half is dropped. + /// [`Sender`]: struct@Sender /// /// # Examples /// @@ -266,79 +228,110 @@ impl Receiver { /// async fn main() { /// let (tx, mut rx) = watch::channel("hello"); /// - /// let v = rx.recv().await; - /// assert_eq!(v, "hello"); - /// /// tokio::spawn(async move { /// tx.send("goodbye").unwrap(); /// }); /// - /// // Waits for the new task to spawn and send the value. - /// let v = rx.recv().await; - /// assert_eq!(v, "goodbye"); + /// assert!(rx.changed().await.is_ok()); + /// assert_eq!(*rx.borrow(), "goodbye"); /// - /// let v = rx.recv().await; - /// assert_eq!(v, "goodbye"); + /// // The `tx` handle has been dropped + /// assert!(rx.changed().await.is_err()); /// } /// ``` - pub async fn recv(&mut self) -> T { - poll_fn(|cx| { - let v_ref = ready!(self.poll_recv_ref(cx)); - Poll::Ready((*v_ref).clone()) + pub async fn changed(&mut self) -> Result<(), error::RecvError> { + use std::future::Future; + use std::pin::Pin; + use std::task::Poll; + + // In order to avoid a race condition, we first request a notification, + // **then** check the current value's version. If a new version exists, + // the notification request is dropped. Requesting the notification + // requires polling the future once. + let notified = self.shared.notify_rx.notified(); + pin!(notified); + + // Polling the future once is guaranteed to return `Pending` as `watch` + // only notifies using `notify_waiters`. + crate::future::poll_fn(|cx| { + let res = Pin::new(&mut notified).poll(cx); + assert!(!res.is_ready()); + Poll::Ready(()) }) - .await + .await; + + if let Some(ret) = maybe_changed(&self.shared, &mut self.version) { + return ret; + } + + notified.await; + + maybe_changed(&self.shared, &mut self.version) + .expect("[bug] failed to observe change after notificaton.") } } -#[cfg(feature = "stream")] -impl crate::stream::Stream for Receiver { - type Item = T; - - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let v_ref = ready!(self.poll_recv_ref(cx)); +fn maybe_changed( + shared: &Shared, + version: &mut usize, +) -> Option> { + // Load the version from the state + let state = shared.version.load(SeqCst); + let new_version = state & !CLOSED; + + if *version != new_version { + // Observe the new version and return + *version = new_version; + return Some(Ok(())); + } - Poll::Ready(Some((*v_ref).clone())) + if CLOSED == state & CLOSED { + // All receivers have dropped. + return Some(Err(error::RecvError(()))); } + + None } impl Clone for Receiver { fn clone(&self) -> Self { - let ver = self.inner.version.load(Relaxed); - let inner = Watcher::new_version(ver); + let version = self.version; let shared = self.shared.clone(); - shared.watchers.lock().unwrap().insert(inner.clone()); + // No synchronization necessary as this is only used as a counter and + // not memory access. + shared.ref_count_rx.fetch_add(1, Relaxed); - Receiver { shared, inner } + Receiver { version, shared } } } impl Drop for Receiver { fn drop(&mut self) { - self.shared.watchers.lock().unwrap().remove(&self.inner); + // No synchronization necessary as this is only used as a counter and + // not memory access. + if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) { + // This is the last `Receiver` handle, tasks waiting on `Sender::closed()` + self.shared.notify_tx.notify_waiters(); + } } } impl Sender { /// Sends a new value via the channel, notifying all receivers. pub fn send(&self, value: T) -> Result<(), error::SendError> { - let shared = match self.shared.upgrade() { - Some(shared) => shared, - // All `Watch` handles have been canceled - None => return Err(error::SendError { inner: value }), - }; - - // Replace the value - { - let mut lock = shared.value.write().unwrap(); - *lock = value; + // This is pretty much only useful as a hint anyway, so synchronization isn't critical. + if 0 == self.shared.ref_count_rx.load(Relaxed) { + return Err(error::SendError { inner: value }); } + *self.shared.value.write().unwrap() = value; + // Update the version. 2 is used so that the CLOSED bit is not set. - shared.version.fetch_add(2, SeqCst); + self.shared.version.fetch_add(2, SeqCst); // Notify all watchers - notify_all(&*shared); + self.shared.notify_rx.notify_waiters(); Ok(()) } @@ -347,37 +340,42 @@ impl Sender { /// /// This allows the producer to get notified when interest in the produced /// values is canceled and immediately stop doing work. - pub async fn closed(&mut self) { - poll_fn(|cx| self.poll_close(cx)).await - } + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = watch::channel("hello"); + /// + /// tokio::spawn(async move { + /// // use `rx` + /// drop(rx); + /// }); + /// + /// // Waits for `rx` to drop + /// tx.closed().await; + /// println!("the `rx` handles dropped") + /// } + /// ``` + pub async fn closed(&self) { + let notified = self.shared.notify_tx.notified(); - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { - match self.shared.upgrade() { - Some(shared) => { - shared.cancel.register_by_ref(cx.waker()); - Pending - } - None => Ready(()), + if self.shared.ref_count_rx.load(Relaxed) == 0 { + return; } - } -} - -/// Notifies all watchers of a change -fn notify_all(shared: &Shared) { - let watchers = shared.watchers.lock().unwrap(); - for watcher in watchers.iter() { - // Notify the task - watcher.waker.wake(); + notified.await; + debug_assert_eq!(0, self.shared.ref_count_rx.load(Relaxed)); } } impl Drop for Sender { fn drop(&mut self) { - if let Some(shared) = self.shared.upgrade() { - shared.version.fetch_or(CLOSED, SeqCst); - notify_all(&*shared); - } + self.shared.version.fetch_or(CLOSED, SeqCst); + self.shared.notify_rx.notify_waiters(); } } @@ -390,44 +388,3 @@ impl ops::Deref for Ref<'_, T> { self.inner.deref() } } - -// ===== impl Shared ===== - -impl Drop for Shared { - fn drop(&mut self) { - self.cancel.wake(); - } -} - -// ===== impl Watcher ===== - -impl Watcher { - fn new_version(version: usize) -> Self { - Watcher(Arc::new(WatchInner { - version: AtomicUsize::new(version), - waker: AtomicWaker::new(), - })) - } -} - -impl std::cmp::PartialEq for Watcher { - fn eq(&self, other: &Watcher) -> bool { - Arc::ptr_eq(&self.0, &other.0) - } -} - -impl std::cmp::Eq for Watcher {} - -impl std::hash::Hash for Watcher { - fn hash(&self, state: &mut H) { - (&*self.0 as *const WatchInner).hash(state) - } -} - -impl std::ops::Deref for Watcher { - type Target = WatchInner; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} diff --git a/tokio/src/time/delay_queue.rs b/tokio/src/time/delay_queue.rs index b02153b9e0f..6f755ebacb3 100644 --- a/tokio/src/time/delay_queue.rs +++ b/tokio/src/time/delay_queue.rs @@ -366,23 +366,25 @@ impl DelayQueue { /// This function is identical to `insert_at`, but takes a `Duration` /// instead of an `Instant`. /// - /// `value` is stored in the queue until `when` is reached. At which point, - /// `value` will be returned from [`poll_expired`]. If `when` has already been - /// reached, then `value` is immediately made available to poll. - /// - /// The return value represents the insertion and is used at an argument to - /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once - /// `value` is removed from the queue either by calling [`poll_expired`] after - /// `when` is reached or by calling [`remove`]. At this point, the caller - /// must take care to not use the returned [`Key`] again as it may reference - /// a different item in the queue. + /// `value` is stored in the queue until `timeout` duration has + /// elapsed after `insert` was called. At that point, `value` will + /// be returned from [`poll_expired`]. If `timeout` a Duration of + /// zero, then `value` is immediately made available to poll. + /// + /// The return value represents the insertion and is used as an + /// argument to [`remove`] and [`reset`]. Note that [`Key`] is a + /// token and is reused once `value` is removed from the queue + /// either by calling [`poll_expired`] after `timeout` has elapsed + /// or by calling [`remove`]. At this point, the caller must not + /// use the returned [`Key`] again as it may reference a different + /// item in the queue. /// /// See [type] level documentation for more details. /// /// # Panics /// - /// This function panics if `timeout` is greater than the maximum supported - /// duration. + /// This function panics if `timeout` is greater than the maximum + /// duration supported by the timer in the current `Runtime`. /// /// # Examples /// diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index 6a97f59b5aa..52e081ce463 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -159,7 +159,6 @@ impl Interval { /// // approximately 20ms have elapsed. /// } /// ``` - #[allow(clippy::should_implement_trait)] // TODO: rename (tokio-rs/tokio#1261) pub async fn tick(&mut self) -> Instant { poll_fn(|cx| self.poll_tick(cx)).await } diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs index 8f9ca48a50a..392a0e8b0c9 100644 --- a/tokio/src/util/bit.rs +++ b/tokio/src/util/bit.rs @@ -22,12 +22,6 @@ impl Pack { Pack { mask, shift } } - /// Mask used to unpack value - #[cfg(all(test, loom))] - pub(crate) const fn mask(&self) -> usize { - self.mask - } - /// Width, in bits, dedicated to storing the value. pub(crate) const fn width(&self) -> u32 { pointer_width() - (self.mask >> self.shift).leading_zeros() diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index 9549f07c56e..d493efe4514 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -72,7 +72,6 @@ unsafe impl Sync for Pointers {} impl LinkedList { /// Creates an empty linked list. - #[allow(dead_code)] // NOTE: This will get removed with: https://github.com/tokio-rs/tokio/pull/2790 pub(crate) const fn new() -> LinkedList { LinkedList { head: None, diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index 6bfa62c7ca9..b3492b5edf5 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -201,7 +201,7 @@ async_assert_fn!(tokio::sync::Mutex>::lock(_): !Send & !Sync); async_assert_fn!(tokio::sync::Mutex::lock_owned(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex>::lock_owned(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex>::lock_owned(_): !Send & !Sync); -async_assert_fn!(tokio::sync::Notify::notified(_): Send & !Sync); +async_assert_fn!(tokio::sync::Notify::notified(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock::read(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock::write(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock>::read(_): !Send & !Sync); @@ -225,9 +225,7 @@ async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver::recv(_): Send & Sync) async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver>::recv(_): Send & Sync); async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver>::recv(_): !Send & !Sync); -async_assert_fn!(tokio::sync::watch::Receiver::recv(_): Send & Sync); -async_assert_fn!(tokio::sync::watch::Receiver>::recv(_): !Send & !Sync); -async_assert_fn!(tokio::sync::watch::Receiver>::recv(_): !Send & !Sync); +async_assert_fn!(tokio::sync::watch::Receiver::changed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender::closed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index 2ebc4efc1ac..a6c8f8f5ec5 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -462,3 +462,20 @@ async fn async_never() -> ! { tokio::time::delay_for(Duration::from_millis(10)).await; } } + +// From https://github.com/tokio-rs/tokio/issues/2857 +#[tokio::test] +async fn mut_on_left_hand_side() { + let v = async move { + let ok = async { 1 }; + tokio::pin!(ok); + tokio::select! { + mut a = &mut ok => { + a += 1; + a + } + } + } + .await; + assert_eq!(v, 2); +} diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index f571a71caf8..919bddbfb18 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -534,3 +534,25 @@ async fn blocking_send_async() { let (mut tx, _rx) = mpsc::channel::<()>(1); let _ = tx.blocking_send(()); } + +#[test] +fn ready_close_cancel_bounded() { + use futures::future::poll_fn; + + let (mut tx, mut rx) = mpsc::channel::<()>(100); + let _tx2 = tx.clone(); + + { + let mut ready = task::spawn(async { poll_fn(|cx| tx.poll_ready(cx)).await }); + assert_ready_ok!(ready.poll()); + } + + rx.close(); + + let mut recv = task::spawn(async { rx.recv().await }); + assert_pending!(recv.poll()); + + drop(tx); + + assert!(recv.is_woken()); +} diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 5d550443f5b..9dcb0c530fa 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -4,41 +4,41 @@ use tokio::sync::watch; use tokio_test::task::spawn; -use tokio_test::{assert_pending, assert_ready}; +use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok}; #[test] fn single_rx_recv() { let (tx, mut rx) = watch::channel("one"); { - let mut t = spawn(rx.recv()); - let v = assert_ready!(t.poll()); - assert_eq!(v, "one"); + // Not initially notified + let mut t = spawn(rx.changed()); + assert_pending!(t.poll()); } + assert_eq!(*rx.borrow(), "one"); { - let mut t = spawn(rx.recv()); - + let mut t = spawn(rx.changed()); assert_pending!(t.poll()); tx.send("two").unwrap(); assert!(t.is_woken()); - let v = assert_ready!(t.poll()); - assert_eq!(v, "two"); + assert_ready_ok!(t.poll()); } + assert_eq!(*rx.borrow(), "two"); { - let mut t = spawn(rx.recv()); - + let mut t = spawn(rx.changed()); assert_pending!(t.poll()); drop(tx); - let res = assert_ready!(t.poll()); - assert_eq!(res, "two"); + assert!(t.is_woken()); + assert_ready_err!(t.poll()); } + assert_eq!(*rx.borrow(), "two"); } #[test] @@ -47,20 +47,19 @@ fn multi_rx() { let mut rx2 = rx1.clone(); { - let mut t1 = spawn(rx1.recv()); - let mut t2 = spawn(rx2.recv()); - - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); + let mut t1 = spawn(rx1.changed()); + let mut t2 = spawn(rx2.changed()); - let res = assert_ready!(t2.poll()); - assert_eq!(res, "one"); + assert_pending!(t1.poll()); + assert_pending!(t2.poll()); } + assert_eq!(*rx1.borrow(), "one"); + assert_eq!(*rx2.borrow(), "one"); - let mut t2 = spawn(rx2.recv()); + let mut t2 = spawn(rx2.changed()); { - let mut t1 = spawn(rx1.recv()); + let mut t1 = spawn(rx1.changed()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); @@ -70,12 +69,12 @@ fn multi_rx() { assert!(t1.is_woken()); assert!(t2.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "two"); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx1.borrow(), "two"); { - let mut t1 = spawn(rx1.recv()); + let mut t1 = spawn(rx1.changed()); assert_pending!(t1.poll()); @@ -84,45 +83,29 @@ fn multi_rx() { assert!(t1.is_woken()); assert!(t2.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); - - let res = assert_ready!(t2.poll()); - assert_eq!(res, "three"); + assert_ready_ok!(t1.poll()); + assert_ready_ok!(t2.poll()); } + assert_eq!(*rx1.borrow(), "three"); drop(t2); + assert_eq!(*rx2.borrow(), "three"); + { - let mut t1 = spawn(rx1.recv()); - let mut t2 = spawn(rx2.recv()); + let mut t1 = spawn(rx1.changed()); + let mut t2 = spawn(rx2.changed()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); tx.send("four").unwrap(); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "four"); - drop(t1); - - let mut t1 = spawn(rx1.recv()); - assert_pending!(t1.poll()); - - drop(tx); - - assert!(t1.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "four"); - - let res = assert_ready!(t2.poll()); - assert_eq!(res, "four"); - - drop(t2); - let mut t2 = spawn(rx2.recv()); - let res = assert_ready!(t2.poll()); - assert_eq!(res, "four"); + assert_ready_ok!(t1.poll()); + assert_ready_ok!(t2.poll()); } + assert_eq!(*rx1.borrow(), "four"); + assert_eq!(*rx2.borrow(), "four"); } #[test] @@ -133,16 +116,10 @@ fn rx_observes_final_value() { drop(tx); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); - } - - { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); + let mut t1 = spawn(rx.changed()); + assert_ready_err!(t1.poll()); } + assert_eq!(*rx.borrow(), "one"); // Sending a value @@ -151,13 +128,13 @@ fn rx_observes_final_value() { tx.send("two").unwrap(); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "two"); + let mut t1 = spawn(rx.changed()); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx.borrow(), "two"); { - let mut t1 = spawn(rx.recv()); + let mut t1 = spawn(rx.changed()); assert_pending!(t1.poll()); tx.send("three").unwrap(); @@ -165,20 +142,20 @@ fn rx_observes_final_value() { assert!(t1.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx.borrow(), "three"); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); + let mut t1 = spawn(rx.changed()); + assert_ready_err!(t1.poll()); } + assert_eq!(*rx.borrow(), "three"); } #[test] fn poll_close() { - let (mut tx, rx) = watch::channel("one"); + let (tx, rx) = watch::channel("one"); { let mut t = spawn(tx.closed()); @@ -192,40 +169,3 @@ fn poll_close() { assert!(tx.send("two").is_err()); } - -#[test] -fn stream_impl() { - use tokio::stream::StreamExt; - - let (tx, mut rx) = watch::channel("one"); - - { - let mut t = spawn(rx.next()); - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "one"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - tx.send("two").unwrap(); - - assert!(t.is_woken()); - - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "two"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - drop(tx); - - let res = assert_ready!(t.poll()); - assert_eq!(res, Some("two")); - } -}