Skip to content

Commit

Permalink
transports/websocket: Handle websocket CLOSE with reason code (#2319)
Browse files Browse the repository at this point in the history
Pass websocket CLOSE reason to the app as an
`Incoming::Closed(soketto::CloseReason)`.

This PR is a companion to paritytech/soketto#31 and lets
applications know what the remote end claimed to be the reason for closing the
connection (if any).

`IncomingData` is now renamed to `Incoming` and the actual data (text or binary)
is moved into its own `Data` enum, which in turn allows `into_bytes()` and the
`AsRef` impl to apply more sanely to something that is actually data.

Co-authored-by: David Palm <dvdplm@gmail.com>
  • Loading branch information
mxinden and dvdplm committed Oct 30, 2021
1 parent 997cc9a commit b8f0e44
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 39 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Expand Up @@ -42,6 +42,11 @@

# `libp2p` facade crate

## Version 0.41.0 [unreleased]

- Update individual crates.
- `libp2p-websocket`

## Version 0.40.0-rc.3 [2021-10-27]

- Update individual crates.
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -2,7 +2,7 @@
name = "libp2p"
edition = "2018"
description = "Peer-to-peer networking library"
version = "0.40.0-rc.3"
version = "0.41.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down Expand Up @@ -98,7 +98,7 @@ libp2p-deflate = { version = "0.30.0-rc.1", path = "transports/deflate", optiona
libp2p-dns = { version = "0.30.0-rc.1", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.32.0-rc.1", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.30.0-rc.1", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.31.0-rc.1", path = "transports/websocket", optional = true }
libp2p-websocket = { version = "0.32.0", path = "transports/websocket", optional = true }

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
Expand Down
6 changes: 6 additions & 0 deletions transports/websocket/CHANGELOG.md
@@ -1,3 +1,9 @@
# v0.32.0 [unreleased]

- Handle websocket CLOSE with reason code (see [PR 2085]).

[PR 2085]: https://github.com/libp2p/rust-libp2p/pull/2085

# 0.31.0-rc.1 [2021-10-15]

- Make default features of `libp2p-core` optional.
Expand Down
2 changes: 1 addition & 1 deletion transports/websocket/Cargo.toml
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-websocket"
edition = "2018"
description = "WebSocket transport for libp2p"
version = "0.31.0-rc.1"
version = "0.32.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
85 changes: 52 additions & 33 deletions transports/websocket/src/framed.rs
Expand Up @@ -29,7 +29,11 @@ use libp2p_core::{
Transport,
};
use log::{debug, trace};
use soketto::{connection, extension::deflate::Deflate, handshake};
use soketto::{
connection::{self, CloseReason},
extension::deflate::Deflate,
handshake,
};
use std::{convert::TryInto, fmt, io, mem, pin::Pin, task::Context, task::Poll};
use url::Url;

Expand Down Expand Up @@ -472,55 +476,68 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {

/// The websocket connection.
pub struct Connection<T> {
receiver: BoxStream<'static, Result<IncomingData, connection::Error>>,
receiver: BoxStream<'static, Result<Incoming, connection::Error>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
_marker: std::marker::PhantomData<T>,
}

/// Data received over the websocket connection.
/// Data or control information received over the websocket connection.
#[derive(Debug, Clone)]
pub enum IncomingData {
/// Binary application data.
Binary(Vec<u8>),
/// UTF-8 encoded application data.
Text(Vec<u8>),
pub enum Incoming {
/// Application data.
Data(Data),
/// PONG control frame data.
Pong(Vec<u8>),
/// Close reason.
Closed(CloseReason),
}

/// Application data received over the websocket connection
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Data {
/// UTF-8 encoded textual data.
Text(Vec<u8>),
/// Binary data.
Binary(Vec<u8>),
}

impl IncomingData {
impl Data {
pub fn into_bytes(self) -> Vec<u8> {
match self {
Data::Text(d) => d,
Data::Binary(d) => d,
}
}
}

impl AsRef<[u8]> for Data {
fn as_ref(&self) -> &[u8] {
match self {
Data::Text(d) => d,
Data::Binary(d) => d,
}
}
}

impl Incoming {
pub fn is_data(&self) -> bool {
self.is_binary() || self.is_text()
}

pub fn is_binary(&self) -> bool {
matches!(self, IncomingData::Binary(_))
matches!(self, Incoming::Data(Data::Binary(_)))
}

pub fn is_text(&self) -> bool {
matches!(self, IncomingData::Text(_))
matches!(self, Incoming::Data(Data::Text(_)))
}

pub fn is_pong(&self) -> bool {
matches!(self, IncomingData::Pong(_))
}

pub fn into_bytes(self) -> Vec<u8> {
match self {
IncomingData::Binary(d) => d,
IncomingData::Text(d) => d,
IncomingData::Pong(d) => d,
}
matches!(self, Incoming::Pong(_))
}
}

impl AsRef<[u8]> for IncomingData {
fn as_ref(&self) -> &[u8] {
match self {
IncomingData::Binary(d) => d,
IncomingData::Text(d) => d,
IncomingData::Pong(d) => d,
}
pub fn is_close(&self) -> bool {
matches!(self, Incoming::Closed(_))
}
}

Expand Down Expand Up @@ -573,17 +590,19 @@ where
let stream = stream::unfold((Vec::new(), receiver), |(mut data, mut receiver)| async {
match receiver.receive(&mut data).await {
Ok(soketto::Incoming::Data(soketto::Data::Text(_))) => Some((
Ok(IncomingData::Text(mem::take(&mut data))),
Ok(Incoming::Data(Data::Text(mem::take(&mut data)))),
(data, receiver),
)),
Ok(soketto::Incoming::Data(soketto::Data::Binary(_))) => Some((
Ok(IncomingData::Binary(mem::take(&mut data))),
Ok(Incoming::Data(Data::Binary(mem::take(&mut data)))),
(data, receiver),
)),
Ok(soketto::Incoming::Pong(pong)) => {
Some((Ok(IncomingData::Pong(Vec::from(pong))), (data, receiver)))
Some((Ok(Incoming::Pong(Vec::from(pong))), (data, receiver)))
}
Ok(soketto::Incoming::Closed(reason)) => {
Some((Ok(Incoming::Closed(reason)), (data, receiver)))
}
Ok(soketto::Incoming::Closed(_)) => None,
Err(connection::Error::Closed) => None,
Err(e) => Some((Err(e), (data, receiver))),
}
Expand Down Expand Up @@ -615,7 +634,7 @@ impl<T> Stream for Connection<T>
where
T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type Item = io::Result<IncomingData>;
type Item = io::Result<Incoming>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = ready!(self.receiver.poll_next_unpin(cx));
Expand Down
6 changes: 3 additions & 3 deletions transports/websocket/src/lib.rs
Expand Up @@ -25,7 +25,7 @@ pub mod framed;
pub mod tls;

use error::Error;
use framed::Connection;
use framed::{Connection, Incoming};
use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream};
use libp2p_core::{
multiaddr::Multiaddr,
Expand Down Expand Up @@ -166,8 +166,8 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(item) = ready!(self.0.try_poll_next_unpin(cx)?) {
if item.is_data() {
return Poll::Ready(Some(Ok(item.into_bytes())));
if let Incoming::Data(payload) = item {
return Poll::Ready(Some(Ok(payload.into_bytes())));
}
} else {
return Poll::Ready(None);
Expand Down

0 comments on commit b8f0e44

Please sign in to comment.