Skip to content

Commit

Permalink
sync: add owned future for CancellationToken (#5153)
Browse files Browse the repository at this point in the history
  • Loading branch information
NobodyXu committed Nov 21, 2022
1 parent f15d14e commit 304b515
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 1 deletion.
97 changes: 97 additions & 0 deletions tokio-util/src/sync/cancellation_token.rs
Expand Up @@ -66,6 +66,23 @@ pin_project! {
}
}

pin_project! {
/// A Future that is resolved once the corresponding [`CancellationToken`]
/// is cancelled.
///
/// This is the counterpart to [`WaitForCancellationFuture`] that takes
/// [`CancellationToken`] by value instead of using a reference.
#[must_use = "futures do nothing unless polled"]
pub struct WaitForCancellationFutureOwned {
// Since `future` is the first field, it is dropped before the
// cancellation_token field. This ensures that the reference inside the
// `Notified` remains valid.
#[pin]
future: tokio::sync::futures::Notified<'static>,
cancellation_token: CancellationToken,
}
}

// ===== impl CancellationToken =====

impl core::fmt::Debug for CancellationToken {
Expand Down Expand Up @@ -183,6 +200,21 @@ impl CancellationToken {
}
}

/// Returns a `Future` that gets fulfilled when cancellation is requested.
///
/// The future will complete immediately if the token is already cancelled
/// when this method is called.
///
/// The function takes self by value and returns a future that owns the
/// token.
///
/// # Cancel safety
///
/// This method is cancel safe.
pub fn cancelled_owned(self) -> WaitForCancellationFutureOwned {
WaitForCancellationFutureOwned::new(self)
}

/// Creates a `DropGuard` for this token.
///
/// Returned guard will cancel this token (and all its children) on drop
Expand Down Expand Up @@ -222,3 +254,68 @@ impl<'a> Future for WaitForCancellationFuture<'a> {
}
}
}

// ===== impl WaitForCancellationFutureOwned =====

impl core::fmt::Debug for WaitForCancellationFutureOwned {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("WaitForCancellationFutureOwned").finish()
}
}

impl WaitForCancellationFutureOwned {
fn new(cancellation_token: CancellationToken) -> Self {
WaitForCancellationFutureOwned {
// cancellation_token holds a heap allocation and is guaranteed to have a
// stable deref, thus it would be ok to move the cancellation_token while
// the future holds a reference to it.
//
// # Safety
//
// cancellation_token is dropped after future due to the field ordering.
future: unsafe { Self::new_future(&cancellation_token) },
cancellation_token,
}
}

/// # Safety
/// The returned future must be destroyed before the cancellation token is
/// destroyed.
unsafe fn new_future(
cancellation_token: &CancellationToken,
) -> tokio::sync::futures::Notified<'static> {
let inner_ptr = Arc::as_ptr(&cancellation_token.inner);
// SAFETY: The `Arc::as_ptr` method guarantees that `inner_ptr` remains
// valid until the strong count of the Arc drops to zero, and the caller
// guarantees that they will drop the future before that happens.
(*inner_ptr).notified()
}
}

impl Future for WaitForCancellationFutureOwned {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut this = self.project();

loop {
if this.cancellation_token.is_cancelled() {
return Poll::Ready(());
}

// No wakeups can be lost here because there is always a call to
// `is_cancelled` between the creation of the future and the call to
// `poll`, and the code that sets the cancelled flag does so before
// waking the `Notified`.
if this.future.as_mut().poll(cx).is_pending() {
return Poll::Pending;
}

// # Safety
//
// cancellation_token is dropped after future due to the field ordering.
this.future
.set(unsafe { Self::new_future(this.cancellation_token) });
}
}
}
4 changes: 3 additions & 1 deletion tokio-util/src/sync/mod.rs
@@ -1,7 +1,9 @@
//! Synchronization primitives

mod cancellation_token;
pub use cancellation_token::{guard::DropGuard, CancellationToken, WaitForCancellationFuture};
pub use cancellation_token::{
guard::DropGuard, CancellationToken, WaitForCancellationFuture, WaitForCancellationFutureOwned,
};

mod mpsc;
pub use mpsc::{PollSendError, PollSender};
Expand Down
21 changes: 21 additions & 0 deletions tokio-util/src/sync/tests/loom_cancellation_token.rs
Expand Up @@ -24,6 +24,27 @@ fn cancel_token() {
});
}

#[test]
fn cancel_token_owned() {
loom::model(|| {
let token = CancellationToken::new();
let token1 = token.clone();

let th1 = thread::spawn(move || {
block_on(async {
token1.cancelled_owned().await;
});
});

let th2 = thread::spawn(move || {
token.cancel();
});

assert_ok!(th1.join());
assert_ok!(th2.join());
});
}

#[test]
fn cancel_with_child() {
loom::model(|| {
Expand Down
50 changes: 50 additions & 0 deletions tokio-util/tests/sync_cancellation_token.rs
Expand Up @@ -39,6 +39,56 @@ fn cancel_token() {
);
}

#[test]
fn cancel_token_owned() {
let (waker, wake_counter) = new_count_waker();
let token = CancellationToken::new();
assert!(!token.is_cancelled());

let wait_fut = token.clone().cancelled_owned();
pin!(wait_fut);

assert_eq!(
Poll::Pending,
wait_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(wake_counter, 0);

let wait_fut_2 = token.clone().cancelled_owned();
pin!(wait_fut_2);

token.cancel();
assert_eq!(wake_counter, 1);
assert!(token.is_cancelled());

assert_eq!(
Poll::Ready(()),
wait_fut.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(
Poll::Ready(()),
wait_fut_2.as_mut().poll(&mut Context::from_waker(&waker))
);
}

#[test]
fn cancel_token_owned_drop_test() {
let (waker, wake_counter) = new_count_waker();
let token = CancellationToken::new();

let future = token.cancelled_owned();
pin!(future);

assert_eq!(
Poll::Pending,
future.as_mut().poll(&mut Context::from_waker(&waker))
);
assert_eq!(wake_counter, 0);

// let future be dropped while pinned and under pending state to
// find potential memory related bugs.
}

#[test]
fn cancel_child_token_through_parent() {
let (waker, wake_counter) = new_count_waker();
Expand Down

0 comments on commit 304b515

Please sign in to comment.