Skip to content

Commit

Permalink
sync: add blocking_recv and blocking_send in mpsc (#2684)
Browse files Browse the repository at this point in the history
Fixes: #2629
  • Loading branch information
xd009642 committed Aug 26, 2020
1 parent 2e7e42b commit 347e18b
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 5 deletions.
70 changes: 70 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,42 @@ impl<T> Receiver<T> {
self.chan.recv(cx)
}

/// Blocking receive to call outside of asynchronous contexts.
///
/// # Panics
///
/// This function panics if called within an asynchronous execution
/// context.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use tokio::runtime::Runtime;
/// use tokio::sync::mpsc;
///
/// fn main() {
/// let (mut tx, mut rx) = mpsc::channel::<u8>(10);
///
/// let sync_code = thread::spawn(move || {
/// assert_eq!(Some(10), rx.blocking_recv());
/// });
///
/// Runtime::new()
/// .unwrap()
/// .block_on(async move {
/// let _ = tx.send(10).await;
/// });
/// sync_code.join().unwrap()
/// }
/// ```
#[cfg(feature = "rt-core")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-core")))]
pub fn blocking_recv(&mut self) -> Option<T> {
let mut enter_handle = crate::runtime::enter::enter(false);
enter_handle.block_on(self.recv()).unwrap()
}

/// Attempts to return a pending value on this receiver without blocking.
///
/// This method will never block the caller in order to wait for data to
Expand Down Expand Up @@ -393,6 +429,40 @@ impl<T> Sender<T> {
}
}

/// Blocking send to call outside of asynchronous contexts.
///
/// # Panics
///
/// This function panics if called within an asynchronous execution
/// context.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use tokio::runtime::Runtime;
/// use tokio::sync::mpsc;
///
/// fn main() {
/// let (mut tx, mut rx) = mpsc::channel::<u8>(1);
///
/// let sync_code = thread::spawn(move || {
/// tx.blocking_send(10).unwrap();
/// });
///
/// Runtime::new().unwrap().block_on(async move {
/// assert_eq!(Some(10), rx.recv().await);
/// });
/// sync_code.join().unwrap()
/// }
/// ```
#[cfg(feature = "rt-core")]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-core")))]
pub fn blocking_send(&mut self, value: T) -> Result<(), SendError<T>> {
let mut enter_handle = crate::runtime::enter::enter(false);
enter_handle.block_on(self.send(value)).unwrap()
}

/// Returns `Poll::Ready(Ok(()))` when the channel is able to accept another item.
///
/// If the channel is full, then `Poll::Pending` is returned and the task is notified when a
Expand Down
11 changes: 6 additions & 5 deletions tokio/src/sync/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@
//! are two situations to consider:
//!
//! **Bounded channel**: If you need a bounded channel, you should use a bounded
//! Tokio `mpsc` channel for both directions of communication. To call the async
//! [`send`][bounded-send] or [`recv`][bounded-recv] methods in sync code, you
//! will need to use [`Handle::block_on`], which allow you to execute an async
//! method in synchronous code. This is necessary because a bounded channel may
//! need to wait for additional capacity to become available.
//! Tokio `mpsc` channel for both directions of communication. Instead of calling
//! the async [`send`][bounded-send] or [`recv`][bounded-recv] methods, in
//! synchronous code you will need to use the [`blocking_send`][blocking-send] or
//! [`blocking_recv`][blocking-recv] methods.
//!
//! **Unbounded channel**: You should use the kind of channel that matches where
//! the receiver is. So for sending a message _from async to sync_, you should
Expand All @@ -59,6 +58,8 @@
//! [`Receiver`]: crate::sync::mpsc::Receiver
//! [bounded-send]: crate::sync::mpsc::Sender::send()
//! [bounded-recv]: crate::sync::mpsc::Receiver::recv()
//! [blocking-send]: crate::sync::mpsc::Sender::blocking_send()
//! [blocking-recv]: crate::sync::mpsc::Receiver::blocking_recv()
//! [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender
//! [`Handle::block_on`]: crate::runtime::Handle::block_on()
//! [std-unbounded]: std::sync::mpsc::channel
Expand Down
44 changes: 44 additions & 0 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]

use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::{TryRecvError, TrySendError};
use tokio_test::task;
Expand Down Expand Up @@ -490,3 +492,45 @@ fn try_recv_unbounded() {
_ => panic!(),
}
}

#[test]
fn blocking_recv() {
let (mut tx, mut rx) = mpsc::channel::<u8>(1);

let sync_code = thread::spawn(move || {
assert_eq!(Some(10), rx.blocking_recv());
});

Runtime::new().unwrap().block_on(async move {
let _ = tx.send(10).await;
});
sync_code.join().unwrap()
}

#[tokio::test]
#[should_panic]
async fn blocking_recv_async() {
let (_tx, mut rx) = mpsc::channel::<()>(1);
let _ = rx.blocking_recv();
}

#[test]
fn blocking_send() {
let (mut tx, mut rx) = mpsc::channel::<u8>(1);

let sync_code = thread::spawn(move || {
tx.blocking_send(10).unwrap();
});

Runtime::new().unwrap().block_on(async move {
assert_eq!(Some(10), rx.recv().await);
});
sync_code.join().unwrap()
}

#[tokio::test]
#[should_panic]
async fn blocking_send_async() {
let (mut tx, _rx) = mpsc::channel::<()>(1);
let _ = tx.blocking_send(());
}

0 comments on commit 347e18b

Please sign in to comment.