Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: update MultiThread to use its own Handle #5025

Merged
merged 1 commit into from Sep 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 11 additions & 13 deletions tokio/src/runtime/builder.rs
Expand Up @@ -993,10 +993,9 @@ cfg_test_util! {
cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sync::Arc;
use crate::loom::sys::num_cpus;
use crate::runtime::{Config, Scheduler};
use crate::runtime::scheduler::{self, multi_thread, MultiThread};
use crate::runtime::scheduler::{self, MultiThread};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

Expand All @@ -1007,9 +1006,16 @@ cfg_rt_multi_thread! {
blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

// Generate a rng seed for this runtime.
let seed_generator_1 = self.seed_generator.next_generator();
let seed_generator_2 = self.seed_generator.next_generator();

let (scheduler, launch) = MultiThread::new(
core_threads,
driver,
driver_handle,
blocking_spawner,
seed_generator_2,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
Expand All @@ -1018,20 +1024,12 @@ cfg_rt_multi_thread! {
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
seed_generator: self.seed_generator.next_generator(),
seed_generator: seed_generator_1,
},
);

let inner = Arc::new(multi_thread::Handle {
spawner: scheduler.spawner().clone(),
driver: driver_handle,
blocking_spawner,
seed_generator: self.seed_generator.next_generator(),
});
let inner = scheduler::Handle::MultiThread(inner);

// Create the runtime handle
let handle = Handle { inner };
let handle = scheduler::Handle::MultiThread(scheduler.handle().clone());
let handle = Handle { inner: handle };

// Spawn the thread pool workers
let _enter = crate::runtime::context::enter(handle.clone());
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/mod.rs
Expand Up @@ -83,7 +83,7 @@ cfg_rt! {
Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id),

#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(h) => h.spawner.spawn(future, id),
Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
}
}

Expand All @@ -92,7 +92,7 @@ cfg_rt! {
Handle::CurrentThread(_) => {},

#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
Handle::MultiThread(ref h) => h.spawner.shutdown(),
Handle::MultiThread(ref h) => h.shutdown(),
}
}

Expand Down
59 changes: 50 additions & 9 deletions tokio/src/runtime/scheduler/multi_thread/handle.rs
@@ -1,12 +1,18 @@
use crate::runtime::scheduler::multi_thread::Spawner;
use crate::runtime::{blocking, driver};
use crate::future::Future;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread::worker;
use crate::runtime::{
blocking, driver,
task::{self, JoinHandle},
};
use crate::util::RngSeedGenerator;

use std::fmt;

/// Handle to the multi thread scheduler
#[derive(Debug)]
pub(crate) struct Handle {
/// Task spawner
pub(crate) spawner: Spawner,
pub(super) shared: worker::Shared,

/// Resource driver handles
pub(crate) driver: driver::Handle,
Expand All @@ -18,28 +24,63 @@ pub(crate) struct Handle {
pub(crate) seed_generator: RngSeedGenerator,
}

impl Handle {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(me: &Arc<Self>, future: F, id: task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
Self::bind_new_task(me, future, id)
}

pub(crate) fn shutdown(&self) {
self.shared.close();
}

pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T, id: task::Id) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);

if let Some(notified) = notified {
me.shared.schedule(notified, false);
}

handle
}
}

cfg_metrics! {
use crate::runtime::{SchedulerMetrics, WorkerMetrics};

impl Handle {
pub(crate) fn num_workers(&self) -> usize {
self.spawner.shared.worker_metrics.len()
self.shared.worker_metrics.len()
}

pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.spawner.shared.scheduler_metrics
&self.shared.scheduler_metrics
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
&self.spawner.shared.worker_metrics[worker]
&self.shared.worker_metrics[worker]
}

pub(crate) fn injection_queue_depth(&self) -> usize {
self.spawner.shared.injection_queue_depth()
self.shared.injection_queue_depth()
}

pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
self.spawner.shared.worker_local_queue_depth(worker)
self.shared.worker_local_queue_depth(worker)
}
}
}

impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("multi_thread::Handle { ... }").finish()
}
}
73 changes: 23 additions & 50 deletions tokio/src/runtime/scheduler/multi_thread/mod.rs
Expand Up @@ -17,42 +17,38 @@ pub(crate) use worker::Launch;
pub(crate) use worker::block_in_place;

use crate::loom::sync::Arc;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Config, Driver};
use crate::runtime::{blocking, driver, Config, Driver};
use crate::util::RngSeedGenerator;

use std::fmt;
use std::future::Future;

/// Work-stealing based thread pool for executing futures.
pub(crate) struct MultiThread {
spawner: Spawner,
}

/// Submits futures to the associated thread pool for execution.
///
/// A `Spawner` instance is a handle to a single thread pool that allows the owner
/// of the handle to spawn futures onto the thread pool.
///
/// The `Spawner` handle is *only* used for spawning new futures. It does not
/// impact the lifecycle of the thread pool in any way. The thread pool may
/// shut down while there are outstanding `Spawner` instances.
///
/// `Spawner` instances are obtained by calling [`MultiThread::spawner`].
///
/// [`MultiThread::spawner`]: method@MultiThread::spawner
#[derive(Clone)]
pub(crate) struct Spawner {
shared: Arc<worker::Shared>,
handle: Arc<Handle>,
}

// ===== impl MultiThread =====

impl MultiThread {
pub(crate) fn new(size: usize, driver: Driver, config: Config) -> (MultiThread, Launch) {
pub(crate) fn new(
size: usize,
driver: Driver,
driver_handle: driver::Handle,
blocking_spawner: blocking::Spawner,
seed_generator: RngSeedGenerator,
config: Config,
) -> (MultiThread, Launch) {
let parker = Parker::new(driver);
let (shared, launch) = worker::create(size, parker, config);
let spawner = Spawner { shared };
let multi_thread = MultiThread { spawner };
let (handle, launch) = worker::create(
size,
parker,
driver_handle,
blocking_spawner,
seed_generator,
config,
);
let multi_thread = MultiThread { handle };

(multi_thread, launch)
}
Expand All @@ -61,8 +57,8 @@ impl MultiThread {
///
/// The `Spawner` handle can be cloned and enables spawning tasks from other
/// threads.
pub(crate) fn spawner(&self) -> &Spawner {
&self.spawner
pub(crate) fn handle(&self) -> &Arc<Handle> {
&self.handle
}

/// Blocks the current thread waiting for the future to complete.
Expand All @@ -86,29 +82,6 @@ impl fmt::Debug for MultiThread {

impl Drop for MultiThread {
fn drop(&mut self) {
self.spawner.shutdown();
}
}

// ==== impl Spawner =====

impl Spawner {
/// Spawns a future onto the thread pool
pub(crate) fn spawn<F>(&self, future: F, id: task::Id) -> JoinHandle<F::Output>
where
F: crate::future::Future + Send + 'static,
F::Output: Send + 'static,
{
worker::Shared::bind_new_task(&self.shared, future, id)
}

pub(crate) fn shutdown(&self) {
self.shared.close();
}
}

impl fmt::Debug for Spawner {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Spawner").finish()
self.handle.shutdown();
}
}