Skip to content

Commit

Permalink
libp2p upgrade + gossipsub interval fix
Browse files Browse the repository at this point in the history
fix rpc NB impl

use commit hash

update tests

more test updates

more test updates
  • Loading branch information
divagant-martian committed Feb 10, 2022
1 parent 7e38d20 commit caf0f4b
Show file tree
Hide file tree
Showing 13 changed files with 340 additions and 256 deletions.
381 changes: 217 additions & 164 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use eth2::{
};
use execution_layer::ExecutionLayer;
use genesis::{interop_genesis_state, Eth1GenesisService, DEFAULT_ETH1_BLOCK_HASH};
use lighthouse_network::{open_metrics_client::registry::Registry, NetworkGlobals};
use lighthouse_network::{prometheus_client::registry::Registry, NetworkGlobals};
use monitoring_api::{MonitoringHttpClient, ProcessType};
use network::{NetworkConfig, NetworkMessage, NetworkService};
use slasher::Slasher;
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/http_api/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub async fn create_api_server<T: BeaconChainTypes>(
send_back_addr: EXTERNAL_ADDR.parse().unwrap(),
};
let con_id = ConnectionId::new(1);
pm.inject_connection_established(&peer_id, &con_id, &connected_point, None);
pm.inject_connection_established(&peer_id, &con_id, &connected_point, None, 0);
*network_globals.sync_state.write() = SyncState::Synced;

let eth1_service = eth1::Service::new(eth1::Config::default(), log.clone(), chain.spec.clone());
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/http_metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
mod metrics;

use beacon_chain::{BeaconChain, BeaconChainTypes};
use lighthouse_network::open_metrics_client::registry::Registry;
use lighthouse_network::prometheus_client::registry::Registry;
use lighthouse_version::version_with_platform;
use serde::{Deserialize, Serialize};
use slog::{crit, info, Logger};
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/http_metrics/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::Context;
use beacon_chain::BeaconChainTypes;
use lighthouse_metrics::{Encoder, TextEncoder};
use lighthouse_network::open_metrics_client::encoding::text::encode;
use lighthouse_network::prometheus_client::encoding::text::encode;
use malloc_utils::scrape_allocator_metrics;

pub use lighthouse_metrics::*;
Expand Down
6 changes: 4 additions & 2 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ directory = { path = "../../common/directory" }
regex = "1.3.9"
strum = { version = "0.21.0", features = ["derive"] }
superstruct = "0.4.0"
open-metrics-client = "0.14.0"
prometheus-client = "0.15.0"

[dependencies.libp2p]
version = "0.42.1"
git = "https://github.com/sigp/rust-libp2p"
# branch libp2p-gossipsub-interval-hotfix
rev = "e213703e616eaba3c482d7714775e0d37c4ae8e5"
default-features = false
features = ["websocket", "identify", "mplex", "yamux", "noise", "gossipsub", "dns-tokio", "tcp-tokio", "plaintext", "secp256k1"]

Expand Down
18 changes: 0 additions & 18 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -927,24 +927,6 @@ impl<TSpec: EthSpec> NetworkBehaviour for Discovery<TSpec> {
}
}

fn inject_connected(&mut self, _peer_id: &PeerId) {}
fn inject_disconnected(&mut self, _peer_id: &PeerId) {}
fn inject_connection_established(
&mut self,
_peer_id: &PeerId,
_connection_id: &ConnectionId,
_endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
) {
}
fn inject_connection_closed(
&mut self,
_: &PeerId,
_: &ConnectionId,
_connected_point: &ConnectedPoint,
_handler: Self::ProtocolsHandler,
) {
}
fn inject_event(
&mut self,
_: PeerId,
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub use crate::types::{
SubnetDiscovery,
};

pub use open_metrics_client;
pub use prometheus_client;

pub use behaviour::{BehaviourEvent, Gossipsub, PeerRequestId, Request, Response};
pub use config::Config as NetworkConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
_connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
_other_established: usize,
) {
debug!(self.log, "Connection established"; "peer_id" => %peer_id, "connection" => ?endpoint.to_endpoint());
// Check NAT if metrics are enabled
Expand Down Expand Up @@ -172,8 +173,18 @@ impl<TSpec: EthSpec> NetworkBehaviour for PeerManager<TSpec> {
self.update_connected_peer_metrics();
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
}
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
_: &ConnectionId,
_: &ConnectedPoint,
_: DummyProtocolsHandler,
remaining_established: usize,
) {
if remaining_established > 0 {
return;
}

fn inject_disconnected(&mut self, peer_id: &PeerId) {
// There are no more connections
if self
.network_globals
Expand Down
37 changes: 13 additions & 24 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,36 +202,25 @@ where
}

// Use connection established/closed instead of these currently
fn inject_connected(&mut self, peer_id: &PeerId) {
// find the peer's meta-data
debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id);
let rpc_event =
RPCSend::Request(RequestId::Behaviour, OutboundRequest::MetaData(PhantomData));
self.events.push(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::Any,
event: rpc_event,
});
}

fn inject_disconnected(&mut self, _peer_id: &PeerId) {}

fn inject_connection_established(
&mut self,
_peer_id: &PeerId,
peer_id: &PeerId,
_connection_id: &ConnectionId,
_endpoint: &ConnectedPoint,
_failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
}

fn inject_connection_closed(
&mut self,
_peer_id: &PeerId,
_: &ConnectionId,
_connected_point: &ConnectedPoint,
_handler: Self::ProtocolsHandler,
) {
if other_established == 0 {
// find the peer's meta-data
debug!(self.log, "Requesting new peer's metadata"; "peer_id" => %peer_id);
let rpc_event =
RPCSend::Request(RequestId::Behaviour, OutboundRequest::MetaData(PhantomData));
self.events.push(NetworkBehaviourAction::NotifyHandler {
peer_id: *peer_id,
handler: NotifyHandler::Any,
event: rpc_event,
});
}
}

fn inject_event(
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/lighthouse_network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use libp2p::{
swarm::{SwarmBuilder, SwarmEvent},
PeerId, Swarm, Transport,
};
use open_metrics_client::registry::Registry;
use prometheus_client::registry::Registry;
use slog::{crit, debug, info, o, trace, warn, Logger};
use ssz::Decode;
use std::fs::File;
Expand Down
127 changes: 87 additions & 40 deletions beacon_node/lighthouse_network/tests/common/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,8 @@ where
inner: TInner,

pub addresses_of_peer: Vec<PeerId>,
pub inject_connected: Vec<PeerId>,
pub inject_disconnected: Vec<PeerId>,
pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint)>,
pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint)>,
pub inject_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>,
pub inject_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>,
pub inject_event: Vec<(
PeerId,
ConnectionId,
Expand All @@ -128,8 +126,6 @@ where
Self {
inner,
addresses_of_peer: Vec::new(),
inject_connected: Vec::new(),
inject_disconnected: Vec::new(),
inject_connection_established: Vec::new(),
inject_connection_closed: Vec::new(),
inject_event: Vec::new(),
Expand All @@ -148,8 +144,6 @@ where
#[allow(dead_code)]
pub fn reset(&mut self) {
self.addresses_of_peer = Vec::new();
self.inject_connected = Vec::new();
self.inject_disconnected = Vec::new();
self.inject_connection_established = Vec::new();
self.inject_connection_closed = Vec::new();
self.inject_event = Vec::new();
Expand All @@ -176,7 +170,13 @@ where
expected_disconnections: usize,
) -> bool {
if self.inject_connection_closed.len() == expected_closed_connections {
assert_eq!(self.inject_disconnected.len(), expected_disconnections);
assert_eq!(
self.inject_connection_closed
.iter()
.filter(|(.., remaining_established)| { *remaining_established == 0 })
.count(),
expected_disconnections
);
return true;
}

Expand All @@ -193,7 +193,15 @@ where
expected_connections: usize,
) -> bool {
if self.inject_connection_established.len() == expected_established_connections {
assert_eq!(self.inject_connected.len(), expected_connections);
assert_eq!(
self.inject_connection_established
.iter()
.filter(|(.., reported_aditional_connections)| {
*reported_aditional_connections == 0
})
.count(),
expected_connections
);
return true;
}

Expand All @@ -219,37 +227,45 @@ where
self.inner.addresses_of_peer(p)
}

fn inject_connected(&mut self, peer: &PeerId) {
assert!(
self.inject_connection_established
.iter()
.any(|(peer_id, _, _)| peer_id == peer),
"`inject_connected` is called after at least one `inject_connection_established`."
);
self.inject_connected.push(*peer);
self.inner.inject_connected(peer);
}

fn inject_connection_established(
&mut self,
p: &PeerId,
c: &ConnectionId,
e: &ConnectedPoint,
errors: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.inject_connection_established.push((*p, *c, e.clone()));
self.inner.inject_connection_established(p, c, e, errors);
}

fn inject_disconnected(&mut self, peer: &PeerId) {
assert!(
self.inject_connection_closed
.iter()
.any(|(peer_id, _, _)| peer_id == peer),
"`inject_disconnected` is called after at least one `inject_connection_closed`."
);
self.inject_disconnected.push(*peer);
self.inner.inject_disconnected(peer);
let mut other_peer_connections = self
.inject_connection_established
.iter()
.rev() // take last to first
.filter_map(|(peer, .., other_established)| {
if p == peer {
Some(other_established)
} else {
None
}
})
.take(other_established);

// We are informed that there are `other_established` additional connections. Ensure that the
// number of previous connections is consistent with this
if let Some(&prev) = other_peer_connections.next() {
if prev < other_established {
assert_eq!(
prev,
other_established - 1,
"Inconsistent connection reporting"
)
}
assert_eq!(other_peer_connections.count(), other_established - 1);
} else {
assert_eq!(other_established, 0)
}
self.inject_connection_established
.push((*p, *c, e.clone(), other_established));
self.inner
.inject_connection_established(p, c, e, errors, other_established);
}

fn inject_connection_closed(
Expand All @@ -258,15 +274,46 @@ where
c: &ConnectionId,
e: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
remaining_established: usize,
) {
let connection = (*p, *c, e.clone());
let mut other_closed_connections = self
.inject_connection_established
.iter()
.rev() // take last to first
.filter_map(|(peer, .., remaining_established)| {
if p == peer {
Some(remaining_established)
} else {
None
}
})
.take(remaining_established);

// We are informed that there are `other_established` additional connections. Ensure that the
// number of previous connections is consistent with this
if let Some(&prev) = other_closed_connections.next() {
if prev < remaining_established {
assert_eq!(
prev,
remaining_established - 1,
"Inconsistent closed connection reporting"
)
}
assert_eq!(other_closed_connections.count(), remaining_established - 1);
} else {
assert_eq!(remaining_established, 0)
}
assert!(
self.inject_connection_established.contains(&connection),
self.inject_connection_established
.iter()
.any(|(peer, conn_id, endpoint, _)| (peer, conn_id, endpoint) == (p, c, e)),
"`inject_connection_closed` is called only for connections for \
which `inject_connection_established` was called first."
);
self.inject_connection_closed.push(connection);
self.inner.inject_connection_closed(p, c, e, handler);
self.inject_connection_closed
.push((*p, *c, e.clone(), remaining_established));
self.inner
.inject_connection_closed(p, c, e, handler, remaining_established);
}

fn inject_event(
Expand All @@ -278,14 +325,14 @@ where
assert!(
self.inject_connection_established
.iter()
.any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id),
.any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id),
"`inject_event` is called for reported connections."
);
assert!(
!self
.inject_connection_closed
.iter()
.any(|(peer_id, conn_id, _)| *peer_id == p && c == *conn_id),
.any(|(peer_id, conn_id, ..)| *peer_id == p && c == *conn_id),
"`inject_event` is never called for closed connections."
);

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::channel::mpsc::Sender;
use futures::future::OptionFuture;
use futures::prelude::*;
use lighthouse_network::{
open_metrics_client::registry::Registry, MessageAcceptance, Service as LibP2PService,
prometheus_client::registry::Registry, MessageAcceptance, Service as LibP2PService,
};
use lighthouse_network::{
rpc::{GoodbyeReason, RPCResponseErrorCode, RequestId},
Expand Down

0 comments on commit caf0f4b

Please sign in to comment.