Make global queue and event polling intervals configurable #4671

Merged
merged 4 commits on May 18, 2022
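
The diff below adds two builder knobs, `global_queue_interval` and `event_interval`. As a rough usage sketch of what the PR enables (the interval values here are illustrative, not recommendations from this PR; the other builder methods are the usual tokio `Builder` API):

```rust
use tokio::runtime;

fn main() -> std::io::Result<()> {
    // Check the global (injection) queue every 16 ticks and yield to the
    // driver for timer/I-O events every 32 ticks; both values are arbitrary.
    let rt = runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .global_queue_interval(16)
        .event_interval(32)
        .enable_all()
        .build()?;

    rt.block_on(async {
        println!("runtime built with custom scheduler intervals");
    });
    Ok(())
}
```
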
25 changes: 13 additions & 12 deletions tokio/src/runtime/basic_scheduler.rs
@@ -48,7 +48,7 @@ struct Core {
spawner: Spawner,

/// Current tick
tick: u8,
tick: u32,

/// Runtime driver
///
@@ -57,6 +57,12 @@ struct Core {

/// Metrics batch
metrics: MetricsBatch,

/// How many ticks before pulling a task from the global/remote queue?
global_queue_interval: u32,

/// How many ticks before yielding to the driver for timer and I/O events?
event_interval: u32,
}

#[derive(Clone)]
@@ -107,15 +113,6 @@ struct Context {
/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
#[cfg(loom)]
const MAX_TASKS_PER_TICK: usize = 4;
#[cfg(not(loom))]
const MAX_TASKS_PER_TICK: usize = 61;

/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;

// Tracks the current BasicScheduler.
scoped_thread_local!(static CURRENT: Context);

@@ -125,6 +122,8 @@ impl BasicScheduler {
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u32,
event_interval: u32,
) -> BasicScheduler {
let unpark = driver.unpark();

@@ -148,6 +147,8 @@ impl BasicScheduler {
tick: 0,
driver: Some(driver),
metrics: MetricsBatch::new(),
global_queue_interval,
event_interval,
})));

BasicScheduler {
@@ -514,12 +515,12 @@ impl CoreGuard<'_> {
}
}

for _ in 0..MAX_TASKS_PER_TICK {
for _ in 0..core.event_interval {
// Get and increment the current tick
let tick = core.tick;
core.tick = core.tick.wrapping_add(1);

let entry = if tick % REMOTE_FIRST_INTERVAL == 0 {
let entry = if tick % core.global_queue_interval == 0 {
core.spawner.pop().or_else(|| core.tasks.pop_front())
} else {
core.tasks.pop_front().or_else(|| core.spawner.pop())
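
In other words, the current-thread scheduler's main loop now behaves roughly like the sketch below (a simplified model, not tokio's actual types; plain `Vec`s stand in for the local and remote queues, and the interval values use this PR's current-thread defaults):

```rust
/// Simplified model of the loop above: `event_interval` caps how many tasks
/// are polled before yielding back to the driver, and every
/// `global_queue_interval` ticks the remote (injection) queue is checked
/// first so newly spawned tasks are not starved by the local queue.
struct Core {
    tick: u32,
    global_queue_interval: u32,
    event_interval: u32,
}

impl Core {
    fn run_batch(&mut self, remote: &mut Vec<&'static str>, local: &mut Vec<&'static str>) {
        for _ in 0..self.event_interval {
            let tick = self.tick;
            self.tick = self.tick.wrapping_add(1);

            let task = if tick % self.global_queue_interval == 0 {
                remote.pop().or_else(|| local.pop())
            } else {
                local.pop().or_else(|| remote.pop())
            };

            match task {
                Some(name) => println!("polling task {name}"),
                // No runnable tasks: fall through and park on the driver.
                None => break,
            }
        }
        // After the batch, the real scheduler parks on the driver so timer
        // and I/O events get a chance to run.
    }
}

fn main() {
    let mut core = Core { tick: 0, global_queue_interval: 31, event_interval: 61 };
    core.run_batch(&mut vec!["remote-a"], &mut vec!["local-a", "local-b"]);
}
```
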
114 changes: 100 additions & 14 deletions tokio/src/runtime/builder.rs
@@ -78,6 +78,12 @@ pub struct Builder {

/// Customizable keep alive timeout for BlockingPool
pub(super) keep_alive: Option<Duration>,

/// How many ticks before pulling a task from the global/remote queue?
pub(super) global_queue_interval: u32,

/// How many ticks before yielding to the driver for timer and I/O events?
pub(super) event_interval: u32,
}

pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>;
@@ -98,7 +104,13 @@ impl Builder {
///
/// [`LocalSet`]: crate::task::LocalSet
pub fn new_current_thread() -> Builder {
Builder::new(Kind::CurrentThread)
#[cfg(loom)]
const EVENT_INTERVAL: u32 = 4;
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
#[cfg(not(loom))]
const EVENT_INTERVAL: u32 = 61;

Builder::new(Kind::CurrentThread, 31, EVENT_INTERVAL)
}

/// Returns a new builder with the multi thread scheduler selected.
@@ -107,14 +119,15 @@ impl Builder {
#[cfg(feature = "rt-multi-thread")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
pub fn new_multi_thread() -> Builder {
Builder::new(Kind::MultiThread)
// The number `61` is fairly arbitrary. I believe this value was copied from golang.
Builder::new(Kind::MultiThread, 61, 61)
}

/// Returns a new runtime builder initialized with default configuration
/// values.
///
/// Configuration methods can be chained on the return value.
pub(crate) fn new(kind: Kind) -> Builder {
pub(crate) fn new(kind: Kind, global_queue_interval: u32, event_interval: u32) -> Builder {
Builder {
kind,

@@ -145,6 +158,11 @@ impl Builder {
after_unpark: None,

keep_alive: None,

// Defaults for these values depend on the scheduler kind, so we get them
// as parameters.
global_queue_interval,
event_interval,
}
}

@@ -286,7 +304,6 @@ impl Builder {
/// ```
/// # use tokio::runtime;
/// # use std::sync::atomic::{AtomicUsize, Ordering};
///
/// # pub fn main() {
/// let rt = runtime::Builder::new_multi_thread()
/// .thread_name_fn(|| {
@@ -338,7 +355,6 @@ impl Builder {
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_start(|| {
@@ -364,7 +380,6 @@ impl Builder {
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_stop(|| {
@@ -473,7 +488,6 @@ impl Builder {
///
/// ```
/// # use tokio::runtime;
///
/// # pub fn main() {
/// let runtime = runtime::Builder::new_multi_thread()
/// .on_thread_unpark(|| {
@@ -542,7 +556,6 @@ impl Builder {
/// ```
/// # use tokio::runtime;
/// # use std::time::Duration;
///
/// # pub fn main() {
/// let rt = runtime::Builder::new_multi_thread()
/// .thread_keep_alive(Duration::from_millis(100))
@@ -554,6 +567,70 @@ impl Builder {
self
}

/// Sets the number of scheduler ticks after which the scheduler will poll the global
/// task queue.
///
/// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
///
/// By default the global queue interval is:
///
/// * `31` for the current-thread scheduler.
/// * `61` for the multithreaded scheduler.
///
/// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming
/// tasks. Setting the interval to a smaller value increases the fairness of the scheduler,
/// at the cost of more synchronization overhead. That can be beneficial for prioritizing
/// getting started on new work, especially if tasks frequently yield rather than complete
/// or await on further I/O. Conversely, a higher value prioritizes existing work, and
/// is a good choice when most tasks quickly complete polling.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
/// # pub fn main() {
/// let rt = runtime::Builder::new_multi_thread()
/// .global_queue_interval(31)
/// .build();
/// # }
/// ```
pub fn global_queue_interval(&mut self, val: u32) -> &mut Self {
self.global_queue_interval = val;
self
}

/// Sets the number of scheduler ticks after which the scheduler will poll for
/// external events (timers, I/O, and so on).
///
/// A scheduler "tick" roughly corresponds to one `poll` invocation on a task.
///
/// By default, the event interval is `61` for all scheduler types.
///
/// Setting the event interval determines the effective "priority" of delivering
/// these external events (which may wake up additional tasks), compared to
/// executing tasks that are currently ready to run. A smaller value is useful
/// when tasks frequently spend a long time in polling, or frequently yield,
/// which can result in overly long delays picking up I/O events. Conversely,
/// picking up new events requires extra synchronization and syscall overhead,
/// so if tasks generally complete their polling quickly, a higher event interval
/// will minimize that overhead while still keeping the scheduler responsive to
/// events.
///
/// # Examples
///
/// ```
/// # use tokio::runtime;
/// # pub fn main() {
/// let rt = runtime::Builder::new_multi_thread()
/// .event_interval(31)
/// .build();
/// # }
/// ```
pub fn event_interval(&mut self, val: u32) -> &mut Self {
self.event_interval = val;
self
}

fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, HandleInner, Kind};

@@ -580,6 +657,8 @@ impl Builder {
handle_inner,
self.before_park.clone(),
self.after_unpark.clone(),
self.global_queue_interval,
self.event_interval,
);
let spawner = Spawner::Basic(scheduler.spawner().clone());

@@ -667,14 +746,15 @@ cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Kind, HandleInner, ThreadPool};
use crate::runtime::{HandleInner, Kind, ThreadPool};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

// Create the blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_pool =
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

let handle_inner = HandleInner {
@@ -685,13 +765,19 @@ cfg_rt_multi_thread! {
blocking_spawner,
};

let (scheduler, launch) = ThreadPool::new(core_threads, driver, handle_inner, self.before_park.clone(), self.after_unpark.clone());
let (scheduler, launch) = ThreadPool::new(
core_threads,
driver,
handle_inner,
self.before_park.clone(),
self.after_unpark.clone(),
self.global_queue_interval,
self.event_interval,
);
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

// Create the runtime handle
let handle = Handle {
spawner,
};
let handle = Handle { spawner };

// Spawn the thread pool workers
let _enter = crate::runtime::context::enter(handle.clone());
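
To summarize the constructor changes above: the defaults now differ by scheduler kind, and either interval can still be overridden before building. A small, hedged illustration (the literal values come from the diff; the PR adds no public getters, so they are only spelled out in comments):

```rust
use tokio::runtime;

fn main() -> std::io::Result<()> {
    // Defaults per this PR:
    //   new_current_thread(): global_queue_interval = 31, event_interval = 61 (4 under loom)
    //   new_multi_thread():   global_queue_interval = 61, event_interval = 61
    let _default_rt = runtime::Builder::new_current_thread().build()?;

    // Either interval can be overridden explicitly.
    let _tuned_rt = runtime::Builder::new_multi_thread()
        .global_queue_interval(61)
        .event_interval(31)
        .build()?;

    Ok(())
}
```
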
13 changes: 11 additions & 2 deletions tokio/src/runtime/thread_pool/mod.rs
@@ -51,10 +51,19 @@ impl ThreadPool {
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u32,
event_interval: u32,
) -> (ThreadPool, Launch) {
let parker = Parker::new(driver);
let (shared, launch) =
worker::create(size, parker, handle_inner, before_park, after_unpark);
let (shared, launch) = worker::create(
size,
parker,
handle_inner,
before_park,
after_unpark,
global_queue_interval,
event_interval,
);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };

22 changes: 13 additions & 9 deletions tokio/src/runtime/thread_pool/worker.rs
@@ -87,7 +87,7 @@ pub(super) struct Worker {
/// Core data
struct Core {
/// Used to schedule bookkeeping tasks every so often.
tick: u8,
tick: u32,

/// When a task is scheduled from a worker, it is stored in this slot. The
/// worker will check this slot for a task **before** checking the run
@@ -117,6 +117,12 @@ struct Core {

/// Fast random number generator.
rand: FastRand,

/// How many ticks before pulling a task from the global/remote queue?
global_queue_interval: u32,

/// How many ticks before yielding to the driver for timer and I/O events?
event_interval: u32,
}

/// State shared across all workers
@@ -198,6 +204,8 @@ pub(super) fn create(
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u32,
event_interval: u32,
) -> (Arc<Shared>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
@@ -219,6 +227,8 @@ pub(super) fn create(
park: Some(park),
metrics: MetricsBatch::new(),
rand: FastRand::new(seed()),
global_queue_interval,
event_interval,
}));

remotes.push(Remote { steal, unpark });
@@ -346,12 +356,6 @@
}
}

/// After how many ticks is the global queue polled. This helps to ensure
/// fairness.
///
/// The number is fairly arbitrary. I believe this value was copied from golang.
const GLOBAL_POLL_INTERVAL: u8 = 61;

impl Launch {
pub(crate) fn launch(mut self) {
for worker in self.0.drain(..) {
@@ -464,7 +468,7 @@ impl Context {
}

fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % GLOBAL_POLL_INTERVAL == 0 {
if core.tick % core.event_interval == 0 {
// Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
// to run without actually putting the thread to sleep.
core = self.park_timeout(core, Some(Duration::from_millis(0)));
@@ -551,7 +555,7 @@ impl Core {

/// Return the next notified task available to this worker.
fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
if self.tick % GLOBAL_POLL_INTERVAL == 0 {
if self.tick % self.global_queue_interval == 0 {
worker.inject().pop().or_else(|| self.next_local_task())
} else {
self.next_local_task().or_else(|| worker.inject().pop())
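
Taken together, the worker now consults the two configured intervals where the single hard-coded `GLOBAL_POLL_INTERVAL` used to be: `event_interval` gates the zero-timeout park in `maintenance`, and `global_queue_interval` gates the inject-queue-first check in `next_task`. A rough distillation of that decision logic (hypothetical free functions for illustration, not tokio's actual internals):

```rust
// Whether this tick should park with a zero timeout so the I/O driver and
// timer can run without putting the worker thread to sleep (maintenance).
fn should_poll_driver(tick: u32, event_interval: u32) -> bool {
    tick % event_interval == 0
}

// Whether this tick should check the shared inject (global) queue before the
// worker's local queue, so globally spawned tasks are not starved (next_task).
fn prefer_inject_queue(tick: u32, global_queue_interval: u32) -> bool {
    tick % global_queue_interval == 0
}

fn main() {
    // Multi-thread defaults from this PR.
    let (global_queue_interval, event_interval) = (61u32, 61u32);
    for tick in [0u32, 1, 60, 61, 122] {
        println!(
            "tick {tick}: poll driver = {}, inject first = {}",
            should_poll_driver(tick, event_interval),
            prefer_inject_queue(tick, global_queue_interval),
        );
    }
}
```
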