Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store pending oneshot for subscriber future #1314

Merged
merged 3 commits into from Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/oneshot.rs
Expand Up @@ -63,8 +63,12 @@ impl<T> OneShot<T> {
/// returning an error if not filled
/// before a given timeout or if the
/// system shuts down before then.
///
/// Upon a successful receive, the
/// oneshot should be dropped, as it
/// will never yield that value again.
pub fn wait_timeout(
self,
&mut self,
mut timeout: Duration,
) -> Result<T, std::sync::mpsc::RecvTimeoutError> {
let mut inner = self.mu.lock();
Expand Down
60 changes: 36 additions & 24 deletions src/subscriber.rs
Expand Up @@ -113,6 +113,7 @@ type Senders = Map<usize, (Option<Waker>, SyncSender<OneShot<Option<Event>>>)>;
pub struct Subscriber {
id: usize,
rx: Receiver<OneShot<Option<Event>>>,
existing: Option<OneShot<Option<Event>>>,
home: Arc<RwLock<Senders>>,
}

Expand All @@ -128,12 +129,16 @@ impl Subscriber {
/// an error if no event arrives within the provided `Duration`
/// or if the backing `Db` shuts down.
pub fn next_timeout(
&self,
&mut self,
mut timeout: Duration,
) -> std::result::Result<Event, std::sync::mpsc::RecvTimeoutError> {
loop {
let start = Instant::now();
let future_rx = self.rx.recv_timeout(timeout)?;
let mut future_rx = if let Some(future_rx) = self.existing.take() {
future_rx
} else {
self.rx.recv_timeout(timeout)?
};
timeout =
if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
timeout
Expand All @@ -142,8 +147,13 @@ impl Subscriber {
};

let start = Instant::now();
if let Some(event) = future_rx.wait_timeout(timeout)? {
return Ok(event);
match future_rx.wait_timeout(timeout) {
Ok(Some(event)) => return Ok(event),
Ok(None) => (),
Err(timeout_error) => {
self.existing = Some(future_rx);
return Err(timeout_error);
}
}
timeout =
if let Some(timeout) = timeout.checked_sub(start.elapsed()) {
Expand All @@ -158,28 +168,30 @@ impl Subscriber {
impl Future for Subscriber {
type Output = Option<Event>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
loop {
match self.rx.try_recv() {
Ok(mut future_rx) => {
#[allow(unsafe_code)]
let future_rx =
unsafe { std::pin::Pin::new_unchecked(&mut future_rx) };

match Future::poll(future_rx, cx) {
Poll::Ready(Some(event)) => {
return Poll::Ready(event);
}
Poll::Ready(None) => {
continue;
}
Poll::Pending => {
return Poll::Pending;
}
let mut future_rx = if let Some(future_rx) = self.existing.take() {
future_rx
} else {
match self.rx.try_recv() {
Ok(future_rx) => future_rx,
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => {
return Poll::Ready(None)
}
}
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return Poll::Ready(None),
};

match Future::poll(Pin::new(&mut future_rx), cx) {
Poll::Ready(Some(event)) => return Poll::Ready(event),
Poll::Ready(None) => continue,
Poll::Pending => {
self.existing = Some(future_rx);
return Poll::Pending;
}
}
}
let mut home = self.home.write();
Expand Down Expand Up @@ -257,7 +269,7 @@ impl Subscribers {

w_senders.insert(id, (None, tx));

Subscriber { id, rx, home: arc_senders.clone() }
Subscriber { id, rx, existing: None, home: arc_senders.clone() }
}

pub(crate) fn reserve_batch(
Expand Down