Skip to content

Commit

Permalink
Merge pull request #1314 from asonix/main
Browse files Browse the repository at this point in the history
Store pending oneshot for subscriber future
  • Loading branch information
spacejam committed Apr 23, 2021
2 parents d2d15fa + ea34f1b commit 334e737
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 25 deletions.
6 changes: 5 additions & 1 deletion src/oneshot.rs
Expand Up @@ -63,8 +63,12 @@ impl<T> OneShot<T> {
/// returning an error if not filled
/// before a given timeout or if the
/// system shuts down before then.
///
/// Upon a successful receive, the
/// oneshot should be dropped, as it
/// will never yield that value again.
pub fn wait_timeout(
self,
&mut self,
mut timeout: Duration,
) -> Result<T, std::sync::mpsc::RecvTimeoutError> {
let mut inner = self.mu.lock();
Expand Down
60 changes: 36 additions & 24 deletions src/subscriber.rs
Expand Up @@ -113,6 +113,7 @@ type Senders = Map<usize, (Option<Waker>, SyncSender<OneShot<Option<Event>>>)>;
pub struct Subscriber {
id: usize,
rx: Receiver<OneShot<Option<Event>>>,
existing: Option<OneShot<Option<Event>>>,
home: Arc<RwLock<Senders>>,
}

Expand All @@ -128,12 +129,16 @@ impl Subscriber {
/// an error if no event arrives within the provided `Duration`
/// or if the backing `Db` shuts down.
pub fn next_timeout(
&self,
&mut self,
mut timeout: Duration,
) -> std::result::Result<Event, std::sync::mpsc::RecvTimeoutError> {
loop {
let start = Instant::now();
let future_rx = self.rx.recv_timeout(timeout)?;
let mut future_rx = if let Some(future_rx) = self.existing.take() {
future_rx
} else {
self.rx.recv_timeout(timeout)?
};
timeout =
if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
timeout
Expand All @@ -142,8 +147,13 @@ impl Subscriber {
};

let start = Instant::now();
if let Some(event) = future_rx.wait_timeout(timeout)? {
return Ok(event);
match future_rx.wait_timeout(timeout) {
Ok(Some(event)) => return Ok(event),
Ok(None) => (),
Err(timeout_error) => {
self.existing = Some(future_rx);
return Err(timeout_error);
}
}
timeout =
if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
Expand All @@ -158,28 +168,30 @@ impl Subscriber {
impl Future for Subscriber {
type Output = Option<Event>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
loop {
match self.rx.try_recv() {
Ok(mut future_rx) => {
#[allow(unsafe_code)]
let future_rx =
unsafe { std::pin::Pin::new_unchecked(&mut future_rx) };

match Future::poll(future_rx, cx) {
Poll::Ready(Some(event)) => {
return Poll::Ready(event);
}
Poll::Ready(None) => {
continue;
}
Poll::Pending => {
return Poll::Pending;
}
let mut future_rx = if let Some(future_rx) = self.existing.take() {
future_rx
} else {
match self.rx.try_recv() {
Ok(future_rx) => future_rx,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
return Poll::Ready(None)
}
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return Poll::Ready(None),
};

match Future::poll(Pin::new(&mut future_rx), cx) {
Poll::Ready(Some(event)) => return Poll::Ready(event),
Poll::Ready(None) => continue,
Poll::Pending => {
self.existing = Some(future_rx);
return Poll::Pending;
}
}
}
let mut home = self.home.write();
Expand Down Expand Up @@ -257,7 +269,7 @@ impl Subscribers {

w_senders.insert(id, (None, tx));

Subscriber { id, rx, home: arc_senders.clone() }
Subscriber { id, rx, existing: None, home: arc_senders.clone() }
}

pub(crate) fn reserve_batch(
Expand Down

0 comments on commit 334e737

Please sign in to comment.