From 26cc0e6b214a31e2f01716c4388ff2562af2a09c Mon Sep 17 00:00:00 2001 From: doyoubi Date: Mon, 4 Nov 2019 21:28:42 +0800 Subject: [PATCH 1/2] Fix SplitSink performance problem caused by flushing inner sink every time --- futures-util/src/stream/stream/split.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/futures-util/src/stream/stream/split.rs b/futures-util/src/stream/stream/split.rs index 28d7aa71a0..43853edb5f 100644 --- a/futures-util/src/stream/stream/split.rs +++ b/futures-util/src/stream/stream/split.rs @@ -61,6 +61,19 @@ impl + Unpin, Item> SplitSink { } } +impl, Item> SplitSink { + fn poll_flush_slot(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = &mut *self; + let mut inner = ready!(this.lock.poll_lock(cx)); + if this.slot.is_some() { + ready!(inner.as_pin_mut().poll_ready(cx))?; + Poll::Ready(inner.as_pin_mut().start_send(this.slot.take().unwrap())) + } else { + Poll::Ready(Ok(())) + } + } +} + impl, Item> Sink for SplitSink { type Error = S::Error; @@ -69,7 +82,7 @@ impl, Item> Sink for SplitSink { if self.slot.is_none() { return Poll::Ready(Ok(())); } - ready!(self.as_mut().poll_flush(cx))?; + ready!(self.as_mut().poll_flush_slot(cx))?; } } From a667b8b0c9a87ce921e176ee03dd3946db30be33 Mon Sep 17 00:00:00 2001 From: doyoubi Date: Wed, 13 Nov 2019 09:48:14 +0800 Subject: [PATCH 2/2] Remove duplicated codes --- futures-util/src/stream/stream/split.rs | 28 ++++++++++++------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/futures-util/src/stream/stream/split.rs b/futures-util/src/stream/stream/split.rs index 43853edb5f..4118b33462 100644 --- a/futures-util/src/stream/stream/split.rs +++ b/futures-util/src/stream/stream/split.rs @@ -62,16 +62,20 @@ impl + Unpin, Item> SplitSink { } impl, Item> SplitSink { - fn poll_flush_slot(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = &mut *self; - let mut inner = ready!(this.lock.poll_lock(cx)); - if this.slot.is_some() { - ready!(inner.as_pin_mut().poll_ready(cx))?; - Poll::Ready(inner.as_pin_mut().start_send(this.slot.take().unwrap())) + fn poll_flush_slot(mut inner: Pin<&mut S>, slot: &mut Option, cx: &mut Context<'_>) -> Poll> { + if slot.is_some() { + ready!(inner.as_mut().poll_ready(cx))?; + Poll::Ready(inner.start_send(slot.take().unwrap())) } else { Poll::Ready(Ok(())) } } + + fn poll_lock_and_flush_slot(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = &mut *self; + let mut inner = ready!(this.lock.poll_lock(cx)); + Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx) + } } impl, Item> Sink for SplitSink { @@ -82,7 +86,7 @@ impl, Item> Sink for SplitSink { if self.slot.is_none() { return Poll::Ready(Ok(())); } - ready!(self.as_mut().poll_flush_slot(cx))?; + ready!(self.as_mut().poll_lock_and_flush_slot(cx))?; } } @@ -94,20 +98,14 @@ impl, Item> Sink for SplitSink { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); - if this.slot.is_some() { - ready!(inner.as_pin_mut().poll_ready(cx))?; - inner.as_pin_mut().start_send(this.slot.take().unwrap())?; - } + ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?; inner.as_pin_mut().poll_flush(cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); - if this.slot.is_some() { - ready!(inner.as_pin_mut().poll_ready(cx))?; - inner.as_pin_mut().start_send(this.slot.take().unwrap())? - } + ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?; inner.as_pin_mut().poll_close(cx) } }