Skip to content

Commit

Permalink
A0-1491: Change protocol naming convention and add block sync protocol (
Browse files Browse the repository at this point in the history
#829)

* Change protocol naming convention and add block sync protocol

* Turn the block sync network on properly

Co-authored-by: timorl <timorl@disroot.org>
  • Loading branch information
timorl and timorl committed Dec 27, 2022
1 parent 151f846 commit d59c6d4
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 74 deletions.
35 changes: 29 additions & 6 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use aleph_primitives::AlephSessionApi;
use aleph_runtime::{self, opaque::Block, RuntimeApi, MAX_BLOCK_SIZE};
use finality_aleph::{
run_nonvalidator_node, run_validator_node, AlephBlockImport, AlephConfig,
JustificationNotification, Metrics, MillisecsPerBlock, Protocol, SessionPeriod,
JustificationNotification, Metrics, MillisecsPerBlock, Protocol, ProtocolNaming, SessionPeriod,
};
use futures::channel::mpsc;
use log::warn;
use sc_client_api::{Backend, HeaderBackend};
use sc_client_api::{Backend, BlockBackend, HeaderBackend};
use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
use sc_consensus_slots::BackoffAuthoringBlocksStrategy;
use sc_network::NetworkService;
Expand Down Expand Up @@ -212,14 +212,35 @@ fn setup(
(
RpcHandlers,
Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
ProtocolNaming,
NetworkStarter,
),
ServiceError,
> {
let genesis_hash = client
.block_hash(0)
.ok()
.flatten()
.expect("we should have a hash");
let chain_prefix = match config.chain_spec.fork_id() {
Some(fork_id) => format!("/{}/{}", genesis_hash, fork_id),
None => format!("/{}", genesis_hash),
};
let protocol_naming = ProtocolNaming::new(chain_prefix);
config
.network
.extra_sets
.push(finality_aleph::peers_set_config(
protocol_naming.clone(),
Protocol::Authentication,
));
config
.network
.extra_sets
.push(finality_aleph::peers_set_config(Protocol::Authentication));
.push(finality_aleph::peers_set_config(
protocol_naming.clone(),
Protocol::BlockSync,
));

let (network, system_rpc_tx, tx_handler_controller, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
Expand Down Expand Up @@ -262,7 +283,7 @@ fn setup(
telemetry: telemetry.as_mut(),
})?;

Ok((rpc_handlers, network, network_starter))
Ok((rpc_handlers, network, protocol_naming, network_starter))
}

/// Builds a new service for a full client.
Expand Down Expand Up @@ -308,7 +329,7 @@ pub fn new_authority(
let backoff_authoring_blocks = Some(LimitNonfinalized(aleph_config.max_nonfinalized_blocks()));
let prometheus_registry = config.prometheus_registry().cloned();

let (_rpc_handlers, network, network_starter) = setup(
let (_rpc_handlers, network, protocol_naming, network_starter) = setup(
config,
backend.clone(),
&keystore_container,
Expand Down Expand Up @@ -383,6 +404,7 @@ pub fn new_authority(
backup_saving_path: backup_path,
external_addresses: aleph_config.external_addresses(),
validator_port: aleph_config.validator_port(),
protocol_naming,
};
task_manager.spawn_essential_handle().spawn_blocking(
"aleph",
Expand Down Expand Up @@ -418,7 +440,7 @@ pub fn new_full(
.path(),
);

let (_rpc_handlers, network, network_starter) = setup(
let (_rpc_handlers, network, protocol_naming, network_starter) = setup(
config,
backend.clone(),
&keystore_container,
Expand Down Expand Up @@ -460,6 +482,7 @@ pub fn new_full(
backup_saving_path: backup_path,
external_addresses: aleph_config.external_addresses(),
validator_port: aleph_config.validator_port(),
protocol_naming,
};

task_manager.spawn_essential_handle().spawn_blocking(
Expand Down
19 changes: 10 additions & 9 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::time::Duration;
use crate::{
abft::{CurrentNetworkData, LegacyNetworkData, CURRENT_VERSION, LEGACY_VERSION},
aggregation::{CurrentRmcNetworkData, LegacyRmcNetworkData},
network::{data::split::Split, protocol_name},
network::data::split::Split,
session::{
first_block_of_session, last_block_of_session, session_id_from_block_num,
SessionBoundaries, SessionId,
Expand Down Expand Up @@ -51,7 +51,7 @@ pub use abft::{Keychain, NodeCount, NodeIndex, Recipient, SignatureSet, SpawnHan
pub use aleph_primitives::{AuthorityId, AuthorityPair, AuthoritySignature};
pub use import::AlephBlockImport;
pub use justification::{AlephJustification, JustificationNotification};
pub use network::Protocol;
pub use network::{Protocol, ProtocolNaming};
pub use nodes::{run_nonvalidator_node, run_validator_node};
pub use session::SessionPeriod;

Expand All @@ -67,21 +67,21 @@ enum Error {
}

/// Returns a NonDefaultSetConfig for the specified protocol.
pub fn peers_set_config(protocol: Protocol) -> sc_network_common::config::NonDefaultSetConfig {
let name = protocol_name(&protocol);

pub fn peers_set_config(
naming: ProtocolNaming,
protocol: Protocol,
) -> sc_network_common::config::NonDefaultSetConfig {
let mut config = sc_network_common::config::NonDefaultSetConfig::new(
name,
naming.protocol_name(&protocol),
// max_notification_size should be larger than the maximum possible honest message size (in bytes).
// Max size of alert is UNIT_SIZE * MAX_UNITS_IN_ALERT ~ 100 * 5000 = 50000 bytes
// Max size of parents response UNIT_SIZE * N_MEMBERS ~ 100 * N_MEMBERS
// When adding other (large) message types we need to make sure this limit is fine.
1024 * 1024,
);

config.set_config = match protocol {
Protocol::Authentication => sc_network_common::config::SetConfig::default(),
};
config.set_config = sc_network_common::config::SetConfig::default();
config.add_fallback_names(naming.fallback_protocol_names(&protocol));
config
}

Expand Down Expand Up @@ -254,6 +254,7 @@ pub struct AlephConfig<B: Block, H: ExHashT, C, SC, BB> {
pub backup_saving_path: Option<PathBuf>,
pub external_addresses: Vec<String>,
pub validator_port: u16,
pub protocol_naming: ProtocolNaming,
}

pub trait BlockchainBackend<B: Block> {
Expand Down
5 changes: 4 additions & 1 deletion finality-aleph/src/network/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ pub trait Network<D: Data>: Send + 'static {
async fn next(&mut self) -> Result<(D, Self::PeerId), Self::Error>;
}

/// The Authentication protocol is used for validator discovery.
/// Protocols used by the network.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum Protocol {
/// The authentication protocol is used for validator discovery.
Authentication,
/// The block synchronization protocol.
BlockSync,
}

/// Abstraction over a sender to the raw network.
Expand Down
64 changes: 51 additions & 13 deletions finality-aleph/src/network/gossip/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ enum Command<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
pub struct Service<N: RawNetwork, D: Data> {
network: N,
messages_from_user: mpsc::UnboundedReceiver<Command<D, N::PeerId>>,
messages_for_user: mpsc::UnboundedSender<(D, N::PeerId)>,
messages_for_authentication_user: mpsc::UnboundedSender<(D, N::PeerId)>,
messages_for_block_sync_user: mpsc::UnboundedSender<(D, N::PeerId)>,
authentication_connected_peers: HashSet<N::PeerId>,
authentication_peer_senders: HashMap<N::PeerId, TracingUnboundedSender<D>>,
block_sync_connected_peers: HashSet<N::PeerId>,
block_sync_peer_senders: HashMap<N::PeerId, TracingUnboundedSender<D>>,
spawn_handle: SpawnTaskHandle,
}

struct ServiceInterface<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
protocol: Protocol,
messages_from_service: mpsc::UnboundedReceiver<(D, P)>,
messages_for_service: mpsc::UnboundedSender<Command<D, P>>,
}
Expand Down Expand Up @@ -70,7 +74,7 @@ impl<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> Network<D> for Serv

fn send_to(&mut self, data: D, peer_id: Self::PeerId) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::Send(data, peer_id, Protocol::Authentication))
.unbounded_send(Command::Send(data, peer_id, self.protocol))
.map_err(|_| Error::ServiceStopped)
}

Expand All @@ -80,17 +84,13 @@ impl<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> Network<D> for Serv
peer_ids: HashSet<Self::PeerId>,
) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::SendToRandom(
data,
peer_ids,
Protocol::Authentication,
))
.unbounded_send(Command::SendToRandom(data, peer_ids, self.protocol))
.map_err(|_| Error::ServiceStopped)
}

fn broadcast(&mut self, data: D) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::Broadcast(data, Protocol::Authentication))
.unbounded_send(Command::Broadcast(data, self.protocol))
.map_err(|_| Error::ServiceStopped)
}

Expand All @@ -115,20 +115,32 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
) -> (
Service<N, D>,
impl Network<D, Error = Error, PeerId = N::PeerId>,
impl Network<D, Error = Error, PeerId = N::PeerId>,
) {
let (messages_for_user, messages_from_service) = mpsc::unbounded();
let (messages_for_authentication_user, messages_from_authentication_service) =
mpsc::unbounded();
let (messages_for_block_sync_user, messages_from_block_sync_service) = mpsc::unbounded();
let (messages_for_service, messages_from_user) = mpsc::unbounded();
(
Service {
network,
messages_from_user,
messages_for_user,
messages_for_authentication_user,
messages_for_block_sync_user,
spawn_handle,
authentication_connected_peers: HashSet::new(),
authentication_peer_senders: HashMap::new(),
block_sync_connected_peers: HashSet::new(),
block_sync_peer_senders: HashMap::new(),
},
ServiceInterface {
protocol: Protocol::Authentication,
messages_from_service: messages_from_authentication_service,
messages_for_service: messages_for_service.clone(),
},
ServiceInterface {
messages_from_service,
protocol: Protocol::BlockSync,
messages_from_service: messages_from_block_sync_service,
messages_for_service,
},
)
Expand All @@ -141,6 +153,7 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
) -> Option<&mut TracingUnboundedSender<D>> {
match protocol {
Protocol::Authentication => self.authentication_peer_senders.get_mut(peer),
Protocol::BlockSync => self.block_sync_peer_senders.get_mut(peer),
}
}

Expand Down Expand Up @@ -211,6 +224,7 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
fn protocol_peers(&self, protocol: Protocol) -> &HashSet<N::PeerId> {
match protocol {
Protocol::Authentication => &self.authentication_connected_peers,
Protocol::BlockSync => &self.block_sync_connected_peers,
}
}

Expand Down Expand Up @@ -262,6 +276,12 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
self.authentication_peer_senders.insert(peer.clone(), tx);
rx
}
Protocol::BlockSync => {
let (tx, rx) = tracing_unbounded("mpsc_notification_stream_block_sync");
self.block_sync_connected_peers.insert(peer.clone());
self.block_sync_peer_senders.insert(peer.clone(), tx);
rx
}
};
self.spawn_handle.spawn(
"aleph/network/peer_sender",
Expand All @@ -276,19 +296,33 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
self.authentication_connected_peers.remove(&peer);
self.authentication_peer_senders.remove(&peer);
}
Protocol::BlockSync => {
self.block_sync_connected_peers.remove(&peer);
self.block_sync_peer_senders.remove(&peer);
}
}
}
Messages(peer_id, messages) => {
for (protocol, data) in messages.into_iter() {
match protocol {
Protocol::Authentication => match D::decode(&mut &data[..]) {
Ok(data) => self
.messages_for_user
.messages_for_authentication_user
.unbounded_send((data, peer_id.clone()))?,
Err(e) => {
warn!(target: "aleph-network", "Error decoding authentication protocol message: {}", e)
}
},
// This is a bit of a placeholder for now, as we are not yet using this
// protocol. In the future we will not be using the same D as above.
Protocol::BlockSync => match D::decode(&mut &data[..]) {
Ok(data) => self
.messages_for_block_sync_user
.unbounded_send((data, peer_id.clone()))?,
Err(e) => {
warn!(target: "aleph-network", "Error decoding block sync protocol message: {}", e)
}
},
};
}
}
Expand All @@ -303,6 +337,10 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
"authentication connected peers - {:?}; ",
self.authentication_connected_peers.len()
));
status.push_str(&format!(
"block sync connected peers - {:?}; ",
self.block_sync_connected_peers.len()
));

info!(target: "aleph-network", "{}", status);
}
Expand Down Expand Up @@ -379,7 +417,7 @@ mod tests {

// Prepare service
let network = MockRawNetwork::new(event_stream_oneshot_tx);
let (service, gossip_network) =
let (service, gossip_network, _) =
Service::new(network.clone(), task_manager.spawn_handle());
let gossip_network = Box::new(gossip_network);

Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod tcp;
#[cfg(test)]
pub use gossip::mock::{MockEvent, MockRawNetwork};
pub use gossip::{Network as GossipNetwork, Protocol, Service as GossipService};
pub use substrate::protocol_name;
pub use substrate::{ProtocolNaming, SubstrateNetwork};

/// Represents the id of an arbitrary node.
pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send {
Expand Down

0 comments on commit d59c6d4

Please sign in to comment.