Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: Refactor Runtime::block_on to take &self #2782

Merged
merged 11 commits into from Aug 28, 2020
10 changes: 5 additions & 5 deletions benches/mpsc.rs
Expand Up @@ -43,7 +43,7 @@ fn send_large(b: &mut Bencher) {
}

fn contention_bounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -70,7 +70,7 @@ fn contention_bounded(b: &mut Bencher) {
}

fn contention_bounded_full(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -97,7 +97,7 @@ fn contention_bounded_full(b: &mut Bencher) {
}

fn contention_unbounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -124,7 +124,7 @@ fn contention_unbounded(b: &mut Bencher) {
}

fn uncontented_bounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -146,7 +146,7 @@ fn uncontented_bounded(b: &mut Bencher) {
}

fn uncontented_unbounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down
6 changes: 3 additions & 3 deletions benches/scheduler.rs
Expand Up @@ -13,7 +13,7 @@ use std::sync::{mpsc, Arc};
fn spawn_many(b: &mut Bencher) {
const NUM_SPAWN: usize = 10_000;

let mut rt = rt();
let rt = rt();

let (tx, rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -68,7 +68,7 @@ fn yield_many(b: &mut Bencher) {
fn ping_pong(b: &mut Bencher) {
const NUM_PINGS: usize = 1_000;

let mut rt = rt();
let rt = rt();

let (done_tx, done_rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -111,7 +111,7 @@ fn ping_pong(b: &mut Bencher) {
fn chained_spawn(b: &mut Bencher) {
const ITER: usize = 1_000;

let mut rt = rt();
let rt = rt();

fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
if n == 0 {
Expand Down
12 changes: 6 additions & 6 deletions benches/spawn.rs
Expand Up @@ -10,7 +10,7 @@ async fn work() -> usize {
}

fn basic_scheduler_local_spawn(bench: &mut Bencher) {
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -23,7 +23,7 @@ fn basic_scheduler_local_spawn(bench: &mut Bencher) {
}

fn threaded_scheduler_local_spawn(bench: &mut Bencher) {
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
Expand All @@ -40,9 +40,9 @@ fn basic_scheduler_remote_spawn(bench: &mut Bencher) {
.basic_scheduler()
.build()
.unwrap();
let handle = runtime.handle();

bench.iter(|| {
let h = handle.spawn(work());
let h = runtime.spawn(work());
black_box(h);
});
}
Expand All @@ -52,9 +52,9 @@ fn threaded_scheduler_remote_spawn(bench: &mut Bencher) {
.threaded_scheduler()
.build()
.unwrap();
let handle = runtime.handle();

bench.iter(|| {
let h = handle.spawn(work());
let h = runtime.spawn(work());
black_box(h);
});
}
Expand Down
10 changes: 5 additions & 5 deletions benches/sync_rwlock.rs
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::RwLock, task};

fn read_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -22,7 +22,7 @@ fn read_uncontended(b: &mut Bencher) {
}

fn read_concurrent_uncontended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down Expand Up @@ -51,7 +51,7 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) {
}

fn read_concurrent_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -78,7 +78,7 @@ fn read_concurrent_uncontended(b: &mut Bencher) {
}

fn read_concurrent_contended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down Expand Up @@ -108,7 +108,7 @@ fn read_concurrent_contended_multi(b: &mut Bencher) {
}

fn read_concurrent_contended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand Down
10 changes: 5 additions & 5 deletions benches/sync_semaphore.rs
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::Semaphore, task};

fn uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -27,7 +27,7 @@ async fn task(s: Arc<Semaphore>) {
}

fn uncontended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -51,7 +51,7 @@ fn uncontended_concurrent_multi(b: &mut Bencher) {
}

fn uncontended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -73,7 +73,7 @@ fn uncontended_concurrent_single(b: &mut Bencher) {
}

fn contended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -97,7 +97,7 @@ fn contended_concurrent_multi(b: &mut Bencher) {
}

fn contended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests-integration/tests/rt_shell.rs
Expand Up @@ -18,7 +18,7 @@ fn basic_shell_rt() {
});

for _ in 0..1_000 {
let mut rt = runtime::Builder::new().build().unwrap();
let rt = runtime::Builder::new().build().unwrap();

let (tx, rx) = oneshot::channel();

Expand Down
2 changes: 1 addition & 1 deletion tokio-test/src/lib.rs
Expand Up @@ -28,7 +28,7 @@ pub mod task;
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
use tokio::runtime;

let mut rt = runtime::Builder::new()
let rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
Expand Down
13 changes: 6 additions & 7 deletions tokio-util/src/context.rs
Expand Up @@ -12,7 +12,7 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;
use tokio::runtime::Runtime;

pin_project! {
/// `TokioContext` allows connecting a custom executor with the tokio runtime.
Expand All @@ -22,7 +22,7 @@ pin_project! {
pub struct TokioContext<F> {
#[pin]
inner: F,
handle: Handle,
handle: Runtime,
}
}

Expand All @@ -39,16 +39,16 @@ impl<F: Future> Future for TokioContext<F> {
}

/// Trait extension that simplifies bundling a `Handle` with a `Future`.
pub trait HandleExt {
pub trait RuntimeExt {
/// Convenience method that takes a Future and returns a `TokioContext`.
///
/// # Example: calling Tokio Runtime from a custom ThreadPool
///
/// ```no_run
/// use tokio_util::context::HandleExt;
/// use tokio_util::context::RuntimeExt;
/// use tokio::time::{delay_for, Duration};
///
/// let mut rt = tokio::runtime::Builder::new()
/// let rt = tokio::runtime::Builder::new()
/// .threaded_scheduler()
/// .enable_all()
/// .build().unwrap();
Expand All @@ -61,14 +61,13 @@ pub trait HandleExt {
///
/// rt.block_on(
/// rt2
/// .handle()
/// .wrap(async { delay_for(Duration::from_millis(2)).await }),
/// );
///```
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
}

impl HandleExt for Handle {
impl RuntimeExt for Runtime {
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
TokioContext {
inner: fut,
Expand Down
9 changes: 3 additions & 6 deletions tokio-util/tests/context.rs
Expand Up @@ -2,11 +2,11 @@

use tokio::runtime::Builder;
use tokio::time::*;
use tokio_util::context::HandleExt;
use tokio_util::context::RuntimeExt;

#[test]
fn tokio_context_with_another_runtime() {
let mut rt1 = Builder::new()
let rt1 = Builder::new()
.threaded_scheduler()
.core_threads(1)
// no timer!
Expand All @@ -21,8 +21,5 @@ fn tokio_context_with_another_runtime() {

// Without the `HandleExt.wrap()` there would be a panic because there is
// no timer running, since it would be referencing runtime r1.
let _ = rt1.block_on(
rt2.handle()
.wrap(async move { delay_for(Duration::from_millis(2)).await }),
);
let _ = rt1.block_on(rt2.wrap(async move { delay_for(Duration::from_millis(2)).await }));
}
4 changes: 2 additions & 2 deletions tokio/src/io/poll_evented.rs
Expand Up @@ -173,7 +173,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new(io: E) -> io::Result<Self> {
PollEvented::new_with_ready(io, mio::Ready::all())
}
Expand Down Expand Up @@ -201,7 +201,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
let registration = Registration::new_with_ready(&io, ready)?;
Ok(Self {
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/io/registration.rs
Expand Up @@ -67,7 +67,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new<T>(io: &T) -> io::Result<Registration>
where
T: Evented,
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
where
T: Evented,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/tcp/listener.rs
Expand Up @@ -262,7 +262,7 @@ impl TcpListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener)?;
let io = PollEvented::new(io)?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/tcp/stream.rs
Expand Up @@ -187,7 +187,7 @@ impl TcpStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_stream(stream)?;
let io = PollEvented::new(io)?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/udp/socket.rs
Expand Up @@ -64,7 +64,7 @@ impl UdpSocket {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
let io = mio::net::UdpSocket::from_socket(socket)?;
let io = PollEvented::new(io)?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -164,7 +164,7 @@ impl UnixDatagram {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a Tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
/// # Examples
/// ```
/// # use std::error::Error;
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/net/unix/listener.rs
Expand Up @@ -60,7 +60,7 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn bind<P>(path: P) -> io::Result<UnixListener>
where
P: AsRef<Path>,
Expand All @@ -82,7 +82,7 @@ impl UnixListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::UnixListener) -> io::Result<UnixListener> {
let listener = mio_uds::UnixListener::from_listener(listener)?;
let io = PollEvented::new(listener)?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/unix/stream.rs
Expand Up @@ -54,7 +54,7 @@ impl UnixStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
let stream = mio_uds::UnixStream::from_stream(stream)?;
let io = PollEvented::new(stream)?;
Expand Down