From 540e26c4b87cb5469b30c5e20fca237377309a76 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 1 Apr 2021 18:32:44 -0500 Subject: [PATCH 1/3] Store pending oneshot for subscriber future --- src/subscriber.rs | 47 ++++++++++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/src/subscriber.rs b/src/subscriber.rs index 3721899b2..de1e7409f 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -113,6 +113,7 @@ type Senders = Map, SyncSender>>)>; pub struct Subscriber { id: usize, rx: Receiver>>, + existing: Option>>, home: Arc>, } @@ -153,29 +154,41 @@ impl Subscriber { }; } } + + fn poll_oneshot( + self: &mut Pin<&mut Self>, + mut oneshot: OneShot>, + cx: &mut Context<'_>, + ) -> Option>> { + match Future::poll(Pin::new(&mut oneshot), cx) { + Poll::Ready(Some(event)) => Some(Poll::Ready(event)), + Poll::Ready(None) => None, + Poll::Pending => { + self.existing = Some(oneshot); + Some(Poll::Pending) + } + } + } } impl Future for Subscriber { type Output = Option; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { loop { + if let Some(future_rx) = self.existing.take() { + if let Some(poll) = self.poll_oneshot(future_rx, cx) { + return poll; + } + } + match self.rx.try_recv() { - Ok(mut future_rx) => { - #[allow(unsafe_code)] - let future_rx = - unsafe { std::pin::Pin::new_unchecked(&mut future_rx) }; - - match Future::poll(future_rx, cx) { - Poll::Ready(Some(event)) => { - return Poll::Ready(event); - } - Poll::Ready(None) => { - continue; - } - Poll::Pending => { - return Poll::Pending; - } + Ok(future_rx) => { + if let Some(poll) = self.poll_oneshot(future_rx, cx) { + return poll; } } Err(TryRecvError::Empty) => break, @@ -257,7 +270,7 @@ impl Subscribers { w_senders.insert(id, (None, tx)); - Subscriber { id, rx, home: arc_senders.clone() } + Subscriber { id, rx, existing: None, home: arc_senders.clone() } } pub(crate) fn reserve_batch( From a7aa2a9d9e413e01a5cc35dcc951529942d9513c Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 1 Apr 2021 18:46:25 -0500 Subject: [PATCH 2/3] Follow @rrichardson's lead and only poll oneshots in one place --- src/subscriber.rs | 44 +++++++++++++++++--------------------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/src/subscriber.rs b/src/subscriber.rs index de1e7409f..ef3a1ac2f 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -154,21 +154,6 @@ impl Subscriber { }; } } - - fn poll_oneshot( - self: &mut Pin<&mut Self>, - mut oneshot: OneShot>, - cx: &mut Context<'_>, - ) -> Option>> { - match Future::poll(Pin::new(&mut oneshot), cx) { - Poll::Ready(Some(event)) => Some(Poll::Ready(event)), - Poll::Ready(None) => None, - Poll::Pending => { - self.existing = Some(oneshot); - Some(Poll::Pending) - } - } - } } impl Future for Subscriber { @@ -179,20 +164,25 @@ impl Future for Subscriber { cx: &mut Context<'_>, ) -> Poll { loop { - if let Some(future_rx) = self.existing.take() { - if let Some(poll) = self.poll_oneshot(future_rx, cx) { - return poll; - } - } - - match self.rx.try_recv() { - Ok(future_rx) => { - if let Some(poll) = self.poll_oneshot(future_rx, cx) { - return poll; + let mut future_rx = if let Some(future_rx) = self.existing.take() { + future_rx + } else { + match self.rx.try_recv() { + Ok(future_rx) => future_rx, + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + return Poll::Ready(None) } } - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => return Poll::Ready(None), + }; + + match Future::poll(Pin::new(&mut future_rx), cx) { + Poll::Ready(Some(event)) => return Poll::Ready(event), + Poll::Ready(None) => continue, + Poll::Pending => { + self.existing = Some(future_rx); + return Poll::Pending; + } } } let mut home = self.home.write(); From ea34f1b80eb86bf9de2eb95bc8c5c0186c25d1df Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 1 Apr 2021 19:30:12 -0500 Subject: [PATCH 3/3] Expand the store-pending pattern to Subscriber::next_timeout --- src/oneshot.rs | 6 +++++- src/subscriber.rs | 17 +++++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/oneshot.rs b/src/oneshot.rs index a73fea7f5..5b809e184 100644 --- a/src/oneshot.rs +++ b/src/oneshot.rs @@ -63,8 +63,12 @@ impl OneShot { /// returning an error if not filled /// before a given timeout or if the /// system shuts down before then. + /// + /// Upon a successful receive, the + /// oneshot should be dropped, as it + /// will never yield that value again. pub fn wait_timeout( - self, + &mut self, mut timeout: Duration, ) -> Result { let mut inner = self.mu.lock(); diff --git a/src/subscriber.rs b/src/subscriber.rs index ef3a1ac2f..8613fa8f2 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -129,12 +129,16 @@ impl Subscriber { /// an error if no event arrives within the provided `Duration` /// or if the backing `Db` shuts down. pub fn next_timeout( - &self, + &mut self, mut timeout: Duration, ) -> std::result::Result { loop { let start = Instant::now(); - let future_rx = self.rx.recv_timeout(timeout)?; + let mut future_rx = if let Some(future_rx) = self.existing.take() { + future_rx + } else { + self.rx.recv_timeout(timeout)? + }; timeout = if let Some(timeout) = timeout.checked_sub(start.elapsed()) { timeout @@ -143,8 +147,13 @@ impl Subscriber { }; let start = Instant::now(); - if let Some(event) = future_rx.wait_timeout(timeout)? { - return Ok(event); + match future_rx.wait_timeout(timeout) { + Ok(Some(event)) => return Ok(event), + Ok(None) => (), + Err(timeout_error) => { + self.existing = Some(future_rx); + return Err(timeout_error); + } } timeout = if let Some(timeout) = timeout.checked_sub(start.elapsed()) {