From e16444f2f1307eda722cab426c88f1ae06358273 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sat, 16 Jan 2021 17:05:06 +0100 Subject: [PATCH 1/2] task: add LocalSet example --- tokio/src/task/local.rs | 108 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 104 insertions(+), 4 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index a2acf577b7b..4464a8d6eb0 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -43,10 +43,13 @@ cfg_rt! { /// }).await.unwrap(); /// } /// ``` - /// In order to spawn `!Send` futures, we can use a local task set to - /// schedule them on the thread calling [`Runtime::block_on`]. When running - /// inside of the local task set, we can use [`task::spawn_local`], which can - /// spawn `!Send` futures. For example: + /// + /// # Use with `run_until` + /// + /// To spawn `!Send` futures, we can use a local task set to schedule them + /// on the thread calling [`Runtime::block_on`]. When running inside of the + /// local task set, we can use [`task::spawn_local`], which can spawn + /// `!Send` futures. For example: /// /// ```rust /// use std::rc::Rc; @@ -71,6 +74,9 @@ cfg_rt! { /// }).await; /// } /// ``` + /// **Note:** The `run_until` method can only be used in `#[tokio::main]`, + /// or directly inside a call to [`Runtime::block_on`]. It cannot be used + /// inside a task spawned with `tokio::spawn`. /// /// ## Awaiting a `LocalSet` /// @@ -104,11 +110,105 @@ cfg_rt! { /// local.await; /// } /// ``` + /// **Note:** Awaiting a `LocalSet` can only be done inside + /// `#[tokio::main]`, or directly inside a call to [`Runtime::block_on`]. It + /// cannot be used inside a task spawned with `tokio::spawn`. + /// + /// ## Use inside `tokio::spawn` + /// + /// The two methods mentioned above cannot be used inside `tokio::spawn`, so + /// to spawn `!Send` futures from inside `tokio::spawn`, we need to do + /// something else. The solution is to create the `LocalSet` somewhere else, + /// and communicate with it using an [`mpsc`] channel. + /// + /// The following example puts the `LocalSet` inside a new thread. + /// ``` + /// use tokio::runtime::Builder; + /// use tokio::sync::{mpsc, oneshot}; + /// use tokio::task::LocalSet; + /// + /// // This struct describes the task you want to spawn. Here we include + /// // some simple examples. The oneshot channel allows sending a response + /// // to the spawner. + /// #[derive(Debug)] + /// enum Task { + /// PrintNumber(u32), + /// AddOne(u32, oneshot::Sender), + /// } + /// + /// #[derive(Clone)] + /// struct LocalSpawner { + /// send: mpsc::UnboundedSender, + /// } + /// + /// impl LocalSpawner { + /// pub fn new() -> Self { + /// let (send, mut recv) = mpsc::unbounded_channel(); + /// + /// let rt = Builder::new_current_thread() + /// .enable_all() + /// .build() + /// .unwrap(); + /// + /// std::thread::spawn(move || { + /// let local = LocalSet::new(); + /// + /// local.spawn_local(async move { + /// while let Some(new_task) = recv.recv().await { + /// tokio::task::spawn_local(run_task(new_task)); + /// } + /// // If the while loop returns, then all the LocalSpawner + /// // objects have have been dropped. + /// }); + /// + /// // This will return once all senders are dropped and all + /// // spawned tasks have returned. + /// rt.block_on(local); + /// }); + /// + /// Self { + /// send, + /// } + /// } + /// + /// pub fn spawn(&self, task: Task) { + /// self.send.send(task).expect("Thread with LocalSet has shut down."); + /// } + /// } + /// + /// // This task may do !Send stuff. We use printing a number as an example, + /// // but it could be anything. + /// // + /// // The Task struct is an enum to support spawning many different kinds + /// // of operations. + /// async fn run_task(task: Task) { + /// match task { + /// Task::PrintNumber(n) => { + /// println!("{}", n); + /// }, + /// Task::AddOne(n, response) => { + /// // We ignore failures to send the response. + /// let _ = response.send(n + 1); + /// }, + /// } + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let spawner = LocalSpawner::new(); + /// + /// let (send, response) = oneshot::channel(); + /// spawner.spawn(Task::AddOne(10, send)); + /// let eleven = response.await.unwrap(); + /// assert_eq!(eleven, 11); + /// } + /// ``` /// /// [`Send`]: trait@std::marker::Send /// [local task set]: struct@LocalSet /// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on /// [`task::spawn_local`]: fn@spawn_local + /// [`mpsc`]: mod@crate::sync::mpsc pub struct LocalSet { /// Current scheduler tick tick: Cell, From 9797497b3ac8b23d7d6824b61346a7cc8798df45 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Thu, 21 Jan 2021 15:58:14 +0100 Subject: [PATCH 2/2] Add note on #[tokio::test] --- tokio/src/task/local.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 4464a8d6eb0..ee1151135fd 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -75,8 +75,8 @@ cfg_rt! { /// } /// ``` /// **Note:** The `run_until` method can only be used in `#[tokio::main]`, - /// or directly inside a call to [`Runtime::block_on`]. It cannot be used - /// inside a task spawned with `tokio::spawn`. + /// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It + /// cannot be used inside a task spawned with `tokio::spawn`. /// /// ## Awaiting a `LocalSet` /// @@ -111,8 +111,9 @@ cfg_rt! { /// } /// ``` /// **Note:** Awaiting a `LocalSet` can only be done inside - /// `#[tokio::main]`, or directly inside a call to [`Runtime::block_on`]. It - /// cannot be used inside a task spawned with `tokio::spawn`. + /// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to + /// [`Runtime::block_on`]. It cannot be used inside a task spawned with + /// `tokio::spawn`. /// /// ## Use inside `tokio::spawn` ///