/
app.rs
180 lines (159 loc) · 4.62 KB
/
app.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
use std::{future::Future, task::Poll, time::Duration};
static HELP: &str = r#"
Example console-instrumented app
USAGE:
app [OPTIONS]
OPTIONS:
-h, help prints this message
blocks Includes a (misbehaving) blocking task
burn Includes a (misbehaving) task that spins CPU with self-wakes
coma Includes a (misbehaving) task that forgets to register a waker
noyield Includes a (misbehaving) task that spawns tasks that never yield
"#;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
console_subscriber::init();
// spawn optional extras from CLI args
// skip first which is command name
for opt in std::env::args().skip(1) {
match &*opt {
"blocks" => {
tokio::task::Builder::new()
.name("blocks")
.spawn(double_sleepy(1, 10))
.unwrap();
}
"coma" => {
tokio::task::Builder::new()
.name("coma")
.spawn(std::future::pending::<()>())
.unwrap();
}
"burn" => {
tokio::task::Builder::new()
.name("burn")
.spawn(burn(1, 10))
.unwrap();
}
"noyield" => {
tokio::task::Builder::new()
.name("noyield")
.spawn(no_yield(20))
.unwrap();
}
"blocking" => {
tokio::task::Builder::new()
.name("spawns_blocking")
.spawn(spawn_blocking(5))
.unwrap();
}
"help" | "-h" => {
eprintln!("{}", HELP);
return Ok(());
}
wat => {
return Err(
format!("unknown option: {:?}, run with '-h' to see options", wat).into(),
)
}
}
}
let task1 = tokio::task::Builder::new()
.name("task1")
.spawn(spawn_tasks(1, 10))
.unwrap();
let task2 = tokio::task::Builder::new()
.name("task2")
.spawn(spawn_tasks(10, 30))
.unwrap();
let result = tokio::try_join! {
task1,
task2,
};
result?;
Ok(())
}
#[tracing::instrument]
async fn spawn_tasks(min: u64, max: u64) {
loop {
for i in min..max {
tracing::trace!(i, "spawning wait task");
tokio::task::Builder::new()
.name("wait")
.spawn(wait(i))
.unwrap();
let sleep = Duration::from_secs(max) - Duration::from_secs(i);
tracing::trace!(?sleep, "sleeping...");
tokio::time::sleep(sleep).await;
}
}
}
#[tracing::instrument]
async fn wait(seconds: u64) {
tracing::debug!("waiting...");
tokio::time::sleep(Duration::from_secs(seconds)).await;
tracing::trace!("done!");
}
#[tracing::instrument]
async fn double_sleepy(min: u64, max: u64) {
loop {
for i in min..max {
// woops!
std::thread::sleep(Duration::from_secs(i));
tokio::time::sleep(Duration::from_secs(max - i)).await;
}
}
}
#[tracing::instrument]
async fn burn(min: u64, max: u64) {
loop {
for i in min..max {
for _ in 0..i {
self_wake().await;
}
tokio::time::sleep(Duration::from_secs(i - min)).await;
}
}
}
#[tracing::instrument]
async fn no_yield(seconds: u64) {
loop {
let handle = tokio::task::Builder::new()
.name("greedy")
.spawn(async move {
std::thread::sleep(Duration::from_secs(seconds));
})
.expect("Couldn't spawn greedy task");
_ = handle.await;
}
}
#[tracing::instrument]
async fn spawn_blocking(seconds: u64) {
loop {
let seconds = seconds;
_ = tokio::task::spawn_blocking(move || {
std::thread::sleep(Duration::from_secs(seconds));
})
.await;
}
}
fn self_wake() -> impl Future<Output = ()> {
struct SelfWake {
yielded: bool,
}
impl Future for SelfWake {
type Output = ();
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
if self.yielded == true {
return Poll::Ready(());
}
self.yielded = true;
cx.waker().wake_by_ref();
Poll::Pending
}
}
SelfWake { yielded: false }
}