From 19a6d14a20c191c9f4714a20da43fa1432ad33cf Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 7 Dec 2020 10:24:28 -0800 Subject: [PATCH] sync: fix mpsc bug related to closing When closing a channel, it is possible to get into an invalid state when outstanding permits release capacity back to the channel. --- tokio/src/sync/semaphore_ll.rs | 4 ++++ tokio/tests/sync_mpsc.rs | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/tokio/src/sync/semaphore_ll.rs b/tokio/src/sync/semaphore_ll.rs index f044095f8fc..1312f56ea3e 100644 --- a/tokio/src/sync/semaphore_ll.rs +++ b/tokio/src/sync/semaphore_ll.rs @@ -917,6 +917,10 @@ impl Waiter { let mut curr = WaiterState(self.state.load(Acquire)); loop { + if curr.is_closed() { + return 0; + } + if !curr.is_queued() { assert_eq!(0, curr.permits_to_acquire()); } diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index f4966c31377..5dd8b06b551 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -512,3 +512,27 @@ fn ready_close_cancel_bounded() { assert!(recv.is_woken()); } + +#[tokio::test] +async fn permit_available_not_acquired_close() { + use futures::future::poll_fn; + + let (mut tx1, mut rx) = mpsc::channel::<()>(1); + let mut tx2 = tx1.clone(); + + { + let mut ready = task::spawn(poll_fn(|cx| tx1.poll_ready(cx))); + assert_ready_ok!(ready.poll()); + } + + let mut ready = task::spawn(poll_fn(|cx| tx2.poll_ready(cx))); + assert_pending!(ready.poll()); + + rx.close(); + + drop(tx1); + assert!(ready.is_woken()); + + drop(tx2); + assert!(rx.recv().await.is_none()); +} \ No newline at end of file