Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"Just works" WASM (browser) experience #2245

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ jobs:
wasm32-unknown-emscripten,
wasm32-wasi
]
include:
- toolchain: wasm32-unknown-unknown
args: "--features wasm-bindgen"
container:
image: rust
env:
Expand Down Expand Up @@ -76,7 +79,7 @@ jobs:
- name: Build on ${{ matrix.toolchain }}
# TODO: also run `cargo test`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes please, that would be great!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a bigger undertaking, which might be more geared towards wasm32-wasi rather than wasm32-unknown-unknown.
I guess for wasm32-unknown-unknown a wasm-bindgen based browser smoke test might be something, but I'll leave that to future-us :-)

# TODO: ideally we would build `--workspace`, but not all crates compile for WASM
run: cargo build --target=${{ matrix.toolchain }}
run: cargo build --target=${{ matrix.toolchain }} ${{ matrix.args }}

check-rustdoc-links:
name: Check rustdoc intra-doc links
Expand Down
9 changes: 6 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ rendezvous = ["libp2p-rendezvous"]
tcp-async-io = ["libp2p-tcp", "libp2p-tcp/async-io"]
tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"]
uds = ["libp2p-uds"]
wasm-bindgen = ["parking_lot/wasm-bindgen"]
wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "parking_lot/wasm-bindgen", "getrandom/js", "rand/wasm-bindgen"]
wasm-ext = ["libp2p-wasm-ext"]
wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext/websocket"]
websocket = ["libp2p-websocket"]
Expand All @@ -67,6 +67,9 @@ all-features = true
atomic = "0.5.0"
bytes = "1"
futures = "0.3.1"
futures-timer = "3.0.2" # Explicit dependency to be used in `wasm-bindgen` feature
getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` feature
instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature
lazy_static = "1.2"
libp2p-core = { version = "0.30.0-rc.1", path = "core", default-features = false }
libp2p-floodsub = { version = "0.31.0-rc.1", path = "protocols/floodsub", optional = true }
Expand All @@ -88,10 +91,10 @@ libp2p-uds = { version = "0.30.0-rc.1", path = "transports/uds", optional = true
libp2p-wasm-ext = { version = "0.30.0-rc.1", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.34.0-rc.1", path = "muxers/yamux", optional = true }
multiaddr = { version = "0.13.0-rc.1" }
parking_lot = "0.11.0"
parking_lot = "0.11.2" # Explicit dependency to be used in `wasm-bindgen` feature
pin-project = "1.0.0"
rand = "0.7.3" # Explicit dependency to be used in `wasm-bindgen` feature
smallvec = "1.6.1"
wasm-timer = "0.2.4"

[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.30.0-rc.1", path = "transports/deflate", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ either = "1.5"
fnv = "1.0"
futures = { version = "0.3.1", features = ["executor", "thread-pool"] }
futures-timer = "3"
instant = "0.1.11"
lazy_static = "1.2"
libsecp256k1 = { version = "0.7.0", optional = true }
log = "0.4"
Expand Down Expand Up @@ -48,7 +49,6 @@ libp2p-tcp = { path = "../transports/tcp" }
multihash = { version = "0.14", default-features = false, features = ["arb"] }
quickcheck = "0.9.0"
rand07 = { package = "rand", version = "0.7" }
wasm-timer = "0.2"

[build-dependencies]
prost-build = "0.9"
Expand Down
2 changes: 1 addition & 1 deletion core/src/peer_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use crate::identity::error::SigningError;
use crate::identity::Keypair;
use crate::signed_envelope::SignedEnvelope;
use crate::{peer_record_proto, signed_envelope, Multiaddr, PeerId};
use instant::SystemTime;
use std::convert::TryInto;
use std::fmt;
use std::time::SystemTime;

const PAYLOAD_TYPE: &str = "/libp2p/routing-state-record";
const DOMAIN_SEP: &str = "libp2p-routing-state";
Expand Down
4 changes: 3 additions & 1 deletion protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ fnv = "1.0.7"
futures = "0.3.5"
rand = "0.7.3"
asynchronous-codec = "0.6"
wasm-timer = "0.2.4"
unsigned-varint = { version = "0.7.0", features = ["asynchronous_codec"] }
log = "0.4.11"
sha2 = "0.9.1"
Expand All @@ -27,6 +26,9 @@ smallvec = "1.6.1"
prost = "0.9"
hex_fmt = "0.3.0"
regex = "1.4.0"
futures-timer = "3.0.2"
pin-project = "1.0.8"
instant = "0.1.11"

[dev-dependencies]
async-std = "1.6.3"
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

//! Data structure for efficiently storing known back-off's when pruning peers.
use crate::topic::TopicHash;
use instant::Instant;
use libp2p_core::PeerId;
use std::collections::{
hash_map::{Entry, HashMap},
HashSet,
};
use std::time::Duration;
use wasm_timer::Instant;

#[derive(Copy, Clone)]
struct HeartbeatIndex(usize);
Expand Down
8 changes: 4 additions & 4 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use futures::StreamExt;
use log::{debug, error, trace, warn};
use prost::Message;
use rand::{seq::SliceRandom, thread_rng};
use wasm_timer::{Instant, Interval};

use instant::Instant;
use libp2p_core::{
connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4,
multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId,
Expand All @@ -45,7 +45,6 @@ use libp2p_swarm::{
NotifyHandler, PollParameters,
};

use crate::backoff::BackoffStorage;
use crate::config::{GossipsubConfig, ValidationMode};
use crate::error::{PublishError, SubscriptionError, ValidationError};
use crate::gossip_promises::GossipPromises;
Expand All @@ -62,6 +61,7 @@ use crate::types::{
GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage,
};
use crate::types::{GossipsubRpc, PeerConnections, PeerKind};
use crate::{backoff::BackoffStorage, interval::Interval};
use crate::{rpc_proto, TopicScoreParams};
use std::{cmp::Ordering::Equal, fmt::Debug};

Expand Down Expand Up @@ -406,8 +406,8 @@ where
config.backoff_slack(),
),
mcache: MessageCache::new(config.history_gossip(), config.history_length()),
heartbeat: Interval::new_at(
Instant::now() + config.heartbeat_initial_delay(),
heartbeat: Interval::new_initial(
config.heartbeat_initial_delay(),
config.heartbeat_interval(),
),
heartbeat_ticks: 0,
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/gossip_promises.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
use crate::error::ValidationError;
use crate::peer_score::RejectReason;
use crate::MessageId;
use instant::Instant;
use libp2p_core::PeerId;
use log::debug;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::collections::HashMap;
use wasm_timer::Instant;

/// Tracks recently sent `IWANT` messages and checks if peers respond to them
/// for each `IWANT` message we track one random requested message id.
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::types::{GossipsubRpc, PeerKind, RawGossipsubMessage};
use asynchronous_codec::Framed;
use futures::prelude::*;
use futures::StreamExt;
use instant::Instant;
use libp2p_core::upgrade::{InboundUpgrade, NegotiationError, OutboundUpgrade, UpgradeError};
use libp2p_swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
Expand All @@ -39,7 +40,6 @@ use std::{
task::{Context, Poll},
time::Duration,
};
use wasm_timer::Instant;

/// The initial time (in seconds) we set the keep alive for protocol negotiations to occur.
const INITIAL_KEEP_ALIVE: u64 = 30;
Expand Down
209 changes: 209 additions & 0 deletions protocols/gossipsub/src/interval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
// Copyright 2021 Oliver Wangler <oliver@wngr.de>
// Copyright 2019 Pierre Krieger
// Copyright (c) 2019 Tokio Contributors
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
// OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
// Initial version copied from
// https://github.com/tomaka/wasm-timer/blob/8964804eff980dd3eb115b711c57e481ba541708/src/timer/interval.rs
// and adapted.
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use futures::prelude::*;
use futures_timer::Delay;
use instant::Instant;
use pin_project::pin_project;

/// A stream representing notifications at fixed interval
///
/// Intervals are created through the `Interval::new` or
/// `Interval::new_intial` methods indicating when a first notification
/// should be triggered and when it will be repeated.
///
/// Note that intervals are not intended for high resolution timers, but rather
/// they will likely fire some granularity after the exact instant that they're
/// otherwise indicated to fire at.
#[pin_project]
#[derive(Debug)]
pub struct Interval {
#[pin]
delay: Delay,
interval: Duration,
fires_at: Instant,
}

impl Interval {
/// Creates a new interval which will fire at `dur` time into the future,
/// and will repeat every `dur` interval after
///
/// The returned object will be bound to the default timer for this thread.
/// The default timer will be spun up in a helper thread on first use.
pub fn new(dur: Duration) -> Interval {
Interval::new_initial(dur, dur)
}

/// Creates a new interval which will fire the first time after the specified `initial_delay`,
/// and then will repeat every `dur` interval after.
///
/// The returned object will be bound to the default timer for this thread.
/// The default timer will be spun up in a helper thread on first use.
pub fn new_initial(initial_delay: Duration, dur: Duration) -> Interval {
let fires_at = Instant::now() + initial_delay;
Interval {
delay: Delay::new(initial_delay),
interval: dur,
fires_at,
}
}
}

impl Stream for Interval {
type Item = ();

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.as_mut().project().delay.poll(cx).is_pending() {
return Poll::Pending;
}
let next = next_interval(self.fires_at, Instant::now(), self.interval);
self.delay.reset(next);
self.fires_at += next;
Poll::Ready(Some(()))
}
}

/// Converts Duration object to raw nanoseconds if possible
///
/// This is useful to divide intervals.
///
/// While technically for large duration it's impossible to represent any
/// duration as nanoseconds, the largest duration we can represent is about
/// 427_000 years. Large enough for any interval we would use or calculate in
/// tokio.
fn duration_to_nanos(dur: Duration) -> Option<u64> {
let v = dur.as_secs().checked_mul(1_000_000_000)?;
v.checked_add(dur.subsec_nanos() as u64)
}

fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Duration {
let new = prev + interval;
if new > now {
interval
} else {
let spent_ns =
duration_to_nanos(now.duration_since(prev)).expect("interval should be expired");
let interval_ns =
duration_to_nanos(interval).expect("interval is less that 427 thousand years");
let mult = spent_ns / interval_ns + 1;
assert!(
mult < (1 << 32),
"can't skip more than 4 billion intervals of {:?} \
(trying to skip {})",
interval,
mult
);
interval * mult as u32
}
}

#[cfg(test)]
mod test {
use super::next_interval;
use std::time::{Duration, Instant};

struct Timeline(Instant);

impl Timeline {
fn new() -> Timeline {
Timeline(Instant::now())
}
fn at(&self, millis: u64) -> Instant {
self.0 + Duration::from_millis(millis)
}
fn at_ns(&self, sec: u64, nanos: u32) -> Instant {
self.0 + Duration::new(sec, nanos)
}
}

fn dur(millis: u64) -> Duration {
Duration::from_millis(millis)
}

// The math around Instant/Duration isn't 100% precise due to rounding
// errors
fn almost_eq(a: Instant, b: Instant) -> bool {
let diff = match a.cmp(&b) {
std::cmp::Ordering::Less => b - a,
std::cmp::Ordering::Equal => return true,
std::cmp::Ordering::Greater => a - b,
};

diff < Duration::from_millis(1)
}

#[test]
fn norm_next() {
let tm = Timeline::new();
assert!(almost_eq(
tm.at(1) + next_interval(tm.at(1), tm.at(2), dur(10)),
tm.at(11)
));
assert!(almost_eq(
tm.at(7777) + next_interval(tm.at(7777), tm.at(7788), dur(100)),
tm.at(7877)
));
assert!(almost_eq(
tm.at(1) + next_interval(tm.at(1), tm.at(1000), dur(2100)),
tm.at(2101)
));
}

#[test]
fn fast_forward() {
let tm = Timeline::new();

assert!(almost_eq(
tm.at(1) + next_interval(tm.at(1), tm.at(1000), dur(10)),
tm.at(1001)
));
assert!(almost_eq(
tm.at(7777) + next_interval(tm.at(7777), tm.at(8888), dur(100)),
tm.at(8977)
));
assert!(almost_eq(
tm.at(1) + next_interval(tm.at(1), tm.at(10000), dur(2100)),
tm.at(10501)
));
}

/// TODO: this test actually should be successful, but since we can't
/// multiply Duration on anything larger than u32 easily we decided
/// to allow it to fail for now
#[test]
#[should_panic(expected = "can't skip more than 4 billion intervals")]
fn large_skip() {
let tm = Timeline::new();
assert_eq!(
tm.0 + next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)),
tm.at_ns(25, 1)
);
}
}
1 change: 1 addition & 0 deletions protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ mod behaviour;
mod config;
mod gossip_promises;
mod handler;
mod interval;
mod mcache;
mod peer_score;
pub mod subscription_filter;
Expand Down