From 30d4ec0a209c0f71b11d8f358423ec22b1761ff4 Mon Sep 17 00:00:00 2001
From: Mikail Bagishov
Date: Sun, 23 Aug 2020 18:47:20 +0300
Subject: [PATCH 01/22] io: add ReaderStream (#2714)

---
 tokio/src/io/mod.rs                |   1 +
 tokio/src/io/util/mod.rs           |   3 +
 tokio/src/io/util/reader_stream.rs | 105 +++++++++++++++++++++++++++++
 tokio/tests/io_reader_stream.rs    |  64 ++++++++++++++++++
 4 files changed, 173 insertions(+)
 create mode 100644 tokio/src/io/util/reader_stream.rs
 create mode 100644 tokio/tests/io_reader_stream.rs

diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs
index 9e0e063195c..37da942ff3d 100644
--- a/tokio/src/io/mod.rs
+++ b/tokio/src/io/mod.rs
@@ -236,6 +236,7 @@ cfg_io_util! {
     cfg_stream! {
         pub use util::{stream_reader, StreamReader};
+        pub use util::{reader_stream, ReaderStream};
     }
 }

diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs
index 609ff2386a6..782a02a8f74 100644
--- a/tokio/src/io/util/mod.rs
+++ b/tokio/src/io/util/mod.rs
@@ -66,6 +66,9 @@ cfg_io_util! {
     cfg_stream! {
         mod stream_reader;
         pub use stream_reader::{stream_reader, StreamReader};
+
+        mod reader_stream;
+        pub use reader_stream::{reader_stream, ReaderStream};
     }

     mod take;

diff --git a/tokio/src/io/util/reader_stream.rs b/tokio/src/io/util/reader_stream.rs
new file mode 100644
index 00000000000..51651cede4d
--- /dev/null
+++ b/tokio/src/io/util/reader_stream.rs
@@ -0,0 +1,105 @@
+use crate::io::AsyncRead;
+use crate::stream::Stream;
+use bytes::{Bytes, BytesMut};
+use pin_project_lite::pin_project;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+pin_project! {
+    /// Convert an [`AsyncRead`] implementor into a
+    /// [`Stream`] of Result<[`Bytes`], std::io::Error>.
+    /// After the first error it will stop.
+    /// Additionally, this stream is fused: after it returns None at some
+    /// moment, it is guaranteed that further `next()`, `poll_next()` and
+    /// similar functions will instantly return None.
+    ///
+    /// This type can be created using the [`reader_stream`] function.
+    ///
+    /// [`AsyncRead`]: crate::io::AsyncRead
+    /// [`Stream`]: crate::stream::Stream
+    /// [`Bytes`]: bytes::Bytes
+    /// [`reader_stream`]: crate::io::reader_stream
+    #[derive(Debug)]
+    #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
+    #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
+    pub struct ReaderStream<R> {
+        // Reader itself.
+        // None if we had an error reading from the `reader` in the past.
+        #[pin]
+        reader: Option<R>,
+        // Working buffer, used to optimize allocations.
+        // # Capacity behavior
+        // Initially `buf` is empty. It also gets smaller and smaller
+        // during polls (because its chunks are returned to the stream user).
+        // But when its capacity reaches 0, it is grown.
+        buf: BytesMut,
+    }
+}
+
+/// Convert an [`AsyncRead`] implementor into a
+/// [`Stream`] of Result<[`Bytes`], std::io::Error>.
+///
+/// # Example
+///
+/// ```
+/// # #[tokio::main]
+/// # async fn main() -> std::io::Result<()> {
+/// use tokio::stream::StreamExt;
+///
+/// let data: &[u8] = b"hello, world!";
+/// let mut stream = tokio::io::reader_stream(data);
+/// let mut stream_contents = Vec::new();
+/// while let Some(chunk) = stream.next().await {
+///     stream_contents.extend_from_slice(chunk?.as_ref());
+/// }
+/// assert_eq!(stream_contents, data);
+/// # Ok(())
+/// # }
+/// ```
+///
+/// [`AsyncRead`]: crate::io::AsyncRead
+/// [`Stream`]: crate::stream::Stream
+/// [`Bytes`]: bytes::Bytes
+pub fn reader_stream<R>(reader: R) -> ReaderStream<R>
+where
+    R: AsyncRead,
+{
+    ReaderStream {
+        reader: Some(reader),
+        buf: BytesMut::new(),
+    }
+}
+
+const CAPACITY: usize = 4096;
+
+impl<R> Stream for ReaderStream<R>
+where
+    R: AsyncRead,
+{
+    type Item = std::io::Result<Bytes>;
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let mut this = self.as_mut().project();
+        let reader = match this.reader.as_pin_mut() {
+            Some(r) => r,
+            None => return Poll::Ready(None),
+        };
+        if this.buf.capacity() == 0 {
+            this.buf.reserve(CAPACITY);
+        }
+        match reader.poll_read_buf(cx, &mut this.buf) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(Err(err)) => {
+                self.project().reader.set(None);
+                Poll::Ready(Some(Err(err)))
+            }
+            Poll::Ready(Ok(0)) => {
+                self.project().reader.set(None);
+                Poll::Ready(None)
+            }
+            Poll::Ready(Ok(_)) => {
+                let chunk = this.buf.split();
+                Poll::Ready(Some(Ok(chunk.freeze())))
+            }
+        }
+    }
+}
diff --git a/tokio/tests/io_reader_stream.rs b/tokio/tests/io_reader_stream.rs
new file mode 100644
index 00000000000..6546a0ef4da
--- /dev/null
+++ b/tokio/tests/io_reader_stream.rs
@@ -0,0 +1,64 @@
+#![warn(rust_2018_idioms)]
+#![cfg(feature = "full")]
+
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio::io::AsyncRead;
+use tokio::stream::StreamExt;
+
+/// Produces at most `remaining` zeros, then returns an error.
+/// Each read yields at most 31 bytes.
+struct Reader {
+    remaining: usize,
+}
+
+impl AsyncRead for Reader {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+        buf: &mut [u8],
+    ) -> Poll<std::io::Result<usize>> {
+        let this = Pin::into_inner(self);
+        assert_ne!(buf.len(), 0);
+        if this.remaining > 0 {
+            let n = std::cmp::min(this.remaining, buf.len());
+            let n = std::cmp::min(n, 31);
+            for x in &mut buf[..n] {
+                *x = 0;
+            }
+            this.remaining -= n;
+            Poll::Ready(Ok(n))
+        } else {
+            Poll::Ready(Err(std::io::Error::from_raw_os_error(22)))
+        }
+    }
+}
+
+#[tokio::test]
+async fn correct_behavior_on_errors() {
+    let reader = Reader { remaining: 8000 };
+    let mut stream = tokio::io::reader_stream(reader);
+    let mut zeros_received = 0;
+    let mut had_error = false;
+    loop {
+        let item = stream.next().await.unwrap();
+        match item {
+            Ok(bytes) => {
+                let bytes = &*bytes;
+                for byte in bytes {
+                    assert_eq!(*byte, 0);
+                    zeros_received += 1;
+                }
+            }
+            Err(_) => {
+                assert!(!had_error);
+                had_error = true;
+                break;
+            }
+        }
+    }
+
+    assert!(had_error);
+    assert_eq!(zeros_received, 8000);
+    assert!(stream.next().await.is_none());
+}

From f0328f78103eb73df0f532b7e69eaca4b4fda6c7 Mon Sep 17 00:00:00 2001
From: John-John Tedro
Date: Thu, 27 Aug 2020 08:23:38 +0200
Subject: [PATCH 02/22] sync: implement map methods of parking_lot fame (#2771)

* sync: Implement map methods of parking_lot fame

Generally, this mimics the way `MappedRwLock*Guard`s are implemented in
`parking_lot`.
By storing a raw pointer in the guards themselves referencing the mapped data and maintaining type invariants through `PhantomData`. I didn't try to think too much about this, so if someone has objections I'd love to hear them. I've also dropped the internal use of `ReleasingPermit`, since it made the guards unecessarily large. The number of permits that need to be released are already known by the guards themselves, and is instead governed directly in the relevant `Drop` impls. This has the benefit of making the guards as small as possible, for the non-mapped variants this means a single reference is enough. `fmt::Debug` impls have been adjusted to behave exactly like the delegating impls in `parking_lot`. `fmt::Display` impls have been added for all guard types which behave the same. This does change the format of debug impls, for which I'm not sure if we provide any guarantees. --- .github/workflows/ci.yml | 4 +- tokio/src/sync/rwlock.rs | 332 ++++++++++++++++++++++++++++++++++----- 2 files changed, 298 insertions(+), 38 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f24bdb88cd3..ecc27a9168f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,8 +1,8 @@ on: push: - branches: ["master"] + branches: ["v0.2.x"] pull_request: - branches: ["master"] + branches: ["v0.2.x"] name: CI diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 3d2a2f7a8fc..1bb579319d0 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -1,5 +1,8 @@ -use crate::sync::batch_semaphore::{AcquireError, Semaphore}; +use crate::sync::batch_semaphore::Semaphore; use std::cell::UnsafeCell; +use std::fmt; +use std::marker; +use std::mem; use std::ops; #[cfg(not(loom))] @@ -83,10 +86,140 @@ pub struct RwLock { /// [`RwLock`]. /// /// [`read`]: method@RwLock::read -#[derive(Debug)] +/// [`RwLock`]: struct@RwLock pub struct RwLockReadGuard<'a, T: ?Sized> { - permit: ReleasingPermit<'a, T>, - lock: &'a RwLock, + s: &'a Semaphore, + data: *const T, + marker: marker::PhantomData<&'a T>, +} + +impl<'a, T> RwLockReadGuard<'a, T> { + /// Make a new `RwLockReadGuard` for a component of the locked data. + /// + /// This operation cannot fail as the `RwLockReadGuard` passed in already + /// locked the data. + /// + /// This is an associated function that needs to be + /// used as `RwLockReadGuard::map(...)`. A method would interfere with + /// methods of the same name on the contents of the locked data. + /// + /// This is an asynchronous version of [`RwLockReadGuard::map`] from the + /// [`parking_lot` crate]. + /// + /// [`RwLockReadGuard::map`]: https://docs.rs/lock_api/latest/lock_api/struct.RwLockReadGuard.html#method.map + /// [`parking_lot` crate]: https://crates.io/crates/parking_lot + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{RwLock, RwLockReadGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let lock = RwLock::new(Foo(1)); + /// + /// let guard = lock.read().await; + /// let guard = RwLockReadGuard::map(guard, |f| &f.0); + /// + /// assert_eq!(1, *guard); + /// # } + /// ``` + #[inline] + pub fn map(this: Self, f: F) -> RwLockReadGuard<'a, U> + where + F: FnOnce(&T) -> &U, + { + let data = f(&*this) as *const U; + let s = this.s; + // NB: Forget to avoid drop impl from being called. 
+ mem::forget(this); + RwLockReadGuard { + s, + data, + marker: marker::PhantomData, + } + } + + /// Attempts to make a new [`RwLockReadGuard`] for a component of the + /// locked data. The original guard is returned if the closure returns + /// `None`. + /// + /// This operation cannot fail as the `RwLockReadGuard` passed in already + /// locked the data. + /// + /// This is an associated function that needs to be used as + /// `RwLockReadGuard::try_map(..)`. A method would interfere with methods of the + /// same name on the contents of the locked data. + /// + /// This is an asynchronous version of [`RwLockReadGuard::try_map`] from the + /// [`parking_lot` crate]. + /// + /// [`RwLockReadGuard::try_map`]: https://docs.rs/lock_api/latest/lock_api/struct.RwLockReadGuard.html#method.try_map + /// [`parking_lot` crate]: https://crates.io/crates/parking_lot + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{RwLock, RwLockReadGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let lock = RwLock::new(Foo(1)); + /// + /// let guard = lock.read().await; + /// let guard = RwLockReadGuard::try_map(guard, |f| Some(&f.0)).expect("should not fail"); + /// + /// assert_eq!(1, *guard); + /// # } + /// ``` + #[inline] + pub fn try_map(this: Self, f: F) -> Result, Self> + where + F: FnOnce(&T) -> Option<&U>, + { + let data = match f(&*this) { + Some(data) => data as *const U, + None => return Err(this), + }; + let s = this.s; + // NB: Forget to avoid drop impl from being called. + mem::forget(this); + Ok(RwLockReadGuard { + s, + data, + marker: marker::PhantomData, + }) + } +} + +impl<'a, T: ?Sized> fmt::Debug for RwLockReadGuard<'a, T> +where + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized> fmt::Display for RwLockReadGuard<'a, T> +where + T: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized> Drop for RwLockReadGuard<'a, T> { + fn drop(&mut self) { + self.s.release(1); + } } /// RAII structure used to release the exclusive write access of a lock when @@ -97,32 +230,143 @@ pub struct RwLockReadGuard<'a, T: ?Sized> { /// /// [`write`]: method@RwLock::write /// [`RwLock`]: struct@RwLock -#[derive(Debug)] pub struct RwLockWriteGuard<'a, T: ?Sized> { - permit: ReleasingPermit<'a, T>, - lock: &'a RwLock, + s: &'a Semaphore, + data: *mut T, + marker: marker::PhantomData<&'a mut T>, } -// Wrapper arround Permit that releases on Drop -#[derive(Debug)] -struct ReleasingPermit<'a, T: ?Sized> { - num_permits: u16, - lock: &'a RwLock, +impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { + /// Make a new `RwLockWriteGuard` for a component of the locked data. + /// + /// This operation cannot fail as the `RwLockWriteGuard` passed in already + /// locked the data. + /// + /// This is an associated function that needs to be used as + /// `RwLockWriteGuard::map(..)`. A method would interfere with methods of + /// the same name on the contents of the locked data. + /// + /// This is an asynchronous version of [`RwLockWriteGuard::map`] from the + /// [`parking_lot` crate]. 
+ /// + /// [`RwLockWriteGuard::map`]: https://docs.rs/lock_api/latest/lock_api/struct.RwLockWriteGuard.html#method.map + /// [`parking_lot` crate]: https://crates.io/crates/parking_lot + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{RwLock, RwLockWriteGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let lock = RwLock::new(Foo(1)); + /// + /// { + /// let mut mapped = RwLockWriteGuard::map(lock.write().await, |f| &mut f.0); + /// *mapped = 2; + /// } + /// + /// assert_eq!(Foo(2), *lock.read().await); + /// # } + /// ``` + #[inline] + pub fn map(mut this: Self, f: F) -> RwLockWriteGuard<'a, U> + where + F: FnOnce(&mut T) -> &mut U, + { + let data = f(&mut *this) as *mut U; + let s = this.s; + // NB: Forget to avoid drop impl from being called. + mem::forget(this); + RwLockWriteGuard { + s, + data, + marker: marker::PhantomData, + } + } + + /// Attempts to make a new [`RwLockWriteGuard`] for a component of + /// the locked data. The original guard is returned if the closure returns + /// `None`. + /// + /// This operation cannot fail as the `RwLockWriteGuard` passed in already + /// locked the data. + /// + /// This is an associated function that needs to be + /// used as `RwLockWriteGuard::try_map(...)`. A method would interfere with + /// methods of the same name on the contents of the locked data. + /// + /// This is an asynchronous version of [`RwLockWriteGuard::try_map`] from + /// the [`parking_lot` crate]. + /// + /// [`RwLockWriteGuard::try_map`]: https://docs.rs/lock_api/latest/lock_api/struct.RwLockWriteGuard.html#method.try_map + /// [`parking_lot` crate]: https://crates.io/crates/parking_lot + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{RwLock, RwLockWriteGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let lock = RwLock::new(Foo(1)); + /// + /// { + /// let guard = lock.write().await; + /// let mut guard = RwLockWriteGuard::try_map(guard, |f| Some(&mut f.0)).expect("should not fail"); + /// *guard = 2; + /// } + /// + /// assert_eq!(Foo(2), *lock.read().await); + /// # } + /// ``` + #[inline] + pub fn try_map(mut this: Self, f: F) -> Result, Self> + where + F: FnOnce(&mut T) -> Option<&mut U>, + { + let data = match f(&mut *this) { + Some(data) => data as *mut U, + None => return Err(this), + }; + let s = this.s; + // NB: Forget to avoid drop impl from being called. 
+ mem::forget(this); + Ok(RwLockWriteGuard { + s, + data, + marker: marker::PhantomData, + }) + } } -impl<'a, T: ?Sized> ReleasingPermit<'a, T> { - async fn acquire( - lock: &'a RwLock, - num_permits: u16, - ) -> Result, AcquireError> { - lock.s.acquire(num_permits.into()).await?; - Ok(Self { num_permits, lock }) +impl<'a, T: ?Sized> fmt::Debug for RwLockWriteGuard<'a, T> +where + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized> fmt::Display for RwLockWriteGuard<'a, T> +where + T: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) } } -impl Drop for ReleasingPermit<'_, T> { +impl<'a, T: ?Sized> Drop for RwLockWriteGuard<'a, T> { fn drop(&mut self) { - self.lock.s.release(self.num_permits as usize); + self.s.release(MAX_READS); } } @@ -139,9 +383,11 @@ fn bounds() { check_sync::>(); check_unpin::>(); + check_send::>(); check_sync::>(); check_unpin::>(); + check_send::>(); check_sync::>(); check_unpin::>(); @@ -155,8 +401,17 @@ fn bounds() { // RwLock. unsafe impl Send for RwLock where T: ?Sized + Send {} unsafe impl Sync for RwLock where T: ?Sized + Send + Sync {} +// NB: These impls need to be explicit since we're storing a raw pointer. +// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over +// `T` is `Send`. +unsafe impl Send for RwLockReadGuard<'_, T> where T: ?Sized + Sync {} unsafe impl Sync for RwLockReadGuard<'_, T> where T: ?Sized + Send + Sync {} unsafe impl Sync for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {} +// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over +// `T` is `Send` - but since this is also provides mutable access, we need to +// make sure that `T` is `Send` since its value can be sent across thread +// boundaries. +unsafe impl Send for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {} impl RwLock { /// Creates a new instance of an `RwLock` which is unlocked. @@ -210,12 +465,16 @@ impl RwLock { ///} /// ``` pub async fn read(&self) -> RwLockReadGuard<'_, T> { - let permit = ReleasingPermit::acquire(self, 1).await.unwrap_or_else(|_| { + self.s.acquire(1).await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() }); - RwLockReadGuard { lock: self, permit } + RwLockReadGuard { + s: &self.s, + data: self.c.get(), + marker: marker::PhantomData, + } } /// Locks this rwlock with exclusive write access, causing the current task @@ -241,15 +500,16 @@ impl RwLock { ///} /// ``` pub async fn write(&self) -> RwLockWriteGuard<'_, T> { - let permit = ReleasingPermit::acquire(self, MAX_READS as u16) - .await - .unwrap_or_else(|_| { - // The semaphore was closed. but, we never explicitly close it, and we have a - // handle to it through the Arc, which means that this can never happen. - unreachable!() - }); - - RwLockWriteGuard { lock: self, permit } + self.s.acquire(MAX_READS as u32).await.unwrap_or_else(|_| { + // The semaphore was closed. but, we never explicitly close it, and we have a + // handle to it through the Arc, which means that this can never happen. + unreachable!() + }); + RwLockWriteGuard { + s: &self.s, + data: self.c.get(), + marker: marker::PhantomData, + } } /// Consumes the lock, returning the underlying data. 
@@ -265,7 +525,7 @@ impl ops::Deref for RwLockReadGuard<'_, T> { type Target = T; fn deref(&self) -> &T { - unsafe { &*self.lock.c.get() } + unsafe { &*self.data } } } @@ -273,13 +533,13 @@ impl ops::Deref for RwLockWriteGuard<'_, T> { type Target = T; fn deref(&self) -> &T { - unsafe { &*self.lock.c.get() } + unsafe { &*self.data } } } impl ops::DerefMut for RwLockWriteGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.lock.c.get() } + unsafe { &mut *self.data } } } @@ -289,7 +549,7 @@ impl From for RwLock { } } -impl Default for RwLock +impl Default for RwLock where T: Default, { From ce0af8f7a188c9b8b8b4dcc3847674fd0afbe769 Mon Sep 17 00:00:00 2001 From: Zephyr Shannon Date: Fri, 11 Sep 2020 12:44:33 -0700 Subject: [PATCH 03/22] docs: more doc fixes (#2831) Previous docs look like they were based on the docs for `insert_at`. Changed names of variables referred to and the explanation of when the value will be returned and under what condition it will be immediately available to make sense for a Duration argument instead of an Instant. --- tokio/src/sync/mod.rs | 2 +- tokio/src/sync/rwlock.rs | 2 +- tokio/src/time/delay_queue.rs | 26 ++++++++++++++------------ 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 5d66512d5ad..d50041374d8 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -406,7 +406,7 @@ //! //! The remaining synchronization primitives focus on synchronizing state. //! These are asynchronous equivalents to versions provided by `std`. They -//! operate in a similar way as their `std` counterparts parts but will wait +//! operate in a similar way as their `std` counterparts but will wait //! asynchronously instead of blocking the thread. //! //! * [`Barrier`](Barrier) Ensures multiple tasks will wait for each other to diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 1bb579319d0..373ab109e90 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -11,7 +11,7 @@ const MAX_READS: usize = 32; #[cfg(loom)] const MAX_READS: usize = 10; -/// An asynchronous reader-writer lock +/// An asynchronous reader-writer lock. /// /// This type of lock allows a number of readers or at most one writer at any /// point in time. The write portion of this lock typically allows modification diff --git a/tokio/src/time/delay_queue.rs b/tokio/src/time/delay_queue.rs index b02153b9e0f..6f755ebacb3 100644 --- a/tokio/src/time/delay_queue.rs +++ b/tokio/src/time/delay_queue.rs @@ -366,23 +366,25 @@ impl DelayQueue { /// This function is identical to `insert_at`, but takes a `Duration` /// instead of an `Instant`. /// - /// `value` is stored in the queue until `when` is reached. At which point, - /// `value` will be returned from [`poll_expired`]. If `when` has already been - /// reached, then `value` is immediately made available to poll. - /// - /// The return value represents the insertion and is used at an argument to - /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once - /// `value` is removed from the queue either by calling [`poll_expired`] after - /// `when` is reached or by calling [`remove`]. At this point, the caller - /// must take care to not use the returned [`Key`] again as it may reference - /// a different item in the queue. + /// `value` is stored in the queue until `timeout` duration has + /// elapsed after `insert` was called. At that point, `value` will + /// be returned from [`poll_expired`]. 
If `timeout` a Duration of + /// zero, then `value` is immediately made available to poll. + /// + /// The return value represents the insertion and is used as an + /// argument to [`remove`] and [`reset`]. Note that [`Key`] is a + /// token and is reused once `value` is removed from the queue + /// either by calling [`poll_expired`] after `timeout` has elapsed + /// or by calling [`remove`]. At this point, the caller must not + /// use the returned [`Key`] again as it may reference a different + /// item in the queue. /// /// See [type] level documentation for more details. /// /// # Panics /// - /// This function panics if `timeout` is greater than the maximum supported - /// duration. + /// This function panics if `timeout` is greater than the maximum + /// duration supported by the timer in the current `Runtime`. /// /// # Examples /// From c5a9ede157691ac5ca15283735bd666c6b016188 Mon Sep 17 00:00:00 2001 From: Max Heller Date: Fri, 11 Sep 2020 16:00:04 -0400 Subject: [PATCH 04/22] sync: write guard to read guard downgrading for sync::RwLock (#2733) --- tokio/src/sync/rwlock.rs | 52 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 373ab109e90..650a7cf607a 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -344,6 +344,58 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { marker: marker::PhantomData, }) } + + /// Atomically downgrades a write lock into a read lock without allowing + /// any writers to take exclusive access of the lock in the meantime. + /// + /// **Note:** This won't *necessarily* allow any additional readers to acquire + /// locks, since [`RwLock`] is fair and it is possible that a writer is next + /// in line. + /// + /// Returns an RAII guard which will drop the read access of this rwlock + /// when dropped. + /// + /// # Examples + /// + /// ``` + /// # use tokio::sync::RwLock; + /// # use std::sync::Arc; + /// # + /// # #[tokio::main] + /// # async fn main() { + /// let lock = Arc::new(RwLock::new(1)); + /// + /// let n = lock.write().await; + /// + /// let cloned_lock = lock.clone(); + /// let handle = tokio::spawn(async move { + /// *cloned_lock.write().await = 2; + /// }); + /// + /// let n = n.downgrade(); + /// assert_eq!(*n, 1, "downgrade is atomic"); + /// + /// assert_eq!(*lock.read().await, 1, "additional readers can obtain locks"); + /// + /// drop(n); + /// handle.await.unwrap(); + /// assert_eq!(*lock.read().await, 2, "second writer obtained write lock"); + /// # } + /// ``` + /// + /// [`RwLock`]: struct@RwLock + pub fn downgrade(self) -> RwLockReadGuard<'a, T> { + let RwLockWriteGuard { s, data, .. } = self; + + // Release all but one of the permits held by the write guard + s.release(MAX_READS - 1); + + RwLockReadGuard { + s, + data, + marker: marker::PhantomData, + } + } } impl<'a, T: ?Sized> fmt::Debug for RwLockWriteGuard<'a, T> From 2bc9a4815259c8ff4daa5e24f128ec826970d17f Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 11 Sep 2020 15:14:45 -0700 Subject: [PATCH 05/22] sync: tweak `watch` API (#2814) Decouples getting the latest `watch` value from receiving the change notification. The `Receiver` async method becomes `Receiver::changed()`. The latest value is obtained from `Receiver::borrow()`. The implementation is updated to use `Notify`. This requires adding `Notify::notify_waiters`. This method is generally useful but is kept private for now. 
--- tokio/src/sync/barrier.rs | 4 +- tokio/src/sync/mod.rs | 10 +- tokio/src/sync/notify.rs | 52 ++++- tokio/src/sync/tests/loom_watch.rs | 20 ++ tokio/src/sync/tests/mod.rs | 1 + tokio/src/sync/watch.rs | 353 +++++++++++++---------------- tokio/tests/async_send_sync.rs | 6 +- tokio/tests/sync_watch.rs | 154 ++++--------- 8 files changed, 281 insertions(+), 319 deletions(-) create mode 100644 tokio/src/sync/tests/loom_watch.rs diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index 23713251e9d..fddb3a5b794 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -110,9 +110,11 @@ impl Barrier { let mut wait = self.wait.clone(); loop { + let _ = wait.changed().await; + // note that the first time through the loop, this _will_ yield a generation // immediately, since we cloned a receiver that has never seen any values. - if wait.recv().await >= generation { + if *wait.borrow() >= generation { break; } } diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index d50041374d8..522205a0029 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -355,10 +355,8 @@ //! let op = my_async_operation(); //! tokio::pin!(op); //! -//! // Receive the **initial** configuration value. As this is the -//! // first time the config is received from the watch, it will -//! // always complete immediatedly. -//! let mut conf = rx.recv().await; +//! // Get the initial config value +//! let mut conf = rx.borrow().clone(); //! //! let mut op_start = Instant::now(); //! let mut delay = time::delay_until(op_start + conf.timeout); @@ -375,8 +373,8 @@ //! // Restart the timeout //! delay = time::delay_until(op_start + conf.timeout); //! } -//! new_conf = rx.recv() => { -//! conf = new_conf; +//! _ = rx.changed() => { +//! conf = rx.borrow().clone(); //! //! // The configuration has been updated. Update the //! // `delay` using the new `timeout` value. diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 4321c974608..74c69e50cb8 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -123,7 +123,7 @@ struct Waiter { /// Future returned from `notified()` #[derive(Debug)] -struct Notified<'a> { +pub struct Notified<'a> { /// The `Notify` being received on. notify: &'a Notify, @@ -172,6 +172,12 @@ impl Notify { /// Wait for a notification. /// + /// Equivalent to: + /// + /// ```ignore + /// async fn notified(&self); + /// ``` + /// /// Each `Notify` value holds a single permit. If a permit is available from /// an earlier call to [`notify_one()`], then `notified().await` will complete /// immediately, consuming that permit. Otherwise, `notified().await` waits @@ -199,7 +205,7 @@ impl Notify { /// notify.notify_one(); /// } /// ``` - pub async fn notified(&self) { + pub fn notified(&self) -> Notified<'_> { Notified { notify: self, state: State::Init, @@ -210,7 +216,6 @@ impl Notify { _p: PhantomPinned, }), } - .await } /// Notifies a waiting task @@ -279,6 +284,45 @@ impl Notify { waker.wake(); } } + + /// Notifies all waiting tasks + pub(crate) fn notify_waiters(&self) { + // There are waiters, the lock must be acquired to notify. + let mut waiters = self.waiters.lock().unwrap(); + + // The state must be reloaded while the lock is held. The state may only + // transition out of WAITING while the lock is held. + let curr = self.state.load(SeqCst); + + if let EMPTY | NOTIFIED = curr { + // There are no waiting tasks. In this case, no synchronization is + // established between `notify` and `notified().await`. 
+ return; + } + + // At this point, it is guaranteed that the state will not + // concurrently change, as holding the lock is required to + // transition **out** of `WAITING`. + // + // Get pending waiters + while let Some(mut waiter) = waiters.pop_back() { + // Safety: `waiters` lock is still held. + let waiter = unsafe { waiter.as_mut() }; + + assert!(!waiter.notified); + + waiter.notified = true; + + if let Some(waker) = waiter.waker.take() { + waker.wake(); + } + } + + // All waiters have been notified, the state must be transitioned to + // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be + // held, a `store` is sufficient. + self.state.store(EMPTY, SeqCst); + } } impl Default for Notify { @@ -430,6 +474,8 @@ impl Future for Notified<'_> { waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) }); *state = Waiting; + + return Poll::Pending; } Waiting => { // Currently in the "Waiting" state, implying the caller has diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs new file mode 100644 index 00000000000..7944cab8d24 --- /dev/null +++ b/tokio/src/sync/tests/loom_watch.rs @@ -0,0 +1,20 @@ +use crate::sync::watch; + +use loom::future::block_on; +use loom::thread; + +#[test] +fn smoke() { + loom::model(|| { + let (tx, mut rx) = watch::channel(1); + + let th = thread::spawn(move || { + tx.send(2).unwrap(); + }); + + block_on(rx.changed()).unwrap(); + assert_eq!(*rx.borrow(), 2); + + th.join().unwrap(); + }) +} diff --git a/tokio/src/sync/tests/mod.rs b/tokio/src/sync/tests/mod.rs index d571754c011..c637cb6b816 100644 --- a/tokio/src/sync/tests/mod.rs +++ b/tokio/src/sync/tests/mod.rs @@ -13,4 +13,5 @@ cfg_loom! { mod loom_oneshot; mod loom_semaphore_batch; mod loom_semaphore_ll; + mod loom_watch; } diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index f6660b6eae9..7d1ac9e8fdc 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -6,13 +6,11 @@ //! //! # Usage //! -//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are -//! the producer and sender halves of the channel. The channel is -//! created with an initial value. [`Receiver::recv`] will always -//! be ready upon creation and will yield either this initial value or -//! the latest value that has been sent by `Sender`. -//! -//! Calls to [`Receiver::recv`] will always yield the latest value. +//! [`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer +//! and sender halves of the channel. The channel is created with an initial +//! value. The **latest** value stored in the channel is accessed with +//! [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new +//! value to sent by the [`Sender`] half. //! //! # Examples //! @@ -23,8 +21,8 @@ //! let (tx, mut rx) = watch::channel("hello"); //! //! tokio::spawn(async move { -//! while let Some(value) = Some(rx.recv().await) { -//! println!("received = {:?}", value); +//! while rx.changed().await.is_ok() { +//! println!("received = {:?}", *rx.borrow()); //! } //! }); //! @@ -47,20 +45,17 @@ //! //! [`Sender`]: crate::sync::watch::Sender //! [`Receiver`]: crate::sync::watch::Receiver -//! [`Receiver::recv`]: crate::sync::watch::Receiver::recv +//! [`Receiver::changed()`]: crate::sync::watch::Receiver::changed +//! [`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow //! [`channel`]: crate::sync::watch::channel //! 
[`Sender::closed`]: crate::sync::watch::Sender::closed -use crate::future::poll_fn; -use crate::sync::task::AtomicWaker; +use crate::sync::Notify; -use fnv::FnvHashSet; use std::ops; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Relaxed, SeqCst}; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, Weak}; -use std::task::Poll::{Pending, Ready}; -use std::task::{Context, Poll}; +use std::sync::{Arc, RwLock, RwLockReadGuard}; /// Receives values from the associated [`Sender`](struct@Sender). /// @@ -70,8 +65,8 @@ pub struct Receiver { /// Pointer to the shared state shared: Arc>, - /// Pointer to the watcher's internal state - inner: Watcher, + /// Last observed version + version: usize, } /// Sends values to the associated [`Receiver`](struct@Receiver). @@ -79,7 +74,7 @@ pub struct Receiver { /// Instances are created by the [`channel`](fn@channel) function. #[derive(Debug)] pub struct Sender { - shared: Weak>, + shared: Arc>, } /// Returns a reference to the inner value @@ -92,6 +87,27 @@ pub struct Ref<'a, T> { inner: RwLockReadGuard<'a, T>, } +#[derive(Debug)] +struct Shared { + /// The most recent value + value: RwLock, + + /// The current version + /// + /// The lowest bit represents a "closed" state. The rest of the bits + /// represent the current version. + version: AtomicUsize, + + /// Tracks the number of `Receiver` instances + ref_count_rx: AtomicUsize, + + /// Notifies waiting receivers that the value changed. + notify_rx: Notify, + + /// Notifies any task listening for `Receiver` dropped events + notify_tx: Notify, +} + pub mod error { //! Watch error types @@ -112,37 +128,20 @@ pub mod error { } impl std::error::Error for SendError {} -} - -#[derive(Debug)] -struct Shared { - /// The most recent value - value: RwLock, - - /// The current version - /// - /// The lowest bit represents a "closed" state. The rest of the bits - /// represent the current version. - version: AtomicUsize, - /// All watchers - watchers: Mutex, - - /// Task to notify when all watchers drop - cancel: AtomicWaker, -} + /// Error produced when receiving a change notification. + #[derive(Debug)] + pub struct RecvError(pub(super) ()); -type Watchers = FnvHashSet; + // ===== impl RecvError ===== -/// The watcher's ID is based on the Arc's pointer. 
-#[derive(Clone, Debug)] -struct Watcher(Arc); + impl fmt::Display for RecvError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "channel closed") + } + } -#[derive(Debug)] -struct WatchInner { - /// Last observed version - version: AtomicUsize, - waker: AtomicWaker, + impl std::error::Error for RecvError {} } const CLOSED: usize = 1; @@ -162,8 +161,8 @@ const CLOSED: usize = 1; /// let (tx, mut rx) = watch::channel("hello"); /// /// tokio::spawn(async move { -/// while let Some(value) = Some(rx.recv().await) { -/// println!("received = {:?}", value); +/// while rx.changed().await.is_ok() { +/// println!("received = {:?}", *rx.borrow()); /// } /// }); /// @@ -174,29 +173,20 @@ const CLOSED: usize = 1; /// /// [`Sender`]: struct@Sender /// [`Receiver`]: struct@Receiver -pub fn channel(init: T) -> (Sender, Receiver) { - const VERSION_0: usize = 0; - const VERSION_1: usize = 2; - - // We don't start knowing VERSION_1 - let inner = Watcher::new_version(VERSION_0); - - // Insert the watcher - let mut watchers = FnvHashSet::with_capacity_and_hasher(0, Default::default()); - watchers.insert(inner.clone()); - +pub fn channel(init: T) -> (Sender, Receiver) { let shared = Arc::new(Shared { value: RwLock::new(init), - version: AtomicUsize::new(VERSION_1), - watchers: Mutex::new(watchers), - cancel: AtomicWaker::new(), + version: AtomicUsize::new(0), + ref_count_rx: AtomicUsize::new(1), + notify_rx: Notify::new(), + notify_tx: Notify::new(), }); let tx = Sender { - shared: Arc::downgrade(&shared), + shared: shared.clone(), }; - let rx = Receiver { shared, inner }; + let rx = Receiver { shared, version: 0 }; (tx, rx) } @@ -221,41 +211,13 @@ impl Receiver { Ref { inner } } - // TODO: document - #[doc(hidden)] - pub fn poll_recv_ref<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> { - // Make sure the task is up to date - self.inner.waker.register_by_ref(cx.waker()); - - let state = self.shared.version.load(SeqCst); - let version = state & !CLOSED; - - if self.inner.version.swap(version, Relaxed) != version { - let inner = self.shared.value.read().unwrap(); - - return Ready(Ref { inner }); - } - - if CLOSED == state & CLOSED { - // The `Store` handle has been dropped. - let inner = self.shared.value.read().unwrap(); - - return Ready(Ref { inner }); - } - - Pending - } -} - -impl Receiver { - /// Attempts to clone the latest value sent via the channel. + /// Wait for a change notification /// - /// If this is the first time the function is called on a `Receiver` - /// instance, then the function completes immediately with the **current** - /// value held by the channel. On the next call, the function waits until - /// a new value is sent in the channel. + /// Returns when a new value has been sent by the [`Sender`] since the last + /// time `changed()` was called. When the `Sender` half is dropped, `Err` is + /// returned. /// - /// `None` is returned if the `Sender` half is dropped. + /// [`Sender`]: struct@Sender /// /// # Examples /// @@ -266,79 +228,110 @@ impl Receiver { /// async fn main() { /// let (tx, mut rx) = watch::channel("hello"); /// - /// let v = rx.recv().await; - /// assert_eq!(v, "hello"); - /// /// tokio::spawn(async move { /// tx.send("goodbye").unwrap(); /// }); /// - /// // Waits for the new task to spawn and send the value. 
- /// let v = rx.recv().await; - /// assert_eq!(v, "goodbye"); + /// assert!(rx.changed().await.is_ok()); + /// assert_eq!(*rx.borrow(), "goodbye"); /// - /// let v = rx.recv().await; - /// assert_eq!(v, "goodbye"); + /// // The `tx` handle has been dropped + /// assert!(rx.changed().await.is_err()); /// } /// ``` - pub async fn recv(&mut self) -> T { - poll_fn(|cx| { - let v_ref = ready!(self.poll_recv_ref(cx)); - Poll::Ready((*v_ref).clone()) + pub async fn changed(&mut self) -> Result<(), error::RecvError> { + use std::future::Future; + use std::pin::Pin; + use std::task::Poll; + + // In order to avoid a race condition, we first request a notification, + // **then** check the current value's version. If a new version exists, + // the notification request is dropped. Requesting the notification + // requires polling the future once. + let notified = self.shared.notify_rx.notified(); + pin!(notified); + + // Polling the future once is guaranteed to return `Pending` as `watch` + // only notifies using `notify_waiters`. + crate::future::poll_fn(|cx| { + let res = Pin::new(&mut notified).poll(cx); + assert!(!res.is_ready()); + Poll::Ready(()) }) - .await + .await; + + if let Some(ret) = maybe_changed(&self.shared, &mut self.version) { + return ret; + } + + notified.await; + + maybe_changed(&self.shared, &mut self.version) + .expect("[bug] failed to observe change after notificaton.") } } -#[cfg(feature = "stream")] -impl crate::stream::Stream for Receiver { - type Item = T; - - fn poll_next(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let v_ref = ready!(self.poll_recv_ref(cx)); +fn maybe_changed( + shared: &Shared, + version: &mut usize, +) -> Option> { + // Load the version from the state + let state = shared.version.load(SeqCst); + let new_version = state & !CLOSED; + + if *version != new_version { + // Observe the new version and return + *version = new_version; + return Some(Ok(())); + } - Poll::Ready(Some((*v_ref).clone())) + if CLOSED == state & CLOSED { + // All receivers have dropped. + return Some(Err(error::RecvError(()))); } + + None } impl Clone for Receiver { fn clone(&self) -> Self { - let ver = self.inner.version.load(Relaxed); - let inner = Watcher::new_version(ver); + let version = self.version; let shared = self.shared.clone(); - shared.watchers.lock().unwrap().insert(inner.clone()); + // No synchronization necessary as this is only used as a counter and + // not memory access. + shared.ref_count_rx.fetch_add(1, Relaxed); - Receiver { shared, inner } + Receiver { version, shared } } } impl Drop for Receiver { fn drop(&mut self) { - self.shared.watchers.lock().unwrap().remove(&self.inner); + // No synchronization necessary as this is only used as a counter and + // not memory access. + if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) { + // This is the last `Receiver` handle, tasks waiting on `Sender::closed()` + self.shared.notify_tx.notify_waiters(); + } } } impl Sender { /// Sends a new value via the channel, notifying all receivers. pub fn send(&self, value: T) -> Result<(), error::SendError> { - let shared = match self.shared.upgrade() { - Some(shared) => shared, - // All `Watch` handles have been canceled - None => return Err(error::SendError { inner: value }), - }; - - // Replace the value - { - let mut lock = shared.value.write().unwrap(); - *lock = value; + // This is pretty much only useful as a hint anyway, so synchronization isn't critical. 
+ if 0 == self.shared.ref_count_rx.load(Relaxed) { + return Err(error::SendError { inner: value }); } + *self.shared.value.write().unwrap() = value; + // Update the version. 2 is used so that the CLOSED bit is not set. - shared.version.fetch_add(2, SeqCst); + self.shared.version.fetch_add(2, SeqCst); // Notify all watchers - notify_all(&*shared); + self.shared.notify_rx.notify_waiters(); Ok(()) } @@ -347,37 +340,42 @@ impl Sender { /// /// This allows the producer to get notified when interest in the produced /// values is canceled and immediately stop doing work. - pub async fn closed(&mut self) { - poll_fn(|cx| self.poll_close(cx)).await - } + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::watch; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = watch::channel("hello"); + /// + /// tokio::spawn(async move { + /// // use `rx` + /// drop(rx); + /// }); + /// + /// // Waits for `rx` to drop + /// tx.closed().await; + /// println!("the `rx` handles dropped") + /// } + /// ``` + pub async fn closed(&self) { + let notified = self.shared.notify_tx.notified(); - fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<()> { - match self.shared.upgrade() { - Some(shared) => { - shared.cancel.register_by_ref(cx.waker()); - Pending - } - None => Ready(()), + if self.shared.ref_count_rx.load(Relaxed) == 0 { + return; } - } -} - -/// Notifies all watchers of a change -fn notify_all(shared: &Shared) { - let watchers = shared.watchers.lock().unwrap(); - for watcher in watchers.iter() { - // Notify the task - watcher.waker.wake(); + notified.await; + debug_assert_eq!(0, self.shared.ref_count_rx.load(Relaxed)); } } impl Drop for Sender { fn drop(&mut self) { - if let Some(shared) = self.shared.upgrade() { - shared.version.fetch_or(CLOSED, SeqCst); - notify_all(&*shared); - } + self.shared.version.fetch_or(CLOSED, SeqCst); + self.shared.notify_rx.notify_waiters(); } } @@ -390,44 +388,3 @@ impl ops::Deref for Ref<'_, T> { self.inner.deref() } } - -// ===== impl Shared ===== - -impl Drop for Shared { - fn drop(&mut self) { - self.cancel.wake(); - } -} - -// ===== impl Watcher ===== - -impl Watcher { - fn new_version(version: usize) -> Self { - Watcher(Arc::new(WatchInner { - version: AtomicUsize::new(version), - waker: AtomicWaker::new(), - })) - } -} - -impl std::cmp::PartialEq for Watcher { - fn eq(&self, other: &Watcher) -> bool { - Arc::ptr_eq(&self.0, &other.0) - } -} - -impl std::cmp::Eq for Watcher {} - -impl std::hash::Hash for Watcher { - fn hash(&self, state: &mut H) { - (&*self.0 as *const WatchInner).hash(state) - } -} - -impl std::ops::Deref for Watcher { - type Target = WatchInner; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index f1eed0e410d..e7011e3b4c2 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -205,7 +205,7 @@ async_assert_fn!(tokio::sync::Mutex>::lock(_): !Send & !Sync); async_assert_fn!(tokio::sync::Mutex::lock_owned(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex>::lock_owned(_): Send & Sync); async_assert_fn!(tokio::sync::Mutex>::lock_owned(_): !Send & !Sync); -async_assert_fn!(tokio::sync::Notify::notified(_): Send & !Sync); +async_assert_fn!(tokio::sync::Notify::notified(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock::read(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock::write(_): Send & Sync); async_assert_fn!(tokio::sync::RwLock>::read(_): !Send & !Sync); @@ -229,9 +229,7 @@ 
async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver::recv(_): Send & Sync) async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver>::recv(_): Send & Sync); async_assert_fn!(tokio::sync::mpsc::UnboundedReceiver>::recv(_): !Send & !Sync); -async_assert_fn!(tokio::sync::watch::Receiver::recv(_): Send & Sync); -async_assert_fn!(tokio::sync::watch::Receiver>::recv(_): !Send & !Sync); -async_assert_fn!(tokio::sync::watch::Receiver>::recv(_): !Send & !Sync); +async_assert_fn!(tokio::sync::watch::Receiver::changed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender::closed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); diff --git a/tokio/tests/sync_watch.rs b/tokio/tests/sync_watch.rs index 5d550443f5b..9dcb0c530fa 100644 --- a/tokio/tests/sync_watch.rs +++ b/tokio/tests/sync_watch.rs @@ -4,41 +4,41 @@ use tokio::sync::watch; use tokio_test::task::spawn; -use tokio_test::{assert_pending, assert_ready}; +use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok}; #[test] fn single_rx_recv() { let (tx, mut rx) = watch::channel("one"); { - let mut t = spawn(rx.recv()); - let v = assert_ready!(t.poll()); - assert_eq!(v, "one"); + // Not initially notified + let mut t = spawn(rx.changed()); + assert_pending!(t.poll()); } + assert_eq!(*rx.borrow(), "one"); { - let mut t = spawn(rx.recv()); - + let mut t = spawn(rx.changed()); assert_pending!(t.poll()); tx.send("two").unwrap(); assert!(t.is_woken()); - let v = assert_ready!(t.poll()); - assert_eq!(v, "two"); + assert_ready_ok!(t.poll()); } + assert_eq!(*rx.borrow(), "two"); { - let mut t = spawn(rx.recv()); - + let mut t = spawn(rx.changed()); assert_pending!(t.poll()); drop(tx); - let res = assert_ready!(t.poll()); - assert_eq!(res, "two"); + assert!(t.is_woken()); + assert_ready_err!(t.poll()); } + assert_eq!(*rx.borrow(), "two"); } #[test] @@ -47,20 +47,19 @@ fn multi_rx() { let mut rx2 = rx1.clone(); { - let mut t1 = spawn(rx1.recv()); - let mut t2 = spawn(rx2.recv()); - - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); + let mut t1 = spawn(rx1.changed()); + let mut t2 = spawn(rx2.changed()); - let res = assert_ready!(t2.poll()); - assert_eq!(res, "one"); + assert_pending!(t1.poll()); + assert_pending!(t2.poll()); } + assert_eq!(*rx1.borrow(), "one"); + assert_eq!(*rx2.borrow(), "one"); - let mut t2 = spawn(rx2.recv()); + let mut t2 = spawn(rx2.changed()); { - let mut t1 = spawn(rx1.recv()); + let mut t1 = spawn(rx1.changed()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); @@ -70,12 +69,12 @@ fn multi_rx() { assert!(t1.is_woken()); assert!(t2.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "two"); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx1.borrow(), "two"); { - let mut t1 = spawn(rx1.recv()); + let mut t1 = spawn(rx1.changed()); assert_pending!(t1.poll()); @@ -84,45 +83,29 @@ fn multi_rx() { assert!(t1.is_woken()); assert!(t2.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); - - let res = assert_ready!(t2.poll()); - assert_eq!(res, "three"); + assert_ready_ok!(t1.poll()); + assert_ready_ok!(t2.poll()); } + assert_eq!(*rx1.borrow(), "three"); drop(t2); + assert_eq!(*rx2.borrow(), "three"); + { - let mut t1 = spawn(rx1.recv()); - let mut t2 = spawn(rx2.recv()); + let mut t1 = spawn(rx1.changed()); + let mut t2 = spawn(rx2.changed()); assert_pending!(t1.poll()); assert_pending!(t2.poll()); tx.send("four").unwrap(); - let res 
= assert_ready!(t1.poll()); - assert_eq!(res, "four"); - drop(t1); - - let mut t1 = spawn(rx1.recv()); - assert_pending!(t1.poll()); - - drop(tx); - - assert!(t1.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "four"); - - let res = assert_ready!(t2.poll()); - assert_eq!(res, "four"); - - drop(t2); - let mut t2 = spawn(rx2.recv()); - let res = assert_ready!(t2.poll()); - assert_eq!(res, "four"); + assert_ready_ok!(t1.poll()); + assert_ready_ok!(t2.poll()); } + assert_eq!(*rx1.borrow(), "four"); + assert_eq!(*rx2.borrow(), "four"); } #[test] @@ -133,16 +116,10 @@ fn rx_observes_final_value() { drop(tx); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); - } - - { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "one"); + let mut t1 = spawn(rx.changed()); + assert_ready_err!(t1.poll()); } + assert_eq!(*rx.borrow(), "one"); // Sending a value @@ -151,13 +128,13 @@ fn rx_observes_final_value() { tx.send("two").unwrap(); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "two"); + let mut t1 = spawn(rx.changed()); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx.borrow(), "two"); { - let mut t1 = spawn(rx.recv()); + let mut t1 = spawn(rx.changed()); assert_pending!(t1.poll()); tx.send("three").unwrap(); @@ -165,20 +142,20 @@ fn rx_observes_final_value() { assert!(t1.is_woken()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); + assert_ready_ok!(t1.poll()); } + assert_eq!(*rx.borrow(), "three"); { - let mut t1 = spawn(rx.recv()); - let res = assert_ready!(t1.poll()); - assert_eq!(res, "three"); + let mut t1 = spawn(rx.changed()); + assert_ready_err!(t1.poll()); } + assert_eq!(*rx.borrow(), "three"); } #[test] fn poll_close() { - let (mut tx, rx) = watch::channel("one"); + let (tx, rx) = watch::channel("one"); { let mut t = spawn(tx.closed()); @@ -192,40 +169,3 @@ fn poll_close() { assert!(tx.send("two").is_err()); } - -#[test] -fn stream_impl() { - use tokio::stream::StreamExt; - - let (tx, mut rx) = watch::channel("one"); - - { - let mut t = spawn(rx.next()); - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "one"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - tx.send("two").unwrap(); - - assert!(t.is_woken()); - - let v = assert_ready!(t.poll()).unwrap(); - assert_eq!(v, "two"); - } - - { - let mut t = spawn(rx.next()); - - assert_pending!(t.poll()); - - drop(tx); - - let res = assert_ready!(t.poll()); - assert_eq!(res, Some("two")); - } -} From 20ef28655354ae729a4af2098426a413e3f4d769 Mon Sep 17 00:00:00 2001 From: mental Date: Sat, 12 Sep 2020 10:55:03 +0100 Subject: [PATCH 06/22] sync: add const-constructors for some sync primitives (#2790) Co-authored-by: Mikail Bagishov Co-authored-by: Eliza Weisman Co-authored-by: Alice Ryhl --- tokio/src/loom/std/atomic_usize.rs | 2 +- tokio/src/loom/std/parking_lot.rs | 7 +++++++ tokio/src/sync/batch_semaphore.rs | 21 +++++++++++++++++++++ tokio/src/sync/mutex.rs | 21 +++++++++++++++++++++ tokio/src/util/linked_list.rs | 1 - 5 files changed, 50 insertions(+), 2 deletions(-) diff --git a/tokio/src/loom/std/atomic_usize.rs b/tokio/src/loom/std/atomic_usize.rs index 0fe998f1f9e..0d5f36e4310 100644 --- a/tokio/src/loom/std/atomic_usize.rs +++ b/tokio/src/loom/std/atomic_usize.rs @@ -11,7 +11,7 @@ unsafe impl Send for AtomicUsize {} unsafe impl Sync for AtomicUsize {} impl AtomicUsize { - pub(crate) fn new(val: usize) -> AtomicUsize { + pub(crate) 
const fn new(val: usize) -> AtomicUsize { let inner = UnsafeCell::new(std::sync::atomic::AtomicUsize::new(val)); AtomicUsize { inner } } diff --git a/tokio/src/loom/std/parking_lot.rs b/tokio/src/loom/std/parking_lot.rs index 25d94af44f5..41c47ddd31e 100644 --- a/tokio/src/loom/std/parking_lot.rs +++ b/tokio/src/loom/std/parking_lot.rs @@ -26,6 +26,13 @@ impl Mutex { Mutex(parking_lot::Mutex::new(t)) } + #[inline] + #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] + #[cfg_attr(docsrs, doc(cfg(all(feature = "parking_lot",))))] + pub(crate) const fn const_new(t: T) -> Mutex { + Mutex(parking_lot::const_mutex(t)) + } + #[inline] pub(crate) fn lock(&self) -> LockResult> { Ok(self.0.lock()) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index c48a911c8f3..a1048ca3734 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -123,6 +123,27 @@ impl Semaphore { } } + /// Creates a new semaphore with the initial number of permits + /// + /// Maximum number of permits on 32-bit platforms is `1<<29`. + /// + /// If the specified number of permits exceeds the maximum permit amount + /// Then the value will get clamped to the maximum number of permits. + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] + pub(crate) const fn const_new(mut permits: usize) -> Self { + // NOTE: assertions and by extension panics are still being worked on: https://github.com/rust-lang/rust/issues/74925 + // currently we just clamp the permit count when it exceeds the max + permits = permits & Self::MAX_PERMITS; + + Self { + permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), + waiters: Mutex::const_new(Waitlist { + queue: LinkedList::new(), + closed: false, + }), + } + } + /// Returns the current number of available permits pub(crate) fn available_permits(&self) -> usize { self.permits.load(Acquire) >> Self::PERMIT_SHIFT diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index df348457fec..71f0136f005 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -219,6 +219,27 @@ impl Mutex { } } + /// Creates a new lock in an unlocked state ready for use. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Mutex; + /// + /// static LOCK: Mutex = Mutex::const_new(5); + /// ``` + #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn const_new(t: T) -> Self + where + T: Sized, + { + Self { + c: UnsafeCell::new(t), + s: semaphore::Semaphore::const_new(1), + } + } + /// Locks this mutex, causing the current task /// to yield until the lock has been acquired. /// When the lock has been acquired, function returns a [`MutexGuard`]. diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index e6d6c5885c4..940d6af9200 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -72,7 +72,6 @@ unsafe impl Sync for Pointers {} impl LinkedList { /// Creates an empty linked list. - #[allow(dead_code)] // NOTE: This will get removed with: https://github.com/tokio-rs/tokio/pull/2790 pub(crate) const fn new() -> LinkedList { LinkedList { head: None, From 8d2e3bc575f51815ae7319f1e43fe6c7d664e6e4 Mon Sep 17 00:00:00 2001 From: Frank Steffahn Date: Sat, 12 Sep 2020 22:58:58 +0200 Subject: [PATCH 07/22] sync: add const constructors to RwLock, Notify, and Semaphore (#2833) * Add const constructors to `RwLock`, `Notify`, and `Semaphore`. Referring to the types in `tokio::sync`. 
Also add `const` to `new` for the remaining atomic integers in `src/loom` and `UnsafeCell`. Builds upon previous work in #2790 Closes #2756 --- tokio/src/loom/std/atomic_u16.rs | 2 +- tokio/src/loom/std/atomic_u32.rs | 2 +- tokio/src/loom/std/atomic_u8.rs | 2 +- tokio/src/loom/std/unsafe_cell.rs | 2 +- tokio/src/sync/notify.rs | 18 ++++++++++++++++++ tokio/src/sync/rwlock.rs | 21 +++++++++++++++++++++ tokio/src/sync/semaphore.rs | 9 +++++++++ 7 files changed, 52 insertions(+), 4 deletions(-) diff --git a/tokio/src/loom/std/atomic_u16.rs b/tokio/src/loom/std/atomic_u16.rs index 70390972b4b..c1c531208c2 100644 --- a/tokio/src/loom/std/atomic_u16.rs +++ b/tokio/src/loom/std/atomic_u16.rs @@ -11,7 +11,7 @@ unsafe impl Send for AtomicU16 {} unsafe impl Sync for AtomicU16 {} impl AtomicU16 { - pub(crate) fn new(val: u16) -> AtomicU16 { + pub(crate) const fn new(val: u16) -> AtomicU16 { let inner = UnsafeCell::new(std::sync::atomic::AtomicU16::new(val)); AtomicU16 { inner } } diff --git a/tokio/src/loom/std/atomic_u32.rs b/tokio/src/loom/std/atomic_u32.rs index 6f786c519f1..61f95fb30ce 100644 --- a/tokio/src/loom/std/atomic_u32.rs +++ b/tokio/src/loom/std/atomic_u32.rs @@ -11,7 +11,7 @@ unsafe impl Send for AtomicU32 {} unsafe impl Sync for AtomicU32 {} impl AtomicU32 { - pub(crate) fn new(val: u32) -> AtomicU32 { + pub(crate) const fn new(val: u32) -> AtomicU32 { let inner = UnsafeCell::new(std::sync::atomic::AtomicU32::new(val)); AtomicU32 { inner } } diff --git a/tokio/src/loom/std/atomic_u8.rs b/tokio/src/loom/std/atomic_u8.rs index 4fcd0df3d45..408aea338c6 100644 --- a/tokio/src/loom/std/atomic_u8.rs +++ b/tokio/src/loom/std/atomic_u8.rs @@ -11,7 +11,7 @@ unsafe impl Send for AtomicU8 {} unsafe impl Sync for AtomicU8 {} impl AtomicU8 { - pub(crate) fn new(val: u8) -> AtomicU8 { + pub(crate) const fn new(val: u8) -> AtomicU8 { let inner = UnsafeCell::new(std::sync::atomic::AtomicU8::new(val)); AtomicU8 { inner } } diff --git a/tokio/src/loom/std/unsafe_cell.rs b/tokio/src/loom/std/unsafe_cell.rs index f2b03d8dc2a..66c1d7943e0 100644 --- a/tokio/src/loom/std/unsafe_cell.rs +++ b/tokio/src/loom/std/unsafe_cell.rs @@ -2,7 +2,7 @@ pub(crate) struct UnsafeCell(std::cell::UnsafeCell); impl UnsafeCell { - pub(crate) fn new(data: T) -> UnsafeCell { + pub(crate) const fn new(data: T) -> UnsafeCell { UnsafeCell(std::cell::UnsafeCell::new(data)) } diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 74c69e50cb8..c69e2b07dbf 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -170,6 +170,24 @@ impl Notify { } } + /// Create a new `Notify`, initialized without a permit. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Notify; + /// + /// static NOTIFY: Notify = Notify::const_new(); + /// ``` + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn const_new() -> Notify { + Notify { + state: AtomicU8::new(0), + waiters: Mutex::const_new(LinkedList::new()), + } + } + /// Wait for a notification. /// /// Equivalent to: diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 650a7cf607a..840889bace8 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -485,6 +485,27 @@ impl RwLock { } } + /// Creates a new instance of an `RwLock` which is unlocked. 
+ /// + /// # Examples + /// + /// ``` + /// use tokio::sync::RwLock; + /// + /// static LOCK: RwLock = RwLock::const_new(5); + /// ``` + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn const_new(value: T) -> RwLock + where + T: Sized, + { + RwLock { + c: UnsafeCell::new(value), + s: Semaphore::const_new(MAX_READS), + } + } + /// Locks this rwlock with shared read access, causing the current task /// to yield until the lock has been acquired. /// diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 2489d34aaaf..136f14e62f9 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -74,6 +74,15 @@ impl Semaphore { } } + /// Creates a new semaphore with the initial number of permits. + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn const_new(permits: usize) -> Self { + Self { + ll_sem: ll::Semaphore::const_new(permits), + } + } + /// Returns the current number of available permits. pub fn available_permits(&self) -> usize { self.ll_sem.available_permits() From 4c4699be00bf4aee6218f84ec606e043286a8c2a Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sun, 13 Sep 2020 15:50:40 +0200 Subject: [PATCH 08/22] doc: fix some links (#2834) --- tokio-util/src/lib.rs | 2 +- tokio/src/io/util/async_read_ext.rs | 2 +- tokio/src/net/udp/socket.rs | 4 ++-- tokio/src/sync/mod.rs | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 137be7a3630..24a8af95f67 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -6,7 +6,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(intra_doc_link_resolution_failure)] +#![cfg_attr(docsrs, deny(broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs index dd280a188d7..0ab66c286d3 100644 --- a/tokio/src/io/util/async_read_ext.rs +++ b/tokio/src/io/util/async_read_ext.rs @@ -1067,7 +1067,7 @@ cfg_io_util! { /// (See also the [`crate::fs::read_to_string`] convenience function for /// reading from a file.) /// - /// [`crate::fs::read_to_string`]: crate::fs::read_to_string::read_to_string + /// [`crate::fs::read_to_string`]: fn@crate::fs::read_to_string fn read_to_string<'a>(&'a mut self, dst: &'a mut String) -> ReadToString<'a, Self> where Self: Unpin, diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index f9d88372035..aeb25fb35b6 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -125,7 +125,7 @@ impl UdpSocket { /// should ensure that when the remote cannot receive, the /// [`ErrorKind::WouldBlock`] is properly handled. /// - /// [`ErrorKind::WouldBlock`]: std::io::error::ErrorKind::WouldBlock + /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send(&self, buf: &[u8]) -> io::Result { self.io.get_ref().send(buf) } @@ -209,7 +209,7 @@ impl UdpSocket { /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur /// if the IP version of the socket does not match that of `target`. 
/// - /// [`ErrorKind::WouldBlock`]: std::io::error::ErrorKind::WouldBlock + /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result { self.io.get_ref().send_to(buf, &target) } diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 522205a0029..6dce9a7b8ce 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -397,8 +397,8 @@ //! } //! ``` //! -//! [`watch` channel]: crate::sync::watch -//! [`broadcast` channel]: crate::sync::broadcast +//! [`watch` channel]: mod@crate::sync::watch +//! [`broadcast` channel]: mod@crate::sync::broadcast //! //! # State synchronization //! From 3fd043931e6d37f211e682980edc6e12e9d4fc54 Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Thu, 17 Sep 2020 08:03:38 +0200 Subject: [PATCH 09/22] sync: fix some doc typos (#2838) Fixes #2781. --- tokio/src/sync/broadcast.rs | 2 +- tokio/src/sync/mod.rs | 2 +- tokio/src/sync/semaphore.rs | 2 +- tokio/src/sync/semaphore_ll.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 9b3de5309ac..9484e1307b6 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -793,7 +793,7 @@ where /// This is useful for a flavor of "optimistic check" before deciding to /// await on a receiver. /// - /// Compared with [`recv`], this function has three failure cases instead of one + /// Compared with [`recv`], this function has three failure cases instead of two /// (one for closed, one for an empty buffer, one for a lagging receiver). /// /// `Err(TryRecvError::Closed)` is returned when all `Sender` halves have diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 6dce9a7b8ce..1a584383b83 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -20,7 +20,7 @@ //! few flavors of channels provided by Tokio. Each channel flavor supports //! different message passing patterns. When a channel supports multiple //! producers, many separate tasks may **send** messages. When a channel -//! supports muliple consumers, many different separate tasks may **receive** +//! supports multiple consumers, many different separate tasks may **receive** //! messages. //! //! Tokio provides many different channel flavors as different message passing diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 136f14e62f9..4f8b002656b 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -1,7 +1,7 @@ use super::batch_semaphore as ll; // low level implementation use std::sync::Arc; -/// Counting semaphore performing asynchronous permit aquisition. +/// Counting semaphore performing asynchronous permit acquisition. /// /// A semaphore maintains a set of permits. Permits are used to synchronize /// access to a shared resource. A semaphore differs from a mutex in that it diff --git a/tokio/src/sync/semaphore_ll.rs b/tokio/src/sync/semaphore_ll.rs index 25d25ac88ab..f044095f8fc 100644 --- a/tokio/src/sync/semaphore_ll.rs +++ b/tokio/src/sync/semaphore_ll.rs @@ -910,7 +910,7 @@ impl Waiter { } /// Try to decrement the number of permits to acquire. This returns the - /// actual number of permits that were decremented. The delta betweeen `n` + /// actual number of permits that were decremented. The delta between `n` /// and the return has been assigned to the permit and the caller must /// assign these back to the semaphore. 
fn try_dec_permits_to_acquire(&self, n: usize) -> usize { From 207320dbbbacb4e7c8c12f6e94f5e78eb0f055a5 Mon Sep 17 00:00:00 2001 From: Nylonicious <50183564+nylonicious@users.noreply.github.com> Date: Sat, 19 Sep 2020 00:49:13 +0200 Subject: [PATCH 10/22] process: fix some docs (#2843) * fix docs for Command::status and output Co-authored-by: Alice Ryhl --- tokio/src/process/mod.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index 9c19c72ea97..7da9783be34 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -103,7 +103,7 @@ //! paradigm of dropping-implies-cancellation, a spawned process will, by //! default, continue to execute even after the `Child` handle has been dropped. //! -//! The `Command::kill_on_drop` method can be used to modify this behavior +//! The [`Command::kill_on_drop`] method can be used to modify this behavior //! and kill the child process if the `Child` wrapper is dropped before it //! has exited. //! @@ -581,8 +581,10 @@ impl Command { /// All I/O this child does will be associated with the current default /// event loop. /// - /// If this future is dropped before the future resolves, then - /// the child will be killed, if it was spawned. + /// The destructor of the future returned by this function will kill + /// the child if [`kill_on_drop`] is set to true. + /// + /// [`kill_on_drop`]: fn@Self::kill_on_drop /// /// # Errors /// @@ -602,6 +604,7 @@ impl Command { /// .await /// .expect("ls command failed to run") /// } + /// ``` pub fn status(&mut self) -> impl Future> { let child = self.spawn(); @@ -637,8 +640,10 @@ impl Command { /// All I/O this child does will be associated with the current default /// event loop. /// - /// If this future is dropped before the future resolves, then - /// the child will be killed, if it was spawned. + /// The destructor of the future returned by this function will kill + /// the child if [`kill_on_drop`] is set to true. + /// + /// [`kill_on_drop`]: fn@Self::kill_on_drop /// /// # Examples /// @@ -654,6 +659,7 @@ impl Command { /// .expect("ls command failed to run"); /// println!("stderr of ls: {:?}", output.stderr); /// } + /// ``` pub fn output(&mut self) -> impl Future> { self.std.stdout(Stdio::piped()); self.std.stderr(Stdio::piped()); From 68f7eff39e5adf2246770d7239ed83731297d1f3 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 19 Sep 2020 20:40:03 +0900 Subject: [PATCH 11/22] time: remove outdated todo comment (#2848) --- tokio/src/time/interval.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/time/interval.rs b/tokio/src/time/interval.rs index 6a97f59b5aa..52e081ce463 100644 --- a/tokio/src/time/interval.rs +++ b/tokio/src/time/interval.rs @@ -159,7 +159,6 @@ impl Interval { /// // approximately 20ms have elapsed. 
/// } /// ``` - #[allow(clippy::should_implement_trait)] // TODO: rename (tokio-rs/tokio#1261) pub async fn tick(&mut self) -> Instant { poll_fn(|cx| self.poll_tick(cx)).await } From 111894fef9cbd5c87b3e3508d0d983e1aedfe397 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 19 Sep 2020 20:40:20 +0900 Subject: [PATCH 12/22] util: remove Slice wrapper (#2847) --- tokio-util/tests/framed_read.rs | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/tokio-util/tests/framed_read.rs b/tokio-util/tests/framed_read.rs index da38c432326..52c30ef980e 100644 --- a/tokio-util/tests/framed_read.rs +++ b/tokio-util/tests/framed_read.rs @@ -185,8 +185,8 @@ fn read_partial_would_block_then_err() { #[test] fn huge_size() { let mut task = task::spawn(()); - let data = [0; 32 * 1024]; - let mut framed = FramedRead::new(Slice(&data[..]), BigDecoder); + let data = &[0; 32 * 1024][..]; + let mut framed = FramedRead::new(data, BigDecoder); task.enter(|cx, _| { assert_read!(pin!(framed).poll_next(cx), 0); @@ -212,7 +212,7 @@ fn huge_size() { #[test] fn data_remaining_is_error() { let mut task = task::spawn(()); - let slice = Slice(&[0; 5]); + let slice = &[0; 5][..]; let mut framed = FramedRead::new(slice, U32Decoder); task.enter(|cx, _| { @@ -280,16 +280,3 @@ impl AsyncRead for Mock { } } } - -// TODO this newtype is necessary because `&[u8]` does not currently implement `AsyncRead` -struct Slice<'a>(&'a [u8]); - -impl AsyncRead for Slice<'_> { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) - } -} From ba8680d66773677f201a26b756e41e9a16b458fe Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sat, 19 Sep 2020 20:40:37 +0900 Subject: [PATCH 13/22] io: fix doc-cfg on AsyncSeekExt (#2846) --- tokio/src/io/seek.rs | 14 ++-- tokio/src/io/util/async_seek_ext.rs | 102 ++++++++++++++-------------- 2 files changed, 60 insertions(+), 56 deletions(-) diff --git a/tokio/src/io/seek.rs b/tokio/src/io/seek.rs index e3b5bf6b6f4..8f071167f6c 100644 --- a/tokio/src/io/seek.rs +++ b/tokio/src/io/seek.rs @@ -4,12 +4,14 @@ use std::io::{self, SeekFrom}; use std::pin::Pin; use std::task::{Context, Poll}; -/// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Seek<'a, S: ?Sized> { - seek: &'a mut S, - pos: Option, +cfg_io_util! { + /// Future for the [`seek`](crate::io::AsyncSeekExt::seek) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Seek<'a, S: ?Sized> { + seek: &'a mut S, + pos: Option, + } } pub(crate) fn seek(seek: &mut S, pos: SeekFrom) -> Seek<'_, S> diff --git a/tokio/src/io/util/async_seek_ext.rs b/tokio/src/io/util/async_seek_ext.rs index c7a0f72fb81..35bc94ee34d 100644 --- a/tokio/src/io/util/async_seek_ext.rs +++ b/tokio/src/io/util/async_seek_ext.rs @@ -2,65 +2,67 @@ use crate::io::seek::{seek, Seek}; use crate::io::AsyncSeek; use std::io::SeekFrom; -/// An extension trait which adds utility methods to [`AsyncSeek`] types. 
-/// -/// As a convenience, this trait may be imported using the [`prelude`]: -/// -/// # Examples -/// -/// ``` -/// use std::io::{Cursor, SeekFrom}; -/// use tokio::prelude::*; -/// -/// #[tokio::main] -/// async fn main() -> io::Result<()> { -/// let mut cursor = Cursor::new(b"abcdefg"); -/// -/// // the `seek` method is defined by this trait -/// cursor.seek(SeekFrom::Start(3)).await?; -/// -/// let mut buf = [0; 1]; -/// let n = cursor.read(&mut buf).await?; -/// assert_eq!(n, 1); -/// assert_eq!(buf, [b'd']); -/// -/// Ok(()) -/// } -/// ``` -/// -/// See [module][crate::io] documentation for more details. -/// -/// [`AsyncSeek`]: AsyncSeek -/// [`prelude`]: crate::prelude -pub trait AsyncSeekExt: AsyncSeek { - /// Creates a future which will seek an IO object, and then yield the - /// new position in the object and the object itself. +cfg_io_util! { + /// An extension trait which adds utility methods to [`AsyncSeek`] types. /// - /// In the case of an error the buffer and the object will be discarded, with - /// the error yielded. + /// As a convenience, this trait may be imported using the [`prelude`]: /// /// # Examples /// - /// ```no_run - /// use tokio::fs::File; + /// ``` + /// use std::io::{Cursor, SeekFrom}; /// use tokio::prelude::*; /// - /// use std::io::SeekFrom; + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let mut cursor = Cursor::new(b"abcdefg"); + /// + /// // the `seek` method is defined by this trait + /// cursor.seek(SeekFrom::Start(3)).await?; /// - /// # async fn dox() -> std::io::Result<()> { - /// let mut file = File::open("foo.txt").await?; - /// file.seek(SeekFrom::Start(6)).await?; + /// let mut buf = [0; 1]; + /// let n = cursor.read(&mut buf).await?; + /// assert_eq!(n, 1); + /// assert_eq!(buf, [b'd']); /// - /// let mut contents = vec![0u8; 10]; - /// file.read_exact(&mut contents).await?; - /// # Ok(()) - /// # } + /// Ok(()) + /// } /// ``` - fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> - where - Self: Unpin, - { - seek(self, pos) + /// + /// See [module][crate::io] documentation for more details. + /// + /// [`AsyncSeek`]: AsyncSeek + /// [`prelude`]: crate::prelude + pub trait AsyncSeekExt: AsyncSeek { + /// Creates a future which will seek an IO object, and then yield the + /// new position in the object and the object itself. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. 
+ /// + /// # Examples + /// + /// ```no_run + /// use tokio::fs::File; + /// use tokio::prelude::*; + /// + /// use std::io::SeekFrom; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut file = File::open("foo.txt").await?; + /// file.seek(SeekFrom::Start(6)).await?; + /// + /// let mut contents = vec![0u8; 10]; + /// file.read_exact(&mut contents).await?; + /// # Ok(()) + /// # } + /// ``` + fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> + where + Self: Unpin, + { + seek(self, pos) + } } } From 2b96b1773dd31ff923de4bd8adac2adbac447399 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 21 Sep 2020 18:57:27 +0200 Subject: [PATCH 14/22] ci: update nightly and fix all sorts of new failures (#2852) * ci: update miri flags * ci: fix doc warnings * doc: fix some links Cherry-pick of 18ed761 from #2834 * ci: cherry-pick 00a2849 From: #2793 * ci: cherry-pick 6b61212 From: #2793 Co-authored-by: Blas Rodriguez Irizar --- .github/workflows/ci.yml | 4 ++-- tokio-macros/src/lib.rs | 2 +- tokio-test/src/lib.rs | 2 +- tokio-util/src/lib.rs | 2 +- tokio/src/io/util/async_read_ext.rs | 2 +- tokio/src/lib.rs | 3 +-- tokio/src/net/udp/socket.rs | 4 ++-- tokio/src/sync/mod.rs | 4 ++-- tokio/src/time/delay_queue.rs | 28 ++++++++++++++-------------- 9 files changed, 25 insertions(+), 26 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ecc27a9168f..319fb0de1cf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ name: CI env: RUSTFLAGS: -Dwarnings RUST_BACKTRACE: 1 - nightly: nightly-2020-07-12 + nightly: nightly-2020-09-21 minrust: 1.39.0 jobs: @@ -110,7 +110,7 @@ jobs: rm -rf tokio/tests - name: miri - run: cargo miri test --features rt-core,rt-threaded,rt-util,sync -- -- task + run: cargo miri test --features rt-core,rt-threaded,rt-util,sync task working-directory: tokio cross: diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 1fd445d877c..6cdffdb03e3 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -6,7 +6,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(intra_doc_link_resolution_failure)] +#![cfg_attr(docsrs, deny(broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index 185f317b973..69b8362c4e5 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -5,7 +5,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(intra_doc_link_resolution_failure)] +#![cfg_attr(docsrs, deny(broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 260c9485c26..e888e3e3e3b 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -6,7 +6,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(intra_doc_link_resolution_failure)] +#![cfg_attr(docsrs, deny(broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs index dd280a188d7..0ab66c286d3 100644 --- a/tokio/src/io/util/async_read_ext.rs +++ b/tokio/src/io/util/async_read_ext.rs @@ -1067,7 +1067,7 @@ cfg_io_util! { /// (See also the [`crate::fs::read_to_string`] convenience function for /// reading from a file.) 
/// - /// [`crate::fs::read_to_string`]: crate::fs::read_to_string::read_to_string + /// [`crate::fs::read_to_string`]: fn@crate::fs::read_to_string fn read_to_string<'a>(&'a mut self, dst: &'a mut String) -> ReadToString<'a, Self> where Self: Unpin, diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 88707c4d1c6..11a11195094 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -10,13 +10,12 @@ rust_2018_idioms, unreachable_pub )] -#![deny(intra_doc_link_resolution_failure)] +#![cfg_attr(docsrs, deny(broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] #![cfg_attr(docsrs, feature(doc_cfg))] -#![cfg_attr(docsrs, feature(doc_alias))] //! A runtime for writing reliable, asynchronous, and slim applications. //! diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index 16e537739d4..00b00d8d94f 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -125,7 +125,7 @@ impl UdpSocket { /// should ensure that when the remote cannot receive, the /// [`ErrorKind::WouldBlock`] is properly handled. /// - /// [`ErrorKind::WouldBlock`]: std::io::error::ErrorKind::WouldBlock + /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send(&self, buf: &[u8]) -> io::Result { self.io.get_ref().send(buf) } @@ -209,7 +209,7 @@ impl UdpSocket { /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur /// if the IP version of the socket does not match that of `target`. /// - /// [`ErrorKind::WouldBlock`]: std::io::error::ErrorKind::WouldBlock + /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result { self.io.get_ref().send_to(buf, &target) } diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 3d96106d2df..f93ff7bb873 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -399,8 +399,8 @@ //! } //! ``` //! -//! [`watch` channel]: crate::sync::watch -//! [`broadcast` channel]: crate::sync::broadcast +//! [`watch` channel]: mod@crate::sync::watch +//! [`broadcast` channel]: mod@crate::sync::broadcast //! //! # State synchronization //! diff --git a/tokio/src/time/delay_queue.rs b/tokio/src/time/delay_queue.rs index a947cc6fe4e..b02153b9e0f 100644 --- a/tokio/src/time/delay_queue.rs +++ b/tokio/src/time/delay_queue.rs @@ -27,13 +27,13 @@ use std::task::{self, Poll}; /// which it should be yielded back. /// /// Once delays have been configured, the `DelayQueue` is used via its -/// [`Stream`] implementation. [`poll`] is called. If an entry has reached its +/// [`Stream`] implementation. [`poll_expired`] is called. If an entry has reached its /// deadline, it is returned. If not, `Poll::Pending` indicating that the /// current task will be notified once the deadline has been reached. /// /// # `Stream` implementation /// -/// Items are retrieved from the queue via [`Stream::poll`]. If no delays have +/// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have /// expired, no items are returned. In this case, `NotReady` is returned and the /// current task is registered to be notified once the next item's delay has /// expired. 
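// A short sketch (not part of this patch) of driving a `DelayQueue` through
// the `Stream` implementation described in the docs above. It assumes the
// `time` and `stream` features are enabled.
use std::time::Duration;
use tokio::stream::StreamExt;
use tokio::time::DelayQueue;

#[tokio::main]
async fn main() {
    let mut queue = DelayQueue::new();
    queue.insert("second", Duration::from_millis(20));
    queue.insert("first", Duration::from_millis(5));

    // Entries are yielded as their deadlines are reached; `next()` returns
    // `None` once the queue is empty.
    while let Some(expired) = queue.next().await {
        println!("expired: {}", expired.unwrap().into_inner());
    }
}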
@@ -115,8 +115,8 @@ use std::task::{self, Poll}; /// [`insert_at`]: method@Self::insert_at /// [`Key`]: struct@Key /// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html -/// [`poll`]: method@Self::poll -/// [`Stream::poll`]: method@Self::poll +/// [`poll_expired`]: method@Self::poll_expired +/// [`Stream::poll_expired`]: method@Self::poll_expired /// [`DelayQueue`]: struct@DelayQueue /// [`delay_for`]: fn@super::delay_for /// [`slab`]: slab @@ -146,9 +146,9 @@ pub struct DelayQueue { /// An entry in `DelayQueue` that has expired and removed. /// -/// Values are returned by [`DelayQueue::poll`]. +/// Values are returned by [`DelayQueue::poll_expired`]. /// -/// [`DelayQueue::poll`]: method@DelayQueue::poll +/// [`DelayQueue::poll_expired`]: method@DelayQueue::poll_expired #[derive(Debug)] pub struct Expired { /// The data stored in the queue @@ -260,12 +260,12 @@ impl DelayQueue { /// of a `Duration`. /// /// `value` is stored in the queue until `when` is reached. At which point, - /// `value` will be returned from [`poll`]. If `when` has already been + /// `value` will be returned from [`poll_expired`]. If `when` has already been /// reached, then `value` is immediately made available to poll. /// /// The return value represents the insertion and is used at an argument to /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once - /// `value` is removed from the queue either by calling [`poll`] after + /// `value` is removed from the queue either by calling [`poll_expired`] after /// `when` is reached or by calling [`remove`]. At this point, the caller /// must take care to not use the returned [`Key`] again as it may reference /// a different item in the queue. @@ -295,7 +295,7 @@ impl DelayQueue { /// # } /// ``` /// - /// [`poll`]: method@Self::poll + /// [`poll_expired`]: method@Self::poll_expired /// [`remove`]: method@Self::remove /// [`reset`]: method@Self::reset /// [`Key`]: struct@Key @@ -367,12 +367,12 @@ impl DelayQueue { /// instead of an `Instant`. /// /// `value` is stored in the queue until `when` is reached. At which point, - /// `value` will be returned from [`poll`]. If `when` has already been + /// `value` will be returned from [`poll_expired`]. If `when` has already been /// reached, then `value` is immediately made available to poll. /// /// The return value represents the insertion and is used at an argument to /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once - /// `value` is removed from the queue either by calling [`poll`] after + /// `value` is removed from the queue either by calling [`poll_expired`] after /// `when` is reached or by calling [`remove`]. At this point, the caller /// must take care to not use the returned [`Key`] again as it may reference /// a different item in the queue. @@ -403,7 +403,7 @@ impl DelayQueue { /// # } /// ``` /// - /// [`poll`]: method@Self::poll + /// [`poll_expired`]: method@Self::poll_expired /// [`remove`]: method@Self::remove /// [`reset`]: method@Self::reset /// [`Key`]: struct@Key @@ -578,11 +578,11 @@ impl DelayQueue { /// Clears the queue, removing all items. /// - /// After calling `clear`, [`poll`] will return `Ok(Ready(None))`. + /// After calling `clear`, [`poll_expired`] will return `Ok(Ready(None))`. /// /// Note that this method has no effect on the allocated capacity. 
/// - /// [`poll`]: method@Self::poll + /// [`poll_expired`]: method@Self::poll_expired /// /// # Examples /// From 1ac10fa80aa847c68e376ebf9ef4e2be891e41d3 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 21 Sep 2020 18:57:33 +0200 Subject: [PATCH 15/22] ci: update miri flags (#2851) * ci: update miri flags * Update 2020-09-20 to 2020-09-21 Co-authored-by: Taiki Endo Co-authored-by: Taiki Endo --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a2ae44af522..61244f90e18 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ name: CI env: RUSTFLAGS: -Dwarnings RUST_BACKTRACE: 1 - nightly: nightly-2020-07-12 + nightly: nightly-2020-09-21 minrust: 1.45.2 jobs: @@ -110,7 +110,7 @@ jobs: rm -rf tokio/tests - name: miri - run: cargo miri test --features rt-core,rt-threaded,rt-util,sync -- -- task + run: cargo miri test --features rt-core,rt-threaded,rt-util,sync task working-directory: tokio cross: From c0c7124a4b01892727cd023a651ebb90bcd2e2ad Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 21 Sep 2020 14:29:22 -0700 Subject: [PATCH 16/22] sync: fix missing notification during mpsc close (#2854) When the mpsc channel receiver closes the channel, receiving should return `None` once all in-progress sends have completed. When a sender reserves capacity, this prevents the receiver from fully shutting down. Previously, when the sender, after reserving capacity, dropped without sending a message, the receiver was not notified. This results in blocking the shutdown process until all sender handles drop. This patch adds a receiver notification when the channel is both closed and all outstanding sends have completed. --- tokio/src/sync/mpsc/chan.rs | 22 +++++++++++++++++----- tokio/src/sync/semaphore_ll.rs | 2 +- tokio/tests/sync_mpsc.rs | 22 ++++++++++++++++++++++ 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 148ee3ad766..0a53cda2038 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -75,7 +75,11 @@ pub(crate) trait Semaphore { /// The permit is dropped without a value being sent. In this case, the /// permit must be returned to the semaphore. - fn drop_permit(&self, permit: &mut Self::Permit); + /// + /// # Return + /// + /// Returns true if the permit was acquired. + fn drop_permit(&self, permit: &mut Self::Permit) -> bool; fn is_idle(&self) -> bool; @@ -192,7 +196,7 @@ where pub(crate) fn disarm(&mut self) { // TODO: should this error if not acquired? - self.inner.semaphore.drop_permit(&mut self.permit) + self.inner.semaphore.drop_permit(&mut self.permit); } /// Send a message and notify the receiver. 
@@ -234,7 +238,11 @@ where S: Semaphore, { fn drop(&mut self) { - self.inner.semaphore.drop_permit(&mut self.permit); + let notify = self.inner.semaphore.drop_permit(&mut self.permit); + + if notify && self.inner.semaphore.is_idle() { + self.inner.rx_waker.wake(); + } if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { return; @@ -424,8 +432,10 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { Permit::new() } - fn drop_permit(&self, permit: &mut Permit) { + fn drop_permit(&self, permit: &mut Permit) -> bool { + let ret = permit.is_acquired(); permit.release(1, &self.0); + ret } fn add_permit(&self) { @@ -477,7 +487,9 @@ impl Semaphore for AtomicUsize { fn new_permit() {} - fn drop_permit(&self, _permit: &mut ()) {} + fn drop_permit(&self, _permit: &mut ()) -> bool { + false + } fn add_permit(&self) { let prev = self.fetch_sub(2, Release); diff --git a/tokio/src/sync/semaphore_ll.rs b/tokio/src/sync/semaphore_ll.rs index 25d25ac88ab..f044095f8fc 100644 --- a/tokio/src/sync/semaphore_ll.rs +++ b/tokio/src/sync/semaphore_ll.rs @@ -910,7 +910,7 @@ impl Waiter { } /// Try to decrement the number of permits to acquire. This returns the - /// actual number of permits that were decremented. The delta betweeen `n` + /// actual number of permits that were decremented. The delta between `n` /// and the return has been assigned to the permit and the caller must /// assign these back to the semaphore. fn try_dec_permits_to_acquire(&self, n: usize) -> usize { diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index f02d90aa56d..f4966c31377 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -490,3 +490,25 @@ fn try_recv_unbounded() { _ => panic!(), } } + +#[test] +fn ready_close_cancel_bounded() { + use futures::future::poll_fn; + + let (mut tx, mut rx) = mpsc::channel::<()>(100); + let _tx2 = tx.clone(); + + { + let mut ready = task::spawn(async { poll_fn(|cx| tx.poll_ready(cx)).await }); + assert_ready_ok!(ready.poll()); + } + + rx.close(); + + let mut recv = task::spawn(async { rx.recv().await }); + assert_pending!(recv.poll()); + + drop(tx); + + assert!(recv.is_woken()); +} From 93f8cb8df2bb0dceb7921556165a8ed8efed9151 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 21 Sep 2020 14:29:22 -0700 Subject: [PATCH 17/22] sync: fix missing notification during mpsc close (#2854) When the mpsc channel receiver closes the channel, receiving should return `None` once all in-progress sends have completed. When a sender reserves capacity, this prevents the receiver from fully shutting down. Previously, when the sender, after reserving capacity, dropped without sending a message, the receiver was not notified. This results in blocking the shutdown process until all sender handles drop. This patch adds a receiver notification when the channel is both closed and all outstanding sends have completed. --- tokio/src/sync/mpsc/chan.rs | 22 +++++++++++++++++----- tokio/tests/sync_mpsc.rs | 22 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 148ee3ad766..0a53cda2038 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -75,7 +75,11 @@ pub(crate) trait Semaphore { /// The permit is dropped without a value being sent. In this case, the /// permit must be returned to the semaphore. - fn drop_permit(&self, permit: &mut Self::Permit); + /// + /// # Return + /// + /// Returns true if the permit was acquired. 
+ fn drop_permit(&self, permit: &mut Self::Permit) -> bool; fn is_idle(&self) -> bool; @@ -192,7 +196,7 @@ where pub(crate) fn disarm(&mut self) { // TODO: should this error if not acquired? - self.inner.semaphore.drop_permit(&mut self.permit) + self.inner.semaphore.drop_permit(&mut self.permit); } /// Send a message and notify the receiver. @@ -234,7 +238,11 @@ where S: Semaphore, { fn drop(&mut self) { - self.inner.semaphore.drop_permit(&mut self.permit); + let notify = self.inner.semaphore.drop_permit(&mut self.permit); + + if notify && self.inner.semaphore.is_idle() { + self.inner.rx_waker.wake(); + } if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { return; @@ -424,8 +432,10 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { Permit::new() } - fn drop_permit(&self, permit: &mut Permit) { + fn drop_permit(&self, permit: &mut Permit) -> bool { + let ret = permit.is_acquired(); permit.release(1, &self.0); + ret } fn add_permit(&self) { @@ -477,7 +487,9 @@ impl Semaphore for AtomicUsize { fn new_permit() {} - fn drop_permit(&self, _permit: &mut ()) {} + fn drop_permit(&self, _permit: &mut ()) -> bool { + false + } fn add_permit(&self) { let prev = self.fetch_sub(2, Release); diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index f571a71caf8..919bddbfb18 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -534,3 +534,25 @@ async fn blocking_send_async() { let (mut tx, _rx) = mpsc::channel::<()>(1); let _ = tx.blocking_send(()); } + +#[test] +fn ready_close_cancel_bounded() { + use futures::future::poll_fn; + + let (mut tx, mut rx) = mpsc::channel::<()>(100); + let _tx2 = tx.clone(); + + { + let mut ready = task::spawn(async { poll_fn(|cx| tx.poll_ready(cx)).await }); + assert_ready_ok!(ready.poll()); + } + + rx.close(); + + let mut recv = task::spawn(async { rx.recv().await }); + assert_pending!(recv.poll()); + + drop(tx); + + assert!(recv.is_woken()); +} From e7091fde786722a5301270e6281fc3c449dcfc14 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 22 Sep 2020 18:12:57 +0300 Subject: [PATCH 18/22] sync: Remove readiness assertion in `watch::Receiver::changed() (#2839) *In `watch::Receiver::changed` `Notified` was polled for the first time to ensure the waiter is registered while assuming that the first poll will always return `Pending`. It is the case however that another instance of `Notified` is dropped without receiving its notification, this "orphaned" notification can be used to satisfy another waiter without even registering it. This commit accounts for that scenario. --- tokio/src/sync/notify.rs | 33 ++++++++++++++++++------------ tokio/src/sync/tests/loom_watch.rs | 22 +++++++++++++++++--- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index c69e2b07dbf..56bbc51bf28 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -106,6 +106,14 @@ pub struct Notify { waiters: Mutex, } +#[derive(Debug, Clone, Copy)] +enum NotificationType { + // Notification triggered by calling `notify_waiters` + AllWaiters, + // Notification triggered by calling `notify_one` + OneWaiter, +} + #[derive(Debug)] struct Waiter { /// Intrusive linked-list pointers @@ -115,7 +123,7 @@ struct Waiter { waker: Option, /// `true` if the notification has been assigned to this waiter. - notified: bool, + notified: Option, /// Should not be `Unpin`. 
_p: PhantomPinned, @@ -230,7 +238,7 @@ impl Notify { waiter: UnsafeCell::new(Waiter { pointers: linked_list::Pointers::new(), waker: None, - notified: false, + notified: None, _p: PhantomPinned, }), } @@ -327,9 +335,9 @@ impl Notify { // Safety: `waiters` lock is still held. let waiter = unsafe { waiter.as_mut() }; - assert!(!waiter.notified); + assert!(waiter.notified.is_none()); - waiter.notified = true; + waiter.notified = Some(NotificationType::AllWaiters); if let Some(waker) = waiter.waker.take() { waker.wake(); @@ -375,9 +383,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option { // Safety: called while locked let w = unsafe { &mut *waiter.get() }; - if w.notified { + if w.notified.is_some() { // Our waker has been notified. Reset the fields and // remove it from the list. w.waker = None; - w.notified = false; + w.notified = None; *state = Done; } else { @@ -582,14 +590,13 @@ impl Drop for Notified<'_> { notify.state.store(EMPTY, SeqCst); } - // See if the node was notified but not received. In this case, the - // notification must be sent to another waiter. + // See if the node was notified but not received. In this case, if + // the notification was triggered via `notify_one`, it must be sent + // to the next waiter. // // Safety: with the entry removed from the linked list, there can be // no concurrent access to the entry - let notified = unsafe { (*waiter.get()).notified }; - - if notified { + if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } { if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { drop(waiters); waker.wake(); diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index 7944cab8d24..c575b5b66c5 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -6,14 +6,30 @@ use loom::thread; #[test] fn smoke() { loom::model(|| { - let (tx, mut rx) = watch::channel(1); + let (tx, mut rx1) = watch::channel(1); + let mut rx2 = rx1.clone(); + let mut rx3 = rx1.clone(); + let mut rx4 = rx1.clone(); + let mut rx5 = rx1.clone(); let th = thread::spawn(move || { tx.send(2).unwrap(); }); - block_on(rx.changed()).unwrap(); - assert_eq!(*rx.borrow(), 2); + block_on(rx1.changed()).unwrap(); + assert_eq!(*rx1.borrow(), 2); + + block_on(rx2.changed()).unwrap(); + assert_eq!(*rx2.borrow(), 2); + + block_on(rx3.changed()).unwrap(); + assert_eq!(*rx3.borrow(), 2); + + block_on(rx4.changed()).unwrap(); + assert_eq!(*rx4.borrow(), 2); + + block_on(rx5.changed()).unwrap(); + assert_eq!(*rx5.borrow(), 2); th.join().unwrap(); }) From 6866b24ca1a89fb6e7dbc63e20d8a66ee60a85b8 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Wed, 23 Sep 2020 05:55:35 +0900 Subject: [PATCH 19/22] ci: deny warnings on '--cfg tokio_unstable' tests (#2859) --- .github/workflows/ci.yml | 6 +++--- tokio/src/runtime/builder.rs | 4 ++-- tokio/src/runtime/tests/loom_pool.rs | 2 +- tokio/src/util/bit.rs | 6 ------ 4 files changed, 6 insertions(+), 12 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 61244f90e18..756a7677b9f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,7 +91,7 @@ jobs: run: cargo test --features full working-directory: tokio env: - RUSTFLAGS: '--cfg tokio_unstable' + RUSTFLAGS: --cfg tokio_unstable -Dwarnings miri: name: miri @@ -156,7 +156,7 @@ jobs: - name: check --each-feature --unstable run: cargo hack check --all --each-feature -Z avoid-dev-deps env: - RUSTFLAGS: --cfg tokio_unstable + 
RUSTFLAGS: --cfg tokio_unstable -Dwarnings minrust: name: minrust @@ -239,6 +239,6 @@ jobs: run: cargo test --lib --release --features full -- --nocapture $SCOPE working-directory: tokio env: - RUSTFLAGS: --cfg loom --cfg tokio_unstable + RUSTFLAGS: --cfg loom --cfg tokio_unstable -Dwarnings LOOM_MAX_PREEMPTIONS: 2 SCOPE: ${{ matrix.scope }} diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index ed2cd251c35..42aed3e90b0 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,4 +1,4 @@ -use crate::loom::sync::{Arc, Mutex}; +use crate::loom::sync::Mutex; use crate::runtime::handle::Handle; use crate::runtime::shell::Shell; use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; @@ -330,7 +330,7 @@ impl Builder { where F: Fn() + Send + Sync + 'static, { - self.before_stop = Some(Arc::new(f)); + self.before_stop = Some(std::sync::Arc::new(f)); self } diff --git a/tokio/src/runtime/tests/loom_pool.rs b/tokio/src/runtime/tests/loom_pool.rs index ed484846311..47ee1981086 100644 --- a/tokio/src/runtime/tests/loom_pool.rs +++ b/tokio/src/runtime/tests/loom_pool.rs @@ -209,7 +209,7 @@ mod group_b { #[test] fn complete_block_on_under_load() { loom::model(|| { - let mut pool = mk_pool(1); + let pool = mk_pool(1); pool.block_on(async { // Trigger a re-schedule diff --git a/tokio/src/util/bit.rs b/tokio/src/util/bit.rs index ee756044a50..cf3cb196884 100644 --- a/tokio/src/util/bit.rs +++ b/tokio/src/util/bit.rs @@ -22,12 +22,6 @@ impl Pack { Pack { mask, shift } } - /// Mask used to unpack value - #[cfg(all(test, loom))] - pub(crate) const fn mask(&self) -> usize { - self.mask - } - /// Width, in bits, dedicated to storing the value. pub(crate) const fn width(&self) -> u32 { pointer_width() - (self.mask >> self.shift).leading_zeros() From cb8f2ceb2eaab72800ecee60a89cbff73c6c79de Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Wed, 23 Sep 2020 05:55:58 +0900 Subject: [PATCH 20/22] chore: remove unused future/pending.rs (#2860) --- tokio/src/future/pending.rs | 44 ------------------------------------- 1 file changed, 44 deletions(-) delete mode 100644 tokio/src/future/pending.rs diff --git a/tokio/src/future/pending.rs b/tokio/src/future/pending.rs deleted file mode 100644 index 287e836fd3c..00000000000 --- a/tokio/src/future/pending.rs +++ /dev/null @@ -1,44 +0,0 @@ -use sdt::pin::Pin; -use std::future::Future; -use std::marker; -use std::task::{Context, Poll}; - -/// Future for the [`pending()`] function. -#[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -struct Pending { - _data: marker::PhantomData, -} - -/// Creates a future which never resolves, representing a computation that never -/// finishes. -/// -/// The returned future will forever return [`Poll::Pending`]. -/// -/// # Examples -/// -/// ```no_run -/// use tokio::future; -/// -/// #[tokio::main] -/// async fn main { -/// future::pending().await; -/// unreachable!(); -/// } -/// ``` -pub async fn pending() -> ! { - Pending { - _data: marker::PhantomData, - } - .await -} - -impl Future for Pending { - type Output = !; - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - Poll::Pending - } -} - -impl Unpin for Pending {} From e09b90ea32385337170ce17eb55ab372f9388af5 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Tue, 22 Sep 2020 22:56:45 +0200 Subject: [PATCH 21/22] macros: add #[allow(unused_mut)] to select! 
(#2858) --- tokio/src/macros/select.rs | 1 + tokio/tests/macros_select.rs | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/tokio/src/macros/select.rs b/tokio/src/macros/select.rs index 6497a510e98..ffca0027269 100644 --- a/tokio/src/macros/select.rs +++ b/tokio/src/macros/select.rs @@ -404,6 +404,7 @@ macro_rules! select { // The future returned a value, check if matches // the specified pattern. #[allow(unused_variables)] + #[allow(unused_mut)] match &out { $bind => {} _ => continue, diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index 2ebc4efc1ac..a6c8f8f5ec5 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -462,3 +462,20 @@ async fn async_never() -> ! { tokio::time::delay_for(Duration::from_millis(10)).await; } } + +// From https://github.com/tokio-rs/tokio/issues/2857 +#[tokio::test] +async fn mut_on_left_hand_side() { + let v = async move { + let ok = async { 1 }; + tokio::pin!(ok); + tokio::select! { + mut a = &mut ok => { + a += 1; + a + } + } + } + .await; + assert_eq!(v, 2); +} From 7ae5b7bd4f93612f91ab504ffb63aa8241c1d7bb Mon Sep 17 00:00:00 2001 From: Ivan Petkov Date: Tue, 22 Sep 2020 15:40:44 -0700 Subject: [PATCH 22/22] signal: move driver to runtime thread (#2835) Refactors the signal infrastructure to move the driver to the runtime thread. This follows the model put forth by the I/O driver and time driver. --- benches/Cargo.toml | 8 ++ benches/signal.rs | 96 ++++++++++++++++ tokio/src/io/registration.rs | 13 ++- tokio/src/runtime/builder.rs | 47 ++++---- tokio/src/runtime/context.rs | 16 ++- tokio/src/runtime/driver.rs | 196 ++++++++++++++++++++++++++++++++ tokio/src/runtime/handle.rs | 11 +- tokio/src/runtime/io.rs | 63 ---------- tokio/src/runtime/mod.rs | 11 +- tokio/src/runtime/park.rs | 10 +- tokio/src/runtime/shell.rs | 8 +- tokio/src/runtime/time.rs | 59 ---------- tokio/src/signal/unix.rs | 81 ++----------- tokio/src/signal/unix/driver.rs | 153 +++++++++++++++++++++++++ 14 files changed, 532 insertions(+), 240 deletions(-) create mode 100644 benches/signal.rs create mode 100644 tokio/src/runtime/driver.rs delete mode 100644 tokio/src/runtime/io.rs delete mode 100644 tokio/src/runtime/time.rs create mode 100644 tokio/src/signal/unix/driver.rs diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 3a90943f059..cca0ece5eb8 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -8,6 +8,9 @@ edition = "2018" tokio = { version = "0.3.0", path = "../tokio", features = ["full"] } bencher = "0.1.5" +[target.'cfg(unix)'.dependencies] +libc = "0.2.42" + [[bench]] name = "spawn" path = "spawn.rs" @@ -33,3 +36,8 @@ harness = false name = "sync_semaphore" path = "sync_semaphore.rs" harness = false + +[[bench]] +name = "signal" +path = "signal.rs" +harness = false diff --git a/benches/signal.rs b/benches/signal.rs new file mode 100644 index 00000000000..d891326de0d --- /dev/null +++ b/benches/signal.rs @@ -0,0 +1,96 @@ +//! Benchmark the delay in propagating OS signals to any listeners. 
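// How the benchmark below is structured: a single-threaded (`basic_scheduler`)
// runtime spawns ten `SignalKind::child()` and ten `SignalKind::io()`
// listeners that each report over an mpsc channel; every iteration raises
// SIGCHLD and then SIGIO via `libc::kill` and waits for the ten matching
// listeners to observe each signal.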
+#![cfg(unix)] + +use bencher::{benchmark_group, benchmark_main, Bencher}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::runtime; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::sync::mpsc; + +struct Spinner { + count: usize, +} + +impl Future for Spinner { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.count > 3 { + Poll::Ready(()) + } else { + self.count += 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +impl Spinner { + fn new() -> Self { + Self { count: 0 } + } +} + +pub fn send_signal(signal: libc::c_int) { + use libc::{getpid, kill}; + + unsafe { + assert_eq!(kill(getpid(), signal), 0); + } +} + +fn many_signals(bench: &mut Bencher) { + let num_signals = 10; + let (tx, mut rx) = mpsc::channel(num_signals); + + let rt = runtime::Builder::new() + // Intentionally single threaded to measure delays in propagating wakes + .basic_scheduler() + .enable_all() + .build() + .unwrap(); + + let spawn_signal = |kind| { + let mut tx = tx.clone(); + rt.spawn(async move { + let mut signal = signal(kind).expect("failed to create signal"); + + while signal.recv().await.is_some() { + if tx.send(()).await.is_err() { + break; + } + } + }); + }; + + for _ in 0..num_signals { + // Pick some random signals which don't terminate the test harness + spawn_signal(SignalKind::child()); + spawn_signal(SignalKind::io()); + } + drop(tx); + + // Turn the runtime for a while to ensure that all the spawned + // tasks have been polled at least once + rt.block_on(Spinner::new()); + + bench.iter(|| { + rt.block_on(async { + send_signal(libc::SIGCHLD); + for _ in 0..num_signals { + rx.recv().await.expect("channel closed"); + } + + send_signal(libc::SIGIO); + for _ in 0..num_signals { + rx.recv().await.expect("channel closed"); + } + }); + }); +} + +benchmark_group!(signal_group, many_signals,); + +benchmark_main!(signal_group); diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 8206507280d..b805d2b98c6 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -109,7 +109,18 @@ impl Registration { where T: Evented, { - let handle = Handle::current(); + Self::new_with_ready_and_handle(io, ready, Handle::current()) + } + + /// Same as `new_with_ready` but also accepts an explicit handle. + pub(crate) fn new_with_ready_and_handle( + io: &T, + ready: mio::Ready, + handle: Handle, + ) -> io::Result + where + T: Evented, + { let shared = if let Some(inner) = handle.inner() { inner.add_source(io, ready)? 
} else { diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 42aed3e90b0..4072b04e465 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,7 +1,7 @@ use crate::loom::sync::Mutex; use crate::runtime::handle::Handle; use crate::runtime::shell::Shell; -use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; +use crate::runtime::{blocking, driver, io, Callback, Runtime, Spawner}; use std::fmt; #[cfg(feature = "blocking")] @@ -359,14 +359,17 @@ impl Builder { } } + fn get_cfg(&self) -> driver::Cfg { + driver::Cfg { + enable_io: self.enable_io, + enable_time: self.enable_time, + } + } + fn build_shell_runtime(&mut self) -> io::Result { use crate::runtime::Kind; - let clock = time::create_clock(); - - // Create I/O driver - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + let (driver, resources) = driver::Driver::new(self.get_cfg())?; let spawner = Spawner::Shell; @@ -377,9 +380,10 @@ impl Builder { kind: Kind::Shell(Mutex::new(Some(Shell::new(driver)))), handle: Handle { spawner, - io_handle, - time_handle, - clock, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, blocking_spawner, }, blocking_pool, @@ -478,12 +482,7 @@ cfg_rt_core! { fn build_basic_runtime(&mut self) -> io::Result { use crate::runtime::{BasicScheduler, Kind}; - let clock = time::create_clock(); - - // Create I/O driver - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); + let (driver, resources) = driver::Driver::new(self.get_cfg())?; // And now put a single-threaded scheduler on top of the timer. When // there are no futures ready to do something, it'll let the timer or @@ -500,9 +499,10 @@ cfg_rt_core! { kind: Kind::Basic(Mutex::new(Some(scheduler))), handle: Handle { spawner, - io_handle, - time_handle, - clock, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, blocking_spawner, }, blocking_pool, @@ -533,10 +533,8 @@ cfg_rt_threaded! { let core_threads = self.core_threads.unwrap_or_else(|| cmp::min(self.max_threads, num_cpus())); assert!(core_threads <= self.max_threads, "Core threads number cannot be above max limit"); - let clock = time::create_clock(); + let (driver, resources) = driver::Driver::new(self.get_cfg())?; - let (io_driver, io_handle) = io::create_driver(self.enable_io)?; - let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); @@ -547,9 +545,10 @@ cfg_rt_threaded! { // Create the runtime handle let handle = Handle { spawner, - io_handle, - time_handle, - clock, + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, blocking_spawner, }; diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index c42b3432dea..7498175f3a7 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -14,7 +14,7 @@ cfg_blocking_impl! { } cfg_io_driver! 
{ - pub(crate) fn io_handle() -> crate::runtime::io::Handle { + pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => ctx.io_handle.clone(), None => Default::default(), @@ -22,8 +22,18 @@ cfg_io_driver! { } } +cfg_signal! { + #[cfg(unix)] + pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle { + CONTEXT.with(|ctx| match *ctx.borrow() { + Some(ref ctx) => ctx.signal_handle.clone(), + None => Default::default(), + }) + } +} + cfg_time! { - pub(crate) fn time_handle() -> crate::runtime::time::Handle { + pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => ctx.time_handle.clone(), None => Default::default(), @@ -31,7 +41,7 @@ cfg_time! { } cfg_test_util! { - pub(crate) fn clock() -> Option { + pub(crate) fn clock() -> Option { CONTEXT.with(|ctx| match *ctx.borrow() { Some(ref ctx) => Some(ctx.clock.clone()), None => None, diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs new file mode 100644 index 00000000000..2ea5089022d --- /dev/null +++ b/tokio/src/runtime/driver.rs @@ -0,0 +1,196 @@ +//! Abstracts out the entire chain of runtime sub-drivers into common types. +use crate::park::{Park, ParkThread}; +use std::io; +use std::time::Duration; + +// ===== io driver ===== + +cfg_io_driver! { + type IoDriver = crate::park::Either; + pub(crate) type IoHandle = Option; + + fn create_io_driver(enable: bool) -> io::Result<(IoDriver, IoHandle)> { + use crate::park::Either; + + #[cfg(loom)] + assert!(!enable); + + if enable { + let driver = crate::io::driver::Driver::new()?; + let handle = driver.handle(); + + Ok((Either::A(driver), Some(handle))) + } else { + let driver = ParkThread::new(); + Ok((Either::B(driver), None)) + } + } +} + +cfg_not_io_driver! { + type IoDriver = ParkThread; + pub(crate) type IoHandle = (); + + fn create_io_driver(_enable: bool) -> io::Result<(IoDriver, IoHandle)> { + let driver = ParkThread::new(); + Ok((driver, ())) + } +} + +// ===== signal driver ===== + +macro_rules! cfg_unix_and_signal { + ($($item:item)*) => { + $( + #[cfg(all(not(loom), unix, feature = "signal"))] + $item + )* + } +} + +macro_rules! cfg_neither_unix_nor_windows { + ($($item:item)*) => { + $( + #[cfg(any(loom, not(all(unix, feature = "signal"))))] + $item + )* + } +} + +cfg_unix_and_signal! { + type SignalDriver = crate::park::Either; + pub(crate) type SignalHandle = Option; + + fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { + use crate::park::Either; + + // Enable the signal driver if IO is also enabled + match io_driver { + Either::A(io_driver) => { + let driver = crate::signal::unix::driver::Driver::new(io_driver)?; + let handle = driver.handle(); + Ok((Either::A(driver), Some(handle))) + } + Either::B(_) => Ok((Either::B(io_driver), None)), + } + } +} + +cfg_neither_unix_nor_windows! { + type SignalDriver = IoDriver; + pub(crate) type SignalHandle = (); + + fn create_signal_driver(io_driver: IoDriver) -> io::Result<(SignalDriver, SignalHandle)> { + Ok((io_driver, ())) + } +} + +// ===== time driver ===== + +cfg_time! 
{ + type TimeDriver = crate::park::Either, SignalDriver>; + + pub(crate) type Clock = crate::time::Clock; + pub(crate) type TimeHandle = Option; + + fn create_clock() -> Clock { + crate::time::Clock::new() + } + + fn create_time_driver( + enable: bool, + signal_driver: SignalDriver, + clock: Clock, + ) -> (TimeDriver, TimeHandle) { + use crate::park::Either; + + if enable { + let driver = crate::time::driver::Driver::new(signal_driver, clock); + let handle = driver.handle(); + + (Either::A(driver), Some(handle)) + } else { + (Either::B(signal_driver), None) + } + } +} + +cfg_not_time! { + type TimeDriver = SignalDriver; + + pub(crate) type Clock = (); + pub(crate) type TimeHandle = (); + + fn create_clock() -> Clock { + () + } + + fn create_time_driver( + _enable: bool, + signal_driver: SignalDriver, + _clock: Clock, + ) -> (TimeDriver, TimeHandle) { + (signal_driver, ()) + } +} + +// ===== runtime driver ===== + +#[derive(Debug)] +pub(crate) struct Driver { + inner: TimeDriver, +} + +pub(crate) struct Resources { + pub(crate) io_handle: IoHandle, + pub(crate) signal_handle: SignalHandle, + pub(crate) time_handle: TimeHandle, + pub(crate) clock: Clock, +} + +pub(crate) struct Cfg { + pub(crate) enable_io: bool, + pub(crate) enable_time: bool, +} + +impl Driver { + pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> { + let clock = create_clock(); + + let (io_driver, io_handle) = create_io_driver(cfg.enable_io)?; + let (signal_driver, signal_handle) = create_signal_driver(io_driver)?; + let (time_driver, time_handle) = + create_time_driver(cfg.enable_time, signal_driver, clock.clone()); + + Ok(( + Self { inner: time_driver }, + Resources { + io_handle, + signal_handle, + time_handle, + clock, + }, + )) + } +} + +impl Park for Driver { + type Unpark = ::Unpark; + type Error = ::Error; + + fn unpark(&self) -> Self::Unpark { + self.inner.unpark() + } + + fn park(&mut self) -> Result<(), Self::Error> { + self.inner.park() + } + + fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + self.inner.park_timeout(duration) + } + + fn shutdown(&mut self) { + self.inner.shutdown() + } +} diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 516ad4b3aad..dfcc5e97921 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,4 +1,4 @@ -use crate::runtime::{blocking, context, io, time, Spawner}; +use crate::runtime::{blocking, context, driver, Spawner}; /// Handle to the runtime. /// @@ -11,13 +11,16 @@ pub(crate) struct Handle { pub(super) spawner: Spawner, /// Handles to the I/O drivers - pub(super) io_handle: io::Handle, + pub(super) io_handle: driver::IoHandle, + + /// Handles to the signal drivers + pub(super) signal_handle: driver::SignalHandle, /// Handles to the time drivers - pub(super) time_handle: time::Handle, + pub(super) time_handle: driver::TimeHandle, /// Source of `Instant::now()` - pub(super) clock: time::Clock, + pub(super) clock: driver::Clock, /// Blocking pool spawner pub(super) blocking_spawner: blocking::Spawner, diff --git a/tokio/src/runtime/io.rs b/tokio/src/runtime/io.rs deleted file mode 100644 index 6a0953af851..00000000000 --- a/tokio/src/runtime/io.rs +++ /dev/null @@ -1,63 +0,0 @@ -//! Abstracts out the APIs necessary to `Runtime` for integrating the I/O -//! driver. When the `time` feature flag is **not** enabled. These APIs are -//! shells. This isolates the complexity of dealing with conditional -//! compilation. - -/// Re-exported for convenience. 
-pub(crate) use std::io::Result; - -pub(crate) use variant::*; - -#[cfg(feature = "io-driver")] -mod variant { - use crate::io::driver; - use crate::park::{Either, ParkThread}; - - use std::io; - - /// The driver value the runtime passes to the `timer` layer. - /// - /// When the `io-driver` feature is enabled, this is the "real" I/O driver - /// backed by Mio. Without the `io-driver` feature, this is a thread parker - /// backed by a condition variable. - pub(crate) type Driver = Either; - - /// The handle the runtime stores for future use. - /// - /// When the `io-driver` feature is **not** enabled, this is `()`. - pub(crate) type Handle = Option; - - pub(crate) fn create_driver(enable: bool) -> io::Result<(Driver, Handle)> { - #[cfg(loom)] - assert!(!enable); - - if enable { - let driver = driver::Driver::new()?; - let handle = driver.handle(); - - Ok((Either::A(driver), Some(handle))) - } else { - let driver = ParkThread::new(); - Ok((Either::B(driver), None)) - } - } -} - -#[cfg(not(feature = "io-driver"))] -mod variant { - use crate::park::ParkThread; - - use std::io; - - /// I/O is not enabled, use a condition variable based parker - pub(crate) type Driver = ParkThread; - - /// There is no handle - pub(crate) type Handle = (); - - pub(crate) fn create_driver(_enable: bool) -> io::Result<(Driver, Handle)> { - let driver = ParkThread::new(); - - Ok((driver, ())) - } -} diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index bec0ecd5949..884c2b46fa4 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -208,13 +208,18 @@ cfg_blocking_impl! { mod builder; pub use self::builder::Builder; +pub(crate) mod driver; + pub(crate) mod enter; use self::enter::enter; mod handle; use handle::Handle; -mod io; +mod io { + /// Re-exported for convenience. + pub(crate) use std::io::Result; +} cfg_rt_threaded! { mod park; @@ -227,8 +232,6 @@ use self::shell::Shell; mod spawner; use self::spawner::Spawner; -mod time; - cfg_rt_threaded! { mod queue; @@ -293,7 +296,7 @@ enum Kind { /// Execute all tasks on the current-thread. #[cfg(feature = "rt-core")] - Basic(Mutex>>), + Basic(Mutex>>), /// Execute tasks across multiple threads. #[cfg(feature = "rt-threaded")] diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/park.rs index 1dcf65af91b..c994c935ce2 100644 --- a/tokio/src/runtime/park.rs +++ b/tokio/src/runtime/park.rs @@ -6,7 +6,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; use crate::park::{Park, Unpark}; -use crate::runtime::time; +use crate::runtime::driver::Driver; use crate::util::TryLock; use std::sync::atomic::Ordering::SeqCst; @@ -42,14 +42,14 @@ const NOTIFIED: usize = 3; /// Shared across multiple Parker handles struct Shared { /// Shared driver. 
Only one thread at a time can use this - driver: TryLock, + driver: TryLock, /// Unpark handle - handle: ::Unpark, + handle: ::Unpark, } impl Parker { - pub(crate) fn new(driver: time::Driver) -> Parker { + pub(crate) fn new(driver: Driver) -> Parker { let handle = driver.unpark(); Parker { @@ -180,7 +180,7 @@ impl Inner { } } - fn park_driver(&self, driver: &mut time::Driver) { + fn park_driver(&self, driver: &mut Driver) { match self .state .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs index a65869d0de2..3d631239bf8 100644 --- a/tokio/src/runtime/shell.rs +++ b/tokio/src/runtime/shell.rs @@ -1,8 +1,8 @@ #![allow(clippy::redundant_clone)] use crate::park::{Park, Unpark}; +use crate::runtime::driver::Driver; use crate::runtime::enter; -use crate::runtime::time; use crate::util::{waker_ref, Wake}; use std::future::Future; @@ -12,17 +12,17 @@ use std::task::Poll::Ready; #[derive(Debug)] pub(super) struct Shell { - driver: time::Driver, + driver: Driver, /// TODO: don't store this unpark: Arc, } #[derive(Debug)] -struct Handle(::Unpark); +struct Handle(::Unpark); impl Shell { - pub(super) fn new(driver: time::Driver) -> Shell { + pub(super) fn new(driver: Driver) -> Shell { let unpark = Arc::new(Handle(driver.unpark())); Shell { driver, unpark } diff --git a/tokio/src/runtime/time.rs b/tokio/src/runtime/time.rs deleted file mode 100644 index c623d9641a1..00000000000 --- a/tokio/src/runtime/time.rs +++ /dev/null @@ -1,59 +0,0 @@ -//! Abstracts out the APIs necessary to `Runtime` for integrating the time -//! driver. When the `time` feature flag is **not** enabled. These APIs are -//! shells. This isolates the complexity of dealing with conditional -//! compilation. - -pub(crate) use variant::*; - -#[cfg(feature = "time")] -mod variant { - use crate::park::Either; - use crate::runtime::io; - use crate::time::{self, driver}; - - pub(crate) type Clock = time::Clock; - pub(crate) type Driver = Either, io::Driver>; - pub(crate) type Handle = Option; - - pub(crate) fn create_clock() -> Clock { - Clock::new() - } - - /// Create a new timer driver / handle pair - pub(crate) fn create_driver( - enable: bool, - io_driver: io::Driver, - clock: Clock, - ) -> (Driver, Handle) { - if enable { - let driver = driver::Driver::new(io_driver, clock); - let handle = driver.handle(); - - (Either::A(driver), Some(handle)) - } else { - (Either::B(io_driver), None) - } - } -} - -#[cfg(not(feature = "time"))] -mod variant { - use crate::runtime::io; - - pub(crate) type Clock = (); - pub(crate) type Driver = io::Driver; - pub(crate) type Handle = (); - - pub(crate) fn create_clock() -> Clock { - () - } - - /// Create a new timer driver / handle pair - pub(crate) fn create_driver( - _enable: bool, - io_driver: io::Driver, - _clock: Clock, - ) -> (Driver, Handle) { - (io_driver, ()) - } -} diff --git a/tokio/src/signal/unix.rs b/tokio/src/signal/unix.rs index bc48bdfaa64..45a091d76a6 100644 --- a/tokio/src/signal/unix.rs +++ b/tokio/src/signal/unix.rs @@ -5,7 +5,6 @@ #![cfg(unix)] -use crate::io::{AsyncRead, PollEvented, ReadBuf}; use crate::signal::registry::{globals, EventId, EventInfo, Globals, Init, Storage}; use crate::sync::mpsc::{channel, Receiver}; @@ -17,6 +16,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Once; use std::task::{Context, Poll}; +pub(crate) mod driver; + pub(crate) type OsStorage = Vec; // Number of different unix signals @@ -202,9 +203,9 @@ impl Default for SignalInfo { /// The purpose of this 
signal handler is to primarily: /// /// 1. Flag that our specific signal was received (e.g. store an atomic flag) -/// 2. Wake up driver tasks by writing a byte to a pipe +/// 2. Wake up the driver by writing a byte to a pipe /// -/// Those two operations shoudl both be async-signal safe. +/// Those two operations should both be async-signal safe. fn action(globals: Pin<&'static Globals>, signal: c_int) { globals.record_event(signal as EventId); @@ -227,6 +228,9 @@ fn signal_enable(signal: c_int) -> io::Result<()> { )); } + // Check that we have a signal driver running + driver::Handle::current().check_inner()?; + let globals = globals(); let siginfo = match globals.storage().get(signal as EventId) { Some(slot) => slot, @@ -254,69 +258,6 @@ fn signal_enable(signal: c_int) -> io::Result<()> { } } -#[derive(Debug)] -struct Driver { - wakeup: PollEvented, -} - -impl Driver { - fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // Drain the data from the pipe and maintain interest in getting more - self.drain(cx); - // Broadcast any signals which were received - globals().broadcast(); - - Poll::Pending - } -} - -impl Driver { - fn new() -> io::Result { - // NB: We give each driver a "fresh" reciever file descriptor to avoid - // the issues described in alexcrichton/tokio-process#42. - // - // In the past we would reuse the actual receiver file descriptor and - // swallow any errors around double registration of the same descriptor. - // I'm not sure if the second (failed) registration simply doesn't end up - // receiving wake up notifications, or there could be some race condition - // when consuming readiness events, but having distinct descriptors for - // distinct PollEvented instances appears to mitigate this. - // - // Unfortunately we cannot just use a single global PollEvented instance - // either, since we can't compare Handles or assume they will always - // point to the exact same reactor. - let stream = globals().receiver.try_clone()?; - let wakeup = PollEvented::new(stream)?; - - Ok(Driver { wakeup }) - } - - /// Drain all data in the global receiver, ensuring we'll get woken up when - /// there is a write on the other end. - /// - /// We do *NOT* use the existence of any read bytes as evidence a signal was - /// received since the `pending` flags would have already been set if that - /// was the case. See - /// [#38](https://github.com/alexcrichton/tokio-signal/issues/38) for more - /// info. - fn drain(&mut self, cx: &mut Context<'_>) { - let mut buf = [0; 128]; - let mut buf = ReadBuf::new(&mut buf); - loop { - match Pin::new(&mut self.wakeup).poll_read(cx, &mut buf) { - Poll::Ready(Ok(())) => { - if buf.filled().is_empty() { - panic!("EOF on self-pipe") - } - buf.clear(); - } - Poll::Ready(Err(e)) => panic!("Bad read on self-pipe: {}", e), - Poll::Pending => break, - } - } - } -} - /// A stream of events for receiving a particular type of OS signal. /// /// In general signal handling on Unix is a pretty tricky topic, and this @@ -382,7 +323,6 @@ impl Driver { #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub struct Signal { - driver: Driver, rx: Receiver<()>, } @@ -414,16 +354,12 @@ pub fn signal(kind: SignalKind) -> io::Result { // Turn the signal delivery on once we are ready for it signal_enable(signal)?; - // Ensure there's a driver for our associated event loop processing - // signals. - let driver = Driver::new()?; - // One wakeup in a queue is enough, no need for us to buffer up any // more. 
     let (tx, rx) = channel(1);
 
     globals().register_listener(signal as EventId, tx);
 
-    Ok(Signal { driver, rx })
+    Ok(Signal { rx })
 }
 
 impl Signal {
@@ -484,7 +420,6 @@ impl Signal {
     ///     }
     /// }
     /// ```
     pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
-        let _ = self.driver.poll(cx);
         self.rx.poll_recv(cx)
     }
 }
diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs
new file mode 100644
index 00000000000..0458893cce4
--- /dev/null
+++ b/tokio/src/signal/unix/driver.rs
@@ -0,0 +1,153 @@
+//! Signal driver
+
+use crate::io::driver::Driver as IoDriver;
+use crate::io::Registration;
+use crate::park::Park;
+use crate::runtime::context;
+use crate::signal::registry::globals;
+use mio_uds::UnixStream;
+use std::io::{self, Read};
+use std::sync::{Arc, Weak};
+use std::time::Duration;
+
+/// Responsible for registering wakeups when an OS signal is received, and
+/// subsequently dispatching notifications to any signal listeners as appropriate.
+///
+/// Note: this driver relies on having an enabled IO driver in order to listen to
+/// pipe write wakeups.
+#[derive(Debug)]
+pub(crate) struct Driver {
+    /// Thread parker. The `Driver` park implementation delegates to this.
+    park: IoDriver,
+
+    /// A pipe for receiving wake events from the signal handler
+    receiver: UnixStream,
+
+    /// The actual registration for `receiver` when active.
+    /// Lazily bound at the first signal registration.
+    registration: Registration,
+
+    /// Shared state
+    inner: Arc<Inner>,
+}
+
+#[derive(Clone, Debug)]
+pub(crate) struct Handle {
+    inner: Weak<Inner>,
+}
+
+#[derive(Debug)]
+pub(super) struct Inner(());
+
+// ===== impl Driver =====
+
+impl Driver {
+    /// Creates a new signal `Driver` instance that delegates wakeups to `park`.
+    pub(crate) fn new(park: IoDriver) -> io::Result<Self> {
+        // NB: We give each driver a "fresh" receiver file descriptor to avoid
+        // the issues described in alexcrichton/tokio-process#42.
+        //
+        // In the past we would reuse the actual receiver file descriptor and
+        // swallow any errors around double registration of the same descriptor.
+        // I'm not sure if the second (failed) registration simply doesn't end up
+        // receiving wake up notifications, or there could be some race condition
+        // when consuming readiness events, but having distinct descriptors for
+        // distinct PollEvented instances appears to mitigate this.
+        //
+        // Unfortunately we cannot just use a single global PollEvented instance
+        // either, since we can't compare Handles or assume they will always
+        // point to the exact same reactor.
+        let receiver = globals().receiver.try_clone()?;
+        let registration =
+            Registration::new_with_ready_and_handle(&receiver, mio::Ready::all(), park.handle())?;
+
+        Ok(Self {
+            park,
+            receiver,
+            registration,
+            inner: Arc::new(Inner(())),
+        })
+    }
+
+    /// Returns a handle to this event loop which can be sent across threads
+    /// and can be used as a proxy to the event loop itself.
+    pub(crate) fn handle(&self) -> Handle {
+        Handle {
+            inner: Arc::downgrade(&self.inner),
+        }
+    }
+
+    fn process(&self) {
+        // Check if the pipe is ready to read and therefore has "woken" us up
+        match self.registration.take_read_ready() {
+            Ok(Some(ready)) => assert!(ready.is_readable()),
+            Ok(None) => return, // No wake has arrived, bail
+            Err(e) => panic!("reactor gone: {}", e),
+        }
+
+        // Drain the pipe completely so we can receive a new readiness event
+        // if another signal has come in.
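+        //
+        // The bytes read here are *not* used as evidence of which signal
+        // arrived; the registry's per-signal flags were already set by the
+        // signal handler. Reading until `WouldBlock` only ensures that the
+        // next write on the self-pipe produces a fresh readiness event.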
+        let mut buf = [0; 128];
+        loop {
+            match (&self.receiver).read(&mut buf) {
+                Ok(0) => panic!("EOF on self-pipe"),
+                Ok(_) => continue, // Keep reading
+                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
+                Err(e) => panic!("Bad read on self-pipe: {}", e),
+            }
+        }
+
+        // Broadcast any signals which were received
+        globals().broadcast();
+    }
+}
+
+// ===== impl Park for Driver =====
+
+impl Park for Driver {
+    type Unpark = <IoDriver as Park>::Unpark;
+    type Error = io::Error;
+
+    fn unpark(&self) -> Self::Unpark {
+        self.park.unpark()
+    }
+
+    fn park(&mut self) -> Result<(), Self::Error> {
+        self.park.park()?;
+        self.process();
+        Ok(())
+    }
+
+    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
+        self.park.park_timeout(duration)?;
+        self.process();
+        Ok(())
+    }
+
+    fn shutdown(&mut self) {
+        self.park.shutdown()
+    }
+}
+
+// ===== impl Handle =====
+
+impl Handle {
+    /// Returns a handle to the current driver
+    ///
+    /// # Panics
+    ///
+    /// This function panics if there is no current signal driver set.
+    pub(super) fn current() -> Self {
+        context::signal_handle().expect(
+            "there is no signal driver running, must be called from the context of Tokio runtime",
+        )
+    }
+
+    pub(super) fn check_inner(&self) -> io::Result<()> {
+        if self.inner.strong_count() > 0 {
+            Ok(())
+        } else {
+            Err(io::Error::new(io::ErrorKind::Other, "signal driver gone"))
+        }
+    }
+}
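
The two driver modules added above share one shape: each layer implements `Park`, owns the layer below it, delegates `park()`/`park_timeout()` downward, runs its own processing once the inner driver wakes, and exposes a handle holding only a `Weak` reference so callers can detect a dropped driver (see `Handle::check_inner`). The following is a minimal, self-contained sketch of that shape, not Tokio's actual API; `IoDriver`, `SignalDriver`, `SignalHandle`, and `Inner` here are simplified stand-ins:

    use std::sync::{Arc, Weak};

    /// A cut-down stand-in for Tokio's `Park` trait.
    trait Park {
        fn park(&mut self);
    }

    /// Innermost layer: stands in for the IO driver.
    struct IoDriver;

    impl Park for IoDriver {
        fn park(&mut self) {
            // The real driver would block on the OS selector here.
            println!("io: waiting for readiness events");
        }
    }

    /// Shared state owned by the driver; handles only hold a `Weak` to it.
    struct Inner;

    /// Next layer up: stands in for the signal driver.
    struct SignalDriver {
        park: IoDriver,
        inner: Arc<Inner>,
    }

    #[derive(Clone)]
    struct SignalHandle {
        inner: Weak<Inner>,
    }

    impl SignalDriver {
        fn new(park: IoDriver) -> Self {
            SignalDriver { park, inner: Arc::new(Inner) }
        }

        fn handle(&self) -> SignalHandle {
            SignalHandle { inner: Arc::downgrade(&self.inner) }
        }

        fn process(&self) {
            // The real driver drains the self-pipe and broadcasts signals here.
            println!("signal: dispatching wakeups");
        }
    }

    impl Park for SignalDriver {
        fn park(&mut self) {
            // Same shape as `impl Park for Driver` above: park the inner
            // driver, then do this layer's processing.
            self.park.park();
            self.process();
        }
    }

    impl SignalHandle {
        /// Same liveness test as `Handle::check_inner`: a strong count of
        /// zero means the driver has already been dropped.
        fn check_inner(&self) -> Result<(), &'static str> {
            if self.inner.strong_count() > 0 {
                Ok(())
            } else {
                Err("signal driver gone")
            }
        }
    }

    fn main() {
        let mut driver = SignalDriver::new(IoDriver);
        let handle = driver.handle();

        driver.park();
        assert!(handle.check_inner().is_ok());

        drop(driver);
        assert!(handle.check_inner().is_err());
    }

The `Either`-based aliases in `runtime/driver.rs` apply the same idea at compile time: whichever features are enabled, the rest of the runtime names a single `Driver`, `IoHandle`, `SignalHandle`, and `TimeHandle`, with disabled layers collapsing to `ParkThread` or `()`.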