Skip to content

Commit

Permalink
Update to tokio 1.1 (#2172)
Browse files Browse the repository at this point in the history
## Issue Addressed

resolves #2129
resolves #2099 
addresses some of #1712
unblocks #2076
unblocks #2153 

## Proposed Changes

- Updates all the dependencies mentioned in #2129, except for web3. They haven't merged their tokio 1.0 update because they are waiting on some dependencies of their own. Since we only use web3 in tests, I think updating it in a separate issue is fine. If they are able to merge soon though, I can update in this PR. 

- Updates `tokio_util` to 0.6.2 and `bytes` to 1.0.1.

- We haven't made a discv5 release since merging tokio 1.0 updates so I'm using a commit rather than release atm. **Edit:** I think we should merge an update of `tokio_util` to 0.6.2 into discv5 before this release because it has panic fixes in `DelayQueue`  --> PR in discv5:  sigp/discv5#58

## Additional Info

tokio 1.0 changes that required some changes in lighthouse:

- `interval.next().await.is_some()` -> `interval.tick().await`
- `sleep` future is now `!Unpin` -> tokio-rs/tokio#3028
- `try_recv` has been temporarily removed from `mpsc` -> tokio-rs/tokio#3350
- stream features have moved to `tokio-stream` and `broadcast::Receiver::into_stream()` has been temporarily removed -> `tokio-rs/tokio#2870
- I've copied over the `BroadcastStream` wrapper from this PR, but can update to use `tokio-stream` once it's merged tokio-rs/tokio#3384

Co-authored-by: realbigsean <seananderson33@gmail.com>
  • Loading branch information
realbigsean and realbigsean committed Feb 10, 2021
1 parent 6f4da9a commit e20f64b
Show file tree
Hide file tree
Showing 74 changed files with 1,136 additions and 1,317 deletions.
1,437 changes: 638 additions & 799 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions account_manager/Cargo.toml
Expand Up @@ -27,14 +27,13 @@ eth2_wallet = { path = "../crypto/eth2_wallet" }
eth2_wallet_manager = { path = "../common/eth2_wallet_manager" }
rand = "0.7.3"
validator_dir = { path = "../common/validator_dir" }
tokio = { version = "0.3.5", features = ["full"] }
tokio = { version = "1.1.0", features = ["full"] }
eth2_keystore = { path = "../crypto/eth2_keystore" }
account_utils = { path = "../common/account_utils" }
slashing_protection = { path = "../validator_client/slashing_protection" }
eth2 = {path = "../common/eth2"}
safe_arith = {path = "../consensus/safe_arith"}
slot_clock = { path = "../common/slot_clock" }
tokio-compat-02 = "0.1"

[dev-dependencies]
tempfile = "3.1.0"
20 changes: 8 additions & 12 deletions account_manager/src/validator/exit.rs
Expand Up @@ -12,7 +12,6 @@ use safe_arith::SafeArith;
use slot_clock::{SlotClock, SystemTimeSlotClock};
use std::path::PathBuf;
use std::time::Duration;
use tokio_compat_02::FutureExt;
use types::{ChainSpec, Epoch, EthSpec, Fork, VoluntaryExit};

pub const CMD: &str = "exit";
Expand Down Expand Up @@ -77,17 +76,14 @@ pub fn cli_run<E: EthSpec>(matches: &ArgMatches, env: Environment<E>) -> Result<
.clone()
.expect("network should have a valid config");

env.runtime().block_on(
publish_voluntary_exit::<E>(
&keystore_path,
password_file_path.as_ref(),
&client,
&spec,
stdin_inputs,
&testnet_config,
)
.compat(),
)?;
env.runtime().block_on(publish_voluntary_exit::<E>(
&keystore_path,
password_file_path.as_ref(),
&client,
&spec,
stdin_inputs,
&testnet_config,
))?;

Ok(())
}
Expand Down
5 changes: 2 additions & 3 deletions beacon_node/Cargo.toml
Expand Up @@ -10,7 +10,6 @@ path = "src/lib.rs"

[dev-dependencies]
node_test_rig = { path = "../testing/node_test_rig" }
tokio-compat-02 = "0.1"

[features]
write_ssz_files = ["beacon_chain/write_ssz_files"] # Writes debugging .ssz files to /tmp during block processing.
Expand All @@ -27,7 +26,7 @@ slog = { version = "2.5.2", features = ["max_level_trace", "release_max_level_tr
slog-term = "2.6.0"
slog-async = "2.5.0"
ctrlc = { version = "3.1.6", features = ["termination"] }
tokio = { version = "0.3.2", features = ["time"] }
tokio = { version = "1.1.0", features = ["time"] }
exit-future = "0.2.0"
dirs = "3.0.1"
logging = { path = "../common/logging" }
Expand All @@ -41,7 +40,7 @@ eth2_libp2p = { path = "./eth2_libp2p" }
eth2_ssz = "0.1.2"
serde = "1.0.116"
clap_utils = { path = "../common/clap_utils" }
hyper = "0.13.8"
hyper = "0.14.4"
lighthouse_version = { path = "../common/lighthouse_version" }
hex = "0.4.2"
slasher = { path = "../slasher" }
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/Cargo.toml
Expand Up @@ -40,7 +40,7 @@ eth2_ssz_derive = "0.1.0"
state_processing = { path = "../../consensus/state_processing" }
tree_hash = "0.1.1"
types = { path = "../../consensus/types" }
tokio = "0.3.2"
tokio = "1.1.0"
eth1 = { path = "../eth1" }
futures = "0.3.7"
genesis = { path = "../genesis" }
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/client/Cargo.toml
Expand Up @@ -26,10 +26,10 @@ error-chain = "0.12.4"
serde_yaml = "0.8.13"
slog = { version = "2.5.2", features = ["max_level_trace"] }
slog-async = "2.5.0"
tokio = "0.3.2"
tokio = "1.1.0"
dirs = "3.0.1"
futures = "0.3.7"
reqwest = { version = "0.10.8", features = ["native-tls-vendored"] }
reqwest = { version = "0.11.0", features = ["native-tls-vendored"] }
url = "2.1.1"
eth1 = { path = "../eth1" }
genesis = { path = "../genesis" }
Expand Down
40 changes: 22 additions & 18 deletions beacon_node/client/src/notifier.rs
@@ -1,7 +1,6 @@
use crate::metrics;
use beacon_chain::{BeaconChain, BeaconChainTypes};
use eth2_libp2p::NetworkGlobals;
use futures::prelude::*;
use parking_lot::Mutex;
use slog::{debug, error, info, warn, Logger};
use slot_clock::SlotClock;
Expand Down Expand Up @@ -64,26 +63,32 @@ pub fn spawn_notifier<T: BeaconChainTypes>(
}

// Perform post-genesis logging.
while interval.next().await.is_some() {
loop {
interval.tick().await;
let connected_peer_count = network.connected_peers();
let sync_state = network.sync_state();

let head_info = beacon_chain.head_info().map_err(|e| {
error!(
log,
"Failed to get beacon chain head info";
"error" => format!("{:?}", e)
)
})?;
let head_info = match beacon_chain.head_info() {
Ok(head_info) => head_info,
Err(e) => {
error!(log, "Failed to get beacon chain head info"; "error" => format!("{:?}", e));
break;
}
};

let head_slot = head_info.slot;
let current_slot = beacon_chain.slot().map_err(|e| {
error!(
log,
"Unable to read current slot";
"error" => format!("{:?}", e)
)
})?;
let current_slot = match beacon_chain.slot() {
Ok(slot) => slot,
Err(e) => {
error!(
log,
"Unable to read current slot";
"error" => format!("{:?}", e)
);
break;
}
};

let current_epoch = current_slot.epoch(T::EthSpec::slots_per_epoch());
let finalized_epoch = head_info.finalized_checkpoint.epoch;
let finalized_root = head_info.finalized_checkpoint.root;
Expand Down Expand Up @@ -175,11 +180,10 @@ pub fn spawn_notifier<T: BeaconChainTypes>(

eth1_logging(&beacon_chain, &log);
}
Ok::<(), ()>(())
};

// run the notifier on the current executor
executor.spawn(interval_future.unwrap_or_else(|_| ()), "notifier");
executor.spawn(interval_future, "notifier");

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/eth1/Cargo.toml
Expand Up @@ -13,7 +13,7 @@ environment = { path = "../../lighthouse/environment" }
tokio-compat-02 = "0.1"

[dependencies]
reqwest = { version = "0.10.8", features = ["native-tls-vendored"] }
reqwest = { version = "0.11.0", features = ["native-tls-vendored"] }
futures = "0.3.7"
serde_json = "1.0.58"
serde = { version = "1.0.116", features = ["derive"] }
Expand All @@ -26,7 +26,7 @@ tree_hash = "0.1.1"
eth2_hashing = "0.1.0"
parking_lot = "0.11.0"
slog = "2.5.2"
tokio = { version = "0.3.2", features = ["full"] }
tokio = { version = "1.1.0", features = ["full"] }
state_processing = { path = "../../consensus/state_processing" }
libflate = "1.0.2"
lighthouse_metrics = { path = "../../common/lighthouse_metrics"}
Expand Down
5 changes: 3 additions & 2 deletions beacon_node/eth1/src/service.rs
Expand Up @@ -9,7 +9,7 @@ use crate::{
inner::{DepositUpdater, Inner},
};
use fallback::{Fallback, FallbackError};
use futures::{future::TryFutureExt, StreamExt};
use futures::future::TryFutureExt;
use parking_lot::{RwLock, RwLockReadGuard};
use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, info, trace, warn, Logger};
Expand Down Expand Up @@ -721,7 +721,8 @@ impl Service {
let mut interval = interval_at(Instant::now(), update_interval);

let update_future = async move {
while interval.next().await.is_some() {
loop {
interval.tick().await;
self.do_update(update_interval).await.ok();
}
};
Expand Down
17 changes: 8 additions & 9 deletions beacon_node/eth2_libp2p/Cargo.toml
Expand Up @@ -5,8 +5,8 @@ authors = ["Sigma Prime <contact@sigmaprime.io>"]
edition = "2018"

[dependencies]
discv5 = { version = "0.1.0-beta.2", features = ["libp2p"] }
unsigned-varint = { git = "https://github.com/sigp/unsigned-varint", branch = "dep-update", features = ["codec"] }
discv5 = { version = "0.1.0-beta.3", features = ["libp2p"] }
unsigned-varint = { version = "0.6.0", features = ["codec"] }
types = { path = "../../consensus/types" }
hashset_delay = { path = "../../common/hashset_delay" }
eth2_ssz_types = { path = "../../consensus/ssz_types" }
Expand All @@ -16,23 +16,24 @@ eth2_ssz = "0.1.2"
eth2_ssz_derive = "0.1.0"
slog = { version = "2.5.2", features = ["max_level_trace"] }
lighthouse_version = { path = "../../common/lighthouse_version" }
tokio = { version = "0.3.2", features = ["time", "macros"] }
tokio = { version = "1.1.0", features = ["time", "macros"] }
futures = "0.3.7"
futures-io = "0.3.7"
error-chain = "0.12.4"
dirs = "3.0.1"
fnv = "1.0.7"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
smallvec = "1.6.1"
tokio-io-timeout = "0.5.0"
tokio-io-timeout = "1.1.1"
lru = "0.6.0"
parking_lot = "0.11.0"
sha2 = "0.9.1"
base64 = "0.13.0"
snap = "1.0.1"
void = "1.0.2"
hex = "0.4.2"
tokio-util = { version = "0.4.0", features = ["codec", "compat", "time"] }
tokio-util = { version = "0.6.2", features = ["codec", "compat", "time"] }
tiny-keccak = "2.0.2"
task_executor = { path = "../../common/task_executor" }
rand = "0.7.3"
Expand All @@ -41,14 +42,12 @@ regex = "1.3.9"
strum = { version = "0.20", features = ["derive"] }

[dependencies.libp2p]
#version = "0.23.0"
git = "https://github.com/sigp/rust-libp2p"
rev = "97000533e4710183124abde017c6c3d68287c1ae"
version = "0.34.0"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns", "tcp-tokio"]

[dev-dependencies]
tokio = { version = "0.3.2", features = ["full"] }
tokio = { version = "1.1.0", features = ["full"] }
slog-term = "2.6.0"
slog-async = "2.5.0"
tempfile = "3.1.0"
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Expand Up @@ -832,7 +832,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
if let Some((peer_id, reason)) = self.peers_to_dc.pop_front() {
return Poll::Ready(NBAction::NotifyHandler {
peer_id,
handler: NotifyHandler::All,
handler: NotifyHandler::Any,
event: BehaviourHandlerIn::Shutdown(
reason.map(|reason| (RequestId::Behaviour, RPCRequest::Goodbye(reason))),
),
Expand Down Expand Up @@ -893,7 +893,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
}

// perform gossipsub score updates when necessary
while let Poll::Ready(Some(_)) = self.update_gossipsub_scores.poll_next_unpin(cx) {
while let Poll::Ready(_) = self.update_gossipsub_scores.poll_tick(cx) {
self.peer_manager.update_gossipsub_scores(&self.gossipsub);
}

Expand Down
7 changes: 4 additions & 3 deletions beacon_node/eth2_libp2p/src/discovery/enr_ext.rs
Expand Up @@ -221,8 +221,9 @@ impl CombinedKeyExt for CombinedKey {
fn from_libp2p(key: &libp2p::core::identity::Keypair) -> Result<CombinedKey, &'static str> {
match key {
Keypair::Secp256k1(key) => {
let secret = discv5::enr::k256::ecdsa::SigningKey::new(&key.secret().to_bytes())
.expect("libp2p key must be valid");
let secret =
discv5::enr::k256::ecdsa::SigningKey::from_bytes(&key.secret().to_bytes())
.expect("libp2p key must be valid");
Ok(CombinedKey::Secp256k1(secret))
}
Keypair::Ed25519(key) => {
Expand Down Expand Up @@ -277,7 +278,7 @@ mod tests {
fn test_secp256k1_peer_id_conversion() {
let sk_hex = "df94a73d528434ce2309abb19c16aedb535322797dbd59c157b1e04095900f48";
let sk_bytes = hex::decode(sk_hex).unwrap();
let secret_key = discv5::enr::k256::ecdsa::SigningKey::new(&sk_bytes).unwrap();
let secret_key = discv5::enr::k256::ecdsa::SigningKey::from_bytes(&sk_bytes).unwrap();

let libp2p_sk = libp2p::identity::secp256k1::SecretKey::from_bytes(sk_bytes).unwrap();
let secp256k1_kp: libp2p::identity::secp256k1::Keypair = libp2p_sk.into();
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/eth2_libp2p/src/discovery/mod.rs
Expand Up @@ -896,7 +896,7 @@ impl<TSpec: EthSpec> Discovery<TSpec> {
}
EventStream::InActive => {} // ignore checking the stream
EventStream::Present(ref mut stream) => {
while let Ok(event) = stream.try_recv() {
while let Poll::Ready(Some(event)) = stream.poll_recv(cx) {
match event {
// We filter out unwanted discv5 events here and only propagate useful results to
// the peer manager.
Expand Down
8 changes: 5 additions & 3 deletions beacon_node/eth2_libp2p/src/peer_manager/mod.rs
Expand Up @@ -972,7 +972,7 @@ impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// perform the heartbeat when necessary
while let Poll::Ready(Some(_)) = self.heartbeat.poll_next_unpin(cx) {
while let Poll::Ready(_) = self.heartbeat.poll_tick(cx) {
self.heartbeat();
}

Expand Down Expand Up @@ -1011,8 +1011,10 @@ impl<TSpec: EthSpec> Stream for PeerManager<TSpec> {
}
}

if !matches!(self.network_globals.sync_state(), SyncState::SyncingFinalized{..}|SyncState::SyncingHead{..})
{
if !matches!(
self.network_globals.sync_state(),
SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. }
) {
loop {
match self.status_peers.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => {
Expand Down
5 changes: 4 additions & 1 deletion beacon_node/eth2_libp2p/src/peer_manager/peer_info.rs
Expand Up @@ -156,7 +156,10 @@ impl<T: EthSpec> PeerInfo<T> {

/// Checks if the status is connected.
pub fn is_connected(&self) -> bool {
matches!(self.connection_status, PeerConnectionStatus::Connected { .. })
matches!(
self.connection_status,
PeerConnectionStatus::Connected { .. }
)
}

/// Checks if the status is connected.
Expand Down
20 changes: 14 additions & 6 deletions beacon_node/eth2_libp2p/src/peer_manager/peer_sync_status.rs
Expand Up @@ -29,12 +29,20 @@ pub struct SyncInfo {

impl std::cmp::PartialEq for PeerSyncStatus {
fn eq(&self, other: &Self) -> bool {
matches!((self, other),
(PeerSyncStatus::Synced { .. }, PeerSyncStatus::Synced { .. }) |
(PeerSyncStatus::Advanced { .. }, PeerSyncStatus::Advanced { .. }) |
(PeerSyncStatus::Behind { .. }, PeerSyncStatus::Behind { .. }) |
(PeerSyncStatus::IrrelevantPeer, PeerSyncStatus::IrrelevantPeer) |
(PeerSyncStatus::Unknown, PeerSyncStatus::Unknown))
matches!(
(self, other),
(PeerSyncStatus::Synced { .. }, PeerSyncStatus::Synced { .. })
| (
PeerSyncStatus::Advanced { .. },
PeerSyncStatus::Advanced { .. }
)
| (PeerSyncStatus::Behind { .. }, PeerSyncStatus::Behind { .. })
| (
PeerSyncStatus::IrrelevantPeer,
PeerSyncStatus::IrrelevantPeer
)
| (PeerSyncStatus::Unknown, PeerSyncStatus::Unknown)
)
}
}

Expand Down
14 changes: 10 additions & 4 deletions beacon_node/eth2_libp2p/src/peer_manager/peerdb.rs
Expand Up @@ -137,14 +137,20 @@ impl<TSpec: EthSpec> PeerDB<TSpec> {

/// If we are connected or currently dialing the peer returns true.
pub fn is_connected_or_dialing(&self, peer_id: &PeerId) -> bool {
matches!(self.connection_status(peer_id), Some(PeerConnectionStatus::Connected { .. })
| Some(PeerConnectionStatus::Dialing { .. }))
matches!(
self.connection_status(peer_id),
Some(PeerConnectionStatus::Connected { .. })
| Some(PeerConnectionStatus::Dialing { .. })
)
}

/// If we are connected or in the process of disconnecting
pub fn is_connected_or_disconnecting(&self, peer_id: &PeerId) -> bool {
matches!(self.connection_status(peer_id), Some(PeerConnectionStatus::Connected { .. })
| Some(PeerConnectionStatus::Disconnecting { .. }))
matches!(
self.connection_status(peer_id),
Some(PeerConnectionStatus::Connected { .. })
| Some(PeerConnectionStatus::Disconnecting { .. })
)
}

/// Returns true if the peer is synced at least to our current head.
Expand Down

0 comments on commit e20f64b

Please sign in to comment.