Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve bounded mpsc documentation #3458

Merged
merged 1 commit into from
Jan 22, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 48 additions & 6 deletions tokio/src/sync/mpsc/bounded.rs
Expand Up @@ -108,8 +108,21 @@ impl<T> Receiver<T> {

/// Receives the next value for this receiver.
///
/// `None` is returned when all `Sender` halves have dropped, indicating
/// that no further values can be sent on the channel.
/// This method returns `None` if the channel has been closed and there are
/// no remaining messages in the channel's buffer. This indicates that no
/// further values can ever be received from this `Receiver`. The channel is
/// closed when all senders have been dropped, or when [`close`] is called.
///
/// If there are no messages in the channel's buffer, but the channel has
/// not yet been closed, this method will sleep until a message is sent or
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sleep feels weird here but I can't think of anything better.

/// the channel is closed.
///
/// Note that if [`close`] is called, but there are still outstanding
/// [`Permits`] from before it was closed, the channel is not considered
/// closed by `recv` until the permits are released.
Comment on lines +120 to +122
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is tested by

#[tokio::test]
async fn recv_close_gets_none_reserved() {
let (tx1, mut rx) = mpsc::channel::<i32>(1);
let tx2 = tx1.clone();
let permit1 = assert_ok!(tx1.reserve().await);
let mut permit2 = task::spawn(tx2.reserve());
assert_pending!(permit2.poll());
rx.close();
assert!(permit2.is_woken());
assert_ready_err!(permit2.poll());
{
let mut recv = task::spawn(rx.recv());
assert_pending!(recv.poll());
permit1.send(123);
assert!(recv.is_woken());
let v = assert_ready!(recv.poll());
assert_eq!(v, Some(123));
}
assert!(rx.recv().await.is_none());
}

///
/// [`close`]: Self::close
/// [`Permits`]: struct@crate::sync::mpsc::Permit
///
/// # Examples
///
Expand Down Expand Up @@ -152,6 +165,27 @@ impl<T> Receiver<T> {

/// Blocking receive to call outside of asynchronous contexts.
///
/// This method returns `None` if the channel has been closed and there are
/// no remaining messages in the channel's buffer. This indicates that no
/// further values can ever be received from this `Receiver`. The channel is
/// closed when all senders have been dropped, or when [`close`] is called.
///
/// If there are no messages in the channel's buffer, but the channel has
/// not yet been closed, this method will block until a message is sent or
/// the channel is closed.
///
/// This method is intended for use cases where you are sending from
/// asynchronous code to synchronous code, and will work even if the sender
/// is not using [`blocking_send`] to send the message.
Comment on lines +177 to +179
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This particular note is what inspired this PR, since whether the channels works when one end is sync and the other is async is a surprisingly common question.

///
/// Note that if [`close`] is called, but there are still outstanding
/// [`Permits`] from before it was closed, the channel is not considered
/// closed by `blocking_recv` until the permits are released.
///
/// [`close`]: Self::close
/// [`Permits`]: struct@crate::sync::mpsc::Permit
/// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
///
/// # Panics
///
/// This function panics if called within an asynchronous execution
Expand Down Expand Up @@ -201,14 +235,16 @@ impl<T> Receiver<T> {
self.chan.try_recv()
}

/// Closes the receiving half of a channel, without dropping it.
/// Closes the receiving half of a channel without dropping it.
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered. Any
/// outstanding [`Permit`] values will still be able to send messages.
///
/// In order to guarantee no messages are dropped, after calling `close()`,
/// `recv()` must be called until `None` is returned.
/// To guarantee that no messages are dropped, after calling `close()`,
/// `recv()` must be called until `None` is returned. If there are
/// outstanding [`Permit`] values, the `recv` method will not return `None`
/// until those are released.
///
/// [`Permit`]: Permit
///
Expand Down Expand Up @@ -360,7 +396,7 @@ impl<T> Sender<T> {
/// tx4.closed(),
/// tx5.closed()
/// );
//// println!("Receiver dropped");
/// println!("Receiver dropped");
/// }
/// ```
pub async fn closed(&self) {
Expand Down Expand Up @@ -505,6 +541,12 @@ impl<T> Sender<T> {

/// Blocking send to call outside of asynchronous contexts.
///
/// This method is intended for use cases where you are sending from
/// synchronous code to asynchronous code, and will work even if the
/// receiver is not using [`blocking_recv`] to receive the message.
///
/// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
///
/// # Panics
///
/// This function panics if called within an asynchronous execution
Expand Down