Skip to content

Commit

Permalink
sync: fix missing notification during mpsc close (#2854)
Browse files Browse the repository at this point in the history
When the mpsc channel receiver closes the channel, receiving should
return `None` once all in-progress sends have completed. When a sender
reserves capacity, this prevents the receiver from fully shutting down.
Previously, when the sender, after reserving capacity, dropped without
sending a message, the receiver was not notified. This results in
blocking the shutdown process until all sender handles drop.

This patch adds a receiver notification when the channel is both closed
and all outstanding sends have completed.
  • Loading branch information
carllerche committed Sep 21, 2020
1 parent 2b96b17 commit c0c7124
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 6 deletions.
22 changes: 17 additions & 5 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ pub(crate) trait Semaphore {

/// The permit is dropped without a value being sent. In this case, the
/// permit must be returned to the semaphore.
fn drop_permit(&self, permit: &mut Self::Permit);
///
/// # Return
///
/// Returns true if the permit was acquired.
fn drop_permit(&self, permit: &mut Self::Permit) -> bool;

fn is_idle(&self) -> bool;

Expand Down Expand Up @@ -192,7 +196,7 @@ where

pub(crate) fn disarm(&mut self) {
// TODO: should this error if not acquired?
self.inner.semaphore.drop_permit(&mut self.permit)
self.inner.semaphore.drop_permit(&mut self.permit);
}

/// Send a message and notify the receiver.
Expand Down Expand Up @@ -234,7 +238,11 @@ where
S: Semaphore,
{
fn drop(&mut self) {
self.inner.semaphore.drop_permit(&mut self.permit);
let notify = self.inner.semaphore.drop_permit(&mut self.permit);

if notify && self.inner.semaphore.is_idle() {
self.inner.rx_waker.wake();
}

if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 {
return;
Expand Down Expand Up @@ -424,8 +432,10 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
Permit::new()
}

fn drop_permit(&self, permit: &mut Permit) {
fn drop_permit(&self, permit: &mut Permit) -> bool {
let ret = permit.is_acquired();
permit.release(1, &self.0);
ret
}

fn add_permit(&self) {
Expand Down Expand Up @@ -477,7 +487,9 @@ impl Semaphore for AtomicUsize {

fn new_permit() {}

fn drop_permit(&self, _permit: &mut ()) {}
fn drop_permit(&self, _permit: &mut ()) -> bool {
false
}

fn add_permit(&self) {
let prev = self.fetch_sub(2, Release);
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/semaphore_ll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ impl Waiter {
}

/// Try to decrement the number of permits to acquire. This returns the
/// actual number of permits that were decremented. The delta betweeen `n`
/// actual number of permits that were decremented. The delta between `n`
/// and the return has been assigned to the permit and the caller must
/// assign these back to the semaphore.
fn try_dec_permits_to_acquire(&self, n: usize) -> usize {
Expand Down
22 changes: 22 additions & 0 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,3 +490,25 @@ fn try_recv_unbounded() {
_ => panic!(),
}
}

#[test]
fn ready_close_cancel_bounded() {
use futures::future::poll_fn;

let (mut tx, mut rx) = mpsc::channel::<()>(100);
let _tx2 = tx.clone();

{
let mut ready = task::spawn(async { poll_fn(|cx| tx.poll_ready(cx)).await });
assert_ready_ok!(ready.poll());
}

rx.close();

let mut recv = task::spawn(async { rx.recv().await });
assert_pending!(recv.poll());

drop(tx);

assert!(recv.is_woken());
}

0 comments on commit c0c7124

Please sign in to comment.