Skip to content

Commit

Permalink
rt: Refactor Runtime::block_on to take &self (#2782)
Browse files Browse the repository at this point in the history
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
LucioFranco and hawkw committed Aug 28, 2020
1 parent d9d909c commit d600ab9
Show file tree
Hide file tree
Showing 41 changed files with 331 additions and 693 deletions.
10 changes: 5 additions & 5 deletions benches/mpsc.rs
Expand Up @@ -43,7 +43,7 @@ fn send_large(b: &mut Bencher) {
}

fn contention_bounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -70,7 +70,7 @@ fn contention_bounded(b: &mut Bencher) {
}

fn contention_bounded_full(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -97,7 +97,7 @@ fn contention_bounded_full(b: &mut Bencher) {
}

fn contention_unbounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -124,7 +124,7 @@ fn contention_unbounded(b: &mut Bencher) {
}

fn uncontented_bounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -146,7 +146,7 @@ fn uncontented_bounded(b: &mut Bencher) {
}

fn uncontented_unbounded(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down
6 changes: 3 additions & 3 deletions benches/scheduler.rs
Expand Up @@ -13,7 +13,7 @@ use std::sync::{mpsc, Arc};
fn spawn_many(b: &mut Bencher) {
const NUM_SPAWN: usize = 10_000;

let mut rt = rt();
let rt = rt();

let (tx, rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -68,7 +68,7 @@ fn yield_many(b: &mut Bencher) {
fn ping_pong(b: &mut Bencher) {
const NUM_PINGS: usize = 1_000;

let mut rt = rt();
let rt = rt();

let (done_tx, done_rx) = mpsc::sync_channel(1000);
let rem = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -111,7 +111,7 @@ fn ping_pong(b: &mut Bencher) {
fn chained_spawn(b: &mut Bencher) {
const ITER: usize = 1_000;

let mut rt = rt();
let rt = rt();

fn iter(done_tx: mpsc::SyncSender<()>, n: usize) {
if n == 0 {
Expand Down
12 changes: 6 additions & 6 deletions benches/spawn.rs
Expand Up @@ -10,7 +10,7 @@ async fn work() -> usize {
}

fn basic_scheduler_local_spawn(bench: &mut Bencher) {
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -23,7 +23,7 @@ fn basic_scheduler_local_spawn(bench: &mut Bencher) {
}

fn threaded_scheduler_local_spawn(bench: &mut Bencher) {
let mut runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new()
.threaded_scheduler()
.build()
.unwrap();
Expand All @@ -40,9 +40,9 @@ fn basic_scheduler_remote_spawn(bench: &mut Bencher) {
.basic_scheduler()
.build()
.unwrap();
let handle = runtime.handle();

bench.iter(|| {
let h = handle.spawn(work());
let h = runtime.spawn(work());
black_box(h);
});
}
Expand All @@ -52,9 +52,9 @@ fn threaded_scheduler_remote_spawn(bench: &mut Bencher) {
.threaded_scheduler()
.build()
.unwrap();
let handle = runtime.handle();

bench.iter(|| {
let h = handle.spawn(work());
let h = runtime.spawn(work());
black_box(h);
});
}
Expand Down
10 changes: 5 additions & 5 deletions benches/sync_rwlock.rs
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::RwLock, task};

fn read_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -22,7 +22,7 @@ fn read_uncontended(b: &mut Bencher) {
}

fn read_concurrent_uncontended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down Expand Up @@ -51,7 +51,7 @@ fn read_concurrent_uncontended_multi(b: &mut Bencher) {
}

fn read_concurrent_uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -78,7 +78,7 @@ fn read_concurrent_uncontended(b: &mut Bencher) {
}

fn read_concurrent_contended_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand Down Expand Up @@ -108,7 +108,7 @@ fn read_concurrent_contended_multi(b: &mut Bencher) {
}

fn read_concurrent_contended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand Down
10 changes: 5 additions & 5 deletions benches/sync_semaphore.rs
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use tokio::{sync::Semaphore, task};

fn uncontended(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -27,7 +27,7 @@ async fn task(s: Arc<Semaphore>) {
}

fn uncontended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -51,7 +51,7 @@ fn uncontended_concurrent_multi(b: &mut Bencher) {
}

fn uncontended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand All @@ -73,7 +73,7 @@ fn uncontended_concurrent_single(b: &mut Bencher) {
}

fn contended_concurrent_multi(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.core_threads(6)
.threaded_scheduler()
.build()
Expand All @@ -97,7 +97,7 @@ fn contended_concurrent_multi(b: &mut Bencher) {
}

fn contended_concurrent_single(b: &mut Bencher) {
let mut rt = tokio::runtime::Builder::new()
let rt = tokio::runtime::Builder::new()
.basic_scheduler()
.build()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests-integration/tests/rt_shell.rs
Expand Up @@ -18,7 +18,7 @@ fn basic_shell_rt() {
});

for _ in 0..1_000 {
let mut rt = runtime::Builder::new().build().unwrap();
let rt = runtime::Builder::new().build().unwrap();

let (tx, rx) = oneshot::channel();

Expand Down
2 changes: 1 addition & 1 deletion tokio-test/src/lib.rs
Expand Up @@ -28,7 +28,7 @@ pub mod task;
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
use tokio::runtime;

let mut rt = runtime::Builder::new()
let rt = runtime::Builder::new()
.basic_scheduler()
.enable_all()
.build()
Expand Down
23 changes: 11 additions & 12 deletions tokio-util/src/context.rs
Expand Up @@ -12,21 +12,21 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;
use tokio::runtime::Runtime;

pin_project! {
/// `TokioContext` allows connecting a custom executor with the tokio runtime.
///
/// It contains a `Handle` to the runtime. A handle to the runtime can be
/// obtain by calling the `Runtime::handle()` method.
pub struct TokioContext<F> {
pub struct TokioContext<'a, F> {
#[pin]
inner: F,
handle: Handle,
handle: &'a Runtime,
}
}

impl<F: Future> Future for TokioContext<F> {
impl<F: Future> Future for TokioContext<'_, F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -39,16 +39,16 @@ impl<F: Future> Future for TokioContext<F> {
}

/// Trait extension that simplifies bundling a `Handle` with a `Future`.
pub trait HandleExt {
pub trait RuntimeExt {
/// Convenience method that takes a Future and returns a `TokioContext`.
///
/// # Example: calling Tokio Runtime from a custom ThreadPool
///
/// ```no_run
/// use tokio_util::context::HandleExt;
/// use tokio_util::context::RuntimeExt;
/// use tokio::time::{delay_for, Duration};
///
/// let mut rt = tokio::runtime::Builder::new()
/// let rt = tokio::runtime::Builder::new()
/// .threaded_scheduler()
/// .enable_all()
/// .build().unwrap();
Expand All @@ -61,18 +61,17 @@ pub trait HandleExt {
///
/// rt.block_on(
/// rt2
/// .handle()
/// .wrap(async { delay_for(Duration::from_millis(2)).await }),
/// );
///```
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F>;
fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F>;
}

impl HandleExt for Handle {
fn wrap<F: Future>(&self, fut: F) -> TokioContext<F> {
impl RuntimeExt for Runtime {
fn wrap<F: Future>(&self, fut: F) -> TokioContext<'_, F> {
TokioContext {
inner: fut,
handle: self.clone(),
handle: self,
}
}
}
9 changes: 3 additions & 6 deletions tokio-util/tests/context.rs
Expand Up @@ -2,11 +2,11 @@

use tokio::runtime::Builder;
use tokio::time::*;
use tokio_util::context::HandleExt;
use tokio_util::context::RuntimeExt;

#[test]
fn tokio_context_with_another_runtime() {
let mut rt1 = Builder::new()
let rt1 = Builder::new()
.threaded_scheduler()
.core_threads(1)
// no timer!
Expand All @@ -21,8 +21,5 @@ fn tokio_context_with_another_runtime() {

// Without the `HandleExt.wrap()` there would be a panic because there is
// no timer running, since it would be referencing runtime r1.
let _ = rt1.block_on(
rt2.handle()
.wrap(async move { delay_for(Duration::from_millis(2)).await }),
);
let _ = rt1.block_on(rt2.wrap(async move { delay_for(Duration::from_millis(2)).await }));
}
4 changes: 2 additions & 2 deletions tokio/src/io/poll_evented.rs
Expand Up @@ -173,7 +173,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new(io: E) -> io::Result<Self> {
PollEvented::new_with_ready(io, mio::Ready::all())
}
Expand Down Expand Up @@ -201,7 +201,7 @@ where
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready(io: E, ready: mio::Ready) -> io::Result<Self> {
let registration = Registration::new_with_ready(&io, ready)?;
Ok(Self {
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/io/registration.rs
Expand Up @@ -67,7 +67,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new<T>(io: &T) -> io::Result<Registration>
where
T: Evented,
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Registration {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
where
T: Evented,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/tcp/listener.rs
Expand Up @@ -262,7 +262,7 @@ impl TcpListener {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener)?;
let io = PollEvented::new(io)?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/tcp/stream.rs
Expand Up @@ -187,7 +187,7 @@ impl TcpStream {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_stream(stream)?;
let io = PollEvented::new(io)?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/udp/socket.rs
Expand Up @@ -64,7 +64,7 @@ impl UdpSocket {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
let io = mio::net::UdpSocket::from_socket(socket)?;
let io = PollEvented::new(io)?;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -164,7 +164,7 @@ impl UnixDatagram {
///
/// The runtime is usually set implicitly when this function is called
/// from a future driven by a Tokio runtime, otherwise runtime can be set
/// explicitly with [`Handle::enter`](crate::runtime::Handle::enter) function.
/// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
/// # Examples
/// ```
/// # use std::error::Error;
Expand Down

0 comments on commit d600ab9

Please sign in to comment.