From a4e06bf0e6ea3fa8eab3c35e31729e2024c3be5e Mon Sep 17 00:00:00 2001 From: Zura Benashvili Date: Tue, 20 Sep 2022 15:21:04 +0400 Subject: [PATCH] WIP on (no branch): 83c67954 *: Prepare v0.48.0 (#2869) --- CHANGELOG.md | 2 +- Cargo.toml | 5 +- core/CHANGELOG.md | 4 +- core/src/connection.rs | 14 ++ core/src/lib.rs | 2 +- misc/multistream-select/src/negotiated.rs | 9 + protocols/gossipsub/Cargo.toml | 3 +- src/lib.rs | 2 +- transports/pnet/Cargo.toml | 1 + transports/pnet/src/crypt_writer.rs | 9 + transports/pnet/src/lib.rs | 9 + transports/wasm-ext/Cargo.toml | 4 +- transports/wasm-ext/src/dtls.rs | 68 +++++ transports/wasm-ext/src/lib.rs | 137 +++++++++- transports/wasm-ext/src/webrtc.js | 292 ++++++++++++++++++++++ 15 files changed, 550 insertions(+), 11 deletions(-) create mode 100644 transports/wasm-ext/src/dtls.rs create mode 100644 transports/wasm-ext/src/webrtc.js diff --git a/CHANGELOG.md b/CHANGELOG.md index 95e41bb020c..1572fc5f777 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -833,7 +833,7 @@ must not be skipped! - Reworked the API of `Multiaddr`. - Removed the `ToMultiaddr` trait in favour of `TryFrom`. - Added `Swarm::ban_peer_id` and `Swarm::unban_peer_id`. -- The `TPeerId` generic parameter of `RawSwarm` is now `TConnInfo` and must now implement a `ConnectionInfo` trait. +- The `TPeerId` generic parameter of `RawSwarm` is now `TConnInfo` and must now implement a `Connection` trait. - Reworked the `PingEvent`. - Renamed `KeepAlive::Forever` to `Yes` and `KeepAlive::Now` to `No`. diff --git a/Cargo.toml b/Cargo.toml index 62b8df404f8..c89f53f5883 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ uds = ["dep:libp2p-uds"] wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js", "rand/wasm-bindgen"] wasm-ext = ["dep:libp2p-wasm-ext"] wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext?/websocket"] +wasm-ext-webrtc = ["wasm-ext", "libp2p-wasm-ext?/webrtc"] websocket = ["dep:libp2p-websocket"] yamux = ["dep:libp2p-yamux"] secp256k1 = ["libp2p-core/secp256k1"] @@ -100,6 +101,7 @@ libp2p-swarm-derive = { version = "0.30.0", path = "swarm-derive" } libp2p-uds = { version = "0.35.0", path = "transports/uds", optional = true } libp2p-wasm-ext = { version = "0.36.0", path = "transports/wasm-ext", default-features = false, optional = true } libp2p-yamux = { version = "0.40.0", path = "muxers/yamux", optional = true } +libp2p-gossipsub = { version = "0.41.0", path = "protocols/gossipsub", optional = true } multiaddr = { version = "0.14.0" } parking_lot = "0.12.0" pin-project = "1.0.0" @@ -113,9 +115,6 @@ libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true, de libp2p-tcp = { version = "0.36.0", path = "transports/tcp", default-features = false, optional = true } libp2p-websocket = { version = "0.38.0", path = "transports/websocket", optional = true } -[target.'cfg(not(target_os = "unknown"))'.dependencies] -libp2p-gossipsub = { version = "0.41.0", path = "protocols/gossipsub", optional = true } - [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } async-trait = "0.1" diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 4a5e07f14f8..d1ad6d299b5 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -291,9 +291,9 @@ # 0.24.0 [2020-11-09] -- Remove `ConnectionInfo` trait and replace it with `PeerId` +- Remove `Connection` trait and replace it with `PeerId` everywhere. This was already effectively the case because - `ConnectionInfo` was implemented on `PeerId`. + `Connection` was implemented on `PeerId`. # 0.23.1 [2020-10-20] diff --git a/core/src/connection.rs b/core/src/connection.rs index 91008408fe2..a1e06220260 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -20,6 +20,20 @@ use crate::multiaddr::{Multiaddr, Protocol}; +pub trait Connection { + /// Should be `Some(_)` if transport itself handles authentication. + fn remote_peer_id(&self) -> Option { + None + } +} + +impl Connection for multistream_select::Negotiated +where C: Connection { + fn remote_peer_id(&self) -> Option { + self.inner_completed_io().and_then(|io| io.remote_peer_id()) + } +} + /// Connection identifier. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct ConnectionId(usize); diff --git a/core/src/lib.rs b/core/src/lib.rs index fc5b6c2426e..e253da763a3 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -68,7 +68,7 @@ pub mod signed_envelope; pub mod transport; pub mod upgrade; -pub use connection::{ConnectedPoint, Endpoint}; +pub use connection::{ConnectedPoint, Connection, Endpoint}; pub use identity::PublicKey; pub use multiaddr::Multiaddr; pub use multihash; diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index 2f78daf0376..e136348e3fc 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -51,6 +51,15 @@ pub struct Negotiated { state: State, } +impl Negotiated { + pub fn inner_completed_io(&self) -> Option<&TInner> { + match &self.state { + State::Completed { io } => Some(io), + _ => None + } + } +} + /// A `Future` that waits on the completion of protocol negotiation. #[derive(Debug)] pub struct NegotiatedComplete { diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 2adccfd6607..82e1adcf043 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -28,7 +28,8 @@ prost = "0.11" hex_fmt = "0.3.0" regex = "1.5.5" serde = { version = "1", optional = true, features = ["derive"] } -wasm-timer = "0.2.5" +# wasm-timer = "0.2.5" +wasm-timer = { git = "https://github.com/fusetim/wasm-timer", branch = "tim-add-missing-methods" } instant = "0.1.11" # Metrics dependencies prometheus-client = "0.18.0" diff --git a/src/lib.rs b/src/lib.rs index 3ed00408cb5..02f9717b8f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,7 +68,7 @@ pub use libp2p_dns as dns; pub use libp2p_floodsub as floodsub; #[cfg(feature = "gossipsub")] #[cfg_attr(docsrs, doc(cfg(feature = "gossipsub")))] -#[cfg(not(target_os = "unknown"))] +// #[cfg(not(target_os = "unknown"))] #[doc(inline)] pub use libp2p_gossipsub as gossipsub; #[cfg(feature = "identify")] diff --git a/transports/pnet/Cargo.toml b/transports/pnet/Cargo.toml index b674f18ab97..9a744eba1f9 100644 --- a/transports/pnet/Cargo.toml +++ b/transports/pnet/Cargo.toml @@ -17,6 +17,7 @@ salsa20 = "0.9" sha3 = "0.10" rand = "0.7" pin-project = "1.0.2" +libp2p-core = { version = "0.36.0", path = "../../core", default-features = false } [dev-dependencies] quickcheck = "0.9.0" diff --git a/transports/pnet/src/crypt_writer.rs b/transports/pnet/src/crypt_writer.rs index e13bb446ce6..2b65cb57f5e 100644 --- a/transports/pnet/src/crypt_writer.rs +++ b/transports/pnet/src/crypt_writer.rs @@ -28,6 +28,8 @@ use pin_project::pin_project; use salsa20::{cipher::StreamCipher, XSalsa20}; use std::{fmt, pin::Pin}; +use libp2p_core::Connection; + /// A writer that encrypts and forwards to an inner writer #[pin_project] pub struct CryptWriter { @@ -37,6 +39,13 @@ pub struct CryptWriter { cipher: XSalsa20, } +impl Connection for CryptWriter +where C: Connection { + fn remote_peer_id(&self) -> Option { + self.inner.remote_peer_id() + } +} + impl CryptWriter { /// Creates a new `CryptWriter` with the specified buffer capacity. pub fn with_capacity(capacity: usize, inner: W, cipher: XSalsa20) -> CryptWriter { diff --git a/transports/pnet/src/lib.rs b/transports/pnet/src/lib.rs index efd27b14667..d80239b2405 100644 --- a/transports/pnet/src/lib.rs +++ b/transports/pnet/src/lib.rs @@ -45,6 +45,8 @@ use std::{ task::{Context, Poll}, }; +use libp2p_core::Connection; + const KEY_SIZE: usize = 32; const NONCE_SIZE: usize = 24; const WRITE_BUFFER_SIZE: usize = 1024; @@ -234,6 +236,13 @@ pub struct PnetOutput { read_cipher: XSalsa20, } +impl Connection for PnetOutput +where C: Connection { + fn remote_peer_id(&self) -> Option { + self.inner.remote_peer_id() + } +} + impl PnetOutput { fn new(inner: S, write_cipher: XSalsa20, read_cipher: XSalsa20) -> Self { Self { diff --git a/transports/wasm-ext/Cargo.toml b/transports/wasm-ext/Cargo.toml index 6c6a645c8c2..eaffce4d072 100644 --- a/transports/wasm-ext/Cargo.toml +++ b/transports/wasm-ext/Cargo.toml @@ -13,10 +13,12 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" js-sys = "0.3.50" -libp2p-core = { version = "0.36.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.36.0", path = "../../core", default-features = false, features = ["ecdsa"] } parity-send-wrapper = "0.1.0" wasm-bindgen = "0.2.42" wasm-bindgen-futures = "0.4.4" +x509-parser = "0.14.0" [features] websocket = [] +webrtc = [] diff --git a/transports/wasm-ext/src/dtls.rs b/transports/wasm-ext/src/dtls.rs new file mode 100644 index 00000000000..5f9da85d1db --- /dev/null +++ b/transports/wasm-ext/src/dtls.rs @@ -0,0 +1,68 @@ +use libp2p_core::{Connection, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; + +#[derive(Debug, Clone)] +pub struct NonDTLSConnectionError; + +impl std::fmt::Display for NonDTLSConnectionError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Tried to authenticate using `DTLSAuthenticated` for non DTLS transport connection!") + } +} + +impl std::error::Error for NonDTLSConnectionError { +} + +#[derive(Clone)] +pub struct DTLSAuthenticated; + + +impl DTLSAuthenticated { + pub fn new() -> Self { + Self {} + } +} + +impl UpgradeInfo for DTLSAuthenticated { + type Info = &'static str; + // type InfoIter = std::iter::Once; + type InfoIter = std::iter::Empty; + + fn protocol_info(&self) -> Self::InfoIter { + std::iter::empty() + // std::iter::once("/dtls") + } +} + +impl InboundUpgrade for DTLSAuthenticated +where + C: Connection, +{ + type Output = (PeerId, C); + type Error = NonDTLSConnectionError; + type Future = std::future::Ready>; + + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { + let peer_id_result = socket + .remote_peer_id() + .map(|id| (id, socket)) + .ok_or(NonDTLSConnectionError); + std::future::ready(peer_id_result) + } +} + +impl OutboundUpgrade for DTLSAuthenticated +where + C: Connection, +{ + type Output = (PeerId, C); + type Error = NonDTLSConnectionError; + type Future = std::future::Ready>; + + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + let peer_id_result = socket + .remote_peer_id() + .map(|id| (id, socket)) + .ok_or(NonDTLSConnectionError); + std::future::ready(peer_id_result) + } +} diff --git a/transports/wasm-ext/src/lib.rs b/transports/wasm-ext/src/lib.rs index bb1e3ea0653..76caeb8d9db 100644 --- a/transports/wasm-ext/src/lib.rs +++ b/transports/wasm-ext/src/lib.rs @@ -36,15 +36,19 @@ use futures::{future::Ready, prelude::*, ready, stream::SelectAll}; use libp2p_core::{ connection::Endpoint, transport::{ListenerId, TransportError, TransportEvent}, - Multiaddr, Transport, + Multiaddr, Transport, identity::Keypair, PublicKey, }; use parity_send_wrapper::SendWrapper; use std::{collections::VecDeque, error, fmt, io, mem, pin::Pin, task::Context, task::Poll}; use wasm_bindgen::{prelude::*, JsCast}; use wasm_bindgen_futures::JsFuture; +#[cfg(feature = "webrtc")] +pub mod dtls; + /// Contains the definition that one must match on the JavaScript side. pub mod ffi { + use libp2p_core::{identity::Keypair, PublicKey, PeerId}; use wasm_bindgen::prelude::*; #[wasm_bindgen] @@ -97,6 +101,9 @@ pub mod ffi { #[wasm_bindgen(method, catch)] pub fn write(this: &Connection, data: &[u8]) -> Result; + #[wasm_bindgen(method)] + pub fn remote_pub_key(this: &Connection) -> Option; + /// Shuts down the writing side of the connection. After this has been called, the `write` /// method will no longer be called. #[wasm_bindgen(method, catch)] @@ -142,6 +149,76 @@ pub mod ffi { /// Returns a `Transport` implemented using websockets. pub fn websocket_transport() -> Transport; } + + #[cfg(feature = "webrtc")] + #[wasm_bindgen] + pub struct Crypto { + keypair: Keypair, + } + + impl Crypto { + pub fn new(keypair: Keypair) -> Self { + Self { keypair } + } + } + + #[cfg(feature = "webrtc")] + #[wasm_bindgen] + impl Crypto { + pub fn pub_key_as_protobuf(&self) -> Vec { + self.keypair.public().to_protobuf_encoding() + } + + pub fn peer_id_as_b58(&self) -> String { + self.keypair.public().to_peer_id().to_base58() + } + + pub fn sign(&self, msg: &[u8]) -> Result, JsError> { + Ok(self.keypair.sign(msg)?) + } + + pub fn pub_key_as_protobuf_to_peer_id_as_b58(&self, pub_key_as_protobuf: &[u8]) -> Result { + // return Err(JsError::new(&format!("len: {}, input: {:?}, result: {:?}", pub_key_as_protobuf.len(), pub_key_as_protobuf, PublicKey::from_protobuf_encoding(&pub_key_as_protobuf)))); + let pub_key = PublicKey::from_protobuf_encoding(&pub_key_as_protobuf)?; + Ok(pub_key.to_peer_id().to_string()) + } + + pub fn assert_signature(&self, pub_key_as_protobuf: &[u8], msg: &[u8], sig: &[u8]) -> Result<(), JsError> { + let pub_key = PublicKey::from_protobuf_encoding(&*pub_key_as_protobuf)?; + if pub_key.verify(msg, sig) { + Ok(()) + } else { + Err(JsError::new("Identity handshake failed! Invalid signature.")) + } + } + + pub fn pub_key_as_protobuf_to_peer_id(&self, pub_key_as_protobuf: &[u8], expected_peer_id: &str) -> Result<(), JsError> { + let pub_key = PublicKey::from_protobuf_encoding(pub_key_as_protobuf)?; + let peer_id = pub_key.to_peer_id().to_string(); + if peer_id == expected_peer_id { + Ok(()) + } else { + Err(JsError::new(&format!("Identity handshake failed! Peer's ID doesn't match the expected one. `expected` => '{}', `found` => '{}'", expected_peer_id, peer_id))) + } + } + } + + #[cfg(feature = "webrtc")] + #[wasm_bindgen(module = "/src/webrtc.js")] + extern "C" { + /// Returns a `Transport` implemented using websockets. + // pub fn webrtc_transport() -> Transport; + // pub fn webrtc_transport() -> js_sys::Promise; + pub fn webrtc_transport(crypto: Crypto) -> js_sys::Promise; + } +} + +pub async fn webrtc_transport(keypair: Keypair) -> ffi::Transport { + let crypto = ffi::Crypto::new(keypair); + match JsFuture::from(ffi::webrtc_transport(crypto)).await { + Ok(v) => unsafe { std::mem::transmute(v) }, + Err(err) => panic!("{:?}", err), + } } /// Implementation of `Transport` whose implementation is handled by some FFI. @@ -435,6 +512,8 @@ pub struct Connection { /// underlying transport is ready to accept data again. This promise is stored here. /// If this is `Some`, we must wait until the contained promise is resolved to write again. previous_write_promise: Option>, + + remote_peer_id: libp2p_core::PeerId, } impl Connection { @@ -442,13 +521,69 @@ impl Connection { fn new(inner: ffi::Connection) -> Self { let read_iterator = inner.read(); + // TODO(zura): remove unwraps + let bytes: Vec = inner.remote_pub_key().unwrap().to_vec(); + let remote_pub_key = PublicKey::from_protobuf_encoding(&bytes).unwrap(); + let remote_peer_id = remote_pub_key.to_peer_id(); + Connection { inner: SendWrapper::new(inner), read_iterator: SendWrapper::new(read_iterator), read_state: ConnectionReadState::PendingData(Vec::new()), previous_write_promise: None, + remote_peer_id, } } + + pub fn remote_peer_id(&self) -> &libp2p_core::PeerId { + &self.remote_peer_id + } +} + +impl libp2p_core::Connection for Connection { + fn remote_peer_id(&self) -> Option { + Some(self.remote_peer_id().clone()) + } +} + +// TODO(zura): cleanup and remove panics. +fn peer_id_from_x509_cert_bytes(bytes: &[u8]) -> libp2p_core::PeerId { + let cert = x509_parser::parse_x509_certificate(&bytes).unwrap(); + let pub_key = match cert.1.tbs_certificate.public_key().parsed() { + Ok(x509_parser::public_key::PublicKey::EC(ec_point)) => { + let ecdsa = libp2p_core::identity::ecdsa::PublicKey::from_bytes(ec_point.data()); + // TODO(zura) + libp2p_core::PublicKey::Ecdsa(ecdsa.unwrap()) + } + // TODO(zura) + _ => unimplemented!(), + }; + pub_key.to_peer_id() +} + +#[test] +fn test_peer_id_from_x509_cert_bytes() { + let bytes: Vec = vec![ + 48, 130, 1, 34, 48, 129, 201, 160, 3, 2, 1, 2, 2, 17, 3, 238, 253, 15, 63, 135, 218, 213, + 234, 171, 162, 1, 234, 134, 36, 102, 101, 48, 10, 6, 8, 42, 134, 72, 206, 61, 4, 3, 2, 48, + 17, 49, 15, 48, 13, 6, 3, 85, 4, 3, 19, 6, 87, 101, 98, 82, 84, 67, 48, 30, 23, 13, 50, 50, + 48, 57, 49, 52, 49, 48, 50, 55, 51, 53, 90, 23, 13, 50, 50, 49, 48, 49, 52, 49, 48, 50, 55, + 51, 53, 90, 48, 17, 49, 15, 48, 13, 6, 3, 85, 4, 3, 19, 6, 87, 101, 98, 82, 84, 67, 48, 89, + 48, 19, 6, 7, 42, 134, 72, 206, 61, 2, 1, 6, 8, 42, 134, 72, 206, 61, 3, 1, 7, 3, 66, 0, 4, + 252, 218, 246, 195, 200, 160, 150, 229, 30, 43, 103, 108, 207, 211, 206, 117, 126, 133, + 105, 124, 91, 10, 76, 49, 80, 57, 69, 60, 11, 89, 228, 35, 12, 140, 89, 8, 75, 254, 217, + 35, 86, 132, 118, 216, 122, 234, 55, 106, 253, 58, 92, 227, 81, 130, 36, 166, 148, 0, 243, + 217, 118, 134, 177, 220, 163, 2, 48, 0, 48, 10, 6, 8, 42, 134, 72, 206, 61, 4, 3, 2, 3, 72, + 0, 48, 69, 2, 32, 60, 50, 225, 235, 6, 45, 39, 233, 7, 113, 244, 199, 143, 158, 252, 175, + 85, 78, 157, 61, 117, 93, 105, 62, 230, 11, 110, 164, 206, 187, 115, 205, 2, 33, 0, 200, + 229, 21, 158, 184, 247, 154, 49, 0, 15, 199, 207, 237, 49, 166, 153, 80, 65, 155, 117, 51, + 178, 218, 21, 159, 158, 224, 82, 112, 69, 156, 237, + ]; + let peer_id_b58 = peer_id_from_x509_cert_bytes(&bytes).to_base58(); + assert_eq!( + peer_id_b58, + "QmegiCDEULhpyW55B2qQNMSURWBKSR72445DS6JgQsfkPj" + ) } /// Reading side of the connection. diff --git a/transports/wasm-ext/src/webrtc.js b/transports/wasm-ext/src/webrtc.js new file mode 100644 index 00000000000..d7c30d9de9f --- /dev/null +++ b/transports/wasm-ext/src/webrtc.js @@ -0,0 +1,292 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + + +function httpSend(opts) { + return new Promise(function (resolve, reject) { + var xhr = new XMLHttpRequest(); + xhr.open(opts.method, opts.url); + xhr.onreadystatechange = function () { + if (xhr.readyState != 4) { + return; + } + if (xhr.status == 200) { + resolve(xhr.response); + } else { + reject({ + status: xhr.status, + statusText: xhr.statusText, + body: xhr.response, + }); + } + }; + xhr.onerror = function () { + reject({ + status: xhr.status, + statusText: xhr.statusText + }); + }; + if (opts.headers) { + Object.keys(opts.headers).forEach(function (key) { + xhr.setRequestHeader(key, opts.headers[key]); + }); + } + var params = opts.params; + // We'll need to stringify if we've been given an object + // If we have a string, this is skipped. + if (params && typeof params === 'object') { + params = Object.keys(params).map(function (key) { + return encodeURIComponent(key) + '=' + encodeURIComponent(params[key]); + }).join('&'); + } + xhr.send(params); + }); +} + +export const webrtc_transport = async (crypto) => { + const cert = await RTCPeerConnection.generateCertificate({ + name: "ECDSA", + namedCurve: "P-256", + }); + return { + crypto, + conn_config: { + certificates: [cert], + iceServers: [], + }, + dial(addr) { return dial(this, addr); }, + listen_on(addr) { + let err = new Error("Listening on WebRTC is not possible from within a browser"); + err.name = "NotSupportedError"; + throw err; + }, + }; +} + +function signalAsSignInput(signal) { + return new TextEncoder().encode(`${signal.type}${signal.sdp}${signal.identity_pub_key}${signal.target_peer_id}`); +} + +function signSignal(self, signal) { + let signalConcat = signalAsSignInput(signal); + return bs58btc.encode(self.crypto.sign(signalConcat)); +} + +/// Throws error if invalid. +function verifyRemoteSignal(self, signal, expected_peer_id) { + if (signal.target_peer_id != self.crypto.peer_id_as_b58()) { + throw "Identity handshake failed! Reason: `target_peer_id` in the WebRTC answer, doesn't match with the expected local peer id."; + } + let pub_key_as_protobuf = bs58btc.decode(signal.identity_pub_key); + let data = signalAsSignInput(signal); + let signature = bs58btc.decode(signal.signature); + self.crypto.assert_signature(pub_key_as_protobuf, data, signature); + let peer_id = self.crypto.pub_key_as_protobuf_to_peer_id_as_b58(pub_key_as_protobuf); + if (peer_id != expected_peer_id) { + throw "Identity handshake failed! Peer's ID doesn't match the expected one." + } +} + +// Attempt to dial a multiaddress. +const dial = async (self, addr) => { + const addrParsed = addr.match(/^\/(ip4|ip6|dns4|dns6|dns)\/(.*?)\/tcp\/([0-9]+)\/http\/p2p-webrtc-direct\/p2p\/([a-zA-Z0-9]+)$/); + console.log("Dial: ", addr) + console.log("parsed: ", addrParsed) + if (addrParsed == null) { + let err = new Error("Address not supported: " + addr); + err.name = "NotSupportedError"; + throw err; + } + const target_peer_id = addrParsed[4]; + const conn = new RTCPeerConnection(self.conn_config); + const channel = conn.createDataChannel("data", { + ordered: true, + }); + + let offer = await conn.createOffer(); + await conn.setLocalDescription(offer); + offer = { + type: offer.type, + sdp: offer.sdp, + identity_pub_key: bs58btc.encode(self.crypto.pub_key_as_protobuf()), + target_peer_id, + }; + offer.signature = signSignal(self, offer); + + console.log("sending offer:", offer); + const offerBase58 = bs58btc.encode(new TextEncoder().encode(JSON.stringify(offer))); + const respBody = await httpSend({ + method: "GET", + url: "http://" + addrParsed[2] + ":" + addrParsed[3] + "/?signal=" + offerBase58, + }); + const answer = JSON.parse(new TextDecoder().decode(bs58btc.decode(respBody))); + console.log("received answer:", answer); + let remote_pub_key_as_protobuf = bs58btc.decode(answer.identity_pub_key); + try { + verifyRemoteSignal(self, answer, target_peer_id); + } catch (e) { + console.log("verify answer error:", e); + } + + try { + await conn.setRemoteDescription(new RTCSessionDescription(answer)); + } catch(e) { + console.log("setRemoteDescription error:", e); + } + console.log("setRemoteDescription done"); + + return new Promise((open_resolve, open_reject) => { + let reader = read_queue(); + channel.onerror = (ev) => { + console.log(ev); + // If `open_resolve` has been called earlier, calling `open_reject` seems to be + // silently ignored. It is easier to unconditionally call `open_reject` rather than + // check in which state the connection is, which would be error-prone. + open_reject(ev); + // Injecting an EOF is how we report to the reading side that the connection has been + // closed. Injecting multiple EOFs is harmless. + reader.inject_eof(); + }; + channel.onclose = (ev) => { + console.log(ev); + // Same remarks as above. + open_reject(ev); + reader.inject_eof(); + }; + + // We inject all incoming messages into the queue unconditionally. The caller isn't + // supposed to access this queue unless the connection is open. + channel.onmessage = (ev) => { + console.log("received:", ev.data, "\n--str:", new TextDecoder().decode(ev.data)); + reader.inject_array_buffer(ev.data); + } + + channel.onopen = () => { + console.log("DataChannel opened"); + open_resolve({ + read: (function*() { while(channel.readyState == "open") { + let next = reader.next(); + console.log("read:", next); + yield next; + } })(), + write: (data) => { + if (channel.readyState == "open") { + // The passed in `data` is an `ArrayBufferView` [0]. If the + // underlying typed array is a `SharedArrayBuffer` (when + // using WASM threads, so multiple web workers sharing + // memory) the WebSocket's `send` method errors [1][2][3]. + // This limitation will probably be lifted in the future, + // but for now we have to make a copy here .. + // + // [0]: https://developer.mozilla.org/en-US/docs/Web/API/ArrayBufferView + // [1]: https://chromium.googlesource.com/chromium/src/+/1438f63f369fed3766fa5031e7a252c986c69be6%5E%21/ + // [2]: https://bugreports.qt.io/browse/QTBUG-78078 + // [3]: https://chromium.googlesource.com/chromium/src/+/HEAD/third_party/blink/renderer/bindings/IDLExtendedAttributes.md#AllowShared_p + console.log("send:", data, "\n--str:", new TextDecoder().decode(data)); + channel.send(data.slice(0)); + return promise_when_send_finished(channel); + } else { + return Promise.reject("WebRTC DataChannel is " + channel.readyState); + } + }, + remote_pub_key: () => { + return remote_pub_key_as_protobuf; + // const cert = conn.sctp.transport.getRemoteCertificates()[0]; + // if (!cert) { + // return null; + // } + // return new Uint8Array(cert); + }, + shutdown: () => channel.close(), + close: () => {} + }); + } + }); +} + +// Takes a WebSocket object and returns a Promise that resolves when bufferedAmount is low enough +// to allow more data to be sent. +const promise_when_send_finished = (channel) => { + return new Promise((resolve, reject) => { + function check() { + if (channel.readyState != "open") { + reject("WebRTC DataChannel is " + channel.readyState); + return; + } + + // We put an arbitrary threshold of 8 kiB of buffered data. + if (channel.bufferedAmount < 8 * 1024) { + resolve(); + } else { + setTimeout(check, 100); + } + } + + check(); + }) +} + +// Creates a queue reading system. +const read_queue = () => { + // State of the queue. + let state = { + // Array of promises resolving to `ArrayBuffer`s, that haven't been transmitted back with + // `next` yet. + queue: new Array(), + // If `resolve` isn't null, it is a "resolve" function of a promise that has already been + // returned by `next`. It should be called with some data. + resolve: null, + }; + + return { + // Inserts a new Blob in the queue. + inject_array_buffer: (buffer) => { + if (state.resolve != null) { + state.resolve(buffer); + state.resolve = null; + } else { + state.queue.push(Promise.resolve(buffer)); + } + }, + + // Inserts an EOF message in the queue. + inject_eof: () => { + if (state.resolve != null) { + state.resolve(null); + state.resolve = null; + } else { + state.queue.push(Promise.resolve(null)); + } + }, + + // Returns a Promise that yields the next entry as an ArrayBuffer. + next: () => { + if (state.queue.length != 0) { + return state.queue.shift(0); + } else { + if (state.resolve !== null) + throw "Internal error: already have a pending promise"; + return new Promise((resolve, reject) => { + state.resolve = resolve; + }); + } + } + }; +};