From f61d93d1d3ea218a1656e8d541ce88b4f50b52a9 Mon Sep 17 00:00:00 2001 From: Alex Butler Date: Sat, 17 Jun 2023 10:10:27 +0100 Subject: [PATCH 1/3] poll_ready pending after WouldBlock errors --- src/lib.rs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 70e2b84..65eafac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -193,6 +193,11 @@ pub struct WebSocketStream { inner: WebSocket>, closing: bool, ended: bool, + /// Tungstenite is probably ready to receive more data. + /// + /// `false` once start_send hits `WouldBlock` errors. + /// `true` initially and after `flush`ing. + ready: bool, } impl WebSocketStream { @@ -226,7 +231,7 @@ impl WebSocketStream { } pub(crate) fn new(ws: WebSocket>) -> Self { - WebSocketStream { inner: ws, closing: false, ended: false } + Self { inner: ws, closing: false, ended: false, ready: true } } fn with_context(&mut self, ctx: Option<(ContextWaker, &mut Context<'_>)>, f: F) -> R @@ -322,18 +327,27 @@ where type Error = WsError; fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + if self.ready { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } } fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { match (*self).with_context(None, |s| s.write(item)) { - Ok(()) => Ok(()), + Ok(()) => { + self.ready = true; + Ok(()) + } Err(WsError::Io(err)) if err.kind() == std::io::ErrorKind::WouldBlock => { - // the message was accepted and queued - // isn't an error. + // the message was accepted and queued so not an error + // but `poll_ready` will start returning pending now. + self.ready = false; Ok(()) } Err(e) => { + self.ready = true; debug!("websocket start_send error: {}", e); Err(e) } @@ -341,6 +355,7 @@ where } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.ready = true; (*self).with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.flush())).map(|r| { // WebSocket connection has just been closed. Flushing completed, not an error. match r { @@ -351,6 +366,7 @@ where } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.ready = true; let res = if self.closing { // After queueing it, we call `flush` to drive the close handshake to completion. (*self).with_context(Some((ContextWaker::Write, cx)), |s| s.flush()) From a5e5600e68e3d3eb3bc697faa793c5a9701b49c9 Mon Sep 17 00:00:00 2001 From: Alex Butler Date: Sat, 17 Jun 2023 11:13:04 +0100 Subject: [PATCH 2/3] poll_ready: try to flush when !ready --- src/lib.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 65eafac..71f9017 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -326,11 +326,15 @@ where { type Error = WsError; - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.ready { Poll::Ready(Ok(())) } else { - Poll::Pending + // Currently blocked so try to flush the blockage away + (*self).with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.flush())).map(|r| { + self.ready = true; + r + }) } } @@ -355,10 +359,10 @@ where } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.ready = true; (*self).with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.flush())).map(|r| { - // WebSocket connection has just been closed. Flushing completed, not an error. + self.ready = true; match r { + // WebSocket connection has just been closed. Flushing completed, not an error. Err(WsError::ConnectionClosed) => Ok(()), other => other, } From 5f5a06cb838643768ddd610fbb189c92de61aeb0 Mon Sep 17 00:00:00 2001 From: Alex Butler Date: Sat, 17 Jun 2023 11:25:27 +0100 Subject: [PATCH 3/3] tweak doc --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 71f9017..734fc64 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -346,7 +346,7 @@ where } Err(WsError::Io(err)) if err.kind() == std::io::ErrorKind::WouldBlock => { // the message was accepted and queued so not an error - // but `poll_ready` will start returning pending now. + // but `poll_ready` will now start trying to flush the block self.ready = false; Ok(()) }