Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: Refactor Runtime::block_on to take &self #2782

Merged
merged 11 commits into from Aug 28, 2020
10 changes: 5 additions & 5 deletions benches/mpsc.rs
Expand Up @@ -43,7 +43,7 @@ fn send_large(b: &mut Bencher) {
}

fn contention_bounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -70,7 +70,7 @@ fn contention_bounded(b: &mut Bencher) {
}

fn contention_bounded_full(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -97,7 +97,7 @@ fn contention_bounded_full(b: &mut Bencher) {
}

fn contention_unbounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -124,7 +124,7 @@ fn contention_unbounded(b: &mut Bencher) {
}

fn uncontented_bounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -146,7 +146,7 @@ fn uncontented_bounded(b: &mut Bencher) {
}

fn uncontented_unbounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down
6 changes: 3 additions & 3 deletions benches/scheduler.rs
Expand Up @@ -13,7 +13,7 @@ use std::sync::{mpsc, Arc};
fn spawn_many(b: &mut Bencher) {
const NUM_SPAWN: usize = 10_000;

let mut rt = rt();
let rt = rt();

let (tx, rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -68,7 +68,7 @@ fn yield_many(b: &mut Bencher) {
fn ping_pong(b: &mut Bencher) {
const NUM_PINGS: usize = 1_000;

let mut rt = rt();
let rt = rt();

let (done_tx, done_rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -111,7 +111,7 @@ fn ping_pong(b: &mut Bencher) {
fn chained_spawn(b: &mut Bencher) {
const ITER: usize = 1_000;

let mut rt = rt();
let rt = rt();

fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
if n == 0 {
Expand Down
12 changes: 6 additions & 6 deletions benches/spawn.rs
Expand Up @@ -10,7 +10,7 @@ async fn work() -> usize {
}

fn basic_scheduler_local_spawn(bench: &mut Bencher) {
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -23,7 +23,7 @@ fn basic_scheduler_local_spawn(bench: &mut Bencher) {
}

fn threaded_scheduler_local_spawn(bench: &mut Bencher) {
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
Expand All @@ -40,9 +40,9 @@ fn basic_scheduler_remote_spawn(bench: &mut Bencher) {
.basic_scheduler()
.build()
.unwrap();
let handle = runtime.handle();

bench.iter(|| {
let h = handle.spawn(work());
let h = runtime.spawn(work());
black_box(h);
});
}
Expand All @@ -52,9 +52,9 @@ fn threaded_scheduler_remote_spawn(bench: &mut Bencher) {
.threaded_scheduler()
.build()
.unwrap();
let handle = runtime.handle();

bench.iter(|| {
let h = handle.spawn(work());
let h = runtime.spawn(work());
black_box(h);
});
}
Expand Down
10 changes: 5 additions & 5 deletions benches/sync_rwlock.rs
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::RwLock, task};

fn read_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -22,7 +22,7 @@ fn read_uncontended(b: &mut Bencher) {
}

fn read_concurrent_uncontended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down Expand Up @@ -51,7 +51,7 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) {
}

fn read_concurrent_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -78,7 +78,7 @@ fn read_concurrent_uncontended(b: &mut Bencher) {
}

fn read_concurrent_contended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down Expand Up @@ -108,7 +108,7 @@ fn read_concurrent_contended_multi(b: &mut Bencher) {
}

fn read_concurrent_contended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand Down
10 changes: 5 additions & 5 deletions benches/sync_semaphore.rs
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::Semaphore, task};

fn uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -27,7 +27,7 @@ async fn task(s: Arc<Semaphore>) {
}

fn uncontended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -51,7 +51,7 @@ fn uncontended_concurrent_multi(b: &mut Bencher) {
}

fn uncontended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -73,7 +73,7 @@ fn uncontended_concurrent_single(b: &mut Bencher) {
}

fn contended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -97,7 +97,7 @@ fn contended_concurrent_multi(b: &mut Bencher) {
}

fn contended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests-integration/tests/rt_shell.rs
Expand Up @@ -18,7 +18,7 @@ fn basic_shell_rt() {
});

for _ in 0..1_000 {
let mut rt = runtime::Builder::new().build().unwrap();
let rt = runtime::Builder::new().build().unwrap();

let (tx, rx) = oneshot::channel();

Expand Down
2 changes: 1 addition & 1 deletion tokio-test/src/lib.rs
Expand Up @@ -28,7 +28,7 @@ pub mod task;
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
use tokio::runtime;

let mut rt = runtime::Builder::new()
let rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
Expand Down
23 changes: 11 additions & 12 deletions tokio-util/src/context.rs
Expand Up @@ -12,21 +12,21 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;
use tokio::runtime::Runtime;

pin_project! {
/// `TokioContext` allows connecting a custom executor with the tokio runtime.
///
/// It contains a `Handle` to the runtime. A handle to the runtime can be
/// obtain by calling the `Runtime::handle()` method.
pub struct TokioContext<F> {
pub struct TokioContext<'a, F> {
#[pin]
inner: F,
handle: Handle,
handle: &'a Runtime,
Comment on lines +22 to +25
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm...unlike the prior version, this now has a borrow in it, so it's not 'static. You'd have to write

let rt = ...;
some_other_future_executor::spawn(async move {
    rt.wrap(some_future).await
});

rather than

let rt = ...;
some_other_future_executor::spawn(rt.wrap(some_future));

which seems like a minor ergonomic speed bump in what i imagine is more or less the most common use case for this?

Maybe we should just clone it...what do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So Runtime doesn't implement clone anymore. But you can achieve the same thing if you async move { rt.wrap(fut) } and if you warp the Runtime with an Arc you can clone before moving.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can revisit this in the near future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup,

So Runtime doesn't implement clone anymore. But you can achieve the same thing if you async move { rt.wrap(fut) } and if you warp the Runtime with an Arc you can clone before moving.

yup, i missed Carl's review feedback while looking at this. seems good to me, now that the arc is factored out. 👍

}
}

impl<F: Future> Future for TokioContext<F> {
impl<F: Future> Future for TokioContext<'_, F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -39,16 +39,16 @@ impl<F: Future> Future for TokioContext<F> {
}

/// Trait extension that simplifies bundling a `Handle` with a `Future`.
pub trait HandleExt {
pub trait RuntimeExt {
/// Convenience method that takes a Future and returns a `TokioContext`.
///
/// # Example: calling Tokio Runtime from a custom ThreadPool
///
/// ```no_run
/// use tokio_util::context::HandleExt;
/// use tokio_util::context::RuntimeExt;
/// use tokio::time::{delay_for, Duration};
///
/// let mut rt = tokio::runtime::Builder::new()
/// let rt = tokio::runtime::Builder::new()
/// .threaded_scheduler()
/// .enable_all()
/// .build().unwrap();
Expand All @@ -61,18 +61,17 @@ pub trait HandleExt {
///
/// rt.block_on(
/// rt2
/// .handle()
/// .wrap(async { delay_for(Duration::from_millis(2)).await }),
/// );
///```
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F>;
}

impl HandleExt for Handle {
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
impl RuntimeExt for Runtime {
fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F> {
TokioContext {
inner: fut,
handle: self.clone(),
handle: self,
}
}
}
9 changes: 3 additions & 6 deletions tokio-util/tests/context.rs
Expand Up @@ -2,11 +2,11 @@

use tokio::runtime::Builder;
use tokio::time::*;
use tokio_util::context::HandleExt;
use tokio_util::context::RuntimeExt;

#[test]
fn tokio_context_with_another_runtime() {
let mut rt1 = Builder::new()
let rt1 = Builder::new()
.threaded_scheduler()
.core_threads(1)
// no timer!
Expand All @@ -21,8 +21,5 @@ fn tokio_context_with_another_runtime() {

// Without the `HandleExt.wrap()` there would be a panic because there is
// no timer running, since it would be referencing runtime r1.
let _ = rt1.block_on(
rt2.handle()
.wrap(async move { delay_for(Duration::from_millis(2)).await }),
);
let _ = rt1.block_on(rt2.wrap(async move { delay_for(Duration::from_millis(2)).await }));
}
4 changes: 2 additions & 2 deletions tokio/src/io/poll_evented.rs
Expand Up @@ -173,7 +173,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new(io: E) -> io::Result<Self> {
PollEvented::new_with_ready(io, mio::Ready::all())
}
Expand Down Expand Up @@ -201,7 +201,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
let registration = Registration::new_with_ready(&io, ready)?;
Ok(Self {
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/io/registration.rs
Expand Up @@ -67,7 +67,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new<T>(io: &T) -> io::Result<Registration>
where
T: Evented,
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
where
T: Evented,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/tcp/listener.rs
Expand Up @@ -262,7 +262,7 @@ impl TcpListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener)?;
let io = PollEvented::new(io)?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/tcp/stream.rs
Expand Up @@ -187,7 +187,7 @@ impl TcpStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_stream(stream)?;
let io = PollEvented::new(io)?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/udp/socket.rs
Expand Up @@ -64,7 +64,7 @@ impl UdpSocket {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
let io = mio::net::UdpSocket::from_socket(socket)?;
let io = PollEvented::new(io)?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -164,7 +164,7 @@ impl UnixDatagram {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a Tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
/// # Examples
/// ```
/// # use std::error::Error;
Expand Down