Skip to content

Commit

Permalink
rt: Refactor Runtime::block_on to take &self
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco committed Aug 20, 2020
1 parent c393236 commit 013e8ef
Show file tree
Hide file tree
Showing 24 changed files with 149 additions and 141 deletions.
10 changes: 5 additions & 5 deletions benches/mpsc.rs
Expand Up @@ -43,7 +43,7 @@ fn send_large(b: &mut Bencher) {
}

fn contention_bounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -70,7 +70,7 @@ fn contention_bounded(b: &mut Bencher) {
}

fn contention_bounded_full(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -97,7 +97,7 @@ fn contention_bounded_full(b: &mut Bencher) {
}

fn contention_unbounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -124,7 +124,7 @@ fn contention_unbounded(b: &mut Bencher) {
}

fn uncontented_bounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -146,7 +146,7 @@ fn uncontented_bounded(b: &mut Bencher) {
}

fn uncontented_unbounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down
6 changes: 3 additions & 3 deletions benches/scheduler.rs
Expand Up @@ -13,7 +13,7 @@ use std::sync::{mpsc, Arc};
fn spawn_many(b: &mut Bencher) {
const NUM_SPAWN: usize = 10_000;

let mut rt = rt();
let rt = rt();

let (tx, rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -68,7 +68,7 @@ fn yield_many(b: &mut Bencher) {
fn ping_pong(b: &mut Bencher) {
const NUM_PINGS: usize = 1_000;

let mut rt = rt();
let rt = rt();

let (done_tx, done_rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -111,7 +111,7 @@ fn ping_pong(b: &mut Bencher) {
fn chained_spawn(b: &mut Bencher) {
const ITER: usize = 1_000;

let mut rt = rt();
let rt = rt();

fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
if n == 0 {
Expand Down
4 changes: 2 additions & 2 deletions benches/spawn.rs
Expand Up @@ -10,7 +10,7 @@ async fn work() -> usize {
}

fn basic_scheduler_local_spawn(bench: &mut Bencher) {
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -23,7 +23,7 @@ fn basic_scheduler_local_spawn(bench: &mut Bencher) {
}

fn threaded_scheduler_local_spawn(bench: &mut Bencher) {
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
Expand Down
10 changes: 5 additions & 5 deletions benches/sync_rwlock.rs
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::RwLock, task};

fn read_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -22,7 +22,7 @@ fn read_uncontended(b: &mut Bencher) {
}

fn read_concurrent_uncontended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down Expand Up @@ -51,7 +51,7 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) {
}

fn read_concurrent_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -78,7 +78,7 @@ fn read_concurrent_uncontended(b: &mut Bencher) {
}

fn read_concurrent_contended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down Expand Up @@ -108,7 +108,7 @@ fn read_concurrent_contended_multi(b: &mut Bencher) {
}

fn read_concurrent_contended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand Down
10 changes: 5 additions & 5 deletions benches/sync_semaphore.rs
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::Semaphore, task};

fn uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -27,7 +27,7 @@ async fn task(s: Arc<Semaphore>) {
}

fn uncontended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -51,7 +51,7 @@ fn uncontended_concurrent_multi(b: &mut Bencher) {
}

fn uncontended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -73,7 +73,7 @@ fn uncontended_concurrent_single(b: &mut Bencher) {
}

fn contended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -97,7 +97,7 @@ fn contended_concurrent_multi(b: &mut Bencher) {
}

fn contended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests-integration/tests/rt_shell.rs
Expand Up @@ -18,7 +18,7 @@ fn basic_shell_rt() {
});

for _ in 0..1_000 {
let mut rt = runtime::Builder::new().build().unwrap();
let rt = runtime::Builder::new().build().unwrap();

let (tx, rx) = oneshot::channel();

Expand Down
2 changes: 1 addition & 1 deletion tokio-test/src/lib.rs
Expand Up @@ -28,7 +28,7 @@ pub mod task;
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
use tokio::runtime;

let mut rt = runtime::Builder::new()
let rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
Expand Down
11 changes: 6 additions & 5 deletions tokio/src/runtime/blocking/mod.rs
Expand Up @@ -12,24 +12,25 @@ cfg_blocking_impl! {
pub(crate) mod task;

use crate::runtime::Builder;
use crate::loom::sync::Arc;

pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
BlockingPool::new(builder, thread_cap)

pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> Arc<BlockingPool> {
Arc::new(BlockingPool::new(builder, thread_cap))
}
}

cfg_not_blocking_impl! {
use crate::runtime::Builder;
use std::time::Duration;
use crate::loom::sync::Arc;

#[derive(Debug, Clone)]
pub(crate) struct BlockingPool {}

pub(crate) use BlockingPool as Spawner;

pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool {
BlockingPool {}
pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> Arc<BlockingPool> {
Arc::new(BlockingPool {})
}

impl BlockingPool {
Expand Down
13 changes: 6 additions & 7 deletions tokio/src/runtime/builder.rs
@@ -1,10 +1,9 @@
use crate::loom::sync::{Arc, Mutex};
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};

use std::fmt;
#[cfg(not(loom))]
use std::sync::Arc;

/// Builds Tokio Runtime with custom configuration values.
///
Expand Down Expand Up @@ -263,7 +262,7 @@ impl Builder {
where
F: Fn() + Send + Sync + 'static,
{
self.after_start = Some(Arc::new(f));
self.after_start = Some(std::sync::Arc::new(f));
self
}

Expand Down Expand Up @@ -303,7 +302,7 @@ impl Builder {
/// ```
/// use tokio::runtime::Builder;
///
/// let mut rt = Builder::new().build().unwrap();
/// let rt = Builder::new().build().unwrap();
///
/// rt.block_on(async {
/// println!("Hello from the Tokio runtime");
Expand Down Expand Up @@ -334,7 +333,7 @@ impl Builder {
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::Shell(Shell::new(driver)),
kind: Kind::Shell(Arc::new(Mutex::new(Shell::new(driver)))),
handle: Handle {
spawner,
io_handle,
Expand Down Expand Up @@ -433,7 +432,7 @@ cfg_rt_core! {
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::Basic(scheduler),
kind: Kind::Basic(Arc::new(Mutex::new(scheduler))),
handle: Handle {
spawner,
io_handle,
Expand Down Expand Up @@ -493,7 +492,7 @@ cfg_rt_threaded! {
handle.enter(|| launch.launch());

Ok(Runtime {
kind: Kind::ThreadPool(scheduler),
kind: Kind::ThreadPool(Arc::new(scheduler)),
handle,
blocking_pool,
})
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/handle.rs
Expand Up @@ -244,7 +244,7 @@ cfg_rt_core! {
/// use std::thread;
///
/// // Create the runtime.
/// let mut rt = Builder::new()
/// let rt = Builder::new()
/// .enable_all()
/// .basic_scheduler()
/// .build()
Expand Down

0 comments on commit 013e8ef

Please sign in to comment.