diff --git a/CHANGELOG.md b/CHANGELOG.md index ba69525d735..29e1720da7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -149,7 +149,6 @@ - `libp2p-swarm-derive` - `libp2p-swarm` - `libp2p-websocket` -- Forward `wasm-bindgen` feature to `futures-timer`, `instant`, `parking_lot`, `getrandom/js` and `rand/wasm-bindgen`. ## Version 0.40.0 [2021-11-01] diff --git a/core/Cargo.toml b/core/Cargo.toml index 0d5c95ea709..7f06826dd19 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -18,7 +18,6 @@ either = "1.5" fnv = "1.0" futures = { version = "0.3.1", features = ["executor", "thread-pool"] } futures-timer = "3" -instant = "0.1.11" lazy_static = "1.2" libsecp256k1 = { version = "0.7.0", optional = true } log = "0.4" @@ -54,6 +53,7 @@ rmp-serde = "1.0" multihash = { version = "0.16", default-features = false, features = ["arb"] } quickcheck = "0.9.0" rand07 = { package = "rand", version = "0.7" } +wasm-timer = "0.2" [build-dependencies] prost-build = "0.9" diff --git a/core/src/peer_record.rs b/core/src/peer_record.rs index 18b62d2335d..6b7759213c9 100644 --- a/core/src/peer_record.rs +++ b/core/src/peer_record.rs @@ -2,9 +2,9 @@ use crate::identity::error::SigningError; use crate::identity::Keypair; use crate::signed_envelope::SignedEnvelope; use crate::{peer_record_proto, signed_envelope, Multiaddr, PeerId}; -use instant::SystemTime; use std::convert::TryInto; use std::fmt; +use std::time::SystemTime; const PAYLOAD_TYPE: &str = "/libp2p/routing-state-record"; const DOMAIN_SEP: &str = "libp2p-routing-state"; diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 9ffff384946..53da886bd62 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -19,6 +19,7 @@ fnv = "1.0.7" futures = "0.3.5" rand = "0.7.3" asynchronous-codec = "0.6" +wasm-timer = "0.2.4" unsigned-varint = { version = "0.7.0", features = ["asynchronous_codec"] } log = "0.4.11" sha2 = "0.10.0" diff --git a/protocols/gossipsub/src/backoff.rs b/protocols/gossipsub/src/backoff.rs index 2fe0eb7fd56..c10814d289e 100644 --- a/protocols/gossipsub/src/backoff.rs +++ b/protocols/gossipsub/src/backoff.rs @@ -20,13 +20,13 @@ //! Data structure for efficiently storing known back-off's when pruning peers. use crate::topic::TopicHash; -use instant::Instant; use libp2p_core::PeerId; use std::collections::{ hash_map::{Entry, HashMap}, HashSet, }; use std::time::Duration; +use wasm_timer::Instant; #[derive(Copy, Clone)] struct HeartbeatIndex(usize); diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index c354936ef2f..7502dd473dd 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -35,8 +35,8 @@ use log::{debug, error, trace, warn}; use prometheus_client::registry::Registry; use prost::Message; use rand::{seq::SliceRandom, thread_rng}; +use wasm_timer::{Instant, Interval}; -use instant::Instant; use libp2p_core::{ connection::ConnectionId, identity::Keypair, multiaddr::Protocol::Ip4, multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId, @@ -46,6 +46,7 @@ use libp2p_swarm::{ IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, }; +use crate::backoff::BackoffStorage; use crate::config::{GossipsubConfig, ValidationMode}; use crate::error::{PublishError, SubscriptionError, ValidationError}; use crate::gossip_promises::GossipPromises; @@ -63,7 +64,6 @@ use crate::types::{ GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage, }; use crate::types::{GossipsubRpc, PeerConnections, PeerKind}; -use crate::{backoff::BackoffStorage, interval::Interval}; use crate::{rpc_proto, TopicScoreParams}; use std::{cmp::Ordering::Equal, fmt::Debug}; @@ -439,8 +439,8 @@ where config.backoff_slack(), ), mcache: MessageCache::new(config.history_gossip(), config.history_length()), - heartbeat: Interval::new_initial( - config.heartbeat_initial_delay(), + heartbeat: Interval::new_at( + Instant::now() + config.heartbeat_initial_delay(), config.heartbeat_interval(), ), heartbeat_ticks: 0, diff --git a/protocols/gossipsub/src/gossip_promises.rs b/protocols/gossipsub/src/gossip_promises.rs index 4cd692e27e9..40ea89a0629 100644 --- a/protocols/gossipsub/src/gossip_promises.rs +++ b/protocols/gossipsub/src/gossip_promises.rs @@ -21,10 +21,10 @@ use crate::error::ValidationError; use crate::peer_score::RejectReason; use crate::MessageId; -use instant::Instant; use libp2p_core::PeerId; use log::debug; use std::collections::HashMap; +use wasm_timer::Instant; /// Tracks recently sent `IWANT` messages and checks if peers respond to them. #[derive(Default)] diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 7fc1bd43925..b2869d9e9c9 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -25,7 +25,6 @@ use crate::types::{GossipsubRpc, PeerKind, RawGossipsubMessage}; use asynchronous_codec::Framed; use futures::prelude::*; use futures::StreamExt; -use instant::Instant; use libp2p_core::upgrade::{InboundUpgrade, NegotiationError, OutboundUpgrade, UpgradeError}; use libp2p_swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, @@ -40,6 +39,7 @@ use std::{ task::{Context, Poll}, time::Duration, }; +use wasm_timer::Instant; /// The initial time (in seconds) we set the keep alive for protocol negotiations to occur. const INITIAL_KEEP_ALIVE: u64 = 30; diff --git a/protocols/gossipsub/src/interval.rs b/protocols/gossipsub/src/interval.rs deleted file mode 100644 index 3080f541a55..00000000000 --- a/protocols/gossipsub/src/interval.rs +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2021 Oliver Wangler -// Copyright 2019 Pierre Krieger -// Copyright (c) 2019 Tokio Contributors -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS -// OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -// -// Initial version copied from -// https://github.com/tomaka/wasm-timer/blob/8964804eff980dd3eb115b711c57e481ba541708/src/timer/interval.rs -// and adapted. -use std::{ - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; - -use futures::prelude::*; -use futures_timer::Delay; -use instant::Instant; -use pin_project::pin_project; - -/// A stream representing notifications at fixed interval -/// -/// Intervals are created through the `Interval::new` or -/// `Interval::new_intial` methods indicating when a first notification -/// should be triggered and when it will be repeated. -/// -/// Note that intervals are not intended for high resolution timers, but rather -/// they will likely fire some granularity after the exact instant that they're -/// otherwise indicated to fire at. -#[pin_project] -#[derive(Debug)] -pub struct Interval { - #[pin] - delay: Delay, - interval: Duration, - fires_at: Instant, -} - -impl Interval { - /// Creates a new interval which will fire at `dur` time into the future, - /// and will repeat every `dur` interval after - /// - /// The returned object will be bound to the default timer for this thread. - /// The default timer will be spun up in a helper thread on first use. - pub fn new(dur: Duration) -> Interval { - Interval::new_initial(dur, dur) - } - - /// Creates a new interval which will fire the first time after the specified `initial_delay`, - /// and then will repeat every `dur` interval after. - /// - /// The returned object will be bound to the default timer for this thread. - /// The default timer will be spun up in a helper thread on first use. - pub fn new_initial(initial_delay: Duration, dur: Duration) -> Interval { - let fires_at = Instant::now() + initial_delay; - Interval { - delay: Delay::new(initial_delay), - interval: dur, - fires_at, - } - } -} - -impl Stream for Interval { - type Item = (); - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.as_mut().project().delay.poll(cx).is_pending() { - return Poll::Pending; - } - let next = next_interval(self.fires_at, Instant::now(), self.interval); - self.delay.reset(next); - self.fires_at += next; - Poll::Ready(Some(())) - } -} - -/// Converts Duration object to raw nanoseconds if possible -/// -/// This is useful to divide intervals. -/// -/// While technically for large duration it's impossible to represent any -/// duration as nanoseconds, the largest duration we can represent is about -/// 427_000 years. Large enough for any interval we would use or calculate in -/// tokio. -fn duration_to_nanos(dur: Duration) -> Option { - let v = dur.as_secs().checked_mul(1_000_000_000)?; - v.checked_add(dur.subsec_nanos() as u64) -} - -fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Duration { - let new = prev + interval; - if new > now { - interval - } else { - let spent_ns = - duration_to_nanos(now.duration_since(prev)).expect("interval should be expired"); - let interval_ns = - duration_to_nanos(interval).expect("interval is less that 427 thousand years"); - let mult = spent_ns / interval_ns + 1; - assert!( - mult < (1 << 32), - "can't skip more than 4 billion intervals of {:?} \ - (trying to skip {})", - interval, - mult - ); - interval * mult as u32 - } -} - -#[cfg(test)] -mod test { - use super::next_interval; - use std::time::{Duration, Instant}; - - struct Timeline(Instant); - - impl Timeline { - fn new() -> Timeline { - Timeline(Instant::now()) - } - fn at(&self, millis: u64) -> Instant { - self.0 + Duration::from_millis(millis) - } - fn at_ns(&self, sec: u64, nanos: u32) -> Instant { - self.0 + Duration::new(sec, nanos) - } - } - - fn dur(millis: u64) -> Duration { - Duration::from_millis(millis) - } - - // The math around Instant/Duration isn't 100% precise due to rounding - // errors - fn almost_eq(a: Instant, b: Instant) -> bool { - let diff = match a.cmp(&b) { - std::cmp::Ordering::Less => b - a, - std::cmp::Ordering::Equal => return true, - std::cmp::Ordering::Greater => a - b, - }; - - diff < Duration::from_millis(1) - } - - #[test] - fn norm_next() { - let tm = Timeline::new(); - assert!(almost_eq( - tm.at(1) + next_interval(tm.at(1), tm.at(2), dur(10)), - tm.at(11) - )); - assert!(almost_eq( - tm.at(7777) + next_interval(tm.at(7777), tm.at(7788), dur(100)), - tm.at(7877) - )); - assert!(almost_eq( - tm.at(1) + next_interval(tm.at(1), tm.at(1000), dur(2100)), - tm.at(2101) - )); - } - - #[test] - fn fast_forward() { - let tm = Timeline::new(); - - assert!(almost_eq( - tm.at(1) + next_interval(tm.at(1), tm.at(1000), dur(10)), - tm.at(1001) - )); - assert!(almost_eq( - tm.at(7777) + next_interval(tm.at(7777), tm.at(8888), dur(100)), - tm.at(8977) - )); - assert!(almost_eq( - tm.at(1) + next_interval(tm.at(1), tm.at(10000), dur(2100)), - tm.at(10501) - )); - } - - /// TODO: this test actually should be successful, but since we can't - /// multiply Duration on anything larger than u32 easily we decided - /// to allow it to fail for now - #[test] - #[should_panic(expected = "can't skip more than 4 billion intervals")] - fn large_skip() { - let tm = Timeline::new(); - assert_eq!( - tm.0 + next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)), - tm.at_ns(25, 1) - ); - } -} diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 4088a71f128..0168a0a2842 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -142,7 +142,6 @@ mod behaviour; mod config; mod gossip_promises; mod handler; -mod interval; mod mcache; pub mod metrics; mod peer_score; diff --git a/protocols/gossipsub/src/peer_score.rs b/protocols/gossipsub/src/peer_score.rs index 5fe1cbfcb85..4f568a9fdba 100644 --- a/protocols/gossipsub/src/peer_score.rs +++ b/protocols/gossipsub/src/peer_score.rs @@ -24,12 +24,11 @@ use crate::metrics::{Metrics, Penalty}; use crate::time_cache::TimeCache; use crate::{MessageId, TopicHash}; -use instant::Instant; use libp2p_core::PeerId; use log::{debug, trace, warn}; use std::collections::{hash_map, HashMap, HashSet}; use std::net::IpAddr; -use std::time::Duration; +use std::time::{Duration, Instant}; mod params; use crate::error::ValidationError; diff --git a/protocols/gossipsub/src/time_cache.rs b/protocols/gossipsub/src/time_cache.rs index 69ec67d3710..7c33bc521df 100644 --- a/protocols/gossipsub/src/time_cache.rs +++ b/protocols/gossipsub/src/time_cache.rs @@ -21,13 +21,12 @@ //! This implements a time-based LRU cache for checking gossipsub message duplicates. use fnv::FnvHashMap; -use instant::Instant; use std::collections::hash_map::{ self, Entry::{Occupied, Vacant}, }; use std::collections::VecDeque; -use std::time::Duration; +use std::time::{Duration, Instant}; struct ExpiringElement { /// The element that expires diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index ccce7b2984a..47ea44e6478 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -19,6 +19,7 @@ log = "0.4.1" lru = "0.7.2" prost = "0.9" smallvec = "1.6.1" +wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" diff --git a/protocols/identify/src/handler.rs b/protocols/identify/src/handler.rs index 4ceb8d25e3d..f0d05f79dc9 100644 --- a/protocols/identify/src/handler.rs +++ b/protocols/identify/src/handler.rs @@ -22,7 +22,6 @@ use crate::protocol::{ IdentifyInfo, IdentifyProtocol, IdentifyPushProtocol, InboundPush, OutboundPush, ReplySubstream, }; use futures::prelude::*; -use futures_timer::Delay; use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::upgrade::{ EitherUpgrade, InboundUpgrade, OutboundUpgrade, SelectUpgrade, UpgradeError, @@ -33,6 +32,7 @@ use libp2p_swarm::{ }; use smallvec::SmallVec; use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; +use wasm_timer::Delay; /// Protocol handler for sending and receiving identification requests. /// @@ -189,13 +189,14 @@ impl ProtocolsHandler for IdentifyHandler { // Poll the future that fires when we need to identify the node again. match Future::poll(Pin::new(&mut self.next_id), cx) { Poll::Pending => Poll::Pending, - Poll::Ready(()) => { + Poll::Ready(Ok(())) => { self.next_id.reset(self.interval); let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(EitherUpgrade::A(IdentifyProtocol), ()), }; Poll::Ready(ev) } + Poll::Ready(Err(err)) => Poll::Ready(ProtocolsHandlerEvent::Close(err)), } } } diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 684a71e7d6b..b10e95de69c 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -24,6 +24,7 @@ prost = "0.9" rand = "0.7.2" sha2 = "0.10.0" smallvec = "1.6.1" +wasm-timer = "0.2" uint = "0.9" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } void = "1.0" diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index be45798dc12..769094d0431 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -38,7 +38,6 @@ use crate::record::{ }; use crate::K_VALUE; use fnv::{FnvHashMap, FnvHashSet}; -use instant::Instant; use libp2p_core::{ connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, @@ -56,6 +55,7 @@ use std::task::{Context, Poll}; use std::vec; use std::{borrow::Cow, time::Duration}; use thiserror::Error; +use wasm_timer::Instant; pub use crate::query::QueryStats; diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 2b3e92ff0ee..b92c89bfee5 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -24,7 +24,6 @@ use crate::protocol::{ }; use crate::record::{self, Record}; use futures::prelude::*; -use instant::Instant; use libp2p_core::{ either::EitherOutput, upgrade::{self, InboundUpgrade, OutboundUpgrade}, @@ -38,6 +37,7 @@ use log::trace; use std::{ error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration, }; +use wasm_timer::Instant; /// A prototype from which [`KademliaHandler`]s can be constructed. pub struct KademliaHandlerProto { diff --git a/protocols/kad/src/jobs.rs b/protocols/kad/src/jobs.rs index 7ce201a4150..402a797a52d 100644 --- a/protocols/kad/src/jobs.rs +++ b/protocols/kad/src/jobs.rs @@ -63,14 +63,13 @@ use crate::record::{self, store::RecordStore, ProviderRecord, Record}; use futures::prelude::*; -use futures_timer::Delay; -use instant::Instant; use libp2p_core::PeerId; use std::collections::HashSet; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; use std::vec; +use wasm_timer::{Delay, Instant}; /// The maximum number of queries towards which background jobs /// are allowed to start new queries on an invocation of @@ -102,7 +101,7 @@ impl PeriodicJob { if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state { let new_deadline = Instant::now() - Duration::from_secs(1); *deadline = new_deadline; - delay.reset(Duration::from_secs(1)); + delay.reset_at(new_deadline); } } @@ -149,7 +148,7 @@ impl PutRecordJob { ) -> Self { let now = Instant::now(); let deadline = now + replicate_interval; - let delay = Delay::new(replicate_interval); + let delay = Delay::new_at(deadline); let next_publish = publish_interval.map(|i| now + i); Self { local_id, @@ -237,7 +236,7 @@ impl PutRecordJob { // Wait for the next run. let deadline = now + self.inner.interval; - let delay = Delay::new(self.inner.interval); + let delay = Delay::new_at(deadline); self.inner.state = PeriodicJobState::Waiting(delay, deadline); assert!(!self.inner.is_ready(cx, now)); } @@ -263,7 +262,7 @@ impl AddProviderJob { interval, state: { let deadline = now + interval; - PeriodicJobState::Waiting(Delay::new(interval), deadline) + PeriodicJobState::Waiting(Delay::new_at(deadline), deadline) }, }, } @@ -315,7 +314,7 @@ impl AddProviderJob { } let deadline = now + self.inner.interval; - let delay = Delay::new(self.inner.interval); + let delay = Delay::new_at(deadline); self.inner.state = PeriodicJobState::Waiting(delay, deadline); assert!(!self.inner.is_ready(cx, now)); } diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 648f7fc9e07..db3c99067e1 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -32,13 +32,13 @@ use asynchronous_codec::Framed; use bytes::BytesMut; use codec::UviBytes; use futures::prelude::*; -use instant::Instant; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::{Multiaddr, PeerId}; use prost::Message; use std::{borrow::Cow, convert::TryFrom, time::Duration}; use std::{io, iter}; use unsigned_varint::codec; +use wasm_timer::Instant; /// The protocol name used for negotiating with multistream-select. pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0"; diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 708c758464f..6fcf90df79f 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -30,9 +30,9 @@ use crate::kbucket::{Key, KeyBytes}; use crate::{ALPHA_VALUE, K_VALUE}; use either::Either; use fnv::FnvHashMap; -use instant::Instant; use libp2p_core::PeerId; use std::{num::NonZeroUsize, time::Duration}; +use wasm_timer::Instant; /// A `QueryPool` provides an aggregate state machine for driving `Query`s to completion. /// diff --git a/protocols/kad/src/query/peers/closest.rs b/protocols/kad/src/query/peers/closest.rs index 2b3cb124274..684c109b934 100644 --- a/protocols/kad/src/query/peers/closest.rs +++ b/protocols/kad/src/query/peers/closest.rs @@ -22,10 +22,10 @@ use super::*; use crate::kbucket::{Distance, Key, KeyBytes}; use crate::{ALPHA_VALUE, K_VALUE}; -use instant::Instant; use libp2p_core::PeerId; use std::collections::btree_map::{BTreeMap, Entry}; use std::{iter::FromIterator, num::NonZeroUsize, time::Duration}; +use wasm_timer::Instant; pub mod disjoint; diff --git a/protocols/kad/src/query/peers/closest/disjoint.rs b/protocols/kad/src/query/peers/closest/disjoint.rs index af91b8c1f0b..01506ff6f7b 100644 --- a/protocols/kad/src/query/peers/closest/disjoint.rs +++ b/protocols/kad/src/query/peers/closest/disjoint.rs @@ -20,13 +20,13 @@ use super::*; use crate::kbucket::{Key, KeyBytes}; -use instant::Instant; use libp2p_core::PeerId; use std::{ collections::HashMap, iter::{Cycle, Map, Peekable}, ops::{Index, IndexMut, Range}, }; +use wasm_timer::Instant; /// Wraps around a set of [`ClosestPeersIter`], enforcing a disjoint discovery /// path per configured parallelism according to the S/Kademlia paper. diff --git a/protocols/kad/src/record.rs b/protocols/kad/src/record.rs index e321992e5c7..804b1637f6f 100644 --- a/protocols/kad/src/record.rs +++ b/protocols/kad/src/record.rs @@ -23,12 +23,12 @@ pub mod store; use bytes::Bytes; -use instant::Instant; use libp2p_core::{multihash::Multihash, Multiaddr, PeerId}; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use std::borrow::Borrow; use std::hash::{Hash, Hasher}; +use wasm_timer::Instant; /// The (opaque) key of a record. #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index a4be5688e13..9c6230c85dd 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -19,6 +19,7 @@ libp2p-swarm = { version = "0.34.0", path = "../../swarm" } log = "0.4.1" rand = "0.7.2" void = "1.0" +wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index d10538562de..435a5048485 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -21,7 +21,6 @@ use crate::protocol; use futures::future::BoxFuture; use futures::prelude::*; -use futures_timer::Delay; use libp2p_core::{upgrade::NegotiationError, UpgradeError}; use libp2p_swarm::{ KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent, @@ -36,6 +35,7 @@ use std::{ time::Duration, }; use void::Void; +use wasm_timer::Delay; /// The configuration for outbound pings. #[derive(Clone, Debug)] @@ -349,10 +349,15 @@ impl ProtocolsHandler for Handler { self.outbound = Some(PingState::Idle(stream)); break; } - Poll::Ready(()) => { + Poll::Ready(Ok(())) => { self.timer.reset(self.config.timeout); self.outbound = Some(PingState::Ping(protocol::send_ping(stream).boxed())); } + Poll::Ready(Err(e)) => { + return Poll::Ready(ProtocolsHandlerEvent::Close(Failure::Other { + error: Box::new(e), + })) + } }, Some(PingState::OpenStream) => { self.outbound = Some(PingState::OpenStream); diff --git a/protocols/ping/src/protocol.rs b/protocols/ping/src/protocol.rs index 703a9275d87..a3138568777 100644 --- a/protocols/ping/src/protocol.rs +++ b/protocols/ping/src/protocol.rs @@ -19,12 +19,12 @@ // DEALINGS IN THE SOFTWARE. use futures::prelude::*; -use instant::Instant; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_swarm::NegotiatedSubstream; use rand::{distributions, prelude::*}; use std::{io, iter, time::Duration}; use void::Void; +use wasm_timer::Instant; /// The `Ping` protocol upgrade. /// diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index 13415e00725..79bfffe6a80 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -28,6 +28,7 @@ static_assertions = "1" thiserror = "1.0" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } void = "1" +wasm-timer = "0.2" [build-dependencies] prost-build = "0.9" diff --git a/protocols/relay/src/v1/handler.rs b/protocols/relay/src/v1/handler.rs index 4da3ed93f14..cb7a66e44eb 100644 --- a/protocols/relay/src/v1/handler.rs +++ b/protocols/relay/src/v1/handler.rs @@ -24,7 +24,6 @@ use futures::channel::oneshot::{self, Canceled}; use futures::future::BoxFuture; use futures::prelude::*; use futures::stream::FuturesUnordered; -use instant::Instant; use libp2p_core::connection::ConnectionId; use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::{upgrade, ConnectedPoint, Multiaddr, PeerId}; @@ -36,6 +35,7 @@ use log::warn; use std::fmt; use std::task::{Context, Poll}; use std::time::Duration; +use wasm_timer::Instant; pub struct RelayHandlerConfig { pub connection_idle_timeout: Duration, diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml index 02e1bb13287..c67d1ce8db1 100644 --- a/protocols/rendezvous/Cargo.toml +++ b/protocols/rendezvous/Cargo.toml @@ -23,8 +23,7 @@ unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } bimap = "0.6.1" sha2 = "0.10" rand = "0.8" -futures-timer = "3.0.2" -instant = "0.1.11" +wasm-timer = "0.2" [dev-dependencies] async-trait = "0.1" diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 9a64d59bfc0..73eb90edb38 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -27,7 +27,6 @@ use futures::future::BoxFuture; use futures::future::FutureExt; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; -use instant::Duration; use libp2p_core::connection::ConnectionId; use libp2p_core::identity::error::SigningError; use libp2p_core::identity::Keypair; @@ -38,6 +37,7 @@ use libp2p_swarm::{ use std::collections::{HashMap, VecDeque}; use std::iter::FromIterator; use std::task::{Context, Poll}; +use std::time::Duration; pub struct Behaviour { events: VecDeque< @@ -310,8 +310,8 @@ fn handle_outbound_event( expiring_registrations.extend(registrations.iter().cloned().map(|registration| { async move { // if the timer errors we consider it expired - let _ = futures_timer::Delay::new(Duration::from_secs(registration.ttl as u64)) - .await; + let _ = + wasm_timer::Delay::new(Duration::from_secs(registration.ttl as u64)).await; (registration.record.peer_id(), registration.namespace) } diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index ecfd8ce50c5..93682dda422 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -381,8 +381,14 @@ impl Registrations { self.registrations .insert(registration_id, registration.clone()); - let next_expiry = futures_timer::Delay::new(Duration::from_secs(ttl as u64)) - .map(move |_| registration_id) + let next_expiry = wasm_timer::Delay::new(Duration::from_secs(ttl as u64)) + .map(move |result| { + if result.is_err() { + log::warn!("Timer for registration {} has unexpectedly errored, treating it as expired", registration_id.0); + } + + registration_id + }) .boxed(); self.next_expiry.push(next_expiry); @@ -490,8 +496,8 @@ pub struct CookieNamespaceMismatch; #[cfg(test)] mod tests { - use instant::SystemTime; use std::option::Option::None; + use std::time::SystemTime; use libp2p_core::{identity, PeerRecord}; diff --git a/protocols/rendezvous/src/substream_handler.rs b/protocols/rendezvous/src/substream_handler.rs index 55b2a3638c9..efd7956b13c 100644 --- a/protocols/rendezvous/src/substream_handler.rs +++ b/protocols/rendezvous/src/substream_handler.rs @@ -27,7 +27,6 @@ use futures::future::{self, BoxFuture, Fuse, FusedFuture}; use futures::FutureExt; -use instant::Instant; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_swarm::protocols_handler::{InboundUpgradeSend, OutboundUpgradeSend}; use libp2p_swarm::{ @@ -39,7 +38,7 @@ use std::fmt; use std::future::Future; use std::hash::Hash; use std::task::{Context, Poll}; -use std::time::Duration; +use std::time::{Duration, Instant}; use void::Void; /// Handles a substream throughout its lifetime. diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index a7f28429fcc..2da993eabb9 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -21,6 +21,7 @@ log = "0.4.11" rand = "0.7" smallvec = "1.6.1" unsigned-varint = { version = "0.7", features = ["std", "futures"] } +wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index e0b482ccd3e..ee2550df183 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -26,7 +26,6 @@ use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; pub use protocol::{ProtocolSupport, RequestProtocol, ResponseProtocol}; use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; -use instant::Instant; use libp2p_core::upgrade::{NegotiationError, UpgradeError}; use libp2p_swarm::{ protocols_handler::{ @@ -45,6 +44,7 @@ use std::{ task::{Context, Poll}, time::Duration, }; +use wasm_timer::Instant; /// A connection handler of a `RequestResponse` protocol. #[doc(hidden)] diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 184acb95a38..14df1ab8378 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -17,9 +17,8 @@ libp2p-core = { version = "0.32.0", path = "../core", default-features = false } log = "0.4" rand = "0.7" smallvec = "1.6.1" +wasm-timer = "0.2" void = "1" -futures-timer = "3.0.2" -instant = "0.1.11" [dev-dependencies] libp2p = { path = "../", default-features = false, features = ["identify", "ping", "plaintext", "yamux"] } diff --git a/swarm/src/protocols_handler.rs b/swarm/src/protocols_handler.rs index 9d6a2fae7cf..4f67b5ebb34 100644 --- a/swarm/src/protocols_handler.rs +++ b/swarm/src/protocols_handler.rs @@ -48,9 +48,9 @@ mod select; pub use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper, UpgradeInfoSend}; -use instant::Instant; use libp2p_core::{upgrade::UpgradeError, ConnectedPoint, Multiaddr, PeerId}; use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration}; +use wasm_timer::Instant; pub use dummy::DummyProtocolsHandler; pub use map_in::MapInEvent; diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index fa339fd9416..8254968c6e8 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -26,8 +26,6 @@ use crate::upgrade::SendWrapper; use futures::prelude::*; use futures::stream::FuturesUnordered; -use futures_timer::Delay; -use instant::Instant; use libp2p_core::{ connection::{ ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler, Substream, @@ -38,6 +36,7 @@ use libp2p_core::{ Connected, Multiaddr, }; use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; +use wasm_timer::{Delay, Instant}; /// Prototype for a `NodeHandlerWrapper`. pub struct NodeHandlerWrapperBuilder { @@ -160,7 +159,7 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.timeout.poll_unpin(cx) { - Poll::Ready(()) => { + Poll::Ready(Ok(_)) => { return Poll::Ready(( self.user_data .take() @@ -168,7 +167,14 @@ where Err(ProtocolsHandlerUpgrErr::Timeout), )) } - + Poll::Ready(Err(_)) => { + return Poll::Ready(( + self.user_data + .take() + .expect("Future not to be polled again once ready."), + Err(ProtocolsHandlerUpgrErr::Timer), + )) + } Poll::Pending => {} } @@ -356,16 +362,10 @@ where (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { if *deadline != t { *deadline = t; - if let Some(dur) = deadline.checked_duration_since(Instant::now()) { - timer.reset(dur) - } - } - } - (_, KeepAlive::Until(t)) => { - if let Some(dur) = t.checked_duration_since(Instant::now()) { - self.shutdown = Shutdown::Later(Delay::new(dur), t) + timer.reset_at(t) } } + (_, KeepAlive::Until(t)) => self.shutdown = Shutdown::Later(Delay::new_at(t), t), (_, KeepAlive::No) => self.shutdown = Shutdown::Asap, (_, KeepAlive::Yes) => self.shutdown = Shutdown::None, }; diff --git a/swarm/src/protocols_handler/one_shot.rs b/swarm/src/protocols_handler/one_shot.rs index 0851ff82e43..01a2951efc5 100644 --- a/swarm/src/protocols_handler/one_shot.rs +++ b/swarm/src/protocols_handler/one_shot.rs @@ -22,9 +22,10 @@ use crate::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend}; -use instant::Instant; + use smallvec::SmallVec; use std::{error, fmt::Debug, task::Context, task::Poll, time::Duration}; +use wasm_timer::Instant; /// A `ProtocolsHandler` that opens a new substream for each request. // TODO: Debug diff --git a/transports/wasm-ext/src/websockets.js b/transports/wasm-ext/src/websockets.js index 1ef2faf6ded..290af968e70 100644 --- a/transports/wasm-ext/src/websockets.js +++ b/transports/wasm-ext/src/websockets.js @@ -80,18 +80,7 @@ const dial = (addr) => { read: (function*() { while(ws.readyState == 1) { yield reader.next(); } })(), write: (data) => { if (ws.readyState == 1) { - // The passed in `data` is an `ArrayBufferView` [0]. If the - // underlying typed array is a `SharedArrayBuffer` (when - // using WASM threads, so multiple web workers sharing - // memory) the WebSocket's `send` method errors [1][2][3]. - // This limitation will probably be lifted in the future, - // but for now we have to make a copy here .. - // - // [0]: https://developer.mozilla.org/en-US/docs/Web/API/ArrayBufferView - // [1]: https://chromium.googlesource.com/chromium/src/+/1438f63f369fed3766fa5031e7a252c986c69be6%5E%21/ - // [2]: https://bugreports.qt.io/browse/QTBUG-78078 - // [3]: https://chromium.googlesource.com/chromium/src/+/HEAD/third_party/blink/renderer/bindings/IDLExtendedAttributes.md#AllowShared_p - ws.send(data.slice(0)); + ws.send(data); return promise_when_send_finished(ws); } else { return Promise.reject("WebSocket is closed");