-
-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
epollexclusive.rs
104 lines (79 loc) · 2.65 KB
/
epollexclusive.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#![cfg(all(
target_os = "linux",
feature = "net",
feature = "rt",
feature = "sync",
feature = "macros",
feature = "time",
tokio_unstable,
))]
use std::sync::Arc;
use std::thread;
use tokio::sync::Barrier;
/// Number of worker threads the test asks `count_accepts_with_flags` to spawn.
const NUM_WORKERS: usize = 8;
/// Number of connections the test asks `count_accepts_with_flags` to open; also
/// the value the measured count is compared against.
const NUM_CONNECTIONS: u64 = 32;
/// Lower bound (inclusive) on the accepted ratio of measured count to
/// `NUM_CONNECTIONS`.
const FUDGE_MIN: f64 = 0.75;
/// Upper bound (inclusive) on the accepted ratio of measured count to
/// `NUM_CONNECTIONS`.
const FUDGE_MAX: f64 = 1.25;
/// Runs `NUM_WORKERS` listeners registered with `EPOLLEXCLUSIVE`, opens
/// `NUM_CONNECTIONS` connections, and asserts that the count reported by
/// `count_accepts_with_flags` is within `[FUDGE_MIN, FUDGE_MAX]` times the
/// number of connections.
#[test]
fn epoll_exclusive() {
    let observed =
        count_accepts_with_flags(NUM_WORKERS, NUM_CONNECTIONS, libc::EPOLLEXCLUSIVE as u32);

    // Compare as a ratio so the tolerance scales with the connection count.
    let ratio = observed as f64 / NUM_CONNECTIONS as f64;
    let within_tolerance = (FUDGE_MIN..=FUDGE_MAX).contains(&ratio);

    assert!(
        within_tolerance,
        "expected fuzzy {}, got {}",
        NUM_CONNECTIONS, observed
    );
}
/// Spawns `workers` threads that each run `count_accepts` on a clone of one
/// shared loopback listener, opens `connections` TCP connections to it, and
/// returns the sum of the values the worker threads report.
///
/// `flags` is OR-ed with `EPOLLIN` before being handed to every worker.
///
/// Panics if binding, cloning, connecting, building the runtime, or joining a
/// worker thread fails.
fn count_accepts_with_flags(workers: usize, connections: u64, flags: u32) -> u64 {
    // Every worker plus this driving thread takes part in the barrier.
    let barrier = Arc::new(Barrier::new(workers + 1));

    // Port 0 lets the OS choose a free port; the chosen address is read back.
    let shared_listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let target = shared_listener.local_addr().unwrap();

    let worker_threads: Vec<_> = (0..workers)
        .map(|_| {
            let cloned_listener = shared_listener.try_clone().unwrap();
            let worker_barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                count_accepts(cloned_listener, flags | libc::EPOLLIN as u32, worker_barrier)
            })
        })
        .collect();

    let driver = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    driver.block_on(async {
        // First rendezvous: wait until every worker has reached its own
        // first barrier wait.
        barrier.wait().await;

        let pause = std::time::Duration::from_millis(100);
        for _ in 0..connections {
            // The stream is dropped right away; only the connect matters.
            tokio::net::TcpStream::connect(target).await.unwrap();
            tokio::time::sleep(pause).await;
        }

        // Second rendezvous: signals the workers to stop and report.
        barrier.wait().await;
    });

    worker_threads
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .sum()
}
fn count_accepts(std: std::net::TcpListener, flags: u32, barrier: Arc<Barrier>) -> u64 {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
std.set_nonblocking(true).unwrap();
let listener = tokio::net::TcpListener::from_std_with_epoll_flags(std, flags).unwrap();
barrier.wait().await;
let mut barr_wait = std::pin::pin!(barrier.wait());
loop {
tokio::select! {
_ = &mut barr_wait => {
return tokio::runtime::Handle::current().metrics().io_driver_ready_count();
}
a = listener.accept() => {
a.unwrap();
}
}
}
})
}