diff --git a/Cargo.toml b/Cargo.toml index 7bea3907e12..f6186f8ba39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,6 +44,7 @@ full = [ "wasm-bindgen", "wasm-ext", "wasm-ext-websocket", + "webrtc", "websocket", "yamux", ] @@ -75,11 +76,12 @@ rsa = ["libp2p-core/rsa"] secp256k1 = ["libp2p-core/secp256k1"] serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"] tcp = ["dep:libp2p-tcp"] -tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"] +tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio", "libp2p-webrtc?/tokio"] uds = ["dep:libp2p-uds"] wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js"] wasm-ext = ["dep:libp2p-wasm-ext"] wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"] +webrtc = ["dep:libp2p-webrtc", "libp2p-webrtc?/pem"] websocket = ["dep:libp2p-websocket"] yamux = ["dep:libp2p-yamux"] @@ -108,6 +110,7 @@ libp2p-request-response = { version = "0.23.0", path = "protocols/request-respon libp2p-swarm = { version = "0.41.0", path = "swarm" } libp2p-uds = { version = "0.37.0", path = "transports/uds", optional = true } libp2p-wasm-ext = { version = "0.38.0", path = "transports/wasm-ext", optional = true } +libp2p-webrtc = { version = "0.1.0-alpha", path = "transports/webrtc", optional = true } libp2p-yamux = { version = "0.42.0", path = "muxers/yamux", optional = true } multiaddr = { version = "0.16.0" } parking_lot = "0.12.0" @@ -168,7 +171,8 @@ members = [ "transports/tcp", "transports/uds", "transports/websocket", - "transports/wasm-ext" + "transports/wasm-ext", + "transports/webrtc" ] [[example]] diff --git a/misc/prost-codec/CHANGELOG.md b/misc/prost-codec/CHANGELOG.md index 382d68ac625..09ebe0615e1 100644 --- a/misc/prost-codec/CHANGELOG.md +++ b/misc/prost-codec/CHANGELOG.md @@ -1,7 +1,10 @@ + # 0.3.0 [unreleased] +- Implement `From` trait for `std::io::Error`. See [PR 2622]. - Don't leak `prost` dependency in `Error` type. See [PR 3058]. +[PR 2622]: https://github.com/libp2p/rust-libp2p/pull/2622/ [PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058/ # 0.2.0 diff --git a/misc/prost-codec/Cargo.toml b/misc/prost-codec/Cargo.toml index 09c725c1c90..fdfe6eecef7 100644 --- a/misc/prost-codec/Cargo.toml +++ b/misc/prost-codec/Cargo.toml @@ -20,7 +20,7 @@ unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } [dev-dependencies] prost-build = "0.11" -# Passing arguments to the docsrs builder in order to properly document cfg's. +# Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] all-features = true diff --git a/misc/prost-codec/src/lib.rs b/misc/prost-codec/src/lib.rs index 61c980d34ef..dc5714d742f 100644 --- a/misc/prost-codec/src/lib.rs +++ b/misc/prost-codec/src/lib.rs @@ -65,3 +65,9 @@ impl Decoder for Codec { #[derive(thiserror::Error, Debug)] #[error("Failed to encode/decode message")] pub struct Error(#[from] std::io::Error); + +impl From for std::io::Error { + fn from(e: Error) -> Self { + e.0 + } +} diff --git a/src/lib.rs b/src/lib.rs index 9bc38696674..1836af5896c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -125,6 +125,10 @@ pub use libp2p_uds as uds; #[cfg(feature = "wasm-ext")] #[doc(inline)] pub use libp2p_wasm_ext as wasm_ext; +#[cfg(feature = "webrtc")] +#[cfg_attr(docsrs, doc(cfg(feature = "webrtc")))] +#[doc(inline)] +pub use libp2p_webrtc as webrtc; #[cfg(feature = "websocket")] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))] #[doc(inline)] diff --git a/transports/webrtc/Cargo.toml b/transports/webrtc/Cargo.toml new file mode 100644 index 00000000000..bb4ae80d0ad --- /dev/null +++ b/transports/webrtc/Cargo.toml @@ -0,0 +1,58 @@ +[package] +name = "libp2p-webrtc" +version = "0.1.0-alpha" +authors = ["Parity Technologies "] +description = "WebRTC transport for libp2p" +repository = "https://github.com/libp2p/rust-libp2p" +license = "MIT" +edition = "2021" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +async-trait = "0.1" +asynchronous-codec = "0.6.1" +bytes = "1" +futures = "0.3" +futures-timer = "3" +hex = "0.4" +if-watch = "2.0" +libp2p-core = { version = "0.38.0", path = "../../core" } +libp2p-noise = { version = "0.41.0", path = "../../transports/noise" } +log = "0.4" +multihash = { version = "0.16", default-features = false, features = ["sha2"] } +prost = "0.11" +prost-codec = { version = "0.3.0", path = "../../misc/prost-codec" } +rand = "0.8" +rcgen = "0.9.3" +serde = { version = "1.0", features = ["derive"] } +stun = "0.4" +thiserror = "1" +tinytemplate = "1.2" +tokio = { version = "1.19", features = ["net"], optional = true} +tokio-util = { version = "0.7", features = ["compat"], optional = true } +webrtc = { version = "0.6.0", optional = true } + +[features] +tokio = ["dep:tokio", "dep:tokio-util", "dep:webrtc"] +pem = ["webrtc?/pem"] + +[build-dependencies] +prost-build = "0.11" + +[dev-dependencies] +anyhow = "1.0" +env_logger = "0.9" +hex-literal = "0.3" +libp2p = { path = "../..", features = ["full"] } +tokio = { version = "1.19", features = ["full"] } +unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } +void = "1" + +[[test]] +name = "smoke" +required-features = ["tokio"] + +[[example]] +name = "listen_ping" +required-features = ["tokio"] diff --git a/transports/webrtc/build.rs b/transports/webrtc/build.rs new file mode 100644 index 00000000000..3f582337a68 --- /dev/null +++ b/transports/webrtc/build.rs @@ -0,0 +1,23 @@ +// Copyright 2022 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +fn main() { + prost_build::compile_protos(&["src/message.proto"], &["src"]).unwrap(); +} diff --git a/transports/webrtc/examples/listen_ping.rs b/transports/webrtc/examples/listen_ping.rs new file mode 100644 index 00000000000..55e1904483b --- /dev/null +++ b/transports/webrtc/examples/listen_ping.rs @@ -0,0 +1,66 @@ +use anyhow::Result; +use futures::StreamExt; +use libp2p::swarm::{keep_alive, NetworkBehaviour}; +use libp2p::Transport; +use libp2p::{ping, Swarm}; +use libp2p_core::identity; +use libp2p_core::muxing::StreamMuxerBox; +use rand::thread_rng; +use void::Void; + +/// An example WebRTC server that will accept connections and run the ping protocol on them. +#[tokio::main] +async fn main() -> Result<()> { + let mut swarm = create_swarm()?; + + swarm.listen_on("/ip4/127.0.0.1/udp/0/webrtc".parse()?)?; + + loop { + let event = swarm.next().await.unwrap(); + eprintln!("New event: {event:?}") + } +} + +fn create_swarm() -> Result> { + let id_keys = identity::Keypair::generate_ed25519(); + let peer_id = id_keys.public().to_peer_id(); + let transport = libp2p_webrtc::tokio::Transport::new( + id_keys, + libp2p_webrtc::tokio::Certificate::generate(&mut thread_rng())?, + ); + + let transport = transport + .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))) + .boxed(); + + Ok(Swarm::with_tokio_executor( + transport, + Behaviour::default(), + peer_id, + )) +} + +#[derive(NetworkBehaviour, Default)] +#[behaviour(out_event = "Event", event_process = false)] +struct Behaviour { + ping: ping::Behaviour, + keep_alive: keep_alive::Behaviour, +} + +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +enum Event { + Ping(ping::Event), +} + +impl From for Event { + fn from(e: ping::Event) -> Self { + Event::Ping(e) + } +} + +impl From for Event { + fn from(event: Void) -> Self { + void::unreachable(event) + } +} diff --git a/transports/webrtc/src/lib.rs b/transports/webrtc/src/lib.rs new file mode 100644 index 00000000000..3210ad38cac --- /dev/null +++ b/transports/webrtc/src/lib.rs @@ -0,0 +1,90 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the [`libp2p_core::Transport`] trait for WebRTC protocol without a signaling +//! server. +//! +//! # Overview +//! +//! ## ICE +//! +//! RFCs: 8839, 8445 See also: +//! +//! +//! The WebRTC protocol uses ICE in order to establish a connection. +//! +//! In a typical ICE setup, there are two endpoints, called agents, that want to communicate. One +//! of these two agents can be the local browser, while the other agent is the target of the +//! connection. +//! +//! Even though in this specific context all we want is a simple client-server communication, it is +//! helpful to keep in mind that ICE was designed to solve the problem of NAT traversal. +//! +//! The ICE workflow works as follows: +//! +//! - An "offerer" determines ways in which it could be accessible (either an +//! IP address or through a relay using a TURN server), which are called "candidates". It then +//! generates a small text payload in a format called SDP, that describes the request for a +//! connection. +//! - The offerer sends this SDP-encoded message to the answerer. The medium through which this +//! exchange is done is out of scope of the ICE protocol. +//! - The answerer then finds its own candidates, and generates an answer, again in the SDP format. +//! This answer is sent back to the offerer. +//! - Each agent then tries to connect to the remote's candidates. +//! +//! We pretend to send the offer to the remote agent (the target of the connection), then pretend +//! that it has found a valid IP address for itself (i.e. a candidate), then pretend that the SDP +//! answer containing this candidate has been sent back. This will cause the offerer to execute +//! step 4: try to connect to the remote's candidate. +//! +//! ## TCP or UDP +//! +//! WebRTC by itself doesn't hardcode any specific protocol for media streams. Instead, it is the +//! SDP message of the offerer that specifies which protocol to use. In our use case (one or more +//! data channels), we know that the offerer will always request either TCP+DTLS+SCTP, or +//! UDP+DTLS+SCTP. +//! +//! The implementation only supports UDP at the moment, so if the offerer requests TCP+DTLS+SCTP, it +//! will not respond. Support for TCP may be added in the future (see +//! ). +//! +//! ## DTLS+SCTP +//! +//! RFCs: 8841, 8832 +//! +//! In both cases (TCP or UDP), the next layer is DTLS. DTLS is similar to the well-known TLS +//! protocol, except that it doesn't guarantee ordering of delivery (as this is instead provided by +//! the SCTP layer on top of DTLS). In other words, once the TCP or UDP connection is established, +//! the browser will try to perform a DTLS handshake. +//! +//! During the ICE negotiation, each agent must include in its SDP packet a hash of the self-signed +//! certificate that it will use during the DTLS handshake. In our use-case, where we try to +//! hand-crate the SDP answer generated by the remote, this is problematic. A way to solve this +//! is to make the hash a part of the remote's multiaddr. On the server side, we turn +//! certificate verification off. + +mod message_proto { + #![allow(clippy::derive_partial_eq_without_eq)] + + include!(concat!(env!("OUT_DIR"), "/webrtc.pb.rs")); +} + +#[cfg(feature = "tokio")] +pub mod tokio; diff --git a/transports/webrtc/src/message.proto b/transports/webrtc/src/message.proto new file mode 100644 index 00000000000..eab3ceb720b --- /dev/null +++ b/transports/webrtc/src/message.proto @@ -0,0 +1,20 @@ +syntax = "proto2"; + +package webrtc.pb; + +message Message { + enum Flag { + // The sender will no longer send messages on the stream. + FIN = 0; + // The sender will no longer read messages on the stream. Incoming data is + // being discarded on receipt. + STOP_SENDING = 1; + // The sender abruptly terminates the sending part of the stream. The + // receiver can discard any data that it already received on that stream. + RESET = 2; + } + + optional Flag flag=1; + + optional bytes message = 2; +} diff --git a/transports/webrtc/src/tokio/certificate.rs b/transports/webrtc/src/tokio/certificate.rs new file mode 100644 index 00000000000..748cfdb6ffd --- /dev/null +++ b/transports/webrtc/src/tokio/certificate.rs @@ -0,0 +1,120 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use rand::{distributions::DistString, CryptoRng, Rng}; +use webrtc::peer_connection::certificate::RTCCertificate; + +use crate::tokio::fingerprint::Fingerprint; + +#[derive(Debug, Clone, PartialEq)] +pub struct Certificate { + inner: RTCCertificate, +} + +impl Certificate { + /// Generate new certificate. + /// + /// `_rng` argument is ignored for now. See . + pub fn generate(_rng: &mut R) -> Result + where + R: CryptoRng + Rng, + { + let mut params = rcgen::CertificateParams::new(vec![ + rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 16) + ]); + params.alg = &rcgen::PKCS_ECDSA_P256_SHA256; + Ok(Self { + inner: RTCCertificate::from_params(params).expect("default params to work"), + }) + } + + /// Returns SHA-256 fingerprint of this certificate. + /// + /// # Panics + /// + /// This function will panic if there's no fingerprint with the SHA-256 algorithm (see + /// [`RTCCertificate::get_fingerprints`]). + pub fn fingerprint(&self) -> Fingerprint { + let fingerprints = self.inner.get_fingerprints(); + let sha256_fingerprint = fingerprints + .iter() + .find(|f| f.algorithm == "sha-256") + .expect("a SHA-256 fingerprint"); + + Fingerprint::try_from_rtc_dtls(sha256_fingerprint).expect("we filtered by sha-256") + } + + /// Parses a certificate from the ASCII PEM format. + /// + /// See [`RTCCertificate::from_pem`] + #[cfg(feature = "pem")] + pub fn from_pem(pem_str: &str) -> Result { + Ok(Self { + inner: RTCCertificate::from_pem(pem_str).map_err(Kind::InvalidPEM)?, + }) + } + + /// Serializes the certificate (including the private key) in PKCS#8 format in PEM. + /// + /// See [`RTCCertificate::serialize_pem`] + #[cfg(feature = "pem")] + pub fn serialize_pem(&self) -> String { + self.inner.serialize_pem() + } + + /// Extract the [`RTCCertificate`] from this wrapper. + /// + /// This function is `pub(crate)` to avoid leaking the `webrtc` dependency to our users. + pub(crate) fn to_rtc_certificate(&self) -> RTCCertificate { + self.inner.clone() + } +} + +#[derive(thiserror::Error, Debug)] +#[error("Failed to generate certificate")] +pub struct Error(#[from] Kind); + +#[derive(thiserror::Error, Debug)] +enum Kind { + #[error(transparent)] + InvalidPEM(#[from] webrtc::Error), +} + +#[cfg(test)] +mod test { + #[cfg(feature = "pem")] + use anyhow::Result; + + #[cfg(feature = "pem")] + #[test] + fn test_certificate_serialize_pem_and_from_pem() -> Result<()> { + use super::*; + use rand::thread_rng; + + let cert = Certificate::generate(&mut thread_rng()).unwrap(); + + let pem = cert.serialize_pem(); + let loaded_cert = Certificate::from_pem(&pem)?; + + assert_eq!(loaded_cert, cert); + + Ok(()) + } +} diff --git a/transports/webrtc/src/tokio/connection.rs b/transports/webrtc/src/tokio/connection.rs new file mode 100644 index 00000000000..72e39ce525f --- /dev/null +++ b/transports/webrtc/src/tokio/connection.rs @@ -0,0 +1,299 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::stream::FuturesUnordered; +use futures::{ + channel::{ + mpsc, + oneshot::{self, Sender}, + }, + lock::Mutex as FutMutex, + StreamExt, + {future::BoxFuture, ready}, +}; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use webrtc::data::data_channel::DataChannel as DetachedDataChannel; +use webrtc::data_channel::RTCDataChannel; +use webrtc::peer_connection::RTCPeerConnection; + +use std::task::Waker; +use std::{ + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use crate::tokio::{error::Error, substream, substream::Substream}; + +/// Maximum number of unprocessed data channels. +/// See [`Connection::poll_inbound`]. +const MAX_DATA_CHANNELS_IN_FLIGHT: usize = 10; + +/// A WebRTC connection, wrapping [`RTCPeerConnection`] and implementing [`StreamMuxer`] trait. +pub struct Connection { + /// [`RTCPeerConnection`] to the remote peer. + /// + /// Uses futures mutex because used in async code (see poll_outbound and poll_close). + peer_conn: Arc>, + + /// Channel onto which incoming data channels are put. + incoming_data_channels_rx: mpsc::Receiver>, + + /// Future, which, once polled, will result in an outbound substream. + outbound_fut: Option, Error>>>, + + /// Future, which, once polled, will result in closing the entire connection. + close_fut: Option>>, + + /// A list of futures, which, once completed, signal that a [`Substream`] has been dropped. + drop_listeners: FuturesUnordered, + no_drop_listeners_waker: Option, +} + +impl Unpin for Connection {} + +impl Connection { + /// Creates a new connection. + pub(crate) async fn new(rtc_conn: RTCPeerConnection) -> Self { + let (data_channel_tx, data_channel_rx) = mpsc::channel(MAX_DATA_CHANNELS_IN_FLIGHT); + + Connection::register_incoming_data_channels_handler( + &rtc_conn, + Arc::new(FutMutex::new(data_channel_tx)), + ) + .await; + + Self { + peer_conn: Arc::new(FutMutex::new(rtc_conn)), + incoming_data_channels_rx: data_channel_rx, + outbound_fut: None, + close_fut: None, + drop_listeners: FuturesUnordered::default(), + no_drop_listeners_waker: None, + } + } + + /// Registers a handler for incoming data channels. + /// + /// NOTE: `mpsc::Sender` is wrapped in `Arc` because cloning a raw sender would make the channel + /// unbounded. "The channel’s capacity is equal to buffer + num-senders. In other words, each + /// sender gets a guaranteed slot in the channel capacity..." + /// See + async fn register_incoming_data_channels_handler( + rtc_conn: &RTCPeerConnection, + tx: Arc>>>, + ) { + rtc_conn.on_data_channel(Box::new(move |data_channel: Arc| { + log::debug!("Incoming data channel {}", data_channel.id()); + + let tx = tx.clone(); + + Box::pin(async move { + data_channel.on_open({ + let data_channel = data_channel.clone(); + Box::new(move || { + log::debug!("Data channel {} open", data_channel.id()); + + Box::pin(async move { + let data_channel = data_channel.clone(); + let id = data_channel.id(); + match data_channel.detach().await { + Ok(detached) => { + let mut tx = tx.lock().await; + if let Err(e) = tx.try_send(detached.clone()) { + log::error!("Can't send data channel {}: {}", id, e); + // We're not accepting data channels fast enough => + // close this channel. + // + // Ideally we'd refuse to accept a data channel + // during the negotiation process, but it's not + // possible with the current API. + if let Err(e) = detached.close().await { + log::error!( + "Failed to close data channel {}: {}", + id, + e + ); + } + } + } + Err(e) => { + log::error!("Can't detach data channel {}: {}", id, e); + } + }; + }) + }) + }); + }) + })); + } +} + +impl StreamMuxer for Connection { + type Substream = Substream; + type Error = Error; + + fn poll_inbound( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match ready!(self.incoming_data_channels_rx.poll_next_unpin(cx)) { + Some(detached) => { + log::trace!("Incoming substream {}", detached.stream_identifier()); + + let (substream, drop_listener) = Substream::new(detached); + self.drop_listeners.push(drop_listener); + if let Some(waker) = self.no_drop_listeners_waker.take() { + waker.wake() + } + + Poll::Ready(Ok(substream)) + } + None => { + debug_assert!( + false, + "Sender-end of channel should be owned by `RTCPeerConnection`" + ); + + Poll::Pending // Return `Pending` without registering a waker: If the channel is closed, we don't need to be called anymore. + } + } + } + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match ready!(self.drop_listeners.poll_next_unpin(cx)) { + Some(Ok(())) => {} + Some(Err(e)) => { + log::debug!("a DropListener failed: {e}") + } + None => { + self.no_drop_listeners_waker = Some(cx.waker().clone()); + return Poll::Pending; + } + } + } + } + + fn poll_outbound( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let peer_conn = self.peer_conn.clone(); + let fut = self.outbound_fut.get_or_insert(Box::pin(async move { + let peer_conn = peer_conn.lock().await; + + let data_channel = peer_conn.create_data_channel("", None).await?; + + // No need to hold the lock during the DTLS handshake. + drop(peer_conn); + + log::trace!("Opening data channel {}", data_channel.id()); + + let (tx, rx) = oneshot::channel::>(); + + // Wait until the data channel is opened and detach it. + register_data_channel_open_handler(data_channel, tx).await; + + // Wait until data channel is opened and ready to use + match rx.await { + Ok(detached) => Ok(detached), + Err(e) => Err(Error::Internal(e.to_string())), + } + })); + + match ready!(fut.as_mut().poll(cx)) { + Ok(detached) => { + self.outbound_fut = None; + + log::trace!("Outbound substream {}", detached.stream_identifier()); + + let (substream, drop_listener) = Substream::new(detached); + self.drop_listeners.push(drop_listener); + if let Some(waker) = self.no_drop_listeners_waker.take() { + waker.wake() + } + + Poll::Ready(Ok(substream)) + } + Err(e) => { + self.outbound_fut = None; + Poll::Ready(Err(e)) + } + } + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + log::debug!("Closing connection"); + + let peer_conn = self.peer_conn.clone(); + let fut = self.close_fut.get_or_insert(Box::pin(async move { + let peer_conn = peer_conn.lock().await; + peer_conn.close().await?; + + Ok(()) + })); + + match ready!(fut.as_mut().poll(cx)) { + Ok(()) => { + self.incoming_data_channels_rx.close(); + self.close_fut = None; + Poll::Ready(Ok(())) + } + Err(e) => { + self.close_fut = None; + Poll::Ready(Err(e)) + } + } + } +} + +pub(crate) async fn register_data_channel_open_handler( + data_channel: Arc, + data_channel_tx: Sender>, +) { + data_channel.on_open({ + let data_channel = data_channel.clone(); + Box::new(move || { + log::debug!("Data channel {} open", data_channel.id()); + + Box::pin(async move { + let data_channel = data_channel.clone(); + let id = data_channel.id(); + match data_channel.detach().await { + Ok(detached) => { + if let Err(e) = data_channel_tx.send(detached.clone()) { + log::error!("Can't send data channel {}: {:?}", id, e); + if let Err(e) = detached.close().await { + log::error!("Failed to close data channel {}: {}", id, e); + } + } + } + Err(e) => { + log::error!("Can't detach data channel {}: {}", id, e); + } + }; + }) + }) + }); +} diff --git a/transports/webrtc/src/tokio/error.rs b/transports/webrtc/src/tokio/error.rs new file mode 100644 index 00000000000..f91011ffd46 --- /dev/null +++ b/transports/webrtc/src/tokio/error.rs @@ -0,0 +1,46 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::PeerId; +use thiserror::Error; + +/// Error in WebRTC. +#[derive(Error, Debug)] +pub enum Error { + #[error(transparent)] + WebRTC(#[from] webrtc::Error), + #[error("IO error")] + Io(#[from] std::io::Error), + #[error("failed to authenticate peer")] + Authentication(#[from] libp2p_noise::NoiseError), + + // Authentication errors. + #[error("invalid peer ID (expected {expected}, got {got})")] + InvalidPeerID { expected: PeerId, got: PeerId }, + + #[error("no active listeners, can not dial without a previous listen")] + NoListeners, + + #[error("UDP mux error: {0}")] + UDPMux(std::io::Error), + + #[error("internal error: {0} (see debug logs)")] + Internal(String), +} diff --git a/transports/webrtc/src/tokio/fingerprint.rs b/transports/webrtc/src/tokio/fingerprint.rs new file mode 100644 index 00000000000..55cfa1d51d6 --- /dev/null +++ b/transports/webrtc/src/tokio/fingerprint.rs @@ -0,0 +1,116 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use multihash::{Code, Hasher, Multihash, MultihashDigest}; +use webrtc::dtls_transport::dtls_fingerprint::RTCDtlsFingerprint; + +use std::fmt; + +const SHA256: &str = "sha-256"; + +/// A certificate fingerprint that is assumed to be created using the SHA256 hash algorithm. +#[derive(Eq, PartialEq, Copy, Clone)] +pub struct Fingerprint([u8; 32]); + +impl Fingerprint { + pub(crate) const FF: Fingerprint = Fingerprint([0xFF; 32]); + + #[cfg(test)] + pub fn raw(bytes: [u8; 32]) -> Self { + Self(bytes) + } + + /// Creates a fingerprint from a raw certificate. + pub fn from_certificate(bytes: &[u8]) -> Self { + let mut h = multihash::Sha2_256::default(); + h.update(bytes); + + let mut bytes: [u8; 32] = [0; 32]; + bytes.copy_from_slice(h.finalize()); + + Fingerprint(bytes) + } + + /// Converts [`RTCDtlsFingerprint`] to [`Fingerprint`]. + pub fn try_from_rtc_dtls(fp: &RTCDtlsFingerprint) -> Option { + if fp.algorithm != SHA256 { + return None; + } + + let mut buf = [0; 32]; + hex::decode_to_slice(fp.value.replace(':', ""), &mut buf).ok()?; + + Some(Self(buf)) + } + + /// Converts [`type@Multihash`] to [`Fingerprint`]. + pub fn try_from_multihash(hash: Multihash) -> Option { + if hash.code() != u64::from(Code::Sha2_256) { + // Only support SHA256 for now. + return None; + } + + let bytes = hash.digest().try_into().ok()?; + + Some(Self(bytes)) + } + + /// Converts this fingerprint to [`type@Multihash`]. + pub fn to_multihash(self) -> Multihash { + Code::Sha2_256 + .wrap(&self.0) + .expect("fingerprint's len to be 32 bytes") + } + + /// Formats this fingerprint as uppercase hex, separated by colons (`:`). + /// + /// This is the format described in . + pub fn to_sdp_format(self) -> String { + self.0.map(|byte| format!("{:02X}", byte)).join(":") + } + + /// Returns the algorithm used (e.g. "sha-256"). + /// See + pub fn algorithm(&self) -> String { + SHA256.to_owned() + } +} + +impl fmt::Debug for Fingerprint { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&hex::encode(self.0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sdp_format() { + let fp = Fingerprint::raw(hex_literal::hex!( + "7DE3D83F81A680592A471E6B6ABB0747ABD35385A8093FDFE112C1EEBB6CC6AC" + )); + + let sdp_format = fp.to_sdp_format(); + + assert_eq!(sdp_format, "7D:E3:D8:3F:81:A6:80:59:2A:47:1E:6B:6A:BB:07:47:AB:D3:53:85:A8:09:3F:DF:E1:12:C1:EE:BB:6C:C6:AC") + } +} diff --git a/transports/webrtc/src/tokio/mod.rs b/transports/webrtc/src/tokio/mod.rs new file mode 100644 index 00000000000..81775c6d0f6 --- /dev/null +++ b/transports/webrtc/src/tokio/mod.rs @@ -0,0 +1,35 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +pub mod certificate; +mod connection; +mod error; +mod fingerprint; +mod req_res_chan; +mod sdp; +mod substream; +mod transport; +mod udp_mux; +mod upgrade; + +pub use certificate::Certificate; +pub use connection::Connection; +pub use error::Error; +pub use transport::Transport; diff --git a/transports/webrtc/src/tokio/req_res_chan.rs b/transports/webrtc/src/tokio/req_res_chan.rs new file mode 100644 index 00000000000..a102aa0357a --- /dev/null +++ b/transports/webrtc/src/tokio/req_res_chan.rs @@ -0,0 +1,75 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{ + channel::{mpsc, oneshot}, + SinkExt, StreamExt, +}; + +use std::{ + io, + task::{Context, Poll}, +}; + +pub fn new(capacity: usize) -> (Sender, Receiver) { + let (sender, receiver) = mpsc::channel(capacity); + + ( + Sender { + inner: futures::lock::Mutex::new(sender), + }, + Receiver { inner: receiver }, + ) +} + +pub struct Sender { + inner: futures::lock::Mutex)>>, +} + +impl Sender { + pub async fn send(&self, req: Req) -> io::Result { + let (sender, receiver) = oneshot::channel(); + + self.inner + .lock() + .await + .send((req, sender)) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + let res = receiver + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + Ok(res) + } +} + +pub struct Receiver { + inner: mpsc::Receiver<(Req, oneshot::Sender)>, +} + +impl Receiver { + pub fn poll_next_unpin( + &mut self, + cx: &mut Context<'_>, + ) -> Poll)>> { + self.inner.poll_next_unpin(cx) + } +} diff --git a/transports/webrtc/src/tokio/sdp.rs b/transports/webrtc/src/tokio/sdp.rs new file mode 100644 index 00000000000..659057788ff --- /dev/null +++ b/transports/webrtc/src/tokio/sdp.rs @@ -0,0 +1,252 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use serde::Serialize; +use tinytemplate::TinyTemplate; +use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; + +use std::net::{IpAddr, SocketAddr}; + +use crate::tokio::fingerprint::Fingerprint; + +/// Creates the SDP answer used by the client. +pub fn answer( + addr: SocketAddr, + server_fingerprint: &Fingerprint, + client_ufrag: &str, +) -> RTCSessionDescription { + RTCSessionDescription::answer(render_description( + SERVER_SESSION_DESCRIPTION, + addr, + server_fingerprint, + client_ufrag, + )) + .unwrap() +} + +/// Creates the SDP offer used by the server. +/// +/// Certificate verification is disabled which is why we hardcode a dummy fingerprint here. +pub fn offer(addr: SocketAddr, client_ufrag: &str) -> RTCSessionDescription { + RTCSessionDescription::offer(render_description( + CLIENT_SESSION_DESCRIPTION, + addr, + &Fingerprint::FF, + client_ufrag, + )) + .unwrap() +} + +// An SDP message that constitutes the offer. +// +// Main RFC: +// `sctp-port` and `max-message-size` attrs RFC: +// `group` and `mid` attrs RFC: +// `ice-ufrag`, `ice-pwd` and `ice-options` attrs RFC: +// `setup` attr RFC: +// +// Short description: +// +// v= -> always 0 +// o= +// +// identifies the creator of the SDP document. We are allowed to use dummy values +// (`-` and `0.0.0.0` as ) to remain anonymous, which we do. Note that "IN" means +// "Internet". +// +// s= +// +// We are allowed to pass a dummy `-`. +// +// c= +// +// Indicates the IP address of the remote. +// Note that "IN" means "Internet". +// +// t= +// +// Start and end of the validity of the session. `0 0` means that the session never expires. +// +// m= ... +// +// A `m=` line describes a request to establish a certain protocol. The protocol in this line +// (i.e. `TCP/DTLS/SCTP` or `UDP/DTLS/SCTP`) must always be the same as the one in the offer. +// We know that this is true because we tweak the offer to match the protocol. The `` +// component must always be `webrtc-datachannel` for WebRTC. +// RFCs: 8839, 8866, 8841 +// +// a=mid: +// +// Media ID - uniquely identifies this media stream (RFC9143). +// +// a=ice-options:ice2 +// +// Indicates that we are complying with RFC8839 (as oppposed to the legacy RFC5245). +// +// a=ice-ufrag: +// a=ice-pwd: +// +// ICE username and password, which are used for establishing and +// maintaining the ICE connection. (RFC8839) +// MUST match ones used by the answerer (server). +// +// a=fingerprint:sha-256 +// +// Fingerprint of the certificate that the remote will use during the TLS +// handshake. (RFC8122) +// +// a=setup:actpass +// +// The endpoint that is the offerer MUST use the setup attribute value of setup:actpass and be +// prepared to receive a client_hello before it receives the answer. +// +// a=sctp-port: +// +// The SCTP port (RFC8841) +// Note it's different from the "m=" line port value, which indicates the port of the +// underlying transport-layer protocol (UDP or TCP). +// +// a=max-message-size: +// +// The maximum SCTP user message size (in bytes). (RFC8841) +const CLIENT_SESSION_DESCRIPTION: &str = "v=0 +o=- 0 0 IN {ip_version} {target_ip} +s=- +c=IN {ip_version} {target_ip} +t=0 0 + +m=application {target_port} UDP/DTLS/SCTP webrtc-datachannel +a=mid:0 +a=ice-options:ice2 +a=ice-ufrag:{ufrag} +a=ice-pwd:{pwd} +a=fingerprint:{fingerprint_algorithm} {fingerprint_value} +a=setup:actpass +a=sctp-port:5000 +a=max-message-size:16384 +"; + +// See [`CLIENT_SESSION_DESCRIPTION`]. +// +// a=ice-lite +// +// A lite implementation is only appropriate for devices that will *always* be connected to +// the public Internet and have a public IP address at which it can receive packets from any +// correspondent. ICE will not function when a lite implementation is placed behind a NAT +// (RFC8445). +// +// a=tls-id: +// +// "TLS ID" uniquely identifies a TLS association. +// The ICE protocol uses a "TLS ID" system to indicate whether a fresh DTLS connection +// must be reopened in case of ICE renegotiation. Considering that ICE renegotiations +// never happen in our use case, we can simply put a random value and not care about +// it. Note however that the TLS ID in the answer must be present if and only if the +// offer contains one. (RFC8842) +// TODO: is it true that renegotiations never happen? what about a connection closing? +// "tls-id" attribute MUST be present in the initial offer and respective answer (RFC8839). +// XXX: but right now browsers don't send it. +// +// a=setup:passive +// +// "passive" indicates that the remote DTLS server will only listen for incoming +// connections. (RFC5763) +// The answerer (server) MUST not be located behind a NAT (RFC6135). +// +// The answerer MUST use either a setup attribute value of setup:active or setup:passive. +// Note that if the answerer uses setup:passive, then the DTLS handshake will not begin until +// the answerer is received, which adds additional latency. setup:active allows the answer and +// the DTLS handshake to occur in parallel. Thus, setup:active is RECOMMENDED. +// +// a=candidate: +// +// A transport address for a candidate that can be used for connectivity checks (RFC8839). +// +// a=end-of-candidates +// +// Indicate that no more candidates will ever be sent (RFC8838). +const SERVER_SESSION_DESCRIPTION: &str = "v=0 +o=- 0 0 IN {ip_version} {target_ip} +s=- +t=0 0 +a=ice-lite +m=application {target_port} UDP/DTLS/SCTP webrtc-datachannel +c=IN {ip_version} {target_ip} +a=mid:0 +a=ice-options:ice2 +a=ice-ufrag:{ufrag} +a=ice-pwd:{pwd} +a=fingerprint:{fingerprint_algorithm} {fingerprint_value} + +a=setup:passive +a=sctp-port:5000 +a=max-message-size:16384 +a=candidate:1 1 UDP 1 {target_ip} {target_port} typ host +a=end-of-candidates +"; + +/// Indicates the IP version used in WebRTC: `IP4` or `IP6`. +#[derive(Serialize)] +enum IpVersion { + IP4, + IP6, +} + +/// Context passed to the templating engine, which replaces the above placeholders (e.g. +/// `{IP_VERSION}`) with real values. +#[derive(Serialize)] +struct DescriptionContext { + pub ip_version: IpVersion, + pub target_ip: IpAddr, + pub target_port: u16, + pub fingerprint_algorithm: String, + pub fingerprint_value: String, + pub ufrag: String, + pub pwd: String, +} + +/// Renders a [`TinyTemplate`] description using the provided arguments. +fn render_description( + description: &str, + addr: SocketAddr, + fingerprint: &Fingerprint, + ufrag: &str, +) -> String { + let mut tt = TinyTemplate::new(); + tt.add_template("description", description).unwrap(); + + let context = DescriptionContext { + ip_version: { + if addr.is_ipv4() { + IpVersion::IP4 + } else { + IpVersion::IP6 + } + }, + target_ip: addr.ip(), + target_port: addr.port(), + fingerprint_algorithm: fingerprint.algorithm(), + fingerprint_value: fingerprint.to_sdp_format(), + // NOTE: ufrag is equal to pwd. + ufrag: ufrag.to_owned(), + pwd: ufrag.to_owned(), + }; + tt.render("description", &context).unwrap() +} diff --git a/transports/webrtc/src/tokio/substream.rs b/transports/webrtc/src/tokio/substream.rs new file mode 100644 index 00000000000..8622719a37c --- /dev/null +++ b/transports/webrtc/src/tokio/substream.rs @@ -0,0 +1,295 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use asynchronous_codec::Framed; +use bytes::Bytes; +use futures::{channel::oneshot, prelude::*, ready}; +use tokio_util::compat::Compat; +use webrtc::data::data_channel::{DataChannel, PollDataChannel}; + +use std::{ + io, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use crate::message_proto::{message::Flag, Message}; +use crate::tokio::{ + substream::drop_listener::GracefullyClosed, + substream::framed_dc::FramedDc, + substream::state::{Closing, State}, +}; + +mod drop_listener; +mod framed_dc; +mod state; + +/// Maximum length of a message. +/// +/// "As long as message interleaving is not supported, the sender SHOULD limit the maximum message +/// size to 16 KB to avoid monopolization." +/// Source: +const MAX_MSG_LEN: usize = 16384; // 16kiB +/// Length of varint, in bytes. +const VARINT_LEN: usize = 2; +/// Overhead of the protobuf encoding, in bytes. +const PROTO_OVERHEAD: usize = 5; +/// Maximum length of data, in bytes. +const MAX_DATA_LEN: usize = MAX_MSG_LEN - VARINT_LEN - PROTO_OVERHEAD; + +pub use drop_listener::DropListener; + +/// A substream on top of a WebRTC data channel. +/// +/// To be a proper libp2p substream, we need to implement [`AsyncRead`] and [`AsyncWrite`] as well +/// as support a half-closed state which we do by framing messages in a protobuf envelope. +pub struct Substream { + io: FramedDc, + state: State, + read_buffer: Bytes, + /// Dropping this will close the oneshot and notify the receiver by emitting `Canceled`. + drop_notifier: Option>, +} + +impl Substream { + /// Returns a new `Substream` and a listener, which will notify the receiver when/if the substream + /// is dropped. + pub(crate) fn new(data_channel: Arc) -> (Self, DropListener) { + let (sender, receiver) = oneshot::channel(); + + let substream = Self { + io: framed_dc::new(data_channel.clone()), + state: State::Open, + read_buffer: Bytes::default(), + drop_notifier: Some(sender), + }; + let listener = DropListener::new(framed_dc::new(data_channel), receiver); + + (substream, listener) + } + + /// Gracefully closes the "read-half" of the substream. + pub fn poll_close_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match self.state.close_read_barrier()? { + Some(Closing::Requested) => { + ready!(self.io.poll_ready_unpin(cx))?; + + self.io.start_send_unpin(Message { + flag: Some(Flag::StopSending.into()), + message: None, + })?; + self.state.close_read_message_sent(); + + continue; + } + Some(Closing::MessageSent) => { + ready!(self.io.poll_flush_unpin(cx))?; + + self.state.read_closed(); + + return Poll::Ready(Ok(())); + } + None => return Poll::Ready(Ok(())), + } + } + } +} + +impl AsyncRead for Substream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + loop { + self.state.read_barrier()?; + + if !self.read_buffer.is_empty() { + let n = std::cmp::min(self.read_buffer.len(), buf.len()); + let data = self.read_buffer.split_to(n); + buf[0..n].copy_from_slice(&data[..]); + + return Poll::Ready(Ok(n)); + } + + let Self { + read_buffer, + io, + state, + .. + } = &mut *self; + + match ready!(io_poll_next(io, cx))? { + Some((flag, message)) => { + if let Some(flag) = flag { + state.handle_inbound_flag(flag, read_buffer); + } + + debug_assert!(read_buffer.is_empty()); + if let Some(message) = message { + *read_buffer = message.into(); + } + } + None => { + state.handle_inbound_flag(Flag::Fin, read_buffer); + return Poll::Ready(Ok(0)); + } + } + } + } +} + +impl AsyncWrite for Substream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + while self.state.read_flags_in_async_write() { + // TODO: In case AsyncRead::poll_read encountered an error or returned None earlier, we will poll the + // underlying I/O resource once more. Is that allowed? How about introducing a state IoReadClosed? + + let Self { + read_buffer, + io, + state, + .. + } = &mut *self; + + match io_poll_next(io, cx)? { + Poll::Ready(Some((Some(flag), message))) => { + // Read side is closed. Discard any incoming messages. + drop(message); + // But still handle flags, e.g. a `Flag::StopSending`. + state.handle_inbound_flag(flag, read_buffer) + } + Poll::Ready(Some((None, message))) => drop(message), + Poll::Ready(None) | Poll::Pending => break, + } + } + + self.state.write_barrier()?; + + ready!(self.io.poll_ready_unpin(cx))?; + + let n = usize::min(buf.len(), MAX_DATA_LEN); + + Pin::new(&mut self.io).start_send(Message { + flag: None, + message: Some(buf[0..n].into()), + })?; + + Poll::Ready(Ok(n)) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.io.poll_flush_unpin(cx).map_err(Into::into) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match self.state.close_write_barrier()? { + Some(Closing::Requested) => { + ready!(self.io.poll_ready_unpin(cx))?; + + self.io.start_send_unpin(Message { + flag: Some(Flag::Fin.into()), + message: None, + })?; + self.state.close_write_message_sent(); + + continue; + } + Some(Closing::MessageSent) => { + ready!(self.io.poll_flush_unpin(cx))?; + + self.state.write_closed(); + let _ = self + .drop_notifier + .take() + .expect("to not close twice") + .send(GracefullyClosed {}); + + return Poll::Ready(Ok(())); + } + None => return Poll::Ready(Ok(())), + } + } + } +} + +fn io_poll_next( + io: &mut Framed, prost_codec::Codec>, + cx: &mut Context<'_>, +) -> Poll, Option>)>>> { + match ready!(io.poll_next_unpin(cx)) + .transpose() + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))? + { + Some(Message { flag, message }) => { + let flag = flag + .map(|f| { + Flag::from_i32(f).ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "")) + }) + .transpose()?; + + Poll::Ready(Ok(Some((flag, message)))) + } + None => Poll::Ready(Ok(None)), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use asynchronous_codec::Encoder; + use bytes::BytesMut; + use prost::Message; + use unsigned_varint::codec::UviBytes; + + #[test] + fn max_data_len() { + // Largest possible message. + let message = [0; MAX_DATA_LEN]; + + let protobuf = crate::message_proto::Message { + flag: Some(crate::message_proto::message::Flag::Fin.into()), + message: Some(message.to_vec()), + }; + + let mut encoded_msg = BytesMut::new(); + protobuf + .encode(&mut encoded_msg) + .expect("BytesMut to have sufficient capacity."); + assert_eq!(encoded_msg.len(), message.len() + PROTO_OVERHEAD); + + let mut uvi = UviBytes::default(); + let mut dst = BytesMut::new(); + uvi.encode(encoded_msg.clone().freeze(), &mut dst).unwrap(); + + // Ensure the varint prefixed and protobuf encoded largest message is no longer than the + // maximum limit specified in the libp2p WebRTC specification. + assert_eq!(dst.len(), MAX_MSG_LEN); + + assert_eq!(dst.len() - encoded_msg.len(), VARINT_LEN); + } +} diff --git a/transports/webrtc/src/tokio/substream/drop_listener.rs b/transports/webrtc/src/tokio/substream/drop_listener.rs new file mode 100644 index 00000000000..892d9c876a0 --- /dev/null +++ b/transports/webrtc/src/tokio/substream/drop_listener.rs @@ -0,0 +1,130 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::channel::oneshot; +use futures::channel::oneshot::Canceled; +use futures::{FutureExt, SinkExt}; + +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::message_proto::{message::Flag, Message}; +use crate::tokio::substream::framed_dc::FramedDc; + +#[must_use] +pub struct DropListener { + state: State, +} + +impl DropListener { + pub fn new(stream: FramedDc, receiver: oneshot::Receiver) -> Self { + let substream_id = stream.get_ref().stream_identifier(); + + Self { + state: State::Idle { + stream, + receiver, + substream_id, + }, + } + } +} + +enum State { + /// The [`DropListener`] is idle and waiting to be activated. + Idle { + stream: FramedDc, + receiver: oneshot::Receiver, + substream_id: u16, + }, + /// The stream got dropped and we are sending a reset flag. + SendingReset { + stream: FramedDc, + }, + Flushing { + stream: FramedDc, + }, + /// Bad state transition. + Poisoned, +} + +impl Future for DropListener { + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let state = &mut self.get_mut().state; + + loop { + match std::mem::replace(state, State::Poisoned) { + State::Idle { + stream, + substream_id, + mut receiver, + } => match receiver.poll_unpin(cx) { + Poll::Ready(Ok(GracefullyClosed {})) => { + return Poll::Ready(Ok(())); + } + Poll::Ready(Err(Canceled)) => { + log::info!("Substream {substream_id} dropped without graceful close, sending Reset"); + *state = State::SendingReset { stream }; + continue; + } + Poll::Pending => { + *state = State::Idle { + stream, + substream_id, + receiver, + }; + return Poll::Pending; + } + }, + State::SendingReset { mut stream } => match stream.poll_ready_unpin(cx)? { + Poll::Ready(()) => { + stream.start_send_unpin(Message { + flag: Some(Flag::Reset.into()), + message: None, + })?; + *state = State::Flushing { stream }; + continue; + } + Poll::Pending => { + *state = State::SendingReset { stream }; + return Poll::Pending; + } + }, + State::Flushing { mut stream } => match stream.poll_flush_unpin(cx)? { + Poll::Ready(()) => return Poll::Ready(Ok(())), + Poll::Pending => { + *state = State::Flushing { stream }; + return Poll::Pending; + } + }, + State::Poisoned => { + unreachable!() + } + } + } + } +} + +/// Indicates that our substream got gracefully closed. +pub struct GracefullyClosed {} diff --git a/transports/webrtc/src/tokio/substream/framed_dc.rs b/transports/webrtc/src/tokio/substream/framed_dc.rs new file mode 100644 index 00000000000..39bd117f1f1 --- /dev/null +++ b/transports/webrtc/src/tokio/substream/framed_dc.rs @@ -0,0 +1,45 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use asynchronous_codec::Framed; +use tokio_util::compat::Compat; +use tokio_util::compat::TokioAsyncReadCompatExt; +use webrtc::data::data_channel::{DataChannel, PollDataChannel}; + +use std::sync::Arc; + +use super::{MAX_DATA_LEN, MAX_MSG_LEN, VARINT_LEN}; +use crate::message_proto::Message; + +pub type FramedDc = Framed, prost_codec::Codec>; + +pub fn new(data_channel: Arc) -> FramedDc { + let mut inner = PollDataChannel::new(data_channel); + inner.set_read_buf_capacity(MAX_MSG_LEN); + + let mut framed = Framed::new( + inner.compat(), + prost_codec::Codec::new(MAX_MSG_LEN - VARINT_LEN), + ); + // If not set, `Framed` buffers up to 131kB of data before sending, which leads to "outbound + // packet larger than maximum message size" error in webrtc-rs. + framed.set_send_high_water_mark(MAX_DATA_LEN); + framed +} diff --git a/transports/webrtc/src/tokio/substream/state.rs b/transports/webrtc/src/tokio/substream/state.rs new file mode 100644 index 00000000000..6a5340fb1f1 --- /dev/null +++ b/transports/webrtc/src/tokio/substream/state.rs @@ -0,0 +1,510 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::Bytes; + +use std::io; + +use crate::message_proto::message::Flag; + +#[derive(Debug, Copy, Clone)] +pub enum State { + Open, + ReadClosed, + WriteClosed, + ClosingRead { + /// Whether the write side of our channel was already closed. + write_closed: bool, + inner: Closing, + }, + ClosingWrite { + /// Whether the write side of our channel was already closed. + read_closed: bool, + inner: Closing, + }, + BothClosed { + reset: bool, + }, +} + +/// Represents the state of closing one half (either read or write) of the connection. +/// +/// Gracefully closing the read or write requires sending the `STOP_SENDING` or `FIN` flag respectively +/// and flushing the underlying connection. +#[derive(Debug, Copy, Clone)] +pub enum Closing { + Requested, + MessageSent, +} + +impl State { + /// Performs a state transition for a flag contained in an inbound message. + pub(crate) fn handle_inbound_flag(&mut self, flag: Flag, buffer: &mut Bytes) { + let current = *self; + + match (current, flag) { + (Self::Open, Flag::Fin) => { + *self = Self::ReadClosed; + } + (Self::WriteClosed, Flag::Fin) => { + *self = Self::BothClosed { reset: false }; + } + (Self::Open, Flag::StopSending) => { + *self = Self::WriteClosed; + } + (Self::ReadClosed, Flag::StopSending) => { + *self = Self::BothClosed { reset: false }; + } + (_, Flag::Reset) => { + buffer.clear(); + *self = Self::BothClosed { reset: true }; + } + _ => {} + } + } + + pub(crate) fn write_closed(&mut self) { + match self { + State::ClosingWrite { + read_closed: true, + inner, + } => { + debug_assert!(matches!(inner, Closing::MessageSent)); + + *self = State::BothClosed { reset: false }; + } + State::ClosingWrite { + read_closed: false, + inner, + } => { + debug_assert!(matches!(inner, Closing::MessageSent)); + + *self = State::WriteClosed; + } + State::Open + | State::ReadClosed + | State::WriteClosed + | State::ClosingRead { .. } + | State::BothClosed { .. } => { + unreachable!("bad state machine impl") + } + } + } + + pub(crate) fn close_write_message_sent(&mut self) { + match self { + State::ClosingWrite { inner, read_closed } => { + debug_assert!(matches!(inner, Closing::Requested)); + + *self = State::ClosingWrite { + read_closed: *read_closed, + inner: Closing::MessageSent, + }; + } + State::Open + | State::ReadClosed + | State::WriteClosed + | State::ClosingRead { .. } + | State::BothClosed { .. } => { + unreachable!("bad state machine impl") + } + } + } + + pub(crate) fn read_closed(&mut self) { + match self { + State::ClosingRead { + write_closed: true, + inner, + } => { + debug_assert!(matches!(inner, Closing::MessageSent)); + + *self = State::BothClosed { reset: false }; + } + State::ClosingRead { + write_closed: false, + inner, + } => { + debug_assert!(matches!(inner, Closing::MessageSent)); + + *self = State::ReadClosed; + } + State::Open + | State::ReadClosed + | State::WriteClosed + | State::ClosingWrite { .. } + | State::BothClosed { .. } => { + unreachable!("bad state machine impl") + } + } + } + + pub(crate) fn close_read_message_sent(&mut self) { + match self { + State::ClosingRead { + inner, + write_closed, + } => { + debug_assert!(matches!(inner, Closing::Requested)); + + *self = State::ClosingRead { + write_closed: *write_closed, + inner: Closing::MessageSent, + }; + } + State::Open + | State::ReadClosed + | State::WriteClosed + | State::ClosingWrite { .. } + | State::BothClosed { .. } => { + unreachable!("bad state machine impl") + } + } + } + + /// Whether we should read from the stream in the [`futures::AsyncWrite`] implementation. + /// + /// This is necessary for read-closed streams because we would otherwise not read any more flags from + /// the socket. + pub(crate) fn read_flags_in_async_write(&self) -> bool { + matches!(self, Self::ReadClosed) + } + + /// Acts as a "barrier" for [`futures::AsyncRead::poll_read`]. + pub(crate) fn read_barrier(&self) -> io::Result<()> { + use crate::tokio::substream::State::{Open, ReadClosed, WriteClosed}; + use State::*; + + let kind = match self { + Open + | WriteClosed + | ClosingWrite { + read_closed: false, .. + } => return Ok(()), + ClosingWrite { + read_closed: true, .. + } + | ReadClosed + | ClosingRead { .. } + | BothClosed { reset: false } => io::ErrorKind::BrokenPipe, + BothClosed { reset: true } => io::ErrorKind::ConnectionReset, + }; + + Err(kind.into()) + } + + /// Acts as a "barrier" for [`futures::AsyncWrite::poll_write`]. + pub(crate) fn write_barrier(&self) -> io::Result<()> { + use crate::tokio::substream::State::{Open, ReadClosed, WriteClosed}; + use State::*; + + let kind = match self { + Open + | ReadClosed + | ClosingRead { + write_closed: false, + .. + } => return Ok(()), + ClosingRead { + write_closed: true, .. + } + | WriteClosed + | ClosingWrite { .. } + | BothClosed { reset: false } => io::ErrorKind::BrokenPipe, + BothClosed { reset: true } => io::ErrorKind::ConnectionReset, + }; + + Err(kind.into()) + } + + /// Acts as a "barrier" for [`futures::AsyncWrite::poll_close`]. + pub(crate) fn close_write_barrier(&mut self) -> io::Result> { + loop { + match &self { + State::WriteClosed => return Ok(None), + + State::ClosingWrite { inner, .. } => return Ok(Some(*inner)), + + State::Open => { + *self = Self::ClosingWrite { + read_closed: false, + inner: Closing::Requested, + }; + } + State::ReadClosed => { + *self = Self::ClosingWrite { + read_closed: true, + inner: Closing::Requested, + }; + } + + State::ClosingRead { + write_closed: true, .. + } + | State::BothClosed { reset: false } => { + return Err(io::ErrorKind::BrokenPipe.into()) + } + + State::ClosingRead { + write_closed: false, + .. + } => { + return Err(io::Error::new( + io::ErrorKind::Other, + "cannot close read half while closing write half", + )) + } + + State::BothClosed { reset: true } => { + return Err(io::ErrorKind::ConnectionReset.into()) + } + } + } + } + + /// Acts as a "barrier" for [`Substream::poll_close_read`](super::Substream::poll_close_read). + pub fn close_read_barrier(&mut self) -> io::Result> { + loop { + match self { + State::ReadClosed => return Ok(None), + + State::ClosingRead { inner, .. } => return Ok(Some(*inner)), + + State::Open => { + *self = Self::ClosingRead { + write_closed: false, + inner: Closing::Requested, + }; + } + State::WriteClosed => { + *self = Self::ClosingRead { + write_closed: true, + inner: Closing::Requested, + }; + } + + State::ClosingWrite { + read_closed: true, .. + } + | State::BothClosed { reset: false } => { + return Err(io::ErrorKind::BrokenPipe.into()) + } + + State::ClosingWrite { + read_closed: false, .. + } => { + return Err(io::Error::new( + io::ErrorKind::Other, + "cannot close write half while closing read half", + )) + } + + State::BothClosed { reset: true } => { + return Err(io::ErrorKind::ConnectionReset.into()) + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::ErrorKind; + + #[test] + fn cannot_read_after_receiving_fin() { + let mut open = State::Open; + + open.handle_inbound_flag(Flag::Fin, &mut Bytes::default()); + let error = open.read_barrier().unwrap_err(); + + assert_eq!(error.kind(), ErrorKind::BrokenPipe) + } + + #[test] + fn cannot_read_after_closing_read() { + let mut open = State::Open; + + open.close_read_barrier().unwrap(); + open.close_read_message_sent(); + open.read_closed(); + let error = open.read_barrier().unwrap_err(); + + assert_eq!(error.kind(), ErrorKind::BrokenPipe) + } + + #[test] + fn cannot_write_after_receiving_stop_sending() { + let mut open = State::Open; + + open.handle_inbound_flag(Flag::StopSending, &mut Bytes::default()); + let error = open.write_barrier().unwrap_err(); + + assert_eq!(error.kind(), ErrorKind::BrokenPipe) + } + + #[test] + fn cannot_write_after_closing_write() { + let mut open = State::Open; + + open.close_write_barrier().unwrap(); + open.close_write_message_sent(); + open.write_closed(); + let error = open.write_barrier().unwrap_err(); + + assert_eq!(error.kind(), ErrorKind::BrokenPipe) + } + + #[test] + fn everything_broken_after_receiving_reset() { + let mut open = State::Open; + + open.handle_inbound_flag(Flag::Reset, &mut Bytes::default()); + let error1 = open.read_barrier().unwrap_err(); + let error2 = open.write_barrier().unwrap_err(); + let error3 = open.close_write_barrier().unwrap_err(); + let error4 = open.close_read_barrier().unwrap_err(); + + assert_eq!(error1.kind(), ErrorKind::ConnectionReset); + assert_eq!(error2.kind(), ErrorKind::ConnectionReset); + assert_eq!(error3.kind(), ErrorKind::ConnectionReset); + assert_eq!(error4.kind(), ErrorKind::ConnectionReset); + } + + #[test] + fn should_read_flags_in_async_write_after_read_closed() { + let mut open = State::Open; + + open.handle_inbound_flag(Flag::Fin, &mut Bytes::default()); + + assert!(open.read_flags_in_async_write()) + } + + #[test] + fn cannot_read_or_write_after_receiving_fin_and_stop_sending() { + let mut open = State::Open; + + open.handle_inbound_flag(Flag::Fin, &mut Bytes::default()); + open.handle_inbound_flag(Flag::StopSending, &mut Bytes::default()); + + let error1 = open.read_barrier().unwrap_err(); + let error2 = open.write_barrier().unwrap_err(); + + assert_eq!(error1.kind(), ErrorKind::BrokenPipe); + assert_eq!(error2.kind(), ErrorKind::BrokenPipe); + } + + #[test] + fn can_read_after_closing_write() { + let mut open = State::Open; + + open.close_write_barrier().unwrap(); + open.close_write_message_sent(); + open.write_closed(); + + open.read_barrier().unwrap(); + } + + #[test] + fn can_write_after_closing_read() { + let mut open = State::Open; + + open.close_read_barrier().unwrap(); + open.close_read_message_sent(); + open.read_closed(); + + open.write_barrier().unwrap(); + } + + #[test] + fn cannot_write_after_starting_close() { + let mut open = State::Open; + + open.close_write_barrier().expect("to close in open"); + let error = open.write_barrier().unwrap_err(); + + assert_eq!(error.kind(), ErrorKind::BrokenPipe); + } + + #[test] + fn cannot_read_after_starting_close() { + let mut open = State::Open; + + open.close_read_barrier().expect("to close in open"); + let error = open.read_barrier().unwrap_err(); + + assert_eq!(error.kind(), ErrorKind::BrokenPipe); + } + + #[test] + fn can_read_in_open() { + let open = State::Open; + + let result = open.read_barrier(); + + result.unwrap(); + } + + #[test] + fn can_write_in_open() { + let open = State::Open; + + let result = open.write_barrier(); + + result.unwrap(); + } + + #[test] + fn write_close_barrier_returns_ok_when_closed() { + let mut open = State::Open; + + open.close_write_barrier().unwrap(); + open.close_write_message_sent(); + open.write_closed(); + + let maybe = open.close_write_barrier().unwrap(); + + assert!(maybe.is_none()) + } + + #[test] + fn read_close_barrier_returns_ok_when_closed() { + let mut open = State::Open; + + open.close_read_barrier().unwrap(); + open.close_read_message_sent(); + open.read_closed(); + + let maybe = open.close_read_barrier().unwrap(); + + assert!(maybe.is_none()) + } + + #[test] + fn reset_flag_clears_buffer() { + let mut open = State::Open; + let mut buffer = Bytes::copy_from_slice(b"foobar"); + + open.handle_inbound_flag(Flag::Reset, &mut buffer); + + assert!(buffer.is_empty()); + } +} diff --git a/transports/webrtc/src/tokio/transport.rs b/transports/webrtc/src/tokio/transport.rs new file mode 100644 index 00000000000..24839c6030d --- /dev/null +++ b/transports/webrtc/src/tokio/transport.rs @@ -0,0 +1,604 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{future::BoxFuture, prelude::*, ready, stream::SelectAll, stream::Stream}; +use if_watch::{IfEvent, IfWatcher}; +use libp2p_core::{ + identity, + multiaddr::{Multiaddr, Protocol}, + transport::{ListenerId, TransportError, TransportEvent}, + PeerId, +}; +use webrtc::peer_connection::configuration::RTCConfiguration; + +use std::net::IpAddr; +use std::{ + net::SocketAddr, + pin::Pin, + task::{Context, Poll}, +}; + +use crate::tokio::{ + certificate::Certificate, + connection::Connection, + error::Error, + fingerprint::Fingerprint, + udp_mux::{UDPMuxEvent, UDPMuxNewAddr}, + upgrade, +}; + +/// A WebRTC transport with direct p2p communication (without a STUN server). +pub struct Transport { + /// The config which holds this peer's keys and certificate. + config: Config, + /// All the active listeners. + listeners: SelectAll, +} + +impl Transport { + /// Creates a new WebRTC transport. + /// + /// # Example + /// + /// ``` + /// use libp2p_core::identity; + /// use rand::thread_rng; + /// use libp2p_webrtc::tokio::{Transport, Certificate}; + /// + /// let id_keys = identity::Keypair::generate_ed25519(); + /// let transport = Transport::new(id_keys, Certificate::generate(&mut thread_rng()).unwrap()); + /// ``` + pub fn new(id_keys: identity::Keypair, certificate: Certificate) -> Self { + Self { + config: Config::new(id_keys, certificate), + listeners: SelectAll::new(), + } + } +} + +impl libp2p_core::Transport for Transport { + type Output = (PeerId, Connection); + type Error = Error; + type ListenerUpgrade = BoxFuture<'static, Result>; + type Dial = BoxFuture<'static, Result>; + + fn listen_on(&mut self, addr: Multiaddr) -> Result> { + let id = ListenerId::new(); + + let socket_addr = + parse_webrtc_listen_addr(&addr).ok_or(TransportError::MultiaddrNotSupported(addr))?; + let udp_mux = UDPMuxNewAddr::listen_on(socket_addr) + .map_err(|io| TransportError::Other(Error::Io(io)))?; + + self.listeners.push(ListenStream::new( + id, + self.config.clone(), + udp_mux, + IfWatcher::new().map_err(|io| TransportError::Other(Error::Io(io)))?, + )); + + Ok(id) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) { + listener.close(Ok(())); + true + } else { + false + } + } + + /// Poll all listeners. + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.listeners.poll_next_unpin(cx) { + Poll::Ready(Some(ev)) => Poll::Ready(ev), + _ => Poll::Pending, + } + } + + fn dial(&mut self, addr: Multiaddr) -> Result> { + let (sock_addr, server_fingerprint) = parse_webrtc_dial_addr(&addr) + .ok_or_else(|| TransportError::MultiaddrNotSupported(addr.clone()))?; + if sock_addr.port() == 0 || sock_addr.ip().is_unspecified() { + return Err(TransportError::MultiaddrNotSupported(addr)); + } + + let config = self.config.clone(); + let client_fingerprint = self.config.fingerprint; + let udp_mux = self + .listeners + .iter() + .next() + .ok_or(TransportError::Other(Error::NoListeners))? + .udp_mux + .udp_mux_handle(); + + Ok(async move { + let (peer_id, connection) = upgrade::outbound( + sock_addr, + config.inner, + udp_mux, + client_fingerprint, + server_fingerprint, + config.id_keys, + ) + .await?; + + Ok((peer_id, connection)) + } + .boxed()) + } + + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + // TODO: As the listener of a WebRTC hole punch, we need to send a random UDP packet to the + // `addr`. See DCUtR specification below. + // + // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol + self.dial(addr) + } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + libp2p_core::address_translation(server, observed) + } +} + +/// A stream of incoming connections on one or more interfaces. +struct ListenStream { + /// The ID of this listener. + listener_id: ListenerId, + + /// The socket address that the listening socket is bound to, + /// which may be a "wildcard address" like `INADDR_ANY` or `IN6ADDR_ANY` + /// when listening on all interfaces for IPv4 respectively IPv6 connections. + listen_addr: SocketAddr, + + /// The config which holds this peer's certificate(s). + config: Config, + + /// The UDP muxer that manages all ICE connections. + udp_mux: UDPMuxNewAddr, + + /// Set to `Some` if this listener should close. + /// + /// Optionally contains a [`TransportEvent::ListenerClosed`] that should be + /// reported before the listener's stream is terminated. + report_closed: Option::Item>>, + + /// Watcher for network interface changes. + /// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces + /// become or stop being available. + /// + /// `None` if the socket is only listening on a single interface. + if_watcher: IfWatcher, +} + +impl ListenStream { + /// Constructs a `WebRTCListenStream` for incoming connections. + fn new( + listener_id: ListenerId, + config: Config, + udp_mux: UDPMuxNewAddr, + if_watcher: IfWatcher, + ) -> Self { + ListenStream { + listener_id, + listen_addr: udp_mux.listen_addr(), + config, + udp_mux, + report_closed: None, + if_watcher, + } + } + + /// Report the listener as closed in a [`TransportEvent::ListenerClosed`] and + /// terminate the stream. + fn close(&mut self, reason: Result<(), Error>) { + match self.report_closed { + Some(_) => log::debug!("Listener was already closed."), + None => { + // Report the listener event as closed. + let _ = self + .report_closed + .insert(Some(TransportEvent::ListenerClosed { + listener_id: self.listener_id, + reason, + })); + } + } + } + + fn poll_if_watcher(&mut self, cx: &mut Context<'_>) -> Poll<::Item> { + while let Poll::Ready(event) = self.if_watcher.poll_if_event(cx) { + match event { + Ok(IfEvent::Up(inet)) => { + let ip = inet.addr(); + if self.listen_addr.is_ipv4() == ip.is_ipv4() + || self.listen_addr.is_ipv6() == ip.is_ipv6() + { + return Poll::Ready(TransportEvent::NewAddress { + listener_id: self.listener_id, + listen_addr: self + .listen_multiaddress(ip, self.config.id_keys.public().to_peer_id()), + }); + } + } + Ok(IfEvent::Down(inet)) => { + let ip = inet.addr(); + if self.listen_addr.is_ipv4() == ip.is_ipv4() + || self.listen_addr.is_ipv6() == ip.is_ipv6() + { + return Poll::Ready(TransportEvent::AddressExpired { + listener_id: self.listener_id, + listen_addr: self + .listen_multiaddress(ip, self.config.id_keys.public().to_peer_id()), + }); + } + } + Err(err) => { + return Poll::Ready(TransportEvent::ListenerError { + listener_id: self.listener_id, + error: Error::Io(err), + }); + } + } + } + + Poll::Pending + } + + /// Constructs a [`Multiaddr`] for the given IP address that represents our listen address. + fn listen_multiaddress(&self, ip: IpAddr, local_peer_id: PeerId) -> Multiaddr { + let socket_addr = SocketAddr::new(ip, self.listen_addr.port()); + + socketaddr_to_multiaddr(&socket_addr, Some(self.config.fingerprint)) + .with(Protocol::P2p(*local_peer_id.as_ref())) + } +} + +impl Stream for ListenStream { + type Item = TransportEvent<::ListenerUpgrade, Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + loop { + if let Some(closed) = self.report_closed.as_mut() { + // Listener was closed. + // Report the transport event if there is one. On the next iteration, return + // `Poll::Ready(None)` to terminate the stream. + return Poll::Ready(closed.take()); + } + + if let Poll::Ready(event) = self.poll_if_watcher(cx) { + return Poll::Ready(Some(event)); + } + + // Poll UDP muxer for new addresses or incoming data for streams. + match ready!(self.udp_mux.poll(cx)) { + UDPMuxEvent::NewAddr(new_addr) => { + let local_addr = + socketaddr_to_multiaddr(&self.listen_addr, Some(self.config.fingerprint)); + let send_back_addr = socketaddr_to_multiaddr(&new_addr.addr, None); + + let upgrade = upgrade::inbound( + new_addr.addr, + self.config.inner.clone(), + self.udp_mux.udp_mux_handle(), + self.config.fingerprint, + new_addr.ufrag, + self.config.id_keys.clone(), + ) + .boxed(); + + return Poll::Ready(Some(TransportEvent::Incoming { + upgrade, + local_addr, + send_back_addr, + listener_id: self.listener_id, + })); + } + UDPMuxEvent::Error(e) => { + self.close(Err(Error::UDPMux(e))); + } + } + } + } +} + +/// A config which holds peer's keys and a x509Cert used to authenticate WebRTC communications. +#[derive(Clone)] +struct Config { + inner: RTCConfiguration, + fingerprint: Fingerprint, + id_keys: identity::Keypair, +} + +impl Config { + /// Returns a new [`Config`] with the given keys and certificate. + fn new(id_keys: identity::Keypair, certificate: Certificate) -> Self { + let fingerprint = certificate.fingerprint(); + + Self { + id_keys, + inner: RTCConfiguration { + certificates: vec![certificate.to_rtc_certificate()], + ..RTCConfiguration::default() + }, + fingerprint, + } + } +} + +/// Turns an IP address and port into the corresponding WebRTC multiaddr. +fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, certhash: Option) -> Multiaddr { + let addr = Multiaddr::empty() + .with(socket_addr.ip().into()) + .with(Protocol::Udp(socket_addr.port())) + .with(Protocol::WebRTC); + + if let Some(fp) = certhash { + return addr.with(Protocol::Certhash(fp.to_multihash())); + } + + addr +} + +/// Parse the given [`Multiaddr`] into a [`SocketAddr`] for listening. +fn parse_webrtc_listen_addr(addr: &Multiaddr) -> Option { + let mut iter = addr.iter(); + + let ip = match iter.next()? { + Protocol::Ip4(ip) => IpAddr::from(ip), + Protocol::Ip6(ip) => IpAddr::from(ip), + _ => return None, + }; + + let port = iter.next()?; + let webrtc = iter.next()?; + + let port = match (port, webrtc) { + (Protocol::Udp(port), Protocol::WebRTC) => port, + _ => return None, + }; + + if iter.next().is_some() { + return None; + } + + Some(SocketAddr::new(ip, port)) +} + +/// Parse the given [`Multiaddr`] into a [`SocketAddr`] and a [`Fingerprint`] for dialing. +fn parse_webrtc_dial_addr(addr: &Multiaddr) -> Option<(SocketAddr, Fingerprint)> { + let mut iter = addr.iter(); + + let ip = match iter.next()? { + Protocol::Ip4(ip) => IpAddr::from(ip), + Protocol::Ip6(ip) => IpAddr::from(ip), + _ => return None, + }; + + let port = iter.next()?; + let webrtc = iter.next()?; + let certhash = iter.next()?; + + let (port, fingerprint) = match (port, webrtc, certhash) { + (Protocol::Udp(port), Protocol::WebRTC, Protocol::Certhash(cert_hash)) => { + let fingerprint = Fingerprint::try_from_multihash(cert_hash)?; + + (port, fingerprint) + } + _ => return None, + }; + + match iter.next() { + Some(Protocol::P2p(_)) => {} + // peer ID is optional + None => {} + // unexpected protocol + Some(_) => return None, + } + + Some((SocketAddr::new(ip, port), fingerprint)) +} + +// Tests ////////////////////////////////////////////////////////////////////////////////////////// + +#[cfg(test)] +mod tests { + use super::*; + use futures::future::poll_fn; + use libp2p_core::{multiaddr::Protocol, Transport as _}; + use rand::thread_rng; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + + #[test] + fn missing_webrtc_protocol() { + let addr = "/ip4/127.0.0.1/udp/1234".parse().unwrap(); + + let maybe_parsed = parse_webrtc_listen_addr(&addr); + + assert!(maybe_parsed.is_none()); + } + + #[test] + fn parse_valid_address_with_certhash_and_p2p() { + let addr = "/ip4/127.0.0.1/udp/39901/webrtc/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/p2p/12D3KooWNpDk9w6WrEEcdsEH1y47W71S36yFjw4sd3j7omzgCSMS" + .parse() + .unwrap(); + + let maybe_parsed = parse_webrtc_dial_addr(&addr); + + assert_eq!( + maybe_parsed, + Some(( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 39901), + Fingerprint::raw(hex_literal::hex!( + "e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb" + )) + )) + ); + } + + #[test] + fn peer_id_is_not_required() { + let addr = "/ip4/127.0.0.1/udp/39901/webrtc/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w" + .parse() + .unwrap(); + + let maybe_parsed = parse_webrtc_dial_addr(&addr); + + assert_eq!( + maybe_parsed, + Some(( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 39901), + Fingerprint::raw(hex_literal::hex!( + "e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb" + )) + )) + ); + } + + #[test] + fn tcp_is_invalid_protocol() { + let addr = "/ip4/127.0.0.1/tcp/12345/webrtc/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w" + .parse() + .unwrap(); + + let maybe_parsed = parse_webrtc_listen_addr(&addr); + + assert!(maybe_parsed.is_none()); + } + + #[test] + fn cannot_follow_other_protocols_after_certhash() { + let addr = "/ip4/127.0.0.1/udp/12345/webrtc/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/tcp/12345" + .parse() + .unwrap(); + + let maybe_parsed = parse_webrtc_listen_addr(&addr); + + assert!(maybe_parsed.is_none()); + } + + #[test] + fn parse_ipv6() { + let addr = + "/ip6/::1/udp/12345/webrtc/certhash/uEiDikp5KVUgkLta1EjUN-IKbHk-dUBg8VzKgf5nXxLK46w/p2p/12D3KooWNpDk9w6WrEEcdsEH1y47W71S36yFjw4sd3j7omzgCSMS" + .parse() + .unwrap(); + + let maybe_parsed = parse_webrtc_dial_addr(&addr); + + assert_eq!( + maybe_parsed, + Some(( + SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 12345), + Fingerprint::raw(hex_literal::hex!( + "e2929e4a5548242ed6b512350df8829b1e4f9d50183c5732a07f99d7c4b2b8eb" + )) + )) + ); + } + + #[test] + fn can_parse_valid_addr_without_certhash() { + let addr = "/ip6/::1/udp/12345/webrtc".parse().unwrap(); + + let maybe_parsed = parse_webrtc_listen_addr(&addr); + + assert_eq!( + maybe_parsed, + Some(SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 12345)) + ); + } + + #[test] + fn fails_to_parse_if_certhash_present_but_wrong_hash_function() { + // We only support SHA2-256 for now but this certhash has been encoded with SHA3-256. + let addr = + "/ip6/::1/udp/12345/webrtc/certhash/uFiCH_tkkzpAwkoIDbE4I7QtQksFMYs5nQ4MyYrkgCJYi4A" + .parse() + .unwrap(); + + let maybe_addr = parse_webrtc_listen_addr(&addr); + + assert!(maybe_addr.is_none()) + } + + #[tokio::test] + async fn close_listener() { + let id_keys = identity::Keypair::generate_ed25519(); + let mut transport = + Transport::new(id_keys, Certificate::generate(&mut thread_rng()).unwrap()); + + assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)) + .now_or_never() + .is_none()); + + // Run test twice to check that there is no unexpected behaviour if `QuicTransport.listener` + // is temporarily empty. + for _ in 0..2 { + let listener = transport + .listen_on("/ip4/0.0.0.0/udp/0/webrtc".parse().unwrap()) + .unwrap(); + match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await { + TransportEvent::NewAddress { + listener_id, + listen_addr, + } => { + assert_eq!(listener_id, listener); + assert!( + matches!(listen_addr.iter().next(), Some(Protocol::Ip4(a)) if !a.is_unspecified()) + ); + assert!( + matches!(listen_addr.iter().nth(1), Some(Protocol::Udp(port)) if port != 0) + ); + assert!(matches!(listen_addr.iter().nth(2), Some(Protocol::WebRTC))); + } + e => panic!("Unexpected event: {:?}", e), + } + assert!( + transport.remove_listener(listener), + "Expect listener to exist." + ); + match poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)).await { + TransportEvent::ListenerClosed { + listener_id, + reason: Ok(()), + } => { + assert_eq!(listener_id, listener); + } + e => panic!("Unexpected event: {:?}", e), + } + // Poll once again so that the listener has the chance to return `Poll::Ready(None)` and + // be removed from the list of listeners. + assert!(poll_fn(|cx| Pin::new(&mut transport).as_mut().poll(cx)) + .now_or_never() + .is_none()); + assert!(transport.listeners.is_empty()); + } + } +} diff --git a/transports/webrtc/src/tokio/udp_mux.rs b/transports/webrtc/src/tokio/udp_mux.rs new file mode 100644 index 00000000000..8a18ec648e6 --- /dev/null +++ b/transports/webrtc/src/tokio/udp_mux.rs @@ -0,0 +1,577 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use async_trait::async_trait; +use futures::{ + channel::oneshot, + future::{BoxFuture, FutureExt, OptionFuture}, + stream::FuturesUnordered, + StreamExt, +}; +use stun::{ + attributes::ATTR_USERNAME, + message::{is_message as is_stun_message, Message as STUNMessage}, +}; +use thiserror::Error; +use tokio::{io::ReadBuf, net::UdpSocket}; +use webrtc::ice::udp_mux::{UDPMux, UDPMuxConn, UDPMuxConnParams, UDPMuxWriter}; +use webrtc::util::{Conn, Error}; + +use std::{ + collections::{HashMap, HashSet}, + io, + io::ErrorKind, + net::SocketAddr, + sync::Arc, + task::{Context, Poll}, +}; + +use crate::tokio::req_res_chan; + +const RECEIVE_MTU: usize = 8192; + +/// A previously unseen address of a remote which has sent us an ICE binding request. +#[derive(Debug)] +pub struct NewAddr { + pub addr: SocketAddr, + pub ufrag: String, +} + +/// An event emitted by [`UDPMuxNewAddr`] when it's polled. +#[derive(Debug)] +pub enum UDPMuxEvent { + /// Connection error. UDP mux should be stopped. + Error(std::io::Error), + /// Got a [`NewAddr`] from the socket. + NewAddr(NewAddr), +} + +/// A modified version of [`webrtc::ice::udp_mux::UDPMuxDefault`]. +/// +/// - It has been rewritten to work without locks and channels instead. +/// - It reports previously unseen addresses instead of ignoring them. +pub struct UDPMuxNewAddr { + udp_sock: UdpSocket, + + listen_addr: SocketAddr, + + /// Maps from ufrag to the underlying connection. + conns: HashMap, + + /// Maps from socket address to the underlying connection. + address_map: HashMap, + + /// Set of the new addresses to avoid sending the same address multiple times. + new_addrs: HashSet, + + /// `true` when UDP mux is closed. + is_closed: bool, + + send_buffer: Option<(Vec, SocketAddr, oneshot::Sender>)>, + + close_futures: FuturesUnordered>, + write_future: OptionFuture>, + + close_command: req_res_chan::Receiver<(), Result<(), Error>>, + get_conn_command: req_res_chan::Receiver, Error>>, + remove_conn_command: req_res_chan::Receiver, + registration_command: req_res_chan::Receiver<(UDPMuxConn, SocketAddr), ()>, + send_command: req_res_chan::Receiver<(Vec, SocketAddr), Result>, + + udp_mux_handle: Arc, + udp_mux_writer_handle: Arc, +} + +impl UDPMuxNewAddr { + pub fn listen_on(addr: SocketAddr) -> Result { + let std_sock = std::net::UdpSocket::bind(addr)?; + std_sock.set_nonblocking(true)?; + + let tokio_socket = UdpSocket::from_std(std_sock)?; + let listen_addr = tokio_socket.local_addr()?; + + let (udp_mux_handle, close_command, get_conn_command, remove_conn_command) = + UdpMuxHandle::new(); + let (udp_mux_writer_handle, registration_command, send_command) = UdpMuxWriterHandle::new(); + + Ok(Self { + udp_sock: tokio_socket, + listen_addr, + conns: HashMap::default(), + address_map: HashMap::default(), + new_addrs: HashSet::default(), + is_closed: false, + send_buffer: None, + close_futures: FuturesUnordered::default(), + write_future: OptionFuture::default(), + close_command, + get_conn_command, + remove_conn_command, + registration_command, + send_command, + udp_mux_handle: Arc::new(udp_mux_handle), + udp_mux_writer_handle: Arc::new(udp_mux_writer_handle), + }) + } + + pub fn listen_addr(&self) -> SocketAddr { + self.listen_addr + } + + pub fn udp_mux_handle(&self) -> Arc { + self.udp_mux_handle.clone() + } + + /// Create a muxed connection for a given ufrag. + fn create_muxed_conn(&self, ufrag: &str) -> Result { + let local_addr = self.udp_sock.local_addr()?; + + let params = UDPMuxConnParams { + local_addr, + key: ufrag.into(), + udp_mux: Arc::downgrade( + &(self.udp_mux_writer_handle.clone() as Arc), + ), + }; + + Ok(UDPMuxConn::new(params)) + } + + /// Returns a muxed connection if the `ufrag` from the given STUN message matches an existing + /// connection. + fn conn_from_stun_message( + &self, + buffer: &[u8], + addr: &SocketAddr, + ) -> Option> { + match ufrag_from_stun_message(buffer, true) { + Ok(ufrag) => { + if let Some(conn) = self.conns.get(&ufrag) { + let associated_addrs = conn.get_addresses(); + // This basically ensures only one address is registered per ufrag. + if associated_addrs.is_empty() || associated_addrs.contains(addr) { + return Some(Ok(conn.clone())); + } else { + return Some(Err(ConnQueryError::UfragAlreadyTaken { associated_addrs })); + } + } + None + } + Err(e) => { + log::debug!("{} (addr={})", e, addr); + None + } + } + } + + /// Reads from the underlying UDP socket and either reports a new address or proxies data to the + /// muxed connection. + pub fn poll(&mut self, cx: &mut Context) -> Poll { + let mut recv_buf = [0u8; RECEIVE_MTU]; + + loop { + // => Send data to target + match self.send_buffer.take() { + None => { + if let Poll::Ready(Some(((buf, target), response))) = + self.send_command.poll_next_unpin(cx) + { + self.send_buffer = Some((buf, target, response)); + continue; + } + } + Some((buf, target, response)) => { + match self.udp_sock.poll_send_to(cx, &buf, target) { + Poll::Ready(result) => { + let _ = response.send(result.map_err(|e| Error::Io(e.into()))); + continue; + } + Poll::Pending => { + self.send_buffer = Some((buf, target, response)); + } + } + } + } + + // => Register a new connection + if let Poll::Ready(Some(((conn, addr), response))) = + self.registration_command.poll_next_unpin(cx) + { + let key = conn.key(); + + self.address_map + .entry(addr) + .and_modify(|e| { + if e.key() != key { + e.remove_address(&addr); + *e = conn.clone(); + } + }) + .or_insert_with(|| conn.clone()); + + // remove addr from new_addrs once conn is established + self.new_addrs.remove(&addr); + + let _ = response.send(()); + + continue; + } + + // => Get connection with the given ufrag + if let Poll::Ready(Some((ufrag, response))) = self.get_conn_command.poll_next_unpin(cx) + { + if self.is_closed { + let _ = response.send(Err(Error::ErrUseClosedNetworkConn)); + continue; + } + + if let Some(conn) = self.conns.get(&ufrag).cloned() { + let _ = response.send(Ok(Arc::new(conn))); + continue; + } + + let muxed_conn = match self.create_muxed_conn(&ufrag) { + Ok(conn) => conn, + Err(e) => { + let _ = response.send(Err(e)); + continue; + } + }; + let mut close_rx = muxed_conn.close_rx(); + + self.close_futures.push({ + let ufrag = ufrag.clone(); + let udp_mux_handle = self.udp_mux_handle.clone(); + + Box::pin(async move { + let _ = close_rx.changed().await; + udp_mux_handle.remove_conn_by_ufrag(&ufrag).await; + }) + }); + + self.conns.insert(ufrag, muxed_conn.clone()); + + let _ = response.send(Ok(Arc::new(muxed_conn) as Arc)); + + continue; + } + + // => Close UDPMux + if let Poll::Ready(Some(((), response))) = self.close_command.poll_next_unpin(cx) { + if self.is_closed { + let _ = response.send(Err(Error::ErrAlreadyClosed)); + continue; + } + + for (_, conn) in self.conns.drain() { + conn.close(); + } + + // NOTE: This is important, we need to drop all instances of `UDPMuxConn` to + // avoid a retain cycle due to the use of [`std::sync::Arc`] on both sides. + self.address_map.clear(); + + // NOTE: This is important, we need to drop all instances of `UDPMuxConn` to + // avoid a retain cycle due to the use of [`std::sync::Arc`] on both sides. + self.new_addrs.clear(); + + let _ = response.send(Ok(())); + + self.is_closed = true; + + continue; + } + + // => Remove connection with the given ufrag + if let Poll::Ready(Some((ufrag, response))) = + self.remove_conn_command.poll_next_unpin(cx) + { + // Pion's ice implementation has both `RemoveConnByFrag` and `RemoveConn`, but since `conns` + // is keyed on `ufrag` their implementation is equivalent. + + if let Some(removed_conn) = self.conns.remove(&ufrag) { + for address in removed_conn.get_addresses() { + self.address_map.remove(&address); + } + } + + let _ = response.send(()); + + continue; + } + + // => Remove closed connections + let _ = self.close_futures.poll_next_unpin(cx); + + // => Write previously received data to local connections + match self.write_future.poll_unpin(cx) { + Poll::Ready(Some(())) => { + self.write_future = OptionFuture::default(); + continue; + } + Poll::Ready(None) => { + // => Read from the socket + let mut read = ReadBuf::new(&mut recv_buf); + + match self.udp_sock.poll_recv_from(cx, &mut read) { + Poll::Ready(Ok(addr)) => { + // Find connection based on previously having seen this source address + let conn = self.address_map.get(&addr); + + let conn = match conn { + // If we couldn't find the connection based on source address, see if + // this is a STUN mesage and if so if we can find the connection based on ufrag. + None if is_stun_message(read.filled()) => { + match self.conn_from_stun_message(read.filled(), &addr) { + Some(Ok(s)) => Some(s), + Some(Err(e)) => { + log::debug!("addr={}: Error when querying existing connections: {}", &addr, e); + continue; + } + None => None, + } + } + Some(s) => Some(s.to_owned()), + _ => None, + }; + + match conn { + None => { + if !self.new_addrs.contains(&addr) { + match ufrag_from_stun_message(read.filled(), false) { + Ok(ufrag) => { + log::trace!( + "Notifying about new address addr={} from ufrag={}", + &addr, + ufrag + ); + self.new_addrs.insert(addr); + return Poll::Ready(UDPMuxEvent::NewAddr( + NewAddr { addr, ufrag }, + )); + } + Err(e) => { + log::debug!( + "Unknown address addr={} (non STUN packet: {})", + &addr, + e + ); + } + } + } + } + Some(conn) => { + let mut packet = vec![0u8; read.filled().len()]; + packet.copy_from_slice(read.filled()); + self.write_future = OptionFuture::from(Some( + async move { + if let Err(err) = conn.write_packet(&packet, addr).await + { + log::error!( + "Failed to write packet: {} (addr={})", + err, + addr + ); + } + } + .boxed(), + )); + } + } + + continue; + } + Poll::Ready(Err(err)) if err.kind() == ErrorKind::TimedOut => {} + Poll::Pending => {} + Poll::Ready(Err(err)) => { + log::error!("Could not read udp packet: {}", err); + return Poll::Ready(UDPMuxEvent::Error(err)); + } + } + } + Poll::Pending => {} + } + + return Poll::Pending; + } + } +} + +/// Handle which utilizes [`req_res_chan`] to transmit commands (e.g. remove connection) from the +/// WebRTC ICE agent to [`UDPMuxNewAddr::poll`]. +pub struct UdpMuxHandle { + close_sender: req_res_chan::Sender<(), Result<(), Error>>, + get_conn_sender: req_res_chan::Sender, Error>>, + remove_sender: req_res_chan::Sender, +} + +impl UdpMuxHandle { + /// Returns a new `UdpMuxHandle` and `close`, `get_conn` and `remove` receivers. + pub fn new() -> ( + Self, + req_res_chan::Receiver<(), Result<(), Error>>, + req_res_chan::Receiver, Error>>, + req_res_chan::Receiver, + ) { + let (sender1, receiver1) = req_res_chan::new(1); + let (sender2, receiver2) = req_res_chan::new(1); + let (sender3, receiver3) = req_res_chan::new(1); + + let this = Self { + close_sender: sender1, + get_conn_sender: sender2, + remove_sender: sender3, + }; + + (this, receiver1, receiver2, receiver3) + } +} + +#[async_trait] +impl UDPMux for UdpMuxHandle { + async fn close(&self) -> Result<(), Error> { + self.close_sender + .send(()) + .await + .map_err(|e| Error::Io(e.into()))??; + + Ok(()) + } + + async fn get_conn(self: Arc, ufrag: &str) -> Result, Error> { + let conn = self + .get_conn_sender + .send(ufrag.to_owned()) + .await + .map_err(|e| Error::Io(e.into()))??; + + Ok(conn) + } + + async fn remove_conn_by_ufrag(&self, ufrag: &str) { + if let Err(e) = self.remove_sender.send(ufrag.to_owned()).await { + log::debug!("Failed to send message through channel: {:?}", e); + } + } +} + +/// Handle which utilizes [`req_res_chan`] to transmit commands from [`UDPMuxConn`] connections to +/// [`UDPMuxNewAddr::poll`]. +pub struct UdpMuxWriterHandle { + registration_channel: req_res_chan::Sender<(UDPMuxConn, SocketAddr), ()>, + send_channel: req_res_chan::Sender<(Vec, SocketAddr), Result>, +} + +impl UdpMuxWriterHandle { + /// Returns a new `UdpMuxWriterHandle` and `registration`, `send` receivers. + fn new() -> ( + Self, + req_res_chan::Receiver<(UDPMuxConn, SocketAddr), ()>, + req_res_chan::Receiver<(Vec, SocketAddr), Result>, + ) { + let (sender1, receiver1) = req_res_chan::new(1); + let (sender2, receiver2) = req_res_chan::new(1); + + let this = Self { + registration_channel: sender1, + send_channel: sender2, + }; + + (this, receiver1, receiver2) + } +} + +#[async_trait] +impl UDPMuxWriter for UdpMuxWriterHandle { + async fn register_conn_for_address(&self, conn: &UDPMuxConn, addr: SocketAddr) { + match self + .registration_channel + .send((conn.to_owned(), addr)) + .await + { + Ok(()) => {} + Err(e) => { + log::debug!("Failed to send message through channel: {:?}", e); + return; + } + } + + log::debug!("Registered {} for {}", addr, conn.key()); + } + + async fn send_to(&self, buf: &[u8], target: &SocketAddr) -> Result { + let bytes_written = self + .send_channel + .send((buf.to_owned(), target.to_owned())) + .await + .map_err(|e| Error::Io(e.into()))??; + + Ok(bytes_written) + } +} + +/// Gets the ufrag from the given STUN message or returns an error, if failed to decode or the +/// username attribute is not present. +fn ufrag_from_stun_message(buffer: &[u8], local_ufrag: bool) -> Result { + let (result, message) = { + let mut m = STUNMessage::new(); + + (m.unmarshal_binary(buffer), m) + }; + + if let Err(err) = result { + Err(Error::Other(format!( + "failed to handle decode ICE: {}", + err + ))) + } else { + let (attr, found) = message.attributes.get(ATTR_USERNAME); + if !found { + return Err(Error::Other("no username attribute in STUN message".into())); + } + + match String::from_utf8(attr.value) { + // Per the RFC this shouldn't happen + // https://datatracker.ietf.org/doc/html/rfc5389#section-15.3 + Err(err) => Err(Error::Other(format!( + "failed to decode USERNAME from STUN message as UTF-8: {}", + err + ))), + Ok(s) => { + // s is a combination of the local_ufrag and the remote ufrag separated by `:`. + let res = if local_ufrag { + s.split(':').next() + } else { + s.split(':').last() + }; + match res { + Some(s) => Ok(s.to_owned()), + None => Err(Error::Other("can't get ufrag from username".into())), + } + } + } + } +} + +#[derive(Error, Debug)] +enum ConnQueryError { + #[error("ufrag is already taken (associated_addrs={associated_addrs:?})")] + UfragAlreadyTaken { associated_addrs: Vec }, +} diff --git a/transports/webrtc/src/tokio/upgrade.rs b/transports/webrtc/src/tokio/upgrade.rs new file mode 100644 index 00000000000..9375d58417c --- /dev/null +++ b/transports/webrtc/src/tokio/upgrade.rs @@ -0,0 +1,240 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +mod noise; + +use futures::channel::oneshot; +use futures::future::Either; +use futures_timer::Delay; +use libp2p_core::{identity, PeerId}; +use rand::distributions::Alphanumeric; +use rand::{thread_rng, Rng}; +use webrtc::api::setting_engine::SettingEngine; +use webrtc::api::APIBuilder; +use webrtc::data::data_channel::DataChannel; +use webrtc::data_channel::data_channel_init::RTCDataChannelInit; +use webrtc::dtls_transport::dtls_role::DTLSRole; +use webrtc::ice::network_type::NetworkType; +use webrtc::ice::udp_mux::UDPMux; +use webrtc::ice::udp_network::UDPNetwork; +use webrtc::peer_connection::configuration::RTCConfiguration; +use webrtc::peer_connection::RTCPeerConnection; + +use std::{net::SocketAddr, sync::Arc, time::Duration}; + +use crate::tokio::{error::Error, fingerprint::Fingerprint, sdp, substream::Substream, Connection}; + +/// Creates a new outbound WebRTC connection. +pub async fn outbound( + addr: SocketAddr, + config: RTCConfiguration, + udp_mux: Arc, + client_fingerprint: Fingerprint, + server_fingerprint: Fingerprint, + id_keys: identity::Keypair, +) -> Result<(PeerId, Connection), Error> { + log::debug!("new outbound connection to {addr})"); + + let (peer_connection, ufrag) = new_outbound_connection(addr, config, udp_mux).await?; + + let offer = peer_connection.create_offer(None).await?; + log::debug!("created SDP offer for outbound connection: {:?}", offer.sdp); + peer_connection.set_local_description(offer).await?; + + let answer = sdp::answer(addr, &server_fingerprint, &ufrag); + log::debug!( + "calculated SDP answer for outbound connection: {:?}", + answer + ); + peer_connection.set_remote_description(answer).await?; // This will start the gathering of ICE candidates. + + let data_channel = create_substream_for_noise_handshake(&peer_connection).await?; + let peer_id = noise::outbound( + id_keys, + data_channel, + server_fingerprint, + client_fingerprint, + ) + .await?; + + Ok((peer_id, Connection::new(peer_connection).await)) +} + +/// Creates a new inbound WebRTC connection. +pub async fn inbound( + addr: SocketAddr, + config: RTCConfiguration, + udp_mux: Arc, + server_fingerprint: Fingerprint, + remote_ufrag: String, + id_keys: identity::Keypair, +) -> Result<(PeerId, Connection), Error> { + log::debug!("new inbound connection from {addr} (ufrag: {remote_ufrag})"); + + let peer_connection = new_inbound_connection(addr, config, udp_mux, &remote_ufrag).await?; + + let offer = sdp::offer(addr, &remote_ufrag); + log::debug!("calculated SDP offer for inbound connection: {:?}", offer); + peer_connection.set_remote_description(offer).await?; + + let answer = peer_connection.create_answer(None).await?; + log::debug!("created SDP answer for inbound connection: {:?}", answer); + peer_connection.set_local_description(answer).await?; // This will start the gathering of ICE candidates. + + let data_channel = create_substream_for_noise_handshake(&peer_connection).await?; + let client_fingerprint = get_remote_fingerprint(&peer_connection).await; + let peer_id = noise::inbound( + id_keys, + data_channel, + client_fingerprint, + server_fingerprint, + ) + .await?; + + Ok((peer_id, Connection::new(peer_connection).await)) +} + +async fn new_outbound_connection( + addr: SocketAddr, + config: RTCConfiguration, + udp_mux: Arc, +) -> Result<(RTCPeerConnection, String), Error> { + let ufrag = random_ufrag(); + let se = setting_engine(udp_mux, &ufrag, addr); + + let connection = APIBuilder::new() + .with_setting_engine(se) + .build() + .new_peer_connection(config) + .await?; + + Ok((connection, ufrag)) +} + +async fn new_inbound_connection( + addr: SocketAddr, + config: RTCConfiguration, + udp_mux: Arc, + ufrag: &str, +) -> Result { + let mut se = setting_engine(udp_mux, ufrag, addr); + { + se.set_lite(true); + se.disable_certificate_fingerprint_verification(true); + // Act as a DTLS server (one which waits for a connection). + // + // NOTE: removing this seems to break DTLS setup (both sides send `ClientHello` messages, + // but none end up responding). + se.set_answering_dtls_role(DTLSRole::Server)?; + } + + let connection = APIBuilder::new() + .with_setting_engine(se) + .build() + .new_peer_connection(config) + .await?; + + Ok(connection) +} + +/// Generates a random ufrag and adds a prefix according to the spec. +fn random_ufrag() -> String { + format!( + "libp2p+webrtc+v1/{}", + thread_rng() + .sample_iter(&Alphanumeric) + .take(64) + .map(char::from) + .collect::() + ) +} + +fn setting_engine( + udp_mux: Arc, + ufrag: &str, + addr: SocketAddr, +) -> SettingEngine { + let mut se = SettingEngine::default(); + + // Set both ICE user and password to our fingerprint because that's what the client is + // expecting.. + se.set_ice_credentials(ufrag.to_owned(), ufrag.to_owned()); + + se.set_udp_network(UDPNetwork::Muxed(udp_mux.clone())); + + // Allow detaching data channels. + se.detach_data_channels(); + + // Set the desired network type. + // + // NOTE: if not set, a [`webrtc_ice::agent::Agent`] might pick a wrong local candidate + // (e.g. IPv6 `[::1]` while dialing an IPv4 `10.11.12.13`). + let network_type = match addr { + SocketAddr::V4(_) => NetworkType::Udp4, + SocketAddr::V6(_) => NetworkType::Udp6, + }; + se.set_network_types(vec![network_type]); + + se +} + +/// Returns the SHA-256 fingerprint of the remote. +async fn get_remote_fingerprint(conn: &RTCPeerConnection) -> Fingerprint { + let cert_bytes = conn.sctp().transport().get_remote_certificate().await; + + Fingerprint::from_certificate(&cert_bytes) +} + +async fn create_substream_for_noise_handshake( + conn: &RTCPeerConnection, +) -> Result { + // NOTE: the data channel w/ `negotiated` flag set to `true` MUST be created on both ends. + let data_channel = conn + .create_data_channel( + "", + Some(RTCDataChannelInit { + negotiated: Some(0), // 0 is reserved for the Noise substream + ..RTCDataChannelInit::default() + }), + ) + .await?; + + let (tx, rx) = oneshot::channel::>(); + + // Wait until the data channel is opened and detach it. + crate::tokio::connection::register_data_channel_open_handler(data_channel, tx).await; + + let channel = match futures::future::select(rx, Delay::new(Duration::from_secs(10))).await { + Either::Left((Ok(channel), _)) => channel, + Either::Left((Err(_), _)) => { + return Err(Error::Internal("failed to open data channel".to_owned())) + } + Either::Right(((), _)) => { + return Err(Error::Internal( + "data channel opening took longer than 10 seconds (see logs)".into(), + )) + } + }; + + let (substream, drop_listener) = Substream::new(channel); + drop(drop_listener); // Don't care about cancelled substreams during initial handshake. + + Ok(substream) +} diff --git a/transports/webrtc/src/tokio/upgrade/noise.rs b/transports/webrtc/src/tokio/upgrade/noise.rs new file mode 100644 index 00000000000..94566d008f0 --- /dev/null +++ b/transports/webrtc/src/tokio/upgrade/noise.rs @@ -0,0 +1,113 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use libp2p_core::{identity, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; +use libp2p_noise::{Keypair, NoiseConfig, X25519Spec}; + +use crate::tokio::fingerprint::Fingerprint; +use crate::tokio::Error; + +pub async fn inbound( + id_keys: identity::Keypair, + stream: T, + client_fingerprint: Fingerprint, + server_fingerprint: Fingerprint, +) -> Result +where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + let dh_keys = Keypair::::new() + .into_authentic(&id_keys) + .unwrap(); + let noise = NoiseConfig::xx(dh_keys) + .with_prologue(noise_prologue(client_fingerprint, server_fingerprint)); + let info = noise.protocol_info().next().unwrap(); + // Note the roles are reversed because it allows the server (webrtc connection responder) to + // send application data 0.5 RTT earlier. + let (peer_id, mut channel) = noise + .into_authenticated() + .upgrade_outbound(stream, info) + .await?; + + channel.close().await?; + + Ok(peer_id) +} + +pub async fn outbound( + id_keys: identity::Keypair, + stream: T, + server_fingerprint: Fingerprint, + client_fingerprint: Fingerprint, +) -> Result +where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + let dh_keys = Keypair::::new() + .into_authentic(&id_keys) + .unwrap(); + let noise = NoiseConfig::xx(dh_keys) + .with_prologue(noise_prologue(client_fingerprint, server_fingerprint)); + let info = noise.protocol_info().next().unwrap(); + // Note the roles are reversed because it allows the server (webrtc connection responder) to + // send application data 0.5 RTT earlier. + let (peer_id, mut channel) = noise + .into_authenticated() + .upgrade_inbound(stream, info) + .await?; + + channel.close().await?; + + Ok(peer_id) +} + +pub fn noise_prologue(client_fingerprint: Fingerprint, server_fingerprint: Fingerprint) -> Vec { + let client = client_fingerprint.to_multihash().to_bytes(); + let server = server_fingerprint.to_multihash().to_bytes(); + const PREFIX: &[u8] = b"libp2p-webrtc-noise:"; + let mut out = Vec::with_capacity(PREFIX.len() + client.len() + server.len()); + out.extend_from_slice(PREFIX); + out.extend_from_slice(&client); + out.extend_from_slice(&server); + out +} + +#[cfg(test)] +mod tests { + use super::*; + use hex_literal::hex; + + #[test] + fn noise_prologue_tests() { + let a = Fingerprint::raw(hex!( + "3e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870" + )); + let b = Fingerprint::raw(hex!( + "30fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b99" + )); + + let prologue1 = noise_prologue(a, b); + let prologue2 = noise_prologue(b, a); + + assert_eq!(hex::encode(&prologue1), "6c69627032702d7765627274632d6e6f6973653a12203e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870122030fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b99"); + assert_eq!(hex::encode(&prologue2), "6c69627032702d7765627274632d6e6f6973653a122030fc9f469c207419dfdd0aab5f27a86c973c94e40548db9375cca2e915973b9912203e79af40d6059617a0d83b83a52ce73b0c1f37a72c6043ad2969e2351bdca870"); + } +} diff --git a/transports/webrtc/tests/smoke.rs b/transports/webrtc/tests/smoke.rs new file mode 100644 index 00000000000..c15d9f9aa1e --- /dev/null +++ b/transports/webrtc/tests/smoke.rs @@ -0,0 +1,486 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use anyhow::Result; +use async_trait::async_trait; +use futures::{ + future::{select, Either, FutureExt}, + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, + stream::StreamExt, +}; +use libp2p::core::{identity, muxing::StreamMuxerBox, upgrade, Transport as _}; +use libp2p::request_response::{ + ProtocolName, ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseConfig, + RequestResponseEvent, RequestResponseMessage, +}; +use libp2p::swarm::{Swarm, SwarmEvent}; +use libp2p::webrtc::tokio as webrtc; +use rand::{thread_rng, RngCore}; + +use std::{io, iter}; + +#[tokio::test] +async fn smoke() -> Result<()> { + let _ = env_logger::builder().is_test(true).try_init(); + + let mut rng = rand::thread_rng(); + + let mut a = create_swarm()?; + let mut b = create_swarm()?; + + Swarm::listen_on(&mut a, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?; + Swarm::listen_on(&mut b, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?; + + let addr = match a.next().await { + Some(SwarmEvent::NewListenAddr { address, .. }) => address, + e => panic!("{:?}", e), + }; + + // skip other interface addresses + while a.next().now_or_never().is_some() {} + + let _ = match b.next().await { + Some(SwarmEvent::NewListenAddr { address, .. }) => address, + e => panic!("{:?}", e), + }; + + // skip other interface addresses + while b.next().now_or_never().is_some() {} + + let mut data = vec![0; 4096]; + rng.fill_bytes(&mut data); + + b.behaviour_mut() + .add_address(Swarm::local_peer_id(&a), addr); + b.behaviour_mut() + .send_request(Swarm::local_peer_id(&a), Ping(data.clone())); + + match b.next().await { + Some(SwarmEvent::Dialing(_)) => {} + e => panic!("{:?}", e), + } + + let pair = select(a.next(), b.next()); + match pair.await { + Either::Left((Some(SwarmEvent::IncomingConnection { .. }), _)) => {} + Either::Left((e, _)) => panic!("{:?}", e), + Either::Right(_) => panic!("b completed first"), + } + + let pair = select(a.next(), b.next()); + match pair.await { + Either::Left((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {} + Either::Left((e, _)) => panic!("{:?}", e), + Either::Right((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {} + Either::Right((e, _)) => panic!("{:?}", e), + } + + let pair = select(a.next(), b.next()); + match pair.await { + Either::Left((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {} + Either::Left((e, _)) => panic!("{:?}", e), + Either::Right((Some(SwarmEvent::ConnectionEstablished { .. }), _)) => {} + Either::Right((e, _)) => panic!("{:?}", e), + } + + assert!(b.next().now_or_never().is_none()); + + let pair = select(a.next(), b.next()); + match pair.await { + Either::Left(( + Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { + message: + RequestResponseMessage::Request { + request: Ping(ping), + channel, + .. + }, + .. + })), + _, + )) => { + a.behaviour_mut() + .send_response(channel, Pong(ping)) + .unwrap(); + } + Either::Left((e, _)) => panic!("{:?}", e), + Either::Right(_) => panic!("b completed first"), + } + + match a.next().await { + Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => {} + e => panic!("{:?}", e), + } + + let pair = select(a.next(), b.next()); + match pair.await { + Either::Right(( + Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { + message: + RequestResponseMessage::Response { + response: Pong(pong), + .. + }, + .. + })), + _, + )) => assert_eq!(data, pong), + Either::Right((e, _)) => panic!("{:?}", e), + Either::Left(_) => panic!("a completed first"), + } + + a.behaviour_mut().send_request( + Swarm::local_peer_id(&b), + Ping(b"another substream".to_vec()), + ); + + assert!(a.next().now_or_never().is_none()); + + let pair = select(a.next(), b.next()); + match pair.await { + Either::Right(( + Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { + message: + RequestResponseMessage::Request { + request: Ping(data), + channel, + .. + }, + .. + })), + _, + )) => { + b.behaviour_mut() + .send_response(channel, Pong(data)) + .unwrap(); + } + Either::Right((e, _)) => panic!("{:?}", e), + Either::Left(_) => panic!("a completed first"), + } + + match b.next().await { + Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => {} + e => panic!("{:?}", e), + } + + let pair = select(a.next(), b.next()); + match pair.await { + Either::Left(( + Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { + message: + RequestResponseMessage::Response { + response: Pong(data), + .. + }, + .. + })), + _, + )) => assert_eq!(data, b"another substream".to_vec()), + Either::Left((e, _)) => panic!("{:?}", e), + Either::Right(_) => panic!("b completed first"), + } + + Ok(()) +} + +#[tokio::test] +async fn dial_failure() -> Result<()> { + let _ = env_logger::builder().is_test(true).try_init(); + + let mut a = create_swarm()?; + let mut b = create_swarm()?; + + Swarm::listen_on(&mut a, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?; + Swarm::listen_on(&mut b, "/ip4/127.0.0.1/udp/0/webrtc".parse()?)?; + + let addr = match a.next().await { + Some(SwarmEvent::NewListenAddr { address, .. }) => address, + e => panic!("{:?}", e), + }; + + // skip other interface addresses + while a.next().now_or_never().is_some() {} + + let _ = match b.next().await { + Some(SwarmEvent::NewListenAddr { address, .. }) => address, + e => panic!("{:?}", e), + }; + + // skip other interface addresses + while b.next().now_or_never().is_some() {} + + let a_peer_id = &Swarm::local_peer_id(&a).clone(); + drop(a); // stop a swarm so b can never reach it + + b.behaviour_mut().add_address(a_peer_id, addr); + b.behaviour_mut() + .send_request(a_peer_id, Ping(b"hello world".to_vec())); + + match b.next().await { + Some(SwarmEvent::Dialing(_)) => {} + e => panic!("{:?}", e), + } + + match b.next().await { + Some(SwarmEvent::OutgoingConnectionError { .. }) => {} + e => panic!("{:?}", e), + }; + + match b.next().await { + Some(SwarmEvent::Behaviour(RequestResponseEvent::OutboundFailure { .. })) => {} + e => panic!("{:?}", e), + }; + + Ok(()) +} + +#[tokio::test] +async fn concurrent_connections_and_streams() { + let _ = env_logger::builder().is_test(true).try_init(); + + let num_listeners = 3usize; + let num_streams = 8usize; + + let mut data = vec![0; 4096]; + rand::thread_rng().fill_bytes(&mut data); + let mut listeners = vec![]; + + // Spawn the listener nodes. + for _ in 0..num_listeners { + let mut listener = create_swarm().unwrap(); + Swarm::listen_on( + &mut listener, + "/ip4/127.0.0.1/udp/0/webrtc".parse().unwrap(), + ) + .unwrap(); + + // Wait to listen on address. + let addr = match listener.next().await { + Some(SwarmEvent::NewListenAddr { address, .. }) => address, + e => panic!("{:?}", e), + }; + + listeners.push((*listener.local_peer_id(), addr)); + + tokio::spawn(async move { + loop { + match listener.next().await { + Some(SwarmEvent::IncomingConnection { .. }) => { + log::debug!("listener IncomingConnection"); + } + Some(SwarmEvent::ConnectionEstablished { .. }) => { + log::debug!("listener ConnectionEstablished"); + } + Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { + message: + RequestResponseMessage::Request { + request: Ping(ping), + channel, + .. + }, + .. + })) => { + log::debug!("listener got Message"); + listener + .behaviour_mut() + .send_response(channel, Pong(ping)) + .unwrap(); + } + Some(SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { .. })) => { + log::debug!("listener ResponseSent"); + } + Some(SwarmEvent::ConnectionClosed { .. }) => {} + Some(SwarmEvent::NewListenAddr { .. }) => { + log::debug!("listener NewListenAddr"); + } + Some(e) => { + panic!("unexpected event {:?}", e); + } + None => { + panic!("listener stopped"); + } + } + } + }); + } + + let mut dialer = create_swarm().unwrap(); + Swarm::listen_on(&mut dialer, "/ip4/127.0.0.1/udp/0/webrtc".parse().unwrap()).unwrap(); + + // Wait to listen on address. + match dialer.next().await { + Some(SwarmEvent::NewListenAddr { address, .. }) => address, + e => panic!("{:?}", e), + }; + + // For each listener node start `number_streams` requests. + for (listener_peer_id, listener_addr) in &listeners { + dialer + .behaviour_mut() + .add_address(listener_peer_id, listener_addr.clone()); + + dialer.dial(*listener_peer_id).unwrap(); + } + + // Wait for responses to each request. + let mut num_responses = 0; + loop { + match dialer.next().await { + Some(SwarmEvent::Dialing(_)) => { + log::debug!("dialer Dialing"); + } + Some(SwarmEvent::ConnectionEstablished { peer_id, .. }) => { + log::debug!("dialer Connection established"); + for _ in 0..num_streams { + dialer + .behaviour_mut() + .send_request(&peer_id, Ping(data.clone())); + } + } + Some(SwarmEvent::Behaviour(RequestResponseEvent::Message { + message: + RequestResponseMessage::Response { + response: Pong(pong), + .. + }, + .. + })) => { + log::debug!("dialer got Message"); + num_responses += 1; + assert_eq!(data, pong); + let should_be = num_listeners * num_streams; + log::debug!( + "num of responses: {}, num of listeners * num of streams: {}", + num_responses, + should_be + ); + if num_responses == should_be { + break; + } + } + Some(SwarmEvent::ConnectionClosed { .. }) => { + log::debug!("dialer ConnectionClosed"); + } + Some(SwarmEvent::NewListenAddr { .. }) => { + log::debug!("dialer NewListenAddr"); + } + e => { + panic!("unexpected event {:?}", e); + } + } + } +} + +#[derive(Debug, Clone)] +struct PingProtocol(); + +#[derive(Clone)] +struct PingCodec(); + +#[derive(Debug, Clone, PartialEq, Eq)] +struct Ping(Vec); + +#[derive(Debug, Clone, PartialEq, Eq)] +struct Pong(Vec); + +impl ProtocolName for PingProtocol { + fn protocol_name(&self) -> &[u8] { + "/ping/1".as_bytes() + } +} + +#[async_trait] +impl RequestResponseCodec for PingCodec { + type Protocol = PingProtocol; + type Request = Ping; + type Response = Pong; + + async fn read_request(&mut self, _: &PingProtocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + upgrade::read_length_prefixed(io, 4096) + .map(|res| match res { + Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), + Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()), + Ok(vec) => Ok(Ping(vec)), + }) + .await + } + + async fn read_response(&mut self, _: &PingProtocol, io: &mut T) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + upgrade::read_length_prefixed(io, 4096) + .map(|res| match res { + Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), + Ok(vec) if vec.is_empty() => Err(io::ErrorKind::UnexpectedEof.into()), + Ok(vec) => Ok(Pong(vec)), + }) + .await + } + + async fn write_request( + &mut self, + _: &PingProtocol, + io: &mut T, + Ping(data): Ping, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + upgrade::write_length_prefixed(io, data).await?; + io.close().await?; + Ok(()) + } + + async fn write_response( + &mut self, + _: &PingProtocol, + io: &mut T, + Pong(data): Pong, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + upgrade::write_length_prefixed(io, data).await?; + io.close().await?; + Ok(()) + } +} + +fn create_swarm() -> Result>> { + let id_keys = identity::Keypair::generate_ed25519(); + let peer_id = id_keys.public().to_peer_id(); + let transport = webrtc::Transport::new( + id_keys, + webrtc::Certificate::generate(&mut thread_rng()).unwrap(), + ); + + let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); + let cfg = RequestResponseConfig::default(); + let behaviour = RequestResponse::new(PingCodec(), protocols, cfg); + let transport = transport + .map(|(peer_id, conn), _| (peer_id, StreamMuxerBox::new(conn))) + .boxed(); + + Ok(Swarm::with_tokio_executor(transport, behaviour, peer_id)) +}