From 0f7b2cfd0e9e4a7f95a5ba740deeb2661d4774e3 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Sun, 3 Oct 2021 18:09:27 -0400 Subject: [PATCH] task: Add more tips + links to `spawn_blocking` docs I'm working on some code which heavily uses `spawn_blocking` to run synchronous code, and it took me a while to find and understand the relevant APIs and patterns here. Let's link to the channel blocking APIs, and provide a small example. I plan to mention https://github.com/tokio-rs/tokio/pull/4146 here too once that merges. --- tokio/src/task/blocking.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tokio/src/task/blocking.rs b/tokio/src/task/blocking.rs index e4fe254a087..2b462cf94aa 100644 --- a/tokio/src/task/blocking.rs +++ b/tokio/src/task/blocking.rs @@ -111,9 +111,22 @@ cfg_rt! { /// still spawn additional threads for blocking operations. The basic /// scheduler's single thread is only used for asynchronous code. /// + /// # Related APIs and patterns for bridging asynchronous and blocking code + /// + /// In simple cases, it is sufficient to have the closure accept input + /// parameters at creation time and return a single value (or struct/tuple, etc.). + /// + /// For more complex situations in which it is desirable to stream data to or from + /// the synchronous context, the [`channel APIs`] have [`crate::sync::mpsc::Sender::blocking_send`] and + /// [`crate::sync::mpsc::Receiver::blocking_recv`] that support being called + /// from the spawned thread's blocking context. You may also find it useful + /// to wrap the channels in e.g. [`ReceiverStream`]. + /// /// [`Builder`]: struct@crate::runtime::Builder /// [blocking]: ../index.html#cpu-bound-tasks-and-blocking-code /// [rayon]: https://docs.rs/rayon + /// [`channel APIs`]: crate::sync::mpsc + /// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html /// [`thread::spawn`]: fn@std::thread::spawn /// [`shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout /// @@ -132,6 +145,29 @@ cfg_rt! { /// # Ok(()) /// # } /// ``` + /// + /// ``` + /// use tokio::task; + /// use tokio::sync::mpsc; + /// + /// # async fn docs() { + /// let (tx, mut rx) = mpsc::channel(2); + /// let start = 5; + /// let worker = task::spawn_blocking(move || { + /// for x in 0..10 { + /// // Stand in for complex computation + /// tx.blocking_send(start + x).unwrap(); + /// } + /// }); + /// + /// let mut acc = 0; + /// while let Some(v) = rx.recv().await { + /// acc += v; + /// } + /// assert_eq!(acc, 95); + /// worker.await.unwrap(); + /// # } + /// ``` #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_blocking(f: F) -> JoinHandle where