Prepare tokio 1.8.3 #3983

Merged: 4 commits, Jul 26, 2021

12 changes: 12 additions & 0 deletions tokio/CHANGELOG.md
@@ -1,3 +1,15 @@
# 1.8.3 (July 26, 2021)

This release backports two fixes from 1.9.0.

### Fixed

- Fix leak if output of future panics on drop ([#3967])
- Fix leak in `LocalSet` ([#3978])

[#3967]: https://github.com/tokio-rs/tokio/pull/3967
[#3978]: https://github.com/tokio-rs/tokio/pull/3978

# 1.8.2 (July 19, 2021)

Fixes a missed edge case from 1.8.1.
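
To make the `LocalSet` entry concrete, here is a minimal sketch (not part of this PR; the structure is illustrative) of the shutdown path that fix concerns: a task that is still pending when the `LocalSet` is dropped. It does not reproduce the cross-thread wake that actually triggered the leak; the loom test added further down in this diff covers that case.

```rust
use tokio::runtime::Builder;
use tokio::task::LocalSet;

fn main() {
    let rt = Builder::new_current_thread().build().unwrap();
    let local = LocalSet::new();

    // A task that never completes; it is still queued when the LocalSet is
    // dropped below. The fix ensures that nothing woken during this shutdown
    // lingers in the notification queue and leaks the task.
    let _handle = local.spawn_local(std::future::pending::<()>());

    // Poll the LocalSet once, then shut it down while the task is pending.
    local.block_on(&rt, tokio::task::yield_now());
    drop(local);
    drop(rt);
}
```
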
4 changes: 2 additions & 2 deletions tokio/Cargo.toml
@@ -7,12 +7,12 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.0.x" git tag.
-version = "1.8.2"
+version = "1.8.3"
edition = "2018"
authors = ["Tokio Contributors <team@tokio.rs>"]
license = "MIT"
readme = "README.md"
-documentation = "https://docs.rs/tokio/1.8.2/tokio/"
+documentation = "https://docs.rs/tokio/1.8.3/tokio/"
repository = "https://github.com/tokio-rs/tokio"
homepage = "https://tokio.rs"
description = """
10 changes: 5 additions & 5 deletions tokio/src/loom/std/atomic_u64.rs
@@ -15,8 +15,8 @@ mod imp {

#[cfg(any(target_arch = "arm", target_arch = "mips", target_arch = "powerpc"))]
mod imp {
+use crate::loom::sync::Mutex;
use std::sync::atomic::Ordering;
-use std::sync::Mutex;

#[derive(Debug)]
pub(crate) struct AtomicU64 {
@@ -31,15 +31,15 @@ mod imp {
}

pub(crate) fn load(&self, _: Ordering) -> u64 {
-*self.inner.lock().unwrap()
+*self.inner.lock()
}

pub(crate) fn store(&self, val: u64, _: Ordering) {
-*self.inner.lock().unwrap() = val;
+*self.inner.lock() = val;
}

pub(crate) fn fetch_or(&self, val: u64, _: Ordering) -> u64 {
-let mut lock = self.inner.lock().unwrap();
+let mut lock = self.inner.lock();
let prev = *lock;
*lock = prev | val;
prev
@@ -52,7 +52,7 @@ mod imp {
_success: Ordering,
_failure: Ordering,
) -> Result<u64, u64> {
-let mut lock = self.inner.lock().unwrap();
+let mut lock = self.inner.lock();

if *lock == current {
*lock = new;
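
The dropped `.unwrap()` calls above reflect that `crate::loom::sync::Mutex` is tokio's thin wrapper that hands back the guard directly (and swaps in loom's mutex when building with `--cfg loom`). A rough sketch of that shape, assuming poisoning is simply treated as a bug; this is not tokio's actual wrapper:

```rust
use std::sync::{Mutex as StdMutex, MutexGuard};

/// Sketch of a loom-style mutex wrapper: same API in both builds, and
/// `lock()` returns the guard directly so call sites drop the `.unwrap()`.
#[derive(Debug)]
pub struct Mutex<T>(StdMutex<T>);

impl<T> Mutex<T> {
    pub fn new(value: T) -> Self {
        Mutex(StdMutex::new(value))
    }

    pub fn lock(&self) -> MutexGuard<'_, T> {
        // A poisoned lock is treated as a bug, so unwrap once here instead
        // of at every call site.
        self.0.lock().unwrap()
    }
}

fn main() {
    let value = Mutex::new(5u64);
    *value.lock() |= 2; // fetch_or-style update, mirroring the diff above
    assert_eq!(*value.lock(), 7);
}
```

The same substitution appears again in `tokio/src/runtime/tests/loom_oneshot.rs` further down.
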
132 changes: 0 additions & 132 deletions tokio/src/runtime/shell.rs

This file was deleted.

43 changes: 31 additions & 12 deletions tokio/src/runtime/task/harness.rs
@@ -112,6 +112,8 @@ where
}

pub(super) fn drop_join_handle_slow(self) {
+let mut maybe_panic = None;

// Try to unset `JOIN_INTEREST`. This must be done as a first step in
// case the task concurrently completed.
if self.header().state.unset_join_interested().is_err() {
@@ -120,11 +122,20 @@
// the scheduler or `JoinHandle`. i.e. if the output remains in the
// task structure until the task is deallocated, it may be dropped
// by a Waker on any arbitrary thread.
-self.core().stage.drop_future_or_output();
+let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+self.core().stage.drop_future_or_output();
+}));
+if let Err(panic) = panic {
+maybe_panic = Some(panic);
+}
}

// Drop the `JoinHandle` reference, possibly deallocating the task
self.drop_reference();

+if let Some(panic) = maybe_panic {
+panic::resume_unwind(panic);
+}
}

// ===== waker behavior =====
@@ -183,17 +194,25 @@ where
// ====== internal ======

fn complete(self, output: super::Result<T::Output>, is_join_interested: bool) {
-if is_join_interested {
-// Store the output. The future has already been dropped
-//
-// Safety: Mutual exclusion is obtained by having transitioned the task
-// state -> Running
-let stage = &self.core().stage;
-stage.store_output(output);

-// Transition to `Complete`, notifying the `JoinHandle` if necessary.
-transition_to_complete(self.header(), stage, &self.trailer());
-}
+// We catch panics here because dropping the output may panic.
+//
+// Dropping the output can also happen in the first branch inside
+// transition_to_complete.
+let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
+if is_join_interested {
+// Store the output. The future has already been dropped
+//
+// Safety: Mutual exclusion is obtained by having transitioned the task
+// state -> Running
+let stage = &self.core().stage;
+stage.store_output(output);

+// Transition to `Complete`, notifying the `JoinHandle` if necessary.
+transition_to_complete(self.header(), stage, &self.trailer());
+} else {
+drop(output);
+}
+}));

// The task has completed execution and will no longer be scheduled.
//
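
Both hunks above follow the same pattern: a drop that may panic is wrapped in `catch_unwind`, the bookkeeping that must not be skipped runs next, and only then is the panic either re-raised (`drop_join_handle_slow`) or discarded so it cannot unwind into the scheduler (`complete`). A standalone sketch of that pattern, using illustrative names (`PanicOnDrop`, `release_reference`) rather than tokio internals:

```rust
use std::panic::{self, AssertUnwindSafe};

struct PanicOnDrop;

impl Drop for PanicOnDrop {
    fn drop(&mut self) {
        panic!("panicked while dropping the output");
    }
}

// Stand-in for `self.drop_reference()`: it must run even if the drop above
// panicked, otherwise the task allocation would leak.
fn release_reference() {
    println!("reference released");
}

fn main() {
    let output = PanicOnDrop;

    // Catch the panic raised while dropping the output.
    let maybe_panic = panic::catch_unwind(AssertUnwindSafe(|| drop(output))).err();

    // Bookkeeping still happens, panic or not.
    release_reference();

    // Re-raise afterwards, as drop_join_handle_slow does; complete() instead
    // discards the payload so the panic never reaches the scheduler.
    if let Some(payload) = maybe_panic {
        panic::resume_unwind(payload);
    }
}
```
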
47 changes: 47 additions & 0 deletions tokio/src/runtime/tests/loom_local.rs
@@ -0,0 +1,47 @@
use crate::runtime::tests::loom_oneshot as oneshot;
use crate::runtime::Builder;
use crate::task::LocalSet;

use std::task::Poll;

/// Waking a runtime will attempt to push a task into a queue of notifications
/// in the runtime; however, the tasks in such a queue usually have a reference
/// to the runtime itself. This means that if they are not properly removed at
/// runtime shutdown, they will cause a memory leak.
///
/// This test verifies that waking something during shutdown of a LocalSet does
/// not result in tasks lingering in the queue once shutdown is complete. This
/// is verified using loom's leak finder.
#[test]
fn wake_during_shutdown() {
loom::model(|| {
let rt = Builder::new_current_thread().build().unwrap();
let ls = LocalSet::new();

let (send, recv) = oneshot::channel();

ls.spawn_local(async move {
let mut send = Some(send);

let () = futures::future::poll_fn(|cx| {
if let Some(send) = send.take() {
send.send(cx.waker().clone());
}

Poll::Pending
})
.await;
});

let handle = loom::thread::spawn(move || {
let waker = recv.recv();
waker.wake();
});

ls.block_on(&rt, crate::task::yield_now());

drop(ls);
handle.join().unwrap();
drop(rt);
});
}
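
For readers unfamiliar with loom, here is a self-contained sketch of the harness the test above relies on (assuming `loom` as a dev-dependency; this is not part of the PR): `loom::model` re-runs the closure across the possible interleavings of the model threads, and a run fails if loom-tracked objects are still alive when the closure returns, which is the leak finder the comment refers to.

```rust
#[test]
fn arc_is_released_across_threads() {
    loom::model(|| {
        let shared = loom::sync::Arc::new(1u32);
        let cloned = shared.clone();

        // A second model thread reads the value; loom explores the
        // interleavings between it and the main model thread.
        let handle = loom::thread::spawn(move || {
            assert_eq!(*cloned, 1);
        });

        handle.join().unwrap();
        assert_eq!(*shared, 1);
        // Both Arcs are dropped here; if one were stashed somewhere and never
        // released, loom would report it as leaked.
    });
}
```
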
7 changes: 3 additions & 4 deletions tokio/src/runtime/tests/loom_oneshot.rs
@@ -1,7 +1,6 @@
+use crate::loom::sync::{Arc, Mutex};
use loom::sync::Notify;

-use std::sync::{Arc, Mutex};

pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner {
notify: Notify::new(),
@@ -31,15 +30,15 @@ struct Inner<T> {

impl<T> Sender<T> {
pub(crate) fn send(self, value: T) {
-*self.inner.value.lock().unwrap() = Some(value);
+*self.inner.value.lock() = Some(value);
self.inner.notify.notify();
}
}

impl<T> Receiver<T> {
pub(crate) fn recv(self) -> T {
loop {
-if let Some(v) = self.inner.value.lock().unwrap().take() {
+if let Some(v) = self.inner.value.lock().take() {
return v;
}

4 changes: 4 additions & 0 deletions tokio/src/runtime/tests/mod.rs
@@ -21,6 +21,7 @@ mod joinable_wrapper {

cfg_loom! {
mod loom_basic_scheduler;
+mod loom_local;
mod loom_blocking;
mod loom_oneshot;
mod loom_pool;
@@ -31,6 +32,9 @@ cfg_loom! {
cfg_not_loom! {
mod queue;

+#[cfg(not(miri))]
+mod task_combinations;

#[cfg(miri)]
mod task;
}