From b8f0e44b27933808e19171a57550452b05464837 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 30 Oct 2021 12:19:22 +0200 Subject: [PATCH] transports/websocket: Handle websocket CLOSE with reason code (#2319) Pass websocket CLOSE reason to the app as an `Incoming::Closed(soketto::CloseReason)`. This PR is a companion to https://github.com/paritytech/soketto/pull/31 and lets applications know what the remote end claimed to be the reason for closing the connection (if any). `IncomingData` is now renamed to `Incoming` and the actual data (text or binary) is moved into its own `Data` enum, which in turn allows `into_bytes()` and the `AsRef` impl to apply more sanely to something that is actually data. Co-authored-by: David Palm --- CHANGELOG.md | 5 ++ Cargo.toml | 4 +- transports/websocket/CHANGELOG.md | 6 +++ transports/websocket/Cargo.toml | 2 +- transports/websocket/src/framed.rs | 85 ++++++++++++++++++------------ transports/websocket/src/lib.rs | 6 +-- 6 files changed, 69 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 47ad9b610c1..2bbc2429a2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,11 @@ # `libp2p` facade crate +## Version 0.41.0 [unreleased] + +- Update individual crates. + - `libp2p-websocket` + ## Version 0.40.0-rc.3 [2021-10-27] - Update individual crates. diff --git a/Cargo.toml b/Cargo.toml index fe41add1d34..9c9aedad01b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p" edition = "2018" description = "Peer-to-peer networking library" -version = "0.40.0-rc.3" +version = "0.41.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -98,7 +98,7 @@ libp2p-deflate = { version = "0.30.0-rc.1", path = "transports/deflate", optiona libp2p-dns = { version = "0.30.0-rc.1", path = "transports/dns", optional = true, default-features = false } libp2p-mdns = { version = "0.32.0-rc.1", path = "protocols/mdns", optional = true } libp2p-tcp = { version = "0.30.0-rc.1", path = "transports/tcp", default-features = false, optional = true } -libp2p-websocket = { version = "0.31.0-rc.1", path = "transports/websocket", optional = true } +libp2p-websocket = { version = "0.32.0", path = "transports/websocket", optional = true } [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } diff --git a/transports/websocket/CHANGELOG.md b/transports/websocket/CHANGELOG.md index 37d40248370..ca561047866 100644 --- a/transports/websocket/CHANGELOG.md +++ b/transports/websocket/CHANGELOG.md @@ -1,3 +1,9 @@ +# v0.32.0 [unreleased] + +- Handle websocket CLOSE with reason code (see [PR 2085]). + +[PR 2085]: https://github.com/libp2p/rust-libp2p/pull/2085 + # 0.31.0-rc.1 [2021-10-15] - Make default features of `libp2p-core` optional. diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index b4a11d3934e..b7c7f8b32dc 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-websocket" edition = "2018" description = "WebSocket transport for libp2p" -version = "0.31.0-rc.1" +version = "0.32.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 02529036944..35253b3c04c 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -29,7 +29,11 @@ use libp2p_core::{ Transport, }; use log::{debug, trace}; -use soketto::{connection, extension::deflate::Deflate, handshake}; +use soketto::{ + connection::{self, CloseReason}, + extension::deflate::Deflate, + handshake, +}; use std::{convert::TryInto, fmt, io, mem, pin::Pin, task::Context, task::Poll}; use url::Url; @@ -472,55 +476,68 @@ fn location_to_multiaddr(location: &str) -> Result> { /// The websocket connection. pub struct Connection { - receiver: BoxStream<'static, Result>, + receiver: BoxStream<'static, Result>, sender: Pin + Send>>, _marker: std::marker::PhantomData, } -/// Data received over the websocket connection. +/// Data or control information received over the websocket connection. #[derive(Debug, Clone)] -pub enum IncomingData { - /// Binary application data. - Binary(Vec), - /// UTF-8 encoded application data. - Text(Vec), +pub enum Incoming { + /// Application data. + Data(Data), /// PONG control frame data. Pong(Vec), + /// Close reason. + Closed(CloseReason), +} + +/// Application data received over the websocket connection +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum Data { + /// UTF-8 encoded textual data. + Text(Vec), + /// Binary data. + Binary(Vec), } -impl IncomingData { +impl Data { + pub fn into_bytes(self) -> Vec { + match self { + Data::Text(d) => d, + Data::Binary(d) => d, + } + } +} + +impl AsRef<[u8]> for Data { + fn as_ref(&self) -> &[u8] { + match self { + Data::Text(d) => d, + Data::Binary(d) => d, + } + } +} + +impl Incoming { pub fn is_data(&self) -> bool { self.is_binary() || self.is_text() } pub fn is_binary(&self) -> bool { - matches!(self, IncomingData::Binary(_)) + matches!(self, Incoming::Data(Data::Binary(_))) } pub fn is_text(&self) -> bool { - matches!(self, IncomingData::Text(_)) + matches!(self, Incoming::Data(Data::Text(_))) } pub fn is_pong(&self) -> bool { - matches!(self, IncomingData::Pong(_)) - } - - pub fn into_bytes(self) -> Vec { - match self { - IncomingData::Binary(d) => d, - IncomingData::Text(d) => d, - IncomingData::Pong(d) => d, - } + matches!(self, Incoming::Pong(_)) } -} -impl AsRef<[u8]> for IncomingData { - fn as_ref(&self) -> &[u8] { - match self { - IncomingData::Binary(d) => d, - IncomingData::Text(d) => d, - IncomingData::Pong(d) => d, - } + pub fn is_close(&self) -> bool { + matches!(self, Incoming::Closed(_)) } } @@ -573,17 +590,19 @@ where let stream = stream::unfold((Vec::new(), receiver), |(mut data, mut receiver)| async { match receiver.receive(&mut data).await { Ok(soketto::Incoming::Data(soketto::Data::Text(_))) => Some(( - Ok(IncomingData::Text(mem::take(&mut data))), + Ok(Incoming::Data(Data::Text(mem::take(&mut data)))), (data, receiver), )), Ok(soketto::Incoming::Data(soketto::Data::Binary(_))) => Some(( - Ok(IncomingData::Binary(mem::take(&mut data))), + Ok(Incoming::Data(Data::Binary(mem::take(&mut data)))), (data, receiver), )), Ok(soketto::Incoming::Pong(pong)) => { - Some((Ok(IncomingData::Pong(Vec::from(pong))), (data, receiver))) + Some((Ok(Incoming::Pong(Vec::from(pong))), (data, receiver))) + } + Ok(soketto::Incoming::Closed(reason)) => { + Some((Ok(Incoming::Closed(reason)), (data, receiver))) } - Ok(soketto::Incoming::Closed(_)) => None, Err(connection::Error::Closed) => None, Err(e) => Some((Err(e), (data, receiver))), } @@ -615,7 +634,7 @@ impl Stream for Connection where T: AsyncRead + AsyncWrite + Send + Unpin + 'static, { - type Item = io::Result; + type Item = io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let item = ready!(self.receiver.poll_next_unpin(cx)); diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index 387aee7c72c..eb14f078d5d 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -25,7 +25,7 @@ pub mod framed; pub mod tls; use error::Error; -use framed::Connection; +use framed::{Connection, Incoming}; use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream}; use libp2p_core::{ multiaddr::Multiaddr, @@ -166,8 +166,8 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { if let Some(item) = ready!(self.0.try_poll_next_unpin(cx)?) { - if item.is_data() { - return Poll::Ready(Some(Ok(item.into_bytes()))); + if let Incoming::Data(payload) = item { + return Poll::Ready(Some(Ok(payload.into_bytes()))); } } else { return Poll::Ready(None);