New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Documenting cancel safety of SinkExt::send()
#2754
Comments
Going to be using So there's an apparent contradiction here with @Darksonn's comment at https://users.rust-lang.org/t/is-tokio-codec-framed-cancel-safe/86408/5. But I think there might be slightly different notions of cancel safety being talked about here. In this case what we're concerned about is:
Since This means that the most idiomatic way to use I think one solution is to define new I'm going to try and implement these methods in a separate crate to give this a shot. (This is akin to how tokio's |
Another potential new API, which might be a bit nicer to use than the In this API, the user awaits a For example, using a feed- // The sink.
let mut sink = /* ... */;
// The item to send.
let item = /* ... */;
let mut item = Some(item);
futures::select! {
// where `try_feed` is the new API
_ = sink.try_feed(&mut item) => {
// item sent, do other stuff...
}
_ = some_other_future() => {
// do other stuff...
}
}
if let Some(item) = item {
// item is still there, `try_feed` was cancelled!
// do something else...
} On the other hand, with a // The sink.
let mut sink = /* ... */;
// The item to send.
let item = /* ... */;
futures::select! {
Ok(permit) = sink.reserve() => {
// we got a permit, send the item!
permit.feed(item);
// the item is sending, do other stuff...
// (we can guarantee that the item has been fed to the sink,
// since `Permit::feed` is synchronous and cannot be cancelled).
}
_ = some_other_future() => {
// do other stuff...
}
} This also has an interesting side benefit over the Of course, this API does not handle the case where the A |
@taiki-e I have an implementation working: pub trait SinkExt<Item>: Sink<Item> {
fn reserve(&mut self) -> Reserve<'_, Self, Item>
where
Self: Unpin,
{ /* ... */ }
fn flush_reserve(&mut self) -> FlushReserve<'_, Self, Item>
where
Self: Unpin,
{ /* ... */ }
}
#[derive(Debug)]
#[must_use]
pub struct Permit<'a, Si: ?Sized, Item> {
sink: &'a mut Si,
_phantom: PhantomData<fn(Item)>,
}
impl<'a, Item, Si: Sink<Item> + Unpin + ?Sized> Permit<'a, Si, Item> {
pub fn feed(self, item: Item) -> Result<(), Si::Error> {
Pin::new(self.sink).start_send(item)
}
pub fn send(mut self, item: Item) -> Result<Flush<'a, Si, Item>, Si::Error> {
Pin::new(&mut self.sink).start_send(item)?;
Ok(self.sink.flush())
}
} Would you be willing to accept this in the futures crate? The alternative is that we maintain our own crate that implements this. |
I've put up https://github.com/oxidecomputer/cancel-safe-futures for this. |
Just wanted to follow up and say that the reserve pattern, as implemented in |
I think
SinkExt::send()
is not cancel safe:send()
is called, theitem
is moved into aSend
(and from there into its innerFeed
)Send
is polled, it waits until itsfeed
is ready, which in turn waits until the underlyingsink
is ready.Send
is dropped before it becomes ready, theitem
will be dropped with theSend
and never sent to the underlying sink.I tried to confirm this with a custom
Sink
that is not ready for some period of time and aselect!
that tries tosend()
items to it with a timeout: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=2608a62c8695fbcfbb22d1856bfe930a. The first two items (which time out before the sink is ready) are never delivered.A couple of questions:
StreamExt::send()
's docs to note that it is not cancel safe?The text was updated successfully, but these errors were encountered: