New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sync: tweak watch
API
#2814
sync: tweak watch
API
#2814
Conversation
Decouples getting the latest `watch` value from receiving the change notification. The `Receiver` async method becomes `Receiver::changed()`. The latest value is obtained from `Receiver::borrow()`. The implementation is updated to use `Notify`. This requires adding `Notify::notify_waiters`. This method is generally useful but is kept private for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is really nice --- the new API is great and i'm pleased by how simple the implementation is now. i had a few questions and minor nits.
// not memory access. | ||
if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) { | ||
// This is the last `Receiver` handle, tasks waiting on `Sender::closed()` | ||
self.shared.notify_tx.notify_waiters(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
notify_waiters
can theoretically panic (if, e.g. the mutex is poisoned). should we change it so that it can't panic, since it's called in Drop
impls?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it can panic unless there is a bug internal to Watch
. IMO we only need to defend against runtime conditions and not bugs (at the module boundary).
} | ||
|
||
// There are waiters, the lock must be acquired to notify. | ||
let mut waiters = self.waiters.lock().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it looks like we call notify_waiters
in drop impls in a couple places. should this not panic if the lock is poisoned?
i think we still want to remove the waiters regardless of whether the lock is poisoned, so this should probably be
let mut waiters = self.waiters.lock().unwrap(); | |
let mut waiters = self.waiters.lock().unwrap_or_else(PoisonError::into_inner); |
since parking_lot
doesn't have poisoning, the current code is already equivalent to this for a lot of production users...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not really worried about lock poisoning here. In general, I rather pretend lock poisoning isn't a thing for internal usage. This is generally how I think about Mutex across the code base. I guess we should update our internal Mutex
shim to abort the process instead of panic then? That would be a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we should update our internal
Mutex
shim to abort the process instead of panic then? That would be a separate PR?
I think it's better to update the internal mutex shim to make locking infallible by ignoring poisoned mutex errors via into_inner
. This would result in essentially the same behavior as we get with parking_lot
regardless of whether or not the parking_lot
feature is enabled. Aborting the process any time a mutex is poisoned could potentially make a lot of errors, including in user code, much harder to debug.
Co-authored-by: Eliza Weisman <eliza@buoyant.io>
use loom::thread; | ||
|
||
#[test] | ||
fn smoke() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test catches the issues that were mentioned by @hawkw
|
||
// Polling the future once is guaranteed to return `Pending` as `watch` | ||
// only notifies using `notify_waiters`. | ||
crate::future::poll_fn(|cx| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This forces a waiter to be pushed by Notify
. I thought about moving it into a fn, but I opted to punt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, this seems fine for now!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think that if we are going to ignore mutex poisoning, we should be doing so by calling LockResult::into_inner
to get a MutexGuard
even if the lock is poisoned, rather than unwrap()
ing. But, we should probably make that change everywhere, so it doesn't have to be fixed in this PR.
Everything looks good to me! Thanks for the new test.
|
||
// Polling the future once is guaranteed to return `Pending` as `watch` | ||
// only notifies using `notify_waiters`. | ||
crate::future::poll_fn(|cx| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, this seems fine for now!
@hawkw I don't disagree w/ you w.r.t mutex poisoning. I'm just saying it should be done in one swoop across all of Tokio in a single PR :). I'm mostly just following the current style. Feel free to open a tracking issue for the mutex poisoning and assign it to 0.3. |
Yup, we're in alignment on this one --- as I said in my comment, we should probably make this change globally. :) |
Decouples getting the latest
watch
value from receiving the change notification. TheReceiver
async method becomesReceiver::changed()
. The latest value is obtained fromReceiver::borrow()
.The implementation is updated to use
Notify
. This requires addingNotify::notify_waiters
. This method is generally useful but is kept private for now.