Skip to content

Commit

Permalink
Merge pull request #284 from alexheretic/flush-writes-less
Browse files Browse the repository at this point in the history
Remove flush on each WebSocketStream::poll_ready call + update to tungstenite 0.20
  • Loading branch information
agalakhov committed Jun 2, 2023
2 parents a6c2d13 + f4dcfec commit 36b9d94
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ futures-util = { version = "0.3.28", default-features = false, features = ["sink
tokio = { version = "1.0.0", default-features = false, features = ["io-util"] }

[dependencies.tungstenite]
version = "0.19.0"
# TODO use 0.20 release
# version = "0.19.0"
git = "https://github.com/snapview/tungstenite-rs"
default-features = false

[dependencies.native-tls-crate]
Expand Down
4 changes: 2 additions & 2 deletions examples/server-headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ fn client() {
debug!("* {}: {:?}", header, _value);
}

socket.write_message(Message::Text("Hello WebSocket".into())).unwrap();
socket.send(Message::Text("Hello WebSocket".into())).unwrap();
loop {
let msg = socket.read_message().expect("Error reading message");
let msg = socket.read().expect("Error reading message");
debug!("Received: {}", msg);
}
}
16 changes: 8 additions & 8 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ where
}

match futures_util::ready!(self.with_context(Some((ContextWaker::Read, cx)), |s| {
trace!("{}:{} Stream.with_context poll_next -> read_message()", file!(), line!());
cvt(s.read_message())
trace!("{}:{} Stream.with_context poll_next -> read()", file!(), line!());
cvt(s.read())
})) {
Ok(v) => Poll::Ready(Some(Ok(v))),
Err(e) => {
Expand Down Expand Up @@ -321,12 +321,12 @@ where
{
type Error = WsError;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
(*self).with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.write_pending()))
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
match (*self).with_context(None, |s| s.write_message(item)) {
match (*self).with_context(None, |s| s.write(item)) {
Ok(()) => Ok(()),
Err(WsError::Io(err)) if err.kind() == std::io::ErrorKind::WouldBlock => {
// the message was accepted and queued
Expand All @@ -341,7 +341,7 @@ where
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
(*self).with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.write_pending())).map(|r| {
(*self).with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.flush())).map(|r| {
// WebSocket connection has just been closed. Flushing completed, not an error.
match r {
Err(WsError::ConnectionClosed) => Ok(()),
Expand All @@ -352,8 +352,8 @@ where

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let res = if self.closing {
// After queueing it, we call `write_pending` to drive the close handshake to completion.
(*self).with_context(Some((ContextWaker::Write, cx)), |s| s.write_pending())
// After queueing it, we call `flush` to drive the close handshake to completion.
(*self).with_context(Some((ContextWaker::Write, cx)), |s| s.flush())
} else {
(*self).with_context(Some((ContextWaker::Write, cx)), |s| s.close(None))
};
Expand Down

0 comments on commit 36b9d94

Please sign in to comment.