Skip to content

Commit

Permalink
Merge pull request #287 from alexheretic/poll-flush-pending
Browse files Browse the repository at this point in the history
`poll_ready` flush after `WouldBlock` errors
  • Loading branch information
daniel-abramov committed Jul 22, 2023
2 parents 5dada5d + 5f5a06c commit 6d8d11e
Showing 1 changed file with 27 additions and 7 deletions.
34 changes: 27 additions & 7 deletions src/lib.rs
Expand Up @@ -193,6 +193,11 @@ pub struct WebSocketStream<S> {
inner: WebSocket<AllowStd<S>>,
closing: bool,
ended: bool,
/// Tungstenite is probably ready to receive more data.
///
/// `false` once start_send hits `WouldBlock` errors.
/// `true` initially and after `flush`ing.
ready: bool,
}

impl<S> WebSocketStream<S> {
Expand Down Expand Up @@ -226,7 +231,7 @@ impl<S> WebSocketStream<S> {
}

pub(crate) fn new(ws: WebSocket<AllowStd<S>>) -> Self {
WebSocketStream { inner: ws, closing: false, ended: false }
Self { inner: ws, closing: false, ended: false, ready: true }
}

fn with_context<F, R>(&mut self, ctx: Option<(ContextWaker, &mut Context<'_>)>, f: F) -> R
Expand Down Expand Up @@ -321,19 +326,32 @@ where
{
type Error = WsError;

fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.ready {
Poll::Ready(Ok(()))
} else {
// Currently blocked so try to flush the blockage away
(*self).with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.flush())).map(|r| {
self.ready = true;
r
})
}
}

fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
match (*self).with_context(None, |s| s.write(item)) {
Ok(()) => Ok(()),
Ok(()) => {
self.ready = true;
Ok(())
}
Err(WsError::Io(err)) if err.kind() == std::io::ErrorKind::WouldBlock => {
// the message was accepted and queued
// isn't an error.
// the message was accepted and queued so not an error
// but `poll_ready` will now start trying to flush the block
self.ready = false;
Ok(())
}
Err(e) => {
self.ready = true;
debug!("websocket start_send error: {}", e);
Err(e)
}
Expand All @@ -342,15 +360,17 @@ where

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
(*self).with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.flush())).map(|r| {
// WebSocket connection has just been closed. Flushing completed, not an error.
self.ready = true;
match r {
// WebSocket connection has just been closed. Flushing completed, not an error.
Err(WsError::ConnectionClosed) => Ok(()),
other => other,
}
})
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.ready = true;
let res = if self.closing {
// After queueing it, we call `flush` to drive the close handshake to completion.
(*self).with_context(Some((ContextWaker::Write, cx)), |s| s.flush())
Expand Down

0 comments on commit 6d8d11e

Please sign in to comment.