-
Notifications
You must be signed in to change notification settings - Fork 247
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This implements the userspace binding for RingBuf. Instead of streaming the samples as heap buffers, the process_ring function takes a callback to which we pass the event's byte region, roughly following [libbpf]'s API design. This avoids a copy and allows marking the consumer pointer in a timely manner. [libbpf]: https://github.com/libbpf/libbpf/blob/master/src/ringbuf.c Co-authored-by: William Findlay <william@williamfindlay.com>
- Loading branch information
1 parent
cc37460
commit 9fb0403
Showing
3 changed files
with
314 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,19 +1,326 @@ | ||
#![allow(missing_docs)] | ||
|
||
//! A [ring buffer map][ringbuf] that may be used to receive events from eBPF programs. | ||
//! As of Linux 5.8, this is the preferred way to transfer per-event data from eBPF | ||
//! programs to userspace. | ||
//! | ||
//! [ringbuf]: https://www.kernel.org/doc/html/latest/bpf/ringbuf.html | ||
|
||
use std::{ops::DerefMut, sync::Arc}; | ||
use std::{ | ||
io, | ||
ops::DerefMut, | ||
os::unix::prelude::AsRawFd, | ||
ptr, | ||
sync::{ | ||
atomic::{AtomicUsize, Ordering}, | ||
Arc, | ||
}, | ||
}; | ||
|
||
use libc::{munmap, sysconf, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE, _SC_PAGESIZE}; | ||
use thiserror::Error; | ||
|
||
use crate::{ | ||
generated::bpf_map_type::BPF_MAP_TYPE_RINGBUF, | ||
maps::{Map, MapError, MapRefMut}, | ||
generated::{ | ||
bpf_map_type::BPF_MAP_TYPE_RINGBUF, BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, | ||
BPF_RINGBUF_HDR_SZ, | ||
}, | ||
maps::{Map, MapError}, | ||
sys::mmap, | ||
}; | ||
|
||
/// Ring buffer error. | ||
#[derive(Error, Debug)] | ||
pub enum RingBufferError { | ||
/// `mmap`-ping the consumer buffer failed. | ||
#[error("consumer mmap failed: {io_error}")] | ||
ConsumerMMapError { | ||
#[source] | ||
io_error: io::Error, | ||
}, | ||
|
||
/// `mmap`-ping the produer buffer failed. | ||
#[error("consumer mmap failed: {io_error}")] | ||
ProducerMMapError { | ||
#[source] | ||
io_error: io::Error, | ||
}, | ||
|
||
/// An error occurred related to the inner map. | ||
#[error(transparent)] | ||
MapError(#[from] MapError), | ||
|
||
/// An IO error occurred. | ||
#[error(transparent)] | ||
IOError(#[from] io::Error), | ||
|
||
/// An error occurred in a per-event callback. | ||
#[error(transparent)] | ||
CallbackError(#[from] CallbackError), | ||
} | ||
|
||
/// A per-event callback error.
#[derive(Error, Debug)]
pub enum CallbackError {
    /// An error returned by the user-supplied per-event callback; wraps any
    /// `anyhow::Error` so callbacks can fail with arbitrary error types.
    #[error(transparent)]
    Error(#[from] anyhow::Error),
}
|
||
/// A map that can be used to receive events from eBPF programs.
///
/// This is similar to [`PerfEventArray`], but different in a few ways:
/// * It's shared across all CPUs, which allows a strong ordering between events. It also makes the
///   buffer creation easier.
/// * Data notifications are delivered for every event instead of being sampled every N events;
///   the eBPF program can also control notification delivery if sampling is desired for performance reasons.
/// * On the eBPF side, it supports the reverse-commit pattern where the event can be directly
///   written into the ring without copying from a temporary location.
/// * Dropped sample notifications go to the eBPF program as the return value of `reserve`/`output`,
///   and not the userspace reader. This might require extra code to handle, but allows for more
///   flexible schemes to handle dropped samples.
///
/// To receive events you need to:
/// * call [`RingBuf::open`]
/// * poll the returned [`RingBuf`] to be notified when events are inserted in the buffer
/// * call [`RingBuf::process_ring`] to read the events
///
/// # Minimum kernel version
///
/// The minimum kernel version required to use this feature is 5.8.
///
/// # Examples
///
/// The following example shows how to read samples as well as using an async runtime
/// to wait for samples to be ready:
///
/// ```no_run
/// # use aya::maps::{Map, RingBuf};
/// # use std::ops::DerefMut;
/// # use anyhow::Error;
/// # struct Poll<T: DerefMut<Target=Map>>(RingBuf<T>);
/// # impl<T: DerefMut<Target=Map>> Poll<T> {
/// #    fn new(inner: RingBuf<T>) -> Self { Self (inner) }
/// #    fn readable(&mut self) {}
/// #    fn get_mut(&mut self) -> &mut RingBuf<T> { &mut self.0 }
/// # }
/// # let bpf = aya::Bpf::load(&[])?;
/// use std::convert::{TryFrom, TryInto};
///
/// let mut ring = RingBuf::open(bpf.map_mut("EVENTS")?)?;
///
/// // Poll would be a struct that wraps `AsRawFd`.
/// let mut poll = Poll::new(ring);
/// loop {
///     // readable() should be a function that waits ring's fd to be readable.
///     // If you're using an async library, you can .await here
///     poll.readable();
///
///     poll.get_mut().process_ring(&mut |data| {
///         // Do something with the data bytes
///         Ok(()) // continue reading from the ring buffer
///     });
/// }
/// # Ok::<(), Error>(())
/// ```
///
/// [`perf`]: https://perf.wiki.kernel.org/index.php/Main_Page
/// [epoll]: https://docs.rs/epoll
/// [mio]: https://docs.rs/mio
/// [tokio]: https://docs.rs/tokio
/// [async-std]: https://docs.rs/async-std
#[doc(alias = "BPF_MAP_TYPE_RINGBUF")]
pub struct RingBuf<T: DerefMut<Target = Map>> {
    // Keeps the underlying map (and thus its file descriptor) alive while
    // the ring memory is mapped.
    _map: Arc<T>,
    // Fd of the ring buffer map; exposed through `AsRawFd` so callers can
    // poll for readability.
    map_fd: i32,
    // Start of the data pages (the kernel maps the data area twice so reads
    // that wrap around the end are contiguous).
    data_ptr: *mut u8,
    // Writable consumer position page, shared with the kernel.
    consumer_pos_ptr: *mut AtomicUsize,
    // Read-only producer position page, advanced by the kernel.
    producer_pos_ptr: *mut AtomicUsize,
    // System page size; needed to compute the mmap sizes again in `drop`.
    page_size: usize,
    // max_entries - 1, used to wrap positions into the data area.
    // NOTE(review): assumes max_entries is a power of two — the kernel
    // enforces this for ringbuf maps; confirm at map-creation time.
    mask: usize,
}
|
||
impl<T: DerefMut<Target = Map>> RingBuf<T> {
    /// Opens the ring buffer map for reading, mapping the consumer position
    /// page (read/write) and the producer position + data pages (read-only)
    /// into this process.
    ///
    /// # Errors
    ///
    /// Returns an error if the map is not of type `BPF_MAP_TYPE_RINGBUF`, if
    /// the map has no file descriptor, or if either `mmap` call fails.
    pub fn open(map: T) -> Result<Self, RingBufferError> {
        // Check that the map is a ringbuf
        let map_type = map.obj.def.map_type;
        if map_type != BPF_MAP_TYPE_RINGBUF as u32 {
            return Err(MapError::InvalidMapType { map_type }.into());
        }

        // Determine page_size, map_fd, and set mask to map size - 1.
        // `max_entries` is the size of the data area in bytes; subtracting 1
        // yields a wrap mask (valid because the size is a power of two).
        let page_size = unsafe { sysconf(_SC_PAGESIZE) } as usize;
        let map_fd = map.fd_or_err().map_err(RingBufferError::from)?;
        let mask = (map.obj.def.max_entries - 1) as usize;

        // Map writable consumer page (the kernel exposes it at offset 0).
        let consumer_page = unsafe {
            mmap(
                ptr::null_mut(),
                page_size,
                PROT_READ | PROT_WRITE,
                MAP_SHARED,
                map_fd,
                0,
            )
        };
        if consumer_page == MAP_FAILED {
            return Err(RingBufferError::ConsumerMMapError {
                io_error: io::Error::last_os_error(),
            });
        }

        // From kernel/bpf/ringbuf.c:
        // Each data page is mapped twice to allow "virtual"
        // continuous read of samples wrapping around the end of ring
        // buffer area:
        // ------------------------------------------------------
        // | meta pages |  real data pages  |  same data pages  |
        // ------------------------------------------------------
        // |            | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 |
        // ------------------------------------------------------
        // |            | TA             DA | TA             DA |
        // ------------------------------------------------------
        //                               ^^^^^^^
        //                                  |
        // Here, no need to worry about special handling of wrapped-around
        // data due to double-mapped data pages. This works both in kernel and
        // when mmap()'ed in user-space, simplifying both kernel and
        // user-space implementations significantly.
        let producer_pages = unsafe {
            mmap(
                ptr::null_mut(),
                // producer position page + twice the data area (see above).
                page_size + 2 * (mask + 1),
                PROT_READ,
                MAP_SHARED,
                map_fd,
                // Producer pages start one page into the mapping.
                page_size as i64,
            )
        };
        if producer_pages == MAP_FAILED {
            return Err(RingBufferError::ProducerMMapError {
                io_error: io::Error::last_os_error(),
            });
        }

        Ok(RingBuf {
            _map: Arc::new(map),
            map_fd,
            // Data area begins right after the producer position page.
            data_ptr: unsafe { (producer_pages as *mut u8).add(page_size) },
            consumer_pos_ptr: consumer_page as *mut _,
            producer_pos_ptr: producer_pages as *mut _,
            page_size,
            mask,
        })
    }

    /// Retrieve an event from the ring, pass it to the callback, mark it as consumed, then repeat.
    /// Returns when either the callback returns an Err or there's no more events.
    ///
    /// Returns the number of samples successfully passed to the callback.
    /// Mirrors libbpf's `ringbuf__process_ring`: the consumer position is
    /// published after every sample so the kernel can reuse the space promptly.
    pub fn process_ring(
        &mut self,
        callback: &mut dyn FnMut(&[u8]) -> Result<(), CallbackError>,
    ) -> Result<u64, CallbackError> {
        let mut count = 0u64;

        let mut consumer_pos = unsafe { (*self.consumer_pos_ptr).load(Ordering::Acquire) };
        loop {
            let mut got_new = false;

            // Acquire pairs with the kernel's release of the producer position.
            let producer_pos = unsafe { (*self.producer_pos_ptr).load(Ordering::Acquire) };
            while consumer_pos < producer_pos {
                // Positions grow monotonically; masking wraps them into the
                // (double-mapped) data area, so the sample is always contiguous.
                let sample_head = unsafe { self.data_ptr.add(consumer_pos as usize & self.mask) };
                // First u32 of the header: sample length plus busy/discard flag
                // bits in the top two bits.
                // NOTE(review): this is a plain (non-atomic) read of memory the
                // kernel writes concurrently — confirm whether an acquire load
                // is needed here, as in libbpf's smp_load_acquire.
                let len_and_flags = unsafe { *(sample_head as *mut u32) };

                // The sample has not been committed yet, so bail
                if (len_and_flags as usize & BPF_RINGBUF_BUSY_BIT as usize) != 0 {
                    return Ok(count);
                }

                // Got a new sample
                got_new = true;
                consumer_pos += roundup_len(len_and_flags) as usize;

                if (len_and_flags & BPF_RINGBUF_DISCARD_BIT) == 0 {
                    // Coerce the sample into a &[u8]
                    // (busy and discard bits are both zero here, so
                    // `len_and_flags` is exactly the payload length).
                    let sample_ptr = unsafe { sample_head.add(BPF_RINGBUF_HDR_SZ as usize) };
                    let sample = unsafe {
                        std::slice::from_raw_parts(sample_ptr as *mut u8, len_and_flags as usize)
                    };

                    if let Err(e) = callback(sample) {
                        // Store new consumer position and forward error from callback
                        unsafe { (*self.consumer_pos_ptr).store(consumer_pos, Ordering::Release) };
                        return Err(e);
                    };
                    count += 1;
                }

                // Store new consumer position
                unsafe { (*self.consumer_pos_ptr).store(consumer_pos, Ordering::Release) };
            }

            // Re-read the producer position once more; only stop when no new
            // sample arrived during the last pass.
            if !got_new {
                break;
            }
        }

        Ok(count)
    }
}
|
||
impl<T: DerefMut<Target = Map>> RingBuf<T> {} | ||
impl<T: DerefMut<Target = Map>> Drop for RingBuf<T> { | ||
fn drop(&mut self) { | ||
if !self.consumer_pos_ptr.is_null() { | ||
// SAFETY: `consumer_pos` is not null and consumer page is not null and | ||
// consumer page was mapped with size `self.page_size` | ||
unsafe { munmap(self.consumer_pos_ptr as *mut _, self.page_size) }; | ||
} | ||
|
||
if !self.producer_pos_ptr.is_null() { | ||
// SAFETY: `producer_pos` is not null and producer pages were mapped with size | ||
// `self.page_size + 2 * (self.mask + 1)` | ||
unsafe { | ||
munmap( | ||
self.producer_pos_ptr as *mut _, | ||
self.page_size + 2 * (self.mask + 1), | ||
) | ||
}; | ||
} | ||
} | ||
} | ||
|
||
impl<T: DerefMut<Target = Map>> AsRawFd for RingBuf<T> {
    /// Returns the ring buffer map's file descriptor so callers can poll it
    /// (epoll/mio/tokio) and wait for it to become readable before calling
    /// `process_ring`.
    fn as_raw_fd(&self) -> std::os::unix::prelude::RawFd {
        self.map_fd
    }
}
|
||
/// Round up a `len` to the nearest 8 byte alignment, adding BPF_RINGBUF_HDR_SZ and | ||
/// clearing out the upper two bits of `len`. | ||
pub(crate) fn roundup_len(len: u32) -> u32 { | ||
let mut len = len; | ||
// clear out the upper two bits (busy and discard) | ||
len &= 0x3fffffff; | ||
// add the size of the header prefix | ||
len += BPF_RINGBUF_HDR_SZ; | ||
// round to up to next multiple of 8 | ||
(len + 7) & !7 | ||
} | ||
|
||
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_roundup_len() {
        // Each plain length rounds up to the next 8-byte boundary, plus the
        // BPF_RINGBUF_HDR_SZ header prefix.
        let cases = [
            (0u32, BPF_RINGBUF_HDR_SZ),
            (1, BPF_RINGBUF_HDR_SZ + 8),
            (8, BPF_RINGBUF_HDR_SZ + 8),
            (9, BPF_RINGBUF_HDR_SZ + 16),
        ];
        for &(len, expected) in cases.iter() {
            assert_eq!(roundup_len(len), expected);
        }

        // The busy and discard flags in the upper two bits must be ignored.
        assert_eq!(
            roundup_len(BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT),
            BPF_RINGBUF_HDR_SZ
        );
    }
}