From 5f3fdbbfc389a955ef6bd59477cb1a7e58055fb6 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Thu, 4 Mar 2021 21:49:14 +0100 Subject: [PATCH 1/2] util: fuse PollSemaphore --- tokio-util/src/sync/poll_semaphore.rs | 9 ++++--- tokio-util/tests/poll_semaphore.rs | 38 +++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 3 deletions(-) create mode 100644 tokio-util/tests/poll_semaphore.rs diff --git a/tokio-util/src/sync/poll_semaphore.rs b/tokio-util/src/sync/poll_semaphore.rs index d4594d03e86..2e2e8576518 100644 --- a/tokio-util/src/sync/poll_semaphore.rs +++ b/tokio-util/src/sync/poll_semaphore.rs @@ -55,10 +55,13 @@ impl PollSemaphore { /// the `Waker` from the `Context` passed to the most recent call is /// scheduled to receive a wakeup. pub fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll> { - match ready!(self.permit_fut.poll(cx)) { + let result = ready!(self.permit_fut.poll(cx)); + + let next_fut = Arc::clone(&self.semaphore).acquire_owned(); + self.permit_fut.set(next_fut); + + match result { Ok(permit) => { - let next_fut = Arc::clone(&self.semaphore).acquire_owned(); - self.permit_fut.set(next_fut); Poll::Ready(Some(permit)) } Err(_closed) => Poll::Ready(None), diff --git a/tokio-util/tests/poll_semaphore.rs b/tokio-util/tests/poll_semaphore.rs new file mode 100644 index 00000000000..08682fcb826 --- /dev/null +++ b/tokio-util/tests/poll_semaphore.rs @@ -0,0 +1,38 @@ +use tokio_util::sync::PollSemaphore; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use std::sync::Arc; +use std::future::Future; +use std::task::Poll; + +type SemRet = Option; + +fn semaphore_poll<'a>( + sem: &'a mut PollSemaphore +) -> tokio_test::task::Spawn + 'a> +{ + let fut = futures::future::poll_fn(move |cx| sem.poll_acquire(cx)); + tokio_test::task::spawn(fut) +} + +#[tokio::test] +async fn it_works() { + let sem = Arc::new(Semaphore::new(1)); + let mut poll_sem = PollSemaphore::new(sem.clone()); + + let permit = sem.acquire().await.unwrap(); + let mut poll = semaphore_poll(&mut poll_sem); + assert!(poll.poll().is_pending()); + drop(permit); + + assert!(matches!(poll.poll(), Poll::Ready(Some(_)))); + drop(poll); + + sem.close(); + + assert!(semaphore_poll(&mut poll_sem).await.is_none()); + + // Check that it is fused. + assert!(semaphore_poll(&mut poll_sem).await.is_none()); + assert!(semaphore_poll(&mut poll_sem).await.is_none()); +} + From d589d6a65b98a5f2b57c489bf4258fc10d0f02b1 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 5 Mar 2021 21:34:30 +0100 Subject: [PATCH 2/2] fmt --- tokio-util/src/sync/poll_semaphore.rs | 4 +--- tokio-util/tests/poll_semaphore.rs | 12 +++++------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/tokio-util/src/sync/poll_semaphore.rs b/tokio-util/src/sync/poll_semaphore.rs index 2e2e8576518..6b22b0d633f 100644 --- a/tokio-util/src/sync/poll_semaphore.rs +++ b/tokio-util/src/sync/poll_semaphore.rs @@ -61,9 +61,7 @@ impl PollSemaphore { self.permit_fut.set(next_fut); match result { - Ok(permit) => { - Poll::Ready(Some(permit)) - } + Ok(permit) => Poll::Ready(Some(permit)), Err(_closed) => Poll::Ready(None), } } diff --git a/tokio-util/tests/poll_semaphore.rs b/tokio-util/tests/poll_semaphore.rs index 08682fcb826..0fdb3a446f7 100644 --- a/tokio-util/tests/poll_semaphore.rs +++ b/tokio-util/tests/poll_semaphore.rs @@ -1,15 +1,14 @@ -use tokio_util::sync::PollSemaphore; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; -use std::sync::Arc; use std::future::Future; +use std::sync::Arc; use std::task::Poll; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio_util::sync::PollSemaphore; type SemRet = Option; fn semaphore_poll<'a>( - sem: &'a mut PollSemaphore -) -> tokio_test::task::Spawn + 'a> -{ + sem: &'a mut PollSemaphore, +) -> tokio_test::task::Spawn + 'a> { let fut = futures::future::poll_fn(move |cx| sem.poll_acquire(cx)); tokio_test::task::spawn(fut) } @@ -35,4 +34,3 @@ async fn it_works() { assert!(semaphore_poll(&mut poll_sem).await.is_none()); assert!(semaphore_poll(&mut poll_sem).await.is_none()); } -