Skip to content

Commit

Permalink
sync: elaborate on cross-runtime message passing (#4240)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Nov 17, 2021
1 parent 095b5dc commit b1afd95
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 2 deletions.
5 changes: 5 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Expand Up @@ -563,6 +563,11 @@ impl<T> Sender<T> {
/// [`close`]: Receiver::close
/// [`Receiver`]: Receiver
///
/// # Panics
///
/// This function panics if it is called outside the context of a Tokio
/// runtime [with time enabled](crate::runtime::Builder::enable_time).
///
/// # Examples
///
/// In the following example, each call to `send_timeout` will block until the
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/sync/mpsc/error.rs
Expand Up @@ -19,7 +19,7 @@ impl<T: fmt::Debug> std::error::Error for SendError<T> {}

/// This enumeration is the list of the possible error outcomes for the
/// [try_send](super::Sender::try_send) method.
#[derive(Debug)]
#[derive(Debug, Eq, PartialEq)]
pub enum TrySendError<T> {
/// The data could not be sent on the channel because the channel is
/// currently full and sending would require blocking.
Expand Down Expand Up @@ -96,7 +96,7 @@ impl Error for RecvError {}
cfg_time! {
// ===== SendTimeoutError =====

#[derive(Debug)]
#[derive(Debug, Eq, PartialEq)]
/// Error returned by [`Sender::send_timeout`](super::Sender::send_timeout)].
pub enum SendTimeoutError<T> {
/// The data could not be sent on the channel because the channel is
Expand Down
17 changes: 17 additions & 0 deletions tokio/src/sync/mpsc/mod.rs
Expand Up @@ -58,6 +58,22 @@
//! [crossbeam][crossbeam-unbounded]. Similarly, for sending a message _from sync
//! to async_, you should use an unbounded Tokio `mpsc` channel.
//!
//! Please be aware that the above remarks were written with the `mpsc` channel
//! in mind, but they can also be generalized to other kinds of channels. In
//! general, any channel method that isn't marked async can be called anywhere,
//! including outside of the runtime. For example, sending a message on a
//! oneshot channel from outside the runtime is perfectly fine.
//!
//! # Multiple runtimes
//!
//! The mpsc channel does not care about which runtime you use it in, and can be
//! used to send messages from one runtime to another. It can also be used in
//! non-Tokio runtimes.
//!
//! There is one exception to the above: the [`send_timeout`] must be used from
//! within a Tokio runtime, however it is still not tied to one specific Tokio
//! runtime, and the sender may be moved from one Tokio runtime to another.
//!
//! [`Sender`]: crate::sync::mpsc::Sender
//! [`Receiver`]: crate::sync::mpsc::Receiver
//! [bounded-send]: crate::sync::mpsc::Sender::send()
Expand All @@ -69,6 +85,7 @@
//! [`Handle::block_on`]: crate::runtime::Handle::block_on()
//! [std-unbounded]: std::sync::mpsc::channel
//! [crossbeam-unbounded]: https://docs.rs/crossbeam/*/crossbeam/channel/fn.unbounded.html
//! [`send_timeout`]: crate::sync::mpsc::Sender::send_timeout

pub(super) mod block;

Expand Down
3 changes: 3 additions & 0 deletions tokio/src/sync/oneshot.rs
Expand Up @@ -9,6 +9,9 @@
//!
//! Each handle can be used on separate tasks.
//!
//! Since the `send` method is not async, it can be used anywhere. This includes
//! sending between two runtimes, and using it from non-async code.
//!
//! # Examples
//!
//! ```
Expand Down
34 changes: 34 additions & 0 deletions tokio/tests/sync_mpsc.rs
Expand Up @@ -597,3 +597,37 @@ fn try_recv_close_while_empty_unbounded() {
drop(tx);
assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
}

#[tokio::test(start_paused = true)]
async fn recv_timeout() {
use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout};
use tokio::time::Duration;

let (tx, rx) = mpsc::channel(5);

assert_eq!(tx.send_timeout(10, Duration::from_secs(1)).await, Ok(()));
assert_eq!(tx.send_timeout(20, Duration::from_secs(1)).await, Ok(()));
assert_eq!(tx.send_timeout(30, Duration::from_secs(1)).await, Ok(()));
assert_eq!(tx.send_timeout(40, Duration::from_secs(1)).await, Ok(()));
assert_eq!(tx.send_timeout(50, Duration::from_secs(1)).await, Ok(()));
assert_eq!(
tx.send_timeout(60, Duration::from_secs(1)).await,
Err(Timeout(60))
);

drop(rx);
assert_eq!(
tx.send_timeout(70, Duration::from_secs(1)).await,
Err(Closed(70))
);
}

#[test]
#[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"]
fn recv_timeout_panic() {
use futures::future::FutureExt;
use tokio::time::Duration;

let (tx, _rx) = mpsc::channel(5);
tx.send_timeout(10, Duration::from_secs(1)).now_or_never();
}

0 comments on commit b1afd95

Please sign in to comment.