diff --git a/src/oneshot.rs b/src/oneshot.rs index a73fea7f5..5b809e184 100644 --- a/src/oneshot.rs +++ b/src/oneshot.rs @@ -63,8 +63,12 @@ impl OneShot { /// returning an error if not filled /// before a given timeout or if the /// system shuts down before then. + /// + /// Upon a successful receive, the + /// oneshot should be dropped, as it + /// will never yield that value again. pub fn wait_timeout( - self, + &mut self, mut timeout: Duration, ) -> Result { let mut inner = self.mu.lock(); diff --git a/src/subscriber.rs b/src/subscriber.rs index ef3a1ac2f..8613fa8f2 100644 --- a/src/subscriber.rs +++ b/src/subscriber.rs @@ -129,12 +129,16 @@ impl Subscriber { /// an error if no event arrives within the provided `Duration` /// or if the backing `Db` shuts down. pub fn next_timeout( - &self, + &mut self, mut timeout: Duration, ) -> std::result::Result { loop { let start = Instant::now(); - let future_rx = self.rx.recv_timeout(timeout)?; + let mut future_rx = if let Some(future_rx) = self.existing.take() { + future_rx + } else { + self.rx.recv_timeout(timeout)? + }; timeout = if let Some(timeout) = timeout.checked_sub(start.elapsed()) { timeout @@ -143,8 +147,13 @@ impl Subscriber { }; let start = Instant::now(); - if let Some(event) = future_rx.wait_timeout(timeout)? { - return Ok(event); + match future_rx.wait_timeout(timeout) { + Ok(Some(event)) => return Ok(event), + Ok(None) => (), + Err(timeout_error) => { + self.existing = Some(future_rx); + return Err(timeout_error); + } } timeout = if let Some(timeout) = timeout.checked_sub(start.elapsed()) {