-
-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
task_abort.rs
96 lines (80 loc) · 3.03 KB
/
task_abort.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
/// Regression test for issue #3157
/// (<https://github.com/tokio-rs/tokio/issues/3157>): aborting a task while it
/// is suspended must not panic.
#[test]
fn test_abort_without_panic_3157() {
    use std::time::Duration;

    // Multi-threaded runtime limited to one worker, with timers enabled so
    // that `tokio::time::sleep` can be used below.
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_time().worker_threads(1);
    let runtime = builder.build().unwrap();

    let scenario = async move {
        let sleeper = tokio::spawn(async move {
            println!("task started");
            tokio::time::sleep(Duration::new(100, 0)).await
        });

        // Give the spawned task time to start and suspend in its sleep.
        tokio::time::sleep(Duration::new(1, 0)).await;

        sleeper.abort();
        // The join result is deliberately ignored: the test only checks that
        // aborting and then awaiting the handle does not panic.
        let _ = sleeper.await;
    };

    runtime.block_on(scenario);
}
/// Checks that a suspended task can be aborted inside of a current_thread
/// executor without panicking as reported in issue #3662:
/// <https://github.com/tokio-rs/tokio/issues/3662>.
///
/// The abort is issued from a separate OS thread, and a `DropCheck` captured
/// by the task is used to observe that the task's future has been dropped by
/// the time that thread has been joined.
#[test]
fn test_abort_without_panic_3662() {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Sets the shared flag to `true` when dropped.
    struct DropCheck(Arc<AtomicBool>);

    impl Drop for DropCheck {
        fn drop(&mut self) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    let rt = tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap();

    rt.block_on(async move {
        let drop_flag = Arc::new(AtomicBool::new(false));
        let drop_check = DropCheck(Arc::clone(&drop_flag));

        let j = tokio::spawn(async move {
            // NB: just grab the drop check here so that it becomes part of the
            // task.
            let _drop_check = drop_check;
            std::future::pending::<()>().await;
        });

        let drop_flag2 = Arc::clone(&drop_flag);

        // `std::thread::JoinHandle` is not a future, so the thread is waited
        // on with a blocking `join()` rather than `.await`.
        let task = std::thread::spawn(move || {
            // This runs in a separate thread so it doesn't have immediate
            // thread-local access to the executor. It does however transition
            // the underlying task to be completed, which will cause it to be
            // dropped (in this thread no less).
            assert!(!drop_flag2.load(Ordering::SeqCst));
            j.abort();
            // TODO: is this guaranteed at this point?
            // assert!(drop_flag2.load(Ordering::SeqCst));
            j
        })
        .join()
        .unwrap();

        assert!(drop_flag.load(Ordering::SeqCst));
        // Awaiting consumes the handle; it must not be used again afterwards.
        let result = task.await;
        assert!(result.unwrap_err().is_cancelled());

        // Note: We do the following to trigger a deferred task cleanup.
        //
        // The relevant piece of code you want to look at is in:
        // `Inner::block_on` of `basic_scheduler.rs`.
        //
        // We cause the cleanup to happen by having a poll return Pending once
        // so that the scheduler can go into the "auxiliary tasks" mode, at
        // which point the task is removed from the scheduler.
        let i = tokio::spawn(async move {
            tokio::task::yield_now().await;
        });

        i.await.unwrap();
    });
}