Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SplitSink performance problem #1969

Merged
merged 3 commits into from Nov 13, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 20 additions & 9 deletions futures-util/src/stream/stream/split.rs
Expand Up @@ -61,6 +61,23 @@ impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
}
}

impl<S: Sink<Item>, Item> SplitSink<S, Item> {
fn poll_flush_slot(mut inner: Pin<&mut S>, slot: &mut Option<Item>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
if slot.is_some() {
ready!(inner.as_mut().poll_ready(cx))?;
Poll::Ready(inner.start_send(slot.take().unwrap()))
} else {
Poll::Ready(Ok(()))
}
}

fn poll_lock_and_flush_slot(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
let this = &mut *self;
let mut inner = ready!(this.lock.poll_lock(cx));
Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx)
}
}

impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
type Error = S::Error;

Expand All @@ -69,7 +86,7 @@ impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
if self.slot.is_none() {
return Poll::Ready(Ok(()));
}
ready!(self.as_mut().poll_flush(cx))?;
ready!(self.as_mut().poll_lock_and_flush_slot(cx))?;
}
}

Expand All @@ -81,20 +98,14 @@ impl<S: Sink<Item>, Item> Sink<Item> for SplitSink<S, Item> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
let this = &mut *self;
let mut inner = ready!(this.lock.poll_lock(cx));
if this.slot.is_some() {
ready!(inner.as_pin_mut().poll_ready(cx))?;
inner.as_pin_mut().start_send(this.slot.take().unwrap())?;
}
ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the this.lock.poll_lock(cx) out of poll_flush_slot to avoid the need to poll_lock again to get the inner.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeah, I missed that it needs the lock across both operations.

inner.as_pin_mut().poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
let this = &mut *self;
let mut inner = ready!(this.lock.poll_lock(cx));
if this.slot.is_some() {
ready!(inner.as_pin_mut().poll_ready(cx))?;
inner.as_pin_mut().start_send(this.slot.take().unwrap())?
}
ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?;
inner.as_pin_mut().poll_close(cx)
}
}
Expand Down