Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/websocket: Handle websocket CLOSE with reason code #2085

Closed
2 changes: 1 addition & 1 deletion transports/websocket/Cargo.toml
Expand Up @@ -17,7 +17,7 @@ libp2p-core = { version = "0.29.0", path = "../../core" }
log = "0.4.8"
quicksink = "0.1"
rw-stream-sink = "0.2.0"
soketto = { version = "0.4.1", features = ["deflate"] }
soketto = { version = "0.5.0", features = ["deflate"] }
url = "2.1"
webpki-roots = "0.21"

Expand Down
83 changes: 49 additions & 34 deletions transports/websocket/src/framed.rs
Expand Up @@ -29,7 +29,7 @@ use libp2p_core::{
transport::{ListenerEvent, TransportError}
};
use log::{debug, trace};
use soketto::{connection, extension::deflate::Deflate, handshake};
use soketto::{connection::{self, CloseReason}, extension::deflate::Deflate, handshake};
use std::{convert::TryInto, fmt, io, mem, pin::Pin, task::Context, task::Poll};
use url::Url;

Expand Down Expand Up @@ -442,55 +442,68 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {

/// The websocket connection.
pub struct Connection<T> {
receiver: BoxStream<'static, Result<IncomingData, connection::Error>>,
receiver: BoxStream<'static, Result<Incoming, connection::Error>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
_marker: std::marker::PhantomData<T>
}

/// Data received over the websocket connection.
/// Data or control information received over the websocket connection.
#[derive(Debug, Clone)]
pub enum IncomingData {
/// Binary application data.
Binary(Vec<u8>),
/// UTF-8 encoded application data.
Text(Vec<u8>),
pub enum Incoming {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

/// Application data.
Data(Data),
/// PONG control frame data.
Pong(Vec<u8>)
Pong(Vec<u8>),
/// Close reason.
Closed(CloseReason),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, it is still a bit confusing why this is treated as an Event and thus returned via Incoming instead of being treated as an error and thus be returned via Error::Closed.

I am not familiar enough with soketto, thus I don't think this should block the effort.

Background for other readers: paritytech/soketto#29 (review).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear cut what the semantics should be. I went for the Error::Closed approach at first too, but @tomaka pointed out that the closing of the connection does carry information (a code in the simple case, but the specs allow for essentially any amount of "extra data").
The use case I have in mind for this is "collaborative backoff" where friendly peers can use the close reason to ask for respite, e.g. substrate telemetry asking nodes to send less data or go away for a while. In that context I think making the close reason an Error might be unfortunate.

Copy link
Member

@tomaka tomaka Oct 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The general consensus is that whenever a socket/connection produces an error, it means that the connection is no longer in a usable state. This isn't the case after receiving a WebSocket Closed frame. Receiving a closed frame is a normal situation, and the connection is still in a normal state and can be used to send more data or a Closed frame in return.

One drawback is: if Closed was instead an error, we would have a compile-time guarantee to not receive any other message after this closing. This guarantee can no longer be enforced by the compiler, but I think that this is fine.
EDIT: actually no, because the stream could continue to produce non-errors after having produced an error. We're not losing any compile-time guarantee.

}

/// Application data received over the websocket connection
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Data {
/// Textual data.
dvdplm marked this conversation as resolved.
Show resolved Hide resolved
Text(Vec<u8>),
/// Binary data.
Binary(Vec<u8>)
}

impl Data {
pub fn into_bytes(self) -> Vec<u8> {
match self {
Data::Text(d) => d,
Data::Binary(d) => d
}
}
}

impl AsRef<[u8]> for Data {
fn as_ref(&self) -> &[u8] {
match self {
Data::Text(d) => d,
Data::Binary(d) => d
}
}
}

impl IncomingData {
impl Incoming {
pub fn is_data(&self) -> bool {
self.is_binary() || self.is_text()
}

pub fn is_binary(&self) -> bool {
if let IncomingData::Binary(_) = self { true } else { false }
if let Incoming::Data(Data::Binary(_)) = self { true } else { false }
}

pub fn is_text(&self) -> bool {
if let IncomingData::Text(_) = self { true } else { false }
if let Incoming::Data(Data::Text(_)) = self { true } else { false }
}

pub fn is_pong(&self) -> bool {
if let IncomingData::Pong(_) = self { true } else { false }
}

pub fn into_bytes(self) -> Vec<u8> {
match self {
IncomingData::Binary(d) => d,
IncomingData::Text(d) => d,
IncomingData::Pong(d) => d
}
if let Incoming::Pong(_) = self { true } else { false }
}
}

impl AsRef<[u8]> for IncomingData {
fn as_ref(&self) -> &[u8] {
match self {
IncomingData::Binary(d) => d,
IncomingData::Text(d) => d,
IncomingData::Pong(d) => d
}
pub fn is_close(&self) -> bool {
if let Incoming::Closed(_) = self { true } else { false }
}
}

Expand Down Expand Up @@ -543,15 +556,17 @@ where
let stream = stream::unfold((Vec::new(), receiver), |(mut data, mut receiver)| async {
match receiver.receive(&mut data).await {
Ok(soketto::Incoming::Data(soketto::Data::Text(_))) => {
Some((Ok(IncomingData::Text(mem::take(&mut data))), (data, receiver)))
Some((Ok(Incoming::Data(Data::Text(mem::take(&mut data)))), (data, receiver)))
}
Ok(soketto::Incoming::Data(soketto::Data::Binary(_))) => {
Some((Ok(IncomingData::Binary(mem::take(&mut data))), (data, receiver)))
Some((Ok(Incoming::Data(Data::Binary(mem::take(&mut data)))), (data, receiver)))
}
Ok(soketto::Incoming::Pong(pong)) => {
Some((Ok(IncomingData::Pong(Vec::from(pong))), (data, receiver)))
Some((Ok(Incoming::Pong(Vec::from(pong))), (data, receiver)))
}
Ok(soketto::Incoming::Closed(reason)) => {
Some((Ok(Incoming::Closed(reason)), (data, receiver)))
}
Err(connection::Error::Closed) => None,
Err(e) => Some((Err(e), (data, receiver)))
}
});
Expand Down Expand Up @@ -582,7 +597,7 @@ impl<T> Stream for Connection<T>
where
T: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
type Item = io::Result<IncomingData>;
type Item = io::Result<Incoming>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = ready!(self.receiver.poll_next_unpin(cx));
Expand Down
6 changes: 3 additions & 3 deletions transports/websocket/src/lib.rs
Expand Up @@ -25,7 +25,7 @@ pub mod framed;
pub mod tls;

use error::Error;
use framed::Connection;
use framed::{Connection, Incoming};
use futures::{future::BoxFuture, prelude::*, stream::BoxStream, ready};
use libp2p_core::{
ConnectedPoint,
Expand Down Expand Up @@ -157,8 +157,8 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(item) = ready!(self.0.try_poll_next_unpin(cx)?) {
if item.is_data() {
return Poll::Ready(Some(Ok(item.into_bytes())))
if let Incoming::Data(payload) = item {
return Poll::Ready(Some(Ok(payload.into_bytes())))
}
} else {
return Poll::Ready(None)
Expand Down