Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make oneshot::Sender::poll_closed public again #3032

Merged
merged 2 commits into from Oct 26, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
136 changes: 88 additions & 48 deletions tokio/src/sync/oneshot.rs
Expand Up @@ -196,54 +196,6 @@ impl<T> Sender<T> {
Ok(())
}

fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));

let inner = self.inner.as_ref().unwrap();

let mut state = State::load(&inner.state, Acquire);

if state.is_closed() {
coop.made_progress();
return Poll::Ready(());
}

if state.is_tx_task_set() {
let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };

if !will_notify {
state = State::unset_tx_task(&inner.state);

if state.is_closed() {
// Set the flag again so that the waker is released in drop
State::set_tx_task(&inner.state);
coop.made_progress();
return Ready(());
} else {
unsafe { inner.drop_tx_task() };
}
}
}

if !state.is_tx_task_set() {
// Attempt to set the task
unsafe {
inner.set_tx_task(cx);
}

// Update the state
state = State::set_tx_task(&inner.state);

if state.is_closed() {
coop.made_progress();
return Ready(());
}
}

Pending
}

/// Waits for the associated [`Receiver`] handle to close.
///
/// A [`Receiver`] is closed by either calling [`close`] explicitly or the
Expand Down Expand Up @@ -350,6 +302,94 @@ impl<T> Sender<T> {
let state = State::load(&inner.state, Acquire);
state.is_closed()
}

/// Check whether the oneshot channel has been closed, and if not, schedules the
/// `Waker` in the provided `Context` to receive a notification when the channel is
/// closed.
///
/// A [`Receiver`] is closed by either calling [`close`] explicitly, or when the
/// [`Receiver`] value is dropped.
///
/// Note that on multiple calls to poll, only the `Waker` from the `Context` passed
/// to the most recent call will be scheduled to receive a wakeup.
///
/// [`Receiver`]: struct@crate::sync::oneshot::Receiver
/// [`close`]: fn@crate::sync::oneshot::Receiver::close
///
/// # Return value
///
/// This function returns:
///
/// * `Poll::Pending` if the channel is still open.
/// * `Poll::Ready(())` if the channel is closed.
///
/// # Examples
///
/// ```
/// use tokio::sync::oneshot;
///
/// use futures::future::poll_fn;
///
/// #[tokio::main]
/// async fn main() {
/// let (mut tx, mut rx) = oneshot::channel::<()>();
///
/// tokio::spawn(async move {
/// rx.close();
/// });
///
/// poll_fn(|cx| tx.poll_closed(cx)).await;
///
/// println!("the receiver dropped");
/// }
/// ```
pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));

let inner = self.inner.as_ref().unwrap();

let mut state = State::load(&inner.state, Acquire);

if state.is_closed() {
coop.made_progress();
return Poll::Ready(());
}

if state.is_tx_task_set() {
let will_notify = unsafe { inner.with_tx_task(|w| w.will_wake(cx.waker())) };

if !will_notify {
state = State::unset_tx_task(&inner.state);

if state.is_closed() {
// Set the flag again so that the waker is released in drop
State::set_tx_task(&inner.state);
coop.made_progress();
return Ready(());
} else {
unsafe { inner.drop_tx_task() };
}
}
}

if !state.is_tx_task_set() {
// Attempt to set the task
unsafe {
inner.set_tx_task(cx);
}

// Update the state
state = State::set_tx_task(&inner.state);

if state.is_closed() {
coop.made_progress();
return Ready(());
}
}

Pending
}
}

impl<T> Drop for Sender<T> {
Expand Down