diff --git a/src/oneshot.rs b/src/oneshot.rs index a73fea7f5..5b809e184 100644 --- a/src/oneshot.rs +++ b/src/oneshot.rs @@ -63,8 +63,12 @@ impl OneShot { /// returning an error if not filled /// before a given timeout or if the /// system shuts down before then. + /// + /// Upon a successful receive, the + /// oneshot should be dropped, as it + /// will never yield that value again. pub fn wait_timeout( - self, + &mut self, mut timeout: Duration, ) -> Result { let mut inner = self.mu.lock(); diff --git a/src/subscriber.rs b/src/subscriber.rs index 3721899b2..8613fa8f2 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -113,6 +113,7 @@ type Senders = Map, SyncSender>>)>; pub struct Subscriber { id: usize, rx: Receiver>>, + existing: Option>>, home: Arc>, } @@ -128,12 +129,16 @@ impl Subscriber { /// an error if no event arrives within the provided `Duration` /// or if the backing `Db` shuts down. pub fn next_timeout( - &self, + &mut self, mut timeout: Duration, ) -> std::result::Result { loop { let start = Instant::now(); - let future_rx = self.rx.recv_timeout(timeout)?; + let mut future_rx = if let Some(future_rx) = self.existing.take() { + future_rx + } else { + self.rx.recv_timeout(timeout)? + }; timeout = if let Some(timeout) = timeout.checked_sub(start.elapsed()) { timeout @@ -142,8 +147,13 @@ impl Subscriber { }; let start = Instant::now(); - if let Some(event) = future_rx.wait_timeout(timeout)? { - return Ok(event); + match future_rx.wait_timeout(timeout) { + Ok(Some(event)) => return Ok(event), + Ok(None) => (), + Err(timeout_error) => { + self.existing = Some(future_rx); + return Err(timeout_error); + } } timeout = if let Some(timeout) = timeout.checked_sub(start.elapsed()) { @@ -158,28 +168,30 @@ impl Subscriber { impl Future for Subscriber { type Output = Option; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll { loop { - match self.rx.try_recv() { - Ok(mut future_rx) => { - #[allow(unsafe_code)] - let future_rx = - unsafe { std::pin::Pin::new_unchecked(&mut future_rx) }; - - match Future::poll(future_rx, cx) { - Poll::Ready(Some(event)) => { - return Poll::Ready(event); - } - Poll::Ready(None) => { - continue; - } - Poll::Pending => { - return Poll::Pending; - } + let mut future_rx = if let Some(future_rx) = self.existing.take() { + future_rx + } else { + match self.rx.try_recv() { + Ok(future_rx) => future_rx, + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + return Poll::Ready(None) } } - Err(TryRecvError::Empty) => break, - Err(TryRecvError::Disconnected) => return Poll::Ready(None), + }; + + match Future::poll(Pin::new(&mut future_rx), cx) { + Poll::Ready(Some(event)) => return Poll::Ready(event), + Poll::Ready(None) => continue, + Poll::Pending => { + self.existing = Some(future_rx); + return Poll::Pending; + } } } let mut home = self.home.write(); @@ -257,7 +269,7 @@ impl Subscribers { w_senders.insert(id, (None, tx)); - Subscriber { id, rx, home: arc_senders.clone() } + Subscriber { id, rx, existing: None, home: arc_senders.clone() } } pub(crate) fn reserve_batch(