Skip to content

Commit

Permalink
chore(net): support multiple eth protocol versions. (paradigmxyz#1152)
Browse files Browse the repository at this point in the history
  • Loading branch information
jinsankim authored and literallymarvellous committed Feb 6, 2023
1 parent 4f375a1 commit c5e6673
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 5 deletions.
3 changes: 3 additions & 0 deletions crates/net/network/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,9 @@ async fn authenticate_stream(
};

// if the hello handshake was successful we can try status handshake
//
// Before trying status handshake, set up the version to shared_capability
let status = Status { version: p2p_stream.shared_capability().version(), ..status };
let eth_unauthed = UnauthedEthStream::new(p2p_stream);
let (eth_stream, their_status) = match eth_unauthed.handshake(status, fork_filter).await {
Ok(stream_res) => stream_res,
Expand Down
26 changes: 21 additions & 5 deletions crates/net/network/src/test_utils/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
};
use futures::{FutureExt, StreamExt};
use pin_project::pin_project;
use reth_eth_wire::DisconnectReason;
use reth_eth_wire::{capability::Capability, DisconnectReason, HelloBuilder};
use reth_primitives::PeerId;
use reth_provider::{test_utils::NoopProvider, BlockProvider, HeaderProvider};
use secp256k1::SecretKey;
Expand Down Expand Up @@ -281,17 +281,33 @@ where
/// to any available IP and port.
pub fn new(client: Arc<C>) -> Self {
let secret_key = SecretKey::new(&mut rand::thread_rng());
Self::with_secret_key(client, secret_key)
let config = Self::network_config_builder(secret_key).build(Arc::clone(&client));
Self { config, client, secret_key }
}

/// Initialize the network with a given secret key, allowing devp2p and discovery to bind any
/// available IP and port.
pub fn with_secret_key(client: Arc<C>, secret_key: SecretKey) -> Self {
let config = NetworkConfigBuilder::new(secret_key)
let config = Self::network_config_builder(secret_key).build(Arc::clone(&client));
Self { config, client, secret_key }
}

/// Initialize the network with a given capabilities.
pub fn with_capabilities(client: Arc<C>, capabilities: Vec<Capability>) -> Self {
let secret_key = SecretKey::new(&mut rand::thread_rng());

let builder = Self::network_config_builder(secret_key);
let hello_message =
HelloBuilder::new(builder.get_peer_id()).capabilities(capabilities).build();
let config = builder.hello_message(hello_message).build(Arc::clone(&client));

Self { config, client, secret_key }
}

fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder {
NetworkConfigBuilder::new(secret_key)
.listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
.discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
.build(Arc::clone(&client));
Self { config, client, secret_key }
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/net/network/tests/it/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod connect;
mod requests;
mod session;

fn main() {}
87 changes: 87 additions & 0 deletions crates/net/network/tests/it/session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//! Session tests

use futures::StreamExt;
use reth_eth_wire::{capability::Capability, EthVersion};
use reth_network::{
test_utils::{PeerConfig, Testnet},
NetworkEvent,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::test_utils::NoopProvider;
use std::sync::Arc;

#[tokio::test(flavor = "multi_thread")]
async fn test_session_established_with_highest_version() {
reth_tracing::init_test_tracing();

let net = Testnet::create(2).await;

net.for_each(|peer| assert_eq!(0, peer.num_peers()));

let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();
drop(handles);

let handle = net.spawn();

handle0.add_peer(*handle1.peer_id(), handle1.local_addr());

let mut events = handle0.event_listener().take(2);
while let Some(event) = events.next().await {
match event {
NetworkEvent::PeerAdded(peer_id) => {
assert_eq!(handle1.peer_id(), &peer_id);
}
NetworkEvent::SessionEstablished { peer_id, status, .. } => {
assert_eq!(handle1.peer_id(), &peer_id);
assert_eq!(status.version, EthVersion::Eth67 as u8);
}
_ => {
panic!("unexpected event")
}
}
}

handle.terminate().await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_session_established_with_different_capability() {
reth_tracing::init_test_tracing();

let mut net = Testnet::create(1).await;

let capabilities = vec![Capability::new("eth".into(), EthVersion::Eth66 as usize)];
let p1 = PeerConfig::with_capabilities(Arc::new(NoopProvider::default()), capabilities);
net.add_peer_with_config(p1).await.unwrap();

net.for_each(|peer| assert_eq!(0, peer.num_peers()));

let mut handles = net.handles();
let handle0 = handles.next().unwrap();
let handle1 = handles.next().unwrap();
drop(handles);

let handle = net.spawn();

handle0.add_peer(*handle1.peer_id(), handle1.local_addr());

let mut events = handle0.event_listener().take(2);
while let Some(event) = events.next().await {
match event {
NetworkEvent::PeerAdded(peer_id) => {
assert_eq!(handle1.peer_id(), &peer_id);
}
NetworkEvent::SessionEstablished { peer_id, status, .. } => {
assert_eq!(handle1.peer_id(), &peer_id);
assert_eq!(status.version, EthVersion::Eth66 as u8);
}
_ => {
panic!("unexpected event")
}
}
}

handle.terminate().await;
}

0 comments on commit c5e6673

Please sign in to comment.