From 30d4ec0a209c0f71b11d8f358423ec22b1761ff4 Mon Sep 17 00:00:00 2001 From: Mikail Bagishov Date: Sun, 23 Aug 2020 18:47:20 +0300 Subject: [PATCH 1/4] io: add ReaderStream (#2714) --- tokio/src/io/mod.rs | 1 + tokio/src/io/util/mod.rs | 3 + tokio/src/io/util/reader_stream.rs | 105 +++++++++++++++++++++++++++++ tokio/tests/io_reader_stream.rs | 64 ++++++++++++++++++ 4 files changed, 173 insertions(+) create mode 100644 tokio/src/io/util/reader_stream.rs create mode 100644 tokio/tests/io_reader_stream.rs diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 9e0e063195c..37da942ff3d 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -236,6 +236,7 @@ cfg_io_util! { cfg_stream! { pub use util::{stream_reader, StreamReader}; + pub use util::{reader_stream, ReaderStream}; } } diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 609ff2386a6..782a02a8f74 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -66,6 +66,9 @@ cfg_io_util! { cfg_stream! { mod stream_reader; pub use stream_reader::{stream_reader, StreamReader}; + + mod reader_stream; + pub use reader_stream::{reader_stream, ReaderStream}; } mod take; diff --git a/tokio/src/io/util/reader_stream.rs b/tokio/src/io/util/reader_stream.rs new file mode 100644 index 00000000000..51651cede4d --- /dev/null +++ b/tokio/src/io/util/reader_stream.rs @@ -0,0 +1,105 @@ +use crate::io::AsyncRead; +use crate::stream::Stream; +use bytes::{Bytes, BytesMut}; +use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Convert an [`AsyncRead`] implementor into a + /// [`Stream`] of Result<[`Bytes`], std::io::Error>. + /// After first error it will stop. + /// Additionally, this stream is fused: after it returns None at some + /// moment, it is guaranteed that further `next()`, `poll_next()` and + /// similar functions will instantly return None. + /// + /// This type can be created using the [`reader_stream`] function + /// + /// [`AsyncRead`]: crate::io::AsyncRead + /// [`Stream`]: crate::stream::Stream + /// [`Bytes`]: bytes::Bytes + /// [`reader_stream`]: crate::io::reader_stream + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub struct ReaderStream { + // Reader itself. + // None if we had error reading from the `reader` in the past. + #[pin] + reader: Option, + // Working buffer, used to optimize allocations. + // # Capacity behavior + // Initially `buf` is empty. Also it's getting smaller and smaller + // during polls (because its chunks are returned to stream user). + // But when it's capacity reaches 0, it is growed. + buf: BytesMut, + } +} + +/// Convert an [`AsyncRead`] implementor into a +/// [`Stream`] of Result<[`Bytes`], std::io::Error>. +/// +/// # Example +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// use tokio::stream::StreamExt; +/// +/// let data: &[u8] = b"hello, world!"; +/// let mut stream = tokio::io::reader_stream(data); +/// let mut stream_contents = Vec::new(); +/// while let Some(chunk) = stream.next().await { +/// stream_contents.extend_from_slice(chunk?.as_ref()); +/// } +/// assert_eq!(stream_contents, data); +/// # Ok(()) +/// # } +/// ``` +/// +/// [`AsyncRead`]: crate::io::AsyncRead +/// [`Stream`]: crate::stream::Stream +/// [`Bytes`]: bytes::Bytes +pub fn reader_stream(reader: R) -> ReaderStream +where + R: AsyncRead, +{ + ReaderStream { + reader: Some(reader), + buf: BytesMut::new(), + } +} + +const CAPACITY: usize = 4096; + +impl Stream for ReaderStream +where + R: AsyncRead, +{ + type Item = std::io::Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + let reader = match this.reader.as_pin_mut() { + Some(r) => r, + None => return Poll::Ready(None), + }; + if this.buf.capacity() == 0 { + this.buf.reserve(CAPACITY); + } + match reader.poll_read_buf(cx, &mut this.buf) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => { + self.project().reader.set(None); + Poll::Ready(Some(Err(err))) + } + Poll::Ready(Ok(0)) => { + self.project().reader.set(None); + Poll::Ready(None) + } + Poll::Ready(Ok(_)) => { + let chunk = this.buf.split(); + Poll::Ready(Some(Ok(chunk.freeze()))) + } + } + } +} diff --git a/tokio/tests/io_reader_stream.rs b/tokio/tests/io_reader_stream.rs new file mode 100644 index 00000000000..6546a0ef4da --- /dev/null +++ b/tokio/tests/io_reader_stream.rs @@ -0,0 +1,64 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::AsyncRead; +use tokio::stream::StreamExt; + +/// produces at most `remaining` zeros, that returns error. +/// each time it reads at most 31 byte. +struct Reader { + remaining: usize, +} + +impl AsyncRead for Reader { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = Pin::into_inner(self); + assert_ne!(buf.len(), 0); + if this.remaining > 0 { + let n = std::cmp::min(this.remaining, buf.len()); + let n = std::cmp::min(n, 31); + for x in &mut buf[..n] { + *x = 0; + } + this.remaining -= n; + Poll::Ready(Ok(n)) + } else { + Poll::Ready(Err(std::io::Error::from_raw_os_error(22))) + } + } +} + +#[tokio::test] +async fn correct_behavior_on_errors() { + let reader = Reader { remaining: 8000 }; + let mut stream = tokio::io::reader_stream(reader); + let mut zeros_received = 0; + let mut had_error = false; + loop { + let item = stream.next().await.unwrap(); + match item { + Ok(bytes) => { + let bytes = &*bytes; + for byte in bytes { + assert_eq!(*byte, 0); + zeros_received += 1; + } + } + Err(_) => { + assert!(!had_error); + had_error = true; + break; + } + } + } + + assert!(had_error); + assert_eq!(zeros_received, 8000); + assert!(stream.next().await.is_none()); +} From f0328f78103eb73df0f532b7e69eaca4b4fda6c7 Mon Sep 17 00:00:00 2001 From: John-John Tedro Date: Thu, 27 Aug 2020 08:23:38 +0200 Subject: [PATCH 2/4] sync: implement map methods of parking_lot fame (#2771) * sync: Implement map methods of parking_lot fame Generally, this mimics the way `MappedRwLock*Guard`s are implemented in `parking_lot`. By storing a raw pointer in the guards themselves referencing the mapped data and maintaining type invariants through `PhantomData`. I didn't try to think too much about this, so if someone has objections I'd love to hear them. I've also dropped the internal use of `ReleasingPermit`, since it made the guards unecessarily large. The number of permits that need to be released are already known by the guards themselves, and is instead governed directly in the relevant `Drop` impls. This has the benefit of making the guards as small as possible, for the non-mapped variants this means a single reference is enough. `fmt::Debug` impls have been adjusted to behave exactly like the delegating impls in `parking_lot`. `fmt::Display` impls have been added for all guard types which behave the same. This does change the format of debug impls, for which I'm not sure if we provide any guarantees. --- .github/workflows/ci.yml | 4 +- tokio/src/sync/rwlock.rs | 332 ++++++++++++++++++++++++++++++++++----- 2 files changed, 298 insertions(+), 38 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f24bdb88cd3..ecc27a9168f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,8 +1,8 @@ on: push: - branches: ["master"] + branches: ["v0.2.x"] pull_request: - branches: ["master"] + branches: ["v0.2.x"] name: CI diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 3d2a2f7a8fc..1bb579319d0 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -1,5 +1,8 @@ -use crate::sync::batch_semaphore::{AcquireError, Semaphore}; +use crate::sync::batch_semaphore::Semaphore; use std::cell::UnsafeCell; +use std::fmt; +use std::marker; +use std::mem; use std::ops; #[cfg(not(loom))] @@ -83,10 +86,140 @@ pub struct RwLock { /// [`RwLock`]. /// /// [`read`]: method@RwLock::read -#[derive(Debug)] +/// [`RwLock`]: struct@RwLock pub struct RwLockReadGuard<'a, T: ?Sized> { - permit: ReleasingPermit<'a, T>, - lock: &'a RwLock, + s: &'a Semaphore, + data: *const T, + marker: marker::PhantomData<&'a T>, +} + +impl<'a, T> RwLockReadGuard<'a, T> { + /// Make a new `RwLockReadGuard` for a component of the locked data. + /// + /// This operation cannot fail as the `RwLockReadGuard` passed in already + /// locked the data. + /// + /// This is an associated function that needs to be + /// used as `RwLockReadGuard::map(...)`. A method would interfere with + /// methods of the same name on the contents of the locked data. + /// + /// This is an asynchronous version of [`RwLockReadGuard::map`] from the + /// [`parking_lot` crate]. + /// + /// [`RwLockReadGuard::map`]: https://docs.rs/lock_api/latest/lock_api/struct.RwLockReadGuard.html#method.map + /// [`parking_lot` crate]: https://crates.io/crates/parking_lot + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{RwLock, RwLockReadGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let lock = RwLock::new(Foo(1)); + /// + /// let guard = lock.read().await; + /// let guard = RwLockReadGuard::map(guard, |f| &f.0); + /// + /// assert_eq!(1, *guard); + /// # } + /// ``` + #[inline] + pub fn map(this: Self, f: F) -> RwLockReadGuard<'a, U> + where + F: FnOnce(&T) -> &U, + { + let data = f(&*this) as *const U; + let s = this.s; + // NB: Forget to avoid drop impl from being called. + mem::forget(this); + RwLockReadGuard { + s, + data, + marker: marker::PhantomData, + } + } + + /// Attempts to make a new [`RwLockReadGuard`] for a component of the + /// locked data. The original guard is returned if the closure returns + /// `None`. + /// + /// This operation cannot fail as the `RwLockReadGuard` passed in already + /// locked the data. + /// + /// This is an associated function that needs to be used as + /// `RwLockReadGuard::try_map(..)`. A method would interfere with methods of the + /// same name on the contents of the locked data. + /// + /// This is an asynchronous version of [`RwLockReadGuard::try_map`] from the + /// [`parking_lot` crate]. + /// + /// [`RwLockReadGuard::try_map`]: https://docs.rs/lock_api/latest/lock_api/struct.RwLockReadGuard.html#method.try_map + /// [`parking_lot` crate]: https://crates.io/crates/parking_lot + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{RwLock, RwLockReadGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let lock = RwLock::new(Foo(1)); + /// + /// let guard = lock.read().await; + /// let guard = RwLockReadGuard::try_map(guard, |f| Some(&f.0)).expect("should not fail"); + /// + /// assert_eq!(1, *guard); + /// # } + /// ``` + #[inline] + pub fn try_map(this: Self, f: F) -> Result, Self> + where + F: FnOnce(&T) -> Option<&U>, + { + let data = match f(&*this) { + Some(data) => data as *const U, + None => return Err(this), + }; + let s = this.s; + // NB: Forget to avoid drop impl from being called. + mem::forget(this); + Ok(RwLockReadGuard { + s, + data, + marker: marker::PhantomData, + }) + } +} + +impl<'a, T: ?Sized> fmt::Debug for RwLockReadGuard<'a, T> +where + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized> fmt::Display for RwLockReadGuard<'a, T> +where + T: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized> Drop for RwLockReadGuard<'a, T> { + fn drop(&mut self) { + self.s.release(1); + } } /// RAII structure used to release the exclusive write access of a lock when @@ -97,32 +230,143 @@ pub struct RwLockReadGuard<'a, T: ?Sized> { /// /// [`write`]: method@RwLock::write /// [`RwLock`]: struct@RwLock -#[derive(Debug)] pub struct RwLockWriteGuard<'a, T: ?Sized> { - permit: ReleasingPermit<'a, T>, - lock: &'a RwLock, + s: &'a Semaphore, + data: *mut T, + marker: marker::PhantomData<&'a mut T>, } -// Wrapper arround Permit that releases on Drop -#[derive(Debug)] -struct ReleasingPermit<'a, T: ?Sized> { - num_permits: u16, - lock: &'a RwLock, +impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> { + /// Make a new `RwLockWriteGuard` for a component of the locked data. + /// + /// This operation cannot fail as the `RwLockWriteGuard` passed in already + /// locked the data. + /// + /// This is an associated function that needs to be used as + /// `RwLockWriteGuard::map(..)`. A method would interfere with methods of + /// the same name on the contents of the locked data. + /// + /// This is an asynchronous version of [`RwLockWriteGuard::map`] from the + /// [`parking_lot` crate]. + /// + /// [`RwLockWriteGuard::map`]: https://docs.rs/lock_api/latest/lock_api/struct.RwLockWriteGuard.html#method.map + /// [`parking_lot` crate]: https://crates.io/crates/parking_lot + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{RwLock, RwLockWriteGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let lock = RwLock::new(Foo(1)); + /// + /// { + /// let mut mapped = RwLockWriteGuard::map(lock.write().await, |f| &mut f.0); + /// *mapped = 2; + /// } + /// + /// assert_eq!(Foo(2), *lock.read().await); + /// # } + /// ``` + #[inline] + pub fn map(mut this: Self, f: F) -> RwLockWriteGuard<'a, U> + where + F: FnOnce(&mut T) -> &mut U, + { + let data = f(&mut *this) as *mut U; + let s = this.s; + // NB: Forget to avoid drop impl from being called. + mem::forget(this); + RwLockWriteGuard { + s, + data, + marker: marker::PhantomData, + } + } + + /// Attempts to make a new [`RwLockWriteGuard`] for a component of + /// the locked data. The original guard is returned if the closure returns + /// `None`. + /// + /// This operation cannot fail as the `RwLockWriteGuard` passed in already + /// locked the data. + /// + /// This is an associated function that needs to be + /// used as `RwLockWriteGuard::try_map(...)`. A method would interfere with + /// methods of the same name on the contents of the locked data. + /// + /// This is an asynchronous version of [`RwLockWriteGuard::try_map`] from + /// the [`parking_lot` crate]. + /// + /// [`RwLockWriteGuard::try_map`]: https://docs.rs/lock_api/latest/lock_api/struct.RwLockWriteGuard.html#method.try_map + /// [`parking_lot` crate]: https://crates.io/crates/parking_lot + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::{RwLock, RwLockWriteGuard}; + /// + /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] + /// struct Foo(u32); + /// + /// # #[tokio::main] + /// # async fn main() { + /// let lock = RwLock::new(Foo(1)); + /// + /// { + /// let guard = lock.write().await; + /// let mut guard = RwLockWriteGuard::try_map(guard, |f| Some(&mut f.0)).expect("should not fail"); + /// *guard = 2; + /// } + /// + /// assert_eq!(Foo(2), *lock.read().await); + /// # } + /// ``` + #[inline] + pub fn try_map(mut this: Self, f: F) -> Result, Self> + where + F: FnOnce(&mut T) -> Option<&mut U>, + { + let data = match f(&mut *this) { + Some(data) => data as *mut U, + None => return Err(this), + }; + let s = this.s; + // NB: Forget to avoid drop impl from being called. + mem::forget(this); + Ok(RwLockWriteGuard { + s, + data, + marker: marker::PhantomData, + }) + } } -impl<'a, T: ?Sized> ReleasingPermit<'a, T> { - async fn acquire( - lock: &'a RwLock, - num_permits: u16, - ) -> Result, AcquireError> { - lock.s.acquire(num_permits.into()).await?; - Ok(Self { num_permits, lock }) +impl<'a, T: ?Sized> fmt::Debug for RwLockWriteGuard<'a, T> +where + T: fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized> fmt::Display for RwLockWriteGuard<'a, T> +where + T: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) } } -impl Drop for ReleasingPermit<'_, T> { +impl<'a, T: ?Sized> Drop for RwLockWriteGuard<'a, T> { fn drop(&mut self) { - self.lock.s.release(self.num_permits as usize); + self.s.release(MAX_READS); } } @@ -139,9 +383,11 @@ fn bounds() { check_sync::>(); check_unpin::>(); + check_send::>(); check_sync::>(); check_unpin::>(); + check_send::>(); check_sync::>(); check_unpin::>(); @@ -155,8 +401,17 @@ fn bounds() { // RwLock. unsafe impl Send for RwLock where T: ?Sized + Send {} unsafe impl Sync for RwLock where T: ?Sized + Send + Sync {} +// NB: These impls need to be explicit since we're storing a raw pointer. +// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over +// `T` is `Send`. +unsafe impl Send for RwLockReadGuard<'_, T> where T: ?Sized + Sync {} unsafe impl Sync for RwLockReadGuard<'_, T> where T: ?Sized + Send + Sync {} unsafe impl Sync for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {} +// Safety: Stores a raw pointer to `T`, so if `T` is `Sync`, the lock guard over +// `T` is `Send` - but since this is also provides mutable access, we need to +// make sure that `T` is `Send` since its value can be sent across thread +// boundaries. +unsafe impl Send for RwLockWriteGuard<'_, T> where T: ?Sized + Send + Sync {} impl RwLock { /// Creates a new instance of an `RwLock` which is unlocked. @@ -210,12 +465,16 @@ impl RwLock { ///} /// ``` pub async fn read(&self) -> RwLockReadGuard<'_, T> { - let permit = ReleasingPermit::acquire(self, 1).await.unwrap_or_else(|_| { + self.s.acquire(1).await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() }); - RwLockReadGuard { lock: self, permit } + RwLockReadGuard { + s: &self.s, + data: self.c.get(), + marker: marker::PhantomData, + } } /// Locks this rwlock with exclusive write access, causing the current task @@ -241,15 +500,16 @@ impl RwLock { ///} /// ``` pub async fn write(&self) -> RwLockWriteGuard<'_, T> { - let permit = ReleasingPermit::acquire(self, MAX_READS as u16) - .await - .unwrap_or_else(|_| { - // The semaphore was closed. but, we never explicitly close it, and we have a - // handle to it through the Arc, which means that this can never happen. - unreachable!() - }); - - RwLockWriteGuard { lock: self, permit } + self.s.acquire(MAX_READS as u32).await.unwrap_or_else(|_| { + // The semaphore was closed. but, we never explicitly close it, and we have a + // handle to it through the Arc, which means that this can never happen. + unreachable!() + }); + RwLockWriteGuard { + s: &self.s, + data: self.c.get(), + marker: marker::PhantomData, + } } /// Consumes the lock, returning the underlying data. @@ -265,7 +525,7 @@ impl ops::Deref for RwLockReadGuard<'_, T> { type Target = T; fn deref(&self) -> &T { - unsafe { &*self.lock.c.get() } + unsafe { &*self.data } } } @@ -273,13 +533,13 @@ impl ops::Deref for RwLockWriteGuard<'_, T> { type Target = T; fn deref(&self) -> &T { - unsafe { &*self.lock.c.get() } + unsafe { &*self.data } } } impl ops::DerefMut for RwLockWriteGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.lock.c.get() } + unsafe { &mut *self.data } } } @@ -289,7 +549,7 @@ impl From for RwLock { } } -impl Default for RwLock +impl Default for RwLock where T: Default, { From 2b96b1773dd31ff923de4bd8adac2adbac447399 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 21 Sep 2020 18:57:27 +0200 Subject: [PATCH 3/4] ci: update nightly and fix all sorts of new failures (#2852) * ci: update miri flags * ci: fix doc warnings * doc: fix some links Cherry-pick of 18ed761 from #2834 * ci: cherry-pick 00a2849 From: #2793 * ci: cherry-pick 6b61212 From: #2793 Co-authored-by: Blas Rodriguez Irizar --- .github/workflows/ci.yml | 4 ++-- tokio-macros/src/lib.rs | 2 +- tokio-test/src/lib.rs | 2 +- tokio-util/src/lib.rs | 2 +- tokio/src/io/util/async_read_ext.rs | 2 +- tokio/src/lib.rs | 3 +-- tokio/src/net/udp/socket.rs | 4 ++-- tokio/src/sync/mod.rs | 4 ++-- tokio/src/time/delay_queue.rs | 28 ++++++++++++++-------------- 9 files changed, 25 insertions(+), 26 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ecc27a9168f..319fb0de1cf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -9,7 +9,7 @@ name: CI env: RUSTFLAGS: -Dwarnings RUST_BACKTRACE: 1 - nightly: nightly-2020-07-12 + nightly: nightly-2020-09-21 minrust: 1.39.0 jobs: @@ -110,7 +110,7 @@ jobs: rm -rf tokio/tests - name: miri - run: cargo miri test --features rt-core,rt-threaded,rt-util,sync -- -- task + run: cargo miri test --features rt-core,rt-threaded,rt-util,sync task working-directory: tokio cross: diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 1fd445d877c..6cdffdb03e3 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -6,7 +6,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(intra_doc_link_resolution_failure)] +#![cfg_attr(docsrs, deny(broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs index 185f317b973..69b8362c4e5 100644 --- a/tokio-test/src/lib.rs +++ b/tokio-test/src/lib.rs @@ -5,7 +5,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(intra_doc_link_resolution_failure)] +#![cfg_attr(docsrs, deny(broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 260c9485c26..e888e3e3e3b 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -6,7 +6,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(intra_doc_link_resolution_failure)] +#![cfg_attr(docsrs, deny(broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/tokio/src/io/util/async_read_ext.rs b/tokio/src/io/util/async_read_ext.rs index dd280a188d7..0ab66c286d3 100644 --- a/tokio/src/io/util/async_read_ext.rs +++ b/tokio/src/io/util/async_read_ext.rs @@ -1067,7 +1067,7 @@ cfg_io_util! { /// (See also the [`crate::fs::read_to_string`] convenience function for /// reading from a file.) /// - /// [`crate::fs::read_to_string`]: crate::fs::read_to_string::read_to_string + /// [`crate::fs::read_to_string`]: fn@crate::fs::read_to_string fn read_to_string<'a>(&'a mut self, dst: &'a mut String) -> ReadToString<'a, Self> where Self: Unpin, diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 88707c4d1c6..11a11195094 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -10,13 +10,12 @@ rust_2018_idioms, unreachable_pub )] -#![deny(intra_doc_link_resolution_failure)] +#![cfg_attr(docsrs, deny(broken_intra_doc_links))] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] #![cfg_attr(docsrs, feature(doc_cfg))] -#![cfg_attr(docsrs, feature(doc_alias))] //! A runtime for writing reliable, asynchronous, and slim applications. //! diff --git a/tokio/src/net/udp/socket.rs b/tokio/src/net/udp/socket.rs index 16e537739d4..00b00d8d94f 100644 --- a/tokio/src/net/udp/socket.rs +++ b/tokio/src/net/udp/socket.rs @@ -125,7 +125,7 @@ impl UdpSocket { /// should ensure that when the remote cannot receive, the /// [`ErrorKind::WouldBlock`] is properly handled. /// - /// [`ErrorKind::WouldBlock`]: std::io::error::ErrorKind::WouldBlock + /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send(&self, buf: &[u8]) -> io::Result { self.io.get_ref().send(buf) } @@ -209,7 +209,7 @@ impl UdpSocket { /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur /// if the IP version of the socket does not match that of `target`. /// - /// [`ErrorKind::WouldBlock`]: std::io::error::ErrorKind::WouldBlock + /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result { self.io.get_ref().send_to(buf, &target) } diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 3d96106d2df..f93ff7bb873 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -399,8 +399,8 @@ //! } //! ``` //! -//! [`watch` channel]: crate::sync::watch -//! [`broadcast` channel]: crate::sync::broadcast +//! [`watch` channel]: mod@crate::sync::watch +//! [`broadcast` channel]: mod@crate::sync::broadcast //! //! # State synchronization //! diff --git a/tokio/src/time/delay_queue.rs b/tokio/src/time/delay_queue.rs index a947cc6fe4e..b02153b9e0f 100644 --- a/tokio/src/time/delay_queue.rs +++ b/tokio/src/time/delay_queue.rs @@ -27,13 +27,13 @@ use std::task::{self, Poll}; /// which it should be yielded back. /// /// Once delays have been configured, the `DelayQueue` is used via its -/// [`Stream`] implementation. [`poll`] is called. If an entry has reached its +/// [`Stream`] implementation. [`poll_expired`] is called. If an entry has reached its /// deadline, it is returned. If not, `Poll::Pending` indicating that the /// current task will be notified once the deadline has been reached. /// /// # `Stream` implementation /// -/// Items are retrieved from the queue via [`Stream::poll`]. If no delays have +/// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have /// expired, no items are returned. In this case, `NotReady` is returned and the /// current task is registered to be notified once the next item's delay has /// expired. @@ -115,8 +115,8 @@ use std::task::{self, Poll}; /// [`insert_at`]: method@Self::insert_at /// [`Key`]: struct@Key /// [`Stream`]: https://docs.rs/futures/0.1/futures/stream/trait.Stream.html -/// [`poll`]: method@Self::poll -/// [`Stream::poll`]: method@Self::poll +/// [`poll_expired`]: method@Self::poll_expired +/// [`Stream::poll_expired`]: method@Self::poll_expired /// [`DelayQueue`]: struct@DelayQueue /// [`delay_for`]: fn@super::delay_for /// [`slab`]: slab @@ -146,9 +146,9 @@ pub struct DelayQueue { /// An entry in `DelayQueue` that has expired and removed. /// -/// Values are returned by [`DelayQueue::poll`]. +/// Values are returned by [`DelayQueue::poll_expired`]. /// -/// [`DelayQueue::poll`]: method@DelayQueue::poll +/// [`DelayQueue::poll_expired`]: method@DelayQueue::poll_expired #[derive(Debug)] pub struct Expired { /// The data stored in the queue @@ -260,12 +260,12 @@ impl DelayQueue { /// of a `Duration`. /// /// `value` is stored in the queue until `when` is reached. At which point, - /// `value` will be returned from [`poll`]. If `when` has already been + /// `value` will be returned from [`poll_expired`]. If `when` has already been /// reached, then `value` is immediately made available to poll. /// /// The return value represents the insertion and is used at an argument to /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once - /// `value` is removed from the queue either by calling [`poll`] after + /// `value` is removed from the queue either by calling [`poll_expired`] after /// `when` is reached or by calling [`remove`]. At this point, the caller /// must take care to not use the returned [`Key`] again as it may reference /// a different item in the queue. @@ -295,7 +295,7 @@ impl DelayQueue { /// # } /// ``` /// - /// [`poll`]: method@Self::poll + /// [`poll_expired`]: method@Self::poll_expired /// [`remove`]: method@Self::remove /// [`reset`]: method@Self::reset /// [`Key`]: struct@Key @@ -367,12 +367,12 @@ impl DelayQueue { /// instead of an `Instant`. /// /// `value` is stored in the queue until `when` is reached. At which point, - /// `value` will be returned from [`poll`]. If `when` has already been + /// `value` will be returned from [`poll_expired`]. If `when` has already been /// reached, then `value` is immediately made available to poll. /// /// The return value represents the insertion and is used at an argument to /// [`remove`] and [`reset`]. Note that [`Key`] is token and is reused once - /// `value` is removed from the queue either by calling [`poll`] after + /// `value` is removed from the queue either by calling [`poll_expired`] after /// `when` is reached or by calling [`remove`]. At this point, the caller /// must take care to not use the returned [`Key`] again as it may reference /// a different item in the queue. @@ -403,7 +403,7 @@ impl DelayQueue { /// # } /// ``` /// - /// [`poll`]: method@Self::poll + /// [`poll_expired`]: method@Self::poll_expired /// [`remove`]: method@Self::remove /// [`reset`]: method@Self::reset /// [`Key`]: struct@Key @@ -578,11 +578,11 @@ impl DelayQueue { /// Clears the queue, removing all items. /// - /// After calling `clear`, [`poll`] will return `Ok(Ready(None))`. + /// After calling `clear`, [`poll_expired`] will return `Ok(Ready(None))`. /// /// Note that this method has no effect on the allocated capacity. /// - /// [`poll`]: method@Self::poll + /// [`poll_expired`]: method@Self::poll_expired /// /// # Examples /// From c0c7124a4b01892727cd023a651ebb90bcd2e2ad Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 21 Sep 2020 14:29:22 -0700 Subject: [PATCH 4/4] sync: fix missing notification during mpsc close (#2854) When the mpsc channel receiver closes the channel, receiving should return `None` once all in-progress sends have completed. When a sender reserves capacity, this prevents the receiver from fully shutting down. Previously, when the sender, after reserving capacity, dropped without sending a message, the receiver was not notified. This results in blocking the shutdown process until all sender handles drop. This patch adds a receiver notification when the channel is both closed and all outstanding sends have completed. --- tokio/src/sync/mpsc/chan.rs | 22 +++++++++++++++++----- tokio/src/sync/semaphore_ll.rs | 2 +- tokio/tests/sync_mpsc.rs | 22 ++++++++++++++++++++++ 3 files changed, 40 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 148ee3ad766..0a53cda2038 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -75,7 +75,11 @@ pub(crate) trait Semaphore { /// The permit is dropped without a value being sent. In this case, the /// permit must be returned to the semaphore. - fn drop_permit(&self, permit: &mut Self::Permit); + /// + /// # Return + /// + /// Returns true if the permit was acquired. + fn drop_permit(&self, permit: &mut Self::Permit) -> bool; fn is_idle(&self) -> bool; @@ -192,7 +196,7 @@ where pub(crate) fn disarm(&mut self) { // TODO: should this error if not acquired? - self.inner.semaphore.drop_permit(&mut self.permit) + self.inner.semaphore.drop_permit(&mut self.permit); } /// Send a message and notify the receiver. @@ -234,7 +238,11 @@ where S: Semaphore, { fn drop(&mut self) { - self.inner.semaphore.drop_permit(&mut self.permit); + let notify = self.inner.semaphore.drop_permit(&mut self.permit); + + if notify && self.inner.semaphore.is_idle() { + self.inner.rx_waker.wake(); + } if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { return; @@ -424,8 +432,10 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { Permit::new() } - fn drop_permit(&self, permit: &mut Permit) { + fn drop_permit(&self, permit: &mut Permit) -> bool { + let ret = permit.is_acquired(); permit.release(1, &self.0); + ret } fn add_permit(&self) { @@ -477,7 +487,9 @@ impl Semaphore for AtomicUsize { fn new_permit() {} - fn drop_permit(&self, _permit: &mut ()) {} + fn drop_permit(&self, _permit: &mut ()) -> bool { + false + } fn add_permit(&self) { let prev = self.fetch_sub(2, Release); diff --git a/tokio/src/sync/semaphore_ll.rs b/tokio/src/sync/semaphore_ll.rs index 25d25ac88ab..f044095f8fc 100644 --- a/tokio/src/sync/semaphore_ll.rs +++ b/tokio/src/sync/semaphore_ll.rs @@ -910,7 +910,7 @@ impl Waiter { } /// Try to decrement the number of permits to acquire. This returns the - /// actual number of permits that were decremented. The delta betweeen `n` + /// actual number of permits that were decremented. The delta between `n` /// and the return has been assigned to the permit and the caller must /// assign these back to the semaphore. fn try_dec_permits_to_acquire(&self, n: usize) -> usize { diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index f02d90aa56d..f4966c31377 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -490,3 +490,25 @@ fn try_recv_unbounded() { _ => panic!(), } } + +#[test] +fn ready_close_cancel_bounded() { + use futures::future::poll_fn; + + let (mut tx, mut rx) = mpsc::channel::<()>(100); + let _tx2 = tx.clone(); + + { + let mut ready = task::spawn(async { poll_fn(|cx| tx.poll_ready(cx)).await }); + assert_ready_ok!(ready.poll()); + } + + rx.close(); + + let mut recv = task::spawn(async { rx.recv().await }); + assert_pending!(recv.poll()); + + drop(tx); + + assert!(recv.is_woken()); +}