Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: allow configuring I/O events capacity #5186

Merged
merged 10 commits into from Dec 7, 2022
2 changes: 1 addition & 1 deletion tokio/src/process/unix/orphan.rs
Expand Up @@ -294,7 +294,7 @@ pub(crate) mod test {
#[cfg_attr(miri, ignore)] // Miri does not support epoll.
#[test]
fn does_not_register_signal_if_queue_empty() {
let (io_driver, io_handle) = IoDriver::new().unwrap();
let (io_driver, io_handle) = IoDriver::new(1024).unwrap();
let signal_driver = SignalDriver::new(io_driver, &io_handle).unwrap();
let handle = signal_driver.handle();

Expand Down
22 changes: 22 additions & 0 deletions tokio/src/runtime/builder.rs
Expand Up @@ -44,6 +44,7 @@ pub struct Builder {

/// Whether or not to enable the I/O driver
enable_io: bool,
nevents: usize,

/// Whether or not to enable the time driver
enable_time: bool,
Expand Down Expand Up @@ -228,6 +229,7 @@ impl Builder {

// I/O defaults to "off"
enable_io: false,
nevents: 1024,

// Time defaults to "off"
enable_time: false,
Expand Down Expand Up @@ -647,6 +649,7 @@ impl Builder {
enable_io: self.enable_io,
enable_time: self.enable_time,
start_paused: self.start_paused,
nevents: self.nevents,
}
}

Expand Down Expand Up @@ -938,6 +941,25 @@ cfg_io_driver! {
self.enable_io = true;
self
}

/// Enables the I/O driver and configures the capacity of events to be
/// processed per tick.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capacity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced with max number of events

///
/// # Examples
///
/// ```
/// use tokio::runtime;
///
/// let rt = runtime::Builder::new_current_thread()
/// .enable_io()
/// .max_io_events_per_tick(1024)
/// .build()
/// .unwrap();
/// ```
pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self {
self.nevents = capacity;
self
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions tokio/src/runtime/driver.rs
Expand Up @@ -36,11 +36,12 @@ pub(crate) struct Cfg {
pub(crate) enable_time: bool,
pub(crate) enable_pause_time: bool,
pub(crate) start_paused: bool,
pub(crate) nevents: usize,
}

impl Driver {
pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> {
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?;
let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io, cfg.nevents)?;

let clock = create_clock(cfg.enable_pause_time, cfg.start_paused);

Expand Down Expand Up @@ -135,12 +136,12 @@ cfg_io_driver! {
Disabled(UnparkThread),
}

fn create_io_stack(enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
fn create_io_stack(enabled: bool, nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
#[cfg(loom)]
assert!(!enabled);

let ret = if enabled {
let (io_driver, io_handle) = crate::runtime::io::Driver::new()?;
let (io_driver, io_handle) = crate::runtime::io::Driver::new(nevents)?;

let (signal_driver, signal_handle) = create_signal_driver(io_driver, &io_handle)?;
let process_driver = create_process_driver(signal_driver);
Expand Down Expand Up @@ -201,7 +202,7 @@ cfg_not_io_driver! {
#[derive(Debug)]
pub(crate) struct IoStack(ParkThread);

fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
fn create_io_stack(_enabled: bool, _nevents: usize) -> io::Result<(IoStack, IoHandle, SignalHandle)> {
let park_thread = ParkThread::new();
let unpark_thread = park_thread.unpark();
Ok((IoStack(park_thread), unpark_thread, Default::default()))
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/io/mod.rs
Expand Up @@ -104,7 +104,7 @@ fn _assert_kinds() {
impl Driver {
/// Creates a new event loop, returning any error that happened during the
/// creation.
pub(crate) fn new() -> io::Result<(Driver, Handle)> {
pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
let poll = mio::Poll::new()?;
#[cfg(not(tokio_wasi))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
Expand All @@ -116,7 +116,7 @@ impl Driver {
let driver = Driver {
tick: 0,
signal_ready: false,
events: mio::Events::with_capacity(1024),
events: mio::Events::with_capacity(nevents),
poll,
resources: slab,
};
Expand Down