Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transports/websocket: Handle websocket CLOSE with reason code #2319

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Expand Up @@ -42,6 +42,11 @@

# `libp2p` facade crate

## Version 0.41.0 [unreleased]

- Update individual crates.
- `libp2p-websocket`

## Version 0.40.0-rc.3 [2021-10-27]

- Update individual crates.
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -2,7 +2,7 @@
name = "libp2p"
edition = "2018"
description = "Peer-to-peer networking library"
version = "0.40.0-rc.3"
version = "0.41.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down Expand Up @@ -98,7 +98,7 @@ libp2p-deflate = { version = "0.30.0-rc.1", path = "transports/deflate", optiona
libp2p-dns = { version = "0.30.0-rc.1", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.32.0-rc.1", path = "protocols/mdns", optional = true }
libp2p-tcp = { version = "0.30.0-rc.1", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.31.0-rc.1", path = "transports/websocket", optional = true }
libp2p-websocket = { version = "0.32.0", path = "transports/websocket", optional = true }

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
Expand Down
6 changes: 6 additions & 0 deletions transports/websocket/CHANGELOG.md
@@ -1,3 +1,9 @@
# v0.32.0 [unreleased]

- Handle websocket CLOSE with reason code (see [PR 2085]).

[PR 2085]: https://github.com/libp2p/rust-libp2p/pull/2085

# 0.31.0-rc.1 [2021-10-15]

- Make default features of `libp2p-core` optional.
Expand Down
2 changes: 1 addition & 1 deletion transports/websocket/Cargo.toml
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-websocket"
edition = "2018"
description = "WebSocket transport for libp2p"
version = "0.31.0-rc.1"
version = "0.32.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
85 changes: 52 additions & 33 deletions transports/websocket/src/framed.rs
Expand Up @@ -29,7 +29,11 @@ use libp2p_core::{
Transport,
};
use log::{debug, trace};
use soketto::{connection, extension::deflate::Deflate, handshake};
use soketto::{
connection::{self, CloseReason},
extension::deflate::Deflate,
handshake,
};
use std::{convert::TryInto, fmt, io, mem, pin::Pin, task::Context, task::Poll};
use url::Url;

Expand Down Expand Up @@ -472,55 +476,68 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {

/// The websocket connection.
pub struct Connection<T> {
receiver: BoxStream<'static, Result<IncomingData, connection::Error>>,
receiver: BoxStream<'static, Result<Incoming, connection::Error>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
_marker: std::marker::PhantomData<T>,
}

/// Data received over the websocket connection.
/// Data or control information received over the websocket connection.
#[derive(Debug, Clone)]
pub enum IncomingData {
/// Binary application data.
Binary(Vec<u8>),
/// UTF-8 encoded application data.
Text(Vec<u8>),
pub enum Incoming {
/// Application data.
Data(Data),
/// PONG control frame data.
Pong(Vec<u8>),
/// Close reason.
Closed(CloseReason),
}

/// Application data received over the websocket connection
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Data {
/// UTF-8 encoded textual data.
Text(Vec<u8>),
/// Binary data.
Binary(Vec<u8>),
}

impl IncomingData {
impl Data {
pub fn into_bytes(self) -> Vec<u8> {
match self {
Data::Text(d) => d,
Data::Binary(d) => d,
}
}
}

impl AsRef<[u8]> for Data {
fn as_ref(&self) -> &[u8] {
match self {
Data::Text(d) => d,
Data::Binary(d) => d,
}
}
}

impl Incoming {
pub fn is_data(&self) -> bool {
self.is_binary() || self.is_text()
}

pub fn is_binary(&self) -> bool {
matches!(self, IncomingData::Binary(_))
matches!(self, Incoming::Data(Data::Binary(_)))
}

pub fn is_text(&self) -> bool {
matches!(self, IncomingData::Text(_))
matches!(self, Incoming::Data(Data::Text(_)))
}

pub fn is_pong(&self) -> bool {
matches!(self, IncomingData::Pong(_))
}

pub fn into_bytes(self) -> Vec<u8> {
match self {
IncomingData::Binary(d) => d,
IncomingData::Text(d) => d,
IncomingData::Pong(d) => d,
}
matches!(self, Incoming::Pong(_))
}
}

impl AsRef<[u8]> for IncomingData {
fn as_ref(&self) -> &[u8] {
match self {
IncomingData::Binary(d) => d,
IncomingData::Text(d) => d,
IncomingData::Pong(d) => d,
}
pub fn is_close(&self) -> bool {
matches!(self, Incoming::Closed(_))
}
}

Expand Down Expand Up @@ -573,17 +590,19 @@ where
let stream = stream::unfold((Vec::new(), receiver), |(mut data, mut receiver)| async {
match receiver.receive(&mut data).await {
Ok(soketto::Incoming::Data(soketto::Data::Text(_))) => Some((
Ok(IncomingData::Text(mem::take(&mut data))),
Ok(Incoming::Data(Data::Text(mem::take(&mut data)))),
(data, receiver),
)),
Ok(soketto::Incoming::Data(soketto::Data::Binary(_))) => Some((
Ok(IncomingData::Binary(mem::take(&mut data))),
Ok(Incoming::Data(Data::Binary(mem::take(&mut data)))),
(data, receiver),
)),
Ok(soketto::Incoming::Pong(pong)) => {
Some((Ok(IncomingData::Pong(Vec::from(pong))), (data, receiver)))
Some((Ok(Incoming::Pong(Vec::from(pong))), (data, receiver)))
}
Ok(soketto::Incoming::Closed(reason)) => {
Some((Ok(Incoming::Closed(reason)), (data, receiver)))
}
Ok(soketto::Incoming::Closed(_)) => None,
Err(connection::Error::Closed) => None,
Err(e) => Some((Err(e), (data, receiver))),
}
Expand Down Expand Up @@ -615,7 +634,7 @@ impl<T> Stream for Connection<T>
where
T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type Item = io::Result<IncomingData>;
type Item = io::Result<Incoming>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = ready!(self.receiver.poll_next_unpin(cx));
Expand Down
6 changes: 3 additions & 3 deletions transports/websocket/src/lib.rs
Expand Up @@ -25,7 +25,7 @@ pub mod framed;
pub mod tls;

use error::Error;
use framed::Connection;
use framed::{Connection, Incoming};
use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream};
use libp2p_core::{
multiaddr::Multiaddr,
Expand Down Expand Up @@ -166,8 +166,8 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let Some(item) = ready!(self.0.try_poll_next_unpin(cx)?) {
if item.is_data() {
return Poll::Ready(Some(Ok(item.into_bytes())));
if let Incoming::Data(payload) = item {
return Poll::Ready(Some(Ok(payload.into_bytes())));
}
} else {
return Poll::Ready(None);
Expand Down