-
Notifications
You must be signed in to change notification settings - Fork 250
/
ring_buf_async.rs
72 lines (63 loc) · 2.19 KB
/
ring_buf_async.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
use std::os::fd::AsRawFd as _;
use aya::maps::RingBuf;
use aya::{include_bytes_aligned, programs::UProbe, Bpf};
use tokio::{
io::unix::AsyncFd,
time::{sleep, Duration},
};
use super::tokio_integration_test;
// End-to-end check that entries written to the `RING_BUF` map by the `ring_buf_test` uprobe
// can be consumed from async code by waiting on the map's file descriptor with tokio's
// `AsyncFd`.
//
// A background task triggers the probed function once per generated value while this task
// drains the ring buffer; the test passes when the drained values equal the generated ones,
// in order.
#[tokio_integration_test]
async fn ring_buf_async() {
    // Load the compiled eBPF object and take ownership of its ring buffer map.
    let object = include_bytes_aligned!("../../../../target/bpfel-unknown-none/release/ring_buf");
    let mut bpf = Bpf::load(object).unwrap();
    let mut ring_buf = RingBuf::try_from(bpf.take_map("RING_BUF").unwrap()).unwrap();

    // Load the uprobe and attach it to the trigger function inside this very executable.
    let program = bpf.program_mut("ring_buf_test").unwrap();
    let probe: &mut UProbe = program.try_into().unwrap();
    probe.load().unwrap();
    probe
        .attach(
            Some("ring_buf_trigger_ebpf_program"),
            0,
            "/proc/self/exe",
            None,
        )
        .unwrap();

    // Generate some random data and start feeding it to the probed function in the background.
    let expected = super::ring_buf::gen_data();
    let writer = tokio::task::spawn(call_ring_buf_trigger_ebpf_program_over_time(
        expected.clone(),
    ));

    // Wrap the ring buffer's raw fd in an AsyncFd so that readiness notifications can be awaited.
    let async_fd = AsyncFd::new(ring_buf.as_raw_fd()).unwrap();

    let mut received = Vec::with_capacity(expected.len());
    while received.len() < expected.len() {
        // Clear the readiness bit *before* draining: anything that arrives while we are
        // reading re-arms readiness, so no notification is lost.
        async_fd.readable().await.unwrap().clear_ready();

        // Drain everything currently available; every entry is expected to be exactly the
        // 8 native-endian bytes of a u64.
        while let Some(item) = ring_buf.next() {
            let raw = <[u8; 8]>::try_from(&*item).unwrap();
            received.push(u64::from_ne_bytes(raw));
        }
    }

    // Ensure that the data that was read matches what was passed.
    assert_eq!(received, expected);

    // Surface any panic raised inside the writer task.
    writer.await.unwrap();
}
async fn call_ring_buf_trigger_ebpf_program_over_time(data: Vec<u64>) {
let random_duration = || {
use rand::Rng as _;
let mut rng = rand::thread_rng();
let micros = rng.gen_range(0..1_000);
Duration::from_micros(micros)
};
for value in data {
sleep(random_duration()).await;
super::ring_buf::ring_buf_trigger_ebpf_program(value);
}
}