diff --git a/futures-util/src/stream/stream/split.rs b/futures-util/src/stream/stream/split.rs index 28d7aa71a0..4118b33462 100644 --- a/futures-util/src/stream/stream/split.rs +++ b/futures-util/src/stream/stream/split.rs @@ -61,6 +61,23 @@ impl + Unpin, Item> SplitSink { } } +impl, Item> SplitSink { + fn poll_flush_slot(mut inner: Pin<&mut S>, slot: &mut Option, cx: &mut Context<'_>) -> Poll> { + if slot.is_some() { + ready!(inner.as_mut().poll_ready(cx))?; + Poll::Ready(inner.start_send(slot.take().unwrap())) + } else { + Poll::Ready(Ok(())) + } + } + + fn poll_lock_and_flush_slot(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = &mut *self; + let mut inner = ready!(this.lock.poll_lock(cx)); + Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx) + } +} + impl, Item> Sink for SplitSink { type Error = S::Error; @@ -69,7 +86,7 @@ impl, Item> Sink for SplitSink { if self.slot.is_none() { return Poll::Ready(Ok(())); } - ready!(self.as_mut().poll_flush(cx))?; + ready!(self.as_mut().poll_lock_and_flush_slot(cx))?; } } @@ -81,20 +98,14 @@ impl, Item> Sink for SplitSink { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); - if this.slot.is_some() { - ready!(inner.as_pin_mut().poll_ready(cx))?; - inner.as_pin_mut().start_send(this.slot.take().unwrap())?; - } + ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?; inner.as_pin_mut().poll_flush(cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; let mut inner = ready!(this.lock.poll_lock(cx)); - if this.slot.is_some() { - ready!(inner.as_pin_mut().poll_ready(cx))?; - inner.as_pin_mut().start_send(this.slot.take().unwrap())? - } + ready!(Self::poll_flush_slot(inner.as_pin_mut(), &mut this.slot, cx))?; inner.as_pin_mut().poll_close(cx) } }