Skip to content

Commit

Permalink
rt: allow configuring I/O events capacity (#5186)
Browse files Browse the repository at this point in the history
Adds a method `Builder::max_io_events_per_tick()` to the runtime builder. This can be used to configure the capacity of events that may be processed per OS poll.
  • Loading branch information
littledivy committed Dec 7, 2022
1 parent 22cff80 commit 36039d0
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 7 deletions.
2 changes: 1 addition & 1 deletion tokio/src/process/unix/orphan.rs
Expand Up @@ -294,7 +294,7 @@ pub(crate) mod test {
#[cfg_attr(miri, ignore)] // Miri does not support epoll.
#[test]
fn does_not_register_signal_if_queue_empty() {
let (io_driver, io_handle) = IoDriver::new().unwrap();
let (io_driver, io_handle) = IoDriver::new(1024).unwrap();
let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
let handle = signal_driver.handle();

Expand Down
22 changes: 22 additions & 0 deletions tokio/src/runtime/builder.rs
Expand Up @@ -44,6 +44,7 @@ pub struct Builder {

/// Whether or not to enable the I/O driver
enable_io: bool,
nevents: usize,

/// Whether or not to enable the time driver
enable_time: bool,
Expand Down Expand Up @@ -228,6 +229,7 @@ impl Builder {

// I/O defaults to "off"
enable_io: false,
nevents: 1024,

// Time defaults to "off"
enable_time: false,
Expand Down Expand Up @@ -647,6 +649,7 @@ impl Builder {
enable_io: self.enable_io,
enable_time: self.enable_time,
start_paused: self.start_paused,
nevents: self.nevents,
}
}

Expand Down Expand Up @@ -938,6 +941,25 @@ cfg_io_driver! {
self.enable_io = true;
self
}

/// Enables the I/O driver and configures the max number of events to be
/// processed per tick.
///
/// # Examples
///
/// ```
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new_current_thread()
/// .enable_io()
/// .max_io_events_per_tick(1024)
/// .build()
/// .unwrap();
/// ```
pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
self.nevents = capacity;
self
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions tokio/src/runtime/driver.rs
Expand Up @@ -36,11 +36,12 @@ pub(crate) struct Cfg {
pub(crate) enable_time: bool,
pub(crate) enable_pause_time: bool,
pub(crate) start_paused: bool,
pub(crate) nevents: usize,
}

impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

Expand Down Expand Up @@ -135,12 +136,12 @@ cfg_io_driver! {
Disabled(UnparkThread),
}

fn create_io_stack(enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
#[cfg(loom)]
assert!(!enabled);

let ret = if enabled {
let (io_driver, io_handle) = crate::runtime::io::Driver::new()?;
let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;

let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
let process_driver = create_process_driver(signal_driver);
Expand Down Expand Up @@ -201,7 +202,7 @@ cfg_not_io_driver! {
#[derive(Debug)]
pub(crate) struct IoStack(ParkThread);

fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
let park_thread = ParkThread::new();
let unpark_thread = park_thread.unpark();
Ok((IoStack(park_thread), unpark_thread, Default::default()))
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/io/mod.rs
Expand Up @@ -104,7 +104,7 @@ fn _assert_kinds() {
impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new() -> io::Result<(Driver, Handle)> {
pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
let poll = mio::Poll::new()?;
#[cfg(not(tokio_wasi))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
Expand All @@ -116,7 +116,7 @@ impl Driver {
let driver = Driver {
tick: 0,
signal_ready: false,
events: mio::Events::with_capacity(1024),
events: mio::Events::with_capacity(nevents),
poll,
resources: slab,
};
Expand Down

0 comments on commit 36039d0

Please sign in to comment.