diff --git a/benches/spawn.rs b/benches/spawn.rs index 72a4035757a..7d4b8137354 100644 --- a/benches/spawn.rs +++ b/benches/spawn.rs @@ -2,63 +2,83 @@ //! This essentially measure the time to enqueue a task in the local and remote //! case. +#[macro_use] +extern crate bencher; + use bencher::{black_box, Bencher}; async fn work() -> usize { let val = 1 + 1; + tokio::task::yield_now().await; black_box(val) } -fn basic_scheduler_local_spawn(bench: &mut Bencher) { +fn basic_scheduler_spawn(bench: &mut Bencher) { let runtime = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); - runtime.block_on(async { - bench.iter(|| { + bench.iter(|| { + runtime.block_on(async { let h = tokio::spawn(work()); - black_box(h); - }) + assert_eq!(h.await.unwrap(), 2); + }); }); } -fn threaded_scheduler_local_spawn(bench: &mut Bencher) { +fn basic_scheduler_spawn10(bench: &mut Bencher) { let runtime = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); - runtime.block_on(async { - bench.iter(|| { - let h = tokio::spawn(work()); - black_box(h); - }) + bench.iter(|| { + runtime.block_on(async { + let mut handles = Vec::with_capacity(10); + for _ in 0..10 { + handles.push(tokio::spawn(work())); + } + for handle in handles { + assert_eq!(handle.await.unwrap(), 2); + } + }); }); } -fn basic_scheduler_remote_spawn(bench: &mut Bencher) { - let runtime = tokio::runtime::Builder::new_current_thread() +fn threaded_scheduler_spawn(bench: &mut Bencher) { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) .build() .unwrap(); - bench.iter(|| { - let h = runtime.spawn(work()); - black_box(h); + runtime.block_on(async { + let h = tokio::spawn(work()); + assert_eq!(h.await.unwrap(), 2); + }); }); } -fn threaded_scheduler_remote_spawn(bench: &mut Bencher) { - let runtime = tokio::runtime::Builder::new_multi_thread().build().unwrap(); - +fn threaded_scheduler_spawn10(bench: &mut Bencher) { + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(); bench.iter(|| { - let h = runtime.spawn(work()); - black_box(h); + runtime.block_on(async { + let mut handles = Vec::with_capacity(10); + for _ in 0..10 { + handles.push(tokio::spawn(work())); + } + for handle in handles { + assert_eq!(handle.await.unwrap(), 2); + } + }); }); } bencher::benchmark_group!( spawn, - basic_scheduler_local_spawn, - threaded_scheduler_local_spawn, - basic_scheduler_remote_spawn, - threaded_scheduler_remote_spawn + basic_scheduler_spawn, + basic_scheduler_spawn10, + threaded_scheduler_spawn, + threaded_scheduler_spawn10, ); bencher::benchmark_main!(spawn); diff --git a/tokio-macros/CHANGELOG.md b/tokio-macros/CHANGELOG.md index 6df34c0c030..0d58f9737d9 100644 --- a/tokio-macros/CHANGELOG.md +++ b/tokio-macros/CHANGELOG.md @@ -1,3 +1,9 @@ +# 1.3.0 (July 7, 2021) + +- macros: don't trigger `clippy::unwrap_used` ([#3926]) + +[#3926]: https://github.com/tokio-rs/tokio/pull/3926 + # 1.2.0 (May 14, 2021) - macros: forward input arguments in `#[tokio::test]` ([#3691]) diff --git a/tokio-macros/Cargo.toml b/tokio-macros/Cargo.toml index 875ceb62400..5399bc6c0f1 100644 --- a/tokio-macros/Cargo.toml +++ b/tokio-macros/Cargo.toml @@ -6,13 +6,13 @@ name = "tokio-macros" # - Cargo.toml # - Update CHANGELOG.md. # - Create "tokio-macros-1.0.x" git tag. 
-version = "1.2.0" +version = "1.3.0" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-macros/1.2.0/tokio_macros" +documentation = "https://docs.rs/tokio-macros/1.3.0/tokio_macros" description = """ Tokio's proc macros. """ diff --git a/tokio-stream/CHANGELOG.md b/tokio-stream/CHANGELOG.md index 0c598041824..a0cdef0f651 100644 --- a/tokio-stream/CHANGELOG.md +++ b/tokio-stream/CHANGELOG.md @@ -1,3 +1,13 @@ +# 0.1.7 (July 7, 2021) + +### Fixed + +- sync: fix watch wrapper ([#3914]) +- time: fix `Timeout::size_hint` ([#3902]) + +[#3902]: https://github.com/tokio-rs/tokio/pull/3902 +[#3914]: https://github.com/tokio-rs/tokio/pull/3914 + # 0.1.6 (May 14, 2021) ### Added diff --git a/tokio-stream/Cargo.toml b/tokio-stream/Cargo.toml index e169a2becb8..911657c37d6 100644 --- a/tokio-stream/Cargo.toml +++ b/tokio-stream/Cargo.toml @@ -6,13 +6,13 @@ name = "tokio-stream" # - Cargo.toml # - Update CHANGELOG.md. # - Create "tokio-stream-0.1.x" git tag. -version = "0.1.6" +version = "0.1.7" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" -documentation = "https://docs.rs/tokio-stream/0.1.6/tokio_stream" +documentation = "https://docs.rs/tokio-stream/0.1.7/tokio_stream" description = """ Utilities to work with `Stream` and `tokio`. """ diff --git a/tokio/CHANGELOG.md b/tokio/CHANGELOG.md index c71da3e3212..b2d256c9ecb 100644 --- a/tokio/CHANGELOG.md +++ b/tokio/CHANGELOG.md @@ -1,3 +1,13 @@ +# 1.8.1 (July 6, 2021) + +Forward ports 1.5.1 fixes. + +### Fixed + +- runtime: remotely abort tasks on `JoinHandle::abort` ([#3934]) + +[#3934]: https://github.com/tokio-rs/tokio/pull/3934 + # 1.8.0 (July 2, 2021) ### Added @@ -36,6 +46,16 @@ [#3899]: https://github.com/tokio-rs/tokio/pull/3899 [#3900]: https://github.com/tokio-rs/tokio/pull/3900 +# 1.7.2 (July 6, 2021) + +Forward ports 1.5.1 fixes. + +### Fixed + +- runtime: remotely abort tasks on `JoinHandle::abort` ([#3934]) + +[#3934]: https://github.com/tokio-rs/tokio/pull/3934 + # 1.7.1 (June 18, 2021) ### Fixed @@ -78,6 +98,16 @@ [#3840]: https://github.com/tokio-rs/tokio/pull/3840 [#3850]: https://github.com/tokio-rs/tokio/pull/3850 +# 1.6.3 (July 6, 2021) + +Forward ports 1.5.1 fixes. + +### Fixed + +- runtime: remotely abort tasks on `JoinHandle::abort` ([#3934]) + +[#3934]: https://github.com/tokio-rs/tokio/pull/3934 + # 1.6.2 (June 14, 2021) ### Fixes @@ -140,6 +170,14 @@ a kernel bug. ([#3803]) [#3775]: https://github.com/tokio-rs/tokio/pull/3775 [#3780]: https://github.com/tokio-rs/tokio/pull/3780 +# 1.5.1 (July 6, 2021) + +### Fixed + +- runtime: remotely abort tasks on `JoinHandle::abort` ([#3934]) + +[#3934]: https://github.com/tokio-rs/tokio/pull/3934 + # 1.5.0 (April 12, 2021) ### Added diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 0058315770b..2945b6a387b 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -7,12 +7,12 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.0.x" git tag. 
-version = "1.8.0" +version = "1.8.1" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" readme = "README.md" -documentation = "https://docs.rs/tokio/1.8.0/tokio/" +documentation = "https://docs.rs/tokio/1.8.1/tokio/" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" description = """ diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 0277a360d09..daf3dccde7f 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -936,6 +936,41 @@ impl TcpStream { .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs)) } + /// Try to perform IO operation from the socket using a user-provided IO operation. + /// + /// If the socket is ready, the provided closure is called. The + /// closure should attempt to perform IO operation from the socket by manually calling the + /// appropriate syscall. If the operation fails because the socket is not + /// actually ready, then the closure should return a `WouldBlock` error and + /// the readiness flag is cleared. The return value of the closure is + /// then returned by `try_io`. + /// + /// If the socket is not ready, then the closure is not called + /// and a `WouldBlock` error is returned. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the socket that failed due to the socket not being + /// ready. Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the socket to + /// behave incorrectly. + /// + /// The closure should not perform the read operation using any of the + /// methods defined on the Tokio `TcpStream` type, as this will mess with + /// the readiness flag and can cause the socket to behave incorrectly. + /// + /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: TcpStream::readable() + /// [`writable()`]: TcpStream::writable() + /// [`ready()`]: TcpStream::ready() + pub fn try_io( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result, + ) -> io::Result { + self.io.registration().try_io(interest, f) + } + /// Receives data on the socket from the remote address to which it is /// connected, without removing that data from the queue. On success, /// returns the number of bytes peeked. diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs index 301a85cc06b..a47f071d770 100644 --- a/tokio/src/net/udp.rs +++ b/tokio/src/net/udp.rs @@ -1170,6 +1170,41 @@ impl UdpSocket { .try_io(Interest::READABLE, || self.io.recv_from(buf)) } + /// Try to perform IO operation from the socket using a user-provided IO operation. + /// + /// If the socket is ready, the provided closure is called. The + /// closure should attempt to perform IO operation from the socket by manually calling the + /// appropriate syscall. If the operation fails because the socket is not + /// actually ready, then the closure should return a `WouldBlock` error and + /// the readiness flag is cleared. The return value of the closure is + /// then returned by `try_io`. + /// + /// If the socket is not ready, then the closure is not called + /// and a `WouldBlock` error is returned. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the socket that failed due to the socket not being + /// ready. 
Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the socket to + /// behave incorrectly. + /// + /// The closure should not perform the read operation using any of the + /// methods defined on the Tokio `UdpSocket` type, as this will mess with + /// the readiness flag and can cause the socket to behave incorrectly. + /// + /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: UdpSocket::readable() + /// [`writable()`]: UdpSocket::writable() + /// [`ready()`]: UdpSocket::ready() + pub fn try_io( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result, + ) -> io::Result { + self.io.registration().try_io(interest, f) + } + /// Receives data from the socket, without removing it from the input queue. /// On success, returns the number of bytes read and the address from whence /// the data came. diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 2d2177803b1..c7c6568f21e 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -1143,6 +1143,41 @@ impl UnixDatagram { Ok((n, SocketAddr(addr))) } + /// Try to perform IO operation from the socket using a user-provided IO operation. + /// + /// If the socket is ready, the provided closure is called. The + /// closure should attempt to perform IO operation from the socket by manually calling the + /// appropriate syscall. If the operation fails because the socket is not + /// actually ready, then the closure should return a `WouldBlock` error and + /// the readiness flag is cleared. The return value of the closure is + /// then returned by `try_io`. + /// + /// If the socket is not ready, then the closure is not called + /// and a `WouldBlock` error is returned. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the socket that failed due to the socket not being + /// ready. Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the socket to + /// behave incorrectly. + /// + /// The closure should not perform the read operation using any of the + /// methods defined on the Tokio `UnixDatagram` type, as this will mess with + /// the readiness flag and can cause the socket to behave incorrectly. + /// + /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: UnixDatagram::readable() + /// [`writable()`]: UnixDatagram::writable() + /// [`ready()`]: UnixDatagram::ready() + pub fn try_io( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result, + ) -> io::Result { + self.io.registration().try_io(interest, f) + } + /// Returns the local address that this socket is bound to. /// /// # Examples diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index 4baac606209..39f6ee258f7 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -653,6 +653,41 @@ impl UnixStream { .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) } + /// Try to perform IO operation from the socket using a user-provided IO operation. + /// + /// If the socket is ready, the provided closure is called. The + /// closure should attempt to perform IO operation from the socket by manually calling the + /// appropriate syscall. 
If the operation fails because the socket is not + /// actually ready, then the closure should return a `WouldBlock` error and + /// the readiness flag is cleared. The return value of the closure is + /// then returned by `try_io`. + /// + /// If the socket is not ready, then the closure is not called + /// and a `WouldBlock` error is returned. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the socket that failed due to the socket not being + /// ready. Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the socket to + /// behave incorrectly. + /// + /// The closure should not perform the read operation using any of the + /// methods defined on the Tokio `UnixStream` type, as this will mess with + /// the readiness flag and can cause the socket to behave incorrectly. + /// + /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: UnixStream::readable() + /// [`writable()`]: UnixStream::writable() + /// [`ready()`]: UnixStream::ready() + pub fn try_io( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result, + ) -> io::Result { + self.io.registration().try_io(interest, f) + } + /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`. /// /// This function is intended to be used to wrap a UnixStream from the diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index b9f7d49d789..4e7458b58b7 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -723,6 +723,41 @@ impl NamedPipeServer { .registration() .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) } + + /// Try to perform IO operation from the socket using a user-provided IO operation. + /// + /// If the socket is ready, the provided closure is called. The + /// closure should attempt to perform IO operation from the socket by manually calling the + /// appropriate syscall. If the operation fails because the socket is not + /// actually ready, then the closure should return a `WouldBlock` error and + /// the readiness flag is cleared. The return value of the closure is + /// then returned by `try_io`. + /// + /// If the socket is not ready, then the closure is not called + /// and a `WouldBlock` error is returned. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the socket that failed due to the socket not being + /// ready. Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the socket to + /// behave incorrectly. + /// + /// The closure should not perform the read operation using any of the + /// methods defined on the Tokio `NamedPipeServer` type, as this will mess with + /// the readiness flag and can cause the socket to behave incorrectly. + /// + /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. 
+ /// + /// [`readable()`]: NamedPipeServer::readable() + /// [`writable()`]: NamedPipeServer::writable() + /// [`ready()`]: NamedPipeServer::ready() + pub fn try_io( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result, + ) -> io::Result { + self.io.registration().try_io(interest, f) + } } impl AsyncRead for NamedPipeServer { @@ -1343,6 +1378,41 @@ impl NamedPipeClient { .registration() .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf)) } + + /// Try to perform IO operation from the socket using a user-provided IO operation. + /// + /// If the socket is ready, the provided closure is called. The + /// closure should attempt to perform IO operation from the socket by manually calling the + /// appropriate syscall. If the operation fails because the socket is not + /// actually ready, then the closure should return a `WouldBlock` error and + /// the readiness flag is cleared. The return value of the closure is + /// then returned by `try_io`. + /// + /// If the socket is not ready, then the closure is not called + /// and a `WouldBlock` error is returned. + /// + /// The closure should only return a `WouldBlock` error if it has performed + /// an IO operation on the socket that failed due to the socket not being + /// ready. Returning a `WouldBlock` error in any other situation will + /// incorrectly clear the readiness flag, which can cause the socket to + /// behave incorrectly. + /// + /// The closure should not perform the read operation using any of the + /// methods defined on the Tokio `NamedPipeClient` type, as this will mess with + /// the readiness flag and can cause the socket to behave incorrectly. + /// + /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. + /// + /// [`readable()`]: NamedPipeClient::readable() + /// [`writable()`]: NamedPipeClient::writable() + /// [`ready()`]: NamedPipeClient::ready() + pub fn try_io( + &self, + interest: Interest, + f: impl FnOnce() -> io::Result, + ) -> io::Result { + self.io.registration().try_io(interest, f) + } } impl AsyncRead for NamedPipeClient { diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 13dfb69739f..9efe3844a39 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -2,16 +2,14 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; -use crate::runtime::task::{self, JoinHandle, Schedule, Task}; +use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; use crate::sync::notify::Notify; -use crate::util::linked_list::{Link, LinkedList}; use crate::util::{waker_ref, Wake, WakerRef}; use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::ptr::NonNull; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; @@ -57,9 +55,6 @@ pub(crate) struct Spawner { } struct Tasks { - /// Collection of all active tasks spawned onto this executor. - owned: LinkedList>, > as Link>::Target>, - /// Local run queue. /// /// Tasks notified from the current thread are pushed into this queue. @@ -69,23 +64,23 @@ struct Tasks { /// A remote scheduler entry. /// /// These are filled in by remote threads sending instructions to the scheduler. -enum Entry { +enum RemoteMsg { /// A remote thread wants to spawn a task. 
Schedule(task::Notified<Arc<Shared>>), - /// A remote thread wants a task to be released by the scheduler. We only - /// have access to its header. - Release(NonNull<Header>), } // Safety: Used correctly, the task header is "thread safe". Ultimately the task // is owned by the current thread executor, for which this instruction is being // sent. -unsafe impl Send for Entry {} +unsafe impl Send for RemoteMsg {} /// Scheduler state shared between threads. struct Shared { /// Remote run queue. None if the `Runtime` has been dropped. - queue: Mutex<Option<VecDeque<Entry>>>, + queue: Mutex<Option<VecDeque<RemoteMsg>>>, + + /// Collection of all active tasks spawned onto this executor. + owned: OwnedTasks<Arc<Shared>>, /// Unpark the blocked thread. unpark: Box<dyn Unpark>, @@ -125,6 +120,7 @@ impl<P: Park> BasicScheduler<P>
{ let spawner = Spawner { shared: Arc::new(Shared { queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), + owned: OwnedTasks::new(), unpark: unpark as Box<dyn Unpark>, woken: AtomicBool::new(false), }), @@ -132,7 +128,6 @@ impl<P: Park> BasicScheduler<P>
{ let inner = Mutex::new(Some(Inner { tasks: Some(Tasks { - owned: LinkedList::new(), queue: VecDeque::with_capacity(INITIAL_CAPACITY), }), spawner: spawner.clone(), @@ -227,7 +222,7 @@ impl Inner
<P>
{ .borrow_mut() .queue .pop_front() - .map(Entry::Schedule) + .map(RemoteMsg::Schedule) }) } else { context @@ -235,7 +230,7 @@ impl Inner
<P>
{ .borrow_mut() .queue .pop_front() - .map(Entry::Schedule) + .map(RemoteMsg::Schedule) .or_else(|| scheduler.spawner.pop()) }; @@ -251,26 +246,7 @@ impl Inner
<P>
{ }; match entry { - Entry::Schedule(task) => crate::coop::budget(|| task.run()), - Entry::Release(ptr) => { - // Safety: the task header is only legally provided - // internally in the header, so we know that it is a - // valid (or in particular *allocated*) header that - // is part of the linked list. - unsafe { - let removed = context.tasks.borrow_mut().owned.remove(ptr); - - // TODO: This seems like it should hold, because - // there doesn't seem to be an avenue for anyone - // else to fiddle with the owned tasks - // collection *after* a remote thread has marked - // it as released, and at that point, the only - // location at which it can be removed is here - // or in the Drop implementation of the - // scheduler. - debug_assert!(removed.is_some()); - } - } + RemoteMsg::Schedule(task) => crate::coop::budget(|| task.run()), } } @@ -335,14 +311,7 @@ impl Drop for BasicScheduler
<P>
{ }; enter(&mut inner, |scheduler, context| { - // Loop required here to ensure borrow is dropped between iterations - #[allow(clippy::while_let_loop)] - loop { - let task = match context.tasks.borrow_mut().owned.pop_back() { - Some(task) => task, - None => break, - }; - + while let Some(task) = context.shared.owned.pop_back() { task.shutdown(); } @@ -358,13 +327,9 @@ impl Drop for BasicScheduler
<P>
{ if let Some(remote_queue) = remote_queue.take() { for entry in remote_queue { match entry { - Entry::Schedule(task) => { + RemoteMsg::Schedule(task) => { task.shutdown(); } - Entry::Release(..) => { - // Do nothing, each entry in the linked list was *just* - // dropped by the scheduler above. - } } } } @@ -375,7 +340,7 @@ impl Drop for BasicScheduler
<P>
{ // The assert below is unrelated to this mutex. drop(remote_queue); - assert!(context.tasks.borrow().owned.is_empty()); + assert!(context.shared.owned.is_empty()); }); } } @@ -400,7 +365,7 @@ impl Spawner { handle } - fn pop(&self) -> Option { + fn pop(&self) -> Option { match self.shared.queue.lock().as_mut() { Some(queue) => queue.pop_front(), None => None, @@ -430,39 +395,14 @@ impl Schedule for Arc { fn bind(task: Task) -> Arc { CURRENT.with(|maybe_cx| { let cx = maybe_cx.expect("scheduler context missing"); - cx.tasks.borrow_mut().owned.push_front(task); + cx.shared.owned.push_front(task); cx.shared.clone() }) } fn release(&self, task: &Task) -> Option> { - CURRENT.with(|maybe_cx| { - let ptr = NonNull::from(task.header()); - - if let Some(cx) = maybe_cx { - // safety: the task is inserted in the list in `bind`. - unsafe { cx.tasks.borrow_mut().owned.remove(ptr) } - } else { - // By sending an `Entry::Release` to the runtime, we ask the - // runtime to remove this task from the linked list in - // `Tasks::owned`. - // - // If the queue is `None`, then the task was already removed - // from that list in the destructor of `BasicScheduler`. We do - // not do anything in this case for the same reason that - // `Entry::Release` messages are ignored in the remote queue - // drain loop of `BasicScheduler`'s destructor. - if let Some(queue) = self.queue.lock().as_mut() { - queue.push_back(Entry::Release(ptr)); - } - - self.unpark.unpark(); - // Returning `None` here prevents the task plumbing from being - // freed. It is then up to the scheduler through the queue we - // just added to, or its Drop impl to free the task. - None - } - }) + // SAFETY: Inserted into the list in bind above. + unsafe { self.owned.remove(task) } } fn schedule(&self, task: task::Notified) { @@ -473,7 +413,7 @@ impl Schedule for Arc { _ => { let mut guard = self.queue.lock(); if let Some(queue) = guard.as_mut() { - queue.push_back(Entry::Schedule(task)); + queue.push_back(RemoteMsg::Schedule(task)); drop(guard); self.unpark.unpark(); } else { diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 428c921fe0d..e4624c7b709 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -66,9 +66,6 @@ pub(crate) struct Header { /// Pointer to next task, used with the injection queue pub(crate) queue_next: UnsafeCell>>, - /// Pointer to the next task in the transfer stack - pub(super) stack_next: UnsafeCell>>, - /// Table of function pointers for executing actions on the task. pub(super) vtable: &'static Vtable, @@ -104,7 +101,6 @@ impl Cell { state, owned: UnsafeCell::new(linked_list::Pointers::new()), queue_next: UnsafeCell::new(None), - stack_next: UnsafeCell::new(None), vtable: raw::vtable::(), #[cfg(all(tokio_unstable, feature = "tracing"))] id, @@ -299,13 +295,6 @@ impl CoreStage { cfg_rt_multi_thread! { impl Header { - pub(crate) fn shutdown(&self) { - use crate::runtime::task::RawTask; - - let task = unsafe { RawTask::from_raw(self.into()) }; - task.shutdown(); - } - pub(crate) unsafe fn set_next(&self, next: Option>) { self.queue_next.with_mut(|ptr| *ptr = next); } diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index a3a14083fd6..7f1c4e4cb0c 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -169,6 +169,17 @@ where self.complete(Err(err), true) } + /// Remotely abort the task + /// + /// This is similar to `shutdown` except that it asks the runtime to perform + /// the shutdown. 
This is necessary to avoid the shutdown happening in the + /// wrong thread for non-Send tasks. + pub(super) fn remote_abort(self) { + if self.header().state.transition_to_notified_and_cancel() { + self.core().scheduler.schedule(Notified(self.to_task())); + } + } + // ====== internal ====== fn complete(self, output: super::Result<T::Output>, is_join_interested: bool) { diff --git a/tokio/src/runtime/task/join.rs index dedfb387949..2fe40a72195 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -192,7 +192,7 @@ impl<T> JoinHandle<T> { /// ``` pub fn abort(&self) { if let Some(raw) = self.raw { - raw.shutdown(); + raw.remote_abort(); } } } diff --git a/tokio/src/runtime/task/list.rs new file mode 100644 index 00000000000..45e22a72af2 --- /dev/null +++ b/tokio/src/runtime/task/list.rs @@ -0,0 +1,33 @@ +use crate::loom::sync::Mutex; +use crate::runtime::task::Task; +use crate::util::linked_list::{Link, LinkedList}; + +pub(crate) struct OwnedTasks<S: 'static> { + list: Mutex<LinkedList<Task<S>, <Task<S> as Link>::Target>>, +} + +impl<S: 'static> OwnedTasks<S> { + pub(crate) fn new() -> Self { + Self { + list: Mutex::new(LinkedList::new()), + } + } + + pub(crate) fn push_front(&self, task: Task<S>) { + self.list.lock().push_front(task); + } + + pub(crate) fn pop_back(&self) -> Option<Task<S>> { + self.list.lock().pop_back() + } + + /// The caller must ensure that if the provided task is stored in a + /// linked list, then it is in this linked list. + pub(crate) unsafe fn remove(&self, task: &Task<S>) -> Option<Task<S>> { + self.list.lock().remove(task.header().into()) + } + + pub(crate) fn is_empty(&self) -> bool { + self.list.lock().is_empty() + } +} diff --git a/tokio/src/runtime/task/mod.rs index 58b8c2a15e8..6b1b8c63886 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -13,6 +13,9 @@ mod join; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::join::JoinHandle; +mod list; +pub(super) use self::list::OwnedTasks; + mod raw; use self::raw::RawTask; @@ -21,11 +24,6 @@ use self::state::State; mod waker; -cfg_rt_multi_thread! { - mod stack; - pub(crate) use self::stack::TransferStack; -} - use crate::future::Future; use crate::util::linked_list; @@ -62,11 +60,10 @@ pub(crate) trait Schedule: Sync + Sized + 'static { fn bind(task: Task<Self>) -> Self; /// The task has completed work and is ready to be released. The scheduler - /// is free to drop it whenever. + /// should release it immediately and return it. The task module will batch + /// the ref-dec with setting other options. /// - /// If the scheduler can immediately release the task, it should return - /// it as part of the function. This enables the task module to batch - /// the ref-dec with other options. + /// If the scheduler has already released the task, then None is returned. fn release(&self, task: &Task<Self>) -> Option<Task<Self>>; /// Schedule the task diff --git a/tokio/src/runtime/task/raw.rs index a9cd4e6f4c7..56d65d5a649 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -22,6 +22,9 @@ pub(super) struct Vtable { /// The join handle has been dropped pub(super) drop_join_handle_slow: unsafe fn(NonNull<Header>
), + /// The task is remotely aborted + pub(super) remote_abort: unsafe fn(NonNull
<Header>), + /// Scheduler is being shutdown pub(super) shutdown: unsafe fn(NonNull<Header>
), } @@ -33,6 +36,7 @@ pub(super) fn vtable() -> &'static Vtable { dealloc: dealloc::, try_read_output: try_read_output::, drop_join_handle_slow: drop_join_handle_slow::, + remote_abort: remote_abort::, shutdown: shutdown::, } } @@ -89,6 +93,11 @@ impl RawTask { let vtable = self.header().vtable; unsafe { (vtable.shutdown)(self.ptr) } } + + pub(super) fn remote_abort(self) { + let vtable = self.header().vtable; + unsafe { (vtable.remote_abort)(self.ptr) } + } } impl Clone for RawTask { @@ -125,6 +134,11 @@ unsafe fn drop_join_handle_slow(ptr: NonNull
<Header>) { harness.drop_join_handle_slow() } +unsafe fn remote_abort<T: Future, S: Schedule>(ptr: NonNull<Header>
) { + let harness = Harness::<T, S>::from_raw(ptr); + harness.remote_abort() +} + unsafe fn shutdown<T: Future, S: Schedule>(ptr: NonNull<Header>
) { let harness = Harness::<T, S>::from_raw(ptr); harness.shutdown() diff --git a/tokio/src/runtime/task/stack.rs deleted file mode 100644 index 9dd8d3f43f9..00000000000 --- a/tokio/src/runtime/task/stack.rs +++ /dev/null @@ -1,83 +0,0 @@ -use crate::loom::sync::atomic::AtomicPtr; -use crate::runtime::task::{Header, Task}; - -use std::marker::PhantomData; -use std::ptr::{self, NonNull}; -use std::sync::atomic::Ordering::{Acquire, Relaxed, Release}; - -/// Concurrent stack of tasks, used to pass ownership of a task from one worker -/// to another. -pub(crate) struct TransferStack<T: 'static> { - head: AtomicPtr<Header>
, - _p: PhantomData<T>, -} - -impl<T: 'static> TransferStack<T> { - pub(crate) fn new() -> TransferStack<T> { - TransferStack { - head: AtomicPtr::new(ptr::null_mut()), - _p: PhantomData, - } - } - - pub(crate) fn push(&self, task: Task<T>) { - let task = task.into_raw(); - - // We don't care about any memory associated w/ setting the `head` - // field, just the current value. - // - // The compare-exchange creates a release sequence. - let mut curr = self.head.load(Relaxed); - - loop { - unsafe { - task.as_ref() - .stack_next - .with_mut(|ptr| *ptr = NonNull::new(curr)) - }; - - let res = self - .head - .compare_exchange(curr, task.as_ptr() as *mut _, Release, Relaxed); - - match res { - Ok(_) => return, - Err(actual) => { - curr = actual; - } - } - } - } - - pub(crate) fn drain(&self) -> impl Iterator<Item = Task<T>> { - struct Iter<T: 'static>(Option<NonNull<Header>>, PhantomData<T>); - - impl<T: 'static> Iterator for Iter<T> { - type Item = Task<T>; - - fn next(&mut self) -> Option<Task<T>> { - let task = self.0?; - - // Move the cursor forward - self.0 = unsafe { task.as_ref().stack_next.with(|ptr| *ptr) }; - - // Return the task - unsafe { Some(Task::from_raw(task)) } - } - } - - impl<T: 'static> Drop for Iter<T> { - fn drop(&mut self) { - use std::process; - - if self.0.is_some() { - // we have bugs - process::abort(); - } - } - } - - let ptr = self.head.swap(ptr::null_mut(), Acquire); - Iter(NonNull::new(ptr), PhantomData) - } -} diff --git a/tokio/src/runtime/task/state.rs index 1f08d6d8782..6037721623f 100644 --- a/tokio/src/runtime/task/state.rs +++ b/tokio/src/runtime/task/state.rs @@ -180,6 +180,15 @@ impl State { prev.will_need_queueing() } + /// Set the cancelled bit and transition the state to `NOTIFIED`. + /// + /// Returns `true` if the task needs to be submitted to the pool for + /// execution + pub(super) fn transition_to_notified_and_cancel(&self) -> bool { + let prev = Snapshot(self.val.fetch_or(NOTIFIED | CANCELLED, AcqRel)); + prev.will_need_queueing() + } + /// Set the `CANCELLED` bit and attempt to transition to `Running`. /// /// Returns `true` if the transition to `Running` succeeded. 
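Reviewer note: `transition_to_notified_and_cancel` above is what makes the new remote abort safe to request from any thread. A single `fetch_or` sets both bits, and the previous value it returns is a consistent snapshot, so exactly one caller observes the task as idle and becomes responsible for queueing it. A minimal standalone sketch of the idea follows; the bit positions and the queueing test are hypothetical simplifications (the real `State` packs a ref-count and further lifecycle flags into the same word, and uses `will_need_queueing`):

    use std::sync::atomic::{AtomicUsize, Ordering::AcqRel};

    // Hypothetical bit layout, for illustration only.
    const RUNNING: usize = 0b001;
    const NOTIFIED: usize = 0b010;
    const CANCELLED: usize = 0b100;

    struct State {
        val: AtomicUsize,
    }

    impl State {
        // One atomic read-modify-write sets NOTIFIED | CANCELLED; the
        // returned previous value tells this caller whether anyone else
        // already notified (or is running) the task.
        fn transition_to_notified_and_cancel(&self) -> bool {
            let prev = self.val.fetch_or(NOTIFIED | CANCELLED, AcqRel);
            // Queue the task only if it was neither running nor already
            // notified (a stand-in for the real will_need_queueing()).
            prev & (RUNNING | NOTIFIED) == 0
        }
    }

Using `fetch_or` rather than a compare-exchange loop keeps the operation wait-free: concurrent aborts and wakeups cannot force a retry, they simply race to be the one that saw the idle snapshot.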
diff --git a/tokio/src/runtime/tests/task.rs b/tokio/src/runtime/tests/task.rs index 7c2012523cb..1f3e89d7661 100644 --- a/tokio/src/runtime/tests/task.rs +++ b/tokio/src/runtime/tests/task.rs @@ -1,5 +1,4 @@ -use crate::runtime::task::{self, Schedule, Task}; -use crate::util::linked_list::{Link, LinkedList}; +use crate::runtime::task::{self, OwnedTasks, Schedule, Task}; use crate::util::TryLock; use std::collections::VecDeque; @@ -51,10 +50,9 @@ fn with(f: impl FnOnce(Runtime)) { let _reset = Reset; let rt = Runtime(Arc::new(Inner { - released: task::TransferStack::new(), + owned: OwnedTasks::new(), core: TryLock::new(Core { queue: VecDeque::new(), - tasks: LinkedList::new(), }), })); @@ -66,13 +64,12 @@ fn with(f: impl FnOnce(Runtime)) { struct Runtime(Arc); struct Inner { - released: task::TransferStack, core: TryLock, + owned: OwnedTasks, } struct Core { queue: VecDeque>, - tasks: LinkedList, as Link>::Target>, } static CURRENT: TryLock> = TryLock::new(None); @@ -91,8 +88,6 @@ impl Runtime { task.run(); } - self.0.maintenance(); - n } @@ -107,7 +102,7 @@ impl Runtime { fn shutdown(&self) { let mut core = self.0.core.try_lock().unwrap(); - for task in core.tasks.iter() { + while let Some(task) = self.0.owned.pop_back() { task.shutdown(); } @@ -117,40 +112,20 @@ impl Runtime { drop(core); - while !self.0.core.try_lock().unwrap().tasks.is_empty() { - self.0.maintenance(); - } - } -} - -impl Inner { - fn maintenance(&self) { - use std::mem::ManuallyDrop; - - for task in self.released.drain() { - let task = ManuallyDrop::new(task); - - // safety: see worker.rs - unsafe { - let ptr = task.header().into(); - self.core.try_lock().unwrap().tasks.remove(ptr); - } - } + assert!(self.0.owned.is_empty()); } } impl Schedule for Runtime { fn bind(task: Task) -> Runtime { let rt = CURRENT.try_lock().unwrap().as_ref().unwrap().clone(); - rt.0.core.try_lock().unwrap().tasks.push_front(task); + rt.0.owned.push_front(task); rt } fn release(&self, task: &Task) -> Option> { // safety: copying worker.rs - let task = unsafe { Task::from_raw(task.header().into()) }; - self.0.released.push(task); - None + unsafe { self.0.owned.remove(task) } } fn schedule(&self, task: task::Notified) { diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 70cbddbd05e..4ae0f5a2592 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -11,9 +11,9 @@ use crate::park::{Park, Unpark}; use crate::runtime; use crate::runtime::enter::EnterContext; use crate::runtime::park::{Parker, Unparker}; +use crate::runtime::task::OwnedTasks; use crate::runtime::thread_pool::{AtomicCell, Idle}; use crate::runtime::{queue, task}; -use crate::util::linked_list::{Link, LinkedList}; use crate::util::FastRand; use std::cell::RefCell; @@ -53,9 +53,6 @@ struct Core { /// True if the scheduler is being shutdown is_shutdown: bool, - /// Tasks owned by the core - tasks: LinkedList::Target>, - /// Parker /// /// Stored in an `Option` as the parker is added / removed to make the @@ -78,6 +75,9 @@ pub(super) struct Shared { /// Coordinates idle workers idle: Idle, + /// Collection of all active tasks spawned onto this executor. + owned: OwnedTasks>, + /// Cores that have observed the shutdown signal /// /// The core is **not** placed back in the worker to avoid it from being @@ -91,10 +91,6 @@ struct Remote { /// Steal tasks from this worker. steal: queue::Steal>, - /// Transfers tasks to be released. Any worker pushes tasks, only the owning - /// worker pops. 
- pending_drop: task::TransferStack>, - /// Unparks the associated worker thread unpark: Unparker, } @@ -142,22 +138,18 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc, Launch) { run_queue, is_searching: false, is_shutdown: false, - tasks: LinkedList::new(), park: Some(park), rand: FastRand::new(seed()), })); - remotes.push(Remote { - steal, - pending_drop: task::TransferStack::new(), - unpark, - }); + remotes.push(Remote { steal, unpark }); } let shared = Arc::new(Shared { remotes: remotes.into_boxed_slice(), inject: queue::Inject::new(), idle: Idle::new(size), + owned: OwnedTasks::new(), shutdown_cores: Mutex::new(vec![]), }); @@ -203,18 +195,20 @@ where CURRENT.with(|maybe_cx| { match (crate::runtime::enter::context(), maybe_cx.is_some()) { (EnterContext::Entered { .. }, true) => { - // We are on a thread pool runtime thread, so we just need to set up blocking. + // We are on a thread pool runtime thread, so we just need to + // set up blocking. had_entered = true; } (EnterContext::Entered { allow_blocking }, false) => { - // We are on an executor, but _not_ on the thread pool. - // That is _only_ okay if we are in a thread pool runtime's block_on method: + // We are on an executor, but _not_ on the thread pool. That is + // _only_ okay if we are in a thread pool runtime's block_on + // method: if allow_blocking { had_entered = true; return; } else { - // This probably means we are on the basic_scheduler or in a LocalSet, - // where it is _not_ okay to block. + // This probably means we are on the basic_scheduler or in a + // LocalSet, where it is _not_ okay to block. panic!("can call blocking only when running on the multi-threaded runtime"); } } @@ -538,42 +532,25 @@ impl Core { true } - /// Runs maintenance work such as free pending tasks and check the pool's - /// state. + /// Runs maintenance work such as checking the pool's state. fn maintenance(&mut self, worker: &Worker) { - self.drain_pending_drop(worker); - if !self.is_shutdown { // Check if the scheduler has been shutdown self.is_shutdown = worker.inject().is_closed(); } } - // Signals all tasks to shut down, and waits for them to complete. Must run - // before we enter the single-threaded phase of shutdown processing. + /// Signals all tasks to shut down, and waits for them to complete. Must run + /// before we enter the single-threaded phase of shutdown processing. fn pre_shutdown(&mut self, worker: &Worker) { // Signal to all tasks to shut down. - for header in self.tasks.iter() { + while let Some(header) = worker.shared.owned.pop_back() { header.shutdown(); } - - loop { - self.drain_pending_drop(worker); - - if self.tasks.is_empty() { - break; - } - - // Wait until signalled - let park = self.park.as_mut().expect("park missing"); - park.park().expect("park failed"); - } } // Shutdown the core fn shutdown(&mut self) { - assert!(self.tasks.is_empty()); - // Take the core let mut park = self.park.take().expect("park missing"); @@ -582,24 +559,6 @@ impl Core { park.shutdown(); } - - fn drain_pending_drop(&mut self, worker: &Worker) { - use std::mem::ManuallyDrop; - - for task in worker.remote().pending_drop.drain() { - let task = ManuallyDrop::new(task); - - // safety: tasks are only pushed into the `pending_drop` stacks that - // are associated with the list they are inserted into. When a task - // is pushed into `pending_drop`, the ref-inc is skipped, so we must - // not ref-dec here. - // - // See `bind` and `release` implementations. 
- unsafe { - self.tasks.remove(task.header().into()); - } - } - } } impl Worker { @@ -607,15 +566,6 @@ impl Worker { fn inject(&self) -> &queue::Inject> { &self.shared.inject } - - /// Return a reference to this worker's remote data - fn remote(&self) -> &Remote { - &self.shared.remotes[self.index] - } - - fn eq(&self, other: &Worker) -> bool { - self.shared.ptr_eq(&other.shared) && self.index == other.index - } } impl task::Schedule for Arc { @@ -624,12 +574,7 @@ impl task::Schedule for Arc { let cx = maybe_cx.expect("scheduler context missing"); // Track the task - cx.core - .borrow_mut() - .as_mut() - .expect("scheduler core missing") - .tasks - .push_front(task); + cx.worker.shared.owned.push_front(task); // Return a clone of the worker cx.worker.clone() @@ -637,75 +582,8 @@ impl task::Schedule for Arc { } fn release(&self, task: &Task) -> Option { - use std::ptr::NonNull; - - enum Immediate { - // Task has been synchronously removed from the Core owned by the - // current thread - Removed(Option), - // Task is owned by another thread, so we need to notify it to clean - // up the task later. - MaybeRemote, - } - - let immediate = CURRENT.with(|maybe_cx| { - let cx = match maybe_cx { - Some(cx) => cx, - None => return Immediate::MaybeRemote, - }; - - if !self.eq(&cx.worker) { - // Task owned by another core, so we need to notify it. - return Immediate::MaybeRemote; - } - - let mut maybe_core = cx.core.borrow_mut(); - - if let Some(core) = &mut *maybe_core { - // Directly remove the task - // - // safety: the task is inserted in the list in `bind`. - unsafe { - let ptr = NonNull::from(task.header()); - return Immediate::Removed(core.tasks.remove(ptr)); - } - } - - Immediate::MaybeRemote - }); - - // Checks if we were called from within a worker, allowing for immediate - // removal of a scheduled task. Else we have to go through the slower - // process below where we remotely mark a task as dropped. - match immediate { - Immediate::Removed(task) => return task, - Immediate::MaybeRemote => (), - }; - - // Track the task to be released by the worker that owns it - // - // Safety: We get a new handle without incrementing the ref-count. - // A ref-count is held by the "owned" linked list and it is only - // ever removed from that list as part of the release process: this - // method or popping the task from `pending_drop`. Thus, we can rely - // on the ref-count held by the linked-list to keep the memory - // alive. - // - // When the task is removed from the stack, it is forgotten instead - // of dropped. - let task = unsafe { Task::from_raw(task.header().into()) }; - - self.remote().pending_drop.push(task); - - // The worker core has been handed off to another thread. In the - // event that the scheduler is currently shutting down, the thread - // that owns the task may be waiting on the release to complete - // shutdown. - if self.inject().is_closed() { - self.remote().unpark.unpark(); - } - - None + // SAFETY: Inserted into owned in bind. + unsafe { self.shared.owned.remove(task) } } fn schedule(&self, task: Notified) { @@ -825,6 +703,8 @@ impl Shared { return; } + debug_assert!(self.owned.is_empty()); + for mut core in cores.drain(..) 
{ core.shutdown(); } diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 7852b0cb1bf..3d52e4b0a10 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -56,7 +56,7 @@ use crate::sync::notify::Notify; use crate::loom::sync::atomic::AtomicUsize; -use crate::loom::sync::atomic::Ordering::{Relaxed, SeqCst}; +use crate::loom::sync::atomic::Ordering::Relaxed; use crate::loom::sync::{Arc, RwLock, RwLockReadGuard}; use std::ops; @@ -74,7 +74,7 @@ pub struct Receiver { shared: Arc>, /// Last observed version - version: usize, + version: Version, } /// Sends values to the associated [`Receiver`](struct@Receiver). @@ -104,7 +104,7 @@ struct Shared { /// /// The lowest bit represents a "closed" state. The rest of the bits /// represent the current version. - version: AtomicUsize, + state: AtomicState, /// Tracks the number of `Receiver` instances ref_count_rx: AtomicUsize, @@ -152,7 +152,69 @@ pub mod error { impl std::error::Error for RecvError {} } -const CLOSED: usize = 1; +use self::state::{AtomicState, Version}; +mod state { + use crate::loom::sync::atomic::AtomicUsize; + use crate::loom::sync::atomic::Ordering::SeqCst; + + const CLOSED: usize = 1; + + /// The version part of the state. The lowest bit is always zero. + #[derive(Copy, Clone, Debug, Eq, PartialEq)] + pub(super) struct Version(usize); + + /// Snapshot of the state. The first bit is used as the CLOSED bit. + /// The remaining bits are used as the version. + #[derive(Copy, Clone, Debug)] + pub(super) struct StateSnapshot(usize); + + /// The state stored in an atomic integer. + #[derive(Debug)] + pub(super) struct AtomicState(AtomicUsize); + + impl Version { + /// Get the initial version when creating the channel. + pub(super) fn initial() -> Self { + Version(0) + } + } + + impl StateSnapshot { + /// Extract the version from the state. + pub(super) fn version(self) -> Version { + Version(self.0 & !CLOSED) + } + + /// Is the closed bit set? + pub(super) fn is_closed(self) -> bool { + (self.0 & CLOSED) == CLOSED + } + } + + impl AtomicState { + /// Create a new `AtomicState` that is not closed and which has the + /// version set to `Version::initial()`. + pub(super) fn new() -> Self { + AtomicState(AtomicUsize::new(0)) + } + + /// Load the current value of the state. + pub(super) fn load(&self) -> StateSnapshot { + StateSnapshot(self.0.load(SeqCst)) + } + + /// Increment the version counter. + pub(super) fn increment_version(&self) { + // Increment by two to avoid touching the CLOSED bit. + self.0.fetch_add(2, SeqCst); + } + + /// Set the closed bit in the state. + pub(super) fn set_closed(&self) { + self.0.fetch_or(CLOSED, SeqCst); + } + } +} /// Creates a new watch channel, returning the "send" and "receive" handles. /// @@ -184,7 +246,7 @@ const CLOSED: usize = 1; pub fn channel(init: T) -> (Sender, Receiver) { let shared = Arc::new(Shared { value: RwLock::new(init), - version: AtomicUsize::new(0), + state: AtomicState::new(), ref_count_rx: AtomicUsize::new(1), notify_rx: Notify::new(), notify_tx: Notify::new(), @@ -194,13 +256,16 @@ pub fn channel(init: T) -> (Sender, Receiver) { shared: shared.clone(), }; - let rx = Receiver { shared, version: 0 }; + let rx = Receiver { + shared, + version: Version::initial(), + }; (tx, rx) } impl Receiver { - fn from_shared(version: usize, shared: Arc>) -> Self { + fn from_shared(version: Version, shared: Arc>) -> Self { // No synchronization necessary as this is only used as a counter and // not memory access. 
shared.ref_count_rx.fetch_add(1, Relaxed); @@ -247,7 +312,7 @@ impl Receiver { /// [`changed`]: Receiver::changed pub fn borrow_and_update(&mut self) -> Ref<'_, T> { let inner = self.shared.value.read().unwrap(); - self.version = self.shared.version.load(SeqCst) & !CLOSED; + self.version = self.shared.state.load().version(); Ref { inner } } @@ -315,11 +380,11 @@ impl Receiver { fn maybe_changed( shared: &Shared, - version: &mut usize, + version: &mut Version, ) -> Option> { // Load the version from the state - let state = shared.version.load(SeqCst); - let new_version = state & !CLOSED; + let state = shared.state.load(); + let new_version = state.version(); if *version != new_version { // Observe the new version and return @@ -327,7 +392,7 @@ fn maybe_changed( return Some(Ok(())); } - if CLOSED == state & CLOSED { + if state.is_closed() { // All receivers have dropped. return Some(Err(error::RecvError(()))); } @@ -368,8 +433,7 @@ impl Sender { let mut lock = self.shared.value.write().unwrap(); *lock = value; - // Update the version. 2 is used so that the CLOSED bit is not set. - self.shared.version.fetch_add(2, SeqCst); + self.shared.state.increment_version(); // Release the write lock. // @@ -463,7 +527,7 @@ impl Sender { cfg_signal_internal! { pub(crate) fn subscribe(&self) -> Receiver { let shared = self.shared.clone(); - let version = shared.version.load(SeqCst); + let version = shared.state.load().version(); Receiver::from_shared(version, shared) } @@ -494,7 +558,7 @@ impl Sender { impl Drop for Sender { fn drop(&mut self) { - self.shared.version.fetch_or(CLOSED, SeqCst); + self.shared.state.set_closed(); self.shared.notify_rx.notify_waiters(); } } diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index ae4c35c9ce8..ea9878736db 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -304,4 +304,9 @@ cfg_rt! { mod builder; pub use builder::Builder; } + + /// Task-related futures. + pub mod futures { + pub use super::task_local::TaskLocalFuture; + } } diff --git a/tokio/src/task/task_local.rs b/tokio/src/task/task_local.rs index 6571ffd7b8b..b55c6abc470 100644 --- a/tokio/src/task/task_local.rs +++ b/tokio/src/task/task_local.rs @@ -115,7 +115,7 @@ impl LocalKey { /// }).await; /// # } /// ``` - pub async fn scope(&'static self, value: T, f: F) -> F::Output + pub fn scope(&'static self, value: T, f: F) -> TaskLocalFuture where F: Future, { @@ -124,7 +124,6 @@ impl LocalKey { slot: Some(value), future: f, } - .await } /// Sets a value `T` as the task-local value for the closure `F`. @@ -206,7 +205,31 @@ impl fmt::Debug for LocalKey { } pin_project! { - struct TaskLocalFuture { + /// A future that sets a value `T` of a task local for the future `F` during + /// its execution. + /// + /// The value of the task-local must be `'static` and will be dropped on the + /// completion of the future. + /// + /// Created by the function [`LocalKey::scope`](self::LocalKey::scope). + /// + /// ### Examples + /// + /// ``` + /// # async fn dox() { + /// tokio::task_local! { + /// static NUMBER: u32; + /// } + /// + /// NUMBER.scope(1, async move { + /// println!("task local value: {}", NUMBER.get()); + /// }).await; + /// # } + /// ``` + pub struct TaskLocalFuture + where + T: 'static + { local: &'static LocalKey, slot: Option, #[pin] @@ -252,10 +275,6 @@ impl Future for TaskLocalFuture { } } -// Required to make `pin_project` happy. -trait StaticLifetime: 'static {} -impl StaticLifetime for T {} - /// An error returned by [`LocalKey::try_with`](method@LocalKey::try_with). 
#[derive(Clone, Copy, Eq, PartialEq)] pub struct AccessError { diff --git a/tokio/src/util/linked_list.rs b/tokio/src/util/linked_list.rs index a74f56215d9..1eab81c317c 100644 --- a/tokio/src/util/linked_list.rs +++ b/tokio/src/util/linked_list.rs @@ -236,37 +236,6 @@ impl Default for LinkedList { } } -// ===== impl Iter ===== - -cfg_rt_multi_thread! { - pub(crate) struct Iter<'a, T: Link> { - curr: Option>, - _p: core::marker::PhantomData<&'a T>, - } - - impl LinkedList { - pub(crate) fn iter(&self) -> Iter<'_, L> { - Iter { - curr: self.head, - _p: core::marker::PhantomData, - } - } - } - - impl<'a, T: Link> Iterator for Iter<'a, T> { - type Item = &'a T::Target; - - fn next(&mut self) -> Option<&'a T::Target> { - let curr = self.curr?; - // safety: the pointer references data contained by the list - self.curr = unsafe { T::pointers(curr).as_ref() }.get_next(); - - // safety: the value is still owned by the linked list. - Some(unsafe { &*curr.as_ptr() }) - } - } -} - // ===== impl DrainFilter ===== cfg_io_readiness! { @@ -645,24 +614,6 @@ mod tests { } } - #[test] - fn iter() { - let a = entry(5); - let b = entry(7); - - let mut list = LinkedList::<&Entry, <&Entry as Link>::Target>::new(); - - assert_eq!(0, list.iter().count()); - - list.push_front(a.as_ref()); - list.push_front(b.as_ref()); - - let mut i = list.iter(); - assert_eq!(7, i.next().unwrap().val); - assert_eq!(5, i.next().unwrap().val); - assert!(i.next().is_none()); - } - proptest::proptest! { #[test] fn fuzz_linked_list(ops: Vec) { diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index ddeb0ee174f..c524dc287d1 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -1,6 +1,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::thread::sleep; +use std::time::Duration; + /// Checks that a suspended task can be aborted without panicking as reported in /// issue #3157: . #[test] @@ -62,18 +65,16 @@ fn test_abort_without_panic_3662() { // This runs in a separate thread so it doesn't have immediate // thread-local access to the executor. It does however transition // the underlying task to be completed, which will cause it to be - // dropped (in this thread no less). + // dropped (but not in this thread). assert!(!drop_flag2.load(Ordering::SeqCst)); j.abort(); - // TODO: is this guaranteed at this point? - // assert!(drop_flag2.load(Ordering::SeqCst)); j }) .join() .unwrap(); - assert!(drop_flag.load(Ordering::SeqCst)); let result = task.await; + assert!(drop_flag.load(Ordering::SeqCst)); assert!(result.unwrap_err().is_cancelled()); // Note: We do the following to trigger a deferred task cleanup. @@ -91,3 +92,49 @@ fn test_abort_without_panic_3662() { i.await.unwrap(); }); } + +/// Checks that a suspended LocalSet task can be aborted from a remote thread +/// without panicking and without running the tasks destructor on the wrong thread. 
+/// +#[test] +fn remote_abort_local_set_3929() { + struct DropCheck { + created_on: std::thread::ThreadId, + not_send: std::marker::PhantomData<*const ()>, + } + + impl DropCheck { + fn new() -> Self { + Self { + created_on: std::thread::current().id(), + not_send: std::marker::PhantomData, + } + } + } + impl Drop for DropCheck { + fn drop(&mut self) { + if std::thread::current().id() != self.created_on { + panic!("non-Send value dropped in another thread!"); + } + } + } + + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .unwrap(); + let local = tokio::task::LocalSet::new(); + + let check = DropCheck::new(); + let jh = local.spawn_local(async move { + futures::future::pending::<()>().await; + drop(check); + }); + + let jh2 = std::thread::spawn(move || { + sleep(Duration::from_millis(50)); + jh.abort(); + }); + + rt.block_on(local); + jh2.join().unwrap(); +}