Make global queue and event polling intervals configurable

Adds knobs to the runtime builder to control how many scheduler ticks
elapse between polls of the global task queue (fairness) and polls of
the event driver (I/O prioritization).

Both scheduler flavors already used these intervals, but they were
hard-coded as private constants. Some workloads benefit from tuning
these values.

Closes #4651
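
As a quick illustration of the new builder knobs, here is a minimal usage sketch (the interval values are arbitrary examples rather than recommended settings, and the `rt-multi-thread` feature is assumed):

use tokio::runtime;

fn main() {
    // Smaller global_queue_interval => the global/injection queue is checked
    // more often (fairer scheduling, more synchronization overhead).
    // Smaller event_interval => timers and I/O are polled more often relative
    // to running already-ready tasks.
    let rt = runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .global_queue_interval(31)
        .event_interval(61)
        .build()
        .unwrap();

    rt.block_on(async {
        // application code
    });
}

The defaults differ per scheduler: the current-thread runtime keeps its previous 31 (global queue) / 61 (events) split, while the multi-thread runtime uses 61 for both, so the values are only worth changing for workloads that measurably benefit.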
  • Loading branch information
1 parent 71e18f7 commit 36d6d2f
Showing 4 changed files with 137 additions and 30 deletions.
23 changes: 12 additions & 11 deletions tokio/src/runtime/basic_scheduler.rs
@@ -57,6 +57,12 @@ struct Core {

/// Metrics batch
metrics: MetricsBatch,

/// How many ticks before pulling a task from the global/remote queue?
global_queue_interval: u8,

/// How many ticks before yielding to the driver for timer and I/O events?
event_interval: u8,
}

#[derive(Clone)]
@@ -107,15 +113,6 @@ struct Context {
/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
#[cfg(loom)]
const MAX_TASKS_PER_TICK: usize = 4;
#[cfg(not(loom))]
const MAX_TASKS_PER_TICK: usize = 61;

/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;

// Tracks the current BasicScheduler.
scoped_thread_local!(static CURRENT: Context);

@@ -125,6 +122,8 @@ impl BasicScheduler {
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u8,
event_interval: u8,
) -> BasicScheduler {
let unpark = driver.unpark();

@@ -148,6 +147,8 @@
tick: 0,
driver: Some(driver),
metrics: MetricsBatch::new(),
global_queue_interval,
event_interval,
})));

BasicScheduler {
@@ -514,12 +515,12 @@ impl CoreGuard<'_> {
}
}

for _ in 0..MAX_TASKS_PER_TICK {
for _ in 0..core.event_interval {
// Get and increment the current tick
let tick = core.tick;
core.tick = core.tick.wrapping_add(1);

let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
let entry = if tick % core.global_queue_interval == 0 {
core.spawner.pop().or_else(|| core.tasks.pop_front())
} else {
core.tasks.pop_front().or_else(|| core.spawner.pop())
111 changes: 102 additions & 9 deletions tokio/src/runtime/builder.rs
@@ -78,6 +78,12 @@ pub struct Builder {

/// Customizable keep alive timeout for BlockingPool
pub(super) keep_alive: Option<Duration>,

/// How many ticks before pulling a task from the global/remote queue?
pub(super) global_queue_interval: u8,

/// How many ticks before yielding to the driver for timer and I/O events?
pub(super) event_interval: u8,
}

pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
@@ -98,7 +104,20 @@ impl Builder {
///
/// [`LocalSet`]: crate::task::LocalSet
pub fn new_current_thread() -> Builder {
Builder::new(Kind::CurrentThread)
/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;

/// Max number of tasks to poll per tick.
#[cfg(loom)]
const MAX_TASKS_PER_TICK: u8 = 4;
#[cfg(not(loom))]
const MAX_TASKS_PER_TICK: u8 = 61;

Builder::new(
Kind::CurrentThread,
REMOTE_FIRST_INTERVAL,
MAX_TASKS_PER_TICK,
)
}

/// Returns a new builder with the multi thread scheduler selected.
@@ -107,14 +126,26 @@ impl Builder {
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread() -> Builder {
Builder::new(Kind::MultiThread)
/// After how many ticks is the global queue polled. This helps to ensure
/// fairness.
///
/// The same value is used to control when to yield to the driver for events.
///
/// The number is fairly arbitrary. I believe this value was copied from golang.
const GLOBAL_POLL_INTERVAL: u8 = 61;

Builder::new(
Kind::MultiThread,
GLOBAL_POLL_INTERVAL,
GLOBAL_POLL_INTERVAL,
)
}

/// Returns a new runtime builder initialized with default configuration
/// values.
///
/// Configuration methods can be chained on the return value.
pub(crate) fn new(kind: Kind) -> Builder {
pub(crate) fn new(kind: Kind, global_queue_interval: u8, event_interval: u8) -> Builder {
Builder {
kind,

@@ -145,6 +176,11 @@ impl Builder {
after_unpark: None,

keep_alive: None,

// Defaults for these values depend on the scheduler kind, so we get them
// as parameters.
global_queue_interval,
event_interval,
}
}

@@ -554,6 +590,54 @@ impl Builder {
self
}

/// Sets the number of scheduler ticks after which the scheduler will poll the global
/// task queue.
///
/// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. Schedulers
/// have a local queue of already-claimed tasks, and a global queue of incoming tasks.
/// Setting the interval to a smaller value increases the fairness of the scheduler,
/// at the cost of more synchronization overhead.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let rt = runtime::Builder::new_multi_thread()
/// .global_queue_interval(31)
/// .build();
/// # }
/// ```
pub fn global_queue_interval(&mut self, val: u8) -> &mut Self {
self.global_queue_interval = val;
self
}

/// Sets the number of scheduler ticks after which the scheduler will poll for
/// external events (timers, I/O, and so on).
///
/// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
/// Setting the event interval determines the effective "priority" of delivering
/// these external events (which may wake up additional tasks), compared to
/// executing tasks that are currently ready to run.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let rt = runtime::Builder::new_multi_thread()
/// .event_interval(31)
/// .build();
/// # }
/// ```
pub fn event_interval(&mut self, val: u8) -> &mut Self {
self.event_interval = val;
self
}

fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, HandleInner, Kind};

@@ -580,6 +664,8 @@ impl Builder {
handle_inner,
self.before_park.clone(),
self.after_unpark.clone(),
self.global_queue_interval,
self.event_interval,
);
let spawner = Spawner::Basic(scheduler.spawner().clone());

@@ -667,14 +753,15 @@ cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Kind, HandleInner, ThreadPool};
use crate::runtime::{HandleInner, Kind, ThreadPool};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

// Create the blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

let handle_inner = HandleInner {
@@ -685,13 +772,19 @@
blocking_spawner,
};

let (scheduler, launch) = ThreadPool::new(core_threads, driver, handle_inner, self.before_park.clone(), self.after_unpark.clone());
let (scheduler, launch) = ThreadPool::new(
core_threads,
driver,
handle_inner,
self.before_park.clone(),
self.after_unpark.clone(),
self.global_queue_interval,
self.event_interval,
);
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

// Create the runtime handle
let handle = Handle {
spawner,
};
let handle = Handle { spawner };

// Spawn the thread pool workers
let _enter = crate::runtime::context::enter(handle.clone());
13 changes: 11 additions & 2 deletions tokio/src/runtime/thread_pool/mod.rs
@@ -51,10 +51,19 @@ impl ThreadPool {
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u8,
event_interval: u8,
) -> (ThreadPool, Launch) {
let parker = Parker::new(driver);
let (shared, launch) =
worker::create(size, parker, handle_inner, before_park, after_unpark);
let (shared, launch) = worker::create(
size,
parker,
handle_inner,
before_park,
after_unpark,
global_queue_interval,
event_interval,
);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };

20 changes: 12 additions & 8 deletions tokio/src/runtime/thread_pool/worker.rs
@@ -117,6 +117,12 @@ struct Core {

/// Fast random number generator.
rand: FastRand,

/// How many ticks before pulling a task from the global/remote queue?
global_queue_interval: u8,

/// How many ticks before yielding to the driver for timer and I/O events?
event_interval: u8,
}

/// State shared across all workers
@@ -198,6 +204,8 @@ pub(super) fn create(
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u8,
event_interval: u8,
) -> (Arc<Shared>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
@@ -219,6 +227,8 @@
park: Some(park),
metrics: MetricsBatch::new(),
rand: FastRand::new(seed()),
global_queue_interval,
event_interval,
}));

remotes.push(Remote { steal, unpark });
@@ -346,12 +356,6 @@
}
}

/// After how many ticks is the global queue polled. This helps to ensure
/// fairness.
///
/// The number is fairly arbitrary. I believe this value was copied from golang.
const GLOBAL_POLL_INTERVAL: u8 = 61;

impl Launch {
pub(crate) fn launch(mut self) {
for worker in self.0.drain(..) {
@@ -464,7 +468,7 @@ impl Context {
}

fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % GLOBAL_POLL_INTERVAL == 0 {
if core.tick % core.event_interval == 0 {
// Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
// to run without actually putting the thread to sleep.
core = self.park_timeout(core, Some(Duration::from_millis(0)));
@@ -551,7 +555,7 @@ impl Core {

/// Return the next notified task available to this worker.
fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
if self.tick % GLOBAL_POLL_INTERVAL == 0 {
if self.tick % self.global_queue_interval == 0 {
worker.inject().pop().or_else(|| self.next_local_task())
} else {
self.next_local_task().or_else(|| worker.inject().pop())
