diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 007e00e7710..884ead0d96e 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -419,9 +419,9 @@ ## 0.24.0 [2020-11-09] -- Remove `ConnectionInfo` trait and replace it with `PeerId` +- Remove `Connection` trait and replace it with `PeerId` everywhere. This was already effectively the case because - `ConnectionInfo` was implemented on `PeerId`. + `Connection` was implemented on `PeerId`. ## 0.23.1 [2020-10-20] diff --git a/core/src/connection.rs b/core/src/connection.rs index 7a339b0661b..ee90aed1393 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -20,6 +20,20 @@ use crate::multiaddr::{Multiaddr, Protocol}; +pub trait Connection { + /// Should be `Some(_)` if transport itself handles authentication. + fn remote_peer_id(&self) -> Option { + None + } +} + +impl Connection for multistream_select::Negotiated +where C: Connection { + fn remote_peer_id(&self) -> Option { + self.inner_completed_io().and_then(|io| io.remote_peer_id()) + } +} + /// The endpoint roles associated with a peer-to-peer communication channel. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum Endpoint { diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index 941b60765ca..8565b99028e 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -51,6 +51,15 @@ pub struct Negotiated { state: State, } +impl Negotiated { + pub fn inner_completed_io(&self) -> Option<&TInner> { + match &self.state { + State::Completed { io } => Some(io), + _ => None + } + } +} + /// A `Future` that waits on the completion of protocol negotiation. #[derive(Debug)] pub struct NegotiatedComplete { diff --git a/transports/pnet/Cargo.toml b/transports/pnet/Cargo.toml index 97e13767ed1..a12a88e9b7b 100644 --- a/transports/pnet/Cargo.toml +++ b/transports/pnet/Cargo.toml @@ -29,7 +29,7 @@ libp2p-yamux = { workspace = true } quickcheck = { workspace = true } tokio = { version = "1.33.0", features = ["full"] } -# Passing arguments to the docsrs builder in order to properly document cfg's. +# Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] all-features = true diff --git a/transports/pnet/src/crypt_writer.rs b/transports/pnet/src/crypt_writer.rs index c5993548239..f4288a697b8 100644 --- a/transports/pnet/src/crypt_writer.rs +++ b/transports/pnet/src/crypt_writer.rs @@ -28,6 +28,8 @@ use pin_project::pin_project; use salsa20::{cipher::StreamCipher, XSalsa20}; use std::{fmt, pin::Pin}; +use libp2p_core::Connection; + /// A writer that encrypts and forwards to an inner writer #[pin_project] pub(crate) struct CryptWriter { @@ -37,6 +39,13 @@ pub(crate) struct CryptWriter { cipher: XSalsa20, } +impl Connection for CryptWriter +where C: Connection { + fn remote_peer_id(&self) -> Option { + self.inner.remote_peer_id() + } +} + impl CryptWriter { /// Creates a new `CryptWriter` with the specified buffer capacity. pub(crate) fn with_capacity(capacity: usize, inner: W, cipher: XSalsa20) -> CryptWriter { diff --git a/transports/pnet/src/lib.rs b/transports/pnet/src/lib.rs index d8aac22eecd..e5aa3f0850a 100644 --- a/transports/pnet/src/lib.rs +++ b/transports/pnet/src/lib.rs @@ -48,6 +48,8 @@ use std::{ task::{Context, Poll}, }; +use libp2p_core::Connection; + const KEY_SIZE: usize = 32; const NONCE_SIZE: usize = 24; const WRITE_BUFFER_SIZE: usize = 1024; @@ -239,6 +241,13 @@ pub struct PnetOutput { read_cipher: XSalsa20, } +impl Connection for PnetOutput +where C: Connection { + fn remote_peer_id(&self) -> Option { + self.inner.remote_peer_id() + } +} + impl PnetOutput { fn new(inner: S, write_cipher: XSalsa20, read_cipher: XSalsa20) -> Self { Self { diff --git a/transports/wasm-ext/src/dtls.rs b/transports/wasm-ext/src/dtls.rs new file mode 100644 index 00000000000..5f9da85d1db --- /dev/null +++ b/transports/wasm-ext/src/dtls.rs @@ -0,0 +1,68 @@ +use libp2p_core::{Connection, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; + +#[derive(Debug, Clone)] +pub struct NonDTLSConnectionError; + +impl std::fmt::Display for NonDTLSConnectionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Tried to authenticate using `DTLSAuthenticated` for non DTLS transport connection!") + } +} + +impl std::error::Error for NonDTLSConnectionError { +} + +#[derive(Clone)] +pub struct DTLSAuthenticated; + + +impl DTLSAuthenticated { + pub fn new() -> Self { + Self {} + } +} + +impl UpgradeInfo for DTLSAuthenticated { + type Info = &'static str; + // type InfoIter = std::iter::Once; + type InfoIter = std::iter::Empty; + + fn protocol_info(&self) -> Self::InfoIter { + std::iter::empty() + // std::iter::once("/dtls") + } +} + +impl InboundUpgrade for DTLSAuthenticated +where + C: Connection, +{ + type Output = (PeerId, C); + type Error = NonDTLSConnectionError; + type Future = std::future::Ready>; + + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { + let peer_id_result = socket + .remote_peer_id() + .map(|id| (id, socket)) + .ok_or(NonDTLSConnectionError); + std::future::ready(peer_id_result) + } +} + +impl OutboundUpgrade for DTLSAuthenticated +where + C: Connection, +{ + type Output = (PeerId, C); + type Error = NonDTLSConnectionError; + type Future = std::future::Ready>; + + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + let peer_id_result = socket + .remote_peer_id() + .map(|id| (id, socket)) + .ok_or(NonDTLSConnectionError); + std::future::ready(peer_id_result) + } +} diff --git a/transports/wasm-ext/src/webrtc.js b/transports/wasm-ext/src/webrtc.js new file mode 100644 index 00000000000..d7c30d9de9f --- /dev/null +++ b/transports/wasm-ext/src/webrtc.js @@ -0,0 +1,292 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + + +function httpSend(opts) { + return new Promise(function (resolve, reject) { + var xhr = new XMLHttpRequest(); + xhr.open(opts.method, opts.url); + xhr.onreadystatechange = function () { + if (xhr.readyState != 4) { + return; + } + if (xhr.status == 200) { + resolve(xhr.response); + } else { + reject({ + status: xhr.status, + statusText: xhr.statusText, + body: xhr.response, + }); + } + }; + xhr.onerror = function () { + reject({ + status: xhr.status, + statusText: xhr.statusText + }); + }; + if (opts.headers) { + Object.keys(opts.headers).forEach(function (key) { + xhr.setRequestHeader(key, opts.headers[key]); + }); + } + var params = opts.params; + // We'll need to stringify if we've been given an object + // If we have a string, this is skipped. + if (params && typeof params === 'object') { + params = Object.keys(params).map(function (key) { + return encodeURIComponent(key) + '=' + encodeURIComponent(params[key]); + }).join('&'); + } + xhr.send(params); + }); +} + +export const webrtc_transport = async (crypto) => { + const cert = await RTCPeerConnection.generateCertificate({ + name: "ECDSA", + namedCurve: "P-256", + }); + return { + crypto, + conn_config: { + certificates: [cert], + iceServers: [], + }, + dial(addr) { return dial(this, addr); }, + listen_on(addr) { + let err = new Error("Listening on WebRTC is not possible from within a browser"); + err.name = "NotSupportedError"; + throw err; + }, + }; +} + +function signalAsSignInput(signal) { + return new TextEncoder().encode(`${signal.type}${signal.sdp}${signal.identity_pub_key}${signal.target_peer_id}`); +} + +function signSignal(self, signal) { + let signalConcat = signalAsSignInput(signal); + return bs58btc.encode(self.crypto.sign(signalConcat)); +} + +/// Throws error if invalid. +function verifyRemoteSignal(self, signal, expected_peer_id) { + if (signal.target_peer_id != self.crypto.peer_id_as_b58()) { + throw "Identity handshake failed! Reason: `target_peer_id` in the WebRTC answer, doesn't match with the expected local peer id."; + } + let pub_key_as_protobuf = bs58btc.decode(signal.identity_pub_key); + let data = signalAsSignInput(signal); + let signature = bs58btc.decode(signal.signature); + self.crypto.assert_signature(pub_key_as_protobuf, data, signature); + let peer_id = self.crypto.pub_key_as_protobuf_to_peer_id_as_b58(pub_key_as_protobuf); + if (peer_id != expected_peer_id) { + throw "Identity handshake failed! Peer's ID doesn't match the expected one." + } +} + +// Attempt to dial a multiaddress. +const dial = async (self, addr) => { + const addrParsed = addr.match(/^\/(ip4|ip6|dns4|dns6|dns)\/(.*?)\/tcp\/([0-9]+)\/http\/p2p-webrtc-direct\/p2p\/([a-zA-Z0-9]+)$/); + console.log("Dial: ", addr) + console.log("parsed: ", addrParsed) + if (addrParsed == null) { + let err = new Error("Address not supported: " + addr); + err.name = "NotSupportedError"; + throw err; + } + const target_peer_id = addrParsed[4]; + const conn = new RTCPeerConnection(self.conn_config); + const channel = conn.createDataChannel("data", { + ordered: true, + }); + + let offer = await conn.createOffer(); + await conn.setLocalDescription(offer); + offer = { + type: offer.type, + sdp: offer.sdp, + identity_pub_key: bs58btc.encode(self.crypto.pub_key_as_protobuf()), + target_peer_id, + }; + offer.signature = signSignal(self, offer); + + console.log("sending offer:", offer); + const offerBase58 = bs58btc.encode(new TextEncoder().encode(JSON.stringify(offer))); + const respBody = await httpSend({ + method: "GET", + url: "http://" + addrParsed[2] + ":" + addrParsed[3] + "/?signal=" + offerBase58, + }); + const answer = JSON.parse(new TextDecoder().decode(bs58btc.decode(respBody))); + console.log("received answer:", answer); + let remote_pub_key_as_protobuf = bs58btc.decode(answer.identity_pub_key); + try { + verifyRemoteSignal(self, answer, target_peer_id); + } catch (e) { + console.log("verify answer error:", e); + } + + try { + await conn.setRemoteDescription(new RTCSessionDescription(answer)); + } catch(e) { + console.log("setRemoteDescription error:", e); + } + console.log("setRemoteDescription done"); + + return new Promise((open_resolve, open_reject) => { + let reader = read_queue(); + channel.onerror = (ev) => { + console.log(ev); + // If `open_resolve` has been called earlier, calling `open_reject` seems to be + // silently ignored. It is easier to unconditionally call `open_reject` rather than + // check in which state the connection is, which would be error-prone. + open_reject(ev); + // Injecting an EOF is how we report to the reading side that the connection has been + // closed. Injecting multiple EOFs is harmless. + reader.inject_eof(); + }; + channel.onclose = (ev) => { + console.log(ev); + // Same remarks as above. + open_reject(ev); + reader.inject_eof(); + }; + + // We inject all incoming messages into the queue unconditionally. The caller isn't + // supposed to access this queue unless the connection is open. + channel.onmessage = (ev) => { + console.log("received:", ev.data, "\n--str:", new TextDecoder().decode(ev.data)); + reader.inject_array_buffer(ev.data); + } + + channel.onopen = () => { + console.log("DataChannel opened"); + open_resolve({ + read: (function*() { while(channel.readyState == "open") { + let next = reader.next(); + console.log("read:", next); + yield next; + } })(), + write: (data) => { + if (channel.readyState == "open") { + // The passed in `data` is an `ArrayBufferView` [0]. If the + // underlying typed array is a `SharedArrayBuffer` (when + // using WASM threads, so multiple web workers sharing + // memory) the WebSocket's `send` method errors [1][2][3]. + // This limitation will probably be lifted in the future, + // but for now we have to make a copy here .. + // + // [0]: https://developer.mozilla.org/en-US/docs/Web/API/ArrayBufferView + // [1]: https://chromium.googlesource.com/chromium/src/+/1438f63f369fed3766fa5031e7a252c986c69be6%5E%21/ + // [2]: https://bugreports.qt.io/browse/QTBUG-78078 + // [3]: https://chromium.googlesource.com/chromium/src/+/HEAD/third_party/blink/renderer/bindings/IDLExtendedAttributes.md#AllowShared_p + console.log("send:", data, "\n--str:", new TextDecoder().decode(data)); + channel.send(data.slice(0)); + return promise_when_send_finished(channel); + } else { + return Promise.reject("WebRTC DataChannel is " + channel.readyState); + } + }, + remote_pub_key: () => { + return remote_pub_key_as_protobuf; + // const cert = conn.sctp.transport.getRemoteCertificates()[0]; + // if (!cert) { + // return null; + // } + // return new Uint8Array(cert); + }, + shutdown: () => channel.close(), + close: () => {} + }); + } + }); +} + +// Takes a WebSocket object and returns a Promise that resolves when bufferedAmount is low enough +// to allow more data to be sent. +const promise_when_send_finished = (channel) => { + return new Promise((resolve, reject) => { + function check() { + if (channel.readyState != "open") { + reject("WebRTC DataChannel is " + channel.readyState); + return; + } + + // We put an arbitrary threshold of 8 kiB of buffered data. + if (channel.bufferedAmount < 8 * 1024) { + resolve(); + } else { + setTimeout(check, 100); + } + } + + check(); + }) +} + +// Creates a queue reading system. +const read_queue = () => { + // State of the queue. + let state = { + // Array of promises resolving to `ArrayBuffer`s, that haven't been transmitted back with + // `next` yet. + queue: new Array(), + // If `resolve` isn't null, it is a "resolve" function of a promise that has already been + // returned by `next`. It should be called with some data. + resolve: null, + }; + + return { + // Inserts a new Blob in the queue. + inject_array_buffer: (buffer) => { + if (state.resolve != null) { + state.resolve(buffer); + state.resolve = null; + } else { + state.queue.push(Promise.resolve(buffer)); + } + }, + + // Inserts an EOF message in the queue. + inject_eof: () => { + if (state.resolve != null) { + state.resolve(null); + state.resolve = null; + } else { + state.queue.push(Promise.resolve(null)); + } + }, + + // Returns a Promise that yields the next entry as an ArrayBuffer. + next: () => { + if (state.queue.length != 0) { + return state.queue.shift(0); + } else { + if (state.resolve !== null) + throw "Internal error: already have a pending promise"; + return new Promise((resolve, reject) => { + state.resolve = resolve; + }); + } + } + }; +};