From 6fce541fa35d210b6ffcd3b8dee4117d67a0d031 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Mon, 24 Aug 2020 23:08:06 +0200 Subject: [PATCH 01/18] tokio-utils: create TokioContext Run a future on a custom executor using the tokio runtime. Resolves: #2776 --- tokio-util/src/context.rs | 30 ++++++++++++++++++++++++++++++ tokio-util/src/lib.rs | 1 + tokio-util/tests/context.rs | 6 ++++++ 3 files changed, 37 insertions(+) create mode 100644 tokio-util/src/context.rs create mode 100644 tokio-util/tests/context.rs diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs new file mode 100644 index 00000000000..85750f746a4 --- /dev/null +++ b/tokio-util/src/context.rs @@ -0,0 +1,30 @@ +//! This module introduces `TokioContext`, which enables a simple way of using +//! the tokio runtime with any other executor. + +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::runtime::Handle; + +pin_project! { + /// TODO: add docs + pub struct TokioContext { + #[pin] + inner: F, + handle: Handle, + } +} + +impl Future for TokioContext { + type Output = F::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + let handle = me.handle; + let fut = me.inner; + + handle.enter(|| fut.poll(cx)) + } +} diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index ec69f59d04b..cd4c76e5142 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -38,4 +38,5 @@ cfg_compat! { pub mod compat; } +pub mod context; pub mod sync; diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs new file mode 100644 index 00000000000..6ab61a8d407 --- /dev/null +++ b/tokio-util/tests/context.rs @@ -0,0 +1,6 @@ +#![warn(rust_2018_idioms)] + +use tokio_util::context::TokioContext; + +#[test] +fn tokio_context_with_another_runtime() {} From e7913a4784810ed31436f093b3bbf29cc0193d30 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Mon, 24 Aug 2020 23:47:25 +0200 Subject: [PATCH 02/18] test passing --- tokio-util/Cargo.toml | 3 ++- tokio-util/src/context.rs | 7 ++++++ tokio-util/src/lib.rs | 1 + tokio-util/tests/context.rs | 45 ++++++++++++++++++++++++++++++++++++- 4 files changed, 54 insertions(+), 2 deletions(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index b47c9dfc21c..6739e8b5d3b 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -40,12 +40,13 @@ futures-sink = "0.3.0" futures-io = { version = "0.3.0", optional = true } log = "0.4" pin-project-lite = "0.1.4" +lazy_static = "1.4.0" [dev-dependencies] tokio = { version = "0.3.0", path = "../tokio", features = ["full"] } tokio-test = { version = "0.3.0", path = "../tokio-test" } -futures = "0.3.0" +futures = { version = "0.3.0", features = ["thread-pool"] } futures-test = "0.3.5" [package.metadata.docs.rs] diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index 85750f746a4..aa3a6b7a604 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -28,3 +28,10 @@ impl Future for TokioContext { handle.enter(|| fut.poll(cx)) } } + +impl TokioContext { + /// docs + pub fn new(f: F, handle: Handle) -> Self { + Self { inner: f, handle } + } +} diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index cd4c76e5142..3e9a3b7e6db 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -39,4 +39,5 @@ cfg_compat! { } pub mod context; + pub mod sync; diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs index 6ab61a8d407..3269f952811 100644 --- a/tokio-util/tests/context.rs +++ b/tokio-util/tests/context.rs @@ -1,6 +1,49 @@ #![warn(rust_2018_idioms)] +use tokio::{net::TcpListener, sync::oneshot}; use tokio_util::context::TokioContext; +use lazy_static::lazy_static; +use std::future::Future; + +struct ThreadPool { + inner: futures::executor::ThreadPool, + rt: tokio::runtime::Runtime, +} + +lazy_static! { + static ref EXECUTOR: ThreadPool = { + // Spawn tokio runtime on a single background thread + // enabling IO and timers. + let rt = tokio::runtime::Builder::new() + .basic_scheduler() + .enable_all() + .core_threads(1) + .build() + .unwrap(); + + let inner = futures::executor::ThreadPool::builder().create().unwrap(); + + ThreadPool { inner, rt } + }; +} + +impl ThreadPool { + fn spawn(&self, f: impl Future + Send + 'static) { + let handle = self.rt.handle().clone(); + self.inner.spawn_ok(TokioContext::new(f, handle)); + } +} + #[test] -fn tokio_context_with_another_runtime() {} +fn tokio_context_with_another_runtime() { + let (tx, rx) = oneshot::channel(); + + EXECUTOR.spawn(async move { + let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); + println!("addr: {:?}", listener.local_addr()); + tx.send(()).unwrap(); + }); + + futures::executor::block_on(rx).unwrap(); +} From 473b8d873400ab52592018b8754dc3b059ee20f6 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Mon, 24 Aug 2020 23:52:52 +0200 Subject: [PATCH 03/18] add some docs --- tokio-util/src/context.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index aa3a6b7a604..b5c007318bd 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -10,7 +10,7 @@ use std::{ use tokio::runtime::Handle; pin_project! { - /// TODO: add docs + /// TODO: add docs. Get some inspiration? pub struct TokioContext { #[pin] inner: F, @@ -30,7 +30,8 @@ impl Future for TokioContext { } impl TokioContext { - /// docs + /// Creates a new `TokioContext`. Expects a future as its first argument, and a + /// `tokio::runtime::Handle` as a second. pub fn new(f: F, handle: Handle) -> Self { Self { inner: f, handle } } From ef1cd37ba826f942e71bcfff2dc88cd4e0c2181b Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Tue, 25 Aug 2020 11:29:16 +0200 Subject: [PATCH 04/18] move lazy_static into dev-dependencies --- tokio-util/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 6739e8b5d3b..e03297ea91b 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -40,9 +40,9 @@ futures-sink = "0.3.0" futures-io = { version = "0.3.0", optional = true } log = "0.4" pin-project-lite = "0.1.4" -lazy_static = "1.4.0" [dev-dependencies] +lazy_static = "1.4.0" tokio = { version = "0.3.0", path = "../tokio", features = ["full"] } tokio-test = { version = "0.3.0", path = "../tokio-test" } From 8006c4263e4d3c2b6ee2709a0a02d013b323c120 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Tue, 25 Aug 2020 22:05:36 +0200 Subject: [PATCH 05/18] include suggestions --- tokio-util/Cargo.toml | 1 - tokio-util/src/context.rs | 34 +++++++++++++++++++++++++++------- tokio-util/tests/context.rs | 31 ++++++++++++++----------------- 3 files changed, 41 insertions(+), 25 deletions(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index e03297ea91b..c9133fe0730 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -42,7 +42,6 @@ log = "0.4" pin-project-lite = "0.1.4" [dev-dependencies] -lazy_static = "1.4.0" tokio = { version = "0.3.0", path = "../tokio", features = ["full"] } tokio-test = { version = "0.3.0", path = "../tokio-test" } diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index b5c007318bd..2f9c0a7523b 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -10,7 +10,7 @@ use std::{ use tokio::runtime::Handle; pin_project! { - /// TODO: add docs. Get some inspiration? + /// TokioContext allows connecting a custom executor with the tokio runtime. pub struct TokioContext { #[pin] inner: F, @@ -28,11 +28,31 @@ impl Future for TokioContext { handle.enter(|| fut.poll(cx)) } } - -impl TokioContext { - /// Creates a new `TokioContext`. Expects a future as its first argument, and a - /// `tokio::runtime::Handle` as a second. - pub fn new(f: F, handle: Handle) -> Self { - Self { inner: f, handle } +/// Trait extension that simplifies bundling a `Handle` with a `Future`. +pub trait HandleExt: Into + Clone { + /// Convenience method that takes a Future and returns a `TokioContext`. + /// # Example + /// ```rust,no_run + /// + /// use std::futures::Future; + /// use tokio-utils::context::{HandleExt}; + /// use tokio::runtime::Handle; + /// + /// impl ThreadPool { + /// fn spawn(&self, f: impl Future + Send + 'static) { + /// let handle = self.rt.handle().clone(); + /// // create a TokioContext from the handle and future. + /// let h = handle.wrap(f); + /// self.inner.spawn_ok(h); + /// } + /// ``` + /// + fn wrap(&self, fut: F) -> TokioContext + where + F: Future, + { + TokioContext {inner: fut, handle: (*self).clone().into()} } } + +impl HandleExt for Handle {} \ No newline at end of file diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs index 3269f952811..f1a46e9339b 100644 --- a/tokio-util/tests/context.rs +++ b/tokio-util/tests/context.rs @@ -1,9 +1,8 @@ #![warn(rust_2018_idioms)] use tokio::{net::TcpListener, sync::oneshot}; -use tokio_util::context::TokioContext; +use tokio_util::context::{HandleExt, TokioContext}; -use lazy_static::lazy_static; use std::future::Future; struct ThreadPool { @@ -11,8 +10,18 @@ struct ThreadPool { rt: tokio::runtime::Runtime, } -lazy_static! { - static ref EXECUTOR: ThreadPool = { +impl ThreadPool { + fn spawn(&self, f: impl Future + Send + 'static) { + let handle = self.rt.handle().clone(); + let h: TokioContext<_> = handle.wrap(f); + self.inner.spawn_ok(h); + } +} + +#[test] +fn tokio_context_with_another_runtime() { + let (tx, rx) = oneshot::channel(); + let custom_executor: ThreadPool = { // Spawn tokio runtime on a single background thread // enabling IO and timers. let rt = tokio::runtime::Builder::new() @@ -26,20 +35,8 @@ lazy_static! { ThreadPool { inner, rt } }; -} - -impl ThreadPool { - fn spawn(&self, f: impl Future + Send + 'static) { - let handle = self.rt.handle().clone(); - self.inner.spawn_ok(TokioContext::new(f, handle)); - } -} - -#[test] -fn tokio_context_with_another_runtime() { - let (tx, rx) = oneshot::channel(); - EXECUTOR.spawn(async move { + custom_executor.spawn(async move { let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); println!("addr: {:?}", listener.local_addr()); tx.send(()).unwrap(); From 21f2fa7dbc6272dd658cb7e5746cdff6560e0344 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Tue, 25 Aug 2020 22:05:51 +0200 Subject: [PATCH 06/18] fix doc warnings --- tokio-macros/src/lib.rs | 2 +- tokio/src/time/delay_queue.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 2bb0c2100f9..320fdbe4cd8 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -6,7 +6,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(intra_doc_link_resolution_failure)] +#![deny(broken_intra_doc_links)] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/tokio/src/time/delay_queue.rs b/tokio/src/time/delay_queue.rs index a947cc6fe4e..e6bc6bc0df6 100644 --- a/tokio/src/time/delay_queue.rs +++ b/tokio/src/time/delay_queue.rs @@ -403,7 +403,7 @@ impl DelayQueue { /// # } /// ``` /// - /// [`poll`]: method@Self::poll + /// [`poll`]: struct@DelayQueue::poll /// [`remove`]: method@Self::remove /// [`reset`]: method@Self::reset /// [`Key`]: struct@Key @@ -582,7 +582,7 @@ impl DelayQueue { /// /// Note that this method has no effect on the allocated capacity. /// - /// [`poll`]: method@Self::poll + /// [`poll`]: struct@DelayQueue::poll /// /// # Examples /// From 61f5a3e5c1ecd96125dfc996cd103353d015d50a Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Wed, 26 Aug 2020 16:53:12 +0200 Subject: [PATCH 07/18] add a new test w/o timer or io --- tokio-util/tests/context.rs | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs index f1a46e9339b..c4fc51a07a9 100644 --- a/tokio-util/tests/context.rs +++ b/tokio-util/tests/context.rs @@ -5,6 +5,9 @@ use tokio_util::context::{HandleExt, TokioContext}; use std::future::Future; + +use tokio::runtime::Builder; + struct ThreadPool { inner: futures::executor::ThreadPool, rt: tokio::runtime::Runtime, @@ -20,11 +23,12 @@ impl ThreadPool { #[test] fn tokio_context_with_another_runtime() { + let (tx, rx) = oneshot::channel(); let custom_executor: ThreadPool = { // Spawn tokio runtime on a single background thread // enabling IO and timers. - let rt = tokio::runtime::Builder::new() + let rt = Builder::new() .basic_scheduler() .enable_all() .core_threads(1) @@ -44,3 +48,32 @@ fn tokio_context_with_another_runtime() { futures::executor::block_on(rx).unwrap(); } + +#[test] +#[should_panic] +fn tokio_context_with_another_runtime_no_timer_io() { + let (tx, rx) = oneshot::channel(); + let custom_executor: ThreadPool = { + // Spawn tokio runtime on a single background thread + // enabling IO and timers. + let rt = Builder::new() + .basic_scheduler() + .core_threads(1) + .build() + .unwrap(); + + let inner = futures::executor::ThreadPool::builder().create().unwrap(); + + ThreadPool { inner, rt } + }; + + custom_executor.spawn(async move { + let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); + println!("addr: {:?}", listener.local_addr()); + tx.send(()).unwrap(); + }); + + // panics: "there is no reactor running, must be called from the context + // of Tokio runtime" + futures::executor::block_on(rx).unwrap(); +} From a498afe7b078e82b76449e70cda3ab814d758b14 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Wed, 26 Aug 2020 16:59:14 +0200 Subject: [PATCH 08/18] rustfmt --- tokio-util/src/context.rs | 15 ++++++++------- tokio-util/tests/context.rs | 4 +--- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index 2f9c0a7523b..1e990db6055 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -32,7 +32,6 @@ impl Future for TokioContext { pub trait HandleExt: Into + Clone { /// Convenience method that takes a Future and returns a `TokioContext`. /// # Example - /// ```rust,no_run /// /// use std::futures::Future; /// use tokio-utils::context::{HandleExt}; @@ -45,14 +44,16 @@ pub trait HandleExt: Into + Clone { /// let h = handle.wrap(f); /// self.inner.spawn_ok(h); /// } - /// ``` /// - fn wrap(&self, fut: F) -> TokioContext - where - F: Future, + fn wrap(&self, fut: F) -> TokioContext + where + F: Future, { - TokioContext {inner: fut, handle: (*self).clone().into()} + TokioContext { + inner: fut, + handle: (*self).clone().into(), + } } } -impl HandleExt for Handle {} \ No newline at end of file +impl HandleExt for Handle {} diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs index c4fc51a07a9..95964049087 100644 --- a/tokio-util/tests/context.rs +++ b/tokio-util/tests/context.rs @@ -5,7 +5,6 @@ use tokio_util::context::{HandleExt, TokioContext}; use std::future::Future; - use tokio::runtime::Builder; struct ThreadPool { @@ -23,7 +22,6 @@ impl ThreadPool { #[test] fn tokio_context_with_another_runtime() { - let (tx, rx) = oneshot::channel(); let custom_executor: ThreadPool = { // Spawn tokio runtime on a single background thread @@ -73,7 +71,7 @@ fn tokio_context_with_another_runtime_no_timer_io() { tx.send(()).unwrap(); }); - // panics: "there is no reactor running, must be called from the context + // panics: "there is no reactor running, must be called from the context // of Tokio runtime" futures::executor::block_on(rx).unwrap(); } From e4a0085cbd730291acf8e3fc9218d9f36dce7d20 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Wed, 26 Aug 2020 19:41:34 +0200 Subject: [PATCH 09/18] two runtime tests (WIP) - failing --- tokio-util/tests/context.rs | 89 ++++++++++--------------------------- 1 file changed, 23 insertions(+), 66 deletions(-) diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs index 95964049087..9a15f536781 100644 --- a/tokio-util/tests/context.rs +++ b/tokio-util/tests/context.rs @@ -1,77 +1,34 @@ #![warn(rust_2018_idioms)] -use tokio::{net::TcpListener, sync::oneshot}; -use tokio_util::context::{HandleExt, TokioContext}; - -use std::future::Future; +use tokio_util::context::HandleExt; use tokio::runtime::Builder; -struct ThreadPool { - inner: futures::executor::ThreadPool, - rt: tokio::runtime::Runtime, -} - -impl ThreadPool { - fn spawn(&self, f: impl Future + Send + 'static) { - let handle = self.rt.handle().clone(); - let h: TokioContext<_> = handle.wrap(f); - self.inner.spawn_ok(h); - } -} - +use tokio::time::*; #[test] fn tokio_context_with_another_runtime() { - let (tx, rx) = oneshot::channel(); - let custom_executor: ThreadPool = { - // Spawn tokio runtime on a single background thread - // enabling IO and timers. - let rt = Builder::new() - .basic_scheduler() - .enable_all() - .core_threads(1) - .build() - .unwrap(); - - let inner = futures::executor::ThreadPool::builder().create().unwrap(); - - ThreadPool { inner, rt } - }; - - custom_executor.spawn(async move { - let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); - println!("addr: {:?}", listener.local_addr()); - tx.send(()).unwrap(); + let mut rt1 = Builder::new() + .basic_scheduler() + .core_threads(1) + .build() + .unwrap(); + let rt2 = Builder::new() + .basic_scheduler() + .core_threads(1) + .enable_all() + .build() + .unwrap(); + + #[allow(unused_attributes)] + #[should_panic] + let _ = rt1.block_on(async move { + delay_for(Duration::from_micros(10)).await; }); - futures::executor::block_on(rx).unwrap(); -} - -#[test] -#[should_panic] -fn tokio_context_with_another_runtime_no_timer_io() { - let (tx, rx) = oneshot::channel(); - let custom_executor: ThreadPool = { - // Spawn tokio runtime on a single background thread - // enabling IO and timers. - let rt = Builder::new() - .basic_scheduler() - .core_threads(1) - .build() - .unwrap(); - - let inner = futures::executor::ThreadPool::builder().create().unwrap(); - - ThreadPool { inner, rt } - }; - - custom_executor.spawn(async move { - let listener = TcpListener::bind("0.0.0.0:0").await.unwrap(); - println!("addr: {:?}", listener.local_addr()); - tx.send(()).unwrap(); + let _ = rt1.block_on(async move { + rt2.handle() + .clone() + .wrap(tokio::time::delay_for(Duration::from_micros(10))) + .await }); - - // panics: "there is no reactor running, must be called from the context - // of Tokio runtime" - futures::executor::block_on(rx).unwrap(); } From c50a48dfbd8586db8a973cdfce9a7a5f57e5c1f4 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Wed, 26 Aug 2020 20:38:34 +0200 Subject: [PATCH 10/18] enrich the docs --- tokio-util/src/context.rs | 42 +++++++++++++++++++++++++++++-------- tokio-util/tests/context.rs | 25 ++++++++-------------- 2 files changed, 42 insertions(+), 25 deletions(-) diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index 1e990db6055..81a70985d91 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -11,6 +11,9 @@ use tokio::runtime::Handle; pin_project! { /// TokioContext allows connecting a custom executor with the tokio runtime. + /// + /// It contains a `Handle` to the runtime. A handle to the runtime can be + /// obtain by calling the `Runtime::handle()` method. pub struct TokioContext { #[pin] inner: F, @@ -31,20 +34,41 @@ impl Future for TokioContext { /// Trait extension that simplifies bundling a `Handle` with a `Future`. pub trait HandleExt: Into + Clone { /// Convenience method that takes a Future and returns a `TokioContext`. - /// # Example /// - /// use std::futures::Future; - /// use tokio-utils::context::{HandleExt}; - /// use tokio::runtime::Handle; + /// # Example: calling Tokio Runtime from a custom ThreadPool + /// + /// use std::futures::{self, Future}; + /// use tokio-utils::context::HandleExt; + /// use tokio::runtime::{Builder, Handle, Runtime}; + /// use tokio::time::{delay_for, Duration}; + /// + /// struct ThreadPool { + /// pub inner: futures::executor::ThreadPool, + /// pub rt: Runtime, + /// } /// /// impl ThreadPool { - /// fn spawn(&self, f: impl Future + Send + 'static) { - /// let handle = self.rt.handle().clone(); - /// // create a TokioContext from the handle and future. - /// let h = handle.wrap(f); - /// self.inner.spawn_ok(h); + /// fn spawn(&self, f: impl Future + Send + 'static) { + /// let handle = self.rt.handle().clone(); + /// // create a TokioContext from the handle and future. + /// let h = handle.wrap(f); + /// self.inner.spawn_ok(h); + /// } /// } /// + /// let rt = tokio::runtime::Builder::new() + /// .threaded_scheduler() + /// .enable_all() + /// .build.unwrap(); + /// let inner = futures::executor::ThreadPool::builder().create().unwrap(); + /// let executor: ThreadPool { + /// inner, rt + /// } + /// + /// executor.spawn(async { + /// delay_for(Duration::from_millis(2)).await + /// }); + fn wrap(&self, fut: F) -> TokioContext where F: Future, diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs index 9a15f536781..93aaa70425c 100644 --- a/tokio-util/tests/context.rs +++ b/tokio-util/tests/context.rs @@ -1,34 +1,27 @@ #![warn(rust_2018_idioms)] -use tokio_util::context::HandleExt; - use tokio::runtime::Builder; - use tokio::time::*; +use tokio_util::context::HandleExt; #[test] fn tokio_context_with_another_runtime() { let mut rt1 = Builder::new() - .basic_scheduler() + .threaded_scheduler() .core_threads(1) + // no timer! .build() .unwrap(); let rt2 = Builder::new() - .basic_scheduler() + .threaded_scheduler() .core_threads(1) .enable_all() .build() .unwrap(); - #[allow(unused_attributes)] - #[should_panic] - let _ = rt1.block_on(async move { - delay_for(Duration::from_micros(10)).await; - }); - - let _ = rt1.block_on(async move { + // Without the `HandleExt.wrap()` there would be a panic because there is + // no timer running, since it would be referencing runtime r1. + let _ = rt1.block_on( rt2.handle() - .clone() - .wrap(tokio::time::delay_for(Duration::from_micros(10))) - .await - }); + .wrap(async move { delay_for(Duration::from_millis(2)).await }), + ); } From abea02463ff4707bd332fcbefd442859895e2bf2 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Wed, 26 Aug 2020 20:56:06 +0200 Subject: [PATCH 11/18] Revert "fix doc warnings" This reverts commit 21f2fa7dbc6272dd658cb7e5746cdff6560e0344. --- tokio-macros/src/lib.rs | 2 +- tokio/src/time/delay_queue.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index 320fdbe4cd8..2bb0c2100f9 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -6,7 +6,7 @@ rust_2018_idioms, unreachable_pub )] -#![deny(broken_intra_doc_links)] +#![deny(intra_doc_link_resolution_failure)] #![doc(test( no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) diff --git a/tokio/src/time/delay_queue.rs b/tokio/src/time/delay_queue.rs index e6bc6bc0df6..a947cc6fe4e 100644 --- a/tokio/src/time/delay_queue.rs +++ b/tokio/src/time/delay_queue.rs @@ -403,7 +403,7 @@ impl DelayQueue { /// # } /// ``` /// - /// [`poll`]: struct@DelayQueue::poll + /// [`poll`]: method@Self::poll /// [`remove`]: method@Self::remove /// [`reset`]: method@Self::reset /// [`Key`]: struct@Key @@ -582,7 +582,7 @@ impl DelayQueue { /// /// Note that this method has no effect on the allocated capacity. /// - /// [`poll`]: struct@DelayQueue::poll + /// [`poll`]: method@Self::poll /// /// # Examples /// From 889627be9c747ede215799f1b6bdf669a964b548 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Wed, 26 Aug 2020 21:15:22 +0200 Subject: [PATCH 12/18] remove thread-pool feature --- tokio-util/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index c9133fe0730..b47c9dfc21c 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -45,7 +45,7 @@ pin-project-lite = "0.1.4" tokio = { version = "0.3.0", path = "../tokio", features = ["full"] } tokio-test = { version = "0.3.0", path = "../tokio-test" } -futures = { version = "0.3.0", features = ["thread-pool"] } +futures = "0.3.0" futures-test = "0.3.5" [package.metadata.docs.rs] From 20e931f9041bc7c1d5ed190bcee027c98d13e432 Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Wed, 26 Aug 2020 22:04:41 +0200 Subject: [PATCH 13/18] adding some newlines --- tokio-util/src/context.rs | 2 ++ tokio-util/tests/context.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index 81a70985d91..f23762660de 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -23,6 +23,7 @@ pin_project! { impl Future for TokioContext { type Output = F::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); let handle = me.handle; @@ -31,6 +32,7 @@ impl Future for TokioContext { handle.enter(|| fut.poll(cx)) } } + /// Trait extension that simplifies bundling a `Handle` with a `Future`. pub trait HandleExt: Into + Clone { /// Convenience method that takes a Future and returns a `TokioContext`. diff --git a/tokio-util/tests/context.rs b/tokio-util/tests/context.rs index 93aaa70425c..49038ddbdb4 100644 --- a/tokio-util/tests/context.rs +++ b/tokio-util/tests/context.rs @@ -3,6 +3,7 @@ use tokio::runtime::Builder; use tokio::time::*; use tokio_util::context::HandleExt; + #[test] fn tokio_context_with_another_runtime() { let mut rt1 = Builder::new() From 3be110b9512a4fc131f9692328ce31373f3f167d Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Wed, 26 Aug 2020 22:05:44 +0200 Subject: [PATCH 14/18] rm newline --- tokio-util/src/context.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index f23762660de..6d3ad72675a 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -70,7 +70,6 @@ pub trait HandleExt: Into + Clone { /// executor.spawn(async { /// delay_for(Duration::from_millis(2)).await /// }); - fn wrap(&self, fut: F) -> TokioContext where F: Future, From c7c658aeda44e09062b155ff29760d4b2a7c99dc Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Wed, 26 Aug 2020 22:18:58 +0200 Subject: [PATCH 15/18] docs for HandleExt::wrap() --- tokio-util/src/context.rs | 43 ++++++++++++++++----------------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index 6d3ad72675a..b626f9ad039 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -39,37 +39,28 @@ pub trait HandleExt: Into + Clone { /// /// # Example: calling Tokio Runtime from a custom ThreadPool /// - /// use std::futures::{self, Future}; - /// use tokio-utils::context::HandleExt; - /// use tokio::runtime::{Builder, Handle, Runtime}; - /// use tokio::time::{delay_for, Duration}; - /// - /// struct ThreadPool { - /// pub inner: futures::executor::ThreadPool, - /// pub rt: Runtime, - /// } /// - /// impl ThreadPool { - /// fn spawn(&self, f: impl Future + Send + 'static) { - /// let handle = self.rt.handle().clone(); - /// // create a TokioContext from the handle and future. - /// let h = handle.wrap(f); - /// self.inner.spawn_ok(h); - /// } - /// } + /// ```no_run + /// use tokio_util::context::HandleExt; + /// use tokio::time::{delay_for, Duration}; /// - /// let rt = tokio::runtime::Builder::new() + /// let mut rt = tokio::runtime::Builder::new() /// .threaded_scheduler() /// .enable_all() - /// .build.unwrap(); - /// let inner = futures::executor::ThreadPool::builder().create().unwrap(); - /// let executor: ThreadPool { - /// inner, rt - /// } + /// .build().unwrap(); + /// + /// let rt2 = tokio::runtime::Builder::new() + /// .threaded_scheduler() + /// .build().unwrap(); + /// + /// let fut = delay_for(Duration::from_millis(2)); /// - /// executor.spawn(async { - /// delay_for(Duration::from_millis(2)).await - /// }); + /// rt.block_on( + /// rt2 + /// .handle() + /// .wrap(async { delay_for(Duration::from_millis(2)).await }), + /// ); + ///``` fn wrap(&self, fut: F) -> TokioContext where F: Future, From caa830064f37e04c5861ff4e43c80226eb75b81b Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Wed, 26 Aug 2020 22:31:10 +0200 Subject: [PATCH 16/18] remove unnecessary dereference --- tokio-util/src/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index b626f9ad039..9b878558e67 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -67,7 +67,7 @@ pub trait HandleExt: Into + Clone { { TokioContext { inner: fut, - handle: (*self).clone().into(), + handle: self.clone().into(), } } } From 647dafcd446a5f97ef24fba70bacda718ffe926f Mon Sep 17 00:00:00 2001 From: Blas Rodriguez Irizar Date: Thu, 27 Aug 2020 09:44:52 +0200 Subject: [PATCH 17/18] implement wrap directly for Handle --- tokio-util/src/context.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index 9b878558e67..4edc61c634a 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -34,7 +34,7 @@ impl Future for TokioContext { } /// Trait extension that simplifies bundling a `Handle` with a `Future`. -pub trait HandleExt: Into + Clone { +pub trait HandleExt { /// Convenience method that takes a Future and returns a `TokioContext`. /// /// # Example: calling Tokio Runtime from a custom ThreadPool @@ -61,15 +61,14 @@ pub trait HandleExt: Into + Clone { /// .wrap(async { delay_for(Duration::from_millis(2)).await }), /// ); ///``` - fn wrap(&self, fut: F) -> TokioContext - where - F: Future, - { + fn wrap(&self, fut: F) -> TokioContext; +} + +impl HandleExt for Handle { + fn wrap(&self, fut: F) -> TokioContext { TokioContext { inner: fut, - handle: self.clone().into(), + handle: self.clone(), } } } - -impl HandleExt for Handle {} From 7095c67d880614e09936aae58a4255ef5b944207 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Thu, 27 Aug 2020 10:22:29 -0400 Subject: [PATCH 18/18] Apply suggestions from code review --- tokio-util/src/context.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tokio-util/src/context.rs b/tokio-util/src/context.rs index 4edc61c634a..f6289093299 100644 --- a/tokio-util/src/context.rs +++ b/tokio-util/src/context.rs @@ -1,5 +1,10 @@ -//! This module introduces `TokioContext`, which enables a simple way of using -//! the tokio runtime with any other executor. +//! Tokio context aware futures utilities. +//! +//! This module includes utilities around integrating tokio with other runtimes +//! by allowing the context to be attached to futures. This allows spawning +//! futures on other executors while still using tokio to drive them. This +//! can be useful if you need to use a tokio based library in an executor/runtime +//! that does not provide a tokio context. use pin_project_lite::pin_project; use std::{ @@ -10,7 +15,7 @@ use std::{ use tokio::runtime::Handle; pin_project! { - /// TokioContext allows connecting a custom executor with the tokio runtime. + /// `TokioContext` allows connecting a custom executor with the tokio runtime. /// /// It contains a `Handle` to the runtime. A handle to the runtime can be /// obtain by calling the `Runtime::handle()` method. @@ -39,7 +44,6 @@ pub trait HandleExt { /// /// # Example: calling Tokio Runtime from a custom ThreadPool /// - /// /// ```no_run /// use tokio_util::context::HandleExt; /// use tokio::time::{delay_for, Duration};