Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - libp2p upgrade + gossipsub interval fix #3012

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
381 changes: 217 additions & 164 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use eth2::{
};
use execution_layer::ExecutionLayer;
use genesis::{interop_genesis_state, Eth1GenesisService, DEFAULT_ETH1_BLOCK_HASH};
use lighthouse_network::{open_metrics_client::registry::Registry, NetworkGlobals};
use lighthouse_network::{prometheus_client::registry::Registry, NetworkGlobals};
use monitoring_api::{MonitoringHttpClient, ProcessType};
use network::{NetworkConfig, NetworkMessage, NetworkService};
use slasher::Slasher;
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/http_api/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub async fn create_api_server<T: BeaconChainTypes>(
send_back_addr: EXTERNAL_ADDR.parse().unwrap(),
};
let con_id = ConnectionId::new(1);
pm.inject_connection_established(&peer_id, &con_id, &connected_point, None);
pm.inject_connection_established(&peer_id, &con_id, &connected_point, None, 0);
*network_globals.sync_state.write() = SyncState::Synced;

let eth1_service = eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone());
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/http_metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
mod metrics;

use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::open_metrics_client::registry::Registry;
use lighthouse_network::prometheus_client::registry::Registry;
use lighthouse_version::version_with_platform;
use serde::{Deserialize, Serialize};
use slog::{crit, info, Logger};
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/http_metrics/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::Context;
use beacon_chain::BeaconChainTypes;
use lighthouse_metrics::{Encoder, TextEncoder};
use lighthouse_network::open_metrics_client::encoding::text::encode;
use lighthouse_network::prometheus_client::encoding::text::encode;
use malloc_utils::scrape_allocator_metrics;

pub use lighthouse_metrics::*;
Expand Down
6 changes: 4 additions & 2 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ directory = { path = "../../common/directory" }
regex = "1.3.9"
strum = { version = "0.21.0", features = ["derive"] }
superstruct = "0.4.0"
open-metrics-client = "0.14.0"
prometheus-client = "0.15.0"

[dependencies.libp2p]
version = "0.42.1"
git = "https://github.com/sigp/rust-libp2p"
# branch libp2p-gossipsub-interval-hotfix
rev = "e213703e616eaba3c482d7714775e0d37c4ae8e5"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio", "plaintext", "secp256k1"]

Expand Down
18 changes: 0 additions & 18 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -927,24 +927,6 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
}
}

fn inject_connected(&mut self, _peer_id: &PeerId) {}
fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
fn inject_connection_established(
&mut self,
_peer_id: &PeerId,
_connection_id: &ConnectionId,
_endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
) {
}
fn inject_connection_closed(
&mut self,
_: &PeerId,
_: &ConnectionId,
_connected_point: &ConnectedPoint,
_handler: Self::ProtocolsHandler,
) {
}
fn inject_event(
&mut self,
_: PeerId,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub use crate::types::{
SubnetDiscovery,
};

pub use open_metrics_client;
pub use prometheus_client;

pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response};
pub use config::Config as NetworkConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
_connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
_other_established: usize,
) {
debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => ?endpoint.to_endpoint());
// Check NAT if metrics are enabled
Expand Down Expand Up @@ -172,8 +173,18 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
self.update_connected_peer_metrics();
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
_: &ConnectionId,
_: &ConnectedPoint,
_: DummyProtocolsHandler,
remaining_established: usize,
) {
if remaining_established > 0 {
return;
}

fn inject_disconnected(&mut self, peer_id: &PeerId) {
// There are no more connections
if self
.network_globals
Expand Down
37 changes: 13 additions & 24 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,36 +202,25 @@ where
}

// Use connection established/closed instead of these currently
fn inject_connected(&mut self, peer_id: &PeerId) {
// find the peer's meta-data
debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id);
let rpc_event =
RPCSend::Request(RequestId::Behaviour, OutboundRequest::MetaData(PhantomData));
self.events.push(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::Any,
event: rpc_event,
});
}

fn inject_disconnected(&mut self, _peer_id: &PeerId) {}

fn inject_connection_established(
&mut self,
_peer_id: &PeerId,
peer_id: &PeerId,
_connection_id: &ConnectionId,
_endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
}

fn inject_connection_closed(
&mut self,
_peer_id: &PeerId,
_: &ConnectionId,
_connected_point: &ConnectedPoint,
_handler: Self::ProtocolsHandler,
) {
if other_established == 0 {
// find the peer's meta-data
debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id);
let rpc_event =
RPCSend::Request(RequestId::Behaviour, OutboundRequest::MetaData(PhantomData));
self.events.push(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::Any,
event: rpc_event,
});
}
}

fn inject_event(
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use libp2p::{
swarm::{SwarmBuilder, SwarmEvent},
PeerId, Swarm, Transport,
};
use open_metrics_client::registry::Registry;
use prometheus_client::registry::Registry;
use slog::{crit, debug, info, o, trace, warn, Logger};
use ssz::Decode;
use std::fs::File;
Expand Down
127 changes: 87 additions & 40 deletions beacon_node/lighthouse_network/tests/common/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,8 @@ where
inner: TInner,

pub addresses_of_peer: Vec<PeerId>,
pub inject_connected: Vec<PeerId>,
pub inject_disconnected: Vec<PeerId>,
pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint)>,
pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint)>,
pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>,
pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>,
pub inject_event: Vec<(
PeerId,
ConnectionId,
Expand All @@ -128,8 +126,6 @@ where
Self {
inner,
addresses_of_peer: Vec::new(),
inject_connected: Vec::new(),
inject_disconnected: Vec::new(),
inject_connection_established: Vec::new(),
inject_connection_closed: Vec::new(),
inject_event: Vec::new(),
Expand All @@ -148,8 +144,6 @@ where
#[allow(dead_code)]
pub fn reset(&mut self) {
self.addresses_of_peer = Vec::new();
self.inject_connected = Vec::new();
self.inject_disconnected = Vec::new();
self.inject_connection_established = Vec::new();
self.inject_connection_closed = Vec::new();
self.inject_event = Vec::new();
Expand All @@ -176,7 +170,13 @@ where
expected_disconnections: usize,
) -> bool {
if self.inject_connection_closed.len() == expected_closed_connections {
assert_eq!(self.inject_disconnected.len(), expected_disconnections);
assert_eq!(
self.inject_connection_closed
.iter()
.filter(|(.., remaining_established)| { *remaining_established == 0 })
.count(),
expected_disconnections
);
return true;
}

Expand All @@ -193,7 +193,15 @@ where
expected_connections: usize,
) -> bool {
if self.inject_connection_established.len() == expected_established_connections {
assert_eq!(self.inject_connected.len(), expected_connections);
assert_eq!(
self.inject_connection_established
.iter()
.filter(|(.., reported_aditional_connections)| {
*reported_aditional_connections == 0
})
.count(),
expected_connections
);
return true;
}

Expand All @@ -219,37 +227,45 @@ where
self.inner.addresses_of_peer(p)
}

fn inject_connected(&mut self, peer: &PeerId) {
assert!(
self.inject_connection_established
.iter()
.any(|(peer_id, _, _)| peer_id == peer),
"`inject_connected` is called after at least one `inject_connection_established`."
);
self.inject_connected.push(*peer);
self.inner.inject_connected(peer);
}

fn inject_connection_established(
&mut self,
p: &PeerId,
c: &ConnectionId,
e: &ConnectedPoint,
errors: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.inject_connection_established.push((*p, *c, e.clone()));
self.inner.inject_connection_established(p, c, e, errors);
}

fn inject_disconnected(&mut self, peer: &PeerId) {
assert!(
self.inject_connection_closed
.iter()
.any(|(peer_id, _, _)| peer_id == peer),
"`inject_disconnected` is called after at least one `inject_connection_closed`."
);
self.inject_disconnected.push(*peer);
self.inner.inject_disconnected(peer);
let mut other_peer_connections = self
.inject_connection_established
.iter()
.rev() // take last to first
.filter_map(|(peer, .., other_established)| {
if p == peer {
Some(other_established)
} else {
None
}
})
.take(other_established);

// We are informed that there are `other_established` additional connections. Ensure that the
// number of previous connections is consistent with this
if let Some(&prev) = other_peer_connections.next() {
if prev < other_established {
assert_eq!(
prev,
other_established - 1,
"Inconsistent connection reporting"
)
}
assert_eq!(other_peer_connections.count(), other_established - 1);
} else {
assert_eq!(other_established, 0)
}
self.inject_connection_established
.push((*p, *c, e.clone(), other_established));
self.inner
.inject_connection_established(p, c, e, errors, other_established);
}

fn inject_connection_closed(
Expand All @@ -258,15 +274,46 @@ where
c: &ConnectionId,
e: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
remaining_established: usize,
) {
let connection = (*p, *c, e.clone());
let mut other_closed_connections = self
.inject_connection_established
.iter()
.rev() // take last to first
.filter_map(|(peer, .., remaining_established)| {
if p == peer {
Some(remaining_established)
} else {
None
}
})
.take(remaining_established);

// We are informed that there are `other_established` additional connections. Ensure that the
// number of previous connections is consistent with this
if let Some(&prev) = other_closed_connections.next() {
if prev < remaining_established {
assert_eq!(
prev,
remaining_established - 1,
"Inconsistent closed connection reporting"
)
}
assert_eq!(other_closed_connections.count(), remaining_established - 1);
} else {
assert_eq!(remaining_established, 0)
}
assert!(
self.inject_connection_established.contains(&connection),
self.inject_connection_established
.iter()
.any(|(peer, conn_id, endpoint, _)| (peer, conn_id, endpoint) == (p, c, e)),
"`inject_connection_closed` is called only for connections for \
which `inject_connection_established` was called first."
);
self.inject_connection_closed.push(connection);
self.inner.inject_connection_closed(p, c, e, handler);
self.inject_connection_closed
.push((*p, *c, e.clone(), remaining_established));
self.inner
.inject_connection_closed(p, c, e, handler, remaining_established);
}

fn inject_event(
Expand All @@ -278,14 +325,14 @@ where
assert!(
self.inject_connection_established
.iter()
.any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id),
.any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id),
"`inject_event` is called for reported connections."
);
assert!(
!self
.inject_connection_closed
.iter()
.any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id),
.any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id),
"`inject_event` is never called for closed connections."
);

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::channel::mpsc::Sender;
use futures::future::OptionFuture;
use futures::prelude::*;
use lighthouse_network::{
open_metrics_client::registry::Registry, MessageAcceptance, Service as LibP2PService,
prometheus_client::registry::Registry, MessageAcceptance, Service as LibP2PService,
};
use lighthouse_network::{
rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId},
Expand Down