sync: Add is_closed method to mpsc senders #2726

Merged
merged 15 commits into from Sep 28, 2020

Conversation

MikailBag
Contributor

Motivation

Closes #2469

Solution

I should note that this is my first time looking at the mpsc internals, so I may be wrong or may have chosen the wrong implementation path.
I discovered that both Sender kinds (bounded and unbounded) are implemented on top of tokio::sync::mpsc::Chan, which in turn uses the Semaphore trait. I added a new method, is_closed, to this trait. I was able to implement it properly for the unbounded implementation.
However, I didn't find a clean way to check whether semaphore_ll is closed, so I implemented a hack (we try to acquire a new permit and immediately release it). While it is inefficient and probably ugly, from the user's side it should work OK (for example, it does not disturb the Sender's internal state and it does not take a mutable reference to the sender).

@Darksonn Darksonn added A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-sync Module: tokio/sync labels Jul 30, 2020
@zeroed
Contributor

zeroed commented Jul 31, 2020

Thanks for exploring this issue and giving it a try; I'm still very interested in seeing an approach to this feature.
I did my own exploration when it was marked as E-easy (^_^), but unfortunately I lost traction on it.

@LucioFranco
Member

@zeroed This was likely not a E-easy issue...sorry 😄

@LucioFranco
Member

@MikailBag It looks like the loom CI test is failing; could you fix that up?

Member

@LucioFranco LucioFranco left a comment

Looks like a good and simple start, I want to make sure loom is happy though. Also would be good to add more tests.

tokio/src/sync/mpsc/bounded.rs (resolved)
@@ -432,6 +438,27 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) {
        self.0.add_permits(1)
    }

    fn is_closed(&self) -> bool {
        // TODO find more efficient way
Member

Would be good if we can open an issue for this TODO.

@MikailBag
Contributor Author

IIUC these are compile errors, because in loom mode Arc != std::sync::Arc.

I'll be able to fix these problems (and doc-comment) around 17th of August.
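
For readers unfamiliar with this kind of failure, here is a minimal sketch of why hard-coding std::sync::Arc can break a loom build. It is not tokio's actual code: the module name `sync_shim`, the `Wake` trait shown here, and `wake_noop` are hypothetical stand-ins, and the only assumption taken from the comment above is that `Arc` resolves to a different type in loom mode.

```rust
// Hypothetical shim, modelled on the common pattern of re-exporting
// loom's types when the crate is built with `--cfg loom`.
mod sync_shim {
    #[cfg(loom)]
    pub use loom::sync::Arc; // only compiled in loom builds
    #[cfg(not(loom))]
    pub use std::sync::Arc;
}

use sync_shim::Arc;

// Hypothetical trait standing in for a crate-internal wake interface.
trait Wake {
    fn wake(arc_self: Arc<Self>);
}

struct NoopWaker;

impl Wake for NoopWaker {
    // `Arc` is named through the shim, so this signature matches the
    // trait's whichever `Arc` the build selects. Writing
    // `std::sync::Arc<Self>` here would only match in non-loom builds.
    fn wake(_arc_self: Arc<Self>) {}
}

// Builds a no-op waker, wakes one clone of it, and reports how many
// strong references remain afterwards.
fn wake_noop() -> usize {
    let waker = Arc::new(NoopWaker);
    NoopWaker::wake(Arc::clone(&waker));
    Arc::strong_count(&waker)
}
```

The point of the sketch is only that every mention of `Arc` goes through one alias, so trait and impl cannot disagree about which type is meant.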

@LucioFranco
Member

@MikailBag 👍 no rush, feel free to ping me here or discord when you get a new revision up.

@zeroed
Contributor

zeroed commented Aug 12, 2020

@LucioFranco:

This was likely not a E-easy issue...sorry 😄

Please, don't be: it has been an interesting learning experience. Mikail beat me to the "permits" approach, and that's more than fine. Thanks!

@@ -488,6 +515,10 @@ impl Semaphore for AtomicUsize {
        }
    }

    fn is_closed(&self) -> bool {
Member

Do we need this implementation?

Contributor Author

Do you mean unbounded channels should not have fn is_closed?
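
To make the question concrete, here is a rough sketch of how an is_closed check could look when the "semaphore" is just an AtomicUsize, as in the unbounded case. The layout is an assumption made for illustration (lowest bit as a closed flag, the remaining bits counting messages); the names `UnboundedState` and `Closed` are hypothetical and the real tokio layout may differ.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Error returned when the channel side has been closed (hypothetical name).
#[derive(Debug, PartialEq)]
struct Closed;

/// Assumed layout: bit 0 is the closed flag, the remaining bits count
/// outstanding messages (so each message adds 2).
struct UnboundedState(AtomicUsize);

impl UnboundedState {
    fn new() -> Self {
        UnboundedState(AtomicUsize::new(0))
    }

    /// Records one more message unless the closed bit is set.
    fn try_acquire(&self) -> Result<(), Closed> {
        let mut curr = self.0.load(Ordering::Acquire);
        loop {
            if curr & 1 == 1 {
                return Err(Closed);
            }
            match self.0.compare_exchange(
                curr,
                curr + 2,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Ok(()),
                Err(actual) => curr = actual, // lost a race, retry
            }
        }
    }

    /// Sets the closed bit; the message count is left untouched.
    fn close(&self) {
        self.0.fetch_or(1, Ordering::Release);
    }

    /// Reads the closed bit without modifying any state.
    fn is_closed(&self) -> bool {
        self.0.load(Ordering::Acquire) & 1 == 1
    }
}
```

Under that assumed layout, is_closed is a single load through a shared reference, which is why it is cheap to offer on the unbounded sender as well.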

@LucioFranco LucioFranco requested a review from hawkw August 20, 2020 19:28
Comment on lines 442 to 459
        // TODO find more efficient way
        struct NoopWaker;
        impl crate::util::Wake for NoopWaker {
            fn wake(self: std::sync::Arc<Self>) {}
            fn wake_by_ref(_arc_self: &std::sync::Arc<Self>) {}
        }
        let waker = std::sync::Arc::new(NoopWaker);
        let waker = crate::util::waker_ref(&waker);
        let mut noop_cx = std::task::Context::from_waker(&*waker);
        let mut permit = Permit::new();
        match permit.poll_acquire(&mut noop_cx, 1, &self.0) {
            Poll::Ready(Err(_)) => true,
            Poll::Ready(Ok(())) => {
                permit.release(1, &self.0);
                false
            }
            Poll::Pending => false,
        }
Member

I'm pretty sure that the semaphore's state field has a bit that we can read to check if it's closed:

/// Semaphore has been closed, no more permits will be issued.
const CLOSED: usize = 0b10;

For example, we use this here:

// Load the current state
let mut curr = SemState(self.state.load(Acquire));
// Saves a ref to the waiter node
let mut maybe_waiter: Option<NonNull<Waiter>> = None;
/// Used in branches where we attempt to push the waiter into the wait
/// queue but fail due to permits becoming available or the wait queue
/// transitioning to "closed". In this case, the waiter must be
/// transitioned back to the "idle" state.
macro_rules! revert_to_idle {
    () => {
        if let Some(waiter) = maybe_waiter {
            unsafe { waiter.as_ref() }.revert_to_idle();
        }
    };
}
loop {
    let mut next = curr;
    if curr.is_closed() {
        revert_to_idle!();
        return Ready(Err(AcquireError::closed()));
    }

I think it should be possible to have an is_closed method on semaphore_ll that looks something like this:

impl Semaphore {
    // ...

    pub(crate) fn is_closed(&self) -> bool {
        SemState(self.state.load(Acquire)).is_closed()
    }
    
    // ...
}

There's additional logic that occurs when closing a semaphore, to assign permits to any pending waiters prior to marking the semaphore as closed. However, if this process is currently taking place, I think it's correct to not return true from is_closed yet. So AFAICT, this implementation ought to be valid.
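
As a self-contained illustration of that suggestion, here is a toy model using only the standard library. Only the `CLOSED` bit value and the shape of `is_closed` come from the snippets quoted above; the simplified `close`, the `new` constructor, and the `observe_close` helper are assumptions made so the sketch runs on its own, and the waiter handling described above is deliberately left out.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Closed bit, as quoted in the review comment.
const CLOSED: usize = 0b10;

/// Snapshot of the packed state word.
#[derive(Clone, Copy)]
struct SemState(usize);

impl SemState {
    fn is_closed(self) -> bool {
        self.0 & CLOSED == CLOSED
    }
}

/// Toy semaphore holding only the packed state word.
struct Semaphore {
    state: AtomicUsize,
}

impl Semaphore {
    fn new() -> Self {
        Semaphore { state: AtomicUsize::new(0) }
    }

    /// Simplified close: sets the bit and nothing else. The real close
    /// path also deals with pending waiters, which this sketch omits.
    fn close(&self) {
        self.state.fetch_or(CLOSED, Ordering::Release);
    }

    /// A single atomic load: no permit is acquired or released, and no
    /// waker or task context is needed.
    fn is_closed(&self) -> bool {
        SemState(self.state.load(Ordering::Acquire)).is_closed()
    }
}

/// Returns what `is_closed` reports before and after another thread
/// closes the semaphore.
fn observe_close() -> (bool, bool) {
    let sem = Arc::new(Semaphore::new());
    let before = sem.is_closed();
    let closer = Arc::clone(&sem);
    thread::spawn(move || closer.close()).join().unwrap();
    (before, sem.is_closed())
}
```

Compared with the acquire-and-release approach, this never touches the permit count, so a concurrent sender cannot observe a temporarily missing permit.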

Contributor Author

I added an is_closed method to semaphore_ll, and it is now used instead of the no-op waker hack.

@LucioFranco
Member

@hawkw can you take another look if you get the chance?

@MikailBag
Contributor Author

I've rebased onto master.
The diff is now pretty minimal.

Contributor

@Darksonn Darksonn left a comment

I have some documentation suggestions.

tokio/src/sync/mpsc/bounded.rs (outdated, resolved)
tokio/src/sync/mpsc/bounded.rs (outdated, resolved)
Co-authored-by: Alice Ryhl <alice@ryhl.io>
Contributor

@Darksonn Darksonn left a comment

This is my last comment.

tokio/src/sync/mpsc/bounded.rs (outdated, resolved)
tokio/src/sync/mpsc/unbounded.rs (outdated, resolved)
Co-authored-by: Alice Ryhl <alice@ryhl.io>
Member

@LucioFranco LucioFranco left a comment

Thanks!

@LucioFranco LucioFranco changed the title Add is_closed method to mpsc senders sync: Add is_closed method to mpsc senders Sep 28, 2020
@LucioFranco LucioFranco merged commit 078d0a2 into tokio-rs:master Sep 28, 2020
Development

Successfully merging this pull request may close these issues.

mpsc channel should have an is_closed function
5 participants