Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LocalSet example #3438

Merged
merged 2 commits into from Jan 21, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
109 changes: 105 additions & 4 deletions tokio/src/task/local.rs
Expand Up @@ -43,10 +43,13 @@ cfg_rt! {
/// }).await.unwrap();
/// }
/// ```
/// In order to spawn `!Send` futures, we can use a local task set to
/// schedule them on the thread calling [`Runtime::block_on`]. When running
/// inside of the local task set, we can use [`task::spawn_local`], which can
/// spawn `!Send` futures. For example:
///
/// # Use with `run_until`
///
/// To spawn `!Send` futures, we can use a local task set to schedule them
/// on the thread calling [`Runtime::block_on`]. When running inside of the
/// local task set, we can use [`task::spawn_local`], which can spawn
/// `!Send` futures. For example:
///
/// ```rust
/// use std::rc::Rc;
Expand All @@ -71,6 +74,9 @@ cfg_rt! {
/// }).await;
/// }
/// ```
/// **Note:** The `run_until` method can only be used in `#[tokio::main]`,
/// `#[tokio::test]` or directly inside a call to [`Runtime::block_on`]. It
/// cannot be used inside a task spawned with `tokio::spawn`.
///
/// ## Awaiting a `LocalSet`
///
Expand Down Expand Up @@ -104,11 +110,106 @@ cfg_rt! {
/// local.await;
/// }
/// ```
/// **Note:** Awaiting a `LocalSet` can only be done inside
/// `#[tokio::main]`, `#[tokio::test]` or directly inside a call to
/// [`Runtime::block_on`]. It cannot be used inside a task spawned with
/// `tokio::spawn`.
///
/// ## Use inside `tokio::spawn`
///
/// The two methods mentioned above cannot be used inside `tokio::spawn`, so
/// to spawn `!Send` futures from inside `tokio::spawn`, we need to do
/// something else. The solution is to create the `LocalSet` somewhere else,
/// and communicate with it using an [`mpsc`] channel.
///
/// The following example puts the `LocalSet` inside a new thread.
/// ```
/// use tokio::runtime::Builder;
/// use tokio::sync::{mpsc, oneshot};
/// use tokio::task::LocalSet;
///
/// // This struct describes the task you want to spawn. Here we include
/// // some simple examples. The oneshot channel allows sending a response
/// // to the spawner.
/// #[derive(Debug)]
/// enum Task {
/// PrintNumber(u32),
/// AddOne(u32, oneshot::Sender<u32>),
/// }
///
/// #[derive(Clone)]
/// struct LocalSpawner {
/// send: mpsc::UnboundedSender<Task>,
/// }
///
/// impl LocalSpawner {
/// pub fn new() -> Self {
/// let (send, mut recv) = mpsc::unbounded_channel();
///
/// let rt = Builder::new_current_thread()
/// .enable_all()
/// .build()
/// .unwrap();
///
/// std::thread::spawn(move || {
/// let local = LocalSet::new();
///
/// local.spawn_local(async move {
/// while let Some(new_task) = recv.recv().await {
/// tokio::task::spawn_local(run_task(new_task));
/// }
/// // If the while loop returns, then all the LocalSpawner
/// // objects have have been dropped.
/// });
///
/// // This will return once all senders are dropped and all
/// // spawned tasks have returned.
/// rt.block_on(local);
/// });
///
/// Self {
/// send,
/// }
/// }
///
/// pub fn spawn(&self, task: Task) {
/// self.send.send(task).expect("Thread with LocalSet has shut down.");
/// }
/// }
///
/// // This task may do !Send stuff. We use printing a number as an example,
/// // but it could be anything.
/// //
/// // The Task struct is an enum to support spawning many different kinds
/// // of operations.
/// async fn run_task(task: Task) {
/// match task {
/// Task::PrintNumber(n) => {
/// println!("{}", n);
/// },
/// Task::AddOne(n, response) => {
/// // We ignore failures to send the response.
/// let _ = response.send(n + 1);
/// },
/// }
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let spawner = LocalSpawner::new();
///
/// let (send, response) = oneshot::channel();
/// spawner.spawn(Task::AddOne(10, send));
/// let eleven = response.await.unwrap();
/// assert_eq!(eleven, 11);
/// }
/// ```
///
/// [`Send`]: trait@std::marker::Send
/// [local task set]: struct@LocalSet
/// [`Runtime::block_on`]: method@crate::runtime::Runtime::block_on
/// [`task::spawn_local`]: fn@spawn_local
/// [`mpsc`]: mod@crate::sync::mpsc
pub struct LocalSet {
/// Current scheduler tick
tick: Cell<u8>,
Expand Down