-
Notifications
You must be signed in to change notification settings - Fork 203
/
stop_token.rs
79 lines (66 loc) 路 2.22 KB
/
stop_token.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
//! A stop token used to stop a listener.
use std::{future::Future, pin::Pin, task};
use futures::future::{pending, AbortHandle, Abortable, Pending};
/// A stop token allows you to stop a listener.
///
/// See also: [`UpdateListener::stop_token`].
///
/// [`UpdateListener::stop_token`]:
/// crate::dispatching::update_listeners::UpdateListener::stop_token
pub trait StopToken {
/// Stop the listener linked to this token.
fn stop(self);
}
/// A stop token which does nothing. May be used in prototyping or in cases
/// where you do not care about graceful shutdowning.
pub struct Noop;
impl StopToken for Noop {
fn stop(self) {}
}
/// A stop token which corresponds to [`AsyncStopFlag`].
#[derive(Clone)]
pub struct AsyncStopToken(AbortHandle);
/// A flag which corresponds to [`AsyncStopToken`].
///
/// To know if the stop token was used you can either repeatedly call
/// [`is_stopped`] or use this type as a `Future`.
///
/// [`is_stopped`]: AsyncStopFlag::is_stopped
#[pin_project::pin_project]
#[derive(Clone)]
pub struct AsyncStopFlag(#[pin] Abortable<Pending<()>>);
impl AsyncStopToken {
/// Create a new token/flag pair.
#[must_use = "This function is pure, that is does nothing unless its output is used"]
pub fn new_pair() -> (Self, AsyncStopFlag) {
let (handle, reg) = AbortHandle::new_pair();
let token = Self(handle);
let flag = AsyncStopFlag(Abortable::new(pending(), reg));
(token, flag)
}
}
impl StopToken for AsyncStopToken {
fn stop(self) {
self.0.abort()
}
}
impl AsyncStopFlag {
/// Returns true if the stop token linked to `self` was used.
#[must_use = "This function is pure, that is does nothing unless its output is used"]
pub fn is_stopped(&self) -> bool {
self.0.is_aborted()
}
}
/// This future resolves when a stop token was used.
impl Future for AsyncStopFlag {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
self.project().0.poll(cx).map(|res| {
debug_assert!(
res.is_err(),
"Pending Future can't ever be resolved, so Abortable is only resolved when \
canceled"
);
})
}
}