diff --git a/tokio/CHANGELOG.md b/tokio/CHANGELOG.md index a96d27500ed..866f4bff49c 100644 --- a/tokio/CHANGELOG.md +++ b/tokio/CHANGELOG.md @@ -1,3 +1,15 @@ +# 1.8.3 (July 26, 2021) + +This release backports two fixes from 1.9.0 + +### Fixed + + - Fix leak if output of future panics on drop ([#3967]) + - Fix leak in `LocalSet` ([#3978]) + +[#3967]: https://github.com/tokio-rs/tokio/pull/3967 +[#3978]: https://github.com/tokio-rs/tokio/pull/3978 + # 1.8.2 (July 19, 2021) Fixes a missed edge case from 1.8.1. diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index bd7937f1b9e..321bb66a701 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -7,12 +7,12 @@ name = "tokio" # - README.md # - Update CHANGELOG.md. # - Create "v1.0.x" git tag. -version = "1.8.2" +version = "1.8.3" edition = "2018" authors = ["Tokio Contributors "] license = "MIT" readme = "README.md" -documentation = "https://docs.rs/tokio/1.8.2/tokio/" +documentation = "https://docs.rs/tokio/1.8.3/tokio/" repository = "https://github.com/tokio-rs/tokio" homepage = "https://tokio.rs" description = """ diff --git a/tokio/src/loom/std/atomic_u64.rs b/tokio/src/loom/std/atomic_u64.rs index a86a195b1d2..7eb457a2405 100644 --- a/tokio/src/loom/std/atomic_u64.rs +++ b/tokio/src/loom/std/atomic_u64.rs @@ -15,8 +15,8 @@ mod imp { #[cfg(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc"))] mod imp { + use crate::loom::sync::Mutex; use std::sync::atomic::Ordering; - use std::sync::Mutex; #[derive(Debug)] pub(crate) struct AtomicU64 { @@ -31,15 +31,15 @@ mod imp { } pub(crate) fn load(&self, _: Ordering) -> u64 { - *self.inner.lock().unwrap() + *self.inner.lock() } pub(crate) fn store(&self, val: u64, _: Ordering) { - *self.inner.lock().unwrap() = val; + *self.inner.lock() = val; } pub(crate) fn fetch_or(&self, val: u64, _: Ordering) -> u64 { - let mut lock = self.inner.lock().unwrap(); + let mut lock = self.inner.lock(); let prev = *lock; *lock = prev | val; prev @@ -52,7 +52,7 @@ mod imp { _success: Ordering, _failure: Ordering, ) -> Result { - let mut lock = self.inner.lock().unwrap(); + let mut lock = self.inner.lock(); if *lock == current { *lock = new; diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs deleted file mode 100644 index 486d4fa5bbe..00000000000 --- a/tokio/src/runtime/shell.rs +++ /dev/null @@ -1,132 +0,0 @@ -#![allow(clippy::redundant_clone)] - -use crate::future::poll_fn; -use crate::park::{Park, Unpark}; -use crate::runtime::driver::Driver; -use crate::sync::Notify; -use crate::util::{waker_ref, Wake}; - -use std::sync::{Arc, Mutex}; -use std::task::Context; -use std::task::Poll::{Pending, Ready}; -use std::{future::Future, sync::PoisonError}; - -#[derive(Debug)] -pub(super) struct Shell { - driver: Mutex>, - - notify: Notify, - - /// TODO: don't store this - unpark: Arc, -} - -#[derive(Debug)] -struct Handle(::Unpark); - -impl Shell { - pub(super) fn new(driver: Driver) -> Shell { - let unpark = Arc::new(Handle(driver.unpark())); - - Shell { - driver: Mutex::new(Some(driver)), - notify: Notify::new(), - unpark, - } - } - - pub(super) fn block_on(&self, f: F) -> F::Output - where - F: Future, - { - let mut enter = crate::runtime::enter(true); - - pin!(f); - - loop { - if let Some(driver) = &mut self.take_driver() { - return driver.block_on(f); - } else { - let notified = self.notify.notified(); - pin!(notified); - - if let Some(out) = enter - .block_on(poll_fn(|cx| { - if notified.as_mut().poll(cx).is_ready() { - return Ready(None); - } - - if let Ready(out) = f.as_mut().poll(cx) { - return Ready(Some(out)); - } - - Pending - })) - .expect("Failed to `Enter::block_on`") - { - return out; - } - } - } - } - - fn take_driver(&self) -> Option> { - let mut lock = self.driver.lock().unwrap(); - let driver = lock.take()?; - - Some(DriverGuard { - inner: Some(driver), - shell: &self, - }) - } -} - -impl Wake for Handle { - /// Wake by value - fn wake(self: Arc) { - Wake::wake_by_ref(&self); - } - - /// Wake by reference - fn wake_by_ref(arc_self: &Arc) { - arc_self.0.unpark(); - } -} - -struct DriverGuard<'a> { - inner: Option, - shell: &'a Shell, -} - -impl DriverGuard<'_> { - fn block_on(&mut self, f: F) -> F::Output { - let driver = self.inner.as_mut().unwrap(); - - pin!(f); - - let waker = waker_ref(&self.shell.unpark); - let mut cx = Context::from_waker(&waker); - - loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { - return v; - } - - driver.park().unwrap(); - } - } -} - -impl Drop for DriverGuard<'_> { - fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - self.shell - .driver - .lock() - .unwrap_or_else(PoisonError::into_inner) - .replace(inner); - - self.shell.notify.notify_one(); - } - } -} diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 9f0b1071130..8cd649dc7f5 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -112,6 +112,8 @@ where } pub(super) fn drop_join_handle_slow(self) { + let mut maybe_panic = None; + // Try to unset `JOIN_INTEREST`. This must be done as a first step in // case the task concurrently completed. if self.header().state.unset_join_interested().is_err() { @@ -120,11 +122,20 @@ where // the scheduler or `JoinHandle`. i.e. if the output remains in the // task structure until the task is deallocated, it may be dropped // by a Waker on any arbitrary thread. - self.core().stage.drop_future_or_output(); + let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + self.core().stage.drop_future_or_output(); + })); + if let Err(panic) = panic { + maybe_panic = Some(panic); + } } // Drop the `JoinHandle` reference, possibly deallocating the task self.drop_reference(); + + if let Some(panic) = maybe_panic { + panic::resume_unwind(panic); + } } // ===== waker behavior ===== @@ -183,17 +194,25 @@ where // ====== internal ====== fn complete(self, output: super::Result, is_join_interested: bool) { - if is_join_interested { - // Store the output. The future has already been dropped - // - // Safety: Mutual exclusion is obtained by having transitioned the task - // state -> Running - let stage = &self.core().stage; - stage.store_output(output); - - // Transition to `Complete`, notifying the `JoinHandle` if necessary. - transition_to_complete(self.header(), stage, &self.trailer()); - } + // We catch panics here because dropping the output may panic. + // + // Dropping the output can also happen in the first branch inside + // transition_to_complete. + let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| { + if is_join_interested { + // Store the output. The future has already been dropped + // + // Safety: Mutual exclusion is obtained by having transitioned the task + // state -> Running + let stage = &self.core().stage; + stage.store_output(output); + + // Transition to `Complete`, notifying the `JoinHandle` if necessary. + transition_to_complete(self.header(), stage, &self.trailer()); + } else { + drop(output); + } + })); // The task has completed execution and will no longer be scheduled. // diff --git a/tokio/src/runtime/tests/loom_local.rs b/tokio/src/runtime/tests/loom_local.rs new file mode 100644 index 00000000000..d9a07a45f05 --- /dev/null +++ b/tokio/src/runtime/tests/loom_local.rs @@ -0,0 +1,47 @@ +use crate::runtime::tests::loom_oneshot as oneshot; +use crate::runtime::Builder; +use crate::task::LocalSet; + +use std::task::Poll; + +/// Waking a runtime will attempt to push a task into a queue of notifications +/// in the runtime, however the tasks in such a queue usually have a reference +/// to the runtime itself. This means that if they are not properly removed at +/// runtime shutdown, this will cause a memory leak. +/// +/// This test verifies that waking something during shutdown of a LocalSet does +/// not result in tasks lingering in the queue once shutdown is complete. This +/// is verified using loom's leak finder. +#[test] +fn wake_during_shutdown() { + loom::model(|| { + let rt = Builder::new_current_thread().build().unwrap(); + let ls = LocalSet::new(); + + let (send, recv) = oneshot::channel(); + + ls.spawn_local(async move { + let mut send = Some(send); + + let () = futures::future::poll_fn(|cx| { + if let Some(send) = send.take() { + send.send(cx.waker().clone()); + } + + Poll::Pending + }) + .await; + }); + + let handle = loom::thread::spawn(move || { + let waker = recv.recv(); + waker.wake(); + }); + + ls.block_on(&rt, crate::task::yield_now()); + + drop(ls); + handle.join().unwrap(); + drop(rt); + }); +} diff --git a/tokio/src/runtime/tests/loom_oneshot.rs b/tokio/src/runtime/tests/loom_oneshot.rs index c126fe479af..87eb6386425 100644 --- a/tokio/src/runtime/tests/loom_oneshot.rs +++ b/tokio/src/runtime/tests/loom_oneshot.rs @@ -1,7 +1,6 @@ +use crate::loom::sync::{Arc, Mutex}; use loom::sync::Notify; -use std::sync::{Arc, Mutex}; - pub(crate) fn channel() -> (Sender, Receiver) { let inner = Arc::new(Inner { notify: Notify::new(), @@ -31,7 +30,7 @@ struct Inner { impl Sender { pub(crate) fn send(self, value: T) { - *self.inner.value.lock().unwrap() = Some(value); + *self.inner.value.lock() = Some(value); self.inner.notify.notify(); } } @@ -39,7 +38,7 @@ impl Sender { impl Receiver { pub(crate) fn recv(self) -> T { loop { - if let Some(v) = self.inner.value.lock().unwrap().take() { + if let Some(v) = self.inner.value.lock().take() { return v; } diff --git a/tokio/src/runtime/tests/mod.rs b/tokio/src/runtime/tests/mod.rs index 596e47dfd00..3f2cc9825e8 100644 --- a/tokio/src/runtime/tests/mod.rs +++ b/tokio/src/runtime/tests/mod.rs @@ -21,6 +21,7 @@ mod joinable_wrapper { cfg_loom! { mod loom_basic_scheduler; + mod loom_local; mod loom_blocking; mod loom_oneshot; mod loom_pool; @@ -31,6 +32,9 @@ cfg_loom! { cfg_not_loom! { mod queue; + #[cfg(not(miri))] + mod task_combinations; + #[cfg(miri)] mod task; } diff --git a/tokio/src/runtime/tests/task_combinations.rs b/tokio/src/runtime/tests/task_combinations.rs new file mode 100644 index 00000000000..76ce2330c2c --- /dev/null +++ b/tokio/src/runtime/tests/task_combinations.rs @@ -0,0 +1,380 @@ +use std::future::Future; +use std::panic; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::runtime::Builder; +use crate::sync::oneshot; +use crate::task::JoinHandle; + +use futures::future::FutureExt; + +// Enums for each option in the combinations being tested + +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiRuntime { + CurrentThread, + Multi1, + Multi2, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiLocalSet { + Yes, + No, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiTask { + PanicOnRun, + PanicOnDrop, + PanicOnRunAndDrop, + NoPanic, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiOutput { + PanicOnDrop, + NoPanic, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiJoinInterest { + Polled, + NotPolled, +} +#[allow(clippy::enum_variant_names)] // we aren't using glob imports +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiJoinHandle { + DropImmediately = 1, + DropFirstPoll = 2, + DropAfterNoConsume = 3, + DropAfterConsume = 4, +} +#[derive(Copy, Clone, Debug, PartialEq)] +enum CombiAbort { + NotAborted = 0, + AbortedImmediately = 1, + AbortedFirstPoll = 2, + AbortedAfterFinish = 3, + AbortedAfterConsumeOutput = 4, +} + +#[test] +fn test_combinations() { + let mut rt = &[ + CombiRuntime::CurrentThread, + CombiRuntime::Multi1, + CombiRuntime::Multi2, + ][..]; + + if cfg!(miri) { + rt = &[CombiRuntime::CurrentThread]; + } + + let ls = [CombiLocalSet::Yes, CombiLocalSet::No]; + let task = [ + CombiTask::NoPanic, + CombiTask::PanicOnRun, + CombiTask::PanicOnDrop, + CombiTask::PanicOnRunAndDrop, + ]; + let output = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop]; + let ji = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled]; + let jh = [ + CombiJoinHandle::DropImmediately, + CombiJoinHandle::DropFirstPoll, + CombiJoinHandle::DropAfterNoConsume, + CombiJoinHandle::DropAfterConsume, + ]; + let abort = [ + CombiAbort::NotAborted, + CombiAbort::AbortedImmediately, + CombiAbort::AbortedFirstPoll, + CombiAbort::AbortedAfterFinish, + CombiAbort::AbortedAfterConsumeOutput, + ]; + + for rt in rt.iter().copied() { + for ls in ls.iter().copied() { + for task in task.iter().copied() { + for output in output.iter().copied() { + for ji in ji.iter().copied() { + for jh in jh.iter().copied() { + for abort in abort.iter().copied() { + test_combination(rt, ls, task, output, ji, jh, abort); + } + } + } + } + } + } + } +} + +fn test_combination( + rt: CombiRuntime, + ls: CombiLocalSet, + task: CombiTask, + output: CombiOutput, + ji: CombiJoinInterest, + jh: CombiJoinHandle, + abort: CombiAbort, +) { + if (jh as usize) < (abort as usize) { + // drop before abort not possible + return; + } + if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) { + // this causes double panic + return; + } + if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) { + // this causes double panic + return; + } + + println!("Runtime {:?}, LocalSet {:?}, Task {:?}, Output {:?}, JoinInterest {:?}, JoinHandle {:?}, Abort {:?}", rt, ls, task, output, ji, jh, abort); + + // A runtime optionally with a LocalSet + struct Rt { + rt: crate::runtime::Runtime, + ls: Option, + } + impl Rt { + fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self { + let rt = match rt { + CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(), + CombiRuntime::Multi1 => Builder::new_multi_thread() + .worker_threads(1) + .build() + .unwrap(), + CombiRuntime::Multi2 => Builder::new_multi_thread() + .worker_threads(2) + .build() + .unwrap(), + }; + + let ls = match ls { + CombiLocalSet::Yes => Some(crate::task::LocalSet::new()), + CombiLocalSet::No => None, + }; + + Self { rt, ls } + } + fn block_on(&self, task: T) -> T::Output + where + T: Future, + { + match &self.ls { + Some(ls) => ls.block_on(&self.rt, task), + None => self.rt.block_on(task), + } + } + fn spawn(&self, task: T) -> JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + match &self.ls { + Some(ls) => ls.spawn_local(task), + None => self.rt.spawn(task), + } + } + } + + // The type used for the output of the future + struct Output { + panic_on_drop: bool, + on_drop: Option>, + } + impl Output { + fn disarm(&mut self) { + self.panic_on_drop = false; + } + } + impl Drop for Output { + fn drop(&mut self) { + let _ = self.on_drop.take().unwrap().send(()); + if self.panic_on_drop { + panic!("Panicking in Output"); + } + } + } + + // A wrapper around the future that is spawned + struct FutWrapper { + inner: F, + on_drop: Option>, + panic_on_drop: bool, + } + impl Future for FutWrapper { + type Output = F::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { + let me = Pin::into_inner_unchecked(self); + let inner = Pin::new_unchecked(&mut me.inner); + inner.poll(cx) + } + } + } + impl Drop for FutWrapper { + fn drop(&mut self) { + let _: Result<(), ()> = self.on_drop.take().unwrap().send(()); + if self.panic_on_drop { + panic!("Panicking in FutWrapper"); + } + } + } + + // The channels passed to the task + struct Signals { + on_first_poll: Option>, + wait_complete: Option>, + on_output_drop: Option>, + } + + // The task we will spawn + async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output { + // Signal that we have been polled once + let _ = signal.on_first_poll.take().unwrap().send(()); + + // Wait for a signal, then complete the future + let _ = signal.wait_complete.take().unwrap().await; + + // If the task gets past wait_complete without yielding, then aborts + // may not be caught without this yield_now. + crate::task::yield_now().await; + + if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop { + panic!("Panicking in my_task on {:?}", std::thread::current().id()); + } + + Output { + panic_on_drop: out == CombiOutput::PanicOnDrop, + on_drop: signal.on_output_drop.take(), + } + } + + let rt = Rt::new(rt, ls); + + let (on_first_poll, wait_first_poll) = oneshot::channel(); + let (on_complete, wait_complete) = oneshot::channel(); + let (on_future_drop, wait_future_drop) = oneshot::channel(); + let (on_output_drop, wait_output_drop) = oneshot::channel(); + let signal = Signals { + on_first_poll: Some(on_first_poll), + wait_complete: Some(wait_complete), + on_output_drop: Some(on_output_drop), + }; + + // === Spawn task === + let mut handle = Some(rt.spawn(FutWrapper { + inner: my_task(signal, task, output), + on_drop: Some(on_future_drop), + panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop, + })); + + // Keep track of whether the task has been killed with an abort + let mut aborted = false; + + // If we want to poll the JoinHandle, do it now + if ji == CombiJoinInterest::Polled { + assert!( + handle.as_mut().unwrap().now_or_never().is_none(), + "Polling handle succeeded" + ); + } + + if abort == CombiAbort::AbortedImmediately { + handle.as_mut().unwrap().abort(); + aborted = true; + } + if jh == CombiJoinHandle::DropImmediately { + drop(handle.take().unwrap()); + } + + // === Wait for first poll === + let got_polled = rt.block_on(wait_first_poll).is_ok(); + if !got_polled { + // it's possible that we are aborted but still got polled + assert!( + aborted, + "Task completed without ever being polled but was not aborted." + ); + } + + if abort == CombiAbort::AbortedFirstPoll { + handle.as_mut().unwrap().abort(); + aborted = true; + } + if jh == CombiJoinHandle::DropFirstPoll { + drop(handle.take().unwrap()); + } + + // Signal the future that it can return now + let _ = on_complete.send(()); + // === Wait for future to be dropped === + assert!( + rt.block_on(wait_future_drop).is_ok(), + "The future should always be dropped." + ); + + if abort == CombiAbort::AbortedAfterFinish { + // Don't set aborted to true here as the task already finished + handle.as_mut().unwrap().abort(); + } + if jh == CombiJoinHandle::DropAfterNoConsume { + // The runtime will usually have dropped every ref-count at this point, + // in which case dropping the JoinHandle drops the output. + // + // (But it might race and still hold a ref-count) + let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| { + drop(handle.take().unwrap()); + })); + if panic.is_err() { + assert!( + (output == CombiOutput::PanicOnDrop) + && (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) + && !aborted, + "Dropping JoinHandle shouldn't panic here" + ); + } + } + + // Check whether we drop after consuming the output + if jh == CombiJoinHandle::DropAfterConsume { + // Using as_mut() to not immediately drop the handle + let result = rt.block_on(handle.as_mut().unwrap()); + + match result { + Ok(mut output) => { + // Don't panic here. + output.disarm(); + assert!(!aborted, "Task was aborted but returned output"); + } + Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"), + Err(err) if err.is_panic() => { + assert!( + (task == CombiTask::PanicOnRun) + || (task == CombiTask::PanicOnDrop) + || (task == CombiTask::PanicOnRunAndDrop) + || (output == CombiOutput::PanicOnDrop), + "Panic but nothing should panic" + ); + } + _ => unreachable!(), + } + + let handle = handle.take().unwrap(); + if abort == CombiAbort::AbortedAfterConsumeOutput { + handle.abort(); + } + drop(handle); + } + + // The output should have been dropped now. Check whether the output + // object was created at all. + let output_created = rt.block_on(wait_output_drop).is_ok(); + assert_eq!( + output_created, + (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted, + "Creation of output object" + ); +} diff --git a/tokio/src/sync/barrier.rs b/tokio/src/sync/barrier.rs index e3c95f6a6b6..0e39dac8bb5 100644 --- a/tokio/src/sync/barrier.rs +++ b/tokio/src/sync/barrier.rs @@ -1,7 +1,6 @@ +use crate::loom::sync::Mutex; use crate::sync::watch; -use std::sync::Mutex; - /// A barrier enables multiple tasks to synchronize the beginning of some computation. /// /// ``` @@ -94,7 +93,7 @@ impl Barrier { // NOTE: the extra scope here is so that the compiler doesn't think `state` is held across // a yield point, and thus marks the returned future as !Send. let generation = { - let mut state = self.state.lock().unwrap(); + let mut state = self.state.lock(); let generation = state.generation; state.arrived += 1; if state.arrived == self.n { diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 49b0ec6c4d4..37c2c508ad3 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -1,4 +1,5 @@ //! Runs `!Send` futures on the current thread. +use crate::loom::sync::{Arc, Mutex}; use crate::runtime::task::{self, JoinHandle, Task}; use crate::sync::AtomicWaker; use crate::util::linked_list::{Link, LinkedList}; @@ -9,7 +10,6 @@ use std::fmt; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; -use std::sync::{Arc, Mutex}; use std::task::Poll; use pin_project_lite::pin_project; @@ -242,7 +242,7 @@ struct Tasks { /// LocalSet state shared between threads. struct Shared { /// Remote run queue sender - queue: Mutex>>>, + queue: Mutex>>>>, /// Wake the `LocalSet` task waker: AtomicWaker, @@ -338,7 +338,7 @@ impl LocalSet { queue: VecDeque::with_capacity(INITIAL_CAPACITY), }), shared: Arc::new(Shared { - queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)), + queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), waker: AtomicWaker::new(), }), }, @@ -538,8 +538,8 @@ impl LocalSet { .shared .queue .lock() - .unwrap() - .pop_front() + .as_mut() + .and_then(|queue| queue.pop_front()) .or_else(|| self.context.tasks.borrow_mut().queue.pop_front()) } else { self.context @@ -547,7 +547,14 @@ impl LocalSet { .borrow_mut() .queue .pop_front() - .or_else(|| self.context.shared.queue.lock().unwrap().pop_front()) + .or_else(|| { + self.context + .shared + .queue + .lock() + .as_mut() + .and_then(|queue| queue.pop_front()) + }) } } @@ -611,7 +618,10 @@ impl Drop for LocalSet { task.shutdown(); } - for task in self.context.shared.queue.lock().unwrap().drain(..) { + // Take the queue from the Shared object to prevent pushing + // notifications to it in the future. + let queue = self.context.shared.queue.lock().take().unwrap(); + for task in queue { task.shutdown(); } @@ -661,8 +671,16 @@ impl Shared { cx.tasks.borrow_mut().queue.push_back(task); } _ => { - self.queue.lock().unwrap().push_back(task); - self.waker.wake(); + // First check whether the queue is still there (if not, the + // LocalSet is dropped). Then push to it if so, and if not, + // do nothing. + let mut lock = self.queue.lock(); + + if let Some(queue) = lock.as_mut() { + queue.push_back(task); + drop(lock); + self.waker.wake(); + } } }); } diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index b9ec5c5aab3..a44d75f3ce1 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -29,7 +29,7 @@ cfg_not_test_util! { cfg_test_util! { use crate::time::{Duration, Instant}; - use std::sync::{Arc, Mutex}; + use crate::loom::sync::{Arc, Mutex}; cfg_rt! { fn clock() -> Option { @@ -102,7 +102,7 @@ cfg_test_util! { /// runtime. pub fn resume() { let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - let mut inner = clock.inner.lock().unwrap(); + let mut inner = clock.inner.lock(); if inner.unfrozen.is_some() { panic!("time is not frozen"); @@ -164,7 +164,7 @@ cfg_test_util! { } pub(crate) fn pause(&self) { - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.lock(); if !inner.enable_pausing { drop(inner); // avoid poisoning the lock @@ -178,12 +178,12 @@ cfg_test_util! { } pub(crate) fn is_paused(&self) -> bool { - let inner = self.inner.lock().unwrap(); + let inner = self.inner.lock(); inner.unfrozen.is_none() } pub(crate) fn advance(&self, duration: Duration) { - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.lock(); if inner.unfrozen.is_some() { panic!("time is not frozen"); @@ -193,7 +193,7 @@ cfg_test_util! { } pub(crate) fn now(&self) -> Instant { - let inner = self.inner.lock().unwrap(); + let inner = self.inner.lock(); let mut ret = inner.base; diff --git a/tokio/tests/task_abort.rs b/tokio/tests/task_abort.rs index 8f621683faa..cdaa405b86a 100644 --- a/tokio/tests/task_abort.rs +++ b/tokio/tests/task_abort.rs @@ -5,11 +5,21 @@ use std::sync::Arc; use std::thread::sleep; use std::time::Duration; +use tokio::runtime::Builder; + +struct PanicOnDrop; + +impl Drop for PanicOnDrop { + fn drop(&mut self) { + panic!("Well what did you expect would happen..."); + } +} + /// Checks that a suspended task can be aborted without panicking as reported in /// issue #3157: . #[test] fn test_abort_without_panic_3157() { - let rt = tokio::runtime::Builder::new_multi_thread() + let rt = Builder::new_multi_thread() .enable_time() .worker_threads(1) .build() @@ -45,9 +55,7 @@ fn test_abort_without_panic_3662() { } } - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); + let rt = Builder::new_current_thread().build().unwrap(); rt.block_on(async move { let drop_flag = Arc::new(AtomicBool::new(false)); @@ -120,9 +128,7 @@ fn remote_abort_local_set_3929() { } } - let rt = tokio::runtime::Builder::new_current_thread() - .build() - .unwrap(); + let rt = Builder::new_current_thread().build().unwrap(); let local = tokio::task::LocalSet::new(); let check = DropCheck::new(); @@ -144,10 +150,7 @@ fn remote_abort_local_set_3929() { /// issue #3964: . #[test] fn test_abort_wakes_task_3964() { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); + let rt = Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { let notify_dropped = Arc::new(()); @@ -174,22 +177,11 @@ fn test_abort_wakes_task_3964() { }); } -struct PanicOnDrop; - -impl Drop for PanicOnDrop { - fn drop(&mut self) { - panic!("Well what did you expect would happen..."); - } -} - /// Checks that aborting a task whose destructor panics does not allow the /// panic to escape the task. #[test] fn test_abort_task_that_panics_on_drop_contained() { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); + let rt = Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { let handle = tokio::spawn(async move { @@ -213,10 +205,7 @@ fn test_abort_task_that_panics_on_drop_contained() { /// Checks that aborting a task whose destructor panics has the expected result. #[test] fn test_abort_task_that_panics_on_drop_returned() { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap(); + let rt = Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { let handle = tokio::spawn(async move {