Skip to content

Commit

Permalink
Expand the store-pending pattern to Subscriber::next_timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
asonix committed Apr 2, 2021
1 parent a7aa2a9 commit ea34f1b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
6 changes: 5 additions & 1 deletion src/oneshot.rs
Expand Up @@ -63,8 +63,12 @@ impl<T> OneShot<T> {
/// returning an error if not filled
/// before a given timeout or if the
/// system shuts down before then.
///
/// Upon a successful receive, the
/// oneshot should be dropped, as it
/// will never yield that value again.
pub fn wait_timeout(
self,
&mut self,
mut timeout: Duration,
) -> Result<T, std::sync::mpsc::RecvTimeoutError> {
let mut inner = self.mu.lock();
Expand Down
17 changes: 13 additions & 4 deletions src/subscriber.rs
Expand Up @@ -129,12 +129,16 @@ impl Subscriber {
/// an error if no event arrives within the provided `Duration`
/// or if the backing `Db` shuts down.
pub fn next_timeout(
&self,
&mut self,
mut timeout: Duration,
) -> std::result::Result<Event, std::sync::mpsc::RecvTimeoutError> {
loop {
let start = Instant::now();
let future_rx = self.rx.recv_timeout(timeout)?;
let mut future_rx = if let Some(future_rx) = self.existing.take() {
future_rx
} else {
self.rx.recv_timeout(timeout)?
};
timeout =
if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
timeout
Expand All @@ -143,8 +147,13 @@ impl Subscriber {
};

let start = Instant::now();
if let Some(event) = future_rx.wait_timeout(timeout)? {
return Ok(event);
match future_rx.wait_timeout(timeout) {
Ok(Some(event)) => return Ok(event),
Ok(None) => (),
Err(timeout_error) => {
self.existing = Some(future_rx);
return Err(timeout_error);
}
}
timeout =
if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
Expand Down

0 comments on commit ea34f1b

Please sign in to comment.