From 8cf15470399d0f6965f4026f0d65ad9de1911b05 Mon Sep 17 00:00:00 2001 From: Alex Covizzi Date: Thu, 30 Jan 2020 15:46:35 +0100 Subject: [PATCH 1/8] tokio-tungstenite support --- Cargo.toml | 1 + src/filters/ws.rs | 174 ++++++++++++++++------------------------------ src/test.rs | 2 +- 3 files changed, 62 insertions(+), 115 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 22cbb7cf4..6630dca9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ tower-service = "0.3" rustls = { version = "0.16", optional = true } # tls is enabled by default, we don't want that yet tungstenite = { default-features = false, version = "0.9", optional = true } +tokio-tungstenite = { git = "https://github.com/snapview/tokio-tungstenite", branch = "master" } urlencoding = "1.0.0" pin-project = "0.4.5" diff --git a/src/filters/ws.rs b/src/filters/ws.rs index 7b707e858..6ae202aee 100644 --- a/src/filters/ws.rs +++ b/src/filters/ws.rs @@ -3,20 +3,19 @@ use std::borrow::Cow; use std::fmt; use std::future::Future; -use std::io::{self, Read, Write}; +use std::io::{self}; use std::pin::Pin; -use std::ptr::null_mut; use std::task::{Context, Poll}; use super::{body, header}; use crate::filter::{Filter, One}; use crate::reject::Rejection; use crate::reply::{Reply, Response}; -use futures::{future, FutureExt, Sink, Stream, TryFutureExt}; +use futures::{future, FutureExt, Stream, Sink}; use headers::{Connection, HeaderMapExt, SecWebsocketAccept, SecWebsocketKey, Upgrade}; use http; -use tokio::io::{AsyncRead, AsyncWrite}; use tungstenite::protocol::{self, WebSocketConfig}; +use tokio_tungstenite::WebSocketStream; /// Creates a Websocket Filter. /// @@ -130,24 +129,20 @@ where .ws .body .on_upgrade() - .and_then(move |upgraded| { + .then(move |upgraded| { log::trace!("websocket upgrade complete"); - - let io = protocol::WebSocket::from_raw_socket( - AllowStd { - inner: upgraded, - context: (true, null_mut()), - }, - protocol::Role::Server, - config, - ); - - on_upgrade(WebSocket { inner: io }).map(Ok) + let socket = WebSocketStream::from_raw_socket(upgraded.unwrap(), protocol::Role::Server, config); + socket + }) + .then(move |socket| { + on_upgrade(WebSocket { inner: socket }) }) - .map(|result| { + .map(|_result| { + /* if let Err(err) = result { log::debug!("ws upgrade error: {}", err); } + */ }); ::tokio::task::spawn(fut); @@ -164,72 +159,13 @@ where } } + /// A websocket `Stream` and `Sink`, provided to `ws` filters. pub struct WebSocket { - inner: protocol::WebSocket, -} - -/// wrapper around hyper Upgraded to allow Read/write from tungstenite's WebSocket -#[derive(Debug)] -pub(crate) struct AllowStd { - inner: ::hyper::upgrade::Upgraded, - context: (bool, *mut ()), -} - -struct Guard<'a>(&'a mut WebSocket); - -impl Drop for Guard<'_> { - fn drop(&mut self) { - (self.0).inner.get_mut().context = (true, null_mut()); - } -} - -// *mut () context is neither Send nor Sync -unsafe impl Send for AllowStd {} -unsafe impl Sync for AllowStd {} - -impl AllowStd { - fn with_context(&mut self, f: F) -> Poll> - where - F: FnOnce(&mut Context<'_>, Pin<&mut ::hyper::upgrade::Upgraded>) -> Poll>, - { - unsafe { - if !self.context.0 { - //was called by start_send without context - return Poll::Pending; - } - assert!(!self.context.1.is_null()); - let waker = &mut *(self.context.1 as *mut _); - f(waker, Pin::new(&mut self.inner)) - } - } -} - -impl Read for AllowStd { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match self.with_context(|ctx, stream| stream.poll_read(ctx, buf)) { - Poll::Ready(r) => r, - Poll::Pending => Err(io::Error::from(io::ErrorKind::WouldBlock)), - } - } -} - -impl Write for AllowStd { - fn write(&mut self, buf: &[u8]) -> io::Result { - match self.with_context(|ctx, stream| stream.poll_write(ctx, buf)) { - Poll::Ready(r) => r, - Poll::Pending => Err(io::Error::from(io::ErrorKind::WouldBlock)), - } - } - - fn flush(&mut self) -> io::Result<()> { - match self.with_context(|ctx, stream| stream.poll_flush(ctx)) { - Poll::Ready(r) => r, - Poll::Pending => Err(io::Error::from(io::ErrorKind::WouldBlock)), - } - } + inner: WebSocketStream, } +/* fn cvt(r: tungstenite::error::Result, err_message: &str) -> Poll> { match r { Ok(v) => Poll::Ready(Ok(v)), @@ -242,36 +178,28 @@ fn cvt(r: tungstenite::error::Result, err_message: &str) -> Poll, + config: Option ) -> Self { - let ws = protocol::WebSocket::from_raw_socket( - AllowStd { - inner, - context: (false, null_mut()), - }, + WebSocketStream::from_raw_socket( + upgraded, role, - config, - ); - - WebSocket { inner: ws } + config + ).map(|inner| { + WebSocket { inner } + }).await } fn with_context(&mut self, ctx: Option<&mut Context<'_>>, f: F) -> R where - F: FnOnce(&mut protocol::WebSocket) -> R, + F: FnOnce(Pin<&mut WebSocketStream>, Option<&mut Context<'_>>) -> R, { - self.inner.get_mut().context = match ctx { - Some(ctx) => (true, ctx as *mut _ as *mut ()), - None => (false, null_mut()), - }; - - let g = Guard(self); - f(&mut (g.0).inner) + f(Pin::new(&mut self.inner), ctx) } /// Gracefully close this websocket. @@ -285,21 +213,28 @@ impl Stream for WebSocket { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { - let msg = match (*self).with_context(Some(cx), |s| s.read_message()) { - Ok(item) => item, - Err(::tungstenite::Error::Io(ref err)) + let msg = match (*self).with_context(Some(cx), |s, cx| s.poll_next(cx.unwrap())) { + Poll::Ready(Some(Ok(item))) => item, + Poll::Ready(Some(Err(::tungstenite::Error::Io(ref err)))) if err.kind() == io::ErrorKind::WouldBlock => { return Poll::Pending; } - Err(::tungstenite::Error::ConnectionClosed) => { + Poll::Ready(Some(Err(::tungstenite::Error::ConnectionClosed))) => { log::trace!("websocket closed"); return Poll::Ready(None); } - Err(e) => { + Poll::Ready(Some(Err(e))) => { log::debug!("websocket poll error: {}", e); return Poll::Ready(Some(Err(crate::Error::new(e)))); } + Poll::Ready(None) => { + log::trace!("websocket closed"); + return Poll::Ready(None); + }, + Poll::Pending => { + return Poll::Pending; + } }; match msg { @@ -321,8 +256,13 @@ impl Sink for WebSocket { type Error = crate::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - (*self).with_context(Some(cx), |s| { - cvt(s.write_pending(), "websocket poll_ready error") + (*self).with_context(Some(cx), |s, cx| { + match s.poll_ready(cx.unwrap()) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => Poll::Ready(Err(crate::Error::new(e))), + Poll::Pending => Poll::Pending + } + // cvt(s.write_pending(), "websocket poll_ready error") }) } @@ -337,7 +277,7 @@ impl Sink for WebSocket { return Ok(()); } - match self.with_context(None, |s| s.write_message(item.inner)) { + match self.with_context(None, |s, _| s.start_send(item.inner)) { Ok(()) => Ok(()), // Err(::tungstenite::Error::SendQueueFull(inner)) => { // log::debug!("websocket send queue full"); @@ -356,19 +296,24 @@ impl Sink for WebSocket { } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.with_context(Some(cx), |s| { - cvt(s.write_pending(), "websocket poll_flush error") + self.with_context(Some(cx), |s, cx| { + //cvt(s.write_pending(), "websocket poll_flush error") + match s.poll_flush(cx.unwrap()) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => Poll::Ready(Err(crate::Error::new(e))), + Poll::Pending => Poll::Pending + } }) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match self.with_context(Some(cx), |s| s.close(None)) { - Ok(()) => Poll::Ready(Ok(())), - Err(::tungstenite::Error::ConnectionClosed) => Poll::Ready(Ok(())), - Err(err) => { + match self.with_context(Some(cx), |s, cx| s.poll_close(cx.unwrap())) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(err)) => { log::debug!("websocket close error: {}", err); Poll::Ready(Err(crate::Error::new(err))) - } + }, + Poll::Pending => Poll::Pending } } } @@ -379,6 +324,7 @@ impl fmt::Debug for WebSocket { } } + /// A WebSocket message. /// /// Only repesents Text and Binary messages. diff --git a/src/test.rs b/src/test.rs index 7076ab81c..50825b5ab 100644 --- a/src/test.rs +++ b/src/test.rs @@ -509,7 +509,7 @@ impl WsBuilder { upgraded, protocol::Role::Client, Default::default(), - ); + ).await; let (tx, rx) = ws.split(); let write = wr_rx.map(Ok).forward(tx).map(|_| ()); From 429dc51710ce6a3ac2775990ac2be45b47c157e2 Mon Sep 17 00:00:00 2001 From: Alex Covizzi Date: Thu, 30 Jan 2020 18:19:30 +0100 Subject: [PATCH 2/8] debug message on upgrade failed --- src/filters/ws.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/filters/ws.rs b/src/filters/ws.rs index c475fcd30..633bba3d9 100644 --- a/src/filters/ws.rs +++ b/src/filters/ws.rs @@ -11,7 +11,7 @@ use super::{body, header}; use crate::filter::{Filter, One}; use crate::reject::Rejection; use crate::reply::{Reply, Response}; -use futures::{future, FutureExt, Stream, Sink}; +use futures::{future, FutureExt, Stream, Sink, TryFutureExt}; use headers::{Connection, HeaderMapExt, SecWebsocketAccept, SecWebsocketKey, Upgrade}; use http; use tungstenite::protocol::{self, WebSocketConfig}; @@ -129,20 +129,21 @@ where .ws .body .on_upgrade() - .then(move |upgraded| { + .and_then(move |upgraded| { log::trace!("websocket upgrade complete"); - let socket = WebSocketStream::from_raw_socket(upgraded.unwrap(), protocol::Role::Server, config); - socket + WebSocket::from_raw_socket( + upgraded, + protocol::Role::Server, + config + ).map(Ok) }) - .then(move |socket| { - on_upgrade(WebSocket { inner: socket }) + .and_then(move |socket| { + on_upgrade(socket).map(Ok) }) - .map(|_result| { - /* + .map(|result| { if let Err(err) = result { log::debug!("ws upgrade error: {}", err); } - */ }); ::tokio::task::spawn(fut); From 1f50e5fd52fcf539e989528dea71fe4a8aacf903 Mon Sep 17 00:00:00 2001 From: Alex Covizzi Date: Thu, 30 Jan 2020 18:25:44 +0100 Subject: [PATCH 3/8] formatting --- src/filters/ws.rs | 54 ++++++++++++++++++----------------------------- src/test.rs | 3 ++- 2 files changed, 23 insertions(+), 34 deletions(-) diff --git a/src/filters/ws.rs b/src/filters/ws.rs index 633bba3d9..5c985ed54 100644 --- a/src/filters/ws.rs +++ b/src/filters/ws.rs @@ -11,11 +11,11 @@ use super::{body, header}; use crate::filter::{Filter, One}; use crate::reject::Rejection; use crate::reply::{Reply, Response}; -use futures::{future, FutureExt, Stream, Sink, TryFutureExt}; +use futures::{future, FutureExt, Sink, Stream, TryFutureExt}; use headers::{Connection, HeaderMapExt, SecWebsocketAccept, SecWebsocketKey, Upgrade}; use http; -use tungstenite::protocol::{self, WebSocketConfig}; use tokio_tungstenite::WebSocketStream; +use tungstenite::protocol::{self, WebSocketConfig}; /// Creates a Websocket Filter. /// @@ -131,15 +131,9 @@ where .on_upgrade() .and_then(move |upgraded| { log::trace!("websocket upgrade complete"); - WebSocket::from_raw_socket( - upgraded, - protocol::Role::Server, - config - ).map(Ok) - }) - .and_then(move |socket| { - on_upgrade(socket).map(Ok) + WebSocket::from_raw_socket(upgraded, protocol::Role::Server, config).map(Ok) }) + .and_then(move |socket| on_upgrade(socket).map(Ok)) .map(|result| { if let Err(err) = result { log::debug!("ws upgrade error: {}", err); @@ -160,7 +154,6 @@ where } } - /// A websocket `Stream` and `Sink`, provided to `ws` filters. pub struct WebSocket { inner: WebSocketStream, @@ -185,20 +178,19 @@ impl WebSocket { pub(crate) async fn from_raw_socket( upgraded: hyper::upgrade::Upgraded, role: protocol::Role, - config: Option + config: Option, ) -> Self { - WebSocketStream::from_raw_socket( - upgraded, - role, - config - ).map(|inner| { - WebSocket { inner } - }).await + WebSocketStream::from_raw_socket(upgraded, role, config) + .map(|inner| WebSocket { inner }) + .await } fn with_context(&mut self, ctx: Option<&mut Context<'_>>, f: F) -> R where - F: FnOnce(Pin<&mut WebSocketStream>, Option<&mut Context<'_>>) -> R, + F: FnOnce( + Pin<&mut WebSocketStream>, + Option<&mut Context<'_>>, + ) -> R, { f(Pin::new(&mut self.inner), ctx) } @@ -214,12 +206,11 @@ impl Stream for WebSocket { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { match (*self).with_context(Some(cx), |s, cx| s.poll_next(cx.unwrap())) { - Poll::Ready(Some(Ok(item))) => { - Poll::Ready(Some(Ok(Message { inner: item }))) - } + Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(Message { inner: item }))), Poll::Ready(Some(Err(::tungstenite::Error::Io(ref err)))) - if err.kind() == io::ErrorKind::WouldBlock => { - Poll::Pending + if err.kind() == io::ErrorKind::WouldBlock => + { + Poll::Pending } Poll::Ready(Some(Err(::tungstenite::Error::ConnectionClosed))) => { log::trace!("websocket closed"); @@ -232,10 +223,8 @@ impl Stream for WebSocket { Poll::Ready(None) => { log::trace!("websocket closed"); Poll::Ready(None) - }, - Poll::Pending => { - Poll::Pending } + Poll::Pending => Poll::Pending, } } } @@ -248,7 +237,7 @@ impl Sink for WebSocket { match s.poll_ready(cx.unwrap()) { Poll::Ready(Ok(())) => Poll::Ready(Ok(())), Poll::Ready(Err(e)) => Poll::Ready(Err(crate::Error::new(e))), - Poll::Pending => Poll::Pending + Poll::Pending => Poll::Pending, } // cvt(s.write_pending(), "websocket poll_ready error") }) @@ -279,7 +268,7 @@ impl Sink for WebSocket { match s.poll_flush(cx.unwrap()) { Poll::Ready(Ok(())) => Poll::Ready(Ok(())), Poll::Ready(Err(e)) => Poll::Ready(Err(crate::Error::new(e))), - Poll::Pending => Poll::Pending + Poll::Pending => Poll::Pending, } }) } @@ -290,8 +279,8 @@ impl Sink for WebSocket { Poll::Ready(Err(err)) => { log::debug!("websocket close error: {}", err); Poll::Ready(Err(crate::Error::new(err))) - }, - Poll::Pending => Poll::Pending + } + Poll::Pending => Poll::Pending, } } } @@ -302,7 +291,6 @@ impl fmt::Debug for WebSocket { } } - /// A WebSocket message. /// /// Only repesents Text and Binary messages. diff --git a/src/test.rs b/src/test.rs index 216d3bd86..6f166526a 100644 --- a/src/test.rs +++ b/src/test.rs @@ -509,7 +509,8 @@ impl WsBuilder { upgraded, protocol::Role::Client, Default::default(), - ).await; + ) + .await; let (tx, rx) = ws.split(); let write = wr_rx.map(Ok).forward(tx).map(|_| ()); From 1609b288a3c305610692cb48d0ad0fe4e60f4055 Mon Sep 17 00:00:00 2001 From: Alex Covizzi Date: Fri, 31 Jan 2020 10:52:58 +0100 Subject: [PATCH 4/8] replaced tungstenite dependency with tokio-tungstenite v0.10 --- Cargo.toml | 5 ++--- src/filters/ws.rs | 7 ++++--- src/test.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 223cd08b0..7293f62f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,8 +34,7 @@ tokio = { version = "0.2", features = ["blocking", "fs", "stream", "sync", "time tower-service = "0.3" rustls = { version = "0.16", optional = true } # tls is enabled by default, we don't want that yet -tungstenite = { default-features = false, version = "0.9", optional = true } -tokio-tungstenite = { git = "https://github.com/snapview/tokio-tungstenite", branch = "master" } +tokio-tungstenite = { version = "0.10", default-features = false, optional = true } urlencoding = "1.0.0" pin-project = "0.4.5" @@ -47,7 +46,7 @@ tokio = { version = "0.2", features = ["macros"] } [features] default = ["multipart", "websocket"] -websocket = ["tungstenite"] +websocket = ["tokio-tungstenite"] tls = ["rustls"] [profile.release] diff --git a/src/filters/ws.rs b/src/filters/ws.rs index 5c985ed54..ced4067cb 100644 --- a/src/filters/ws.rs +++ b/src/filters/ws.rs @@ -14,6 +14,7 @@ use crate::reply::{Reply, Response}; use futures::{future, FutureExt, Sink, Stream, TryFutureExt}; use headers::{Connection, HeaderMapExt, SecWebsocketAccept, SecWebsocketKey, Upgrade}; use http; +use tokio_tungstenite::tungstenite; use tokio_tungstenite::WebSocketStream; use tungstenite::protocol::{self, WebSocketConfig}; @@ -207,12 +208,12 @@ impl Stream for WebSocket { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { match (*self).with_context(Some(cx), |s, cx| s.poll_next(cx.unwrap())) { Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(Message { inner: item }))), - Poll::Ready(Some(Err(::tungstenite::Error::Io(ref err)))) + Poll::Ready(Some(Err(tungstenite::Error::Io(ref err)))) if err.kind() == io::ErrorKind::WouldBlock => { Poll::Pending } - Poll::Ready(Some(Err(::tungstenite::Error::ConnectionClosed))) => { + Poll::Ready(Some(Err(tungstenite::Error::ConnectionClosed))) => { log::trace!("websocket closed"); Poll::Ready(None) } @@ -250,7 +251,7 @@ impl Sink for WebSocket { // log::debug!("websocket send queue full"); // Err(::tungstenite::Error::SendQueueFull(inner)) // } - Err(::tungstenite::Error::Io(ref err)) if err.kind() == io::ErrorKind::WouldBlock => { + Err(tungstenite::Error::Io(ref err)) if err.kind() == io::ErrorKind::WouldBlock => { // the message was accepted and queued // isn't an error. Ok(()) diff --git a/src/test.rs b/src/test.rs index 6f166526a..bcb002369 100644 --- a/src/test.rs +++ b/src/test.rs @@ -469,7 +469,7 @@ impl WsBuilder { let (rd_tx, rd_rx) = mpsc::unbounded_channel(); tokio::spawn(async move { - use tungstenite::protocol; + use tokio_tungstenite::tungstenite::protocol; let (addr, srv) = crate::serve(f).bind_ephemeral(([127, 0, 0, 1], 0)); From 7e18f1d584cd8f8f0b637bb041aabd40ea1c37e3 Mon Sep 17 00:00:00 2001 From: Alex Covizzi Date: Fri, 31 Jan 2020 15:10:15 +0100 Subject: [PATCH 5/8] removed WebSocket::with_context and WouldBolck check --- src/filters/ws.rs | 73 +++++++++++------------------------------------ 1 file changed, 17 insertions(+), 56 deletions(-) diff --git a/src/filters/ws.rs b/src/filters/ws.rs index ced4067cb..55c0dcb44 100644 --- a/src/filters/ws.rs +++ b/src/filters/ws.rs @@ -14,9 +14,10 @@ use crate::reply::{Reply, Response}; use futures::{future, FutureExt, Sink, Stream, TryFutureExt}; use headers::{Connection, HeaderMapExt, SecWebsocketAccept, SecWebsocketKey, Upgrade}; use http; -use tokio_tungstenite::tungstenite; -use tokio_tungstenite::WebSocketStream; -use tungstenite::protocol::{self, WebSocketConfig}; +use tokio_tungstenite::{ + WebSocketStream, + tungstenite::{self, protocol::{self, WebSocketConfig}} +}; /// Creates a Websocket Filter. /// @@ -160,21 +161,6 @@ pub struct WebSocket { inner: WebSocketStream, } -/* -fn cvt(r: tungstenite::error::Result, err_message: &str) -> Poll> { - match r { - Ok(v) => Poll::Ready(Ok(v)), - Err(tungstenite::Error::Io(ref e)) if e.kind() == io::ErrorKind::WouldBlock => { - Poll::Pending - } - Err(e) => { - log::debug!("{} {}", err_message, e); - Poll::Ready(Err(crate::Error::new(e))) - } - } -} -*/ - impl WebSocket { pub(crate) async fn from_raw_socket( upgraded: hyper::upgrade::Upgraded, @@ -186,16 +172,6 @@ impl WebSocket { .await } - fn with_context(&mut self, ctx: Option<&mut Context<'_>>, f: F) -> R - where - F: FnOnce( - Pin<&mut WebSocketStream>, - Option<&mut Context<'_>>, - ) -> R, - { - f(Pin::new(&mut self.inner), ctx) - } - /// Gracefully close this websocket. pub async fn close(mut self) -> Result<(), crate::Error> { future::poll_fn(|cx| Pin::new(&mut self).poll_close(cx)).await @@ -206,7 +182,7 @@ impl Stream for WebSocket { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match (*self).with_context(Some(cx), |s, cx| s.poll_next(cx.unwrap())) { + match Pin::new(&mut self.inner).poll_next(cx) { Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(Message { inner: item }))), Poll::Ready(Some(Err(tungstenite::Error::Io(ref err)))) if err.kind() == io::ErrorKind::WouldBlock => @@ -234,28 +210,16 @@ impl Sink for WebSocket { type Error = crate::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - (*self).with_context(Some(cx), |s, cx| { - match s.poll_ready(cx.unwrap()) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => Poll::Ready(Err(crate::Error::new(e))), - Poll::Pending => Poll::Pending, - } - // cvt(s.write_pending(), "websocket poll_ready error") - }) + match Pin::new(&mut self.inner).poll_ready(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => Poll::Ready(Err(crate::Error::new(e))), + Poll::Pending => Poll::Pending, + } } fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> { - match self.with_context(None, |s, _| s.start_send(item.inner)) { + match Pin::new(&mut self.inner).start_send(item.inner) { Ok(()) => Ok(()), - // Err(::tungstenite::Error::SendQueueFull(inner)) => { - // log::debug!("websocket send queue full"); - // Err(::tungstenite::Error::SendQueueFull(inner)) - // } - Err(tungstenite::Error::Io(ref err)) if err.kind() == io::ErrorKind::WouldBlock => { - // the message was accepted and queued - // isn't an error. - Ok(()) - } Err(e) => { log::debug!("websocket start_send error: {}", e); Err(crate::Error::new(e)) @@ -264,18 +228,15 @@ impl Sink for WebSocket { } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.with_context(Some(cx), |s, cx| { - //cvt(s.write_pending(), "websocket poll_flush error") - match s.poll_flush(cx.unwrap()) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => Poll::Ready(Err(crate::Error::new(e))), - Poll::Pending => Poll::Pending, - } - }) + match Pin::new(&mut self.inner).poll_flush(cx) { + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Ready(Err(e)) => Poll::Ready(Err(crate::Error::new(e))), + Poll::Pending => Poll::Pending, + } } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match self.with_context(Some(cx), |s, cx| s.poll_close(cx.unwrap())) { + match Pin::new(&mut self.inner).poll_close(cx) { Poll::Ready(Ok(())) => Poll::Ready(Ok(())), Poll::Ready(Err(err)) => { log::debug!("websocket close error: {}", err); From 90154412da3cea97bae2a2049e592f28f71685da Mon Sep 17 00:00:00 2001 From: Alex Covizzi Date: Fri, 31 Jan 2020 15:10:44 +0100 Subject: [PATCH 6/8] formatting --- examples/rejections.rs | 12 +++++------- examples/sse_chat.rs | 8 ++++---- src/filters/body.rs | 16 ++++++++-------- src/filters/fs.rs | 24 +++++++++++------------- src/filters/ws.rs | 5 ++++- tests/filter.rs | 8 +++----- 6 files changed, 35 insertions(+), 38 deletions(-) diff --git a/examples/rejections.rs b/examples/rejections.rs index 3ccc51167..286b7772b 100644 --- a/examples/rejections.rs +++ b/examples/rejections.rs @@ -27,13 +27,11 @@ async fn main() { /// Extract a denominator from a "div-by" header, or reject with DivideByZero. fn div_by() -> impl Filter + Copy { - warp::header::("div-by").and_then(|n: u16| { - async move { - if let Some(denom) = NonZeroU16::new(n) { - Ok(denom) - } else { - Err(reject::custom(DivideByZero)) - } + warp::header::("div-by").and_then(|n: u16| async move { + if let Some(denom) = NonZeroU16::new(n) { + Ok(denom) + } else { + Err(reject::custom(DivideByZero)) } }) } diff --git a/examples/sse_chat.rs b/examples/sse_chat.rs index ffb3064ed..693a87f55 100644 --- a/examples/sse_chat.rs +++ b/examples/sse_chat.rs @@ -22,13 +22,13 @@ async fn main() { .and(warp::post()) .and(warp::path::param::()) .and(warp::body::content_length_limit(500)) - .and(warp::body::bytes().and_then(|body: bytes::Bytes| { - async move { + .and( + warp::body::bytes().and_then(|body: bytes::Bytes| async move { std::str::from_utf8(&body) .map(String::from) .map_err(|_e| warp::reject::custom(NotUtf8)) - } - })) + }), + ) .and(users.clone()) .map(|my_id, msg, users| { user_message(my_id, msg, &users); diff --git a/src/filters/body.rs b/src/filters/body.rs index 7b57442e1..771ddce48 100644 --- a/src/filters/body.rs +++ b/src/filters/body.rs @@ -171,14 +171,14 @@ pub fn aggregate() -> impl Filter + Co /// }); /// ``` pub fn json() -> impl Filter + Copy { - is_content_type::().and(aggregate()).and_then(|buf| { - async move { + is_content_type::() + .and(aggregate()) + .and_then(|buf| async move { Json::decode(buf).map_err(|err| { log::debug!("request json body error: {}", err); reject::known(BodyDeserializeError { cause: err }) }) - } - }) + }) } /// Returns a `Filter` that matches any request and extracts a @@ -206,14 +206,14 @@ pub fn json() -> impl Filter() -> impl Filter + Copy { - is_content_type::
().and(aggregate()).and_then(|buf| { - async move { + is_content_type::() + .and(aggregate()) + .and_then(|buf| async move { Form::decode(buf).map_err(|err| { log::debug!("request form body error: {}", err); reject::known(BodyDeserializeError { cause: err }) }) - } - }) + }) } // ===== Decoders ===== diff --git a/src/filters/fs.rs b/src/filters/fs.rs index bba22745f..c79725c68 100644 --- a/src/filters/fs.rs +++ b/src/filters/fs.rs @@ -89,20 +89,18 @@ fn path_from_tail( base: Arc, ) -> impl FilterClone, Error = Rejection> { crate::path::tail().and_then(move |tail: crate::path::Tail| { - future::ready(sanitize_path(base.as_ref(), tail.as_str())).and_then(|mut buf| { - async { - let is_dir = tokio::fs::metadata(buf.clone()) - .await - .map(|m| m.is_dir()) - .unwrap_or(false); - - if is_dir { - log::debug!("dir: appending index.html to directory path"); - buf.push("index.html"); - } - log::trace!("dir: {:?}", buf); - Ok(ArcPath(Arc::new(buf))) + future::ready(sanitize_path(base.as_ref(), tail.as_str())).and_then(|mut buf| async { + let is_dir = tokio::fs::metadata(buf.clone()) + .await + .map(|m| m.is_dir()) + .unwrap_or(false); + + if is_dir { + log::debug!("dir: appending index.html to directory path"); + buf.push("index.html"); } + log::trace!("dir: {:?}", buf); + Ok(ArcPath(Arc::new(buf))) }) }) } diff --git a/src/filters/ws.rs b/src/filters/ws.rs index 55c0dcb44..93baf82c4 100644 --- a/src/filters/ws.rs +++ b/src/filters/ws.rs @@ -15,8 +15,11 @@ use futures::{future, FutureExt, Sink, Stream, TryFutureExt}; use headers::{Connection, HeaderMapExt, SecWebsocketAccept, SecWebsocketKey, Upgrade}; use http; use tokio_tungstenite::{ + tungstenite::{ + self, + protocol::{self, WebSocketConfig}, + }, WebSocketStream, - tungstenite::{self, protocol::{self, WebSocketConfig}} }; /// Creates a Websocket Filter. diff --git a/tests/filter.rs b/tests/filter.rs index d200db173..5f6941884 100644 --- a/tests/filter.rs +++ b/tests/filter.rs @@ -149,11 +149,9 @@ async fn unify() { #[should_panic] #[tokio::test] async fn nested() { - let f = warp::any().and_then(|| { - async { - let p = warp::path::param::(); - warp::test::request().filter(&p).await - } + let f = warp::any().and_then(|| async { + let p = warp::path::param::(); + warp::test::request().filter(&p).await }); let _ = warp::test::request().filter(&f).await; From 4cb1f1a57a9f14379af7f277e2e05e39a74033ca Mon Sep 17 00:00:00 2001 From: Alex Covizzi Date: Fri, 31 Jan 2020 17:42:31 +0100 Subject: [PATCH 7/8] removed WouldBlock check and ConnectionClosed check --- src/filters/ws.rs | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/src/filters/ws.rs b/src/filters/ws.rs index 93baf82c4..64cda41d4 100644 --- a/src/filters/ws.rs +++ b/src/filters/ws.rs @@ -3,7 +3,6 @@ use std::borrow::Cow; use std::fmt; use std::future::Future; -use std::io::{self}; use std::pin::Pin; use std::task::{Context, Poll}; @@ -15,10 +14,7 @@ use futures::{future, FutureExt, Sink, Stream, TryFutureExt}; use headers::{Connection, HeaderMapExt, SecWebsocketAccept, SecWebsocketKey, Upgrade}; use http; use tokio_tungstenite::{ - tungstenite::{ - self, - protocol::{self, WebSocketConfig}, - }, + tungstenite::protocol::{self, WebSocketConfig}, WebSocketStream, }; @@ -187,15 +183,6 @@ impl Stream for WebSocket { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { match Pin::new(&mut self.inner).poll_next(cx) { Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(Message { inner: item }))), - Poll::Ready(Some(Err(tungstenite::Error::Io(ref err)))) - if err.kind() == io::ErrorKind::WouldBlock => - { - Poll::Pending - } - Poll::Ready(Some(Err(tungstenite::Error::ConnectionClosed))) => { - log::trace!("websocket closed"); - Poll::Ready(None) - } Poll::Ready(Some(Err(e))) => { log::debug!("websocket poll error: {}", e); Poll::Ready(Some(Err(crate::Error::new(e)))) From 4ea1096e37c3c6608225b023459704643a794424 Mon Sep 17 00:00:00 2001 From: Alex Covizzi Date: Fri, 31 Jan 2020 19:29:21 +0100 Subject: [PATCH 8/8] call ready! on Pin::new.poll --- src/filters/ws.rs | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/src/filters/ws.rs b/src/filters/ws.rs index 64cda41d4..2dd8c8d9d 100644 --- a/src/filters/ws.rs +++ b/src/filters/ws.rs @@ -10,7 +10,7 @@ use super::{body, header}; use crate::filter::{Filter, One}; use crate::reject::Rejection; use crate::reply::{Reply, Response}; -use futures::{future, FutureExt, Sink, Stream, TryFutureExt}; +use futures::{future, ready, FutureExt, Sink, Stream, TryFutureExt}; use headers::{Connection, HeaderMapExt, SecWebsocketAccept, SecWebsocketKey, Upgrade}; use http; use tokio_tungstenite::{ @@ -181,17 +181,16 @@ impl Stream for WebSocket { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match Pin::new(&mut self.inner).poll_next(cx) { - Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(Ok(Message { inner: item }))), - Poll::Ready(Some(Err(e))) => { + match ready!(Pin::new(&mut self.inner).poll_next(cx)) { + Some(Ok(item)) => Poll::Ready(Some(Ok(Message { inner: item }))), + Some(Err(e)) => { log::debug!("websocket poll error: {}", e); Poll::Ready(Some(Err(crate::Error::new(e)))) } - Poll::Ready(None) => { + None => { log::trace!("websocket closed"); Poll::Ready(None) } - Poll::Pending => Poll::Pending, } } } @@ -200,10 +199,9 @@ impl Sink for WebSocket { type Error = crate::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.inner).poll_ready(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => Poll::Ready(Err(crate::Error::new(e))), - Poll::Pending => Poll::Pending, + match ready!(Pin::new(&mut self.inner).poll_ready(cx)) { + Ok(()) => Poll::Ready(Ok(())), + Err(e) => Poll::Ready(Err(crate::Error::new(e))), } } @@ -218,21 +216,19 @@ impl Sink for WebSocket { } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match Pin::new(&mut self.inner).poll_flush(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => Poll::Ready(Err(crate::Error::new(e))), - Poll::Pending => Poll::Pending, + match ready!(Pin::new(&mut self.inner).poll_flush(cx)) { + Ok(()) => Poll::Ready(Ok(())), + Err(e) => Poll::Ready(Err(crate::Error::new(e))), } } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match Pin::new(&mut self.inner).poll_close(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(err)) => { + match ready!(Pin::new(&mut self.inner).poll_close(cx)) { + Ok(()) => Poll::Ready(Ok(())), + Err(err) => { log::debug!("websocket close error: {}", err); Poll::Ready(Err(crate::Error::new(err))) } - Poll::Pending => Poll::Pending, } } }